Skip to content

Commit

Permalink
NIFI-259 Corrected GetHttp state management and added a new unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
jpercivall committed Feb 4, 2016
1 parent 55b77fe commit 88d4d2c
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
Expand All @@ -91,11 +92,14 @@
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;

@Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Fetches a file via HTTP. If the HTTP server supports it, the Processor then stores the Last Modified time and the ETag "
+ "so that data will not be pulled again until the remote data changes or until the state is cleared.")
+ "so that data will not be pulled again until the remote data changes or until the state is cleared. Note that due to limitations on state "
+ "management, stored \"last modified\" and etag fields never expire. If the URL in GetHttp uses Expression Language that is unbounded, there "
+ "is the potential for Out of Memory Errors to occur.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type of the FlowFile, as reported by the HTTP Content-Type header")
Expand Down Expand Up @@ -400,16 +404,18 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
final HttpGet get = new HttpGet(url);
get.setConfig(requestConfigBuilder.build());

final StateMap beforeStateMap;

try {
final StateMap stateMap = context.getStateManager().getState(Scope.LOCAL);
final String lastModified = stateMap.get(LAST_MODIFIED);
beforeStateMap = context.getStateManager().getState(Scope.LOCAL);
final String lastModified = beforeStateMap.get(LAST_MODIFIED+":" + url);
if (lastModified != null) {
get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModified);
get.addHeader(HEADER_IF_MODIFIED_SINCE, parseStateValue(lastModified).getValue());
}

final String etag = stateMap.get(ETAG);
final String etag = beforeStateMap.get(ETAG+":" + url);
if (etag != null) {
get.addHeader(HEADER_IF_NONE_MATCH, etag);
get.addHeader(HEADER_IF_NONE_MATCH, parseStateValue(etag).getValue());
}
} catch (final IOException ioe) {
throw new ProcessException(ioe);
Expand Down Expand Up @@ -461,20 +467,8 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate});
session.commit();

final Map<String, String> updatedState = new HashMap<>(2);
final Header lastModified = response.getFirstHeader(HEADER_LAST_MODIFIED);
if (lastModified != null) {
updatedState.put(LAST_MODIFIED, lastModified.getValue());
}
updateStateMap(context,response,beforeStateMap,url);

final Header etag = response.getFirstHeader(HEADER_ETAG);
if (etag != null) {
updatedState.put(ETAG, etag.getValue());
}

if (!updatedState.isEmpty()) {
context.getStateManager().setState(updatedState, Scope.LOCAL);
}
} catch (final IOException e) {
context.yield();
session.rollback();
Expand All @@ -490,4 +484,70 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
conMan.shutdown();
}
}

/**
 * Stores the Last-Modified and ETag headers of {@code response} in the processor's local state.
 *
 * <p>Each value is stored under the key {@code <LAST_MODIFIED|ETAG>:<url>} in the form
 * {@code <currentTimeMillis>:<header value>}, so that {@link #parseStateValue(String)} can split
 * the timestamp from the header value again.
 *
 * <p>The first attempt replaces {@code beforeStateMap} (the state read before the request was sent).
 * If the state manager reports that the replace did not succeed, the current state is re-read and
 * an entry is only overwritten when it is missing, unreadable, or carries an older timestamp than
 * this invocation. If nothing needs to be overwritten, the method gives up without writing.
 *
 * @param context        supplies the {@link StateManager}
 * @param response       HTTP response whose Last-Modified / ETag headers are recorded
 * @param beforeStateMap state snapshot taken before the request was issued
 * @param url            URL that was fetched; used as part of the state keys
 * @throws ProcessException if the state manager throws an {@link IOException}
 */
private void updateStateMap(ProcessContext context, HttpResponse response, StateMap beforeStateMap, String url){
    final Header receivedLastModified = response.getFirstHeader(HEADER_LAST_MODIFIED);
    final Header receivedEtag = response.getFirstHeader(HEADER_ETAG);

    // Nothing to remember for this URL, so do not touch the stored state at all.
    if (receivedLastModified == null && receivedEtag == null) {
        return;
    }

    final String lastModifiedKey = LAST_MODIFIED + ":" + url;
    final String etagKey = ETAG + ":" + url;
    final long currentTime = System.currentTimeMillis();

    try {
        final StateManager stateManager = context.getStateManager();
        StateMap oldValue = beforeStateMap;
        final Map<String, String> workingMap = new HashMap<>(oldValue.toMap());

        if (receivedLastModified != null) {
            workingMap.put(lastModifiedKey, currentTime + ":" + receivedLastModified.getValue());
        }
        if (receivedEtag != null) {
            workingMap.put(etagKey, currentTime + ":" + receivedEtag.getValue());
        }

        boolean replaceSucceeded = stateManager.replace(oldValue, workingMap, Scope.LOCAL);

        while (!replaceSucceeded) {
            // Start over from whatever is stored now and re-apply only the entries we should still win.
            oldValue = stateManager.getState(Scope.LOCAL);
            workingMap.clear();
            workingMap.putAll(oldValue.toMap());

            boolean changed = false;

            // The refreshed state may hold no entry for this URL (e.g. the failed replace was caused
            // by an update for a different URL), so a missing value must not be parsed blindly.
            if (receivedLastModified != null && isStoredValueStale(workingMap.get(lastModifiedKey), currentTime)) {
                workingMap.put(lastModifiedKey, currentTime + ":" + receivedLastModified.getValue());
                changed = true;
            }

            if (receivedEtag != null && isStoredValueStale(workingMap.get(etagKey), currentTime)) {
                workingMap.put(etagKey, currentTime + ":" + receivedEtag.getValue());
                changed = true;
            }

            if (!changed) {
                // Stored entries are at least as recent as ours; leave them alone.
                break;
            }

            replaceSucceeded = stateManager.replace(oldValue, workingMap, Scope.LOCAL);
        }
    } catch (final IOException ioe) {
        throw new ProcessException(ioe);
    }
}

/**
 * Decides whether a stored {@code <timestamp>:<value>} entry should be overwritten by a value
 * obtained at {@code currentTime}.
 *
 * @param storedValue the raw state value, possibly {@code null}
 * @param currentTime timestamp (milliseconds) of the value about to be written
 * @return {@code true} if the entry is absent, not in the expected format, or older than {@code currentTime}
 */
private static boolean isStoredValueStale(final String storedValue, final long currentTime) {
    if (storedValue == null || storedValue.indexOf(':') < 0) {
        return true;
    }

    try {
        return Long.parseLong(parseStateValue(storedValue).getKey()) < currentTime;
    } catch (final NumberFormatException nfe) {
        // An unreadable timestamp cannot be compared; replace the entry with a well-formed one.
        return true;
    }
}

/**
 * Splits a stored state value of the form {@code <timestamp>:<value>} at its first colon.
 *
 * <p>Only the first colon is treated as the separator, so the value part may itself contain
 * colons (for example an HTTP date such as {@code Thu, 01 Jan 1970 00:00:00 GMT}).
 *
 * @param mapValue the raw state value; must be non-null and contain at least one colon
 * @return a tuple whose key is the text before the first colon and whose value is everything after it
 * @throws IllegalArgumentException if {@code mapValue} is {@code null} or contains no colon
 */
protected static Tuple<String, String> parseStateValue(String mapValue){
    if (mapValue == null) {
        throw new IllegalArgumentException("State value must not be null");
    }

    final int indexOfColon = mapValue.indexOf(':');
    if (indexOfColon < 0) {
        // Without this check substring(0, -1) would fail with an uninformative index error.
        throw new IllegalArgumentException("State value '" + mapValue + "' is not in the expected '<timestamp>:<value>' format");
    }

    final String timestamp = mapValue.substring(0, indexOfColon);
    final String value = mapValue.substring(indexOfColon + 1);
    return new Tuple<>(timestamp, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
Expand Down Expand Up @@ -77,21 +78,21 @@ public final void testContentModified() throws Exception {
controller.run(2);

// verify the lastModified and entityTag are updated
controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG, "", Scope.LOCAL);
controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination, "", Scope.LOCAL);
controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);

// ran twice, but got one...which is good
controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);

// verify remote.source flowfile attribute
controller.getFlowFilesForRelationship(GetHTTP.REL_SUCCESS).get(0).assertAttributeEquals("gethttp.remote.source", "localhost");

controller.clearTransferState();

// turn off checking for etag and lastModified
RESTServiceContentModified.IGNORE_ETAG = true;
RESTServiceContentModified.IGNORE_LAST_MODIFIED = true;
controller.run(2);

// ran twice, got two...which is good
controller.assertTransferCount(GetHTTP.REL_SUCCESS, 2);
controller.clearTransferState();
Expand All @@ -114,29 +115,98 @@ public final void testContentModified() throws Exception {
RESTServiceContentModified.IGNORE_ETAG = false;
RESTServiceContentModified.ETAG = 1;
controller.run(2);

// ran twice, got 1...but should have new cached etag
controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
controller.getStateManager().assertStateEquals(GetHTTP.ETAG, "1", Scope.LOCAL);
String eTagStateValue = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.ETAG+":"+destination);
assertEquals("1",GetHTTP.parseStateValue(eTagStateValue).getValue());
controller.clearTransferState();

// turn off checking for Etag, turn on checking for lastModified, but change value
RESTServiceContentModified.IGNORE_LAST_MODIFIED = false;
RESTServiceContentModified.IGNORE_ETAG = true;
RESTServiceContentModified.modificationDate = System.currentTimeMillis() / 1000 * 1000 + 5000;
String lastMod = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.LAST_MODIFIED);
String lastMod = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.LAST_MODIFIED+":"+destination);
controller.run(2);

// ran twice, got 1...but should have new cached etag
controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, lastMod, Scope.LOCAL);
controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination, lastMod, Scope.LOCAL);
controller.clearTransferState();

// shutdown web service
} finally {
// shutdown web service
server.shutdownServer();
}
}


/**
 * Verifies that ETag / Last-Modified state is tracked per URL: after fetching from one server and
 * then pointing the processor at a second server, state entries exist for both destinations.
 */
@Test
public final void testContentModifiedTwoServers() throws Exception {
    // set up web services
    ServletHandler handler1 = new ServletHandler();
    handler1.addServletWithMapping(RESTServiceContentModified.class, "/*");

    ServletHandler handler2 = new ServletHandler();
    handler2.addServletWithMapping(RESTServiceContentModified.class, "/*");

    // create the services
    TestServer server1 = new TestServer();
    server1.addHandler(handler1);

    TestServer server2 = new TestServer();
    server2.addHandler(handler2);

    try {
        server1.startServer();
        server2.startServer();

        // this is the base urls with the random ports
        String destination1 = server1.getUrl();
        String destination2 = server2.getUrl();

        // set up NiFi mock controller
        controller = TestRunners.newTestRunner(GetHTTP.class);
        controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
        controller.setProperty(GetHTTP.URL, destination1);
        controller.setProperty(GetHTTP.FILENAME, "testFile");
        controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");

        // no state may exist for the first destination before the first run
        controller.getStateManager().assertStateNotSet(GetHTTP.ETAG+":"+destination1, Scope.LOCAL);
        controller.getStateManager().assertStateNotSet(GetHTTP.LAST_MODIFIED+":"+destination1, Scope.LOCAL);
        controller.run(2);

        // verify the lastModified and entityTag are updated
        controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination1, "", Scope.LOCAL);
        controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination1, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);

        // ran twice, but got one...which is good
        controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);

        controller.clearTransferState();

        // switch to the second destination; it must start without any stored state
        controller.setProperty(GetHTTP.URL, destination2);
        controller.getStateManager().assertStateNotSet(GetHTTP.ETAG+":"+destination2, Scope.LOCAL);
        controller.getStateManager().assertStateNotSet(GetHTTP.LAST_MODIFIED+":"+destination2, Scope.LOCAL);

        controller.run(2);

        // ran twice, but got one...which is good
        controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);

        // verify the lastModified's and entityTags are updated
        controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination1, "", Scope.LOCAL);
        controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination1, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
        controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination2, "", Scope.LOCAL);
        controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination2, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);

    } finally {
        // shutdown web services; nested so that server2 is still stopped if stopping server1 throws
        try {
            server1.shutdownServer();
        } finally {
            server2.shutdownServer();
        }
    }
}

@Test
public final void testUserAgent() throws Exception {
// set up web service
Expand Down

0 comments on commit 88d4d2c

Please sign in to comment.