Skip to content

Commit 5fcfc95

Browse files
Daniel Nishimuraatoomula
authored andcommitted
SAMZA-1990: Samza framework should let using the same system stream as both input and output.
**Symptom:** An `IllegalArgumentException` is thrown when the same `streamId` is referred from multiple input/output stream descriptors. **Cause:** The `ApplicationDescriptorImpl` caches the serde instances for streams by a `streamId` and there's a check to ensure the expected stream serde matches when using the same stream from multiple input/output descriptors. However the check is incorrect because it compares serde instances and not serde types. This check always fails in this scenario. **Fix:** Compare the stream serdes for a particular `streamId` by type. Please take a look prateekm nickpan47 CC: atoomula Author: Daniel Nishimura <dnishimura@linkedin.com> Reviewers: prateekm Closes apache#928 from dnishimura/samza-1990-same-stream-different-inputoutputdescriptors
1 parent d5d0956 commit 5fcfc95

File tree

2 files changed

+5
-5
lines changed

2 files changed

+5
-5
lines changed

samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,9 +311,12 @@ KV<Serde, Serde> getOrCreateStreamSerdes(String streamId, Serde serde) {
311311
". Values will not be (de)serialized");
312312
}
313313
streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
314-
} else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
314+
} else if (!currentSerdePair.getKey().getClass().equals(keySerde.getClass())
315+
|| !currentSerdePair.getValue().getClass().equals(valueSerde.getClass())) {
315316
throw new IllegalArgumentException(String.format("Serde for streamId: %s is already defined. Cannot change it to "
316317
+ "different serdes.", streamId));
318+
} else {
319+
LOGGER.warn("Using previously defined serde for streamId: " + streamId + ".");
317320
}
318321
return streamSerdes.get(streamId);
319322
}

samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,7 @@ public void testEndToEndMultiSqlStmts() {
174174
Assert.assertEquals(numMessages, outMessagesSet.size());
175175
Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet)));
176176
}
177-
178-
// The below test won't work until SAMZA-1990 is fixed. Currently, Samza framework does not allow same system stream
179-
// to be used as both input and output stream.
180-
@Ignore
177+
181178
@Test
182179
public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
183180
int numMessages = 20;

0 commit comments

Comments
 (0)