Skip to content

Commit d7eb6c1

Browse files
committed
CommandProcessor
1 parent 5d87d12 commit d7eb6c1

File tree

1 file changed

+136
-0
lines changed

1 file changed

+136
-0
lines changed
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package io.temporal.samples.hello;
2+
3+
import io.temporal.activity.ActivityInterface;
4+
import io.temporal.activity.ActivityOptions;
5+
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
6+
import io.temporal.client.WorkflowClient;
7+
import io.temporal.client.WorkflowOptions;
8+
import io.temporal.client.WorkflowStub;
9+
import io.temporal.serviceclient.WorkflowServiceStubs;
10+
import io.temporal.worker.Worker;
11+
import io.temporal.worker.WorkerFactory;
12+
import io.temporal.workflow.CompletablePromise;
13+
import io.temporal.workflow.UpdateMethod;
14+
import io.temporal.workflow.Workflow;
15+
import io.temporal.workflow.WorkflowInterface;
16+
import io.temporal.workflow.WorkflowMethod;
17+
import java.time.Duration;
18+
import java.util.ArrayList;
19+
import java.util.concurrent.CompletableFuture;
20+
21+
public class CommandProcessor {
22+
static final String TASK_QUEUE = "MyTaskQueue";
23+
static final String WORKFLOW_ID = "MyWorkflowId";
24+
25+
@WorkflowInterface
26+
public interface CommandProcessorWorkflow {
27+
@WorkflowMethod
28+
String startProcessing();
29+
30+
@UpdateMethod
31+
String submitCommand(int command);
32+
}
33+
34+
@ActivityInterface
35+
public interface MyActivities {
36+
String processCommand(int command);
37+
}
38+
39+
public static class CommandProcessorWorkflowImpl implements CommandProcessorWorkflow {
40+
41+
private static class QueuedCommand {
42+
public int command;
43+
public CompletablePromise<String> promise;
44+
45+
public QueuedCommand(int command, CompletablePromise<String> promise) {
46+
this.command = command;
47+
this.promise = promise;
48+
}
49+
}
50+
51+
private ArrayList<QueuedCommand> commandQueue;
52+
private boolean done;
53+
54+
public CommandProcessorWorkflowImpl() {
55+
this.commandQueue = new ArrayList<>();
56+
this.done = false;
57+
}
58+
59+
private final MyActivities activities =
60+
Workflow.newActivityStub(
61+
MyActivities.class,
62+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build());
63+
64+
@Override
65+
public String startProcessing() {
66+
while (true) {
67+
Workflow.await(() -> this.commandQueue.size() > 0 || this.done);
68+
if (this.done && this.commandQueue.size() == 0) {
69+
return "done";
70+
}
71+
QueuedCommand queuedCommand = this.commandQueue.remove(0);
72+
String result = activities.processCommand(queuedCommand.command);
73+
queuedCommand.promise.complete(result);
74+
}
75+
}
76+
77+
@Override
78+
public String submitCommand(int command) {
79+
if (command < 0) {
80+
this.done = true;
81+
return "stopping workflow";
82+
}
83+
CompletablePromise<String> promise = Workflow.newPromise();
84+
QueuedCommand queuedCommand = new QueuedCommand(command, promise);
85+
this.commandQueue.add(queuedCommand);
86+
return queuedCommand.promise.get();
87+
}
88+
}
89+
90+
static class MyActivitiesImpl implements MyActivities {
91+
@Override
92+
public String processCommand(int commandNum) {
93+
try {
94+
// Earlier commands are slower, so we must serialize if they are to complete in order of
95+
// receipt.
96+
Thread.sleep(1000L * (3 - commandNum));
97+
} catch (InterruptedException e) {
98+
Thread.currentThread().interrupt();
99+
}
100+
String result = commandNum + " [processed]";
101+
System.out.println(result);
102+
return result;
103+
}
104+
}
105+
106+
public static void main(String[] args) {
107+
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
108+
WorkflowClient client = WorkflowClient.newInstance(service);
109+
WorkerFactory factory = WorkerFactory.newInstance(client);
110+
Worker worker = factory.newWorker(TASK_QUEUE);
111+
worker.registerWorkflowImplementationTypes(CommandProcessorWorkflowImpl.class);
112+
worker.registerActivitiesImplementations(new MyActivitiesImpl());
113+
factory.start();
114+
CommandProcessorWorkflow commandProcessor =
115+
client.newWorkflowStub(
116+
CommandProcessorWorkflow.class,
117+
WorkflowOptions.newBuilder()
118+
.setWorkflowId(WORKFLOW_ID)
119+
.setTaskQueue(TASK_QUEUE)
120+
.setWorkflowIdReusePolicy(
121+
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING)
122+
.build());
123+
124+
WorkflowStub untypedWorkflowStub = WorkflowStub.fromTyped(commandProcessor);
125+
126+
WorkflowClient.start(commandProcessor::startProcessing);
127+
128+
CompletableFuture.allOf(
129+
untypedWorkflowStub.startUpdate("submitCommand", String.class, 1).getResultAsync(),
130+
untypedWorkflowStub.startUpdate("submitCommand", String.class, 2).getResultAsync())
131+
.join();
132+
commandProcessor.submitCommand(-1);
133+
untypedWorkflowStub.getResult(String.class);
134+
System.exit(0);
135+
}
136+
}

0 commit comments

Comments
 (0)