Skip to content

Commit

Permalink
review upd
Browse files Browse the repository at this point in the history
  • Loading branch information
l3r8yJ committed Nov 12, 2022
1 parent 38c5299 commit 12c4447
Showing 1 changed file with 41 additions and 47 deletions.
88 changes: 41 additions & 47 deletions src/test/java/com/yegor256/tojos/MnSynchronizedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
*/
package com.yegor256.tojos;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
Expand All @@ -53,35 +53,22 @@ class MnSynchronizedTest {
/**
* The number of changes in under-test mono.
*/
private final AtomicInteger actual = new AtomicInteger(0);
private final AtomicInteger counter = new AtomicInteger(0);

/**
* The mono under test.
*/
private Mono mono;
private Mono shared;

/**
* The executor.
*/
private ThreadPoolExecutor executor;

/**
* The blocking queue.
*/
private BlockingQueue<Runnable> queue;
private ExecutorService executor;

@BeforeEach
final void setUp() {
this.mono = new MnSynchronized(new MnMemory());
this.queue = new LinkedBlockingQueue<>(MnSynchronizedTest.THREADS);
this.executor = new ThreadPoolExecutor(
MnSynchronizedTest.THREADS,
MnSynchronizedTest.THREADS,
5L,
TimeUnit.SECONDS,
this.queue,
new ThreadPoolExecutor.CallerRunsPolicy()
);
this.shared = new MnSynchronized(new MnMemory());
this.executor = Executors.newFixedThreadPool(MnSynchronizedTest.THREADS);
}

/**
Expand All @@ -93,20 +80,19 @@ final void setUp() {
*/
@Test
final void concurrentScenario() throws InterruptedException {
this.executor.prestartAllCoreThreads();
for (int trds = 0; trds < MnSynchronizedTest.THREADS; ++trds) {
this.queue.offer(
for (int trds = 1; trds <= MnSynchronizedTest.THREADS; ++trds) {
this.executor.submit(
new MnSynchronizedTest.TestTask(
trds,
this.mono,
this.actual
MnSynchronizedTest.rowsBySize(trds),
this.shared,
this.counter
)
);
}
this.executor.shutdown();
assert this.executor.awaitTermination(5L, TimeUnit.SECONDS);
MatcherAssert.assertThat(
this.actual.get(),
this.counter.get(),
Matchers.equalTo(MnSynchronizedTest.expectedSize())
);
}
Expand All @@ -121,6 +107,22 @@ private static Integer expectedSize() {
return len + (len - 1) * len / 2;
}

/**
* The rows to write.
*
* @param size The size of rows
* @return Collection of rows
*/
private static Collection<Map<String, String>> rowsBySize(final int size) {
final Map<String, String> row = new HashMap<>(0);
row.put(Tojos.KEY, String.valueOf(size));
final Collection<Map<String, String>> res = new ArrayList<>(size);
for (int idx = 0; idx < size; ++idx) {
res.add(row);
}
return res;
}

/**
* The test task to concurrent read and write operations.
*
Expand All @@ -134,20 +136,15 @@ private static final class TestTask implements Runnable {
private static final Logger LOGGER =
Logger.getLogger(MnSynchronizedTest.class.getName());

/**
* The id of thread.
*/
private final int idx;

/**
* Local mono.
*/
private final Mono mono;

/**
* Row to write into mono.
* Rows to write.
*/
private final Map<String, String> row;
private final Collection<Map<String, String>> rows;

/**
* The counter.
Expand All @@ -157,34 +154,31 @@ private static final class TestTask implements Runnable {
/**
* Ctor.
*
* @param idx The id
* @param rws The rows to write
* @param mno The mono
* @param cntr The counter
*/
TestTask(
final int idx,
final Collection<Map<String, String>> rws,
final Mono mno,
final AtomicInteger cntr
) {
this.idx = idx;
this.rows = rws;
this.mono = mno;
this.row = new HashMap<>(0);
this.counter = cntr;
}

@Override
public void run() {
this.row.put(Tojos.KEY, String.valueOf(this.idx));
final Collection<Map<String, String>> rows = this.mono.read();
rows.add(this.row);
this.mono.write(rows);
this.counter.addAndGet(rows.size());
this.mono.write(this.rows);
this.counter.addAndGet(this.rows.size());
MnSynchronizedTest.TestTask.LOGGER.log(
Level.INFO,
String.format(
"Thread %d, written %d rows",
this.idx,
rows.size()
"Thread %s, written %d rows.\nReading:\n%s",
this,
this.rows.size(),
this.mono.read()
)
);
}
Expand Down

0 comments on commit 12c4447

Please sign in to comment.