This document provides additional details about the implementation of DataChunkInputStream
.
The input stream implementation is not thread-safe: concurrent accesses should not be
allowed, and even invocations of read(...)
should be synchronized by out-of-band means for
any stream state updates to be visible across threads.
The following assumptions are made about the operation of the stream:
-
Subscription.request
is invoked only after one chunk has been consumed -
The number of chunks requested is always 1
-
Publishers fully conforms to
Flow.Publisher
in the Reactive Streams Specification [I] with respect to:- total order of
onNext
/onComplete
/onError
- strictly heeding backpressure (not calling
onNext
until more chunks were requested) - relaxed ordering of calls to request, allowing class after
onComplete
/onError
- total order of
Given the assumptions that the number of chunks requested is at most 1, the requests are totally
ordered with onSubscribe
/onNext
by construction. This affords the following safety guarantees:
-
The only place where
next
is assigned is inonNext
, before the next chunk is published -
Initially
next
andcurrent
are identical; firstrequest(1)
is called on subscription -
All subsequent calls to
request(1)
happen after the publishing of the chunk is observed byread(...)
-
It follows from (3) and (1) that one and only one assignment to
next
happens before observing the chunk byread(...)
--provided the Publisher observes backpressure -
Such
next
is never lost, because it is copied intocurrent
beforerequest(1)
, therefore a new assignment ofnext
inonNext
never loses the reference to a future with an unobserved chunk --provided the Publisher observes backpressure -
The publishing of the chunk by
onNext
synchronizes-with the observation of the chunk by aread(...)
: (1) and (5) ensurecurrent
observed byread(...)
is the samenext
at the timeonNext
is invoked, soonNext
completes the same future as accessed byread(...)
. Moreover, the store tonext
byonNext
and load ofnext
byread(...)
are in happens-before relationship due to this synchronizes-with edge, the program order inonNext
, and program order inread(...)
(and out-of-bands synchronization between multiple reads)
A conforming Publisher establishes total order of onNext
, therefore, a total order of
assignments to next
and Future.complete
:
-
onSubscribe
: assertcurrent == next
request(1)
-
onNext
: assertcurrent == next
prev = next
next = new Future
(A)prev.complete(chunk)
(B): assertprev == this.current
-
read(...)
current.get()
(C): (C) synchronizes-with (B): any read is blocked until (B)
-
read(...)
(same or subsequent read)current.get()
: synchronizes-with (B)- chunk is seen to be consumed entirely: release the chunk, and request next:
current = next
: (D): (A) happens-before (D), no furtheronNext
intervenes invariant:current
never references a released chunk as seen byclose(...)
, assumingread(...)
andclose(...)
are totally ordered --either by program order, or through out-of-bands synchronizationrequest(1)
: assert a conforming Publisher does not invoke onNext before this
-
onNext
: assertcurrent == next
: a conformingPublisher
does not invokeonNext
beforerequest(1)
prev = next
next = new Future
(E)prev.complete(chunk)
(F): assertprev == current
-
read(...)
current.get()
: (G): (G) synchronizes-with (F): any read after (D) is blocked until (F)
-
onComplete
/onError
: assert:next
has not been completed: stream is either empty (noonNext
will ever be called), or anonNext
assigned a new uncompleted future tonext
next.complete(...)
: (H): assert conformingPublisher
ensuresnext
assignments byonNext
are visible here by totally orderingonNext
/onComplete
/onError
-
read(...)
: assert eventuallycurrent == next
: either initially, or after some read that consumed the chunk in its entirety and requested the new chunkcurrent.get()
: (I): (I) synchronizes-with (H)- signal EOF
- close(...)
:
- assert
current
never references a released chunk; it either eventually references a chunk that has been produced byonNext
and has not been consumed fully byread(...)
, or a null produced byonComplete
/onError
- assert if
next != current
,next
will never produce a new chunk: this is the case if and only ifonNext
has occurred, butread(...)
has not consumed the chunk in its entirety, hence has not requested any new chunks current.whenComplete(release)
[I] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#specification