-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement parallel jdbc driver #23932
base: master
Are you sure you want to change the base?
Conversation
@@ -41,6 +50,13 @@ | |||
public class ResultRowsDecoder | |||
implements AutoCloseable | |||
{ | |||
private static final ExecutorService EXECUTOR = new ThreadPoolExecutor( | |||
5, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is wrong, this should be provided externally and closed
0L, | ||
TimeUnit.SECONDS, | ||
new LinkedBlockingQueue<>(), | ||
new ThreadFactoryBuilder().setNameFormat("Segment loader-%s").setDaemon(true).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
segment-loader-%s
@@ -117,6 +133,9 @@ public ResultRows toRows(List<Column> columns, QueryData data) | |||
|
|||
private ResultRows segmentToRows(Segment segment) | |||
{ | |||
// block until we can allocate the memory for the segment | |||
waitForMemory(segment.getSegmentSize()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the allocation of memory should happen while creating DefferedIterable
|
||
private static void waitForMemory(long requiredMemory) | ||
{ | ||
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is racy. Instead of going up to the Xmx, this should be a configurable property that is passed to the JDBC driver, let's call it prefetchingBufferSize
|
||
while (heapMemoryUsage.getUsed() + requiredMemory > heapMemoryUsage.getMax()) { | ||
try { | ||
Thread.sleep(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
|
||
public DeferredIterable(Future<Iterable<List<Object>>> future) | ||
{ | ||
this.future = future; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rnn
try (ConnectedStatement connectedStatement = newStatement()) { | ||
// Create a new recording | ||
// Start a JFR recording | ||
Process startRecording = Runtime.getRuntime().exec("jcmd " + getProcessId() + " JFR.start name=MyRecording filename=myrecording.jfr"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
??
String test = resultSet.getString(2); | ||
} | ||
// Stop the JFR recording | ||
Process stopRecording = Runtime.getRuntime().exec("jcmd " + getProcessId() + " JFR.stop name=MyRecording"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
??
if you need JFR you can capture it programmatically
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not in java 8. However I don't think we need this feature in the production code, this was just to aid in providing the benchmarks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests can run with a higher Java
TimeUnit.MINUTES.toSeconds(minutes); | ||
|
||
// Print the formatted elapsed time | ||
System.out.println(String.format("Query Elapsed Time: %d minutes, %d seconds", minutes, seconds)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
THis isn't a unit test, drop it or make it a proper test
@@ -379,6 +380,8 @@ private TestingTrinoServer( | |||
|
|||
pluginInstaller = injector.getInstance(PluginInstaller.class); | |||
|
|||
plugins.forEach(pluginInstaller::installPlugin); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need that. You can add a setup action that will install a plugin. Remove
@@ -367,6 +371,30 @@ public Optional<UriLocation> encryptedPreSignedUri(Location location, Duration t | |||
} | |||
} | |||
|
|||
private java.time.Duration toJavaTime(Duration duration) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed. You can convert Duration by getting it's millis. Remove it
d2d47d2
to
6d686ce
Compare
6d686ce
to
13eb7d9
Compare
@@ -300,6 +300,9 @@ public class ClientOptions | |||
@Option(names = "--decimal-data-size", description = "Show data size and rate in base 10 rather than base 2") | |||
public boolean decimalDataSize; | |||
|
|||
@Option(names = "--prefetch-buffer-size", paramLabel = "<prefetch-buffer-size>", defaultValue = "64000", description = "Experimental spooled protocol prefetch buffer size, default: + " + DEFAULT_VALUE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's make it a DataSize, you'll need to register DataSize converter similarly to a Duration one
@@ -97,7 +101,10 @@ private ClientSession( | |||
String transactionId, | |||
Duration clientRequestTimeout, | |||
boolean compressionDisabled, | |||
Optional<String> encoding) | |||
Optional<String> encoding, | |||
String prefetchBufferSize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataSize
Optional<String> encoding) | ||
Optional<String> encoding, | ||
String prefetchBufferSize, | ||
ExecutorService decoderExecutorService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be added to ClientSession
13eb7d9
to
8d843ca
Compare
8d843ca
to
5624578
Compare
Description
Make use of the spooled protocol to parallelise downloading spooled segments.
Performance related fixes based on
Additional context and related issues
Instead of using a
BlockingArrayQueue
we useLinkedConcurrentQueue
and block memory allocation based on segment size (this is not based on memory size but on the file size of the segment.Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(*) Release notes are required, with the following suggested text: