reactor-test: thenConsumeWhile consumption off-by-one when next script event is a TaskEvent #3121

@ScottButler87

The contract of thenConsumeWhile seems to indicate that signals are consumed only as long as they match the given predicate:
https://github.com/reactor/reactor-core/blob/main/reactor-test/src/main/java/reactor/test/StepVerifier.java#L934-L941

/**
 * Consume further onNext signals as long as they match a predicate.
 *
 * @param predicate the condition to continue consuming onNext
 *
 * @return this builder
 */
Step<T> thenConsumeWhile(Predicate<T> predicate);

However, I've found that under certain circumstances it can consume one element too many during sequence verification. This definitely happens when the next scripted event is a TaskEvent. I haven't dug deep enough into the code yet to determine whether other cases also trigger this behavior.

During verification of a sequence, when a SignalConsumeWhileEvent is processed, the code path passes through final void onExpectation(Signal<T> actualSignal), which seems to be the core of the script verification for incoming signals received via the subscription:
https://github.com/reactor/reactor-core/blob/main/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java#L1427-L1524

For any SignalConsumeWhileEvent, onExpectation(Signal<T> actualSignal) invokes boolean consumeWhile(Signal<T> actualSignal, SignalConsumeWhileEvent<T> whileEvent). When actualSignal does not match the event's predicate, this method removes the SignalConsumeWhileEvent from the script and returns false, indicating that the signal should not be consumed. This part behaves as expected:
https://github.com/reactor/reactor-core/blob/main/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java#L1582-L1599

boolean consumeWhile(Signal<T> actualSignal, SignalConsumeWhileEvent<T> whileEvent) {
	if (actualSignal.isOnNext()) {
		if (whileEvent.test(actualSignal.get())) {
			//the value matches, gobble it up
			unasserted--;
			if (this.logger != null) {
				logger.debug("{} consumed {}", whileEvent.getDescription(), actualSignal);
			}
			return true;
		}
	}
	if (this.logger != null) {
		logger.debug("{} stopped at {}", whileEvent.getDescription(), actualSignal);
	}
	//stop evaluating the predicate
	this.script.poll();
	return false;
}

However, on return from consumeWhile, actualSignal is still the signal that should not be consumed. I would expect the code to handle any non-verification events (TaskEvent) in the script, then resume verification with the next verification event using that same unconsumed actualSignal. Instead, if the next script event is a TaskEvent, execution falls through from the SignalConsumeWhileEvent handling into the TaskEvent handling, and once the TaskEvents have been handled, processing of that signal ends without it ever being verified!
https://github.com/reactor/reactor-core/blob/main/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java#L1481-L1502

if (event instanceof DefaultStepVerifierBuilder.SignalConsumeWhileEvent) {
	if (consumeWhile(actualSignal, (SignalConsumeWhileEvent<T>) event)) {
		return;
	}
	//possibly re-evaluate the current onNext
	event = this.script.peek();
}

// ... other code not exercised in this case

event = this.script.peek();

for (; ; ) {
	if (event == null || !(event instanceof EagerEvent)) {
		break;
	}
	if (event instanceof SubscriptionEvent) {
		if (serializeDrainAndSubscriptionEvent()) {
			return;
		}
	}
	else if (event instanceof CollectEvent) {
		if (onCollect(actualSignal)) {
			return;
		}
	}
	else {
		onTaskEvent();
	}
	event = this.script.peek();
}
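
To make the fall-through easier to see, here is a minimal toy model of the control flow quoted above. It is only a sketch and does not use reactor-test: the types FallThroughModel, ConsumeWhile, Task and ExpectNext are hypothetical stand-ins, the elided middle section is assumed to reduce to a single expectNext-style check, and the "dropped" log entry is added purely so the lost signal becomes visible.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.IntPredicate;

// Toy model only: every type below is a hypothetical stand-in, not a reactor-test class.
final class FallThroughModel {

    interface Event {}

    static final class ConsumeWhile implements Event {
        final IntPredicate predicate;
        ConsumeWhile(IntPredicate predicate) { this.predicate = predicate; }
    }

    // Stands in for an eager TaskEvent.
    static final class Task implements Event {}

    static final class ExpectNext implements Event {
        final int expected;
        ExpectNext(int expected) { this.expected = expected; }
    }

    private final Deque<Event> script = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();

    // Simplified mirror of the quoted onExpectation flow for one onNext value.
    private void onExpectation(int value) {
        boolean verified = false;
        Event event = script.peek();
        if (event instanceof ConsumeWhile) {
            if (((ConsumeWhile) event).predicate.test(value)) {
                log.add("consumed " + value);
                return;
            }
            script.poll();          // stop evaluating the predicate
            event = script.peek();  // possibly re-evaluate the current onNext
        }
        // Assumption: the elided section only checks signal expectations.
        if (event instanceof ExpectNext) {
            int expected = ((ExpectNext) script.poll()).expected;
            log.add(expected == value
                    ? "matched " + value
                    : "expected " + expected + " but got " + value);
            verified = true;
        }
        // Eager loop: runs tasks, then stops at the next non-eager event.
        event = script.peek();
        while (event instanceof Task) {
            script.poll();
            log.add("ran task");
            event = script.peek();
        }
        // Not in the real code: record that the value was never compared.
        if (!verified) {
            log.add("dropped " + value);
        }
    }

    static List<String> run(int[] values, Event... events) {
        FallThroughModel model = new FallThroughModel();
        model.script.addAll(Arrays.asList(events));
        for (int value : values) {
            model.onExpectation(value);
        }
        return model.log;
    }

    public static void main(String[] args) {
        // Script: consume while < 3, run a task, then expect 3 and 4.
        System.out.println(run(new int[] {1, 2, 3, 4},
                new ConsumeWhile(v -> v < 3), new Task(),
                new ExpectNext(3), new ExpectNext(4)));
    }
}
```

In this model, the value 3 fails the predicate, the task runs, and the method returns with 3 never compared against anything, so the following value 4 is checked against the expectation meant for 3. Removing the Task from the script makes the same model match 3 and 4 as expected, which fits the observation that the problem shows up when a TaskEvent follows the consume-while step.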

There is a portion of the code that I think is intended to handle this issue, but it is insufficient when the next script event is a TaskEvent:

//possibly re-evaluate the current onNext
event = this.script.peek();

Is this intentional behavior? If so, I think it should be documented. If not, a possible solution, which I haven't vetted, would be to recurse into onExpectation whenever the current signal should not be consumed, for example when consumeWhile returns false or when TaskEvents are handled.
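
As a rough illustration of that idea, the sketch below applies the re-dispatch to a toy model rather than to reactor-test itself. All types (RedispatchModel, ConsumeWhile, Task, ExpectNext) are hypothetical stand-ins, the real method has further branches (SubscriptionEvent, CollectEvent, failure reporting) that are not modeled here, and this is not a vetted patch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.IntPredicate;

// Toy model only: every type below is a hypothetical stand-in, not a reactor-test class.
final class RedispatchModel {

    interface Event {}

    static final class ConsumeWhile implements Event {
        final IntPredicate predicate;
        ConsumeWhile(IntPredicate predicate) { this.predicate = predicate; }
    }

    // Stands in for an eager TaskEvent.
    static final class Task implements Event {}

    static final class ExpectNext implements Event {
        final int expected;
        ExpectNext(int expected) { this.expected = expected; }
    }

    private final Deque<Event> script = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();

    // Runs every Task at the head of the script, stopping at the first non-task event.
    private void runEagerTasks() {
        while (script.peek() instanceof Task) {
            script.poll();
            log.add("ran task");
        }
    }

    private void onExpectation(int value) {
        Event event = script.peek();
        if (event instanceof ConsumeWhile) {
            if (((ConsumeWhile) event).predicate.test(value)) {
                log.add("consumed " + value);
                return;
            }
            script.poll();         // stop evaluating the predicate
            runEagerTasks();       // handle non-verification events first
            onExpectation(value);  // then re-dispatch the same unconsumed value
            return;
        }
        if (event instanceof ExpectNext) {
            int expected = ((ExpectNext) script.poll()).expected;
            log.add(expected == value
                    ? "matched " + value
                    : "expected " + expected + " but got " + value);
        } else {
            log.add("did not expect " + value);
        }
        runEagerTasks();
    }

    static List<String> run(int[] values, Event... events) {
        RedispatchModel model = new RedispatchModel();
        model.script.addAll(Arrays.asList(events));
        for (int value : values) {
            model.onExpectation(value);
        }
        return model.log;
    }

    public static void main(String[] args) {
        // Same script as the failing scenario: consume while < 3, task, expect 3, expect 4.
        System.out.println(run(new int[] {1, 2, 3, 4},
                new ConsumeWhile(v -> v < 3), new Task(),
                new ExpectNext(3), new ExpectNext(4)));
    }
}
```

The recursion terminates in this model because each re-dispatch first removes one ConsumeWhile event from the script. Whether the same holds in the real onExpectation, with its additional event types, would need to be checked against the actual code.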

Labels

area/reactor-test (This belongs to the reactor-test module)
type/bug (A general bug)
