Conversation

@chadlagore (Contributor) commented Aug 26, 2020

Fixes bug #87.

This condition causes an infinite loop and throttling from AWS when the shard is empty. In my example on the ticket, I show it hitting the Kinesis API repeatedly for minutes before giving up on the shard. We should handle empty shards more gracefully: they admit no lastReadSequenceNumber no matter how many times you hit them sequentially.

Moreover, I believe the condition is unnecessary, because you can simply increase maxFetchTimeInMs if you want to spend a longer time reading from the shard.
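
For readers following along, here is a minimal sketch of the kind of fetch loop at issue. The identifiers (fetchOnce, readShard, the loop shape) are illustrative stand-ins, not the connector's actual code:

object FetchLoopSketch {
  val maxFetchTimeInMs = 1000L

  // Stand-in for one GetRecords call; returns the last sequence number read,
  // if any. On an empty shard this never returns one.
  def fetchOnce(): Option[String] = None

  def readShard(): Option[String] = {
    val startTs = System.currentTimeMillis()
    var lastReadSequenceNumber: Option[String] = None
    // Before this change, the loop condition also included something like
    // `|| lastReadSequenceNumber.isEmpty`, so an empty shard kept the loop
    // spinning past the time budget and drew throttling from AWS.
    // With only the time budget, an empty shard simply yields nothing
    // for this micro-batch.
    while (System.currentTimeMillis() - startTs < maxFetchTimeInMs) {
      fetchOnce().foreach(seq => lastReadSequenceNumber = Some(seq))
    }
    lastReadSequenceNumber
  }
}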

@elainearbaugh left a comment

Makes sense to me. I don't quite understand the comment about getting stuck in the loop when we have data near the tip of the stream but aren't spending enough time to read it -- why would reading data at the beginning be any different than reading it anywhere else?

@chadlagore (Contributor, Author) commented Sep 2, 2020

Agreed. Here is an example of a consumer running this code against a set of empty shards (these are the persisted checkpoint offsets):

# batch 3 (no data)
v1
{"batchWatermarkMs":0,"batchTimestampMs":1599006537913,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"4"}}
{"metadata":{"streamName":"stream_name","batchId":"3"},"shardId-000000000000":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1599005556576"}}
{"metadata":{"streamName":"stream_name","batchId":"3"},"shardId-000000000000":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1599005556576"}}

# batch 4 (data appears)
v1
{"batchWatermarkMs":1599005056342,"batchTimestampMs":1599006671256,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"4"}}
{"metadata":{"streamName":"stream_name","batchId":"4"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49609690170106349127857093631690192845261414844357148674"}}
{"metadata":{"streamName":"stream_name","batchId":"4"},"shardId-000000000000":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1599005556576"}}

# batch 5
v1
{"batchWatermarkMs":1599005056342,"batchTimestampMs":1599006691777,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"4"}}
{"metadata":{"streamName":"stream_name","batchId":"5"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49609690170106349127857093631690192845261414844357148674"}}
{"metadata":{"streamName":"stream_name","batchId":"5"},"shardId-000000000000":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1599005556576"}}

Notice that at batch 4, data appears and we move to the sequence number of the incoming data.
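
For context, here is a rough sketch of how offsets like these would map onto Kinesis shard-iterator requests (AWS SDK v1 from Scala; the ShardOffset case class is hypothetical and mirrors the JSON fields above, not the connector's internal types):

import com.amazonaws.services.kinesis.model.GetShardIteratorRequest

object OffsetSketch {
  // Hypothetical type for illustration, mirroring the checkpoint JSON fields.
  case class ShardOffset(iteratorType: String, iteratorPosition: String)

  def iteratorRequest(streamName: String, shardId: String, o: ShardOffset): GetShardIteratorRequest = {
    val req = new GetShardIteratorRequest()
      .withStreamName(streamName)
      .withShardId(shardId)
      .withShardIteratorType(o.iteratorType)
    o.iteratorType match {
      case "AT_TIMESTAMP" =>
        // iteratorPosition holds epoch millis, e.g. "1599005556576"
        req.withTimestamp(new java.util.Date(o.iteratorPosition.toLong))
      case "AFTER_SEQUENCE_NUMBER" | "AT_SEQUENCE_NUMBER" =>
        req.withStartingSequenceNumber(o.iteratorPosition)
      case _ =>
        req // TRIM_HORIZON and LATEST carry no position
    }
  }
}

An AT_TIMESTAMP offset re-seeks by time on every batch, while AFTER_SEQUENCE_NUMBER pins the position to the last record read -- the transition visible at batch 4.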

@itsvikramagr (Contributor) commented Sep 2, 2020

@chadlagore @elainearbaugh

In the code, I am handling the following scenario: say we have started reading from trim_horizon. We then need to make multiple get-records API calls to reach the point where Kinesis has data (unfortunately, unlike other data sources, Kinesis streams won't give you the first available record in one API call). If we don't reach the point where Kinesis has data within the specified maxFetchTimeInMs, we will have the same problem in the next micro-batch, which will again start reading from trim_horizon. This loop will continue and we will not process any data from that particular stream.

I agree that the current approach violates the meaning of maxFetchTimeInMs and will lead to AWS throttling when we are already at the tip of the stream and there is no new data to read.

Do you have any good ideas for handling the above-mentioned scenario?
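
To make that scenario concrete, here is a hedged sketch of the behavior against a raw Kinesis client (AWS SDK v1 from Scala; the stream name, shard id, and the cap of 20 calls are assumptions for illustration):

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, GetShardIteratorRequest}

object TrimHorizonProbe extends App {
  val client = AmazonKinesisClientBuilder.defaultClient()
  var iterator = client.getShardIterator(
    new GetShardIteratorRequest()
      .withStreamName("stream_name")       // assumed name
      .withShardId("shardId-000000000000") // assumed shard
      .withShardIteratorType("TRIM_HORIZON")
  ).getShardIterator

  var calls = 0
  var gotData = false
  // Kinesis can return empty batches (with a non-null next iterator) several
  // times before the iterator reaches the first available record.
  while (!gotData && iterator != null && calls < 20) {
    val res = client.getRecords(new GetRecordsRequest().withShardIterator(iterator))
    gotData = !res.getRecords.isEmpty
    iterator = res.getNextShardIterator
    calls += 1
  }
  println(s"records appeared after $calls GetRecords call(s)")
}

If the fetch budget expires while the iterator is still traversing the empty region, the next micro-batch restarts from trim_horizon and repeats the same traversal, which is the loop described above.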

@itsvikramagr merged commit d64907a into qubole:2.4.0 on Sep 7, 2020
@chadlagore (Contributor, Author)

Thanks for merging this @itsvikramagr. I believe the answer to your question is that on a call at TRIM_HORIZON (or at any timestamp prior to the retention period), even with an otherwise empty stream, Kinesis is still able to return a new record at the tip of the stream, and this library then moves the sequence number to that point. The fact that the GetRecords API can take more than one call to return data is a bit odd, but I think that is a characteristic of Kinesis that is addressed by the maxFetchTimeInMs param.
