Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Improved HA stability #9854

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1694e6f
Detect distributed setup early in activate process.
timw Jul 19, 2022
392c94b
Minor consistency fixes in embedded DB input validation.
timw Jul 19, 2022
06cf474
Factor embedded DB open methods into single core implementation
timw Jul 19, 2022
c6f96fd
Add distinct openInternal operation that bypasses online checks.
timw Jul 19, 2022
3b88eca
Fix database online checks for distributed database opens.
timw Jul 19, 2022
5808351
Use openInternal path for bypass access.
timw Jul 19, 2022
345c9b6
Use simpler but equivalent openDatabase calls
timw Jul 19, 2022
11427e7
Add tracing support to executors.
timw Jul 19, 2022
fb9170d
Add tracing support to scheduled executor service.
timw Jul 19, 2022
6ff2afd
Add a global executor for use instead of general one-off threads.
timw Jul 19, 2022
0ce5af1
Defer setting running to end of distributed plugin startup.
timw Jul 19, 2022
1c49c99
Defer installation of databases until distributed plugin is online.
timw Jul 19, 2022
cdcec5b
Sanity check that distributed plugin is online before attempting dist…
timw Jul 19, 2022
3c53320
Fix waiting for last task in ViewManager close.
timw Jul 19, 2022
db45760
Use tracing executor service in OrientDBEmbedded
timw Jul 19, 2022
ad6363f
Use internal open for ViewManager init
timw Jul 19, 2022
ede8fbf
Make ViewManager updates resilient to offline DB status.
timw Jul 19, 2022
4ec87b1
Log transaction ID on re-enqueue
timw Jul 28, 2022
8c8e67c
Move DistributedDatabase registration out of constructor.
timw Jul 28, 2022
d533fc8
Add task name based tracing to scheduled tasks.
timw Jul 28, 2022
99a0bfb
Avoid repeated distributed database shutdowns.
timw Jul 28, 2022
3d6d5e5
Fix race in registering installing database to guard against concurre…
timw Jul 28, 2022
d042c99
Eliminate duplicate scheduleTask code.
timw Jul 28, 2022
5899101
Guard database create to avoid partially initialised storage being cr…
timw Aug 2, 2022
61e3af5
DEV: Expand HazelcastPlugin startup time to widen window to expose ra…
timw Jul 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,11 @@ public void loadAllDatabases() {
// In remote does nothing
}

@Override
public ODatabaseDocumentInternal openInternal(String iDbUrl, String user) {
throw new UnsupportedOperationException("Open for internal use is not supported in remote");
}

@Override
public ODatabaseDocumentInternal openNoAuthenticate(String iDbUrl, String user) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package com.orientechnologies.common.thread;

import com.orientechnologies.common.log.OLogManager;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.*;

/**
* The same as thread {@link ScheduledThreadPoolExecutor} but also logs all exceptions happened
* inside of the tasks which caused tasks to stop.
*/
public class OScheduledThreadPoolExecutorWithLogging extends ScheduledThreadPoolExecutor {
public class OScheduledThreadPoolExecutorWithLogging extends ScheduledThreadPoolExecutor
implements TracingScheduledExecutorService {
public OScheduledThreadPoolExecutorWithLogging(int corePoolSize) {
super(corePoolSize);
}
Expand Down Expand Up @@ -58,4 +54,168 @@ protected void afterExecute(Runnable r, Throwable t) {
OLogManager.instance().errorNoDb(this, "Exception in thread '%s'", t, thread.getName());
}
}

@Override
public <T> Future<T> submit(String taskName, Callable<T> task) {
final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task);
return super.submit(
() -> {
try {
return task.call();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, task);
}
});
}

@Override
public Future<?> submit(String taskName, Runnable task) {
final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task);
return super.submit(
() -> {
try {
task.run();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, task);
}
});
}

@Override
public <T> Future<T> submit(String taskName, Runnable task, T result) {
final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task);
return super.submit(
() -> {
try {
task.run();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, task);
}
},
result);
}

@Override
public void execute(String taskName, Runnable command) {
final OTracedExecutionException trace =
OTracedExecutionException.prepareTrace(taskName, command);
super.execute(
() -> {
try {
command.run();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, command);
}
});
}

@Override
public Future<?> submit(Runnable task) {
return submit((String) null, task);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return submit((String) null, task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return submit(null, task, result);
}

@Override
public void execute(Runnable command) {
execute(null, command);
}

@Override
public ScheduledFuture<?> schedule(String taskName, Runnable command, long delay, TimeUnit unit) {
final OTracedExecutionException trace =
OTracedExecutionException.prepareTrace(taskName, command);
return super.schedule(
() -> {
try {
command.run();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, command);
}
},
delay,
unit);
}

@Override
public <V> ScheduledFuture<V> schedule(
String taskName, Callable<V> task, long delay, TimeUnit unit) {
final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task);
return super.schedule(
() -> {
try {
return task.call();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, task);
}
},
delay,
unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(
String taskName, Runnable command, long initialDelay, long period, TimeUnit unit) {
final OTracedExecutionException trace =
OTracedExecutionException.prepareTrace(taskName, command);
return super.scheduleAtFixedRate(
() -> {
try {
command.run();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, command);
}
},
initialDelay,
period,
unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
String taskName, Runnable command, long initialDelay, long delay, TimeUnit unit) {
final OTracedExecutionException trace =
OTracedExecutionException.prepareTrace(taskName, command);
return super.scheduleWithFixedDelay(
() -> {
try {
command.run();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, command);
}
},
initialDelay,
delay,
unit);
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return schedule(null, command, delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return schedule(null, callable, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return scheduleAtFixedRate(null, command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return scheduleWithFixedDelay(null, command, initialDelay, delay, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.orientechnologies.common.log.OLogManager;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -14,7 +15,8 @@
* The same as thread {@link ThreadPoolExecutor} but also logs all exceptions happened inside of the
* tasks which caused tasks to stop.
*/
public class OThreadPoolExecutorWithLogging extends ThreadPoolExecutor {
public class OThreadPoolExecutorWithLogging extends ThreadPoolExecutor
implements TracingExecutorService {
public OThreadPoolExecutorWithLogging(
int corePoolSize,
int maximumPoolSize,
Expand Down Expand Up @@ -79,4 +81,78 @@ protected void afterExecute(Runnable r, Throwable t) {
OLogManager.instance().errorNoDb(this, "Exception in thread '%s'", t, thread.getName());
}
}

@Override
public <T> Future<T> submit(String taskName, Callable<T> task) {
final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task);
return super.submit(
() -> {
try {
return task.call();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, task);
}
});
}

@Override
public Future<?> submit(String taskName, Runnable task) {
final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task);
return super.submit(
() -> {
try {
task.run();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, task);
}
});
}

@Override
public <T> Future<T> submit(String taskName, Runnable task, T result) {
final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task);
return super.submit(
() -> {
try {
task.run();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, task);
}
},
result);
}

@Override
public void execute(String taskName, Runnable command) {
final OTracedExecutionException trace =
OTracedExecutionException.prepareTrace(taskName, command);
super.execute(
() -> {
try {
command.run();
} catch (Exception e) {
throw OTracedExecutionException.trace(trace, e, taskName, command);
}
});
}

@Override
public Future<?> submit(Runnable task) {
return submit((String) null, task);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return submit((String) null, task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return submit(null, task, result);
}

@Override
public void execute(Runnable command) {
execute(null, command);
}
}
Loading