
Unable to stream audio signal in chunks via Java Servlet #4094

@appted

Description


Hi,

We are facing an issue while processing a binary stream with the google-cloud-dialogflow v2 Java API, version 0.59.0-alpha.
In our scenario we built a servlet that provides a WebSocket to receive binary audio signal chunks. The implementation is based on
https://cloud.google.com/dialogflow-enterprise/docs/detect-intent-stream#detect-intent-text-java and runs on a Tomcat 8 instance with Java 8. The issue occurs regardless of whether we use
the deprecated bidiStreamingCall or invoke send() on a ClientStream.
If we buffer the bytes from the @OnMessage executions and send them all at once when closing the WebSocket connection, our
ResponseObserver gets called, so we are sure the audio signal is correct. If the bytes are sent in a streamed manner,
the observer never receives a response. The cause is that no requests are sent by the google-cloud-dialogflow v2 API. The following error, raised
after some time, points to it:
com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 9961472 bytes.
The error occurs when the Thread.sleep in line 111 is skipped. Without the Thread.sleep, the client permanently sends chunks after the connection was opened,
yet the invocation of drainPendingCallbacks in io.grpc.internal.DelayedStream never sets the boolean passThrough = true.
If the Thread.sleep is executed and causes the passThrough flag to be set to true, all runnables within delayOrExecute of io.grpc.internal.DelayedStream
are invoked and the following exception appears (reproducible with both the deprecated bidiStreamingCall and send() on a ClientStream):

After updating google-cloud-dialogflow to 0.71.0-alpha the error still persists.

Our use case is quite simple: we would like to open a ClientStream with your Java library when a client opens a WebSocket connection. The client constantly delivers the audio signal as binary messages, which are handled in @OnMessage within the socket connection and sent to the ClientStream.
We expect to get called in the StreamingDetectResponseObserver with the QueryResults, as well as a webhook execution triggered by the Dialogflow intent.
Unfortunately the audio signal does not get sent by your code as described in the documentation.

Environment details

  • OS: Windows
  • Java version: 1.8.0_151
  • google-cloud-java version(s): 0.71.0-alpha

Steps to reproduce

Use send() requests in a streaming manner instead of sending a complete byte[] with the full audio signal.
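To make the two send modes concrete, here is a minimal, self-contained sketch of the difference. The `StreamSender` interface is a hypothetical stand-in for `ClientStream.send(...)` (it is not part of the google-cloud-dialogflow API), so the payload sizes can be observed without a live gRPC connection:

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

public class SendModes {
    /** Hypothetical stand-in for ClientStream.send(...). */
    interface StreamSender {
        void send(byte[] payload);
    }

    /** Working variant: buffer all chunks and send once when the socket closes. */
    static void sendBuffered(List<byte[]> chunks, StreamSender sender) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            buffer.write(chunk, 0, chunk.length);
        }
        sender.send(buffer.toByteArray()); // one request carrying the full signal
    }

    /** Failing variant: forward each chunk as its own request, one per @OnMessage. */
    static void sendStreamed(List<byte[]> chunks, StreamSender sender) {
        for (byte[] chunk : chunks) {
            sender.send(chunk);
        }
    }

    public static void main(String[] args) {
        List<byte[]> chunks = new ArrayList<>();
        chunks.add(new byte[]{1, 2});
        chunks.add(new byte[]{3, 4, 5});

        List<Integer> sizes = new ArrayList<>();
        sendBuffered(chunks, p -> sizes.add(p.length));
        sendStreamed(chunks, p -> sizes.add(p.length));
        System.out.println(sizes); // [5, 2, 3]
    }
}
```

The buffered variant produces a single payload; the streamed variant produces one request per chunk, which is the path where our observer never fires.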

Stacktrace

java.util.concurrent.RejectedExecutionException: event executor terminated
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:842)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:328)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:321)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:765)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.scheduleFlush(WriteQueue.java:65)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.enqueue(WriteQueue.java:97)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientStream$Sink.writeFrame(NettyClientStream.java:168)
at io.grpc.internal.AbstractClientStream.deliverFrame(AbstractClientStream.java:184)
at io.grpc.internal.MessageFramer.commitToSink(MessageFramer.java:350)
at io.grpc.internal.MessageFramer.flush(MessageFramer.java:300)
at io.grpc.internal.AbstractStream.flush(AbstractStream.java:63)
at io.grpc.internal.ForwardingClientStream.flush(ForwardingClientStream.java:42)
at io.grpc.internal.DelayedStream.flush(DelayedStream.java:238)
at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:435)
at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)
at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)
at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)
at com.google.api.gax.grpc.GrpcDirectBidiStreamingCallable$1.send(GrpcDirectBidiStreamingCallable.java:67)
at com.google.api.gax.rpc.BidiStreamingCallable$3.onNext(BidiStreamingCallable.java:214)
at com.axians.itsolutions.ui5.convention.demo2018.web.NMSMPClientSpeechWebSocketEndpoint.processBinaryMessage(NMSMPClientSpeechWebSocketEndpoint.java:164)
at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.tomcat.websocket.pojo.PojoMessageHandlerPartialBase.onMessage(PojoMessageHandlerPartialBase.java:71)
at org.apache.tomcat.websocket.WsFrameBase.sendMessageBinary(WsFrameBase.java:579)
at org.apache.tomcat.websocket.server.WsFrameServer.sendMessageBinary(WsFrameServer.java:131)
at org.apache.tomcat.websocket.WsFrameBase.processDataBinary(WsFrameBase.java:526)
at org.apache.tomcat.websocket.WsFrameBase.processData(WsFrameBase.java:300)
at org.apache.tomcat.websocket.WsFrameBase.processInputBuffer(WsFrameBase.java:133)
at org.apache.tomcat.websocket.server.WsFrameServer.onDataAvailable(WsFrameServer.java:82)
at org.apache.tomcat.websocket.server.WsFrameServer.doOnDataAvailable(WsFrameServer.java:171)
at org.apache.tomcat.websocket.server.WsFrameServer.notifyDataAvailable(WsFrameServer.java:151)
at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.upgradeDispatch(WsHttpUpgradeHandler.java:148)
at org.apache.coyote.http11.upgrade.UpgradeProcessorInternal.dispatch(UpgradeProcessorInternal.java:54)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:53)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:800)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1471)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:836)

Code snippet

package com.axians.itsolutions.ui5.convention.demo2018.web;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;

import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.cloud.dialogflow.v2.AudioEncoding;
import com.google.cloud.dialogflow.v2.InputAudioConfig;
import com.google.cloud.dialogflow.v2.QueryInput;
import com.google.cloud.dialogflow.v2.SessionName;
import com.google.cloud.dialogflow.v2.SessionsClient;
import com.google.cloud.dialogflow.v2.SessionsSettings;
import com.google.cloud.dialogflow.v2.StreamingDetectIntentRequest;
import com.google.cloud.dialogflow.v2.StreamingDetectIntentResponse;
import com.google.protobuf.ByteString;

@ServerEndpoint(value = "/speech/{conversationId}/{sampleRate}/{locale}/{dialogFlowId}")
public class NMSMPClientSpeechWebSocketEndpoint {

private ClientStream<StreamingDetectIntentRequest> clientStream;

private ByteArrayOutputStream receivedBytes = new ByteArrayOutputStream();

private NMSMPFixedGoogleCredentialsProvider credentialsProvider = new NMSMPFixedGoogleCredentialsProvider();

private ResponseObserver<StreamingDetectIntentResponse> responseObserver;

private CountDownLatch notification;

private SessionName dialogFlowSession;

private SessionsClient sessionClient;

private final Logger logger = LoggerFactory.getLogger(this.getClass());

public static final HashMap<String,Session> clientSpeechConnections = new HashMap<String, Session>();

private int sampleRate;

private String locale;

@OnOpen
public void onOpen(@PathParam("conversationId") String conversationId, @PathParam("sampleRate") Integer sampleRate, @PathParam("locale") String locale, @PathParam("dialogFlowId") String dialogFlowId, Session webSession, EndpointConfig config) throws Throwable {
	
	getLogger().error("onOpen::" + webSession.getId());
    
	if(conversationId != null ) {

		// Set the session name using the sessionId (UUID) and projectID (my-project-id)
        setDialogFlowSession(SessionName.of(dialogFlowId, conversationId));
		
    	clientSpeechConnections.put(getDialogFlowSession().toString(), webSession);
        setNotification(new CountDownLatch(1));
        setSampleRate(sampleRate);
        setLocale(locale);
		
		if (getClientStream() == null) {
    		long time = System.currentTimeMillis();
    		try (SessionsClient sessionsClient = SessionsClient.create(SessionsSettings.newBuilder().setCredentialsProvider(getCredentialsProvider()).build())) {
                getLogger().error("Session Path: " + getDialogFlowSession().toString());

                setSessionClient(sessionsClient);
                
                // Note: hard coding audioEncoding and sampleRateHertz for simplicity.
	            // Audio encoding of the audio content sent in the query request.
	            AudioEncoding audioEncoding = AudioEncoding.AUDIO_ENCODING_LINEAR_16;
	            int sampleRateHertz = getSampleRate();
	
	            // Instructs the speech recognizer how to process the audio content.
	            InputAudioConfig inputAudioConfig = InputAudioConfig.newBuilder()
	                .setAudioEncoding(audioEncoding) // audioEncoding = AudioEncoding.AUDIO_ENCODING_LINEAR_16
	                .setLanguageCode(getLocale())
	                .setSampleRateHertz(sampleRateHertz)
	                .build();
	            
	            setResponseObserver(new NMSMPStreamingDetectResponseObserver(getNotification()));
	              
	           // Performs the streaming detect intent callable request
	            setClientStream(getSessionClient().streamingDetectIntentCallable().splitCall(getResponseObserver()));
	                
	            // The first request contains the configuration
	            StreamingDetectIntentRequest request = StreamingDetectIntentRequest.newBuilder()
	                .setSession(getDialogFlowSession().toString())
	                .setQueryInput(QueryInput.newBuilder().setAudioConfig(inputAudioConfig).build())
	//                    .setSingleUtterance(false)
	                .build();
	                
                // Make the first request
	            getClientStream().send(request);
	            
	            getLogger().error("Stream establishment took: " + String.valueOf(System.currentTimeMillis() - time));
	            
    		} catch (Exception e) {
              	getLogger().error("sessionClient runtimeException:", e);
                  // Cancel stream and close Websocket.
    		}
    	}
    }
}

@OnMessage
public void onMessage(String message, Session session) {
	if (message.equals("stop")) {
		getLogger().error("onMessage received stop from client");
	} else {
		SynchronousQueue<JSONObject> fulfillmentQueue = com.axians.itsolutions.ui5.convention.demo2018.web.rest.DialogFlowServlet.fulfillmentQueues.get(getDialogFlowSession().toString());
		getLogger().error("onMessage client responded fulFillment:", message);
		if (fulfillmentQueue != null) {
			getLogger().error("onMessage found fulFillmentQueue... offering message");
			fulfillmentQueue.offer(new JSONObject(message));
		}
		Session webSession = clientSpeechConnections.get(getDialogFlowSession().toString());
		if (webSession != null) {
			try {
				webSession.close();
				getLogger().error("client speech connection was closed:");
			} catch (IOException e) {
				getLogger().error("error while closing the client speech connection:", e);
			}
		}
	}
}

private void sendCollectedBytes(byte[] bytes) {
    getClientStream().send(
              StreamingDetectIntentRequest.newBuilder().setSession(getDialogFlowSession().toString())
                  .setInputAudio(ByteString.copyFrom(bytes, 0, bytes.length))
                  .build());
}

@OnMessage
public void processBinaryMessage(byte[] bytes, boolean last, Session session) throws Throwable {
	// Ensure the gRPC max payload size is not exceeded
	if(bytes != null && bytes.length > 0 && getReceivedBytes().size() < 9800000) {
		sendCollectedBytes(bytes);
	}
}

@OnClose
public void onClose(Session session) throws Throwable {
	getLogger().error("onClose:: removing from sessions sessionId: " +  session.getId() + " conversationId: " + session.getRequestParameterMap().get("conversationId").toString());
	
	try {
		getNotification().await();
	} catch (InterruptedException e) {
		getLogger().error("error while closing the client speech connection:", e);
	}
    
    if (getSessionClient() != null) {
		getSessionClient().close();
	}
	setClientStream(null);
	clientSpeechConnections.remove(session.getRequestParameterMap().get("conversationId").toString());
}

@OnError
public void onError(Throwable t) {
    getLogger().error("onError:", t);
}

public CountDownLatch getNotification() {
	return notification;
}


public void setNotification(CountDownLatch notification) {
	this.notification = notification;
}


public SessionName getDialogFlowSession() {
	return dialogFlowSession;
}


public void setDialogFlowSession(SessionName dialogFlowSession) {
	this.dialogFlowSession = dialogFlowSession;
}

public Logger getLogger() {
	return logger;
}


public NMSMPFixedGoogleCredentialsProvider getCredentialsProvider() {
	return credentialsProvider;
}

public void setCredentialsProvider(NMSMPFixedGoogleCredentialsProvider credentialsProvider) {
	this.credentialsProvider = credentialsProvider;
}

public ResponseObserver<StreamingDetectIntentResponse> getResponseObserver() {
	return responseObserver;
}

public void setResponseObserver(ResponseObserver<StreamingDetectIntentResponse> responseObserver) {
	this.responseObserver = responseObserver;
}

public ClientStream<StreamingDetectIntentRequest> getClientStream() {
	return clientStream;
}

public void setClientStream(ClientStream<StreamingDetectIntentRequest> clientStream) {
	this.clientStream = clientStream;
}

public ByteArrayOutputStream getReceivedBytes() {
	return receivedBytes;
}

public void setReceivedBytes(ByteArrayOutputStream receivedBytes) {
	this.receivedBytes = receivedBytes;
}

public int getSampleRate() {
	return sampleRate;
}

public void setSampleRate(int sampleRate) {
	this.sampleRate = sampleRate;
}

public String getLocale() {
	return locale;
}

public void setLocale(String locale) {
	this.locale = locale;
}

public SessionsClient getSessionClient() {
	return sessionClient;
}

public void setSessionClient(SessionsClient sessionClient) {
	this.sessionClient = sessionClient;
}

}
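Since the INVALID_ARGUMENT error reports a single oversized request, one workaround we considered on our side is re-chunking the audio into bounded payloads before each send(). This is a minimal sketch of that splitting logic; the helper name is ours and the 25,000-byte chunk size is an assumption (it mirrors the chunk sizes used in the Google streaming samples), not a documented limit:

```java
import java.util.ArrayList;
import java.util.List;

public class AudioChunker {
    /** Split a PCM buffer into payloads no larger than maxChunkBytes each. */
    static List<byte[]> split(byte[] audio, int maxChunkBytes) {
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < audio.length; offset += maxChunkBytes) {
            int len = Math.min(maxChunkBytes, audio.length - offset);
            byte[] chunk = new byte[len];
            System.arraycopy(audio, offset, chunk, 0, len);
            chunks.add(chunk);
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] audio = new byte[60_000];
        // 60,000 bytes at 25,000 per request -> payloads of 25000, 25000, 10000
        List<byte[]> chunks = split(audio, 25_000);
        System.out.println(chunks.size()); // 3
    }
}
```

Each resulting chunk would then be wrapped in its own StreamingDetectIntentRequest via setInputAudio(...) before send(); this keeps every individual request well under the roughly 10 MB gRPC payload limit from the stacktrace above.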


Metadata

Labels: 🚨 This issue needs some love. · api: dialogflow (Issues related to the Dialogflow API) · priority: p2 (Moderately-important priority. Fix may not be included in next release.) · type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)
