-
Notifications
You must be signed in to change notification settings - Fork 194
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Connection code of HttpSink Plugin for #874. #2987
Connection code of HttpSink Plugin for #874. #2987
Conversation
Signed-off-by: mallikagogoi7 <mallikagogoi7@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! Looks good overall. A few things below:
HTTP_BASIC("http_basic"), | ||
BEARER_TOKEN("bearer_token"), | ||
UNAUTHENTICATED("unauthenticated"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Requirements say mTLS
as well. Is it going to be supported here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review @oeyh Yes, mTLS is also supported.
if (currentBuffer == null) { | ||
this.currentBuffer = bufferFactory.getBuffer(); | ||
} | ||
records.forEach(record -> { | ||
try{ | ||
// logic to fetch the records in batch as per threshold limit - checkThresholdExceed(); | ||
// apply the codec | ||
// push to http end point | ||
}catch(Exception e){ | ||
// In case of any exception, need to write the exception in dlq - logFailureForDlqObjects(); | ||
// In case of any exception, need to push the web hook url- logFailureForWebHook(); | ||
final Event event = record.getData(); | ||
try { | ||
currentBuffer.writeEvent(event.toJsonString().getBytes()); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
if(event.getEventHandle() != null) { | ||
this.bufferedEventHandles.add(event.getEventHandle()); | ||
} | ||
//TODO: implement end to end acknowledgement | ||
final List<HttpEndPointResponse> failedHttpEndPointResponses = pushToEndPoint(getCurrentBufferData(currentBuffer)); | ||
if(!failedHttpEndPointResponses.isEmpty()) { | ||
//TODO send to DLQ and webhook | ||
} else { | ||
LOG.info("data pushed to all the end points successfully"); | ||
} | ||
currentBuffer = bufferFactory.getBuffer(); | ||
releaseEventHandles(Boolean.TRUE); | ||
|
||
}); | ||
reentrantLock.unlock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably better to put those in a try-finally block to make sure the lock will be released.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is yet to be completed, right? Threshold options haven't been incorporated here. We also may not want to throw an exception in the middle of the loop to interrupt the output.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review @oeyh , PR is updated with try-finally block and Threshold implementation shall be in the next PR.
Signed-off-by: mallikagogoi7 <mallikagogoi7@gmail.com>
Description
Connection code of HttpSink Plugin for #874.
Issues Resolved
Resolves #874
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.