|
25 | 25 | */
|
26 | 26 | public abstract class BlockingIdentifierGenerator implements ReactiveIdentifierGenerator<Long> {
|
27 | 27 |
|
28 |
| - /** |
29 |
| - * The block size (the number of "lo" values for each "hi" value) |
30 |
| - */ |
31 |
| - protected abstract int getBlockSize(); |
| 28 | + /** |
| 29 | + * The block size (the number of "lo" values for each "hi" value) |
| 30 | + */ |
| 31 | + protected abstract int getBlockSize(); |
32 | 32 |
|
33 |
| - /** |
34 |
| - * Allocate a new block, by obtaining the next "hi" value from the database |
35 |
| - */ |
36 |
| - protected abstract CompletionStage<Long> nextHiValue(ReactiveConnectionSupplier session); |
| 33 | + /** |
| 34 | + * Allocate a new block, by obtaining the next "hi" value from the database |
| 35 | + */ |
| 36 | + protected abstract CompletionStage<Long> nextHiValue(ReactiveConnectionSupplier session); |
37 | 37 |
|
38 |
| - private int loValue; |
39 |
| - private long hiValue; |
| 38 | + private int loValue; |
| 39 | + private long hiValue; |
40 | 40 |
|
41 |
| - private volatile List<Runnable> queue = null; |
| 41 | + private volatile List<Runnable> queue = null; |
42 | 42 |
|
43 |
| - protected synchronized long next() { |
44 |
| - return loValue>0 && loValue<getBlockSize() |
45 |
| - ? hiValue + loValue++ |
46 |
| - : -1; //flag value indicating that we need to hit db |
47 |
| - } |
| 43 | + protected synchronized long next() { |
| 44 | + return loValue > 0 && loValue < getBlockSize() |
| 45 | + ? hiValue + loValue++ |
| 46 | + : -1; //flag value indicating that we need to hit db |
| 47 | + } |
48 | 48 |
|
49 |
| - protected synchronized long next(long hi) { |
50 |
| - hiValue = hi; |
51 |
| - loValue = 1; |
52 |
| - return hi; |
53 |
| - } |
| 49 | + protected synchronized long next(long hi) { |
| 50 | + hiValue = hi; |
| 51 | + loValue = 1; |
| 52 | + return hi; |
| 53 | + } |
54 | 54 |
|
55 |
| - @Override |
56 |
| - public CompletionStage<Long> generate(ReactiveConnectionSupplier session, Object entity) { |
57 |
| - if ( getBlockSize()<=1 ) { |
58 |
| - //special case where we're not using blocking at all |
59 |
| - return nextHiValue(session); |
60 |
| - } |
| 55 | + @Override |
| 56 | + public CompletionStage<Long> generate(ReactiveConnectionSupplier session, Object entity) { |
| 57 | + if ( getBlockSize() <= 1 ) { |
| 58 | + //special case where we're not using blocking at all |
| 59 | + return nextHiValue( session ); |
| 60 | + } |
61 | 61 |
|
62 |
| - long local = next(); |
63 |
| - if ( local >= 0 ) { |
64 |
| - // We don't need to update or initialize the hi |
65 |
| - // value in the table, so just increment the lo |
66 |
| - // value and return the next id in the block |
67 |
| - return completedFuture(local); |
68 |
| - } |
69 |
| - else { |
70 |
| - synchronized (this) { |
71 |
| - CompletableFuture<Long> result = new CompletableFuture<>(); |
72 |
| - if (queue == null) { |
73 |
| - // make a queue for any concurrent streams |
74 |
| - queue = new ArrayList<>(); |
75 |
| - // go off and fetch the next hi value from db |
76 |
| - nextHiValue(session).thenAccept( id -> { |
| 62 | + long local = next(); |
| 63 | + if ( local >= 0 ) { |
| 64 | + // We don't need to update or initialize the hi |
| 65 | + // value in the table, so just increment the lo |
| 66 | + // value and return the next id in the block |
| 67 | + return completedFuture( local ); |
| 68 | + } |
| 69 | + else { |
| 70 | + synchronized (this) { |
| 71 | + CompletableFuture<Long> result = new CompletableFuture<>(); |
| 72 | + if ( queue == null ) { |
| 73 | + // make a queue for any concurrent streams |
| 74 | + queue = new ArrayList<>(); |
| 75 | + // go off and fetch the next hi value from db |
| 76 | + nextHiValue( session ).thenAccept( id -> { |
77 | 77 | // Vertx.currentContext().runOnContext(v -> {
|
78 |
| - List<Runnable> list; |
79 |
| - synchronized (this) { |
80 |
| - // clone ref to the queue |
81 |
| - list = queue; |
82 |
| - queue = null; |
83 |
| - // use the fetched hi value in this stream |
84 |
| - result.complete( next(id) ); |
85 |
| - } |
86 |
| - // send waiting streams back to try again |
87 |
| - list.forEach(Runnable::run); |
| 78 | + List<Runnable> list; |
| 79 | + synchronized (this) { |
| 80 | + // clone ref to the queue |
| 81 | + list = queue; |
| 82 | + queue = null; |
| 83 | + // use the fetched hi value in this stream |
| 84 | + result.complete( next( id ) ); |
| 85 | + } |
| 86 | + // send waiting streams back to try again |
| 87 | + list.forEach( Runnable::run ); |
88 | 88 | // } );
|
89 |
| - } ); |
90 |
| - } |
91 |
| - else { |
92 |
| - // wait for the concurrent fetch to complete |
93 |
| - // note that we carefully capture the right session,entity here! |
94 |
| - queue.add( () -> generate(session, entity).thenAccept(result::complete) ); |
95 |
| - } |
96 |
| - return result; |
97 |
| - } |
98 |
| - } |
99 |
| - } |
| 89 | + } ); |
| 90 | + } |
| 91 | + else { |
| 92 | + // wait for the concurrent fetch to complete |
| 93 | + // note that we carefully capture the right session,entity here! |
| 94 | + queue.add( () -> generate( session, entity ).thenAccept( result::complete ) ); |
| 95 | + } |
| 96 | + return result; |
| 97 | + } |
| 98 | + } |
| 99 | + } |
100 | 100 | }
|
0 commit comments