-
Notifications
You must be signed in to change notification settings - Fork 123
Nexus handler near complete implementation #1708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
@@ -91,6 +91,17 @@ export async function decodeArrayFromPayloads( | |||
return arrayFromPayloads(payloadConverter, await decodeOptional(payloadCodecs, payloads)); | |||
} | |||
|
|||
/** | |||
* Decode `payloads` and then return {@link arrayFromPayloads}`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docstring is wrong
if (nexusServices.has(s.name)) { | ||
throw new TypeError(`Duplicate registration of nexus service ${s.name}`); | ||
} | ||
const ops = new Map<string, nexus.OperationHandler<any, any> | nexus.SyncOperationHandler<any, any>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should that be part of the Nexus SDK? Logic is trivial at present, but there I expect there will be some normalization work to be done at a later point (e.g. once we add decorators) which we wouldn't to diverge from one Nexus implementation to another. This also seems to be what we did in Java, and kind-of what we did in Go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we discussed moving this to the Nexus SDK with Chad yesterday. I will do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I missed that part of the conversation.
I will do that.
I only wanted to confirm that we agree on direction. I can do it myself if you don't have time/prefer not to do it (same apply to all suggestions I make on this PR).
import getPort from 'get-port'; | ||
import * as nexus from 'nexus-rpc'; | ||
import * as protoJsonSerializer from 'proto3-json-serializer'; | ||
import * as temporalnexus from '@temporalio/nexus'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dependency is missing in package.json and tsconfig.json
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix.
}); | ||
|
||
Runtime.install({ logger }); | ||
t.context.httpPort = await getPort(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO(JWH): This will be flaky in CI due to port collisions, and users will often need to do that themselves. Move this to Core.
test.beforeEach(async (t) => { | ||
const taskQueue = t.title + randomUUID(); | ||
const { env } = t.context; | ||
const response = await env.connection.operatorService.createNexusEndpoint({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand this is the way for now, but what's the intent regarding this? Do we expect users to have to do the same in their own Temporal+Nexus tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good question. in Java we have a shortcut, we should probably make it easier in the test environment for Core based SDKs. @dandavison, wondering if you considered this in Python.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I have not considered APIs for user Temporal Nexus testing yet; I'll add a TODO to the code. In my tests I have a helper function like this for creating Nexus endpoints.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need an issue to track this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Created one for Python SDK-3837 "Python Nexus user testing utilities and docs"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have made this a features issue with checkboxes for each SDK but 🤷, thanks!
packages/worker/src/nexus.ts
Outdated
const input = await decodeFromPayload(this.dataConverter, payload); | ||
if (typeof this.handler === 'function') { | ||
const handler = this.handler as nexus.SyncOperationHandler<unknown, unknown>; | ||
const output = await this.invokeUserCode('startOperation', handler.bind(undefined, input, options)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be preferable to add some defensive checks here to prevent a user from returning a HandlerStartOperationResultSync
or HandlerStartOperationResultAsync
from their SyncOperationHandler
. Sounds like an easy error to make. But I don't think there's currently a safe way of doing this.
packages/worker/src/nexus.ts
Outdated
options: nexus.StartOperationOptions | ||
): Promise<coresdk.nexus.INexusTaskCompletion> { | ||
try { | ||
const input = await decodeFromPayload(this.dataConverter, payload); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the data converter fails here, I assume it should result in a 400 BAD_REQUEST
Nexus HTTP response, right? If that's correct can you confirm that it does?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a test worth adding. Thanks!
- This PR is blocked on publishing an initial version of the `nexus-rpc` package. - There is a TODO to figure out HandlerError and OperationError message rehydration, pending discussion. - Interceptors not yet implemented. - `WorkflowRunOperation` and `getClient()` not implemented for the `@temporalio/nexus` package. - Tests use the HTTP API directly in lieu of a workflow caller or strongly typed client, we can refactor those later.
import { | ||
convertWorkflowEventLinkToNexusLink, | ||
convertNexusLinkToWorkflowEventLink, | ||
} from '@temporalio/worker/lib/nexus/link-converter'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} from '@temporalio/worker/lib/nexus/link-converter'; | |
} from '@temporalio/nexus/lib/link-converter'; |
) => Promise<WorkflowHandle<O>>; | ||
|
||
export class WorkflowRunOperation<I, O> implements nexus.OperationHandler<I, O> { | ||
constructor(readonly handler: WorkflowRunOperationHandler<I, O>) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that WorkflowRunOperation
implements OperationHandler
, isn't there an argument that WorkflowRunOperation
should be called WorkflowRunOperationHandler
and that the function passed into the constructor (type currently called WorkflowRunOperationHandler
) should be called startHandler: WorkflowRunOperationStartHandler
?
nexus-rpc
package.WorkflowRunOperation
andgetClient()
not implemented for the@temporalio/nexus
package.