1
+ import { request as httpsRequest } from "node:https" ;
2
+ import { request as httpRequest } from "node:http" ;
3
+ import { URL } from "node:url" ;
4
+
1
5
export type MetadataOptions < T > = {
2
6
baseUrl : string ;
3
7
runId : string ;
@@ -7,57 +11,140 @@ export type MetadataOptions<T> = {
7
11
signal ?: AbortSignal ;
8
12
version ?: "v1" | "v2" ;
9
13
target ?: "self" | "parent" | "root" ;
14
+ maxRetries ?: number ;
10
15
} ;
11
16
12
17
export class MetadataStream < T > {
13
18
private controller = new AbortController ( ) ;
14
19
private serverStream : ReadableStream < T > ;
15
20
private consumerStream : ReadableStream < T > ;
16
- private streamPromise : Promise < void | Response > ;
21
+ private streamPromise : Promise < void > ;
22
+ private retryCount = 0 ;
23
+ private readonly maxRetries : number ;
24
+ private currentChunkIndex = 0 ;
25
+ private reader : ReadableStreamDefaultReader < T > ;
17
26
18
27
constructor ( private options : MetadataOptions < T > ) {
19
28
const [ serverStream , consumerStream ] = this . createTeeStreams ( ) ;
20
29
this . serverStream = serverStream ;
21
30
this . consumerStream = consumerStream ;
31
+ this . maxRetries = options . maxRetries ?? 10 ;
32
+ this . reader = this . serverStream . getReader ( ) ;
22
33
23
34
this . streamPromise = this . initializeServerStream ( ) ;
24
35
}
25
36
26
37
private createTeeStreams ( ) {
27
38
const readableSource = new ReadableStream < T > ( {
28
39
start : async ( controller ) => {
29
- for await ( const value of this . options . source ) {
30
- controller . enqueue ( value ) ;
40
+ try {
41
+ for await ( const value of this . options . source ) {
42
+ controller . enqueue ( value ) ;
43
+ }
44
+ controller . close ( ) ;
45
+ } catch ( error ) {
46
+ controller . error ( error ) ;
31
47
}
32
-
33
- controller . close ( ) ;
34
48
} ,
35
49
} ) ;
36
50
37
51
return readableSource . tee ( ) ;
38
52
}
39
53
40
- private initializeServerStream ( ) : Promise < Response > {
41
- const serverStream = this . serverStream . pipeThrough (
42
- new TransformStream < T , string > ( {
43
- async transform ( chunk , controller ) {
44
- controller . enqueue ( JSON . stringify ( chunk ) + "\n" ) ;
54
+ private async makeRequest ( startFromChunk : number = 0 ) : Promise < void > {
55
+ return new Promise ( ( resolve , reject ) => {
56
+ const url = new URL ( this . buildUrl ( ) ) ;
57
+ const timeout = 15 * 60 * 1000 ; // 15 minutes
58
+
59
+ const requestFn = url . protocol === "https:" ? httpsRequest : httpRequest ;
60
+ const req = requestFn ( {
61
+ method : "POST" ,
62
+ hostname : url . hostname ,
63
+ port : url . port || ( url . protocol === "https:" ? 443 : 80 ) ,
64
+ path : url . pathname + url . search ,
65
+ headers : {
66
+ ...this . options . headers ,
67
+ "Content-Type" : "application/json" ,
68
+ "X-Resume-From-Chunk" : startFromChunk . toString ( ) ,
45
69
} ,
46
- } )
47
- ) ;
48
-
49
- return fetch ( this . buildUrl ( ) , {
50
- method : "POST" ,
51
- headers : this . options . headers ?? { } ,
52
- body : serverStream ,
53
- signal : this . controller . signal ,
54
- // @ts -expect-error
55
- duplex : "half" ,
70
+ timeout,
71
+ } ) ;
72
+
73
+ req . on ( "error" , ( error ) => {
74
+ reject ( error ) ;
75
+ } ) ;
76
+
77
+ req . on ( "timeout" , ( ) => {
78
+ req . destroy ( new Error ( "Request timed out" ) ) ;
79
+ } ) ;
80
+
81
+ req . on ( "response" , ( res ) => {
82
+ if ( res . statusCode === 408 ) {
83
+ if ( this . retryCount < this . maxRetries ) {
84
+ this . retryCount ++ ;
85
+
86
+ resolve ( this . makeRequest ( this . currentChunkIndex ) ) ;
87
+ return ;
88
+ }
89
+ reject ( new Error ( `Max retries (${ this . maxRetries } ) exceeded after timeout` ) ) ;
90
+ return ;
91
+ }
92
+
93
+ if ( res . statusCode && ( res . statusCode < 200 || res . statusCode >= 300 ) ) {
94
+ const error = new Error ( `HTTP error! status: ${ res . statusCode } ` ) ;
95
+ reject ( error ) ;
96
+ return ;
97
+ }
98
+
99
+ res . on ( "end" , ( ) => {
100
+ resolve ( ) ;
101
+ } ) ;
102
+
103
+ res . resume ( ) ;
104
+ } ) ;
105
+
106
+ if ( this . options . signal ) {
107
+ this . options . signal . addEventListener ( "abort" , ( ) => {
108
+ req . destroy ( new Error ( "Request aborted" ) ) ;
109
+ } ) ;
110
+ }
111
+
112
+ const processStream = async ( ) => {
113
+ try {
114
+ while ( true ) {
115
+ const { done, value } = await this . reader . read ( ) ;
116
+
117
+ if ( done ) {
118
+ req . end ( ) ;
119
+ break ;
120
+ }
121
+
122
+ const stringified = JSON . stringify ( value ) + "\n" ;
123
+ req . write ( stringified ) ;
124
+ this . currentChunkIndex ++ ;
125
+ }
126
+ } catch ( error ) {
127
+ req . destroy ( error as Error ) ;
128
+ }
129
+ } ;
130
+
131
+ processStream ( ) . catch ( ( error ) => {
132
+ reject ( error ) ;
133
+ } ) ;
56
134
} ) ;
57
135
}
58
136
137
+ private async initializeServerStream ( ) : Promise < void > {
138
+ try {
139
+ await this . makeRequest ( 0 ) ;
140
+ } catch ( error ) {
141
+ this . reader . releaseLock ( ) ;
142
+ throw error ;
143
+ }
144
+ }
145
+
59
146
public async wait ( ) : Promise < void > {
60
- return this . streamPromise . then ( ( ) => void 0 ) ;
147
+ return this . streamPromise ;
61
148
}
62
149
63
150
public [ Symbol . asyncIterator ] ( ) {
0 commit comments