Skip to content

Commit

Permalink
简化worker代码
Browse files Browse the repository at this point in the history
  • Loading branch information
oldfritter committed Nov 7, 2019
1 parent f3e207d commit e387867
Showing 1 changed file with 33 additions and 34 deletions.
67 changes: 33 additions & 34 deletions workers/sneakerWorkers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ type Worker struct {
Arguments map[string]string `yaml:"arguments"`
Steps []int `yaml:"steps"`
Threads int `yaml:"threads"`
Log string `yaml:"log"`
Logger *log.Logger
}

func InitWorkers() {
Expand All @@ -41,69 +39,69 @@ func InitWorkers() {
yaml.Unmarshal(content, &AllWorkers)
}

func (worker *Worker) SubscribeMessageByQueue(arguments amqp.Table) error {
func (worker *Worker) SubscribeMessageByQueue(arguments amqp.Table) (err error) {
channel, err := utils.RabbitMqConnect.Channel()
defer channel.Close()
if err != nil {
fmt.Errorf("Channel: %s", err)
fmt.Println("Channel: ", err)
return
}
channel.QueueDeclare((*worker).Queue, (*worker).Durable, false, false, false, arguments)
if (*worker).Exchange != "" && (*worker).RoutingKey != "" {
channel.ExchangeDeclare((*worker).Exchange, "topic", (*worker).Durable, false, false, false, nil)
channel.QueueBind((*worker).Queue, (*worker).RoutingKey, (*worker).Exchange, false, nil)
channel.ExchangeDeclare((*worker).Arguments["x-dead-letter-exchange"], "topic", (*worker).Durable, false, false, false, nil)
channel.QueueBind((*worker).Queue, "#", (*worker).Arguments["x-dead-letter-exchange"], false, nil)
if len((*worker).Steps) > 0 {
channel.ExchangeDeclare((*worker).Queue+"-retry", "topic", (*worker).Durable, false, false, false, nil)
channel.QueueBind((*worker).Queue, "#", (*worker).Queue+"-retry", false, nil)
}
}
err = channel.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)

for i, step := range (*worker).Steps {
_, err = channel.QueueDeclare(
(*worker).Arguments["x-dead-letter-exchange"]+"-"+strconv.Itoa(i+1), // queue name
(*worker).Durable, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
amqp.Table{"x-dead-letter-exchange": (*worker).Arguments["x-dead-letter-exchange"], "x-message-ttl": int32(step)}, // arguments
amqp.Table{"x-dead-letter-exchange": (*worker).Queue + "-retry", "x-message-ttl": int32(step)}, // arguments
)
if err != nil {
return fmt.Errorf("Queue Declare: %s", err)
fmt.Println("Queue Declare: ", err)
return
}
}

go func(queue string) {
go func() {
channel, err := utils.RabbitMqConnect.Channel()
defer channel.Close()
if err != nil {
fmt.Errorf("Channel: %s", err)
fmt.Println("Channel: ", err)
return
}
msgs, _ := channel.Consume(
queue, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
(*worker).Queue, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
for _, w := range AllWorkers {
if w.Queue == queue {
for d := range msgs {
response := reflect.ValueOf(&w).MethodByName(w.Name).Call([]reflect.Value{reflect.ValueOf(&d.Body)})
if !(response[0].String() == "") && !response[1].IsNil() {
retry(response[0].String(), response[1].Bytes())
}
d.Ack(w.Ack)
}
for d := range msgs {
response := reflect.ValueOf(worker).MethodByName((*worker).Name).Call([]reflect.Value{reflect.ValueOf(&d.Body)})
if !(response[0].String() == "") && !response[1].IsNil() {
retry(response[0].String(), response[1].Bytes())
}
d.Ack((*worker).Ack)
}
}(worker.Queue)

return nil
}()
return
}

func retry(queueName string, message []byte) error {
func retry(queueName string, message []byte) (err error) {
channel, err := utils.RabbitMqConnect.Channel()
defer channel.Close()
err = (*channel).Publish(
Expand All @@ -122,7 +120,8 @@ func retry(queueName string, message []byte) error {
},
)
if err != nil {
return err
fmt.Println("Channel: ", err)
return
}
return nil
return
}

0 comments on commit e387867

Please sign in to comment.