Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions mssql_python/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Cursor:
fetchmany(size=None) -> Sequence of sequences (e.g. list of tuples).
fetchall() -> Sequence of sequences (e.g. list of tuples).
nextset() -> True if there is another result set, None otherwise.
next() -> Fetch the next row from the cursor.
setinputsizes(sizes) -> None.
setoutputsize(size, column=None) -> None.
"""
Expand Down Expand Up @@ -536,6 +537,48 @@ def _map_data_type(self, sql_type):
}
return sql_to_python_type.get(sql_type, str)

def __iter__(self):
"""
Return the cursor itself as an iterator.

This allows direct iteration over the cursor after execute():

for row in cursor.execute("SELECT * FROM table"):
print(row)
"""
self._check_closed()
return self

def __next__(self):
"""
Fetch the next row when iterating over the cursor.

Returns:
The next Row object.

Raises:
StopIteration: When no more rows are available.
"""
self._check_closed()
row = self.fetchone()
if row is None:
raise StopIteration
return row

def next(self):
"""
Fetch the next row from the cursor.

This is an alias for __next__() to maintain compatibility with older code.

Returns:
The next Row object.

Raises:
StopIteration: When no more rows are available.
"""
return self.__next__()

def execute(
self,
operation: str,
Expand Down
306 changes: 306 additions & 0 deletions tests/test_004_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,252 @@ def test_chaining_with_parameters(cursor, db_connection):
except:
pass

def test_chaining_with_iteration(cursor, db_connection):
"""Test method chaining with iteration (for loop)"""
try:
# Create test table
cursor.execute("CREATE TABLE #test_iteration (id INT, name NVARCHAR(50))")
db_connection.commit()

# Insert test data
names = ["Alice", "Bob", "Charlie", "Diana"]
for i, name in enumerate(names, 1):
cursor.execute("INSERT INTO #test_iteration VALUES (?, ?)", i, name)
db_connection.commit()

# Test iteration over execute() result (should work because cursor implements __iter__)
results = []
for row in cursor.execute("SELECT id, name FROM #test_iteration ORDER BY id"):
results.append((row[0], row[1]))

expected = [(1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "Diana")]
assert results == expected, f"Iteration results should match expected: {results} != {expected}"

# Test iteration with WHERE clause
results = []
for row in cursor.execute("SELECT name FROM #test_iteration WHERE id > ?", 2):
results.append(row[0])

expected_names = ["Charlie", "Diana"]
assert results == expected_names, f"Filtered iteration should return: {expected_names}, got: {results}"

finally:
try:
cursor.execute("DROP TABLE #test_iteration")
db_connection.commit()
except:
pass

def test_cursor_next_functionality(cursor, db_connection):
"""Test cursor next() functionality for future iterator implementation"""
try:
# Create test table
cursor.execute("CREATE TABLE #test_next (id INT, name NVARCHAR(50))")
db_connection.commit()

# Insert test data
test_data = [
(1, "Alice"),
(2, "Bob"),
(3, "Charlie"),
(4, "Diana")
]

for id_val, name in test_data:
cursor.execute("INSERT INTO #test_next VALUES (?, ?)", id_val, name)
db_connection.commit()

# Execute query
cursor.execute("SELECT id, name FROM #test_next ORDER BY id")

# Test next() function (this will work once __iter__ and __next__ are implemented)
# For now, we'll test the equivalent functionality using fetchone()

# Test 1: Get first row using next() equivalent
first_row = cursor.fetchone()
assert first_row is not None, "First row should not be None"
assert first_row[0] == 1, "First row id should be 1"
assert first_row[1] == "Alice", "First row name should be Alice"

# Test 2: Get second row using next() equivalent
second_row = cursor.fetchone()
assert second_row is not None, "Second row should not be None"
assert second_row[0] == 2, "Second row id should be 2"
assert second_row[1] == "Bob", "Second row name should be Bob"

# Test 3: Get third row using next() equivalent
third_row = cursor.fetchone()
assert third_row is not None, "Third row should not be None"
assert third_row[0] == 3, "Third row id should be 3"
assert third_row[1] == "Charlie", "Third row name should be Charlie"

# Test 4: Get fourth row using next() equivalent
fourth_row = cursor.fetchone()
assert fourth_row is not None, "Fourth row should not be None"
assert fourth_row[0] == 4, "Fourth row id should be 4"
assert fourth_row[1] == "Diana", "Fourth row name should be Diana"

# Test 5: Try to get fifth row (should return None, equivalent to StopIteration)
fifth_row = cursor.fetchone()
assert fifth_row is None, "Fifth row should be None (no more data)"

# Test 6: Test with empty result set
cursor.execute("SELECT id, name FROM #test_next WHERE id > 100")
empty_row = cursor.fetchone()
assert empty_row is None, "Empty result set should return None immediately"

# Test 7: Test next() with single row result
cursor.execute("SELECT id, name FROM #test_next WHERE id = 2")
single_row = cursor.fetchone()
assert single_row is not None, "Single row should not be None"
assert single_row[0] == 2, "Single row id should be 2"
assert single_row[1] == "Bob", "Single row name should be Bob"

# Next call should return None
no_more_rows = cursor.fetchone()
assert no_more_rows is None, "No more rows should return None"

finally:
try:
cursor.execute("DROP TABLE #test_next")
db_connection.commit()
except:
pass

def test_cursor_next_with_different_data_types(cursor, db_connection):
"""Test next() functionality with various data types"""
try:
# Create test table with various data types
cursor.execute("""
CREATE TABLE #test_next_types (
id INT,
name NVARCHAR(50),
score FLOAT,
active BIT,
created_date DATE,
created_time DATETIME
)
""")
db_connection.commit()

# Insert test data with different types
from datetime import date, datetime
cursor.execute("""
INSERT INTO #test_next_types
VALUES (?, ?, ?, ?, ?, ?)
""", 1, "Test User", 95.5, True, date(2024, 1, 15), datetime(2024, 1, 15, 10, 30, 0))
db_connection.commit()

# Execute query and test next() equivalent
cursor.execute("SELECT * FROM #test_next_types")

# Get the row using next() equivalent (fetchone)
row = cursor.fetchone()
assert row is not None, "Row should not be None"
assert row[0] == 1, "ID should be 1"
assert row[1] == "Test User", "Name should be 'Test User'"
assert abs(row[2] - 95.5) < 0.001, "Score should be approximately 95.5"
assert row[3] == True, "Active should be True"
assert row[4] == date(2024, 1, 15), "Date should match"
assert row[5] == datetime(2024, 1, 15, 10, 30, 0), "Datetime should match"

# Next call should return None
next_row = cursor.fetchone()
assert next_row is None, "No more rows should return None"

finally:
try:
cursor.execute("DROP TABLE #test_next_types")
db_connection.commit()
except:
pass

def test_cursor_next_error_conditions(cursor, db_connection):
"""Test next() functionality error conditions"""
try:
# Test next() on closed cursor (should raise exception when implemented)
test_cursor = db_connection.cursor()
test_cursor.execute("SELECT 1")
test_cursor.close()

# This should raise an exception when iterator is implemented
try:
test_cursor.fetchone() # Equivalent to next() call
assert False, "Should raise exception on closed cursor"
except Exception:
pass # Expected behavior

# Test next() without executing query first
fresh_cursor = db_connection.cursor()
try:
fresh_cursor.fetchone() # This might work but return None or raise exception
except Exception:
pass # Either behavior is acceptable
finally:
fresh_cursor.close()

except Exception as e:
# Some error conditions might not be testable without full iterator implementation
pass

def test_future_iterator_protocol_compatibility(cursor, db_connection):
"""Test that demonstrates future iterator protocol usage"""
try:
# Create test table
cursor.execute("CREATE TABLE #test_future_iter (value INT)")
db_connection.commit()

# Insert test data
for i in range(1, 4):
cursor.execute("INSERT INTO #test_future_iter VALUES (?)", i)
db_connection.commit()

# Execute query
cursor.execute("SELECT value FROM #test_future_iter ORDER BY value")

# Demonstrate how it will work with iterator protocol:
# This is what will be possible once __iter__ and __next__ are implemented:

# Method 1: Using next() function (future implementation)
# row1 = next(cursor) # Will work with __next__
# row2 = next(cursor) # Will work with __next__
# row3 = next(cursor) # Will work with __next__
# try:
# row4 = next(cursor) # Should raise StopIteration
# except StopIteration:
# pass

# Method 2: Using for loop (future implementation)
# results = []
# for row in cursor: # Will work with __iter__ and __next__
# results.append(row[0])

# For now, test equivalent functionality with fetchone()
results = []
while True:
row = cursor.fetchone()
if row is None:
break
results.append(row[0])

expected = [1, 2, 3]
assert results == expected, f"Results should be {expected}, got {results}"

# Test method chaining with iteration (current working implementation)
results2 = []
for row in cursor.execute("SELECT value FROM #test_future_iter ORDER BY value DESC").fetchall():
results2.append(row[0])

expected2 = [3, 2, 1]
assert results2 == expected2, f"Chained results should be {expected2}, got {results2}"

finally:
try:
cursor.execute("DROP TABLE #test_future_iter")
db_connection.commit()
except:
pass

def test_chaining_error_handling(cursor):
"""Test that chaining works properly even when errors occur"""
# Test that cursor is still chainable after an error
Expand Down Expand Up @@ -1581,6 +1827,66 @@ def test_chaining_performance_statement_reuse(cursor, db_connection):
except:
pass

def test_execute_chaining_compatibility_examples(cursor, db_connection):
"""Test real-world chaining examples"""
try:
# Create users table
cursor.execute("""
CREATE TABLE #users (
user_id INT IDENTITY(1,1) PRIMARY KEY,
user_name NVARCHAR(50),
last_logon DATETIME,
status NVARCHAR(20)
)
""")
db_connection.commit()

# Insert test users
cursor.execute("INSERT INTO #users (user_name, status) VALUES ('john_doe', 'active')")
cursor.execute("INSERT INTO #users (user_name, status) VALUES ('jane_smith', 'inactive')")
db_connection.commit()

# Example 1: Iterate over results directly (pyodbc style)
user_names = []
for row in cursor.execute("SELECT user_id, user_name FROM #users WHERE status = ?", "active"):
user_names.append(f"{row.user_id}: {row.user_name}")
assert len(user_names) == 1, "Should find 1 active user"
assert "john_doe" in user_names[0], "Should contain john_doe"

# Example 2: Single row fetch chaining
user = cursor.execute("SELECT user_name FROM #users WHERE user_id = ?", 1).fetchone()
assert user[0] == "john_doe", "Should return john_doe"

# Example 3: All rows fetch chaining
all_users = cursor.execute("SELECT user_name FROM #users ORDER BY user_id").fetchall()
assert len(all_users) == 2, "Should return 2 users"
assert all_users[0] == ["john_doe"], "First user should be john_doe"
assert all_users[1] == ["jane_smith"], "Second user should be jane_smith"

# Example 4: Update with rowcount chaining
from datetime import datetime
now = datetime.now()
updated_count = cursor.execute(
"UPDATE #users SET last_logon = ? WHERE user_name = ?",
now, "john_doe"
).rowcount
assert updated_count == 1, "Should update 1 user"

# Example 5: Delete with rowcount chaining
deleted_count = cursor.execute("DELETE FROM #users WHERE status = ?", "inactive").rowcount
assert deleted_count == 1, "Should delete 1 inactive user"

# Verify final state
remaining_users = cursor.execute("SELECT COUNT(*) FROM #users").fetchone()[0]
assert remaining_users == 1, "Should have 1 user remaining"

finally:
try:
cursor.execute("DROP TABLE #users")
db_connection.commit()
except:
pass

def test_close(db_connection):
"""Test closing the cursor"""
try:
Expand Down