Skip to content

Introduce manual SEA test scripts for Exec Phase #589

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 33 commits into
base: sea-migration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
138c2ae
[squash from exec-sea] bring over execution phase changes
varun-edachali-dbx Jun 9, 2025
3e3ab94
remove excess test
varun-edachali-dbx Jun 9, 2025
4a78165
add docstring
varun-edachali-dbx Jun 9, 2025
0dac4aa
remvoe exec func in sea backend
varun-edachali-dbx Jun 9, 2025
1b794c7
remove excess files
varun-edachali-dbx Jun 9, 2025
da5a6fe
remove excess models
varun-edachali-dbx Jun 9, 2025
686ade4
remove excess sea backend tests
varun-edachali-dbx Jun 9, 2025
31e6c83
cleanup
varun-edachali-dbx Jun 9, 2025
69ea238
re-introduce get_schema_desc
varun-edachali-dbx Jun 9, 2025
66d7517
remove SeaResultSet
varun-edachali-dbx Jun 9, 2025
71feef9
clean imports and attributes
varun-edachali-dbx Jun 9, 2025
ae9862f
pass CommandId to ExecResp
varun-edachali-dbx Jun 9, 2025
d8aa69e
remove changes in types
varun-edachali-dbx Jun 9, 2025
db139bc
add back essential types (ExecResponse, from_sea_state)
varun-edachali-dbx Jun 9, 2025
b977b12
fix fetch types
varun-edachali-dbx Jun 9, 2025
da615c0
excess imports
varun-edachali-dbx Jun 9, 2025
0da04a6
reduce diff by maintaining logs
varun-edachali-dbx Jun 9, 2025
ea9d456
fix int test types
varun-edachali-dbx Jun 9, 2025
8985c62
[squashed from exec-sea] init execution func
varun-edachali-dbx Jun 9, 2025
d9bcdbe
remove irrelevant changes
varun-edachali-dbx Jun 9, 2025
ee9fa1c
remove ResultSetFilter functionality
varun-edachali-dbx Jun 9, 2025
24c6152
remove more irrelevant changes
varun-edachali-dbx Jun 9, 2025
67fd101
remove more irrelevant changes
varun-edachali-dbx Jun 9, 2025
271fcaf
even more irrelevant changes
varun-edachali-dbx Jun 9, 2025
bf26ea3
remove sea response as init option
varun-edachali-dbx Jun 9, 2025
ed7cf91
exec test example scripts
varun-edachali-dbx Jun 9, 2025
dae15e3
formatting (black)
varun-edachali-dbx Jun 9, 2025
3e22c6c
change to valid table name
varun-edachali-dbx Jun 11, 2025
787f1f7
Merge branch 'sea-migration' into sea-test-scripts
varun-edachali-dbx Jun 11, 2025
165c4f3
remove un-necessary changes
varun-edachali-dbx Jun 11, 2025
a6e40d0
simplify test module
varun-edachali-dbx Jun 11, 2025
52e3088
logging -> debug level
varun-edachali-dbx Jun 11, 2025
641c09b
change table name in log
varun-edachali-dbx Jun 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 105 additions & 56 deletions examples/experimental/sea_connector_test.py
Original file line number Diff line number Diff line change
@@ -1,66 +1,115 @@
"""
Main script to run all SEA connector tests.

This script runs all the individual test modules and displays
a summary of test results with visual indicators.
"""
import os
import sys
import logging
from databricks.sql.client import Connection
import subprocess
from typing import List, Tuple

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def test_sea_session():
"""
Test opening and closing a SEA session using the connector.

This function connects to a Databricks SQL endpoint using the SEA backend,
opens a session, and then closes it.

Required environment variables:
- DATABRICKS_SERVER_HOSTNAME: Databricks server hostname
- DATABRICKS_HTTP_PATH: HTTP path for the SQL endpoint
- DATABRICKS_TOKEN: Personal access token for authentication
"""

server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
access_token = os.environ.get("DATABRICKS_TOKEN")
catalog = os.environ.get("DATABRICKS_CATALOG")

if not all([server_hostname, http_path, access_token]):
logger.error("Missing required environment variables.")
logger.error("Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN.")
sys.exit(1)

logger.info(f"Connecting to {server_hostname}")
logger.info(f"HTTP Path: {http_path}")
if catalog:
logger.info(f"Using catalog: {catalog}")

try:
logger.info("Creating connection with SEA backend...")
connection = Connection(
server_hostname=server_hostname,
http_path=http_path,
access_token=access_token,
catalog=catalog,
schema="default",
use_sea=True,
user_agent_entry="SEA-Test-Client" # add custom user agent
TEST_MODULES = [
"test_sea_session",
"test_sea_sync_query",
"test_sea_async_query",
"test_sea_metadata",
]


def run_test_module(module_name: str) -> bool:
"""Run a test module and return success status."""
module_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "tests", f"{module_name}.py"
)

# Simply run the module as a script - each module handles its own test execution
result = subprocess.run(
[sys.executable, module_path], capture_output=True, text=True
)

# Log the output from the test module
if result.stdout:
for line in result.stdout.strip().split("\n"):
logger.info(line)

if result.stderr:
for line in result.stderr.strip().split("\n"):
logger.error(line)

return result.returncode == 0


def run_tests() -> List[Tuple[str, bool]]:
"""Run all tests and return results."""
results = []

for module_name in TEST_MODULES:
try:
logger.info(f"\n{'=' * 50}")
logger.info(f"Running test: {module_name}")
logger.info(f"{'-' * 50}")

success = run_test_module(module_name)
results.append((module_name, success))

status = "✅ PASSED" if success else "❌ FAILED"
logger.info(f"Test {module_name}: {status}")

except Exception as e:
logger.error(f"Error loading or running test {module_name}: {str(e)}")
import traceback

logger.error(traceback.format_exc())
results.append((module_name, False))

return results


def print_summary(results: List[Tuple[str, bool]]) -> None:
"""Print a summary of test results."""
logger.info(f"\n{'=' * 50}")
logger.info("TEST SUMMARY")
logger.info(f"{'-' * 50}")

passed = sum(1 for _, success in results if success)
total = len(results)

for module_name, success in results:
status = "✅ PASSED" if success else "❌ FAILED"
logger.info(f"{status} - {module_name}")

logger.info(f"{'-' * 50}")
logger.info(f"Total: {total} | Passed: {passed} | Failed: {total - passed}")
logger.info(f"{'=' * 50}")


if __name__ == "__main__":
# Check if required environment variables are set
required_vars = [
"DATABRICKS_SERVER_HOSTNAME",
"DATABRICKS_HTTP_PATH",
"DATABRICKS_TOKEN",
]
missing_vars = [var for var in required_vars if not os.environ.get(var)]

if missing_vars:
logger.error(
f"Missing required environment variables: {', '.join(missing_vars)}"
)

logger.info(f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}")
logger.info(f"backend type: {type(connection.session.backend)}")

# Close the connection
logger.info("Closing the SEA session...")
connection.close()
logger.info("Successfully closed SEA session")

except Exception as e:
logger.error(f"Error testing SEA session: {str(e)}")
import traceback
logger.error(traceback.format_exc())
logger.error("Please set these variables before running the tests.")
sys.exit(1)

logger.info("SEA session test completed successfully")

if __name__ == "__main__":
test_sea_session()
# Run all tests
results = run_tests()

# Print summary
print_summary(results)

# Exit with appropriate status code
all_passed = all(success for _, success in results)
sys.exit(0 if all_passed else 1)
Empty file.
191 changes: 191 additions & 0 deletions examples/experimental/tests/test_sea_async_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""
Test for SEA asynchronous query execution functionality.
"""
import os
import sys
import logging
import time
from databricks.sql.client import Connection
from databricks.sql.backend.types import CommandState

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def test_sea_async_query_with_cloud_fetch():
"""
Test executing a query asynchronously using the SEA backend with cloud fetch enabled.

This function connects to a Databricks SQL endpoint using the SEA backend,
executes a simple query asynchronously with cloud fetch enabled, and verifies that execution completes successfully.
"""
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
access_token = os.environ.get("DATABRICKS_TOKEN")
catalog = os.environ.get("DATABRICKS_CATALOG")

if not all([server_hostname, http_path, access_token]):
logger.error("Missing required environment variables.")
logger.error(
"Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
)
return False

try:
# Create connection with cloud fetch enabled
logger.info(
"Creating connection for asynchronous query execution with cloud fetch enabled"
)
connection = Connection(
server_hostname=server_hostname,
http_path=http_path,
access_token=access_token,
catalog=catalog,
schema="default",
use_sea=True,
user_agent_entry="SEA-Test-Client",
use_cloud_fetch=True,
)

logger.info(
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
)

# Execute a simple query asynchronously
cursor = connection.cursor()
logger.info(
"Executing asynchronous query with cloud fetch: SELECT 1 as test_value"
)
cursor.execute_async("SELECT 1 as test_value")
logger.info(
"Asynchronous query submitted successfully with cloud fetch enabled"
)

# Check query state
logger.info("Checking query state...")
while cursor.is_query_pending():
logger.info("Query is still pending, waiting...")
time.sleep(1)

logger.info("Query is no longer pending, getting results...")
cursor.get_async_execution_result()
logger.info(
"Successfully retrieved asynchronous query results with cloud fetch enabled"
)

# Close resources
cursor.close()
connection.close()
logger.info("Successfully closed SEA session")

return True

except Exception as e:
logger.error(
f"Error during SEA asynchronous query execution test with cloud fetch: {str(e)}"
)
import traceback

logger.error(traceback.format_exc())
return False


def test_sea_async_query_without_cloud_fetch():
"""
Test executing a query asynchronously using the SEA backend with cloud fetch disabled.

This function connects to a Databricks SQL endpoint using the SEA backend,
executes a simple query asynchronously with cloud fetch disabled, and verifies that execution completes successfully.
"""
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
access_token = os.environ.get("DATABRICKS_TOKEN")
catalog = os.environ.get("DATABRICKS_CATALOG")

if not all([server_hostname, http_path, access_token]):
logger.error("Missing required environment variables.")
logger.error(
"Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
)
return False

try:
# Create connection with cloud fetch disabled
logger.info(
"Creating connection for asynchronous query execution with cloud fetch disabled"
)
connection = Connection(
server_hostname=server_hostname,
http_path=http_path,
access_token=access_token,
catalog=catalog,
schema="default",
use_sea=True,
user_agent_entry="SEA-Test-Client",
use_cloud_fetch=False,
enable_query_result_lz4_compression=False,
)

logger.info(
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
)

# Execute a simple query asynchronously
cursor = connection.cursor()
logger.info(
"Executing asynchronous query without cloud fetch: SELECT 1 as test_value"
)
cursor.execute_async("SELECT 1 as test_value")
logger.info(
"Asynchronous query submitted successfully with cloud fetch disabled"
)

# Check query state
logger.info("Checking query state...")
while cursor.is_query_pending():
logger.info("Query is still pending, waiting...")
time.sleep(1)

logger.info("Query is no longer pending, getting results...")
cursor.get_async_execution_result()
logger.info(
"Successfully retrieved asynchronous query results with cloud fetch disabled"
)

# Close resources
cursor.close()
connection.close()
logger.info("Successfully closed SEA session")

return True

except Exception as e:
logger.error(
f"Error during SEA asynchronous query execution test without cloud fetch: {str(e)}"
)
import traceback

logger.error(traceback.format_exc())
return False


def test_sea_async_query_exec():
"""
Run both asynchronous query tests and return overall success.
"""
with_cloud_fetch_success = test_sea_async_query_with_cloud_fetch()
logger.info(
f"Asynchronous query with cloud fetch: {'✅ PASSED' if with_cloud_fetch_success else '❌ FAILED'}"
)

without_cloud_fetch_success = test_sea_async_query_without_cloud_fetch()
logger.info(
f"Asynchronous query without cloud fetch: {'✅ PASSED' if without_cloud_fetch_success else '❌ FAILED'}"
)

return with_cloud_fetch_success and without_cloud_fetch_success


if __name__ == "__main__":
success = test_sea_async_query_exec()
sys.exit(0 if success else 1)
Loading
Loading