Skip to content

Commit 228030c

Browse files
authored
Simplify Pipeline class Hierarchy (elastic#14551)
* refactor: pull members up from JavaBasePipelineExt to AbstractPipelineExt * refactor: make `LogStash::JavaPipeline` inherit directly from `AbstractPipeline`
1 parent cd03c86 commit 228030c

File tree

7 files changed

+118
-175
lines changed

7 files changed

+118
-175
lines changed

logstash-core/lib/logstash/java_pipeline.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
require "logstash/config/lir_serializer"
2727
require "logstash/worker_loop_thread"
2828

29-
module LogStash; class JavaPipeline < JavaBasePipeline
29+
module LogStash; class JavaPipeline < AbstractPipeline
3030
include LogStash::Util::Loggable
3131

3232
java_import org.apache.logging.log4j.ThreadContext

logstash-core/lib/logstash/pipeline_action/reload.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def execute(agent, pipelines_registry)
4848
end
4949

5050
begin
51-
pipeline_validator = LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil)
51+
pipeline_validator = LogStash::AbstractPipeline.new(@pipeline_config, nil, logger, nil)
5252
rescue => e
5353
return LogStash::ConvergeResult::FailedAction.from_exception(e)
5454
end

logstash-core/src/main/java/org/logstash/RubyUtil.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.logstash.execution.ConvergeResultExt;
4848
import org.logstash.execution.EventDispatcherExt;
4949
import org.logstash.execution.ExecutionContextExt;
50-
import org.logstash.execution.JavaBasePipelineExt;
5150
import org.logstash.execution.PipelineReporterExt;
5251
import org.logstash.execution.QueueReadClientBase;
5352
import org.logstash.execution.ShutdownWatcherExt;
@@ -239,8 +238,6 @@ public final class RubyUtil {
239238

240239
public static final RubyClass ABSTRACT_PIPELINE_CLASS;
241240

242-
public static final RubyClass JAVA_PIPELINE_CLASS;
243-
244241
/**
245242
* Logstash Ruby Module.
246243
*/
@@ -488,9 +485,6 @@ public final class RubyUtil {
488485
LOGGABLE_MODULE.defineAnnotatedMethods(LoggableExt.class);
489486
ABSTRACT_PIPELINE_CLASS =
490487
setupLogstashClass(AbstractPipelineExt::new, AbstractPipelineExt.class);
491-
JAVA_PIPELINE_CLASS = setupLogstashClass(
492-
ABSTRACT_PIPELINE_CLASS, JavaBasePipelineExt::new, JavaBasePipelineExt.class
493-
);
494488
final RubyModule json = LOGSTASH_MODULE.defineOrGetModuleUnder("Json");
495489
final RubyClass stdErr = RUBY.getStandardError();
496490
LOGSTASH_ERROR = LOGSTASH_MODULE.defineClassUnder(

logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.jruby.runtime.ThreadContext;
3030
import org.jruby.runtime.builtin.IRubyObject;
3131
import org.logstash.RubyUtil;
32-
import org.logstash.execution.JavaBasePipelineExt;
32+
import org.logstash.execution.AbstractPipelineExt;
3333
import org.logstash.execution.queue.QueueWriter;
3434
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
3535
import org.logstash.instrument.metrics.MetricKeys;
@@ -45,7 +45,7 @@ public class JavaInputDelegatorExt extends RubyObject {
4545

4646
private AbstractNamespacedMetricExt metric;
4747

48-
private JavaBasePipelineExt pipeline;
48+
private AbstractPipelineExt pipeline;
4949

5050
private transient Input input;
5151

@@ -55,7 +55,7 @@ public JavaInputDelegatorExt(Ruby runtime, RubyClass metaClass) {
5555
super(runtime, metaClass);
5656
}
5757

58-
public static JavaInputDelegatorExt create(final JavaBasePipelineExt pipeline,
58+
public static JavaInputDelegatorExt create(final AbstractPipelineExt pipeline,
5959
final AbstractNamespacedMetricExt metric, final Input input,
6060
final Map<String, Object> pluginArgs) {
6161
final JavaInputDelegatorExt instance =

logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java

Lines changed: 111 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@
3131
import java.time.Duration;
3232
import java.util.ArrayList;
3333
import java.util.Arrays;
34+
import java.util.Collection;
3435
import java.util.List;
3536
import java.util.UUID;
3637
import java.util.regex.Matcher;
3738
import java.util.regex.Pattern;
39+
import java.util.stream.Stream;
3840

3941
import com.google.common.annotations.VisibleForTesting;
4042
import org.apache.commons.codec.binary.Hex;
@@ -43,6 +45,7 @@
4345
import org.jruby.Ruby;
4446
import org.jruby.RubyArray;
4547
import org.jruby.RubyBasicObject;
48+
import org.jruby.RubyBoolean;
4649
import org.jruby.RubyClass;
4750
import org.jruby.RubyString;
4851
import org.jruby.RubySymbol;
@@ -58,13 +61,16 @@
5861
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
5962
import org.logstash.common.DeadLetterQueueFactory;
6063
import org.logstash.common.EnvironmentVariableProvider;
64+
import org.logstash.common.IncompleteSourceWithMetadataException;
6165
import org.logstash.common.SourceWithMetadata;
6266
import org.logstash.common.io.DeadLetterQueueWriter;
6367
import org.logstash.common.io.QueueStorageType;
68+
import org.logstash.config.ir.CompiledPipeline;
6469
import org.logstash.config.ir.ConfigCompiler;
6570
import org.logstash.config.ir.InvalidIRException;
6671
import org.logstash.config.ir.PipelineConfig;
6772
import org.logstash.config.ir.PipelineIR;
73+
import org.logstash.execution.queue.QueueWriter;
6874
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
6975
import org.logstash.ext.JRubyWrappedWriteClientExt;
7076
import org.logstash.instrument.metrics.AbstractMetricExt;
@@ -75,6 +81,9 @@
7581
import org.logstash.instrument.metrics.UptimeMetric;
7682
import org.logstash.instrument.metrics.counter.LongCounter;
7783
import org.logstash.plugins.ConfigVariableExpander;
84+
import org.logstash.plugins.factory.ExecutionContextFactoryExt;
85+
import org.logstash.plugins.factory.PluginFactoryExt;
86+
import org.logstash.plugins.factory.PluginMetricsFactoryExt;
7887
import org.logstash.secret.store.SecretStore;
7988
import org.logstash.secret.store.SecretStoreExt;
8089

@@ -83,7 +92,10 @@
8392
import static org.logstash.instrument.metrics.UptimeMetric.ScaleUnits.SECONDS;
8493

8594
/**
86-
* JRuby extension to provide ancestor class for Ruby's Pipeline and JavaPipeline classes.
95+
* JRuby extension to provide ancestor class for the ruby-defined {@code LogStash::JavaPipeline} class.
96+
*
97+
* <p>NOTE: Although this class' name implies that it is "abstract", we instantiated it directly
98+
* as a lightweight temporary-scoped pipeline in the ruby-defined {@code LogStash::PipelineAction::Reload}
8799
* */
88100
@JRubyClass(name = "AbstractPipeline")
89101
public class AbstractPipelineExt extends RubyBasicObject {
@@ -104,6 +116,7 @@ public class AbstractPipelineExt extends RubyBasicObject {
104116

105117
@SuppressWarnings("serial")
106118
protected PipelineIR lir;
119+
private transient CompiledPipeline lirExecution;
107120

108121
private final RubyString ephemeralId = RubyUtil.RUBY.newString(UUID.randomUUID().toString());
109122

@@ -135,13 +148,46 @@ public class AbstractPipelineExt extends RubyBasicObject {
135148
private QueueReadClientBase filterQueueClient;
136149

137150
private ArrayList<FlowMetric> flowMetrics = new ArrayList<>();
151+
private @SuppressWarnings("rawtypes") RubyArray inputs;
152+
private @SuppressWarnings("rawtypes") RubyArray filters;
153+
private @SuppressWarnings("rawtypes") RubyArray outputs;
138154

139155
public AbstractPipelineExt(final Ruby runtime, final RubyClass metaClass) {
140156
super(runtime, metaClass);
141157
}
142158

159+
@JRubyMethod(required = 4)
160+
public AbstractPipelineExt initialize(final ThreadContext context, final IRubyObject[] args)
161+
throws IncompleteSourceWithMetadataException, NoSuchAlgorithmException {
162+
initialize(context, args[0], args[1], args[2]);
163+
lirExecution = new CompiledPipeline(
164+
lir,
165+
new PluginFactoryExt(context.runtime, RubyUtil.PLUGIN_FACTORY_CLASS).init(
166+
lir,
167+
new PluginMetricsFactoryExt(
168+
context.runtime, RubyUtil.PLUGIN_METRICS_FACTORY_CLASS
169+
).initialize(context, pipelineId(), metric()),
170+
new ExecutionContextFactoryExt(
171+
context.runtime, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS
172+
).initialize(context, args[3], this, dlqWriter(context)),
173+
RubyUtil.FILTER_DELEGATOR_CLASS
174+
),
175+
getSecretStore(context)
176+
);
177+
inputs = RubyArray.newArray(context.runtime, lirExecution.inputs());
178+
filters = RubyArray.newArray(context.runtime, lirExecution.filters());
179+
outputs = RubyArray.newArray(context.runtime, lirExecution.outputs());
180+
if (getSetting(context, "config.debug").isTrue() && LOGGER.isDebugEnabled()) {
181+
LOGGER.debug(
182+
"Compiled pipeline code for pipeline {} : {}", pipelineId(),
183+
lir.getGraph().toString()
184+
);
185+
}
186+
return this;
187+
}
188+
143189
@JRubyMethod
144-
public final AbstractPipelineExt initialize(final ThreadContext context,
190+
private AbstractPipelineExt initialize(final ThreadContext context,
145191
final IRubyObject pipelineConfig, final IRubyObject namespacedMetric,
146192
final IRubyObject rubyLogger)
147193
throws NoSuchAlgorithmException {
@@ -269,6 +315,11 @@ public final IRubyObject lir(final ThreadContext context) {
269315
return JavaUtil.convertJavaToUsableRubyObject(context.runtime, lir);
270316
}
271317

318+
@JRubyMethod(name = "lir_execution")
319+
public IRubyObject lirExecution(final ThreadContext context) {
320+
return JavaUtil.convertJavaToUsableRubyObject(context.runtime, lirExecution);
321+
}
322+
272323
@JRubyMethod(name = "dlq_writer")
273324
public final IRubyObject dlqWriter(final ThreadContext context) {
274325
if (dlqWriter == null) {
@@ -375,6 +426,29 @@ public final IRubyObject isConfiguredReloadable(final ThreadContext context) {
375426
return getSetting(context, "pipeline.reloadable");
376427
}
377428

429+
@JRubyMethod(name = "reloadable?")
430+
public RubyBoolean isReloadable(final ThreadContext context) {
431+
return isConfiguredReloadable(context).isTrue() && reloadablePlugins(context).isTrue()
432+
? context.tru : context.fals;
433+
}
434+
435+
@JRubyMethod(name = "reloadable_plugins?")
436+
public RubyBoolean reloadablePlugins(final ThreadContext context) {
437+
return nonReloadablePlugins(context).isEmpty() ? context.tru : context.fals;
438+
}
439+
440+
@SuppressWarnings({"unchecked", "rawtypes"})
441+
@JRubyMethod(name = "non_reloadable_plugins")
442+
public RubyArray nonReloadablePlugins(final ThreadContext context) {
443+
final RubyArray result = RubyArray.newArray(context.runtime);
444+
Stream.of(inputs, outputs, filters).flatMap(
445+
plugins -> ((Collection<IRubyObject>) plugins).stream()
446+
).filter(
447+
plugin -> !plugin.callMethod(context, "reloadable?").isTrue()
448+
).forEach(result::add);
449+
return result;
450+
}
451+
378452
@JRubyMethod(name = "collect_stats")
379453
public final IRubyObject collectStats(final ThreadContext context) throws IOException {
380454
final AbstractNamespacedMetricExt pipelineMetric =
@@ -536,6 +610,17 @@ public final JRubyWrappedWriteClientExt wrappedWriteClient(final ThreadContext c
536610
.initialize(inputQueueClient, pipelineId.asJavaString(), metric, pluginId);
537611
}
538612

613+
public QueueWriter getQueueWriter(final String inputName) {
614+
return new JRubyWrappedWriteClientExt(RubyUtil.RUBY, RubyUtil.WRAPPED_WRITE_CLIENT_CLASS)
615+
.initialize(
616+
RubyUtil.RUBY.getCurrentContext(),
617+
new IRubyObject[]{
618+
inputQueueClient(), pipelineId().convertToString().intern(),
619+
metric(), RubyUtil.RUBY.newSymbol(inputName)
620+
}
621+
);
622+
}
623+
539624
@JRubyMethod(name = "pipeline_source_details", visibility = Visibility.PROTECTED)
540625
@SuppressWarnings("rawtypes")
541626
public RubyArray getPipelineSourceDetails(final ThreadContext context) {
@@ -589,4 +674,28 @@ private AbstractNamespacedMetricExt getDlqMetric(final ThreadContext context) {
589674
}
590675
return dlqMetric;
591676
}
677+
678+
@JRubyMethod
679+
@SuppressWarnings("rawtypes")
680+
public RubyArray inputs() {
681+
return inputs;
682+
}
683+
684+
@JRubyMethod
685+
@SuppressWarnings("rawtypes")
686+
public RubyArray filters() {
687+
return filters;
688+
}
689+
690+
@JRubyMethod
691+
@SuppressWarnings("rawtypes")
692+
public RubyArray outputs() {
693+
return outputs;
694+
}
695+
696+
@JRubyMethod(name = "shutdown_requested?")
697+
public IRubyObject isShutdownRequested(final ThreadContext context) {
698+
// shutdown_requested? MUST be implemented in the concrete implementation of this class.
699+
throw new IllegalStateException("Pipeline implementation does not provide `shutdown_requested?`, which is a Logstash internal error.");
700+
}
592701
}

0 commit comments

Comments
 (0)