@@ -4,7 +4,13 @@ import stream = require('stream');
44import { V1Status } from './api' ;
55import { KubeConfig } from './config' ;
66
7- const protocols = [ 'v4.channel.k8s.io' , 'v3.channel.k8s.io' , 'v2.channel.k8s.io' , 'channel.k8s.io' ] ;
7+ const protocols = [
8+ 'v5.channel.k8s.io' ,
9+ 'v4.channel.k8s.io' ,
10+ 'v3.channel.k8s.io' ,
11+ 'v2.channel.k8s.io' ,
12+ 'channel.k8s.io' ,
13+ ] ;
814
915export type TextHandler = ( text : string ) => boolean ;
1016export type BinaryHandler = ( stream : number , buff : Buffer ) => boolean ;
@@ -17,12 +23,41 @@ export interface WebSocketInterface {
1723 ) : Promise < WebSocket . WebSocket > ;
1824}
1925
26+ export interface StreamInterface {
27+ stdin : stream . Readable ;
28+ stdout : stream . Writable ;
29+ stderr : stream . Writable ;
30+ }
31+
2032export class WebSocketHandler implements WebSocketInterface {
2133 public static readonly StdinStream : number = 0 ;
2234 public static readonly StdoutStream : number = 1 ;
2335 public static readonly StderrStream : number = 2 ;
2436 public static readonly StatusStream : number = 3 ;
2537 public static readonly ResizeStream : number = 4 ;
38+ public static readonly CloseStream : number = 255 ;
39+
40+ public negotiatedProtocol : string | null = null ;
41+
42+ public static supportsClose ( protocol : string ) : boolean {
43+ return protocol === 'v5.channel.k8s.io' ;
44+ }
45+
46+ public static closeStream ( streamNum : number , streams : StreamInterface ) : void {
47+ console . log ( 'Closing stream: ' + streamNum ) ;
48+ switch ( streamNum ) {
49+ case WebSocketHandler . StdinStream :
50+ streams . stdin . pause ( ) ;
51+ break ;
52+ case WebSocketHandler . StdoutStream :
53+ console . log ( 'closing stdout' ) ;
54+ streams . stdout . end ( ) ;
55+ break ;
56+ case WebSocketHandler . StderrStream :
57+ streams . stderr . end ( ) ;
58+ break ;
59+ }
60+ }
2661
2762 public static handleStandardStreams (
2863 streamNum : number ,
@@ -39,6 +74,7 @@ export class WebSocketHandler implements WebSocketInterface {
3974 stderr . write ( buff ) ;
4075 } else if ( streamNum === WebSocketHandler . StatusStream ) {
4176 // stream closing.
77+ // Hacky, change tests to use the stream interface
4278 if ( stdout && stdout !== process . stdout ) {
4379 stdout . end ( ) ;
4480 }
@@ -69,6 +105,12 @@ export class WebSocketHandler implements WebSocketInterface {
69105 } ) ;
70106
71107 stdin . on ( 'end' , ( ) => {
108+ if ( WebSocketHandler . supportsClose ( ws . protocol ) ) {
109+ const buff = Buffer . alloc ( 2 ) ;
110+ buff . writeUint8 ( this . CloseStream , 0 ) ;
111+ buff . writeUint8 ( this . StdinStream , 1 ) ;
112+ ws . send ( buff ) ;
113+ }
72114 ws . close ( ) ;
73115 } ) ;
74116 // Keep the stream open
@@ -141,7 +183,16 @@ export class WebSocketHandler implements WebSocketInterface {
141183 // factory is really just for test injection
142184 public constructor (
143185 readonly config : KubeConfig ,
144- readonly socketFactory ?: ( uri : string , opts : WebSocket . ClientOptions ) => WebSocket . WebSocket ,
186+ readonly socketFactory ?: (
187+ uri : string ,
188+ protocols : string [ ] ,
189+ opts : WebSocket . ClientOptions ,
190+ ) => WebSocket . WebSocket ,
191+ readonly streams : StreamInterface = {
192+ stdin : process . stdin ,
193+ stdout : process . stdout ,
194+ stderr : process . stderr ,
195+ } ,
145196 ) { }
146197
147198 /**
@@ -173,7 +224,7 @@ export class WebSocketHandler implements WebSocketInterface {
173224
174225 return await new Promise < WebSocket . WebSocket > ( ( resolve , reject ) => {
175226 const client = this . socketFactory
176- ? this . socketFactory ( uri , opts )
227+ ? this . socketFactory ( uri , protocols , opts )
177228 : new WebSocket ( uri , protocols , opts ) ;
178229 let resolved = false ;
179230
@@ -191,11 +242,18 @@ export class WebSocketHandler implements WebSocketInterface {
191242 client . onmessage = ( { data } : { data : WebSocket . Data } ) => {
192243 // TODO: support ArrayBuffer and Buffer[] data types?
193244 if ( typeof data === 'string' ) {
245+ if ( data . charCodeAt ( 0 ) === WebSocketHandler . CloseStream ) {
246+ WebSocketHandler . closeStream ( data . charCodeAt ( 1 ) , this . streams ) ;
247+ }
194248 if ( textHandler && ! textHandler ( data ) ) {
195249 client . close ( ) ;
196250 }
197251 } else if ( data instanceof Buffer ) {
198- const streamNum = data . readInt8 ( 0 ) ;
252+ const streamNum = data . readUint8 ( 0 ) ;
253+ if ( streamNum === WebSocketHandler . CloseStream ) {
254+ console . log ( 'Closing stream!' ) ;
255+ WebSocketHandler . closeStream ( data . readInt8 ( 1 ) , this . streams ) ;
256+ }
199257 if ( binaryHandler && ! binaryHandler ( streamNum , data . subarray ( 1 ) ) ) {
200258 client . close ( ) ;
201259 }
0 commit comments