Skip to content

Commit 3b8dc03

Browse files
authored
Propagating the system messages to the stream. (apache#937)
* Adding system messages to the stream * checkstyle fixes
1 parent 2a32e43 commit 3b8dc03

File tree

8 files changed

+133
-40
lines changed

8 files changed

+133
-40
lines changed

samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.apache.samza.sql.data;
2121

2222
import java.io.Serializable;
23+
import java.time.Instant;
24+
import org.codehaus.jackson.annotate.JsonIgnore;
2325
import org.codehaus.jackson.annotate.JsonProperty;
2426

2527

@@ -36,9 +38,16 @@ public class SamzaSqlRelMsgMetadata implements Serializable {
3638
public boolean isNewInputMessage = true;
3739

3840
/**
39-
*
41+
* Indicates whether the SamzaSqlMessage is a system message or not.
4042
*/
41-
public String operatorBeginProcessingInstant = null;
43+
@JsonIgnore
44+
private boolean isSystemMessage = false;
45+
46+
/**
47+
* Time at which the join operation started for the message.
48+
* If there is no join node in the operator graph, this will be -1.
49+
*/
50+
public long joinStartTimeMs = -1;
4251

4352

4453
/**
@@ -93,7 +102,6 @@ public void setArrivalTime(String arrivalTime) {
93102

94103
public boolean hasArrivalTime() { return arrivalTime != null && !arrivalTime.isEmpty(); }
95104

96-
97105
@JsonProperty("scanTime")
98106
public String getscanTime() { return scanTime;}
99107

@@ -103,6 +111,16 @@ public void setScanTime(String scanTime) {
103111

104112
public boolean hasScanTime() { return scanTime != null && !scanTime.isEmpty(); }
105113

114+
@JsonIgnore
115+
public void setIsSystemMessage(boolean isSystemMessage) {
116+
this.isSystemMessage = isSystemMessage;
117+
}
118+
119+
@JsonIgnore
120+
public boolean isSystemMessage() {
121+
return isSystemMessage;
122+
}
123+
106124
@Override
107125
public String toString() {
108126
return "[Metadata:{" + eventTime + " " + arrivalTime + " " + scanTime + "}]";

samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public class SamzaSqlApplicationConfig {
8989

9090
public static final String CFG_METADATA_TOPIC_PREFIX = "samza.sql.metadataTopicPrefix";
9191
public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms";
92+
public static final String CFG_SQL_PROCESS_SYSTEM_EVENTS = "samza.sql.processSystemEvents";
9293

9394
public static final String SAMZA_SYSTEM_LOG = "log";
9495

@@ -115,6 +116,7 @@ public class SamzaSqlApplicationConfig {
115116

116117
private final String metadataTopicPrefix;
117118
private final long windowDurationMs;
119+
private final boolean processSystemEvents;
118120

119121
public SamzaSqlApplicationConfig(Config staticConfig, List<String> inputSystemStreams,
120122
List<String> outputSystemStreams) {
@@ -165,6 +167,8 @@ public SamzaSqlApplicationConfig(Config staticConfig, List<String> inputSystemSt
165167

166168
metadataTopicPrefix =
167169
staticConfig.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
170+
171+
processSystemEvents = staticConfig.getBoolean(CFG_SQL_PROCESS_SYSTEM_EVENTS, true);
168172
windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
169173
}
170174

@@ -324,4 +328,8 @@ public String getMetadataTopicPrefix() {
324328
public long getWindowDurationMs() {
325329
return windowDurationMs;
326330
}
331+
332+
public boolean isProcessSystemEvents() {
333+
return processSystemEvents;
334+
}
327335
}

samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,5 +193,4 @@ void translate(final Project project, final String logicalOpId, final Translator
193193
context.registerMessageStream(project.getId(), outputStream);
194194
context.registerRelNode(project.getId(), project);
195195
}
196-
197196
}

samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.samza.operators.KV;
5151
import org.apache.samza.operators.MessageStream;
5252
import org.apache.samza.operators.OutputStream;
53+
import org.apache.samza.operators.functions.FilterFunction;
5354
import org.apache.samza.operators.functions.MapFunction;
5455
import org.apache.samza.serializers.KVSerde;
5556
import org.apache.samza.serializers.NoOpSerde;
@@ -293,6 +294,17 @@ private void sendToOutputStream(String queryLogicalId, String logicalOpId, Strin
293294
GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
294295
OutputStream stm = outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v -> appDesc.getOutputStream(osd));
295296
outputStream.sendTo(stm);
297+
298+
// Forward system events to the output only when the sink is a stream and system-event processing is enabled.
299+
if (sqlConfig.isProcessSystemEvents()) {
300+
for( MessageStream<SamzaSqlInputMessage> inputStream : inputMsgStreams.values()) {
301+
MessageStream<KV<Object, Object>> systemEventStream =
302+
inputStream.filter(message -> message.getMetadata().isSystemMessage())
303+
.map(SamzaSqlInputMessage::getKeyAndMessageKV);
304+
305+
systemEventStream.sendTo(stm);
306+
}
307+
}
296308
} else {
297309
Table outputTable = appDesc.getTable(tableDescriptor.get());
298310
if (outputTable == null) {

samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java

Lines changed: 66 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
/*
2-
* Licensed to the Apache Software Foundation (ASF) under one
3-
* or more contributor license agreements. See the NOTICE file
4-
* distributed with this work for additional information
5-
* regarding copyright ownership. The ASF licenses this file
6-
* to you under the Apache License, Version 2.0 (the
7-
* "License"); you may not use this file except in compliance
8-
* with the License. You may obtain a copy of the License at
9-
*
10-
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
12-
* Unless required by applicable law or agreed to in writing,
13-
* software distributed under the License is distributed on an
14-
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15-
* KIND, either express or implied. See the License for the
16-
* specific language governing permissions and limitations
17-
* under the License.
18-
*/
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
1919

2020
package org.apache.samza.sql.translator;
2121

@@ -36,8 +36,8 @@
3636
import org.apache.samza.operators.functions.FilterFunction;
3737
import org.apache.samza.operators.functions.MapFunction;
3838
import org.apache.samza.serializers.NoOpSerde;
39-
import org.apache.samza.sql.SamzaSqlInputTransformer;
4039
import org.apache.samza.sql.SamzaSqlInputMessage;
40+
import org.apache.samza.sql.SamzaSqlInputTransformer;
4141
import org.apache.samza.sql.data.SamzaSqlRelMessage;
4242
import org.apache.samza.sql.interfaces.SamzaRelConverter;
4343
import org.apache.samza.sql.interfaces.SqlIOConfig;
@@ -48,6 +48,7 @@
4848
import org.apache.samza.table.descriptors.CachingTableDescriptor;
4949
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
5050

51+
5152
/**
5253
* Translator to translate the TableScans in relational graph to the corresponding input streams in the StreamGraph
5354
* implementation
@@ -78,7 +79,7 @@ public void init(Context context) {
7879

7980
@Override
8081
public boolean apply(SamzaSqlInputMessage samzaSqlInputMessage) {
81-
return !relConverter.isSystemMessage(samzaSqlInputMessage.getKeyAndMessageKV());
82+
return !samzaSqlInputMessage.getMetadata().isSystemMessage();
8283
}
8384
}
8485

@@ -147,11 +148,11 @@ private void updateMetrics(Instant startProcessing, Instant endProcessing) {
147148
queryInputEvents.inc();
148149
processingTime.update(Duration.between(startProcessing, endProcessing).toMillis());
149150
}
150-
151151
} // ScanMapFunction
152152

153-
void translate(final TableScan tableScan, final String queryLogicalId, final String logicalOpId, final TranslatorContext context,
154-
Map<String, DelegatingSystemDescriptor> systemDescriptors, Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams) {
153+
void translate(final TableScan tableScan, final String queryLogicalId, final String logicalOpId,
154+
final TranslatorContext context, Map<String, DelegatingSystemDescriptor> systemDescriptors,
155+
Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams) {
155156
StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
156157
List<String> tableNameParts = tableScan.getTable().getQualifiedName();
157158
String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
@@ -162,9 +163,9 @@ void translate(final TableScan tableScan, final String queryLogicalId, final Str
162163
final String streamId = sqlIOConfig.getStreamId();
163164
final String source = sqlIOConfig.getSource();
164165

165-
final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() &&
166-
(sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
167-
sqlIOConfig.getTableDescriptor().get() instanceof CachingTableDescriptor);
166+
final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() && (
167+
sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor || sqlIOConfig.getTableDescriptor()
168+
.get() instanceof CachingTableDescriptor);
168169

169170
// For remote table, we don't have an input stream descriptor. The table descriptor is already defined by the
170171
// SqlIOResolverFactory.
@@ -181,22 +182,55 @@ void translate(final TableScan tableScan, final String queryLogicalId, final Str
181182
systemDescriptors.put(systemName, systemDescriptor);
182183
} else {
183184
/* in SamzaSQL, there should be no systemDescriptor setup by user, so this branch happens only
184-
* in case of Fan-OUT (i.e., same input stream used in multiple sql statements), or when same input
185-
* used twice in same sql statement (e.g., select ... from input as i1, input as i2 ...), o.w., throw error */
185+
* in case of Fan-OUT (i.e., same input stream used in multiple sql statements), or when same input
186+
* used twice in same sql statement (e.g., select ... from input as i1, input as i2 ...), o.w., throw error */
186187
if (systemDescriptor.getTransformer().isPresent()) {
187188
InputTransformer existingTransformer = systemDescriptor.getTransformer().get();
188189
if (!(existingTransformer instanceof SamzaSqlInputTransformer)) {
189-
throw new SamzaException("SamzaSQL Exception: existing transformer for " + systemName + " is not SamzaSqlInputTransformer");
190+
throw new SamzaException(
191+
"SamzaSQL Exception: existing transformer for " + systemName + " is not SamzaSqlInputTransformer");
190192
}
191193
}
192194
}
193195

194196
InputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(streamId, new NoOpSerde<>());
195-
MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream =
196-
inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(inputDescriptor))
197-
.filter(new FilterSystemMessageFunction(sourceName, queryId))
198-
.map(new ScanMapFunction(sourceName, queryId, queryLogicalId, logicalOpId));
197+
198+
if (!inputMsgStreams.containsKey(source)) {
199+
MessageStream<SamzaSqlInputMessage> inputMsgStream = streamAppDesc.getInputStream(inputDescriptor);
200+
inputMsgStreams.put(source, inputMsgStream.map(new SystemMessageMapperFunction(source, queryId)));
201+
}
202+
MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputMsgStreams.get(source)
203+
.filter(new FilterSystemMessageFunction(sourceName, queryId))
204+
.map(new ScanMapFunction(sourceName, queryId, queryLogicalId, logicalOpId));
199205

200206
context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream);
201207
}
208+
209+
/**
210+
* Function that records, in each input message's metadata, whether the message is a system message.
211+
* TODO This should ideally be populated by the InputTransformer in future.
212+
*/
213+
private static class SystemMessageMapperFunction implements MapFunction<SamzaSqlInputMessage, SamzaSqlInputMessage> {
214+
private final String source;
215+
private final int queryId;
216+
private transient SamzaRelConverter relConverter;
217+
218+
public SystemMessageMapperFunction(String source, int queryId) {
219+
this.source = source;
220+
this.queryId = queryId;
221+
}
222+
223+
@Override
224+
public void init(Context context) {
225+
TranslatorContext translatorContext =
226+
((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
227+
relConverter = translatorContext.getMsgConverter(source);
228+
}
229+
230+
@Override
231+
public SamzaSqlInputMessage apply(SamzaSqlInputMessage message) {
232+
message.getMetadata().setIsSystemMessage(relConverter.isSystemMessage(message.getKeyAndMessageKV()));
233+
return message;
234+
}
235+
}
202236
}

samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.samza.sql.translator;
2121

22-
import com.google.common.annotations.VisibleForTesting;
2322
import java.time.Instant;
2423
import org.apache.samza.context.ContainerContext;
2524
import org.apache.samza.context.Context;
@@ -60,7 +59,7 @@ public void init(Context context) {
6059
@Override
6160
public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
6261
inputEvents.inc();
63-
message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant = Instant.now().toString();
62+
message.getSamzaSqlRelMsgMetadata().joinStartTimeMs = Instant.now().toEpochMilli();
6463
return message;
6564
}
6665

samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void init(Context context) {
6363
@Override
6464
public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
6565
Instant endProcessing = Instant.now();
66-
Instant beginProcessing = Instant.parse(message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant);
66+
Instant beginProcessing = Instant.ofEpochMilli(message.getSamzaSqlRelMsgMetadata().joinStartTimeMs);
6767
outputEvents.inc();
6868
processingTime.update(Duration.between(beginProcessing, endProcessing).toMillis());
6969
return message;

samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,29 @@ public void testEndToEndWithSystemMessages() {
103103
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
104104
runApplication(new MapConfig(staticConfigs));
105105

106+
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
107+
.map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
108+
.sorted()
109+
.collect(Collectors.toList());
110+
Assert.assertEquals(numMessages, outMessages.size());
111+
}
112+
113+
@Test
114+
public void testEndToEndDisableSystemMessages() {
115+
int numMessages = 20;
116+
117+
TestAvroSystemFactory.messages.clear();
118+
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
119+
String avroSamzaToRelMsgConverterDomain =
120+
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
121+
staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
122+
SampleRelConverterFactory.class.getName());
123+
String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
124+
List<String> sqlStmts = Arrays.asList(sql);
125+
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
126+
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_PROCESS_SYSTEM_EVENTS, "false");
127+
runApplication(new MapConfig(staticConfigs));
128+
106129
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
107130
.map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
108131
.sorted()
@@ -174,7 +197,7 @@ public void testEndToEndMultiSqlStmts() {
174197
Assert.assertEquals(numMessages, outMessagesSet.size());
175198
Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet)));
176199
}
177-
200+
178201
@Test
179202
public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
180203
int numMessages = 20;

0 commit comments

Comments
 (0)