diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/EventGenerator.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/EventGenerator.java index c2c248b1..72c4fa7d 100644 --- a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/EventGenerator.java +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/EventGenerator.java @@ -76,7 +76,7 @@ public void run() throws Exception { for (;;) { eventCounter++; long intData = rand.nextInt(100); - String routingKey = Long.toString(intData); + String routingKey = String.format("%02d", intData); sum += intData; String generatedTimestampStr = dateFormat.format(new Date()); String message = String.join(",", diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/ExactlyOnceMultithreadedProcessor.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/ExactlyOnceMultithreadedProcessor.java index 4d37c3b8..828d59ba 100644 --- a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/ExactlyOnceMultithreadedProcessor.java +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/ExactlyOnceMultithreadedProcessor.java @@ -17,6 +17,7 @@ import io.pravega.client.stream.impl.UTF8StringSerializer; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; @@ -136,7 +137,7 @@ public ExactlyOnceMultithreadedProcessor(String scope, String inputStreamName, S } /** - * Commit all transactions that have been opened since the last checkpoint. + * Commit all transactions that are part of a checkpoint. * * @param checkpointName */ @@ -146,21 +147,23 @@ private void commitTransactions(String checkpointName) { // Read the contents of all pravega-transactions-worker-XX files. // These files contain the Pravega transaction IDs that must be committed now. Path checkpointDirPath = checkpointRootPath.resolve(checkpointName); - List txnIds = IntStream - .range(0, numWorkers) - .parallel() - .boxed() - .map(workerIndex -> checkpointDirPath.resolve(CHECKPOINT_TRANSACTION_ID_FILE_NAME_PREFIX + workerIndex)) - .filter(Files::exists) - .flatMap(path -> { - try { - return Files.readAllLines(path, StandardCharsets.UTF_8).stream(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .map(UUID::fromString) - .collect(Collectors.toList()); + List txnIds = null; + try { + txnIds = Files.list(checkpointDirPath) + .filter(path -> path.getFileName().toString().startsWith(CHECKPOINT_TRANSACTION_ID_FILE_NAME_PREFIX)) + .parallel() + .flatMap(path -> { + try { + return Files.readAllLines(path, StandardCharsets.UTF_8).stream(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .map(UUID::fromString) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new RuntimeException(e); + } log.info("commitTransactions: txnIds={}", txnIds); @@ -214,7 +217,8 @@ public Void call() throws InterruptedException { } /** - * Initiate a checkpoint, wait for it to complete, and write the checkpoint to the state. + * Initiate a checkpoint, wait for it to complete, write the checkpoint to the state, + * and commit transactions. */ private void performCheckpoint() { final String checkpointName = UUID.randomUUID().toString(); @@ -254,8 +258,9 @@ private void performCheckpoint() { cleanCheckpointDirectory(checkpointDirPath); } catch (final Exception e) { - log.warn("performCheckpoint: Error performing checkpoint", e); - // Ignore error. We will retry when we are scheduled again. + // If any exception occurs, this application will abnormally terminate. + // Upon restart, it will resume from the last successful checkpoint. + panic(e); } log.info("performCheckpoint: END: checkpointName={}", checkpointName); } @@ -295,10 +300,17 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO } }); } catch (IOException e) { + // Any errors here are non-fatal. The next call to this function + // will attempt to clean anything that was missed. log.warn("cleanCheckpointDirectory", e); } } + private void panic(Exception e) { + log.error("Aborting due to fatal exception", e); + System.exit(1); + } + public static void main(String[] args) throws Exception { ExactlyOnceMultithreadedProcessor master = new ExactlyOnceMultithreadedProcessor( Parameters.getScope(),