diff --git a/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/kinesis/FirehoseGroups.java b/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/kinesis/FirehoseGroups.java index e1f90aa3c6d..86d28b6992f 100644 --- a/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/kinesis/FirehoseGroups.java +++ b/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/kinesis/FirehoseGroups.java @@ -31,6 +31,7 @@ public enum FirehoseGroups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label; diff --git a/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/kinesis/Groups.java b/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/kinesis/Groups.java index 7986b24fbc2..095341e1f9c 100644 --- a/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/kinesis/Groups.java +++ b/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/kinesis/Groups.java @@ -31,6 +31,7 @@ public enum Groups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label; diff --git a/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/s3/Groups.java b/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/s3/Groups.java index 92e525d11bd..ddc351ad53e 100644 --- a/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/s3/Groups.java +++ b/aws-lib/src/main/java/com/streamsets/pipeline/stage/destination/s3/Groups.java @@ -33,6 +33,7 @@ public enum Groups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; diff --git a/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/DataFormatChooserValues.java b/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/DataFormatChooserValues.java index 86ae1da06b8..8ab2119bf52 100644 --- a/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/DataFormatChooserValues.java +++ b/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/DataFormatChooserValues.java @@ -32,7 +32,8 @@ public DataFormatChooserValues() { DataFormat.DELIMITED, DataFormat.SDC_JSON, DataFormat.XML, - DataFormat.PROTOBUF + DataFormat.PROTOBUF, + DataFormat.BINARY ); } } diff --git a/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/Groups.java b/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/Groups.java index 2f4111b00d5..f5a9f6579c6 100644 --- a/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/Groups.java +++ b/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/Groups.java @@ -33,7 +33,8 @@ public enum Groups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), - ADVANCED("Advanced") + ADVANCED("Advanced"), + DATAGRAM("Datagram") ; private final String label; diff --git a/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/KinesisConsumerConfigBean.java b/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/KinesisConsumerConfigBean.java index 336df505537..4937ff129a5 100644 --- a/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/KinesisConsumerConfigBean.java +++ b/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/kinesis/KinesisConsumerConfigBean.java @@ -24,7 +24,6 @@ import com.streamsets.pipeline.api.ConfigDefBean; import com.streamsets.pipeline.api.ValueChooserModel; import com.streamsets.pipeline.config.DataFormat; -import com.streamsets.pipeline.config.DataFormatChooserValues; import com.streamsets.pipeline.stage.lib.aws.ProxyConfig; import com.streamsets.pipeline.stage.lib.kinesis.KinesisConfigBean; import com.streamsets.pipeline.stage.origin.lib.DataParserFormatConfig; diff --git a/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/s3/Groups.java b/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/s3/Groups.java index f43b9bf241c..f1c4f91a374 100644 --- a/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/s3/Groups.java +++ b/aws-lib/src/main/java/com/streamsets/pipeline/stage/origin/s3/Groups.java @@ -35,7 +35,8 @@ public enum Groups implements Label { LOG("Log"), AVRO("Avro"), BINARY("Binary"), - PROTOBUF("Protobuf") + PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; diff --git a/basic-lib/src/main/java/com/streamsets/pipeline/lib/http/Groups.java b/basic-lib/src/main/java/com/streamsets/pipeline/lib/http/Groups.java index 780850f88ac..23950c8c64c 100644 --- a/basic-lib/src/main/java/com/streamsets/pipeline/lib/http/Groups.java +++ b/basic-lib/src/main/java/com/streamsets/pipeline/lib/http/Groups.java @@ -37,6 +37,7 @@ public enum Groups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label; diff --git a/basic-lib/src/main/java/com/streamsets/pipeline/stage/destination/localfilesystem/Groups.java b/basic-lib/src/main/java/com/streamsets/pipeline/stage/destination/localfilesystem/Groups.java index dfee93f770f..1c7383a1f9d 100644 --- a/basic-lib/src/main/java/com/streamsets/pipeline/stage/destination/localfilesystem/Groups.java +++ b/basic-lib/src/main/java/com/streamsets/pipeline/stage/destination/localfilesystem/Groups.java @@ -32,6 +32,7 @@ public enum Groups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label; diff --git a/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/logtail/Groups.java b/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/logtail/Groups.java index d423973b4a1..e39db1d6a22 100644 --- a/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/logtail/Groups.java +++ b/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/logtail/Groups.java @@ -34,6 +34,7 @@ public enum Groups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label; diff --git a/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/remote/Groups.java b/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/remote/Groups.java index b2d09ec8452..80b52967225 100644 --- a/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/remote/Groups.java +++ b/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/remote/Groups.java @@ -35,6 +35,7 @@ public enum Groups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label; diff --git a/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/spooldir/Groups.java b/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/spooldir/Groups.java index 7e64f3c3c64..c429680f8ab 100644 --- a/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/spooldir/Groups.java +++ b/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/spooldir/Groups.java @@ -34,6 +34,7 @@ public enum Groups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label; diff --git a/basic-lib/src/test/java/com/streamsets/pipeline/stage/origin/udp/TestUDPSource.java b/basic-lib/src/test/java/com/streamsets/pipeline/stage/origin/udp/TestUDPSource.java index dcd56d2c704..38cdcbd30a5 100644 --- a/basic-lib/src/test/java/com/streamsets/pipeline/stage/origin/udp/TestUDPSource.java +++ b/basic-lib/src/test/java/com/streamsets/pipeline/stage/origin/udp/TestUDPSource.java @@ -28,10 +28,7 @@ import com.streamsets.pipeline.lib.util.ThreadUtil; import com.streamsets.pipeline.sdk.SourceRunner; import com.streamsets.pipeline.sdk.StageRunner; -<<<<<<< HEAD -======= import com.streamsets.pipeline.config.DatagramMode; ->>>>>>> d66c12f... Relocate import com.streamsets.testing.NetworkUtils; import org.apache.commons.io.IOUtils; import org.junit.Assert; diff --git a/cluster-hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/origin/hdfs/cluster/Groups.java b/cluster-hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/origin/hdfs/cluster/Groups.java index 515934d5b6b..dccd0b095a6 100644 --- a/cluster-hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/origin/hdfs/cluster/Groups.java +++ b/cluster-hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/origin/hdfs/cluster/Groups.java @@ -33,6 +33,7 @@ public enum Groups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label; diff --git a/commonlib/src/main/java/com/streamsets/pipeline/config/DataFormat.java b/commonlib/src/main/java/com/streamsets/pipeline/config/DataFormat.java index 9a747c6b187..2b20adbae02 100644 --- a/commonlib/src/main/java/com/streamsets/pipeline/config/DataFormat.java +++ b/commonlib/src/main/java/com/streamsets/pipeline/config/DataFormat.java @@ -35,6 +35,7 @@ public enum DataFormat implements Label { AVRO("Avro", DataParserFormat.AVRO, DataGeneratorFormat.AVRO), BINARY("Binary", DataParserFormat.BINARY, DataGeneratorFormat.BINARY), PROTOBUF("Protobuf", DataParserFormat.PROTOBUF, DataGeneratorFormat.PROTOBUF), + DATAGRAM("Datagram", DataParserFormat.DATAGRAM, null), ; private final String label; diff --git a/commonlib/src/main/java/com/streamsets/pipeline/lib/parser/DataParserFormat.java b/commonlib/src/main/java/com/streamsets/pipeline/lib/parser/DataParserFormat.java index dc66a7f1b76..6cd75ee195d 100644 --- a/commonlib/src/main/java/com/streamsets/pipeline/lib/parser/DataParserFormat.java +++ b/commonlib/src/main/java/com/streamsets/pipeline/lib/parser/DataParserFormat.java @@ -31,6 +31,7 @@ import com.streamsets.pipeline.lib.parser.protobuf.ProtobufDataParserFactory; import com.streamsets.pipeline.lib.parser.sdcrecord.SdcRecordDataParserFactory; import com.streamsets.pipeline.lib.parser.text.TextDataParserFactory; +import com.streamsets.pipeline.lib.parser.udp.DatagramParserFactory; import com.streamsets.pipeline.lib.parser.xml.XmlDataParserFactory; import java.lang.reflect.Constructor; @@ -48,6 +49,7 @@ public enum DataParserFormat implements DataFormat { AVRO(AvroDataParserFactory.class, AvroDataParserFactory.MODES, AvroDataParserFactory.CONFIGS), BINARY(BinaryDataParserFactory.class, BinaryDataParserFactory.MODES, BinaryDataParserFactory.CONFIGS), PROTOBUF(ProtobufDataParserFactory.class, ProtobufDataParserFactory.MODES, ProtobufDataParserFactory.CONFIGS), + DATAGRAM(DatagramParserFactory.class, DatagramParserFactory.MODES, DatagramParserFactory.CONFIGS), ; diff --git a/commonlib/src/main/java/com/streamsets/pipeline/lib/parser/udp/DatagramParserFactory.java b/commonlib/src/main/java/com/streamsets/pipeline/lib/parser/udp/DatagramParserFactory.java index af0ebf3ab72..dc1dff0702f 100644 --- a/commonlib/src/main/java/com/streamsets/pipeline/lib/parser/udp/DatagramParserFactory.java +++ b/commonlib/src/main/java/com/streamsets/pipeline/lib/parser/udp/DatagramParserFactory.java @@ -39,11 +39,11 @@ public class DatagramParserFactory extends DataParserFactory { private static final String KEY_PREFIX = "collectd."; - static final String CONVERT_TIME_KEY = KEY_PREFIX + "convertTime"; + public static final String CONVERT_TIME_KEY = KEY_PREFIX + "convertTime"; static final boolean CONVERT_TIME_DEFAULT = false; - static final String AUTH_FILE_PATH_KEY = KEY_PREFIX + "authFilePath"; - static final String TYPES_DB_PATH_KEY = KEY_PREFIX + "typesDbPath"; - static final String EXCLUDE_INTERVAL_KEY = KEY_PREFIX + "excludeInterval"; + public static final String AUTH_FILE_PATH_KEY = KEY_PREFIX + "authFilePath"; + public static final String TYPES_DB_PATH_KEY = KEY_PREFIX + "typesDbPath"; + public static final String EXCLUDE_INTERVAL_KEY = KEY_PREFIX + "excludeInterval"; static final boolean EXCLUDE_INTERVAL_DEFAULT = true; public static final Map CONFIGS; @@ -52,8 +52,8 @@ public class DatagramParserFactory extends DataParserFactory { static { Map configs = new HashMap<>(); configs.put(CONVERT_TIME_KEY, CONVERT_TIME_DEFAULT); - configs.put(AUTH_FILE_PATH_KEY, null); - configs.put(TYPES_DB_PATH_KEY, null); + configs.put(AUTH_FILE_PATH_KEY, ""); + configs.put(TYPES_DB_PATH_KEY, ""); configs.put(EXCLUDE_INTERVAL_KEY, EXCLUDE_INTERVAL_DEFAULT); CONFIGS = Collections.unmodifiableMap(configs); } diff --git a/commonlib/src/main/java/com/streamsets/pipeline/stage/common/DataFormatErrors.java b/commonlib/src/main/java/com/streamsets/pipeline/stage/common/DataFormatErrors.java index e161a046847..89ebfbbbc8a 100644 --- a/commonlib/src/main/java/com/streamsets/pipeline/stage/common/DataFormatErrors.java +++ b/commonlib/src/main/java/com/streamsets/pipeline/stage/common/DataFormatErrors.java @@ -43,7 +43,11 @@ public enum DataFormatErrors implements ErrorCode { DATA_FORMAT_302("Input data is not Base64 for record: {}"), - DATA_FORMAT_303("Could not parse XML object '{}'") + DATA_FORMAT_303("Could not parse XML object '{}'"), + + DATA_FORMAT_400("collectd Types DB '{}' not found"), + DATA_FORMAT_401("collectd Auth File '{}' not found"), + ; private final String msg; diff --git a/commonlib/src/main/java/com/streamsets/pipeline/stage/common/DataFormatGroups.java b/commonlib/src/main/java/com/streamsets/pipeline/stage/common/DataFormatGroups.java index d5bb3b594c3..2e1b59d14de 100644 --- a/commonlib/src/main/java/com/streamsets/pipeline/stage/common/DataFormatGroups.java +++ b/commonlib/src/main/java/com/streamsets/pipeline/stage/common/DataFormatGroups.java @@ -32,6 +32,7 @@ public enum DataFormatGroups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; diff --git a/commonlib/src/main/java/com/streamsets/pipeline/stage/origin/lib/DataParserFormatConfig.java b/commonlib/src/main/java/com/streamsets/pipeline/stage/origin/lib/DataParserFormatConfig.java index c461f69ba65..9a82a30ca67 100644 --- a/commonlib/src/main/java/com/streamsets/pipeline/stage/origin/lib/DataParserFormatConfig.java +++ b/commonlib/src/main/java/com/streamsets/pipeline/stage/origin/lib/DataParserFormatConfig.java @@ -41,6 +41,8 @@ import com.streamsets.pipeline.config.LogModeChooserValues; import com.streamsets.pipeline.config.OnParseError; import com.streamsets.pipeline.config.OnParseErrorChooserValues; +import com.streamsets.pipeline.config.DatagramMode; +import com.streamsets.pipeline.config.DatagramModeChooserValues; import com.streamsets.pipeline.lib.parser.DataParserFactory; import com.streamsets.pipeline.lib.parser.DataParserFactoryBuilder; import com.streamsets.pipeline.lib.parser.avro.AvroDataParserFactory; @@ -48,6 +50,7 @@ import com.streamsets.pipeline.lib.parser.log.LogDataParserFactory; import com.streamsets.pipeline.lib.parser.log.RegExConfig; import com.streamsets.pipeline.lib.parser.text.TextDataParserFactory; +import com.streamsets.pipeline.lib.parser.udp.DatagramParserFactory; import com.streamsets.pipeline.lib.parser.xml.XmlDataParserFactory; import com.streamsets.pipeline.lib.util.DelimitedDataConstants; import com.streamsets.pipeline.lib.util.ProtobufConstants; @@ -83,7 +86,7 @@ public class DataParserFormatConfig implements DataFormatConfig{ displayPosition = 300, group = "#0", dependsOn = "dataFormat^", - triggeredByValue = {"TEXT", "JSON", "DELIMITED", "XML", "LOG"} + triggeredByValue = {"TEXT", "JSON", "DELIMITED", "XML", "LOG", "DATAGRAM"} ) @ValueChooserModel(CharsetChooserValues.class) public String charset = "UTF-8"; @@ -509,6 +512,8 @@ public class DataParserFormatConfig implements DataFormatConfig{ ) public String avroSchema = ""; + // PROTOBUF + @ConfigDef( required = true, type = ConfigDef.Type.STRING, @@ -549,6 +554,8 @@ public class DataParserFormatConfig implements DataFormatConfig{ ) public boolean isDelimited = true; + // BINARY + @ConfigDef( required = true, type = ConfigDef.Type.NUMBER, @@ -564,6 +571,71 @@ public class DataParserFormatConfig implements DataFormatConfig{ ) public int binaryMaxObjectLen; + // DATAGRAM + + @ConfigDef( + required = true, + type = ConfigDef.Type.MODEL, + label = "Data Format", + defaultValue = "SYSLOG", + group = "DATAGRAM", + displayPosition = 800, + dependsOn = "dataFormat^", + triggeredByValue = "DATAGRAM" + ) + @ValueChooserModel(DatagramModeChooserValues.class) + public DatagramMode datagramMode; + + @ConfigDef( + required = false, + type = ConfigDef.Type.STRING, + label = "TypesDB File Path", + description = "User-specified TypesDB file. Overrides the included version.", + displayPosition = 820, + group = "DATAGRAM", + dependsOn = "datagramMode", + triggeredByValue = "COLLECTD" + ) + public String typesDbPath; + + @ConfigDef( + required = true, + type = ConfigDef.Type.BOOLEAN, + defaultValue = "false", + label = "Convert Hi-Res Time & Interval", + description = "Converts high resolution time format interval and timestamp to unix time in (ms).", + displayPosition = 830, + group = "DATAGRAM", + dependsOn = "datagramMode", + triggeredByValue = "COLLECTD" + ) + public boolean convertTime; + + @ConfigDef( + required = true, + type = ConfigDef.Type.BOOLEAN, + defaultValue = "true", + label = "Exclude Interval", + description = "Excludes the interval field from output records.", + displayPosition = 840, + group = "DATAGRAM", + dependsOn = "datagramMode", + triggeredByValue = "COLLECTD" + ) + public boolean excludeInterval; + + @ConfigDef( + required = false, + type = ConfigDef.Type.STRING, + label = "Auth File", + description = "", + displayPosition = 850, + group = "DATAGRAM", + dependsOn = "datagramMode", + triggeredByValue = "COLLECTD" + ) + public String authFilePath; + public boolean init( Stage.Context context, DataFormat dataFormat, @@ -744,6 +816,14 @@ public boolean init( } } break; + case DATAGRAM: + switch (datagramMode) { + case COLLECTD: + checkCollectdParserConfigs(context, configPrefix, issues); + break; + default: + } + break; default: issues.add( context.createConfigIssue( @@ -770,6 +850,33 @@ public boolean init( return valid; } + private void checkCollectdParserConfigs(Stage.Context context, String configPrefix, List issues) { + if (!typesDbPath.isEmpty()) { + File typesDbFile = new File(typesDbPath); + if (!typesDbFile.canRead() || !typesDbFile.isFile()) { + issues.add( + context.createConfigIssue( + DataFormatGroups.DATAGRAM.name(), + configPrefix + "typesDbPath", + DataFormatErrors.DATA_FORMAT_400, typesDbPath + ) + ); + } + } + if (!authFilePath.isEmpty()) { + File authFile = new File(authFilePath); + if (!authFile.canRead() || !authFile.isFile()) { + issues.add( + context.createConfigIssue( + DataFormatGroups.DATAGRAM.name(), + configPrefix + "authFilePath", + DataFormatErrors.DATA_FORMAT_401, authFilePath + ) + ); + } + } + } + private boolean validateDataParser( Stage.Context context, DataFormat dataFormat, @@ -849,6 +956,15 @@ private boolean validateDataParser( .setConfig(ProtobufConstants.DELIMITED_KEY, isDelimited) .setMaxDataLen(-1); break; + case DATAGRAM: + builder + .setConfig(DatagramParserFactory.CONVERT_TIME_KEY, convertTime) + .setConfig(DatagramParserFactory.EXCLUDE_INTERVAL_KEY, excludeInterval) + .setConfig(DatagramParserFactory.AUTH_FILE_PATH_KEY, authFilePath) + .setConfig(DatagramParserFactory.TYPES_DB_PATH_KEY, typesDbPath) + .setMode(datagramMode) + .setMaxDataLen(-1); + break; default: throw new IllegalStateException("Unexpected data format" + dataFormat); } diff --git a/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/udp/UDPDataFormat.java b/commonlib/src/main/java/com/streamsets/pipeline/stage/origin/lib/UDPDataFormat.java similarity index 96% rename from basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/udp/UDPDataFormat.java rename to commonlib/src/main/java/com/streamsets/pipeline/stage/origin/lib/UDPDataFormat.java index 7d14c537262..26d4bfdd3ff 100644 --- a/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/udp/UDPDataFormat.java +++ b/commonlib/src/main/java/com/streamsets/pipeline/stage/origin/lib/UDPDataFormat.java @@ -17,7 +17,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.streamsets.pipeline.stage.origin.udp; +package com.streamsets.pipeline.stage.origin.lib; import com.streamsets.pipeline.api.GenerateResourceBundle; import com.streamsets.pipeline.api.Label; diff --git a/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/udp/UDPDataFormatChooserValues.java b/commonlib/src/main/java/com/streamsets/pipeline/stage/origin/lib/UDPDataFormatChooserValues.java similarity index 95% rename from basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/udp/UDPDataFormatChooserValues.java rename to commonlib/src/main/java/com/streamsets/pipeline/stage/origin/lib/UDPDataFormatChooserValues.java index 454c0263100..dd372e4a792 100644 --- a/basic-lib/src/main/java/com/streamsets/pipeline/stage/origin/udp/UDPDataFormatChooserValues.java +++ b/commonlib/src/main/java/com/streamsets/pipeline/stage/origin/lib/UDPDataFormatChooserValues.java @@ -17,7 +17,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.streamsets.pipeline.stage.origin.udp; +package com.streamsets.pipeline.stage.origin.lib; import com.streamsets.pipeline.api.base.BaseEnumChooserValues; diff --git a/dev-lib/src/main/java/com/streamsets/pipeline/stage/devtest/rawdata/RawDataSourceGroups.java b/dev-lib/src/main/java/com/streamsets/pipeline/stage/devtest/rawdata/RawDataSourceGroups.java index b0aa5ab1dc8..03bfcc86088 100644 --- a/dev-lib/src/main/java/com/streamsets/pipeline/stage/devtest/rawdata/RawDataSourceGroups.java +++ b/dev-lib/src/main/java/com/streamsets/pipeline/stage/devtest/rawdata/RawDataSourceGroups.java @@ -33,7 +33,8 @@ public enum RawDataSourceGroups implements Label { LOG(DataFormat.LOG.getLabel()), AVRO(DataFormat.AVRO.getLabel()), BINARY(DataFormat.BINARY.getLabel()), - PROTOBUF(DataFormat.PROTOBUF.getLabel()) + PROTOBUF(DataFormat.PROTOBUF.getLabel()), + DATAGRAM(DataFormat.DATAGRAM.getLabel()) ; private final String label; diff --git a/flume-protolib/src/main/java/com/streamsets/pipeline/stage/destination/flume/Groups.java b/flume-protolib/src/main/java/com/streamsets/pipeline/stage/destination/flume/Groups.java index 003a672b7d8..106bc8fa4e3 100644 --- a/flume-protolib/src/main/java/com/streamsets/pipeline/stage/destination/flume/Groups.java +++ b/flume-protolib/src/main/java/com/streamsets/pipeline/stage/destination/flume/Groups.java @@ -31,6 +31,7 @@ public enum Groups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; diff --git a/hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/destination/hdfs/Groups.java b/hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/destination/hdfs/Groups.java index e90ab6ce053..7757347269b 100644 --- a/hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/destination/hdfs/Groups.java +++ b/hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/destination/hdfs/Groups.java @@ -33,6 +33,7 @@ public enum Groups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; diff --git a/jms-lib/src/main/java/com/streamsets/pipeline/stage/origin/jms/JmsGroups.java b/jms-lib/src/main/java/com/streamsets/pipeline/stage/origin/jms/JmsGroups.java index 9f34a0d6663..10fb12dbb73 100644 --- a/jms-lib/src/main/java/com/streamsets/pipeline/stage/origin/jms/JmsGroups.java +++ b/jms-lib/src/main/java/com/streamsets/pipeline/stage/origin/jms/JmsGroups.java @@ -35,6 +35,7 @@ public enum JmsGroups implements Label { AVRO(DataFormat.AVRO.getLabel()), BINARY(DataFormat.BINARY.getLabel()), PROTOBUF(DataFormat.PROTOBUF.getLabel()), + DATAGRAM(DataFormat.DATAGRAM.getLabel()), ; diff --git a/maprstreams-source-protolib/src/main/java/com/streamsets/pipeline/stage/origin/maprstreams/MapRStreamsSourceGroups.java b/maprstreams-source-protolib/src/main/java/com/streamsets/pipeline/stage/origin/maprstreams/MapRStreamsSourceGroups.java index 8d52c2d22e4..843cb773f0e 100644 --- a/maprstreams-source-protolib/src/main/java/com/streamsets/pipeline/stage/origin/maprstreams/MapRStreamsSourceGroups.java +++ b/maprstreams-source-protolib/src/main/java/com/streamsets/pipeline/stage/origin/maprstreams/MapRStreamsSourceGroups.java @@ -33,6 +33,7 @@ public enum MapRStreamsSourceGroups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label; diff --git a/maprstreams-target-protolib/src/main/java/com/streamsets/pipeline/stage/destination/maprstreams/MapRStreamsTargetGroups.java b/maprstreams-target-protolib/src/main/java/com/streamsets/pipeline/stage/destination/maprstreams/MapRStreamsTargetGroups.java index 0cf6913df8e..db000b016ff 100644 --- a/maprstreams-target-protolib/src/main/java/com/streamsets/pipeline/stage/destination/maprstreams/MapRStreamsTargetGroups.java +++ b/maprstreams-target-protolib/src/main/java/com/streamsets/pipeline/stage/destination/maprstreams/MapRStreamsTargetGroups.java @@ -31,6 +31,7 @@ public enum MapRStreamsTargetGroups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label; diff --git a/rabbitmq-lib/src/main/java/com/streamsets/pipeline/lib/rabbitmq/config/BaseRabbitConfigBean.java b/rabbitmq-lib/src/main/java/com/streamsets/pipeline/lib/rabbitmq/config/BaseRabbitConfigBean.java index 5fdf2549074..9d1e18b66e6 100644 --- a/rabbitmq-lib/src/main/java/com/streamsets/pipeline/lib/rabbitmq/config/BaseRabbitConfigBean.java +++ b/rabbitmq-lib/src/main/java/com/streamsets/pipeline/lib/rabbitmq/config/BaseRabbitConfigBean.java @@ -23,9 +23,6 @@ import com.streamsets.pipeline.api.ConfigDefBean; import com.streamsets.pipeline.api.ListBeanModel; import com.streamsets.pipeline.api.Stage; -import com.streamsets.pipeline.api.ValueChooserModel; -import com.streamsets.pipeline.config.DataFormat; -import com.streamsets.pipeline.config.DataFormatChooserValues; import com.streamsets.pipeline.stage.origin.lib.CredentialsConfig; import java.util.ArrayList; diff --git a/rabbitmq-lib/src/main/java/com/streamsets/pipeline/lib/rabbitmq/config/Groups.java b/rabbitmq-lib/src/main/java/com/streamsets/pipeline/lib/rabbitmq/config/Groups.java index 495533a51f8..c4b00c14f85 100644 --- a/rabbitmq-lib/src/main/java/com/streamsets/pipeline/lib/rabbitmq/config/Groups.java +++ b/rabbitmq-lib/src/main/java/com/streamsets/pipeline/lib/rabbitmq/config/Groups.java @@ -38,6 +38,7 @@ public enum Groups implements Label { AVRO(DataFormat.AVRO.getLabel()), BINARY(DataFormat.BINARY.getLabel()), PROTOBUF(DataFormat.PROTOBUF.getLabel()), + DATAGRAM(DataFormat.DATAGRAM.getLabel()), ; private final String label; diff --git a/rabbitmq-lib/src/main/java/com/streamsets/pipeline/stage/origin/rabbitmq/DataFormatChooserValues.java b/rabbitmq-lib/src/main/java/com/streamsets/pipeline/stage/origin/rabbitmq/DataFormatChooserValues.java new file mode 100644 index 00000000000..3b81ef1226b --- /dev/null +++ b/rabbitmq-lib/src/main/java/com/streamsets/pipeline/stage/origin/rabbitmq/DataFormatChooserValues.java @@ -0,0 +1,41 @@ +/** + * Copyright 2016 StreamSets Inc. + * + * Licensed under the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.streamsets.pipeline.stage.origin.rabbitmq; + +import com.streamsets.pipeline.api.base.BaseEnumChooserValues; +import com.streamsets.pipeline.config.DataFormat; + +public class DataFormatChooserValues extends BaseEnumChooserValues { + + public DataFormatChooserValues() { + super( + DataFormat.TEXT, + DataFormat.JSON, + DataFormat.LOG, + DataFormat.AVRO, + DataFormat.DELIMITED, + DataFormat.SDC_JSON, + DataFormat.XML, + DataFormat.PROTOBUF, + DataFormat.BINARY + ); + } + +} \ No newline at end of file diff --git a/rabbitmq-lib/src/main/java/com/streamsets/pipeline/stage/origin/rabbitmq/RabbitSourceConfigBean.java b/rabbitmq-lib/src/main/java/com/streamsets/pipeline/stage/origin/rabbitmq/RabbitSourceConfigBean.java index 369addf4777..1b3c1ccb97b 100644 --- a/rabbitmq-lib/src/main/java/com/streamsets/pipeline/stage/origin/rabbitmq/RabbitSourceConfigBean.java +++ b/rabbitmq-lib/src/main/java/com/streamsets/pipeline/stage/origin/rabbitmq/RabbitSourceConfigBean.java @@ -23,7 +23,6 @@ import com.streamsets.pipeline.api.ConfigDefBean; import com.streamsets.pipeline.api.ValueChooserModel; import com.streamsets.pipeline.config.DataFormat; -import com.streamsets.pipeline.config.DataFormatChooserValues; import com.streamsets.pipeline.lib.rabbitmq.config.BaseRabbitConfigBean; import com.streamsets.pipeline.stage.origin.lib.BasicConfig; import com.streamsets.pipeline.stage.origin.lib.DataParserFormatConfig; diff --git a/redis-lib/src/main/java/com/streamsets/pipeline/stage/destination/redis/Groups.java b/redis-lib/src/main/java/com/streamsets/pipeline/stage/destination/redis/Groups.java index 1b9092e4a51..c343d60fe9d 100644 --- a/redis-lib/src/main/java/com/streamsets/pipeline/stage/destination/redis/Groups.java +++ b/redis-lib/src/main/java/com/streamsets/pipeline/stage/destination/redis/Groups.java @@ -33,6 +33,7 @@ public enum Groups implements Label { AVRO(DataFormat.AVRO.getLabel()), BINARY(DataFormat.BINARY.getLabel()), PROTOBUF(DataFormat.PROTOBUF.getLabel()), + DATAGRAM(DataFormat.DATAGRAM.getLabel()), ; private final String label; diff --git a/redis-lib/src/main/java/com/streamsets/pipeline/stage/origin/redis/DataFormatChooserValues.java b/redis-lib/src/main/java/com/streamsets/pipeline/stage/origin/redis/DataFormatChooserValues.java new file mode 100644 index 00000000000..1899aa0cefc --- /dev/null +++ b/redis-lib/src/main/java/com/streamsets/pipeline/stage/origin/redis/DataFormatChooserValues.java @@ -0,0 +1,41 @@ +/** + * Copyright 2016 StreamSets Inc. + * + * Licensed under the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.streamsets.pipeline.stage.origin.redis; + +import com.streamsets.pipeline.api.base.BaseEnumChooserValues; +import com.streamsets.pipeline.config.DataFormat; + +public class DataFormatChooserValues extends BaseEnumChooserValues { + + public DataFormatChooserValues() { + super( + DataFormat.TEXT, + DataFormat.JSON, + DataFormat.LOG, + DataFormat.AVRO, + DataFormat.DELIMITED, + DataFormat.SDC_JSON, + DataFormat.XML, + DataFormat.PROTOBUF, + DataFormat.BINARY + ); + } + +} \ No newline at end of file diff --git a/redis-lib/src/main/java/com/streamsets/pipeline/stage/origin/redis/Groups.java b/redis-lib/src/main/java/com/streamsets/pipeline/stage/origin/redis/Groups.java index 939cb40e950..c02f0c98d08 100644 --- a/redis-lib/src/main/java/com/streamsets/pipeline/stage/origin/redis/Groups.java +++ b/redis-lib/src/main/java/com/streamsets/pipeline/stage/origin/redis/Groups.java @@ -32,7 +32,10 @@ public enum Groups implements Label { LOG("Log"), AVRO("Avro"), BINARY("Binary"), - PROTOBUF("Protobuf"),; + PROTOBUF("Protobuf"), + DATAGRAM("Datagram") + + ; private final String label; diff --git a/redis-lib/src/main/java/com/streamsets/pipeline/stage/origin/redis/RedisOriginConfigBean.java b/redis-lib/src/main/java/com/streamsets/pipeline/stage/origin/redis/RedisOriginConfigBean.java index 2cec037ffa1..29421b9ade8 100644 --- a/redis-lib/src/main/java/com/streamsets/pipeline/stage/origin/redis/RedisOriginConfigBean.java +++ b/redis-lib/src/main/java/com/streamsets/pipeline/stage/origin/redis/RedisOriginConfigBean.java @@ -23,7 +23,6 @@ import com.streamsets.pipeline.api.ConfigDefBean; import com.streamsets.pipeline.api.ValueChooserModel; import com.streamsets.pipeline.config.DataFormat; -import com.streamsets.pipeline.config.DataFormatChooserValues; import com.streamsets.pipeline.stage.origin.lib.DataParserFormatConfig; import java.util.List; diff --git a/sdc-kafka-api/src/main/java/com/streamsets/pipeline/kafka/api/KafkaDestinationGroups.java b/sdc-kafka-api/src/main/java/com/streamsets/pipeline/kafka/api/KafkaDestinationGroups.java index 71f8e668df2..4554b1584b8 100644 --- a/sdc-kafka-api/src/main/java/com/streamsets/pipeline/kafka/api/KafkaDestinationGroups.java +++ b/sdc-kafka-api/src/main/java/com/streamsets/pipeline/kafka/api/KafkaDestinationGroups.java @@ -31,6 +31,7 @@ public enum KafkaDestinationGroups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label; diff --git a/sdc-kafka-api/src/main/java/com/streamsets/pipeline/kafka/api/KafkaOriginGroups.java b/sdc-kafka-api/src/main/java/com/streamsets/pipeline/kafka/api/KafkaOriginGroups.java index 7303b532ec2..9d065b35668 100644 --- a/sdc-kafka-api/src/main/java/com/streamsets/pipeline/kafka/api/KafkaOriginGroups.java +++ b/sdc-kafka-api/src/main/java/com/streamsets/pipeline/kafka/api/KafkaOriginGroups.java @@ -33,6 +33,7 @@ public enum KafkaOriginGroups implements Label { AVRO("Avro"), BINARY("Binary"), PROTOBUF("Protobuf"), + DATAGRAM("Datagram") ; private final String label;