Skip to content

Commit 10607f0

Browse files
cameronlee314prateekm
authored andcommitted
SAMZA-1880: Rename non-metrics classes which use Timer in their name
Summary of API changes: 1. TimerRegistry -> KeyScheduler; _register_ -> _schedule_ 2. TimerFunction -> SchedulingFunction; _registerTimer_ -> _schedulingInit_, _onTimer_ -> _executeForKey_ 3. TimerCallback -> SchedulingCallback _onTimer_ -> _execute_ 4. TaskContext: _registerTimer_ -> _scheduleCallback_, _deleteTimer_ -> _deleteScheduledCallback_ Only terminology changes are intended (e.g. classes, var names, logs). No functionality change is intended. An upcoming PR will further update TaskContext and the access to the scheduling logic. Author: Cameron Lee <calee@linkedin.com> Reviewers: Prateek Maheshwari <pmaheshwari@apaapache.org> Closes apache#644 from cameronlee314/rename_timer
1 parent 03410b8 commit 10607f0

36 files changed

+316
-299
lines changed

samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java renamed to samza-api/src/main/java/org/apache/samza/operators/Scheduler.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,20 @@
2020
package org.apache.samza.operators;
2121

2222
/**
23-
* Allows registering epoch-time timer callbacks from the operators.
24-
* See {@link org.apache.samza.operators.functions.TimerFunction} for details.
25-
* @param <K> type of the timer key
23+
* Allows scheduling {@link org.apache.samza.operators.functions.ScheduledFunction} callbacks to be invoked later.
24+
* @param <K> type of the key to schedule
2625
*/
27-
public interface TimerRegistry<K> {
28-
26+
public interface Scheduler<K> {
2927
/**
30-
* Register a epoch-time timer with key.
31-
* @param key unique timer key
32-
* @param timestamp epoch time when the timer will be fired, in milliseconds
28+
* Schedule a callback for the {@code key} to be invoked at {@code timestamp}.
29+
* @param key unique key associated with the callback to schedule
30+
* @param timestamp epoch time when the callback for the key will be invoked, in milliseconds
3331
*/
34-
void register(K key, long timestamp);
32+
void schedule(K key, long timestamp);
3533

3634
/**
37-
* Delete the timer for the provided key.
38-
* @param key key for the timer to delete
35+
* Delete the scheduled callback for the provided {@code key}.
36+
* @param key key to delete
3937
*/
4038
void delete(K key);
4139
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.operators.functions;
21+
22+
import org.apache.samza.operators.Scheduler;
23+
24+
import java.util.Collection;
25+
26+
27+
/**
28+
* Allows scheduling a callback for a specific epoch-time.
29+
* Key must be a unique identifier for its corresponding logic to execute, and is provided in the callback when the
30+
* corresponding schedule time occurs.
31+
*
32+
* <p>
33+
* Example of a {@link FlatMapFunction} with {@link ScheduledFunction}:
34+
* <pre>{@code
35+
* public class ExampleScheduledFn implements FlatMapFunction<String, String>, ScheduledFunction<String, String> {
36+
* // for recurring callbacks, keep track of the scheduler from "schedule"
37+
* private Scheduler scheduler;
38+
*
39+
* public void schedule(Scheduler scheduler) {
40+
* // save the scheduler for recurring callbacks
41+
* this.scheduler = scheduler;
42+
* long time = System.currentTimeMillis() + 5000; // fire after 5 sec
43+
* scheduler.schedule("do-delayed-logic", time);
44+
* }
45+
* public Collection<String> apply(String s) {
46+
* ...
47+
* }
48+
* public Collection<String> onCallback(String key, long timestamp) {
49+
* // do some logic for key "do-delayed-logic"
50+
* ...
51+
* // for recurring callbacks, call the saved scheduler again
52+
* this.scheduler.schedule("example-process", System.currentTimeMillis() + 5000);
53+
* }
54+
* }
55+
* }</pre>
56+
* @param <K> type of the key
57+
* @param <OM> type of the output
58+
*/
59+
public interface ScheduledFunction<K, OM> {
60+
/**
61+
* Allows scheduling the initial callback(s) and saving the {@code scheduler} for later use for recurring callbacks.
62+
* @param scheduler used to specify the schedule time(s) and key(s)
63+
*/
64+
void schedule(Scheduler<K> scheduler);
65+
66+
/**
67+
* Returns the output from the scheduling logic corresponding to the key that was triggered.
68+
* @param key key corresponding to the callback that got invoked
69+
* @param timestamp schedule time that was set for the callback for the key, in milliseconds since epoch
70+
* @return {@link Collection} of output elements
71+
*/
72+
Collection<OM> onCallback(K key, long timestamp);
73+
}

samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java

Lines changed: 0 additions & 65 deletions
This file was deleted.

samza-api/src/main/java/org/apache/samza/task/TimerCallback.java renamed to samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,23 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.samza.task;
20+
package org.apache.samza.scheduler;
21+
22+
import org.apache.samza.task.MessageCollector;
23+
import org.apache.samza.task.TaskCoordinator;
24+
2125

2226
/**
23-
* The callback that is invoked when its corresponding timer registered via {@link TaskContext} fires.
24-
* @param <K> type of the timer key
27+
* The callback that is invoked when its corresponding schedule time registered via
28+
* {@link org.apache.samza.task.TaskContext} is reached.
29+
* @param <K> type of the callback key
2530
*/
26-
public interface TimerCallback<K> {
31+
public interface ScheduledCallback<K> {
2732
/**
28-
* Invoked when the timer of key fires.
29-
* @param key timer key
33+
* Invoked when the corresponding schedule time is reached.
34+
* @param key key for callback
3035
* @param collector contains the means of sending message envelopes to the output stream.
3136
* @param coordinator manages execution of tasks.
3237
*/
33-
void onTimer(K key, MessageCollector collector, TaskCoordinator coordinator);
38+
void onCallback(K key, MessageCollector collector, TaskCoordinator coordinator);
3439
}

samza-api/src/main/java/org/apache/samza/task/TaskContext.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.samza.container.SamzaContainerContext;
2525
import org.apache.samza.container.TaskName;
2626
import org.apache.samza.metrics.MetricsRegistry;
27+
import org.apache.samza.scheduler.ScheduledCallback;
2728
import org.apache.samza.system.SystemStreamPartition;
2829
import org.apache.samza.table.Table;
2930

@@ -76,21 +77,21 @@ default Object getUserContext() {
7677
}
7778

7879
/**
79-
* Register a keyed timer with a callback of {@link TimerCallback} in this task.
80+
* Schedule the {@code callback} for the provided {@code key} to be invoked at epoch-time {@code timestamp}.
8081
* The callback will be invoked exclusively with any other operations for this task,
8182
* e.g. processing, windowing and commit.
82-
* @param key timer key
83-
* @param timestamp epoch time when the timer will be fired, in milliseconds
84-
* @param callback callback when the timer is fired
83+
* @param key key for the callback
84+
* @param timestamp epoch time when the callback will be fired, in milliseconds
85+
* @param callback callback to call when the {@code timestamp} is reached
8586
* @param <K> type of the key
8687
*/
87-
<K> void registerTimer(K key, long timestamp, TimerCallback<K> callback);
88+
<K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback);
8889

8990
/**
90-
* Delete the keyed timer in this task.
91-
* Deletion only happens if the timer hasn't been fired. Otherwise it will not interrupt.
92-
* @param key timer key
91+
* Delete the scheduled {@code callback} for the {@code key}.
92+
* Deletion only happens if the callback hasn't been fired. Otherwise it will not interrupt.
93+
* @param key callback key
9394
* @param <K> type of the key
9495
*/
95-
<K> void deleteTimer(K key);
96+
<K> void deleteScheduledCallback(K key);
9697
}

samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import org.apache.samza.system.SystemStreamPartition;
3030
import org.apache.samza.table.Table;
3131
import org.apache.samza.table.TableManager;
32-
import org.apache.samza.task.SystemTimerScheduler;
32+
import org.apache.samza.task.EpochTimeScheduler;
3333
import org.apache.samza.task.TaskContext;
34-
import org.apache.samza.task.TimerCallback;
34+
import org.apache.samza.scheduler.ScheduledCallback;
3535
import org.slf4j.Logger;
3636
import org.slf4j.LoggerFactory;
3737

@@ -53,7 +53,7 @@ public class TaskContextImpl implements TaskContext {
5353
private final JobModel jobModel;
5454
private final StreamMetadataCache streamMetadataCache;
5555
private final Map<String, Object> objectRegistry = new HashMap<>();
56-
private final SystemTimerScheduler timerScheduler;
56+
private final EpochTimeScheduler timerScheduler;
5757

5858
private Object userContext = null;
5959

@@ -76,7 +76,7 @@ public TaskContextImpl(TaskName taskName,
7676
this.tableManager = tableManager;
7777
this.jobModel = jobModel;
7878
this.streamMetadataCache = streamMetadataCache;
79-
this.timerScheduler = SystemTimerScheduler.create(timerExecutor);
79+
this.timerScheduler = EpochTimeScheduler.create(timerExecutor);
8080
}
8181

8282
@Override
@@ -134,12 +134,12 @@ public Object getUserContext() {
134134
}
135135

136136
@Override
137-
public <K> void registerTimer(K key, long timestamp, TimerCallback<K> callback) {
137+
public <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback) {
138138
timerScheduler.setTimer(key, timestamp, callback);
139139
}
140140

141141
@Override
142-
public <K> void deleteTimer(K key) {
142+
public <K> void deleteScheduledCallback(K key) {
143143
timerScheduler.deleteTimer(key);
144144
}
145145

@@ -159,7 +159,7 @@ public StreamMetadataCache getStreamMetadataCache() {
159159
return streamMetadataCache;
160160
}
161161

162-
public SystemTimerScheduler getTimerScheduler() {
162+
public EpochTimeScheduler getTimerScheduler() {
163163
return timerScheduler;
164164
}
165165
}

samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import org.apache.samza.container.TaskName;
2626
import org.apache.samza.job.model.ContainerModel;
2727
import org.apache.samza.job.model.TaskModel;
28-
import org.apache.samza.operators.TimerRegistry;
29-
import org.apache.samza.operators.functions.TimerFunction;
28+
import org.apache.samza.operators.Scheduler;
29+
import org.apache.samza.operators.functions.ScheduledFunction;
3030
import org.apache.samza.operators.functions.WatermarkFunction;
3131
import org.apache.samza.system.EndOfStreamMessage;
3232
import org.apache.samza.metrics.Counter;
@@ -439,19 +439,19 @@ final long getOutputWatermark() {
439439

440440
/**
441441
* Returns a registry which allows registering arbitrary system-clock timer with K-typed key.
442-
* The user-defined function in the operator spec needs to implement {@link TimerFunction#onTimer(Object, long)}
442+
* The user-defined function in the operator spec needs to implement {@link ScheduledFunction#onCallback(Object, long)}
443443
* for timer notifications.
444444
* @param <K> key type for the timer.
445-
* @return an instance of {@link TimerRegistry}
445+
* @return an instance of {@link Scheduler}
446446
*/
447-
<K> TimerRegistry<K> createOperatorTimerRegistry() {
448-
return new TimerRegistry<K>() {
447+
<K> Scheduler<K> createOperatorScheduler() {
448+
return new Scheduler<K>() {
449449
@Override
450-
public void register(K key, long time) {
451-
taskContext.registerTimer(key, time, (k, collector, coordinator) -> {
452-
final TimerFunction<K, RM> timerFn = getOperatorSpec().getTimerFn();
453-
if (timerFn != null) {
454-
final Collection<RM> output = timerFn.onTimer(key, time);
450+
public void schedule(K key, long time) {
451+
taskContext.scheduleCallback(key, time, (k, collector, coordinator) -> {
452+
final ScheduledFunction<K, RM> scheduledFn = getOperatorSpec().getScheduledFn();
453+
if (scheduledFn != null) {
454+
final Collection<RM> output = scheduledFn.onCallback(key, time);
455455

456456
if (!output.isEmpty()) {
457457
output.forEach(rm ->
@@ -460,15 +460,15 @@ public void register(K key, long time) {
460460
}
461461
} else {
462462
throw new SamzaException(
463-
String.format("Operator %s id %s (created at %s) must implement TimerFunction to use system timer.",
463+
String.format("Operator %s id %s (created at %s) must implement ScheduledFunction to use system timer.",
464464
getOperatorSpec().getOpCode().name(), getOpImplId(), getOperatorSpec().getSourceLocation()));
465465
}
466466
});
467467
}
468468

469469
@Override
470470
public void delete(K key) {
471-
taskContext.deleteTimer(key);
471+
taskContext.deleteScheduledCallback(key);
472472
}
473473
};
474474
}

samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.samza.job.model.JobModel;
2929
import org.apache.samza.metrics.MetricsRegistry;
3030
import org.apache.samza.operators.KV;
31-
import org.apache.samza.operators.TimerRegistry;
31+
import org.apache.samza.operators.Scheduler;
3232
import org.apache.samza.operators.functions.JoinFunction;
3333
import org.apache.samza.operators.functions.PartialJoinFunction;
3434
import org.apache.samza.util.TimestampedValue;
@@ -172,9 +172,9 @@ private OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec
172172
operatorImpl.init(config, context);
173173
operatorImpl.registerInputStream(inputStream);
174174

175-
if (operatorSpec.getTimerFn() != null) {
176-
final TimerRegistry timerRegistry = operatorImpl.createOperatorTimerRegistry();
177-
operatorSpec.getTimerFn().registerTimer(timerRegistry);
175+
if (operatorSpec.getScheduledFn() != null) {
176+
final Scheduler scheduler = operatorImpl.createOperatorScheduler();
177+
operatorSpec.getScheduledFn().schedule(scheduler);
178178
}
179179

180180
// Note: The key here is opImplId, which may not equal opId for some impls (e.g. PartialJoinOperatorImpl).

samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public Collection<WindowPane<K, Object>> handleMessage(
183183

184184
@Override
185185
public Collection<WindowPane<K, Object>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
186-
LOG.trace("Processing timer.");
186+
LOG.trace("Processing time triggers");
187187
List<WindowPane<K, Object>> results = new ArrayList<>();
188188
List<TriggerKey<K>> keys = triggerScheduler.runPendingCallbacks();
189189

0 commit comments

Comments
 (0)