Use bzip2 compressed feature set json as pipeline option #466

Merged
merged 3 commits on Feb 14, 2020
Changes from 1 commit
Make decompressor and compressor more generic and extensible
Shu Heng committed Feb 12, 2020
commit 0fe5654a695629747d7654e1979c5e18a7d5841c
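For context, this commit builds on two small generic interfaces, OptionCompressor and OptionByteConverter, whose sources are not visible in this single-commit view. A minimal sketch of their likely shape, inferred only from how the diff below uses them, is:

```java
// Sketch only: inferred from usage in this diff (BZip2Compressor and the job
// managers), not copied from the committed sources. Each interface would live
// in its own file under feast/ingestion/options.
package feast.ingestion.options;

import java.io.IOException;

interface OptionCompressor<T> {
  /** Compress a pipeline option value into a byte array. */
  byte[] compress(T option) throws IOException;
}

interface OptionByteConverter<T> {
  /** Convert a pipeline option value into raw bytes before compression. */
  byte[] toByte(T option) throws IOException;
}
```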
@@ -29,17 +29,13 @@
import feast.core.exception.JobExecutionException;
import feast.core.job.JobManager;
import feast.core.job.Runner;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Project;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.core.util.ProtoUtil;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.*;
import feast.core.util.TypeConversion;
import feast.ingestion.ImportJob;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.utils.CompressionUtil;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -225,7 +221,10 @@ private ImportOptions getPipelineOptions(
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);

pipelineOptions.setFeatureSetJson(CompressionUtil.compress(ProtoUtil.toJson(featureSets)));
OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());

pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets));
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
pipelineOptions.setProject(projectId);
pipelineOptions.setUpdate(update);
@@ -24,14 +24,15 @@
import feast.core.exception.JobExecutionException;
import feast.core.job.JobManager;
import feast.core.job.Runner;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.util.ProtoUtil;
import feast.core.util.TypeConversion;
import feast.ingestion.ImportJob;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.utils.CompressionUtil;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -95,7 +96,11 @@ private ImportOptions getPipelineOptions(
List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink) throws IOException {
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
pipelineOptions.setFeatureSetJson(CompressionUtil.compress(ProtoUtil.toJson(featureSets)));

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());

pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets));
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
pipelineOptions.setRunner(DirectRunner.class);
pipelineOptions.setProject(""); // set to default value to satisfy validation
@@ -14,23 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.util;
package feast.core.job.option;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.core.FeatureSetProto;
import java.io.IOException;
import feast.ingestion.options.OptionByteConverter;
import java.util.ArrayList;
import java.util.List;

public class ProtoUtil {
public class FeatureSetJsonByteConverter
implements OptionByteConverter<List<FeatureSetProto.FeatureSet>> {

public static String toJson(List<FeatureSetProto.FeatureSet> featureSets) throws IOException {
/**
* Convert a list of feature sets to JSON strings joined by newlines, represented as a byte array.
*
* @param featureSets List of feature set protobufs
* @return Byte array representation of the newline-joined JSON strings
* @throws InvalidProtocolBufferException if a feature set spec cannot be serialized to JSON
*/
@Override
public byte[] toByte(List<FeatureSetProto.FeatureSet> featureSets)
throws InvalidProtocolBufferException {
JsonFormat.Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts();
List<String> featureSetsJson = new ArrayList<>();
for (FeatureSetProto.FeatureSet featureSet : featureSets) {
featureSetsJson.add(printer.print(featureSet.getSpec()));
}
return String.join("\n", featureSetsJson);
return String.join("\n", featureSetsJson).getBytes();
}
}
@@ -40,13 +40,15 @@
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.exception.JobExecutionException;
import feast.core.job.Runner;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.*;
import feast.core.util.ProtoUtil;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.utils.CompressionUtil;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -126,8 +128,11 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {
expectedPipelineOptions.setAppName("DataflowJobManager");
expectedPipelineOptions.setJobName(jobName);
expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store)));

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());
expectedPipelineOptions.setFeatureSetJson(
CompressionUtil.compress(ProtoUtil.toJson(Collections.singletonList(featureSet))));
featureSetJsonCompressor.compress(Collections.singletonList(featureSet)));

ArgumentCaptor<ImportOptions> captor = ArgumentCaptor.forClass(ImportOptions.class);

@@ -40,17 +40,19 @@
import feast.core.StoreProto.Store.Subscription;
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.job.Runner;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.core.util.ProtoUtil;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.utils.CompressionUtil;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.PipelineResult;
@@ -124,8 +126,11 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
expectedPipelineOptions.setProject("");
expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store)));
expectedPipelineOptions.setProject("");

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());
expectedPipelineOptions.setFeatureSetJson(
CompressionUtil.compress(ProtoUtil.toJson(Collections.singletonList(featureSet))));
featureSetJsonCompressor.compress(Collections.singletonList(featureSet)));

String expectedJobId = "feast-job-0";
ArgumentCaptor<ImportOptions> pipelineOptionsCaptor =
@@ -14,20 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.util;
package feast.core.job.option;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.FeatureSetProto;
import feast.core.SourceProto;
import feast.types.ValueProto;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;

public class ProtoUtilTest {
public class FeatureSetJsonByteConverterTest {

private FeatureSetProto.FeatureSet newFeatureSet(Integer version, Integer numberOfFeatures) {
List<FeatureSetProto.FeatureSpec> features =
@@ -60,15 +60,15 @@ private FeatureSetProto.FeatureSet newFeatureSet(Integer version, Integer numberOfFeatures) {
}

@Test
public void testConversionToJson() throws IOException {
public void shouldConvertFeatureSetsAsJsonStringBytes() throws InvalidProtocolBufferException {
int nrOfFeatureSet = 1;
int nrOfFeatures = 1;
List<FeatureSetProto.FeatureSet> featureSets =
IntStream.range(1, nrOfFeatureSet + 1)
.mapToObj(i -> newFeatureSet(i, nrOfFeatures))
.collect(Collectors.toList());

String expectedOutput =
String expectedOutputString =
"{\"version\":1,"
+ "\"entities\":[{\"name\":\"entity\",\"valueType\":2}],"
+ "\"features\":[{\"name\":\"feature1\",\"valueType\":6}],"
@@ -77,6 +77,7 @@ public void testConversionToJson() throws IOException {
+ "\"kafkaSourceConfig\":{"
+ "\"bootstrapServers\":\"somebrokers:9092\","
+ "\"topic\":\"sometopic\"}}}";
assertEquals(expectedOutput, ProtoUtil.toJson(featureSets));
FeatureSetJsonByteConverter byteConverter = new FeatureSetJsonByteConverter();
assertEquals(expectedOutputString, new String(byteConverter.toByte(featureSets)));
}
}
8 changes: 5 additions & 3 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -22,13 +22,14 @@
import feast.core.FeatureSetProto.FeatureSet;
import feast.core.SourceProto.Source;
import feast.core.StoreProto.Store;
import feast.ingestion.options.BZip2Decompressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.StringListStreamConverter;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.transform.ValidateFeatureRows;
import feast.ingestion.transform.WriteFailedElementToBigQuery;
import feast.ingestion.transform.WriteToStore;
import feast.ingestion.transform.metrics.WriteMetricsTransform;
import feast.ingestion.utils.CompressionUtil;
import feast.ingestion.utils.ResourceUtil;
import feast.ingestion.utils.SpecUtil;
import feast.ingestion.utils.StoreUtil;
@@ -81,8 +82,9 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOException {

log.info("Starting import job with settings: \n{}", options.toString());

List<String> featureSetJson =
CompressionUtil.decompressAsListOfString(options.getFeatureSetJson());
BZip2Decompressor<List<String>> decompressor =
new BZip2Decompressor<>(new StringListStreamConverter());
List<String> featureSetJson = decompressor.decompress(options.getFeatureSetJson());
List<FeatureSet> featureSets = SpecUtil.parseFeatureSetSpecJsonList(featureSetJson);
List<Store> stores = SpecUtil.parseStoreJsonList(options.getStoreJson());

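The StringListStreamConverter used above is not shown in this commit view. A minimal sketch, assuming it mirrors the BufferedReader line-splitting that the removed CompressionUtil performed, could look like this:

```java
// Sketch only: assumed implementation of StringListStreamConverter, mirroring
// the line-reading logic previously in CompressionUtil.decompressAsListOfString.
package feast.ingestion.options;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;
import java.util.stream.Collectors;

public class StringListStreamConverter implements InputStreamConverter<List<String>> {

  /** Read the decompressed stream back into one string per line. */
  @Override
  public List<String> readStream(InputStream inputStream) throws IOException {
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
      return reader.lines().collect(Collectors.toList());
    }
  }
}
```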
@@ -14,30 +14,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.utils;
package feast.ingestion.options;

import java.io.*;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;

public class CompressionUtil {
public class BZip2Compressor<T> implements OptionCompressor<T> {

public static List<String> decompressAsListOfString(byte[] compressedFeatureSets)
throws IOException {
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(compressedFeatureSets);
BZip2CompressorInputStream bzip2Input = new BZip2CompressorInputStream(inputStream);
BufferedReader reader = new BufferedReader(new InputStreamReader(bzip2Input)); ) {
return reader.lines().collect(Collectors.toList());
}
}
private final OptionByteConverter<T> byteConverter;

public static byte[] compress(String origStr) throws IOException {
public BZip2Compressor(OptionByteConverter<T> byteConverter) {
this.byteConverter = byteConverter;
}
/**
* Compress a pipeline option value using BZip2.
*
* @param option Pipeline option value
* @return BZip2-compressed option value
* @throws IOException if the option cannot be converted to bytes or compressed
*/
@Override
public byte[] compress(T option) throws IOException {
ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
try (BZip2CompressorOutputStream bzip2Output =
new BZip2CompressorOutputStream(compressedStream)) {
bzip2Output.write(origStr.getBytes());
bzip2Output.write(byteConverter.toByte(option));
}

return compressedStream.toByteArray();
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.options;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;

public class BZip2Decompressor<T> implements OptionDecompressor<T> {

private final InputStreamConverter<T> inputStreamConverter;

public BZip2Decompressor(InputStreamConverter<T> inputStreamConverter) {
this.inputStreamConverter = inputStreamConverter;
}

@Override
public T decompress(byte[] compressed) throws IOException {
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(compressed);
BZip2CompressorInputStream bzip2Input = new BZip2CompressorInputStream(inputStream)) {
return inputStreamConverter.readStream(bzip2Input);
}
}
}
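BZip2Decompressor implements the OptionDecompressor interface, which is also not part of this commit view. Its likely shape, inferred from the decompress signature above, would be roughly:

```java
// Sketch only: assumed shape of OptionDecompressor, the counterpart to
// OptionCompressor, inferred from BZip2Decompressor's @Override decompress.
package feast.ingestion.options;

import java.io.IOException;

public interface OptionDecompressor<T> {

  /** Decompress a byte array produced by an OptionCompressor back into the option value. */
  T decompress(byte[] compressed) throws IOException;
}
```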
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.options;

import java.io.IOException;
import java.io.InputStream;

public interface InputStreamConverter<T> {

/**
* Used in conjunction with {@link OptionDecompressor} to convert the decompressed byte stream
* back into the pipeline option value.
*
* @param inputStream Decompressed input byte stream
* @return Decompressed pipeline option value
* @throws IOException if the stream cannot be read
*/
T readStream(InputStream inputStream) throws IOException;
}
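To illustrate how the generic pieces compose end to end (compression in the job managers, decompression in ImportJob), here is a hypothetical round trip using a trivial String converter pair; the converters, class name, and readAll helper are made up for illustration and are not part of the commit:

```java
// Hypothetical round-trip example, not part of the commit: trivial String
// converters stand in for FeatureSetJsonByteConverter / StringListStreamConverter.
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.BZip2Decompressor;
import feast.ingestion.options.InputStreamConverter;
import feast.ingestion.options.OptionByteConverter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class OptionCompressionRoundTripExample {

  public static void main(String[] args) throws IOException {
    // Compress: convert the option value to bytes, then BZip2-compress it.
    OptionByteConverter<String> toBytes = option -> option.getBytes();
    byte[] compressed = new BZip2Compressor<>(toBytes).compress("{\"version\":1}");

    // Decompress: BZip2-decompress, then convert the stream back to the value.
    InputStreamConverter<String> fromStream = OptionCompressionRoundTripExample::readAll;
    String restored = new BZip2Decompressor<>(fromStream).decompress(compressed);
    System.out.println(restored); // prints {"version":1}
  }

  private static String readAll(InputStream in) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buffer = new byte[4096];
    for (int n = in.read(buffer); n != -1; n = in.read(buffer)) {
      out.write(buffer, 0, n);
    }
    return new String(out.toByteArray());
  }
}
```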