Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<!-- Suppress autogenerated test files by protobuf compiler -->
<suppress checks=".*" files="/protobuf/ComplexTypes.java"/>
<suppress checks=".*" files="/protobuf/Sample.java"/>
<suppress checks=".*" files="/protobuf/CompositeTypes.java"/>

<!-- Allow static imports in tests -->
<suppress checks="AvoidStaticImport" files="[\\/]src[\\/]test[\\/].*" />
Expand Down
31 changes: 31 additions & 0 deletions pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,36 @@
<artifactId>protobuf-java</artifactId>
<version>${proto.version}</version>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protobuf-dynamic</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.11.4:exe:${os.detected.classifier}</protocArtifact>
<protoTestSourceRoot>${basedir}/src/test/resources</protoTestSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.inputformat.protobuf;

import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


//TODO: Add support for Schema Registry
public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
private static final Logger LOGGER = LoggerFactory.getLogger(ProtoBufMessageDecoder.class);

public static final String DESCRIPTOR_FILE_PATH = "descriptorFile";
public static final String PROTO_CLASS_NAME = "protoClassName";

private ProtoBufRecordExtractor _recordExtractor;
private String _protoClassName;
private Message.Builder _builder;

@Override
public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
throws Exception {
Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with what a descriptor file represents after reading https://github.com/os72/protobuf-dynamic and viewing sample.desc . Is that a binary file that is generated by proto compiler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. So basically .proto file contains the schema. From this schema, you can either generate java classes or you can generate .desc binary file. The latter approach allows us to parse any proto message easily without relying on java impl. It is a bit slower though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the slowness should only be a one-time overhead right? we do not decode desc files on every message

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The slowness is not due to the decoding of .desc file.
It is due to the fact we are using DynamicMessage class instead of compiled Proto java class to deserialize the messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are on my machine with proto version we are using. Significant improvments from ones in blog post.

Benchmark                                             (_numRecords)  Mode  Cnt   Score   Error  Units
BenchmarkDynamicMessage.compiledClassDeserialization         100000  avgt    5  12.787 ± 0.297  ms/op
BenchmarkDynamicMessage.dynamicClassDeserialization          100000  avgt    5  28.150 ± 0.691  ms/op

"Protocol Buffer schema descriptor file must be provided");

_protoClassName = props.getOrDefault(PROTO_CLASS_NAME, "");
InputStream descriptorFileInputStream = ProtoBufUtils.getDescriptorFileInputStream(
props.get(DESCRIPTOR_FILE_PATH));
Descriptors.Descriptor descriptor = buildProtoBufDescriptor(descriptorFileInputStream);
_recordExtractor = new ProtoBufRecordExtractor();
_recordExtractor.init(fieldsToRead, null);
DynamicMessage dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
_builder = dynamicMessage.newBuilderForType();
}

private Descriptors.Descriptor buildProtoBufDescriptor(InputStream fin)
throws IOException {
try {
DynamicSchema dynamicSchema = DynamicSchema.parseFrom(fin);

if (!StringUtils.isEmpty(_protoClassName)) {
return dynamicSchema.getMessageDescriptor(_protoClassName);
} else {
return dynamicSchema.getMessageDescriptor(dynamicSchema.getMessageTypes().toArray(new String[]{})[0]);
}
} catch (Descriptors.DescriptorValidationException e) {
throw new IOException("Descriptor file validation failed", e);
}
}

@Override
public GenericRow decode(byte[] payload, GenericRow destination) {
return decode(payload, 0, payload.length, destination);
}

@Override
public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
Message message;
try {
_builder.mergeFrom(payload);
message = _builder.build();
} catch (Exception e) {
LOGGER.error("Not able to decode protobuf message", e);
return destination;
} finally {
_builder.clear();
}
_recordExtractor.extract(message, destination);
return destination;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,20 @@
*/
package org.apache.pinot.plugin.inputformat.protobuf;

import com.google.common.base.Preconditions;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
import org.apache.pinot.spi.utils.ResourceFinder;


public class ProtoBufRecordReader implements RecordReader {
Expand All @@ -41,7 +40,7 @@ public class ProtoBufRecordReader implements RecordReader {

private InputStream _inputStream;
private boolean _hasNext;
private DynamicMessage _dynamicMessage;
private Message.Builder _builder;

private boolean hasMoreToRead()
throws IOException {
Expand All @@ -67,32 +66,30 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable Re
throws IOException {
_dataFile = dataFile;
ProtoBufRecordReaderConfig protoBufRecordReaderConfig = (ProtoBufRecordReaderConfig) recordReaderConfig;
InputStream fin = getDescriptorFileInputStream(protoBufRecordReaderConfig);
Descriptors.Descriptor descriptor = buildProtoBufDescriptor(fin);
Preconditions.checkNotNull(protoBufRecordReaderConfig.getDescriptorFile(),
"Protocol Buffer schema descriptor file must be provided");
Descriptors.Descriptor descriptor = buildProtoBufDescriptor(protoBufRecordReaderConfig);
_recordExtractor = new ProtoBufRecordExtractor();
_recordExtractor.init(fieldsToRead, null);
_dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
DynamicMessage dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
_builder = dynamicMessage.newBuilderForType();
init();
}

private Descriptors.Descriptor buildProtoBufDescriptor(InputStream fin)
private Descriptors.Descriptor buildProtoBufDescriptor(ProtoBufRecordReaderConfig protoBufRecordReaderConfig)
throws IOException {
try {
InputStream fin = ProtoBufUtils.getDescriptorFileInputStream(
protoBufRecordReaderConfig.getDescriptorFile().toString());
DescriptorProtos.FileDescriptorSet set = DescriptorProtos.FileDescriptorSet.parseFrom(fin);
Descriptors.FileDescriptor fileDescriptor =
Descriptors.FileDescriptor.buildFrom(set.getFile(0), new Descriptors.FileDescriptor[]{});
return fileDescriptor.getMessageTypes().get(0);
} catch (Descriptors.DescriptorValidationException e) {
throw new IOException("Descriptor file validation failed", e);
} catch (Exception e) {
throw new IOException("Failed to create Protobuf descriptor", e);
}
}

private InputStream getDescriptorFileInputStream(ProtoBufRecordReaderConfig protoBufRecordReaderConfig)
throws IOException {
URI descriptorFileURI = protoBufRecordReaderConfig.getDescriptorFile();
return ResourceFinder.openResource(descriptorFileURI);
}

@Override
public boolean hasNext() {
return _hasNext;
Expand All @@ -109,11 +106,12 @@ public GenericRow next(GenericRow reuse)
throws IOException {
Message message;
try {
Message.Builder builder = _dynamicMessage.newBuilderForType();
builder.mergeDelimitedFrom(_inputStream);
message = builder.build();
_builder.mergeDelimitedFrom(_inputStream);
message = _builder.build();
} catch (Exception e) {
throw new IOException("Caught exception while reading protobuf object", e);
} finally {
_builder.clear();
}
_recordExtractor.extract(message, reuse);
_hasNext = hasMoreToRead();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.inputformat.protobuf;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ProtoBufUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(ProtoBufUtils.class);
public static final String TMP_DIR_PREFIX = "pinot-protobuf";

private ProtoBufUtils() {
}

public static InputStream getDescriptorFileInputStream(String descriptorFilePath)
throws Exception {
URI descriptorFileURI = URI.create(descriptorFilePath);
String scheme = descriptorFileURI.getScheme();
if (scheme == null) {
scheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
}
if (PinotFSFactory.isSchemeSupported(scheme)) {
PinotFS pinotFS = PinotFSFactory.create(scheme);
Path localTmpDir = Files.createTempDirectory(TMP_DIR_PREFIX + System.currentTimeMillis());
File protoDescriptorLocalFile = createLocalFile(descriptorFileURI, localTmpDir.toFile());
LOGGER.info("Copying protocol buffer descriptor file from source: {} to dst: {}", descriptorFilePath,
protoDescriptorLocalFile.getAbsolutePath());
pinotFS.copyToLocalFile(descriptorFileURI, protoDescriptorLocalFile);
return new FileInputStream(protoDescriptorLocalFile);
} else {
throw new RuntimeException(String.format("Scheme: %s not supported in PinotFSFactory"
+ " for protocol buffer descriptor file: %s.", scheme, descriptorFilePath));
}
}

public static File createLocalFile(URI srcURI, File dstDir) {
String sourceURIPath = srcURI.getPath();
File dstFile = new File(dstDir, new File(sourceURIPath).getName());
LOGGER.debug("Created empty local temporary file {} to copy protocol "
+ "buffer descriptor {}", dstFile.getAbsolutePath(), srcURI);
return dstFile;
}
}
Loading