Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ target
.classpath
.settings

# Intellij Idea
.idea/
*.iml
Original file line number Diff line number Diff line change
@@ -1,27 +1,48 @@
package io.opentracing.contrib.concurrent;

import java.util.concurrent.Executor;

import io.opentracing.Scope;
import io.opentracing.Tracer;
import java.util.concurrent.Executor;

/**
* Executor which propagates span from parent thread to submitted {@link Runnable}.
* Optionally it creates parent span if traceWithActiveSpanOnly = false.
*
* @author Pavol Loffay
*/
public class TracedExecutor implements Executor {

protected final Tracer tracer;
private final Executor delegate;
private final boolean traceWithActiveSpanOnly;

public TracedExecutor(Executor executor, Tracer tracer) {
this(executor, tracer, true);
}

public TracedExecutor(Executor executor, Tracer tracer, boolean traceWithActiveSpanOnly) {
this.delegate = executor;
this.tracer = tracer;
this.traceWithActiveSpanOnly = traceWithActiveSpanOnly;
}

@Override
public void execute(Runnable runnable) {
delegate.execute(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer));
Scope scope = createScope("execute");
try {
delegate.execute(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer));
} finally {
if (scope != null) {
scope.close();
}
}
}

Scope createScope(String operationName) {
if (tracer.activeSpan() == null && !traceWithActiveSpanOnly) {
return tracer.buildSpan(operationName).startActive(true);
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.opentracing.contrib.concurrent;

import io.opentracing.Scope;
import io.opentracing.Tracer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -10,17 +12,23 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.opentracing.Tracer;

/**
* @author Pavol Loffay
*
* Executor which propagates span from parent thread to submitted.
* Optionally it creates parent span if traceWithActiveSpanOnly = false.
*/
public class TracedExecutorService extends TracedExecutor implements ExecutorService {

private final ExecutorService delegate;

public TracedExecutorService(ExecutorService delegate, Tracer tracer) {
super(delegate, tracer);
this(delegate, tracer, true);
}

public TracedExecutorService(ExecutorService delegate, Tracer tracer,
boolean traceWithActiveSpanOnly) {
super(delegate, tracer, traceWithActiveSpanOnly);
this.delegate = delegate;
}

Expand Down Expand Up @@ -51,44 +59,93 @@ public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedExc

@Override
public <T> Future<T> submit(Callable<T> callable) {
return delegate.submit(tracer.activeSpan() == null ? callable :
new TracedCallable<T>(callable, tracer));
Scope scope = createScope("submit");
try {
return delegate.submit(tracer.activeSpan() == null ? callable :
new TracedCallable<T>(callable, tracer));
} finally {
if (scope != null) {
scope.close();
}
}
}

@Override
public <T> Future<T> submit(Runnable runnable, T t) {
return delegate.submit(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer), t);
Scope scope = createScope("submit");
try {
return delegate.submit(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer), t);
} finally {
if (scope != null) {
scope.close();
}
}
}

@Override
public Future<?> submit(Runnable runnable) {
return delegate.submit(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer));
Scope scope = createScope("submit");
try {
return delegate.submit(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer));
} finally {
if (scope != null) {
scope.close();
}
}
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection)
throws InterruptedException {
return delegate.invokeAll(toTraced(collection));
Scope scope = createScope("invokeAll");
try {
return delegate.invokeAll(toTraced(collection));
} finally {
if (scope != null) {
scope.close();
}
}
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection, long l,
TimeUnit timeUnit) throws InterruptedException {
return delegate.invokeAll(toTraced(collection), l, timeUnit);
Scope scope = createScope("invokeAll");
try {
return delegate.invokeAll(toTraced(collection), l, timeUnit);
} finally {
if (scope != null) {
scope.close();
}
}
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> collection)
throws InterruptedException, ExecutionException {
return delegate.invokeAny(toTraced(collection));
Scope scope = createScope("invokeAny");
try {
return delegate.invokeAny(toTraced(collection));
} finally {
if (scope != null) {
scope.close();
}
}
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(toTraced(collection), l, timeUnit);
Scope scope = createScope("invokeAny");
try {
return delegate.invokeAny(toTraced(collection), l, timeUnit);
} finally {
if (scope != null) {
scope.close();
}
}
}

private <T> Collection<? extends Callable<T>> toTraced(Collection<? extends Callable<T>> delegate) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,83 @@
package io.opentracing.contrib.concurrent;

import io.opentracing.Scope;
import io.opentracing.Tracer;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* @author Jose Montoya
*
* Executor which propagates span from parent thread to scheduled.
* Optionally it creates parent span if traceWithActiveSpanOnly = false.
*/
public class TracedScheduledExecutorService extends TracedExecutorService implements ScheduledExecutorService {

private final ScheduledExecutorService delegate;

public TracedScheduledExecutorService(ScheduledExecutorService delegate, Tracer tracer) {
super(delegate, tracer);
this.delegate = delegate;
}

@Override
public ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
return delegate.schedule(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer), delay, timeUnit);
}

@Override
public <T> ScheduledFuture<T> schedule(Callable<T> callable, long delay, TimeUnit timeUnit) {
return delegate.schedule(tracer.activeSpan() == null ? callable :
new TracedCallable<T>(callable, tracer), delay, timeUnit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period, TimeUnit timeUnit) {
return delegate.scheduleAtFixedRate(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer), initialDelay, period, timeUnit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long initialDelay, long delay, TimeUnit timeUnit) {
return delegate.scheduleWithFixedDelay(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer), initialDelay, delay, timeUnit);
}
private final ScheduledExecutorService delegate;

public TracedScheduledExecutorService(ScheduledExecutorService delegate, Tracer tracer) {
this(delegate, tracer, true);
}

public TracedScheduledExecutorService(ScheduledExecutorService delegate, Tracer tracer,
boolean traceWithActiveSpanOnly) {
super(delegate, tracer, traceWithActiveSpanOnly);
this.delegate = delegate;
}

@Override
public ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
Scope scope = createScope("schedule");
try {
return delegate.schedule(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer), delay, timeUnit);
} finally {
if (scope != null) {
scope.close();
}
}
}

@Override
public <T> ScheduledFuture<T> schedule(Callable<T> callable, long delay, TimeUnit timeUnit) {
Scope scope = createScope("schedule");
try {
return delegate.schedule(tracer.activeSpan() == null ? callable :
new TracedCallable<T>(callable, tracer), delay, timeUnit);
} finally {
if (scope != null) {
scope.close();
}
}
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period,
TimeUnit timeUnit) {
Scope scope = createScope("scheduleAtFixedRate");
try {
return delegate.scheduleAtFixedRate(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer), initialDelay, period, timeUnit);
} finally {
if (scope != null) {
scope.close();
}
}
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long initialDelay, long delay,
TimeUnit timeUnit) {
Scope scope = createScope("scheduleWithFixedDelay");
try {
return delegate.scheduleWithFixedDelay(tracer.activeSpan() == null ? runnable :
new TracedRunnable(runnable, tracer), initialDelay, delay, timeUnit);
} finally {
if (scope != null) {
scope.close();
}
}
}
}
Loading