From 12c44479abcd54a2c28dbc6cab2ca593700a0855 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sun, 13 Nov 2022 00:59:19 +0300 Subject: [PATCH] review upd --- .../yegor256/tojos/MnSynchronizedTest.java | 88 +++++++++---------- 1 file changed, 41 insertions(+), 47 deletions(-) diff --git a/src/test/java/com/yegor256/tojos/MnSynchronizedTest.java b/src/test/java/com/yegor256/tojos/MnSynchronizedTest.java index e89eb8f..34a6365 100644 --- a/src/test/java/com/yegor256/tojos/MnSynchronizedTest.java +++ b/src/test/java/com/yegor256/tojos/MnSynchronizedTest.java @@ -23,12 +23,12 @@ */ package com.yegor256.tojos; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; @@ -53,35 +53,22 @@ class MnSynchronizedTest { /** * The number of changes in under-test mono. */ - private final AtomicInteger actual = new AtomicInteger(0); + private final AtomicInteger counter = new AtomicInteger(0); /** * The mono under test. */ - private Mono mono; + private Mono shared; /** * The executor. */ - private ThreadPoolExecutor executor; - - /** - * The blocking queue. - */ - private BlockingQueue queue; + private ExecutorService executor; @BeforeEach final void setUp() { - this.mono = new MnSynchronized(new MnMemory()); - this.queue = new LinkedBlockingQueue<>(MnSynchronizedTest.THREADS); - this.executor = new ThreadPoolExecutor( - MnSynchronizedTest.THREADS, - MnSynchronizedTest.THREADS, - 5L, - TimeUnit.SECONDS, - this.queue, - new ThreadPoolExecutor.CallerRunsPolicy() - ); + this.shared = new MnSynchronized(new MnMemory()); + this.executor = Executors.newFixedThreadPool(MnSynchronizedTest.THREADS); } /** @@ -93,20 +80,19 @@ final void setUp() { */ @Test final void concurrentScenario() throws InterruptedException { - this.executor.prestartAllCoreThreads(); - for (int trds = 0; trds < MnSynchronizedTest.THREADS; ++trds) { - this.queue.offer( + for (int trds = 1; trds <= MnSynchronizedTest.THREADS; ++trds) { + this.executor.submit( new MnSynchronizedTest.TestTask( - trds, - this.mono, - this.actual + MnSynchronizedTest.rowsBySize(trds), + this.shared, + this.counter ) ); } this.executor.shutdown(); assert this.executor.awaitTermination(5L, TimeUnit.SECONDS); MatcherAssert.assertThat( - this.actual.get(), + this.counter.get(), Matchers.equalTo(MnSynchronizedTest.expectedSize()) ); } @@ -121,6 +107,22 @@ private static Integer expectedSize() { return len + (len - 1) * len / 2; } + /** + * The rows to write. + * + * @param size The size of rows + * @return Collection of rows + */ + private static Collection> rowsBySize(final int size) { + final Map row = new HashMap<>(0); + row.put(Tojos.KEY, String.valueOf(size)); + final Collection> res = new ArrayList<>(size); + for (int idx = 0; idx < size; ++idx) { + res.add(row); + } + return res; + } + /** * The test task to concurrent read and write operations. * @@ -134,20 +136,15 @@ private static final class TestTask implements Runnable { private static final Logger LOGGER = Logger.getLogger(MnSynchronizedTest.class.getName()); - /** - * The id of thread. - */ - private final int idx; - /** * Local mono. */ private final Mono mono; /** - * Row to write into mono. + * Rows to write. */ - private final Map row; + private final Collection> rows; /** * The counter. @@ -157,34 +154,31 @@ private static final class TestTask implements Runnable { /** * Ctor. * - * @param idx The id + * @param rws The rows to write * @param mno The mono * @param cntr The counter */ TestTask( - final int idx, + final Collection> rws, final Mono mno, final AtomicInteger cntr ) { - this.idx = idx; + this.rows = rws; this.mono = mno; - this.row = new HashMap<>(0); this.counter = cntr; } @Override public void run() { - this.row.put(Tojos.KEY, String.valueOf(this.idx)); - final Collection> rows = this.mono.read(); - rows.add(this.row); - this.mono.write(rows); - this.counter.addAndGet(rows.size()); + this.mono.write(this.rows); + this.counter.addAndGet(this.rows.size()); MnSynchronizedTest.TestTask.LOGGER.log( Level.INFO, String.format( - "Thread %d, written %d rows", - this.idx, - rows.size() + "Thread %s, written %d rows.\nReading:\n%s", + this, + this.rows.size(), + this.mono.read() ) ); }