Skip to content

Commit

Permalink
Merge pull request #46 from zalando-incubator/bugfix/#44-flow_id-sending
Browse files Browse the repository at this point in the history
#44 flow id sending
  • Loading branch information
BGehrels authored Jun 15, 2017
2 parents b34dcee + 6e8a2d4 commit 1ea0d45
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 6 deletions.
2 changes: 1 addition & 1 deletion nakadi-producer-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>4.0.1</version>
<version>4.0.2</version>
</parent>

<artifactId>nakadi-producer-spring-boot-starter</artifactId>
Expand Down
7 changes: 6 additions & 1 deletion nakadi-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>4.0.1</version>
<version>4.0.2</version>
</parent>

<artifactId>nakadi-producer</artifactId>
Expand Down Expand Up @@ -79,6 +79,11 @@
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public NakadiEvent mapToNakadiEvent(final EventLog event) {
final NakadiMetadata metadata = new NakadiMetadata();
metadata.setEid(convertToUUID(event.getId()));
metadata.setOccuredAt(event.getCreated());
metadata.setFlowId(event.getFlowId());
nakadiEvent.setMetadata(metadata);

HashMap<String, Object> payloadDTO;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.zalando.nakadiproducer.transmission.impl;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

import java.time.Instant;

import lombok.Data;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;

@Data
public class NakadiMetadata {
Expand All @@ -17,4 +17,7 @@ public class NakadiMetadata {
@JsonFormat(shape = JsonFormat.Shape.STRING)
private Instant occuredAt;

@JsonProperty("flow_id")
private String flowId;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.zalando.nakadiproducer.transmission.impl;

import static com.jayway.jsonpath.Criteria.where;
import static com.jayway.jsonpath.JsonPath.read;
import static java.time.Instant.now;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;

import java.util.List;

import org.junit.Before;
import org.junit.Test;
import org.zalando.nakadiproducer.eventlog.impl.EventLog;
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepository;
import org.zalando.nakadiproducer.transmission.MockNakadiPublishingClient;
import org.zalando.nakadiproducer.util.Fixture;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TransmissionServiceTest {

private EventTransmissionService service;
private EventLogRepository repo;
private MockNakadiPublishingClient publishingClient;
private ObjectMapper mapper;

@Before
public void setUp() {
repo = mock(EventLogRepository.class);
publishingClient = new MockNakadiPublishingClient();
mapper = new ObjectMapper();
service = new EventTransmissionService(repo, publishingClient, mapper);
}

@Test
public void testWithFlowId() throws JsonProcessingException {
String flowId = "XYZ";
String payloadString = mapper.writeValueAsString(Fixture.mockPayload(42, "bla"));
EventLog ev = new EventLog(27, "type", payloadString, flowId, now(), now(), null, null);

service.sendEvent(ev);

List<String> events = publishingClient.getSentEvents("type");
assertThat(events, hasSize(1));
assertThat(read(events.get(0), "$.metadata.flow_id"), is(flowId));
}

@Test
public void testWithoutFlowId() throws JsonProcessingException {
String payloadString = mapper.writeValueAsString(Fixture.mockPayload(42, "bla"));
EventLog ev = new EventLog(27, "type", payloadString, null, now(), now(), null, null);

service.sendEvent(ev);

List<String> events = publishingClient.getSentEvents("type");
assertThat(events, hasSize(1));
assertThat(read(events.get(0), "$.metadata.[?]", where("flow_id").exists(true)), is(empty()));
}

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<artifactId>nakadi-producer-reactor</artifactId>
<groupId>org.zalando</groupId>
<version>4.0.1</version>
<version>4.0.2</version>
<packaging>pom</packaging>
<name>Nakadi Event Producer Reactor</name>

Expand Down

0 comments on commit 1ea0d45

Please sign in to comment.