@@ -290,13 +290,13 @@ def subscribe(self, watchpoint):
290290 def set_subscriptions (self ):
291291 pass
292292
293- def process_log (self , ws , message ):
293+ def process_log (self , ws , message , shard = 'shard0' ):
294294 pass
295295
296- def process_results (self , ws , message ):
296+ def process_results (self , ws , message , shard = 'shard0' ):
297297 pass
298298
299- def process_error (self , ws , message ):
299+ def process_error (self , ws , message , shard = 'shard0' ):
300300 pass
301301
302302 def process_cpu (self , ws , data ):
@@ -315,7 +315,7 @@ def on_message(self, ws, message):
315315 return
316316
317317 if (message .startswith ('gz' )):
318- message = zlib .decompress (b64decode (message [3 :]), 0 )
318+ message = zlib .decompress (b64decode (message [3 :]), 0 ). decode ( 'utf-8' )
319319
320320 try :
321321 self .process_message (ws , message )
@@ -328,20 +328,24 @@ def on_message(self, ws, message):
328328 return
329329
330330 if data [0 ].endswith ('console' ):
331+ if 'shard' in data [1 ]:
332+ shard = data [1 ]['shard' ]
333+ else :
334+ shard = 'shard0'
331335
332336 if 'messages' in data [1 ]:
333337 stream = []
334338
335339 if 'log' in data [1 ]['messages' ]:
336340 for line in data [1 ]['messages' ]['log' ]:
337- self .process_log (ws , line )
341+ self .process_log (ws , line , shard )
338342
339343 if 'results' in data [1 ]['messages' ]:
340344 for line in data [1 ]['messages' ]['results' ]:
341- self .process_results (ws , line )
345+ self .process_results (ws , line , shard )
342346
343347 if 'error' in data [1 ]:
344- self .process_error (ws , data [1 ]['error' ])
348+ self .process_error (ws , data [1 ]['error' ], shard )
345349
346350 if data [0 ].endswith ('cpu' ):
347351 self .process_cpu (ws , data [1 ])
0 commit comments