Improve connector failure handling logic #85
Conversation
Codecov Report
@@ Coverage Diff @@
## master #85 +/- ##
============================================
+ Coverage 65.12% 67.00% +1.88%
- Complexity 309 330 +21
============================================
Files 53 55 +2
Lines 1878 1961 +83
Branches 167 170 +3
============================================
+ Hits 1223 1314 +91
+ Misses 570 563 -7
+ Partials 85 84 -1
... and 3 files with indirect coverage changes
Resolved review threads:
connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java
connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java
            numPendingRow = 0;
            break;
        } catch (Exception e) {
            LOG.error(String.format("write data error (attempt %s)", i), e);
Would it be better to LOG.warn rather than LOG.error? Print the error only when maxRetries is used up and failOnError is false.
Btw, use %d for the int i.
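For illustration, a rough sketch of the behavior being suggested here; the loop structure and the names maxRetries, failOnError, and numPendingRow are taken from the surrounding snippets rather than from the actual patch:

for (int i = 0; i <= maxRetries; i++) {
    try {
        nebulaBatchExecutor.executeBatch(session);
        numPendingRow = 0;
        break;
    } catch (Exception e) {
        if (i < maxRetries) {
            // More attempts remain, so a warning is enough here.
            LOG.warn(String.format("write data error (attempt %d), retrying", i), e);
        } else if (failOnError) {
            // Let the task fail; the exception will surface in the Flink logs anyway.
            throw e;
        } else {
            // Retries exhausted and the failure is swallowed: log it as an error.
            LOG.error(String.format("write data error after %d attempts", maxRetries), e);
        }
    }
}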
Good point! I've fixed this.
            if (failOnError) {
                throw e;
            }
        } else if (i + 1 <= maxRetries) {
Looks like the condition (i + 1 <= maxRetries) is useless.
Could you please explain this?
Yeah, I think you're right. I've updated the logic and removed this condition.
        }
        // We do not know whether the failure was due to an expired session or
        // an issue with the query, so we renew the session anyway to be more robust.
        renewSession();
We should get the ResultSet from nebulaBatchExecutor.executeBatch(session) and check its error code. If the ErrorCode of the ResultSet is E_SEMANTIC_ERROR or E_SYNTAX_ERROR, there's no need to retry.
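A rough sketch of that check; this assumes executeBatch returns the Java client's ResultSet, and the ErrorCode constants are the NebulaGraph Java client ones (exact accessor names may differ):

ResultSet result = nebulaBatchExecutor.executeBatch(session);
if (!result.isSucceeded()) {
    int code = result.getErrorCode();
    if (code == ErrorCode.E_SEMANTIC_ERROR.getValue()
            || code == ErrorCode.E_SYNTAX_ERROR.getValue()) {
        // The statement itself is invalid, so retrying cannot succeed.
        throw new IOException("write data failed: " + result.getErrorMessage());
    }
    // Other error codes fall through to the retry / renew-session path.
}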
Good point! I've updated the logic to support this.
        if (failOnError) {
            throw e;
        }
        renewSession();
I think the only condition that requires renewSession is a SESSION_INVALID or SESSION_TIMEOUT error code.
- For IOException, we do not need to renew the Session but only the Session's Connection; then we can reduce the re-auth operations.
- For the EXECUTION_ERROR error code, the cause is on the storaged side, so we can just re-execute with the same Session.
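A small sketch of the recovery policy described above; the error-code constants and the exception handling are assumptions here rather than the connector's actual code:

try {
    ResultSet result = nebulaBatchExecutor.executeBatch(session);
    int code = result.getErrorCode();
    if (code == ErrorCode.E_SESSION_INVALID.getValue()
            || code == ErrorCode.E_SESSION_TIMEOUT.getValue()) {
        // Only an invalid or timed-out session needs a brand-new session.
        renewSession();
    } else if (code == ErrorCode.E_EXECUTION_ERROR.getValue()) {
        // The failure happened on the storaged side; the session is still
        // valid, so we can simply re-execute with the same session.
    }
} catch (IOException e) {
    // A broken connection does not invalidate the session itself; renewing
    // the connection (rather than the session) avoids a re-auth round trip.
}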
@Nicole00 Yeah, I can see that there may be situations where renewing the session is unnecessary, but here I took a simple approach to make sure the subsequent operation always starts in a clean state.
- For IOException, right now we use the NebulaPool.getSession() method to get the session. I looked at its implementation and it internally manages a pool of connections. There may be opportunities to avoid unnecessary authentication, but that may require us to manage sessions/connections in the Flink connector itself. This is probably a bigger body of work. Maybe this is something we can consider in the future, or something that can be improved inside the NebulaGraph Java client?
- For E_EXECUTION_ERROR and many other error codes, I'm worried about the situation of "partial success" even if the final outcome is an error. For example, there may be a temporary issue with one of the storaged services while the others function normally. I've also seen the session time out while the slow query is still being executed in storaged. This is problematic in Flink applications where we process a mix of insert/update/delete events, and concurrent writes to the same key may result in out-of-order processing and corrupted data at the sink. Therefore, I'd like to renew the session anyway, which proactively kills the existing session (and any queries that are still running), so that we rule out the possibility of concurrent queries. In other words, in Flink I'd prefer that there is only one query running for the session attached to each Flink task, and I'd be extremely careful about session reuse after an error.

In general, I think the current "catch-all" session renewal logic does not affect the correctness of error handling, and it's robust due to its simplicity. There may be situations where some renewals can be avoided, but we could treat this as an optimization to revisit in a future PR. What do you think?
@linhr Yeah, you are right, renewing the session anyway is a totally correct and the simplest way to handle a query error, and it is also the roughest way 😁.
- The Java client indeed has a retry mechanism for IOException; we just need to set reconnect to true when calling getSession (see the small sketch below), so we do not need to handle this in the Flink connector.
- As for the concurrent queries you mentioned, they do not exist here, because the session does not support concurrent queries and the client has a synchronized lock on the execute interface.
- Finally, I think it's OK to optimize the re-execute logic in the future, and maybe we will replace the Java client session with SessionPool, which already handles re-execution, so we would just need to provide some configs.
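For reference, the reconnect flag mentioned in the first point is the third argument of NebulaPool.getSession; a small sketch, with the pool construction omitted:

// Ask the Java client to transparently reconnect on broken connections,
// so the Flink connector does not have to handle IOException retries itself.
Session session = nebulaPool.getSession(username, password, true);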
@Nicole00 Thanks for the comment! Yeah, it would be great to further improve the code in the future using the built-in error-handling support in the Java client.
Regarding the concurrent query, let me try to explain the situation a bit more. If the previous query fails, it may not have failed cleanly. For example, when the query times out, we get the error and the lock is released. At this point we can run another query to insert the same data using the same session. However, the previous failed query is still being executed, so we get concurrent queries. Since we do not have transactional writes and the previous execution is not rolled back, the actual data finally written is not well defined. This is of course an edge case, but I was concerned that it may lead to corrupted data, so I renewed the session to be safe.
Happy to discuss!
Wonderful PR! Thanks @linhr for your contributions, happy to discuss with you.
Thank you @linhr for the contribution, it's great to have you in the NebulaGraph community!
What type of PR is this?
What problem(s) does this PR solve?
Issue(s) number
N/A
Description
We have found a few edge cases where the connector does not handle failures gracefully. This PR tries to make the implementation more robust, based on our observations from running the connector in production over the past few months.
How do you solve it?
We have the following changes to the connector:
- The pending batch is now cleared in the NebulaBatchExecutor.executeBatch() method even when execution fails. This prevents Java heap memory from growing out of bounds when there are a large number of correlated errors.
- We introduce three new options for the Table API connector:
  - max-retries: the maximum number of retries in the execution. The default value is 3.
  - retry-delay-ms: the delay between retries, in milliseconds. The default value is 1000.
  - failure-handler: the failure handling mode when execution retries are exhausted. The valid values are (1) ignore, which skips the batch that fails, and (2) fail, which fails the task. When the task fails, Flink may restart it depending on how the job is configured; this also allows for detection and manual intervention when there is an unrecoverable error (due to corrupted data, an unhealthy NebulaGraph cluster, a bad network connection, etc.). The default value is ignore, which is consistent with the current implementation, so the option is backward-compatible.
- The DataStream connector can be configured similarly using ExecutionOptions. A configuration sketch is shown below.
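For illustration, a hypothetical Flink SQL definition using the new options; everything except max-retries, retry-delay-ms, and failure-handler (including the connector identifier and the omitted connection options) is an assumption for this sketch and may differ from the actual connector:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NebulaSinkOptionsExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Register a hypothetical sink table; the DDL only illustrates where
        // the three new options would go in the WITH clause.
        tableEnv.executeSql(
                "CREATE TABLE nebula_sink (\n"
                + "  vid STRING,\n"
                + "  name STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'nebula',\n"
                + "  -- ... meta/graph addresses, credentials, space and label options ...\n"
                + "  'max-retries' = '3',\n"          // retry each batch up to 3 times
                + "  'retry-delay-ms' = '1000',\n"    // wait 1 second between retries
                + "  'failure-handler' = 'fail'\n"    // fail the task when retries run out
                + ")");
    }
}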
Special notes for your reviewer, e.g. impact of this fix, design document, etc.:
N/A