Implement parallel jdbc driver #23932

Draft
mdesmet wants to merge 2 commits into master from mdesmet/feat/parallel-driver

mdesmet (Contributor) commented Oct 27, 2024

Description

Make use of the spooled protocol to parallelise downloading spooled segments.
Performance related fixes based on

Additional context and related issues

Instead of using a BlockingArrayQueue, we use a LinkedConcurrentQueue and block on memory allocation based on segment size (the file size of the segment, not its in-memory size).

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(*) Release notes are required, with the following suggested text:

## Section
* Implement parallel JDBC driver. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Oct 27, 2024
@mdesmet mdesmet requested a review from wendigo October 27, 2024 07:14
@github-actions github-actions bot added the jdbc Relates to Trino JDBC driver label Oct 27, 2024
@@ -41,6 +50,13 @@
public class ResultRowsDecoder
implements AutoCloseable
{
private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(
5,
Contributor
this is wrong, this should be provided externally and closed

0L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("Segment loader-%s").setDaemon(true).build());
Contributor
segment-loader-%s
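
The two comments on this hunk (executor provided externally and closed, `segment-loader-%s` thread names) could be combined along the following lines. This is only a sketch: `SegmentLoaderSketch` and its `load` method are hypothetical stand-ins for the decoder, and the thread factory is a plain-JDK approximation of the `ThreadFactoryBuilder` call in the diff.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the decoder: it borrows an executor and never shuts it down itself
class SegmentLoaderSketch
{
    private final ExecutorService executor;

    SegmentLoaderSketch(ExecutorService executor)
    {
        this.executor = Objects.requireNonNull(executor, "executor is null");
    }

    Future<String> load(String segmentId)
    {
        // Placeholder for the real download: reports which worker thread handled the segment
        return executor.submit(() -> segmentId + " loaded on " + Thread.currentThread().getName());
    }

    // Plain-JDK approximation of setNameFormat("segment-loader-%s").setDaemon(true)
    static ThreadFactory segmentLoaderThreads()
    {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, "segment-loader-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }
}
```

Whoever creates the pool, for example with `Executors.newFixedThreadPool(5, SegmentLoaderSketch.segmentLoaderThreads())`, stays responsible for calling `shutdownNow()` on it, which keeps the lifecycle out of a static field.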

@@ -117,6 +133,9 @@ public ResultRows toRows(List<Column> columns, QueryData data)

private ResultRows segmentToRows(Segment segment)
{
// block until we can allocate the memory for the segment
waitForMemory(segment.getSegmentSize());
Contributor

The allocation of memory should happen while creating DeferredIterable


private static void waitForMemory(long requiredMemory)
{
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
Contributor

This is racy. Instead of going up to the Xmx, this should be a configurable property that is passed to the JDBC driver. Let's call it prefetchingBufferSize
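
One way to avoid the race is to replace the heap polling with a fixed byte budget that is reserved atomically. The sketch below uses a `Semaphore` for that; `PrefetchBudget` and its methods are hypothetical names, and the capacity stands in for the proposed `prefetchingBufferSize` property.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper: caps the number of segment bytes that may be buffered at once
class PrefetchBudget
{
    private final int capacityBytes;
    private final Semaphore permits;

    PrefetchBudget(int capacityBytes)
    {
        if (capacityBytes <= 0) {
            throw new IllegalArgumentException("capacityBytes must be positive");
        }
        this.capacityBytes = capacityBytes;
        // Fair ordering so that a large segment is not starved by many small ones
        this.permits = new Semaphore(capacityBytes, true);
    }

    // Blocks until the segment fits into the budget and returns the number of bytes reserved
    int reserve(long segmentBytes)
            throws InterruptedException
    {
        if (segmentBytes < 0) {
            throw new IllegalArgumentException("segmentBytes is negative");
        }
        // A segment larger than the whole budget is clamped so it can still be loaded on its own
        int bytes = (int) Math.min(segmentBytes, capacityBytes);
        permits.acquire(bytes);
        return bytes;
    }

    // Must be called with the value returned by reserve once the segment has been consumed
    void release(int reservedBytes)
    {
        permits.release(reservedBytes);
    }

    int availableBytes()
    {
        return permits.availablePermits();
    }
}
```

Because `acquire` checks and reserves in one step, two loader threads cannot both observe free space and then overshoot the limit, which is the failure mode of reading `getUsed()` and sleeping.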


while (heapMemoryUsage.getUsed() + requiredMemory > heapMemoryUsage.getMax()) {
try {
Thread.sleep(100);
Contributor
?


public DeferredIterable(Future<Iterable<List<Object>>> future)
{
this.future = future;
Contributor

rnn (requireNonNull)
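
Reading "rnn" as shorthand for `requireNonNull`, the constructor could validate its argument as sketched below. Only the constructor signature comes from the diff; the class name `DeferredIterableSketch` and the `iterator()` body are assumptions about how such a wrapper might resolve the future.

```java
import static java.util.Objects.requireNonNull;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical reconstruction: only the constructor signature is taken from the PR
class DeferredIterableSketch
        implements Iterable<List<Object>>
{
    private final Future<Iterable<List<Object>>> future;

    DeferredIterableSketch(Future<Iterable<List<Object>>> future)
    {
        // Fail at construction time instead of with a late NullPointerException on first use
        this.future = requireNonNull(future, "future is null");
    }

    @Override
    public Iterator<List<Object>> iterator()
    {
        try {
            // Blocks until the segment has been downloaded and decoded
            return future.get().iterator();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }
}
```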

try (ConnectedStatement connectedStatement = newStatement()) {
// Create a new recording
// Start a JFR recording
Process startRecording = Runtime.getRuntime().exec("jcmd " + getProcessId() + " JFR.start name=MyRecording filename=myrecording.jfr");
Contributor
??

String test = resultSet.getString(2);
}
// Stop the JFR recording
Process stopRecording = Runtime.getRuntime().exec("jcmd " + getProcessId() + " JFR.stop name=MyRecording");
Contributor
??

if you need JFR you can capture it programmatically

Contributor Author

Not in Java 8. However, I don't think we need this feature in the production code; this was just to help produce the benchmarks.

Contributor

Tests can run with a higher Java version
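
For reference, a programmatic capture could look roughly like the sketch below, which uses the `jdk.jfr` API that has shipped with OpenJDK since Java 11 instead of shelling out to `jcmd`. The class and method names are hypothetical, and using the JDK's bundled "default" settings is an assumption about what the benchmark needs.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.text.ParseException;

import jdk.jfr.Configuration;
import jdk.jfr.Recording;

// Hypothetical test helper: records JFR events while the given action runs
class JfrCaptureSketch
{
    static void record(Path output, Runnable action)
            throws IOException, ParseException
    {
        // "default" is the low-overhead settings file bundled with the JDK
        Configuration configuration = Configuration.getConfiguration("default");
        try (Recording recording = new Recording(configuration)) {
            recording.setName("segment-download");
            recording.start();
            try {
                action.run();
            }
            finally {
                recording.stop();
            }
            // Write the collected events to the target file
            recording.dump(output);
        }
    }
}
```

A test could wrap the query loop in `JfrCaptureSketch.record(Path.of("myrecording.jfr"), () -> ...)`, which also removes the dependency on the process id and on `jcmd` being on the path.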

TimeUnit.MINUTES.toSeconds(minutes);

// Print the formatted elapsed time
System.out.println(String.format("Query Elapsed Time: %d minutes, %d seconds", minutes, seconds));
Contributor

This isn't a unit test; drop it or make it a proper test

@@ -379,6 +380,8 @@ private TestingTrinoServer(

pluginInstaller = injector.getInstance(PluginInstaller.class);

plugins.forEach(pluginInstaller::installPlugin);
Contributor

You don't need that. You can add a setup action that will install a plugin. Remove this.

@@ -367,6 +371,30 @@ public Optional<UriLocation> encryptedPreSignedUri(Location location, Duration t
}
}

private java.time.Duration toJavaTime(Duration duration)
Contributor

Not needed. You can convert the Duration by getting its millis. Remove it.

@mdesmet mdesmet force-pushed the mdesmet/feat/parallel-driver branch from d2d47d2 to 6d686ce Compare November 5, 2024 15:18
@mdesmet mdesmet force-pushed the mdesmet/feat/parallel-driver branch from 6d686ce to 13eb7d9 Compare November 17, 2024 14:13
@@ -300,6 +300,9 @@ public class ClientOptions
@Option(names = "--decimal-data-size", description = "Show data size and rate in base 10 rather than base 2")
public boolean decimalDataSize;

@Option(names = "--prefetch-buffer-size", paramLabel = "<prefetch-buffer-size>", defaultValue = "64000", description = "Experimental spooled protocol prefetch buffer size, default: " + DEFAULT_VALUE)
Contributor

Let's make it a DataSize; you'll need to register a DataSize converter, similar to the Duration one

@@ -97,7 +101,10 @@ private ClientSession(
String transactionId,
Duration clientRequestTimeout,
boolean compressionDisabled,
-Optional<String> encoding)
+Optional<String> encoding,
+String prefetchBufferSize,
Contributor
DataSize

-Optional<String> encoding)
+Optional<String> encoding,
+String prefetchBufferSize,
+ExecutorService decoderExecutorService,
Contributor
This shouldn't be added to ClientSession

@mdesmet mdesmet force-pushed the mdesmet/feat/parallel-driver branch from 13eb7d9 to 8d843ca Compare November 19, 2024 15:51
@mdesmet mdesmet force-pushed the mdesmet/feat/parallel-driver branch from 8d843ca to 5624578 Compare November 20, 2024 09:39
Labels: cla-signed, jdbc (Relates to Trino JDBC driver)
2 participants