
Commit

Merge branch '4.0.0-post' into 4.0.1-post
mjsax committed Jun 3, 2020
2 parents 713e187 + a486c10 commit 8d96a6c
Showing 7 changed files with 16 additions and 11 deletions.
@@ -150,7 +150,7 @@ public static void main(final String[] args) throws Exception {
if (args.length == 0 || args.length > 2) {
throw new IllegalArgumentException("usage: ... <portForRestEndPoint> [<bootstrap.servers> (optional)]");
}
- final int port = Integer.valueOf(args[0]);
+ final int port = Integer.parseInt(args[0]);
final String bootstrapServers = args.length > 1 ? args[1] : "localhost:9092";

final Properties streamsConfiguration = new Properties();
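The change above swaps Integer.valueOf for Integer.parseInt, presumably to avoid an unnecessary boxing step: valueOf returns an Integer object that is immediately auto-unboxed, while parseInt returns the primitive int directly. A minimal standalone sketch of the difference (class and variable names are illustrative, not from the repository):

    public class ParsePort {
        public static void main(final String[] args) {
            // Integer.valueOf returns a boxed Integer that is immediately auto-unboxed here.
            final int viaValueOf = Integer.valueOf("8080");
            // Integer.parseInt returns the primitive int directly, skipping the Integer allocation.
            final int viaParseInt = Integer.parseInt("8080");
            System.out.println(viaValueOf == viaParseInt);  // true: both parse the same value
        }
    }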
@@ -187,7 +187,7 @@ public static void main(final String[] args) throws Exception {
"[<schema.registry.url> (optional, default: " + DEFAULT_SCHEMA_REGISTRY_URL + ")] " +
"[<hostnameForRestEndPoint> (optional, default: " + DEFAULT_REST_ENDPOINT_HOSTNAME + ")]");
}
- final int restEndpointPort = Integer.valueOf(args[0]);
+ final int restEndpointPort = Integer.parseInt(args[0]);
final String bootstrapServers = args.length > 1 ? args[1] : "localhost:9092";
final String schemaRegistryUrl = args.length > 2 ? args[2] : "http://localhost:8081";
final String restEndpointHostname = args.length > 3 ? args[3] : DEFAULT_REST_ENDPOINT_HOSTNAME;
@@ -125,8 +125,7 @@ interface Emailer {
void sendEmail(EmailTuple details);
}

- public class EmailTuple {
-
+ public static class EmailTuple {
public Order order;
public Payment payment;
public Customer customer;
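Declaring EmailTuple as a static nested class (and FilteredResponse below) removes the implicit reference that a non-static inner class keeps to its enclosing instance, so instances can be created independently and cannot accidentally pin the outer object in memory. A hedged sketch of the distinction, using made-up Outer/Inner/Nested names:

    public class Outer {
        // Non-static inner class: every instance carries a hidden reference to its enclosing Outer.
        public class Inner { }

        // Static nested class: no hidden reference; it can be created without an Outer instance.
        public static class Nested { }

        public static void main(final String[] args) {
            final Inner inner = new Outer().new Inner();   // needs an enclosing instance
            final Nested nested = new Nested();            // does not
        }
    }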
@@ -178,7 +178,7 @@ public void getWithTimeout(@PathParam("id") final String id,
}
}

- class FilteredResponse<K, V> {
+ static class FilteredResponse<K, V> {
private AsyncResponse asyncResponse;
private Predicate<K, V> predicate;

@@ -235,7 +235,7 @@ private HostStoreInfo getKeyLocationOrBlock(final String id, final AsyncResponse
}
try {
//Sleep a bit until metadata becomes available
- Thread.sleep(Math.min(Long.valueOf(CALL_TIMEOUT), 200));
+ Thread.sleep(Math.min(Long.parseLong(CALL_TIMEOUT), 200));
} catch (final InterruptedException e) {
e.printStackTrace();
}
@@ -263,6 +263,7 @@ private void fetchFromOtherHost(final String path, final AsyncResponse asyncResp
});
asyncResponse.resume(bean);
} catch (final Exception swallowed) {
+ log.warn("GET failed.", swallowed);
}
}

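The added log.warn call means the exception that fetchFromOtherHost previously swallowed is at least recorded. As a general shape, the pattern keeps a best-effort remote call from failing the caller while still leaving a trace; a minimal sketch, assuming an SLF4J-style logger (which the log.warn(String, Throwable) call above suggests) and illustrative class and method names:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class BestEffort {
        private static final Logger log = LoggerFactory.getLogger(BestEffort.class);

        void bestEffortCall(final Runnable remoteCall) {
            try {
                remoteCall.run();
            } catch (final Exception swallowed) {
                // Do not rethrow: the caller continues, but the failure is now recorded.
                log.warn("Best-effort call failed.", swallowed);
            }
        }
    }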
@@ -407,7 +408,7 @@ public static void main(final String[] args) throws Exception {
final String restPort = args.length > 4 ? args[4] : null;

Schemas.configureSerdesWithSchemaRegistryUrl(schemaRegistryUrl);
- final OrdersService service = new OrdersService(restHostname, restPort == null ? 0 : Integer.valueOf(restPort));
+ final OrdersService service = new OrdersService(restHostname, restPort == null ? 0 : Integer.parseInt(restPort));
service.start(bootstrapServers);
addShutdownHookAndBlock(service);
}
@@ -58,7 +58,7 @@ public String toString() {

public static class Topics {

- public static Map<String, Topic> ALL = new HashMap<>();
+ public final static Map<String, Topic> ALL = new HashMap<>();
public static Topic<String, Order> ORDERS;
public static Topic<String, Payment> PAYMENTS;
public static Topic<Long, Customer> CUSTOMERS;
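Adding final to the static ALL map prevents the field from being reassigned after class initialization; the map's contents remain mutable. A short sketch of that distinction, using plain String values in place of the repository's Topic type:

    import java.util.HashMap;
    import java.util.Map;

    public class MyTopics {
        public static final Map<String, String> ALL = new HashMap<>();

        static void example() {
            ALL.put("orders", "orders-topic");  // still allowed: the map contents stay mutable
            // ALL = new HashMap<>();           // would no longer compile: the reference is final
        }
    }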
@@ -3,6 +3,7 @@
import io.confluent.examples.streams.avro.microservices.Product;
import io.confluent.examples.streams.microservices.Service;
import io.confluent.examples.streams.microservices.domain.Schemas;
+ import org.apache.commons.compress.utils.Charsets;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -23,6 +24,7 @@

import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
+ import java.nio.charset.Charset;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -99,7 +101,7 @@ public void configure(final Map<String, ?> map, final boolean b) {

@Override
public byte[] serialize(final String topic, final Product pt) {
- return pt.toString().getBytes();
+ return pt.toString().getBytes(Charsets.UTF_8);
}

@Override
@@ -117,7 +119,7 @@ public void configure(final Map<String, ?> map, final boolean b) {

@Override
public Product deserialize(final String topic, final byte[] bytes) {
- return Product.valueOf(new String(bytes));
+ return Product.valueOf(new String(bytes, Charsets.UTF_8));
}

@Override
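Passing an explicit UTF-8 charset to getBytes and new String makes the product serializer and deserializer independent of the JVM's platform-default charset. The commit uses Charsets.UTF_8 from commons-compress; the JDK's java.nio.charset.StandardCharsets.UTF_8 is an equivalent constant, shown here in a minimal round-trip sketch (the literal value is illustrative):

    import java.nio.charset.StandardCharsets;

    public class CharsetRoundTrip {
        public static void main(final String[] args) {
            // Encoding and decoding with the same explicit charset round-trips on any platform,
            // unlike the no-argument getBytes()/new String(byte[]), which use the JVM default charset.
            final byte[] bytes = "product-17".getBytes(StandardCharsets.UTF_8);
            System.out.println(new String(bytes, StandardCharsets.UTF_8));
        }
    }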
@@ -20,6 +20,7 @@
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
+ import java.nio.BufferUnderflowException;
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;
@@ -50,7 +51,9 @@ public PriorityQueue<T> deserialize(final String s, final byte[] bytes) {
final int records = dataInputStream.readInt();
for (int i = 0; i < records; i++) {
final byte[] valueBytes = new byte[dataInputStream.readInt()];
- dataInputStream.read(valueBytes);
+ if (dataInputStream.read(valueBytes) != valueBytes.length) {
+     throw new BufferUnderflowException();
+ };
priorityQueue.add(valueDeserializer.deserialize(s, valueBytes));
}
} catch (final IOException e) {
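The added length check guards against InputStream.read(byte[]) returning fewer bytes than requested, which would otherwise leave a partially filled buffer undetected. DataInputStream.readFully gives the same guarantee without a manual check; a sketch of that alternative with an illustrative class name:

    import java.io.DataInputStream;
    import java.io.IOException;

    public class RecordReader {
        static byte[] readRecord(final DataInputStream in) throws IOException {
            final byte[] valueBytes = new byte[in.readInt()];
            // readFully blocks until the array is filled and throws EOFException on a truncated
            // stream, so the caller never sees a partially populated buffer.
            in.readFully(valueBytes);
            return valueBytes;
        }
    }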
