Skip to content

Commit bf35e63

Browse files
jam01objectiser
authored andcommitted
Adds TracedScheduledExecutorService (#11)
1 parent f02c584 commit bf35e63

File tree

2 files changed

+120
-0
lines changed

2 files changed

+120
-0
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.opentracing.contrib.concurrent;
2+
3+
import io.opentracing.Tracer;
4+
5+
import java.util.concurrent.Callable;
6+
import java.util.concurrent.ScheduledExecutorService;
7+
import java.util.concurrent.ScheduledFuture;
8+
import java.util.concurrent.TimeUnit;
9+
10+
/**
11+
* @author Jose Montoya
12+
*/
13+
public class TracedScheduledExecutorService extends TracedExecutorService implements ScheduledExecutorService {
14+
15+
private final ScheduledExecutorService delegate;
16+
17+
public TracedScheduledExecutorService(ScheduledExecutorService delegate, Tracer tracer) {
18+
super(delegate, tracer);
19+
this.delegate = delegate;
20+
}
21+
22+
@Override
23+
public ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
24+
return delegate.schedule(tracer.activeSpan() == null ? runnable :
25+
new TracedRunnable(runnable, tracer), delay, timeUnit);
26+
}
27+
28+
@Override
29+
public <T> ScheduledFuture<T> schedule(Callable<T> callable, long delay, TimeUnit timeUnit) {
30+
return delegate.schedule(tracer.activeSpan() == null ? callable :
31+
new TracedCallable<T>(callable, tracer), delay, timeUnit);
32+
}
33+
34+
@Override
35+
public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period, TimeUnit timeUnit) {
36+
return delegate.scheduleAtFixedRate(tracer.activeSpan() == null ? runnable :
37+
new TracedRunnable(runnable, tracer), initialDelay, period, timeUnit);
38+
}
39+
40+
@Override
41+
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long initialDelay, long delay, TimeUnit timeUnit) {
42+
return delegate.scheduleWithFixedDelay(tracer.activeSpan() == null ? runnable :
43+
new TracedRunnable(runnable, tracer), initialDelay, delay, timeUnit);
44+
}
45+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package io.opentracing.contrib.concurrent;
2+
3+
import io.opentracing.mock.MockSpan;
4+
import org.junit.Test;
5+
6+
import java.util.concurrent.*;
7+
8+
import static org.junit.Assert.assertEquals;
9+
10+
/**
11+
* @author Jose Montoya
12+
*/
13+
public class TracedScheduledExecutorServiceTest extends AbstractConcurrentTest {
14+
private static final int NUMBER_OF_THREADS = 4;
15+
16+
protected ScheduledExecutorService toTraced(ScheduledExecutorService scheduledExecutorService) {
17+
return new TracedScheduledExecutorService(scheduledExecutorService, mockTracer);
18+
}
19+
20+
@Test
21+
public void scheduleRunnableTest() throws InterruptedException {
22+
ScheduledExecutorService executorService = toTraced(Executors.newScheduledThreadPool(NUMBER_OF_THREADS));
23+
24+
MockSpan parentSpan = mockTracer.buildSpan("foo").startManual();
25+
mockTracer.scopeManager().activate(parentSpan, true);
26+
executorService.schedule(new TestRunnable(), 300, TimeUnit.MILLISECONDS);
27+
28+
countDownLatch.await();
29+
assertParentSpan(parentSpan);
30+
assertEquals(1, mockTracer.finishedSpans().size());
31+
}
32+
33+
@Test
34+
public void scheduleCallableTest() throws InterruptedException {
35+
ScheduledExecutorService executorService = toTraced(Executors.newScheduledThreadPool(NUMBER_OF_THREADS));
36+
37+
MockSpan parentSpan = mockTracer.buildSpan("foo").startManual();
38+
mockTracer.scopeManager().activate(parentSpan, true);
39+
executorService.schedule(new TestCallable(), 300, TimeUnit.MILLISECONDS);
40+
41+
countDownLatch.await();
42+
assertParentSpan(parentSpan);
43+
assertEquals(1, mockTracer.finishedSpans().size());
44+
}
45+
46+
@Test
47+
public void scheduleAtFixedRateTest() throws InterruptedException {
48+
countDownLatch = new CountDownLatch(2);
49+
ScheduledExecutorService executorService = toTraced(Executors.newScheduledThreadPool(NUMBER_OF_THREADS));
50+
51+
MockSpan parentSpan = mockTracer.buildSpan("foo").startManual();
52+
mockTracer.scopeManager().activate(parentSpan, true);
53+
executorService.scheduleAtFixedRate(new TestRunnable(), 0, 300, TimeUnit.MILLISECONDS);
54+
55+
countDownLatch.await();
56+
executorService.shutdown();
57+
assertParentSpan(parentSpan);
58+
assertEquals(2, mockTracer.finishedSpans().size());
59+
}
60+
61+
@Test
62+
public void scheduleWithFixedDelayTest() throws InterruptedException {
63+
countDownLatch = new CountDownLatch(2);
64+
ScheduledExecutorService executorService = toTraced(Executors.newScheduledThreadPool(NUMBER_OF_THREADS));
65+
66+
MockSpan parentSpan = mockTracer.buildSpan("foo").startManual();
67+
mockTracer.scopeManager().activate(parentSpan, true);
68+
executorService.scheduleWithFixedDelay(new TestRunnable(), 0, 300, TimeUnit.MILLISECONDS);
69+
70+
countDownLatch.await();
71+
executorService.shutdown();
72+
assertParentSpan(parentSpan);
73+
assertEquals(2, mockTracer.finishedSpans().size());
74+
}
75+
}

0 commit comments

Comments
 (0)