@@ -17,16 +17,9 @@ import {
1717 type ServerResponse as Response ,
1818} from "node:http" ;
1919import { type Socket } from "node:net" ;
20- import type {
21- ErrorCallback ,
22- NormalizedServerOptions ,
23- NormalizeProxyTarget ,
24- ProxyServer ,
25- ProxyTarget ,
26- ProxyTargetUrl ,
27- ServerOptions ,
28- } from ".." ;
29- import { Dispatcher , request , stream as uStream , Client } from "undici" ;
20+ import type { ErrorCallback , NormalizedServerOptions , NormalizeProxyTarget , ProxyServer , ProxyTarget , ProxyTargetUrl , ServerOptions } from ".." ;
21+ import Stream , { Writable } from "node:stream" ;
22+ import { Client , Dispatcher } from "undici" ;
3023
3124export type ProxyResponse = Request & {
3225 headers : { [ key : string ] : string | string [ ] } ;
@@ -82,132 +75,48 @@ export function XHeaders(req: Request, _res: Response, options: ServerOptions) {
8275// Does the actual proxying. If `forward` is enabled fires up
8376// a ForwardStream (there is NO RESPONSE), same happens for ProxyStream. The request
8477// just dies otherwise.
85- export async function stream (
86- req : Request ,
87- res : Response ,
88- options : NormalizedServerOptions ,
89- _ : Buffer | undefined ,
90- server : ProxyServer ,
91- cb : ErrorCallback | undefined ,
92- ) {
78+ export function stream ( req : Request , res : Response , options : NormalizedServerOptions , _ : Buffer | undefined , server : ProxyServer , cb : ErrorCallback | undefined ) {
9379 // And we begin!
9480 server . emit ( "start" , req , res , options . target || options . forward ! ) ;
9581
82+ if ( options . clientOptions || options . requestOptions || true ) {
83+ return stream2 ( req , res , options , _ , server , cb ) ;
84+ }
85+
9686 const agents = options . followRedirects ? followRedirects : nativeAgents ;
97- const http = agents . http as typeof import ( " http" ) ;
98- const https = agents . https as typeof import ( " https" ) ;
87+ const http = agents . http as typeof import ( ' http' ) ;
88+ const https = agents . https as typeof import ( ' https' ) ;
9989
10090 if ( options . forward ) {
91+ // forward enabled, so just pipe the request
92+ const proto = options . forward . protocol === "https:" ? https : http ;
10193 const outgoingOptions = common . setupOutgoing (
10294 options . ssl || { } ,
10395 options ,
10496 req ,
10597 "forward" ,
10698 ) ;
99+ const forwardReq = proto . request ( outgoingOptions ) ;
107100
108- const targetUrl = `${ outgoingOptions . url } ` ;
109-
110- const undiciOptions : any = {
111- method : outgoingOptions . method as Dispatcher . HttpMethod ,
112- headers : outgoingOptions . headers ,
113- path : outgoingOptions . path ,
114- } ;
115-
116- // Handle request body
117- if ( options . buffer ) {
118- undiciOptions . body = options . buffer ;
119- } else if ( req . method !== "GET" && req . method !== "HEAD" ) {
120- undiciOptions . body = req ;
121- }
122-
123- try {
124- const client = new Client ( targetUrl ) ;
125- await client . request ( undiciOptions ) ;
126- } catch ( err ) {
127- if ( cb ) {
128- cb ( err as Error , req , res , options . forward ) ;
129- } else {
130- server . emit ( "error" , err as Error , req , res , options . forward ) ;
131- }
132- }
101+ // error handler (e.g. ECONNRESET, ECONNREFUSED)
102+ // Handle errors on incoming request as well as it makes sense to
103+ const forwardError = createErrorHandler ( forwardReq , options . forward ) ;
104+ req . on ( "error" , forwardError ) ;
105+ forwardReq . on ( "error" , forwardError ) ;
133106
107+ ( options . buffer || req ) . pipe ( forwardReq ) ;
134108 if ( ! options . target ) {
109+ // no target, so we do not send anything back to the client.
110+ // If target is set, we do a separate proxy below, which might be to a
111+ // completely different server.
135112 return res . end ( ) ;
136113 }
137114 }
138115
139116 // Request initalization
117+ const proto = options . target ! . protocol === "https:" ? https : http ;
140118 const outgoingOptions = common . setupOutgoing ( options . ssl || { } , options , req ) ;
141- const client = new Client ( outgoingOptions . url , {
142- allowH2 : req . httpVersionMajor === 2 ,
143- } ) ;
144- // const proxyReq = proto.request(outgoingOptions);
145-
146- const dispatchOptions : Dispatcher . DispatchOptions = {
147- method : outgoingOptions . method as Dispatcher . HttpMethod ,
148- path : outgoingOptions . path || "/" ,
149- headers : outgoingOptions . headers ,
150-
151- body :
152- options . buffer ||
153- ( req . method !== "GET" && req . method !== "HEAD" ? req : undefined ) ,
154- } ;
155-
156- let responseStarted = false ;
157-
158- client . dispatch ( dispatchOptions , {
159- onRequestStart ( controller , context ) {
160- // Can modify the request just before headers are sent
161- console . log ( "onRequestStart" ) ;
162- } ,
163- onResponseStart ( controller , statusCode , headers , statusMessage ) {
164- // Set response status and headers - crucial for SSE
165- res . statusCode = statusCode ;
166-
167- // Set headers from the record object
168- for ( const [ name , value ] of Object . entries ( headers ) ) {
169- res . setHeader ( name , value ) ;
170- }
171-
172- // For SSE, ensure headers are sent immediately
173- const contentType = headers [ "content-type" ] || headers [ "Content-Type" ] ;
174- if ( contentType && contentType . toString ( ) . includes ( "text/event-stream" ) ) {
175- res . flushHeaders ( ) ;
176- }
177-
178- responseStarted = true ;
179- } ,
180- onResponseError ( controller , err ) {
181- if (
182- req . socket . destroyed &&
183- ( err as NodeJS . ErrnoException ) . code === "ECONNRESET"
184- ) {
185- server . emit ( "econnreset" , err , req , res , outgoingOptions . url ) ;
186- controller . abort ( err ) ;
187- return ;
188- }
189-
190- if ( cb ) {
191- cb ( err , req , res , outgoingOptions . url ) ;
192- } else {
193- server . emit ( "error" , err , req , res , outgoingOptions . url ) ;
194- }
195- } ,
196- onResponseData ( controller , chunk ) {
197- if ( responseStarted ) {
198- res . write ( chunk ) ;
199- }
200- } ,
201- onResponseEnd ( controller , trailers ) {
202- if ( trailers ) {
203- res . addTrailers ( trailers ) ;
204- }
205- res . end ( ) ;
206- client . close ( ) ;
207- } ,
208- } ) ;
209-
210- return ;
119+ const proxyReq = proto . request ( outgoingOptions ) ;
211120
212121 // Enable developers to modify the proxyReq before headers are sent
213122 proxyReq . on ( "socket" , ( socket : Socket ) => {
@@ -237,15 +146,9 @@ export async function stream(
237146 req . on ( "error" , proxyError ) ;
238147 proxyReq . on ( "error" , proxyError ) ;
239148
240- function createErrorHandler (
241- proxyReq : http . ClientRequest ,
242- url : NormalizeProxyTarget < ProxyTargetUrl > ,
243- ) {
149+ function createErrorHandler ( proxyReq : http . ClientRequest , url : NormalizeProxyTarget < ProxyTargetUrl > ) {
244150 return ( err : Error ) => {
245- if (
246- req . socket . destroyed &&
247- ( err as NodeJS . ErrnoException ) . code === "ECONNRESET"
248- ) {
151+ if ( req . socket . destroyed && ( err as NodeJS . ErrnoException ) . code === "ECONNRESET" ) {
249152 server . emit ( "econnreset" , err , req , res , url ) ;
250153 proxyReq . destroy ( ) ;
251154 return ;
@@ -267,14 +170,7 @@ export async function stream(
267170 if ( ! res . headersSent && ! options . selfHandleResponse ) {
268171 for ( const pass of web_o ) {
269172 // note: none of these return anything
270- pass (
271- req ,
272- res as EditableResponse ,
273- proxyRes ,
274- options as NormalizedServerOptions & {
275- target : NormalizeProxyTarget < ProxyTarget > ;
276- } ,
277- ) ;
173+ pass ( req , res as EditableResponse , proxyRes , options as NormalizedServerOptions & { target : NormalizeProxyTarget < ProxyTarget > } ) ;
278174 }
279175 }
280176
@@ -293,4 +189,120 @@ export async function stream(
293189 } ) ;
294190}
295191
192+
193+ async function stream2 (
194+ req : Request ,
195+ res : Response ,
196+ options : NormalizedServerOptions ,
197+ _ : Buffer | undefined ,
198+ server : ProxyServer ,
199+ cb ?: ErrorCallback ,
200+ ) {
201+ // Implementation of stream2 function
202+ if ( options . forward ) {
203+ const outgoingOptions = common . setupOutgoing (
204+ options . ssl || { } ,
205+ options ,
206+ req ,
207+ "forward" ,
208+ ) ;
209+
210+ const clientOptions = {
211+ allowH2 : outgoingOptions . url . startsWith ( 'https://' ) ,
212+ connect : {
213+ rejectUnauthorized : options . secure !== false ,
214+ } ,
215+ ...options . clientOptions ,
216+ } ;
217+
218+ const client = new Client ( outgoingOptions . url , options . clientOptions ) ;
219+
220+
221+ const requestOptions : Dispatcher . RequestOptions = {
222+ method : outgoingOptions . method as Dispatcher . HttpMethod ,
223+ headers : outgoingOptions . headers ,
224+ path : outgoingOptions . path || "/" ,
225+ } ;
226+
227+ // Handle request body
228+ if ( options . buffer ) {
229+ requestOptions . body = options . buffer as Stream . Readable ;
230+ } else if ( req . method !== "GET" && req . method !== "HEAD" ) {
231+ requestOptions . body = req ;
232+ }
233+
234+ try {
235+ await client . request ( requestOptions )
236+ } catch ( err ) {
237+ if ( cb ) {
238+ cb ( err as Error , req , res , options . forward ) ;
239+ } else {
240+ server . emit ( "error" , err as Error , req , res , options . forward ) ;
241+ }
242+ }
243+
244+ if ( ! options . target ) {
245+ return res . end ( ) ;
246+ }
247+ }
248+
249+ const outgoingOptions = common . setupOutgoing ( options . ssl || { } , options , req ) ;
250+
251+ const clientOptions = {
252+ ...options . clientOptions ,
253+ allowH2 : outgoingOptions . url . startsWith ( 'https://' ) ,
254+ connect : {
255+ rejectUnauthorized : options . secure !== false ,
256+ }
257+ } ;
258+
259+ const client = new Client ( outgoingOptions . url , clientOptions ) ;
260+
261+
262+ const requestOptions : Dispatcher . RequestOptions = {
263+ method : outgoingOptions . method as Dispatcher . HttpMethod ,
264+ headers : outgoingOptions . headers ,
265+ path : outgoingOptions . path || "/" ,
266+ } ;
267+
268+ // Handle request body
269+ if ( options . buffer ) {
270+ requestOptions . body = options . buffer as Stream . Readable ;
271+ } else if ( req . method !== "GET" && req . method !== "HEAD" ) {
272+ requestOptions . body = req ;
273+ }
274+
275+ client . stream (
276+ requestOptions ,
277+ ( { statusCode, headers } ) => {
278+ if ( ! res . headersSent ) {
279+ if ( req . httpVersionMajor === 2 ) {
280+ delete headers . connection ;
281+ delete headers [ "keep-alive" ] ;
282+ delete headers [ "transfer-encoding" ] ;
283+ }
284+ res . writeHead ( statusCode , headers ) ;
285+ }
286+ return new Writable ( {
287+ write ( chunk , _encoding , callback ) {
288+ res . write ( chunk ) ;
289+ callback ( ) ;
290+ } ,
291+ } ) ;
292+ } ,
293+ ( err , { trailers } ) => {
294+ if ( err ) {
295+ if ( cb ) {
296+ cb ( err as Error , req , res , options . forward ) ;
297+ } else {
298+ server . emit ( "error" , err as Error , req , res , options . forward ) ;
299+ }
300+ }
301+ if ( trailers ) {
302+ res . end ( ) ;
303+ }
304+ } ,
305+ ) ;
306+ }
307+
296308export const WEB_PASSES = { deleteLength, timeout, XHeaders, stream } ;
0 commit comments