Skip to content

Short examples

Richard Hightower edited this page Sep 12, 2016 · 18 revisions

TodoRepo interface

public interface TodoRepo {
    Promise<List<Todo>> loadTodos();
    ...
}

Using TodoRepo

todoRep.loadTodos()
        .then(todos -> {
            logger.info("list todos");
            returnPromise.resolve(todos);
        })
        .catchError(error -> {
            logger.error("Unable to add todo to repo", error);
            returnPromise.reject("Unable to add todo to repo");
        })
        .invoke();

Scheduling a task with the reactor

reactor.runTaskAfter(Duration.ofSeconds(60), () -> {
    logger.info("Registering health check and recovery for repo");
    reactor.addRepeatingTask(Duration.ofSeconds(30), this::circuitBreakerTest);
});

Promise invokeWithReactor

//Connect to repo.
connect().catchError(error -> {
            notConnectedCount++;
            logger.error("Not connected to repo " + notConnectedCount, error);
                ...
       }).thenSafe(connected -> {
                ...
                notConnectedCount = 0;
       }).invokeWithReactor(reactor);

Create a breaker for a session that is not connected yet

private Breaker<Session> sessionBreaker = Breaker.opened();
    

Periodically check the breaker to see if it is open

reactor.runTaskAfter(Duration.ofSeconds(60), () -> {
    logger.info("Registering health check and recovery for repo");
    reactor.addRepeatingTask(Duration.ofSeconds(30), this::circuitBreakerTest);
});

Circuit breaker test

    private void circuitBreakerTest() {
        sessionBreaker.ifBroken(() -> {
            serviceMgmt.increment("repo.breaker.broken");
            //Clean up the old session.
            sessionBreaker.cleanup(session -> {
                try {
                    if (!session.isClosed()) { session.close(); }
                } catch (Exception ex) { logger.warn("unable to clean up old session", ex); }
            });
            //Connect to repo.
            connect().catchError(error -> {
                notConnectedCount++;
                logger.error("Not connected to repo " + notConnectedCount, error);
                ...
                if (notConnectedCount > 10) {
                    logger.error("Attempts to reconnect to Repo failed. Mark it.");
                    serviceMgmt.increment("repo.connect.error.fatal");
                    serviceMgmt.setFailingWithError(error);
                }
            }).thenSafe(connected -> {
                if (serviceMgmt.isFailing()) {
                    serviceMgmt.increment("repo.connect.recover");
                    serviceMgmt.recover();
                }
                notConnectedCount = 0;
            }).invokeWithReactor(reactor);
        });
    }