Skip to content

Commit

Permalink
use final atomicreference for warmed compiled execution to avoid sync…
Browse files Browse the repository at this point in the history
…hronization
  • Loading branch information
yaauie committed Jan 10, 2020
1 parent 829a87d commit 017051f
Showing 1 changed file with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -90,7 +91,7 @@ public final class CompiledPipeline {
* the {@link CompiledPipeline#datasetClassCache} in a thread safe way
* before the concurrent per worker threads {@link CompiledExecution} compilations
*/
private CompiledExecution warmedCompiledExecution;
private final AtomicReference<CompiledExecution> warmedCompiledExecution = new AtomicReference<>();

public CompiledPipeline(
final PipelineIR pipelineIR,
Expand All @@ -113,7 +114,7 @@ public CompiledPipeline(

// invoke a first compilation to warm the class cache which will prevent
// redundant compilations for each subsequent worker {@link CompiledExecution}
warmedCompiledExecution = new CompiledPipeline.CompiledExecution();
warmedCompiledExecution.set(new CompiledPipeline.CompiledExecution());
} catch (Exception e) {
throw new IllegalStateException("Unable to configure plugins: " + e.getMessage());
}
Expand All @@ -137,14 +138,9 @@ public Collection<IRubyObject> inputs() {
* @return Compiled {@link Dataset} representation of the underlying {@link PipelineIR} topology
*/
public Dataset buildExecution() {
synchronized (this) {
// the first worker get the warmed CompiledExecution and the other
// get their own.
if (warmedCompiledExecution != null) {
final CompiledExecution result = warmedCompiledExecution;
warmedCompiledExecution = null;
return result.toDataset();
}
CompiledExecution result = warmedCompiledExecution.getAndSet(null);
if(result != null) {
return result.toDataset();
}
return new CompiledPipeline.CompiledExecution().toDataset();
}
Expand Down

0 comments on commit 017051f

Please sign in to comment.