Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aliyun: Add loading oss file into memory option to OSS client properties #10062

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ public class AliyunProperties implements Serializable {
*/
public static final String OSS_STAGING_DIRECTORY = "oss.staging-dir";

/**
 * Boolean property controlling whether an OSS object is read fully into memory when its stream is
 * opened, instead of being read lazily from the remote connection. Parsed with a default of
 * {@code false}.
 */
public static final String OSS_LOAD_BEFORE_READING = "oss.load-before-reading";

private final String ossEndpoint;
private final String accessKeyId;
private final String accessKeySecret;
private final String securityToken;
private final String ossStagingDirectory;
private final boolean ossLoadBeforeReading;

public AliyunProperties() {
this(ImmutableMap.of());
Expand All @@ -96,6 +99,8 @@ public AliyunProperties(Map<String, String> properties) {
this.ossStagingDirectory =
PropertyUtil.propertyAsString(
properties, OSS_STAGING_DIRECTORY, System.getProperty("java.io.tmpdir"));
this.ossLoadBeforeReading =
PropertyUtil.propertyAsBoolean(properties, OSS_LOAD_BEFORE_READING, false);
}

public String ossEndpoint() {
Expand All @@ -117,4 +122,8 @@ public String securityToken() {
/**
 * Returns the staging directory configured through {@link #OSS_STAGING_DIRECTORY}; when the
 * property is absent the constructor falls back to the {@code java.io.tmpdir} system property.
 */
public String ossStagingDirectory() {
return ossStagingDirectory;
}

/**
 * Returns the value parsed from the {@code oss.load-before-reading} property, which is
 * {@code false} when the property is not set.
 */
public boolean ossLoadBeforeReading() {
return ossLoadBeforeReading;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,6 @@ public long getLength() {

/**
 * Opens a new seekable stream over this file's OSS object.
 *
 * <p>The Aliyun properties are forwarded so the stream can honour the
 * {@code oss.load-before-reading} option.
 *
 * @return a fresh {@link OSSInputStream} for this file
 */
@Override
public SeekableInputStream newStream() {
  return new OSSInputStream(client(), uri(), metrics(), aliyunProperties());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

import com.aliyun.oss.OSS;
import com.aliyun.oss.model.GetObjectRequest;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

import org.apache.iceberg.aliyun.AliyunProperties;
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.Counter;
Expand All @@ -49,11 +53,11 @@ class OSSInputStream extends SeekableInputStream {

private final Counter readBytes;
private final Counter readOperations;
private AliyunProperties aliyunProperties;

/**
 * Creates a stream for the given object, delegating to the metrics-aware constructor with
 * {@code MetricsContext.nullMetrics()}.
 *
 * @param client OSS client used to fetch the object
 * @param uri location of the object
 */
OSSInputStream(OSS client, OSSURI uri) {
this(client, uri, MetricsContext.nullMetrics());
}

OSSInputStream(OSS client, OSSURI uri, MetricsContext metrics) {
this.client = client;
this.uri = uri;
Expand All @@ -63,6 +67,15 @@ class OSSInputStream extends SeekableInputStream {
this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS);
}

/**
 * Creates a stream for the given object that also carries the Aliyun properties consulted when
 * the underlying object stream is opened.
 *
 * @param client OSS client used to fetch the object
 * @param uri location of the object
 * @param metrics context from which the read-bytes and read-operations counters are obtained
 * @param aliyunProperties properties queried for {@code ossLoadBeforeReading()} in openStream
 */
OSSInputStream(OSS client, OSSURI uri, MetricsContext metrics, AliyunProperties aliyunProperties) {
this.client = client;
this.uri = uri;
// Captures the stack of the creating thread at construction time.
this.createStack = Thread.currentThread().getStackTrace();
this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, Unit.BYTES);
this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS);
this.aliyunProperties = aliyunProperties;
}

@Override
public long getPos() {
return next;
Expand Down Expand Up @@ -145,9 +158,23 @@ private void positionStream() throws IOException {

/**
 * Closes any currently open object stream and opens a new one starting at {@code pos}.
 *
 * <p>When the load-before-reading option is enabled, the requested range is copied into memory
 * and the remote stream is closed before this method returns; subsequent reads are then served
 * from a byte array. The original change introduced this to avoid connection-reset errors on
 * long-lived reads. Note that the whole remaining range is held on the heap in that mode.
 *
 * @throws IOException if closing the previous stream or reading the object content fails
 */
private void openStream() throws IOException {
  closeStream();

  GetObjectRequest request = new GetObjectRequest(uri.bucket(), uri.key()).withRange(pos, -1);

  // The constructors that take no AliyunProperties leave the field unset, so treat a missing
  // value as "option disabled" instead of failing with a NullPointerException.
  boolean loadIntoMemory = aliyunProperties != null && aliyunProperties.ossLoadBeforeReading();
  if (!loadIntoMemory) {
    // Exactly one GET per open: the object content is streamed lazily from the connection.
    stream = client.getObject(request).getObjectContent();
    return;
  }

  // Drain the remote stream fully and release it; the try-with-resources closes the connection
  // even when the copy fails part-way.
  try (InputStream ossInputStream = client.getObject(request).getObjectContent()) {
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    byte[] buffer = new byte[16384];
    int len;
    while ((len = ossInputStream.read(buffer)) != -1) {
      outputStream.write(buffer, 0, len);
    }
    stream = new ByteArrayInputStream(outputStream.toByteArray());
  }
}

private void closeStream() throws IOException {
Expand Down