diff --git a/cluster-bootstrap-api/src/main/java/com/streamsets/pipeline/spark/RecordCloner.java b/cluster-bootstrap-api/src/main/java/com/streamsets/pipeline/spark/RecordCloner.java index 8d335d35ace..966290aac87 100644 --- a/cluster-bootstrap-api/src/main/java/com/streamsets/pipeline/spark/RecordCloner.java +++ b/cluster-bootstrap-api/src/main/java/com/streamsets/pipeline/spark/RecordCloner.java @@ -51,7 +51,7 @@ public static Object clone(Object record) { Class headerClassCl = Class.forName("com.streamsets.datacollector.record.HeaderImpl"); Object newHeader = headerClassCl.newInstance(); - headerClassCl.getMethod("setAllAttributes", Map.class).invoke(newHeader, headers); + headerClassCl.getMethod("overrideUserAndSystemAttributes", Map.class).invoke(newHeader, headers); Field resultField = cloneField(field); diff --git a/common/src/test/java/com/streamsets/pipeline/lib/sampling/TestRecordSampler.java b/common/src/test/java/com/streamsets/pipeline/lib/sampling/TestRecordSampler.java index c0f6aca372b..fd524af5988 100644 --- a/common/src/test/java/com/streamsets/pipeline/lib/sampling/TestRecordSampler.java +++ b/common/src/test/java/com/streamsets/pipeline/lib/sampling/TestRecordSampler.java @@ -153,10 +153,17 @@ public Map getAllAttributes() { } @Override - public Map setAllAttributes(Map newAttrs) { + public Map overrideUserAndSystemAttributes(Map newAttrs) { return null; } + @Override + public Map getUserAttributes() {return null;} + + @Override + public Map setUserAttributes(Map newAttributes) {return null;} + }; + @Override public Header getHeader() { return header; diff --git a/container/src/main/java/com/streamsets/datacollector/record/HeaderImpl.java b/container/src/main/java/com/streamsets/datacollector/record/HeaderImpl.java index 8f5f0981cf4..46967e08008 100644 --- a/container/src/main/java/com/streamsets/datacollector/record/HeaderImpl.java +++ b/container/src/main/java/com/streamsets/datacollector/record/HeaderImpl.java @@ -17,6 +17,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -26,10 +27,13 @@ import com.streamsets.pipeline.api.impl.Utils; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class HeaderImpl implements Record.Header, Predicate, Cloneable, Serializable { private static final String RESERVED_PREFIX = "_."; @@ -50,6 +54,9 @@ public class HeaderImpl implements Record.Header, Predicate, Cloneable, private static final String ERROR_PIPELINE_NAME_ATTR = RESERVED_PREFIX + "pipelineName"; private static final String ERROR_STACKTRACE = RESERVED_PREFIX + "errorStackTrace"; private static final String ERROR_JOB_ID = RESERVED_PREFIX + "errorJobId"; + private static final List REQUIRED_ATTRIBUTES = ImmutableList.of(STAGE_CREATOR_INSTANCE_ATTR, + RECORD_SOURCE_ID_ATTR); + //Note: additional fields should also define in ScriptRecord private Map map; @@ -163,9 +170,9 @@ public Set getAttributeNames() { return ImmutableSet.copyOf(Sets.filter(map.keySet(), this)); } + private static final String REQUIRED_ATTR_EXCEPTION_MSG = "Does not have required attributes"; private static final String RESERVED_PREFIX_EXCEPTION_MSG = "Header attributes cannot start with '" + RESERVED_PREFIX + "'"; - @Override public String getAttribute(String name) { Preconditions.checkNotNull(name, "name cannot be null"); @@ -397,10 +404,72 @@ public Map getAllAttributes() { return Collections.unmodifiableMap(map); } - public Map setAllAttributes(Map newAttrs) { + private boolean isReservedAttribute(String attribute) { + return attribute.startsWith(RESERVED_PREFIX); + } + + private Map getSystemAttributes() { + Map existingSystemAttr = new HashMap<>(); + + //Need to do this way due to valid null values + List existingReservedKeys = map.keySet() + .stream() + .filter(key -> key.startsWith(RESERVED_PREFIX)) + .collect(Collectors.toList()); + existingReservedKeys.forEach( (key) -> existingSystemAttr.put(key, map.get(key))); + + return existingSystemAttr; + } + + private boolean hasRequiredAttributes(Set keys) { + /* Any new header map MUST have the MINIMUM required reserved header attributes AND if any + reserved attributes exist in existing map the MUST exist in new map's keys + */ + + // Check that new map's keys have the minimum required attributes of a header impl. + if(!keys.containsAll(REQUIRED_ATTRIBUTES)) { + return false; + } + + // Check that new map's keys also contain any existing reserved attributes. + Set existingReservedKeys = getSystemAttributes().keySet(); + if (!keys.containsAll(existingReservedKeys)) { + return false; + } + + return true; + } + + public Map overrideUserAndSystemAttributes(Map newAttrs) { + /* Any new header map MUST have the MINIMUM required reserved header attributes AND if any + reserved attributes exist in existing map the MUST exist in new map's keys + */ + if (!hasRequiredAttributes(newAttrs.keySet())) { + throw new IllegalArgumentException(REQUIRED_ATTR_EXCEPTION_MSG); + } + // ImmutableMap can't have null values and our map could have, so use unmodifiable map Map old = Collections.unmodifiableMap(map); map = new HashMap<>(newAttrs); return old; } + + public Map getUserAttributes() { + return map.entrySet() + .stream() + .filter(map -> !map.getKey().startsWith(RESERVED_PREFIX)) + .collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue())); + } + + public Map setUserAttributes(Map newAttributes) { + // ImmutableMap can't have null values and our map could have, so use unmodifiable map + Map old = Collections.unmodifiableMap(getUserAttributes()); + + //Set current map to just the Reserved System Attributes + map = getSystemAttributes(); + // Add and validate each of the new user attributes + newAttributes.forEach((k,v) -> setAttribute(k, v.toString())); + return old; + } + } diff --git a/container/src/main/java/com/streamsets/datacollector/validation/RuleDefinitionValidator.java b/container/src/main/java/com/streamsets/datacollector/validation/RuleDefinitionValidator.java index e3696a8b38c..20a9c3a46fb 100644 --- a/container/src/main/java/com/streamsets/datacollector/validation/RuleDefinitionValidator.java +++ b/container/src/main/java/com/streamsets/datacollector/validation/RuleDefinitionValidator.java @@ -312,9 +312,15 @@ public Map getAllAttributes() { } @Override - public Map setAllAttributes(Map newAttrs) { + public Map overrideUserAndSystemAttributes(Map newAttrs) { return null; } + @Override + public Map getUserAttributes() {return null;} + + @Override + public Map setUserAttributes(Map newAttributes) {return null;} + }; } diff --git a/container/src/test/java/com/streamsets/datacollector/record/TestRecordImpl.java b/container/src/test/java/com/streamsets/datacollector/record/TestRecordImpl.java index ad440966067..90a3a0ce551 100644 --- a/container/src/test/java/com/streamsets/datacollector/record/TestRecordImpl.java +++ b/container/src/test/java/com/streamsets/datacollector/record/TestRecordImpl.java @@ -31,6 +31,11 @@ public class TestRecordImpl { + private static final String RESERVED_PREFIX = "_."; + private static final String REQUIRED_ATTR_EXCEPTION_MSG = "Does not have required attributes"; + private static final String RESERVED_PREFIX_EXCEPTION_MSG = "Header attributes cannot start with '" + + RESERVED_PREFIX + "'"; + @Test(expected = NullPointerException.class) public void testConstructorInvalid1() { new RecordImpl(null, (String) null, null, null); @@ -91,6 +96,128 @@ record = new RecordImpl("stage", "source", new byte[0], "M"); record.toString(); } + @Test + public void testHeaderUserAttr_basic() { + RecordImpl record = new RecordImpl("stage", "source", null, null); + Record.Header header = record.getHeader(); + Assert.assertNull(header.getRaw()); + Assert.assertNull(header.getRawMimeType()); + + Map oldAttrs = new HashMap<>(); + oldAttrs.put("something", "to be overwritten"); + + Map userAttrs = new HashMap<>(); + userAttrs.put("bird", "chicken"); + userAttrs.put("SSN", 12345); + + //oldAttrs has a value, but should be no user data in header + Assert.assertEquals(oldAttrs.size(), 1); + oldAttrs = header.getUserAttributes(); + Assert.assertEquals(oldAttrs.size(), 0); + + //setUserAttributes returns the old user attributes, which should be empty + oldAttrs = header.setUserAttributes(userAttrs); + Assert.assertEquals(oldAttrs.size(), 0); + //setUserAttributes returns the old user attributes, which + // should now equal the new userAttrs + oldAttrs = header.setUserAttributes(userAttrs); + Assert.assertEquals(oldAttrs.size(), userAttrs.size()); + + } + + @Test + public void testHeaderUserAttr_reservedAttrs() { + RecordImpl record = new RecordImpl("stage", "source", null, null); + Record.Header header = record.getHeader(); + Assert.assertNull(header.getRaw()); + Assert.assertNull(header.getRawMimeType()); + + Map oldAttrs = new HashMap<>(); + oldAttrs.put("something", "to be overwritten"); + + Map userAttrs = new HashMap<>(); + userAttrs.put("bird", "chicken"); + userAttrs.put("SSN", 12345); + header.setUserAttributes(userAttrs); + + // Now try and add a RESERVED attribute + try { + userAttrs.put(RESERVED_PREFIX+"-bird", "rabbit"); + oldAttrs = header.setUserAttributes(userAttrs); + Assert.fail("Test should have asserted with attempt to set reserved attribute"); + } catch (IllegalArgumentException ex) { + Assert.assertEquals(ex.getMessage(), RESERVED_PREFIX_EXCEPTION_MSG); + } + + //Try overriding all attributes without required attributes + try { + oldAttrs = header.overrideUserAndSystemAttributes(userAttrs); + Assert.fail("Test should have asserted with attempt to override attributes with all required attributes."); + } catch (IllegalArgumentException ex) { + Assert.assertEquals(ex.getMessage(), REQUIRED_ATTR_EXCEPTION_MSG); + } + + //Try overriding all attributes with required attributes + try { + oldAttrs = header.getAllAttributes(); + oldAttrs = header.overrideUserAndSystemAttributes(oldAttrs); + } catch (IllegalArgumentException ex) { + Assert.fail(ex.getMessage()); + } + + //oldAttrs should have all headers, reserved and user + Assert.assertEquals(oldAttrs.size(), 3); + + /* Test for hasRequiredAttributes(). Any new header map MUST have the MINIMUM required reserved + header attributes AND if any reserved attributes exist in existing map the MUST exist in new + map's keys + */ + oldAttrs = header.getAllAttributes(); + Assert.assertEquals(oldAttrs.size(), 3); + + // Remove reserved attribute RESERVED_PREFIX+"sourceRecord" from map, which exists in existing + // header, and attempt to override header map with missing attribute + Map badAttrs = new HashMap<>(oldAttrs); + badAttrs.remove(new String(RESERVED_PREFIX+"sourceRecord")); + try { + badAttrs = header.overrideUserAndSystemAttributes(badAttrs); + Assert.fail("Should not be able to override header map without all reserved attributes."); + } catch (IllegalArgumentException ex) { + Assert.assertEquals(ex.getMessage(), REQUIRED_ATTR_EXCEPTION_MSG); + } + } + + @Test + public void testHeaderUserAttr_setUserAttrs() { + //setup + RecordImpl record = new RecordImpl("stage", "source", null, null); + Record.Header header = record.getHeader(); + Assert.assertNull(header.getRaw()); + Assert.assertNull(header.getRawMimeType()); + + Map userAttrs = new HashMap<>(); + userAttrs.put("bird", "chicken"); + userAttrs.put("SSN", 12345); + header.setUserAttributes(userAttrs); + + // Check that setAllUserAttributes() overrides existing + Map newAttrs = new HashMap<>(header.getUserAttributes()); + Assert.assertEquals(userAttrs.size(), newAttrs.size()); + + newAttrs.remove("bird"); + Map oldAttrs = header.setUserAttributes(newAttrs); + //old attributes should map original map from userAttrs + Assert.assertEquals(oldAttrs.size(), userAttrs.size()); + //.. which should not match newAttrs since we deleted "bird" + Assert.assertNotEquals(oldAttrs.size(),newAttrs.size()); + + oldAttrs = header.getUserAttributes(); + //map should now contain only the new user attributes + Assert.assertEquals(oldAttrs, newAttrs); + + } + + @Test public void testRaw() { RecordImpl record = new RecordImpl("stage", "source", new byte[0], "M"); diff --git a/dir-spooler-protolib/src/main/java/com/streamsets/pipeline/lib/dirspooler/SpoolDirRunnable.java b/dir-spooler-protolib/src/main/java/com/streamsets/pipeline/lib/dirspooler/SpoolDirRunnable.java index 21f1f56211d..3a746c83cd8 100644 --- a/dir-spooler-protolib/src/main/java/com/streamsets/pipeline/lib/dirspooler/SpoolDirRunnable.java +++ b/dir-spooler-protolib/src/main/java/com/streamsets/pipeline/lib/dirspooler/SpoolDirRunnable.java @@ -50,6 +50,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; + public class SpoolDirRunnable implements Runnable { public static final String SPOOL_DIR_METRICS = "Spool Directory Metrics for Thread - "; public static final String CURRENT_FILE = "Current File"; @@ -514,7 +515,7 @@ private Map generateHeaderAttrs(WrappedFile file) throws IOExcep } private void setHeaders(Record record, Map recordHeaderAttr) throws IOException { - record.getHeader().setAllAttributes(recordHeaderAttr); + recordHeaderAttr.forEach((k,v) -> record.getHeader().setAttribute(k, v.toString())); } private enum Status { diff --git a/maprdb-protolib/src/main/java/com/streamsets/pipeline/stage/origin/cdc/maprdb/MapRDBCDCSource.java b/maprdb-protolib/src/main/java/com/streamsets/pipeline/stage/origin/cdc/maprdb/MapRDBCDCSource.java index a8e5953c999..f719c53c383 100644 --- a/maprdb-protolib/src/main/java/com/streamsets/pipeline/stage/origin/cdc/maprdb/MapRDBCDCSource.java +++ b/maprdb-protolib/src/main/java/com/streamsets/pipeline/stage/origin/cdc/maprdb/MapRDBCDCSource.java @@ -156,7 +156,7 @@ private void iterateNode( (String) attributes.get(HeaderAttributeConstants.OFFSET) )); Record.Header recordHeader = record.getHeader(); - recordHeader.setAllAttributes(attributes); + recordHeader.setUserAttributes(attributes); recordHeader.setAttribute(MAPR_OP_TIMESTAMP, String.valueOf(node.getOpTimestamp())); recordHeader.setAttribute(MAPR_SERVER_TIMESTAMP, String.valueOf(node.getServerTimestamp())); if(node.getType() == Value.Type.MAP) { @@ -192,7 +192,7 @@ private void iterateNode( (String) attributes.get(HeaderAttributeConstants.OFFSET) )); Record.Header recordHeader = record.getHeader(); - recordHeader.setAllAttributes(attributes); + recordHeader.setUserAttributes(attributes); HashMap root = new HashMap<>(); record.set(Field.create(root)); diff --git a/maprdb-protolib/src/test/java/com/streamsets/pipeline/stage/origin/cdc/maprdb/TestMapRDBCDCSource.java b/maprdb-protolib/src/test/java/com/streamsets/pipeline/stage/origin/cdc/maprdb/TestMapRDBCDCSource.java index 5e058436d5d..bc8df0dc6c7 100644 --- a/maprdb-protolib/src/test/java/com/streamsets/pipeline/stage/origin/cdc/maprdb/TestMapRDBCDCSource.java +++ b/maprdb-protolib/src/test/java/com/streamsets/pipeline/stage/origin/cdc/maprdb/TestMapRDBCDCSource.java @@ -261,9 +261,10 @@ public void testMultipleTopics() throws StageException, InterruptedException, Ex int numMessages = rand.nextInt(40)+1; totalMessages += numMessages; ConsumerRecords consumerRecords = - generateConsumerRecords(numMessages, 1, "topic"+rand.nextInt(numTopics), 0, ChangeDataRecordType.RECORD_UPDATE); + generateConsumerRecords(numMessages, 1, "topic-"+rand.nextInt(numTopics), 0, ChangeDataRecordType + .RECORD_UPDATE); ConsumerRecords emptyRecords = - generateConsumerRecords(1, 0, "topic"+rand.nextInt(numTopics), 0, ChangeDataRecordType.RECORD_UPDATE); + generateConsumerRecords(1, 0, "topic-"+rand.nextInt(numTopics), 0, ChangeDataRecordType.RECORD_UPDATE); KafkaConsumer mockConsumer = Mockito.mock(KafkaConsumer.class); consumerList.add(mockConsumer); diff --git a/scripting-protolib/src/test/java/com/streamsets/pipeline/stage/processor/scripting/ScriptingProcessorTestUtil.java b/scripting-protolib/src/test/java/com/streamsets/pipeline/stage/processor/scripting/ScriptingProcessorTestUtil.java index bf4991bd0bb..9f4b431138d 100644 --- a/scripting-protolib/src/test/java/com/streamsets/pipeline/stage/processor/scripting/ScriptingProcessorTestUtil.java +++ b/scripting-protolib/src/test/java/com/streamsets/pipeline/stage/processor/scripting/ScriptingProcessorTestUtil.java @@ -791,7 +791,7 @@ public static void verifyRecordHeaderAttribute( map, null ); - record.getHeader().setAllAttributes(recordHeaderAttributes); + record.getHeader().overrideUserAndSystemAttributes(recordHeaderAttributes); ProcessorRunner runner = new ProcessorRunner.Builder(clazz, processor) .addOutputLane("lane") diff --git a/spark-processor-protolib/src/main/java/com/streamsets/pipeline/stage/processor/spark/util/RecordCloner.java b/spark-processor-protolib/src/main/java/com/streamsets/pipeline/stage/processor/spark/util/RecordCloner.java index d3e5402ec8e..09dc0a2eacf 100644 --- a/spark-processor-protolib/src/main/java/com/streamsets/pipeline/stage/processor/spark/util/RecordCloner.java +++ b/spark-processor-protolib/src/main/java/com/streamsets/pipeline/stage/processor/spark/util/RecordCloner.java @@ -54,7 +54,7 @@ public static Record clone(Record record, Processor.Context context) { // So create a new record and set its root field to be the deserialized one's root field. Record r = context.createRecord(record); r.set(record.get()); - r.getHeader().setAllAttributes(record.getHeader().getAllAttributes()); + r.getHeader().overrideUserAndSystemAttributes(record.getHeader().getAllAttributes()); return r; } @@ -65,7 +65,7 @@ public static Record clone(Object record, Processor.Context context) { Object origHeaders = record.getClass().getMethod("getHeader").invoke(record); Map headers = (Map) origHeaders.getClass().getMethod("getAllAttributes").invoke(origHeaders); - newRecord.getHeader().setAllAttributes(headers); + newRecord.getHeader().overrideUserAndSystemAttributes(headers); newRecord.set(RecordCloner.cloneField(record.getClass().getMethod("get").invoke(record))); return newRecord; } catch(Exception ex) {