From c0345cd3be0ef38d45b4af04206b859c8cbb491d Mon Sep 17 00:00:00 2001 From: Jyri Sarha Date: Wed, 7 Aug 2024 15:35:49 +0300 Subject: [PATCH] tools: debug-stream.py: debug_stream record receiving and decoding Python implementation for receiving and decoding debug_stream records from debug window slot transportation. Opens SOF debugfs file "debug_stream" reads and decodes the records from the circular buffer documented in soc_debug_window_slot.h. This initial version only knows of DEBUG_STREAM_RECORD_ID_THREAD_INFO records. Signed-off-by: Jyri Sarha --- tools/debugstream/debug-stream.py | 399 ++++++++++++++++++++++++++++++ tools/debugstream/thread-info.py | 217 ---------------- 2 files changed, 399 insertions(+), 217 deletions(-) create mode 100644 tools/debugstream/debug-stream.py delete mode 100644 tools/debugstream/thread-info.py diff --git a/tools/debugstream/debug-stream.py b/tools/debugstream/debug-stream.py new file mode 100644 index 000000000000..ce28a9cee697 --- /dev/null +++ b/tools/debugstream/debug-stream.py @@ -0,0 +1,399 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: BSD-3-Clause +# +# Copyright (c) 2024, Intel Corporation. 
+
+import argparse
+import mmap
+import ctypes
+import time
+import sys
+
+import logging
+logging.basicConfig(format="%(filename)s:%(lineno)s %(funcName)s: %(message)s",
+                    level=logging.WARNING)
+
+DEBUG_STREAM_PAYLOAD_MAGIC = 0x1ED15EED
+
+""" Generic Debug-stream header """
+class DebugStreamHdr(ctypes.Structure):
+    _fields_ = [
+        ("magic", ctypes.c_uint),
+        ("hdr_size", ctypes.c_uint),
+    ]
+
+""" Debug Stream record for passing debug data """
+class DebugStreamRecord(ctypes.Structure):
+    _fields_ = [
+        ("id", ctypes.c_uint),
+        ("serial", ctypes.c_uint),
+        ("size", ctypes.c_uint),
+    ]
+
+""" Thread Info record header """
+class CPUInfo(ctypes.Structure):
+    _pack_ = 1
+    _fields_ = [
+        ("hdr", DebugStreamRecord),
+        ("load", ctypes.c_ubyte),
+        ("thread_count", ctypes.c_ubyte),
+    ]
+
+""" Thread specific data-record, the thread name string starts after name_len """
+class ThreadInfo(ctypes.Structure):
+    _pack_ = 1
+    _fields_ = [
+        ("stack_usage", ctypes.c_ubyte),
+        ("cpu_load", ctypes.c_ubyte),
+        ("name_len", ctypes.c_ubyte),
+    ]
+
+WSIZE = ctypes.sizeof(ctypes.c_uint)
+
+""" Debug Stream record decoder and printer class """
+class RecordPrinter:
+    RECORD_ID_UNINITIALIZED = 0
+    RECORD_ID_THREAD_INFO = 1
+
+    def print_record(self, record, cpu):
+        recp = ctypes.cast(record, ctypes.POINTER(DebugStreamRecord))
+        logging.debug(f"rec: {recp.contents.id} {recp.contents.serial} {recp.contents.size}")
+        if recp.contents.id == self.RECORD_ID_THREAD_INFO:
+            return self.print_thread_info(record, cpu)
+        else:
+            logging.warning(f"cpu {cpu}: Unsupported record type {recp.contents.id}")
+        return True
+
+    def print_thread_info(self, record, cpu):
+        remlen = len(record) - ctypes.sizeof(CPUInfo)
+        if remlen < 0:
+            logging.info(f"Buffer end reached, parsing failed")
+            return False
+        cpup = ctypes.cast(record, ctypes.
+ POINTER(CPUInfo)) + print("CPU %u: Load: %02.1f%% %u threads (serial %u)" % + (cpu, cpup.contents.load / 2.55, + cpup.contents.thread_count, cpup.contents.hdr.serial)) + remain = (ctypes.c_ubyte * + (len(record) - ctypes.sizeof(CPUInfo) + )).from_address(ctypes.addressof(record) + + ctypes.sizeof(CPUInfo)) + for i in range(cpup.contents.thread_count): + remlen = remlen - ctypes.sizeof(ThreadInfo) + if remlen < 0: + logging.info(f"Buffer end reached, parsing failed") + return False + threadp = ctypes.cast(remain, ctypes.POINTER(ThreadInfo)) + remain = (ctypes.c_ubyte * + (len(remain) - ctypes.sizeof(ThreadInfo) + )).from_address(ctypes.addressof(remain) + + ctypes.sizeof(ThreadInfo)) + remlen = remlen - threadp.contents.name_len + if remlen < 0: + logging.info(f"Buffer end reached, parsing failed") + return False + name = bytearray(remain[:threadp.contents.name_len]).decode("utf-8") + remain = (ctypes.c_ubyte * + (len(remain) - threadp.contents.name_len + )).from_address(ctypes.addressof(remain) + + threadp.contents.name_len) + print(" %-20s stack %02.1f%%\tload %02.1f%%" % + (name, threadp.contents.stack_usage / 2.55, + threadp.contents.cpu_load / 2.55)) + return True + +""" Describes CPU specific circular buffers """ +class DebugStreamSectionDescriptor(ctypes.Structure): + _fields_ = [ + ("core_id", ctypes.c_uint), + ("buf_words", ctypes.c_uint), + ("offset", ctypes.c_uint), + # This is for cacheline alignment + ("padding", ctypes.c_ubyte * (64 - 3 * WSIZE)) + ] + +""" Debug Slot transport specific Debug Stream header, padded to cache line """ +class DebugStreamSlotHdr(ctypes.Structure): + _fields_ = [ + ("hdr", DebugStreamHdr), + ("total_size", ctypes.c_uint), + ("num_sections", ctypes.c_uint), + # This is for cacheline alignment + ("padding", ctypes.c_ubyte * (64 - 2 * WSIZE - + ctypes.sizeof(DebugStreamHdr))) + ] + +""" Live data header for CPU specific circular buffer """ +class CircularBufHdr(ctypes.Structure): + _fields_ = [ + ("next_serial", 
ctypes.c_uint), + ("w_ptr", ctypes.c_uint), + ] + +""" Class for extracting records from circular buffer """ +class CircularBufferDecoder: + desc = None + boffset = None + buf_words = None + cpu = None + printer = None + prev_w_ptr = 0 + prev_serial = None + error_count = 0 + def __init__(self, desc, cpu, printer): + self.desc = desc + self.boffset = desc.offset + ctypes.sizeof(CircularBufHdr) + self.buf_words = desc.buf_words + self.cpu = cpu + self.printer = printer + logging.debug(f"boffset {self.boffset} buf_words {self.buf_words} cpu {self.cpu}") + + def get_hdr(self, slot, pos): + if pos >= self.buf_words: + logging.warning(f"Bad position {pos}") + return None + hdr_size = ctypes.sizeof(DebugStreamRecord) + hdr_words = hdr_size // WSIZE + if pos + hdr_words > self.buf_words: + hdr = (ctypes.c_ubyte * hdr_size)() + size1 = (self.buf_words - pos) * WSIZE + size2 = hdr_size - size1 + pos1 = self.boffset + pos * WSIZE + pos2 = self.boffset + logging.debug(f"Wrapped header {pos} {hdr_words} {self.buf_words} {size1}") + + hdr[0:size1] = slot[pos1:pos1 + size1] + hdr[size1:hdr_size] = slot[pos2:pos2 + size2] + header = ctypes.cast(hdr, ctypes.POINTER(DebugStreamRecord)).contents + else: + header = ctypes.cast(slot[self.boffset + pos * WSIZE:], + ctypes.POINTER(DebugStreamRecord)).contents + if header.id > 100 or header.size >= self.buf_words: + logging.warning(f"Broken record id {header.id} serial {header.serial} size {header.size}") + return None + return header + + def get_record(self, slot, pos, serial): + rec = self.get_hdr(slot, pos) + if rec == None or rec.size == 0: + return None + logging.debug(f"got header at pos {pos} rec {rec.id} {rec.serial} {rec.size}") + if serial != None and rec.serial != serial: + logging.warning(f"Record serial mismatch {rec.serial} != {serial}, pos {pos} size {rec.size}") + self.error_count = self.error_count + 1 + return None + rwords = rec.size + rsize = rec.size * WSIZE + if pos + rwords > self.buf_words: + record = 
(ctypes.c_ubyte * rsize)()
+            size1 = (self.buf_words - pos) * WSIZE
+            size2 = rsize - size1
+            pos1 = self.boffset + pos * WSIZE
+            pos2 = self.boffset
+            logging.debug(f"Wrapped record {pos} {rsize} {self.buf_words} {size1}")
+
+            record[0:size1] = slot[pos1:pos1 + size1]
+            record[size1:rsize] = slot[pos2:pos2 + size2]
+        else:
+            record = (ctypes.c_ubyte * rsize
+                      ).from_buffer_copy(slot, self.boffset + pos * WSIZE)
+        logging.info(f"got {rec.serial}")
+        self.error_count = 0
+        return record
+
+    def catch_up(self, slot):
+        circ = CircularBufHdr.from_buffer_copy(
+            slot, self.desc.offset)
+        if circ.next_serial == 0 or circ.w_ptr >= self.buf_words:
+            return
+        self.decode_past_records(slot, circ.w_ptr, circ.next_serial)
+        self.prev_w_ptr = circ.w_ptr
+        self.prev_serial = circ.next_serial - 1
+        logging.info(f"serial {self.prev_serial} w_ptr {self.prev_w_ptr}")
+
+    def decode_past_records(self, slot, pos, serial):
+        if self.prev_serial != None and self.prev_serial >= serial - 1:
+            return
+        if pos == 0:
+            spos = self.buf_words - 1
+        else:
+            spos = pos - 1
+        bsize = ctypes.cast(slot[self.boffset + spos*4:],
+                            ctypes.POINTER(ctypes.c_uint)).contents.value
+        bpos = pos - bsize
+        if bpos < 0:
+            bpos = self.buf_words + pos - bsize
+        rec = self.get_hdr(slot, bpos)
+        if rec is None or bsize != rec.size:
+            return
+        if serial != None:
+            if rec.serial != serial - 1:
+                return
+        else:
+            serial = rec.serial + 1
+
+        self.decode_past_records(slot, bpos, serial - 1)
+
+        record = self.get_record(slot, bpos, serial - 1)
+        if record != None:
+            if not self.printer.print_record(record, self.cpu):
+                logging.info(f"Parse failed on record {serial - 1}")
+            logging.info(f"Printing {serial - 1} success")
+        else:
+            logging.info(f"Broken record {serial - 1}")
+
+    def get_next_record(self, slot):
+        if self.prev_serial != None:
+            record = self.get_record(slot, self.prev_w_ptr, self.prev_serial + 1)
+        else:
+            record = self.get_record(slot, self.prev_w_ptr, None)
+        if record != None:
+            success = 
self.printer.print_record(record, self.cpu) + if success: + recp = ctypes.cast(record, ctypes.POINTER(DebugStreamRecord)) + self.prev_w_ptr = (self.prev_w_ptr + recp.contents.size + ) % self.buf_words + self.prev_serial = recp.contents.serial + else: + logging.info(f"Parse failed on record {self.prev_serial + 1}") + return success + self.error_count = self.error_count + 1 + logging.info(f"Record decoding failed {self.error_count}") + return False + + def poll_buffer(self, slot): + circ = CircularBufHdr.from_buffer_copy( + slot, self.desc.offset) + if self.prev_w_ptr == circ.w_ptr: + return False + success = True + while self.prev_w_ptr != circ.w_ptr and success: + success = self.get_next_record(slot) + return True + + def check_error_count(self): + if self.error_count > 3: + return True + return False + +class DebugStreamDecoder: + """ + Class for finding thread analyzer chuck and initializing CoreData objects. + """ + file_size = 4096 # ADSP debug slot size + f = None + slot = None + descs = [] + circdec = [] + rec_printer = RecordPrinter() + + def set_file(self, f): + self.f = f + + def update_slot(self): + self.f.seek(0) + self.slot = self.f.read(self.file_size) + + def get_descriptors(self): + if self.slot == None: + return + hdr = ctypes.cast(self.slot, ctypes. 
+ POINTER(DebugStreamSlotHdr)) + if hdr.contents.hdr.magic != DEBUG_STREAM_PAYLOAD_MAGIC: + logging.warning("Debug Slot has bad magic 0x%08x" % + hdr.contents.hdr.magic) + return False + num_sections = hdr.contents.num_sections + if num_sections == len(self.descs): + return True + hsize = ctypes.sizeof(DebugStreamSlotHdr) + dsize = ctypes.sizeof(DebugStreamSectionDescriptor) + self.descs = (DebugStreamSectionDescriptor * num_sections + ).from_buffer_copy(self.slot, hsize) + self.circdec = [CircularBufferDecoder(self.descs[i], i, + self.rec_printer) + for i in range(len(self.descs))] + logging.info(f"Descriptors {hdr.contents.hdr.hdr_size} {hdr.contents.total_size} {hdr.contents.num_sections}") + return True + + def catch_up_all(self): + if len(self.descs) == 0 or self.slot == None: + return + for i in range(len(self.descs)): + self.circdec[i].catch_up(self.slot) + + def poll(self): + if len(self.descs) == 0 or self.slot == None: + return + sleep = True + for i in range(len(self.descs)): + if self.circdec[i].poll_buffer(self.slot): + sleep = False + return sleep + + def check_slot(self): + hdr = ctypes.cast(self.slot, ctypes. 
+ POINTER(DebugStreamSlotHdr)) + if hdr.contents.hdr.magic != DEBUG_STREAM_PAYLOAD_MAGIC: + self.slot = None + return False + if hdr.contents.num_sections != len(self.descs): + self.slot = None + return False + for i in range(len(self.descs)): + if self.circdec[i].check_error_count(): + self.circdec[i] = CircularBufferDecoder(self.descs[i], i, + self.rec_printer) + return True + + def reset(self): + self.f = None + self.slot = None + +def main_f(args): + """ + Open debug stream slot file and pass it to decoder + """ + decoder = DebugStreamDecoder() + prev_error = None + while True: + try: + with open(args.debugstream_file, "rb") as f: + decoder.set_file(f) + decoder.update_slot() + if not decoder.get_descriptors(): + break + decoder.catch_up_all() + while True: + if decoder.poll(): + time.sleep(args.update_interval) + decoder.update_slot() + if not decoder.check_slot(): + break + + except FileNotFoundError: + print(f"File {args.debugstream_file} not found!") + break + except OSError as e: + if str(e) != prev_error: + print(f"Open {args.debugstream_file} failed '{e}'") + prev_error = str(e) + decoder.reset() + time.sleep(args.update_interval) + +def parse_params(): + """ Parses parameters + """ + parser = argparse.ArgumentParser(description= + "SOF DebugStream thread info client. 
") + parser.add_argument('-t', '--update-interval', type=float, + help='Telemetry2 window polling interval in seconds, default 1', + default=0.01) + parser.add_argument('-f', '--debugstream-file', + help='File to read the DebugStream data from, default /sys/kernel/debug/sof/debug_stream', + default="/sys/kernel/debug/sof/debug_stream") + parsed_args = parser.parse_args() + return parsed_args + +args = parse_params() +main_f(args) diff --git a/tools/debugstream/thread-info.py b/tools/debugstream/thread-info.py deleted file mode 100644 index c5c5e5783361..000000000000 --- a/tools/debugstream/thread-info.py +++ /dev/null @@ -1,217 +0,0 @@ -#!/usr/bin/env python3 -# SPDX-License-Identifier: BSD-3-Clause -# -# Copyright (c) 2024, Intel Corporation. - -import argparse -import mmap -import ctypes -import time -import sys - -TELEMETRY2_PAYLOAD_MAGIC = 0x1ED15EED - -TELEMETRY2_CHUNK_ID_EMPTY = 0 -TELEMETRY2_ID_THREAD_INFO = 1 - -TELEMETRY2_PAYLOAD_V0_0 = 0 - -#telemetry2 payload -class Telemetry2PayloadHdr(ctypes.Structure): - _fields_ = [ - ("magic", ctypes.c_uint), - ("hdr_size", ctypes.c_uint), - ("total_size", ctypes.c_uint), - ("abi", ctypes.c_uint), - ("tstamp", ctypes.c_ulonglong) - ] - -# telemetry2 header struct -class Telemetry2ChunkHdr(ctypes.Structure): - _fields_ = [("id", ctypes.c_uint), ("size", ctypes.c_uint)] - -def get_telemetry2_payload_ok(slot): - payload_hdr = ctypes.cast(slot, - ctypes.POINTER(Telemetry2PayloadHdr)) - if payload_hdr.contents.magic != TELEMETRY2_PAYLOAD_MAGIC: - print("Telemetry2 payload header bad magic 0x%x\n" % - payload_hdr.contents.magic) - return False - if payload_hdr.contents.abi != TELEMETRY2_PAYLOAD_V0_0: - print("Unknown Telemetry2 abi version %u\n" % - payload_hdr.contents.abi) - return True - -def get_telemetry2_chunk_offset(id, slot): - """ - Generic function to get telmetry2 chunk offset by type - """ - offset = 0 - while True: - struct_ptr = ctypes.cast(slot[offset:], - ctypes.POINTER(Telemetry2ChunkHdr)) - 
#print("Checking %u id 0x%x size %u\n" % - # (offset, struct_ptr.contents.id, struct_ptr.contents.size)) - if struct_ptr.contents.id == id: - return offset - if struct_ptr.contents.size == 0: - return -1 - offset = offset + struct_ptr.contents.size - -class ThreadInfo(ctypes.Structure): - _pack_ = 1 - _fields_ = [ - ("name", ctypes.c_char * 14), - ("stack_usage", ctypes.c_ubyte), - ("cpu_usage", ctypes.c_ubyte), - ] - -class CPUInfo(ctypes.Structure): - _pack_ = 1 - _fields_ = [ - ("state", ctypes.c_ubyte), - ("counter", ctypes.c_ubyte), - ("load", ctypes.c_ubyte), - ("thread_count", ctypes.c_ubyte), - ("thread", ThreadInfo * 16), - ] - -class ThreadInfoChunk(ctypes.Structure): - _pack_ = 1 - _fields_ = [ - ("hdr", Telemetry2ChunkHdr ), - ("core_count", ctypes.c_ushort), - ("core_offsets", ctypes.c_ushort * 16), - ] - -class CoreData: - """ Class for tracking thread analyzer info for one core """ - counter = 0 - offset = 0 - core = 0 - STATE_UNINITIALIZED = 0 - STATE_BEING_UPDATED = 1 - STATE_UPTODATE = 2 - - def __init__(self, offset, core): - self.counter = 0 - self.offset = offset - self.core = core - - def print(self, slot): - cpu_info = ctypes.cast(slot[self.offset:], - ctypes.POINTER(CPUInfo)) - if self.counter == cpu_info.contents.counter: - return - if cpu_info.contents.state != self.STATE_UPTODATE: - return - self.counter = cpu_info.contents.counter - print("Core %d load %02.1f%%" % - (self.core, cpu_info.contents.load / 2.55)) - for i in range(cpu_info.contents.thread_count): - thread = cpu_info.contents.thread[i] - print("\t%-20s cpu %02.1f%% stack %02.1f%%" % - (thread.name.decode('utf-8'), thread.cpu_usage / 2.55, - thread.stack_usage / 2.55)); - -class Telemetry2ThreadAnalyzerDecoder: - """ - Class for finding thread analyzer chuck and initializing CoreData objects. 
- """ - file_size = 4096 # ADSP debug slot size - f = None - chunk_offset = -1 - chunk = None - core_data = [] - - def set_file(self, f): - self.f = f - - def decode_chunk(self): - if self.chunk != None: - return - self.f.seek(0) - slot = self.f.read(self.file_size) - if not get_telemetry2_payload_ok(slot): - return - self.chunk_offset = get_telemetry2_chunk_offset(TELEMETRY2_ID_THREAD_INFO, slot) - if self.chunk_offset < 0: - print("Thread info chunk not found") - self.chunk = None - return - self.chunk = ctypes.cast(slot[self.chunk_offset:], - ctypes.POINTER(ThreadInfoChunk)) - hdr = self.chunk.contents.hdr - #print(f"Chunk at {self.chunk_offset} id: {hdr.id} size: {hdr.size}") - print(f"core_count: {self.chunk.contents.core_count}") - - def init_core_data(self): - if self.chunk == None: - return - if len(self.core_data) == self.chunk.contents.core_count: - return - self.core_data = [None] * self.chunk.contents.core_count - for i in range(self.chunk.contents.core_count): - offset = self.chunk_offset + self.chunk.contents.core_offsets[i] - self.core_data[i] = CoreData(offset, i) - #print(f"core_offsets {i}: {self.chunk.contents.core_offsets[i]}") - - def loop_cores(self): - if len(self.core_data) == 0: - return - self.f.seek(0) - slot = self.f.read(self.file_size) - for i in range(len(self.core_data)): - self.core_data[i].print(slot) - - def reset(self): - self.f = None - self.chunk = None - - def print_status(self, tag): - print(f"{tag} f {self.f} offset {self.chunk_offset} chunk {self.chunk} cores {len(self.core_data)}") - -def main_f(args): - """ - Map telemetry2 slot into memorey and find the chunk we are interested in - """ - decoder = Telemetry2ThreadAnalyzerDecoder() - prev_error = None - while True: - try: - with open(args.telemetry2_file, "rb") as f: - decoder.set_file(f) - decoder.decode_chunk() - decoder.init_core_data() - while True: - decoder.loop_cores() - time.sleep(args.update_interval) - except FileNotFoundError: - print(f"File 
{args.telemetry2_file} not found!") - break - except OSError as e: - if str(e) != prev_error: - print(f"Open {args.telemetry2_file} failed '{e}'") - prev_error = str(e) - decoder.reset() - time.sleep(args.update_interval) - -def parse_params(): - """ Parses parameters - """ - parser = argparse.ArgumentParser(description= - "SOF Telemetry2 thread info client. "+ - "Opens telemetry2 slot debugfs file, looks"+ - " for thread info chunk, and decodes its "+ - "contentes into stdout periodically.") - parser.add_argument('-t', '--update-interval', type=float, - help='Telemetry2 window polling interval in seconds, default 1', - default=1) - parser.add_argument('-f', '--telemetry2-file', - help='File to read the telemetry2 data from, default /sys/kernel/debug/sof/telemetry2', - default="/sys/kernel/debug/sof/telemetry2") - parsed_args = parser.parse_args() - return parsed_args - -args = parse_params() -main_f(args)