Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,19 @@ pip install lakebench[duckdb,polars,daft,tpcds_datagen,tpch_datagen,sparkmeasure
To run any LakeBench benchmark, first do a one time generation of the data required for the benchmark and scale of interest. LakeBench provides datagen classes to quickly generate parquet datasets required by the benchmarks.

### Data Generation
- **TPC-H** data generation is provided via the (tpchgen-rs)[https://github.com/clflushopt/tpchgen-rs] project. The project is currently about 10x faster than the next closest method of generating TPC-H datasets. _The TPC-DS version of project is currently under development._
- **TPC-H** data generation is provided via the [tpchgen-rs](https://github.com/clflushopt/tpchgen-rs) project. The project is currently about 10x+ faster than the next closest method of generating TPC-H datasets. _The TPC-DS version of the project is currently under development._

_The below are generation runtimes on a 64 v-core VM writing to OneLake. Scale factors below 1000 can easily be generated on a 2 v-core machine._
| Scale Factor | Duration (hh:mm:ss)|
|:------------:|:------------------:|
| 1 | 00:00:04 |
| 10 | 00:00:09 |
| 100 | 00:01:09 |
| 1000 | 00:10:15 |

- **TPC-DS** data generation is provided via the DuckDB [TPC-DS](https://duckdb.org/docs/stable/core_extensions/tpcds) extension. The LakeBench wrapper around DuckDB adds support for writing out parquet files with a provided row-group target file size as normally the files generated by DuckDB are atypically small (i.e. 10MB) and are most suitable for ultra-small scale scenarios. LakeBench defaults to target 128MB row groups but can be configured via the `target_row_group_size_mb` parameter of both TPC-H and TPC-DS DataGenerator classes.
- **ClickBench** data is downloaded directly from the Clickhouse host site.

_Generating TPC-H scale factor 1 data takes about 14 seconds on a 2 v-core VM._

#### TPC-H Data Generation
```python
from lakebench.datagen import TPCHDataGenerator
Expand All @@ -182,7 +189,7 @@ datagen.run()

_Notes:_
- TPC-DS data up to SF1000 can be generated on a 32-vCore machine.
- TPC-H datasets are generated extremely fast (i.e. SF1000 in 20 minutes on an 8-vCore machine).
- TPC-H datasets are generated extremely fast (i.e. SF1000 in 10 minutes on a 64-vCore machine).
- The ClickBench dataset (only 1 size) should download in ~1 minute as partitioned files and in ~6 minutes as a single file.

#### Is BYO Data Supported?
Expand Down
145 changes: 132 additions & 13 deletions src/lakebench/datagen/_tpc_rs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import fsspec
from fsspec import AbstractFileSystem
import subprocess
import threading
import math
from concurrent.futures import ThreadPoolExecutor, as_completed
from lakebench.utils.path_utils import to_unix_path
from urllib.parse import urlparse

Expand All @@ -13,8 +16,28 @@ class _TPCRsDataGenerator:
"""
GEN_UTIL = ''
GEN_TYPE = ''
GEN_SF1000_FILE_COUNT_MAP = {
'table1': 10,
'table2': 40
}
GEN_TABLE_REGISTRY = [
'table1', 'table2'
]
TARGET_FILE_SIZE_MAP = [
(10, 128), # up to 10GB -> 128MB files
(1024, 256), # up to 1TB -> 256MB files
(5120, 512), # up to 5TB -> 512MB files
(10240, 1024) # up to 10TB and larger -> 1GB files
]
SF1000_SIZE_GB_DICT = {
'table1': 100,
'table2': 12
}

# Class-level lock for thread-safe printing
_print_lock = threading.Lock()

def __init__(self, scale_factor: int, target_folder_uri: str, target_row_group_size_mb: int = 128) -> None:
def __init__(self, scale_factor: int, target_folder_uri: str, target_row_group_size_mb: int = 128, compression: str = "ZSTD(1)") -> None:
"""
Initialize the TPC data generator with a scale factor.

Expand All @@ -26,7 +49,9 @@ def __init__(self, scale_factor: int, target_folder_uri: str, target_row_group_s
Test data will be written to this location where tables are represented as folders containing parquet files.
target_row_group_size_mb: int, default=128
Desired row group size for the generated parquet files.

compression: str, default="ZSTD(1)"
Compression codec to use for the generated parquet files.
Supports codecs: "UNCOMPRESSED", "SNAPPY", "GZIP(compression_level)", "BROTLI(compression_level)", "LZ4", "LZ4_RAW", "LZO", "ZSTD(compression_level)"
"""
self.scale_factor = scale_factor
uri_scheme = urlparse(target_folder_uri).scheme
Expand All @@ -35,11 +60,15 @@ def __init__(self, scale_factor: int, target_folder_uri: str, target_row_group_s
cloud_schemes = {'s3', 'gs', 'gcs', 'abfs', 'abfss', 'adl', 'wasb', 'wasbs'}

if uri_scheme in cloud_schemes:
raise ValueError(f"{uri_scheme} protocol is not currently supported for TPC-RS data generation. Please use a local file path.")
raise ValueError(f"{uri_scheme} protocol is not currently supported for TPC-RS data generation. Please use a local file system path or mount the storage location.")

if compression.split('(')[0] not in ["UNCOMPRESSED", "SNAPPY", "GZIP", "BROTLI", "LZ4", "LZ4_RAW", "LZO", "ZSTD"]:
raise ValueError(f"Unsupported compression codec: {compression}")

self.fs: AbstractFileSystem = fsspec.filesystem("file")
self.target_folder_uri = to_unix_path(target_folder_uri)
self.target_row_group_size_mb = target_row_group_size_mb
self.target_row_group_size_mb = int(target_row_group_size_mb * 2.6) # 2.6 for uncompressed-> ZSTD(1) compression ratio
self.compression = compression

def get_tpcgen_path():
import shutil
Expand All @@ -59,31 +88,121 @@ def get_tpcgen_path():

self.tpcgen_exe = get_tpcgen_path()


def run(self) -> None:
"""
This method uses a rust based TPC data generation utility to generate Parquet files
based on the specified scale factor. The generated tables are written to the target folder.
This method uses multithreading to generate individual tables in parallel using
a rust-based TPC data generation utility. Each table is generated with an optimal
number of parts (derived from SF1000_SIZE_GB_DICT and TARGET_FILE_SIZE_MAP) to target a scale-appropriate file size.
"""

# cleanup target directory
if self.fs.exists(self.target_folder_uri):
self.fs.rm(self.target_folder_uri, recursive=True)
self.fs.mkdirs(self.target_folder_uri, exist_ok=True)

tables = self.GEN_TABLE_REGISTRY

print(f"🚀 Starting parallel generation of {len(tables)} tables with multithreading...")
print(f"📊 Scale Factor: {self.scale_factor}")
print(f"📁 Output Directory: {self.target_folder_uri}")

with ThreadPoolExecutor() as executor:
future_to_table = {
executor.submit(self._generate_table, table_name): table_name
for table_name in tables
}

completed_tables = []
failed_tables = []

for future in as_completed(future_to_table):
table_name = future_to_table[future]
try:
result = future.result()
if result:
completed_tables.append(table_name)
print(f"✅ {table_name} - Generation completed successfully")
else:
failed_tables.append(table_name)
print(f"❌ {table_name} - Generation failed")
except Exception as exc:
failed_tables.append(table_name)
print(f"❌ {table_name} - Generation failed with exception: {exc}")

print(f"\n📋 Generation Summary:")
print(f" ✅ Successfully generated: {len(completed_tables)} tables")
if completed_tables:
print(f" Tables: {', '.join(completed_tables)}")

if failed_tables:
print(f" ❌ Failed to generate: {len(failed_tables)} tables")
print(f" Tables: {', '.join(failed_tables)}")
raise RuntimeError(f"Failed to generate {len(failed_tables)} tables: {', '.join(failed_tables)}")
else:
print(f"🎉 All {len(tables)} tables generated successfully!")

def _generate_table(self, table_name: str) -> bool:
"""
Generate a single table using the optimal number of parts.

Parameters
----------
table_name: str
Name of the table to generate

Returns
-------
bool
True if generation was successful, False otherwise
"""
def find_target_size(size: float) -> int:
for threshold_gb, target_mb in self.TARGET_FILE_SIZE_MAP:
if size < threshold_gb:
return target_mb
return 1024

# Scale the parts based on the scale factor and size of the table
sf1000_size_gb = self.SF1000_SIZE_GB_DICT.get(table_name, 0)
scale_adj_size_gb = sf1000_size_gb * (self.scale_factor / 1000.0)
target_size_mb = find_target_size(scale_adj_size_gb)
optimal_parts = max(round(scale_adj_size_gb * 1024 / target_size_mb), 1)

print(f"🔧 {table_name} - Using {optimal_parts} parts (target file size: {target_size_mb}mb)")

# ensure that 128mb target files have a single row group
adj_row_group_target_mb = 1024 if target_size_mb == 128 else self.target_row_group_size_mb
# Build command for individual table generation
cmd = [
self.tpcgen_exe,
"--scale-factor", str(self.scale_factor),
"--output-dir", self.target_folder_uri,
"--parts", "1",
"--parts", str(optimal_parts),
"--format", "parquet",
"--parquet-row-group-bytes", str(self.target_row_group_size_mb * 1024 * 1024),
"--parquet-compression", "SNAPPY"
"--parquet-row-group-bytes", str(adj_row_group_target_mb * 1024 * 1024),
"--parquet-compression", self.compression,
"--tables", table_name
]

try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
if result.stdout:
print(result.stdout)
with self._print_lock:
print(f"📝 {table_name} output:")
for line in result.stdout.strip().split('\n'):
if line.strip():
print(f" {line}")
return True

except subprocess.CalledProcessError as e:
print(f"stdout: {e.stdout}")
print(f"stderr: {e.stderr}")
with self._print_lock:
print(f"❌ {table_name} failed:")
if e.stdout:
print(f" stdout: {e.stdout}")
if e.stderr:
print(f" stderr: {e.stderr}")
return False
except Exception as e:
with self._print_lock:
print(f"❌ {table_name} failed with exception: {e}")
return False
31 changes: 28 additions & 3 deletions src/lakebench/datagen/tpch.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from ._tpc_rs import _TPCRsDataGenerator
class TPCHDataGenerator(_TPCRsDataGenerator):
"""
This class is a wrapper of the rust-based TPC-H data generator, `tpchgen-rs`. It generates TPC-H data in Parquet format
based on the specified scale factor and target row group size in MB.
This class is a multithreading wrapper of the rust-based TPC-H data generator, `tpchgen-rs`. It generates TPC-H data in Parquet format
based on the specified scale factor, target row group size in MB, and compression codec. Each table is partitioned into multiple parts to
target a scale-appropriate file size (128MB up to 1GB, per TARGET_FILE_SIZE_MAP).

Attributes
----------
Expand All @@ -12,11 +13,35 @@ class TPCHDataGenerator(_TPCRsDataGenerator):
The folder path where the generated Parquet data will be stored. A folder for each table will be created.
target_row_group_size_mb : int
The target size of row groups in megabytes for the generated Parquet files.
compression: str, default="ZSTD"
Compression codec to use for the generated parquet files.
Supports codecs: "UNCOMPRESSED", "SNAPPY", "GZIP(compression_level)", "BROTLI(compression_level)", "LZ4", "LZ4_RAW", "LZO", "ZSTD(compression_level)"

Methods
-------
run()
Generates TPC-H data in Parquet format based on the input scale factor and writes it to the target folder.
"""
GEN_UTIL = 'dbgen'
GEN_TYPE = 'tpch'
GEN_TYPE = 'tpch'
GEN_SF1000_FILE_COUNT_MAP = {
'lineitem': 150,
'orders': 40,
'partsupp': 26,
'part': 4,
'customer': 8
}
GEN_TABLE_REGISTRY = [
'customer', 'lineitem', 'nation', 'orders', 'part',
'partsupp', 'region', 'supplier'
]
SF1000_SIZE_GB_DICT = {
'lineitem': 152,
'orders': 38,
'partsupp': 26.7,
'part': 4,
'customer': 7.6,
'supplier': 0.48,
'region': 0.00,
'nation': 0.00
}