Skip to content

Commit 1cb97cd

Browse files
committed
Update based on JingsongLi's comments
1 parent e7a99f5 commit 1cb97cd

File tree

2 files changed

+22
-31
lines changed

2 files changed

+22
-31
lines changed

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import org.apache.flink.table.runtime.generated.JoinCondition;
4343
import org.apache.flink.table.runtime.operators.TableStreamOperator;
4444
import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
45+
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
46+
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl;
4547
import org.apache.flink.table.runtime.operators.window.state.WindowListState;
4648
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
4749
import org.apache.flink.types.RowKind;
@@ -51,13 +53,14 @@
5153
import java.util.List;
5254

5355
import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
54-
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
5556

5657
/**
5758
* Streaming window join operator.
5859
*
5960
* <p>Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus
6061
* late elements (elements belong to emitted windows) will be simply dropped.
62+
*
63+
* <p>Note: currently, {@link WindowJoinOperator} doesn't support DELETE or UPDATE_BEFORE input row.
6164
*/
6265
public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
6366
implements TwoInputStreamOperator<RowData, RowData, RowData>,
@@ -91,7 +94,7 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
9194
/** Flag to prevent duplicate function.close() calls in close() and dispose(). */
9295
private transient boolean functionsClosed = false;
9396

94-
private transient InternalTimerService<Long> internalTimerService;
97+
private transient WindowTimerService<Long> windowTimerService;
9598

9699
// ------------------------------------------------------------------------
97100
protected transient JoinConditionWithNullFilters joinCondition;
@@ -139,7 +142,9 @@ public void open() throws Exception {
139142

140143
final LongSerializer windowSerializer = LongSerializer.INSTANCE;
141144

142-
internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);
145+
InternalTimerService<Long> internalTimerService =
146+
getInternalTimerService("window-timers", windowSerializer, this);
147+
this.windowTimerService = new WindowTimerServiceImpl(internalTimerService, shiftTimeZone);
143148

144149
// init join condition
145150
JoinCondition condition =
@@ -178,11 +183,11 @@ public void open() throws Exception {
178183
metrics.gauge(
179184
WATERMARK_LATENCY_METRIC_NAME,
180185
() -> {
181-
long watermark = internalTimerService.currentWatermark();
186+
long watermark = windowTimerService.currentWatermark();
182187
if (watermark < 0) {
183188
return 0L;
184189
} else {
185-
return internalTimerService.currentProcessingTime() - watermark;
190+
return windowTimerService.currentProcessingTime() - watermark;
186191
}
187192
});
188193
}
@@ -227,19 +232,19 @@ private void processElement(
227232
throws Exception {
228233
RowData inputRow = element.getValue();
229234
long windowEnd = inputRow.getLong(windowEndIndex);
230-
if (isWindowFired(windowEnd, internalTimerService.currentWatermark(), shiftTimeZone)) {
235+
if (isWindowFired(windowEnd, windowTimerService.currentWatermark(), shiftTimeZone)) {
231236
// element is late and should be dropped
232237
lateRecordsDroppedRate.markEvent();
233238
return;
234239
}
235240
if (RowDataUtil.isAccumulateMsg(inputRow)) {
236241
recordState.add(windowEnd, inputRow);
237242
} else {
238-
recordState.delete(windowEnd, inputRow);
243+
throw new UnsupportedOperationException(
244+
"Currently, window join doesn't support DELETE or UPDATE_BEFORE input row.");
239245
}
240246
// always register time for every element
241-
internalTimerService.registerEventTimeTimer(
242-
windowEnd, toEpochMillsForTimer(windowEnd - 1, shiftTimeZone));
247+
windowTimerService.registerEventTimeWindowTimer(windowEnd);
243248
}
244249

245250
@Override
@@ -372,6 +377,8 @@ public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords)
372377

373378
private abstract static class AbstractOuterJoinOperator extends WindowJoinOperator {
374379

380+
private static final long serialVersionUID = 1L;
381+
375382
private transient RowData leftNullRow;
376383
private transient RowData rightNullRow;
377384
private transient JoinedRowData outRow;
@@ -431,6 +438,8 @@ protected void output(RowData inputRow, RowData otherRow, boolean inputIsLeft) {
431438

432439
static class LeftOuterJoinOperator extends AbstractOuterJoinOperator {
433440

441+
private static final long serialVersionUID = 1L;
442+
434443
LeftOuterJoinOperator(
435444
InternalTypeInfo leftType,
436445
InternalTypeInfo rightType,
@@ -476,6 +485,8 @@ public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords)
476485

477486
static class RightOuterJoinOperator extends AbstractOuterJoinOperator {
478487

488+
private static final long serialVersionUID = 1L;
489+
479490
RightOuterJoinOperator(
480491
InternalTypeInfo leftType,
481492
InternalTypeInfo rightType,
@@ -520,6 +531,8 @@ public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords)
520531

521532
static class FullOuterJoinOperator extends AbstractOuterJoinOperator {
522533

534+
private static final long serialVersionUID = 1L;
535+
523536
FullOuterJoinOperator(
524537
InternalTypeInfo leftType,
525538
InternalTypeInfo rightType,

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -58,26 +58,4 @@ public void add(W window, RowData value) throws Exception {
5858
windowState.setCurrentNamespace(window);
5959
windowState.add(value);
6060
}
61-
62-
/**
63-
* Updates the operator state accessible by {@link #get(W)} )} by delete the given value to the
64-
* list of values. The next time {@link #get(W)} is called (for the same state partition) the
65-
* returned state will represent the updated list.
66-
*
67-
* <p>If null is passed in, the state value will remain unchanged.
68-
*
69-
* <p>The performance is not well, first get complete list by calling {@link
70-
* InternalListState#getInternal()})}, then remove the value from list, finally update state by
71-
* calling {@link InternalListState#update(List)}.
72-
*
73-
* @param window The namespace for the state.
74-
* @param value The new value for the state.
75-
* @throws Exception Thrown if the system cannot access the state.
76-
*/
77-
public boolean delete(W window, RowData value) throws Exception {
78-
List<RowData> completeData = get(window);
79-
boolean flag = completeData.remove(value);
80-
windowState.updateInternal(completeData);
81-
return flag;
82-
}
8361
}

0 commit comments

Comments
 (0)