|
27 | 27 | import com.google.cloud.dataflow.sdk.options.Validation;
|
28 | 28 | import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
|
29 | 29 | import com.google.cloud.dataflow.sdk.transforms.Aggregator;
|
| 30 | +import com.google.cloud.dataflow.sdk.transforms.Combine; |
30 | 31 | import com.google.cloud.dataflow.sdk.transforms.DoFn;
|
31 | 32 | import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
|
32 | 33 | import com.google.cloud.dataflow.sdk.transforms.MapElements;
|
@@ -309,26 +310,26 @@ public void processElement(ProcessContext c) {
|
309 | 310 |
|
310 | 311 |
|
311 | 312 | // [START DocInclude_SessionCalc]
|
312 |
| - // Calculate the total score for the users per session-- that is, a burst of activity |
313 |
| - // separated by a gap from further activity. Find and record the mean session lengths. |
| 313 | + // Detect user sessions-- that is, a burst of activity separated by a gap from further |
| 314 | + // activity. Find and record the mean session lengths. |
314 | 315 | // This information could help the game designers track the changing user engagement
|
315 | 316 | // as their set of games changes.
|
316 | 317 | userEvents
|
317 | 318 | .apply(Window.named("WindowIntoSessions")
|
318 | 319 | .<KV<String, Integer>>into(
|
319 | 320 | Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
|
320 |
| - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()) |
321 |
| - .withAllowedLateness(Duration.ZERO)) |
322 |
| - .apply("UserSessionSum", Sum.<String>integersPerKey()) |
| 321 | + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) |
| 322 | + // For this use, we care only about the existence of the session, not any particular |
| 323 | + // information aggregated over it, so the following is an efficient way to do that. |
| 324 | + .apply(Combine.perKey(x -> 0)) |
323 | 325 | // Get the duration per session.
|
324 | 326 | .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
|
325 | 327 | // [END DocInclude_SessionCalc]
|
326 | 328 | // [START DocInclude_Rewindow]
|
327 | 329 | // Re-window to process groups of session sums according to when the sessions complete.
|
328 | 330 | .apply(Window.named("WindowToExtractSessionMean")
|
329 | 331 | .<Integer>into(
|
330 |
| - FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))) |
331 |
| - .withAllowedLateness(Duration.ZERO)) |
| 332 | + FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration())))) |
332 | 333 | // Find the mean session duration in each window.
|
333 | 334 | .apply(Mean.<Integer>globally().withoutDefaults())
|
334 | 335 | // Write this info to a BigQuery table.
|
|
0 commit comments