Skip to content

Commit

Permalink
Merge pull request #155 from srinipunuru/unchecked.2
Browse files Browse the repository at this point in the history
Adding unchecked exception DatastreamRuntimeException
  • Loading branch information
srinipunuru committed Feb 18, 2016
2 parents 419ce68 + 94287eb commit 06140d7
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.datastream.common.Datastream;
import com.linkedin.datastream.common.DatastreamException;
import com.linkedin.datastream.common.DatastreamNotFoundException;
import com.linkedin.datastream.common.DatastreamRuntimeException;
import com.linkedin.datastream.server.dms.BootstrapBuilders;
import com.linkedin.datastream.server.dms.DatastreamBuilders;
import com.linkedin.r2.RemoteInvocationException;
Expand Down Expand Up @@ -49,13 +50,12 @@ public DatastreamRestClient(String dsmUri) {
* Name of the datastream that should be retrieved.
* @return
* Datastream object corresponding to the datastream. This method will not return null.
* @throws DatastreamException
* @throws com.linkedin.datastream.common.DatastreamRuntimeException
* Throws DatastreamNotFoundException if the datastream doesn't exist,
* Throws DatastreamException for any other errors encountered while fetching the datastream.
* @throws com.linkedin.r2.RemoteInvocationException
* Throws DatastreamRuntimeException for any other errors encountered while fetching the datastream.
* If there are any other network/ system level errors while sending the request or receiving the response.
*/
public Datastream getDatastream(String datastreamName) throws DatastreamException {
public Datastream getDatastream(String datastreamName) {
GetRequest<Datastream> request = _builders.get().id(datastreamName).build();
ResponseFuture<Datastream> datastreamResponseFuture = _restClient.sendRequest(request);
try {
Expand All @@ -65,7 +65,7 @@ public Datastream getDatastream(String datastreamName) throws DatastreamExceptio
&& ((RestLiResponseException) e).getStatus() == HttpStatus.S_404_NOT_FOUND.getCode()) {
throw new DatastreamNotFoundException(datastreamName, e);
} else {
throw new DatastreamException(String.format("Get Datastream {%s} failed with error.", datastreamName), e);
throw new DatastreamRuntimeException(String.format("Get Datastream {%s} failed with error.", datastreamName), e);
}
}
}
Expand All @@ -80,10 +80,10 @@ public Datastream getDatastream(String datastreamName) throws DatastreamExceptio
* wait timeout in milliseconds
* @return
* Returns the initialized datastream object.
* @throws DatastreamException
* @throws com.linkedin.datastream.common.DatastreamRuntimeException
*/
public Datastream waitTillDatastreamIsInitialized(String datastreamName, int timeoutMs)
throws DatastreamException, InterruptedException {
throws InterruptedException {
final int pollIntervalMs = 500;
final long startTimeMs = System.currentTimeMillis();
while (System.currentTimeMillis() - startTimeMs < timeoutMs) {
Expand All @@ -95,15 +95,15 @@ public Datastream waitTillDatastreamIsInitialized(String datastreamName, int tim
Thread.sleep(pollIntervalMs);
}

throw new DatastreamException(String.format("Datastream was not initialized before the timeout %s", timeoutMs));
throw new DatastreamRuntimeException(String.format("Datastream was not initialized before the timeout %s", timeoutMs));
}

private List<Datastream> getAllDatastreams(GetAllRequest<Datastream> request) throws DatastreamException {
private List<Datastream> getAllDatastreams(GetAllRequest<Datastream> request) {
ResponseFuture<CollectionResponse<Datastream>> datastreamResponseFuture = _restClient.sendRequest(request);
try {
return datastreamResponseFuture.getResponse().getEntity().getElements();
} catch (RemoteInvocationException e) {
throw new DatastreamException("Get All Datastreams failed with error.", e);
throw new DatastreamRuntimeException("Get All Datastreams failed with error.", e);
}
}

Expand All @@ -113,9 +113,9 @@ private List<Datastream> getAllDatastreams(GetAllRequest<Datastream> request) th
* Entries will be return in lexicographical based on their getName() property.
*
* @return all the Datastream objects
* @throws DatastreamException for any errors encountered while fetching the datastream.
* @for any errors encountered while fetching the datastream.
*/
public List<Datastream> getAllDatastreams() throws DatastreamException {
public List<Datastream> getAllDatastreams() {
return getAllDatastreams(_builders.getAll().build());
}

Expand All @@ -129,7 +129,7 @@ public List<Datastream> getAllDatastreams() throws DatastreamException {
* @return
* @throws DatastreamException
*/
public List<Datastream> getAllDatastreams(int start, int count) throws DatastreamException {
public List<Datastream> getAllDatastreams(int start, int count) {
return getAllDatastreams(_builders.getAll().paginate(start, count).build());
}

Expand All @@ -138,17 +138,17 @@ public List<Datastream> getAllDatastreams(int start, int count) throws Datastrea
* Datastream management service which validates the datastream object and writes it to the store (zookeeper).
* @param datastream
* Datastream that needs to be created.
* @throws DatastreamException for any errors encountered while creating the datastream.
* @for any errors encountered while creating the datastream.
* @throws com.linkedin.r2.RemoteInvocationException for any network/system level errors encountered
* while sending the request or receiving the response.
*/
public void createDatastream(Datastream datastream) throws DatastreamException {
public void createDatastream(Datastream datastream) {
CreateRequest request = _builders.create().input(datastream).build();
ResponseFuture<Datastream> datastreamResponseFuture = _restClient.sendRequest(request);
try {
datastreamResponseFuture.getResponse();
} catch (RemoteInvocationException e) {
throw new DatastreamException(String.format("Create Datastream {%s} failed with error.", datastream), e);
throw new DatastreamRuntimeException(String.format("Create Datastream {%s} failed with error.", datastream), e);
}
}

Expand All @@ -163,10 +163,10 @@ public void createDatastream(Datastream datastream) throws DatastreamException {
* If there are any other network/ system level errors while sending the request or receiving the response.
* @throws DatastreamException
* Throws DatastreamNotFoundException if the datastream doesn't exist,
* Throws DatastreamException for any other errors encountered while creating the datastream.
* for any other errors encountered while creating the datastream.
*
*/
public Datastream createBootstrapDatastream(String datastreamName) throws DatastreamException {
public Datastream createBootstrapDatastream(String datastreamName) {
ActionRequest<Datastream> request = _bootstrapBuilders.actionCreate().paramBaseDatastream(datastreamName).build();
ResponseFuture<Datastream> datastreamResponseFuture = _restClient.sendRequest(request);
try {
Expand All @@ -176,7 +176,7 @@ public Datastream createBootstrapDatastream(String datastreamName) throws Datast
&& ((RestLiResponseException) e).getStatus() == HttpStatus.S_404_NOT_FOUND.getCode()) {
throw new DatastreamNotFoundException(datastreamName, e);
} else {
throw new DatastreamException(
throw new DatastreamRuntimeException(
String.format("Create Bootstrap Datastream {%s} failed with error.", datastreamName), e);
}
}
Expand All @@ -191,13 +191,13 @@ public Datastream createBootstrapDatastream(String datastreamName) throws Datast
* @throws DatastreamException
* When the datastream is not found or any other error happens on the server.
*/
public void deleteDatastream(String datastreamName) throws DatastreamException {
public void deleteDatastream(String datastreamName) {
DeleteRequest<Datastream> request = _builders.delete().id(datastreamName).build();
ResponseFuture response = _restClient.sendRequest(request);
try {
response.getResponse();
} catch (RemoteInvocationException e) {
throw new DatastreamException(String.format("Delete Datastream {%s} failed with error.", datastreamName), e);
throw new DatastreamRuntimeException(String.format("Delete Datastream {%s} failed with error.", datastreamName), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
package com.linkedin.datastream;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import com.linkedin.data.template.StringMap;
import com.linkedin.datastream.common.Datastream;
import com.linkedin.datastream.common.DatastreamDestination;
import com.linkedin.datastream.common.DatastreamException;
import com.linkedin.datastream.common.DatastreamNotFoundException;
import com.linkedin.datastream.common.DatastreamRuntimeException;
import com.linkedin.datastream.common.DatastreamSource;
import com.linkedin.datastream.common.PollUtils;
import com.linkedin.datastream.connectors.DummyBootstrapConnector;
Expand All @@ -16,21 +34,6 @@
import com.linkedin.datastream.server.DummyTransportProviderFactory;
import com.linkedin.datastream.testutil.EmbeddedZookeeper;
import com.linkedin.r2.RemoteInvocationException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;


@Test(singleThreaded = true)
Expand Down Expand Up @@ -140,14 +143,8 @@ public void testGetAllDatastreams() throws DatastreamException, IOException, Rem
restClient.createDatastream(datastream);
}

Optional<List<Datastream>> result = PollUtils.poll(() -> {
try {
return restClient.getAllDatastreams();
} catch (DatastreamException e) {
e.printStackTrace();
return null;
}
}, streams -> streams.size() - initialSize == createdCount, 100, 1000);
Optional<List<Datastream>> result = PollUtils.poll(restClient::getAllDatastreams,
streams -> streams.size() - initialSize == createdCount, 100, 1000);

Assert.assertTrue(result.isPresent());

Expand Down Expand Up @@ -209,7 +206,7 @@ public void testGetDatastreamThrowsDatastreamNotFoundExceptionWhenDatastreamIsNo
restClient.getDatastream("Datastream_doesntexist");
}

@Test(expectedExceptions = DatastreamException.class)
@Test(expectedExceptions = DatastreamRuntimeException.class)
public void testCreateDatastreamThrowsDatastreamExceptionOnBadDatastream() throws IOException, DatastreamException,
RemoteInvocationException {
DatastreamRestClient restClient = new DatastreamRestClient("http://localhost:8080/");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/**
* Exception when the datastream is not found.
*/
public class DatastreamNotFoundException extends DatastreamException {
public class DatastreamNotFoundException extends DatastreamRuntimeException {
public DatastreamNotFoundException(String datastreamName, Throwable e) {
super(String.format("Datastream %s is not found", datastreamName), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.datastream.common;

/**
* Common Datastream exception for all unchecked exceptions
*/
public class DatastreamRuntimeException extends RuntimeException {

public DatastreamRuntimeException() {
super();
}

public DatastreamRuntimeException(String message, Throwable cause) {
super(message, cause);
}

public DatastreamRuntimeException(String message) {
super(message);
}

public DatastreamRuntimeException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -435,20 +435,12 @@ private Datastream createFileDatastream(String fileName) throws IOException, Dat
private Datastream getPopulatedDatastream(DatastreamRestClient restClient, Datastream fileDatastream1) {
Boolean pollResult = PollUtils.poll(() -> {
Datastream ds = null;
try {
ds = restClient.getDatastream(fileDatastream1.getName());
} catch (DatastreamException e) {
throw new RuntimeException("GetDatastream threw an exception", e);
}
ds = restClient.getDatastream(fileDatastream1.getName());
return ds.hasDestination() && ds.getDestination().hasConnectionString() && !ds.getDestination().getConnectionString().isEmpty();
}, 500, 60000);

if (pollResult) {
try {
return restClient.getDatastream(fileDatastream1.getName());
} catch (DatastreamException e) {
throw new RuntimeException("GetDatastream threw an exception", e);
}
return restClient.getDatastream(fileDatastream1.getName());
} else {
throw new RuntimeException("Destination was not populated before the timeout");
}
Expand Down

0 comments on commit 06140d7

Please sign in to comment.