Skip to content
This repository was archived by the owner on Mar 1, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ private void checkRestartLocked() {
private static final int BIND = 8;
private static final int FILTER_SUCCESS = 9;
private static final int FILTER_FAILURE = 10;
private static final int FILTER_FAILED_CHECK = 11;

/**
* @param asynchronously Whether this flow is run asynchronously. True after the first goTo and
Expand Down Expand Up @@ -339,6 +340,9 @@ private void runFlowFrom(final int index, final boolean asynchronously) {
case FILTER_FAILURE:
i = runFilterFailure(directives, i);
break;
case FILTER_FAILED_CHECK:
i = runFilterFailedCheck(directives, i);
break;
case END:
i = runEnd(directives, i);
break;
Expand Down Expand Up @@ -484,14 +488,35 @@ static void addFilterFailure(@NonNull final List<Object> directives) {
private int runFilterFailure(@NonNull final Object[] directives, final int index) {
final Result tryValue = (Result) intermediateValue;
if (tryValue.succeeded()) {
runTerminate(tryValue.get(), identityFunction());
setNewValueAndEndFlow(tryValue.get());
return -1;
} else {
intermediateValue = tryValue.getFailure();
return index + 1;
}
}

static void addFilterFailedCheck(@NonNull final Function caseFunction,
@NonNull final Predicate casePredicate,
@NonNull final List<Object> directives) {
directives.add(FILTER_FAILED_CHECK);
directives.add(caseFunction);
directives.add(casePredicate);
}

private int runFilterFailedCheck(@NonNull final Object[] directives, final int index) {
final Function caseFunction = (Function) directives[index + 1];
final Predicate casePredicate = (Predicate) directives[index + 2];

final Object caseValue = caseFunction.apply(intermediateValue);
if (casePredicate.apply(caseValue)) {
setNewValueAndEndFlow(intermediateValue);
return -1;
} else {
return index + 3;
}
}

private void runTerminate(@NonNull final Object caseValue,
@Nullable final Function terminatingValueFunction) {
if (terminatingValueFunction == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.android.agera.CompiledRepository.addBindWith;
import static com.google.android.agera.CompiledRepository.addCheck;
import static com.google.android.agera.CompiledRepository.addEnd;
import static com.google.android.agera.CompiledRepository.addFilterFailedCheck;
import static com.google.android.agera.CompiledRepository.addFilterFailure;
import static com.google.android.agera.CompiledRepository.addFilterSuccess;
import static com.google.android.agera.CompiledRepository.addGetFrom;
Expand Down Expand Up @@ -48,7 +49,7 @@ final class RepositoryCompiler implements
RepositoryCompilerStates.RFrequency,
RepositoryCompilerStates.RFlow,
RepositoryCompilerStates.RTerminationOrContinue,
RepositoryCompilerStates.RConfig {
RepositoryCompilerStates.RThenCheckOrConfig {

private static final ThreadLocal<RepositoryCompiler> compilers = new ThreadLocal<>();

Expand All @@ -75,7 +76,7 @@ private static void recycle(@NonNull final RepositoryCompiler compiler) {

@Retention(RetentionPolicy.SOURCE)
@IntDef({NOTHING, FIRST_EVENT_SOURCE, FREQUENCY_OR_MORE_EVENT_SOURCE, FLOW,
TERMINATE_THEN_FLOW, TERMINATE_THEN_END, CONFIG})
TERMINATE_THEN_FLOW, TERMINATE_THEN_END, THEN_CHECK_OR_CONFIG, CONFIG})
private @interface Expect {}

private static final int NOTHING = 0;
Expand All @@ -84,7 +85,8 @@ private static void recycle(@NonNull final RepositoryCompiler compiler) {
private static final int FLOW = 3;
private static final int TERMINATE_THEN_FLOW = 4;
private static final int TERMINATE_THEN_END = 5;
private static final int CONFIG = 6;
private static final int THEN_CHECK_OR_CONFIG = 6;
private static final int CONFIG = 7;

private Object initialValue;
private final ArrayList<Observable> eventSources = new ArrayList<>();
Expand All @@ -95,6 +97,7 @@ private static void recycle(@NonNull final RepositoryCompiler compiler) {
private Function caseExtractor;
private Predicate casePredicate;
private boolean goLazyUsed;
private boolean inThenCheckTerminationClause;
private Merger notifyChecker = objectsUnequal();
@RepositoryConfig
private int deactivationConfig;
Expand Down Expand Up @@ -128,6 +131,20 @@ private void checkGoLazyUnused() {
checkState(!goLazyUsed, "Unexpected occurrence of async directive after goLazy()");
}

private void checkExpectConfigAndEnsureEndFlow() {
if (expect == THEN_CHECK_OR_CONFIG) {
endFlow(false);
expect = CONFIG;
} else {
checkExpect(CONFIG);
}
}

private void endFlow(final boolean skip) {
addEnd(skip, directives);
expect = CONFIG;
}

//region REventSource

@NonNull
Expand Down Expand Up @@ -226,6 +243,7 @@ public RepositoryCompiler bindWith(@NonNull final Supplier secondValueSupplier,
@NonNull
@Override
public RepositoryCompiler thenSkip() {
checkExpect(FLOW);
endFlow(true);
return this;
}
Expand All @@ -234,7 +252,7 @@ public RepositoryCompiler thenSkip() {
@Override
public RepositoryCompiler thenGetFrom(@NonNull final Supplier supplier) {
getFrom(supplier);
endFlow(false);
expect = THEN_CHECK_OR_CONFIG;
return this;
}

Expand All @@ -243,23 +261,18 @@ public RepositoryCompiler thenGetFrom(@NonNull final Supplier supplier) {
public RepositoryCompiler thenMergeIn(
@NonNull final Supplier supplier, @NonNull final Merger merger) {
mergeIn(supplier, merger);
endFlow(false);
expect = THEN_CHECK_OR_CONFIG;
return this;
}

@NonNull
@Override
public RepositoryCompiler thenTransform(@NonNull final Function function) {
transform(function);
endFlow(false);
expect = THEN_CHECK_OR_CONFIG;
return this;
}

private void endFlow(final boolean skip) {
addEnd(skip, directives);
expect = CONFIG;
}

@NonNull
@Override
public RepositoryCompiler attemptGetFrom(@NonNull final Supplier attemptSupplier) {
Expand Down Expand Up @@ -335,7 +348,7 @@ public RepositoryCompiler goLazy() {

//endregion RFlow

//region RTermination
//region RTerminationOrContinue

@NonNull
@Override
Expand All @@ -360,54 +373,81 @@ private void terminate(@Nullable final Function valueFunction) {
}
caseExtractor = null;
casePredicate = null;
if (expect == TERMINATE_THEN_END) {
if (expect == TERMINATE_THEN_FLOW) {
expect = FLOW;
} else if (inThenCheckTerminationClause) {
endFlow(false);
} else {
expect = FLOW;
expect = THEN_CHECK_OR_CONFIG;
}
}

@NonNull
@Override
public RepositoryCompiler orContinue() {
checkExpect(TERMINATE_THEN_END);
addFilterFailure(directives);
if (inThenCheckTerminationClause) {
addFilterFailedCheck(caseExtractor, casePredicate, directives);
caseExtractor = null;
casePredicate = null;
inThenCheckTerminationClause = false;
} else {
addFilterFailure(directives);
}
expect = FLOW;
return this;
}

//endregion RTermination
//endregion RTerminationOrContinue

//region RThenCheckOrConfig

@NonNull
@Override
public RepositoryCompiler thenCheck(@NonNull final Predicate predicate) {
return thenCheck(identityFunction(), predicate);
}

//region RConfig
@NonNull
@Override
public RepositoryCompiler thenCheck(
@NonNull final Function function, @NonNull final Predicate predicate) {
checkExpect(THEN_CHECK_OR_CONFIG);
caseExtractor = checkNotNull(function);
casePredicate = checkNotNull(predicate);
expect = TERMINATE_THEN_END;
inThenCheckTerminationClause = true;
return this;
}

@NonNull
@Override
public RepositoryCompiler notifyIf(@NonNull final Merger notifyChecker) {
checkExpect(CONFIG);
checkExpectConfigAndEnsureEndFlow();
this.notifyChecker = checkNotNull(notifyChecker);
return this;
}

@NonNull
@Override
public RepositoryCompiler onDeactivation(@RepositoryConfig final int deactivationConfig) {
checkExpect(CONFIG);
checkExpectConfigAndEnsureEndFlow();
this.deactivationConfig = deactivationConfig;
return this;
}

@NonNull
@Override
public RepositoryCompiler onConcurrentUpdate(@RepositoryConfig final int concurrentUpdateConfig) {
checkExpect(CONFIG);
checkExpectConfigAndEnsureEndFlow();
this.concurrentUpdateConfig = concurrentUpdateConfig;
return this;
}

@NonNull
@Override
public RepositoryCompiler sendDiscardedValuesTo(@NonNull final Receiver disposer) {
checkExpect(CONFIG);
checkExpectConfigAndEnsureEndFlow();
discardedValueDisposer = checkNotNull(disposer);
return this;
}
Expand All @@ -431,7 +471,7 @@ public RepositoryCompiler compileIntoRepositoryWithInitialValue(@NonNull final O

@NonNull
private Repository compileRepositoryAndReset() {
checkExpect(CONFIG);
checkExpectConfigAndEnsureEndFlow();
Repository repository = compiledRepository(initialValue, eventSources, frequency, directives,
notifyChecker, concurrentUpdateConfig, deactivationConfig, discardedValueDisposer);
expect = NOTHING;
Expand All @@ -440,6 +480,7 @@ private Repository compileRepositoryAndReset() {
frequency = 0;
directives.clear();
goLazyUsed = false;
inThenCheckTerminationClause = false;
notifyChecker = objectsUnequal();
deactivationConfig = RepositoryConfig.CONTINUE_FLOW;
concurrentUpdateConfig = RepositoryConfig.CONTINUE_FLOW;
Expand Down
Loading