Skip to content

Commit

Permalink
Changed the assignment of plugin.id to load the value dynamically ist…
Browse files Browse the repository at this point in the history
…ead of hardcode

Changed ComputeStepSyntaxElement to generate Java code to retrieve the plugin's id by a method instead of hardcoding the value in the generated code.
This permit to share more compiled classes, that differs only by plugin.id and speed up the pipeline compilation.

The change has been secured by future regression with unit test that track pipeline compilations times.

Co-authored-by: Andrea Selva <andsel@users.noreply.github.com>
Co-authored-by: João Duarte <jsvd@users.noreply.github.com>

Fixes: elastic#12031
  • Loading branch information
colinsurprenant authored and elasticsearch-bot committed Jun 25, 2020
1 parent 5eb25dc commit ffac2df
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package org.logstash.config.ir.compiler;

import com.google.common.annotations.VisibleForTesting;
import com.google.googlejavaformat.java.Formatter;
import com.google.googlejavaformat.java.FormatterException;
import java.io.IOException;
Expand Down Expand Up @@ -84,6 +85,22 @@ public static <T extends Dataset> ComputeStepSyntaxElement<T> create(
return new ComputeStepSyntaxElement<>(methods, fields, interfce);
}

@VisibleForTesting
public static int classCacheSize() {
return CLASS_CACHE.size();
}

/*
* Used in a test to clean start, with class loaders wiped out into Janino compiler and cleared the cached classes.
* */
@VisibleForTesting
public static void cleanClassCache() {
synchronized (COMPILER) {
CLASS_CACHE.clear();
COMPILER.setParentClassLoader(null);
}
}

private ComputeStepSyntaxElement(
final Iterable<MethodSyntaxElement> methods,
final ClassFields fields,
Expand All @@ -100,9 +117,9 @@ private ComputeStepSyntaxElement(

@SuppressWarnings("unchecked")
public T instantiate() {
try {
final Class<? extends Dataset> clazz = compile();
return (T) clazz.getConstructor(Map.class).newInstance(ctorArguments());
try {
final Class<? extends Dataset> clazz = compile();
return (T) clazz.getConstructor(Map.class).newInstance(ctorArguments());
} catch (final NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException ex) {
throw new IllegalStateException(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,12 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(
final ClassFields fields = new ClassFields();
final Closure clearSyntax;
final Closure computeSyntax;
final ValueSyntaxElement outputField = fields.add(output);
if (parents.isEmpty()) {
clearSyntax = Closure.EMPTY;
computeSyntax = Closure.wrap(
setPluginIdForLog4j(output),
invokeOutput(fields.add(output), BATCH_ARG),
setPluginIdForLog4j(outputField),
invokeOutput(outputField, BATCH_ARG),
unsetPluginIdForLog4j());
} else {
final Collection<ValueSyntaxElement> parentFields =
Expand All @@ -258,8 +259,8 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(
final ValueSyntaxElement inputBuffer = fields.add(buffer);
computeSyntax = withInputBuffering(
Closure.wrap(
setPluginIdForLog4j(output),
invokeOutput(fields.add(output), inputBuffer),
setPluginIdForLog4j(outputField),
invokeOutput(outputField, inputBuffer),
inlineClear,
unsetPluginIdForLog4j()
),
Expand All @@ -284,7 +285,7 @@ private static Closure filterBody(
{
final ValueSyntaxElement filterField = fields.add(plugin);
final Closure body = Closure.wrap(
setPluginIdForLog4j(plugin),
setPluginIdForLog4j(filterField),
buffer(outputBuffer, filterField.call("multiFilter", inputBuffer))
);
if (plugin.hasFlush()) {
Expand Down Expand Up @@ -393,21 +394,18 @@ private static MethodLevelSyntaxElement callFilterFlush(final ClassFields fields
}

private static MethodLevelSyntaxElement unsetPluginIdForLog4j() {
return () -> "org.apache.logging.log4j.ThreadContext.remove(\"plugin.id\")";
}

private static MethodLevelSyntaxElement setPluginIdForLog4j(final AbstractFilterDelegatorExt filterPlugin) {
final IRubyObject pluginId = filterPlugin.getId();
return generateLog4jContextAssignment(pluginId);
}

private static MethodLevelSyntaxElement setPluginIdForLog4j(final AbstractOutputDelegatorExt outputPlugin) {
final IRubyObject pluginId = outputPlugin.getId();
return generateLog4jContextAssignment(pluginId);
return SyntaxFactory.value("org.apache.logging.log4j.ThreadContext").call(
"remove",
SyntaxFactory.value("\"plugin.id\"")
);
}

private static MethodLevelSyntaxElement generateLog4jContextAssignment(IRubyObject pluginId) {
return () -> "org.apache.logging.log4j.ThreadContext.put(\"plugin.id\", \"" + pluginId + "\")";
private static MethodLevelSyntaxElement setPluginIdForLog4j(final ValueSyntaxElement plugin) {
return SyntaxFactory.value("org.apache.logging.log4j.ThreadContext").call(
"put",
SyntaxFactory.value("\"plugin.id\""),
plugin.call("getId").call("toString")
);
}

private static MethodLevelSyntaxElement clear(final ValueSyntaxElement field) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

import co.elastic.logstash.api.Codec;
import com.google.common.base.Strings;

import java.io.IOException;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -45,10 +49,10 @@
import org.logstash.ConvertedMap;
import org.logstash.Event;
import org.logstash.RubyUtil;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
import org.logstash.config.ir.compiler.ComputeStepSyntaxElement;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.PluginFactory;
import org.logstash.ext.JrubyEventExtLibrary;
Expand All @@ -57,6 +61,9 @@
import co.elastic.logstash.api.Input;
import co.elastic.logstash.api.Context;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* Tests for {@link CompiledPipeline}.
*/
Expand Down Expand Up @@ -560,4 +567,153 @@ public Filter buildFilter(final String name, final String id,
return null;
}
}

@Test
@SuppressWarnings({"unchecked"})
public void testCompilerCacheCompiledClasses() throws IOException, InvalidIRException {
final FixedPluginFactory pluginFactory = new FixedPluginFactory(
() -> null,
() -> IDENTITY_FILTER,
mockOutputSupplier()
);

final PipelineIR baselinePipeline = ConfigCompiler.configToPipelineIR(
IRHelpers.toSourceWithMetadataFromPath("org/logstash/config/ir/cache/pipeline1.conf"),
false);
final CompiledPipeline cBaselinePipeline = new CompiledPipeline(baselinePipeline, pluginFactory);

final PipelineIR pipelineWithExtraFilter = ConfigCompiler.configToPipelineIR(
IRHelpers.toSourceWithMetadataFromPath("org/logstash/config/ir/cache/pipeline2.conf"),
false);
final CompiledPipeline cPipelineWithExtraFilter = new CompiledPipeline(pipelineWithExtraFilter, pluginFactory);

// actual test: compiling a pipeline with an extra filter should only create 1 extra class
ComputeStepSyntaxElement.cleanClassCache();
cBaselinePipeline.buildExecution();
final int cachedBefore = ComputeStepSyntaxElement.classCacheSize();
cPipelineWithExtraFilter.buildExecution();
final int cachedAfter = ComputeStepSyntaxElement.classCacheSize();

final String message = String.format("unexpected cache size, cachedAfter: %d, cachedBefore: %d", cachedAfter, cachedBefore);
assertEquals(message, 1, cachedAfter - cachedBefore);
}

@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void compilerBenchmark() throws Exception {
final PipelineIR baselinePipelineIR = createPipelineIR(200);
final PipelineIR testPipelineIR = createPipelineIR(400);
final JrubyEventExtLibrary.RubyEvent testEvent =
JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event());

final FixedPluginFactory pluginFactory = new FixedPluginFactory(
() -> null,
() -> IDENTITY_FILTER,
mockOutputSupplier()
);
final CompiledPipeline baselineCompiledPipeline = new CompiledPipeline(baselinePipelineIR, pluginFactory);

final CompiledPipeline testCompiledPipeline = new CompiledPipeline(testPipelineIR, pluginFactory);

final long compilationBaseline = time(ChronoUnit.SECONDS, () -> {
final CompiledPipeline.CompiledExecution compiledExecution = baselineCompiledPipeline.buildExecution();
compiledExecution.compute(RubyUtil.RUBY.newArray(testEvent), false, false);
});

final long compilationTest = time(ChronoUnit.SECONDS, () -> {
final CompiledPipeline.CompiledExecution compiledExecution = testCompiledPipeline.buildExecution();
compiledExecution.compute(RubyUtil.RUBY.newArray(testEvent), false, false);
});

// sanity checks
final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(2));
MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));

// regression check
final String testMessage = "regression in pipeline compilation, doubling the filters require more than 5 " +
"time, baseline: " + compilationBaseline + " secs, test: " + compilationTest + " secs";
assertTrue(testMessage, compilationTest/compilationBaseline <= 5);
}

private long time(ChronoUnit seconds, Runnable r) {
LocalTime start = LocalTime.now();
r.run();
LocalTime stop = LocalTime.now();
return seconds.between(start, stop);
}

@SuppressWarnings({"unchecked", "rawtypes"})
private PipelineIR createPipelineIR(int numFilters) throws InvalidIRException {
final String pipelineConfig = createBigPipelineDefinition(numFilters);
final RubyArray swms = IRHelpers.toSourceWithMetadata(pipelineConfig);
return ConfigCompiler.configToPipelineIR(swms,false);
}

private String createBigPipelineDefinition(int numFilters) {
return "input { stdin {}} filter {" + createBigFilterSection(numFilters) + "} output { stdout {}}";
}

private String createBigFilterSection(int numFilters) {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < numFilters; i++) {
sb.append("mutate { id => \"").append(i).append("\" rename => [\"a_field\", \"into_another\"]}\n");
}
return sb.toString();
}

/**
* Fixed Mock {@link PluginFactory}
* */
static final class FixedPluginFactory implements PluginFactory {

private Supplier<IRubyObject> input;
private Supplier<IRubyObject> filter;
private Supplier<Consumer<Collection<JrubyEventExtLibrary.RubyEvent>>> output;

FixedPluginFactory(Supplier<IRubyObject> input, Supplier<IRubyObject> filter,
Supplier<Consumer<Collection<JrubyEventExtLibrary.RubyEvent>>> output) {
this.input = input;
this.filter = filter;
this.output = output;
}

@Override
public Input buildInput(String name, String id, Configuration configuration, Context context) {
return null;
}

@Override
public Filter buildFilter(String name, String id, Configuration configuration, Context context) {
return null;
}

@Override
public IRubyObject buildInput(RubyString name, SourceWithMetadata source, IRubyObject args, Map<String, Object> pluginArgs) {
return this.input.get();
}

@Override
public AbstractOutputDelegatorExt buildOutput(RubyString name, SourceWithMetadata source, IRubyObject args, Map<String, Object> pluginArgs) {
return PipelineTestUtil.buildOutput(this.output.get());
}

@Override
public AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadata source, IRubyObject args, Map<String, Object> pluginArgs) {
final RubyObject configNameDouble = org.logstash.config.ir.PluginConfigNameMethodDouble.create(name);
return new FilterDelegatorExt(
RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS)
.initForTesting(this.filter.get(), configNameDouble);
}

@Override
public IRubyObject buildCodec(RubyString name, SourceWithMetadata source, IRubyObject args, Map<String, Object> pluginArgs) {
return null;
}

@Override
public Codec buildDefaultCodec(String codecName) {
return null;
}
}
}
39 changes: 35 additions & 4 deletions logstash-core/src/test/java/org/logstash/config/ir/IRHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

package org.logstash.config.ir;

import com.google.common.io.Files;
import org.hamcrest.MatcherAssert;
import org.jruby.RubyArray;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
Expand All @@ -34,10 +36,11 @@
import org.logstash.config.ir.graph.Vertex;
import org.logstash.config.ir.graph.algorithms.GraphDiff;

import java.util.HashMap;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.Callable;

import static org.logstash.config.ir.DSL.*;
Expand Down Expand Up @@ -195,4 +198,32 @@ public static RubyArray toSourceWithMetadata(String config) throws IncompleteSou
return RubyUtil.RUBY.newArray(JavaUtil.convertJavaToRuby(
RubyUtil.RUBY, new SourceWithMetadata("proto", "path", 1, 1, config)));
}

/**
* Load pipeline configuration from a path returning the list of SourceWithMetadata.
*
* The path refers to test's resources, if it point to single file that file is loaded, if reference a directory
* then the full list of contained files is loaded in name order.
* */
@SuppressWarnings("rawtypes")
public static RubyArray toSourceWithMetadataFromPath(String configPath) throws IncompleteSourceWithMetadataException, IOException {
URL url = IRHelpers.class.getClassLoader().getResource(configPath);
String path = url.getPath();
final File filePath = new File(path);
final List<File> files;
if (filePath.isDirectory()) {
files = Arrays.asList(filePath.listFiles());
Collections.sort(files);
} else {
files = Collections.singletonList(filePath);
}

List<IRubyObject> rubySwms = new ArrayList<>();
for (File configFile : files) {
final List<String> fileContent = Files.readLines(configFile, Charset.defaultCharset());
final SourceWithMetadata swm = new SourceWithMetadata("file", configFile.getPath(), 1, 1, String.join("\n", fileContent));
rubySwms.add(JavaUtil.convertJavaToRuby(RubyUtil.RUBY, swm));
}
return RubyUtil.RUBY.newArray(rubySwms);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
input {
stdin {}
}

filter {
mutate {
id => "ppl1_1"
rename => ["a_field", "into_another"]
}

mutate {
id => "ppl1_2"
rename => ["a_field", "into_another"]
}
}

output {
stdout {}
}
Loading

0 comments on commit ffac2df

Please sign in to comment.