Skip to content

Commit 1b267f7

Browse files
mynameboratjagadish-v0
authored andcommitted
SAMZA-2055: Async high level api
https://cwiki.apache.org/confluence/display/SAMZA/SEP-21%3A+Samza+Async+API+for+High+Level Author: mynameborat <bharath.kumarasubramanian@gmail.com> Reviewers: Jagadish <jagadish@apache.org> Closes apache#905 from mynameborat/async-high-level-api
1 parent ccae8f9 commit 1b267f7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1168
-326
lines changed

samza-api/src/main/java/org/apache/samza/operators/MessageStream.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import java.util.ArrayList;
2323
import java.util.Collection;
2424

25+
import java.util.concurrent.CompletionStage;
2526
import org.apache.samza.annotation.InterfaceStability;
27+
import org.apache.samza.operators.functions.AsyncFlatMapFunction;
2628
import org.apache.samza.operators.functions.FilterFunction;
2729
import org.apache.samza.operators.functions.FlatMapFunction;
2830
import org.apache.samza.operators.functions.JoinFunction;
@@ -67,6 +69,31 @@ public interface MessageStream<M> {
6769
*/
6870
<OM> MessageStream<OM> flatMap(FlatMapFunction<? super M, ? extends OM> flatMapFn);
6971

72+
/**
73+
* Applies the provided 1:n transformation asynchronously to this {@link MessageStream}. The asynchronous transformation
74+
* is specified through {@link AsyncFlatMapFunction}. The results are emitted to the downstream operators upon the
75+
* completion of the {@link CompletionStage} returned from the {@link AsyncFlatMapFunction}.
76+
* <p>
77+
* The operator can operate in two modes depending on <i>task.max.concurrency</i>.
78+
* <ul>
79+
* <li>
80+
* Serialized (task.max.concurrency=1) - In this mode, each invocation of the {@link AsyncFlatMapFunction} is guaranteed
81+
* to happen-before the next invocation.
82+
* </li>
83+
* <li>
84+
* Parallel (task.max.concurrency&gt;1) - In this mode, multiple invocations can happen in parallel without happens-before guarantee
85+
* and the {@link AsyncFlatMapFunction} is required to synchronize any shared state. The operator doesn't provide any ordering guarantees,
86+
* i.e., the results corresponding to each invocation of this operator might not be emitted in the same order as the invocations.
87+
* By extension, the operator chain that follows it also doesn't have any ordering guarantees.
88+
* </li>
89+
* </ul>
90+
*
91+
* @param asyncFlatMapFn the async function to transform a message to zero or more messages
92+
* @param <OM> the type of messages in the transformed {@link MessageStream}
93+
* @return the transformed {@link MessageStream}
94+
*/
95+
<OM> MessageStream<OM> flatMapAsync(AsyncFlatMapFunction<? super M, ? extends OM> asyncFlatMapFn);
96+
7097
/**
7198
* Applies the provided function to messages in this {@link MessageStream} and returns the
7299
* filtered {@link MessageStream}.
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.operators.functions;
20+
21+
import java.io.Serializable;
22+
import java.util.Collection;
23+
import java.util.concurrent.CompletionStage;
24+
import org.apache.samza.SamzaException;
25+
import org.apache.samza.annotation.InterfaceStability;
26+
27+
28+
/**
29+
* Asynchronous variant of the {@link FlatMapFunction} used in tandem with {@link org.apache.samza.operators.MessageStream#flatMapAsync(AsyncFlatMapFunction)}
30+
* to transform a message into a collection of 0 or more messages.
31+
* <p>
32+
* Typically, {@link AsyncFlatMapFunction} is used for describing complex transformations that involve IO operations or remote calls.
33+
* The following pseudo code demonstrates a sample implementation of {@link AsyncFlatMapFunction} that sends out an email
34+
* and returns the status asynchronously.
35+
* <pre> {@code
36+
* AsyncFlatMapFunction<Email, Status> asyncEmailSender = (Email message) -> {
37+
* ...
38+
*
39+
* Request<Email> emailRequest = buildEmailRequest(message);
40+
* Future<EmailResponse> emailResponseFuture = emailClient.sendRequest(emailRequest); // send email asynchronously
41+
* ...
42+
*
43+
* return new CompletableFuture<>(emailResponseFuture)
44+
* .thenApply(response -> fetchStatus(response));
45+
* }
46+
* }
47+
* </pre>
48+
*
49+
* <p>
50+
* The function needs to be thread safe in case of task.max.concurrency&gt;1. It also needs to coordinate any
51+
* shared state since happens-before is not guaranteed between the messages delivered to the function. Refer to
52+
* {@link org.apache.samza.operators.MessageStream#flatMapAsync(AsyncFlatMapFunction)} docs for more details on the modes
53+
* and guarantees.
54+
*
55+
* <p>
56+
* For each invocation, the {@link CompletionStage} returned by the function should be completed successfully/exceptionally
57+
* within task.callback.timeout.ms; failure to do so will result in {@link SamzaException} bringing down the application.
58+
*
59+
* @param <M> type of the input message
60+
* @param <OM> type of the transformed messages
61+
*/
62+
@InterfaceStability.Unstable
63+
@FunctionalInterface
64+
public interface AsyncFlatMapFunction<M, OM> extends InitableFunction, ClosableFunction, Serializable {
65+
66+
/**
67+
* Asynchronously transforms the provided message into a collection of 0 or more messages.
68+
*
69+
* @param message the input message to be transformed
70+
* @return a {@link CompletionStage} that completes with a {@link Collection} of the transformed messages
71+
*/
72+
CompletionStage<Collection<OM>> apply(M message);
73+
}

samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525

2626
import org.apache.samza.SamzaException;
2727
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
28+
import org.apache.samza.operators.functions.AsyncFlatMapFunction;
2829
import org.apache.samza.operators.functions.FilterFunction;
2930
import org.apache.samza.operators.functions.FlatMapFunction;
3031
import org.apache.samza.operators.functions.JoinFunction;
3132
import org.apache.samza.operators.functions.MapFunction;
3233
import org.apache.samza.operators.functions.SinkFunction;
3334
import org.apache.samza.operators.functions.StreamTableJoinFunction;
35+
import org.apache.samza.operators.spec.AsyncFlatMapOperatorSpec;
3436
import org.apache.samza.operators.spec.BroadcastOperatorSpec;
3537
import org.apache.samza.operators.spec.JoinOperatorSpec;
3638
import org.apache.samza.operators.spec.OperatorSpec;
@@ -102,6 +104,14 @@ public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> f
102104
return new MessageStreamImpl<>(this.streamAppDesc, op);
103105
}
104106

107+
@Override
108+
public <OM> MessageStream<OM> flatMapAsync(AsyncFlatMapFunction<? super M, ? extends OM> flatMapFn) {
109+
String opId = this.streamAppDesc.getNextOpId(OpCode.ASYNC_FLAT_MAP);
110+
AsyncFlatMapOperatorSpec<M, OM> op = OperatorSpecs.createAsyncOperatorSpec(flatMapFn, opId);
111+
this.operatorSpec.registerNextOperatorSpec(op);
112+
return new MessageStreamImpl<>(this.streamAppDesc, op);
113+
}
114+
105115
@Override
106116
public void sink(SinkFunction<? super M> sinkFn) {
107117
String opId = this.streamAppDesc.getNextOpId(OpCode.SINK);
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.operators.impl;
20+
21+
import java.util.Collection;
22+
import java.util.concurrent.CompletionStage;
23+
import org.apache.samza.context.Context;
24+
import org.apache.samza.operators.functions.AsyncFlatMapFunction;
25+
import org.apache.samza.operators.spec.AsyncFlatMapOperatorSpec;
26+
import org.apache.samza.operators.spec.OperatorSpec;
27+
import org.apache.samza.task.MessageCollector;
28+
import org.apache.samza.task.TaskCoordinator;
29+
30+
31+
public class AsyncFlatmapOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
32+
private final AsyncFlatMapOperatorSpec<M, RM> opSpec;
33+
private final AsyncFlatMapFunction<M, RM> transformFn;
34+
35+
AsyncFlatmapOperatorImpl(AsyncFlatMapOperatorSpec<M, RM> opSpec) {
36+
this.opSpec = opSpec;
37+
this.transformFn = opSpec.getTransformFn();
38+
}
39+
@Override
40+
protected void handleInit(Context context) {
41+
this.transformFn.init(context);
42+
}
43+
44+
@Override
45+
protected CompletionStage<Collection<RM>> handleMessageAsync(M message, MessageCollector collector,
46+
TaskCoordinator coordinator) {
47+
return transformFn.apply(message);
48+
}
49+
50+
@Override
51+
protected void handleClose() {
52+
}
53+
54+
@Override
55+
protected OperatorSpec<M, RM> getOperatorSpec() {
56+
return opSpec;
57+
}
58+
}

samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.samza.operators.impl;
2121

22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.CompletionStage;
2224
import org.apache.samza.context.Context;
2325
import org.apache.samza.operators.spec.BroadcastOperatorSpec;
2426
import org.apache.samza.operators.spec.OperatorSpec;
@@ -50,9 +52,10 @@ protected void handleInit(Context context) {
5052
}
5153

5254
@Override
53-
protected Collection<Void> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
55+
protected CompletionStage<Collection<Void>> handleMessageAsync(M message, MessageCollector collector,
56+
TaskCoordinator coordinator) {
5457
collector.send(new OutgoingMessageEnvelope(systemStream, 0, null, message));
55-
return Collections.emptyList();
58+
return CompletableFuture.completedFuture(Collections.emptyList());
5659
}
5760

5861
@Override

samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java renamed to samza-core/src/main/java/org/apache/samza/operators/impl/FlatmapOperatorImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.samza.operators.impl;
2020

21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.CompletionStage;
2123
import org.apache.samza.context.Context;
2224
import org.apache.samza.operators.functions.FlatMapFunction;
2325
import org.apache.samza.operators.spec.OperatorSpec;
@@ -34,12 +36,12 @@
3436
* @param <M> the type of input message
3537
* @param <RM> the type of result
3638
*/
37-
class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
39+
class FlatmapOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
3840

3941
private final StreamOperatorSpec<M, RM> streamOpSpec;
4042
private final FlatMapFunction<M, RM> transformFn;
4143

42-
StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOpSpec) {
44+
FlatmapOperatorImpl(StreamOperatorSpec<M, RM> streamOpSpec) {
4345
this.streamOpSpec = streamOpSpec;
4446
this.transformFn = streamOpSpec.getTransformFn();
4547
}
@@ -50,9 +52,9 @@ protected void handleInit(Context context) {
5052
}
5153

5254
@Override
53-
public Collection<RM> handleMessage(M message, MessageCollector collector,
55+
protected CompletionStage<Collection<RM>> handleMessageAsync(M message, MessageCollector collector,
5456
TaskCoordinator coordinator) {
55-
return this.transformFn.apply(message);
57+
return CompletableFuture.completedFuture(this.transformFn.apply(message));
5658
}
5759

5860
@Override

samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
package org.apache.samza.operators.impl;
2020

21+
import java.util.Optional;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.CompletionStage;
2124
import org.apache.samza.context.Context;
2225
import org.apache.samza.operators.KV;
2326
import org.apache.samza.system.descriptors.InputTransformer;
@@ -47,18 +50,21 @@ protected void handleInit(Context context) {
4750
}
4851

4952
@Override
50-
public Collection<Object> handleMessage(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
51-
Object message;
53+
protected CompletionStage<Collection<Object>> handleMessageAsync(IncomingMessageEnvelope message,
54+
MessageCollector collector, TaskCoordinator coordinator) {
55+
Object result;
5256
InputTransformer transformer = inputOpSpec.getTransformer();
5357
if (transformer != null) {
54-
message = transformer.apply(ime);
58+
result = transformer.apply(message);
5559
} else {
56-
message = this.inputOpSpec.isKeyed() ? KV.of(ime.getKey(), ime.getMessage()) : ime.getMessage();
60+
result = this.inputOpSpec.isKeyed() ? KV.of(message.getKey(), message.getMessage()) : message.getMessage();
5761
}
58-
if (message != null) {
59-
return Collections.singletonList(message);
60-
}
61-
return Collections.emptyList();
62+
63+
Collection<Object> output = Optional.ofNullable(result)
64+
.map(Collections::singletonList)
65+
.orElse(Collections.emptyList());
66+
67+
return CompletableFuture.completedFuture(output);
6268
}
6369

6470
@Override

0 commit comments

Comments
 (0)