Commit 1335a6d: Join docs
1 parent 55dbfbc commit 1335a6d

docs/structured-streaming-programming-guide.md

Lines changed: 228 additions & 1 deletion
### Join Operations
Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame
as well as with another streaming Dataset/DataFrame. In this section we will explore what types of joins
(i.e. inner, outer, etc.) are supported in each of these cases. Note that in all the supported join
types, the result of the join with a streaming Dataset/DataFrame will be exactly the same as
if it were with a static Dataset/DataFrame containing the same data as the stream.

#### Stream-static Joins

Since its introduction in Spark 2.0, Structured Streaming has supported joins (inner joins and some
types of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")                 // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join")   // right outer join with a static DF
{% endhighlight %}

</div>
</div>

Note that stream-static joins are not stateful, so no state management is necessary.
However, a few types of stream-static outer joins are not supported, as the incomplete view of
all the data in a stream makes it infeasible to compute the results correctly.
These are discussed at the end of this section.

#### Stream-stream Joins
In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
Datasets/DataFrames. The challenge of generating join results between two data streams is that,
at any point in time, the view of the dataset is incomplete for both sides of the join, making
it much harder to find matches between inputs. Any row received from one input stream can match
with any future, yet-to-be-received row from the other input stream. Hence, for both input
streams, we buffer past input as streaming state, so that we can match every future input with
past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
we automatically handle late, out-of-order data and can limit the state using watermarks.
Let's discuss the different types of supported stream-stream joins and how to use them.

##### Inner Joins with optional Watermarking
Inner joins on any kind of columns, along with any kind of join conditions, are supported.
However, as the stream runs, the size of the streaming state will keep growing indefinitely, as
*all* past input must be saved since any new input can match with any input from the past.
To avoid unbounded state, you have to define additional join conditions such that indefinitely
old inputs cannot match with future inputs and can therefore be cleared from the state.
In other words, you will have to do the following additional steps in the join.

1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
(similar to streaming aggregations).

1. Define a constraint on event-time across the two inputs such that the engine can figure out when
old rows of one input are not going to be required for matches with the other input. This constraint
can either be a time range condition (e.g. `...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR`),
or an equi-join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).

Let's understand this with an example.

Let's say we want to join a stream of advertisement impressions (when an ad was shown) with
another stream of user clicks on advertisements to correlate when impressions led to
monetizable clicks. To allow state cleanup in this stream-stream join, you will have to
specify the watermarking delays and the time constraints as follows.

1. Watermark delays: Say the impressions and the corresponding clicks can be late/out-of-order
in event-time by at most 2 and 3 hours, respectively.

1. Event-time range condition: Say a click can occur within a time range of 0 seconds to 1 hour
after the corresponding impression.

The code would look like this.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
import org.apache.spark.sql.functions.expr

val impressions = spark.readStream. ...
val clicks = spark.readStream. ...

// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """
))
{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}
import static org.apache.spark.sql.functions.expr;

Dataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...

// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "
));
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

{% highlight python %}
from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """
))
{% endhighlight %}

</div>
</div>
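The buffering and watermark-based eviction described above can be sketched, very roughly, in plain Python with no Spark involved. All names here (`StreamStreamInnerJoin`, `add_impression`, etc.) are invented for illustration, and the eviction rule is a simplification of the idea, not Spark's actual implementation:

```python
from collections import namedtuple

# Hypothetical event types mirroring the advertising example above
Impression = namedtuple("Impression", ["ad_id", "time"])
Click = namedtuple("Click", ["ad_id", "time"])

class StreamStreamInnerJoin:
    """Toy model of a stream-stream inner join: buffer both inputs,
    match clicks arriving within `range_secs` after an impression, and
    evict buffered rows once the watermark rules out future matches."""

    def __init__(self, max_delay=2 * 3600, range_secs=3600):
        self.impressions = []
        self.clicks = []
        self.max_delay = max_delay      # watermark delay (like withWatermark)
        self.range_secs = range_secs    # click within 1 hour of impression
        self.max_event_time = 0

    def _matches(self, imp, clk):
        return (imp.ad_id == clk.ad_id and
                imp.time <= clk.time <= imp.time + self.range_secs)

    def _evict(self):
        watermark = self.max_event_time - self.max_delay
        # An impression can still match clicks up to range_secs later, so it
        # is only safe to drop once the watermark passes time + range_secs.
        self.impressions = [i for i in self.impressions
                            if i.time + self.range_secs >= watermark]
        self.clicks = [c for c in self.clicks if c.time >= watermark]

    def add_impression(self, imp):
        """Process one impression; return joined (impression, click) pairs."""
        self.max_event_time = max(self.max_event_time, imp.time)
        out = [(imp, c) for c in self.clicks if self._matches(imp, c)]
        self.impressions.append(imp)
        self._evict()
        return out

    def add_click(self, clk):
        """Process one click; return joined (impression, click) pairs."""
        self.max_event_time = max(self.max_event_time, clk.time)
        out = [(i, clk) for i in self.impressions if self._matches(i, clk)]
        self.clicks.append(clk)
        self._evict()
        return out
```

The key point the sketch shows is that without the time-range condition there would be no safe eviction rule, and both buffers would grow without bound.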

##### Outer Joins with Watermarking
While the watermark + event-time constraints are optional for inner joins, for left and right outer
joins they must be specified. This is because, for generating the NULL results in an outer join, the
engine must know when an input row is not going to match with anything in the future. Hence, the
watermark + event-time constraints must be specified for generating correct results.

However, note that the outer NULL results will be generated with a delay (which depends on the specified
watermark delay and the time range condition) because the engine has to wait that long to ensure
there were no matches and there will be no more matches in the future.
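To make the delay concrete, here is a small, Spark-free Python sketch of the eviction step of a left outer join (all names are hypothetical and this is a simplification of the idea, not Spark's code): an unmatched left-side row yields its NULL result only once the watermark guarantees no future match.

```python
def left_outer_evict(impressions, matched_ids, watermark, range_secs=3600):
    """Toy model of the left-outer-join eviction step.

    `impressions` is buffered state as (ad_id, event_time) pairs, and
    `matched_ids` holds the ids that already produced a joined result.
    Returns (delayed NULL results, remaining buffered state)."""
    nulls, remaining = [], []
    for ad_id, event_time in impressions:
        if event_time + range_secs < watermark:   # no future click can match
            if ad_id not in matched_ids:
                nulls.append((ad_id, None))       # outer NULL, emitted late
        else:
            remaining.append((ad_id, event_time))
    return nulls, remaining
```

An impression at time 0 produces its `(ad_id, None)` row only after the watermark has moved past `0 + range_secs`; until then it simply stays in state, which is the delay described above.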

##### Support matrix for joins on streaming data

<table class="table">
  <tr>
    <th>Left Input</th>
    <th>Right Input</th>
    <th>Join Type</th>
    <th></th>
  </tr>
  <tr>
    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    <td rowspan="4" style="vertical-align: middle;">Static</td>
    <td style="vertical-align: middle;">Inner</td>
    <td style="vertical-align: middle;">Supported, not stateful</td>
  </tr>
  <tr>
    <td style="vertical-align: middle;">Left Outer</td>
    <td style="vertical-align: middle;">Supported, not stateful</td>
  </tr>
  <tr>
    <td style="vertical-align: middle;">Right Outer</td>
    <td style="vertical-align: middle;">Not supported</td>
  </tr>
  <tr>
    <td style="vertical-align: middle;">Full Outer</td>
    <td style="vertical-align: middle;">Not supported</td>
  </tr>
  <tr>
    <td rowspan="4" style="vertical-align: middle;">Static</td>
    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    <td style="vertical-align: middle;">Inner</td>
    <td style="vertical-align: middle;">Supported, not stateful</td>
  </tr>
  <tr>
    <td style="vertical-align: middle;">Left Outer</td>
    <td style="vertical-align: middle;">Not supported</td>
  </tr>
  <tr>
    <td style="vertical-align: middle;">Right Outer</td>
    <td style="vertical-align: middle;">Supported, not stateful</td>
  </tr>
  <tr>
    <td style="vertical-align: middle;">Full Outer</td>
    <td style="vertical-align: middle;">Not supported</td>
  </tr>
  <tr>
    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    <td style="vertical-align: middle;">Inner</td>
    <td style="vertical-align: middle;">Supported, optionally specify watermark on both sides + time constraints for state cleanup</td>
  </tr>
  <tr>
    <td style="vertical-align: middle;">Left Outer</td>
    <td style="vertical-align: middle;">Conditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup</td>
  </tr>
  <tr>
    <td style="vertical-align: middle;">Right Outer</td>
    <td style="vertical-align: middle;">Conditionally supported, must specify watermark on left + time constraints for correct results, optionally specify watermark on right for all state cleanup</td>
  </tr>
  <tr>
    <td style="vertical-align: middle;">Full Outer</td>
    <td style="vertical-align: middle;">Not supported</td>
  </tr>
</table>

Additional details on supported joins:

- Joins can be cascaded, that is, you can do `df1.join(df2, ...).join(df3, ...).join(df4, ....)`.

- As of Spark 2.3, joins can be used only when the query is in Append output mode. Other output modes are not yet supported.

### Streaming Deduplication
You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on static data using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.
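As a rough, Spark-free sketch of why watermarking bounds the deduplication state (the class below is invented for this illustration and is not Spark's implementation): a key only needs to be remembered until the watermark passes its event time.

```python
class WatermarkedDedup:
    """Toy model of streaming deduplication with a watermark:
    a key is remembered only until the watermark passes its event time."""

    def __init__(self, max_delay):
        self.seen = {}            # key -> event time at which it was stored
        self.max_event_time = 0
        self.max_delay = max_delay

    def process(self, key, event_time):
        """Return True if the record is new (kept), False if it is dropped."""
        self.max_event_time = max(self.max_event_time, event_time)
        watermark = self.max_event_time - self.max_delay
        # Evict state for keys that can no longer receive non-late duplicates.
        self.seen = {k: t for k, t in self.seen.items() if t >= watermark}
        if event_time < watermark or key in self.seen:
            return False          # late data or duplicate: dropped
        self.seen[key] = event_time
        return True
```

Without a watermark the `seen` dictionary would have to hold every key forever; with one, state stays proportional to the watermark delay.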