Skip to content

Commit

Permalink
Core, AWS: Fix Kryo serialization failure for FileIO (apache#5437)
Browse files Browse the repository at this point in the history
  • Loading branch information
singhpk234 authored Sep 1, 2022
1 parent 26cdc7a commit ec9643e
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 13 deletions.
34 changes: 34 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
*/
package org.apache.iceberg;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.ClosureSerializer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
Expand All @@ -35,6 +40,7 @@
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.util.ByteBuffers;
import org.junit.Assert;
import org.objenesis.strategy.StdInstantiatorStrategy;

public class TestHelpers {

Expand Down Expand Up @@ -150,6 +156,34 @@ public static void assertSameSchemaMap(Map<Integer, Schema> map1, Map<Integer, S
});
}

/** Test-only helpers for pushing an object through a Kryo serialization round trip. */
public static class KryoHelpers {
  private KryoHelpers() {}

  /**
   * Serializes {@code obj} with Kryo into an in-memory buffer and immediately reads it back.
   *
   * @param obj the object to write
   * @param <T> static type of {@code obj}; the deserialized value is cast back to it
   * @return the object obtained by deserializing the bytes that were just written
   * @throws IOException if creating the wrapping object streams fails
   */
  public static <T> T roundTripSerialize(T obj) throws IOException {
    Kryo serializer = newConfiguredKryo();
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    // closing the Kryo Output flushes everything into the buffer before it is read below
    try (Output sink = new Output(new ObjectOutputStream(buffer))) {
      serializer.writeClassAndObject(sink, obj);
    }

    ByteArrayInputStream written = new ByteArrayInputStream(buffer.toByteArray());
    try (Input source = new Input(new ObjectInputStream(written))) {
      // the value read back was written from a T just above, so the cast is safe
      @SuppressWarnings("unchecked")
      T restored = (T) serializer.readClassAndObject(source);
      return restored;
    }
  }

  /** Builds the Kryo instance used for the round trip. */
  private static Kryo newConfiguredKryo() {
    Kryo configured = new Kryo();

    // lets Kryo instantiate classes that do not declare a zero-arg constructor
    StdInstantiatorStrategy fallback = new StdInstantiatorStrategy();
    configured.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(fallback));

    // needed to serialize and deserialize $$Lambda$ anonymous classes
    configured.register(SerializedLambda.class);
    configured.register(ClosureSerializer.Closure.class, new ClosureSerializer());

    return configured;
  }
}

private static class CheckReferencesBound extends ExpressionVisitors.ExpressionVisitor<Void> {
private final String message;

Expand Down
11 changes: 6 additions & 5 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
Expand Down Expand Up @@ -82,7 +83,7 @@ public class S3FileIO
private String credential = null;
private SerializableSupplier<S3Client> s3;
private AwsProperties awsProperties;
private Map<String, String> properties = null;
private SerializableMap<String, String> properties = null;
private transient volatile S3Client client;
private MetricsContext metrics = MetricsContext.nullMetrics();
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
Expand Down Expand Up @@ -156,7 +157,7 @@ public void deleteFile(String path) {

@Override
public Map<String, String> properties() {
return properties;
return properties.immutableMap();
}

/**
Expand Down Expand Up @@ -350,8 +351,8 @@ public String getCredential() {

@Override
public void initialize(Map<String, String> props) {
this.awsProperties = new AwsProperties(props);
this.properties = props;
this.properties = SerializableMap.copyOf(props);
this.awsProperties = new AwsProperties(properties);

// Do not override s3 client if it was provided
if (s3 == null) {
Expand All @@ -373,7 +374,7 @@ public void initialize(Map<String, String> props) {
.hiddenImpl(DEFAULT_METRICS_IMPL, String.class)
.buildChecked();
MetricsContext context = ctor.newInstance("s3");
context.initialize(props);
context.initialize(properties);
this.metrics = context;
} catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException e) {
LOG.warn(
Expand Down
23 changes: 23 additions & 0 deletions aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileIOParser;
Expand Down Expand Up @@ -265,6 +266,28 @@ public void testFileIOJsonSerialization() {
}
}

/** Checks that an initialized S3FileIO keeps its properties across a Kryo round trip. */
@Test
public void testS3FileIOKryoSerialization() throws IOException {
  // s3 fileIO should be serializable when properties are passed as immutable map
  FileIO original = new S3FileIO();
  original.initialize(ImmutableMap.of("k1", "v1"));

  FileIO restored = TestHelpers.KryoHelpers.roundTripSerialize(original);

  Assert.assertEquals(original.properties(), restored.properties());
}

/** Checks that an initialized S3FileIO keeps its properties across a Java serialization round trip. */
@Test
public void testS3FileIOJavaSerialization() throws IOException, ClassNotFoundException {
  // s3 fileIO should be serializable when properties are passed as immutable map
  FileIO original = new S3FileIO();
  original.initialize(ImmutableMap.of("k1", "v1"));

  FileIO restored = TestHelpers.roundTripSerialize(original);

  Assert.assertEquals(original.properties(), restored.properties());
}

private void createRandomObjects(String prefix, int count) {
S3URI s3URI = new S3URI(prefix);

Expand Down
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ project(':iceberg-api') {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
compileOnly "com.google.errorprone:error_prone_annotations"
testImplementation "org.apache.avro:avro"
testImplementation "com.esotericsoftware:kryo"
}

tasks.processTestResources.dependsOn rootProject.tasks.buildInfo
Expand Down Expand Up @@ -278,6 +279,7 @@ project(':iceberg-core') {
testImplementation 'org.mock-server:mockserver-client-java'
testImplementation "org.xerial:sqlite-jdbc"
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation "com.esotericsoftware:kryo"
}
}

Expand Down Expand Up @@ -390,6 +392,7 @@ project(':iceberg-aws') {
exclude module: "logback-classic"
exclude group: 'junit'
}
testImplementation "com.esotericsoftware:kryo"
}

sourceSets {
Expand Down Expand Up @@ -432,6 +435,7 @@ project(':iceberg-gcp') {
exclude group: 'javax.servlet', module: 'servlet-api'
exclude group: 'com.google.code.gson', module: 'gson'
}
testImplementation "com.esotericsoftware:kryo"
}
}

Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,7 +45,7 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable {
"s3n", S3_FILE_IO_IMPL);

private final Map<String, FileIO> ioInstances = Maps.newHashMap();
private Map<String, String> properties;
private SerializableMap<String, String> properties;
private SerializableSupplier<Configuration> hadoopConf;

/**
Expand Down Expand Up @@ -76,13 +77,13 @@ public void deleteFile(String location) {

@Override
public Map<String, String> properties() {
return properties;
return properties.immutableMap();
}

@Override
public void initialize(Map<String, String> newProperties) {
close(); // close and discard any existing FileIO instances
this.properties = newProperties;
this.properties = SerializableMap.copyOf(newProperties);
}

@Override
Expand Down
26 changes: 26 additions & 0 deletions core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -102,6 +106,28 @@ public void testDeletePrefix() {
() -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator());
}

/** Checks that an initialized HadoopFileIO keeps its properties across a Kryo round trip. */
@Test
public void testHadoopFileIOKryoSerialization() throws IOException {
  // hadoop fileIO should be serializable when properties are passed as immutable map
  FileIO original = new HadoopFileIO();
  original.initialize(ImmutableMap.of("k1", "v1"));

  FileIO restored = TestHelpers.KryoHelpers.roundTripSerialize(original);

  Assert.assertEquals(original.properties(), restored.properties());
}

/** Checks that an initialized HadoopFileIO keeps its properties across a Java serialization round trip. */
@Test
public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoundException {
  // hadoop fileIO should be serializable when properties are passed as immutable map
  FileIO original = new HadoopFileIO();
  original.initialize(ImmutableMap.of("k1", "v1"));

  FileIO restored = TestHelpers.roundTripSerialize(original);

  Assert.assertEquals(original.properties(), restored.properties());
}

private void createRandomFiles(Path parent, int count) {
random
.ints(count)
Expand Down
51 changes: 51 additions & 0 deletions core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.io.IOException;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;

/** Serialization round-trip tests for {@link ResolvingFileIO}. */
public class TestResolvingIO {

  /** The properties must be unchanged after a Kryo round trip. */
  @Test
  public void testResolvingFileIOKryoSerialization() throws IOException {
    FileIO original = newInitializedFileIO();

    FileIO restored = TestHelpers.KryoHelpers.roundTripSerialize(original);

    Assert.assertEquals(original.properties(), restored.properties());
  }

  /** The properties must be unchanged after a Java serialization round trip. */
  @Test
  public void testResolvingFileIOJavaSerialization() throws IOException, ClassNotFoundException {
    FileIO original = newInitializedFileIO();

    FileIO restored = TestHelpers.roundTripSerialize(original);

    Assert.assertEquals(original.properties(), restored.properties());
  }

  /** Creates a ResolvingFileIO initialized with a single-entry immutable property map. */
  private static FileIO newInitializedFileIO() {
    FileIO fileIO = new ResolvingFileIO();
    // resolving fileIO should be serializable when properties are passed as immutable map
    fileIO.initialize(ImmutableMap.of("k1", "v1"));
    return fileIO;
  }
}
11 changes: 6 additions & 5 deletions gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,7 +56,7 @@ public class GCSFileIO implements FileIO {
private transient volatile Storage storage;
private MetricsContext metrics = MetricsContext.nullMetrics();
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
private Map<String, String> properties = null;
private SerializableMap<String, String> properties = null;

/**
* No-arg constructor to load the FileIO dynamically.
Expand Down Expand Up @@ -105,7 +106,7 @@ public void deleteFile(String path) {

@Override
public Map<String, String> properties() {
return properties;
return properties.immutableMap();
}

private Storage client() {
Expand All @@ -121,8 +122,8 @@ private Storage client() {

@Override
public void initialize(Map<String, String> props) {
this.properties = props;
this.gcpProperties = new GCPProperties(props);
this.properties = SerializableMap.copyOf(props);
this.gcpProperties = new GCPProperties(properties);

this.storageSupplier =
() -> {
Expand All @@ -139,7 +140,7 @@ public void initialize(Map<String, String> props) {
.hiddenImpl(DEFAULT_METRICS_IMPL, String.class)
.buildChecked();
MetricsContext context = ctor.newInstance("gcs");
context.initialize(props);
context.initialize(properties);
this.metrics = context;
} catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException e) {
LOG.warn(
Expand Down
26 changes: 26 additions & 0 deletions gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@
import java.util.Random;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -96,4 +100,26 @@ public void testDelete() {
.count())
.isZero();
}

/** Checks that an initialized GCSFileIO keeps its properties across a Kryo round trip. */
@Test
public void testGCSFileIOKryoSerialization() throws IOException {
  // gcs fileIO should be serializable when properties are passed as immutable map
  FileIO original = new GCSFileIO();
  original.initialize(ImmutableMap.of("k1", "v1"));

  FileIO restored = TestHelpers.KryoHelpers.roundTripSerialize(original);

  Assert.assertEquals(original.properties(), restored.properties());
}

/** Checks that an initialized GCSFileIO keeps its properties across a Java serialization round trip. */
@Test
public void testGCSFileIOJavaSerialization() throws IOException, ClassNotFoundException {
  // gcs fileIO should be serializable when properties are passed as immutable map
  FileIO original = new GCSFileIO();
  original.initialize(ImmutableMap.of("k1", "v1"));

  FileIO restored = TestHelpers.roundTripSerialize(original);

  Assert.assertEquals(original.properties(), restored.properties());
}
}
1 change: 1 addition & 0 deletions versions.props
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ org.springframework:* = 5.3.9
org.springframework.boot:* = 2.5.4
org.mock-server:mockserver-netty = 5.13.2
org.mock-server:mockserver-client-java = 5.13.2
com.esotericsoftware:kryo = 4.0.2

0 comments on commit ec9643e

Please sign in to comment.