Skip to content

Commit

Permalink
SAMZA-1092: replace stream spec in fluent API
Browse files Browse the repository at this point in the history
Replaced the StreamSpec class w/ the new one from SAMZA-1075.

Author: Yi Pan (Data Infrastructure) <nickpan47@gmail.com>

Reviewers: Jacob Maes <jmaes@linkedin.com>

Closes apache#58 from nickpan47/replace-stream-spec and squashes the following commits:

761ebb5 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class in system package
df953c2 [Yi Pan (Data Infrastructure)] SAMZA-1092: fix unit test
71331d8 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class
2fb19e9 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class in fluent API
ed3ad8e [Yi Pan (Data Infrastructure)] WIP: replace stream spec in fluent API
  • Loading branch information
nickpan47 committed Feb 23, 2017
1 parent b5ea877 commit e6147fd
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.StreamSpec;

import java.util.Map;


/**
* Job-level programming interface to create an operator DAG and run it in different runtime environments.
*/
Expand Down
46 changes: 0 additions & 46 deletions samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
package org.apache.samza.operators;

import java.util.Properties;
import java.util.function.Function;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
Expand All @@ -44,6 +44,9 @@ public class StreamGraphImpl implements StreamGraph {
*/
private int opId = 0;

// TODO: SAMZA-1101: the instantiation of physical streams and the physical sink functions should be delayed
// until after physical deployment. The input/output/intermediate stream creation should also be delegated to {@link ExecutionEnvironment}
// so that different execution environments can instantiate streams differently without code changes.
private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> {
final StreamSpec spec;
final Serde<K> keySerde;
Expand Down Expand Up @@ -83,7 +86,7 @@ public SinkFunction<M> getSinkFunction() {
// TODO: need to find a way to directly pass in the serde class names
// mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
// message.getKey(), message.getKey(), message.getMessage()));
mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
};
}
}
Expand Down Expand Up @@ -112,10 +115,10 @@ public SinkFunction<M> getSinkFunction() {
// mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
// message.getKey(), message.getKey(), message.getMessage()));
if (this.parKeyFn == null) {
mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
} else {
// apply partition key function
mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
}
};
}
Expand All @@ -124,17 +127,17 @@ public SinkFunction<M> getSinkFunction() {
/**
* Maps keeping all input and output streams of operators in {@link StreamGraphImpl}, keyed by the stream ID from {@link StreamSpec#getId()}
*/
private final Map<SystemStream, MessageStream> inStreams = new HashMap<>();
private final Map<SystemStream, OutputStream> outStreams = new HashMap<>();
private final Map<String, MessageStream> inStreams = new HashMap<>();
private final Map<String, OutputStream> outStreams = new HashMap<>();

private ContextManager contextManager = new ContextManager() { };

@Override
public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
if (!this.inStreams.containsKey(streamSpec.getId())) {
this.inStreams.putIfAbsent(streamSpec.getId(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
}
return this.inStreams.get(streamSpec.getSystemStream());
return this.inStreams.get(streamSpec.getId());
}

/**
Expand All @@ -146,10 +149,10 @@ public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(S
*/
@Override
public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
this.outStreams.putIfAbsent(streamSpec.getSystemStream(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
if (!this.outStreams.containsKey(streamSpec.getId())) {
this.outStreams.putIfAbsent(streamSpec.getId(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
}
return this.outStreams.get(streamSpec.getSystemStream());
return this.outStreams.get(streamSpec.getId());
}

/**
Expand All @@ -161,12 +164,12 @@ public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(S
*/
@Override
public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
if (!this.inStreams.containsKey(streamSpec.getId())) {
this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
}
IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getSystemStream());
if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getId());
if (!this.outStreams.containsKey(streamSpec.getId())) {
this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
}
return intStream;
}
Expand Down Expand Up @@ -200,12 +203,15 @@ public ContextManager getContextManager() {
/**
* Helper method to get the input stream via {@link SystemStream}
*
* @param systemStream the {@link SystemStream}
* @param sstream the {@link SystemStream}
* @return a {@link MessageStreamImpl} object corresponding to the given {@link SystemStream}, or {@code null} if none matches
*/
public MessageStreamImpl getInputStream(SystemStream systemStream) {
if (this.inStreams.containsKey(systemStream)) {
return (MessageStreamImpl) this.inStreams.get(systemStream);
public MessageStreamImpl getInputStream(SystemStream sstream) {
  // Input streams are keyed by stream ID, so a lookup by SystemStream has to scan the
  // registered streams and match on the (system name, physical stream name) pair.
  for (MessageStream entry : this.inStreams.values()) {
    StreamSpec spec = ((InputStreamImpl) entry).getSpec();
    // Compare String contents, not references: '==' only matches when both sides happen
    // to be the very same String instance. Objects.equals keeps the null tolerance of '=='.
    if (java.util.Objects.equals(spec.getSystemName(), sstream.getSystem())
        && java.util.Objects.equals(spec.getPhysicalName(), sstream.getStream())) {
      return (MessageStreamImpl) entry;
    }
  }
  // No registered input stream matches the requested system/stream pair.
  return null;
}
Expand All @@ -217,13 +223,6 @@ <M> OutputStream<M> getOutputStream(MessageStreamImpl<M> intStream) {
return null;
}

<M> MessageStream<M> getIntStream(OutputStream<M> outStream) {
  // Hand the output stream back as a MessageStream only if the same object is also
  // registered among the input streams; otherwise there is nothing to return.
  final boolean registeredAsInput = this.inStreams.containsValue(outStream);
  return registeredAsInput ? (MessageStream<M>) outStream : null;
}

/**
* Method to create intermediate topics for {@link MessageStreamImpl#partitionBy(Function)} method.
*
Expand All @@ -234,27 +233,21 @@ <M> MessageStream<M> getIntStream(OutputStream<M> outStream) {
*/
<PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
// TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
StreamSpec streamSpec = new StreamSpec() {
@Override
public SystemStream getSystemStream() {
// TODO: should auto-generate intermediate stream name here
return new SystemStream("intermediate", String.format("par-%d", StreamGraphImpl.this.opId));
}
StreamSpec streamSpec = this.createIntStreamSpec();

@Override
public Properties getProperties() {
return null;
}
};

if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
if (!this.inStreams.containsKey(streamSpec.getId())) {
this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
}
IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getSystemStream());
if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId());
if (!this.outStreams.containsKey(streamSpec.getId())) {
this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
}
return intStream;
}

private StreamSpec createIntStreamSpec() {
// TODO: placeholder to generate the intermediate stream's {@link StreamSpec} automatically
// NOTE(review): this always returns null for now, while createIntStream(Function) calls
// streamSpec.getId() on the result — confirm that path is not exercised until this is implemented.
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.ExecutionEnvironment;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.CommandLine;

import java.util.Properties;


/**
* Example code using {@link KeyValueStore} to implement event-time window
Expand Down Expand Up @@ -113,25 +111,9 @@ public void init(Config config, TaskContext context) {
}
}

StreamSpec input1 = new StreamSpec() {
@Override public SystemStream getSystemStream() {
return new SystemStream("kafka", "PageViewEvent");
}

@Override public Properties getProperties() {
return null;
}
};
StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");

StreamSpec output = new StreamSpec() {
@Override public SystemStream getSystemStream() {
return new SystemStream("kafka", "PageViewPerMember5min");
}

@Override public Properties getProperties() {
return null;
}
};
StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");

class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
String pageId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,49 +28,24 @@
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.ExecutionEnvironment;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.CommandLine;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


/**
* Example {@link StreamGraphBuilder} code to test the API methods
*/
public class NoContextStreamExample implements StreamGraphBuilder {

StreamSpec input1 = new StreamSpec() {
@Override public SystemStream getSystemStream() {
return new SystemStream("kafka", "input1");
}
StreamSpec input1 = new StreamSpec("inputStreamA", "PageViewEvent", "kafka");

@Override public Properties getProperties() {
return null;
}
};
StreamSpec input2 = new StreamSpec("inputStreamB", "RumLixEvent", "kafka");

StreamSpec input2 = new StreamSpec() {
@Override public SystemStream getSystemStream() {
return new SystemStream("kafka", "input2");
}

@Override public Properties getProperties() {
return null;
}
};

StreamSpec output = new StreamSpec() {
@Override public SystemStream getSystemStream() {
return new SystemStream("kafka", "output");
}

@Override public Properties getProperties() {
return null;
}
};
StreamSpec output = new StreamSpec("joinedPageViewStream", "PageViewJoinRumLix", "kafka");

class MessageType {
String joinKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@
import org.apache.samza.operators.StreamGraphBuilder;
import org.apache.samza.config.Config;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.StreamSpec;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.ExecutionEnvironment;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;

import java.util.Properties;


/**
* Simple 2-way stream-to-stream join example
Expand Down Expand Up @@ -71,35 +68,11 @@ public static void main(String[] args) throws Exception {
standaloneEnv.run(new OrderShipmentJoinExample(), config);
}

StreamSpec input1 = new StreamSpec() {
@Override public SystemStream getSystemStream() {
return new SystemStream("kafka", "Orders");
}

@Override public Properties getProperties() {
return null;
}
};

StreamSpec input2 = new StreamSpec() {
@Override public SystemStream getSystemStream() {
return new SystemStream("kafka", "Shipment");
}
StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka");

@Override public Properties getProperties() {
return null;
}
};
StreamSpec input2 = new StreamSpec("shipmentStream", "ShipmentEvent", "kafka");

StreamSpec output = new StreamSpec() {
@Override public SystemStream getSystemStream() {
return new SystemStream("kafka", "FulfilledOrders");
}

@Override public Properties getProperties() {
return null;
}
};
StreamSpec output = new StreamSpec("joinedOrderShipmentStream", "OrderShipmentJoinEvent", "kafka");

class OrderRecord implements MessageEnvelope<String, OrderRecord> {
String orderId;
Expand Down
Loading

0 comments on commit e6147fd

Please sign in to comment.