|
1 | 1 | [](https://goreportcard.com/report/github.com/Maximilan4/rmq)
|
| 2 | +[](http://unlicense.org/) |
2 | 3 |
|
3 |
| -## rmq: |
4 |
| -Wrappers for producing and consuming in RabbitMQ. |
5 |
| -see example section |
| 4 | +## RMQ: |
| 5 | +Library provide some wrappers of `github.com/rabbitmq/amqp091-go` |
| 6 | + |
| 7 | +### Connection: |
| 8 | +```golang |
| 9 | +connection := rmq.NewDefaultConnection(context.Background(), "amqp://test:test@localhost:5672") |
| 10 | +// or use rmq.NewConnection with callback for construct connection with options |
| 11 | +err := connection.Connect(context.TODO()) |
| 12 | + |
| 13 | +if err != nil { |
| 14 | + log.Fatal(err) |
| 15 | +} |
| 16 | + |
| 17 | +``` |
| 18 | +### Working with schema: |
| 19 | +```golang |
| 20 | +schema, err := connection.Schema() // creates a new schema with separate channel inside |
| 21 | +if err != nil { |
| 22 | + log.Fatal(err) |
| 23 | +} |
| 24 | + |
| 25 | +// declare an exchange |
| 26 | +err = schema.Exchange.Declare(&rmq.DeclareParams{Name: "test-exchange", Kind: rmq.DirectExchange}) |
| 27 | +if err != nil { |
| 28 | + log.Fatal(err) |
| 29 | +} |
| 30 | + |
| 31 | +// declare two queue |
| 32 | +err = schema.Queue.DeclareMulti(&rmq.DeclareParams{Name: "test-q1"}, &rmq.DeclareParams{Name: "test-q2"}) |
| 33 | +if err != nil { |
| 34 | + log.Fatal(err) |
| 35 | +} |
| 36 | +// bind queues to exchange |
| 37 | +err = schema.Queue.BindMulti( |
| 38 | + &rmq.QueueBindParams{Name: "test-q1", Key: "rk1", Exchange: "test-exchange"}, |
| 39 | + &rmq.QueueBindParams{Name: "test-q2", Key: "rk1", Exchange: "test-exchange"}, |
| 40 | +) |
| 41 | + |
| 42 | +if err != nil { |
| 43 | + log.Fatal(err) |
| 44 | +} |
| 45 | +``` |
| 46 | +### Consumer: |
| 47 | +```golang |
| 48 | +consumer := rmq.NewConsumer(connection, &rmq.ConsumerConfig{ |
| 49 | + WorkersCount: 3, // 3 workers goroutine will be started |
| 50 | + Synchronous: false, // run handler in single goroutine or not |
| 51 | + }) |
| 52 | +//define a message handler (use defaults or write own) |
| 53 | +handler := rmq.NewDefaultMessageHandler(func(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) (rmq.MsgAction, error) { |
| 54 | + fmt.Println(msg.Body) |
| 55 | + return rmq.ActionAck, nil |
| 56 | +}) |
| 57 | +// start worker |
| 58 | +err := consumer.StartWorkersGroup(&rmq.ConsumeParams{Queue: "test"}, handler) |
| 59 | +// or use consumer.StartWorker(...) for single consuming process |
| 60 | +if err != nil { |
| 61 | + log.Fatal(err) |
| 62 | +} |
| 63 | +``` |
| 64 | +### Publisher: |
| 65 | +```golang |
| 66 | +// create a new publisher instance |
| 67 | +publisher := rmq.NewPublisher(connection, &rmq.PublisherConfig{ |
| 68 | + MaxChannelsCount: 10, // max pool channels count |
| 69 | +}) |
| 70 | +err = publisher.Init() |
| 71 | + |
| 72 | +if err != nil { |
| 73 | + log.Fatal(err) |
| 74 | +} |
| 75 | +err = publisher.Publish(context.TODO(), &rmq.PublishMessage{ |
| 76 | + ExchangeName: "main_exchange", |
| 77 | + RoutingKey: "main", |
| 78 | + Publishing: amqp.Publishing{ |
| 79 | + ContentType: "application/octet-stream", |
| 80 | + Body: []byte("test test"), |
| 81 | + }, |
| 82 | +}) |
| 83 | + |
| 84 | +if err != nil { |
| 85 | + log.Fatal(err) |
| 86 | +} |
0 commit comments