Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.guacamole.GuacamoleException;
import org.apache.guacamole.net.GuacamoleTunnel;
import org.apache.guacamole.protocol.GuacamoleInstruction;
Expand All @@ -37,14 +39,23 @@
* sent automatically.
*/
public class OutputStreamInterceptingFilter
extends StreamInterceptingFilter<OutputStream> {
extends StreamInterceptingFilter<OutputStream>
implements OutputStreamWriter.ExecutionListener {

/**
* Logger for this class.
*/
private static final Logger logger =
LoggerFactory.getLogger(OutputStreamInterceptingFilter.class);

/**
* Active download writers, keyed by stream index. Blobs are enqueued here
* on the tunnel read thread and written asynchronously on the HTTP
* streaming thread.
*/
private final ConcurrentMap<String, OutputStreamWriter> streamWriters =
new ConcurrentHashMap<>();

/**
* Whether this OutputStreamInterceptingFilter should respond to received
* blobs with "ack" messages on behalf of the client. If false, blobs will
Expand Down Expand Up @@ -95,12 +106,29 @@ private void sendAck(String index, String message, GuacamoleStatus status) {

}

@Override
public void onBlobWritten(String streamIndex, boolean requiresAck) {
if (requiresAck) {
sendAck(streamIndex, "OK", GuacamoleStatus.SUCCESS);
}
}

@Override
public void onWriteFailed(String streamIndex) {
sendAck(streamIndex, "FAIL", GuacamoleStatus.SERVER_ERROR);
}

@Override
public void onStreamEnd(String streamIndex) {
closeInterceptedStream(streamIndex);
}

/**
* Handles a single "blob" instruction, decoding its base64 data,
* sending that data to the associated OutputStream, and ultimately
* Handles a single "blob" instruction, decoding its base64 data and
* enqueueing it for writing to the associated output stream, ultimately
* dropping the "blob" instruction such that the client never receives
* it. If no OutputStream is associated with the stream index within
* the "blob" instruction, the instruction is passed through untouched.
* it. If no writer is registered for the stream index within the "blob"
* instruction, the instruction is passed through untouched.
*
* @param instruction
* The "blob" instruction being handled.
Expand All @@ -117,10 +145,12 @@ private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) {
if (args.size() < 2)
return instruction;

// Pull associated stream
String index = args.get(0);
InterceptedStream<OutputStream> stream = getInterceptedStream(index);
if (stream == null)
// Get the stream index
String streamIndex = args.get(0);

// Enqueue the blob for the HTTP streaming thread if a writer exists
OutputStreamWriter streamWriter = streamWriters.get(streamIndex);
if (streamWriter == null)
return instruction;

// Decode blob
Expand All @@ -134,37 +164,31 @@ private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) {
return null;
}

try {

// Attempt to write data to stream
stream.getStream().write(blob);

// Force client to respond with their own "ack" if we need to
// confirm that they are not falling behind with respect to the
// graphical session
if (!acknowledgeBlobs) {
acknowledgeBlobs = true;
return new GuacamoleInstruction("blob", index, "");
}

// Otherwise, acknowledge the blob on the client's behalf
sendAck(index, "OK", GuacamoleStatus.SUCCESS);

}
catch (IOException e) {
sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR);
logger.debug("Write failed for intercepted stream.", e);
// Force client to respond with their own "ack" to confirm they are not
// falling behind with respect to the graphical session, only if
// - There are no blobs in the queue currently
// - Previous blob required server side acknowledgement
// This may lead to more than one blob in the writer queue temporarily,
// but not more than two blobs anyways.
if (!acknowledgeBlobs &&
streamWriter.getQueuedMessageCount() == 0 &&
streamWriter.didPrevBlobRequireAck()) {
streamWriter.handleBlob(blob, false);
acknowledgeBlobs = true;

// Send an empty blob to trigger client "ack"
return new GuacamoleInstruction("blob", streamIndex, "");
}

// Instruction was handled purely internally
// Put the blob to the writer queue
streamWriter.handleBlob(blob, true);
return null;

}

/**
* Handles a single "end" instruction, closing the associated
* OutputStream. If no OutputStream is associated with the stream index
* within the "end" instruction, this function has no effect.
* Handles a single "end" instruction. If a writer is registered for the
* stream index, an end marker is queued so the stream is closed after all
* prior blobs have been written. Otherwise this function has no effect.
*
* @param instruction
* The "end" instruction being handled.
Expand All @@ -176,8 +200,27 @@ private void handleEnd(GuacamoleInstruction instruction) {
if (args.size() < 1)
return;

// Terminate stream
closeInterceptedStream(args.get(0));
OutputStreamWriter streamWriter = streamWriters.get(args.get(0));
if (streamWriter == null)
return;

// Queue an end marker; the writer closes the stream after queued blobs
// have been written.
streamWriter.handleEnd();
}

@Override
public void closeAllInterceptedStreams() {

// When the tunnel closes during a download, the HTTP thread may be
// blocked in OutputStreamWriter.run() waiting for the next blob or
// end instruction. guacd will not send either once the connection is
// gone, and closing the OutputStream does not unblock take() on an
// empty queue, so each writer must be stopped explicitly.
for (OutputStreamWriter writer : streamWriters.values())
writer.stop();

super.closeAllInterceptedStreams();

}

Expand Down Expand Up @@ -221,9 +264,30 @@ public GuacamoleInstruction filter(GuacamoleInstruction instruction)
@Override
protected void handleInterceptedStream(InterceptedStream<OutputStream> stream) {

// Create the stream writer
OutputStreamWriter streamWriter = new OutputStreamWriter(stream, this);

// Put it into the container and check if there was another writer for the index
OutputStreamWriter old = streamWriters.put(stream.getIndex(), streamWriter);
if (old != null) {
logger.debug("Found an older stream #{}; will close it", stream.getIndex());
// Close the stream to be sure it does not get stuck on write
closeInterceptedStream(old.getStream());
// Unblock the previous HTTP thread if it is still in run()
old.stop();
}

// Acknowledge that the stream is ready to receive data
sendAck(stream.getIndex(), "OK", GuacamoleStatus.SUCCESS);

}
// Block the HTTP streaming thread while queued blobs are written, until
// the stream ends, fails, or the tunnel is closed.
streamWriter.run();

// Close the stream if not closed yet
closeInterceptedStream(stream);

// Remove the stream from the container
streamWriters.entrySet().removeIf(entry -> entry.getValue().equals(streamWriter));
}
}
Loading
Loading