Skip to content
This repository has been archived by the owner on Mar 8, 2023. It is now read-only.

Commit

Permalink
update: achinery new fuc
Browse files Browse the repository at this point in the history
  • Loading branch information
sysatom committed Apr 15, 2021
1 parent e6e9a61 commit 9e9bf5c
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 93 deletions.
10 changes: 1 addition & 9 deletions cmd/cron/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 1 addition & 9 deletions cmd/message/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions internal/app/cron/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cron
import (
"github.com/go-redis/redis/v8"
"github.com/google/wire"
"github.com/streadway/amqp"
"github.com/tsundata/assistant/api/pb"
"github.com/tsundata/assistant/internal/app/cron/rules"
"github.com/tsundata/assistant/internal/pkg/app"
Expand All @@ -12,11 +11,11 @@ import (
"os"
)

func NewApp(logger *logger.Logger, rdb *redis.Client, mq *amqp.Connection, subClient pb.SubscribeClient,
func NewApp(logger *logger.Logger, rdb *redis.Client, subClient pb.SubscribeClient,
midClient pb.MiddleClient, msgClient pb.MessageClient, wfClient pb.WorkflowClient) (*app.Application, error) {
name := os.Getenv("APP_NAME")

b := rulebot.New(rdb, mq, subClient, midClient, msgClient, wfClient, nil, rules.Options...)
b := rulebot.New(rdb, subClient, midClient, msgClient, wfClient, nil, rules.Options...)

logger.Info("start cron bot " + b.Name())

Expand Down
5 changes: 2 additions & 3 deletions internal/app/message/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/google/wire"
"github.com/jmoiron/sqlx"
"github.com/spf13/viper"
"github.com/streadway/amqp"
"github.com/tsundata/assistant/api/pb"
"github.com/tsundata/assistant/internal/app/message/rules"
"github.com/tsundata/assistant/internal/app/message/service"
Expand Down Expand Up @@ -35,11 +34,11 @@ func NewOptions(v *viper.Viper) (*Options, error) {
return o, err
}

func NewApp(o *Options, logger *logger.Logger, rs *rpc.Server, db *sqlx.DB, mq *amqp.Connection,
func NewApp(o *Options, logger *logger.Logger, rs *rpc.Server, db *sqlx.DB,
subClient pb.SubscribeClient, midClient pb.MiddleClient, msgClient pb.MessageClient,
taskClient pb.TaskClient, wfClient pb.WorkflowClient) (*app.Application, error) {

b := rulebot.New(nil, mq, subClient, midClient, msgClient, wfClient, taskClient, rules.Options...)
b := rulebot.New(nil, subClient, midClient, msgClient, wfClient, taskClient, rules.Options...)

message := service.NewManage(db, logger, b, o.Webhook, wfClient, msgClient, midClient)
err := rs.Register(func(s *grpc.Server) error {
Expand Down
25 changes: 0 additions & 25 deletions internal/app/message/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package rules

import (
"context"
"fmt"
"github.com/streadway/amqp"
"github.com/tsundata/assistant/api/pb"
"github.com/tsundata/assistant/internal/pkg/rulebot"
"github.com/tsundata/assistant/internal/pkg/utils"
Expand Down Expand Up @@ -241,7 +239,6 @@ var rules = []Rule{
HelpMessage: `Test`,
ParseMessage: func(b *rulebot.RuleBot, s string, args []string) []string {
// task

_, err := b.TaskClient.Send(context.Background(), &pb.JobRequest{
Name: "echo",
Args: time.Now().String(),
Expand All @@ -250,28 +247,6 @@ var rules = []Rule{
return []string{"error: " + err.Error()}
}

// mq

ch, err := b.MQ.Channel()
if err != nil {
return []string{"error: " + err.Error()}
}
defer ch.Close()

q, err := ch.QueueDeclare("hello", false, false, false, false, nil)
if err != nil {
return []string{"error call: " + err.Error()}
}

body := fmt.Sprintf("Hello World %d", time.Now().Unix())
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
return []string{"error call: " + err.Error()}
}

return []string{"test done"}
},
},
Expand Down
27 changes: 0 additions & 27 deletions internal/app/worker/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/tsundata/assistant/internal/app/worker/tasks"
"github.com/tsundata/assistant/internal/pkg/app"
"github.com/tsundata/assistant/internal/pkg/logger"
"log"
"os"
)

Expand Down Expand Up @@ -45,32 +44,6 @@ func NewApp(logger *logger.Logger, server *machinery.Server, mq *amqp.Connection
}
}()

// mq
go func() {
ch, err := mq.Channel()
if err != nil {
logger.Error(err)
return
}
q, err := ch.QueueDeclare("hello", false, false, false, false, nil)
if err != nil {
logger.Error(err)
return
}

msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
if err != nil {
logger.Error(err)
return
}

go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
}()

return a, nil
}

Expand Down
60 changes: 47 additions & 13 deletions internal/pkg/machinery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package machinery

import (
"errors"
"fmt"
"github.com/RichardKnop/machinery/v2"
amqpBackend "github.com/RichardKnop/machinery/v2/backends/amqp"
redisBackend "github.com/RichardKnop/machinery/v2/backends/redis"
amqpBroker "github.com/RichardKnop/machinery/v2/brokers/amqp"
redisBroker "github.com/RichardKnop/machinery/v2/brokers/redis"
"github.com/RichardKnop/machinery/v2/config"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/google/wire"
Expand All @@ -16,7 +19,12 @@ const AMQPExchange = "machinery_exchange"
const AMQPBindingKey = "machinery_task"

type Options struct {
// amqp
URL string `yaml:"url"`

// redis
Addr string `yaml:"addr"`
Password string `yaml:"password"`
}

func NewOptions(v *viper.Viper) (*Options, error) {
Expand All @@ -25,25 +33,51 @@ func NewOptions(v *viper.Viper) (*Options, error) {
if err = v.UnmarshalKey("rabbitmq", o); err != nil {
return nil, errors.New("unmarshal machinery redis option error")
}
if err = v.UnmarshalKey("redis", o); err != nil {
return nil, errors.New("unmarshal machinery redis option error")
}

return o, err
}

func New(o *Options) (*machinery.Server, error) {
cnf := &config.Config{
DefaultQueue: DefaultQueue,
ResultsExpireIn: 3600,
Broker: o.URL,
ResultBackend: o.URL,
AMQP: &config.AMQPConfig{
Exchange: AMQPExchange,
ExchangeType: "direct",
BindingKey: AMQPBindingKey,
},
}
// use rabbitmq or redis
if o.URL != "" {
cnf := &config.Config{
DefaultQueue: DefaultQueue,
ResultsExpireIn: 3600,
Broker: o.URL,
ResultBackend: o.URL,
AMQP: &config.AMQPConfig{
Exchange: AMQPExchange,
ExchangeType: "direct",
BindingKey: AMQPBindingKey,
},
}

server := machinery.NewServer(cnf, amqpBroker.New(cnf), amqpBackend.New(cnf), eagerlock.New())
return server, nil
} else {
cnf := &config.Config{
DefaultQueue: DefaultQueue,
ResultsExpireIn: 3600,
Redis: &config.RedisConfig{
MaxIdle: 3,
IdleTimeout: 240,
ReadTimeout: 15,
WriteTimeout: 15,
ConnectTimeout: 15,
NormalTasksPollPeriod: 1000,
DelayedTasksPollPeriod: 500,
},
}
broker := redisBroker.NewGR(cnf, []string{fmt.Sprintf("%s@%s", o.Password, o.Addr)}, 0)
backend := redisBackend.NewGR(cnf, []string{fmt.Sprintf("%s@%s", o.Password, o.Addr)}, 0)
lock := eagerlock.New()

server := machinery.NewServer(cnf, amqpBroker.New(cnf), amqpBackend.New(cnf), eagerlock.New())
return server, nil
server := machinery.NewServer(cnf, broker, backend, lock)
return server, nil
}
}

var ProviderSet = wire.NewSet(New, NewOptions)
5 changes: 1 addition & 4 deletions internal/pkg/rulebot/rulebot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rulebot
import (
"fmt"
"github.com/go-redis/redis/v8"
"github.com/streadway/amqp"
"github.com/tsundata/assistant/api/pb"
"github.com/tsundata/assistant/internal/pkg/version"
"log"
Expand All @@ -18,7 +17,6 @@ type RuleBot struct {
rules []RuleParser

RDB *redis.Client
MQ *amqp.Connection

SubClient pb.SubscribeClient
MidClient pb.MiddleClient
Expand All @@ -27,7 +25,7 @@ type RuleBot struct {
TaskClient pb.TaskClient
}

func New(RDB *redis.Client, MQ *amqp.Connection,
func New(RDB *redis.Client,
SubClient pb.SubscribeClient, MidClient pb.MiddleClient, MsgClient pb.MessageClient,
WfClient pb.WorkflowClient, TaskClient pb.TaskClient,
opts ...Option) *RuleBot {
Expand All @@ -39,7 +37,6 @@ func New(RDB *redis.Client, MQ *amqp.Connection,
}

s.RDB = RDB
s.MQ = MQ
s.SubClient = SubClient
s.MidClient = MidClient
s.MsgClient = MsgClient
Expand Down

0 comments on commit 9e9bf5c

Please sign in to comment.