Skip to content

Commit

Permalink
SAMZA-1073: top-level fluent API
Browse files Browse the repository at this point in the history
`Initial draft of top-level fluent API for operator DAGs

Author: Yi Pan (Data Infrastructure) <nickpan47@gmail.com>

Reviewers: Xinyu Liu <xiliu@linkedin.com>, Jacob Maes <jmaes@linkedin.com>, Prateek Maheshwari <pmaheshw@linkedin.com>

Closes apache#51 from nickpan47/samza-fluent-api-v1 and squashes the following commits:

001be63 [Yi Pan (Data Infrastructure)] SAMZA-1073: Addressing review feedbacks. Change StreamGraphFactory to StreamGraphBuilder.
373048a [Yi Pan (Data Infrastructure)] SAMZA-1073: top-level fluent API `
  • Loading branch information
nickpan47 committed Feb 16, 2017
1 parent 09bf833 commit c249443
Show file tree
Hide file tree
Showing 73 changed files with 3,080 additions and 1,006 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.task.TaskContext;


/**
 * Lifecycle hooks for setting up and tearing down the context used by the transformation functions.
 * <p>
 * Both hooks ship with default implementations, so an implementation only overrides the ones it needs.
 */
@InterfaceStability.Unstable
public interface ContextManager {

  /**
   * Initialization hook that creates the context shared by the whole task in Samza.
   * <p>
   * The default implementation does no extra work: it hands back the supplied {@code context} unchanged.
   *
   * @param config  the configuration object for the task
   * @param context  the {@link TaskContext} object
   * @return the user-defined task-wide context object; by default, the {@code context} argument itself
   */
  default TaskContext initTaskContext(Config config, TaskContext context) {
    return context;
  }

  /**
   * Finalization hook that lets users close the resources initialized in
   * {@link #initTaskContext(Config, TaskContext)}.
   * <p>
   * The default implementation is a no-op.
   */
  default void finalizeTaskContext() {
    // Nothing to release by default.
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.samza.operators;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
Expand All @@ -29,97 +28,116 @@
import org.apache.samza.operators.windows.WindowPane;

import java.util.Collection;
import java.util.function.Function;


// NOTE(review): this listing is taken from a rendered diff. Where two adjacent lines declare or describe the
// same member, the first is presumably the removed (MessageEnvelope-based) version and the second its
// replacement — confirm against the repository before relying on either.
/**
* Represents a stream of {@link MessageEnvelope}s.
* Represents a stream of messages.
* <p>
* A {@link MessageStream} can be transformed into another {@link MessageStream} by applying the transforms in this API.
*
* @param <M> type of {@link MessageEnvelope}s in this stream
* @param <M> type of messages in this stream
*/
@InterfaceStability.Unstable
public interface MessageStream<M extends MessageEnvelope> {
public interface MessageStream<M> {

/**
* Applies the provided 1:1 {@link MapFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the
* Applies the provided 1:1 function to messages in this {@link MessageStream} and returns the
* transformed {@link MessageStream}.
*
* @param mapFn the function to transform a {@link MessageEnvelope} to another {@link MessageEnvelope}
* @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
* @param mapFn the function to transform a message to another message
* @param <TM> the type of messages in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
<TM extends MessageEnvelope> MessageStream<TM> map(MapFunction<M, TM> mapFn);
<TM> MessageStream<TM> map(MapFunction<M, TM> mapFn);

/**
* Applies the provided 1:n {@link FlatMapFunction} to transform a {@link MessageEnvelope} in this {@link MessageStream}
* to n {@link MessageEnvelope}s in the transformed {@link MessageStream}
* Applies the provided 1:n function to transform a message in this {@link MessageStream}
* to n messages in the transformed {@link MessageStream}
*
* @param flatMapFn the function to transform a {@link MessageEnvelope} to zero or more {@link MessageEnvelope}s
* @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
* @param flatMapFn the function to transform a message to zero or more messages
* @param <TM> the type of messages in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
<TM extends MessageEnvelope> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);
<TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);

/**
* Applies the provided {@link FilterFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the
* Applies the provided function to messages in this {@link MessageStream} and returns the
* transformed {@link MessageStream}.
* <p>
* The {@link FilterFunction} is a predicate which determines whether a {@link MessageEnvelope} in this {@link MessageStream}
* The {@link FilterFunction} is a predicate which determines whether a message in this {@link MessageStream}
* should be retained in the transformed {@link MessageStream}.
*
* @param filterFn the predicate to filter {@link MessageEnvelope}s from this {@link MessageStream}
* @param filterFn the predicate to filter messages from this {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
MessageStream<M> filter(FilterFunction<M> filterFn);

/**
* Allows sending {@link MessageEnvelope}s in this {@link MessageStream} to an output
* {@link org.apache.samza.system.SystemStream} using the provided {@link SinkFunction}.
* Allows sending messages in this {@link MessageStream} to an output using the provided {@link SinkFunction}.
*
* @param sinkFn the function to send {@link MessageEnvelope}s in this stream to output systems
* NOTE: the output may not be a {@link org.apache.samza.system.SystemStream}. It can be an external database, etc.
*
* @param sinkFn the function to send messages in this stream to output
*/
void sink(SinkFunction<M> sinkFn);

/**
* Groups and processes the {@link MessageEnvelope}s in this {@link MessageStream} according to the provided {@link Window}
* Allows sending messages in this {@link MessageStream} to an {@link OutputStream}.
*
* NOTE(review): the parameter is declared as an {@link OutputStream}; this note previously said the
* {@code stream} has to be a {@link MessageStream} — confirm the intended constraint.
*
* @param stream the {@link OutputStream} to send the messages to
*/
void sendTo(OutputStream<M> stream);

/**
* Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics
* (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of
* {@link WindowPane}s.
* <p>
* Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
*
* @param window the window to group and process {@link MessageEnvelope}s from this {@link MessageStream}
* @param <K> the type of key in the {@link MessageEnvelope} in this {@link MessageStream}. If a key is specified,
* @param window the window to group and process messages from this {@link MessageStream}
* @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified,
* panes are emitted per-key.
* @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream}
* @param <WM> the type of {@link WindowPane} in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
<K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window(Window<M, K, WV, WM> window);
<K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window);

/**
* Joins this {@link MessageStream} with another {@link MessageStream} using the provided pairwise {@link JoinFunction}.
* <p>
* We currently only support 2-way joins.
*
* @param otherStream the other {@link MessageStream} to be joined with
* @param joinFn the function to join {@link MessageEnvelope}s from this and the other {@link MessageStream}
* @param joinFn the function to join messages from this and the other {@link MessageStream}
* @param <K> the type of join key
* @param <OM> the type of {@link MessageEnvelope}s in the other stream
* @param <RM> the type of {@link MessageEnvelope}s resulting from the {@code joinFn}
* @param <OM> the type of messages in the other stream
* @param <RM> the type of messages resulting from the {@code joinFn}
* @return the joined {@link MessageStream}
*/
<K, OM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(MessageStream<OM> otherStream,
JoinFunction<M, OM, RM> joinFn);
<K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn);

/**
* Merge all {@code otherStreams} with this {@link MessageStream}.
* <p>
* The merging streams must have the same {@link MessageEnvelope} type {@code M}.
* The merging streams must have the same messages of type {@code M}.
*
* @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
* @return the merged {@link MessageStream}
*/
MessageStream<M> merge(Collection<MessageStream<M>> otherStreams);


/**
* Sends the input message to an output {@link org.apache.samza.system.SystemStream} and consumes it as an input {@link MessageStream} again.
*
* Note: this is a transform function only used in the logical DAG. In a physical DAG, this is either translated to a NOOP function, or a {@code MessageStream#sendThrough} function.
*
* @param parKeyExtractor a {@link Function} that extracts the partition key from a message in this {@link MessageStream}
* @param <K> the type of partition key
* @return a {@link MessageStream} object after the re-partition
*/
<K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.functions.SinkFunction;


/**
 * The interface defining the specific {@link SinkFunction} for a system {@link OutputStream}.
 *
 * @param <M>  the type of message to be sent to this output stream
 */
@InterfaceStability.Unstable
public interface OutputStream<M> {

  /**
   * Returns the specific {@link SinkFunction} for this output stream.
   * <p>
   * The {@link OutputStream} is created via
   * {@link StreamGraph#createOutStream(StreamSpec, org.apache.samza.serializers.Serde, org.apache.samza.serializers.Serde)}
   * or
   * {@link StreamGraph#createIntStream(StreamSpec, org.apache.samza.serializers.Serde, org.apache.samza.serializers.Serde)}.
   * Hence, the proper types of serdes for key and value are instantiated and are used in the
   * {@link SinkFunction} returned.
   * <p>
   * The serde type is spelled out with its package in the links above because this file does not import it.
   *
   * @return the pre-defined {@link SinkFunction} that applies the proper serdes before sending the message to
   *         the output stream
   */
  SinkFunction<M> getSinkFunction();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.serializers.Serde;

import java.util.Map;


/**
 * Job-level programming interface to create an operator DAG and run in various different runtime environments.
 */
@InterfaceStability.Unstable
public interface StreamGraph {

  /**
   * Adds an input {@link MessageStream} from the system.
   *
   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the input {@link MessageStream}
   * @param keySerde  the serde used to serialize/deserialize the message key from the input {@link MessageStream}
   * @param msgSerde  the serde used to serialize/deserialize the message body from the input {@link MessageStream}
   * @param <K>  the type of key in the input message
   * @param <V>  the type of message body in the input message
   * @param <M>  the type of {@link MessageEnvelope} in the input {@link MessageStream}
   * @return the input {@link MessageStream} object
   */
  <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);

  /**
   * Adds an {@link OutputStream} to the system.
   *
   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the {@link OutputStream}
   * @param keySerde  the serde used to serialize/deserialize the message key for the {@link OutputStream}
   * @param msgSerde  the serde used to serialize/deserialize the message body for the {@link OutputStream}
   * @param <K>  the type of key in the output message
   * @param <V>  the type of message body in the output message
   * @param <M>  the type of {@link MessageEnvelope} in the {@link OutputStream}
   * @return the {@link OutputStream} object
   */
  <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);

  /**
   * Adds an intermediate stream to the system.
   * <p>
   * The intermediate stream is handed back typed as an {@link OutputStream}.
   *
   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the intermediate stream
   * @param keySerde  the serde used to serialize/deserialize the message key for the intermediate stream
   * @param msgSerde  the serde used to serialize/deserialize the message body for the intermediate stream
   * @param <K>  the type of key in the intermediate message
   * @param <V>  the type of message body in the intermediate message
   * @param <M>  the type of {@link MessageEnvelope} in the intermediate stream
   * @return the intermediate stream, as an {@link OutputStream} object
   */
  <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);

  // NOTE(review): the two getters below declare raw MessageStream / OutputStream map values. Parameterizing
  // them would change the return types that implementers must match, so the declarations are left as they are.

  /**
   * Gets the input {@link MessageStream}s.
   *
   * @return the map of input {@link MessageStream}s, keyed by {@link StreamSpec}
   */
  Map<StreamSpec, MessageStream> getInStreams();

  /**
   * Gets the {@link OutputStream}s.
   *
   * @return the map of all {@link OutputStream}s, keyed by {@link StreamSpec}
   */
  Map<StreamSpec, OutputStream> getOutStreams();

  /**
   * Sets the {@link ContextManager} for this {@link StreamGraph}.
   *
   * @param manager  the {@link ContextManager} object
   * @return this {@link StreamGraph} object
   */
  StreamGraph withContextManager(ContextManager manager);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;


/**
 * Factory-style interface that users implement to create a user-defined operator DAG in a
 * {@link StreamGraph} object.
 */
@InterfaceStability.Unstable
public interface StreamGraphBuilder {

  /**
   * Initializes the processing logic of the application, expressed as a DAG of
   * {@link org.apache.samza.operators.MessageStream}s and operators.
   * <p>
   * Users are required to implement this method.
   *
   * @param graph  an empty {@link StreamGraph} object to be initialized
   * @param config  the {@link Config} of the application
   */
  void init(StreamGraph graph, Config config);
}
Loading

0 comments on commit c249443

Please sign in to comment.