Skip to content

Commit

Permalink
Waiting for ready comment on SSE channel to mark connection as ready (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lucassaldanha authored Dec 14, 2022
1 parent 20a2369 commit 2eda229
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,34 @@
public class Eth2EventHandler implements EventHandler {
private final List<PackedMessage> eventList = new ArrayList<>();

// Using this as a way to assert that our EventSubscriber is ready
private boolean hasReceivedReadyComment = false;

@Override
public void onOpen() {}

@Override
public void onClosed() {}

@Override
public void onMessage(final String event, final MessageEvent messageEvent) {
public synchronized void onMessage(final String event, final MessageEvent messageEvent) {
eventList.add(new PackedMessage(event, messageEvent));
}

public List<PackedMessage> getMessages() {
return eventList;
public synchronized List<PackedMessage> getMessages() {
return List.copyOf(eventList);
}

@Override
public void onComment(final String comment) {}
public void onComment(final String comment) {
if (!hasReceivedReadyComment) {
hasReceivedReadyComment = comment.equals("ready");
}
}

public boolean hasReceivedComment() {
return hasReceivedReadyComment;
}

@Override
public void onError(final Throwable t) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.test.acceptance.dsl;

import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.ReadyState;
import java.net.URI;
import java.util.List;

Expand All @@ -30,6 +31,10 @@ public List<Eth2EventHandler.PackedMessage> getMessages() {
return handler.getMessages();
}

public boolean isReady() {
return eventSource.getState() == ReadyState.OPEN && handler.hasReceivedComment();
}

public void close() {
eventSource.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void start() throws Exception {
public void startEventListener(final EventType... eventTypes) {
maybeEventStreamListener =
Optional.of(new EventStreamListener(getEventUrl(List.of(eventTypes))));
waitFor(() -> assertThat(maybeEventStreamListener.get().isReady()).isTrue());
}

public void waitForContributionAndProofEvent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,13 @@ private void keepAlive() {
.ifExceptionGetsHereRaiseABug();
}
}

/*
Using this as a way of notifying clients that our EventSubscriber is ready and they should start receiving events
*/
public void sendReadyComment() {
if (!stopped.get()) {
asyncRunner.runAsync(() -> sseClient.sendComment("ready")).ifExceptionGetsHereRaiseABug();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public EventSubscriptionManager(
}

public void registerClient(final SseClient sseClient) {
LOG.trace("connected " + sseClient.hashCode());
LOG.trace("SSE client connected " + sseClient.hashCode());
final List<String> allTopicsInContext =
ListQueryParameterUtils.getParameterAsStringList(sseClient.ctx().queryParamMap(), TOPICS);
final EventSubscriber subscriber =
Expand All @@ -101,6 +101,7 @@ public void registerClient(final SseClient sseClient) {
timeProvider,
maxPendingEvents);
eventSubscribers.add(subscriber);
subscriber.sendReadyComment();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.WriteListener;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

public class TestServletOutputStream extends ServletOutputStream {
Expand Down Expand Up @@ -55,11 +55,9 @@ public int countComments() {
}

public List<String> getEvents() {
final List<String> result = new ArrayList<>();
String[] splits = StringUtils.splitByWholeSeparator(getString(), "event: ");
for (String s : splits) {
result.add(String.format("event: %s", s));
}
return result;
return Arrays.stream(StringUtils.splitByWholeSeparator(getString(), "event: "))
.filter(s -> !s.startsWith(": ")) // remove SSE comments
.map(s -> String.format("event: %s", s))
.collect(Collectors.toList());
}
}

0 comments on commit 2eda229

Please sign in to comment.