Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/data_generation.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@
"\n",
"datagen = TPCHDataGenerator(\n",
" scale_factor=0.1,\n",
" target_folder_uri=r'C:\\lakebench_local_tests\\data\\source\\tpch\\sf1_parquet'\n",
" target_folder_uri=r'C:\\lakebench_local_tests\\data\\source\\tpch\\sf1_parquet',\n",
" compression='ZSTD(1)',\n",
" multithreading=True\n",
")\n",
"datagen.run()"
]
Expand Down
92 changes: 59 additions & 33 deletions src/lakebench/datagen/_tpc_rs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@ class _TPCRsDataGenerator:
subclasses instead.
"""
GEN_UTIL = ''
GEN_TYPE = ''
GEN_SF1000_FILE_COUNT_MAP = {
'table1': 10,
'table2': 40
}
GEN_TYPE = 'tpch'
GEN_TABLE_REGISTRY = [
'table1', 'table2'
'customer', 'lineitem', 'nation', 'orders', 'part',
'partsupp', 'region', 'supplier'
]
TARGET_FILE_SIZE_MAP = [
(10, 128), # up to 10GB -> 128MB files
Expand All @@ -30,14 +27,20 @@ class _TPCRsDataGenerator:
(10240, 1024) # up to 10TB and larger -> 1GB files
]
SF1000_SIZE_GB_DICT = {
'table1': 100,
'table2': 12
'lineitem': 152,
'orders': 38,
'partsupp': 26.7,
'part': 4,
'customer': 7.6,
'supplier': 0.48,
'region': 0.00,
'nation': 0.00
}

# Class-level lock for thread-safe printing
_print_lock = threading.Lock()

def __init__(self, scale_factor: int, target_folder_uri: str, target_row_group_size_mb: int = 128, compression: str = "ZSTD(1)") -> None:
def __init__(self, scale_factor: int, target_folder_uri: str, target_row_group_size_mb: int = 128, compression: str = "ZSTD(1)", table_list: list = None, multithreading: bool = True) -> None:
"""
Initialize the TPC data generator with a scale factor.

Expand Down Expand Up @@ -69,6 +72,8 @@ def __init__(self, scale_factor: int, target_folder_uri: str, target_row_group_s
self.target_folder_uri = to_unix_path(target_folder_uri)
self.target_row_group_size_mb = int(target_row_group_size_mb * 2.6) # 2.6 for uncompressed-> ZSTD(1) compression ratio
self.compression = compression
self.table_list = table_list
self.multithreading = multithreading

def get_tpcgen_path():
import shutil
Expand Down Expand Up @@ -97,38 +102,59 @@ def run(self) -> None:
"""

# cleanup target directory
if self.fs.exists(self.target_folder_uri):
self.fs.rm(self.target_folder_uri, recursive=True)
self.fs.mkdirs(self.target_folder_uri, exist_ok=True)
def clean_dir(path: str) -> None:
if self.fs.exists(path):
self.fs.rm(path, recursive=True)
self.fs.mkdirs(path, exist_ok=True)

if self.table_list is None:
clean_dir(self.target_folder_uri)
else:
for table_name in self.table_list:
table_path = posixpath.join(self.target_folder_uri, table_name)
clean_dir(table_path)

tables = self.GEN_TABLE_REGISTRY
if self.table_list is None:
tables = self.GEN_TABLE_REGISTRY
else:
tables = [table for table in self.GEN_TABLE_REGISTRY if table in self.table_list]

print(f"🚀 Starting parallel generation of {len(tables)} tables with multithreading...")
print(f"📊 Scale Factor: {self.scale_factor}")
print(f"📁 Output Directory: {self.target_folder_uri}")

with ThreadPoolExecutor() as executor:
future_to_table = {
executor.submit(self._generate_table, table_name): table_name
for table_name in tables
}

completed_tables = []
failed_tables = []

for future in as_completed(future_to_table):
table_name = future_to_table[future]
try:
result = future.result()
if result:
completed_tables.append(table_name)
print(f"✅ {table_name} - Generation completed successfully")
else:
completed_tables = []
failed_tables = []

if self.multithreading:
with ThreadPoolExecutor() as executor:
future_to_table = {
executor.submit(self._generate_table, table_name): table_name
for table_name in tables
}

for future in as_completed(future_to_table):
table_name = future_to_table[future]
try:
result = future.result()
if result:
completed_tables.append(table_name)
print(f"✅ {table_name} - Generation completed successfully")
else:
failed_tables.append(table_name)
print(f"❌ {table_name} - Generation failed")
except Exception as exc:
failed_tables.append(table_name)
print(f"❌ {table_name} - Generation failed")
except Exception as exc:
print(f"❌ {table_name} - Generation failed with exception: {exc}")
else:
for table_name in tables:
result = self._generate_table(table_name)
if result:
completed_tables.append(table_name)
print(f"✅ {table_name} - Generation completed successfully")
else:
failed_tables.append(table_name)
print(f"❌ {table_name} - Generation failed with exception: {exc}")
print(f"❌ {table_name} - Generation failed")

print(f"\n📋 Generation Summary:")
print(f" ✅ Successfully generated: {len(completed_tables)} tables")
Expand Down