caching-resolvers: add cache reloader and reloading service #48

Merged: 6 commits, May 5, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.0.2 (UNRELEASED)
- Adds proactive reloaders for both datastream-to-pipeline-name mappings and pipeline definitions to ensure upstream changes are made available without impacting processing [#48](https://github.com/elastic/logstash-filter-elastic_integration/pull/48)
- Presents helpful guidance when run on an unsupported version of Java [#43](https://github.com/elastic/logstash-filter-elastic_integration/pull/43)

## 0.0.1
18 changes: 18 additions & 0 deletions docs/index.asciidoc
@@ -181,6 +181,24 @@ Changes to the IngestDocument will be reflected in the resulting Logstash Event,
| _everything else_ | _in-place, as-structured_ | only minimal type conversions are done
|=======================================================================

[id="plugins-{type}s-{plugin}-resolving"]
==== Resolving Pipeline Definitions

:cached-entry-ttl: 24 hours
:cache-reload-frequency: 1 minute

This plugin uses {es} to map datastream names to pipeline names, and to resolve pipeline names into their pipeline definitions.
It maintains hit/miss caches to avoid querying {es} for every event, and it proactively refreshes cached entries _before_ they expire.
As a result, when {es} is responsive this plugin picks up upstream changes quickly without impacting its own performance, and when {es} is unresponsive it can continue processing without interruption by using potentially-stale mappings or definitions.

To achieve this, mappings are cached for a maximum of {cached-entry-ttl}, and cached values are reloaded every {cache-reload-frequency} with the following effect:

* when a reloaded mapping is non-empty and is the _same_ as its already-cached value, its time-to-live is reset to ensure that subsequent events can continue using the confirmed-unchanged value
* when a reloaded mapping is non-empty and is _different_ from its previously-cached value, the entry is _updated_ so that subsequent events will use the new value
* when a reloaded mapping is newly _empty_, the previous non-empty mapping is _replaced_ with a new empty entry so that subsequent events will use the empty value
* when the reload of a mapping _fails_, this plugin emits a log warning, but the existing cache entry is left unchanged and continues toward its expiry
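The four reload outcomes above can be sketched as a single decision function. This is a hypothetical illustration, not the plugin's cache internals: the `decide` function and `ReloadOutcome` enum are invented names, and a `null` reloaded value stands in for a newly-empty mapping.

```java
// Hypothetical sketch of the four reload outcomes described above; not
// the plugin's actual cache internals. A null "reloaded" value stands
// in for a newly-empty mapping.
public class ReloadOutcomeDemo {
    public enum ReloadOutcome { RESET_TTL, UPDATE_ENTRY, REPLACE_WITH_EMPTY, KEEP_AND_WARN }

    public static ReloadOutcome decide(String cached, String reloaded, boolean reloadFailed) {
        if (reloadFailed) {
            return ReloadOutcome.KEEP_AND_WARN;       // warn; entry keeps aging toward expiry
        }
        if (reloaded == null) {
            return ReloadOutcome.REPLACE_WITH_EMPTY;  // upstream mapping disappeared
        }
        if (reloaded.equals(cached)) {
            return ReloadOutcome.RESET_TTL;           // confirmed unchanged; reset time-to-live
        }
        return ReloadOutcome.UPDATE_ENTRY;            // upstream mapping changed
    }

    public static void main(String[] args) {
        System.out.println(decide("p1", "p1", false)); // RESET_TTL
        System.out.println(decide("p1", "p2", false)); // UPDATE_ENTRY
        System.out.println(decide("p1", null, false)); // REPLACE_WITH_EMPTY
        System.out.println(decide("p1", null, true));  // KEEP_AND_WARN
    }
}
```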

[id="plugins-{type}s-{plugin}-options"]
==== {elastic-integration-name} Filter Configuration Options

5 changes: 4 additions & 1 deletion lib/logstash/filters/elastic_integration.rb
@@ -105,6 +105,9 @@ def initialize(*a, &b)
extend EventApiBridge

super

java_import('co.elastic.logstash.filters.elasticintegration.util.PluginContext')
@plugin_context = PluginContext.new(execution_context&.pipeline_id || "UNDEF", id)
end

def register
@@ -340,7 +343,7 @@ def initialize_event_processor!
@event_processor = EventProcessorBuilder.fromElasticsearch(@elasticsearch_rest_client)
.setFilterMatchListener(method(:filter_matched_java).to_proc)
.addProcessor("geoip") { GeoIpProcessorFactory.new(@geoip_database_provider) }
.build("logstash.filter.elastic_integration.#{id}.#{__id__}")
.build(@plugin_context)
rescue => exception
raise_config_error!("configuration did not produce an EventProcessor: #{exception}")
end
@@ -8,6 +8,8 @@

import co.elastic.logstash.api.Event;
import co.elastic.logstash.filters.elasticintegration.resolver.AbstractSimpleCacheableResolver;
import co.elastic.logstash.filters.elasticintegration.resolver.CacheReloader;
import co.elastic.logstash.filters.elasticintegration.resolver.CachingResolver;
import co.elastic.logstash.filters.elasticintegration.resolver.Resolver;
import co.elastic.logstash.filters.elasticintegration.resolver.ResolverCache;
import com.fasterxml.jackson.databind.JsonNode;
@@ -20,7 +22,6 @@
import org.elasticsearch.client.RestClient;

import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
@@ -56,6 +57,14 @@ public Optional<String> resolve(final Event event, final Consumer<Exception> exc
.flatMap((datastreamName) -> datastreamToPipelineNameResolver.resolve(datastreamName, exceptionHandler));
}

@Override
public Optional<CacheReloader> innerCacheReloader() {
if (CachingResolver.class.isAssignableFrom(this.datastreamToPipelineNameResolver.getClass())) {
return Optional.of(((CachingResolver<String, String>) this.datastreamToPipelineNameResolver).getReloader());
}
return Optional.empty();
}

private Optional<String> resolveDatastreamName(Event event) {
final String namespace = safeExtractString(event, DATASTREAM_NAMESPACE_FIELD_REFERENCE);
if (Objects.isNull(namespace)) {
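The `isAssignableFrom` check plus unchecked cast in `innerCacheReloader` above can also be expressed with `instanceof` pattern matching (Java 16+). A self-contained sketch (the interfaces below are re-declared locally as minimal stand-ins for the plugin's real types):

```java
// Equivalent runtime check via instanceof pattern matching. The
// interfaces here are local stand-ins for the plugin's types, kept
// minimal so the sketch compiles on its own.
import java.util.Optional;

interface CacheReloader { void reloadOnce(); }
interface Resolver<K, V> {}
interface CachingResolver<K, V> extends Resolver<K, V> { CacheReloader getReloader(); }

public class ReloaderLookupDemo {
    public static Optional<CacheReloader> innerCacheReloader(Resolver<String, String> delegate) {
        // No isAssignableFrom/cast pair needed: the pattern binds the narrowed type.
        if (delegate instanceof CachingResolver<String, String> caching) {
            return Optional.of(caching.getReloader());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Resolver<String, String> plain = new Resolver<String, String>() {};
        System.out.println(innerCacheReloader(plain).isPresent()); // false
    }
}
```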
@@ -10,8 +10,13 @@
import co.elastic.logstash.api.FilterMatchListener;
import co.elastic.logstash.filters.elasticintegration.ingest.SetSecurityUserProcessor;
import co.elastic.logstash.filters.elasticintegration.ingest.SingleProcessorIngestPlugin;
import co.elastic.logstash.filters.elasticintegration.resolver.CacheReloadService;
import co.elastic.logstash.filters.elasticintegration.resolver.SimpleResolverCache;
import co.elastic.logstash.filters.elasticintegration.resolver.ResolverCache;
import co.elastic.logstash.filters.elasticintegration.util.PluginContext;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.IOUtils;
@@ -34,23 +39,25 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static com.google.common.util.concurrent.AbstractScheduledService.Scheduler.newFixedRateSchedule;

@SuppressWarnings("UnusedReturnValue")
public class EventProcessorBuilder {

static final Duration CACHE_MAXIMUM_AGE = Duration.ofHours(24);
static final Duration CACHE_RELOAD_FREQUENCY = Duration.ofSeconds(60);

private static <K,V> Supplier<ResolverCache<K,V>> defaultCacheSupplier(final String description) {
return () -> new SimpleResolverCache<>(description, SimpleResolverCache.Configuration.PERMANENT);
}
@@ -59,9 +66,10 @@ public static EventProcessorBuilder fromElasticsearch(final RestClient elasticse
final EventProcessorBuilder builder = new EventProcessorBuilder();

builder.setEventPipelineNameResolver(new DatastreamEventToPipelineNameResolver(elasticsearchRestClient, new SimpleResolverCache<>("datastream-to-pipeline",
new SimpleResolverCache.Configuration(Duration.ofMinutes(60), Duration.ofSeconds(60)))));
new SimpleResolverCache.Configuration(CACHE_MAXIMUM_AGE, CACHE_MAXIMUM_AGE))));

builder.setPipelineConfigurationResolver(new ElasticsearchPipelineConfigurationResolver(elasticsearchRestClient));
builder.setPipelineResolverCacheConfig(Duration.ofMinutes(60), Duration.ofSeconds(60));
builder.setPipelineResolverCacheConfig(CACHE_MAXIMUM_AGE, CACHE_MAXIMUM_AGE);
return builder;
}

@@ -126,21 +134,17 @@ public synchronized EventProcessorBuilder addProcessorsFromPlugin(Supplier<Inges
return this;
}

EventProcessor build(final String nodeName) {
final Settings defaultSettings = Settings.builder()
synchronized EventProcessor build(final PluginContext pluginContext) {
Objects.requireNonNull(this.pipelineConfigurationResolver, "pipeline configuration resolver is REQUIRED");
Objects.requireNonNull(this.eventToPipelineNameResolver, "event to pipeline name resolver is REQUIRED");

final Settings settings = Settings.builder()
.put("path.home", "/")
.put("node.name", nodeName)
.put("node.name", "logstash.filter.elastic_integration." + pluginContext.pluginId())
.put("ingest.grok.watchdog.interval", "1s")
.put("ingest.grok.watchdog.max_execution_time", "1s")
.build();

return build(defaultSettings);
}

synchronized EventProcessor build(final Settings settings) {
Objects.requireNonNull(this.pipelineConfigurationResolver, "pipeline configuration resolver is REQUIRED");
Objects.requireNonNull(this.eventToPipelineNameResolver, "event to pipeline name resolver is REQUIRED");

final List<Closeable> resourcesToClose = new ArrayList<>();

try {
@@ -175,12 +179,27 @@
final ResolverCache<String, IngestPipeline> ingestPipelineCache = Optional.ofNullable(pipelineResolverCacheSupplier)
.orElse(defaultCacheSupplier("ingest-pipeline"))
.get();

final SimpleCachingIngestPipelineResolver cachingInternalPipelineResolver =
new SimpleIngestPipelineResolver(this.pipelineConfigurationResolver, ingestPipelineFactory).withCache(ingestPipelineCache);

final FilterMatchListener filterMatchListener = Objects.requireNonNullElse(this.filterMatchListener, (event) -> {});

// start reload services for our resolvers
final ArrayList<Service> services = new ArrayList<>();
eventToPipelineNameResolver.innerCacheReloader().ifPresent(cacheReloader -> {
final AbstractScheduledService.Scheduler pipelineNameReloadSchedule = newFixedRateSchedule(CACHE_RELOAD_FREQUENCY, CACHE_RELOAD_FREQUENCY);
services.add(CacheReloadService.newManaged(pluginContext, cacheReloader, pipelineNameReloadSchedule));
});
final AbstractScheduledService.Scheduler pipelineDefinitionReloadSchedule = newFixedRateSchedule(CACHE_RELOAD_FREQUENCY, CACHE_RELOAD_FREQUENCY);
services.add(CacheReloadService.newManaged(pluginContext, cachingInternalPipelineResolver.getReloader(), pipelineDefinitionReloadSchedule));

final ServiceManager serviceManager = new ServiceManager(services);
serviceManager.startAsync();
resourcesToClose.add(() -> {
serviceManager.stopAsync();
serviceManager.awaitStopped();
});

return new EventProcessor(filterMatchListener, cachingInternalPipelineResolver, eventToPipelineNameResolver, resourcesToClose);
} catch (Exception e) {
IOUtils.closeWhileHandlingException(resourcesToClose);
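The build method starts its reload services and then queues a `Closeable` that stops them, so teardown happens either when the processor is closed or, via `closeWhileHandlingException`, when the build fails partway. A plain-JDK sketch of that queue-a-Closeable pattern, with a `ScheduledExecutorService` as a stand-in for the Guava `ServiceManager`:

```java
// Sketch of the "start now, queue the shutdown" lifecycle pattern,
// using a plain ScheduledExecutorService as a stand-in for the
// ServiceManager-managed reload services.
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ResourceLifecycleDemo {
    public static void main(String[] args) throws IOException {
        final List<Closeable> resourcesToClose = new ArrayList<>();

        // start the background reload "service"
        ScheduledExecutorService reloadExecutor = Executors.newSingleThreadScheduledExecutor();
        reloadExecutor.scheduleAtFixedRate(() -> { /* reload caches here */ }, 1, 1, TimeUnit.MINUTES);

        // queue its teardown, mirroring the stopAsync()/awaitStopped() Closeable
        resourcesToClose.add(reloadExecutor::shutdown);

        // on processor close (or on build failure), release everything queued
        for (Closeable resource : resourcesToClose) {
            resource.close();
        }
        System.out.println(reloadExecutor.isShutdown()); // true
    }
}
```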
@@ -7,6 +7,7 @@
package co.elastic.logstash.filters.elasticintegration;

import co.elastic.logstash.api.Event;
import co.elastic.logstash.filters.elasticintegration.resolver.CacheReloader;
import co.elastic.logstash.filters.elasticintegration.resolver.UncacheableResolver;

import java.util.Optional;
@@ -24,4 +25,6 @@
public interface EventToPipelineNameResolver extends UncacheableResolver<Event, String> {
@Override
Optional<String> resolve(Event event, Consumer<Exception> exceptionHandler);

default Optional<CacheReloader> innerCacheReloader() { return Optional.empty(); }
}
@@ -20,8 +20,14 @@ public class IngestPipeline {
private final PipelineConfiguration pipelineConfiguration;
private final Pipeline innerPipeline;

public IngestPipeline(final PipelineConfiguration pipelineConfiguration,
final Pipeline innerPipeline) {
/**
* @see IngestPipelineFactory#create(PipelineConfiguration)
*
* @param pipelineConfiguration the source ingest pipeline configuration
* @param innerPipeline an instantiated ingest pipeline
*/
IngestPipeline(final PipelineConfiguration pipelineConfiguration,
final Pipeline innerPipeline) {
this.pipelineConfiguration = pipelineConfiguration;
this.innerPipeline = innerPipeline;
}
@@ -54,4 +60,12 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(pipelineConfiguration);
}

@Override
public String toString() {
return "IngestPipeline{" +
"id=" + getId() +
", pipelineConfiguration=" + pipelineConfiguration +
'}';
}
}
@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V.
* under one or more contributor license agreements. Licensed under the
* Elastic License 2.0; you may not use this file except in compliance
* with the Elastic License 2.0.
*/
package co.elastic.logstash.filters.elasticintegration.resolver;

import co.elastic.logstash.filters.elasticintegration.util.PluginContext;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;

import java.util.concurrent.ScheduledExecutorService;

/**
* A {@link CacheReloadService} is a service for scheduled reloading of resolver caches via {@link CacheReloader}.
*/
public class CacheReloadService extends AbstractScheduledService {

/**
* Creates a new cache reload service, wholly managing the lifecycle of the internal
* scheduled executor service to ensure that it is shut down when this service is terminated
* or transitions into a failed state.
*
     * @param pluginContext the plugin context, used to name the reloader's executor thread
     * @param reloader the cache reloader to run on each scheduled iteration
     * @param scheduler the schedule on which to run the reloader
     * @return a new {@code CacheReloadService} that wholly owns its executor's lifecycle
*/
public static CacheReloadService newManaged(final PluginContext pluginContext,
final CacheReloader reloader,
final Scheduler scheduler) {
final String threadPurpose = String.format("cache-reloader(%s)", reloader.type());
final ScheduledExecutorService executor = pluginContext.newSingleThreadScheduledExecutor(threadPurpose);

final CacheReloadService cacheReloadService = new CacheReloadService(reloader, executor, scheduler);
cacheReloadService.addListener(new Service.Listener() {
public void terminated(Service.State from) {
executor.shutdown();
}

public void failed(Service.State from, Throwable failure) {
executor.shutdown();
}
}, MoreExecutors.directExecutor());

return cacheReloadService;
}

final CacheReloader reloader;
final ScheduledExecutorService executor;

final Scheduler scheduler;

private CacheReloadService(CacheReloader reloader,
ScheduledExecutorService executor,
Scheduler scheduler) {
this.reloader = reloader;
this.executor = executor;
this.scheduler = scheduler;
}

@Override
protected void runOneIteration() throws Exception {
reloader.reloadOnce();
}

@Override
protected Scheduler scheduler() {
return scheduler;
}

@Override
protected ScheduledExecutorService executor() {
return executor;
}
}
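The class above delegates scheduling to Guava's `AbstractScheduledService` plus a dedicated single-thread executor that is shut down when the service terminates or fails. Stripped of Guava, the behavior it arranges looks roughly like this plain-JDK sketch (all names here are illustrative):

```java
// Plain-JDK approximation of what CacheReloadService arranges: a
// single-threaded executor invoking reloadOnce() at a fixed rate, then
// shut down when the service terminates (or fails).
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateReloadDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger reloads = new AtomicInteger();
        Runnable reloadOnce = reloads::incrementAndGet; // stand-in for CacheReloader#reloadOnce

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(reloadOnce, 0, 50, TimeUnit.MILLISECONDS);

        Thread.sleep(200); // let a few iterations run

        // mirrors the terminated()/failed() listeners shutting the executor down
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(reloads.get() >= 1); // true
    }
}
```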
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V.
* under one or more contributor license agreements. Licensed under the
* Elastic License 2.0; you may not use this file except in compliance
* with the Elastic License 2.0.
*/
package co.elastic.logstash.filters.elasticintegration.resolver;

public interface CacheReloader {
String type();

void reloadOnce();
}
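A toy implementation of this two-method interface; the in-memory map and upstream supplier are hypothetical stand-ins for the plugin's resolver caches and their Elasticsearch-backed sources, and the interface is re-declared so the sketch compiles on its own:

```java
// Toy CacheReloader: refreshes an in-memory mapping from an "upstream"
// source. The interface is re-declared locally for self-containment.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

interface CacheReloader {
    String type();
    void reloadOnce();
}

public class MappingReloaderDemo {
    public static void main(String[] args) {
        Map<String, String> cache =
                new ConcurrentHashMap<>(Map.of("logs-foo-default", "stale-pipeline"));
        Supplier<Map<String, String>> upstream =
                () -> Map.of("logs-foo-default", "fresh-pipeline");

        CacheReloader reloader = new CacheReloader() {
            @Override public String type() { return "datastream-to-pipeline"; }
            @Override public void reloadOnce() {
                cache.clear();              // drop stale entries
                cache.putAll(upstream.get()); // install the freshly-resolved mapping
            }
        };

        reloader.reloadOnce();
        System.out.println(cache.get("logs-foo-default")); // fresh-pipeline
    }
}
```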
@@ -15,4 +15,5 @@
* @param <V> the type of the resolved value
*/
public interface CachingResolver<K, V> extends Resolver<K, V> {
CacheReloader getReloader();
}
@@ -7,6 +7,7 @@
package co.elastic.logstash.filters.elasticintegration.resolver;

import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
@@ -26,4 +26,9 @@ Optional<V> resolve(K resolveKey,
void flush();

Set<K> keys();

// API: internal
void reload(final K resolveKey, CacheableResolver.Ephemeral<K, V> resolver);

CacheReloader getReloader(CacheableResolver.Ephemeral<K,V> resolver);
}
@@ -37,6 +37,11 @@ public Optional<V> resolve(final K resolveKey,
return cache.resolve(resolveKey, cacheMissResolver, exceptionHandler);
}

@Override
public CacheReloader getReloader() {
return cache.getReloader(cacheMissResolver);
}

@FunctionalInterface
public interface Bindable<K,V> {
CacheableResolver.Ephemeral<K,V> withCachingResolverBinding(final SimpleCachingResolver<K,V> cachingResolver);