-
-
Notifications
You must be signed in to change notification settings - Fork 27.3k
Added FanOut/FanIn Pattern #1800
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| --- | ||
| layout: pattern | ||
| title: Fan-Out/Fan-In | ||
| folder: fanout-fanin | ||
| permalink: /patterns/fanout-fanin/ | ||
| categories: Integration | ||
| language: en | ||
| tags: | ||
| - Microservices | ||
| --- | ||
|
|
||
| ## Intent | ||
| The pattern is used when a source system needs to run one or more long-running processes that will fetch some data. | ||
| The source will not block itself waiting for the reply. <br> The pattern will run the same function in multiple | ||
| services or machines to fetch the data. This is equivalent to invoking the function multiple times on different chunks of data. | ||
|
|
||
| ## Explanation | ||
| The FanOut/FanIn service will take in a list of requests and a consumer. Each request might complete at a different time. | ||
| FanOut/FanIn service will accept the input params and returns the initial system an ID to acknowledge that the pattern | ||
| service has received the requests. Now the caller will not wait or expect the result in the same connection. | ||
|
|
||
| Meanwhile, the pattern service will invoke the requests that have come. The requests might complete at different time. | ||
| These requests will be processed in different instances of the same function in different machines or services. As the | ||
| requests get completed, a callback service everytime is called that transforms the result into a common single object format | ||
| that gets pushed to a consumer. The caller will be at the other end of the consumer receiving the result. | ||
|
|
||
| **Programmatic Example** | ||
|
|
||
| The implementation provided has a list of numbers and end goal is to square the numbers and add them to a single result. | ||
| `FanOutFanIn` class receives the list of numbers in the form of list of `SquareNumberRequest` and a `Consumer` instance | ||
| that collects the results as the requests get over. `SquareNumberRequest` will square the number with a random delay | ||
| to give the impression of a long-running process that can complete at any time. `Consumer` instance will add the results from | ||
| different `SquareNumberRequest` that will come random time instances. | ||
|
|
||
| Let's look at `FanOutFanIn` class that fans out the requests in async processes. | ||
|
|
||
| ```java | ||
| public class FanOutFanIn { | ||
| public static Long fanOutFanIn( | ||
| final List<SquareNumberRequest> requests, final Consumer consumer) { | ||
|
|
||
| ExecutorService service = Executors.newFixedThreadPool(requests.size()); | ||
|
|
||
| // fanning out | ||
| List<CompletableFuture<Void>> futures = | ||
| requests.stream() | ||
| .map( | ||
| request -> | ||
| CompletableFuture.runAsync(() -> request.delayedSquaring(consumer), service)) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); | ||
|
|
||
| return consumer.getSumOfSquaredNumbers().get(); | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| `Consumer` is used a callback class that will be called when a request is completed. This will aggregate | ||
| the result from all requests. | ||
|
|
||
| ```java | ||
| public class Consumer { | ||
|
|
||
| private final AtomicLong sumOfSquaredNumbers; | ||
|
|
||
| Consumer(Long init) { | ||
| sumOfSquaredNumbers = new AtomicLong(init); | ||
| } | ||
|
|
||
| public Long add(final Long num) { | ||
| return sumOfSquaredNumbers.addAndGet(num); | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| Request is represented as a `SquareNumberRequest` that squares the number with random delay and calls the | ||
| `Consumer` once it is squared. | ||
|
|
||
| ```java | ||
| public class SquareNumberRequest { | ||
|
|
||
| private final Long number; | ||
| public void delayedSquaring(final Consumer consumer) { | ||
|
|
||
| var minTimeOut = 5000L; | ||
|
|
||
| SecureRandom secureRandom = new SecureRandom(); | ||
| var randomTimeOut = secureRandom.nextInt(2000); | ||
|
|
||
| try { | ||
| // this will make the thread sleep from 5-7s. | ||
| Thread.sleep(minTimeOut + randomTimeOut); | ||
| } catch (InterruptedException e) { | ||
| LOGGER.error("Exception while sleep ", e); | ||
| Thread.currentThread().interrupt(); | ||
| } finally { | ||
| consumer.add(number * number); | ||
| } | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ## Class diagram | ||
|  | ||
|
|
||
| ## Applicability | ||
|
|
||
| Use this pattern when you can divide the workload into multiple chunks that can be dealt with separately. | ||
|
|
||
| ## Related patterns | ||
|
|
||
| * [Aggregator Microservices](https://java-design-patterns.com/patterns/aggregator-microservices/) | ||
| * [API Gateway](https://java-design-patterns.com/patterns/api-gateway/) | ||
|
|
||
| ## Credits | ||
|
|
||
| * [Understanding Azure Durable Functions - Part 8: The Fan Out/Fan In Pattern](http://dontcodetired.com/blog/post/Understanding-Azure-Durable-Functions-Part-8-The-Fan-OutFan-In-Pattern) | ||
| * [Fan-out/fan-in scenario in Durable Functions - Cloud backup example](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-cloud-backup) | ||
| * [Understanding the Fan-Out/Fan-In API Integration Pattern](https://dzone.com/articles/understanding-the-fan-out-fan-in-api-integration-p) | ||
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| @startuml | ||
| package com.iluwatar.fanout.fanin { | ||
| class App { | ||
| - LOGGER : Logger {static} | ||
| + App() | ||
| + main(args : String[]) {static} | ||
| } | ||
| class Consumer { | ||
| - sumOfSquaredNumbers : AtomicLong | ||
| ~ Consumer(init : Long) | ||
| + add(num : Long) : Long | ||
| + getSumOfSquaredNumbers() : AtomicLong | ||
| } | ||
| class FanOutFanIn { | ||
| + FanOutFanIn() | ||
| + fanOutFanIn(requests : List<SquareNumberRequest>, consumer : Consumer) : Long {static} | ||
| } | ||
| class SquareNumberRequest { | ||
| - LOGGER : Logger {static} | ||
| - number : Long | ||
| + SquareNumberRequest(number : Long) | ||
| + delayedSquaring(consumer : Consumer) | ||
| } | ||
|
|
||
| object SquareNumberRequest1 | ||
| object SquareNumberRequest2 | ||
| object SquareNumberRequest3 | ||
| diamond dia | ||
| } | ||
|
|
||
| App --> FanOutFanIn | ||
| FanOutFanIn --> "fan out - running in parallel" SquareNumberRequest1 | ||
| FanOutFanIn --> "fan out" SquareNumberRequest2 | ||
| FanOutFanIn --> "fan out" SquareNumberRequest3 | ||
| SquareNumberRequest1 --> "fan in - aggregate using callback" dia | ||
| SquareNumberRequest2 --> "fan in" dia | ||
| SquareNumberRequest3 --> "fan in" dia | ||
| dia --> Consumer | ||
| @enduml |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| <?xml version="1.0" encoding="UTF-8"?> | ||
| <!-- | ||
|
|
||
| The MIT License (MIT) | ||
|
|
||
| Copyright © 2014-2021 Ilkka Seppälä | ||
|
|
||
| Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| of this software and associated documentation files (the "Software"), to deal | ||
| in the Software without restriction, including without limitation the rights | ||
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| copies of the Software, and to permit persons to whom the Software is | ||
| furnished to do so, subject to the following conditions: | ||
|
|
||
| The above copyright notice and this permission notice shall be included in all | ||
| copies or substantial portions of the Software. | ||
|
|
||
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| SOFTWARE. | ||
|
|
||
| Module Model-view-viewmodel is using ZK framework | ||
| ZK framework is licensed under LGPL and the license can be found at lgpl-3.0.txt | ||
|
|
||
| --> | ||
| <project xmlns="http://maven.apache.org/POM/4.0.0" | ||
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
| xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
| <parent> | ||
| <artifactId>java-design-patterns</artifactId> | ||
| <groupId>com.iluwatar</groupId> | ||
| <version>1.25.0-SNAPSHOT</version> | ||
| </parent> | ||
| <modelVersion>4.0.0</modelVersion> | ||
|
|
||
| <artifactId>fanout-fanin</artifactId> | ||
|
|
||
| <dependencies> | ||
| <dependency> | ||
| <groupId>org.junit.jupiter</groupId> | ||
| <artifactId>junit-jupiter-engine</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
| </dependencies> | ||
| <build> | ||
| <plugins> | ||
| <plugin> | ||
| <groupId>org.apache.maven.plugins</groupId> | ||
| <artifactId>maven-assembly-plugin</artifactId> | ||
| <executions> | ||
| <execution> | ||
| <configuration> | ||
| <archive> | ||
| <manifest> | ||
| <mainClass>com.iluwatar.fanout.fanin.App</mainClass> | ||
| </manifest> | ||
| </archive> | ||
| </configuration> | ||
| </execution> | ||
| </executions> | ||
| </plugin> | ||
| </plugins> | ||
| </build> | ||
|
|
||
| </project> |
74 changes: 74 additions & 0 deletions
74
fanout-fanin/src/main/java/com/iluwatar/fanout/fanin/App.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| /* | ||
| * The MIT License | ||
| * Copyright © 2014-2021 Ilkka Seppälä | ||
| * | ||
| * Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| * of this software and associated documentation files (the "Software"), to deal | ||
| * in the Software without restriction, including without limitation the rights | ||
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| * copies of the Software, and to permit persons to whom the Software is | ||
| * furnished to do so, subject to the following conditions: | ||
| * | ||
| * The above copyright notice and this permission notice shall be included in | ||
| * all copies or substantial portions of the Software. | ||
| * | ||
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
| * THE SOFTWARE. | ||
| */ | ||
|
|
||
| package com.iluwatar.fanout.fanin; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import lombok.extern.slf4j.Slf4j; | ||
|
|
||
|
|
||
|
|
||
| /** | ||
| * FanOut/FanIn pattern is a concurrency pattern that refers to executing multiple instances of the | ||
| * activity function concurrently. The "fan out" part is essentially splitting the data into | ||
| * multiple chunks and then calling the activity function multiple times, passing the chunks. | ||
| * | ||
| * <p>When each chunk has been processed, the "fan in" takes place that aggregates results from each | ||
| * instance of function and forms a single final result. | ||
| * | ||
| * <p>This pattern is only really useful if you can “chunk” the workload in a meaningful way for | ||
| * splitting up to be processed in parallel. | ||
| */ | ||
| @Slf4j | ||
| public class App { | ||
|
|
||
| /** | ||
| * Entry point. | ||
| * | ||
| * <p>Implementation provided has a list of numbers that has to be squared and added. The list can | ||
| * be chunked in any way and the "activity function" {@link | ||
| * SquareNumberRequest#delayedSquaring(Consumer)} i.e. squaring the number ca be done | ||
| * concurrently. The "fan in" part is handled by the {@link Consumer} that takes in the result | ||
| * from each instance of activity and aggregates it whenever that particular activity function | ||
| * gets over. | ||
| */ | ||
| public static void main(String[] args) { | ||
| final List<Long> numbers = Arrays.asList(1L, 3L, 4L, 7L, 8L); | ||
|
|
||
| LOGGER.info("Numbers to be squared and get sum --> {}", numbers); | ||
|
|
||
| final List<SquareNumberRequest> requests = | ||
| numbers.stream().map(SquareNumberRequest::new).collect(Collectors.toList()); | ||
|
|
||
| var consumer = new Consumer(0L); | ||
|
|
||
| // Pass the request and the consumer to fanOutFanIn or sometimes referred as Orchestrator | ||
| // function | ||
| final Long sumOfSquaredNumbers = FanOutFanIn.fanOutFanIn(requests, consumer); | ||
|
|
||
| LOGGER.info("Sum of all squared numbers --> {}", sumOfSquaredNumbers); | ||
| } | ||
| } |
48 changes: 48 additions & 0 deletions
48
fanout-fanin/src/main/java/com/iluwatar/fanout/fanin/Consumer.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| /* | ||
| * The MIT License | ||
| * Copyright © 2014-2021 Ilkka Seppälä | ||
| * | ||
| * Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| * of this software and associated documentation files (the "Software"), to deal | ||
| * in the Software without restriction, including without limitation the rights | ||
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| * copies of the Software, and to permit persons to whom the Software is | ||
| * furnished to do so, subject to the following conditions: | ||
| * | ||
| * The above copyright notice and this permission notice shall be included in | ||
| * all copies or substantial portions of the Software. | ||
| * | ||
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
| * THE SOFTWARE. | ||
| */ | ||
|
|
||
| package com.iluwatar.fanout.fanin; | ||
|
|
||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| import lombok.Getter; | ||
|
|
||
|
|
||
|
|
||
| /** | ||
| * Consumer or callback class that will be called everytime a request is complete This will | ||
| * aggregate individual result to form a final result. | ||
| */ | ||
| @Getter | ||
| public class Consumer { | ||
|
|
||
| private final AtomicLong sumOfSquaredNumbers; | ||
|
|
||
| Consumer(Long init) { | ||
| sumOfSquaredNumbers = new AtomicLong(init); | ||
| } | ||
|
|
||
| public Long add(final Long num) { | ||
| return sumOfSquaredNumbers.addAndGet(num); | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.