Making query engines understandable for Python developers
Starlink is an educational query engine implementation in Python, designed to demystify how modern query engines work. Built with clarity and learning in mind, it demonstrates the complete journey from SQL queries to data retrieval.
Starlink aims to make query engines easily understandable for Pythonistas. It focuses on the process - showing you exactly how a query flows through the system:
SQL → Logical Plan → Optimized Plan → Physical Plan → Data Sources
Special thanks to Andy Grove and How Query Engines Work for inspiring this project. The book provides a comprehensive introduction to query engines, and Starlink aims to make these concepts accessible to Python developers through a clean, educational implementation.
- Hash joins (inner join) are supported at the moment.
- SQL alias expressions are supported.
- Filter (predicate) pushdown is supported.
# Clone the repository
git clone https://github.com/harrydevforlife/starlink.git
cd starlink
# Install dependencies
uv sync
# Set up Python path
export PYTHONPATH=$PWD/src:$PYTHONPATH

from pathlib import Path
from starlink.execution.context import ExecutionContext
# Create execution context
ctx = ExecutionContext({})
# Register a CSV file
ctx.register_csv("tripdata", Path("data/yellow_tripdata_2019-01.csv"))
# Execute SQL query
df = ctx.sql("""
SELECT
passenger_count,
MAX(fare_amount) AS max_fare
FROM tripdata
GROUP BY passenger_count
""")
# View the logical plan
print("Original Plan:")
print(df.logicalPlan().pretty())
# View the optimized plan
print("\nOptimized Plan:")
print(df.optimizedPlan().pretty())
# Execute and get results
result = ctx.execute(df)
result.show()

The output will look like this:
Original Plan:
Projection: #0, #1 as max_fare
Aggregate: groupExpr=[#passenger_count], aggregateExpr=[MAX(#fare_amount)]
Selection: CAST(#fare_amount AS double) > 80
Projection: #passenger_count, #fare_amount
Scan: data/yellow_tripdata_2019-01.csv; projection=None
Optimized Plan:
Projection: #0, #1 as max_fare
Aggregate: groupExpr=[#passenger_count], aggregateExpr=[MAX(#fare_amount)]
Projection: #passenger_count, #fare_amount
Scan: data/yellow_tripdata_2019-01.csv; projection=[passenger_count, fare_amount], filter=CAST(#fare_amount AS double) > 80
| passenger_count | max_fare |
|-------------------|------------|
| 1 | 99.99 |
| 2 | 99.75 |
| 4 | 99 |
| 5 | 99.5 |
| 6 | 98 |
| 3 | 99.75 |
| 0 | 99 |
| 8 | 87 |
| 9 | 92 |
(9 row(s) shown)
Join queries can be written directly in SQL:
SELECT
c.name,
c.city,
o.order_id,
o.total
FROM customers AS c
JOIN orders AS o
ON c.customer_id = o.order_customer_id
WHERE CAST(o.total AS DOUBLE) > 30
AND c.city = 'New York'

Queries can also be built programmatically with the DataFrame API:
from starlink.execution.context import ExecutionContext
from starlink.logicalplan.expressions import col, Max, Count
ctx = ExecutionContext({})
df = (
ctx.csv("data/yellow_tripdata_2019-01.csv")
.aggregate(
[col("passenger_count")],
[Max(col("fare_amount")), Count(col("fare_amount"))]
)
)
result = ctx.execute(df)
result.show()

Starlink demonstrates the complete query execution pipeline. Here's how a query flows through the system:
Input: SQL string
Output: Abstract Syntax Tree (AST)
# SQL: "SELECT passenger_count, MAX(fare_amount) FROM tripdata GROUP BY passenger_count"
# ↓
# SqlSelect AST with projection, table, and groupBy information
Key Components:
- sql_tokenizer.py - Splits the SQL string into tokens
- sql_parser.py - Parses tokens into a SQL AST using a Pratt parser
- sql_expr.py - Defines SQL expression types
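To make the Pratt-parsing idea concrete, here is a minimal, self-contained precedence-climbing sketch. It is illustrative only and does not reuse Starlink's sql_tokenizer.py or sql_parser.py classes:

import re

# Grab numbers, identifiers, multi-char operators, then single-char operators.
TOKEN_RE = re.compile(r"\s*(\d+|[A-Za-z_]\w*|>=|<=|<>|[-+*/><=()])")

def tokenize(sql: str):
    return TOKEN_RE.findall(sql)

# Binding power: higher numbers bind tighter (* binds tighter than >).
PRECEDENCE = {"=": 1, "<": 1, ">": 1, "+": 2, "-": 2, "*": 3, "/": 3}

def parse_expr(tokens, pos=0, min_bp=0):
    left, pos = tokens[pos], pos + 1          # primary: identifier or literal
    while pos < len(tokens):
        op = tokens[pos]
        bp = PRECEDENCE.get(op)
        if bp is None or bp < min_bp:
            break
        # Parse the right-hand side with a higher minimum binding power,
        # so operators of equal precedence associate to the left.
        right, pos = parse_expr(tokens, pos + 1, bp + 1)
        left = (op, left, right)               # build an AST node
    return left, pos

ast, _ = parse_expr(tokenize("fare_amount > 80 + 5 * 2"))
print(ast)  # ('>', 'fare_amount', ('+', '80', ('*', '5', '2')))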
Input: SQL AST
Output: Logical Plan (what to do, not how)
# Logical Plan represents the "what" - high-level operations
Projection: #0, #1
Aggregate: groupExpr=[#passenger_count], aggregateExpr=[MAX(#fare_amount)]
Scan: tripdata; projection=None
Key Components:
- scan.py - Represents reading from a data source
- projection.py - Represents column selection/transformation
- aggregate.py - Represents grouping and aggregation
- select.py - Represents filtering (WHERE clause)
- expressions.py - Logical expressions (Column, Max, Count, etc.)
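As a rough mental model (these are hypothetical, simplified classes, not the ones in logicalplan/), a logical plan is just a tree of declarative nodes that can print themselves:

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Scan:
    table: str
    projection: Optional[List[str]] = None

@dataclass
class Aggregate:
    child: object
    group_expr: List[str]
    aggregate_expr: List[str]

def pretty(plan, indent=0) -> str:
    pad = "  " * indent
    if isinstance(plan, Scan):
        return f"{pad}Scan: {plan.table}; projection={plan.projection}"
    header = (f"{pad}Aggregate: groupExpr={plan.group_expr}, "
              f"aggregateExpr={plan.aggregate_expr}")
    return header + "\n" + pretty(plan.child, indent + 1)

plan = Aggregate(Scan("tripdata"), ["#passenger_count"], ["MAX(#fare_amount)"])
print(pretty(plan))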
Input: Logical Plan
Output: Optimized Logical Plan
# Optimizer applies rules to improve the plan
# Example: Projection Pushdown
Projection: #0, #1
Aggregate: groupExpr=[#passenger_count], aggregateExpr=[MAX(#fare_amount)]
Scan: tripdata; projection=[passenger_count, fare_amount] # ← Pushed down!
Optimization Rules:
- Projection Pushdown - Only read columns that are actually needed
- Predicate Pushdown - Only read rows that are actually needed
- More rules can be added by following the OptimizerRule interface
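Conceptually, the optimizer is a loop that hands the plan to each rule in turn; every rule returns a (possibly rewritten) plan. A hypothetical sketch of that driver loop, not Starlink's actual optimizer code:

class Optimizer:
    """Hypothetical driver: applies each rule to the plan in sequence."""
    def __init__(self, rules):
        self.rules = rules

    def optimize(self, plan):
        for rule in self.rules:
            plan = rule.optimize(plan)   # each rule returns a new logical plan
        return plan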
Input: Optimized Logical Plan
Output: Physical Plan (how to execute)
# Physical Plan represents the "how" - concrete execution steps
HashAggregateExec: groupExpr=[ColumnExpression(0)], aggregateExpr=[MaxExpression(ColumnExpression(1))]
ScanExec: projection=[passenger_count, fare_amount]
Key Components:
- queryplanner.py - Converts logical plans to physical plans
- Physical operators in physicalplan/:
  - ScanExec - Scans data sources
  - ProjectionExec - Applies projections
  - SelectionExec - Applies filters
  - HashAggregateExec - Performs hash-based aggregation
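To make the "how" concrete, here is a stripped-down sketch of what a hash aggregate does at execution time, written against PyArrow directly rather than Starlink's HashAggregateExec:

import pyarrow as pa

# One columnar batch of input rows.
batch = pa.RecordBatch.from_pydict({
    "passenger_count": [1, 2, 1, 2, 3],
    "fare_amount": [10.0, 7.5, 99.0, 12.0, 4.5],
})

# Hash aggregation: a hash table keyed by the group value,
# updated row by row with the running MAX.
data = batch.to_pydict()
groups = {}
for key, value in zip(data["passenger_count"], data["fare_amount"]):
    groups[key] = max(groups.get(key, float("-inf")), value)

print(groups)  # {1: 99.0, 2: 12.0, 3: 4.5}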
Input: Physical Plan
Output: RecordBatch results
# Physical operators execute the plan
# Each operator produces RecordBatch objects (columnar data)
results = physical_plan.execute_batches()
# Returns: Sequence[RecordBatch]
results = ctx.execute(df)
# Returns: QueryResult

Execution Flow:
- ExecutionContext.execute() - Entry point
- Optimizes the logical plan
- Creates the physical plan
- Executes the physical plan
- Returns results as RecordBatch objects
Input: File paths or data
Output: RecordBatch streams
Supported Formats:
- CSV (csv.py) - With automatic delimiter detection and header support
- Parquet (parquet.py) - Efficient columnar format
- Memory (memory.py) - In-memory data sources, just for testing
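The file-backed sources build on PyArrow's readers. A rough sketch of the kinds of calls involved (file paths are illustrative, and this is not Starlink's exact code):

import pyarrow.csv as pacsv
import pyarrow.parquet as pq

# CSV: PyArrow infers types; a projection can be applied at read time.
csv_table = pacsv.read_csv(
    "data/yellow_tripdata_2019-01.csv",
    convert_options=pacsv.ConvertOptions(
        include_columns=["passenger_count", "fare_amount"]
    ),
)

# Parquet is columnar on disk, so reading only two columns is cheap.
parquet_table = pq.read_table(
    "data/sample.parquet", columns=["passenger_count", "fare_amount"]
)

# Both return Tables that can be handed downstream as RecordBatches.
for batch in csv_table.to_batches():
    print(batch.num_rows, batch.schema.names)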
Logical Plan (What):
- High-level, declarative representation
- Independent of execution strategy
- Example: "Aggregate by passenger_count"
Physical Plan (How):
- Concrete execution strategy
- Specific algorithms chosen
- Example: "HashAggregateExec using hash table"
Starlink uses Apache Arrow's columnar format:
- RecordBatch - A batch of rows stored column-wise
- ColumnVector - Interface for column data
- ArrowFieldVector - PyArrow-based implementation
This format enables:
- Efficient vectorized operations
- Better cache locality
- Reduced memory overhead
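A quick PyArrow example of the columnar layout these abstractions wrap:

import pyarrow as pa

# Each column is stored contiguously as one Arrow vector, not row by row.
batch = pa.RecordBatch.from_pydict({
    "passenger_count": pa.array([1, 2, 1], type=pa.int64()),
    "fare_amount": pa.array([12.5, 7.0, 99.0], type=pa.float64()),
})

print(batch.num_rows)     # 3
print(batch.schema)       # passenger_count: int64, fare_amount: double
print(batch.column(1))    # the whole fare_amount column as one vector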
A key optimization that pushes column selection down to the data source:
# Without optimization: read all columns, then discard the unused ones
Scan: tripdata; projection=None # Reads all 18 columns
# With optimization: Only read needed columns
Scan: tripdata; projection=[passenger_count, fare_amount] # Reads only 2 columns
This dramatically reduces I/O and memory usage for wide tables.
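In spirit, the rule walks the plan, collects every column an upstream operator references, and rewrites the Scan with that list. A hypothetical, dictionary-based sketch (not the actual rule in optimizer/):

# Hypothetical plan nodes; the real ones live in starlink/logicalplan/.
def push_down_projection(plan, referenced=None):
    referenced = set() if referenced is None else referenced
    if plan["op"] == "Scan":
        # Rewrite the leaf so the data source only reads what is needed.
        return {**plan, "projection": sorted(referenced)}
    referenced |= set(plan.get("columns", []))
    return {**plan, "child": push_down_projection(plan["child"], referenced)}

plan = {
    "op": "Aggregate",
    "columns": ["passenger_count", "fare_amount"],
    "child": {"op": "Scan", "table": "tripdata", "projection": None},
}
print(push_down_projection(plan)["child"]["projection"])
# ['fare_amount', 'passenger_count']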
A key optimization that pushes filters down to the data source:
# Without optimization: read all rows, then filter them upstream
Scan: tripdata; filter=None # Reads every row
# With optimization: only read rows that satisfy the predicate
Scan: tripdata; filter=CAST(#total AS double) > 30 # Non-matching rows never leave the scan
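Sources that support it can evaluate the predicate inside the reader itself. A PyArrow-only illustration of the effect (the file name and column are hypothetical):

import pyarrow.dataset as ds

# Dataset scanning can apply the predicate while reading, so rows that
# fail the filter never reach the upstream operators.
dataset = ds.dataset("data/orders.parquet", format="parquet")
table = dataset.to_table(
    columns=["order_id", "total"],
    filter=ds.field("total") > 30,
)
print(table.num_rows)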
starlink/
├── datatypes/ # Core data types (Schema, RecordBatch, ColumnVector)
├── datasources/ # Data source implementations (CSV, Parquet)
├── logicalplan/ # Logical plan nodes and expressions
├── optimizer/ # Query optimization rules
├── physicalplan/ # Physical execution operators
├── queryplanner/ # Logical → Physical plan conversion
├── sql/ # SQL parsing and planning
└── execution/ # Execution context and coordination
Check out the examples/ directory for more examples:
- query_sql.py - SQL query examples
- query_csv.py - DataFrame API with CSV
- query_parquet.py - Parquet file queries
- query_count.py - COUNT aggregation examples
- query_sql_join.py - SQL join examples
- queries.py - SQL query examples
To add a new optimization rule, implement the OptimizerRule interface:
from starlink.optimizer.optimizer import OptimizerRule
from starlink.logicalplan.logical import LogicalPlan

class MyOptimizationRule(OptimizerRule):
    def optimize(self, plan: LogicalPlan) -> LogicalPlan:
        # Your optimization logic here: rewrite and return the plan
        return plan

To add a new data source, implement the DataSource interface:
from typing import List, Optional, Sequence
from starlink.datasources.datasource import DataSource
from starlink.datatypes.record_batch import RecordBatch
# (import Schema and Expression from the corresponding starlink modules)

class MyDataSource(DataSource):
    def schema(self) -> Schema:
        # Return the schema of this source
        pass

    def scan(self, projection: List[str], filter: Optional[Expression] = None) -> Sequence[RecordBatch]:
        # Return a stream of RecordBatch objects
        pass

To add a new aggregate function:
- Add the logical expression in logicalplan/expressions.py
- Add the physical expression in physicalplan/expressions/
- Add an accumulator in the aggregate expression
- Update queryplanner.py to handle the new function
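The accumulator is the per-group state holder that the hash aggregate updates row by row. A hypothetical sketch of what one could look like (method names here are illustrative, not Starlink's exact interface):

class SumAccumulator:
    """Hypothetical accumulator: holds the running state for one group."""
    def __init__(self):
        self.total = 0.0

    def accumulate(self, value):
        if value is not None:        # ignore nulls, like SQL SUM
            self.total += float(value)

    def final_value(self):
        return self.total

acc = SumAccumulator()
for v in [10.0, None, 2.5]:
    acc.accumulate(v)
print(acc.final_value())  # 12.5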
# Run all tests
PYTHONPATH=src python -m pytest tests/ -v
# Run specific test suite
PYTHONPATH=src python -m pytest tests/starlink/optimizer/ -v

Starlink is designed for educational purposes and focuses on clarity over performance. However, it demonstrates:
- Columnar data processing with Apache Arrow
- Projection pushdown optimization
- Efficient batch processing
- Memory-efficient streaming
See the benchmarking results for more details.
For production use, consider:
- Apache Arrow DataFusion (Rust)
- Polars (Rust, Python bindings)
- DuckDB (C++)
Contributions are welcome! Areas where help is needed:
- Additional optimization rules
- More data source formats
- Additional SQL features
- Performance improvements
- Documentation improvements
- Test coverage
- Andy Grove and How Query Engines Work - For the excellent educational resource that inspired this project
- Apache Arrow - For the columnar data format and PyArrow library
- The Python data engineering community - For continuous inspiration
- How Query Engines Work - The book that inspired this project
- Apache Arrow Documentation
- PyArrow Documentation
- NYC Dataset
- Additional SQL features (more join types, subqueries, etc.); inner joins are already supported
- More optimization rules; projection pushdown and predicate pushdown are already implemented
- Additional data sources (JSON, database connectors)
- Query execution statistics
- Better error messages
- Performance profiling tools
Happy Querying and Exploring!
Starlink - Making query engines understandable, one Python line at a time.