-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Limit on concurrency #57
Comments
The rabbitmq-cli-Consumer does not deal with concurrency. It processes one message at a time. If you need concurrency, you can spin up as mutch instances as you need. |
@corvus-ch Would you be willing to accept a PR that implements handling of concurrency? We currently have a PHP version of what |
Dealing with concurrency is a complexity I tried to keep out of the project. The way it is designed is to delegate it to a suitable process supervisor. With Kubernetes in mind (and similar platforms for that matter), I can see why this might be helpful from an orchestrationanl point of view. If I would accept such a PR, the following requirements must be met:
With those two things in mind, feel free to go ahead and provide PR. |
@corvus-ch Working through the code I'm wondering why the Shouldn't the responsibility of ack/nack'ing be lying on the consumer? I would imagine the processor returns success/failure and the consumer will ack/nack based on that. |
@corvus-ch I have something that is working to some extent, but I'm not sure where that exception is coming from.
|
@estahn Can you do a pull request with your changes so I can have a look? Searching for that error on the web, indicates that your code might be mixing up messages and acknowledgments and or that sharing channels across threads/(go functions?) is an issue. Without looking at the code, it is not possible for me to tell what the issue is. |
@corvus-ch Will do shortly. I think the issue is double acking of the message (streadway/amqp#83). |
Just some comments:
|
In order to find what the issue with the unknown delivery tag is, I have hacked together a minimal example. package main
import (
"flag"
"log"
"github.com/streadway/amqp"
)
var (
uri = flag.String("uri", "amqp://guest:guest@localhost", "AMQP URI")
queue = flag.String("queue", "myqueue", "Ephemeral AMQP queue name")
ctag = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)")
)
func init() {
flag.Parse()
}
func main() {
conn, err := amqp.Dial(*uri)
if err != nil {
log.Fatalf("dial: %v", err)
return
}
channel, err := conn.Channel()
if err != nil {
log.Fatalf("channel: %v", err)
return
}
queue, err := channel.QueueDeclare(*queue, true, false, false, false, nil)
if err != nil {
log.Fatalf("queue Declare: %v", err)
return
}
if err := channel.Qos(5, 0, true); err != nil {
log.Fatalf("qos: %v", err)
return
}
deliveries, err := channel.Consume(queue.Name, *ctag, false, false, false, false, nil)
if err != nil {
log.Fatalf("queue Consume: %v", err)
return
}
done := make(chan error)
for i := 0; i < 10; i++ {
go handle(i, deliveries, done)
}
if err := <- done; err != nil {
log.Fatalf("error during message consumption: %v", err)
}
}
func handle(i int, deliveries <-chan amqp.Delivery, done chan error) {
for d := range deliveries {
log.Printf("[%d] got %dB delivery: [%v] %q", i, len(d.Body), d.DeliveryTag, d.Body)
if err := d.Ack(false); err != nil {
done <- err
}
}
done <- nil
} The issue lies with the message acknowledgement. The boolean passed to To fix this, the boolean in https://github.com/corvus-ch/rabbitmq-cli-consumer/blob/master/delivery/delivery.go#L28 and https://github.com/corvus-ch/rabbitmq-cli-consumer/blob/master/delivery/delivery.go#L33 needs to be changed to false. |
@corvus-ch Yay! Nice job tracking this down! Docu: https://github.com/streadway/amqp/blob/master/delivery.go#L113-L115 I'll add the adjustments. |
Hi all - I also needed an option for message processing concurrency, and wanted to implement it using multiple RabbitMQ channels to be as close as possible to how our project handles RabbitMQ in other languages (we have consumers in node and python, as well as PHP). This seems to be the canonical way to handle multiple concurrent consumers in a single client in RabbitMQ. |
@corvus-ch What is the intent of the mutex in processor.Process? After further testing, my multi-channel solution is fetching messages in parallel, but only processing one at a time because each call has to wait for the mutex to free up. The simple solution is to remove the mutex, but that could obviously cause issues that I'm not seeing. The more complicated solution would be to have a separate processor per channel, but I don't want to pursue that solution if there's a simpler one. |
@corvus-ch I think I answered my own question - Processor had a single os.Cmd that it was re-using for each new call. I just pushed an update to my PR that uses a new os.Cmd for each call to Process instead. |
What's the limit on concurrency? If we have 10k messages in the queue, will this spin-up 10k PHP processes at the same time?
The text was updated successfully, but these errors were encountered: