Skip to content

Commit

Permalink
Merge branch '5.0.0-post' into 5.0.1-post
Browse files Browse the repository at this point in the history
  • Loading branch information
mjsax committed Jun 3, 2020
2 parents b0cc8fa + 7624ed6 commit 6a6aa22
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ interface Emailer {
void sendEmail(EmailTuple details);
}

public class EmailTuple {

public static class EmailTuple {
public Order order;
public Payment payment;
public Customer customer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void getWithTimeout(@PathParam("id") final String id,
}
}

class FilteredResponse<K, V> {
static class FilteredResponse<K, V> {
private AsyncResponse asyncResponse;
private Predicate<K, V> predicate;

Expand Down Expand Up @@ -236,7 +236,7 @@ private HostStoreInfo getKeyLocationOrBlock(final String id, final AsyncResponse
}
try {
//Sleep a bit until metadata becomes available
Thread.sleep(Math.min(Long.valueOf(CALL_TIMEOUT), 200));
Thread.sleep(Math.min(Long.parseLong(CALL_TIMEOUT), 200));
} catch (final InterruptedException e) {
e.printStackTrace();
}
Expand Down Expand Up @@ -264,6 +264,7 @@ private void fetchFromOtherHost(final String path, final AsyncResponse asyncResp
});
asyncResponse.resume(bean);
} catch (final Exception swallowed) {
log.warn("GET failed.", swallowed);
}
}

Expand Down Expand Up @@ -410,7 +411,7 @@ public static void main(final String[] args) throws Exception {
final String restPort = args.length > 3 ? args[3] : null;

Schemas.configureSerdesWithSchemaRegistryUrl(schemaRegistryUrl);
final OrdersService service = new OrdersService(restHostname, restPort == null ? 0 : Integer.valueOf(restPort));
final OrdersService service = new OrdersService(restHostname, restPort == null ? 0 : Integer.parseInt(restPort));
service.start(bootstrapServers, "/tmp/kafka-streams");
addShutdownHookAndBlock(service);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public String toString() {

public static class Topics {

public static final Map<String, Topic> ALL = new HashMap<>();
public final static Map<String, Topic> ALL = new HashMap<>();
public static Topic<String, Order> ORDERS;
public static Topic<String, Payment> PAYMENTS;
public static Topic<Long, Customer> CUSTOMERS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.confluent.examples.streams.microservices.domain.Schemas;
import io.confluent.examples.streams.utils.MonitoringInterceptorUtils;

import org.apache.commons.compress.utils.Charsets;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -25,6 +26,7 @@

import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -117,7 +119,7 @@ public void configure(final Map<String, ?> map, final boolean b) {

@Override
public byte[] serialize(final String topic, final Product pt) {
return pt.toString().getBytes();
return pt.toString().getBytes(Charsets.UTF_8);
}

@Override
Expand All @@ -135,7 +137,7 @@ public void configure(final Map<String, ?> map, final boolean b) {

@Override
public Product deserialize(final String topic, final byte[] bytes) {
return Product.valueOf(new String(bytes));
return Product.valueOf(new String(bytes, Charsets.UTF_8));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;
Expand Down Expand Up @@ -50,7 +51,9 @@ public PriorityQueue<T> deserialize(final String s, final byte[] bytes) {
final int records = dataInputStream.readInt();
for (int i = 0; i < records; i++) {
final byte[] valueBytes = new byte[dataInputStream.readInt()];
dataInputStream.read(valueBytes);
if (dataInputStream.read(valueBytes) != valueBytes.length) {
throw new BufferUnderflowException();
};
priorityQueue.add(valueDeserializer.deserialize(s, valueBytes));
}
} catch (final IOException e) {
Expand Down

0 comments on commit 6a6aa22

Please sign in to comment.