Skip to content

Commit

Permalink
Checkstyle and Java xlint violations will now fail the build.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomas Lazaro committed Feb 23, 2016
1 parent 06140d7 commit 63f234c
Show file tree
Hide file tree
Showing 28 changed files with 102 additions and 89 deletions.
7 changes: 7 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ subprojects {
configFile = file("${project.rootDir}/config/checkstyle/checkstyle.xml")
sourceSets = [ getProject().sourceSets.main, getProject().sourceSets.test ]
toolVersion = "6.7"
ignoreFailures = false
}

spec['product'] = ['pegasus':
Expand Down Expand Up @@ -92,6 +93,12 @@ subprojects {
attributes 'Implementation-Title': 'Datastream'
}
}

tasks.withType(JavaCompile) {
// Skipping 'deprecation' since pegasus generates problematic files and 'fallthrough' since it can't be suppressed
// Xlint:all - [deprecation, fallthrough]
options.compilerArgs = ["-Xlint:cast,classfile,dep-ann,divzero,empty,finally,options,overrides,path,processing,rawtypes,serial,static,try,unchecked,varargs", "-Werror"]
}
}

project(':datastream-common') {
Expand Down
2 changes: 1 addition & 1 deletion config/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
LinkedIn Java style.
-->
<module name="Checker">
<property name="severity" value="${checkstyle.severity}" default="warning"/>
<property name="severity" value="${checkstyle.severity}" default="error"/>

<module name="TreeWalker">
<module name="FileContentsHolder"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@

import com.linkedin.common.callback.FutureCallback;
import com.linkedin.datastream.common.Datastream;
import com.linkedin.datastream.common.DatastreamException;
import com.linkedin.datastream.common.DatastreamNotFoundException;
import com.linkedin.datastream.common.DatastreamRuntimeException;
import com.linkedin.datastream.server.dms.BootstrapBuilders;
import com.linkedin.datastream.server.dms.DatastreamBuilders;
import com.linkedin.datastream.server.dms.BootstrapRequestBuilders;
import com.linkedin.datastream.server.dms.DatastreamRequestBuilders;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.transport.common.Client;
import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.ActionRequest;
import com.linkedin.restli.client.CreateRequest;
import com.linkedin.restli.client.CreateIdRequest;
import com.linkedin.restli.client.DeleteRequest;
import com.linkedin.restli.client.GetAllRequest;
import com.linkedin.restli.client.GetRequest;
import com.linkedin.restli.client.ResponseFuture;
import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.client.RestLiResponseException;
import com.linkedin.restli.common.CollectionResponse;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.common.IdResponse;
import java.util.Collections;
import java.util.List;

Expand All @@ -30,14 +31,14 @@
*/
public class DatastreamRestClient {

private final DatastreamBuilders _builders;
private final DatastreamRequestBuilders _builders;
private final RestClient _restClient;
private final BootstrapBuilders _bootstrapBuilders;
private final BootstrapRequestBuilders _bootstrapBuilders;
private final HttpClientFactory _httpClient;

public DatastreamRestClient(String dsmUri) {
_builders = new DatastreamBuilders();
_bootstrapBuilders = new BootstrapBuilders();
_builders = new DatastreamRequestBuilders();
_bootstrapBuilders = new BootstrapRequestBuilders();
_httpClient = new HttpClientFactory();
final Client r2Client = new TransportClientAdapter(_httpClient.getClient(Collections.<String, String>emptyMap()));
_restClient = new RestClient(r2Client, dsmUri);
Expand Down Expand Up @@ -143,8 +144,8 @@ public List<Datastream> getAllDatastreams(int start, int count) {
* while sending the request or receiving the response.
*/
public void createDatastream(Datastream datastream) {
CreateRequest request = _builders.create().input(datastream).build();
ResponseFuture<Datastream> datastreamResponseFuture = _restClient.sendRequest(request);
CreateIdRequest<String, Datastream> request = _builders.create().input(datastream).build();
ResponseFuture<IdResponse<String>> datastreamResponseFuture = _restClient.sendRequest(request);
try {
datastreamResponseFuture.getResponse();
} catch (RemoteInvocationException e) {
Expand All @@ -167,7 +168,7 @@ public void createDatastream(Datastream datastream) {
*
*/
public Datastream createBootstrapDatastream(String datastreamName) {
ActionRequest<Datastream> request = _bootstrapBuilders.actionCreate().paramBaseDatastream(datastreamName).build();
ActionRequest<Datastream> request = _bootstrapBuilders.actionCreate().baseDatastreamParam(datastreamName).build();
ResponseFuture<Datastream> datastreamResponseFuture = _restClient.sendRequest(request);
try {
return datastreamResponseFuture.getResponse().getEntity();
Expand All @@ -193,7 +194,7 @@ public Datastream createBootstrapDatastream(String datastreamName) {
*/
public void deleteDatastream(String datastreamName) {
DeleteRequest<Datastream> request = _builders.delete().id(datastreamName).build();
ResponseFuture response = _restClient.sendRequest(request);
ResponseFuture<EmptyRecord> response = _restClient.sendRequest(request);
try {
response.getResponse();
} catch (RemoteInvocationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* Exception when the datastream is not found.
*/
public class DatastreamNotFoundException extends DatastreamRuntimeException {
private static final long serialVersionUID = 1;

public DatastreamNotFoundException(String datastreamName, Throwable e) {
super(String.format("Datastream %s is not found", datastreamName), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Common Datastream exception for all unchecked exceptions
*/
public class DatastreamRuntimeException extends RuntimeException {
private static final long serialVersionUID = 1;

public DatastreamRuntimeException() {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@


public class DatastreamValidationException extends DatastreamException {
private static final long serialVersionUID = 1;

public DatastreamValidationException() {
super();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Exception class for all the schema registry provider exceptions.
*/
public class SchemaRegistryException extends Exception {
private static final long serialVersionUID = 1;

public SchemaRegistryException() {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Exception class for all the transport related exceptions.
*/
public class TransportException extends Exception {
private static final long serialVersionUID = 1;

public TransportException() {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ protected synchronized void handleEvent(CoordinatorEvent event) {
break;

case HANDLE_INSTANCE_ERROR:
handleInstanceError(event);
handleInstanceError((CoordinatorEvent.HandleInstanceError) event);
break;
default:
throw new Exception(String.format("Unknown event type %s.", event.getType()));
Expand All @@ -451,11 +451,11 @@ protected synchronized void handleEvent(CoordinatorEvent event) {
LOG.info("END: Handle event " + event);
}

// when we encouter an error, we need to persist the error message in zookeeper. We only persist the
// when we encounter an error, we need to persist the error message in zookeeper. We only persist the
// first 10 messages. Why we put this logic in event loop instead of synchronously handle it? This
// is because the same reason that can result in error can also result in the failure of persisting
// the error message.
private void handleInstanceError(CoordinatorEvent<String> event) {
private void handleInstanceError(CoordinatorEvent.HandleInstanceError event) {
String msg = event.getEventData();
_adapter.zkSaveInstanceError(msg);
}
Expand All @@ -472,7 +472,7 @@ private void handleDatastreamAddOrDelete() {
List<Datastream> newDatastreams = _adapter.getAllDatastreams();

// do nothing if there is zero datastreams
if (newDatastreams.size() == 0) {
if (newDatastreams.isEmpty()) {
LOG.warn("Received a new datastream event, but there were no datastreams");
return;
}
Expand Down Expand Up @@ -555,7 +555,7 @@ private void handleLeaderDoAssignment() {
// Add the tasks for this connector type to the instance
tasksByConnectorAndInstance.get(instance).forEach(task -> {
// Each task must have a valid zkAdapter
((DatastreamTaskImpl)task).setZkAdapter(_adapter);
((DatastreamTaskImpl) task).setZkAdapter(_adapter);
assigmentsByInstance.get(instance).add(task);
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.datastream.server;

public class CoordinatorEvent<T> {
public class CoordinatorEvent {

public enum EventType {
LEADER_DO_ASSIGNMENT,
Expand All @@ -9,55 +9,57 @@ public enum EventType {
HANDLE_INSTANCE_ERROR
}

private final EventType _eventType;
private T _eventData;
public static final CoordinatorEvent LEADER_DO_ASSIGNMENT_EVENT = new CoordinatorEvent(EventType.LEADER_DO_ASSIGNMENT);
public static final CoordinatorEvent HANDLE_ASSIGNMENT_CHANGE_EVENT = new CoordinatorEvent(EventType.HANDLE_ASSIGNMENT_CHANGE);
public static final CoordinatorEvent HANDLE_ADD_OR_DELETE_DATASTREAM_EVENT = new CoordinatorEvent(EventType.HANDLE_ADD_OR_DELETE_DATASTREAM);

private CoordinatorEvent(EventType eventName) {
_eventType = eventName;
_eventData = null;
protected final EventType _eventType;

private CoordinatorEvent(EventType eventType) {
_eventType = eventType;
}

public EventType getType() {
return _eventType;
}

private CoordinatorEvent(EventType eventName, T eventData) {
_eventType = eventName;
_eventData = eventData;
@Override
public String toString() {
return "type:" + _eventType;
}

public static CoordinatorEvent createLeaderDoAssignmentEvent() {
return new CoordinatorEvent(EventType.LEADER_DO_ASSIGNMENT);
return LEADER_DO_ASSIGNMENT_EVENT;
}

public static CoordinatorEvent createHandleAssignmentChangeEvent() {
return new CoordinatorEvent(EventType.HANDLE_ASSIGNMENT_CHANGE);
return HANDLE_ASSIGNMENT_CHANGE_EVENT;
}

public static CoordinatorEvent createHandleDatastreamAddOrDeleteEvent() {
return new CoordinatorEvent(EventType.HANDLE_ADD_OR_DELETE_DATASTREAM);
}

public static CoordinatorEvent<String> createHandleInstanceErrorEvent(String errorMessage) {
CoordinatorEvent event = new CoordinatorEvent(EventType.HANDLE_INSTANCE_ERROR, errorMessage);
return event;
return HANDLE_ADD_OR_DELETE_DATASTREAM_EVENT;
}

public EventType getType() {
return _eventType;
public static HandleInstanceError createHandleInstanceErrorEvent(String errorMessage) {
return new HandleInstanceError(errorMessage);
}

public T getEventData() {
return _eventData;
}
public static final class HandleInstanceError extends CoordinatorEvent {
private final String _errorMessage;

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("type:" + _eventType.toString());
private HandleInstanceError(String errorMessage) {
super(EventType.HANDLE_INSTANCE_ERROR);
_errorMessage = errorMessage;
}

if (_eventData != null) {
sb.append("\n");
sb.append(_eventData);
public String getEventData() {
return _errorMessage;
}

return sb.toString();
@Override
public String toString() {
return "type:" + _eventType + "\n" + _errorMessage;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ private void loadCheckpoints(List<DatastreamTask> tasks) {
Map<DatastreamTask, String> committed = _checkpointProvider.getCommitted(tasks);

// Instruct jackson to convert string keys to integer
TypeReference typeRef = new TypeReference<HashMap<Integer, String>>() {
TypeReference<HashMap<Integer, String>> typeRef = new TypeReference<HashMap<Integer, String>>() {
};

// Load checkpoints only for specified task list
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.linkedin.datastream.server;

import com.linkedin.datastream.common.VerifiableProperties;
import com.linkedin.datastream.server.api.schemaregistry.SchemaRegistryProvider;
import com.linkedin.datastream.server.api.transport.TransportProvider;
import com.linkedin.datastream.server.api.transport.TransportProviderFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public Datastream get(String name) {
return _store.getDatastream(name);
}

@SuppressWarnings("deprecated")
@Override
public List<Datastream> getAll(@Context PagingContext pagingContext) {
return RestliUtils.withPaging(_store.getAllDatastreams(), pagingContext).map(_store::getDatastream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* under the License.
*/

import java.util.Collections;
import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -25,7 +26,6 @@
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -57,11 +57,7 @@ public static void readTopic(String topic, Integer partition, String brokerList,
Validate.notNull(callback);

TopicPartition subscription = new TopicPartition(topic, partition);
List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>() {
{
add(subscription);
}
};
List<TopicPartition> topicPartitions = Collections.singletonList(subscription);
KafkaConsumer<byte[], byte[]> consumer = createConsumer(brokerList);
consumer.assign(topicPartitions);
consumer.seekToBeginning(subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ public void testUnassignableStreams() throws Exception {
LOG.info("Set destination only for datastream2, leave it unassigned for datastream1");
Mockito.doAnswer(invocation -> {
Object[] args = invocation.getArguments();
@SuppressWarnings("unchecked")
List<Datastream> streams = (List<Datastream>) args[0];
for (Datastream stream : streams) {
// Populate Destination only for datastream2
Expand All @@ -381,6 +382,7 @@ public void testUnassignableStreams() throws Exception {
// Stub for populateDatastreamDestination to set destination on all datastreams
Mockito.doAnswer(invocation -> {
Object[] args = invocation.getArguments();
@SuppressWarnings("unchecked")
List<Datastream> streams = (List<Datastream>) args[0];
for (Datastream stream : streams) {
setDatastreamDestination(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import java.util.Properties;
import java.util.UUID;

import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,9 +32,6 @@
import com.linkedin.datastream.connectors.file.FileConnectorFactory;
import com.linkedin.datastream.kafka.EmbeddedZookeeperKafkaCluster;
import com.linkedin.datastream.kafka.KafkaDestination;
import com.linkedin.datastream.server.api.schemaregistry.SchemaRegistryException;
import com.linkedin.datastream.server.api.schemaregistry.SchemaRegistryProvider;
import com.linkedin.datastream.server.api.schemaregistry.SchemaRegistryProviderFactory;
import com.linkedin.datastream.server.assignment.BroadcastStrategy;
import com.linkedin.datastream.server.assignment.LoadbalancingStrategy;
import com.linkedin.datastream.server.zk.KeyBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ private void setup(boolean customCheckpointing, boolean checkRace) {
_producer = new EventProducer(_tasks, _transport, _cpProvider, _config, customCheckpointing);
}

@SuppressWarnings("unchecked")
@Test
public void testSendWithCustomCheckpoint() throws DatastreamException, TransportException, InterruptedException {
setup(true);
Expand Down Expand Up @@ -176,7 +177,7 @@ private boolean validateCheckpoint(CheckpointProvider provider, List<DatastreamT
if (cpString == null) { // not ready
return false;
}
TypeReference typeRef = new TypeReference<HashMap<Integer, String>>() {
TypeReference<HashMap<Integer, String>> typeRef = new TypeReference<HashMap<Integer, String>>() {
};
Map<Integer, String> cpMap = JsonUtils.fromJson(cpString, typeRef);
if (!cpMap.equals(taskCpMap.get(task))) {
Expand Down
Loading

0 comments on commit 63f234c

Please sign in to comment.