Skip to content

Commit

Permalink
SDC-9174. Directory origin - tracking ID generated as NULL
Browse files Browse the repository at this point in the history
- need getters/setters that are user and system space specific
- need validation for header user attributes are only user attributes
- that when all attributes are set, that required attributes are present

Change-Id: Id4277d459322a90d246d45f4e80eb50b1143585d
Signed-off-by: Keith Burns <keith@streamsets.com>
Reviewed-on: https://review.streamsets.net/15160
Tested-by: StreamSets CI <streamsets-ci-spam@streamsets.com>
Reviewed-by: Junko Urata <junko@streamsets.com>
  • Loading branch information
Keith Burns authored and ujunko committed Jun 27, 2018
1 parent 8d93008 commit e38b690
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static Object clone(Object record) {
Class headerClassCl = Class.forName("com.streamsets.datacollector.record.HeaderImpl");

Object newHeader = headerClassCl.newInstance();
headerClassCl.getMethod("setAllAttributes", Map.class).invoke(newHeader, headers);
headerClassCl.getMethod("overrideUserAndSystemAttributes", Map.class).invoke(newHeader, headers);

Field resultField = cloneField(field);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,17 @@ public Map<String, Object> getAllAttributes() {
}

@Override
public Map<String, Object> setAllAttributes(Map<String, Object> newAttrs) {
public Map<String, Object> overrideUserAndSystemAttributes(Map<String, Object> newAttrs) {
return null;
}
@Override
public Map<String, Object> getUserAttributes() {return null;}

@Override
public Map<String, Object> setUserAttributes(Map<String, Object> newAttributes) {return null;}

};

@Override
public Header getHeader() {
return header;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand All @@ -26,10 +27,13 @@
import com.streamsets.pipeline.api.impl.Utils;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class HeaderImpl implements Record.Header, Predicate<String>, Cloneable, Serializable {
private static final String RESERVED_PREFIX = "_.";
Expand All @@ -50,6 +54,9 @@ public class HeaderImpl implements Record.Header, Predicate<String>, Cloneable,
private static final String ERROR_PIPELINE_NAME_ATTR = RESERVED_PREFIX + "pipelineName";
private static final String ERROR_STACKTRACE = RESERVED_PREFIX + "errorStackTrace";
private static final String ERROR_JOB_ID = RESERVED_PREFIX + "errorJobId";
private static final List<String> REQUIRED_ATTRIBUTES = ImmutableList.of(STAGE_CREATOR_INSTANCE_ATTR,
RECORD_SOURCE_ID_ATTR);

//Note: additional fields should also define in ScriptRecord

private Map<String, Object> map;
Expand Down Expand Up @@ -163,9 +170,9 @@ public Set<String> getAttributeNames() {
return ImmutableSet.copyOf(Sets.filter(map.keySet(), this));
}

private static final String REQUIRED_ATTR_EXCEPTION_MSG = "Does not have required attributes";
private static final String RESERVED_PREFIX_EXCEPTION_MSG = "Header attributes cannot start with '" +
RESERVED_PREFIX + "'";

@Override
public String getAttribute(String name) {
Preconditions.checkNotNull(name, "name cannot be null");
Expand Down Expand Up @@ -397,10 +404,72 @@ public Map<String, Object> getAllAttributes() {
return Collections.unmodifiableMap(map);
}

public Map<String, Object> setAllAttributes(Map<String, Object> newAttrs) {
private boolean isReservedAttribute(String attribute) {
return attribute.startsWith(RESERVED_PREFIX);
}

private Map<String, Object> getSystemAttributes() {
Map<String, Object> existingSystemAttr = new HashMap<>();

//Need to do this way due to valid null values
List<String> existingReservedKeys = map.keySet()
.stream()
.filter(key -> key.startsWith(RESERVED_PREFIX))
.collect(Collectors.toList());
existingReservedKeys.forEach( (key) -> existingSystemAttr.put(key, map.get(key)));

return existingSystemAttr;
}

private boolean hasRequiredAttributes(Set<String> keys) {
/* Any new header map MUST have the MINIMUM required reserved header attributes AND if any
reserved attributes exist in existing map the MUST exist in new map's keys
*/

// Check that new map's keys have the minimum required attributes of a header impl.
if(!keys.containsAll(REQUIRED_ATTRIBUTES)) {
return false;
}

// Check that new map's keys also contain any existing reserved attributes.
Set<String> existingReservedKeys = getSystemAttributes().keySet();
if (!keys.containsAll(existingReservedKeys)) {
return false;
}

return true;
}

public Map<String, Object> overrideUserAndSystemAttributes(Map<String, Object> newAttrs) {
/* Any new header map MUST have the MINIMUM required reserved header attributes AND if any
reserved attributes exist in existing map the MUST exist in new map's keys
*/
if (!hasRequiredAttributes(newAttrs.keySet())) {
throw new IllegalArgumentException(REQUIRED_ATTR_EXCEPTION_MSG);
}

// ImmutableMap can't have null values and our map could have, so use unmodifiable map
Map<String, Object> old = Collections.unmodifiableMap(map);
map = new HashMap<>(newAttrs);
return old;
}

public Map<String, Object> getUserAttributes() {
return map.entrySet()
.stream()
.filter(map -> !map.getKey().startsWith(RESERVED_PREFIX))
.collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue()));
}

public Map<String, Object> setUserAttributes(Map<String, Object> newAttributes) {
// ImmutableMap can't have null values and our map could have, so use unmodifiable map
Map<String, Object> old = Collections.unmodifiableMap(getUserAttributes());

//Set current map to just the Reserved System Attributes
map = getSystemAttributes();
// Add and validate each of the new user attributes
newAttributes.forEach((k,v) -> setAttribute(k, v.toString()));
return old;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,15 @@ public Map<String, Object> getAllAttributes() {
}

@Override
public Map<String, Object> setAllAttributes(Map<String, Object> newAttrs) {
public Map<String, Object> overrideUserAndSystemAttributes(Map<String, Object> newAttrs) {
return null;
}
@Override
public Map<String, Object> getUserAttributes() {return null;}

@Override
public Map<String, Object> setUserAttributes(Map<String, Object> newAttributes) {return null;}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@

public class TestRecordImpl {

private static final String RESERVED_PREFIX = "_.";
private static final String REQUIRED_ATTR_EXCEPTION_MSG = "Does not have required attributes";
private static final String RESERVED_PREFIX_EXCEPTION_MSG = "Header attributes cannot start with '" +
RESERVED_PREFIX + "'";

@Test(expected = NullPointerException.class)
public void testConstructorInvalid1() {
new RecordImpl(null, (String) null, null, null);
Expand Down Expand Up @@ -91,6 +96,128 @@ record = new RecordImpl("stage", "source", new byte[0], "M");
record.toString();
}

@Test
public void testHeaderUserAttr_basic() {
RecordImpl record = new RecordImpl("stage", "source", null, null);
Record.Header header = record.getHeader();
Assert.assertNull(header.getRaw());
Assert.assertNull(header.getRawMimeType());

Map<String, Object> oldAttrs = new HashMap<>();
oldAttrs.put("something", "to be overwritten");

Map<String, Object> userAttrs = new HashMap<>();
userAttrs.put("bird", "chicken");
userAttrs.put("SSN", 12345);

//oldAttrs has a value, but should be no user data in header
Assert.assertEquals(oldAttrs.size(), 1);
oldAttrs = header.getUserAttributes();
Assert.assertEquals(oldAttrs.size(), 0);

//setUserAttributes returns the old user attributes, which should be empty
oldAttrs = header.setUserAttributes(userAttrs);
Assert.assertEquals(oldAttrs.size(), 0);
//setUserAttributes returns the old user attributes, which
// should now equal the new userAttrs
oldAttrs = header.setUserAttributes(userAttrs);
Assert.assertEquals(oldAttrs.size(), userAttrs.size());

}

@Test
public void testHeaderUserAttr_reservedAttrs() {
RecordImpl record = new RecordImpl("stage", "source", null, null);
Record.Header header = record.getHeader();
Assert.assertNull(header.getRaw());
Assert.assertNull(header.getRawMimeType());

Map<String, Object> oldAttrs = new HashMap<>();
oldAttrs.put("something", "to be overwritten");

Map<String, Object> userAttrs = new HashMap<>();
userAttrs.put("bird", "chicken");
userAttrs.put("SSN", 12345);
header.setUserAttributes(userAttrs);

// Now try and add a RESERVED attribute
try {
userAttrs.put(RESERVED_PREFIX+"-bird", "rabbit");
oldAttrs = header.setUserAttributes(userAttrs);
Assert.fail("Test should have asserted with attempt to set reserved attribute");
} catch (IllegalArgumentException ex) {
Assert.assertEquals(ex.getMessage(), RESERVED_PREFIX_EXCEPTION_MSG);
}

//Try overriding all attributes without required attributes
try {
oldAttrs = header.overrideUserAndSystemAttributes(userAttrs);
Assert.fail("Test should have asserted with attempt to override attributes with all required attributes.");
} catch (IllegalArgumentException ex) {
Assert.assertEquals(ex.getMessage(), REQUIRED_ATTR_EXCEPTION_MSG);
}

//Try overriding all attributes with required attributes
try {
oldAttrs = header.getAllAttributes();
oldAttrs = header.overrideUserAndSystemAttributes(oldAttrs);
} catch (IllegalArgumentException ex) {
Assert.fail(ex.getMessage());
}

//oldAttrs should have all headers, reserved and user
Assert.assertEquals(oldAttrs.size(), 3);

/* Test for hasRequiredAttributes(). Any new header map MUST have the MINIMUM required reserved
header attributes AND if any reserved attributes exist in existing map the MUST exist in new
map's keys
*/
oldAttrs = header.getAllAttributes();
Assert.assertEquals(oldAttrs.size(), 3);

// Remove reserved attribute RESERVED_PREFIX+"sourceRecord" from map, which exists in existing
// header, and attempt to override header map with missing attribute
Map<String, Object> badAttrs = new HashMap<>(oldAttrs);
badAttrs.remove(new String(RESERVED_PREFIX+"sourceRecord"));
try {
badAttrs = header.overrideUserAndSystemAttributes(badAttrs);
Assert.fail("Should not be able to override header map without all reserved attributes.");
} catch (IllegalArgumentException ex) {
Assert.assertEquals(ex.getMessage(), REQUIRED_ATTR_EXCEPTION_MSG);
}
}

@Test
public void testHeaderUserAttr_setUserAttrs() {
//setup
RecordImpl record = new RecordImpl("stage", "source", null, null);
Record.Header header = record.getHeader();
Assert.assertNull(header.getRaw());
Assert.assertNull(header.getRawMimeType());

Map<String, Object> userAttrs = new HashMap<>();
userAttrs.put("bird", "chicken");
userAttrs.put("SSN", 12345);
header.setUserAttributes(userAttrs);

// Check that setAllUserAttributes() overrides existing
Map<String, Object> newAttrs = new HashMap<>(header.getUserAttributes());
Assert.assertEquals(userAttrs.size(), newAttrs.size());

newAttrs.remove("bird");
Map<String, Object> oldAttrs = header.setUserAttributes(newAttrs);
//old attributes should map original map from userAttrs
Assert.assertEquals(oldAttrs.size(), userAttrs.size());
//.. which should not match newAttrs since we deleted "bird"
Assert.assertNotEquals(oldAttrs.size(),newAttrs.size());

oldAttrs = header.getUserAttributes();
//map should now contain only the new user attributes
Assert.assertEquals(oldAttrs, newAttrs);

}


@Test
public void testRaw() {
RecordImpl record = new RecordImpl("stage", "source", new byte[0], "M");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;


public class SpoolDirRunnable implements Runnable {
public static final String SPOOL_DIR_METRICS = "Spool Directory Metrics for Thread - ";
public static final String CURRENT_FILE = "Current File";
Expand Down Expand Up @@ -514,7 +515,7 @@ private Map<String, Object> generateHeaderAttrs(WrappedFile file) throws IOExcep
}

private void setHeaders(Record record, Map<String, Object> recordHeaderAttr) throws IOException {
record.getHeader().setAllAttributes(recordHeaderAttr);
recordHeaderAttr.forEach((k,v) -> record.getHeader().setAttribute(k, v.toString()));
}

private enum Status {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private void iterateNode(
(String) attributes.get(HeaderAttributeConstants.OFFSET)
));
Record.Header recordHeader = record.getHeader();
recordHeader.setAllAttributes(attributes);
recordHeader.setUserAttributes(attributes);
recordHeader.setAttribute(MAPR_OP_TIMESTAMP, String.valueOf(node.getOpTimestamp()));
recordHeader.setAttribute(MAPR_SERVER_TIMESTAMP, String.valueOf(node.getServerTimestamp()));
if(node.getType() == Value.Type.MAP) {
Expand Down Expand Up @@ -192,7 +192,7 @@ private void iterateNode(
(String) attributes.get(HeaderAttributeConstants.OFFSET)
));
Record.Header recordHeader = record.getHeader();
recordHeader.setAllAttributes(attributes);
recordHeader.setUserAttributes(attributes);

HashMap<String, Field> root = new HashMap<>();
record.set(Field.create(root));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,10 @@ public void testMultipleTopics() throws StageException, InterruptedException, Ex
int numMessages = rand.nextInt(40)+1;
totalMessages += numMessages;
ConsumerRecords<byte[], ChangeDataRecord> consumerRecords =
generateConsumerRecords(numMessages, 1, "topic"+rand.nextInt(numTopics), 0, ChangeDataRecordType.RECORD_UPDATE);
generateConsumerRecords(numMessages, 1, "topic-"+rand.nextInt(numTopics), 0, ChangeDataRecordType
.RECORD_UPDATE);
ConsumerRecords<byte[], ChangeDataRecord> emptyRecords =
generateConsumerRecords(1, 0, "topic"+rand.nextInt(numTopics), 0, ChangeDataRecordType.RECORD_UPDATE);
generateConsumerRecords(1, 0, "topic-"+rand.nextInt(numTopics), 0, ChangeDataRecordType.RECORD_UPDATE);

KafkaConsumer mockConsumer = Mockito.mock(KafkaConsumer.class);
consumerList.add(mockConsumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ public static <C extends Processor> void verifyRecordHeaderAttribute(
map,
null
);
record.getHeader().setAllAttributes(recordHeaderAttributes);
record.getHeader().overrideUserAndSystemAttributes(recordHeaderAttributes);

ProcessorRunner runner = new ProcessorRunner.Builder(clazz, processor)
.addOutputLane("lane")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static Record clone(Record record, Processor.Context context) {
// So create a new record and set its root field to be the deserialized one's root field.
Record r = context.createRecord(record);
r.set(record.get());
r.getHeader().setAllAttributes(record.getHeader().getAllAttributes());
r.getHeader().overrideUserAndSystemAttributes(record.getHeader().getAllAttributes());
return r;
}

Expand All @@ -65,7 +65,7 @@ public static Record clone(Object record, Processor.Context context) {
Object origHeaders = record.getClass().getMethod("getHeader").invoke(record);
Map<String, Object> headers =
(Map<String, Object>) origHeaders.getClass().getMethod("getAllAttributes").invoke(origHeaders);
newRecord.getHeader().setAllAttributes(headers);
newRecord.getHeader().overrideUserAndSystemAttributes(headers);
newRecord.set(RecordCloner.cloneField(record.getClass().getMethod("get").invoke(record)));
return newRecord;
} catch(Exception ex) {
Expand Down

0 comments on commit e38b690

Please sign in to comment.