Description
I have a Spark structured stream that is using Qubole Kinesis connector 1.2.0 on a Kinesis stream with 2 shards with 2 day retention period.
These are my Qubole Kinesis configs:
streamName: '.......'
endpointUrl: 'https://kinesis.us-east-1.amazonaws.com'
startingPosition: 'latest'
failondataloss: false
kinesis.executor.maxFetchTimeInMs: 60000
kinesis.executor.maxFetchRecordsPerShard: 1000000
kinesis.executor.maxRecordPerRead: 10000
kinesis.executor.addIdleTimeBetweenReads: true
kinesis.executor.idleTimeBetweenReadsInMs: 1000
kinesis.client.describeShardInterval: '1s'
kinesis.client.numRetries: 10
kinesis.client.retryIntervalMs: 3000
kinesis.client.maxRetryIntervalMs: 10000
kinesis.client.avoidEmptyBatches: false
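For reference, a minimal sketch of how these options would be wired into a structured streaming read. The option keys are the ones listed above; the `spark` session and the commented `readStream` call assume the connector jar is on the classpath, so only the options dict is exercised here.

```python
# The connector options from above, assembled as a plain dict. Spark options
# are passed as strings, so numeric/boolean values are quoted.
kinesis_options = {
    "streamName": ".......",
    "endpointUrl": "https://kinesis.us-east-1.amazonaws.com",
    "startingPosition": "latest",
    "failondataloss": "false",
    "kinesis.executor.maxFetchTimeInMs": "60000",
    "kinesis.executor.maxFetchRecordsPerShard": "1000000",
    "kinesis.executor.maxRecordPerRead": "10000",
    "kinesis.executor.addIdleTimeBetweenReads": "true",
    "kinesis.executor.idleTimeBetweenReadsInMs": "1000",
    "kinesis.client.describeShardInterval": "1s",
    "kinesis.client.numRetries": "10",
    "kinesis.client.retryIntervalMs": "3000",
    "kinesis.client.maxRetryIntervalMs": "10000",
    "kinesis.client.avoidEmptyBatches": "false",
}

# In the actual job (requires a SparkSession with the kinesis-sql connector):
# df = (spark.readStream
#           .format("kinesis")
#           .options(**kinesis_options)
#           .load())
```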
Here's the issue I'm having: imagine a few records are pushed at 00:00:00 and the next set at 20:00:00, leaving roughly a 20-hour gap between the two sets of records in the Kinesis stream. After fix 5bd378b was introduced in 1.2.0, Spark is unable to fetch the later records from 20:00:00 because of the long gap with no activity. Removing the idle time between reads and increasing the max fetch time seems to help in getting the later set of records. When I switch back to 1.1.4, the Qubole Kinesis configs above work, but the connector doesn't honor the max fetch time, so Spark spends more time catching up to the tip and may block other Spark jobs until that finishes.
What's the recommended approach in this case? Would using a timestamp as the offset work better?
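On the timestamp question, a sketch of what a per-shard starting position might look like if the connector accepts a JSON position in place of `latest`/`trim_horizon`. The shard ids and the `at_timestamp` field name here are hypothetical placeholders, not confirmed against kinesis-sql 1.2.0; the exact offset shape should be verified against the connector source for the version in use.

```python
import json

# Hypothetical per-shard starting position keyed by shard id. The field names
# are illustrative only and must be checked against the connector's offset format.
starting_position = json.dumps({
    "shardId-000000000000": {"at_timestamp": "2020-01-01T20:00:00Z"},
    "shardId-000000000001": {"at_timestamp": "2020-01-01T20:00:00Z"},
})

# If supported, this string would replace 'latest' in the options:
# .option("startingPosition", starting_position)
```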