support Cumulative ack #183

Closed
adaiboy opened this issue Feb 11, 2020 · 10 comments

@adaiboy

adaiboy commented Feb 11, 2020

pulsar-client-go uses CommandAck_Individual now, and we can subscribe to a topic in exclusive mode, so adding CommandAck_Cumulative would be useful.

It also seems not very difficult; maybe it is already on your schedule?

@merlimat
Contributor

The cumulative ack was left out on purpose and the reasoning was (in weighted order):

  • Keep things simple in API
  • Keep things simple in client impl
  • Cumulative ack doesn't provide much more performance/efficiency
  • Sequential individual acking will be equivalent (see the sketch below)
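
A minimal sketch of that last point, assuming an already-subscribed `pulsar.Consumer` from `github.com/apache/pulsar-client-go/pulsar` (the `process` handler is illustrative):

```go
// Sequential individual acking: every received message is acked on its own,
// in receive order, so the acked prefix of the stream advances much like it
// would with a single cumulative ack.
func consumeWithIndividualAcks(ctx context.Context, c pulsar.Consumer, process func(pulsar.Message)) error {
	for {
		msg, err := c.Receive(ctx)
		if err != nil {
			return err
		}
		process(msg) // illustrative handler
		c.Ack(msg)   // one ack per message, issued in order
	}
}
```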

@adaiboy
Author

adaiboy commented Feb 13, 2020

I have doubts about the claim that it "doesn't provide much more performance" than individual ack:

  • Cumulative ack doesn't provide much more performance/efficiency
  • Sequential individual acking will be equivalent

I set up a test cluster with a persistent topic and a single subscription of exclusive type. Some of the broker config may not be perfect, but I tuned the configuration over several days according to the source code, and then ran a test on this cluster.
[Screenshot 2020-02-13 16:09:59: produce/consume rate chart from the test cluster]

First, I modified the consumer series to add AckCumulative, so I could compare cumulative ack with individual ack, and made a producer send messages at its best-effort rate.
Then I used the cumulative consumer to consume entries, acking cumulatively once every 10000 entries received; this consumer could keep up with the producer.
Lastly, I cleared everything, restarted the producer, and started a consumer with individual ack.
In the red frame, that consumer is much slower than the producer.

  • the producer rate is the green line
  • the consumer rate is the yellow line

Since every ack is an "RPC" with CommandAck, reducing the number of acks should be useful in theory.

// consume receives messages from the given consumer and acks them either
// individually (batch size 1) or cumulatively once every 10000 messages.
// singleAck, mode and recvMsgCnt are flags/counters defined elsewhere in the
// benchmark program; AckCumulative is the method added to the client for this
// comparison (see above).
func consume(name string, c pulsar.Consumer) {
    ctx := context.Background()
    batch := 10000
    if *singleAck > 0 {
        log.Println("single ack")
    } else {
        log.Println("cumulative ack")
    }
    if *mode == "shared" || *singleAck > 0 {
        batch = 1
    }
    cnt := 0
    for {
        msg, err := c.Receive(ctx)
        if err != nil {
            log.Fatalf("%s Receive fail: %v\n", name, err)
        }
        atomic.AddInt64(&recvMsgCnt, 1)
        cnt++
        if cnt >= batch {
            if batch > 1 {
                c.AckCumulative(msg) // one cumulative ack covers the whole batch
            } else {
                c.Ack(msg) // individual ack for every message
            }
            cnt = 0
        }
    }
}

@flowchartsman
Contributor

flowchartsman commented Oct 11, 2021

I would like to chime in that there is a use case where this would be most useful: batch consumers. For example, if I want to build a report or bundle from a topic every 6h/12h/24h.

To do this with a normal subscription, I would need to retain a potentially unbounded set of messages (or at least message IDs) in memory to ack them all when complete, which is wasteful. The other method (and the one I am currently using) is to use a reader; however, this requires me to store my last offset in external state between job runs. With cumulative ack, I could accomplish this entirely within the Pulsar ecosystem by simply resuming my subscription and then exiting the loop after an idle period, or when I begin to receive messages within a certain small distance of the job start time.
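
A rough sketch of that pattern, assuming a cumulative-ack method like the `AckCumulative` discussed later in this thread (the `addToReport` step, the job-start cutoff, and the idle timeout are illustrative):

```go
// Resume the subscription, drain messages published before the job started,
// then cumulatively ack the last one processed and exit.
func runBatchJob(c pulsar.Consumer, jobStart time.Time, idleTimeout time.Duration) error {
	var last pulsar.Message
	for {
		ctx, cancel := context.WithTimeout(context.Background(), idleTimeout)
		msg, err := c.Receive(ctx)
		cancel()
		if err != nil {
			break // idle timeout (or receive error): stop draining
		}
		if !msg.PublishTime().Before(jobStart) {
			break // caught up to messages published after the job started
		}
		addToReport(msg) // illustrative aggregation step
		last = msg
	}
	if last != nil {
		return c.AckCumulative(last) // acks everything up to and including last
	}
	return nil
}
```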

@merlimat
Contributor

@flowchartsman In your example there would be no need to keep the message IDs; you could use a Reader and do SeekByTime():

// Reset the subscription associated with this consumer to a specific message publish time.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
// the individual partitions.
//
// @param timestamp
//            the message publish time where to reposition the subscription
//
SeekByTime(time time.Time) error
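
For instance, a reader-based batch job along those lines might look roughly like this (topic name, window bounds, and `handleMsg` are illustrative):

```go
// Seek the reader to the start of the reporting window by publish time and
// read until messages fall outside the window.
func readWindow(client pulsar.Client, windowStart, windowEnd time.Time) error {
	reader, err := client.CreateReader(pulsar.ReaderOptions{
		Topic:          "persistent://public/default/my-topic", // illustrative topic
		StartMessageID: pulsar.EarliestMessageID(),
	})
	if err != nil {
		return err
	}
	defer reader.Close()

	// Reposition to the first message published at or after windowStart.
	if err := reader.SeekByTime(windowStart); err != nil {
		return err
	}
	for reader.HasNext() {
		msg, err := reader.Next(context.Background())
		if err != nil {
			return err
		}
		if msg.PublishTime().After(windowEnd) {
			break // past the reporting window
		}
		handleMsg(msg) // illustrative processing step
	}
	return nil
}
```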

@flowchartsman
Contributor

Yes, you're right. I suppose I could just say "this job runs somewhere around midnight and starts from 6pm, and then again at 6am and starts from midnight", and then break when I reach messages on or after my "run" time. I wasn't correctly recalling the seek-to-time functionality because we had to avoid using it initially due to stability issues arising from starting with LatestMessageID and seeking backwards, but if those are resolved now, you're right, that's probably best.

@xiaofan-luan

Cumulative ack seems to be a reasonable feature.
In our scenario, we use a chan to handle all messages. If one ack fails (in the current Pulsar Go client, ack doesn't return an error; I saw some PRs fixing that), what is the expected behaviour?
Do we need to keep all the failed messages and retry them later? That seems to be too much to ask of a user.

@Gleiphir2769
Contributor

Gleiphir2769 commented Dec 6, 2022

Hi @merlimat, I think cumulative ack needs to be supported in the Go client.

Cumulative ack doesn't provide much more performance/efficiency

For the Java client, I think this point is fine, because in its default ack mode the ack requests are pushed to a pending queue and flushed asynchronously. But in the Go client, only one message is acknowledged per acknowledgment request, which means cumulative ack would save ack RPC round trips.

messageIDs := make([]*pb.MessageIdData, 1)
messageIDs[0] = &pb.MessageIdData{
	LedgerId: proto.Uint64(uint64(msgID.ledgerID)),
	EntryId:  proto.Uint64(uint64(msgID.entryID)),
}
reqID := pc.client.rpcClient.NewRequestID()
cmdAck := &pb.CommandAck{
	ConsumerId: proto.Uint64(pc.consumerID),
	MessageId:  messageIDs,
	AckType:    pb.CommandAck_Individual.Enum(),
}

Keep things simple in API

Just two more interface methods would need to be added.

 // Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
AckCumulative(Message) error

// Acknowledge the reception of all the messages in the stream up to (and including) the provided message, identified by its MessageID
AckIDCumulative(MessageID) error

Keep things simple in client impl

I don't think the implementation of cumulative ack is complicated. We only need to modify the ack command; setting AckType: pb.CommandAck_Cumulative will be fine.
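
For illustration, based on the individual-ack snippet quoted above, the cumulative ack command might look roughly like this (a sketch, not the final implementation):

```go
// Same structure as the individual-ack path above; the substantive change is
// the AckType field on the CommandAck.
messageIDs := []*pb.MessageIdData{{
	LedgerId: proto.Uint64(uint64(msgID.ledgerID)),
	EntryId:  proto.Uint64(uint64(msgID.entryID)),
}}
cmdAck := &pb.CommandAck{
	ConsumerId: proto.Uint64(pc.consumerID),
	MessageId:  messageIDs,
	AckType:    pb.CommandAck_Cumulative.Enum(),
}
```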

Sequential individual acking will be equivalent

In some scenarios, the lack of cumulative ack causes real trouble for users.

In our scenario, we use a chan to handle all messages. If one ack fails (in the current Pulsar Go client, ack doesn't return an error; I saw some PRs fixing that), what is the expected behaviour?
Do we need to keep all the failed messages and retry them later? That seems to be too much to ask of a user.

As @xiaofan-luan shows, cumulative ack actually gives users a way to decide for themselves when their messages are acknowledged. For example, a user needs to consume 10000 messages continuously and may want to discard all previous messages because the 8000th message is wrong. In this scenario, with cumulative ack the user can simply call it on the 8000th message and then start receiving the new message sequence. I don't think caching message IDs here is a good idea.
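
A minimal sketch of that flow with the proposed API (`isCorrupt` and `process` are illustrative):

```go
// Consume a sequence and, when a bad message is found, cumulatively ack
// everything up to and including it in a single call, with no message-ID caching.
func drainUntilBad(ctx context.Context, c pulsar.Consumer) error {
	for {
		msg, err := c.Receive(ctx)
		if err != nil {
			return err
		}
		if isCorrupt(msg) {
			// Discard this message and every earlier unacked one at once,
			// then the consumer can move on to the next message sequence.
			return c.AckIDCumulative(msg.ID())
		}
		process(msg) // illustrative handler; no per-message ack needed here
	}
}
```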

If introducing cumulative ack is acceptable, I will implement it. @RobertIndie @nodece could you give some comments?

Thanks.

@RobertIndie
Member

Hi @Gleiphir2769, thanks for bringing this up. Overall, I'm +1 for implementing this feature.

But in the Go client, only one message is acknowledged per acknowledgment request, which means cumulative ack would save ack RPC round trips.

Right, supporting cumulative ack will also improve performance. But we could also implement an ack group tracker to aggregate several acks (individual or cumulative) into one RPC. That would improve the performance of individual acks too.
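
For reference, a very rough sketch of what such a grouping tracker could look like (the threshold, interval, and flush callback are illustrative; this is not the actual implementation in the client):

```go
// ackGroupTracker buffers message IDs and flushes them as one batch, either
// when a size threshold is reached or periodically on a timer. flush stands in
// for whatever sends the aggregated ack command over the wire.
type ackGroupTracker struct {
	mu        sync.Mutex
	pending   []pulsar.MessageID
	threshold int
	flush     func([]pulsar.MessageID)
}

func newAckGroupTracker(threshold int, interval time.Duration, flush func([]pulsar.MessageID)) *ackGroupTracker {
	t := &ackGroupTracker{threshold: threshold, flush: flush}
	go func() {
		for range time.Tick(interval) {
			t.flushPending() // time-based flush
		}
	}()
	return t
}

func (t *ackGroupTracker) add(id pulsar.MessageID) {
	t.mu.Lock()
	t.pending = append(t.pending, id)
	full := len(t.pending) >= t.threshold
	t.mu.Unlock()
	if full {
		t.flushPending() // size-based flush
	}
}

func (t *ackGroupTracker) flushPending() {
	t.mu.Lock()
	ids := t.pending
	t.pending = nil
	t.mu.Unlock()
	if len(ids) > 0 {
		t.flush(ids) // e.g. one CommandAck carrying all buffered message IDs
	}
}
```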

For example, a user needs to consume 10000 messages continuously and may want to discard all previous messages because the 8000th message is wrong. In this scenario, with cumulative ack the user can simply call it on the 8000th message and then start receiving the new message sequence. I don't think caching message IDs here is a good idea.

This case makes sense to me.

+1 for implementing this feature.

BewareMyPower pushed a commit that referenced this issue Jan 3, 2023
Master Issue: #183 

### Motivation

Cumulative acknowledgement is a useful feature. Users can use it to ack all messages in the stream up to (and including) a provided message.

Issue #183 shows more details.

### Modifications

- Add two APIs, `AckCumulative` and `AckIDCumulative`, to `Consumer`.

``` golang

// AckCumulative the reception of all the messages in the stream up to (and including)
// the provided message.
AckCumulative(msg Message) error

// AckIDCumulative the reception of all the messages in the stream up to (and including)
// the provided message, identified by its MessageID
AckIDCumulative(msgID MessageID) error

```

- Add the `AckCumulative` and `AckIDCumulative` implementation for `consumer`, `multiTopicConsumer`, `regexConsumer` and `mockConsumer`.

- Add the unit tests `TestConsumerNoBatchCumulativeAck`, `TestConsumerBatchCumulativeAck` and `TestCumulativeAckWithResponse` for cumulative ack in `consumer_test.go`.

@geniusjoe
Contributor

But we could also implement an ack group tracker to aggregate several acks (individual or cumulative) into one RPC.

I think the ack-grouping feature has now been supported in PR #957.

@RobertIndie
Member

Closed this as implemented in #903
