diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java
index 3dea8a4f487..207aa8a992c 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java
@@ -3,7 +3,6 @@
 import co.elastic.logstash.api.Codec;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.jruby.RubyArray;
 import org.jruby.RubyHash;
 import org.jruby.javasupport.JavaUtil;
 import org.jruby.runtime.builtin.IRubyObject;
@@ -35,7 +34,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -81,28 +79,18 @@ public final class CompiledPipeline {
      */
    private final RubyIntegration.PluginFactory pluginFactory;

-    /**
-     * Per pipeline compiled classes cache shared across threads {@link CompiledExecution}
-     */
-    private final Map<String, Class<? extends Dataset>> datasetClassCache = new ConcurrentHashMap<>(500);
-
-    /**
-     * First, constructor time, compilation of the pipeline that will warm
-     * the {@link CompiledPipeline#datasetClassCache} in a thread safe way
-     * before the concurrent per worker threads {@link CompiledExecution} compilations
-     */
-    private final AtomicReference<CompiledExecution> warmedCompiledExecution = new AtomicReference<>();
-
     public CompiledPipeline(
             final PipelineIR pipelineIR,
-            final RubyIntegration.PluginFactory pluginFactory) {
+            final RubyIntegration.PluginFactory pluginFactory)
+    {
         this(pipelineIR, pluginFactory, null);
     }

     public CompiledPipeline(
             final PipelineIR pipelineIR,
             final RubyIntegration.PluginFactory pluginFactory,
-            final SecretStore secretStore) {
+            final SecretStore secretStore)
+    {
         this.pipelineIR = pipelineIR;
         this.pluginFactory = pluginFactory;
         try (ConfigVariableExpander cve = new ConfigVariableExpander(
@@ -111,10 +99,6 @@ public CompiledPipeline(
             inputs = setupInputs(cve);
             filters = setupFilters(cve);
             outputs = setupOutputs(cve);
-
-            // invoke a first compilation to warm the class cache which will prevent
-            // redundant compilations for each subsequent worker {@link CompiledExecution}
-            warmedCompiledExecution.set(new CompiledPipeline.CompiledExecution());
         } catch (Exception e) {
             throw new IllegalStateException("Unable to configure plugins: " + e.getMessage());
         }
@@ -138,13 +122,10 @@ public Collection<IRubyObject> inputs() {
      * @return Compiled {@link Dataset} representation of the underlying {@link PipelineIR} topology
      */
     public Dataset buildExecution() {
-        CompiledExecution result = warmedCompiledExecution.getAndSet(null);
-        if (result != null) {
-            return result.toDataset();
-        }
         return new CompiledPipeline.CompiledExecution().toDataset();
     }

+
     /**
      * Sets up all outputs learned from {@link PipelineIR}.
      */
@@ -155,7 +136,7 @@ private Map<String, AbstractOutputDelegatorExt> setupOutputs(ConfigVariableExpan
             final PluginDefinition def = v.getPluginDefinition();
             final SourceWithMetadata source = v.getSourceWithMetadata();
             res.put(v.getId(), pluginFactory.buildOutput(
-                RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)
+                    RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)
             ));
         });
         return res;
@@ -172,7 +153,7 @@ private Map<String, AbstractFilterDelegatorExt> setupFilters(ConfigVariableExpan
             final PluginDefinition def = vertex.getPluginDefinition();
             final SourceWithMetadata source = vertex.getSourceWithMetadata();
             res.put(vertex.getId(), pluginFactory.buildFilter(
-                RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)
+                    RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)
             ));
         }
         return res;
@@ -188,7 +169,7 @@ private Collection<IRubyObject> setupInputs(ConfigVariableExpander cve) {
             final PluginDefinition def = v.getPluginDefinition();
             final SourceWithMetadata source = v.getSourceWithMetadata();
             IRubyObject o = pluginFactory.buildInput(
-                RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve));
+                    RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve));
             nodes.add(o);
         });
         return nodes;
@@ -212,8 +193,9 @@ private RubyHash convertArgs(final PluginDefinition def) {
                 SourceWithMetadata source = ((PluginStatement) value).getSourceWithMetadata();
                 toput = pluginFactory.buildCodec(
                     RubyUtil.RUBY.newString(codec.getName()),
-                    source, Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
-                    codec.getArguments()
+                    source,
+                    Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
+                    codec.getArguments()
                 );
             } else {
                 toput = value;
@@ -241,9 +223,10 @@ private Map<String, Object> convertJavaArgs(final PluginDefinition def, ConfigVa
                 SourceWithMetadata source = ((PluginStatement) value).getSourceWithMetadata();
                 Map<String, Object> codecArgs = expandConfigVariables(cve, codec.getArguments());
                 toput = pluginFactory.buildCodec(
-                    RubyUtil.RUBY.newString(codec.getName()),
-                    source, Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
-                    codecArgs
+                        RubyUtil.RUBY.newString(codec.getName()),
+                        source,
+                        Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
+                        codecArgs
                 );
                 Codec javaCodec = (Codec)JavaUtil.unwrapJavaValue(toput);
                 args.put(key, javaCodec);
@@ -292,17 +275,6 @@ private boolean isOutput(final Vertex vertex) {
         return outputs.containsKey(vertex.getId());
     }

-    /**
-     * Returns an existing compiled dataset class implementation for the given {@code vertexId},
-     * or compiles one from the provided {@code computeStepSyntaxElement}.
-     * @param vertexId a string uniquely identifying a {@link Vertex} within the current pipeline
-     * @param computeStepSyntaxElement the source from which to compile a dataset class
-     * @return an implementation of {@link Dataset} for the given vertex
-     */
-    private Class<? extends Dataset> getDatasetClass(final String vertexId, final ComputeStepSyntaxElement<? extends Dataset> computeStepSyntaxElement) {
-        return datasetClassCache.computeIfAbsent(vertexId, _vid -> computeStepSyntaxElement.compile());
-    }
-
     /**
      * Instances of this class represent a fully compiled pipeline execution. Note that this class
      * has a separate lifecycle from {@link CompiledPipeline} because it holds per (worker-thread)
@@ -342,37 +314,11 @@ private Dataset compile() {
             if (outputNodes.isEmpty()) {
                 return Dataset.IDENTITY;
             } else {
-                return terminalDataset(outputNodes.stream().map(
+                return DatasetCompiler.terminalDataset(outputNodes.stream().map(
                     leaf -> outputDataset(leaf, flatten(Collections.emptyList(), leaf))
                 ).collect(Collectors.toList()));
             }
         }

-        /**
-         * <p>Builds a terminal {@link Dataset} from the given parent {@link Dataset}s.</p>
-         * <p>If the given set of parent {@link Dataset} is empty the sum is defined as the
-         * trivial dataset that does not invoke any computation whatsoever.</p>
-         * {@link Dataset#compute(RubyArray, boolean, boolean)} is always
-         * {@link Collections#emptyList()}.
-         * @param parents Parent {@link Dataset} to sum and terminate
-         * @return Dataset representing the sum of given parent {@link Dataset}
-         */
-        public Dataset terminalDataset(final Collection<Dataset> parents) {
-            final int count = parents.size();
-            final Dataset result;
-            if (count > 1) {
-                ComputeStepSyntaxElement<Dataset> prepared = DatasetCompiler.terminalDataset(parents);
-                result = prepared.instantiate(prepared.compile());
-            } else if (count == 1) {
-                // No need for a terminal dataset here, if there is only a single parent node we can
-                // call it directly.
-                result = parents.iterator().next();
-            } else {
-                throw new IllegalArgumentException(
-                    "Cannot create Terminal Dataset for an empty number of parent datasets"
-                );
-            }
-            return result;
-        }
         /**
          * Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
@@ -385,14 +331,13 @@ private Dataset filterDataset(final Vertex vertex, final Collection<Dataset> dat
             final String vertexId = vertex.getId();

             if (!plugins.containsKey(vertexId)) {
-                final ComputeStepSyntaxElement<Dataset> prepared = DatasetCompiler.filterDataset(
-                    flatten(datasets, vertex),
-                    filters.get(vertexId));
-                final Class<? extends Dataset> clazz = getDatasetClass(vertexId, prepared);
-
+                final ComputeStepSyntaxElement<Dataset> prepared =
+                    DatasetCompiler.filterDataset(
+                        flatten(datasets, vertex),
+                        filters.get(vertexId));
                 LOGGER.debug("Compiled filter\n {} \n into \n {}", vertex, prepared);

-                plugins.put(vertexId, prepared.instantiate(clazz));
+                plugins.put(vertexId, prepared.instantiate());
             }

             return plugins.get(vertexId);
@@ -409,16 +354,15 @@ private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> dat
             final String vertexId = vertex.getId();

             if (!plugins.containsKey(vertexId)) {
-                final ComputeStepSyntaxElement<Dataset> prepared = DatasetCompiler.outputDataset(
-                    flatten(datasets, vertex),
-                    outputs.get(vertexId),
-                    outputs.size() == 1);
-                final Class<? extends Dataset> clazz = getDatasetClass(vertexId, prepared);
-
+                final ComputeStepSyntaxElement<Dataset> prepared =
+                    DatasetCompiler.outputDataset(
+                        flatten(datasets, vertex),
+                        outputs.get(vertexId),
+                        outputs.size() == 1);
                 LOGGER.debug("Compiled output\n {} \n into \n {}", vertex, prepared);

-                plugins.put(vertexId, prepared.instantiate(clazz));
-            }
+                plugins.put(vertexId, prepared.instantiate());
+            }

             return plugins.get(vertexId);
         }
@@ -435,23 +379,21 @@ private SplitDataset split(final Collection<Dataset> datasets,
             final EventCondition condition, final Vertex vertex) {
             final String vertexId = vertex.getId();
             SplitDataset conditional = iffs.get(vertexId);
-
             if (conditional == null) {
                 final Collection<Dataset> dependencies = flatten(datasets, vertex);
                 conditional = iffs.get(vertexId);
                 // Check that compiling the dependencies did not already instantiate the conditional
                 // by requiring its else branch.
                 if (conditional == null) {
-                    final ComputeStepSyntaxElement<SplitDataset> prepared = DatasetCompiler.splitDataset(dependencies, condition);
-                    final Class<? extends Dataset> clazz = getDatasetClass(vertexId, prepared);
-
+                    final ComputeStepSyntaxElement<SplitDataset> prepared =
+                        DatasetCompiler.splitDataset(dependencies, condition);
                     LOGGER.debug("Compiled conditional\n {} \n into \n {}", vertex, prepared);

-                    conditional = prepared.instantiate(clazz);
+                    conditional = prepared.instantiate();
                     iffs.put(vertexId, conditional);
                 }
-            }
+            }

             return conditional;
         }
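Note: the removals above drop the old strategy entirely — a per-pipeline `datasetClassCache` keyed by vertex id, plus a constructor-time warm-up compilation to populate it before the worker threads start. A vertex id is only meaningful within one pipeline, so identical plugin configurations in other pipelines (or after a pipeline reload) could still compile duplicate classes. The replacement, in the `ComputeStepSyntaxElement` changes below, keys a global cache on the generated source itself. A minimal sketch of that idea, with hypothetical names rather than the Logstash API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch: a cache keyed by the generated source text is naturally shared by every
// pipeline and worker that produces the same code, unlike a vertex-id-keyed cache.
final class SourceKeyedClassCache {
    private final Map<String, Class<?>> cache = new ConcurrentHashMap<>();

    // compileFn runs at most once per distinct source string.
    Class<?> getOrCompile(final String normalizedSource, final Function<String, Class<?>> compileFn) {
        return cache.computeIfAbsent(normalizedSource, compileFn);
    }
}
```

The actual patch guards a plain `HashMap` with a lock instead of using a `ConcurrentHashMap`, since access to the Janino compiler has to be serialized anyway — see the new `compile()` javadoc below.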
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java
index 9fdc19e8b12..8cfa42b96f6 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java
@@ -11,13 +11,12 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.codehaus.commons.compiler.CompileException;
-import org.codehaus.commons.compiler.ISimpleCompiler;
 import org.codehaus.janino.Scanner;
+import org.codehaus.commons.compiler.ISimpleCompiler;
 import org.codehaus.janino.SimpleCompiler;

 /**
@@ -33,9 +32,11 @@ public final class ComputeStepSyntaxElement<T extends Dataset> {
     private static final ISimpleCompiler COMPILER = new SimpleCompiler();

     /**
-     * Sequential counter to generate the class name
+     * Global cache of runtime compiled classes to prevent duplicate classes being compiled
+     * across pipelines and workers.
      */
-    private static final AtomicLong classSeqCount = new AtomicLong();
+    private static final Map<ComputeStepSyntaxElement<?>, Class<? extends Dataset>> CLASS_CACHE
+        = new HashMap<>(5000);

     /**
      * Pattern to remove redundant {@code ;} from formatted code since {@link Formatter} does not
@@ -43,74 +44,94 @@ public final class ComputeStepSyntaxElement<T extends Dataset> {
      */
     private static final Pattern REDUNDANT_SEMICOLON = Pattern.compile("\n[ ]*;\n");

+    private static final String CLASS_NAME_PLACEHOLDER = "CLASS_NAME_PLACEHOLDER";
+
+    private static final Pattern CLASS_NAME_PLACEHOLDER_REGEX = Pattern.compile(CLASS_NAME_PLACEHOLDER);
+
     private final Iterable<MethodSyntaxElement> methods;

     private final ClassFields fields;

     private final Class<T> type;

-    private final long classSeq;
+    private final String normalizedSource;

     public static <T extends Dataset> ComputeStepSyntaxElement<T> create(
-        final Iterable<MethodSyntaxElement> methods, final ClassFields fields,
-        final Class<T> interfce) {
+        final Iterable<MethodSyntaxElement> methods,
+        final ClassFields fields,
+        final Class<T> interfce)
+    {
         return new ComputeStepSyntaxElement<>(methods, fields, interfce);
     }

-    private ComputeStepSyntaxElement(final Iterable<MethodSyntaxElement> methods,
-        final ClassFields fields, final Class<T> interfce) {
+    private ComputeStepSyntaxElement(
+        final Iterable<MethodSyntaxElement> methods,
+        final ClassFields fields,
+        final Class<T> interfce)
+    {
         this.methods = methods;
         this.fields = fields;
         type = interfce;
-        classSeq = classSeqCount.incrementAndGet();
+
+        // normalizes away the name of the class so that two classes of different name but otherwise
+        // equivalent syntax get correctly compared by {@link #equals(Object)}.
+        normalizedSource = generateCode(CLASS_NAME_PLACEHOLDER);
     }

     @SuppressWarnings("unchecked")
-    public T instantiate(Class<? extends Dataset> clazz) {
-        try {
-            return (T) clazz.getConstructor(Map.class).newInstance(ctorArguments());
+    public T instantiate() {
+        try {
+            final Class<? extends Dataset> clazz = compile();
+            return (T) clazz.getConstructor(Map.class).newInstance(ctorArguments());
         } catch (final NoSuchMethodException | InvocationTargetException | InstantiationException
             | IllegalAccessException ex) {
             throw new IllegalStateException(ex);
         }
     }

+    /**
+     * This method is NOT thread-safe, and must have exclusive access to `COMPILER`
+     * so that the resulting `ClassLoader` after each `SimpleCompiler#cook()` operation
+     * can be teed up as the parent for the next cook operation.
+     * Also note that synchronizing on `COMPILER` also protects the global CLASS_CACHE.
+     */
     @SuppressWarnings("unchecked")
-    public Class<? extends Dataset> compile() {
-        // We need to globally synchronize to avoid concurrency issues with the internal class loader
-        // Per https://github.com/elastic/logstash/pull/11482 we should review this lock.
-        synchronized (COMPILER) {
-            try {
-                final Class<? extends Dataset> clazz;
-                final String name = String.format("CompiledDataset%d", classSeq);
-                final String code = generateCode(name);
-                if (SOURCE_DIR != null) {
-                    final Path sourceFile = SOURCE_DIR.resolve(String.format("%s.java", name));
-                    Files.write(sourceFile, code.getBytes(StandardCharsets.UTF_8));
-                    COMPILER.cookFile(sourceFile.toFile());
-                } else {
-                    COMPILER.cook(code);
-                }
-                COMPILER.setParentClassLoader(COMPILER.getClassLoader());
-                clazz = (Class<T>)COMPILER.getClassLoader().loadClass(
+    private Class<? extends Dataset> compile() {
+        try {
+            synchronized (COMPILER) {
+                Class<? extends Dataset> clazz = CLASS_CACHE.get(this);
+                if (clazz == null) {
+                    final String name = String.format("CompiledDataset%d", CLASS_CACHE.size());
+                    final String code = CLASS_NAME_PLACEHOLDER_REGEX.matcher(normalizedSource).replaceAll(name);
+                    if (SOURCE_DIR != null) {
+                        final Path sourceFile = SOURCE_DIR.resolve(String.format("%s.java", name));
+                        Files.write(sourceFile, code.getBytes(StandardCharsets.UTF_8));
+                        COMPILER.cookFile(sourceFile.toFile());
+                    } else {
+                        COMPILER.cook(code);
+                    }
+                    COMPILER.setParentClassLoader(COMPILER.getClassLoader());
+                    clazz = (Class<T>) COMPILER.getClassLoader().loadClass(
                         String.format("org.logstash.generated.%s", name)
-                );
-
+                    );
+                    CLASS_CACHE.put(this, clazz);
+                }
                 return clazz;
-            } catch (final CompileException | ClassNotFoundException | IOException ex) {
-                throw new IllegalStateException(ex);
             }
+        } catch (final CompileException | ClassNotFoundException | IOException ex) {
+            throw new IllegalStateException(ex);
         }
     }

     @Override
     public int hashCode() {
-        return normalizedSource().hashCode();
+        return normalizedSource.hashCode();
     }

     @Override
     public boolean equals(final Object other) {
         return other instanceof ComputeStepSyntaxElement &&
-            normalizedSource().equals(((ComputeStepSyntaxElement) other).normalizedSource());
+            normalizedSource.equals(((ComputeStepSyntaxElement) other).normalizedSource);
     }

     private String generateCode(final String name) {
@@ -162,15 +183,6 @@ private Map<String, Object> ctorArguments() {
         return result;
     }

-    /**
-     * Normalizes away the name of the class so that two classes of different name but otherwise
-     * equivalent syntax get correctly compared by {@link #equals(Object)}.
-     * @return Source of this class, with its name set to {@code CONSTANT}.
-     */
-    private String normalizedSource() {
-        return this.generateCode("CONSTANT");
-    }
-
     /**
      * Generates the Java code for defining one field and constructor argument for each given value.
      * constructor for
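Note: `normalizedSource` is now generated once in the constructor, with a fixed placeholder standing in for the class name, and `hashCode()`/`equals()` read that precomputed string — the old `normalizedSource()` regenerated the full source on every call. A hedged sketch of the placeholder mechanics, reusing the constant names from the diff but none of the surrounding Logstash types:

```java
import java.util.regex.Pattern;

final class PlaceholderDemo {
    private static final String CLASS_NAME_PLACEHOLDER = "CLASS_NAME_PLACEHOLDER";
    private static final Pattern CLASS_NAME_PLACEHOLDER_REGEX = Pattern.compile(CLASS_NAME_PLACEHOLDER);

    public static void main(String[] args) {
        // The normalized source doubles as the cache key: two steps that generate
        // identical code compare equal, whatever concrete name each would receive.
        final String normalizedSource =
            "public final class " + CLASS_NAME_PLACEHOLDER + " implements Dataset { /* ... */ }";
        // Only at compile time is a sequential name stamped into the source:
        final String code = CLASS_NAME_PLACEHOLDER_REGEX.matcher(normalizedSource).replaceAll("CompiledDataset0");
        System.out.println(code);
        // -> public final class CompiledDataset0 implements Dataset { /* ... */ }
    }
}
```

One detail worth flagging for review: the sequential class name is derived from `CLASS_CACHE.size()`, which appears safe only because the cache is mutated exclusively under the `COMPILER` lock and entries are never removed in this patch.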
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
index d660ac7ceb4..22e6e41d5a9 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
@@ -90,7 +90,6 @@ public static ComputeStepSyntaxElement<Dataset> filterDataset(final Collection<D
         final ValueSyntaxElement outputBuffer = fields.add(new ArrayList<>());
         final Closure clear = Closure.wrap();
         final Closure compute;
-
         if (parents.isEmpty()) {
             compute = filterBody(outputBuffer, BATCH_ARG, fields, plugin);
         } else {
@@ -117,16 +116,29 @@ public static ComputeStepSyntaxElement<Dataset> filterDataset(final Collection<D
         );
     }

-    public static ComputeStepSyntaxElement<Dataset> terminalDataset(final Collection<Dataset> parents) {
-        final ClassFields fields = new ClassFields();
-        final Collection<ValueSyntaxElement> parentFields =
-            parents.stream().map(fields::add).collect(Collectors.toList());
-        return compileOutput(
-            Closure.wrap(
-                parentFields.stream().map(DatasetCompiler::computeDataset)
-                    .toArray(MethodLevelSyntaxElement[]::new)
-            ).add(clearSyntax(parentFields)), Closure.EMPTY, fields
-        );
+    public static Dataset terminalDataset(final Collection<Dataset> parents) {
+        final int count = parents.size();
+        final Dataset result;
+        if (count > 1) {
+            final ClassFields fields = new ClassFields();
+            final Collection<ValueSyntaxElement> parentFields =
+                parents.stream().map(fields::add).collect(Collectors.toList());
+            result = compileOutput(
+                Closure.wrap(
+                    parentFields.stream().map(DatasetCompiler::computeDataset)
+                        .toArray(MethodLevelSyntaxElement[]::new)
+                ).add(clearSyntax(parentFields)), Closure.EMPTY, fields
+            ).instantiate();
+        } else if (count == 1) {
+            // No need for a terminal dataset here, if there is only a single parent node we can
+            // call it directly.
+            result = parents.iterator().next();
+        } else {
+            throw new IllegalArgumentException(
+                "Cannot create Terminal Dataset for an empty number of parent datasets"
+            );
+        }
+        return result;
     }

     /**
@@ -152,9 +164,10 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<D
         final Collection<ValueSyntaxElement> parentFields =
             parents.stream().map(fields::add).collect(Collectors.toList());
@@ -169,10 +182,13 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<D
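Note: `terminalDataset` moves here from `CompiledPipeline.CompiledExecution` and now returns a ready `Dataset` rather than a `ComputeStepSyntaxElement`, so only the multi-parent case ever generates and compiles code; the single-parent and empty cases never touch the compiler or its cache. The 0/1/n policy it implements, restated as a self-contained sketch with illustrative types (not the Logstash ones):

```java
import java.util.Collection;
import java.util.List;

final class TerminalPolicy {
    interface Node { void compute(); }

    static Node terminal(final Collection<Node> parents) {
        final int count = parents.size();
        if (count > 1) {
            // n parents: synthesize a single node that computes each of them in order.
            return () -> parents.forEach(Node::compute);
        } else if (count == 1) {
            // 1 parent: return it directly; a generated wrapper would add nothing.
            return parents.iterator().next();
        }
        throw new IllegalArgumentException("Cannot create Terminal Dataset for an empty number of parent datasets");
    }

    public static void main(String[] args) {
        final Node a = () -> System.out.println("a");
        final Node b = () -> System.out.println("b");
        terminal(List.of(a, b)).compute(); // prints "a" then "b"
    }
}
```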
diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java
--- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java
+++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java
     @Test
     public void compilesOutputDataset() {
-        final ComputeStepSyntaxElement<Dataset> prepared = DatasetCompiler.outputDataset(
-            Collections.emptyList(),
-            PipelineTestUtil.buildOutput(events -> {}),
-            true
-        );
         assertThat(
-            prepared.instantiate(prepared.compile()).compute(RubyUtil.RUBY.newArray(), false, false),
+            DatasetCompiler.outputDataset(
+                Collections.emptyList(),
+                PipelineTestUtil.buildOutput(events -> {}),
+                true
+            ).instantiate().compute(RubyUtil.RUBY.newArray(), false, false),
             nullValue()
         );
     }
@@ -34,10 +33,9 @@ public void compilesOutputDataset() {
     @Test
     public void compilesSplitDataset() {
         final FieldReference key = FieldReference.from("foo");
-        final ComputeStepSyntaxElement<SplitDataset> prepared = DatasetCompiler.splitDataset(
+        final SplitDataset left = DatasetCompiler.splitDataset(
             Collections.emptyList(),
             event -> event.getEvent().includes(key)
-        );
-        final SplitDataset left = prepared.instantiate(prepared.compile());
+        ).instantiate();
         final Event trueEvent = new Event();
         trueEvent.setField(key, "val");
         final JrubyEventExtLibrary.RubyEvent falseEvent =
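Note: the test updates track the API change — the public `compile()` and `instantiate(Class)` are gone, and every call site collapses `prepared.instantiate(prepared.compile())` into `prepared.instantiate()`. The shape of that simplification, sketched with a stand-in type rather than the real `ComputeStepSyntaxElement`:

```java
import java.util.function.Supplier;

// Stand-in for ComputeStepSyntaxElement<T>: compilation (and the class-cache
// lookup) becomes an internal detail of instantiate() instead of a caller concern.
final class ComputeStep<T> {
    private final Supplier<T> compileAndConstruct;

    ComputeStep(final Supplier<T> compileAndConstruct) {
        this.compileAndConstruct = compileAndConstruct;
    }

    T instantiate() {
        // Before: callers wrote prepared.instantiate(prepared.compile());
        // now the compile step cannot be skipped, reordered, or run twice by mistake.
        return compileAndConstruct.get();
    }

    public static void main(String[] args) {
        final ComputeStep<String> prepared = new ComputeStep<>(() -> "compiled+instantiated");
        System.out.println(prepared.instantiate());
    }
}
```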