This module simplifies error handling with Apache Beam Java.
Asgarde | Beam |
---|---|
0.10.0 | 2.31.0 |
0.11.0 | 2.32.0 |
0.12.0 | 2.33.0 |
0.13.0 | 2.34.0 |
0.14.0 | 2.35.0 |
0.15.0 | 2.36.0 |
0.16.0 | 2.37.0 |
0.17.0 | 2.38.0 |
0.18.0 | 2.39.0 |
0.19.0 | 2.40.0 |
0.20.0 | 2.41.0 |
0.21.0 | 2.42.0 |
0.22.0 | 2.43.0 |
0.23.0 | 2.44.0 |
0.24.0 | 2.45.0 |
0.25.0 | 2.46.0 |
0.26.0 | 2.47.0 |
0.27.0 | 2.48.0 |
0.28.0 | 2.49.0 |
0.29.0 | 2.50.0 |
0.30.0 | 2.51.0 |
0.31.0 | 2.52.0 |
0.32.0 | 2.53.0 |
0.33.0 | 2.54.0 |
0.34.0 | 2.55.0 |
0.35.0 | 2.56.0 |
0.36.0 | 2.57.0 |
The project is hosted on a Maven repository.
You can install it with any build tool compatible with Maven.
Example with Maven and Gradle:
<dependency>
<groupId>fr.groupbees</groupId>
<artifactId>asgarde</artifactId>
<version>0.36.0</version>
</dependency>
implementation group: 'fr.groupbees', name: 'asgarde', version: '0.36.0'
Beam recommends handling errors with dead letters.
This means catching errors in the flow and, using side outputs, sinking them to a file, a database or any other output.
Beam suggests handling side outputs with TupleTags in a DoFn class, for example:
// Failure object.
public class Failure implements Serializable {
private final String pipelineStep;
private final String inputElement;
private final Throwable exception;
public static <T> Failure from(final String pipelineStep,
final T element,
final Throwable exception) {
return new Failure(pipelineStep, element.toString(), exception);
}
}
// Word count DoFn class.
public class WordCountFn extends DoFn<String, Integer> {
private final TupleTag<Integer> outputTag = new TupleTag<Integer>() {};
private final TupleTag<Failure> failuresTag = new TupleTag<Failure>() {};
@ProcessElement
public void processElement(ProcessContext ctx) {
try {
// Could throw ArithmeticException.
final String word = ctx.element();
ctx.output(1 / word.length());
} catch (Throwable throwable) {
final Failure failure = Failure.from("step", ctx.element(), throwable);
ctx.output(failuresTag, failure);
}
}
public TupleTag<Integer> getOutputTag() {
return outputTag;
}
public TupleTag<Failure> getFailuresTag() {
return failuresTag;
}
}
// In Beam pipeline flow.
final PCollection<String> wordPCollection....
final WordCountFn wordCountFn = new WordCountFn();
final PCollectionTuple tuple = wordPCollection
.apply("ParDo", ParDo.of(wordCountFn).withOutputTags(wordCountFn.getOutputTag(), TupleTagList.of(wordCountFn.getFailuresTag())));
// Output PCollection via outputTag.
PCollection<Integer> outputCollection = tuple.get(wordCountFn.getOutputTag());
// Failures PCollection via failuresTag.
PCollection<Failure> failuresCollection = tuple.get(wordCountFn.getFailuresTag());
With this approach we can, in all steps, get the output and failures result PCollections.
Beam also allows handling errors with built-in components like MapElements and FlatMapElements (an experimental feature as of April 2020).
Behind the scenes, these classes use the same concept explained above.
Example:
public class Failure implements Serializable {
private final String pipelineStep;
private final String inputElement;
private final Throwable exception;
public static <T> Failure from(final String pipelineStep,
final WithFailures.ExceptionElement<T> exceptionElement) {
final T inputElement = exceptionElement.element();
return new Failure(pipelineStep, inputElement.toString(), exceptionElement.exception());
}
}
// In Beam pipeline flow.
final PCollection<String> wordPCollection....
WithFailures.Result<PCollection<Integer>, Failure> result = wordPCollection
.apply("Map", MapElements
.into(TypeDescriptors.integers())
.via((String word) -> 1 / word.length()) // Could throw ArithmeticException
.exceptionsInto(TypeDescriptor.of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt))
);
PCollection<Integer> output = result.output();
PCollection<Failure> failures = result.failures();
The logic is the same for FlatMapElements :
final PCollection<String> wordPCollection....
WithFailures.Result<PCollection<String>, Failure> result = wordPCollection
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
.exceptionsInto(TypeDescriptor.of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt))
);
PCollection<String> output = result.output();
PCollection<Failure> failures = result.failures();
In a usual Beam pipeline flow, steps are chained fluently:
final PCollection<Integer> outputPCollection = inputPCollection
.apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word) -> word + "Test"))
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
.apply("Map ParDo", ParDo.of(new WordCountFn()));
Here's the same flow with error handling in each step:
WithFailures.Result<PCollection<String>, Failure> result1 = input
.apply("Map", MapElements
.into(TypeDescriptors.strings())
.via((String word) -> word + "Test")
.exceptionsInto(TypeDescriptor.of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt)));
final PCollection<String> output1 = result1.output();
final PCollection<Failure> failure1 = result1.failures();
WithFailures.Result<PCollection<String>, Failure> result2 = output1
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
.exceptionsInto(TypeDescriptor.of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt)));
final PCollection<String> output2 = result2.output();
final PCollection<Failure> failure2 = result2.failures();
final PCollectionTuple result3 = output2
.apply("Map ParDo", ParDo.of(wordCountFn).withOutputTags(wordCountFn.getOutputTag(), TupleTagList.of(wordCountFn.getFailuresTag())));
final PCollection<Integer> output3 = result3.get(wordCountFn.getOutputTag());
final PCollection<Failure> failure3 = result3.get(wordCountFn.getFailuresTag());
final PCollection<Failure> allFailures = PCollectionList
.of(failure1)
.and(failure2)
.and(failure3)
.apply(Flatten.pCollections());
Problems with this approach:
- We lose the native fluent style on apply chains, because we have to handle output and error for each step.
- For MapElements and FlatMapElements, we always have to add exceptionsInto and exceptionsVia (can be centralized).
- For each custom DoFn, we have to duplicate the TupleTag logic and the try/catch block (can be centralized).
- The code is verbose.
- There is no centralized code to concatenate all the errors; we have to concatenate all failures ourselves (can be centralized).
Here's the same flow with error handling, but using this library instead:
final WithFailures.Result<PCollection<Integer>, Failure> resultComposer = CollectionComposer.of(input)
.apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word) -> word + "Test"))
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
.apply("ParDo", MapElementFn.into(TypeDescriptors.integers()).via(word -> 1 / word.length()))
.getResult();
Some explanations:
- The CollectionComposer class centralizes all the error logic, fluently composes the applies and concatenates all the failures occurring in the flow.
- For MapElements and FlatMapElements, behind the scenes, the apply method adds exceptionsInto and exceptionsVia based on the Failure object. We can also explicitly use exceptionsInto and exceptionsVia if needed, for example for custom logic based on the Failure object.
- The MapElementFn class is a custom DoFn class that internally wraps the shared DoFn logic, like the try/catch block and tuple tags. We will detail these concepts in the next sections.
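To make the composer idea concrete, here is a minimal plain-Java sketch of the concept: each step runs inside a shared try/catch, good results flow to the next step, and failures from every step accumulate in one place. It works on in-memory lists rather than PCollections, and MiniComposer is a hypothetical name used only for illustration; it is not how Asgarde is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical in-memory model of a composer; not part of Asgarde or Beam.
final class MiniComposer<T> {
    private final List<T> outputs;
    private final List<String> failures;

    private MiniComposer(List<T> outputs, List<String> failures) {
        this.outputs = outputs;
        this.failures = failures;
    }

    static <E> MiniComposer<E> of(List<E> inputs) {
        return new MiniComposer<>(new ArrayList<>(inputs), new ArrayList<>());
    }

    // Applies one step to every good element; the try/catch lives here only.
    <R> MiniComposer<R> apply(String step, Function<? super T, ? extends R> fn) {
        List<R> next = new ArrayList<>();
        // Failures from earlier steps are carried over, then extended.
        List<String> allFailures = new ArrayList<>(failures);
        for (T element : outputs) {
            try {
                next.add(fn.apply(element));
            } catch (RuntimeException e) {
                allFailures.add(step + ": " + e.getClass().getSimpleName());
            }
        }
        return new MiniComposer<>(next, allFailures);
    }

    List<T> outputs() {
        return outputs;
    }

    List<String> failures() {
        return failures;
    }

    public static void main(String[] args) {
        MiniComposer<Integer> result = MiniComposer.of(Arrays.asList("4", "x", "0"))
            .apply("Parse", Integer::parseInt)
            .apply("Inverse", n -> 100 / n);
        System.out.println(result.outputs());  // [25]
        System.out.println(result.failures()); // [Parse: NumberFormatException, Inverse: ArithmeticException]
    }
}
```

The caller only writes the business functions; the chain stays fluent and the failures of both steps end up in a single list.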
- Wrap all error handling logic in a composer class.
- Wrap exceptionsInto and exceptionsVia usage in the native Beam classes MapElements and FlatMapElements.
- Keep the fluent style natively proposed by Beam in apply methods while checking for failures, and offer a less verbose way of handling errors.
- Expose custom DoFn classes with centralized try/catch blocks (loan pattern) and tuple tags.
- Expose easier access to the @Setup, @StartBundle, @FinishBundle and @Teardown steps of DoFn classes.
- Expose a way to handle errors in filtering logic (currently not available with Beam's Filter.by).
Some resources on the loan pattern:
https://dzone.com/articles/functional-programming-patterns-with-java-8
https://blog.knoldus.com/scalaknol-understanding-loan-pattern/
MapElementFn is the equivalent of a Beam MapElements.
It must be created from the outputTypeDescriptor and takes a SerializableFunction on generic input and output (the input and output types of the mapper).
This SerializableFunction is invoked lazily in the @ProcessElement method of the DoFn lifecycle.
We can also give this class actions related to the DoFn lifecycle:
- setupAction, executed in the @Setup method
- startBundleAction, executed in the @StartBundle method
- finishBundleAction, executed in the @FinishBundle method
- teardownAction, executed in the @Teardown method
Each action is represented by a SerializableAction:
@FunctionalInterface
public interface SerializableAction extends Serializable {
void execute();
}
The SerializableAction is like a java.lang.Runnable that also has to implement Serializable.
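As an illustration of why the Serializable bound matters, the sketch below sends a lambda through Java serialization and runs the deserialized copy, which is roughly what has to be possible when a function is shipped to a worker. The interface is a copy of the definition above; SerializableActionSketch and its helper methods are hypothetical names, and plain Java serialization is used here only as a stand-in for whatever Beam does internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Same shape as the interface shown above.
@FunctionalInterface
interface SerializableAction extends Serializable {
    void execute();
}

// Hypothetical helper class, for illustration only.
final class SerializableActionSketch {
    static final AtomicInteger CALLS = new AtomicInteger();

    static byte[] toBytes(SerializableAction action) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(action);
        } catch (IOException e) {
            throw new IllegalStateException("Serialization failed", e);
        }
        return bytes.toByteArray();
    }

    static SerializableAction fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (SerializableAction) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Deserialization failed", e);
        }
    }

    // Returns how many times the deserialized copy incremented the counter.
    static int callsAfterRoundTrip() {
        int before = CALLS.get();
        // The lambda is serializable because its target type extends Serializable.
        SerializableAction action = () -> CALLS.incrementAndGet();
        fromBytes(toBytes(action)).execute();
        return CALLS.get() - before;
    }
}
```

The same lambda assigned to a plain java.lang.Runnable would fail in toBytes with a NotSerializableException, which is the reason a dedicated interface is needed.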
When writing a DoFn, Beam can infer the output type from DoFn<Input, Output> and deduce the output type descriptor from it.
A default Coder can be added for this descriptor.
When we write a generic DoFn, Beam is unable to infer the output type and create the output descriptor; that's why our custom DoFn classes require the output descriptor in the into method.
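The underlying limitation is Java type erasure, which the following plain-Java sketch tries to show. It is not Beam code: TypeCapture and ErasureSketch are hypothetical names, and the reflection trick is only assumed to be similar in spirit to what a type descriptor relies on. A concrete type argument can be read back from an anonymous subclass, while a type variable of a generic method cannot be resolved.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical helper: an anonymous subclass records its type argument
// in the class file, so it can be read back through reflection.
abstract class TypeCapture<T> {
    Type captured() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

final class ErasureSketch {
    // Concrete type argument: reflection recovers Integer.
    static Type concrete() {
        return new TypeCapture<Integer>() {}.captured();
    }

    // Generic code: only the type variable T is recorded, whatever the caller
    // passes, so the real type must be supplied explicitly by the caller.
    static <T> Type generic() {
        return new TypeCapture<T>() {}.captured();
    }
}
```

Calling ErasureSketch.concrete() yields Integer.class, whereas ErasureSketch.<String>generic() yields an unresolved type variable, which is why a generic class has to ask for the descriptor up front.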
Usage example:
final PCollection<Integer> outputMapElementFn = CollectionComposer.of(inputPCollection)
.apply("PaDo", MapElementFn
.into(TypeDescriptors.integers())
.via((String word) -> 1 / word.length())
.withSetupAction(() -> LOGGER.info("Start word count action in the worker"))
.withTeardownAction(() -> LOGGER.info("End word count action in the worker")))
.getResult()
.output();
Behind the scenes, the CollectionComposer class adds a ParDo on this DoFn and handles errors with tuple tags.
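The order in which the mapper and the lifecycle actions run can be pictured with a small plain-Java model. This is a simplified sketch under stated assumptions: LifecycleSketch is a hypothetical class, bundles are plain lists, and a real runner decides on its own how many instances and bundles exist and when teardown happens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a mapping function with lifecycle actions.
final class LifecycleSketch<I, O> {
    private final Function<I, O> mapper;
    private final Runnable setupAction;
    private final Runnable startBundleAction;
    private final Runnable finishBundleAction;
    private final Runnable teardownAction;
    private final List<String> failures = new ArrayList<>();

    LifecycleSketch(Function<I, O> mapper,
                    Runnable setupAction,
                    Runnable startBundleAction,
                    Runnable finishBundleAction,
                    Runnable teardownAction) {
        this.mapper = mapper;
        this.setupAction = setupAction;
        this.startBundleAction = startBundleAction;
        this.finishBundleAction = finishBundleAction;
        this.teardownAction = teardownAction;
    }

    // Setup once, then each bundle between start/finish actions, then teardown.
    List<O> process(List<List<I>> bundles) {
        List<O> outputs = new ArrayList<>();
        setupAction.run();
        for (List<I> bundle : bundles) {
            startBundleAction.run();
            for (I element : bundle) {
                try {
                    // The mapper is only invoked here, once per element.
                    outputs.add(mapper.apply(element));
                } catch (RuntimeException e) {
                    failures.add(element + " -> " + e);
                }
            }
            finishBundleAction.run();
        }
        teardownAction.run();
        return outputs;
    }

    List<String> failures() {
        return failures;
    }
}
```

With the mapper word -> 1 / word.length() and the bundles ["a", ""] and ["abcd"], the model produces the outputs [1, 0] and records one failure for the empty word, while the actions run as setup, startBundle, finishBundle, startBundle, finishBundle, teardown.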
MapProcessContextFn works exactly like MapElementFn, but gives access to Beam's ProcessContext object.
It must be created from the input type of the DoFn, which gives more information on the input, because the SerializableFunction takes the ProcessContext and not the Input (so it doesn't carry the input type):
SerializableFunction<ProcessContext, Output>
This class accepts the same DoFn lifecycle actions as MapElementFn:
- setupAction
- startBundleAction
- finishBundleAction
- teardownAction
It expects an output descriptor too.
Usage example:
final PCollection<Integer> resMapProcessElementFn = CollectionComposer.of(input)
.apply("PaDo", MapProcessContextFn
.from(String.class)
.into(TypeDescriptors.integers())
.via(ctx -> 1 / ctx.element().length())
.withSetupAction(() -> LOGGER.info("Setup word count action in the worker"))
.withStartBundleAction(() -> LOGGER.info("Start bundle word count action in the worker"))
.withFinishBundleAction(() -> LOGGER.info("Finish bundle word count action in the worker"))
.withTeardownAction(() -> LOGGER.info("End word count action in the worker")))
.getResult()
.output();
Sometimes we need access to Beam's ProcessContext to get technical fields or handle side inputs.
For MapElementFn and MapProcessContextFn, we can give side inputs to the CollectionComposer.
Here's an example using side inputs:
// Simulates a side input, from Beam pipeline.
final String wordDescription = "Word to describe Football teams";
final PCollectionView<String> sideInputs = pipeline
.apply("Create word description side input", Create.of(wordDescription))
.apply("Create as collection view", View.asSingleton());
final PCollection<WordStats> resMapProcessElementFn = CollectionComposer.of(input)
.apply("PaDo", MapProcessContextFn
.from(String.class)
.into(TypeDescriptor.of(WordStats.class))
.via(ctx -> toWordStats(sideInputs, ctx))
.withSetupAction(() -> LOGGER.info("Start word count action in the worker")),
Collections.singleton(sideInputs))
.getResult()
.output();
private WordStats toWordStats(final PCollectionView<String> sideInputs, final DoFn<String, WordStats>.ProcessContext context) {
final String word = context.element();
final Integer dividedWordCount = 1 / word.length();
// Gets the current timestamp in context.
final Instant timestamp = context.timestamp();
// Gets the side input value from PCollectionView and context.
final String wordDescription = context.sideInput(sideInputs);
return new WordStats(word, dividedWordCount, timestamp, wordDescription);
}
private static class WordStats implements Serializable {
private final String word;
private final Integer dividedWordCount;
private final Instant timestamp;
private final String wordDescription;
public WordStats(String word, Integer dividedWordCount, Instant timestamp, String wordDescription) {
this.word = word;
this.dividedWordCount = dividedWordCount;
this.timestamp = timestamp;
this.wordDescription = wordDescription;
}
}
Behind the scenes, the CollectionComposer class adds the ParDo on this DoFn and handles errors with tuple tags.
FlatMapElementFn follows the same principle as MapElementFn, but for the flatMap operator.
PCollection<Player> players = CollectionComposer.of(teams)
.apply("Step name", FlatMapElementFn
.into(TypeDescriptor.of(Player.class))
.via(team -> team.getPlayers())
.withSetupAction(() -> System.out.println("Starting of mapping..."))
.withStartBundleAction(() -> System.out.println("Starting bundle of mapping..."))
.withFinishBundleAction(() -> System.out.println("Ending bundle of mapping..."))
.withTeardownAction(() -> System.out.println("Ending of mapping...")))
.getResult()
.output();
FlatMapProcessContextFn follows the same principle as MapProcessContextFn, but for the flatMap operator.
PCollection<Player> players = CollectionComposer.of(teams)
.apply("Step name", FlatMapProcessContextFn
.from(Team.class)
.into(TypeDescriptor.of(Player.class))
.via(ctx -> ctx.element().getPlayers())
.withSetupAction(() -> System.out.println("Starting of mapping..."))
.withStartBundleAction(() -> System.out.println("Starting bundle of mapping..."))
.withFinishBundleAction(() -> System.out.println("Ending bundle of mapping..."))
.withTeardownAction(() -> System.out.println("Ending of mapping...")))
.getResult()
.output();
FilterFn is like the Filter class exposed by Beam, but with built-in error handling.
It's a generic DoFn implementation, just like MapElementFn and MapProcessContextFn.
Usage example:
final PCollection<String> resFilterFn = CollectionComposer.of(wordCollection)
.apply("FilterFn", FilterFn.by(word -> word.length() > 3))
.getResult()
.output();
It takes a predicate via:
final SerializableFunction<InputT, Boolean> predicate
Behind the scenes, the CollectionComposer takes the output from the previous PCollection.
No output descriptor is needed in this case, because filtering keeps the same type.
CollectionComposer centralizes error handling in one place.
Create an instance from a PCollection:
CollectionComposer.of(wordPCollection)
- It accepts native MapElements and FlatMapElements without adding exceptionsInto and exceptionsVia:
final PCollection<Integer> resMapElements = CollectionComposer.of(input)
.apply("ParDo", MapElements.into(TypeDescriptors.integers()).via((String word) -> 1 / word.length()))
.getResult()
.output();
final PCollection<String> resFlatMapElements = CollectionComposer.of(input)
.apply("FlatMapElements", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
.getResult()
.output();
- It accepts native MapElements and FlatMapElements with exceptionsInto and exceptionsVia (if external exceptionsInto and exceptionsVia are given, they have to be based on the Failure object exposed by the library):
final PCollection<Integer> resMapElements2 = CollectionComposer.of(input)
.apply("MapElements", MapElements
.into(TypeDescriptors.integers())
.via((String word) -> 1 / word.length())
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt)))
.getResult()
.output();
final PCollection<String> resFlatMapElements2 = CollectionComposer.of(input)
.apply("FlatMapElements", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt)))
.getResult()
.output();
The Failure object exposed by the library contains the inputElement of the step in String format (via its toString method) and the exception that occurred.
For the input elements used in transforms, developers can override the toString method to provide the string representation of the object.
A possible approach is to serialize the object to a JSON string with a framework like Jackson, for better readability.
Beam already uses Jackson as an internal dependency.
Example with a Team class used in a Beam transform:
public static class Team implements Serializable {
...
@Override
public String toString() {
return JsonUtil.serialize(this);
}
}
public static <T> String serialize(final T obj) {
try {
return OBJECT_MAPPER.writeValueAsString(obj);
} catch (JsonProcessingException e) {
throw new IllegalStateException("The serialization of object fails : " + obj, e);
}
}
In this case, we use Jackson to serialize the current object to a JSON string in the toString method of the Team object.
The Failure class exposes factory methods to build from inputs:
public class Failure implements Serializable {
private final String pipelineStep;
private final String inputElement;
private final Throwable exception;
private Failure(String pipelineStep,
String inputElement,
Throwable exception) {
this.pipelineStep = pipelineStep;
this.inputElement = inputElement;
this.exception = exception;
}
public static <T> Failure from(final String pipelineStep,
final WithFailures.ExceptionElement<T> exceptionElement) {
final T inputElement = exceptionElement.element();
return new Failure(pipelineStep, inputElement.toString(), exceptionElement.exception());
}
public static <T> Failure from(final String pipelineStep,
final T element,
final Throwable exception) {
return new Failure(pipelineStep, element.toString(), exception);
}
}
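The link between an element's toString and the content of a failure can be tried out without Beam or Jackson. The sketch below is a dependency-free illustration: FailureSketch mirrors the three-argument factory shown above, the Team fields are invented for the example, and the JSON is built by hand without escaping (real code would use Jackson as suggested earlier).

```java
import java.io.Serializable;

// Hypothetical element type; the fields are invented for this example.
final class Team implements Serializable {
    private final String name;
    private final int score;

    Team(String name, int score) {
        this.name = name;
        this.score = score;
    }

    @Override
    public String toString() {
        // Hand-built JSON without escaping, to keep the sketch dependency-free.
        return "{\"name\":\"" + name + "\",\"score\":" + score + "}";
    }
}

// Hypothetical stand-in for the library's Failure class.
final class FailureSketch implements Serializable {
    private final String pipelineStep;
    private final String inputElement;
    private final Throwable exception;

    private FailureSketch(String pipelineStep, String inputElement, Throwable exception) {
        this.pipelineStep = pipelineStep;
        this.inputElement = inputElement;
        this.exception = exception;
    }

    static <T> FailureSketch from(String pipelineStep, T element, Throwable exception) {
        // String.valueOf is an assumption of this sketch: unlike element.toString(),
        // it does not throw when the element is null.
        return new FailureSketch(pipelineStep, String.valueOf(element), exception);
    }

    String getPipelineStep() {
        return pipelineStep;
    }

    String getInputElement() {
        return inputElement;
    }

    Throwable getException() {
        return exception;
    }
}
```

For example, FailureSketch.from("Map", new Team("PSG", 3), new IllegalStateException("boom")).getInputElement() returns {"name":"PSG","score":3}, which is much easier to read in an error sink than the default Object.toString output.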
- It accepts MapElementFn, MapProcessContextFn, FlatMapElementFn, FlatMapProcessContextFn, FilterFn and custom DoFns:
// Map element Fn.
final PCollection<Integer> resMapElementFn = CollectionComposer.of(input)
.apply("PaDo", MapElementFn
.into(TypeDescriptors.integers())
.via((String word) -> 1 / word.length())
.withSetupAction(() -> LOGGER.info("Start word count action in the worker")))
.getResult()
.output();
// Map process context Fn without side inputs.
final PCollection<Integer> resMapProcessElementFn = CollectionComposer.of(input)
.apply("PaDo", MapProcessContextFn
.from(String.class)
.into(TypeDescriptors.integers())
.via(ctx -> 1 / ctx.element().length())
.withSetupAction(() -> LOGGER.info("Start word count action in the worker")))
.getResult()
.output();
// Map process context Fn with side inputs.
final String wordDescription = "Word to describe Football teams";
final PCollectionView<String> sideInputs = input.getPipeline()
.apply("String side input", Create.of(wordDescription))
.apply("Create as collection view", View.asSingleton());
final PCollection<WordStats> resMapProcessElementFn2 = CollectionComposer.of(input)
.apply("PaDo", MapProcessContextFn
.from(String.class)
.into(TypeDescriptor.of(WordStats.class))
.via(ctx -> toWordStats(sideInputs, ctx))
.withSetupAction(() -> LOGGER.info("Start word count action in the worker")),
Collections.singleton(sideInputs))
.getResult()
.output();
// Filter Fn.
final PCollection<String> resFilterFn = CollectionComposer.of(input)
.apply("FilterFn", FilterFn.by(word -> word.length() > 3))
.getResult()
.output();
- Custom non-generic DoFn classes are also supported. They must extend BaseElementFn (it initializes the tuple tags logic), and the type descriptors are deduced from the non-generic types:
import fr.groupbees.asgarde.Failure;
final PCollection<WordStats> resCustomDoFn = CollectionComposer.of(input)
.apply("PaDo", new WordStatsFn(sideInputs), Collections.singleton(sideInputs))
.getResult()
.output();
// Custom DoFn class.
public class WordStatsFn extends BaseElementFn<String, WordStats> {
private PCollectionView<String> sideInputs;
public WordStatsFn(final PCollectionView<String> sideInputs) {
// Do not forget to call this!
super();
this.sideInputs = sideInputs;
}
@ProcessElement
public void processElement(ProcessContext ctx) {
try {
ctx.output(toWordStats(sideInputs, ctx));
} catch (Throwable throwable) {
final Failure failure = Failure.from("step", ctx.element(), throwable);
ctx.output(failuresTag, failure);
}
}
}
- It keeps the fluent style of the apply methods and internally concatenates all the failures that occur:
final WithFailures.Result<PCollection<Integer>, Failure> resultComposer = CollectionComposer.of(input)
.apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word) -> word + "Test"))
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
.apply("ParDo", MapElementFn.into(TypeDescriptors.integers()).via(word -> 1 / word.length()))
.getResult();
final PCollection<String> output = resultComposer.output();
final PCollection<Failure> failures = resultComposer.failures();
A coder can be set for the current output PCollection in the flow, for example:
// Generate and instantiate an Avro specific object.
final AvroTest inputAvroObject = AvroTest.newBuilder()
.setId(34444)
.setName("test")
.build();
// Read an Avro file with a type corresponding to the previous specific object.
final PCollection<AvroTest> avroObjectsCollection = pipeline.apply("Reads Avro objects", AvroIO
.read(AvroTest.class)
.from("filePath"));
private GenericRecord avroObjectToGenericRecord(final AvroTest avroTest) {
GenericRecord record = new GenericData.Record(avroTest.getSchema());
record.put("id", avroTest.getId());
record.put("name", avroTest.getName());
return record;
}
// Flow with CollectionComposer.
final Result<PCollection<GenericRecord>, Failure> result2 = CollectionComposer.of(avroObjectsCollection)
.apply("Map", MapElements.into(of(GenericRecord.class)).via(this::avroObjectToGenericRecord))
.setCoder(AvroCoder.of(GenericRecord.class, inputAvroObject.getSchema()))
.getResult();
In this example, an Avro file is read and mapped to a typed, specific object, AvroTest.
Then we simulate a transformation from this AvroTest instance to a GenericRecord object.
The GenericRecord doesn't carry any serialization information, so Beam can't infer a default Coder.
We have to set a Coder for the output PCollection of GenericRecord:
.setCoder(AvroCoder.of(GenericRecord.class, inputAvroObject.getSchema()))
Asgarde and the CollectionComposer class work like the usual PCollection for coders and offer the same setCoder method for good outputs.
Apache Beam Java can be used with Kotlin, which makes the experience more enjoyable.
Asgarde also offers Kotlin extensions to use the CollectionComposer class with more expressive, concise code in a functional programming style.
⚠️ Kotlin Asgarde is available from Asgarde 0.15.0 and Beam 2.36.0.
⚠️ The current Kotlin version used with Asgarde is 1.9.22.
Let's take a previous example of an Asgarde Java pipeline with error handling:
final WithFailures.Result<PCollection<Integer>, Failure> resultComposer = CollectionComposer.of(input)
.apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word)-> word + "Test"))
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
.apply("ParDo", MapElementFn.into(TypeDescriptors.integers()).via(word -> 1/word.length()))
.getResult();
In Asgarde Kotlin, the same pipeline is:
import fr.groupbees.asgarde.*
val result: Result<PCollection<Int>, Failure> = CollectionComposer.of(words)
.map("Map") { word -> word + "Test" }
.flatMap("FlatMap") { Arrays.asList(*Arrays.copyOfRange(it.split(" ").toTypedArray(), 1, 5)) }
.mapFn("ParDo", { word -> 1 / word.length })
.result
The Kotlin code of Asgarde uses extensions.
To use these extensions, the following import must be added:
import fr.groupbees.asgarde.*
This feature is convenient because we can mix native Asgarde code with functions dedicated to Kotlin, for example:
val result: Result<PCollection<Int>, Failure> = CollectionComposer.of(words)
.apply("Map", MapElements.into(TypeDescriptors.strings()).via(SerializableFunction { word: String -> word + "Test" }))
.flatMap("FlatMap") { Arrays.asList(*Arrays.copyOfRange(it.split(" ").toTypedArray(), 1, 5)) }
.mapFn("ParDo", { word -> 1 / word.length })
.result
In this case, the first map function has been replaced by the native apply function with MapElements.
The type of the lambda expression in the via function needs to be specified in Kotlin, because via can take a ProcessFunction or a SerializableFunction.
This causes an ambiguity (SerializableFunction is used in this example).
In the following sections, all Asgarde native components and their Kotlin equivalents are presented.
Asgarde Java:
CollectionComposer.of(teams)
.apply("Step name", MapElements.into(TypeDescriptor.of(OtherTeam.class)).via(team -> TestSettings.toOtherTeam(team)))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", MapElements.into(TypeDescriptor.of(OtherTeam.class)).via(TestSettings::toOtherTeam))
.getResult();
Asgarde Kotlin:
CollectionComposer.of(teams)
.map("Step name") { team -> TestSettings.toOtherTeam(team) }
.result
With Kotlin's implicit it parameter:
CollectionComposer.of(teams)
.map("Step name") { TestSettings.toOtherTeam(it) }
.result
The Kotlin version only takes a lambda in the inline function; Kotlin recommends passing a trailing lambda outside the parentheses.
Asgarde Java:
CollectionComposer.of(teams)
.apply("Step name", FlatMapElements.into(of(Player.class)).via(team -> team.getPlayers()))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", FlatMapElements.into(of(Player.class)).via(Team::getPlayers))
.getResult();
Asgarde Kotlin:
CollectionComposer.of(teams)
.flatMap("Step name") { team -> team.players }
.result
With Kotlin's implicit it parameter:
CollectionComposer.of(teams)
.flatMap("Step name") { it.players }
.result
Asgarde Java:
CollectionComposer.of(teams)
.apply("Step name", MapElements
.into(of(Team.class))
.via(team -> toTeamWithPsgError(team))
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("Step name", exElt)))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", MapElements
.into(of(Team.class))
.via(this::toTeamWithPsgError)
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("Step name", exElt)))
.getResult();
Asgarde Kotlin:
CollectionComposer.of(teamCollection)
.mapWithFailure(
"Step name",
{ team -> toTeamWithPsgError(team) },
{ exElt -> Failure.from("Step name", exElt) }
)
.result
With Kotlin's implicit it parameter:
CollectionComposer.of(teamCollection)
.mapWithFailure(
"Step name",
{ toTeamWithPsgError(it) },
{ Failure.from("Step name", it) }
)
.result
Asgarde Java:
CollectionComposer.of(teams)
.apply("Step name", FlatMapElements
.into(of(Player.class))
.via(team -> simulateFlatMapErrorPsgTeam(team))
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("Step name", exElt)))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", FlatMapElements
.into(of(Player.class))
.via(this::simulateFlatMapErrorPsgTeam)
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("Step name", exElt)))
.getResult();
Asgarde Kotlin:
CollectionComposer.of(teamCollection)
.flatMapWithFailure(
"Step name",
{ team -> simulateFlatMapErrorPsgTeam(team) },
{ exElt -> Failure.from("Step name", exElt) }
)
.result
With Kotlin's implicit it parameter:
CollectionComposer.of(teamCollection)
.flatMapWithFailure(
"Step name",
{ simulateFlatMapErrorPsgTeam(it) },
{ Failure.from("Step name", it) }
)
.result
Asgarde Java:
CollectionComposer.of(teams)
.apply("Step name", MapElementFn
.into(of(OtherTeam.class))
.via(team -> TestSettings.toOtherTeam(team))
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", MapElementFn
.into(of(OtherTeam.class))
.via(TestSettings::toOtherTeam)
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
Asgarde Kotlin:
CollectionComposer.of(teams)
.mapFn("Step name",
{ team -> TestSettings.toOtherTeam(team) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
With Kotlin's implicit it parameter:
CollectionComposer.of(teams)
.mapFn("Step name",
{ TestSettings.toOtherTeam(it) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
Asgarde Java:
CollectionComposer.of(teams)
.apply("Step name", FlatMapElementFn
.into(of(Player.class))
.via(team -> team.getPlayers())
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", FlatMapElementFn
.into(of(Player.class))
.via(Team::getPlayers)
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
Asgarde Kotlin:
CollectionComposer.of(teams)
.flatMapFn("Step name",
{ team -> team.players },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
With Kotlin's implicit it parameter:
CollectionComposer.of(teams)
.flatMapFn("Step name",
{ it.players },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
Asgarde Java:
CollectionComposer.of(teams)
.apply("Step name", MapProcessContextFn
.from(Team.class)
.into(of(OtherTeam.class))
.via(ctx -> TestSettings.toOtherTeam(ctx))
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", MapProcessContextFn
.from(Team.class)
.into(of(OtherTeam.class))
.via(TestSettings::toOtherTeam)
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
Asgarde Kotlin:
CollectionComposer.of(teams)
.mapFnWithContext("Step name",
{ ctx: DoFn<Team, OtherTeam>.ProcessContext -> TestSettings.toOtherTeam(ctx) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
With Kotlin's implicit it parameter:
CollectionComposer.of(teams)
.mapFnWithContext<Team, OtherTeam>("Step name",
{ TestSettings.toOtherTeam(it) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
Asgarde Java:
CollectionComposer.of(teams)
.apply("Step name", FlatMapProcessContextFn
.from(Team.class)
.into(of(Player.class))
.via(ctx -> TestSettings.toPlayers(ctx))
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", FlatMapProcessContextFn
.from(Team.class)
.into(of(Player.class))
.via(TestSettings::toPlayers)
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
Asgarde Kotlin:
CollectionComposer.of(teams)
.flatMapFnWithContext("Step name",
{ ctx: DoFn<Team, Player>.ProcessContext -> TestSettings.toPlayers(ctx) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
With Kotlin's implicit it parameter:
CollectionComposer.of(teams)
.flatMapFnWithContext<Team, Player>("Step name",
{ TestSettings.toPlayers(it) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
Asgarde Java:
CollectionComposer.of(teamCollection)
.apply("Step name", FilterFn.by(team -> simulateFilterErrorPsgTeam(team)))
.getResult();
With method reference
CollectionComposer.of(teamCollection)
.apply("Step name", FilterFn.by(this::simulateFilterErrorPsgTeam))
.getResult();
Asgarde Kotlin:
CollectionComposer.of(teamCollection)
.filter("Step name") { team -> simulateFilterErrorPsgTeam(team) }
.result
With Kotlin's implicit it parameter:
CollectionComposer.of(teamCollection)
.filter("Step name") { simulateFilterErrorPsgTeam(it) }
.result
- Maybe allow injecting a custom Failure object and error handling function to be used in all apply calls.