Skip to content

Commit

Permalink
Some refactor on streambodyclientreadablestream
Browse files Browse the repository at this point in the history
  • Loading branch information
stanley-cheung committed Jun 9, 2020
1 parent 6b99a37 commit aadbdb7
Showing 1 changed file with 84 additions and 2 deletions.
86 changes: 84 additions & 2 deletions javascript/net/grpc/web/streambodyclientreadablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ goog.module.declareLegacyNamespace();

const ClientReadableStream = goog.require('grpc.web.ClientReadableStream');
const ErrorCode = goog.require('goog.net.ErrorCode');
const EventType = goog.require('goog.net.EventType');
const GrpcWebError = goog.requireType('grpc.web.Error');
const NodeReadableStream = goog.require('goog.net.streams.NodeReadableStream');
const StatusCode = goog.require('grpc.web.StatusCode');
const XhrIo = goog.require('goog.net.XhrIo');
const events = goog.require('goog.events');
const {GenericTransportInterface} = goog.require('grpc.web.GenericTransportInterface');
const {Status} = goog.require('grpc.web.Status');

Expand Down Expand Up @@ -107,7 +110,79 @@ const StreamBodyClientReadableStream = function(genericTransportInterface) {
*/
this.rpcStatusParseFn_ = null;

/**
* @private
* @type {function(?GrpcWebError, ?)|null}
*/
this.onErrorResponseCallback_ = null;

if (this.xhrNodeReadableStream_) {
this.setStreamCallback_();
} else if (this.xhr_) {
this.setUnaryCallback_();
}
};

/**
* @private
*/
StreamBodyClientReadableStream.prototype.setUnaryCallback_ = function() {
events.listen(/** @type {!XhrIo} */ (this.xhr_), EventType.COMPLETE, (e) => {
if (this.xhr_.isSuccess()) {
// If the response is serialized as Base64 (for example if the
// X-Goog-Encode-Response-If-Executable header is in effect), decode it
// before passing it to the deserializer.
var responseText = this.xhr_.getResponseText();
if (this.xhr_.headers.get('X-Goog-Encode-Response-If-Executable') ==
'base64' &&
this.xhr_.getResponseHeader(XhrIo.CONTENT_TYPE_HEADER) ===
'text/plain') {
if (!atob) {
throw new Error('Cannot decode Base64 response');
}
responseText = atob(responseText);
}

var response = this.responseDeserializeFn_(responseText);
var grpcStatus = StatusCode.fromHttpStatus(this.xhr_.getStatus());
if (grpcStatus == StatusCode.OK) {
this.onDataCallback_(response);
} else {
this.onErrorResponseCallback_(
/** @type {!GrpcWebError} */ ({
code: grpcStatus,
}),
response);
}
} else if (this.xhr_.getStatus() == 404) {
var message = 'Not Found: ' + this.xhr_.getLastUri();
this.onErrorCallback_({
code: StatusCode.NOT_FOUND,
message: message,
});
} else {
var rawResponse = this.xhr_.getResponseText();
if (rawResponse) {
var status = this.rpcStatusParseFn_(rawResponse);
this.onErrorCallback_({
code: status.code,
message: status.details,
metadata: status.metadata,
});
} else {
this.onErrorCallback_({
code: StatusCode.UNAVAILABLE,
message: ErrorCode.getDebugMessage(this.xhr_.getLastErrorCode()),
});
}
}
});
};

/**
* @private
*/
StreamBodyClientReadableStream.prototype.setStreamCallback_ = function() {
// Add the callback to the underlying stream
var self = this;
this.xhrNodeReadableStream_.on('data', function(data) {
Expand Down Expand Up @@ -163,7 +238,6 @@ const StreamBodyClientReadableStream = function(genericTransportInterface) {
});
};


/**
* @override
* @export
Expand Down Expand Up @@ -195,7 +269,15 @@ StreamBodyClientReadableStream.prototype.setResponseDeserializeFn =
this.responseDeserializeFn_ = responseDeserializeFn;
};


/**
* @param {function(?GrpcWebError, ?)} errorResponseFn
*/
StreamBodyClientReadableStream.prototype.setErrorResponseFn = function(
errorResponseFn) {
this.onErrorResponseCallback_ = errorResponseFn;
this.onDataCallback_ = (response) => errorResponseFn(null, response);
this.onErrorCallback_ = (error) => errorResponseFn(error, null);
};

/**
* Register a function to parse RPC status response
Expand Down

0 comments on commit aadbdb7

Please sign in to comment.