Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>2.16.8.Final</quarkus.platform.version>
<quarkus.platform.version>2.16.10.Final</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.0.0-M7</surefire-plugin.version>
</properties>
Expand Down
23 changes: 6 additions & 17 deletions src/main/java/se/yolean/kafka/keyvalue/http/CacheResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package se.yolean.kafka.keyvalue.http;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
Expand All @@ -32,9 +31,9 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import javax.ws.rs.core.Response.ResponseBuilder;

import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
Expand Down Expand Up @@ -173,27 +172,17 @@ public void write(OutputStream out) throws IOException, WebApplicationException

/**
* @return Newline separated values (no keys)
* @throws IOException
*/
@GET()
@Path("/values")
@Produces(MediaType.TEXT_PLAIN)
public Response values() throws IOException {
requireUpToDateCache();
Iterator<byte[]> values = cache.getValues();

ByteArrayOutputStream buffer = new ByteArrayOutputStream();

while (values.hasNext()) {
buffer.write(values.next());
buffer.write('\n');
}

ResponseBuilder response = Response.ok(buffer);

applyOffsetHeaders(response);

return response.build();
ValuesResponse response = new ValuesResponse(
cache.getValues(),
cache.getCurrentOffsets()
);
return Response.ok(response).build();
}

private void applyOffsetHeaders(ResponseBuilder response) throws JsonProcessingException {
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/se/yolean/kafka/keyvalue/http/ValuesResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package se.yolean.kafka.keyvalue.http;

import java.util.Iterator;
import java.util.List;

import se.yolean.kafka.keyvalue.TopicPartitionOffset;

public final class ValuesResponse {

final Iterator<byte[]> values;
final List<TopicPartitionOffset> currentOffsets;

public ValuesResponse(Iterator<byte[]> values, List<TopicPartitionOffset> currentOffsets) {
this.values = values;
this.currentOffsets = currentOffsets;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package se.yolean.kafka.keyvalue.http;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;

import javax.inject.Inject;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.ext.Provider;

import com.fasterxml.jackson.databind.ObjectMapper;

import se.yolean.kafka.keyvalue.onupdate.UpdatesBodyPerTopic;

@Provider
public class ValuesResponseWriter implements MessageBodyWriter<ValuesResponse> {

@Inject
ObjectMapper objectMapper;

@Override
public boolean isWriteable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
return type == ValuesResponse.class;
}

@Override
public void writeTo(ValuesResponse v, Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType,
MultivaluedMap<String, Object> httpHeaders, OutputStream out)
throws IOException, WebApplicationException {
String offsetsJson = objectMapper.writeValueAsString(v.currentOffsets);
httpHeaders.add(UpdatesBodyPerTopic.HEADER_PREFIX + "last-seen-offsets", offsetsJson);
while (v.values.hasNext()) {
out.write(v.values.next());
out.write('\n');
}
}

}
39 changes: 11 additions & 28 deletions src/test/java/se/yolean/kafka/keyvalue/http/CacheResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.List;
import java.util.Map;

import javax.ws.rs.core.StreamingOutput;

import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.HealthCheckResponse.Status;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -50,39 +52,20 @@ void testValueByKeyUnready() throws JsonProcessingException {
}

@Test
void testStreamValues() throws IOException {
void testValues() throws IOException {
CacheResource rest = new CacheResource();
rest.cache = Mockito.mock(KafkaCache.class);
rest.mapper = new ObjectMapper();
Mockito.when(rest.cache.isReady()).thenReturn(true);
Mockito.when(rest.cache.getValues()).thenReturn(List.of("a".getBytes(), "b".getBytes()).iterator());
assertEquals("a\nb\n", rest.values().getEntity().toString());
}

@Test
void testValueEndpointWithOffsetHeaders() throws IOException {
CacheResource rest = new CacheResource();
rest.cache = Mockito.mock(KafkaCache.class);
rest.mapper = new ObjectMapper();
rest.mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

Mockito.when(rest.cache.isReady()).thenReturn(true);
Mockito.when(rest.cache.getValues()).thenReturn(List.of("a".getBytes()).iterator());
Mockito.when(rest.cache.getValue(any())).thenReturn("a".getBytes());
assertEquals("a", new String(rest.cache.getValue("key1"), StandardCharsets.UTF_8));

Mockito.when(rest.cache.getCurrentOffsets()).thenReturn(List.of(
new TopicPartitionOffset("mytopic", 0, 0L)));

assertEquals("[x-kkv-last-seen-offsets]", "" + rest.values().getHeaders().keySet());
assertEquals("[{\"offset\":0,\"partition\":0,\"topic\":\"mytopic\"}]", rest.values().getHeaderString("x-kkv-last-seen-offsets"));
assertEquals("[{\"offset\":0,\"partition\":0,\"topic\":\"mytopic\"}]", rest.valueByKey("key1", null).getHeaderString("x-kkv-last-seen-offsets"));

Mockito.when(rest.cache.getCurrentOffsets()).thenReturn(List.of(
new TopicPartitionOffset("mytopic", 0, 17045L)));

assertEquals("[{\"offset\":17045,\"partition\":0,\"topic\":\"mytopic\"}]", rest.values().getHeaderString("x-kkv-last-seen-offsets"));
assertEquals("[{\"offset\":17045,\"partition\":0,\"topic\":\"mytopic\"}]", rest.valueByKey("key1", null).getHeaderString("x-kkv-last-seen-offsets"));
List<TopicPartitionOffset> currentOffsets = List.of(new TopicPartitionOffset("mytopic", 0, 0L));
Mockito.when(rest.cache.getCurrentOffsets()).thenReturn(currentOffsets);
assertEquals(ValuesResponse.class, rest.values().getEntity().getClass());
ValuesResponse v = (ValuesResponse) rest.values().getEntity();
assertEquals("a", new String(v.values.next()));
assertEquals("b", new String(v.values.next()));
assertFalse(v.values.hasNext());
assertEquals(currentOffsets, v.currentOffsets);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package se.yolean.kafka.keyvalue.http;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;

import org.junit.jupiter.api.Test;

import com.fasterxml.jackson.databind.ObjectMapper;

import se.yolean.kafka.keyvalue.TopicPartitionOffset;

public class ValuesResponseWriterTest {

@Test
void testWrite() throws IOException {
ValuesResponse response = new ValuesResponse(
List.of("a".getBytes(), "b".getBytes()).iterator(),
List.of(new TopicPartitionOffset("mytopic", 0, 0L))
);

ValuesResponseWriter w = new ValuesResponseWriter();
w.objectMapper = new ObjectMapper();
assertTrue(w.isWriteable(response.getClass(), null, null, null));

ByteArrayOutputStream out = new ByteArrayOutputStream();
MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>();
w.writeTo(response, null, null, null, null, headers, out);
assertEquals("[x-kkv-last-seen-offsets]", "" + headers.keySet());
assertEquals("[{\"offset\":0,\"partition\":0,\"topic\":\"mytopic\"}]", headers.getFirst("x-kkv-last-seen-offsets"));
assertEquals("a\nb\n", out.toString());
}

}