SDC-3382. New DATAGRAM datatype in DataParserFormatConfig
Exposes the UDP data format in the UI configuration.

Change-Id: I65d5f1fc6297d82bb638b0e3fc08d2fb5ad4a6e8
Reviewed-on: https://review.streamsets.net/3391
Reviewed-by: Alejandro Abdelnur <tucu@streamsets.com>
harikiran nayak authored and Adam Kunicki committed Aug 4, 2016
1 parent 6d5d0ee commit 9048289
Showing 38 changed files with 246 additions and 24 deletions.
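Not part of the commit: a minimal sketch of how an origin could build a datagram parser once DATAGRAM is exposed, assuming the DataParserFactoryBuilder(Stage.Context, DataParserFormat) constructor and its build() method; the setConfig/setMode/setMaxDataLen calls mirror the validateDataParser() hunk further down.

import com.streamsets.pipeline.api.Stage;
import com.streamsets.pipeline.config.DatagramMode;
import com.streamsets.pipeline.lib.parser.DataParserFactory;
import com.streamsets.pipeline.lib.parser.DataParserFactoryBuilder;
import com.streamsets.pipeline.lib.parser.DataParserFormat;
import com.streamsets.pipeline.lib.parser.udp.DatagramParserFactory;

public class DatagramParserSketch {
  // Hypothetical helper: builds a collectd-mode datagram parser factory using the
  // defaults this commit wires into DataParserFormatConfig.
  public static DataParserFactory forCollectd(Stage.Context context) {
    return new DataParserFactoryBuilder(context, DataParserFormat.DATAGRAM) // assumed constructor
        .setConfig(DatagramParserFactory.CONVERT_TIME_KEY, false)
        .setConfig(DatagramParserFactory.EXCLUDE_INTERVAL_KEY, true)
        .setConfig(DatagramParserFactory.AUTH_FILE_PATH_KEY, "")
        .setConfig(DatagramParserFactory.TYPES_DB_PATH_KEY, "")
        .setMode(DatagramMode.COLLECTD)
        .setMaxDataLen(-1)
        .build(); // assumed to return a DataParserFactory
  }
}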
@@ -31,6 +31,7 @@ public enum FirehoseGroups implements Label {
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf"),
DATAGRAM("Datagram")
;

private final String label;
@@ -31,6 +31,7 @@ public enum Groups implements Label {
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf"),
DATAGRAM("Datagram")
;

private final String label;
@@ -33,6 +33,7 @@ public enum Groups implements Label {
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf"),
DATAGRAM("Datagram")

;

@@ -32,7 +32,8 @@ public DataFormatChooserValues() {
DataFormat.DELIMITED,
DataFormat.SDC_JSON,
DataFormat.XML,
DataFormat.PROTOBUF
DataFormat.PROTOBUF,
DataFormat.BINARY
);
}
}
@@ -33,7 +33,8 @@ public enum Groups implements Label {
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf"),
ADVANCED("Advanced")
ADVANCED("Advanced"),
DATAGRAM("Datagram")
;

private final String label;
@@ -24,7 +24,6 @@
import com.streamsets.pipeline.api.ConfigDefBean;
import com.streamsets.pipeline.api.ValueChooserModel;
import com.streamsets.pipeline.config.DataFormat;
import com.streamsets.pipeline.config.DataFormatChooserValues;
import com.streamsets.pipeline.stage.lib.aws.ProxyConfig;
import com.streamsets.pipeline.stage.lib.kinesis.KinesisConfigBean;
import com.streamsets.pipeline.stage.origin.lib.DataParserFormatConfig;
@@ -35,7 +35,8 @@ public enum Groups implements Label {
LOG("Log"),
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf")
PROTOBUF("Protobuf"),
DATAGRAM("Datagram")

;

@@ -37,6 +37,7 @@ public enum Groups implements Label {
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf"),
DATAGRAM("Datagram")
;

private final String label;
@@ -32,6 +32,7 @@ public enum Groups implements Label {
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf"),
DATAGRAM("Datagram")
;

private final String label;
@@ -34,6 +34,7 @@ public enum Groups implements Label {
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf"),
DATAGRAM("Datagram")
;

private final String label;
@@ -35,6 +35,7 @@ public enum Groups implements Label {
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf"),
DATAGRAM("Datagram")
;

private final String label;
@@ -34,6 +34,7 @@ public enum Groups implements Label {
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf"),
DATAGRAM("Datagram")
;

private final String label;
@@ -28,10 +28,7 @@
import com.streamsets.pipeline.lib.util.ThreadUtil;
import com.streamsets.pipeline.sdk.SourceRunner;
import com.streamsets.pipeline.sdk.StageRunner;
import com.streamsets.pipeline.config.DatagramMode;
import com.streamsets.testing.NetworkUtils;
import org.apache.commons.io.IOUtils;
import org.junit.Assert;
@@ -33,6 +33,7 @@ public enum Groups implements Label {
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf"),
DATAGRAM("Datagram")
;

private final String label;
@@ -35,6 +35,7 @@ public enum DataFormat implements Label {
AVRO("Avro", DataParserFormat.AVRO, DataGeneratorFormat.AVRO),
BINARY("Binary", DataParserFormat.BINARY, DataGeneratorFormat.BINARY),
PROTOBUF("Protobuf", DataParserFormat.PROTOBUF, DataGeneratorFormat.PROTOBUF),
DATAGRAM("Datagram", DataParserFormat.DATAGRAM, null),
;

private final String label;
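Not part of the commit: the new DataFormat.DATAGRAM entry passes null for its generator format, so the format is parse-only; a sketch of what that implies, with the accessor names assumed rather than taken from this diff.

// DATAGRAM can back origins (parsing) but not destinations (generating).
DataFormat format = DataFormat.DATAGRAM;
assert format.getParserFormat() == DataParserFormat.DATAGRAM; // accessor name assumed
assert format.getGeneratorFormat() == null;                   // no DataGeneratorFormat counterpart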
@@ -31,6 +31,7 @@
import com.streamsets.pipeline.lib.parser.protobuf.ProtobufDataParserFactory;
import com.streamsets.pipeline.lib.parser.sdcrecord.SdcRecordDataParserFactory;
import com.streamsets.pipeline.lib.parser.text.TextDataParserFactory;
import com.streamsets.pipeline.lib.parser.udp.DatagramParserFactory;
import com.streamsets.pipeline.lib.parser.xml.XmlDataParserFactory;

import java.lang.reflect.Constructor;
@@ -48,6 +49,7 @@ public enum DataParserFormat implements DataFormat<DataParserFactory> {
AVRO(AvroDataParserFactory.class, AvroDataParserFactory.MODES, AvroDataParserFactory.CONFIGS),
BINARY(BinaryDataParserFactory.class, BinaryDataParserFactory.MODES, BinaryDataParserFactory.CONFIGS),
PROTOBUF(ProtobufDataParserFactory.class, ProtobufDataParserFactory.MODES, ProtobufDataParserFactory.CONFIGS),
DATAGRAM(DatagramParserFactory.class, DatagramParserFactory.MODES, DatagramParserFactory.CONFIGS),

;

@@ -39,11 +39,11 @@
public class DatagramParserFactory extends DataParserFactory {

private static final String KEY_PREFIX = "collectd.";
static final String CONVERT_TIME_KEY = KEY_PREFIX + "convertTime";
public static final String CONVERT_TIME_KEY = KEY_PREFIX + "convertTime";
static final boolean CONVERT_TIME_DEFAULT = false;
static final String AUTH_FILE_PATH_KEY = KEY_PREFIX + "authFilePath";
static final String TYPES_DB_PATH_KEY = KEY_PREFIX + "typesDbPath";
static final String EXCLUDE_INTERVAL_KEY = KEY_PREFIX + "excludeInterval";
public static final String AUTH_FILE_PATH_KEY = KEY_PREFIX + "authFilePath";
public static final String TYPES_DB_PATH_KEY = KEY_PREFIX + "typesDbPath";
public static final String EXCLUDE_INTERVAL_KEY = KEY_PREFIX + "excludeInterval";
static final boolean EXCLUDE_INTERVAL_DEFAULT = true;

public static final Map<String, Object> CONFIGS;
@@ -52,8 +52,8 @@ public class DatagramParserFactory extends DataParserFactory {
static {
Map<String, Object> configs = new HashMap<>();
configs.put(CONVERT_TIME_KEY, CONVERT_TIME_DEFAULT);
configs.put(AUTH_FILE_PATH_KEY, null);
configs.put(TYPES_DB_PATH_KEY, null);
configs.put(AUTH_FILE_PATH_KEY, "");
configs.put(TYPES_DB_PATH_KEY, "");
configs.put(EXCLUDE_INTERVAL_KEY, EXCLUDE_INTERVAL_DEFAULT);
CONFIGS = Collections.unmodifiableMap(configs);
}
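Not part of the commit: the constants become public so DataParserFormatConfig (below) can reference them, and the null defaults become empty strings so callers can treat "" as "not configured" without a null check; roughly:

// Illustrates the effect of changing the defaults from null to "".
String typesDbPath =
    (String) DatagramParserFactory.CONFIGS.get(DatagramParserFactory.TYPES_DB_PATH_KEY);
if (!typesDbPath.isEmpty()) { // with the old null default this call would throw a NullPointerException
  // a user-supplied types.db was configured; resolve and validate it
}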
@@ -43,7 +43,11 @@ public enum DataFormatErrors implements ErrorCode {

DATA_FORMAT_302("Input data is not Base64 for record: {}"),

DATA_FORMAT_303("Could not parse XML object '{}'")
DATA_FORMAT_303("Could not parse XML object '{}'"),

DATA_FORMAT_400("collectd Types DB '{}' not found"),
DATA_FORMAT_401("collectd Auth File '{}' not found"),

;
private final String msg;

@@ -32,6 +32,7 @@ public enum DataFormatGroups implements Label {
AVRO("Avro"),
BINARY("Binary"),
PROTOBUF("Protobuf"),
DATAGRAM("Datagram")

;

@@ -41,13 +41,16 @@
import com.streamsets.pipeline.config.LogModeChooserValues;
import com.streamsets.pipeline.config.OnParseError;
import com.streamsets.pipeline.config.OnParseErrorChooserValues;
import com.streamsets.pipeline.config.DatagramMode;
import com.streamsets.pipeline.config.DatagramModeChooserValues;
import com.streamsets.pipeline.lib.parser.DataParserFactory;
import com.streamsets.pipeline.lib.parser.DataParserFactoryBuilder;
import com.streamsets.pipeline.lib.parser.avro.AvroDataParserFactory;
import com.streamsets.pipeline.lib.parser.log.LogDataFormatValidator;
import com.streamsets.pipeline.lib.parser.log.LogDataParserFactory;
import com.streamsets.pipeline.lib.parser.log.RegExConfig;
import com.streamsets.pipeline.lib.parser.text.TextDataParserFactory;
import com.streamsets.pipeline.lib.parser.udp.DatagramParserFactory;
import com.streamsets.pipeline.lib.parser.xml.XmlDataParserFactory;
import com.streamsets.pipeline.lib.util.DelimitedDataConstants;
import com.streamsets.pipeline.lib.util.ProtobufConstants;
@@ -83,7 +86,7 @@ public class DataParserFormatConfig implements DataFormatConfig{
displayPosition = 300,
group = "#0",
dependsOn = "dataFormat^",
triggeredByValue = {"TEXT", "JSON", "DELIMITED", "XML", "LOG"}
triggeredByValue = {"TEXT", "JSON", "DELIMITED", "XML", "LOG", "DATAGRAM"}
)
@ValueChooserModel(CharsetChooserValues.class)
public String charset = "UTF-8";
@@ -509,6 +512,8 @@ public class DataParserFormatConfig implements DataFormatConfig{
)
public String avroSchema = "";

// PROTOBUF

@ConfigDef(
required = true,
type = ConfigDef.Type.STRING,
@@ -549,6 +554,8 @@ public class DataParserFormatConfig implements DataFormatConfig{
)
public boolean isDelimited = true;

// BINARY

@ConfigDef(
required = true,
type = ConfigDef.Type.NUMBER,
@@ -564,6 +571,71 @@ public class DataParserFormatConfig implements DataFormatConfig{
)
public int binaryMaxObjectLen;

// DATAGRAM

@ConfigDef(
required = true,
type = ConfigDef.Type.MODEL,
label = "Data Format",
defaultValue = "SYSLOG",
group = "DATAGRAM",
displayPosition = 800,
dependsOn = "dataFormat^",
triggeredByValue = "DATAGRAM"
)
@ValueChooserModel(DatagramModeChooserValues.class)
public DatagramMode datagramMode;

@ConfigDef(
required = false,
type = ConfigDef.Type.STRING,
label = "TypesDB File Path",
description = "User-specified TypesDB file. Overrides the included version.",
displayPosition = 820,
group = "DATAGRAM",
dependsOn = "datagramMode",
triggeredByValue = "COLLECTD"
)
public String typesDbPath;

@ConfigDef(
required = true,
type = ConfigDef.Type.BOOLEAN,
defaultValue = "false",
label = "Convert Hi-Res Time & Interval",
description = "Converts high resolution time format interval and timestamp to unix time in (ms).",
displayPosition = 830,
group = "DATAGRAM",
dependsOn = "datagramMode",
triggeredByValue = "COLLECTD"
)
public boolean convertTime;

@ConfigDef(
required = true,
type = ConfigDef.Type.BOOLEAN,
defaultValue = "true",
label = "Exclude Interval",
description = "Excludes the interval field from output records.",
displayPosition = 840,
group = "DATAGRAM",
dependsOn = "datagramMode",
triggeredByValue = "COLLECTD"
)
public boolean excludeInterval;

@ConfigDef(
required = false,
type = ConfigDef.Type.STRING,
label = "Auth File",
description = "",
displayPosition = 850,
group = "DATAGRAM",
dependsOn = "datagramMode",
triggeredByValue = "COLLECTD"
)
public String authFilePath;

public boolean init(
Stage.Context context,
DataFormat dataFormat,
@@ -744,6 +816,14 @@ public boolean init(
}
}
break;
case DATAGRAM:
switch (datagramMode) {
case COLLECTD:
checkCollectdParserConfigs(context, configPrefix, issues);
break;
default:
}
break;
default:
issues.add(
context.createConfigIssue(
@@ -770,6 +850,33 @@ public boolean init(
return valid;
}

private void checkCollectdParserConfigs(Stage.Context context, String configPrefix, List<Stage.ConfigIssue> issues) {
if (!typesDbPath.isEmpty()) {
File typesDbFile = new File(typesDbPath);
if (!typesDbFile.canRead() || !typesDbFile.isFile()) {
issues.add(
context.createConfigIssue(
DataFormatGroups.DATAGRAM.name(),
configPrefix + "typesDbPath",
DataFormatErrors.DATA_FORMAT_400, typesDbPath
)
);
}
}
if (!authFilePath.isEmpty()) {
File authFile = new File(authFilePath);
if (!authFile.canRead() || !authFile.isFile()) {
issues.add(
context.createConfigIssue(
DataFormatGroups.DATAGRAM.name(),
configPrefix + "authFilePath",
DataFormatErrors.DATA_FORMAT_401, authFilePath
)
);
}
}
}

private boolean validateDataParser(
Stage.Context context,
DataFormat dataFormat,
@@ -849,6 +956,15 @@ private boolean validateDataParser(
.setConfig(ProtobufConstants.DELIMITED_KEY, isDelimited)
.setMaxDataLen(-1);
break;
case DATAGRAM:
builder
.setConfig(DatagramParserFactory.CONVERT_TIME_KEY, convertTime)
.setConfig(DatagramParserFactory.EXCLUDE_INTERVAL_KEY, excludeInterval)
.setConfig(DatagramParserFactory.AUTH_FILE_PATH_KEY, authFilePath)
.setConfig(DatagramParserFactory.TYPES_DB_PATH_KEY, typesDbPath)
.setMode(datagramMode)
.setMaxDataLen(-1);
break;
default:
throw new IllegalStateException("Unexpected data format" + dataFormat);
}
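Not part of the commit: a hypothetical sketch of how a UDP-style origin might populate the new public fields above before init() runs the validation shown in this hunk (field names come from this diff; the no-arg constructor is assumed).

DataParserFormatConfig dataFormatConfig = new DataParserFormatConfig(); // assumed no-arg constructor
dataFormatConfig.datagramMode = DatagramMode.COLLECTD; // new ValueChooser-backed field
dataFormatConfig.convertTime = false;     // keep collectd hi-res time values as-is
dataFormatConfig.excludeInterval = true;  // drop the interval field from output records
dataFormatConfig.typesDbPath = "";        // "" means use the bundled types.db
dataFormatConfig.authFilePath = "";       // "" means skip auth-file validation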
@@ -17,7 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.streamsets.pipeline.stage.origin.udp;
package com.streamsets.pipeline.stage.origin.lib;

import com.streamsets.pipeline.api.GenerateResourceBundle;
import com.streamsets.pipeline.api.Label;
@@ -17,7 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.streamsets.pipeline.stage.origin.udp;
package com.streamsets.pipeline.stage.origin.lib;

import com.streamsets.pipeline.api.base.BaseEnumChooserValues;
