Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix safeLock not run when interrupt & Fix partial notification being cancelled in ServiceInstancesChangedListener #14730

Merged
merged 6 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,24 @@ public class LockUtils {

public static void safeLock(Lock lock, int timeout, Runnable runnable) {
try {
if (!lock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
logger.error(
LoggerCodeConstants.INTERNAL_ERROR,
"",
"",
"Try to lock failed, timeout: " + timeout,
new TimeoutException());
boolean interrupted = false;
try {
if (!lock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
logger.error(
LoggerCodeConstants.INTERNAL_ERROR,
"",
"",
"Try to lock failed, timeout: " + timeout,
new TimeoutException());
}
} catch (InterruptedException e) {
logger.warn(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Try to lock failed", e);
interrupted = true;
}
runnable.run();
} catch (InterruptedException e) {
logger.warn(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Try to lock failed", e);
Thread.currentThread().interrupt();
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
try {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void testInterrupt() {
thread.interrupt();
await().until(() -> thread.getState() == State.TERMINATED);

Assertions.assertFalse(locked.get());
Assertions.assertTrue(locked.get());

reentrantLock.unlock();
}
Expand All @@ -141,4 +141,24 @@ void testHoldLock() throws InterruptedException {
Assertions.assertTrue(lockTime.get() - startTime > 1000);
Assertions.assertTrue(lockTime.get() - startTime < 10000);
}

@RepeatedTest(5)
void testInterrupted() throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();

AtomicLong lockTime = new AtomicLong(0);
long startTime = System.currentTimeMillis();
Thread thread = new Thread(() -> {
Thread.currentThread().interrupt();
LockUtils.safeLock(reentrantLock, 10000, () -> {
lockTime.set(System.currentTimeMillis());
});
});
thread.start();

await().until(() -> thread.getState() == State.TERMINATED);
Assertions.assertTrue(lockTime.get() >= startTime);
Assertions.assertTrue(lockTime.get() - startTime < 10000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,28 +182,8 @@ private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
}

int emptyNum = hasEmptyMetadata(revisionToInstances);
if (emptyNum != 0) { // retry every 10 seconds
if (emptyNum != 0) {
hasEmptyMetadata = true;
if (retryPermission.tryAcquire()) {
if (retryFuture != null && !retryFuture.isDone()) {
// cancel last retryFuture because only one retryFuture will be canceled at destroy().
retryFuture.cancel(true);
}
try {
retryFuture = scheduler.schedule(
new AddressRefreshRetryTask(retryPermission, event.getServiceName()),
10_000L,
TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.error(
INTERNAL_ERROR,
"unknown error in registry module",
"",
"Error submitting async retry task.");
}
logger.warn(
INTERNAL_ERROR, "unknown error in registry module", "", "Address refresh try task submitted");
}

// return if all metadata is empty, this notification will not take effect.
if (emptyNum == revisionToInstances.size()) {
Expand All @@ -214,10 +194,12 @@ private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
"",
"Address refresh failed because of Metadata Server failure, wait for retry or new address refresh event.");

submitRetryTask(event);
return;
}
} else {
hasEmptyMetadata = false;
}
hasEmptyMetadata = false;

Map<String, Map<Integer, Map<Set<String>, Object>>> protocolRevisionsToUrls = new HashMap<>();
Map<String, List<ProtocolServiceKeyWithUrls>> newServiceUrls = new HashMap<>();
Expand All @@ -241,6 +223,30 @@ private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {

this.serviceUrls = newServiceUrls;
this.notifyAddressChanged();

if (hasEmptyMetadata) {
submitRetryTask(event);
}
}

private void submitRetryTask(ServiceInstancesChangedEvent event) {
// retry every 10 seconds
if (retryPermission.tryAcquire()) {
if (retryFuture != null && !retryFuture.isDone()) {
// cancel last retryFuture because only one retryFuture will be canceled at destroy().
retryFuture.cancel(true);
}
try {
retryFuture = scheduler.schedule(
new AddressRefreshRetryTask(retryPermission, event.getServiceName()),
10_000L,
TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.error(
INTERNAL_ERROR, "unknown error in registry module", "", "Error submitting async retry task.");
}
logger.warn(INTERNAL_ERROR, "unknown error in registry module", "", "Address refresh try task submitted");
}
}

public synchronized void addListenerAndNotify(URL url, NotifyListener listener) {
Expand Down
Loading