Skip to content

Commit

Permalink
Update to use EventStreamClientFactory. Removed ExactlyOnceMultithrea…
Browse files Browse the repository at this point in the history
…dedProcessor.

Signed-off-by: Claudio Fahey <claudio.fahey@dell.com>
  • Loading branch information
Claudio Fahey committed May 31, 2020
1 parent 221e0c3 commit 2b3a033
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 482 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@
*/
package io.pravega.example.streamprocessing;

import io.pravega.client.ClientFactory;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.*;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -49,6 +56,7 @@ public static void main(String[] args) throws Exception {
}

public void run() throws Exception {
final ClientConfig clientConfig = ClientConfig.builder().controllerURI(controllerURI).build();
try (StreamManager streamManager = StreamManager.create(controllerURI)) {
streamManager.createScope(scope);
StreamConfiguration streamConfig = StreamConfiguration.builder()
Expand All @@ -69,7 +77,7 @@ public void run() throws Exception {
readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig);
}

try (ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI);
try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, clientConfig);
EventStreamReader<String> reader = clientFactory.createReader(
"reader",
readerGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
*/
package io.pravega.example.streamprocessing;

import io.pravega.client.ClientFactory;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
Expand Down Expand Up @@ -52,6 +53,7 @@ public static void main(String[] args) throws Exception {
}

public void run() throws Exception {
final ClientConfig clientConfig = ClientConfig.builder().controllerURI(controllerURI).build();
try (StreamManager streamManager = StreamManager.create(controllerURI)) {
streamManager.createScope(scope);
StreamConfiguration streamConfig = StreamConfiguration.builder()
Expand All @@ -64,8 +66,7 @@ public void run() throws Exception {
}

Random rand = new Random(42);

try (ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI);
try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, clientConfig);
EventStreamWriter<String> writer = clientFactory.createEventWriter(
outputStreamName,
new UTF8StringSerializer(),
Expand All @@ -90,7 +91,7 @@ public void run() throws Exception {
String.format("%06d", eventCounter),
String.format("%08d", sum),
message);
final CompletableFuture writeFuture = writer.writeEvent(routingKey, message);
final CompletableFuture<Void> writeFuture = writer.writeEvent(routingKey, message);
Thread.sleep(1000);
}
}
Expand Down

This file was deleted.

Loading

0 comments on commit 2b3a033

Please sign in to comment.