Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 232 additions & 17 deletions tests/test_003_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pytest
import time
from mssql_python import Connection, connect, pooling
import threading

def drop_table_if_exists(cursor, table_name):
"""Drop the table if it exists"""
Expand Down Expand Up @@ -225,31 +226,245 @@ def test_connection_close(conn_str):
temp_conn.close()

def test_connection_pooling_speed(conn_str):
# No pooling
start_no_pool = time.perf_counter()
# Disable pooling first
pooling(enabled=False)

# Warm up - establish initial connection to avoid first-connection overhead
warmup = connect(conn_str)
warmup.close()

# Measure multiple non-pooled connections
non_pooled_times = []
for _ in range(5):
start = time.perf_counter()
conn = connect(conn_str)
conn.close()
end = time.perf_counter()
non_pooled_times.append(end - start)

avg_no_pool = sum(non_pooled_times) / len(non_pooled_times)

# Enable pooling
pooling(max_size=5, idle_timeout=30)

# Prime the pool with a connection
primer = connect(conn_str)
primer.close()

# Small delay to ensure connection is properly returned to pool
time.sleep(0.1)

# Measure multiple pooled connections
pooled_times = []
for _ in range(5):
start = time.perf_counter()
conn = connect(conn_str)
conn.close()
end = time.perf_counter()
pooled_times.append(end - start)

avg_pooled = sum(pooled_times) / len(pooled_times)

# Pooled should be significantly faster than non-pooled
assert avg_pooled < avg_no_pool, \
f"Pooled connections ({avg_pooled:.6f}s) not significantly faster than non-pooled ({avg_no_pool:.6f}s)"

# Clean up - disable pooling for other tests
pooling(enabled=False)

def test_connection_pooling_reuse_spid(conn_str):
"""Test that connections are actually reused from the pool"""
# Enable pooling
pooling(max_size=1, idle_timeout=30)

# Create and close a connection
conn1 = connect(conn_str)
cursor1 = conn1.cursor()
cursor1.execute("SELECT @@SPID") # Get SQL Server process ID
spid1 = cursor1.fetchone()[0]
conn1.close()
end_no_pool = time.perf_counter()
no_pool_duration = end_no_pool - start_no_pool

# Second connection
start2 = time.perf_counter()

# Get another connection - should be the same one from pool
conn2 = connect(conn_str)
cursor2 = conn2.cursor()
cursor2.execute("SELECT @@SPID")
spid2 = cursor2.fetchone()[0]
conn2.close()
end2 = time.perf_counter()
duration2 = end2 - start2

# The SPID should be the same, indicating connection reuse
assert spid1 == spid2, "Connections not reused - different SPIDs"

# Clean up

def test_pool_exhaustion_max_size_1(conn_str):
"""Test pool exhaustion when max_size=1 and multiple concurrent connections are requested."""
pooling(max_size=1, idle_timeout=30)
conn1 = connect(conn_str)
results = []

def try_connect():
try:
conn2 = connect(conn_str)
results.append("success")
conn2.close()
except Exception as e:
results.append(str(e))

# Start a thread that will attempt to get a second connection while the first is open
t = threading.Thread(target=try_connect)
t.start()
t.join(timeout=2)
conn1.close()

# Depending on implementation, either blocks, raises, or times out
assert results, "Second connection attempt did not complete"
# If pool blocks, the thread may not finish until conn1 is closed, so allow both outcomes
assert results[0] == "success" or "pool" in results[0].lower() or "timeout" in results[0].lower(), \
f"Unexpected pool exhaustion result: {results[0]}"
pooling(enabled=False)

def test_pool_idle_timeout_removes_connections(conn_str):
"""Test that idle_timeout removes connections from the pool after the timeout."""
pooling(max_size=2, idle_timeout=2)
conn1 = connect(conn_str)
spid_list = []
cursor1 = conn1.cursor()
cursor1.execute("SELECT @@SPID")
spid1 = cursor1.fetchone()[0]
spid_list.append(spid1)
conn1.close()

# Pooling enabled
pooling(max_size=2, idle_timeout=10)
connect(conn_str).close()
# Wait for longer than idle_timeout
time.sleep(3)

# Pooled connection (should be reused, hence faster)
start_pool = time.perf_counter()
# Get a new connection, which should not reuse the previous SPID
conn2 = connect(conn_str)
cursor2 = conn2.cursor()
cursor2.execute("SELECT @@SPID")
spid2 = cursor2.fetchone()[0]
spid_list.append(spid2)
conn2.close()
end_pool = time.perf_counter()
pool_duration = end_pool - start_pool
assert pool_duration < no_pool_duration, "Expected faster connection with pooling"

assert spid1 != spid2, "Idle timeout did not remove connection from pool"

def test_connection_timeout_invalid_password(conn_str):
"""Test that connecting with an invalid password raises an exception quickly (timeout)."""
# Modify the connection string to use an invalid password
if "Pwd=" in conn_str:
bad_conn_str = conn_str.replace("Pwd=", "Pwd=wrongpassword")
elif "Password=" in conn_str:
bad_conn_str = conn_str.replace("Password=", "Password=wrongpassword")
else:
pytest.skip("No password found in connection string to modify")
start = time.perf_counter()
with pytest.raises(Exception):
connect(bad_conn_str)
elapsed = time.perf_counter() - start
# Should fail quickly (within 10 seconds)
assert elapsed < 10, f"Connection with invalid password took too long: {elapsed:.2f}s"

def test_connection_timeout_invalid_host(conn_str):
"""Test that connecting to an invalid host fails with a timeout."""
# Replace server/host with an invalid one
if "Server=" in conn_str:
bad_conn_str = conn_str.replace("Server=", "Server=invalidhost12345;")
elif "host=" in conn_str:
bad_conn_str = conn_str.replace("host=", "host=invalidhost12345;")
else:
pytest.skip("No server/host found in connection string to modify")
start = time.perf_counter()
with pytest.raises(Exception):
connect(bad_conn_str)
elapsed = time.perf_counter() - start
# Should fail within a reasonable time (30s)
# Note: This may vary based on network conditions, so adjust as needed
# but generally, a connection to an invalid host should not take too long
# to fail.
# If it takes too long, it may indicate a misconfiguration or network issue.
assert elapsed < 30, f"Connection to invalid host took too long: {elapsed:.2f}s"

def test_pool_removes_invalid_connections(conn_str):
"""Test that the pool removes connections that become invalid (simulate by closing underlying connection)."""
pooling(max_size=1, idle_timeout=30)
conn = connect(conn_str)
cursor = conn.cursor()
cursor.execute("SELECT 1")
# Simulate invalidation by forcibly closing the connection at the driver level
try:
# Try to access a private attribute or method to forcibly close the underlying connection
# This is implementation-specific; if not possible, skip
if hasattr(conn, "_conn") and hasattr(conn._conn, "close"):
conn._conn.close()
else:
pytest.skip("Cannot forcibly close underlying connection for this driver")
except Exception:
pass
# Safely close the connection, ignoring errors due to forced invalidation
try:
conn.close()
except RuntimeError as e:
if "not initialized" not in str(e):
raise
# Now, get a new connection from the pool and ensure it works
new_conn = connect(conn_str)
new_cursor = new_conn.cursor()
try:
new_cursor.execute("SELECT 1")
result = new_cursor.fetchone()
assert result is not None and result[0] == 1, "Pool did not remove invalid connection"
finally:
new_conn.close()
pooling(enabled=False)

def test_pool_recovery_after_failed_connection(conn_str):
"""Test that the pool recovers after a failed connection attempt."""
pooling(max_size=1, idle_timeout=30)
# First, try to connect with a bad password (should fail)
if "Pwd=" in conn_str:
bad_conn_str = conn_str.replace("Pwd=", "Pwd=wrongpassword")
elif "Password=" in conn_str:
bad_conn_str = conn_str.replace("Password=", "Password=wrongpassword")
else:
pytest.skip("No password found in connection string to modify")
with pytest.raises(Exception):
connect(bad_conn_str)
# Now, connect with the correct string and ensure it works
conn = connect(conn_str)
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
assert result is not None and result[0] == 1, "Pool did not recover after failed connection"
conn.close()
pooling(enabled=False)

def test_pool_capacity_limit_and_overflow(conn_str):
"""Test that pool does not grow beyond max_size and handles overflow gracefully."""
pooling(max_size=2, idle_timeout=30)
conns = []
try:
# Open up to max_size connections
conns.append(connect(conn_str))
conns.append(connect(conn_str))
# Try to open a third connection, which should fail or block
overflow_result = []
def try_overflow():
try:
c = connect(conn_str)
overflow_result.append("success")
c.close()
except Exception as e:
overflow_result.append(str(e))
t = threading.Thread(target=try_overflow)
t.start()
t.join(timeout=2)
assert overflow_result, "Overflow connection attempt did not complete"
# Accept either block, error, or success if pool implementation allows overflow
assert overflow_result[0] == "success" or "pool" in overflow_result[0].lower() or "timeout" in overflow_result[0].lower(), \
f"Unexpected pool overflow result: {overflow_result[0]}"
finally:
for c in conns:
c.close()
pooling(enabled=False)

def test_connection_pooling_basic(conn_str):
# Enable pooling with small pool size
Expand Down