streadway
Owner

This is not an issue but rather a heads up.

There is a new feature in RabbitMQ nightlies that requires client support. Java, .NET, Erlang and Ruby (Bunny, amqp gem) already support it in their development branches, and librabbitmq-c and 2 Python clients have made some progress on it.

It would be nice if the Go client had support for it before 3.2 comes out (which won't happen for a few more months). I'd be happy to help with it but unfortunately not in the near future.

Sean Treadway added 2 commits August 10, 2013 18:09
Following the [json spec](http://hg.rabbitmq.com/rabbitmq-codegen/file/tip/amqp-rabbitmq-0.9.1.json#l153),
add these two methods. The client needs to advertise support in the
connection extensions for these methods to be sent.

RabbitMQ delivers this method when it activates and deactivates TCP
pushback on all connections due to a limited global resource.
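
The commit message above says the client has to advertise support before the server will send these methods. As a rough sketch of what that advertisement could look like, the snippet below builds a client-properties table with a nested `capabilities` table containing the `connection.blocked` key. The function name `clientProperties`, the `product` value and the use of plain Go maps are assumptions made for illustration; this is not the code from the commits.

```go
package main

import "fmt"

// clientProperties builds a table shaped like the client properties sent
// during the connection handshake. Plain maps are used here as a stand-in
// for the library's own table type (an assumption of this sketch).
func clientProperties(product string) map[string]interface{} {
	return map[string]interface{}{
		"product": product,
		// The nested capabilities table is where protocol extensions
		// are advertised to the broker.
		"capabilities": map[string]interface{}{
			"connection.blocked": true,
		},
	}
}

func main() {
	props := clientProperties("example-client")
	fmt.Println(props["capabilities"])
}
```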
@streadway
Owner

Thanks for the heads up.

Do you have a link to any discussion around these extensions? I imagine the use case is for server telemetry to identify whether the server is slow or has begun pushback, but what is the expected behavior of the client when it does receive a connection.blocked method?

@michaelklishin
Collaborator Author

If the app receiving connection.blocked can control publishing, it should suspend it until the connection is unblocked. If not, at least log a warning to make sure ops notice that consumers do not keep up with producers.
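
For the "at least log a warning" case, a minimal sketch might look like the following. The `Blocking` type here is a local stand-in so the example runs on its own; its `Reason` field and the helper names `describe` and `logBlocks` are assumptions made for illustration, not part of any API confirmed in this thread.

```go
package main

import (
	"fmt"
	"log"
)

// Blocking is a local stand-in for the notification value discussed in this
// thread. Reason is an assumed field carrying the broker's explanation.
type Blocking struct {
	Active bool
	Reason string
}

// describe turns a notification into a message an operator could alert on.
func describe(b Blocking) string {
	if b.Active {
		return fmt.Sprintf("connection blocked by broker: %s", b.Reason)
	}
	return "connection unblocked by broker"
}

// logBlocks logs every notification until the channel is closed.
func logBlocks(blocks <-chan Blocking) {
	for b := range blocks {
		log.Println("WARN", describe(b))
	}
}

func main() {
	// Simulated notifications; a real application would receive these
	// from the client library instead.
	blocks := make(chan Blocking, 2)
	blocks <- Blocking{Active: true, Reason: "low on memory"}
	blocks <- Blocking{Active: false}
	close(blocks)
	logBlocks(blocks)
}
```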

@michaelklishin
Collaborator Author

Note that connections are blocked only if a Rabbit node is low on a resource (RAM, disk or # of available file descriptors) and you try to publish a message. Connections that only consume are not blocked.

@streadway
Owner

How would these new methods relate to client and server heartbeats? For example, should the client lib disable the read/write deadline on the TCP connection or heartbeat checks while the connection is blocked?

Also, this library presents a blocking interface to both synchronous and asynchronous methods, which could cause a deadlock when responding to connection.blocked methods while delivering a stream of basic.publish methods. Imagine the following order of events:

S -> C: connection.blocked -> in flight
(packet loss or other event causing TCP C->S resync)
S: TCP no longer read.
(packet loss continues)
C: basic.publish -> blocked on full sndbuf that is no longer draining
(packet loss clears, S gets previous packet.  Does S ACK when blocked to drain the client sendbuf?)
C: read connection.blocked

In this library, when the sender is blocked on a basic.publish, the pushback is already effective and uninterruptible up to the application. Even if the application multiplexes sends of basic.publish and receives of connection.blocked messages, I struggle to see how the ordering can be guaranteed not to cause a deadlock so that basic.publish is never sent on a blocked TCP session.

In Go, I imagine the application to use something like this to behave properly:

func publish(conn *amqp.Connection, pubs chan amqp.Publishing) {
  sends := pubs
  ch, _ := conn.Channel()
  blocks := conn.NotifyBlocked(make(chan amqp.Blocking))

  for {
    select {
      case b := <-blocks:
        if b.Active {
          sends = nil // stop accepting publishings while blocked
        } else {
          sends = pubs // resume once the server unblocks
        }
      case pub := <-sends: // a nil chan will never communicate
        ch.Publish(..., pub)
    }
  }
}

func main() {
  pubs := make(chan amqp.Publishing)
  go publish(setupConnection(), pubs)

  for work := range doWork() {
    pubs <- amqp.Publishing{
      Body: work.Result,
    }
  }
}

But if events happen in the order described above, the select loop will already be blocked inside Publish before the connection.blocked message becomes selectable, so it cannot stop filling the client's TCP send buffer.

I feel like I'm missing something. Can you point me to anything that goes into more depth on how the connections are throttled in relation to when and how the connection.blocked method is sent, particularly on what other clients are doing?

Thanks!

@michaelklishin
Collaborator Author

Heartbeats were originally added as a way to "undo" TCP redelivery by allowing clients to detect failed connections quicker. They are orthogonal to blocking.

Other clients simply invoke the provided callback(s). RabbitMQ throttles connections by not reading from the socket until all alarms clear and it's time to unblock. Does this answer your question?

@michaelklishin
Collaborator Author

Other clients do not perform throttling. I think future versions of some clients can do staged writes, but I don't have any good ideas for in-client throttling.

One idea that has been discussed is whether clients should raise exceptions on an attempt to publish something on a blocked connection. However, this is not possible in practice because there may already be a blocked basic.publish call that's waiting on a socket with filled buffers. So far none of the clients do this.

@michaelklishin
Collaborator Author

What are outstanding questions on this?

@streadway
Owner

I'm pretty sure I understand the methods. I hadn't merged the implementation because I was waiting for any further discoveries to emerge.

I believe these methods are mostly relevant for asynchronous clients because this lib is designed to propagate pushback for flow control. I would only use the blocked methods for logging as I believe there is a race condition between when the congestion window is closed by the server and when the blocked method arrives.

@michaelklishin
Collaborator Author

They are primarily useful for logging.

@streadway
Owner

It's just an integration test missing for this PR. The implementation seemed straightforward.

@michaelklishin
Collaborator Author

Is there a good way to exclude a group of tests?

I'm happy to port the integration tests we have in other clients but it currently requires running against RabbitMQ tip.

@streadway
Owner

I'd appreciate a port of the other test.

> Is there a good way to exclude a group of tests?

There is already an integration tag that gets built when a server can be found at the AMQP_URL environment variable.

Use go test -tags integration to build and run the tests found in the integration_test.go file.

The lifecycle of the server isn't currently controlled from the integration tests. Either the lifecycle or tuning of the server would likely be needed for each test.

The tests are not being run in parallel.
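
As a small illustration of keying integration tests off the AMQP_URL environment variable, a helper along these lines could decide whether a broker is configured. The name `brokerURL` and the skip-when-unset policy are assumptions of this sketch and may differ from what integration_test.go actually does; in a real test file the helper would sit behind the integration build tag.

```go
package main

import (
	"fmt"
	"os"
)

// brokerURL returns the broker address for integration tests and whether
// one was configured. The lookup function is injected so the helper can be
// exercised without touching the real environment.
func brokerURL(getenv func(string) string) (string, bool) {
	url := getenv("AMQP_URL")
	return url, url != ""
}

func main() {
	if url, ok := brokerURL(os.Getenv); ok {
		fmt.Println("running integration tests against", url)
	} else {
		fmt.Println("AMQP_URL not set; skipping integration tests")
	}
}
```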

@michaelklishin
Collaborator Author

@streadway I'm a little stretched on time before RabbitMQ 3.2 ships (should be early next week but don't quote me on this) so perhaps we should merge this and add a test after 3.2 GA is available.

On the bright side, a good chunk of my OSS time budget is spent porting RabbitMQ tutorials code to Go ;)

@streadway
Owner

Great news about the tutorials!

I'll merge and look at general test improvements later.

streadway pushed a commit that referenced this pull request Oct 17, 2013
Support connection.blocked (coming in RabbitMQ 3.2)
@streadway streadway merged commit 7e47df8 into master Oct 17, 2013
@streadway streadway deleted the 75-connection.blocked branch October 17, 2013 16:24
@michaelklishin michaelklishin mentioned this pull request Apr 8, 2019