Skip to content

Commit

Permalink
Bug 470716. Make the close-on-completion behavior of nsAsyncStreamCop…
Browse files Browse the repository at this point in the history
…ier configurable. r+sr=bzbarsky.
  • Loading branch information
Michal Novotny committed Mar 30, 2009
1 parent e7f06f5 commit 6df9861
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 44 deletions.
10 changes: 8 additions & 2 deletions netwerk/base/public/nsIAsyncStreamCopier.idl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ interface nsIOutputStream;
interface nsIRequestObserver;
interface nsIEventTarget;

[scriptable, uuid(72e515de-a91e-4154-bb78-e5244cbaae74)]
[scriptable, uuid(5a19ca27-e041-4aca-8287-eb248d4c50c0)]
interface nsIAsyncStreamCopier : nsIRequest
{
/**
Expand All @@ -64,6 +64,10 @@ interface nsIAsyncStreamCopier : nsIRequest
* specifies how many bytes to read/write at a time. this controls
* the granularity of the copying. it should match the segment size
* of the "buffered" streams involved.
* @param aCloseSource
* true if aSource should be closed after copying.
* @param aCloseSink
* true if aSink should be closed after copying.
*
* NOTE: at least one of the streams must be buffered.
*/
Expand All @@ -72,7 +76,9 @@ interface nsIAsyncStreamCopier : nsIRequest
in nsIEventTarget aTarget,
in boolean aSourceBuffered,
in boolean aSinkBuffered,
in unsigned long aChunkSize);
in unsigned long aChunkSize,
in boolean aCloseSource,
in boolean aCloseSink);

/**
* asyncCopy triggers the start of the copy. The observer will be notified
Expand Down
7 changes: 5 additions & 2 deletions netwerk/base/public/nsNetUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,13 +466,16 @@ NS_NewAsyncStreamCopier(nsIAsyncStreamCopier **result,
nsIEventTarget *target,
PRBool sourceBuffered = PR_TRUE,
PRBool sinkBuffered = PR_TRUE,
PRUint32 chunkSize = 0)
PRUint32 chunkSize = 0,
PRBool closeSource = PR_TRUE,
PRBool closeSink = PR_TRUE)
{
nsresult rv;
nsCOMPtr<nsIAsyncStreamCopier> copier =
do_CreateInstance(NS_ASYNCSTREAMCOPIER_CONTRACTID, &rv);
if (NS_SUCCEEDED(rv)) {
rv = copier->Init(source, sink, target, sourceBuffered, sinkBuffered, chunkSize);
rv = copier->Init(source, sink, target, sourceBuffered, sinkBuffered,
chunkSize, closeSource, closeSink);
if (NS_SUCCEEDED(rv)) {
*result = nsnull;
copier.swap(*result);
Expand Down
33 changes: 18 additions & 15 deletions netwerk/base/src/nsAsyncStreamCopier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ nsAsyncStreamCopier::Complete(nsresult status)
nsCOMPtr<nsISupports> ctx;
{
nsAutoLock lock(mLock);
mCopierCtx = nsnull;

if (mIsPending) {
mIsPending = PR_FALSE;
mStatus = status;
Expand Down Expand Up @@ -152,25 +154,21 @@ nsAsyncStreamCopier::GetStatus(nsresult *status)
NS_IMETHODIMP
nsAsyncStreamCopier::Cancel(nsresult status)
{
if (IsComplete())
return NS_OK;
nsCOMPtr<nsISupports> copierCtx;
{
nsAutoLock lock(mLock);
if (!mIsPending)
return NS_OK;
copierCtx.swap(mCopierCtx);
}

if (NS_SUCCEEDED(status)) {
NS_WARNING("cancel with non-failure status code");
status = NS_BASE_STREAM_CLOSED;
}

nsCOMPtr<nsIAsyncInputStream> asyncSource = do_QueryInterface(mSource);
if (asyncSource)
asyncSource->CloseWithStatus(status);
else
mSource->Close();

nsCOMPtr<nsIAsyncOutputStream> asyncSink = do_QueryInterface(mSink);
if (asyncSink)
asyncSink->CloseWithStatus(status);
else
mSink->Close();
if (copierCtx)
NS_CancelAsyncCopy(copierCtx, status);

return NS_OK;
}
Expand Down Expand Up @@ -224,7 +222,9 @@ nsAsyncStreamCopier::Init(nsIInputStream *source,
nsIEventTarget *target,
PRBool sourceBuffered,
PRBool sinkBuffered,
PRUint32 chunkSize)
PRUint32 chunkSize,
PRBool closeSource,
PRBool closeSink)
{
NS_ASSERTION(sourceBuffered || sinkBuffered, "at least one stream must be buffered");

Expand All @@ -239,6 +239,8 @@ nsAsyncStreamCopier::Init(nsIInputStream *source,

mSource = source;
mSink = sink;
mCloseSource = closeSource;
mCloseSink = closeSink;

mMode = sourceBuffered ? NS_ASYNCCOPY_VIA_READSEGMENTS
: NS_ASYNCCOPY_VIA_WRITESEGMENTS;
Expand Down Expand Up @@ -281,7 +283,8 @@ nsAsyncStreamCopier::AsyncCopy(nsIRequestObserver *observer, nsISupports *ctx)
// OnAsyncCopyComplete.
NS_ADDREF_THIS();
rv = NS_AsyncCopy(mSource, mSink, mTarget, mMode, mChunkSize,
OnAsyncCopyComplete, this);
OnAsyncCopyComplete, this, mCloseSource, mCloseSink,
getter_AddRefs(mCopierCtx));
if (NS_FAILED(rv)) {
NS_RELEASE_THIS();
Cancel(rv);
Expand Down
4 changes: 4 additions & 0 deletions netwerk/base/src/nsAsyncStreamCopier.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,16 @@ class nsAsyncStreamCopier : public nsIAsyncStreamCopier

nsCOMPtr<nsIEventTarget> mTarget;

nsCOMPtr<nsISupports> mCopierCtx;

PRLock *mLock;

nsAsyncCopyMode mMode;
PRUint32 mChunkSize;
nsresult mStatus;
PRPackedBool mIsPending;
PRPackedBool mCloseSource;
PRPackedBool mCloseSink;
};

#endif // !nsAsyncStreamCopier_h__
2 changes: 1 addition & 1 deletion netwerk/test/httpserver/httpd.js
Original file line number Diff line number Diff line change
Expand Up @@ -2921,7 +2921,7 @@ ServerHandler.prototype =
//
var copier = new StreamCopier(bodyStream, outStream,
null,
true, true, 8192);
true, true, 8192, true, true);
copier.asyncCopy(copyObserver, null);
}
else
Expand Down
174 changes: 174 additions & 0 deletions netwerk/test/unit/test_bug470716.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
const Cc = Components.classes;
const Ci = Components.interfaces;
const Cr = Components.results;
const CC = Components.Constructor;

const StreamCopier = CC("@mozilla.org/network/async-stream-copier;1",
"nsIAsyncStreamCopier",
"init");

const ScriptableInputStream = CC("@mozilla.org/scriptableinputstream;1",
"nsIScriptableInputStream",
"init");

const Pipe = CC("@mozilla.org/pipe;1",
"nsIPipe",
"init");

var pipe1;
var pipe2;
var copier;
var test_result;
var test_content;
var test_source_closed;
var test_sink_closed;
var test_nr;

var copyObserver =
{
onStartRequest: function(request, context) { },

onStopRequest: function(request, cx, statusCode)
{
// check status code
do_check_eq(statusCode, test_result);

// check number of copied bytes
do_check_eq(pipe2.inputStream.available(), test_content.length);

// check content
var scinp = new ScriptableInputStream(pipe2.inputStream);
var content = scinp.read(scinp.available());
do_check_eq(content, test_content);

// check closed sink
try {
pipe2.outputStream.write("closedSinkTest", 14);
do_check_false(test_sink_closed);
}
catch (ex) {
do_check_true(test_sink_closed);
}

// check closed source
try {
pipe1.outputStream.write("closedSourceTest", 16);
do_check_false(test_source_closed);
}
catch (ex) {
do_check_true(test_source_closed);
}

do_timeout(0, "do_test();");
},

QueryInterface: function(aIID)
{
if (aIID.equals(Ci.nsIRequestObserver) ||
aIID.equals(Ci.nsISupports))
return this;

throw Cr.NS_ERROR_NO_INTERFACE;
}
};

function startCopier(closeSource, closeSink) {
pipe1 = new Pipe(true /* nonBlockingInput */,
true /* nonBlockingOutput */,
0 /* segmentSize */,
0xffffffff /* segmentCount */,
null /* segmentAllocator */);

pipe2 = new Pipe(true /* nonBlockingInput */,
true /* nonBlockingOutput */,
0 /* segmentSize */,
0xffffffff /* segmentCount */,
null /* segmentAllocator */);

copier = new StreamCopier(pipe1.inputStream /* aSource */,
pipe2.outputStream /* aSink */,
null /* aTarget */,
true /* aSourceBuffered */,
true /* aSinkBuffered */,
8192 /* aChunkSize */,
closeSource /* aCloseSource */,
closeSink /* aCloseSink */);

copier.asyncCopy(copyObserver, null);
}

function do_test() {

test_nr++;
test_content = "test" + test_nr;

switch (test_nr) {
case 1:
case 2: // close sink
case 3: // close source
case 4: // close both
// test canceling transfer
// use some undefined error code to check if it is successfully passed
// to the request observer
test_result = 0x87654321;

test_source_closed = ((test_nr-1)>>1 != 0);
test_sink_closed = ((test_nr-1)%2 != 0);

startCopier(test_source_closed, test_sink_closed);
pipe1.outputStream.write(test_content, test_content.length);
pipe1.outputStream.flush();
do_timeout(20,
"copier.cancel(test_result);" +
"pipe1.outputStream.write(\"a\", 1);");
break;
case 5:
case 6: // close sink
case 7: // close source
case 8: // close both
// test copying with EOF on source
test_result = 0;

test_source_closed = ((test_nr-5)>>1 != 0);
test_sink_closed = ((test_nr-5)%2 != 0);

startCopier(test_source_closed, test_sink_closed);
pipe1.outputStream.write(test_content, test_content.length);
// we will close the source
test_source_closed = true;
pipe1.outputStream.close();
break;
case 9:
case 10: // close sink
case 11: // close source
case 12: // close both
// test copying with error on sink
// use some undefined error code to check if it is successfully passed
// to the request observer
test_result = 0x87654321;

test_source_closed = ((test_nr-9)>>1 != 0);
test_sink_closed = ((test_nr-9)%2 != 0);

startCopier(test_source_closed, test_sink_closed);
pipe1.outputStream.write(test_content, test_content.length);
pipe1.outputStream.flush();
// we will close the sink
test_sink_closed = true;
do_timeout(20,
"pipe2.outputStream" +
" .QueryInterface(Ci.nsIAsyncOutputStream)" +
" .closeWithStatus(test_result);" +
"pipe1.outputStream.write(\"a\", 1);");
break;
case 13:
do_test_finished();
break;
}
}

function run_test() {
test_nr = 0;
do_timeout(0, "do_test();");
do_test_pending();
}
2 changes: 1 addition & 1 deletion netwerk/test/unit/test_streamcopier.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ function run_test() {

var streamCopier = Cc["@mozilla.org/network/async-stream-copier;1"]
.createInstance(Ci.nsIAsyncStreamCopier);
streamCopier.init(inStr, pipe.outputStream, null, true, true, 1024);
streamCopier.init(inStr, pipe.outputStream, null, true, true, 1024, true, true);

var ctx = {
};
Expand Down
Loading

0 comments on commit 6df9861

Please sign in to comment.