From 4b29112efd21278117dad0921c25d2a93e2b44bb Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Mon, 13 Jan 2020 12:52:58 -0500 Subject: [PATCH] cache compiled datasets (#11482) --- .../logstash/config/ir/CompiledPipeline.java | 114 ++++++++++++++---- .../ir/compiler/ComputeStepSyntaxElement.java | 58 +++++---- .../config/ir/compiler/DatasetCompiler.java | 34 ++---- .../ir/compiler/DatasetCompilerTest.java | 16 +-- 4 files changed, 142 insertions(+), 80 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index a204f1c66df..3dea8a4f487 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -3,6 +3,7 @@ import co.elastic.logstash.api.Codec; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jruby.RubyArray; import org.jruby.RubyHash; import org.jruby.javasupport.JavaUtil; import org.jruby.runtime.builtin.IRubyObject; @@ -34,6 +35,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -78,6 +81,18 @@ public final class CompiledPipeline { */ private final RubyIntegration.PluginFactory pluginFactory; + /** + * Per pipeline compiled classes cache shared across threads {@link CompiledExecution} + */ + private final Map> datasetClassCache = new ConcurrentHashMap<>(500); + + /** + * First, constructor time, compilation of the pipeline that will warm + * the {@link CompiledPipeline#datasetClassCache} in a thread safe way + * before the concurrent per worker threads {@link CompiledExecution} compilations + */ + private final AtomicReference warmedCompiledExecution = new AtomicReference<>(); + public 
CompiledPipeline( final PipelineIR pipelineIR, final RubyIntegration.PluginFactory pluginFactory) { @@ -96,6 +111,10 @@ public CompiledPipeline( inputs = setupInputs(cve); filters = setupFilters(cve); outputs = setupOutputs(cve); + + // invoke a first compilation to warm the class cache which will prevent + // redundant compilations for each subsequent worker {@link CompiledExecution} + warmedCompiledExecution.set(new CompiledPipeline.CompiledExecution()); } catch (Exception e) { throw new IllegalStateException("Unable to configure plugins: " + e.getMessage()); } @@ -119,6 +138,10 @@ public Collection inputs() { * @return Compiled {@link Dataset} representation of the underlying {@link PipelineIR} topology */ public Dataset buildExecution() { + CompiledExecution result = warmedCompiledExecution.getAndSet(null); + if (result != null) { + return result.toDataset(); + } return new CompiledPipeline.CompiledExecution().toDataset(); } @@ -269,6 +292,17 @@ private boolean isOutput(final Vertex vertex) { return outputs.containsKey(vertex.getId()); } + /** + * Returns an existing compiled dataset class implementation for the given {@code vertexId}, + * or compiles one from the provided {@code computeStepSyntaxElement}. + * @param vertexId a string uniquely identifying a {@link Vertex} within the current pipeline + * @param computeStepSyntaxElement the source from which to compile a dataset class + * @return an implementation of {@link Dataset} for the given vertex + */ + private Class getDatasetClass(final String vertexId, final ComputeStepSyntaxElement computeStepSyntaxElement) { + return datasetClassCache.computeIfAbsent(vertexId, _vid -> computeStepSyntaxElement.compile()); + } + /** * Instances of this class represent a fully compiled pipeline execution. 
Note that this class * has a separate lifecycle from {@link CompiledPipeline} because it holds per (worker-thread) @@ -279,13 +313,13 @@ private final class CompiledExecution { /** * Compiled {@link IfVertex, indexed by their ID as returned by {@link Vertex#getId()}. */ - private final Map iffs = new HashMap<>(5); + private final Map iffs = new HashMap<>(50); /** * Cached {@link Dataset} compiled from {@link PluginVertex} indexed by their ID as returned * by {@link Vertex#getId()} to avoid duplicate computations. */ - private final Map plugins = new HashMap<>(5); + private final Map plugins = new HashMap<>(50); private final Dataset compiled; @@ -308,11 +342,37 @@ private Dataset compile() { if (outputNodes.isEmpty()) { return Dataset.IDENTITY; } else { - return DatasetCompiler.terminalDataset(outputNodes.stream().map( + return terminalDataset(outputNodes.stream().map( leaf -> outputDataset(leaf, flatten(Collections.emptyList(), leaf)) ).collect(Collectors.toList())); } } + /** + *

<p>Builds a terminal {@link Dataset} from the given parent {@link Dataset}s.</p>

+ *

<p>If the given set of parent {@link Dataset} is empty an {@link IllegalArgumentException} is thrown; + * a single parent is returned as is, without compiling a terminal dataset.</p>

+ * {@link Dataset#compute(RubyArray, boolean, boolean)} is always + * {@link Collections#emptyList()}. + * @param parents Parent {@link Dataset} to sum and terminate + * @return Dataset representing the sum of given parent {@link Dataset} + */ + public Dataset terminalDataset(final Collection parents) { + final int count = parents.size(); + final Dataset result; + if (count > 1) { + ComputeStepSyntaxElement prepared = DatasetCompiler.terminalDataset(parents); + result = prepared.instantiate(prepared.compile()); + } else if (count == 1) { + // No need for a terminal dataset here, if there is only a single parent node we can + // call it directly. + result = parents.iterator().next(); + } else { + throw new IllegalArgumentException( + "Cannot create Terminal Dataset for an empty number of parent datasets" + ); + } + return result; + } /** * Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after @@ -325,12 +385,14 @@ private Dataset filterDataset(final Vertex vertex, final Collection dat final String vertexId = vertex.getId(); if (!plugins.containsKey(vertexId)) { - final ComputeStepSyntaxElement prepared = - DatasetCompiler.filterDataset(flatten(datasets, vertex), - filters.get(vertexId)); + final ComputeStepSyntaxElement prepared = DatasetCompiler.filterDataset( + flatten(datasets, vertex), + filters.get(vertexId)); + final Class clazz = getDatasetClass(vertexId, prepared); + LOGGER.debug("Compiled filter\n {} \n into \n {}", vertex, prepared); - plugins.put(vertexId, prepared.instantiate()); + plugins.put(vertexId, prepared.instantiate(clazz)); } return plugins.get(vertexId); @@ -347,13 +409,16 @@ private Dataset outputDataset(final Vertex vertex, final Collection dat final String vertexId = vertex.getId(); if (!plugins.containsKey(vertexId)) { - final ComputeStepSyntaxElement prepared = - DatasetCompiler.outputDataset(flatten(datasets, vertex), - outputs.get(vertexId), - outputs.size() == 1); + final ComputeStepSyntaxElement 
prepared = DatasetCompiler.outputDataset( + flatten(datasets, vertex), + outputs.get(vertexId), + outputs.size() == 1); + final Class clazz = getDatasetClass(vertexId, prepared); + LOGGER.debug("Compiled output\n {} \n into \n {}", vertex, prepared); - plugins.put(vertexId, prepared.instantiate()); - } + + plugins.put(vertexId, prepared.instantiate(clazz)); + } return plugins.get(vertexId); } @@ -368,24 +433,25 @@ private Dataset outputDataset(final Vertex vertex, final Collection dat */ private SplitDataset split(final Collection datasets, final EventCondition condition, final Vertex vertex) { - final String key = vertex.getId(); - SplitDataset conditional = iffs.get(key); + final String vertexId = vertex.getId(); + SplitDataset conditional = iffs.get(vertexId); + if (conditional == null) { final Collection dependencies = flatten(datasets, vertex); - conditional = iffs.get(key); + conditional = iffs.get(vertexId); // Check that compiling the dependencies did not already instantiate the conditional // by requiring its else branch. 
if (conditional == null) { - final ComputeStepSyntaxElement prepared = - DatasetCompiler.splitDataset(dependencies, condition); - LOGGER.debug( - "Compiled conditional\n {} \n into \n {}", vertex, prepared - ); - conditional = prepared.instantiate(); - iffs.put(key, conditional); - } + final ComputeStepSyntaxElement prepared = DatasetCompiler.splitDataset(dependencies, condition); + final Class clazz = getDatasetClass(vertexId, prepared); + LOGGER.debug("Compiled conditional\n {} \n into \n {}", vertex, prepared); + + conditional = prepared.instantiate(clazz); + iffs.put(vertexId, conditional); + } } + return conditional; } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java index 33d3b14b487..9fdc19e8b12 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java @@ -11,6 +11,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -32,10 +33,9 @@ public final class ComputeStepSyntaxElement { private static final ISimpleCompiler COMPILER = new SimpleCompiler(); /** - * Cache of runtime compiled classes to prevent duplicate classes being compiled. 
+ * Sequential counter to generate the class name */ - private static final Map, Class> CLASS_CACHE - = new HashMap<>(); + private static final AtomicLong classSeqCount = new AtomicLong(); /** * Pattern to remove redundant {@code ;} from formatted code since {@link Formatter} does not @@ -49,6 +49,8 @@ public final class ComputeStepSyntaxElement { private final Class type; + private final long classSeq; + public static ComputeStepSyntaxElement create( final Iterable methods, final ClassFields fields, final Class interfce) { @@ -60,37 +62,41 @@ private ComputeStepSyntaxElement(final Iterable methods, this.methods = methods; this.fields = fields; type = interfce; + classSeq = classSeqCount.incrementAndGet(); } @SuppressWarnings("unchecked") - public T instantiate() { - // We need to globally synchronize to avoid concurrency issues with the internal class - // loader and the CLASS_CACHE + public T instantiate(Class clazz) { + try { + return (T) clazz.getConstructor(Map.class).newInstance(ctorArguments()); + } catch (final NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException ex) { + throw new IllegalStateException(ex); + } + } + + @SuppressWarnings("unchecked") + public Class compile() { + // We need to globally synchronize to avoid concurrency issues with the internal class loader + // Per https://github.com/elastic/logstash/pull/11482 we should review this lock. 
synchronized (COMPILER) { try { final Class clazz; - if (CLASS_CACHE.containsKey(this)) { - clazz = CLASS_CACHE.get(this); + final String name = String.format("CompiledDataset%d", classSeq); + final String code = generateCode(name); + if (SOURCE_DIR != null) { + final Path sourceFile = SOURCE_DIR.resolve(String.format("%s.java", name)); + Files.write(sourceFile, code.getBytes(StandardCharsets.UTF_8)); + COMPILER.cookFile(sourceFile.toFile()); } else { - final String name = String.format("CompiledDataset%d", CLASS_CACHE.size()); - final String code = generateCode(name); - if (SOURCE_DIR != null) { - final Path sourceFile = SOURCE_DIR.resolve(String.format("%s.java", name)); - Files.write(sourceFile, code.getBytes(StandardCharsets.UTF_8)); - COMPILER.cookFile(sourceFile.toFile()); - } else { - COMPILER.cook(code); - } - COMPILER.setParentClassLoader(COMPILER.getClassLoader()); - clazz = (Class) COMPILER.getClassLoader().loadClass( - String.format("org.logstash.generated.%s", name) - ); - CLASS_CACHE.put(this, clazz); + COMPILER.cook(code); } - return (T) clazz.getConstructor(Map.class).newInstance(ctorArguments()); - } catch (final CompileException | ClassNotFoundException | IOException - | NoSuchMethodException | InvocationTargetException | InstantiationException - | IllegalAccessException ex) { + COMPILER.setParentClassLoader(COMPILER.getClassLoader()); + clazz = (Class)COMPILER.getClassLoader().loadClass( + String.format("org.logstash.generated.%s", name) + ); + + return clazz; + } catch (final CompileException | ClassNotFoundException | IOException ex) { throw new IllegalStateException(ex); } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java index 28717e9832c..5cd4b65cd40 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java +++ 
b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java @@ -90,6 +90,7 @@ public static ComputeStepSyntaxElement filterDataset(final Collection()); final Closure clear = Closure.wrap(); final Closure compute; + if (parents.isEmpty()) { compute = filterBody(outputBuffer, BATCH_ARG, fields, plugin); } else { @@ -116,29 +117,16 @@ public static ComputeStepSyntaxElement filterDataset(final Collection parents) { - final int count = parents.size(); - final Dataset result; - if (count > 1) { - final ClassFields fields = new ClassFields(); - final Collection parentFields = - parents.stream().map(fields::add).collect(Collectors.toList()); - result = compileOutput( - Closure.wrap( - parentFields.stream().map(DatasetCompiler::computeDataset) - .toArray(MethodLevelSyntaxElement[]::new) - ).add(clearSyntax(parentFields)), Closure.EMPTY, fields - ).instantiate(); - } else if (count == 1) { - // No need for a terminal dataset here, if there is only a single parent node we can - // call it directly. 
- result = parents.iterator().next(); - } else { - throw new IllegalArgumentException( - "Cannot create Terminal Dataset for an empty number of parent datasets" - ); - } - return result; + public static ComputeStepSyntaxElement terminalDataset(final Collection parents) { + final ClassFields fields = new ClassFields(); + final Collection parentFields = + parents.stream().map(fields::add).collect(Collectors.toList()); + return compileOutput( + Closure.wrap( + parentFields.stream().map(DatasetCompiler::computeDataset) + .toArray(MethodLevelSyntaxElement[]::new) + ).add(clearSyntax(parentFields)), Closure.EMPTY, fields + ); } /** diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java index 23124545669..0978f0e437b 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java @@ -20,12 +20,13 @@ public final class DatasetCompilerTest { */ @Test public void compilesOutputDataset() { + final ComputeStepSyntaxElement prepared = DatasetCompiler.outputDataset( + Collections.emptyList(), + PipelineTestUtil.buildOutput(events -> {}), + true + ); assertThat( - DatasetCompiler.outputDataset( - Collections.emptyList(), - PipelineTestUtil.buildOutput(events -> {}), - true - ).instantiate().compute(RubyUtil.RUBY.newArray(), false, false), + prepared.instantiate(prepared.compile()).compute(RubyUtil.RUBY.newArray(), false, false), nullValue() ); } @@ -33,9 +34,10 @@ public void compilesOutputDataset() { @Test public void compilesSplitDataset() { final FieldReference key = FieldReference.from("foo"); - final SplitDataset left = DatasetCompiler.splitDataset( + final ComputeStepSyntaxElement prepared = DatasetCompiler.splitDataset( Collections.emptyList(), event -> event.getEvent().includes(key) - ).instantiate(); + ); + final 
SplitDataset left = prepared.instantiate(prepared.compile()); final Event trueEvent = new Event(); trueEvent.setField(key, "val"); final JrubyEventExtLibrary.RubyEvent falseEvent =