-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][tidb-connector][#1631] add cdc error event handle function #1632
Conversation
leozlliang
commented
Oct 20, 2022
- add cdc error event handler
- fix issue when pull cdc event blocked
@lincoln-lil Thank you for contributions. Please use |
|
@leozlliang Can u add unit test to confirm this fix is worked. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@leozlliang I have left some comment.
@@ -0,0 +1,250 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to update license.
@@ -0,0 +1,86 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -0,0 +1,262 @@ | |||
/* | |||
* Copyright 2021 TiKV Project Authors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -0,0 +1,190 @@ | |||
/* | |||
* Copyright 2021 TiKV Project Authors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
@@ -114,6 +119,9 @@ public void open(final Configuration config) throws Exception { | |||
cdcClient = new CDCClient(session, keyRange); | |||
prewrites = new TreeMap<>(); | |||
commits = new TreeMap<>(); | |||
// if pull cdc event block when region split, cdc event will lose. | |||
// use queue to make sure pull event unblock. | |||
committedEvents = new LinkedBlockingQueue<Cdcpb.Event.Row>(50000000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Capacity 50000000
can you give me some explain?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -114,6 +119,9 @@ public void open(final Configuration config) throws Exception { | |||
cdcClient = new CDCClient(session, keyRange); | |||
prewrites = new TreeMap<>(); | |||
commits = new TreeMap<>(); | |||
// if pull cdc event block when region split, cdc event will lose. | |||
// use queue to make sure pull event unblock. | |||
committedEvents = new LinkedBlockingQueue<Cdcpb.Event.Row>(50000000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
committedEvents = new LinkedBlockingQueue<Cdcpb.Event.Row>(50000000); | |
committedEvents = new LinkedBlockingQueue<>(50000000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -198,6 +207,8 @@ protected void readSnapshotEvents() throws Exception { | |||
|
|||
protected void readChangeEvents() throws Exception { | |||
LOG.info("read change event from resolvedTs:{}", resolvedTs); | |||
// child thread to sink committed rows. | |||
new Thread(this::asyncSink).start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use Executors.newSingleThreadExecutor(threadFactory)
will be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea, done
…elSourceFunction.
|
||
private static void waitForSinkSize(String sinkName, int expectedSize) | ||
throws InterruptedException { | ||
while (sinkSize(sinkName) < expectedSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if expectSize not expected, is this function will exit? otherwise will block the ci stop.