Skip to content

Commit

Permalink
#46 implement batching
Browse files Browse the repository at this point in the history
  • Loading branch information
BGehrels committed Feb 5, 2019
1 parent d9e5585 commit 2165e84
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.zalando.nakadiproducer.transmission.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.zalando.nakadiproducer.eventlog.impl.EventLog;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

@Slf4j
public class EventBatcher {

private static final long NAKADI_BATCH_SIZE_LIMIT_IN_BYTES = 50000000;
private final ObjectMapper objectMapper;
private final BiConsumer<List<EventLog>, List<NakadiEvent>> publisher;

private List<EventLog> rawBatch;
private List<NakadiEvent> mappedBatch;
private long aggregatedBatchSize;

public EventBatcher(ObjectMapper objectMapper, BiConsumer<List<EventLog>, List<NakadiEvent>> publisher) {
this.objectMapper = objectMapper;
this.publisher = publisher;

this.rawBatch = new ArrayList<>();
this.mappedBatch = new ArrayList<>();
this.aggregatedBatchSize = 0;
}

public void pushEvent(EventLog event, NakadiEvent nakadiEvent) {
long eventSize;

try {
eventSize = objectMapper.writeValueAsBytes(nakadiEvent).length;
} catch (Exception e) {
log.error("Could not serialize event {} of type {}, skipping it.", event.getId(), event.getEventType(), e);
return;
}


if (rawBatch.size() > 0 &&
(hasAnotherEventType(rawBatch, event) || batchWouldBecomeTooBig(aggregatedBatchSize, eventSize))) {
this.publisher.accept(rawBatch, mappedBatch);

rawBatch.clear();
mappedBatch.clear();
aggregatedBatchSize = 0;
}

rawBatch.add(event);
mappedBatch.add(nakadiEvent);
aggregatedBatchSize += eventSize;
}

public void finish() {
if (rawBatch.size() > 0) {
this.publisher.accept(rawBatch, mappedBatch);
}
}

private boolean hasAnotherEventType(List<EventLog> rawBatch, EventLog event) {
return !event.getEventType().equals(rawBatch.get(0).getEventType());
}

private boolean batchWouldBecomeTooBig(long aggregatedBatchSize, long eventSize) {
return aggregatedBatchSize + eventSize > 0.8 * NAKADI_BATCH_SIZE_LIMIT_IN_BYTES;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.zalando.fahrschein.EventPublishingException;
import org.zalando.fahrschein.domain.BatchItemResponse;
import org.zalando.nakadiproducer.eventlog.impl.EventLog;
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepository;
import org.zalando.nakadiproducer.transmission.NakadiPublishingClient;
Expand All @@ -12,10 +14,14 @@
import java.io.UncheckedIOException;
import java.time.Clock;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.time.temporal.ChronoUnit.MINUTES;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -60,6 +66,58 @@ public void sendEvent(EventLog eventLog) {

}

@Transactional
public void sendEvents(Collection<EventLog> events) {
EventBatcher batcher = new EventBatcher(objectMapper, this::publishBatch);

for (EventLog event : events) {
if (lockNearlyExpired(event)) {
// to avoid that two instances process this event, we skip it
continue;
}

NakadiEvent nakadiEvent;

try {
nakadiEvent = mapToNakadiEvent(event);
} catch (Exception e) {
log.error("Could not serialize event {} of type {}, skipping it.", event.getId(), event.getEventType(), e);
continue;
}

batcher.pushEvent(event, nakadiEvent);
}

batcher.finish();
}

private void publishBatch(List<EventLog> eventLogs, List<NakadiEvent> nakadiEvents) {
try {
this.tryToPublishBatch(eventLogs, nakadiEvents);
} catch (Exception e) {
log.error("Could not send {} events of type {}, skipping them.", eventLogs.size(), eventLogs.get(0).getEventType(), e);
}
}

private void tryToPublishBatch(List<EventLog> rawBatch, List<NakadiEvent> mappedBatch) throws Exception {
Stream<EventLog> successfulEvents;
try {
nakadiPublishingClient.publish(rawBatch.get(0).getEventType(), mappedBatch);
successfulEvents = rawBatch.stream();
log.info("Sent {} events of type {}.", rawBatch.size(), rawBatch.get(0).getEventType());
} catch (EventPublishingException e) {
log.error("{} out of {} events of type {} failed to be sent.", e.getResponses().length, rawBatch.size(), rawBatch.get(0).getEventType());
List<String> failedEids = collectEids(e);
successfulEvents = rawBatch.stream().filter(rawEvent -> failedEids.contains(convertToUUID(rawEvent.getId())));
}

successfulEvents.forEach(eventLogRepository::delete);
}

private List<String> collectEids(EventPublishingException e) {
return Arrays.stream(e.getResponses()).map(BatchItemResponse::getEid).collect(Collectors.toList());
}

private boolean lockNearlyExpired(EventLog eventLog) {
// since clocks never work exactly synchronous and sending the event also takes some time, we include a minute
// of safety buffer here. This is still not 100% precise, but since we require events to be consumed idempotent,
Expand Down Expand Up @@ -106,5 +164,4 @@ private String convertToUUID(final int number) {
return new UUID(0, number).toString();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ public EventTransmitter(EventTransmissionService eventTransmissionService) {
}

public void sendEvents() {
eventTransmissionService.lockSomeEvents().forEach(eventTransmissionService::sendEvent);
eventTransmissionService.sendEvents(eventTransmissionService.lockSomeEvents());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.junit.MockitoJUnitRunner;
import org.zalando.nakadiproducer.flowid.FlowIdComponent;
import org.zalando.nakadiproducer.util.Fixture;
import org.zalando.nakadiproducer.util.MockPayload;
Expand Down

0 comments on commit 2165e84

Please sign in to comment.