
Commit 47f85be

eason-yuchen-liu authored and HeartSaVioR committed

[SPARK-48850][DOCS][SS][SQL] Add documentation for new options added to State Data Source

### What changes were proposed in this pull request?

In #46944 and #47188, we introduced some new options to the State Data Source. This PR explains these new features in the documentation.

### Why are the changes needed?

It is necessary to reflect the latest changes on the documentation website.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The API doc website can be rendered correctly.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47274 from eason-yuchen-liu/snapshot-doc.

Authored-by: Yuchen Liu <yuchen.liu@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>

1 parent 71cf25e commit 47f85be

File tree

1 file changed: +112 −3 lines changed


docs/structured-streaming-state-data-source.md

Lines changed: 112 additions & 3 deletions
@@ -42,7 +42,7 @@ Users can read an instance of state store, which is matched to a single stateful
 Note that there could be an exception, e.g. stream-stream join, which leverages multiple state store instances internally. The data source abstracts the internal representation away from users and
 provides a user-friendly approach to read the state. See the section for stream-stream join for more details.
 
-### Creating a State store for Batch Queries (all defaults)
+### Creating a state store for batch queries (all defaults)
 
 <div class="codetabs">

@@ -144,16 +144,126 @@ The following configurations are optional:
     <td>(none)</td>
     <td>Represents the target side to read from. This option is used when users want to read the state from stream-stream join.</td>
   </tr>
+  <tr>
+    <td>snapshotStartBatchId</td>
+    <td>numeric value</td>
+    <td></td>
+    <td>If specified, the snapshot at this batch ID is read first, and then the changelogs are replayed until 'batchId' or its default. Note that the snapshot batch ID starts from 0 and equals the snapshot version ID minus 1. This option must be used together with 'snapshotPartitionId'.</td>
+  </tr>
+  <tr>
+    <td>snapshotPartitionId</td>
+    <td>numeric value</td>
+    <td></td>
+    <td>If specified, only this specific partition will be read. Note that the partition ID starts from 0. This option must be used together with 'snapshotStartBatchId'.</td>
+  </tr>
+  <tr>
+    <td>readChangeFeed</td>
+    <td>boolean</td>
+    <td>false</td>
+    <td>If set to true, the changes of state over microbatches are read instead. The output table schema also differs; details can be found in the section <a href="#reading-state-changes-over-microbatches">"Reading state changes over microbatches"</a>. Option 'changeStartBatchId' must be specified with this option. Options 'batchId', 'joinSide', 'snapshotStartBatchId' and 'snapshotPartitionId' cannot be used together with this option.</td>
+  </tr>
+  <tr>
+    <td>changeStartBatchId</td>
+    <td>numeric value</td>
+    <td></td>
+    <td>Represents the first batch to read in change feed mode. This option requires 'readChangeFeed' to be set to true.</td>
+  </tr>
+  <tr>
+    <td>changeEndBatchId</td>
+    <td>numeric value</td>
+    <td>latest committed batchId</td>
+    <td>Represents the last batch to read in change feed mode. This option requires 'readChangeFeed' to be set to true.</td>
+  </tr>
 </table>
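For illustration only (this snippet is not part of the commit's diff): combining the two snapshot options might look like the following sketch, written in the doc's own PySpark style. The batch ID `4`, partition ID `0`, an active `spark` session, and the `<checkpointLocation>` placeholder are all assumptions for the example.

```python
# Hypothetical sketch, not from the commit: rebuild partition 0's state
# starting from the snapshot taken at batch 4 (snapshot version 5, since
# batch ID = version - 1), then replay changelogs up to 'batchId' or its default.
df = spark \
    .read \
    .format("statestore") \
    .option("snapshotStartBatchId", 4) \
    .option("snapshotPartitionId", 0) \
    .load("<checkpointLocation>")
```

As the table notes, neither option is valid on its own; omitting one while specifying the other is an error.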

-### Reading state for Stream-stream join
+
+### Reading state for stream-stream join
 
 Structured Streaming implements the stream-stream join feature via leveraging multiple instances of state store internally.
 These instances logically compose buffers to store the input rows for left and right.
 
 Since it is more obvious to users to reason about, the data source provides the option 'joinSide' to read the buffered input for specific side of the join.
 To enable the functionality to read the internal state store instance directly, we also allow specifying the option 'storeName', with restriction that 'storeName' and 'joinSide' cannot be specified together.
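For illustration only (not part of this commit's diff), reading one side's buffered input might look like the following sketch; the value `"left"`, the active `spark` session, and the `<checkpointLocation>` placeholder are assumptions for the example.

```python
# Hypothetical sketch, not from the commit: read the buffered input rows
# for the left side of a stream-stream join. 'joinSide' and 'storeName'
# cannot be specified together.
df = spark \
    .read \
    .format("statestore") \
    .option("joinSide", "left") \
    .load("<checkpointLocation>")
```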

+### Reading state changes over microbatches
+
+If we want to understand the change of state store over microbatches instead of the whole state store at a particular microbatch, 'readChangeFeed' is the option to use.
+For example, this is the code to read the changes of state from batch 2 to the latest committed batch.
+
+<div class="codetabs">
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+df = spark \
+  .read \
+  .format("statestore") \
+  .option("readChangeFeed", True) \
+  .option("changeStartBatchId", 2) \
+  .load("<checkpointLocation>")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val df = spark
+  .read
+  .format("statestore")
+  .option("readChangeFeed", true)
+  .option("changeStartBatchId", 2)
+  .load("<checkpointLocation>")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+Dataset<Row> df = spark
+  .read()
+  .format("statestore")
+  .option("readChangeFeed", true)
+  .option("changeStartBatchId", 2)
+  .load("<checkpointLocation>");
+
+{% endhighlight %}
+</div>
+
+</div>
+
+The output schema will also be different from the normal output.
+
+<table>
+<thead><tr><th>Column</th><th>Type</th><th>Note</th></tr></thead>
+<tr>
+  <td>batch_id</td>
+  <td>long</td>
+  <td></td>
+</tr>
+<tr>
+  <td>change_type</td>
+  <td>string</td>
+  <td>There are two possible values: 'update' and 'delete'. 'update' represents either inserting a key-value pair that did not exist or updating an existing key with a new value. The 'value' field is null for 'delete' records.</td>
+</tr>
+<tr>
+  <td>key</td>
+  <td>struct (depends on the type for state key)</td>
+  <td></td>
+</tr>
+<tr>
+  <td>value</td>
+  <td>struct (depends on the type for state value)</td>
+  <td></td>
+</tr>
+<tr>
+  <td>partition_id</td>
+  <td>int</td>
+  <td></td>
+</tr>
+</table>
+
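For illustration only (not part of this commit's diff), the change feed schema above could be consumed as in the following sketch; the batch ID `2`, the active `spark` session, and the `<checkpointLocation>` placeholder are assumptions for the example.

```python
# Hypothetical sketch, not from the commit: inspect only the keys removed
# from the state between batch 2 and the latest committed batch. The
# 'value' column is null for 'delete' records, so only key, batch_id and
# partition_id carry information here.
deletes = spark \
    .read \
    .format("statestore") \
    .option("readChangeFeed", True) \
    .option("changeStartBatchId", 2) \
    .load("<checkpointLocation>") \
    .filter("change_type = 'delete'") \
    .select("batch_id", "partition_id", "key")
```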
 ## State metadata source
 
 Before querying the state from existing checkpoint via state data source, users would like to understand the information for the checkpoint, especially about state operator. This includes which operators and state store instances are available in the checkpoint, available range of batch IDs, etc.
@@ -179,7 +289,6 @@ df = spark \
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-
 val df = spark
   .read
   .format("state-metadata")
