Limit on concurrency #57

Open

estahn opened this issue Feb 22, 2019 · 13 comments

Comments

@estahn
Collaborator

estahn commented Feb 22, 2019

What's the limit on concurrency? If we have 10k messages in the queue, will this spin up 10k PHP processes at the same time?

@corvus-ch
Owner

The rabbitmq-cli-consumer does not deal with concurrency. It processes one message at a time. If you need concurrency, you can spin up as many instances as you need.

@estahn
Collaborator Author

estahn commented Feb 22, 2019

@corvus-ch Would you be willing to accept a PR that implements handling of concurrency? We currently have a PHP version of what rabbitmq-cli-consumer does, which basically runs sequentially as well. We upscale our Kubernetes pods based on the queue length, but that all seems a bit wasteful.

@corvus-ch
Owner

Dealing with concurrency is a complexity I tried to keep out of the project. The design delegates that to a suitable process supervisor. With Kubernetes in mind (and similar platforms, for that matter), I can see why this might be helpful from an orchestration point of view.

If I were to accept such a PR, the following requirements would have to be met:

  1. The default must be the current behaviour.
  2. It must be possible to keep the log output for different messages apart. Here I refer to the STDOUT and STDERR captured from the script processing the message.

With those two things in mind, feel free to go ahead and provide a PR.
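To make the second requirement a bit more concrete, here is a rough sketch of what I have in mind; none of this is existing code and all names (runWorkers, the process callback) are made up:

package consumer

import (
	"fmt"
	"log"
	"os"
	"sync"

	"github.com/streadway/amqp"
)

// runWorkers is a sketch only: it fans deliveries out to n workers, each with
// its own prefixed logger, so the captured output of different messages can be
// told apart. With n == 1 the behaviour matches the current single-worker mode.
func runWorkers(n int, deliveries <-chan amqp.Delivery, process func(amqp.Delivery, *log.Logger) error) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			logger := log.New(os.Stdout, fmt.Sprintf("[Worker %d] ", id), log.LstdFlags)
			for d := range deliveries {
				if err := process(d, logger); err != nil {
					logger.Printf("processing failed: %v", err)
				}
			}
		}(i)
	}
	wg.Wait()
}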

@estahn
Collaborator Author

estahn commented Feb 26, 2019

@corvus-ch Working through the code, I'm wondering why func (p *processor) Process is given the message as type delivery.Delivery instead of just the plain message.

Shouldn't the responsibility for ack/nack'ing lie with the consumer? I would imagine the processor returns success/failure and the consumer acks/nacks based on that.
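Something along these lines is what I picture; the interface and names below are purely illustrative and not taken from the code base:

package consumer

import "github.com/streadway/amqp"

// Processor is an illustrative interface: it only reports success or failure
// and knows nothing about AMQP acknowledgements.
type Processor interface {
	Process(body []byte) error
}

// consume acks or nacks based on the processor's result, so the ack/nack
// responsibility stays with the consumer.
func consume(deliveries <-chan amqp.Delivery, p Processor) {
	for d := range deliveries {
		if err := p.Process(d.Body); err != nil {
			d.Nack(false, true) // failure: requeue the message
			continue
		}
		d.Ack(false) // success: acknowledge only this delivery
	}
}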

@estahn
Collaborator Author

estahn commented Feb 27, 2019

@corvus-ch I have something that is working to some extent, but I'm not sure where that exception is coming from.

2019/02/27 14:41:50 Connecting RabbitMQ...
2019/02/27 14:41:50 Connected.
2019/02/27 14:41:50 Opening channel...
2019/02/27 14:41:50 Done.
2019/02/27 14:41:50 Setting QoS...
2019/02/27 14:41:50 Succeeded setting QoS.
2019/02/27 14:41:50 Declaring queue "foobar"...
2019/02/27 14:41:50 Registering consumer...
2019/02/27 14:41:50 Succeeded registering consumer.
2019/02/27 14:41:50 Processing messages with 3 workers.
2019/02/27 14:41:50 Waiting for messages...
2019/02/27 14:41:50 [Worker 2] Processing message...
2019/02/27 14:41:50 [Worker 0] Processing message...
2019/02/27 14:41:50 [Worker 1] Processing message...
hellohello2019/02/27 14:42:00 [Worker 0] Processed!
2019/02/27 14:42:00 [Worker 0] Processing message...
hello2019/02/27 14:42:00 [Worker 2] Processed!
2019/02/27 14:42:00 [Worker 2] Processing message...
2019/02/27 14:42:00 [Worker 1] Processed!
2019/02/27 14:42:00 Exception (406) Reason: "PRECONDITION_FAILED - unknown delivery tag 1"

@corvus-ch
Owner

@estahn Can you do a pull request with your changes so I can have a look?

Searching for that error on the web indicates that your code might be mixing up messages and acknowledgements, and/or that sharing channels across threads (goroutines?) is an issue. Without looking at the code, I cannot tell what the problem is.

@estahn
Collaborator Author

estahn commented Feb 27, 2019

@corvus-ch Will do shortly. I think the issue is double acking of the message (streadway/amqp#83).

@estahn
Collaborator Author

estahn commented Feb 27, 2019

Just some comments:

  • I have removed the mutex from the processor, as I think it is not necessary if the variables are moved into local scope. If you want to keep the mutex, the current solution isn't working anyway.
  • There are sporadic issues manifesting as unknown delivery tag 1, possibly caused by double-acking the message.
  • I would even go as far as to remove the responsibility for ack/nack from the processor.

@corvus-ch
Owner

To find out what the issue with the unknown delivery tag is, I hacked together a minimal example.

package main

import (
	"flag"
	"log"

	"github.com/streadway/amqp"
)

var (
	uri   = flag.String("uri", "amqp://guest:guest@localhost", "AMQP URI")
	queue = flag.String("queue", "myqueue", "Ephemeral AMQP queue name")
	ctag  = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)")
)

func init() {
	flag.Parse()
}

func main() {
	conn, err := amqp.Dial(*uri)
	if err != nil {
		log.Fatalf("dial: %v", err)
		return
	}

	channel, err := conn.Channel()
	if err != nil {
		log.Fatalf("channel: %v", err)
		return
	}

	queue, err := channel.QueueDeclare(*queue, true, false, false, false, nil)
	if err != nil {
		log.Fatalf("queue Declare: %v", err)
		return
	}

	if err := channel.Qos(5, 0, true); err != nil {
		log.Fatalf("qos: %v", err)
		return
	}

	deliveries, err := channel.Consume(queue.Name, *ctag, false, false, false, false, nil)
	if err != nil {
		log.Fatalf("queue Consume: %v", err)
		return
	}

	done := make(chan error)

	for i := 0; i < 10; i++ {
		go handle(i, deliveries, done)
	}

	if err := <- done; err != nil {
		log.Fatalf("error during message consumption: %v", err)
	}
}

func handle(i int, deliveries <-chan amqp.Delivery, done chan error) {
	for d := range deliveries {
		log.Printf("[%d] got %dB delivery: [%v] %q", i, len(d.Body), d.DeliveryTag, d.Body)
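		// Ack(false) acknowledges only this delivery; Ack(true) would also
		// acknowledge all earlier unacknowledged deliveries on this channel,
		// which is what causes the unknown delivery tag error described below.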
		if err := d.Ack(false); err != nil {
			done <- err
		}
	}
	done <- nil
}

The issue lies with the message acknowledgement. If the boolean passed to Ack() is set to true, the current and all prior unacknowledged messages are acknowledged. With concurrency in place, this can lead to a situation where a worker tries to acknowledge a message which was already acknowledged by another worker. This occurs if worker A is processing message 1, worker B is processing message 2, and worker B finishes before A: once worker A tries to acknowledge its message, the error occurs.

To fix this, the boolean in https://github.com/corvus-ch/rabbitmq-cli-consumer/blob/master/delivery/delivery.go#L28 and https://github.com/corvus-ch/rabbitmq-cli-consumer/blob/master/delivery/delivery.go#L33 needs to be changed to false.

@estahn
Collaborator Author

estahn commented Mar 3, 2019

@corvus-ch Yay! Nice job tracking this down!

Docu: https://github.com/streadway/amqp/blob/master/delivery.go#L113-L115

I'll add the adjustments.

@bclougherty

Hi all - I also needed an option for message processing concurrency, and wanted to implement it using multiple RabbitMQ channels to stay as close as possible to how our project handles RabbitMQ in other languages (we have consumers in Node and Python, as well as PHP). This seems to be the canonical way to handle multiple concurrent consumers in a single RabbitMQ client.
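Roughly what I have in mind is one channel and one consumer registration per worker. The following is only a sketch with made-up names, not the code from my branch:

package consumer

import (
	"log"

	"github.com/streadway/amqp"
)

// startConsumers sketches the channel-per-worker approach: every worker gets
// its own AMQP channel and its own consumer registration, so acknowledgements
// never cross worker boundaries.
func startConsumers(conn *amqp.Connection, queue string, n int, handle func(amqp.Delivery)) error {
	for i := 0; i < n; i++ {
		ch, err := conn.Channel()
		if err != nil {
			return err
		}
		if err := ch.Qos(1, 0, false); err != nil {
			return err
		}
		deliveries, err := ch.Consume(queue, "", false, false, false, false, nil)
		if err != nil {
			return err
		}
		go func(id int, deliveries <-chan amqp.Delivery) {
			for d := range deliveries {
				log.Printf("[channel %d] got delivery %d", id, d.DeliveryTag)
				handle(d)
			}
		}(i, deliveries)
	}
	return nil
}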

@bclougherty

@corvus-ch What is the intent of the mutex in processor.Process? After further testing, my multi-channel solution is fetching messages in parallel, but only processing one at a time because each call has to wait for the mutex to free up. The simple solution is to remove the mutex, but that could obviously cause issues that I'm not seeing. The more complicated solution would be to have a separate processor per channel, but I don't want to pursue that solution if there's a simpler one.

@bclougherty

@corvus-ch I think I answered my own question - Processor had a single exec.Cmd that it was re-using for each new call. I just pushed an update to my PR that creates a new exec.Cmd for each call to Process instead.
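For reference, the gist of that change looks roughly like this; it is a simplified illustration with hypothetical field names, not the exact code from the PR:

package processor

import (
	"bytes"
	"io"
	"os/exec"
)

// processor is a simplified stand-in for the real type; the field names are
// hypothetical.
type processor struct {
	script string    // command to run for each message
	out    io.Writer // captured STDOUT
	err    io.Writer // captured STDERR
}

// Process builds a fresh exec.Cmd per call instead of re-using a single one,
// so concurrent calls do not share process state and no mutex is needed here.
func (p *processor) Process(body []byte) error {
	cmd := exec.Command(p.script)
	cmd.Stdin = bytes.NewReader(body)
	cmd.Stdout = p.out
	cmd.Stderr = p.err
	return cmd.Run()
}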
