Skip to content

Refactor ForEachProcessor to use iteration instead of recursion #51104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
Expand All @@ -50,19 +49,16 @@
public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor {

public static final String TYPE = "foreach";
static final int MAX_RECURSE_PER_THREAD = 10;

private final String field;
private final Processor processor;
private final boolean ignoreMissing;
private final Consumer<Runnable> genericExecutor;

ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, Consumer<Runnable> genericExecutor) {
ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) {
super(tag);
this.field = field;
this.processor = processor;
this.ignoreMissing = ignoreMissing;
this.genericExecutor = genericExecutor;
}

boolean isIgnoreMissing() {
Expand All @@ -79,41 +75,35 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."));
}
} else {
List<Object> newValues = new CopyOnWriteArrayList<>();
innerExecute(0, values, newValues, ingestDocument, handler);
innerExecute(0, values, new ArrayList<>(values.size()), ingestDocument, handler);
}
}

void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocument document,
BiConsumer<IngestDocument, Exception> handler) {
for (; index < values.size(); index++) {
AtomicBoolean shouldContinueHere = new AtomicBoolean();
Copy link
Member

@martijnvg martijnvg Jan 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this approach. In the case of inner processor going async; whichever thread sets this atomic boolean last should continue with the next value.

And in the case inner processor doesn't go async then on the second getAndSet(...) the current thread continues with the next value.

I do wonder if this variable can be renamed to describe this beter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder about this scenario:

  1. The current thread sets shouldContinueHere first.
  2. Next value is handled and also manages to set shouldContinueHere first.
  3. Both async call manage to return and update newValues simultaneously.

Is this scenario possible? And if so should have protection against this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If current thread hits shouldContinueHere first, then it sees false and ends the loop (returns).
If it hits it second that means it's after async call already finished and other thread updated newValues already.
As you mentioned in first comment only thread that checks shouldContinueHere last actually proceeds
Does it answer your question or did I misunderstand the scenario?

Copy link
Member

@martijnvg martijnvg Jan 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying. I misinterpreted the if statement on line 98 when looking at it this morning.
It is not possible that multiple values from the list are handled concurrently.

Object value = values.get(index);
Object previousValue = document.getIngestMetadata().put("_value", value);
int nextIndex = index + 1;
processor.execute(document, (result, e) -> {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
if (e != null || result == null) {
handler.accept(result, e);
} else if (shouldContinueHere.getAndSet(true)) {
innerExecute(nextIndex, values, newValues, document, handler);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case recursion does happen, but we're not risking a SO here, because it always a different thread would get here.

}
});

if (shouldContinueHere.getAndSet(true) == false) {
return;
}
}

if (index == values.size()) {
document.setFieldValue(field, new ArrayList<>(newValues));
handler.accept(document, null);
return;
}

Object value = values.get(index);
Object previousValue = document.getIngestMetadata().put("_value", value);
final Thread thread = Thread.currentThread();
processor.execute(document, (result, e) -> {
if (e != null) {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
handler.accept(null, e);
} else if (result == null) {
handler.accept(null, null);
} else {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) {
// we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread
// only fork after 10 recursive calls, then fork every 10 to keep the number of threads down
genericExecutor.accept(() -> innerExecute(index + 1, values, newValues, document, handler));
} else {
// we are on a different thread (we went asynchronous), it's safe to recurse
// or we have recursed less then 10 times with the same thread, it's safe to recurse
innerExecute(index + 1, values, newValues, document, handler);
}
}
});
}

@Override
Expand All @@ -137,11 +127,9 @@ public Processor getInnerProcessor() {
public static final class Factory implements Processor.Factory {

private final ScriptService scriptService;
private final Consumer<Runnable> genericExecutor;

Factory(ScriptService scriptService, Consumer<Runnable> genericExecutor) {
Factory(ScriptService scriptService) {
this.scriptService = scriptService;
this.genericExecutor = genericExecutor;
}

@Override
Expand All @@ -157,7 +145,7 @@ public ForEachProcessor create(Map<String, Processor.Factory> factories, String
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor =
ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue());
return new ForEachProcessor(tag, field, processor, ignoreMissing, genericExecutor);
return new ForEachProcessor(tag, field, processor, ignoreMissing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
entry(ConvertProcessor.TYPE, new ConvertProcessor.Factory()),
entry(GsubProcessor.TYPE, new GsubProcessor.Factory()),
entry(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)),
entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.genericExecutor)),
entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)),
entry(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)),
entry(SortProcessor.TYPE, new SortProcessor.Factory()),
entry(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testCreate() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -59,7 +59,7 @@ public void testSetIgnoreMissing() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -77,7 +77,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_first", (r, t, c) -> processor);
registry.put("_second", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -90,7 +90,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
}

public void testCreateWithNonExistingProcessorType() throws Exception {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
Expand All @@ -103,15 +103,15 @@ public void testCreateWithMissingField() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config));
assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
}

public void testCreateWithMissingProcessor() {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config));
Expand Down
Loading