Skip to content

Commit

Permalink
Work to reduce test time by reducing some repeated tests, using Await…
Browse files Browse the repository at this point in the history
…ility, and reducing delays (opensearch-project#3019)

Work to reduce test time by reducing some repeated tests, using Awaitility, and reducing some wait times.

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable authored Jul 25, 2023
1 parent 7af9055 commit 68b420c
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import java.util.UUID;

import static org.awaitility.Awaitility.await;

public class AbstractSinkTest {
@Test
public void testMetrics() {
Expand Down Expand Up @@ -51,8 +54,8 @@ public void testMetrics() {
Assert.assertEquals(1.0, MetricsTestUtil.getMeasurementFromList(elapsedTimeMeasurements, Statistic.COUNT).getValue(), 0);
Assert.assertTrue(MetricsTestUtil.isBetween(
MetricsTestUtil.getMeasurementFromList(elapsedTimeMeasurements, Statistic.TOTAL_TIME).getValue(),
0.5,
0.6));
0.2,
0.3));
Assert.assertEquals(abstractSink.getRetryThreadState(), null);
abstractSink.shutdown();
}
Expand All @@ -71,14 +74,8 @@ public void testSinkNotReady() {
// Do another intialize to make sure the sink is still not ready
abstractSink.initialize();
Assert.assertEquals(abstractSink.isReady(), false);
while (!abstractSink.isReady()) {
try {
Thread.sleep(1000);
} catch (Exception e) {}
}
try {
Thread.sleep(2000);
} catch (Exception e) {}
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
Assert.assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
abstractSink.shutdown();
}
Expand All @@ -92,7 +89,7 @@ public AbstractSinkImpl(PluginSetting pluginSetting) {
@Override
public void doOutput(Collection<Record<String>> records) {
try {
Thread.sleep(500);
Thread.sleep(200);
} catch (InterruptedException e) {

}
Expand Down Expand Up @@ -126,7 +123,7 @@ public AbstractSinkNotReadyImpl(PluginSetting pluginSetting) {
@Override
public void doOutput(Collection<Record<String>> records) {
try {
Thread.sleep(500);
Thread.sleep(100);
} catch (InterruptedException e) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

@ExtendWith(MockitoExtension.class)
public class AcknowledgementSetMonitorTests {
private static final int DEFAULT_WAIT_TIME_MS = 2000;
private static final int DEFAULT_WAIT_TIME_MS = 500;
@Mock
DefaultAcknowledgementSet acknowledgementSet1;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand All @@ -67,7 +68,7 @@
import static org.mockito.Mockito.when;

class PipelineTests {
private static final int TEST_READ_BATCH_TIMEOUT = 3000;
private static final int TEST_READ_BATCH_TIMEOUT = 500;
private static final int TEST_PROCESSOR_THREADS = 1;
private static final String TEST_PIPELINE_NAME = "test-pipeline";

Expand Down Expand Up @@ -168,9 +169,9 @@ void testPipelineStateWithProcessor() {

@Test
void testPipelineDelayedReady() throws InterruptedException {
final int delayTimeSeconds = 10;
final Duration delayTime = Duration.ofMillis(2000);
final Source<Record<String>> testSource = new TestSource();
final TestSink testSink = new TestSink(delayTimeSeconds);
final TestSink testSink = new TestSink(delayTime);
final DataFlowComponent<Sink> sinkDataFlowComponent = mock(DataFlowComponent.class);
final TestProcessor testProcessor = new TestProcessor(new PluginSetting("test_processor", new HashMap<>()));
when(sinkDataFlowComponent.getComponent()).thenReturn(testSink);
Expand All @@ -182,11 +183,11 @@ void testPipelineDelayedReady() throws InterruptedException {
Instant startTime = Instant.now();
testPipeline.execute();
assertFalse(testPipeline.isReady());
for (int i = 0; i < delayTimeSeconds + 2; i++) {
Thread.sleep(1000);
}
await().atMost(Duration.ofSeconds(2).plus(delayTime))
.pollInterval(Duration.ofMillis(200))
.until(testPipeline::isReady);
assertTrue(testPipeline.isReady());
assertThat(Duration.between(startTime, Instant.now()), greaterThanOrEqualTo(Duration.ofSeconds(delayTimeSeconds)));
assertThat(Duration.between(startTime, Instant.now()), greaterThanOrEqualTo(delayTime));
assertThat("Pipeline isStopRequested is expected to be false", testPipeline.isStopRequested(), is(false));
testPipeline.shutdown();
assertThat("Pipeline isStopRequested is expected to be true", testPipeline.isStopRequested(), is(true));
Expand All @@ -196,9 +197,9 @@ void testPipelineDelayedReady() throws InterruptedException {

@Test
void testPipelineDelayedReadyShutdownBeforeReady() throws InterruptedException {
final int delayTimeSeconds = 10;
final Duration delayTime = Duration.ofSeconds(2);
final Source<Record<String>> testSource = new TestSource();
final TestSink testSink = new TestSink(delayTimeSeconds);
final TestSink testSink = new TestSink(delayTime);
final DataFlowComponent<Sink> sinkDataFlowComponent = mock(DataFlowComponent.class);
final TestProcessor testProcessor = new TestProcessor(new PluginSetting("test_processor", new HashMap<>()));
when(sinkDataFlowComponent.getComponent()).thenReturn(testSink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -29,11 +30,11 @@ public TestSink() {
this.ready = true;
}

public TestSink(int readyAfterSecs) {
public TestSink(Duration readyAfter) {
this.ready = false;
this.failSinkForTest = false;
this.collectedRecords = new ArrayList<>();
this.readyTime = Instant.now().plusSeconds(readyAfterSecs);
this.readyTime = Instant.now().plus(readyAfter);
}

public TestSink(boolean failSinkForTest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private AggregateProcessor createObjectUnderTest() {
return new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
}

@RepeatedTest(value = 10)
@RepeatedTest(value = 2)
void aggregateWithNoConcludingGroupsReturnsExpectedResult() throws InterruptedException {
aggregateAction = new RemoveDuplicatesAggregateAction();
when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
Expand Down Expand Up @@ -260,7 +260,7 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException {
}

@ParameterizedTest
@ValueSource(doubles = {5.0, 15.0, 33.0, 55.0, 70.0, 85.0, 92.0, 99.0})
@ValueSource(doubles = {5.0, 15.0, 55.0, 92.0, 99.0})
void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedException, NoSuchFieldException, IllegalAccessException {
PercentSamplerAggregateActionConfig percentSamplerAggregateActionConfig = new PercentSamplerAggregateActionConfig();
setField(PercentSamplerAggregateActionConfig.class, percentSamplerAggregateActionConfig, "percent", testPercent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,15 @@ void GIVEN_logGeneratorSourceAndBlockingBuffer_WHEN_noLimit_THEN_keepsWritingToB

BlockingBuffer<Record<Event>> spyBuffer = spy(new BlockingBuffer<Record<Event>>("SamplePipeline"));

lenient().when(sourceConfig.getInterval()).thenReturn(Duration.ofSeconds(1)); // interval of 1 second
Duration interval = Duration.ofMillis(100);

lenient().when(sourceConfig.getInterval()).thenReturn(interval);
lenient().when(sourceConfig.getCount()).thenReturn(INFINITE_LOG_COUNT); // no limit to log count

logGeneratorSource.start(spyBuffer);
Thread.sleep(1500);

Thread.sleep((long) (interval.toMillis() * 1.5));
verify(spyBuffer, atLeast(1)).write(any(Record.class), anyInt());
Thread.sleep(700);
Thread.sleep((long) (interval.toMillis() * 0.7));
verify(spyBuffer, atLeast(2)).write(any(Record.class), anyInt());
}

Expand All @@ -102,16 +103,18 @@ void GIVEN_logGeneratorSourceAndBlockingBuffer_WHEN_reachedLimit_THEN_stopsWriti

BlockingBuffer<Record<Event>> spyBuffer = spy(new BlockingBuffer<Record<Event>>("SamplePipeline"));

lenient().when(sourceConfig.getInterval()).thenReturn(Duration.ofSeconds(1)); // interval of 1 second
Duration interval = Duration.ofMillis(100);

lenient().when(sourceConfig.getInterval()).thenReturn(interval);
lenient().when(sourceConfig.getCount()).thenReturn(1); // max log count of 1 in logGeneratorSource

assertEquals(spyBuffer.isEmpty(), true);
logGeneratorSource.start(spyBuffer);
Thread.sleep(1100);
Thread.sleep((long) (interval.toMillis() * 1.1));

verify(spyBuffer, times(1)).write(any(Record.class), anyInt());

Thread.sleep(1000);
Thread.sleep(interval.toMillis());
verify(spyBuffer, times(1)).write(any(Record.class), anyInt());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public void start(final Buffer<Record<Document>> buffer) {
throw new IllegalStateException("Buffer is null");
}
rssReaderTask = new RssReaderTask(rssReader, rssSourceConfig.getUrl(), buffer);
scheduledExecutorService.scheduleAtFixedRate(rssReaderTask, 0, 5, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(rssReaderTask, 0,
rssSourceConfig.getPollingFrequency().toMillis(), TimeUnit.MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import java.time.Duration;

import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.verify;
Expand All @@ -42,12 +44,14 @@ class RSSSourceTest {
private PluginMetrics pluginMetrics;

private RSSSource rssSource;
private Duration pollingFrequency;

@BeforeEach
void setUp() {
pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, PIPELINE_NAME);
pollingFrequency = Duration.ofMillis(1800);
lenient().when(rssSourceConfig.getUrl()).thenReturn(VALID_RSS_URL);
lenient().when(rssSourceConfig.getPollingFrequency()).thenReturn(Duration.ofSeconds(5));
lenient().when(rssSourceConfig.getPollingFrequency()).thenReturn(pollingFrequency);
rssSource = new RSSSource(pluginMetrics, rssSourceConfig);
}

Expand All @@ -59,9 +63,15 @@ public void tearDown() {
@Test
void test_ExecutorService_keep_writing_Events_to_Buffer() throws Exception {
rssSource.start(buffer);
Thread.sleep(5000);
verify(buffer, atLeastOnce()).writeAll(anyCollection(), anyInt());
Thread.sleep(5000);
await().atMost(pollingFrequency.multipliedBy(2))
.untilAsserted(() -> {
verify(buffer, atLeastOnce()).writeAll(anyCollection(), anyInt());
});
verify(buffer, atLeastOnce()).writeAll(anyCollection(), anyInt());
await().atMost(pollingFrequency.multipliedBy(2))
.untilAsserted(() -> {
verify(buffer, atLeast(2)).writeAll(anyCollection(), anyInt());
});
verify(buffer, atLeast(2)).writeAll(anyCollection(), anyInt());
}
}

0 comments on commit 68b420c

Please sign in to comment.