Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 160 additions & 134 deletions lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import time
from typing import Optional
import uuid

import dap_server
Expand All @@ -11,10 +12,14 @@
class DAPTestCaseBase(TestBase):
# set timeout based on whether ASAN was enabled or not. Increase
# timeout by a factor of 10 if ASAN is enabled.
timeoutval = 10 * (10 if ("ASAN_OPTIONS" in os.environ) else 1)
DEFAULT_TIMEOUT = 10 * (10 if ("ASAN_OPTIONS" in os.environ) else 1)
NO_DEBUG_INFO_TESTCASE = True

def create_debug_adapter(self, lldbDAPEnv=None, connection=None):
def create_debug_adapter(
self,
lldbDAPEnv: Optional[dict[str, str]] = None,
connection: Optional[str] = None,
):
"""Create the Visual Studio Code debug adapter"""
self.assertTrue(
is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
Expand All @@ -28,7 +33,11 @@ def create_debug_adapter(self, lldbDAPEnv=None, connection=None):
env=lldbDAPEnv,
)

def build_and_create_debug_adapter(self, lldbDAPEnv=None, dictionary=None):
def build_and_create_debug_adapter(
self,
lldbDAPEnv: Optional[dict[str, str]] = None,
dictionary: Optional[dict] = None,
):
self.build(dictionary=dictionary)
self.create_debug_adapter(lldbDAPEnv)

Expand Down Expand Up @@ -78,13 +87,13 @@ def waitUntil(self, condition_callback):
time.sleep(0.5)
return False

def verify_breakpoint_hit(self, breakpoint_ids):
def verify_breakpoint_hit(self, breakpoint_ids, timeout=DEFAULT_TIMEOUT):
"""Wait for the process we are debugging to stop, and verify we hit
any breakpoint location in the "breakpoint_ids" array.
"breakpoint_ids" should be a list of breakpoint ID strings
(["1", "2"]). The return value from self.set_source_breakpoints()
or self.set_function_breakpoints() can be passed to this function"""
stopped_events = self.dap_server.wait_for_stopped()
stopped_events = self.dap_server.wait_for_stopped(timeout)
for stopped_event in stopped_events:
if "body" in stopped_event:
body = stopped_event["body"]
Expand All @@ -110,16 +119,15 @@ def verify_breakpoint_hit(self, breakpoint_ids):
match_desc = "breakpoint %s." % (breakpoint_id)
if match_desc in description:
return
self.assertTrue(False, "breakpoint not hit")
self.assertTrue(False, f"breakpoint not hit, stopped_events={stopped_events}")

def verify_stop_exception_info(self, expected_description, timeout=timeoutval):
def verify_stop_exception_info(self, expected_description, timeout=DEFAULT_TIMEOUT):
"""Wait for the process we are debugging to stop, and verify the stop
reason is 'exception' and that the description matches
'expected_description'
"""
stopped_events = self.dap_server.wait_for_stopped(timeout=timeout)
stopped_events = self.dap_server.wait_for_stopped(timeout)
for stopped_event in stopped_events:
print("stopped_event", stopped_event)
if "body" in stopped_event:
body = stopped_event["body"]
if "reason" not in body:
Expand Down Expand Up @@ -263,46 +271,61 @@ def set_global(self, name, value, id=None):
return self.dap_server.request_setVariable(2, name, str(value), id=id)

def stepIn(
self, threadId=None, targetId=None, waitForStop=True, granularity="statement"
self,
threadId=None,
targetId=None,
waitForStop=True,
granularity="statement",
timeout=DEFAULT_TIMEOUT,
):
response = self.dap_server.request_stepIn(
threadId=threadId, targetId=targetId, granularity=granularity
)
self.assertTrue(response["success"])
if waitForStop:
return self.dap_server.wait_for_stopped()
return self.dap_server.wait_for_stopped(timeout)
return None

def stepOver(self, threadId=None, waitForStop=True, granularity="statement"):
def stepOver(
self,
threadId=None,
waitForStop=True,
granularity="statement",
timeout=DEFAULT_TIMEOUT,
):
self.dap_server.request_next(threadId=threadId, granularity=granularity)
if waitForStop:
return self.dap_server.wait_for_stopped()
return self.dap_server.wait_for_stopped(timeout)
return None

def stepOut(self, threadId=None, waitForStop=True):
def stepOut(self, threadId=None, waitForStop=True, timeout=DEFAULT_TIMEOUT):
self.dap_server.request_stepOut(threadId=threadId)
if waitForStop:
return self.dap_server.wait_for_stopped()
return self.dap_server.wait_for_stopped(timeout)
return None

def continue_to_next_stop(self):
self.dap_server.request_continue()
return self.dap_server.wait_for_stopped()
def do_continue(self): # `continue` is a keyword.
resp = self.dap_server.request_continue()
self.assertTrue(resp["success"], f"continue request failed: {resp}")

def continue_to_next_stop(self, timeout=DEFAULT_TIMEOUT):
self.do_continue()
return self.dap_server.wait_for_stopped(timeout)

def continue_to_breakpoints(self, breakpoint_ids):
self.dap_server.request_continue()
self.verify_breakpoint_hit(breakpoint_ids)
def continue_to_breakpoints(self, breakpoint_ids, timeout=DEFAULT_TIMEOUT):
self.do_continue()
self.verify_breakpoint_hit(breakpoint_ids, timeout)

def continue_to_exception_breakpoint(self, filter_label):
self.dap_server.request_continue()
def continue_to_exception_breakpoint(self, filter_label, timeout=DEFAULT_TIMEOUT):
self.do_continue()
self.assertTrue(
self.verify_stop_exception_info(filter_label),
self.verify_stop_exception_info(filter_label, timeout),
'verify we got "%s"' % (filter_label),
)

def continue_to_exit(self, exitCode=0):
self.dap_server.request_continue()
stopped_events = self.dap_server.wait_for_stopped()
def continue_to_exit(self, exitCode=0, timeout=DEFAULT_TIMEOUT):
self.do_continue()
stopped_events = self.dap_server.wait_for_stopped(timeout)
self.assertEqual(
len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
)
Expand Down Expand Up @@ -330,27 +353,15 @@ def disassemble(self, threadId=None, frameIndex=None):

def attach(
self,
program=None,
pid=None,
waitFor=None,
trace=None,
initCommands=None,
preRunCommands=None,
stopCommands=None,
exitCommands=None,
attachCommands=None,
coreFile=None,
*,
stopOnAttach=True,
disconnectAutomatically=True,
terminateCommands=None,
postRunCommands=None,
sourceMap=None,
sourceInitFile=False,
expectFailure=False,
gdbRemotePort=None,
gdbRemoteHostname=None,
sourceBreakpoints=None,
functionBreakpoints=None,
timeout=DEFAULT_TIMEOUT,
**kwargs,
):
"""Build the default Makefile target, create the DAP debug adapter,
and attach to the process.
Expand All @@ -367,7 +378,7 @@ def cleanup():
self.addTearDownHook(cleanup)
# Initialize and launch the program
self.dap_server.request_initialize(sourceInitFile)
self.dap_server.wait_for_event("initialized")
self.dap_server.wait_for_event("initialized", timeout)

# Set source breakpoints as part of the launch sequence.
if sourceBreakpoints:
Expand All @@ -389,64 +400,28 @@ def cleanup():
)

self.dap_server.request_configurationDone()
response = self.dap_server.request_attach(
program=program,
pid=pid,
waitFor=waitFor,
trace=trace,
initCommands=initCommands,
preRunCommands=preRunCommands,
stopCommands=stopCommands,
exitCommands=exitCommands,
attachCommands=attachCommands,
terminateCommands=terminateCommands,
coreFile=coreFile,
stopOnAttach=stopOnAttach,
postRunCommands=postRunCommands,
sourceMap=sourceMap,
gdbRemotePort=gdbRemotePort,
gdbRemoteHostname=gdbRemoteHostname,
)
response = self.dap_server.request_attach(stopOnAttach=stopOnAttach, **kwargs)
if expectFailure:
return response
if not (response and response["success"]):
self.assertTrue(
response["success"], "attach failed (%s)" % (response["message"])
)
if stopOnAttach:
self.dap_server.wait_for_stopped(timeout)

def launch(
self,
program=None,
args=None,
cwd=None,
env=None,
stopOnEntry=False,
disableASLR=False,
disableSTDIO=False,
shellExpandArguments=False,
trace=False,
initCommands=None,
preRunCommands=None,
stopCommands=None,
exitCommands=None,
terminateCommands=None,
sourcePath=None,
debuggerRoot=None,
*,
sourceInitFile=False,
launchCommands=None,
sourceMap=None,
disconnectAutomatically=True,
runInTerminal=False,
expectFailure=False,
postRunCommands=None,
enableAutoVariableSummaries=False,
displayExtendedBacktrace=False,
enableSyntheticChildDebugging=False,
commandEscapePrefix=None,
customFrameFormat=None,
customThreadFormat=None,
sourceBreakpoints=None,
functionBreakpoints=None,
expectFailure=False,
stopOnEntry=True,
timeout=DEFAULT_TIMEOUT,
**kwargs,
):
"""Sending launch request to dap"""

Expand All @@ -462,7 +437,7 @@ def cleanup():

# Initialize and launch the program
self.dap_server.request_initialize(sourceInitFile)
self.dap_server.wait_for_event("initialized")
self.dap_server.wait_for_event("initialized", timeout)

# Set source breakpoints as part of the launch sequence.
if sourceBreakpoints:
Expand All @@ -487,115 +462,36 @@ def cleanup():

response = self.dap_server.request_launch(
program,
args=args,
cwd=cwd,
env=env,
stopOnEntry=stopOnEntry,
disableASLR=disableASLR,
disableSTDIO=disableSTDIO,
shellExpandArguments=shellExpandArguments,
trace=trace,
initCommands=initCommands,
preRunCommands=preRunCommands,
stopCommands=stopCommands,
exitCommands=exitCommands,
terminateCommands=terminateCommands,
sourcePath=sourcePath,
debuggerRoot=debuggerRoot,
launchCommands=launchCommands,
sourceMap=sourceMap,
runInTerminal=runInTerminal,
postRunCommands=postRunCommands,
enableAutoVariableSummaries=enableAutoVariableSummaries,
displayExtendedBacktrace=displayExtendedBacktrace,
enableSyntheticChildDebugging=enableSyntheticChildDebugging,
commandEscapePrefix=commandEscapePrefix,
customFrameFormat=customFrameFormat,
customThreadFormat=customThreadFormat,
**kwargs,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice :-)

)

if expectFailure:
return response

if not (response and response["success"]):
self.assertTrue(
response["success"],
"launch failed (%s)" % (response["body"]["error"]["format"]),
)
if stopOnEntry:
self.dap_server.wait_for_stopped(timeout)

return response

def build_and_launch(
self,
program,
args=None,
cwd=None,
env=None,
stopOnEntry=False,
disableASLR=False,
disableSTDIO=False,
shellExpandArguments=False,
trace=False,
initCommands=None,
preRunCommands=None,
stopCommands=None,
exitCommands=None,
terminateCommands=None,
sourcePath=None,
debuggerRoot=None,
sourceInitFile=False,
runInTerminal=False,
disconnectAutomatically=True,
postRunCommands=None,
lldbDAPEnv=None,
enableAutoVariableSummaries=False,
displayExtendedBacktrace=False,
enableSyntheticChildDebugging=False,
commandEscapePrefix=None,
customFrameFormat=None,
customThreadFormat=None,
launchCommands=None,
expectFailure=False,
sourceBreakpoints=None,
functionBreakpoints=None,
*,
lldbDAPEnv: Optional[dict[str, str]] = None,
**kwargs,
):
"""Build the default Makefile target, create the DAP debug adapter,
and launch the process.
"""
self.build_and_create_debug_adapter(lldbDAPEnv)
self.assertTrue(os.path.exists(program), "executable must exist")

return self.launch(
program,
args,
cwd,
env,
stopOnEntry,
disableASLR,
disableSTDIO,
shellExpandArguments,
trace,
initCommands,
preRunCommands,
stopCommands,
exitCommands,
terminateCommands,
sourcePath,
debuggerRoot,
sourceInitFile,
runInTerminal=runInTerminal,
disconnectAutomatically=disconnectAutomatically,
postRunCommands=postRunCommands,
enableAutoVariableSummaries=enableAutoVariableSummaries,
enableSyntheticChildDebugging=enableSyntheticChildDebugging,
displayExtendedBacktrace=displayExtendedBacktrace,
commandEscapePrefix=commandEscapePrefix,
customFrameFormat=customFrameFormat,
customThreadFormat=customThreadFormat,
launchCommands=launchCommands,
expectFailure=expectFailure,
sourceBreakpoints=sourceBreakpoints,
functionBreakpoints=functionBreakpoints,
)
return self.launch(program, **kwargs)

def getBuiltinDebugServerTool(self):
# Tries to find simulation/lldb-server/gdbserver tool path.
Expand Down
Loading
Loading