FastMSSQL is an async Python library for Microsoft SQL Server (MSSQL), built in Rust. Unlike pyodbc or pymssql, it uses a native SQL Server client—no ODBC required—simplifying installation on Windows, macOS, and Linux. Great for data ingestion, bulk inserts, and large-scale query workloads.
- High performance: optimized for very high RPS and low overhead
- Rust core: memory‑safe and reliable, tuned Tokio runtime
- No ODBC: native SQL Server client, no external drivers needed
- Connection pooling: bb8‑based, smart defaults (default max_size=10, min_idle=2)
- Async first: clean async/await API with
async withcontext managers - Strong typing: fast conversions for common SQL Server types
- Thread‑safe: safe to use in concurrent apps
- Cross‑platform: Windows, macOS, Linux
- Batch operations: high-performance bulk inserts and batch query execution
Core methods for individual operations:
query()— SELECT statements that return rowsexecute()— INSERT/UPDATE/DELETE/DDL that return affected row count
# Use query() for SELECT statements
result = await conn.query("SELECT * FROM users WHERE age > @P1", [25])
rows = result.rows()
# Use execute() for data modification
affected = await conn.execute("INSERT INTO users (name) VALUES (@P1)", ["John"])pip install fastmssql- Python 3.9 to 3.14
- Microsoft SQL Server (any recent version)
import asyncio
from fastmssql import Connection
async def main():
conn_str = "Server=localhost;Database=master;User Id=myuser;Password=mypass"
async with Connection(conn_str) as conn:
# SELECT: use query() -> rows()
result = await conn.query("SELECT @@VERSION as version")
for row in result.rows():
print(row['version'])
# Pool statistics (tuple: connected, connections, idle, max_size, min_idle)
connected, connections, idle, max_size, min_idle = await conn.pool_stats()
print(f"Pool: connected={connected}, size={connections}/{max_size}, idle={idle}, min_idle={min_idle}")
asyncio.run(main())When not utilizing Python's context manager (async with), FastMssql uses lazy connection initialization:
if you call query() or execute() on a new Connection, the underlying pool is created if not already present.
For more control, you can explicitly connect and disconnect:
import asyncio
from fastmssql import Connection
async def main():
conn_str = "Server=localhost;Database=master;User Id=myuser;Password=mypass"
conn = Connection(conn_str)
# Explicitly connect
await conn.connect()
assert await conn.is_connected()
# Run queries
result = await conn.query("SELECT 42 as answer")
print(result.rows()[0]["answer"]) # -> 42
# Explicitly disconnect
await conn.disconnect()
assert not await conn.is_connected()
asyncio.run(main())You can connect either with a connection string or individual parameters.
- Connection string
import asyncio
from fastmssql import Connection
async def main():
conn_str = "Server=localhost;Database=master;User Id=myuser;Password=mypass"
async with Connection(connection_string=conn_str) as conn:
rows = (await conn.query("SELECT DB_NAME() as db")).rows()
print(rows[0]['db'])
asyncio.run(main())- Individual parameters
import asyncio
from fastmssql import Connection
async def main():
async with Connection(
server="localhost",
database="master",
username="myuser",
password="mypassword"
) as conn:
rows = (await conn.query("SELECT SUSER_SID() as sid")).rows()
print(rows[0]['sid'])
asyncio.run(main())Note: Windows authentication (Trusted Connection) is currently not supported. Use SQL authentication (username/password).
import asyncio
from fastmssql import Connection
async def main():
async with Connection("Server=.;Database=MyDB;User Id=sa;Password=StrongPwd;") as conn:
# SELECT (returns rows)
users = (await conn.query(
"SELECT id, name, email FROM users WHERE active = 1"
)).rows()
for u in users:
print(f"User {u['id']}: {u['name']} ({u['email']})")
# INSERT / UPDATE / DELETE (returns affected row count)
inserted = await conn.execute(
"INSERT INTO users (name, email) VALUES (@P1, @P2)",
["Jane", "jane@example.com"],
)
print(f"Inserted {inserted} row(s)")
updated = await conn.execute(
"UPDATE users SET last_login = GETDATE() WHERE id = @P1",
[123],
)
print(f"Updated {updated} row(s)")
asyncio.run(main())Parameters use positional placeholders: @P1, @P2, ... Provide values as a list in the same order.
For high-throughput scenarios, use batch methods to reduce network round-trips:
import asyncio
from fastmssql import Connection
async def main():
async with Connection("Server=.;Database=MyDB;User Id=sa;Password=StrongPwd;") as conn:
# Bulk insert for fast data loading
columns = ["name", "email", "age"]
data_rows = [
["Alice Johnson", "alice@example.com", 28],
["Bob Smith", "bob@example.com", 32],
["Carol Davis", "carol@example.com", 25]
]
rows_inserted = await conn.bulk_insert("users", columns, data_rows)
print(f"Bulk inserted {rows_inserted} rows")
# Batch queries for multiple SELECT operations
queries = [
("SELECT COUNT(*) as total FROM users WHERE age > @P1", [25]),
("SELECT AVG(age) as avg_age FROM users", None),
("SELECT name FROM users WHERE email LIKE @P1", ["%@example.com"])
]
results = await conn.query_batch(queries)
print(f"Total users over 25: {results[0].rows()[0]['total']}")
print(f"Average age: {results[1].rows()[0]['avg_age']:.1f}")
print(f"Example.com users: {len(results[2].rows())}")
# Batch commands for multiple operations
commands = [
("UPDATE users SET last_login = GETDATE() WHERE name = @P1", ["Alice Johnson"]),
("INSERT INTO user_logs (action, user_name) VALUES (@P1, @P2)", ["login", "Alice Johnson"])
]
affected_counts = await conn.execute_batch(commands)
print(f"Updated {affected_counts[0]} users, inserted {affected_counts[1]} logs")
asyncio.run(main())Tune the pool to fit your workload. Constructor signature:
from fastmssql import PoolConfig
# PoolConfig(max_size=10, min_idle=2, max_lifetime_secs=None, idle_timeout_secs=None, connection_timeout_secs=30)
config = PoolConfig(
max_size=20, # max connections in pool
min_idle=5, # keep at least this many idle
max_lifetime_secs=3600, # recycle connections after 1h
idle_timeout_secs=600, # close idle connections after 10m
connection_timeout_secs=30
)Presets:
high = PoolConfig.high_throughput() # ~ max_size=50, min_idle=15
low = PoolConfig.low_resource() # ~ max_size=3, min_idle=1
dev = PoolConfig.development() # ~ max_size=5, min_idle=1
maxp = PoolConfig.maximum_performance() # ~ max_size=100, min_idle=30
ultra = PoolConfig.ultra_high_concurrency() # ~ max_size=200, min_idle=50Apply to a connection:
async with Connection(conn_str, pool_config=high) as conn:
rows = (await conn.query("SELECT 1 AS ok")).rows()Default pool (if omitted): max_size=10, min_idle=2.
from fastmssql import SslConfig, EncryptionLevel, Connection
ssl = SslConfig(
encryption_level=EncryptionLevel.REQUIRED, # or "Required"
trust_server_certificate=False,
)
async with Connection(conn_str, ssl_config=ssl) as conn:
...Helpers:
SslConfig.development()– encrypt, trust all (dev only)SslConfig.with_ca_certificate(path)– use custom CASslConfig.login_only()/SslConfig.disabled()– legacy modes
For maximum throughput in highly concurrent scenarios, use multiple Connection instances (each with its own pool) and batch your work:
import asyncio
from fastmssql import Connection, PoolConfig
async def worker(conn_str, cfg):
async with Connection(conn_str, pool_config=cfg) as conn:
for _ in range(1000):
_ = (await conn.query("SELECT 1 as v")).rows()
async def main():
conn_str = "Server=.;Database=master;User Id=sa;Password=StrongPwd;"
cfg = PoolConfig.high_throughput()
await asyncio.gather(*[asyncio.create_task(worker(conn_str, cfg)) for _ in range(32)])
asyncio.run(main())- Examples:
examples/comprehensive_example.py - Benchmarks:
benchmarks/(MIT licensed)
- Import/build: ensure Rust toolchain and
maturinare installed if building from source - Connection: verify connection string; Windows auth not supported
- Timeouts: increase pool size or tune
connection_timeout_secs - Parameters: use
@P1, @P2, ...and pass a list of values
Contributions are welcome. Please open an issue or PR.
FastMSSQL is dual‑licensed:
- GPL‑3.0 (for open source projects)
- Commercial (for proprietary use). Contact: riverb514@gmail.com
See the LICENSE file for details.
examples/andbenchmarks/are under the MIT License. See files inlicenses/.
Built on excellent open source projects: Tiberius, PyO3, pyo3‑asyncio, bb8, tokio, serde, pytest, maturin, and more. See licenses/NOTICE.txt for the full list. The full texts of Apache‑2.0 and MIT are in licenses/.
Thanks to the maintainers of Tiberius, PyO3, pyo3‑asyncio, Tokio, pytest, maturin, and the broader open source community.