revert elastic#11482 and fix redundant code generation (elastic#11564)
colinsurprenant authored Feb 4, 2020
1 parent e09723e commit 8481bd0
Showing 4 changed files with 138 additions and 164 deletions.
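Note: only the CompiledPipeline.java diff is reproduced below. The companion change that actually de-duplicates code generation presumably lives in ComputeStepSyntaxElement (among the other changed files, not shown here), replacing the reverted per-pipeline class cache with a cache keyed on the generated source itself. A minimal sketch of that idea, assuming a hypothetical ClassCache holder and leaving the actual commons-compiler/Janino call abstract:

    import java.util.concurrent.ConcurrentHashMap;

    // Hedged sketch, not the verbatim fix: compile each distinct piece of generated
    // source at most once per JVM, so every per-worker CompiledExecution reuses the
    // cached class instead of recompiling identical source.
    final class ClassCache {
        private static final ConcurrentHashMap<String, Class<?>> CACHE = new ConcurrentHashMap<>();

        static Class<?> compileOnce(final String generatedSource) {
            // computeIfAbsent guarantees a single compilation per distinct source,
            // even when several worker threads build their executions concurrently.
            return CACHE.computeIfAbsent(generatedSource, ClassCache::compile);
        }

        private static Class<?> compile(final String source) {
            // Placeholder for the real commons-compiler/Janino invocation.
            throw new UnsupportedOperationException("compiler call omitted in this sketch");
        }
    }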
CompiledPipeline.java
@@ -3,7 +3,6 @@
import co.elastic.logstash.api.Codec;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.jruby.RubyArray;
import org.jruby.RubyHash;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.builtin.IRubyObject;
@@ -35,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -81,28 +79,18 @@ public final class CompiledPipeline {
*/
private final RubyIntegration.PluginFactory pluginFactory;

-/**
-* Per pipeline compiled classes cache shared across threads {@link CompiledExecution}
-*/
-private final Map<String, Class<? extends Dataset>> datasetClassCache = new ConcurrentHashMap<>(500);
-
-/**
-* First, constructor time, compilation of the pipeline that will warm
-* the {@link CompiledPipeline#datasetClassCache} in a thread safe way
-* before the concurrent per worker threads {@link CompiledExecution} compilations
-*/
-private final AtomicReference<CompiledExecution> warmedCompiledExecution = new AtomicReference<>();
-
public CompiledPipeline(
final PipelineIR pipelineIR,
-final RubyIntegration.PluginFactory pluginFactory) {
+final RubyIntegration.PluginFactory pluginFactory)
+{
this(pipelineIR, pluginFactory, null);
}

public CompiledPipeline(
final PipelineIR pipelineIR,
final RubyIntegration.PluginFactory pluginFactory,
-final SecretStore secretStore) {
+final SecretStore secretStore)
+{
this.pipelineIR = pipelineIR;
this.pluginFactory = pluginFactory;
try (ConfigVariableExpander cve = new ConfigVariableExpander(
@@ -111,10 +99,6 @@ public CompiledPipeline(
inputs = setupInputs(cve);
filters = setupFilters(cve);
outputs = setupOutputs(cve);

-// invoke a first compilation to warm the class cache which will prevent
-// redundant compilations for each subsequent worker {@link CompiledExecution}
-warmedCompiledExecution.set(new CompiledPipeline.CompiledExecution());
} catch (Exception e) {
throw new IllegalStateException("Unable to configure plugins: " + e.getMessage());
}
@@ -138,13 +122,10 @@ public Collection<IRubyObject> inputs() {
* @return Compiled {@link Dataset} representation of the underlying {@link PipelineIR} topology
*/
public Dataset buildExecution() {
-CompiledExecution result = warmedCompiledExecution.getAndSet(null);
-if (result != null) {
-return result.toDataset();
-}
return new CompiledPipeline.CompiledExecution().toDataset();
}
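With the pre-warmed execution removed, buildExecution() now simply compiles a fresh CompiledExecution for each caller; avoiding redundant work is left to class caching rather than constructor-time warming. A hedged sketch of the intended per-worker call pattern (the worker pool and workerCount are illustrative, not from this file):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Illustrative only: each worker thread asks the shared CompiledPipeline for
    // its own compiled Dataset graph, one buildExecution() call per worker.
    final class WorkerSketch {
        static void start(final CompiledPipeline pipeline, final int workerCount) {
            final ExecutorService workers = Executors.newFixedThreadPool(workerCount);
            for (int i = 0; i < workerCount; i++) {
                workers.submit(() -> {
                    final Dataset execution = pipeline.buildExecution();
                    // ... drive each batch through execution.compute(batch, flush, shutdown) ...
                });
            }
        }
    }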


/**
* Sets up all outputs learned from {@link PipelineIR}.
*/
@@ -155,7 +136,7 @@ private Map<String, AbstractOutputDelegatorExt> setupOutputs(ConfigVariableExpan
final PluginDefinition def = v.getPluginDefinition();
final SourceWithMetadata source = v.getSourceWithMetadata();
res.put(v.getId(), pluginFactory.buildOutput(
-RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)
+RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)
));
});
return res;
@@ -172,7 +153,7 @@ private Map<String, AbstractFilterDelegatorExt> setupFilters(ConfigVariableExpan
final PluginDefinition def = vertex.getPluginDefinition();
final SourceWithMetadata source = vertex.getSourceWithMetadata();
res.put(vertex.getId(), pluginFactory.buildFilter(
-RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)
+RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)
));
}
return res;
@@ -188,7 +169,7 @@ private Collection<IRubyObject> setupInputs(ConfigVariableExpander cve) {
final PluginDefinition def = v.getPluginDefinition();
final SourceWithMetadata source = v.getSourceWithMetadata();
IRubyObject o = pluginFactory.buildInput(
-RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve));
+RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve));
nodes.add(o);
});
return nodes;
@@ -212,8 +193,9 @@ private RubyHash convertArgs(final PluginDefinition def) {
SourceWithMetadata source = ((PluginStatement) value).getSourceWithMetadata();
toput = pluginFactory.buildCodec(
RubyUtil.RUBY.newString(codec.getName()),
-source, Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
-codec.getArguments()
+source,
+Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
+codec.getArguments()
);
} else {
toput = value;
@@ -241,9 +223,10 @@ private Map<String, Object> convertJavaArgs(final PluginDefinition def, ConfigVa
SourceWithMetadata source = ((PluginStatement) value).getSourceWithMetadata();
Map<String, Object> codecArgs = expandConfigVariables(cve, codec.getArguments());
toput = pluginFactory.buildCodec(
-RubyUtil.RUBY.newString(codec.getName()),
-source, Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
-codecArgs
+RubyUtil.RUBY.newString(codec.getName()),
+source,
+Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
+codecArgs
);
Codec javaCodec = (Codec)JavaUtil.unwrapJavaValue(toput);
args.put(key, javaCodec);
@@ -292,17 +275,6 @@ private boolean isOutput(final Vertex vertex) {
return outputs.containsKey(vertex.getId());
}

-/**
-* Returns an existing compiled dataset class implementation for the given {@code vertexId},
-* or compiles one from the provided {@code computeStepSyntaxElement}.
-* @param vertexId a string uniquely identifying a {@link Vertex} within the current pipeline
-* @param computeStepSyntaxElement the source from which to compile a dataset class
-* @return an implementation of {@link Dataset} for the given vertex
-*/
-private Class<? extends Dataset> getDatasetClass(final String vertexId, final ComputeStepSyntaxElement<? extends Dataset> computeStepSyntaxElement) {
-return datasetClassCache.computeIfAbsent(vertexId, _vid -> computeStepSyntaxElement.compile());
-}
-
/**
* Instances of this class represent a fully compiled pipeline execution. Note that this class
* has a separate lifecycle from {@link CompiledPipeline} because it holds per (worker-thread)
@@ -342,37 +314,11 @@ private Dataset compile() {
if (outputNodes.isEmpty()) {
return Dataset.IDENTITY;
} else {
-return terminalDataset(outputNodes.stream().map(
+return DatasetCompiler.terminalDataset(outputNodes.stream().map(
leaf -> outputDataset(leaf, flatten(Collections.emptyList(), leaf))
).collect(Collectors.toList()));
}
}
-/**
-* <p>Builds a terminal {@link Dataset} from the given parent {@link Dataset}s.</p>
-* <p>If the given set of parent {@link Dataset} is empty the sum is defined as the
-* trivial dataset that does not invoke any computation whatsoever.</p>
-* {@link Dataset#compute(RubyArray, boolean, boolean)} is always
-* {@link Collections#emptyList()}.
-* @param parents Parent {@link Dataset} to sum and terminate
-* @return Dataset representing the sum of given parent {@link Dataset}
-*/
-public Dataset terminalDataset(final Collection<Dataset> parents) {
-final int count = parents.size();
-final Dataset result;
-if (count > 1) {
-ComputeStepSyntaxElement<Dataset> prepared = DatasetCompiler.terminalDataset(parents);
-result = prepared.instantiate(prepared.compile());
-} else if (count == 1) {
-// No need for a terminal dataset here, if there is only a single parent node we can
-// call it directly.
-result = parents.iterator().next();
-} else {
-throw new IllegalArgumentException(
-"Cannot create Terminal Dataset for an empty number of parent datasets"
-);
-}
-return result;
-}
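The removed terminalDataset above reappears as DatasetCompiler.terminalDataset, which now hands back an instantiated Dataset directly instead of a ComputeStepSyntaxElement the caller must compile first. A hedged reconstruction from the removed body and the new call site in compile() (prepareTerminal is a hypothetical stand-in for DatasetCompiler's internal code generation):

    // Hedged reconstruction, assumed to live in DatasetCompiler after this commit.
    public static Dataset terminalDataset(final Collection<Dataset> parents) {
        final int count = parents.size();
        if (count > 1) {
            // Compilation now happens inside instantiate(); no Class is threaded through.
            return prepareTerminal(parents).instantiate(); // prepareTerminal: assumed helper
        } else if (count == 1) {
            // No terminal wrapper needed for a single parent; call it directly.
            return parents.iterator().next();
        } else {
            throw new IllegalArgumentException(
                "Cannot create Terminal Dataset for an empty number of parent datasets"
            );
        }
    }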

/**
* Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
@@ -385,14 +331,13 @@ private Dataset filterDataset(final Vertex vertex, final Collection<Dataset> dat
final String vertexId = vertex.getId();

if (!plugins.containsKey(vertexId)) {
-final ComputeStepSyntaxElement<Dataset> prepared = DatasetCompiler.filterDataset(
-flatten(datasets, vertex),
-filters.get(vertexId));
-final Class<? extends Dataset> clazz = getDatasetClass(vertexId, prepared);
-
+final ComputeStepSyntaxElement<Dataset> prepared =
+DatasetCompiler.filterDataset(
+flatten(datasets, vertex),
+filters.get(vertexId));
LOGGER.debug("Compiled filter\n {} \n into \n {}", vertex, prepared);

-plugins.put(vertexId, prepared.instantiate(clazz));
+plugins.put(vertexId, prepared.instantiate());
}

return plugins.get(vertexId);
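The same mechanical change repeats in filterDataset above and in outputDataset and split below: the cached-Class indirection is gone and instantiate() takes no argument. Side by side, the reverted pattern versus the current one:

    // Before (reverted): the caller compiled through the shared per-vertex cache
    // and passed the resulting Class into instantiate.
    final Class<? extends Dataset> clazz =
        datasetClassCache.computeIfAbsent(vertexId, id -> prepared.compile());
    final Dataset before = prepared.instantiate(clazz);

    // After: compilation (and any de-duplication) is internal to
    // ComputeStepSyntaxElement; callers only instantiate.
    final Dataset after = prepared.instantiate();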
@@ -409,16 +354,15 @@ private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> dat
final String vertexId = vertex.getId();

if (!plugins.containsKey(vertexId)) {
-final ComputeStepSyntaxElement<Dataset> prepared = DatasetCompiler.outputDataset(
-flatten(datasets, vertex),
-outputs.get(vertexId),
-outputs.size() == 1);
-final Class<? extends Dataset> clazz = getDatasetClass(vertexId, prepared);
-
+final ComputeStepSyntaxElement<Dataset> prepared =
+DatasetCompiler.outputDataset(
+flatten(datasets, vertex),
+outputs.get(vertexId),
+outputs.size() == 1);
LOGGER.debug("Compiled output\n {} \n into \n {}", vertex, prepared);

-plugins.put(vertexId, prepared.instantiate(clazz));
-}
+plugins.put(vertexId, prepared.instantiate());
+}

return plugins.get(vertexId);
}
@@ -435,23 +379,21 @@ private SplitDataset split(final Collection<Dataset> datasets,
final EventCondition condition, final Vertex vertex) {
final String vertexId = vertex.getId();
SplitDataset conditional = iffs.get(vertexId);

if (conditional == null) {
final Collection<Dataset> dependencies = flatten(datasets, vertex);
conditional = iffs.get(vertexId);
// Check that compiling the dependencies did not already instantiate the conditional
// by requiring its else branch.
if (conditional == null) {
-final ComputeStepSyntaxElement<SplitDataset> prepared = DatasetCompiler.splitDataset(dependencies, condition);
-final Class<? extends Dataset> clazz = getDatasetClass(vertexId, prepared);
-
+final ComputeStepSyntaxElement<SplitDataset> prepared =
+DatasetCompiler.splitDataset(dependencies, condition);
LOGGER.debug("Compiled conditional\n {} \n into \n {}", vertex, prepared);

-conditional = prepared.instantiate(clazz);
+conditional = prepared.instantiate();
iffs.put(vertexId, conditional);
}
-}
-
+}
return conditional;
}
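For context on the double lookup in split(): a single compiled SplitDataset serves both branches of a conditional, so flattening one branch's dependencies can create and cache the conditional before the sibling branch asks for it. A hedged sketch of that sharing (the right() accessor for the complement is an assumption about SplitDataset's API):

    // Hedged sketch: both branches of one conditional share the same SplitDataset,
    // keyed by the conditional vertex's id in the iffs map.
    final SplitDataset conditional = split(datasets, condition, vertex);
    final Dataset ifBranch = conditional;           // events matching the condition
    final Dataset elseBranch = conditional.right(); // complement; right() is assumed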
