Skip to content

Commit

Permalink
SDC-9186: Service: Add additional whole file format methods to genera…
Browse files Browse the repository at this point in the history
…tor service

SDC side of API-221.

Change-Id: I10bb933527c023c07c68bb6785843abc5ed5de58
Reviewed-on: https://review.streamsets.net/14932
Reviewed-by: Santhosh Kumar <santhosh@streamsets.com>
  • Loading branch information
jarcec committed Jun 8, 2018
1 parent d291579 commit 4c4cd49
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import com.streamsets.datacollector.util.LambdaUtil;
import com.streamsets.datacollector.validation.Issue;
import com.streamsets.pipeline.api.FileRef;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.service.Service;
import com.streamsets.pipeline.api.service.dataformats.DataFormatGeneratorService;
import com.streamsets.pipeline.api.service.dataformats.DataFormatParserService;
import com.streamsets.pipeline.api.service.dataformats.DataGenerator;
import com.streamsets.pipeline.api.service.dataformats.DataParser;
import com.streamsets.pipeline.api.service.dataformats.DataParserException;
import com.streamsets.pipeline.api.service.dataformats.WholeFileChecksumAlgorithm;
import com.streamsets.pipeline.api.service.dataformats.WholeFileExistsAction;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -116,7 +119,7 @@ public boolean isPlainTextCompatible() {
);
}

@Override
@Override // From DataFormatParserService, DataFormatGeneratorService
public String getCharset() {
return LambdaUtil.privilegedWithClassLoader(
serviceBean.getDefinition().getStageClassLoader(),
Expand Down Expand Up @@ -148,11 +151,52 @@ public int getStringBuilderPoolSize() {
);
}

@Override // From DataFormatParserService
@Override // From DataFormatParserService, DataFormatGeneratorService
public boolean isWholeFileFormat() {
return LambdaUtil.privilegedWithClassLoader(
serviceBean.getDefinition().getStageClassLoader(),
() -> ((DataFormatParserService)serviceBean.getService()).isWholeFileFormat()
() -> {
if(serviceBean.getService() instanceof DataFormatGeneratorService) {
return ((DataFormatGeneratorService)serviceBean.getService()).isWholeFileFormat();
} else if(serviceBean.getService() instanceof DataFormatParserService) {
return ((DataFormatParserService) serviceBean.getService()).isWholeFileFormat();
} else {
throw new IllegalStateException("Called on wrong service: " + serviceBean.getService().getClass().getCanonicalName());
}
}
);
}

@Override // From DataFormatGeneratorService
public String wholeFileFilename(Record record) throws StageException {
return LambdaUtil.privilegedWithClassLoader(
serviceBean.getDefinition().getStageClassLoader(),
StageException.class,
() -> ((DataFormatGeneratorService)serviceBean.getService()).wholeFileFilename(record)
);
}

@Override // From DataFormatGeneratorService
public WholeFileExistsAction wholeFileExistsAction() {
return LambdaUtil.privilegedWithClassLoader(
serviceBean.getDefinition().getStageClassLoader(),
() -> ((DataFormatGeneratorService)serviceBean.getService()).wholeFileExistsAction()
);
}

@Override // From DataFormatGeneratorService
public boolean wholeFileIncludeChecksumInTheEvents() {
return LambdaUtil.privilegedWithClassLoader(
serviceBean.getDefinition().getStageClassLoader(),
() -> ((DataFormatGeneratorService)serviceBean.getService()).wholeFileIncludeChecksumInTheEvents()
);
}

@Override // From DataFormatGeneratorService
public WholeFileChecksumAlgorithm wholeFileChecksumAlgorithm() {
return LambdaUtil.privilegedWithClassLoader(
serviceBean.getDefinition().getStageClassLoader(),
() -> ((DataFormatGeneratorService)serviceBean.getService()).wholeFileChecksumAlgorithm()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@
import com.streamsets.pipeline.api.ConfigGroups;
import com.streamsets.pipeline.api.ConfigIssue;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.ValueChooserModel;
import com.streamsets.pipeline.api.base.BaseService;
import com.streamsets.pipeline.api.el.ELEval;
import com.streamsets.pipeline.api.el.ELVars;
import com.streamsets.pipeline.api.service.ServiceDef;
import com.streamsets.pipeline.api.service.dataformats.DataFormatGeneratorService;
import com.streamsets.pipeline.api.service.dataformats.DataGenerator;
import com.streamsets.pipeline.api.service.dataformats.DataGeneratorException;
import com.streamsets.pipeline.api.service.dataformats.WholeFileChecksumAlgorithm;
import com.streamsets.pipeline.api.service.dataformats.WholeFileExistsAction;
import com.streamsets.pipeline.config.DataFormat;
import com.streamsets.pipeline.lib.el.RecordEL;
import com.streamsets.pipeline.stage.destination.lib.DataGeneratorFormatConfig;
import com.streamsets.service.lib.ShimUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -108,6 +114,35 @@ public String getCharset() {
return dataGeneratorFormatConfig.charset;
}

@Override
public boolean isWholeFileFormat() {
return dataFormat == DataFormat.WHOLE_FILE;
}

@Override
public String wholeFileFilename(Record record) throws StageException {
ELEval eval = getContext().createELEval("fileNameEL");
ELVars vars = getContext().createELVars();
RecordEL.setRecordInContext(vars, record);

return eval.eval(vars, dataGeneratorFormatConfig.fileNameEL, String.class);
}

@Override
public WholeFileExistsAction wholeFileExistsAction() {
return null;
}

@Override
public boolean wholeFileIncludeChecksumInTheEvents() {
return false;
}

@Override
public WholeFileChecksumAlgorithm wholeFileChecksumAlgorithm() {
return null;
}

/**
* Temporary wrapper to change DataGeneratorException from the *.lib.* to *.api.* as it's expected in the
* service world. This will be removed once all stages gets migrated off the older code to services.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.streamsets.pipeline.config;

import com.streamsets.pipeline.api.Label;
import com.streamsets.pipeline.api.service.dataformats.WholeFileChecksumAlgorithm;
import com.streamsets.pipeline.lib.hashing.HashingUtil;

public enum ChecksumAlgorithm implements Label {
Expand All @@ -42,4 +43,16 @@ public HashingUtil.HashType getHashType() {
return this.hashType;
}

public static ChecksumAlgorithm forApi(WholeFileChecksumAlgorithm other) {
switch (other) {
case MD5: return MD5;
case SHA1: return SHA1;
case SHA256: return SHA256;
case SHA512: return SHA512;
case MURMUR3_32: return MURMUR3_32;
case MURMUR3_128: return MURMUR3_128;
default:
throw new IllegalArgumentException("Unknown algorithm: " + other.name());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.streamsets.pipeline.api.el.ELEvalException;
import com.streamsets.pipeline.api.el.ELVars;
import com.streamsets.pipeline.api.impl.Utils;
import com.streamsets.pipeline.api.service.dataformats.WholeFileChecksumAlgorithm;
import com.streamsets.pipeline.config.ChecksumAlgorithm;
import com.streamsets.pipeline.lib.event.EventCreator;
import com.streamsets.pipeline.lib.generator.StreamCloseEventHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.streamsets.pipeline.api.service.dataformats.DataFormatGeneratorService;
import com.streamsets.pipeline.api.service.dataformats.DataGenerator;
import com.streamsets.pipeline.api.service.dataformats.DataGeneratorException;
import com.streamsets.pipeline.api.service.dataformats.WholeFileChecksumAlgorithm;
import com.streamsets.pipeline.api.service.dataformats.WholeFileExistsAction;

import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -56,6 +58,31 @@ public String getCharset() {
return "UTF-8";
}

@Override
public boolean isWholeFileFormat() {
return false;
}

@Override
public String wholeFileFilename(Record record) {
return null;
}

@Override
public WholeFileExistsAction wholeFileExistsAction() {
return null;
}

@Override
public boolean wholeFileIncludeChecksumInTheEvents() {
return false;
}

@Override
public WholeFileChecksumAlgorithm wholeFileChecksumAlgorithm() {
return null;
}

private static class DataGeneratorImpl implements DataGenerator {

private final JsonRecordWriter recordWriter;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2018 StreamSets Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.streamsets.pipeline.sdk.service;

import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.base.BaseService;
import com.streamsets.pipeline.api.service.ServiceDef;
import com.streamsets.pipeline.api.service.dataformats.DataFormatGeneratorService;
import com.streamsets.pipeline.api.service.dataformats.DataGenerator;
import com.streamsets.pipeline.api.service.dataformats.WholeFileChecksumAlgorithm;
import com.streamsets.pipeline.api.service.dataformats.WholeFileExistsAction;

import java.io.IOException;
import java.io.OutputStream;

@ServiceDef(
provides = DataFormatGeneratorService.class,
version = 1,
label = "(Test) Runner implementation of very simple DataFormatGeneratorService that will always work with whole file format."
)
public class SdkWholeFileDataFormatGeneratorService extends BaseService implements DataFormatGeneratorService {

private String wholeFileNamePath;
private WholeFileExistsAction existsAction;
private boolean includeChecksumInTheEvents;
private WholeFileChecksumAlgorithm checksumAlgorithm;

public SdkWholeFileDataFormatGeneratorService() {
this("/fileName", WholeFileExistsAction.OVERWRITE, false, WholeFileChecksumAlgorithm.MD5);
}

public SdkWholeFileDataFormatGeneratorService(
String wholeFileNamePath,
WholeFileExistsAction existsAction,
boolean includeChecksumInTheEvents,
WholeFileChecksumAlgorithm checksumAlgorithm
) {
this.wholeFileNamePath = wholeFileNamePath;
this.existsAction = existsAction;
this.includeChecksumInTheEvents = includeChecksumInTheEvents;
this.checksumAlgorithm = checksumAlgorithm;
}


@Override
public DataGenerator getGenerator(OutputStream os) throws IOException {
throw new UnsupportedOperationException("Only Whole File Format is supported");
}

@Override
public boolean isPlainTextCompatible() {
throw new UnsupportedOperationException("Only Whole File Format is supported");
}

@Override
public String getCharset() {
throw new UnsupportedOperationException("Only Whole File Format is supported");
}

@Override
public boolean isWholeFileFormat() {
return true;
}

@Override
public String wholeFileFilename(Record record) throws StageException {
return record.get(wholeFileNamePath).getValueAsString();
}

@Override
public WholeFileExistsAction wholeFileExistsAction() {
return existsAction;
}

@Override
public boolean wholeFileIncludeChecksumInTheEvents() {
return includeChecksumInTheEvents;
}

@Override
public WholeFileChecksumAlgorithm wholeFileChecksumAlgorithm() {
return checksumAlgorithm;
}
}

0 comments on commit 4c4cd49

Please sign in to comment.