Skip to content
This repository has been archived by the owner on Mar 8, 2023. It is now read-only.

Commit

Permalink
fix: subscribe register
Browse files Browse the repository at this point in the history
  • Loading branch information
sysatom committed Jul 27, 2021
1 parent ff39e1e commit 54862f9
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 8 deletions.
3 changes: 2 additions & 1 deletion internal/app/gateway/controller/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/tsundata/assistant/internal/pkg/util"
"github.com/tsundata/assistant/internal/pkg/vendors/newrelic"
"github.com/tsundata/assistant/internal/pkg/vendors/telegram"
"github.com/tsundata/assistant/internal/pkg/version"
"net/http"
"regexp"
"strings"
Expand Down Expand Up @@ -55,7 +56,7 @@ func NewGatewayController(opt *config.AppConfig, rdb *redis.Client, logger log.L
}

func (gc *GatewayController) Index(c *fiber.Ctx) error {
return c.SendString("Gateway")
return c.SendString(fmt.Sprintf("Gateway %s", version.Version))
}

func (gc *GatewayController) SlackEvent(c *fiber.Ctx) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/app/spider/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewApp(
// spider
go func() {
// Delayed loading
time.Sleep(10 * time.Second)
time.Sleep(5 * time.Minute)
s := crawler.New()
s.SetService(c, rdb, logger, subscribe, middle, message)
err := s.LoadRule()
Expand Down
10 changes: 6 additions & 4 deletions internal/app/spider/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/tsundata/assistant/internal/pkg/log"
"github.com/tsundata/assistant/internal/pkg/util"
"github.com/tsundata/assistant/internal/pkg/version"
"go.uber.org/zap"
"gopkg.in/yaml.v2"
"strconv"
"strings"
Expand Down Expand Up @@ -100,14 +101,14 @@ func (s *Crawler) Daemon() {
s.logger.Info("subscribe spider starting...")

for name, job := range s.jobs {
s.logger.Info("spider " + name + ": crawl...")
go s.ruleWorker(name, job)
}

go s.resultWorker()
}

func (s *Crawler) ruleWorker(name string, r rule.Rule) {
s.logger.Info("spider "+name+": crawl...", zap.String("spider", name))
p, err := cron.ParseUTC(r.When)
if err != nil {
s.logger.Error(err)
Expand All @@ -120,6 +121,7 @@ func (s *Crawler) ruleWorker(name string, r rule.Rule) {
}
for {
if nextTime.Format("2006-01-02 15:04") == time.Now().Format("2006-01-02 15:04") {
s.logger.Info("spider "+name+": scheduled", zap.String("spider", name))
state, err := s.subscribe.Status(context.Background(), &pb.SubscribeRequest{
Text: name,
})
Expand All @@ -136,9 +138,9 @@ func (s *Crawler) ruleWorker(name string, r rule.Rule) {
result := func() []string {
defer func() {
if r := recover(); r != nil {
s.logger.Warn("ruleWorker recover " + name)
s.logger.Warn("ruleWorker recover " + name, zap.String("spider", name))
if v, ok := r.(error); ok {
s.logger.Error(v)
s.logger.Error(v, zap.String("spider", name))
}
}
}()
Expand All @@ -155,7 +157,7 @@ func (s *Crawler) ruleWorker(name string, r rule.Rule) {
}
nextTime, err = p.Next(time.Now())
if err != nil {
s.logger.Error(err)
s.logger.Error(err, zap.String("spider", name))
continue
}
time.Sleep(2 * time.Second)
Expand Down
7 changes: 6 additions & 1 deletion internal/app/subscribe/service/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ func (s *Subscribe) Register(ctx context.Context, payload *pb.SubscribeRequest)
return nil, err
}

if len(resp) == 0 {
exist := true
if len(resp) == 0 || (len(resp) == 1 && resp[0] == nil) {
exist = false
}

if !exist {
_, err = s.rdb.HMSet(ctx, RuleKey, payload.GetText(), "true").Result()
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion internal/app/web/controller/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/tsundata/assistant/internal/pkg/sdk"
"github.com/tsundata/assistant/internal/pkg/util"
"github.com/tsundata/assistant/internal/pkg/vendors"
"github.com/tsundata/assistant/internal/pkg/version"
"github.com/yuin/goldmark"
"github.com/yuin/goldmark/extension"
"github.com/yuin/goldmark/renderer/html"
Expand All @@ -37,7 +38,7 @@ func NewWebController(opt *config.AppConfig, rdb *redis.Client, logger log.Logge
}

func (wc *WebController) Index(c *fiber.Ctx) error {
return c.SendString("Web")
return c.SendString(fmt.Sprintf("Web %s", version.Version))
}

func (wc *WebController) Echo(c *fiber.Ctx) error {
Expand Down

0 comments on commit 54862f9

Please sign in to comment.