Skip to content

Commit

Permalink
Explore acquisition scheduler offloading.
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Oct 16, 2024
1 parent f27d3bc commit 0b961f7
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 1 deletion.
19 changes: 18 additions & 1 deletion src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
import io.r2dbc.spi.Wrapped;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
Expand All @@ -48,6 +49,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
Expand All @@ -62,7 +64,7 @@
/**
* An implementation of {@link Connection} for connecting to a PostgreSQL database.
*/
final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlConnection {
final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlConnection, Wrapped<Object> {

private final Logger logger = Loggers.getLogger(this.getClass());

Expand Down Expand Up @@ -384,6 +386,21 @@ public String toString() {
'}';
}

@Override
public <E> E unwrap(Class<E> targetClass) {

if (targetClass == Scheduler.class) {
return targetClass.cast(this.client.getScheduler());
}

return Wrapped.super.unwrap(targetClass);
}

@Override
public Object unwrap() {
return null;
}

@Override
public Mono<Boolean> validate(ValidationDepth depth) {

Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/r2dbc/postgresql/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

import java.util.Optional;
import java.util.TimeZone;
Expand Down Expand Up @@ -121,6 +122,12 @@ default Flux<BackendMessage> exchange(Publisher<FrontendMessage> requests) {
*/
Optional<Integer> getProcessId();

/**
* @return returns the EventLoop as scheduler.
* @since 1.0.7
*/
Scheduler getScheduler();

/**
* Returns the connected process secret key if it has been communicated.
*
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
Expand Down Expand Up @@ -54,6 +55,7 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.channel.AbortedException;
Expand Down Expand Up @@ -107,6 +109,8 @@ public final class ReactorNettyClient implements Client {

private final Connection connection;

private final Scheduler scheduler;

private ConnectionContext context;

private final Sinks.Many<Publisher<FrontendMessage>> requestSink = Sinks.many().unicast().onBackpressureBuffer();
Expand Down Expand Up @@ -150,6 +154,9 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) {
this.byteBufAllocator = connection.outbound().alloc();
this.context = new ConnectionContext().withChannelId(connection.channel().toString());

EventLoop eventLoop = connection.channel().eventLoop();
this.scheduler = Schedulers.fromExecutorService(eventLoop, eventLoop.toString());

AtomicReference<Throwable> receiveError = new AtomicReference<>();

connection.inbound().receive()
Expand Down Expand Up @@ -453,6 +460,11 @@ public Optional<Integer> getProcessId() {
return Optional.ofNullable(this.processId);
}

@Override
public Scheduler getScheduler() {
return this.scheduler;
}

@Override
public Optional<Integer> getSecretKey() {
return Optional.ofNullable(this.secretKey);
Expand Down
7 changes: 7 additions & 0 deletions src/test/java/io/r2dbc/postgresql/client/TestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;

import java.util.ArrayList;
Expand Down Expand Up @@ -138,6 +140,11 @@ public Optional<Integer> getProcessId() {
return Optional.ofNullable(this.processId);
}

@Override
public Scheduler getScheduler() {
return Schedulers.immediate();
}

@Override
public Optional<Integer> getSecretKey() {
return Optional.ofNullable(this.secretKey);
Expand Down

0 comments on commit 0b961f7

Please sign in to comment.