|  | 
|  | 1 | +package io.mincong.concurrency.completablefuture; | 
|  | 2 | + | 
|  | 3 | +import java.time.LocalTime; | 
|  | 4 | +import java.util.concurrent.CompletableFuture; | 
|  | 5 | +import java.util.concurrent.Executors; | 
|  | 6 | +import java.util.stream.IntStream; | 
|  | 7 | + | 
|  | 8 | +public class MultipleStageDemo { | 
|  | 9 | + | 
|  | 10 | +  public static void main(String[] args) { | 
|  | 11 | +    var demo = new MultipleStageDemo(); | 
|  | 12 | +    demo.scenario1(); | 
|  | 13 | +    demo.scenario2(); | 
|  | 14 | +  } | 
|  | 15 | + | 
|  | 16 | +  /** | 
|  | 17 | +   * In scenario 1, resource creation and deletion are done in the same completion stage. Given a | 
|  | 18 | +   * fixed thread pool executor with 2 threads, the executor will complete the tasks of two | 
|  | 19 | +   * resources (e.g. 0 and 1) before starting the next stages. Its behavior can be illustrated as | 
|  | 20 | +   * follows: | 
|  | 21 | +   * | 
|  | 22 | +   * <pre> | 
|  | 23 | +   *          +-----------------------------------------------------------------> Time | 
|  | 24 | +   *          | +------------------------+ +------------------------+ | 
|  | 25 | +   * Thread 1 | | Creation 0, Deletion 0 | | Creation 2, Deletion 2 | | 
|  | 26 | +   *          | +------------------------+ +------------------------+ | 
|  | 27 | +   * Thread 2 | | Creation 1, Deletion 1 | | Creation 3, Deletion 3 | | 
|  | 28 | +   *          | +------------------------+ +------------------------+ | 
|  | 29 | +   *          | | 
|  | 30 | +   * </pre> | 
|  | 31 | +   * | 
|  | 32 | +   * This is because these stages are queued in the following way: | 
|  | 33 | +   * | 
|  | 34 | +   * <pre> | 
|  | 35 | +   * 0. +------------------------+ | 
|  | 36 | +   *    | Creation 0, Deletion 0 | | 
|  | 37 | +   *    +------------------------+ | 
|  | 38 | +   * 1. +------------------------+ | 
|  | 39 | +   *    | Creation 1, Deletion 1 | | 
|  | 40 | +   *    +------------------------+ | 
|  | 41 | +   * 2. +------------------------+ | 
|  | 42 | +   *    | Creation 2, Deletion 2 | | 
|  | 43 | +   *    +------------------------+ | 
|  | 44 | +   * 3. +------------------------+ | 
|  | 45 | +   *    | Creation 3, Deletion 3 | | 
|  | 46 | +   *    +------------------------+ | 
|  | 47 | +   * </pre> | 
|  | 48 | +   * | 
|  | 49 | +   * Console output: | 
|  | 50 | +   * | 
|  | 51 | +   * <pre> | 
|  | 52 | +   * [15:41:18.010681][main] - Scenario 1 started | 
|  | 53 | +   * [15:41:18.103478][pool-1-thread-1] - [id=0] New resource: creating | 
|  | 54 | +   * [15:41:18.103482][pool-1-thread-2] - [id=1] New resource: creating | 
|  | 55 | +   * [15:41:19.107141][pool-1-thread-1] - [id=0] New resource: created | 
|  | 56 | +   * [15:41:19.107472][pool-1-thread-1] - [id=0] Old resource: creating | 
|  | 57 | +   * [15:41:19.108780][pool-1-thread-2] - [id=1] New resource: created | 
|  | 58 | +   * [15:41:19.108929][pool-1-thread-2] - [id=1] Old resource: creating | 
|  | 59 | +   * [15:41:20.112909][pool-1-thread-2] - [id=1] Old resource: created | 
|  | 60 | +   * [15:41:20.112909][pool-1-thread-1] - [id=0] Old resource: created | 
|  | 61 | +   * [15:41:20.113265][pool-1-thread-1] - [id=2] New resource: creating | 
|  | 62 | +   * [15:41:20.113452][pool-1-thread-2] - [id=3] New resource: creating | 
|  | 63 | +   * [15:41:21.114555][pool-1-thread-1] - [id=2] New resource: created | 
|  | 64 | +   * [15:41:21.114555][pool-1-thread-2] - [id=3] New resource: created | 
|  | 65 | +   * [15:41:21.114914][pool-1-thread-1] - [id=2] Old resource: creating | 
|  | 66 | +   * [15:41:21.115019][pool-1-thread-2] - [id=3] Old resource: creating | 
|  | 67 | +   * [15:41:22.119584][pool-1-thread-2] - [id=3] Old resource: created | 
|  | 68 | +   * [15:41:22.119584][pool-1-thread-1] - [id=2] Old resource: created | 
|  | 69 | +   * [15:41:22.120209][main] - Scenario 1 finished | 
|  | 70 | +   * </pre> | 
|  | 71 | +   */ | 
|  | 72 | +  private void scenario1() { | 
|  | 73 | +    print("Scenario 1 started"); | 
|  | 74 | +    var executor = Executors.newFixedThreadPool(2); | 
|  | 75 | +    var futures = | 
|  | 76 | +        IntStream.range(0, 4) | 
|  | 77 | +            .mapToObj( | 
|  | 78 | +                i -> | 
|  | 79 | +                    CompletableFuture.runAsync( | 
|  | 80 | +                        () -> { | 
|  | 81 | +                          createNewResource(i); | 
|  | 82 | +                          deleteOldResource(i); | 
|  | 83 | +                        }, | 
|  | 84 | +                        executor)) | 
|  | 85 | +            .toArray(CompletableFuture[]::new); | 
|  | 86 | +    try { | 
|  | 87 | +      CompletableFuture.allOf(futures).join(); | 
|  | 88 | +    } finally { | 
|  | 89 | +      executor.shutdownNow(); | 
|  | 90 | +    } | 
|  | 91 | +    print("Scenario 1 finished"); | 
|  | 92 | +  } | 
|  | 93 | + | 
|  | 94 | +  /** | 
|  | 95 | +   * In scenario 2, resource creation and deletion are done separately in two completion stages. | 
|  | 96 | +   * Given a fixed thread pool executor with 2 threads, the executor will complete the creation of | 
|  | 97 | +   * two resources (e.g. 0 and 1) then continue the creation of two other resources (e.g. 2 and 3). | 
|  | 98 | +   * Once done, it will start the deletions in the same way. | 
|  | 99 | +   * | 
|  | 100 | +   * <pre> | 
|  | 101 | +   *          +-----------------------------------------------------------------> Time | 
|  | 102 | +   *          | +------------+ +------------+ +------------+ +------------+ | 
|  | 103 | +   * Thread 1 | | Creation 0 | | Creation 2 | | Deletion 0 | | Deletion 2 | | 
|  | 104 | +   *          | +------------+ +------------+ +------------+ +------------+ | 
|  | 105 | +   * Thread 2 | | Creation 1 | | Creation 3 | | Deletion 1 | | Deletion 3 | | 
|  | 106 | +   *          | +------------+ +------------+ +------------+ +------------+ | 
|  | 107 | +   *          | | 
|  | 108 | +   * </pre> | 
|  | 109 | +   * | 
|  | 110 | +   * This is because these stages are queued in the following way: | 
|  | 111 | +   * | 
|  | 112 | +   * <pre> | 
|  | 113 | +   * 0. +------------+ | 
|  | 114 | +   *    | Creation 0 | | 
|  | 115 | +   *    +------------+ | 
|  | 116 | +   * 1. +------------+ | 
|  | 117 | +   *    | Creation 1 | | 
|  | 118 | +   *    +------------+ | 
|  | 119 | +   * 2. +------------+ | 
|  | 120 | +   *    | Creation 2 | | 
|  | 121 | +   *    +------------+ | 
|  | 122 | +   * 3. +------------+ | 
|  | 123 | +   *    | Creation 3 | | 
|  | 124 | +   *    +------------+ | 
|  | 125 | +   * 4. +------------+ | 
|  | 126 | +   *    | Deletion 0 | | 
|  | 127 | +   *    +------------+ | 
|  | 128 | +   * 5. +------------+ | 
|  | 129 | +   *    | Deletion 1 | | 
|  | 130 | +   *    +------------+ | 
|  | 131 | +   * 6. +------------+ | 
|  | 132 | +   *    | Deletion 2 | | 
|  | 133 | +   *    +------------+ | 
|  | 134 | +   * 7. +------------+ | 
|  | 135 | +   *    | Deletion 3 | | 
|  | 136 | +   *    +-------------+ | 
|  | 137 | +   * </pre> | 
|  | 138 | +   * | 
|  | 139 | +   * Console output: | 
|  | 140 | +   * | 
|  | 141 | +   * <pre> | 
|  | 142 | +   * [15:41:22.120374][main] - Scenario 2 started | 
|  | 143 | +   * [15:41:22.123783][pool-2-thread-1] - [id=0] New resource: creating | 
|  | 144 | +   * [15:41:22.124870][pool-2-thread-2] - [id=1] New resource: creating | 
|  | 145 | +   * [15:41:23.126835][pool-2-thread-1] - [id=0] New resource: created | 
|  | 146 | +   * [15:41:23.126927][pool-2-thread-2] - [id=1] New resource: created | 
|  | 147 | +   * [15:41:23.127811][pool-2-thread-1] - [id=2] New resource: creating | 
|  | 148 | +   * [15:41:23.127868][pool-2-thread-2] - [id=3] New resource: creating | 
|  | 149 | +   * [15:41:24.132029][pool-2-thread-2] - [id=3] New resource: created | 
|  | 150 | +   * [15:41:24.132044][pool-2-thread-1] - [id=2] New resource: created | 
|  | 151 | +   * [15:41:24.132439][pool-2-thread-2] - [id=0] Old resource: creating | 
|  | 152 | +   * [15:41:24.132452][pool-2-thread-1] - [id=1] Old resource: creating | 
|  | 153 | +   * [15:41:25.137506][pool-2-thread-2] - [id=0] Old resource: created | 
|  | 154 | +   * [15:41:25.137506][pool-2-thread-1] - [id=1] Old resource: created | 
|  | 155 | +   * [15:41:25.137804][pool-2-thread-2] - [id=3] Old resource: creating | 
|  | 156 | +   * [15:41:25.138011][pool-2-thread-1] - [id=2] Old resource: creating | 
|  | 157 | +   * [15:41:26.138201][pool-2-thread-2] - [id=3] Old resource: created | 
|  | 158 | +   * [15:41:26.138340][pool-2-thread-1] - [id=2] Old resource: created | 
|  | 159 | +   * [15:41:26.139091][main] - Scenario 2 finished | 
|  | 160 | +   * </pre> | 
|  | 161 | +   */ | 
|  | 162 | +  private void scenario2() { | 
|  | 163 | +    print("Scenario 2 started"); | 
|  | 164 | +    var executor = Executors.newFixedThreadPool(2); | 
|  | 165 | +    var futures = | 
|  | 166 | +        IntStream.range(0, 4) | 
|  | 167 | +            .mapToObj( | 
|  | 168 | +                i -> | 
|  | 169 | +                    CompletableFuture.runAsync(() -> createNewResource(i), executor) | 
|  | 170 | +                        .thenRunAsync(() -> deleteOldResource(i), executor)) | 
|  | 171 | +            .toArray(CompletableFuture[]::new); | 
|  | 172 | +    try { | 
|  | 173 | +      CompletableFuture.allOf(futures).join(); | 
|  | 174 | +    } finally { | 
|  | 175 | +      executor.shutdownNow(); | 
|  | 176 | +    } | 
|  | 177 | +    print("Scenario 2 finished"); | 
|  | 178 | +  } | 
|  | 179 | + | 
|  | 180 | +  private void createNewResource(int i) { | 
|  | 181 | +    print("[id=" + i + "] New resource: creating"); | 
|  | 182 | +    try { | 
|  | 183 | +      Thread.sleep(1000); | 
|  | 184 | +    } catch (InterruptedException e) { | 
|  | 185 | +      // ignore | 
|  | 186 | +    } | 
|  | 187 | +    print("[id=" + i + "] New resource: created"); | 
|  | 188 | +  } | 
|  | 189 | + | 
|  | 190 | +  private void deleteOldResource(int i) { | 
|  | 191 | +    print("[id=" + i + "] Old resource: creating"); | 
|  | 192 | +    try { | 
|  | 193 | +      Thread.sleep(1000); | 
|  | 194 | +    } catch (InterruptedException e) { | 
|  | 195 | +      // ignore | 
|  | 196 | +    } | 
|  | 197 | +    print("[id=" + i + "] Old resource: created"); | 
|  | 198 | +  } | 
|  | 199 | + | 
|  | 200 | +  private static void print(String message) { | 
|  | 201 | +    var thread = Thread.currentThread().getName(); | 
|  | 202 | +    var time = LocalTime.now(); | 
|  | 203 | +    System.out.println("[" + time + "][" + thread + "] - " + message); | 
|  | 204 | +  } | 
|  | 205 | +} | 
0 commit comments