
[PIP 106] Broker extensions to provide operators of enterprise-wide clusters better control and flexibility #12858

Closed
@madhavan-narayanan


Motivation

The central messaging platform at Intuit uses Apache Pulsar. The platform team operates multiple clusters that are used by hundreds of teams across Intuit. For complete visibility and to better serve customers, the platform needs the ability to intercept all key broker and ledger events. It also needs the ability to transparently control the format of messages that get persisted in the disk store.

Goal

The scope of this proposal is limited to broker events and operations. This PIP addresses only traceability and interceptability at the broker level. To achieve end-to-end tracing, it is also desirable to intercept events at the Pulsar proxy level (for topic lookup, the ownership assignment flow, etc.), but that can be handled in a separate PIP.

API Changes

We propose the following changes:

  • Extend the existing interface org.apache.pulsar.broker.intercept.BrokerInterceptor to support the following granular events with all relevant context information:
    void onConnectionCreated(ServerCnx cnx);
    void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata);
    void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String, String> metadata);
    void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId, long entryId, Rate rateIn, Topic.PublishContext publishContext);
    void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId, long entryId, ByteBuf headersAndPayload);
    void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd);

  • Support a new interface org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor to allow interception of the write and read operations of a managed ledger and modification of the payload. The interface details are given in the next section.

  • Support dynamic loading of managed ledger interceptor implementations through a broker configuration parameter 'brokerEntryPayloadProcessors' in class org.apache.pulsar.broker.ServiceConfiguration:
    Set<String> brokerEntryPayloadProcessors;
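
To make the proposed callbacks more concrete, here is a minimal sketch of an interceptor that counts events per topic. It is deliberately self-contained: Cnx, ProducerRef and ConsumerRef are hypothetical stand-ins for ServerCnx, Producer and Consumer, and BrokerEventListener is a reduced, hypothetical subset of the callbacks listed above, not the actual BrokerInterceptor interface. The sketch also assumes that callbacks may arrive on several threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-ins for the broker types named in the proposal.
final class Cnx {
    final String remoteAddress;
    Cnx(String remoteAddress) { this.remoteAddress = remoteAddress; }
}

final class ProducerRef {
    final String topic;
    ProducerRef(String topic) { this.topic = topic; }
}

final class ConsumerRef {
    final String topic;
    ConsumerRef(String topic) { this.topic = topic; }
}

// Reduced, illustrative subset of the proposed callbacks (not the real interface).
interface BrokerEventListener {
    void onConnectionCreated(Cnx cnx);
    void producerCreated(Cnx cnx, ProducerRef producer, Map<String, String> metadata);
    void messageProduced(Cnx cnx, ProducerRef producer, long startTimeNs, long ledgerId, long entryId);
    void messageAcked(Cnx cnx, ConsumerRef consumer, long ledgerId, long entryId);
}

// Counts events per key. Callbacks are assumed to arrive on several threads,
// so the counters use concurrent data structures.
class CountingListener implements BrokerEventListener {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    private void bump(String key) {
        counters.computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    // Returns the number of events recorded under the given key (0 if none).
    long count(String key) {
        LongAdder adder = counters.get(key);
        return adder == null ? 0L : adder.sum();
    }

    @Override
    public void onConnectionCreated(Cnx cnx) {
        bump("connection");
    }

    @Override
    public void producerCreated(Cnx cnx, ProducerRef producer, Map<String, String> metadata) {
        bump("producer:" + producer.topic);
    }

    @Override
    public void messageProduced(Cnx cnx, ProducerRef producer, long startTimeNs, long ledgerId, long entryId) {
        bump("produced:" + producer.topic);
    }

    @Override
    public void messageAcked(Cnx cnx, ConsumerRef consumer, long ledgerId, long entryId) {
        bump("acked:" + consumer.topic);
    }
}

class CountingListenerDemo {
    public static void main(String[] args) {
        CountingListener listener = new CountingListener();
        Cnx cnx = new Cnx("10.0.0.1:50000");
        ProducerRef producer = new ProducerRef("orders");
        listener.onConnectionCreated(cnx);
        listener.producerCreated(cnx, producer, Map.of("team", "payments"));
        listener.messageProduced(cnx, producer, System.nanoTime(), 1L, 0L);
        System.out.println("produced on orders: " + listener.count("produced:orders"));
    }
}
```

Passing the connection and producer or consumer objects to every callback, as the proposal does, lets an implementation attribute each event to a tenant or team without keeping its own lookup tables.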

Implementation

  • The BrokerInterceptor interface should be extended to include the additional callback methods specified in the section above.

  • The new callback methods need to be invoked at the appropriate places in the pulsar-broker module (in the classes ServerCnx, Producer, and Consumer).

  • A new interface ManagedLedgerPayloadProcessor should be added. (Its definition was attached to the original issue as an image.)

  • A new configuration parameter 'brokerEntryPayloadProcessors' should be supported in broker.conf. Its value can be a list of processors.

  • The existing class org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl should be extended to support two additional operations:
    processPayloadBeforeLedgerWrite(OpAddEntry op, ByteBuf ledgerData)
    processPayloadBeforeEntryCache(ByteBuf ledgerData)
    ManagedLedgerInterceptorImpl should internally use ManagedLedgerPayloadProcessor instance(s) to handle these payload processing operations.

  • OpAddEntry should use the method ManagedLedgerInterceptor::processPayloadBeforeLedgerWrite to process the payload before it is written to the ledger.

  • EntryCacheManager (and EntryCacheImpl) should use the method ManagedLedgerInterceptor::processPayloadBeforeEntryCache to process the payload immediately after it is read from the ledger.
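
The write and read paths described above can be sketched as a pair of transformations that must undo each other. The following is an illustration under stated assumptions, not the proposed implementation: PayloadProcessor, its two methods, ProcessorChain, and the two sample processors are hypothetical names, and byte[] stands in for Netty's ByteBuf so that the example runs with the JDK alone. Processors are instantiated by class name through reflection, which is one way a list of class names such as the value of 'brokerEntryPayloadProcessors' might be handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical processor contract; byte[] stands in for ByteBuf.
interface PayloadProcessor {
    byte[] beforeLedgerWrite(byte[] payload);  // applied on the write path
    byte[] beforeEntryCache(byte[] payload);   // applied right after a ledger read
}

// Illustrative processor: XOR with a fixed key, so the same operation undoes itself.
class XorProcessor implements PayloadProcessor {
    private static final byte KEY = 0x5A;

    private static byte[] xor(byte[] in) {
        byte[] out = new byte[in.length];
        for (int i = 0; i < in.length; i++) {
            out[i] = (byte) (in[i] ^ KEY);
        }
        return out;
    }

    @Override public byte[] beforeLedgerWrite(byte[] payload) { return xor(payload); }
    @Override public byte[] beforeEntryCache(byte[] payload) { return xor(payload); }
}

// Illustrative processor: prepends a one-byte format marker on write, strips it on read.
class MarkerProcessor implements PayloadProcessor {
    static final byte MARKER = 0x01;

    @Override public byte[] beforeLedgerWrite(byte[] payload) {
        byte[] out = new byte[payload.length + 1];
        out[0] = MARKER;
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }

    @Override public byte[] beforeEntryCache(byte[] payload) {
        if (payload.length == 0 || payload[0] != MARKER) {
            throw new IllegalArgumentException("missing format marker");
        }
        return Arrays.copyOfRange(payload, 1, payload.length);
    }
}

class ProcessorChain {
    private final List<PayloadProcessor> processors = new ArrayList<>();

    // Instantiates each configured class name through its no-argument constructor.
    static ProcessorChain load(Iterable<String> classNames) {
        ProcessorChain chain = new ProcessorChain();
        for (String name : classNames) {
            try {
                Object instance = Class.forName(name).getDeclaredConstructor().newInstance();
                chain.processors.add((PayloadProcessor) instance);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot load processor " + name, e);
            }
        }
        return chain;
    }

    // Write path: apply processors in configured order.
    byte[] onWrite(byte[] payload) {
        for (PayloadProcessor p : processors) {
            payload = p.beforeLedgerWrite(payload);
        }
        return payload;
    }

    // Read path: undo the write-side processing in reverse order.
    byte[] onRead(byte[] payload) {
        for (int i = processors.size() - 1; i >= 0; i--) {
            payload = processors.get(i).beforeEntryCache(payload);
        }
        return payload;
    }
}

class ProcessorChainDemo {
    public static void main(String[] args) {
        ProcessorChain chain = ProcessorChain.load(
                List.of(MarkerProcessor.class.getName(), XorProcessor.class.getName()));
        byte[] original = {1, 2, 3};
        byte[] stored = chain.onWrite(original);
        System.out.println("round trip ok: " + Arrays.equals(original, chain.onRead(stored)));
    }
}
```

In this sketch the order of processors matters, because the read path reverses the write path. A configuration value typed as Set<String> would need a defined iteration order for the same guarantee to hold.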

Rejected Alternatives
