Skip to content

Commit e2526cb

Browse files
committed
Add streaming support to fetch for VARCHAR(MAX) columns
1 parent 83ab8ea commit e2526cb

File tree

3 files changed

+250
-64
lines changed

3 files changed

+250
-64
lines changed

mssql_python/cursor.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -348,16 +348,16 @@ def _map_sql_type(self, param, parameters_list, i):
348348
if utf16_len > MAX_INLINE_CHAR: # Long strings -> DAE
349349
if is_unicode:
350350
return (
351-
ddbc_sql_const.SQL_WLONGVARCHAR.value,
351+
ddbc_sql_const.SQL_WVARCHAR.value,
352352
ddbc_sql_const.SQL_C_WCHAR.value,
353-
utf16_len,
353+
0,
354354
0,
355355
True,
356356
)
357357
return (
358-
ddbc_sql_const.SQL_LONGVARCHAR.value,
358+
ddbc_sql_const.SQL_VARCHAR.value,
359359
ddbc_sql_const.SQL_C_CHAR.value,
360-
len(param),
360+
0,
361361
0,
362362
True,
363363
)
@@ -766,20 +766,20 @@ def execute(
766766
# Executing a new statement. Reset is_stmt_prepared to false
767767
self.is_stmt_prepared = [False]
768768

769-
log('debug', "Executing query: %s", operation)
770-
for i, param in enumerate(parameters):
771-
log('debug',
772-
"""Parameter number: %s, Parameter: %s,
773-
Param Python Type: %s, ParamInfo: %s, %s, %s, %s, %s""",
774-
i + 1,
775-
param,
776-
str(type(param)),
777-
parameters_type[i].paramSQLType,
778-
parameters_type[i].paramCType,
779-
parameters_type[i].columnSize,
780-
parameters_type[i].decimalDigits,
781-
parameters_type[i].inputOutputType,
782-
)
769+
# log('debug', "Executing query: %s", operation)
770+
# for i, param in enumerate(parameters):
771+
# log('debug',
772+
# """Parameter number: %s, Parameter: %s,
773+
# Param Python Type: %s, ParamInfo: %s, %s, %s, %s, %s""",
774+
# i + 1,
775+
# param,
776+
# str(type(param)),
777+
# parameters_type[i].paramSQLType,
778+
# parameters_type[i].paramCType,
779+
# parameters_type[i].columnSize,
780+
# parameters_type[i].decimalDigits,
781+
# parameters_type[i].inputOutputType,
782+
# )
783783

784784
ret = ddbc_bindings.DDBCSQLExecute(
785785
self.hstmt,

mssql_python/pybind/ddbc_bindings.cpp

Lines changed: 119 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1711,8 +1711,88 @@ SQLRETURN SQLFetch_wrap(SqlHandlePtr StatementHandle) {
17111711
return SQLFetch_ptr(StatementHandle->get());
17121712
}
17131713

1714+
// Reads one column of the current row by calling SQLGetData repeatedly with a
// DAE_CHUNK_SIZE-byte scratch buffer and concatenating the pieces.
//
// Parameters:
//   hStmt      - statement handle positioned on the row to read.
//   colIndex   - column number passed straight through to SQLGetData.
//   cType      - ODBC C type requested from the driver (the visible caller
//                passes SQL_C_CHAR).
//   isWideChar - the driver writes SQLWCHAR code units; the result is a str
//                decoded from UTF-16.
//   isBinary   - the driver writes raw bytes without a terminator; the result
//                is a bytes object. If both flags are set, isWideChar wins.
//
// Returns:
//   py::none() for SQL NULL or when SQLGetData reports an error (the error is
//   logged, matching how the rest of the fetch path reports column failures),
//   otherwise a py::str / py::bytes holding the complete value.
static py::object FetchLobColumnData(SQLHSTMT hStmt,
                                     SQLUSMALLINT colIndex,
                                     SQLSMALLINT cType,
                                     bool isWideChar,
                                     bool isBinary)
{
    const SQLLEN chunkSize = static_cast<SQLLEN>(DAE_CHUNK_SIZE);

    // For character targets the driver appends a terminator to every piece it
    // returns, so a full buffer carries (chunkSize - terminator) payload bytes.
    // Binary targets are never terminated.
    SQLLEN terminatorSize = 1;
    if (isWideChar) {
        terminatorSize = static_cast<SQLLEN>(sizeof(SQLWCHAR));
    } else if (isBinary) {
        terminatorSize = 0;
    }
    const SQLLEN maxPayload = chunkSize - terminatorSize;

    std::vector<char> buffer;
    std::vector<char> chunk(DAE_CHUNK_SIZE);  // reused for every SQLGetData call
    int loopCount = 0;

    while (true) {
        ++loopCount;
        SQLLEN indicator = 0;
        const SQLRETURN ret = SQLGetData_ptr(hStmt, colIndex, cType, chunk.data(),
                                             chunkSize, &indicator);

        // SQL_NO_DATA means every byte was already handed out by earlier calls;
        // keep what has been collected instead of treating it as a failure.
        if (ret == SQL_NO_DATA) {
            LOG("Loop {}: SQL_NO_DATA, column {} fully read", loopCount, colIndex);
            break;
        }
        // Check the return code before trusting the indicator value.
        if (!SQL_SUCCEEDED(ret)) {
            LOG("Loop {}: Error fetching col={} with cType={} ret={}", loopCount, colIndex, cType, ret);
            return py::none();
        }
        if (indicator == SQL_NULL_DATA) {
            LOG("Loop {}: Column {} is NULL", loopCount, colIndex);
            return py::none();
        }

        // The piece was truncated (more data is pending) only when the driver
        // warns AND the remaining length is unknown or exceeds the payload
        // capacity of the scratch buffer.
        const bool truncated =
            ret == SQL_SUCCESS_WITH_INFO &&
            (indicator == SQL_NO_TOTAL || indicator > maxPayload);

        // Derive the payload size from the indicator rather than from the
        // buffer contents, so legitimate trailing NUL bytes are preserved.
        SQLLEN copyCount = maxPayload;
        if (!truncated) {
            copyCount = std::max<SQLLEN>(0, std::min<SQLLEN>(indicator, maxPayload));
        }

        if (copyCount > 0) {
            buffer.insert(buffer.end(), chunk.begin(), chunk.begin() + copyCount);
            LOG("Loop {}: Appended {} bytes", loopCount, copyCount);
        }
        if (!truncated) {
            LOG("Loop {}: final piece received, no more data", loopCount);
            break;
        }
    }
    LOG("FetchLobColumnData: Total bytes collected = {}", buffer.size());

    if (buffer.empty()) {
        if (isBinary && !isWideChar) {
            LOG("FetchLobColumnData: Returning empty bytes for col {}", colIndex);
            return py::bytes("");
        }
        LOG("FetchLobColumnData: Returning empty string for col {}", colIndex);
        return py::str("");
    }

    if (isWideChar) {
        // SQLWCHAR data is UTF-16 code units, while wchar_t is 4 bytes wide on
        // Linux/macOS, so decode the bytes as UTF-16 directly instead of
        // viewing them as wchar_t. An explicit byte order keeps a leading
        // U+FEFF in the data from being consumed as a BOM.
        const unsigned short probe = 1;
        int byteOrder = (*reinterpret_cast<const unsigned char*>(&probe) == 1) ? -1 : 1;
        PyObject* decoded = PyUnicode_DecodeUTF16(buffer.data(),
                                                  static_cast<Py_ssize_t>(buffer.size()),
                                                  nullptr, &byteOrder);
        if (decoded == nullptr) {
            throw py::error_already_set();
        }
        LOG("FetchLobColumnData: Returning wide string of {} bytes", buffer.size());
        return py::reinterpret_steal<py::object>(decoded);
    }
    if (isBinary) {
        LOG("FetchLobColumnData: Returning binary of {} bytes", buffer.size());
        return py::bytes(buffer.data(), buffer.size());
    }
    std::string str(buffer.data(), buffer.size());
    LOG("FetchLobColumnData: Returning narrow string of length {}", str.length());
    return py::str(str);
}
1794+
17141795
// Helper function to retrieve column data
1715-
// TODO: Handle variable length data correctly
17161796
SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, py::list& row) {
17171797
LOG("Get data from columns");
17181798
if (!SQLGetData_ptr) {
@@ -1735,60 +1815,53 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p
17351815
if (!SQL_SUCCEEDED(ret)) {
17361816
LOG("Error retrieving data for column - {}, SQLDescribeCol return code - {}", i, ret);
17371817
row.append(py::none());
1738-
// TODO: Do we want to continue in this case or return?
17391818
continue;
17401819
}
17411820

17421821
switch (dataType) {
17431822
case SQL_CHAR:
17441823
case SQL_VARCHAR:
17451824
case SQL_LONGVARCHAR: {
1746-
// TODO: revisit
1747-
HandleZeroColumnSizeAtFetch(columnSize);
1748-
uint64_t fetchBufferSize = columnSize + 1 /* null-termination */;
1749-
std::vector<SQLCHAR> dataBuffer(fetchBufferSize);
1750-
SQLLEN dataLen;
1751-
// TODO: Handle the return code better
1752-
ret = SQLGetData_ptr(hStmt, i, SQL_C_CHAR, dataBuffer.data(), dataBuffer.size(),
1753-
&dataLen);
1754-
1755-
if (SQL_SUCCEEDED(ret)) {
1756-
// TODO: Refactor these if's across other switches to avoid code duplication
1757-
// columnSize is in chars, dataLen is in bytes
1758-
if (dataLen > 0) {
1759-
uint64_t numCharsInData = dataLen / sizeof(SQLCHAR);
1760-
// NOTE: dataBuffer.size() includes null-terminator, dataLen doesn't. Hence use '<'.
1761-
if (numCharsInData < dataBuffer.size()) {
1762-
// SQLGetData will null-terminate the data
1763-
#if defined(__APPLE__) || defined(__linux__)
1764-
std::string fullStr(reinterpret_cast<char*>(dataBuffer.data()));
1765-
row.append(fullStr);
1766-
LOG("macOS/Linux: Appended CHAR string of length {} to result row", fullStr.length());
1767-
#else
1768-
row.append(std::string(reinterpret_cast<char*>(dataBuffer.data())));
1769-
#endif
1770-
} else {
1771-
// In this case, buffer size is smaller, and data to be retrieved is longer
1772-
// TODO: Revisit
1773-
std::ostringstream oss;
1774-
oss << "Buffer length for fetch (" << dataBuffer.size()-1 << ") is smaller, & data "
1775-
<< "to be retrieved is longer (" << numCharsInData << "). ColumnID - "
1776-
<< i << ", datatype - " << dataType;
1777-
ThrowStdException(oss.str());
1825+
if (columnSize == SQL_NO_TOTAL || columnSize == 0 || columnSize > 8000) {
1826+
LOG("Streaming LOB for column {}", i);
1827+
row.append(FetchLobColumnData(hStmt, i, SQL_C_CHAR, false, false));
1828+
} else {
1829+
uint64_t fetchBufferSize = columnSize + 1 /* null-termination */;
1830+
std::vector<SQLCHAR> dataBuffer(fetchBufferSize);
1831+
SQLLEN dataLen;
1832+
ret = SQLGetData_ptr(hStmt, i, SQL_C_CHAR, dataBuffer.data(), dataBuffer.size(),
1833+
&dataLen);
1834+
if (SQL_SUCCEEDED(ret)) {
1835+
// columnSize is in chars, dataLen is in bytes
1836+
if (dataLen > 0) {
1837+
uint64_t numCharsInData = dataLen / sizeof(SQLCHAR);
1838+
if (numCharsInData < dataBuffer.size()) {
1839+
// SQLGetData will null-terminate the data
1840+
#if defined(__APPLE__) || defined(__linux__)
1841+
std::string fullStr(reinterpret_cast<char*>(dataBuffer.data()));
1842+
row.append(fullStr);
1843+
LOG("macOS/Linux: Appended CHAR string of length {} to result row", fullStr.length());
1844+
#else
1845+
row.append(std::string(reinterpret_cast<char*>(dataBuffer.data())));
1846+
#endif
1847+
}
1848+
} else if (dataLen == SQL_NULL_DATA) {
1849+
LOG("Column {} is NULL (CHAR)", i);
1850+
row.append(py::none());
1851+
} else if (dataLen == 0) {
1852+
row.append(py::str(""));
1853+
} else {
1854+
assert(dataLen == SQL_NO_TOTAL);
1855+
LOG("SQLGetData couldn't determine the length of the data. "
1856+
"Returning NULL value instead. Column ID - {}", i);
1857+
row.append(py::none());
17781858
}
1779-
} else if (dataLen == SQL_NULL_DATA) {
1780-
row.append(py::none());
17811859
} else {
1782-
assert(dataLen == SQL_NO_TOTAL);
1783-
LOG("SQLGetData couldn't determine the length of the data. "
1784-
"Returning NULL value instead. Column ID - {}", i);
1785-
row.append(py::none());
1860+
LOG("Error retrieving data for column - {}, data type - {}, SQLGetData return "
1861+
"code - {}. Returning NULL value instead",
1862+
i, dataType, ret);
1863+
row.append(py::none());
17861864
}
1787-
} else {
1788-
LOG("Error retrieving data for column - {}, data type - {}, SQLGetData return "
1789-
"code - {}. Returning NULL value instead",
1790-
i, dataType, ret);
1791-
row.append(py::none());
17921865
}
17931866
break;
17941867
}
@@ -1797,7 +1870,7 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p
17971870
case SQL_WLONGVARCHAR: {
17981871
// TODO: revisit
17991872
HandleZeroColumnSizeAtFetch(columnSize);
1800-
uint64_t fetchBufferSize = columnSize + 1 /* null-termination */;
1873+
uint64_t fetchBufferSize = columnSize + 1 /* null-termination */;
18011874
std::vector<SQLWCHAR> dataBuffer(fetchBufferSize);
18021875
SQLLEN dataLen;
18031876
ret = SQLGetData_ptr(hStmt, i, SQL_C_WCHAR, dataBuffer.data(),

tests/test_004_cursor.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5305,6 +5305,119 @@ def test_empty_string_chunk(cursor, db_connection):
53055305
cursor.execute("DROP TABLE IF EXISTS #pytest_empty_string")
53065306
db_connection.commit()
53075307

5308+
5309+
def test_varcharmax_short(cursor, db_connection):
    """Insert a five-character string into a VARCHAR(MAX) column and read it back unchanged."""
    drop_sql = "DROP TABLE IF EXISTS #pytest_varcharmax"
    try:
        # Recreate the temp table so the test starts from a known state.
        cursor.execute(drop_sql)
        cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))")
        db_connection.commit()

        expected = "hello"
        cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [expected])
        db_connection.commit()

        cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [expected])
        row = cursor.fetchone()
        assert row[0] == expected
    finally:
        # Always drop the temp table, even when an assertion fails.
        cursor.execute(drop_sql)
        db_connection.commit()
5323+
5324+
5325+
def test_varcharmax_boundary(cursor, db_connection):
    """Round-trip an 8000-character string through a VARCHAR(MAX) column."""
    drop_sql = "DROP TABLE IF EXISTS #pytest_varcharmax"
    try:
        # Recreate the temp table so the test starts from a known state.
        cursor.execute(drop_sql)
        cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))")
        db_connection.commit()

        expected = "X" * 8000
        cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [expected])
        db_connection.commit()

        cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [expected])
        row = cursor.fetchone()
        assert row[0] == expected
    finally:
        # Always drop the temp table, even when an assertion fails.
        cursor.execute(drop_sql)
        db_connection.commit()
5339+
5340+
5341+
def test_varcharmax_streaming(cursor, db_connection):
    """Round-trip an 8100-character string through a VARCHAR(MAX) column."""
    drop_sql = "DROP TABLE IF EXISTS #pytest_varcharmax"
    try:
        # Recreate the temp table so the test starts from a known state.
        cursor.execute(drop_sql)
        cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))")
        db_connection.commit()

        expected = "Y" * 8100
        cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [expected])
        db_connection.commit()

        cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [expected])
        row = cursor.fetchone()
        assert row[0] == expected
    finally:
        # Always drop the temp table, even when an assertion fails.
        cursor.execute(drop_sql)
        db_connection.commit()
5355+
5356+
5357+
def test_varcharmax_large(cursor, db_connection):
    """Round-trip a 100,000-character string through a VARCHAR(MAX) column."""
    drop_sql = "DROP TABLE IF EXISTS #pytest_varcharmax"
    try:
        # Recreate the temp table so the test starts from a known state.
        cursor.execute(drop_sql)
        cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))")
        db_connection.commit()

        expected = "Z" * 100_000
        cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [expected])
        db_connection.commit()

        cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [expected])
        row = cursor.fetchone()
        assert row[0] == expected
    finally:
        # Always drop the temp table, even when an assertion fails.
        cursor.execute(drop_sql)
        db_connection.commit()
5371+
5372+
5373+
def test_varcharmax_empty_string(cursor, db_connection):
    """Insert an empty string into a VARCHAR(MAX) column and expect "" back."""
    drop_sql = "DROP TABLE IF EXISTS #pytest_varcharmax"
    try:
        # Recreate the temp table so the test starts from a known state.
        cursor.execute(drop_sql)
        cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))")
        db_connection.commit()

        cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [""])
        db_connection.commit()

        cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [""])
        row = cursor.fetchone()
        assert row[0] == ""
    finally:
        # Always drop the temp table, even when an assertion fails.
        cursor.execute(drop_sql)
        db_connection.commit()
5386+
5387+
5388+
def test_varcharmax_null(cursor, db_connection):
    """Insert None into a VARCHAR(MAX) column and expect None back from the fetch."""
    drop_sql = "DROP TABLE IF EXISTS #pytest_varcharmax"
    try:
        # Recreate the temp table so the test starts from a known state.
        cursor.execute(drop_sql)
        cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))")
        db_connection.commit()

        cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [None])
        db_connection.commit()

        cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col IS NULL")
        row = cursor.fetchone()
        assert row[0] is None
    finally:
        # Always drop the temp table, even when an assertion fails.
        cursor.execute(drop_sql)
        db_connection.commit()
5401+
5402+
5403+
def test_varcharmax_transaction_rollback(cursor, db_connection):
5404+
try:
5405+
cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax")
5406+
cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))")
5407+
db_connection.commit()
5408+
5409+
db_connection.autocommit = False
5410+
rollback_str = "ROLLBACK" * 2000
5411+
cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [rollback_str])
5412+
db_connection.rollback()
5413+
cursor.execute("SELECT COUNT(*) FROM #pytest_varcharmax WHERE col = ?", [rollback_str])
5414+
assert cursor.fetchone()[0] == 0
5415+
finally:
5416+
db_connection.autocommit = True # reset state
5417+
cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax")
5418+
db_connection.commit()
5419+
5420+
53085421
def test_close(db_connection):
53095422
"""Test closing the cursor"""
53105423
try:

0 commit comments

Comments
 (0)