Skip to content

Commit ad8ba96

Browse files
SAMZA-1271; Guarantee predictable, deterministic order for operator initialization and finalization
Currently, the order of initialization of operators in the Samza high-level API is not deterministic. The non-determinism arises from two primary causes: (1) there is no fixed order of iteration over the subscribed `OperatorSpec`s for a given `MessageStream`; (2) there is no fixed order of iteration over the `OperatorImpl`s in the `OperatorImplGraph`. This patch aims to provide the following two guarantees. For any two operators A and B in the graph, if B consumes the output of A: (1) A is initialized before B is initialized; (2) A is finalized only after B is finalized. Author: vjagadish1989 <jvenkatr@linkedin.com>. Reviewers: Prateek Maheshwari <pmaheshw@linkedin.com>. Closes apache#211 from vjagadish1989/deterministic_order
1 parent a417430 commit ad8ba96

File tree

9 files changed

+180
-18
lines changed

9 files changed

+180
-18
lines changed

samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
*
2727
* <p> Implement {@link #close()} to free resources used during the execution of the function, clean up state etc.
2828
*
29+
* <p> Order of finalization: {@link ClosableFunction}s are invoked in the reverse topological order of operators in the
30+
* {@link org.apache.samza.operators.StreamGraph}. For any two operators A and B in the graph, if operator B consumes results
31+
* from operator A, then operator B is guaranteed to be closed before operator A.
32+
*
2933
*/
3034
@InterfaceStability.Unstable
3135
public interface ClosableFunction {

samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@
2525

2626
/**
2727
* A function that can be initialized before execution.
28+
*
29+
* <p> Order of initialization: {@link InitableFunction}s are invoked in the topological order of operators in the
30+
* {@link org.apache.samza.operators.StreamGraph}. For any two operators A and B in the graph, if operator B consumes results
31+
* from operator A, then operator A is guaranteed to be initialized before operator B.
32+
*
2833
*/
2934
@InterfaceStability.Unstable
3035
public interface InitableFunction {

samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import java.util.ArrayList;
4242
import java.util.Collection;
4343
import java.util.Collections;
44-
import java.util.HashSet;
44+
import java.util.LinkedHashSet;
4545
import java.util.List;
4646
import java.util.Set;
4747
import java.util.function.Function;
@@ -61,8 +61,10 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
6161

6262
/**
6363
* The set of operators that consume the messages in this {@link MessageStream}
64+
*
65+
* Use a LinkedHashSet since we need deterministic ordering in initializing/closing operators.
6466
*/
65-
private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();
67+
private final Set<OperatorSpec> registeredOperatorSpecs = new LinkedHashSet<>();
6668

6769
/**
6870
* Default constructor

samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131

3232
import java.util.Collection;
3333
import java.util.Collections;
34-
import java.util.HashMap;
3534
import java.util.HashSet;
35+
import java.util.LinkedHashMap;
3636
import java.util.Map;
3737
import java.util.Set;
3838
import java.util.function.BiFunction;
@@ -51,8 +51,9 @@ public class StreamGraphImpl implements StreamGraph {
5151
*/
5252
private int opId = 0;
5353

54-
private final Map<StreamSpec, InputStreamInternal> inStreams = new HashMap<>();
55-
private final Map<StreamSpec, OutputStreamInternal> outStreams = new HashMap<>();
54+
// Use LinkedHashMap (insertion-ordered) for deterministic order in initializing and closing operators.
55+
private final Map<StreamSpec, InputStreamInternal> inStreams = new LinkedHashMap<>();
56+
private final Map<StreamSpec, OutputStreamInternal> outStreams = new LinkedHashMap<>();
5657
private final ApplicationRunner runner;
5758
private final Config config;
5859

samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.samza.operators.impl;
2020

21+
import com.google.common.collect.Lists;
2122
import org.apache.samza.config.Config;
2223
import org.apache.samza.operators.MessageStreamImpl;
2324
import org.apache.samza.operators.StreamGraphImpl;
@@ -31,9 +32,12 @@
3132
import org.apache.samza.util.Clock;
3233
import org.apache.samza.util.SystemClock;
3334

35+
import java.util.ArrayList;
3436
import java.util.Collection;
3537
import java.util.Collections;
3638
import java.util.HashMap;
39+
import java.util.LinkedHashMap;
40+
import java.util.List;
3741
import java.util.Map;
3842

3943

@@ -47,8 +51,10 @@ public class OperatorImplGraph {
4751
* A mapping from {@link OperatorSpec}s to their {@link OperatorImpl}s in this graph. Used to avoid creating
4852
* multiple {@link OperatorImpl}s for an {@link OperatorSpec}, e.g., when it's reached from different
4953
* input {@link MessageStreamImpl}s.
54+
*
55+
* Uses a LinkedHashMap (insertion-ordered) for deterministic ordering in initializing and closing operators.
5056
*/
51-
private final Map<OperatorSpec, OperatorImpl> operatorImpls = new HashMap<>();
57+
private final Map<OperatorSpec, OperatorImpl> operatorImpls = new LinkedHashMap<>();
5258

5359
/**
5460
* A mapping from input {@link SystemStream}s to their {@link OperatorImpl} sub-DAG in this graph.
@@ -99,13 +105,10 @@ public Collection<RootOperatorImpl> getAllRootOperators() {
99105
return Collections.unmodifiableCollection(this.rootOperators.values());
100106
}
101107

102-
/**
103-
* Get all {@link OperatorImpl}s for the graph.
104-
*
105-
* @return an unmodifiable view of all {@link OperatorImpl}s for the graph
106-
*/
107-
public Collection<OperatorImpl> getAllOperators() {
108-
return Collections.unmodifiableCollection(this.operatorImpls.values());
108+
public void close() {
109+
List<OperatorImpl> initializationOrder = new ArrayList<>(operatorImpls.values());
110+
List<OperatorImpl> finalizationOrder = Lists.reverse(initializationOrder);
111+
finalizationOrder.forEach(OperatorImpl::close);
109112
}
110113

111114
/**

samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ public Collection<OM> apply(M message) {
7171
public void init(Config config, TaskContext context) {
7272
mapFn.init(config, context);
7373
}
74+
75+
@Override
76+
public void close() {
77+
mapFn.close();
78+
}
7479
}, nextStream, OperatorSpec.OpCode.MAP, opId);
7580
}
7681

@@ -101,6 +106,12 @@ public Collection<M> apply(M message) {
101106
public void init(Config config, TaskContext context) {
102107
filterFn.init(config, context);
103108
}
109+
110+
@Override
111+
public void close() {
112+
filterFn.close();
113+
}
114+
104115
}, nextStream, OperatorSpec.OpCode.FILTER, opId);
105116
}
106117

samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.samza.config.Config;
2323
import org.apache.samza.operators.ContextManager;
2424
import org.apache.samza.operators.StreamGraphImpl;
25-
import org.apache.samza.operators.impl.OperatorImpl;
2625
import org.apache.samza.operators.impl.OperatorImplGraph;
2726
import org.apache.samza.operators.impl.RootOperatorImpl;
2827
import org.apache.samza.operators.stream.InputStreamInternal;
@@ -32,7 +31,6 @@
3231
import org.apache.samza.util.Clock;
3332
import org.apache.samza.util.SystemClock;
3433

35-
import java.util.Collection;
3634
import java.util.HashMap;
3735
import java.util.Map;
3836

@@ -142,8 +140,6 @@ public void close() throws Exception {
142140
if (this.contextManager != null) {
143141
this.contextManager.close();
144142
}
145-
146-
Collection<OperatorImpl> allOperators = operatorImplGraph.getAllOperators();
147-
allOperators.forEach(OperatorImpl::close);
143+
operatorImplGraph.close();
148144
}
149145
}

samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,21 @@
1818
*/
1919
package org.apache.samza.operators;
2020

21+
import junit.framework.Assert;
2122
import org.apache.samza.config.Config;
2223
import org.apache.samza.config.JobConfig;
2324
import org.apache.samza.operators.data.MessageType;
2425
import org.apache.samza.operators.data.TestInputMessageEnvelope;
2526
import org.apache.samza.operators.data.TestMessageEnvelope;
27+
import org.apache.samza.operators.stream.InputStreamInternal;
2628
import org.apache.samza.operators.stream.InputStreamInternalImpl;
2729
import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
2830
import org.apache.samza.operators.stream.OutputStreamInternalImpl;
2931
import org.apache.samza.runtime.ApplicationRunner;
3032
import org.apache.samza.system.StreamSpec;
3133
import org.junit.Test;
3234

35+
import java.util.ArrayList;
3336
import java.util.function.BiFunction;
3437
import java.util.function.Function;
3538

@@ -193,4 +196,30 @@ public void testGetNextOpId() {
193196
assertEquals(graph.getNextOpId(), 1);
194197
}
195198

199+
@Test
200+
public void testGetInputStreamPreservesInsertionOrder() {
201+
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
202+
Config mockConfig = mock(Config.class);
203+
204+
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
205+
206+
StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
207+
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
208+
209+
StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
210+
when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
211+
212+
StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system");
213+
when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
214+
215+
graph.getInputStream("test-stream-1", (k, v) -> v);
216+
graph.getInputStream("test-stream-2", (k, v) -> v);
217+
graph.getInputStream("test-stream-3", (k, v) -> v);
218+
219+
ArrayList<InputStreamInternal> inputMessageStreams = new ArrayList<>(graph.getInputStreams().values());
220+
Assert.assertEquals(inputMessageStreams.size(), 3);
221+
Assert.assertEquals(inputMessageStreams.get(0).getStreamSpec(), testStreamSpec1);
222+
Assert.assertEquals(inputMessageStreams.get(1).getStreamSpec(), testStreamSpec2);
223+
Assert.assertEquals(inputMessageStreams.get(2).getStreamSpec(), testStreamSpec3);
224+
}
196225
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.operators.impl;
21+
22+
import org.apache.samza.config.Config;
23+
import org.apache.samza.metrics.MetricsRegistryMap;
24+
import org.apache.samza.operators.MessageStream;
25+
import org.apache.samza.operators.StreamGraphImpl;
26+
import org.apache.samza.operators.functions.MapFunction;
27+
import org.apache.samza.runtime.ApplicationRunner;
28+
import org.apache.samza.system.StreamSpec;
29+
import org.apache.samza.task.TaskContext;
30+
import org.apache.samza.util.SystemClock;
31+
import org.junit.Test;
32+
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
36+
import static junit.framework.Assert.assertEquals;
37+
import static org.mockito.Mockito.mock;
38+
import static org.mockito.Mockito.when;
39+
40+
public class TestOperatorImplGraph {
41+
42+
@Test
43+
public void testOperatorGraphInitAndClose() {
44+
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
45+
StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
46+
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
47+
StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
48+
when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
49+
50+
Config mockConfig = mock(Config.class);
51+
TaskContext mockContext = createMockContext();
52+
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
53+
54+
List<String> initializationOrder = new ArrayList<>();
55+
List<String> finalizationOrder = new ArrayList<>();
56+
57+
MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", (k, v) -> v);
58+
MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", (k, v) -> v);
59+
60+
inputStream1.map(createMapFunction("1", initializationOrder, finalizationOrder))
61+
.map(createMapFunction("2", initializationOrder, finalizationOrder));
62+
63+
inputStream2.map(createMapFunction("3", initializationOrder, finalizationOrder))
64+
.map(createMapFunction("4", initializationOrder, finalizationOrder));
65+
66+
OperatorImplGraph implGraph = new OperatorImplGraph(SystemClock.instance());
67+
68+
// Assert that initialization occurs in topological order.
69+
implGraph.init(graph, mockConfig, mockContext);
70+
assertEquals(initializationOrder.get(0), "1");
71+
assertEquals(initializationOrder.get(1), "2");
72+
assertEquals(initializationOrder.get(2), "3");
73+
assertEquals(initializationOrder.get(3), "4");
74+
75+
// Assert that finalization occurs in reverse topological order.
76+
implGraph.close();
77+
assertEquals(finalizationOrder.get(0), "4");
78+
assertEquals(finalizationOrder.get(1), "3");
79+
assertEquals(finalizationOrder.get(2), "2");
80+
assertEquals(finalizationOrder.get(3), "1");
81+
}
82+
83+
private TaskContext createMockContext() {
84+
TaskContext mockContext = mock(TaskContext.class);
85+
when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
86+
return mockContext;
87+
}
88+
89+
/**
90+
* Creates an identity map function that appends to the provided lists when init/close is invoked.
91+
*/
92+
private MapFunction<Object, Object> createMapFunction(String id, List<String> initializationOrder, List<String> finalizationOrder) {
93+
return new MapFunction<Object, Object>() {
94+
@Override
95+
public void init(Config config, TaskContext context) {
96+
initializationOrder.add(id);
97+
}
98+
99+
@Override
100+
public void close() {
101+
finalizationOrder.add(id);
102+
}
103+
104+
@Override
105+
public Object apply(Object message) {
106+
return message;
107+
}
108+
};
109+
}
110+
}
111+

0 commit comments

Comments
 (0)