-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add Protocol Buffer Stream Decoder #8972
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
ba2e685
Add protocol buffer decoder
KKcorps 65fa1ed
Allow fetching descriptor based on message name specified
KKcorps 02c1749
bug fix: inverted property check condition
KKcorps a6734f0
Add tests for protocol buffers
d739f0f
Removing generated proto files
3b61731
Add support for multiple filesystems to download descriptor file
f88c7cf
Reuse message builder
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
99 changes: 99 additions & 0 deletions
99
...uf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.plugin.inputformat.protobuf; | ||
|
|
||
| import com.github.os72.protobuf.dynamic.DynamicSchema; | ||
| import com.google.common.base.Preconditions; | ||
| import com.google.protobuf.Descriptors; | ||
| import com.google.protobuf.DynamicMessage; | ||
| import com.google.protobuf.Message; | ||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.apache.pinot.spi.data.readers.GenericRow; | ||
| import org.apache.pinot.spi.stream.StreamMessageDecoder; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
||
| //TODO: Add support for Schema Registry | ||
| public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> { | ||
| private static final Logger LOGGER = LoggerFactory.getLogger(ProtoBufMessageDecoder.class); | ||
|
|
||
| public static final String DESCRIPTOR_FILE_PATH = "descriptorFile"; | ||
| public static final String PROTO_CLASS_NAME = "protoClassName"; | ||
|
|
||
| private ProtoBufRecordExtractor _recordExtractor; | ||
| private String _protoClassName; | ||
| private Message.Builder _builder; | ||
|
|
||
| @Override | ||
| public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) | ||
| throws Exception { | ||
| Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH), | ||
| "Protocol Buffer schema descriptor file must be provided"); | ||
|
|
||
| _protoClassName = props.getOrDefault(PROTO_CLASS_NAME, ""); | ||
| InputStream descriptorFileInputStream = ProtoBufUtils.getDescriptorFileInputStream( | ||
| props.get(DESCRIPTOR_FILE_PATH)); | ||
| Descriptors.Descriptor descriptor = buildProtoBufDescriptor(descriptorFileInputStream); | ||
| _recordExtractor = new ProtoBufRecordExtractor(); | ||
| _recordExtractor.init(fieldsToRead, null); | ||
| DynamicMessage dynamicMessage = DynamicMessage.getDefaultInstance(descriptor); | ||
| _builder = dynamicMessage.newBuilderForType(); | ||
| } | ||
|
|
||
| private Descriptors.Descriptor buildProtoBufDescriptor(InputStream fin) | ||
| throws IOException { | ||
| try { | ||
| DynamicSchema dynamicSchema = DynamicSchema.parseFrom(fin); | ||
|
|
||
| if (!StringUtils.isEmpty(_protoClassName)) { | ||
| return dynamicSchema.getMessageDescriptor(_protoClassName); | ||
| } else { | ||
| return dynamicSchema.getMessageDescriptor(dynamicSchema.getMessageTypes().toArray(new String[]{})[0]); | ||
| } | ||
| } catch (Descriptors.DescriptorValidationException e) { | ||
| throw new IOException("Descriptor file validation failed", e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public GenericRow decode(byte[] payload, GenericRow destination) { | ||
| return decode(payload, 0, payload.length, destination); | ||
| } | ||
|
|
||
| @Override | ||
| public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { | ||
| Message message; | ||
| try { | ||
| _builder.mergeFrom(payload); | ||
| message = _builder.build(); | ||
| } catch (Exception e) { | ||
| LOGGER.error("Not able to decode protobuf message", e); | ||
| return destination; | ||
| } finally { | ||
| _builder.clear(); | ||
| } | ||
| _recordExtractor.extract(message, destination); | ||
| return destination; | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
68 changes: 68 additions & 0 deletions
68
...ot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufUtils.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.plugin.inputformat.protobuf; | ||
|
|
||
| import java.io.File; | ||
| import java.io.FileInputStream; | ||
| import java.io.InputStream; | ||
| import java.net.URI; | ||
| import java.nio.file.Files; | ||
| import java.nio.file.Path; | ||
| import org.apache.pinot.spi.filesystem.PinotFS; | ||
| import org.apache.pinot.spi.filesystem.PinotFSFactory; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
||
| public class ProtoBufUtils { | ||
| private static final Logger LOGGER = LoggerFactory.getLogger(ProtoBufUtils.class); | ||
| public static final String TMP_DIR_PREFIX = "pinot-protobuf"; | ||
|
|
||
| private ProtoBufUtils() { | ||
| } | ||
|
|
||
| public static InputStream getDescriptorFileInputStream(String descriptorFilePath) | ||
| throws Exception { | ||
| URI descriptorFileURI = URI.create(descriptorFilePath); | ||
| String scheme = descriptorFileURI.getScheme(); | ||
| if (scheme == null) { | ||
| scheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME; | ||
| } | ||
| if (PinotFSFactory.isSchemeSupported(scheme)) { | ||
| PinotFS pinotFS = PinotFSFactory.create(scheme); | ||
| Path localTmpDir = Files.createTempDirectory(TMP_DIR_PREFIX + System.currentTimeMillis()); | ||
| File protoDescriptorLocalFile = createLocalFile(descriptorFileURI, localTmpDir.toFile()); | ||
| LOGGER.info("Copying protocol buffer descriptor file from source: {} to dst: {}", descriptorFilePath, | ||
| protoDescriptorLocalFile.getAbsolutePath()); | ||
| pinotFS.copyToLocalFile(descriptorFileURI, protoDescriptorLocalFile); | ||
| return new FileInputStream(protoDescriptorLocalFile); | ||
| } else { | ||
| throw new RuntimeException(String.format("Scheme: %s not supported in PinotFSFactory" | ||
| + " for protocol buffer descriptor file: %s.", scheme, descriptorFilePath)); | ||
| } | ||
| } | ||
|
|
||
| public static File createLocalFile(URI srcURI, File dstDir) { | ||
| String sourceURIPath = srcURI.getPath(); | ||
| File dstFile = new File(dstDir, new File(sourceURIPath).getName()); | ||
| LOGGER.debug("Created empty local temporary file {} to copy protocol " | ||
| + "buffer descriptor {}", dstFile.getAbsolutePath(), srcURI); | ||
| return dstFile; | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not familiar with what a descriptor file represents after reading https://github.com/os72/protobuf-dynamic and viewing
sample.desc. Is that a binary file that is generated by proto compiler?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. So basically
.protofile contains the schema. From this schema, you can either generate java classes or you can generate.descbinary file. The latter approach allows us to parse any proto message easily without relying on java impl. It is a bit slower though.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the slowness should only be a one-time overhead right? we do not decode
descfiles on every messageThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The slowness is not due to the decoding of .desc file.
It is due to the fact we are using
DynamicMessageclass instead of compiled Proto java class to deserialize the messages.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did some benchmark on this few years back - https://codeburst.io/using-dynamic-messages-in-protocol-buffers-in-scala-9fda4f0efcb3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are on my machine with proto version we are using. Significant improvments from ones in blog post.