Motivation
Currently there is no way to have an accurate backlog for a subscription:
you only have the number of "entries", not the number of messages
server side filters (PIP-105) may filter out some messages
Knowing the number of entries is sometimes not enough: with batch messages the amount of work on the Consumers is proportional to the number of messages, which may vary from entry to entry.
Goal
The idea of this patch is to provide a dedicated API (REST, pulsar-admin, and Java PulsarAdmin) to "analyze" a subscription and provide detailed information about what is expected to be delivered to Consumers.
The API will allow users to calculate the backlog starting from the last acknowledged position (lastMarkDeletePosition) or from a given Position onwards.
The operation will be quite expensive, because we have to load the messages from storage and pass them to the filters; but due to the dynamic nature of Pulsar subscriptions there is no other way to obtain this value.
One good strategy for monitoring/alerting is to set up alerts on the usual "stats" and use this new API to inspect the subscription more deeply, typically by issuing a manual command.
API Changes
internal ManagedCursor API:
CompletableFuture<ScanOutcome> scan(Optional<Position> startingPosition, Predicate<Entry> condition, long maxEntries, long timeOutMs);
This method scans the Cursor from the given position (or from the lastMarkDelete position, if startingPosition is not provided) to the tail.
There is a time limit and a maxEntries limit; these are needed to prevent huge (and useless) scans.
The Predicate can stop the scan if it does not want to continue processing for some reason.
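The scan loop described above can be sketched as follows. This is a minimal illustration of the semantics, not the actual implementation: `Entry`, the position type, and `ScanOutcome` here are simplified stand-ins for the real managed-ledger classes.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

public class CursorScanSketch {
    // Simplified stand-in for a ledger entry; the real class wraps the payload.
    public record Entry(long position, int numMessages) {}

    // Simplified outcome type; the real ScanOutcome may differ.
    public enum ScanOutcome { COMPLETED, ABORTED_MAX_ENTRIES, ABORTED_TIMEOUT, ABORTED_BY_PREDICATE }

    /**
     * Scans entries from startingPosition (or from the beginning, standing in
     * for lastMarkDeletePosition) to the tail, stopping when maxEntries is
     * reached, the time limit expires, or the predicate rejects an entry.
     */
    public static CompletableFuture<ScanOutcome> scan(List<Entry> entries,
                                                      Optional<Long> startingPosition,
                                                      Predicate<Entry> condition,
                                                      long maxEntries,
                                                      long timeOutMs) {
        long deadline = System.currentTimeMillis() + timeOutMs;
        long scanned = 0;
        for (Entry e : entries) {
            if (startingPosition.isPresent() && e.position() < startingPosition.get()) {
                continue; // before the requested start: skip
            }
            if (scanned >= maxEntries) {
                return CompletableFuture.completedFuture(ScanOutcome.ABORTED_MAX_ENTRIES);
            }
            if (System.currentTimeMillis() > deadline) {
                return CompletableFuture.completedFuture(ScanOutcome.ABORTED_TIMEOUT);
            }
            if (!condition.test(e)) {
                return CompletableFuture.completedFuture(ScanOutcome.ABORTED_BY_PREDICATE);
            }
            scanned++;
        }
        return CompletableFuture.completedFuture(ScanOutcome.COMPLETED);
    }
}
```

Note how both limits are checked before each entry is processed, so an aborted scan never exceeds either bound.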
New REST API:
@GET
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog Backlog")
@ApiOperation(value = "Analyze a subscription, by scanning all the unprocessed messages")
public void analyzeSubscriptionBacklog(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription", required = true)
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Position", required = false)
@QueryParam("position") String position,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
API response model:
public class AnalyzeSubscriptionBacklogResult {
private long entries;
private long messages;
private long filterRejectedEntries;
private long filterAcceptedEntries;
private long filterRescheduledEntries;
private long filterRejectedMessages;
private long filterAcceptedMessages;
private long filterRescheduledMessages;
private boolean aborted;
private Position startingPosition;
private Position lastScannedPosition;
}
The response contains "aborted=true" if the request has been aborted by some internal limit, like a timeout or the scan hitting the maximum number of entries.
We are not going to provide more details about the reason for the stop: that would make the API too detailed and harder to maintain. The details can be found in the broker logs.
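For illustration, a serialized response for an aborted scan might look like this (all values and the position serialization format are hypothetical):

```json
{
  "entries": 10000,
  "messages": 48500,
  "filterRejectedEntries": 100,
  "filterAcceptedEntries": 9800,
  "filterRescheduledEntries": 100,
  "filterRejectedMessages": 500,
  "filterAcceptedMessages": 47500,
  "filterRescheduledMessages": 500,
  "aborted": true,
  "startingPosition": "5:0",
  "lastScannedPosition": "7:1024"
}
```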
New PulsarAdmin API:
/**
* Analyze subscription backlog.
* This is a potentially expensive operation, as it requires
* to read the messages from storage.
* This function takes into consideration batch messages
* and also Subscription filters.
* @param topic
* Topic name
* @param subscriptionName
* the subscription
* @param position the position to start the scanning from
* @return an accurate analysis of the backlog
* @throws PulsarAdminException
* Unexpected error
*/
AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, Position position)
throws PulsarAdminException;
/**
* Analyze subscription backlog.
* This is a potentially expensive operation, as it requires
* to read the messages from storage.
* This function takes into consideration batch messages
* and also Subscription filters.
* @param topic
* Topic name
* @param subscriptionName
* the subscription
* @param position the position to start the scanning from
* @return an accurate analysis of the backlog
* @throws PulsarAdminException
* Unexpected error
*/
CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
String subscriptionName, Position position);
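Once the result is available, a caller can derive metrics that plain entry counts cannot provide, such as the average batch size. A self-contained sketch, using a simplified mirror of the result class (not the real PulsarAdmin types):

```java
public class BacklogResultExample {
    // Simplified mirror of AnalyzeSubscriptionBacklogResult, for illustration only.
    public record Result(long entries, long messages, boolean aborted) {
        /**
         * Average number of messages per entry: this is why counting entries
         * alone can underestimate the work pending on the Consumers.
         */
        public double averageBatchSize() {
            return entries == 0 ? 0.0 : (double) messages / entries;
        }
    }
}
```

A subscription with 100 entries but 5000 messages has an average batch size of 50, so its "real" backlog is 50x what the entry count suggests.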
As usual, a pulsar-admin command will be added as well.
New configuration entries in broker.conf:
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
)
private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
)
private long subscriptionBacklogScanMaxEntries = 10_000;
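In broker.conf, the corresponding entries with the defaults above would look like:

```properties
# Maximum time to spend while scanning a subscription to calculate the accurate backlog
# (2 minutes, i.e. 1000 * 60 * 2 ms)
subscriptionBacklogScanMaxTimeMs=120000
# Maximum number of entries to process while scanning a subscription
subscriptionBacklogScanMaxEntries=10000
```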
Implementation
The implementation is pretty straightforward:
add a new API in ManagedCursor to perform the scan
add the REST API
implement in PersistentSubscription an analyzeBacklog method that performs the scan
Then the PersistentSubscription runs the scan:
it applies the filters if they are present
it considers individuallyDeletedMessages
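The two steps above, applying the filters and honoring individuallyDeletedMessages, can be sketched as a counting pass. Again this is a simplified illustration under assumed types, not the actual PersistentSubscription code:

```java
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class SubscriptionScanSketch {
    // Simplified stand-ins for the real entry, filter, and result types.
    public record Entry(long position, int numMessages) {}
    public enum FilterResult { ACCEPT, REJECT, RESCHEDULE }
    public record Totals(long acceptedEntries, long acceptedMessages) {}

    /**
     * Counts the entries/messages still to be delivered: positions that were
     * individually acknowledged (out of order) are skipped, and the filter
     * decides whether each remaining entry counts toward the backlog.
     */
    public static Totals analyze(List<Entry> entries,
                                 Set<Long> individuallyDeleted,
                                 Function<Entry, FilterResult> filter) {
        long acceptedEntries = 0;
        long acceptedMessages = 0;
        for (Entry e : entries) {
            if (individuallyDeleted.contains(e.position())) {
                continue; // already acked out of order: not part of the backlog
            }
            if (filter.apply(e) == FilterResult.ACCEPT) {
                acceptedEntries++;
                acceptedMessages += e.numMessages();
            }
        }
        return new Totals(acceptedEntries, acceptedMessages);
    }
}
```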
Rejected Alternatives
We could somehow store a counter of the number of logical messages during writes. But that does not work, for a few reasons:
you cannot know in advance which subscriptions will be created on a topic
a subscription can be created from the past (Earliest)
subscription filters may change over time: they are usually configured using Subscription Properties, and those properties are dynamic
doing computations on the write path (like running filters) kills latency and throughput
Use a client to clone the subscription and consume data.
This doesn't work because you would have to transfer the data to the client, which is potentially a huge amount of work and a waste of resources.