Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 47 additions & 21 deletions src/main/java/com/gooddata/connector/ConnectorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import static com.gooddata.util.Validate.notNull;
import static java.lang.String.format;

/**
* Service for connector integration creation, update of its settings or execution of its process.
Expand Down Expand Up @@ -197,31 +199,55 @@ public FutureResult<ProcessStatus> executeProcess(final Project project, final P
try {
final UriResponse response = restTemplate
.postForObject(ProcessStatus.URL, execution, UriResponse.class, project.getId(), connectorType);
return new PollResult<>(this, new SimplePollHandler<ProcessStatus>(response.getUri(), ProcessStatus.class) {
@Override
public boolean isFinished(final ClientHttpResponse response) throws IOException {
final ProcessStatus process = extractData(response, ProcessStatus.class);
return process.isFinished();
}

@Override
public void handlePollResult(final ProcessStatus pollResult) {
super.handlePollResult(pollResult);
if (pollResult.isFailed()) {
throw new ConnectorException(connectorType + " process failed: " +
pollResult.getStatus().getDescription());
}
}

@Override
public void handlePollException(final GoodDataRestException e) {
throw new ConnectorException(connectorType + " process failed: " + e.getText(), e);
}
});
return createProcessPollResult(response.getUri());
} catch (GoodDataRestException | RestClientException e) {
throw new ConnectorException("Unable to execute " + connectorType + " process", e);
}
}


/**
* Gets status of provided connector process.
* <p/>
* You can use process retrieved by <code>getXXXProcess</code> methods on {@link Integration} as well as a result of
* {@link ConnectorService#executeProcess(Project, ProcessExecution)}.
*
* @param process process to be executed
* @return executed process
* @throws ConnectorException if process execution fails
*/
public FutureResult<ProcessStatus> getProcessStatus(final IntegrationProcessStatus process) {
notNull(process, "process");
notNull(process.getUri(), "process.getUri");
return createProcessPollResult(process.getUri());
}

private FutureResult<ProcessStatus> createProcessPollResult(final String uri) {
final Map<String, String> match = IntegrationProcessStatus.TEMPLATE.match(uri);
final String connectorType = match.get("connector");
final String processId = match.get("process");
return new PollResult<>(this, new SimplePollHandler<ProcessStatus>(uri, ProcessStatus.class) {
@Override
public boolean isFinished(final ClientHttpResponse response) throws IOException {
final ProcessStatus process = extractData(response, ProcessStatus.class);
return process.isFinished();
}

@Override
public void handlePollResult(final ProcessStatus pollResult) {
super.handlePollResult(pollResult);
if (pollResult.isFailed()) {
throw new ConnectorException(format("%s process %s failed: %s", connectorType, processId,
pollResult.getStatus().getDescription()));
}
}

@Override
public void handlePollException(final GoodDataRestException e) {
throw new ConnectorException(format("%s process %s failed: %s", connectorType, processId,
e.getText()), e);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.codehaus.jackson.map.annotate.JsonDeserialize;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.joda.time.DateTime;
import org.springframework.web.util.UriTemplate;

import java.util.Map;

/**
* Connector process (i.e. single ETL run) status used in integration object. Deserialization only.
Expand All @@ -20,17 +23,24 @@
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class IntegrationProcessStatus {

public static final String URI = "/gdc/projects/{project}/connectors/{connector}/integration/processes/{process}";
public static final UriTemplate TEMPLATE = new UriTemplate(URI);
private static final String SELF_LINK = "self";

private final Status status;
private final DateTime started;
private final DateTime finished;
private Map<String, String> links;

@JsonCreator
protected IntegrationProcessStatus(@JsonProperty("status") Status status,
@JsonProperty("started") @JsonDeserialize(using = ISODateTimeDeserializer.class) DateTime started,
@JsonProperty("finished") @JsonDeserialize(using = ISODateTimeDeserializer.class) DateTime finished) {
@JsonProperty("finished") @JsonDeserialize(using = ISODateTimeDeserializer.class) DateTime finished,
@JsonProperty("links") Map<String, String> links) {
this.status = status;
this.started = started;
this.finished = finished;
this.links = links;
}

public Status getStatus() {
Expand Down Expand Up @@ -68,4 +78,14 @@ public boolean isFinished() {
public boolean isFailed() {
return status != null && status.isFailed();
}

@JsonIgnore
public String getUri() {
return links != null ? links.get(SELF_LINK): null;
}

@JsonIgnore
public String getId() {
return TEMPLATE.match(getUri()).get("process");
}
}
8 changes: 5 additions & 3 deletions src/main/java/com/gooddata/connector/ProcessStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import org.codehaus.jackson.annotate.JsonTypeInfo;
import org.codehaus.jackson.annotate.JsonTypeName;
import org.codehaus.jackson.map.annotate.JsonDeserialize;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.joda.time.DateTime;

import java.util.Map;

/**
* Connector process (i.e. single ETL run) status (standalone, not embedded in integration as its parent) .
* Deserialization only.
Expand All @@ -27,8 +28,9 @@ public class ProcessStatus extends IntegrationProcessStatus {
@JsonCreator
ProcessStatus(@JsonProperty("status") Status status,
@JsonProperty("started") @JsonDeserialize(using = ISODateTimeDeserializer.class) DateTime started,
@JsonProperty("finished") @JsonDeserialize(using = ISODateTimeDeserializer.class) DateTime finished) {
super(status, started, finished);
@JsonProperty("finished") @JsonDeserialize(using = ISODateTimeDeserializer.class) DateTime finished,
@JsonProperty("links") Map<String, String> links) {
super(status, started, finished, links);
}

}
44 changes: 41 additions & 3 deletions src/test/java/com/gooddata/connector/ConnectorServiceIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@

import com.gooddata.AbstractGoodDataIT;
import com.gooddata.GoodDataException;
import com.gooddata.JsonMatchers;
import com.gooddata.gdc.UriResponse;
import com.gooddata.project.Project;
import com.gooddata.util.ResourceUtils;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static com.gooddata.JsonMatchers.serializesToJson;
import static com.gooddata.connector.Status.Code.ERROR;
import static com.gooddata.connector.Status.Code.SYNCHRONIZED;
import static com.gooddata.util.ResourceUtils.*;
import static java.util.Collections.singletonMap;
import static net.jadler.Jadler.onRequest;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand Down Expand Up @@ -118,7 +117,7 @@ public void shouldExecuteProcess() throws Exception {
assertThat(process.getStatus().getCode(), is(SYNCHRONIZED.name()));
}

@Test(expectedExceptions = ConnectorException.class, expectedExceptionsMessageRegExp = ".*process failed.*")
@Test(expectedExceptions = ConnectorException.class, expectedExceptionsMessageRegExp = ".*zendesk4 process PROCESS failed.*")
public void shouldFailExecuteProcessPolling() throws Exception {
onRequest()
.havingMethodEqualTo("POST")
Expand Down Expand Up @@ -149,6 +148,45 @@ public void shouldFailExecuteProcess() throws Exception {
assertThat(process.getStatus().getCode(), is(ERROR.name()));
}

@Test
public void shouldGetProcessStatus() throws Exception {
onRequest()
.havingPathEqualTo("/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes/PROCESS_ID")
.respond()
.withBody(readFromResource("/connector/process-status-scheduled.json"))
.thenRespond()
.withBody(readFromResource("/connector/process-status-finished.json"));

final IntegrationProcessStatus runningProcess = new IntegrationProcessStatus(null, null, null,
singletonMap("self", "/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes/PROCESS_ID"));
final ProcessStatus process = connectors.getProcessStatus(runningProcess).get();
assertThat(process.getStatus().getCode(), is(SYNCHRONIZED.name()));
}

@Test(expectedExceptions = ConnectorException.class, expectedExceptionsMessageRegExp = ".*zendesk4 process PROCESS_ID failed.*")
public void shouldFailGetProcessStatusPolling() throws Exception {
onRequest()
.havingPathEqualTo("/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes/PROCESS_ID")
.respond()
.withStatus(400);
final IntegrationProcessStatus runningProcess = new IntegrationProcessStatus(null, null, null,
singletonMap("self", "/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes/PROCESS_ID"));
connectors.getProcessStatus(runningProcess).get();
}

@Test(expectedExceptions = GoodDataException.class)
public void shouldFailGetProcessStatus() throws Exception {
onRequest()
.havingPathEqualTo("/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes/PROCESS_ID")
.respond()
.withBody(readFromResource("/connector/process-status-error.json"));

final IntegrationProcessStatus runningProcess = new IntegrationProcessStatus(null, null, null,
singletonMap("self", "/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes/PROCESS_ID"));
final ProcessStatus process = connectors.getProcessStatus(runningProcess).get();
assertThat(process.getStatus().getCode(), is(ERROR.name()));
}

@Test
public void shouldGetSettings() throws Exception {
onRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.joda.time.DateTimeZone;
import org.testng.annotations.Test;

import java.util.Collections;

import static com.gooddata.connector.Status.Code.ERROR;
import static com.gooddata.connector.Status.Code.SYNCHRONIZED;
import static com.gooddata.connector.Status.Code.UPLOADING;
Expand All @@ -32,65 +34,67 @@ public void testShouldDeserialize() throws Exception {
assertThat(process.getStatus().getCode(), is(ERROR.name()));
assertThat(process.getStatus().getDetail(), is("GDC-INTERNAL-ERROR"));
assertThat(process.getStatus().getDescription(), is(nullValue()));
assertThat(process.getUri(), is("/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes/FINISHED_PROCESS_ID"));
assertThat(process.getId(), is("FINISHED_PROCESS_ID"));
}

@Test
public void testIsFinishedOnError() throws Exception {
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(ERROR.name(), "", ""), now(), now());
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(ERROR.name(), "", ""), now(), now(), Collections.<String, String>emptyMap());
assertThat(process.isFinished(), is(true));
}

@Test
public void testIsFinishedOnSynchronized() throws Exception {
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(SYNCHRONIZED.name(), "", ""), now(), now());
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(SYNCHRONIZED.name(), "", ""), now(), now(), Collections.<String, String>emptyMap());
assertThat(process.isFinished(), is(true));
}

@Test
public void testIsFinishedOnUploading() throws Exception {
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(UPLOADING.name(), "", ""), now(), now());
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(UPLOADING.name(), "", ""), now(), now(), Collections.<String, String>emptyMap());
assertThat(process.isFinished(), is(false));
}

@Test
public void testIsFinishedOnNullCode() throws Exception {
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(null, "", ""), now(), now());
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(null, "", ""), now(), now(), Collections.<String, String>emptyMap());
assertThat(process.isFinished(), is(false));
}

@Test
public void testIsFinishedOnUnknownCode() throws Exception {
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status("unknown code", "", ""), now(), now());
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status("unknown code", "", ""), now(), now(), Collections.<String, String>emptyMap());
assertThat(process.isFinished(), is(false));
}

@Test
public void testIsFailedOnError() throws Exception {
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(ERROR.name(), "", ""), now(), now());
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(ERROR.name(), "", ""), now(), now(), Collections.<String, String>emptyMap());
assertThat(process.isFailed(), is(true));
}

@Test
public void testIsFailedOnUserError() throws Exception {
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(USER_ERROR.name(), "", ""), now(), now());
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(USER_ERROR.name(), "", ""), now(), now(), Collections.<String, String>emptyMap());
assertThat(process.isFailed(), is(true));
}

@Test
public void testIsFailedOnSynchronized() throws Exception {
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(SYNCHRONIZED.name(), "", ""), now(), now());
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(SYNCHRONIZED.name(), "", ""), now(), now(), Collections.<String, String>emptyMap());
assertThat(process.isFailed(), is(false));
}

@Test
public void testIsFailedOnNullCode() throws Exception {
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(null, "", ""), now(), now());
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status(null, "", ""), now(), now(), Collections.<String, String>emptyMap());
assertThat(process.isFailed(), is(false));
}

@Test
public void testIsFailedOnUnknownCode() throws Exception {
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status("unknown code", "", ""), now(), now());
final IntegrationProcessStatus process = new IntegrationProcessStatus(new Status("unknown code", "", ""), now(), now(), Collections.<String, String>emptyMap());
assertThat(process.isFailed(), is(false));
}

Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/gooddata/connector/ProcessStatusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public void shouldDeserialize() throws Exception {
assertThat(process.getStatus().getCode(), is(ERROR.name()));
assertThat(process.getStatus().getDetail(), is("GDC-INTERNAL-ERROR"));
assertThat(process.getStatus().getDescription(), is("Data load unsuccessful. Please check your settings and try again or contact us at support@gooddata.com"));
assertThat(process.getUri(), is("/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes/PROCESS_ID"));
assertThat(process.getId(), is("PROCESS_ID"));
}

}
48 changes: 48 additions & 0 deletions src/test/resources/connector/integration-scheduled-process.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"integration" : {
"projectTemplate" : "/projectTemplates/template",
"active" : true,
"lastFinishedProcess" : {
"status" : {
"code" : "ERROR",
"detail" : "GDC-INTERNAL-ERROR",
"description" : "Data load unsuccessful. Please check your settings and try again or contact us at support@gooddata.com"
},
"started" : "2014-05-30T07:50:15.000Z",
"finished" : "2014-05-30T07:50:50.000Z",
"links" : {
"self" : "/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes/FINISHED_PROCESS_ID"
}
},
"lastSuccessfulProcess" : {
"status" : {
"code" : "SYNCHRONIZED",
"detail" : null,
"description" : null
},
"started" : "2014-04-30T07:50:15.000Z",
"finished" : "2014-04-30T07:50:50.000Z",
"links" : {
"self" : "/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes/SUCCESSFUL_PROCESS_ID"
}
},
"runningProcess" : {
"status" : {
"code" : "SCHEDULED",
"detail" : "",
"description" : ""
},
"started" : "2014-04-30T07:50:15.000Z",
"finished" : null,
"links" : {
"self" : "/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes/PROCESS_ID"
}
},
"links" : {
"self" : "/gdc/projects/PROJECT_ID/connectors/zendesk4/integration",
"processes" : "/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/processes",
"configuration" : "/gdc/projects/PROJECT_ID/connectors/zendesk4/integration/settings"
},
"ui" : { }
}
}