Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2442,6 +2442,13 @@ project(':storage:inkless') {
exclude group: "com.fasterxml.jackson.core"
exclude group: "org.slf4j"
}

implementation(libs.nettyReactorCore)
implementation(libs.nettyReactorHttp)
implementation(libs.nettyTcNativeBoringSSLStatic)
implementation(libs.nettyTcNativeBoringSSLStaticLinuxAarch64)
implementation(libs.nettyTcNativeBoringSSLStaticLinuxX86_64)

implementation(libs.gcsSdk) {
exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
exclude group: 'org.checkerframework', module: 'checker-qual'
Expand Down
8 changes: 8 additions & 0 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ versions += [
mavenArtifact: "3.9.6",
metrics: "2.2.0",
mockito: "5.14.2",
// Required to be compatible with BoringSSL version
nettyReactor: "1.1.22",
nettyTcNativeBoringSSL: "2.0.65.Final",
opentelemetryProto: "1.3.2-alpha",
postgresql: "42.7.4",
protobuf: "3.25.5", // a dependency of opentelemetryProto
Expand Down Expand Up @@ -238,6 +241,11 @@ libs += [
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
mockitoCore: "org.mockito:mockito-core:$versions.mockito",
mockitoJunitJupiter: "org.mockito:mockito-junit-jupiter:$versions.mockito",
nettyReactorCore: "io.projectreactor.netty:reactor-netty-core:$versions.nettyReactor",
nettyReactorHttp: "io.projectreactor.netty:reactor-netty-http:$versions.nettyReactor",
nettyTcNativeBoringSSLStatic: "io.netty:netty-tcnative-boringssl-static:$versions.nettyTcNativeBoringSSL",
nettyTcNativeBoringSSLStaticLinuxAarch64: "io.netty:netty-tcnative-boringssl-static:$versions.nettyTcNativeBoringSSL:linux-aarch_64",
nettyTcNativeBoringSSLStaticLinuxX86_64: "io.netty:netty-tcnative-boringssl-static:$versions.nettyTcNativeBoringSSL:linux-x86_64",
pcollections: "org.pcollections:pcollections:$versions.pcollections",
opentelemetryProto: "io.opentelemetry.proto:opentelemetry-proto:$versions.opentelemetryProto",
postgresql: "org.postgresql:postgresql:$versions.postgresql",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.aiven.inkless.storage_backend.gcs;

import com.google.api.client.http.HttpTransport;
import com.google.cloud.BaseServiceException;
import com.google.cloud.ReadChannel;
import com.google.cloud.http.HttpTransportOptions;
Expand All @@ -34,6 +35,7 @@
import java.nio.channels.ReadableByteChannel;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -43,6 +45,8 @@
import io.aiven.inkless.storage_backend.common.KeyNotFoundException;
import io.aiven.inkless.storage_backend.common.StorageBackend;
import io.aiven.inkless.storage_backend.common.StorageBackendException;
import io.aiven.inkless.storage_backend.gcs.nettyhttpclient.ReactorNettyTransport;


@CoverageIgnore // tested on integration level
public class GcsStorage implements StorageBackend {
Expand All @@ -60,8 +64,12 @@ public class GcsStorage implements StorageBackend {
public void configure(final Map<String, ?> configs) {
final GcsStorageConfig config = new GcsStorageConfig(configs);
this.bucketName = config.bucketName();
final String gcsUrl = Optional.ofNullable(config.endpointUrl()).orElse("https://www.googleapis.com");

final HttpTransportOptions.Builder httpTransportOptionsBuilder = HttpTransportOptions.newBuilder();
final HttpTransportOptions.Builder httpTransportOptionsBuilder = HttpTransportOptions.newBuilder()
.setHttpTransportFactory(() -> {
return (HttpTransport) ReactorNettyTransport.get(gcsUrl);
});

final StorageOptions.Builder builder = StorageOptions.newBuilder()
.setCredentials(config.credentials())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package io.aiven.inkless.storage_backend.gcs.nettyhttpclient;

import com.google.api.client.http.LowLevelHttpRequest;
import com.google.api.client.http.LowLevelHttpResponse;

import java.io.IOException;
import java.net.URI;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import reactor.core.publisher.Flux;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;

import static io.netty.buffer.Unpooled.wrappedBuffer;

public class ReactorNettyRequest extends LowLevelHttpRequest {

private HttpClient client;
private final String method;
private final URI uri;
private final HttpHeaders headers = new DefaultHttpHeaders();

public ReactorNettyRequest(HttpClient client, String method, String url) {
this.client = client;
this.method = method;
this.uri = URI.create(url);
}

@Override
public void addHeader(String name, String value) {
headers.add(name, value);
}

@Override
public LowLevelHttpResponse execute() throws IOException {
final HttpClient.ResponseReceiver<?> receiver;
final ByteBuf buffer;
switch (method) {
case "POST":
case "PUT":
case "DELETE":
if (getStreamingContent() != null) {
long contentLength = getContentLength();
buffer = wrappedBuffer(new byte[Math.toIntExact(contentLength)]);
buffer.resetWriterIndex();
try (ByteBufOutputStream out = new ByteBufOutputStream(buffer)) {
getStreamingContent().writeTo(out);
}

String contentType = getContentType();
String contentEncoding = getContentEncoding();

if (contentType != null) {
headers.set(HttpHeaderNames.CONTENT_TYPE.toString(), contentType);
}
if (contentEncoding != null) {
headers.set(HttpHeaderNames.CONTENT_ENCODING.toString(), contentEncoding);
}
if (contentLength >= 0) {
headers.set(HttpHeaderNames.CONTENT_LENGTH.toString(), Math.toIntExact(contentLength));
}
} else {
buffer = wrappedBuffer(new byte[0]);
}

client = client.headers(cons -> {
cons.add(headers);
});

HttpClient.RequestSender sender;
switch (method) {
case "POST":
sender = client.post();
break;
case "PUT":
sender = client.put();
break;
case "DELETE":
sender = client.delete();
break;
default:
throw new RuntimeException("unknown method");
}
receiver = sender.uri(uri).send(ByteBufFlux.fromInbound(Flux.just(buffer)));
break;
case "GET":
client = client.headers(cons -> {
cons.add(headers);
});
receiver = client.get().uri(uri);
break;
default:
throw new RuntimeException("Unsupported method " + method);
}

final ReactorNettyResponse block = receiver.responseSingle((response, content) -> {
return content.map(bb -> {
// Buffer is directly allocated, copy bytes to heap and
// allow the Reactor framework to release the direct buffer
// and free the memory for reuse.
final byte[] clone = new byte[bb.readableBytes()];
bb.readBytes(clone);
return new ReactorNettyResponse(response, clone);
}).defaultIfEmpty(new ReactorNettyResponse(response, null));
}).block();
if (block == null) {
throw new RuntimeException("No response received.");
}
return block;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.aiven.inkless.storage_backend.gcs.nettyhttpclient;

import com.google.api.client.http.LowLevelHttpResponse;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import io.netty.handler.codec.http.HttpHeaderNames;
import reactor.netty.http.client.HttpClientResponse;

public class ReactorNettyResponse extends LowLevelHttpResponse {

private final HttpClientResponse response;
private final byte[] content;

public ReactorNettyResponse(HttpClientResponse response, byte[] content) {
this.response = response;
if (content == null) {
this.content = new byte[0];
} else {
this.content = content;
}
}

@Override
public InputStream getContent() throws IOException {
return new ByteArrayInputStream(content);
}

@Override
public String getContentEncoding() throws IOException {
return response.responseHeaders().get(HttpHeaderNames.CONTENT_ENCODING);
}

@Override
public long getContentLength() throws IOException {
return content.length;
}

@Override
public String getContentType() throws IOException {
return response.responseHeaders().get(HttpHeaderNames.CONTENT_TYPE);
}

@Override
public String getStatusLine() throws IOException {
final StringBuilder buf = new StringBuilder();

buf.append(response.version()).append(" ").append(getStatusCode()).append(" ");
if (this.getReasonPhrase() != null) {
buf.append(getReasonPhrase());
}
return buf.toString();
}

@Override
public int getStatusCode() throws IOException {
return response.status().code();
}

@Override
public String getReasonPhrase() throws IOException {
return response.status().reasonPhrase();
}

@Override
public int getHeaderCount() throws IOException {
return response.responseHeaders().size();
}

@Override
public String getHeaderName(int index) throws IOException {
return response.responseHeaders().entries().get(index).getKey();
}

@Override
public String getHeaderValue(int index) throws IOException {
return response.responseHeaders().entries().get(index).getValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.aiven.inkless.storage_backend.gcs.nettyhttpclient;

import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.LowLevelHttpRequest;

import java.io.IOException;
import java.net.URI;

import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.HttpHeaderNames;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

public final class ReactorNettyTransport extends HttpTransport {

private static ReactorNettyTransport instance;

private final URI uri;
private final HttpClient client;
private final LoopResources clientEventLoopGroup;
private final ConnectionProvider connectionProvider;
private boolean shutdown = false;

private ReactorNettyTransport(final String endpoint) {
uri = URI.create(endpoint);
final int port;
if (uri.getPort() == -1) {
// No port defined, select by the scheme
port = "https://".equals(uri.getScheme()) ? 80 : 443;
} else {
port = uri.getPort();
}

connectionProvider = ConnectionProvider.builder("custom")
.maxConnections(96)
.build();

clientEventLoopGroup = LoopResources.create("gcs-netty-http", 32, true);

client = HttpClient.create(connectionProvider)
.runOn(clientEventLoopGroup)
.host(uri.getHost())
.port(uri.getPort())
.keepAlive(true)
.followRedirect(false)
.compress(false)
.option(ChannelOption.SO_KEEPALIVE, true)
.headers(cons -> cons.set(HttpHeaderNames.HOST, String.format("%s:%s", uri.getHost(), port)));
client.warmup().block();
}

@Override
protected LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
return new ReactorNettyRequest(client, method, url);
}

@Override
public void shutdown() throws IOException {
if (!shutdown) {
connectionProvider.disposeLater().block();
clientEventLoopGroup.disposeLater().block();
shutdown = true;
}
}

/**
* Returns whether the transport is shutdown or not.
*
* @return true if the transport is shutdown.
* @since 1.44.0
*/
@Override
public boolean isShutdown() {
return shutdown;
}

public synchronized static ReactorNettyTransport get(String gcsUri) {
if (instance == null) {
instance = new ReactorNettyTransport(gcsUri);
}
return instance;
}

}
Loading