11import { Observable } from '../../Observable' ;
22import { Subscription } from '../../Subscription' ;
3+ import { from } from '../../observable/from' ;
4+ import { ObservableInput } from '../../types' ;
5+
6+ export function fromFetch < T > (
7+ input : string | Request ,
8+ init : RequestInit & {
9+ selector : ( response : Response ) => ObservableInput < T >
10+ }
11+ ) : Observable < T > ;
12+
13+ export function fromFetch (
14+ input : string | Request ,
15+ init ?: RequestInit
16+ ) : Observable < Response > ;
317
418/**
519 * Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
@@ -42,7 +56,36 @@ import { Subscription } from '../../Subscription';
4256 * data$.subscribe({
4357 * next: result => console.log(result),
4458 * complete: () => console.log('done')
45- * })
59+ * });
60+ * ```
61+ *
62+ * ### Use with Chunked Transfer Encoding
63+ *
64+ * With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1),
65+ * the promise returned by `fetch` will resolve as soon as the response's headers are
66+ * received.
67+ *
68+ * That means the `fromFetch` observable will emit a `Response` - and will
69+ * then complete - before the body is received. When one of the methods on the
70+ * `Response` - like `text()` or `json()` - is called, the returned promise will not
71+ * resolve until the entire body has been received. Unsubscribing from any observable
72+ * that uses the promise as an observable input will not abort the request.
73+ *
74+ * To facilitate aborting the retrieval of responses that use chunked transfer encoding,
75+ * a `selector` can be specified via the `init` parameter:
76+ *
77+ * ```ts
78+ * import { of } from 'rxjs';
79+ * import { fromFetch } from 'rxjs/fetch';
80+ *
81+ * const data$ = fromFetch('https://api.github.com/users?per_page=5', {
82+ * selector: response => response.json()
83+ * });
84+ *
85+ * data$.subscribe({
86+ * next: result => console.log(result),
87+ * complete: () => console.log('done')
88+ * });
4689 * ```
4790 *
4891 * @param input The resource you would like to fetch. Can be a url or a request object.
@@ -51,8 +94,14 @@ import { Subscription } from '../../Subscription';
5194 * @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch`
5295 * function. The {@link Subscription} is tied to an `AbortController` for the the fetch.
5396 */
54- export function fromFetch ( input : string | Request , init ?: RequestInit ) : Observable < Response > {
55- return new Observable < Response > ( subscriber => {
97+ export function fromFetch < T > (
98+ input : string | Request ,
99+ initWithSelector : RequestInit & {
100+ selector ?: ( response : Response ) => ObservableInput < T >
101+ } = { }
102+ ) : Observable < Response | T > {
103+ const { selector, ...init } = initWithSelector ;
104+ return new Observable < Response | T > ( subscriber => {
56105 const controller = new AbortController ( ) ;
57106 const signal = controller . signal ;
58107 let abortable = true ;
@@ -91,9 +140,26 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab
91140 }
92141
93142 fetch ( input , perSubscriberInit ) . then ( response => {
94- abortable = false ;
95- subscriber . next ( response ) ;
96- subscriber . complete ( ) ;
143+ if ( selector ) {
144+ subscription . add ( from ( selector ( response ) ) . subscribe (
145+ value => subscriber . next ( value ) ,
146+ err => {
147+ abortable = false ;
148+ if ( ! unsubscribed ) {
149+ // Only forward the error if it wasn't an abort.
150+ subscriber . error ( err ) ;
151+ }
152+ } ,
153+ ( ) => {
154+ abortable = false ;
155+ subscriber . complete ( ) ;
156+ }
157+ ) ) ;
158+ } else {
159+ abortable = false ;
160+ subscriber . next ( response ) ;
161+ subscriber . complete ( ) ;
162+ }
97163 } ) . catch ( err => {
98164 abortable = false ;
99165 if ( ! unsubscribed ) {
0 commit comments