- Transparent: Handles different data flows transparently, using Java queues and Runnables.
- Simple: Exposes one simple, thread-safe interface that is easy to implement.
- Batch support: Batching elements with time-based flushing.
- Lightweight: No third-party libraries, pure Java 8.
https://jitpack.io/#fulmicotone/io.fulmicotone.fqueue
- Make sure you have only one instance of FQueueRegistry in your project.
- If you use Spring, expose it as a @Bean.
FQueueRegistry registry = new FQueueRegistry();
- Simple consuming: by default, it consumes 1 element at a time.
registry.buildFQueue(String.class)
.consume(() -> (broadcaster, elms) -> System.out.println("CASE 1 - Elements batched are: "+elms.size()));
- Consumes data and aggregates it in chunks of 5 elements.
- If fewer elements than the chunk size have arrived, it flushes them every 1 second.
registry.buildFQueue(String.class)
.batch()
.withChunkSize(5)
.withFlushTimeout(1)
.withFlushTimeUnit(TimeUnit.SECONDS)
.done()
.consume(() -> (broadcaster, elms) -> System.out.println("CASE 2 - Elements batched are: "+elms.size()));
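To make the chunk-size and flush-timeout behaviour concrete, here is a minimal plain-JDK sketch of the idea. It is not FQueue's actual implementation: `BatchDrainSketch` and `nextBatch` are hypothetical names, and the sketch assumes the timeout is measured from the first element of each batch, which may differ from how FQueue schedules its flushes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of FQueue.
final class BatchDrainSketch {

    /**
     * Waits for a first element, then keeps collecting until either
     * chunkSize elements are gathered or the flush timeout expires.
     * Assumption: the timeout starts when the first element arrives.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int chunkSize,
                                 long flushTimeout, TimeUnit unit) {
        List<T> batch = new ArrayList<>(chunkSize);
        try {
            batch.add(queue.take()); // block until there is something to batch
            long deadline = System.nanoTime() + unit.toNanos(flushTimeout);
            while (batch.size() < chunkSize) {
                // A non-positive timeout makes poll return immediately.
                T next = queue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // timeout reached: flush the partial batch
                }
                batch.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag, return what we have
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 7; i++) {
            queue.add("Sample");
        }
        // Full chunk: prints 5
        System.out.println(nextBatch(queue, 5, 1, TimeUnit.SECONDS).size());
        // Partial chunk flushed by the timeout: prints 2 after about one second
        System.out.println(nextBatch(queue, 5, 1, TimeUnit.SECONDS).size());
    }
}
```

With 7 queued elements and a chunk size of 5, the first call returns a full chunk at once, while the second returns the remaining 2 elements only when the timeout fires.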
- Batch consuming with a custom length function.
- It consumes data and aggregates it in chunks of 15 bytes (two "Sample" strings fit in one chunk).
- If less data than the chunk size has arrived, it flushes it every 1 second.
registry.buildFQueue(String.class)
.batch()
.withChunkSize(15)
.withLengthFunction(s -> (long)s.getBytes().length)
.withFlushTimeout(1)
.withFlushTimeUnit(TimeUnit.SECONDS)
.done()
.consume(() -> (broadcaster, elms) -> System.out.println("CASE 3 - Elements batched are: "+elms.size()));
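The arithmetic behind "two "Sample" strings fit" can be checked with a small plain-JDK sketch. Again, this is not FQueue's code: `WeightedChunkSketch` and `chunk` are hypothetical names, the sketch uses `ToLongFunction` instead of whatever type `withLengthFunction` accepts, and it assumes a chunk is closed just before it would exceed the limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical helper for illustration only; not part of FQueue.
final class WeightedChunkSketch {

    /**
     * Splits items into chunks whose summed length stays within chunkSize.
     * Assumption: an item larger than chunkSize gets a chunk of its own.
     */
    static <T> List<List<T>> chunk(List<T> items, long chunkSize, ToLongFunction<T> lengthFn) {
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long currentLength = 0;
        for (T item : items) {
            long length = lengthFn.applyAsLong(item);
            // Close the current chunk if this item would push it over the limit.
            if (!current.isEmpty() && currentLength + length > chunkSize) {
                chunks.add(current);
                current = new ArrayList<>();
                currentLength = 0;
            }
            current.add(item);
            currentLength += length;
        }
        if (!current.isEmpty()) {
            chunks.add(current); // remaining partial chunk
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("Sample", "Sample", "Sample", "Sample", "Sample");
        // "Sample" is 6 bytes in UTF-8: 6 + 6 = 12 fits in 15, 18 does not.
        List<List<String>> chunks =
                chunk(data, 15, s -> s.getBytes(StandardCharsets.UTF_8).length);
        chunks.forEach(c -> System.out.println(c.size())); // prints 2, 2, 1
    }
}
```

The sketch passes an explicit charset because `getBytes()` without arguments depends on the platform default encoding, so the byte count could otherwise vary between machines for non-ASCII strings.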
- fanOut(3) creates three nested FQueues; the first FQueue defined acts as a round-robin dispatcher.
- Every nested FQueue consumes data and aggregates it in chunks of 5 elements.
- If fewer elements than the chunk size have arrived, every nested FQueue flushes them every 1 second.
registry.buildFQueue(String.class)
.fanOut(3)
.batch()
.withChunkSize(5)
.withFlushTimeout(1)
.withFlushTimeUnit(TimeUnit.SECONDS)
.done()
.consume(() -> (broadcaster, elms) -> {
System.out.println("CASE 4 - currentThread is: "+Thread.currentThread().getName()+ " - Elements batched are: "+elms.size());
});
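Round-robin dispatching to nested queues can be pictured with the following plain-JDK sketch. It only illustrates the distribution pattern and makes no claim about FQueue internals: `RoundRobinSketch`, `dispatch` and `child` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical dispatcher for illustration only; not part of FQueue.
final class RoundRobinSketch<T> {

    private final List<BlockingQueue<T>> children = new ArrayList<>();
    private final AtomicLong counter = new AtomicLong();

    RoundRobinSketch(int fanOut) {
        for (int i = 0; i < fanOut; i++) {
            children.add(new LinkedBlockingQueue<>());
        }
    }

    /** Hands each element to the next child queue in turn; safe to call from several threads. */
    void dispatch(T element) {
        // floorMod keeps the index non-negative even if the counter overflows.
        int index = (int) Math.floorMod(counter.getAndIncrement(), (long) children.size());
        children.get(index).add(element);
    }

    BlockingQueue<T> child(int index) {
        return children.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> dispatcher = new RoundRobinSketch<>(3);
        for (int i = 0; i < 7; i++) {
            dispatcher.dispatch("Sample-" + i);
        }
        // 7 elements over 3 children: prints 3, 2, 2
        for (int i = 0; i < 3; i++) {
            System.out.println(dispatcher.child(i).size());
        }
    }
}
```

In a real fan-out each child queue would be drained by its own thread, which is why the CASE 4 consumer above prints the current thread name.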
- Use the registry to push data into FQueues.
- The registry sends your object only to the FQueues that consume its class.
registry.sendBroadcast("Sample");
registry.sendBroadcast(2);
registry.sendBroadcast(new AnyObjectYouWant());
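The class-based routing described here can be sketched with plain JDK collections. This is a hypothetical model, not FQueueRegistry itself: `ClassRoutingSketch` and `register` are invented names, and the sketch assumes an exact match on the object's runtime class, while the real library may resolve classes differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical registry for illustration only; not FQueueRegistry.
final class ClassRoutingSketch {

    private final Map<Class<?>, List<Consumer<Object>>> consumersByClass = new ConcurrentHashMap<>();

    /** Registers a consumer for one class; several consumers may share a class. */
    <T> void register(Class<T> type, Consumer<? super T> consumer) {
        consumersByClass
                .computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                .add(obj -> consumer.accept(type.cast(obj)));
    }

    /**
     * Delivers the object to every consumer registered for its exact runtime class.
     * Returns how many consumers received it (0 if none is registered).
     */
    int sendBroadcast(Object obj) {
        List<Consumer<Object>> consumers =
                consumersByClass.getOrDefault(obj.getClass(), Collections.emptyList());
        consumers.forEach(c -> c.accept(obj));
        return consumers.size();
    }

    public static void main(String[] args) {
        ClassRoutingSketch registry = new ClassRoutingSketch();
        List<String> seen = new ArrayList<>();
        registry.register(String.class, s -> seen.add("one:" + s));
        registry.register(String.class, s -> seen.add("two:" + s));
        registry.register(Integer.class, i -> seen.add("int:" + i));

        registry.sendBroadcast("Sample"); // reaches both String consumers
        registry.sendBroadcast(2);        // reaches the Integer consumer
        registry.sendBroadcast(3.5);      // no Double consumer: dropped
        System.out.println(seen);         // prints [one:Sample, two:Sample, int:2]
    }
}
```

Unlike FQueue, this sketch calls the consumers synchronously on the sender's thread; it only shows how an object's class selects its receivers.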
- If multiple FQueues consume the same class, every object is sent to all of them.
FQueue<String> one = registry.buildFQueue(String.class)
.consume(() -> (broadcaster, elms) -> System.out.println("ONE - Elements received are: " + elms.size()));
FQueue<String> two = registry.buildFQueue(String.class)
.consume(() -> (broadcaster, elms) -> System.out.println("TWO - Elements batched are: " + elms.size()));
/** This will be received by both one and two */
registry.sendBroadcast("Sample");
- If multiple FQueues receive the same class and you want to send an object to one specific FQueue, push it directly into that FQueue's queue.
FQueue<String> one = registry.buildFQueue(String.class)
.consume(() -> (broadcaster, elms) -> System.out.println("ONE - Elements received are: " + elms.size()));
FQueue<String> two = registry.buildFQueue(String.class)
.consume(() -> (broadcaster, elms) -> System.out.println("TWO - Elements batched are: " + elms.size()));
/** This will be received only by one */
one.getQueue().add("Sample");
- Sometimes you want to pass data between FQueues.
- You can do so by calling the "broadcaster" object injected into the consuming function.
FQueue<String> one = registry.buildFQueue(String.class)
.consume(() -> (broadcaster, elms) -> {
// compute the length of each string and send it to the "two" FQueue
elms.stream()
.map(String::length)
.forEach(broadcaster::sendBroadcast);
});
FQueue<Integer> two = registry.buildFQueue(Integer.class)
.consume(() -> (broadcaster, elms) -> {
elms.forEach(ch -> System.out.println("Character size is:" + ch));
});
/** This will be received by one, which forwards its length to two */
one.getQueue().add("Sample");
All of these examples are available at: https://github.com/fulmicotone/io.fulmicotone.fqueue/blob/master/src/test/java/io/fulmicotone/fqueue/examples/GithubExample.java