Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: RPC server, client, bidirectional streaming #1115

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 107 additions & 40 deletions cli/targets/static.js
Original file line number Diff line number Diff line change
Expand Up @@ -626,49 +626,116 @@ function buildService(ref, service) {
}

service.methodsArray.forEach(function(method) {
method.resolve();
var lcName = protobuf.util.lcFirst(method.name),
cbName = escapeName(method.name + "Callback");
push("");
pushComment([
"Callback as used by {@link " + exportName(service) + "#" + escapeName(lcName) + "}.",
// This is a more specialized version of protobuf.rpc.ServiceCallback
"@memberof " + exportName(service),
"@typedef " + cbName,
"@type {function}",
"@param {Error|null} error Error, if any",
"@param {" + exportName(method.resolvedResponseType) + "} [response] " + method.resolvedResponseType.name
]);
push("");
pushComment([
method.comment || "Calls " + method.name + ".",
"@function " + lcName,
"@memberof " + exportName(service),
"@instance",
"@param {" + exportName(method.resolvedRequestType, !config.forceMessage) + "} request " + method.resolvedRequestType.name + " message or plain object",
"@param {" + exportName(service) + "." + cbName + "} callback Node-style callback called with the error, if any, and " + method.resolvedResponseType.name,
"@returns {undefined}",
"@variation 1"
]);
push("Object.defineProperty(" + escapeName(service.name) + ".prototype" + util.safeProp(lcName) + " = function " + escapeName(lcName) + "(request, callback) {");
++indent;
push("return this.rpcCall(" + escapeName(lcName) + ", $root." + exportName(method.resolvedRequestType) + ", $root." + exportName(method.resolvedResponseType) + ", request, callback);");
--indent;
push("}, \"name\", { value: " + JSON.stringify(method.name) + " });");
if (config.comments)
push("");
pushComment([
method.comment || "Calls " + method.name + ".",
"@function " + lcName,
"@memberof " + exportName(service),
"@instance",
"@param {" + exportName(method.resolvedRequestType, !config.forceMessage) + "} request " + method.resolvedRequestType.name + " message or plain object",
"@returns {Promise<" + exportName(method.resolvedResponseType) + ">} Promise",
"@variation 2"
]);
if (method.requestStream && method.responseStream) {
buildBidiStreamingMethod(service, method);
} else if (method.requestStream) {
buildClientStreamingMethod(service, method);
} else if (method.responseStream) {
buildServerStreamingMethod(service, method);
} else {
buildUnaryServiceMethod(service, method);
}
});
}

function buildUnaryServiceMethod(service, method) {
method.resolve();
var lcName = protobuf.util.lcFirst(method.name),
cbName = escapeName(method.name + "Callback");
push("");
pushComment([
"Callback as used by {@link " + exportName(service) + "#" + escapeName(lcName) + "}.",
// This is a more specialized version of protobuf.rpc.ServiceCallback
"@memberof " + exportName(service),
"@typedef " + cbName,
"@type {function}",
"@param {Error|null} error Error, if any",
"@param {" + exportName(method.resolvedResponseType) + "} [response] " + method.resolvedResponseType.name
]);
push("");
pushComment([
method.comment || "Calls " + method.name + ".",
"@function " + lcName,
"@memberof " + exportName(service),
"@instance",
"@param {" + exportName(method.resolvedRequestType, !config.forceMessage) + "} request " + method.resolvedRequestType.name + " message or plain object",
"@param {" + exportName(service) + "." + cbName + "} callback Node-style callback called with the error, if any, and " + method.resolvedResponseType.name,
"@returns {undefined}",
"@variation 1"
]);
push("Object.defineProperty(" + escapeName(service.name) + ".prototype" + util.safeProp(lcName) + " = function " + escapeName(lcName) + "(request, callback) {");
++indent;
push("return this.rpcCall(" + escapeName(lcName) + ", $root." + exportName(method.resolvedRequestType) + ", $root." + exportName(method.resolvedResponseType) + ", request, callback);");
--indent;
push("}, \"name\", { value: " + JSON.stringify(method.name) + " });");
if (config.comments)
push("");
pushComment([
method.comment || "Calls " + method.name + ".",
"@function " + lcName,
"@memberof " + exportName(service),
"@instance",
"@param {" + exportName(method.resolvedRequestType, !config.forceMessage) + "} request " + method.resolvedRequestType.name + " message or plain object",
"@returns {Promise<" + exportName(method.resolvedResponseType) + ">} Promise",
"@variation 2"
]);
}

function buildServerStreamingMethod(service, method) {
method.resolve();
var lcName = protobuf.util.lcFirst(method.name);
pushComment([
method.comment || "Calls " + method.name + ".",
"@function " + lcName,
"@memberof " + exportName(service),
"@instance",
"@param {" + exportName(method.resolvedRequestType, !config.forceMessage) + "} request " + method.resolvedRequestType.name + " message or plain object",
"@returns {$protobuf.RPCServerStream<" + exportName(method.resolvedResponseType) + ">}",
"@variation 1"
]);
push("Object.defineProperty(" + escapeName(service.name) + ".prototype" + util.safeProp(lcName) + " = function " + escapeName(lcName) + "(request, callback) {");
++indent;
push("return this.serverStreamCall(" + escapeName(lcName) + ", $root." + exportName(method.resolvedRequestType) + ", $root." + exportName(method.resolvedResponseType) + ", request);");
--indent;
push("}, \"name\", { value: " + JSON.stringify(method.name) + " });");
}

function buildClientStreamingMethod(service, method) {
method.resolve();
var lcName = protobuf.util.lcFirst(method.name);
pushComment([
method.comment || "Calls " + method.name + ".",
"@function " + lcName,
"@memberof " + exportName(service),
"@instance",
"@returns {$protobuf.RPCClientStream<" + exportName(method.resolvedRequestType) + ">}",
"@variation 1"
]);
push("Object.defineProperty(" + escapeName(service.name) + ".prototype" + util.safeProp(lcName) + " = function " + escapeName(lcName) + "() {");
++indent;
push("return this.clientStreamCall(" + escapeName(lcName) + ", $root." + exportName(method.resolvedRequestType) + ", $root." + exportName(method.resolvedResponseType) + ");");
--indent;
push("}, \"name\", { value: " + JSON.stringify(method.name) + " });");
}

function buildBidiStreamingMethod(service, method) {
method.resolve();
var lcName = protobuf.util.lcFirst(method.name);
pushComment([
method.comment || "Calls " + method.name + ".",
"@function " + lcName,
"@memberof " + exportName(service),
"@instance",
"@returns {$protobuf.RPCBidiStream<" + exportName(method.resolvedRequestType) + ", " + exportName(method.resolvedResponseType) + ">}",
"@variation 1"
]);
push("Object.defineProperty(" + escapeName(service.name) + ".prototype" + util.safeProp(lcName) + " = function " + escapeName(lcName) + "() {");
++indent;
push("return this.bidiStreamCall(" + escapeName(lcName) + ", $root." + exportName(method.resolvedRequestType) + ", $root." + exportName(method.resolvedResponseType) + ");");
--indent;
push("}, \"name\", { value: " + JSON.stringify(method.name) + " });");
}

function buildEnum(ref, enm) {

push("");
Expand Down
51 changes: 47 additions & 4 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1305,14 +1305,13 @@ export namespace rpc {

/** An RPC service as returned by {@link Service#create}. */
class Service extends util.EventEmitter {

/**
* Constructs a new RPC service instance.
* @param rpcImpl RPC implementation
* @param [requestDelimited=false] Whether requests are length-delimited
* @param [responseDelimited=false] Whether responses are length-delimited
*/
constructor(rpcImpl: RPCImpl, requestDelimited?: boolean, responseDelimited?: boolean);
constructor(rpcImpl: RPCImpl | RPCHandler, requestDelimited?: boolean, responseDelimited?: boolean);

/** RPC implementation. Becomes `null` once the service is ended. */
public rpcImpl: (RPCImpl|null);
Expand Down Expand Up @@ -1342,13 +1341,57 @@ export namespace rpc {
}
}

type RPCUnaryCall = (method: (Method|rpc.ServiceMethod<Message<{}>, Message<{}>>), requestData: Uint8Array, callback: RPCImplCallback) => void;
// TODO: check if all args are valid
type RPCServerStreamCall = (method: (Method|rpc.ServiceMethod<Message<{}>, Message<{}>>), requestData: Uint8Array, decodeFn: (responseData: Uint8Array) => protobuf.Message) => util.EventEmitter;
type RPCClientStreamCall = (method: (Method|rpc.ServiceMethod<Message<{}>, Message<{}>>), encodeFn: (requestData: any) => Uint8Array, decodeFn: (responseData: Uint8Array) => protobuf.Message) => util.EventEmitter;
type RPCBidiStreamCall = (method: (Method|rpc.ServiceMethod<Message<{}>, Message<{}>>), encodeFn: (requestData: any) => Uint8Array, decodeFn: (responseData: Uint8Array) => protobuf.Message) => util.EventEmitter;

/**
* RPCHandler allows to pass custom RPC implementation for unary and streaming calls
*/
export interface RPCHandler {
unaryCall: RPCUnaryCall;
serverStreamCall: RPCServerStreamCall;
clientStreamCall: RPCClientStreamCall;
bidiStreamCall: RPCBidiStreamCall;
}

export interface RPCBidiStream<T, U = {}> {
on(evt: 'recv', fn: (data: U) => any, ctx?: any): util.EventEmitter;
on(evt: 'error', fn: (data: any) => any, ctx?: any): util.EventEmitter;
on(evt: 'end', fn: (data: any) => any, ctx?: any): util.EventEmitter;
// TODO: handle off event
off(evt?: string, fn?: () => any): util.EventEmitter;
emit(evt: 'send', data: T): util.EventEmitter;
emit(evt: 'close'): util.EventEmitter;
}

export interface RPCServerStream<T> {
on(evt: 'recv', fn: (data: T) => any, ctx?: any): util.EventEmitter;
on(evt: 'error', fn: (data: any) => any, ctx?: any): util.EventEmitter;
on(evt: 'end', fn: (data: any) => any, ctx?: any): util.EventEmitter;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can a client end a server stream before the stream is actually ended by the server itself?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, client which sends end event is indicating to server that it don't want to continue.

// TODO: handle off event
off(evt?: string, fn?: () => any): util.EventEmitter;
emit(evt: 'close'): util.EventEmitter;
}

export interface RPCClientStream<T> {
on(evt: 'error', fn: (data: any) => any, ctx?: any): util.EventEmitter;
on(evt: 'end', fn: (data: any) => any, ctx?: any): util.EventEmitter;
// TODO: handle off event
off(evt?: string, fn?: () => any): util.EventEmitter;
emit(evt: 'send', value: T): util.EventEmitter;
emit(evt: 'close'): util.EventEmitter;
}

/**
* RPC implementation passed to {@link Service#create} performing a service request on network level, i.e. by utilizing http requests or websockets.
* @param method Reflected or static method being called
* @param requestData Request data
* @param callback Callback function
*/
type RPCImpl = (method: (Method|rpc.ServiceMethod<Message<{}>, Message<{}>>), requestData: Uint8Array, callback: RPCImplCallback) => void;
type RPCImpl = RPCUnaryCall;

/**
* Node-style callback as used by {@link RPCImpl}.
Expand Down Expand Up @@ -1397,7 +1440,7 @@ export class Service extends NamespaceBase {
* @param [responseDelimited=false] Whether responses are length-delimited
* @returns RPC service. Useful where requests and/or responses are streamed.
*/
public create(rpcImpl: RPCImpl, requestDelimited?: boolean, responseDelimited?: boolean): rpc.Service;
public create(rpcImpl: RPCImpl | RPCHandler, requestDelimited?: boolean, responseDelimited?: boolean): rpc.Service;
}

/** Service descriptor. */
Expand Down
82 changes: 77 additions & 5 deletions src/rpc/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ var util = require("../util/minimal");
* @param {boolean} [responseDelimited=false] Whether responses are length-delimited
*/
function Service(rpcImpl, requestDelimited, responseDelimited) {

if (typeof rpcImpl !== "function")
if (typeof rpcImpl !== "object" && typeof rpcImpl !== "function") {
throw TypeError("rpcImpl must be a function");
} else if (typeof rpcImpl === "object" && !isRPCV2(rpcImpl)) {
throw TypeError("rpcImpl should implement RPCHandler")
}

util.EventEmitter.call(this);

Expand All @@ -65,6 +68,13 @@ function Service(rpcImpl, requestDelimited, responseDelimited) {
this.responseDelimited = Boolean(responseDelimited);
}

function isRPCV2(rpcImpl) {
return typeof rpcImpl.unaryCall === "function" &&
typeof rpcImpl.serverStreamCall === "function" &&
typeof rpcImpl.clientStreamCall === "function" &&
typeof rpcImpl.bidiStreamCall === "function";
}

/**
* Calls a service method through {@link rpc.Service#rpcImpl|rpcImpl}.
* @param {Method|rpc.ServiceMethod<TReq,TRes>} method Reflected or static method
Expand All @@ -77,21 +87,25 @@ function Service(rpcImpl, requestDelimited, responseDelimited) {
* @template TRes extends Message<TRes>
*/
Service.prototype.rpcCall = function rpcCall(method, requestCtor, responseCtor, request, callback) {

if (!request)
throw TypeError("request must be specified");

var self = this;
if (!callback)
return util.asPromise(rpcCall, self, method, requestCtor, responseCtor, request);

if (!self.rpcImpl) {
var rpcUnaryImpl = self.rpcImpl;
if (!rpcUnaryImpl) {
setTimeout(function() { callback(Error("already ended")); }, 0);
return undefined;
}

if (typeof rpcUnaryImpl.unaryCall === "function") {
rpcUnaryImpl = rpcUnaryImpl.unaryCall;
}

try {
return self.rpcImpl(
return rpcUnaryImpl(
method,
requestCtor[self.requestDelimited ? "encodeDelimited" : "encode"](request).finish(),
function rpcCallback(err, response) {
Expand Down Expand Up @@ -126,6 +140,64 @@ Service.prototype.rpcCall = function rpcCall(method, requestCtor, responseCtor,
}
};

// TODO: docs
Service.prototype.serverStreamCall = function serverStreamCall(method, requestCtor, responseCtor, request) {
if (!request)
throw TypeError("request must be specified");

var self = this;

if (typeof self.rpcImpl.serverStreamCall !== "function") {
throw new Error("rpcImpl should implement serverStreamCall to support server streaming");
}

return self.rpcImpl.serverStreamCall(
method,
requestCtor[self.requestDelimited ? "encodeDelimited" : "encode"](request).finish(),
function responseFn (response) {
return responseCtor[self.responseDelimited ? "decodeDelimited" : "decode"](response);
}
);
};

// TODO: docs
Service.prototype.clientStreamCall = function clientStreamCall(method, requestCtor, responseCtor) {
var self = this;

if (typeof self.rpcImpl.clientStreamCall !== "function") {
throw new Error("rpcImpl should implement clientStreamCall to support client streaming");
}

return self.rpcImpl.clientStreamCall(
method,
function encodeFn (request) {
return requestCtor[self.requestDelimited ? "encodeDelimited" : "encode"](request).finish()
},
function decodeFn (response) {
return responseCtor[self.responseDelimited ? "decodeDelimited" : "decode"](response);
}
);
};

// TODO: docs
Service.prototype.bidiStreamCall = function bidiStreamCall(method, requestCtor, responseCtor) {
var self = this;

if (typeof self.rpcImpl.bidiStreamCall !== "function") {
throw new Error("rpcImpl should implement bidiStreamCall to support bidi streaming");
}

return self.rpcImpl.bidiStreamCall(
method,
function encodeFn (request) {
return requestCtor[self.requestDelimited ? "encodeDelimited" : "encode"](request).finish()
},
function decodeFn (response) {
return responseCtor[self.responseDelimited ? "decodeDelimited" : "decode"](response);
}
);
};

/**
* Ends this service and emits the `end` event.
* @param {boolean} [endedByRPC=false] Whether the service has been ended by the RPC implementation.
Expand Down