Skip to content

Commit 242d844

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents ec7d840 + 10607f0 commit 242d844

39 files changed

+322
-333
lines changed

samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java renamed to samza-api/src/main/java/org/apache/samza/operators/Scheduler.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,20 @@
2020
package org.apache.samza.operators;
2121

2222
/**
23-
* Allows registering epoch-time timer callbacks from the operators.
24-
* See {@link org.apache.samza.operators.functions.TimerFunction} for details.
25-
* @param <K> type of the timer key
23+
* Allows scheduling {@link org.apache.samza.operators.functions.ScheduledFunction} callbacks to be invoked later.
24+
* @param <K> type of the key to schedule
2625
*/
27-
public interface TimerRegistry<K> {
28-
26+
public interface Scheduler<K> {
2927
/**
30-
* Register a epoch-time timer with key.
31-
* @param key unique timer key
32-
* @param timestamp epoch time when the timer will be fired, in milliseconds
28+
* Schedule a callback for the {@code key} to be invoked at {@code timestamp}.
29+
* @param key unique key associated with the callback to schedule
30+
* @param timestamp epoch time when the callback for the key will be invoked, in milliseconds
3331
*/
34-
void register(K key, long timestamp);
32+
void schedule(K key, long timestamp);
3533

3634
/**
37-
* Delete the timer for the provided key.
38-
* @param key key for the timer to delete
35+
* Delete the scheduled callback for the provided {@code key}.
36+
* @param key key to delete
3937
*/
4038
void delete(K key);
4139
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.operators.functions;
21+
22+
import org.apache.samza.operators.Scheduler;
23+
24+
import java.util.Collection;
25+
26+
27+
/**
28+
* Allows scheduling a callback for a specific epoch-time.
29+
* Key must be a unique identifier for its corresponding logic to execute, and is provided in the callback when the
30+
* corresponding schedule time occurs.
31+
*
32+
* <p>
33+
* Example of a {@link FlatMapFunction} with {@link ScheduledFunction}:
34+
* <pre>{@code
35+
* public class ExampleScheduledFn implements FlatMapFunction<String, String>, ScheduledFunction<String, String> {
36+
* // for recurring callbacks, keep track of the scheduler from "schedule"
37+
* private Scheduler scheduler;
38+
*
39+
* public void schedule(Scheduler scheduler) {
40+
* // save the scheduler for recurring callbacks
41+
* this.scheduler = scheduler;
42+
* long time = System.currentTimeMillis() + 5000; // fire after 5 sec
43+
* scheduler.schedule("do-delayed-logic", time);
44+
* }
45+
* public Collection<String> apply(String s) {
46+
* ...
47+
* }
48+
* public Collection<String> onCallback(String key, long timestamp) {
49+
* // do some logic for key "do-delayed-logic"
50+
* ...
51+
* // for recurring callbacks, call the saved scheduler again
52+
* this.scheduler.schedule("example-process", System.currentTimeMillis() + 5000);
53+
* }
54+
* }
55+
* }</pre>
56+
* @param <K> type of the key
57+
* @param <OM> type of the output
58+
*/
59+
public interface ScheduledFunction<K, OM> {
60+
/**
61+
* Allows scheduling the initial callback(s) and saving the {@code scheduler} for later use for recurring callbacks.
62+
* @param scheduler used to specify the schedule time(s) and key(s)
63+
*/
64+
void schedule(Scheduler<K> scheduler);
65+
66+
/**
67+
* Returns the output from the scheduling logic corresponding to the key that was triggered.
68+
* @param key key corresponding to the callback that got invoked
69+
* @param timestamp schedule time that was set for the callback for the key, in milliseconds since epoch
70+
* @return {@link Collection} of output elements
71+
*/
72+
Collection<OM> onCallback(K key, long timestamp);
73+
}

samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java

Lines changed: 0 additions & 65 deletions
This file was deleted.

samza-api/src/main/java/org/apache/samza/task/TimerCallback.java renamed to samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,23 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.samza.task;
20+
package org.apache.samza.scheduler;
21+
22+
import org.apache.samza.task.MessageCollector;
23+
import org.apache.samza.task.TaskCoordinator;
24+
2125

2226
/**
23-
* The callback that is invoked when its corresponding timer registered via {@link TaskContext} fires.
24-
* @param <K> type of the timer key
27+
* The callback that is invoked when its corresponding schedule time registered via
28+
* {@link org.apache.samza.task.TaskContext} is reached.
29+
* @param <K> type of the callback key
2530
*/
26-
public interface TimerCallback<K> {
31+
public interface ScheduledCallback<K> {
2732
/**
28-
* Invoked when the timer of key fires.
29-
* @param key timer key
33+
* Invoked when the corresponding schedule time is reached.
34+
* @param key key for callback
3035
* @param collector contains the means of sending message envelopes to the output stream.
3136
* @param coordinator manages execution of tasks.
3237
*/
33-
void onTimer(K key, MessageCollector collector, TaskCoordinator coordinator);
38+
void onCallback(K key, MessageCollector collector, TaskCoordinator coordinator);
3439
}

samza-api/src/main/java/org/apache/samza/task/TaskContext.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.samza.container.SamzaContainerContext;
2525
import org.apache.samza.container.TaskName;
2626
import org.apache.samza.metrics.MetricsRegistry;
27+
import org.apache.samza.scheduler.ScheduledCallback;
2728
import org.apache.samza.system.SystemStreamPartition;
2829
import org.apache.samza.table.Table;
2930

@@ -76,21 +77,21 @@ default Object getUserContext() {
7677
}
7778

7879
/**
79-
* Register a keyed timer with a callback of {@link TimerCallback} in this task.
80+
* Schedule the {@code callback} for the provided {@code key} to be invoked at epoch-time {@code timestamp}.
8081
* The callback will be invoked exclusively with any other operations for this task,
8182
* e.g. processing, windowing and commit.
82-
* @param key timer key
83-
* @param timestamp epoch time when the timer will be fired, in milliseconds
84-
* @param callback callback when the timer is fired
83+
* @param key key for the callback
84+
* @param timestamp epoch time when the callback will be fired, in milliseconds
85+
* @param callback callback to call when the {@code timestamp} is reached
8586
* @param <K> type of the key
8687
*/
87-
<K> void registerTimer(K key, long timestamp, TimerCallback<K> callback);
88+
<K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback);
8889

8990
/**
90-
* Delete the keyed timer in this task.
91-
* Deletion only happens if the timer hasn't been fired. Otherwise it will not interrupt.
92-
* @param key timer key
91+
* Delete the scheduled {@code callback} for the {@code key}.
92+
* Deletion only happens if the callback hasn't been fired. Otherwise it will not interrupt.
93+
* @param key callback key
9394
* @param <K> type of the key
9495
*/
95-
<K> void deleteTimer(K key);
96+
<K> void deleteScheduledCallback(K key);
9697
}

samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,12 @@ public EventHubSystemAdmin(String systemName, EventHubConfig eventHubConfig,
6565
this.eventHubClientManagerFactory = eventHubClientManagerFactory;
6666
}
6767

68-
private String getNextOffset(String currentOffset) {
69-
// EventHub will return the first message AFTER the offset
70-
// that was specified in the fetch request.
71-
// If no such offset exists Eventhub will return an error.
72-
return String.valueOf(Long.parseLong(currentOffset) + 1);
73-
}
74-
7568
@Override
7669
public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
77-
Map<SystemStreamPartition, String> results = new HashMap<>();
78-
79-
offsets.forEach((partition, offset) -> results.put(partition, getNextOffset(offset)));
80-
return results;
70+
// In EventHubSystemConsumer#initializeEventHubsManagers, we exclude the offset that we specify. i.e.
71+
// we will only get the message after the checkpoint offset. Hence, by returning the same offset as the
72+
// "next" offset, we won't be reprocessing the same event.
73+
return offsets;
8174
}
8275

8376
// EventHubRuntimeInformation does not implement toString()

samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,13 +276,11 @@ private synchronized void initializeEventHubsManagers() {
276276
receiver = eventHubClientManager.getEventHubClient()
277277
.createReceiverSync(consumerGroup, partitionId.toString(), EventPosition.fromEnqueuedTime(Instant.now()));
278278
} else {
279-
// If the offset is less or equal to the newest offset in the system, it can be
280-
// used as the starting offset to receive from. EventHub will return the first
281-
// message AFTER the offset that was specified in the fetch request.
279+
// EventHub will return the first message AFTER the offset that was specified in the fetch request.
282280
// If no such offset exists Eventhub will return an error.
283281
receiver = eventHubClientManager.getEventHubClient()
284282
.createReceiverSync(consumerGroup, partitionId.toString(),
285-
EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM)));
283+
EventPosition.fromOffset(offset, /* inclusiveFlag */false));
286284
}
287285

288286
receiver.setPrefetchCount(prefetchCount);

samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.samza.Partition;
2323
import org.apache.samza.system.SystemAdmin;
2424
import org.apache.samza.system.SystemStreamMetadata;
25-
import org.apache.samza.system.SystemStreamPartition;
2625
import org.apache.samza.system.eventhub.EventHubSystemFactory;
2726
import org.apache.samza.system.eventhub.MockEventHubConfigFactory;
2827
import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
@@ -31,7 +30,6 @@
3130
import org.junit.Ignore;
3231
import org.junit.Test;
3332

34-
import java.util.HashMap;
3533
import java.util.HashSet;
3634
import java.util.Map;
3735
import java.util.Set;
@@ -53,23 +51,6 @@ public void testOffsetComparison() {
5351
Assert.assertNull(eventHubSystemAdmin.offsetComparator(EventHubSystemConsumer.END_OF_STREAM, EventHubSystemConsumer.END_OF_STREAM));
5452
}
5553

56-
@Test
57-
public void testGetNextOffset() {
58-
EventHubSystemFactory eventHubSystemFactory = new EventHubSystemFactory();
59-
SystemAdmin eventHubSystemAdmin = eventHubSystemFactory.getAdmin(SYSTEM_NAME,
60-
MockEventHubConfigFactory.getEventHubConfig(EventHubSystemProducer.PartitioningMethod.EVENT_HUB_HASHING));
61-
Map<SystemStreamPartition, String> offsets = new HashMap<>();
62-
SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME1, new Partition(0));
63-
SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME1, new Partition(2));
64-
offsets.put(ssp0, Integer.toString(0));
65-
offsets.put(ssp2, EventHubSystemConsumer.START_OF_STREAM);
66-
67-
Map<SystemStreamPartition, String> updatedOffsets = eventHubSystemAdmin.getOffsetsAfter(offsets);
68-
Assert.assertEquals(offsets.size(), updatedOffsets.size());
69-
Assert.assertEquals("1", updatedOffsets.get(ssp0));
70-
Assert.assertEquals("0", updatedOffsets.get(ssp2));
71-
}
72-
7354
@Ignore("Integration Test")
7455
@Test
7556
public void testGetStreamMetadata() {

samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import org.apache.samza.system.SystemStreamPartition;
3030
import org.apache.samza.table.Table;
3131
import org.apache.samza.table.TableManager;
32-
import org.apache.samza.task.SystemTimerScheduler;
32+
import org.apache.samza.task.EpochTimeScheduler;
3333
import org.apache.samza.task.TaskContext;
34-
import org.apache.samza.task.TimerCallback;
34+
import org.apache.samza.scheduler.ScheduledCallback;
3535
import org.slf4j.Logger;
3636
import org.slf4j.LoggerFactory;
3737

@@ -53,7 +53,7 @@ public class TaskContextImpl implements TaskContext {
5353
private final JobModel jobModel;
5454
private final StreamMetadataCache streamMetadataCache;
5555
private final Map<String, Object> objectRegistry = new HashMap<>();
56-
private final SystemTimerScheduler timerScheduler;
56+
private final EpochTimeScheduler timerScheduler;
5757

5858
private Object userContext = null;
5959

@@ -76,7 +76,7 @@ public TaskContextImpl(TaskName taskName,
7676
this.tableManager = tableManager;
7777
this.jobModel = jobModel;
7878
this.streamMetadataCache = streamMetadataCache;
79-
this.timerScheduler = SystemTimerScheduler.create(timerExecutor);
79+
this.timerScheduler = EpochTimeScheduler.create(timerExecutor);
8080
}
8181

8282
@Override
@@ -134,12 +134,12 @@ public Object getUserContext() {
134134
}
135135

136136
@Override
137-
public <K> void registerTimer(K key, long timestamp, TimerCallback<K> callback) {
137+
public <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback) {
138138
timerScheduler.setTimer(key, timestamp, callback);
139139
}
140140

141141
@Override
142-
public <K> void deleteTimer(K key) {
142+
public <K> void deleteScheduledCallback(K key) {
143143
timerScheduler.deleteTimer(key);
144144
}
145145

@@ -159,7 +159,7 @@ public StreamMetadataCache getStreamMetadataCache() {
159159
return streamMetadataCache;
160160
}
161161

162-
public SystemTimerScheduler getTimerScheduler() {
162+
public EpochTimeScheduler getTimerScheduler() {
163163
return timerScheduler;
164164
}
165165
}

0 commit comments

Comments
 (0)