From ff4e2cbb62c13a77751c72ae1ebec08d92f11d3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 11 Dec 2024 15:26:30 +0100 Subject: [PATCH] Use locks instead of synchronized blocks in producer coordinator --- .../stream/impl/ProducersCoordinator.java | 210 ++++++++++-------- 1 file changed, 117 insertions(+), 93 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java index 860a924bc2..62c98434fb 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java @@ -410,6 +410,7 @@ private static class ProducerTracker implements AgentTracker { private volatile byte publisherId; private volatile ClientProducersManager clientProducersManager; private final AtomicBoolean recovering = new AtomicBoolean(false); + private final Lock trackerLock = new ReentrantLock(); private ProducerTracker( long uniqueId, String reference, String stream, StreamProducer producer) { @@ -421,10 +422,12 @@ private ProducerTracker( @Override public void assign(byte producerId, Client client, ClientProducersManager manager) { - synchronized (ProducerTracker.this) { - this.publisherId = producerId; - this.clientProducersManager = manager; - } + lock( + this.trackerLock, + () -> { + this.publisherId = producerId; + this.clientProducersManager = manager; + }); this.producer.setPublisherId(producerId); this.producer.setClient(client); } @@ -451,9 +454,7 @@ public String stream() { @Override public void unavailable() { - synchronized (ProducerTracker.this) { - this.clientProducersManager = null; - } + lock(this.trackerLock, () -> this.clientProducersManager = null); this.producer.unavailable(); } @@ -464,10 +465,14 @@ public void running() { } @Override - public synchronized void cancel() { - if (this.clientProducersManager != null) { - this.clientProducersManager.unregister(this); - } + public void cancel() { + lock( + this.trackerLock, + () -> { + if (this.clientProducersManager != null) { + this.clientProducersManager.unregister(this); + } + }); } @Override @@ -503,6 +508,7 @@ private static class TrackingConsumerTracker implements AgentTracker { private final StreamConsumer consumer; private volatile ClientProducersManager clientProducersManager; private final AtomicBoolean recovering = new AtomicBoolean(false); + private final Lock trackerLock = new ReentrantLock(); private TrackingConsumerTracker(long uniqueId, String stream, StreamConsumer consumer) { this.uniqueId = uniqueId; @@ -512,9 +518,7 @@ private TrackingConsumerTracker(long uniqueId, String stream, StreamConsumer con @Override public void assign(byte producerId, Client client, ClientProducersManager manager) { - synchronized (TrackingConsumerTracker.this) { - this.clientProducersManager = manager; - } + lock(this.trackerLock, () -> this.clientProducersManager = manager); this.consumer.setTrackingClient(client); } @@ -540,9 +544,7 @@ public String stream() { @Override public void unavailable() { - synchronized (TrackingConsumerTracker.this) { - this.clientProducersManager = null; - } + lock(this.trackerLock, () -> this.clientProducersManager = null); this.consumer.unavailable(); } @@ -553,10 +555,8 @@ public void running() { } @Override - public synchronized void cancel() { - if (this.clientProducersManager != null) { - this.clientProducersManager.unregister(this); - } + public void cancel() { + lock(this.trackerLock, () -> this.clientProducersManager.unregister(this)); } @Override @@ -599,6 +599,7 @@ private class ClientProducersManager implements Comparable> streamToTrackers = new ConcurrentHashMap<>(); private final Client client; private final AtomicBoolean closed = new AtomicBoolean(false); + private final Lock managerLock = new ReentrantLock(); private ClientProducersManager( Broker targetNode, @@ -670,7 +671,8 @@ private ClientProducersManager( "Received metadata notification for '{}', stream is likely to have become unavailable", stream); Set affectedTrackers; - synchronized (ClientProducersManager.this) { + this.managerLock.lock(); + try { affectedTrackers = streamToTrackers.remove(stream); LOGGER.debug( "Affected publishers and consumer trackers after metadata update: {}", @@ -686,6 +688,8 @@ private ClientProducersManager( } }); } + } finally { + this.managerLock.unlock(); } if (affectedTrackers != null && !affectedTrackers.isEmpty()) { environment @@ -840,79 +844,97 @@ private void recoverAgent(Broker node, List candidates, AgentTrac } } - private synchronized void register(AgentTracker tracker) { - if (this.isFullFor(tracker)) { - throw new IllegalStateException("Cannot add subscription tracker, the manager is full"); - } - if (this.isClosed()) { - throw new IllegalStateException("Cannot add subscription tracker, the manager is closed"); - } - checkNotClosed(); - if (tracker.identifiable()) { - ProducerTracker producerTracker = (ProducerTracker) tracker; - int index = pickSlot(this.producers, producerTracker, this.producerIndexSequence); - this.checkNotClosed(); - Response response = - callAndMaybeRetry( - () -> - this.client.declarePublisher( - (byte) index, tracker.reference(), tracker.stream()), - RETRY_ON_TIMEOUT, - "Declare publisher request for publisher %d on stream '%s'", - producerTracker.uniqueId(), - producerTracker.stream()); - if (response.isOk()) { - tracker.assign((byte) index, this.client, this); - } else { - String message = - "Error while declaring publisher: " - + formatConstant(response.getResponseCode()) - + ". Could not assign producer to client."; - LOGGER.info(message); - throw new StreamException(message, response.getResponseCode()); - } - producers.put(tracker.id(), producerTracker); - } else { - tracker.assign((byte) 0, this.client, this); - trackingConsumerTrackers.add(tracker); - } - streamToTrackers - .computeIfAbsent(tracker.stream(), s -> ConcurrentHashMap.newKeySet()) - .add(tracker); + private void register(AgentTracker tracker) { + lock( + this.managerLock, + () -> { + if (this.isFullFor(tracker)) { + throw new IllegalStateException( + "Cannot add subscription tracker, the manager is full"); + } + if (this.isClosed()) { + throw new IllegalStateException( + "Cannot add subscription tracker, the manager is closed"); + } + checkNotClosed(); + if (tracker.identifiable()) { + ProducerTracker producerTracker = (ProducerTracker) tracker; + int index = pickSlot(this.producers, producerTracker, this.producerIndexSequence); + this.checkNotClosed(); + Response response = + callAndMaybeRetry( + () -> + this.client.declarePublisher( + (byte) index, tracker.reference(), tracker.stream()), + RETRY_ON_TIMEOUT, + "Declare publisher request for publisher %d on stream '%s'", + producerTracker.uniqueId(), + producerTracker.stream()); + if (response.isOk()) { + tracker.assign((byte) index, this.client, this); + } else { + String message = + "Error while declaring publisher: " + + formatConstant(response.getResponseCode()) + + ". Could not assign producer to client."; + LOGGER.info(message); + throw new StreamException(message, response.getResponseCode()); + } + producers.put(tracker.id(), producerTracker); + } else { + tracker.assign((byte) 0, this.client, this); + trackingConsumerTrackers.add(tracker); + } + streamToTrackers + .computeIfAbsent(tracker.stream(), s -> ConcurrentHashMap.newKeySet()) + .add(tracker); + }); } - private synchronized void unregister(AgentTracker tracker) { - LOGGER.debug( - "Unregistering {} {} from manager on {}", tracker.type(), tracker.uniqueId(), this.name); - if (tracker.identifiable()) { - producers.remove(tracker.id()); - } else { - trackingConsumerTrackers.remove(tracker); - } - streamToTrackers.compute( - tracker.stream(), - (s, trackersForThisStream) -> { - if (s == null || trackersForThisStream == null) { - // should not happen - return null; + private void unregister(AgentTracker tracker) { + lock( + this.managerLock, + () -> { + LOGGER.debug( + "Unregistering {} {} from manager on {}", + tracker.type(), + tracker.uniqueId(), + this.name); + if (tracker.identifiable()) { + producers.remove(tracker.id()); } else { - trackersForThisStream.remove(tracker); - return trackersForThisStream.isEmpty() ? null : trackersForThisStream; + trackingConsumerTrackers.remove(tracker); } + streamToTrackers.compute( + tracker.stream(), + (s, trackersForThisStream) -> { + if (s == null || trackersForThisStream == null) { + // should not happen + return null; + } else { + trackersForThisStream.remove(tracker); + return trackersForThisStream.isEmpty() ? null : trackersForThisStream; + } + }); + closeIfEmpty(); }); - closeIfEmpty(); } - synchronized boolean isFullFor(AgentTracker tracker) { - if (tracker.identifiable()) { - return producers.size() == maxProducersByClient; - } else { - return trackingConsumerTrackers.size() == maxTrackingConsumersByClient; - } + boolean isFullFor(AgentTracker tracker) { + return lock( + this.managerLock, + () -> { + if (tracker.identifiable()) { + return producers.size() == maxProducersByClient; + } else { + return trackingConsumerTrackers.size() == maxTrackingConsumersByClient; + } + }); } - synchronized boolean isEmpty() { - return producers.isEmpty() && trackingConsumerTrackers.isEmpty(); + boolean isEmpty() { + return lock( + this.managerLock, () -> producers.isEmpty() && trackingConsumerTrackers.isEmpty()); } private void checkNotClosed() { @@ -930,13 +952,15 @@ boolean isClosed() { private void closeIfEmpty() { if (!closed.get()) { - synchronized (this) { - if (this.isEmpty()) { - this.close(); - } else { - LOGGER.debug("Not closing producer manager {} because it is not empty", this.id); - } - } + lock( + this.managerLock, + () -> { + if (this.isEmpty()) { + this.close(); + } else { + LOGGER.debug("Not closing producer manager {} because it is not empty", this.id); + } + }); } }