-
Notifications
You must be signed in to change notification settings - Fork 471
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Internally construct and use stream ARNs for all streams in multi-stream mode #1318
Internally construct and use stream ARNs for all streams in multi-stream mode #1318
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally lgtm, let's double check with @akidambisrinivasan though
* @return The {@link Arn} of the Kinesis stream. | ||
*/ | ||
public static Arn constructStreamArn( | ||
@NonNull final Region region, final long accountId, @NonNull final String streamName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the accountId just be a String value instead? StreamIdentifier uses accountId as a string. Would avoid any extra parsing we have to do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a long
for the accountId
can lead to truncation issues for accounts with leading zeros,
so this is actually a bug I think.
Thanks for the callout!
final Optional<Arn> streamArnOptional = streamConfig.streamIdentifier().streamArnOptional(); | ||
if (streamArnOptional.isPresent()) { | ||
Validate.isTrue(kinesisRegion.id().equals(streamArnOptional.get().region().get()), | ||
"The provided streamARN " + streamArnOptional.get() | ||
+ " does not match the Kinesis client's configured region - " + kinesisRegion); | ||
return super.put(streamIdentifier, streamConfig); | ||
} | ||
|
||
if (isMultiStreamMode) { | ||
return super.put(streamIdentifier, withStreamArn(streamConfig, kinesisRegion)); | ||
} | ||
|
||
return super.put(streamIdentifier, streamConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isPresent()
followed by get
is an anti-pattern.
Instead use map
or ifPresent
as appropriate. In this case, something like
final Optional<Arn> streamArnOptional = streamConfig.streamIdentifier().streamArnOptional(); | |
if (streamArnOptional.isPresent()) { | |
Validate.isTrue(kinesisRegion.id().equals(streamArnOptional.get().region().get()), | |
"The provided streamARN " + streamArnOptional.get() | |
+ " does not match the Kinesis client's configured region - " + kinesisRegion); | |
return super.put(streamIdentifier, streamConfig); | |
} | |
if (isMultiStreamMode) { | |
return super.put(streamIdentifier, withStreamArn(streamConfig, kinesisRegion)); | |
} | |
return super.put(streamIdentifier, streamConfig); | |
StreamConfig cfg = streamConfig.streamIdentifier().streamArnOptional().map(streamArn -> { | |
Validate.isTrue(kinesisRegion.id().equals(streamArn.region().get()), | |
"The provided streamARN " + streamArn + | |
+ " does not match the Kinesis client's configured region - " + kinesisRegion); | |
return streamConfig; | |
}).orElse(isMultiStreamMode ? withStreamArn(streamConfig, kinesisRegion):streamConfig); | |
return super.put(streamIdentifier, cfg); | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. Thanks!
c582565
to
6846fe7
Compare
|
||
@KinesisClientInternalApi | ||
public final class ArnUtil { | ||
private static final String SERVICE_NAME = "kinesis"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you use something already existing e.g. KinesisAsyncClient.SERVICE_NAME ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated, thank you for pointing this out!
* Only to be used in multi-stream mode. | ||
* | ||
* @param streamConfig The {@link StreamConfig} object to return a copy of. | ||
* @param kinesisRegion The {@link Region} the stream exists in to be used for constructing the {@link Arn}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: missing comma - exists in, to be used for
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated, thanks!
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, | ||
metricsConfig, processorConfig, retrievalConfig); | ||
|
||
final Scheduler schedulerSpy = spy(scheduler); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need a spy? can you not invoke runProcessLoop on scheduler?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good callout! Spy is not needed here; must've copied from a previous unit test and forgotten to clean it up.
6b7e8e1
6846fe7
to
6b7e8e1
Compare
|
||
return super.put(streamIdentifier, streamConfig.streamIdentifier().streamArnOptional() | ||
.map(streamArn -> { | ||
Validate.isTrue(kinesisRegion.id().equals(streamArn.region().get()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just realized that this is invoked in checkAndSyncStreamShardsAndLeases when new streams are discovered, when a shard sync task is generated and updates the currentStreamConfigMap. In that case if the validation fails, it can short-circuit the processing because the exception is only caught in runProcessLoop. This can prevent other streams from being processed. Ideally if there is an issue with a stream, its reasonable for Leader to log an error and skip the stream and maybe even emit a metric. But not let processing of other streams be affected. Can you fix it as a follow up ?
Issue #, if available:
N/A.
Description of changes:
Construct stream ARNs (using the Kinesis client's configured region) for all stream configs upon writing to the
currentStreamConfigMap
in multi-stream mode.This should ensure that all Kinesis API requests from
KinesisDataFetcher
andKinesisShardDetector
are provided with the stream ARN in multi-stream applications.Note: An implication of stream ARNs being used for Kinesis API calls is that the
accountId
provided through theStreamTracker
must be correct, whereas previously applications may have consumed the streams as long as the providedstreamName
existed in the account that the Kinesis client was configured for.By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.