22
33namespace KubernetesClient ;
44
5+ use KubernetesClient \Dotty \DotAccess ;
6+
57/**
68 * Used for the various kubernetes watch endpoints for continuous feed of data
79 *
@@ -201,6 +203,7 @@ private function getHandle()
201203 $ handle = @fopen ($ url , 'r ' , false , $ this ->getClient ()->getStreamContext ());
202204 if ($ handle === false ) {
203205 $ e = error_get_last ();
206+ var_dump ($ e );
204207 throw new \Exception ($ e ['message ' ], $ e ['type ' ]);
205208 }
206209 stream_set_timeout ($ handle , 0 , $ this ->getStreamTimeout ());
@@ -408,7 +411,22 @@ private function internal_iterator($cycles = 0)
408411 {
409412 $ handle = $ this ->getHandle ();
410413 $ i_cycles = 0 ;
414+
415+ $ associative = $ this ->getClient ()->getRequestOption ('decode_associative ' , []);
416+ $ decode_flags = $ this ->getClient ()->getRequestOption ('decode_flags ' , []);
417+ $ decode_response = $ this ->getClient ()->getRequestOption ('decode_response ' , []);
418+
419+ /**
420+ * Mitigation for improper ordering especially during initial load
421+ * This acts as a tripwire, once tripped it should never go back to false
422+ *
423+ * https://github.com/kubernetes/kubernetes/issues/49745
424+ */
425+ $ initial_load_finished = false ;
411426 while (true ) {
427+ if (function_exists ('pcntl_signal_dispatch ' )) {
428+ \pcntl_signal_dispatch ();
429+ }
412430 if ($ this ->getStop ()) {
413431 $ this ->internal_stop ();
414432 return ;
@@ -426,7 +444,7 @@ private function internal_iterator($cycles = 0)
426444
427445 //$meta = stream_get_meta_data($handle);
428446 if (feof ($ handle )) {
429- if ($ this ->params ['timeoutSeconds ' ] > 0 ) {
447+ if (key_exists ( ' timeoutSeconds ' , $ this -> params ) && $ this ->params ['timeoutSeconds ' ] > 0 ) {
430448 //assume we've reached a successful end of watch
431449 return ;
432450 } else {
@@ -449,6 +467,10 @@ private function internal_iterator($cycles = 0)
449467 $ this ->setLastBytesReadTimestamp (time ());
450468 }
451469
470+ if (!$ initial_load_finished && empty ($ data )) {
471+ $ initial_load_finished = true ;
472+ }
473+
452474 $ this ->buffer .= $ data ;
453475
454476 //break immediately if nothing is on the buffer
@@ -462,7 +484,7 @@ private function internal_iterator($cycles = 0)
462484 for ($ x = 0 ; $ x < ($ parts_count - 1 ); $ x ++) {
463485 if (!empty ($ parts [$ x ])) {
464486 try {
465- $ response = json_decode ($ parts [$ x ], true );
487+ $ response = json_decode ($ parts [$ x ], $ associative , 512 , $ decode_flags );
466488 $ code = $ this ->preProcessResponse ($ response );
467489 if ($ code != 0 ) {
468490 $ this ->resetHandle ();
@@ -471,12 +493,24 @@ private function internal_iterator($cycles = 0)
471493 goto end;
472494 }
473495
474- if ($ response[ ' object ' ][ ' metadata ' ][ ' resourceVersion ' ] > $ this -> getResourceVersionLastSuccess () ) {
475- ( $ this -> callback )( $ response , $ this ) ;
496+ if (! $ initial_load_finished && DotAccess:: get ( $ response, ' type ' ) != " ADDED " ) {
497+ $ initial_load_finished = true ;
476498 }
477499
478- $ this ->setResourceVersion ($ response ['object ' ]['metadata ' ]['resourceVersion ' ]);
479- $ this ->setResourceVersionLastSuccess ($ response ['object ' ]['metadata ' ]['resourceVersion ' ]);
500+ $ rv = DotAccess::get ($ response , 'object.metadata.resourceVersion ' );
501+
502+ if (!$ initial_load_finished || $ rv > $ this ->getResourceVersionLastSuccess ()) {
503+ if (!$ decode_response ) {
504+ ($ this ->callback )($ parts [$ x ], $ this );
505+ } else {
506+ ($ this ->callback )($ response , $ this );
507+ }
508+ }
509+
510+ if ($ rv > $ this ->getResourceVersionLastSuccess ()) {
511+ $ this ->setResourceVersion ($ rv );
512+ $ this ->setResourceVersionLastSuccess ($ rv );
513+ }
480514
481515 if ($ this ->getStop ()) {
482516 $ this ->internal_stop ();
@@ -509,7 +543,23 @@ private function internal_generator($cycles = 0)
509543 {
510544 $ handle = $ this ->getHandle ();
511545 $ i_cycles = 0 ;
546+
547+ $ associative = $ this ->getClient ()->getRequestOption ('decode_associative ' , []);
548+ $ decode_flags = $ this ->getClient ()->getRequestOption ('decode_flags ' , []);
549+ $ decode_response = $ this ->getClient ()->getRequestOption ('decode_response ' , []);
550+
551+ /**
552+ * Mitigation for improper ordering especially during initial load
553+ * This acts as a tripwire, once tripped it should never go back to false
554+ *
555+ * https://github.com/kubernetes/kubernetes/issues/49745
556+ */
557+ $ initial_load_finished = false ;
512558 while (true ) {
559+ if (function_exists ('pcntl_signal_dispatch ' )) {
560+ \pcntl_signal_dispatch ();
561+ }
562+
513563 if ($ this ->getStop ()) {
514564 $ this ->internal_stop ();
515565 return ;
@@ -527,7 +577,7 @@ private function internal_generator($cycles = 0)
527577
528578 //$meta = stream_get_meta_data($handle);
529579 if (feof ($ handle )) {
530- if ($ this ->params ['timeoutSeconds ' ] > 0 ) {
580+ if (key_exists ( ' timeoutSeconds ' , $ this -> params ) && $ this ->params ['timeoutSeconds ' ] > 0 ) {
531581 //assume we've reached a successful end of watch
532582 return ;
533583 } else {
@@ -550,6 +600,10 @@ private function internal_generator($cycles = 0)
550600 $ this ->setLastBytesReadTimestamp (time ());
551601 }
552602
603+ if (!$ initial_load_finished && empty ($ data )) {
604+ $ initial_load_finished = true ;
605+ }
606+
553607 $ this ->buffer .= $ data ;
554608
555609 //break immediately if nothing is on the buffer
@@ -563,7 +617,7 @@ private function internal_generator($cycles = 0)
563617 for ($ x = 0 ; $ x < ($ parts_count - 1 ); $ x ++) {
564618 if (!empty ($ parts [$ x ])) {
565619 try {
566- $ response = json_decode ($ parts [$ x ], true );
620+ $ response = json_decode ($ parts [$ x ], $ associative , 512 , $ decode_flags );
567621 $ code = $ this ->preProcessResponse ($ response );
568622 if ($ code != 0 ) {
569623 $ this ->resetHandle ();
@@ -572,13 +626,26 @@ private function internal_generator($cycles = 0)
572626 goto end;
573627 }
574628
575- $ yield = ($ response ['object ' ]['metadata ' ]['resourceVersion ' ] > $ this ->getResourceVersionLastSuccess ());
629+ if (!$ initial_load_finished && DotAccess::get ($ response , 'type ' ) != "ADDED " ) {
630+ $ initial_load_finished = true ;
631+ }
632+
633+ $ rv = DotAccess::get ($ response , 'object.metadata.resourceVersion ' );
576634
577- $ this ->setResourceVersion ($ response ['object ' ]['metadata ' ]['resourceVersion ' ]);
578- $ this ->setResourceVersionLastSuccess ($ response ['object ' ]['metadata ' ]['resourceVersion ' ]);
635+ // https://github.com/kubernetes/kubernetes/issues/49745
636+ $ yield = (!$ initial_load_finished || $ rv > $ this ->getResourceVersionLastSuccess ());
637+
638+ if ($ rv > $ this ->getResourceVersionLastSuccess ()) {
639+ $ this ->setResourceVersion ($ rv );
640+ $ this ->setResourceVersionLastSuccess ($ rv );
641+ }
579642
580643 if ($ yield ) {
581- yield $ response ;
644+ if (!$ decode_response ) {
645+ yield $ parts [$ x ];
646+ } else {
647+ yield $ response ;
648+ }
582649 }
583650
584651 if ($ this ->getStop ()) {
@@ -603,16 +670,16 @@ private function internal_generator($cycles = 0)
603670
604671 private function preProcessResponse ($ response )
605672 {
606- if (!is_array ($ response )) {
673+ if (!DotAccess:: isStructuredData ($ response )) {
607674 return 1 ;
608675 }
609676
610- if ( key_exists ( ' kind ' , $ response) && $ response [ 'kind ' ] == 'Status ' && $ response[ 'status ' ] == 'Failure ' ) {
677+ if (DotAccess:: get ( $ response, 'kind ' , null ) == 'Status ' && DotAccess:: get ( $ response, 'status ' , null ) == 'Failure ' ) {
611678 return 1 ;
612679 }
613680
614681 // resourceVersion is too old
615- if ($ response[ 'type ' ] == 'ERROR ' && $ response[ 'object ' ][ ' code '] == 410 ) {
682+ if (DotAccess:: get ( $ response, 'type ' , null ) == 'ERROR ' && DotAccess:: get ( $ response, 'object. code ' , null ) == 410 ) {
616683 return 1 ;
617684 }
618685
0 commit comments