Skip to content

FEAT: Bulk Copy #73

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions EmployeeFullNames.bcp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
Abdul "Kalam" 00000000000084D1
Narendra Modi 00000000000084D2
Rahul Gandhi 00000000000084D3
Rabindranath Tagore 00000000000084D4
Donald Trump 00000000000084D5
Satya Nadella 00000000000084D6
Sam Altman 2025-01-28 05:15:30.0000000 00000000000084D7 Wide str 1 210 123456789 4.2743594E+27
Jeff Bezos 2025-01-28 05:15:30.0000000 00000000000084D8 Wide str2 1 127 123456789 5.8486031E+35
Jeff Bezos 2025-01-28 05:15:30.0000000 00000000000084D9 Wide str2 1 127 123456789 5.8486031E+35
direct direct 00000000000084DC 12.34
Harry Potter 2025-01-28 05:15:30.0000000 00000000000084DD Wide str3 1 127 123456789 12.34 12.34 34.12
Harry Potter 2025-01-28 05:15:30.0000000 00000000000084DE Wide str3 1 127 123456789 12.34 12.34 12345
Float Cast 2025-01-28 05:15:30.0000000 00000000000084DF Wide str3 1 127 123456789 12.34 12.34 0.567
Float Cast 2025-01-28 05:15:30.0000000 00000000000084E0 Wide str3 1 127 123456789 12.34 12.34
Float Cast 2025-01-28 05:15:30.0000000 00000000000084E1 Wide str3 1 127 123456789 12.34 12.34
Test fix 2025-01-28 05:15:30.0000000 00000000000084E2 Wide str3 1 127 123456789 12.34 12.34
Harry Potter2 2025-01-28 05:15:30.0000000 00000000000084E3 Wide str3 1 127 123456789 12.34 12.34
floattest scientificNotation 2025-01-28 05:15:30.0000000 00000000000084E4 Wide str3 1 127 123456789 1.7899999E+10 12.34
floattest scientificNotation 2025-01-28 05:15:30.0000000 00000000000084E5 Wide str3 1 127 123456789 12.34 12.34
is_stmt_prepared test 2025-01-28 05:15:30.0000000 000000000000C355 Wide str3 1 127 123456789 12.34 12.34
test approx Float 2025-01-28 05:15:30.0000000 000000000000C356 Wide str3 1 127 123456789 12.34 12.34
test22 approx Float 2025-01-28 05:15:30.0000000 000000000000C357 Wide str3 1 127 123456789 12.34 12.34
test22 approx Float 2025-01-28 05:15:30.0000000 000000000000C358 Wide str3 1 127 123456789 12.345 12.34
FetchAll Test 2025-01-28 05:15:30.0000000 000000000000C359 Wide str3 1 127 123456789 12.345 12.34
FetchAll Test 2025-01-28 05:15:30.0000000 000000000000C35A Wide str3 1 127 123456789 12.345 12.34
FetchAll Test 2025-01-28 05:15:30.0000000 000000000000C35B Wide str3 1 127 123456789 12.345 12.34
FetchAll Test 2025-01-28 05:15:30.0000000 000000000000C35C Wide str3 1 127 123456789 12.345 12.34
FetchAll Test2 2025-01-28 05:15:30.0000000 000000000000C35D Wide str3 0 127 123456789 12.345 12.34
FetchAll Test2 2025-01-28 05:15:30.0000000 000000000000C35E Wide str3 0 127 123456789 12.345 12.34
FetchAll Test2 2025-01-28 05:15:30.0000000 000000000000C35F Wide str3 0 127 123456789 12.345 12.34
1 Test 000000000000C36A 1.00000
1 Test 000000000000C36B 1.00000
Harry Potter 2025-01-28 05:15:30.0000000 00000000000084DA Wide str3 1 127 123456789 5.8486031E+35 12.34
2025-01-28 05:15:30.0000000 00000000000084E6 Wide str3 1 127 123456789 12.34 12.34
2025-01-28 05:15:30.0000000 00000000000084E7 Wide str3 1 127 123456789 12.34 12.34
main check 2025-01-28 05:15:30.0000000 00000000000084E8 Wide str3 1 127 123456789 12.34 12.34
is_stmt_prepared 00000000000084EA
is_stmt_prepared 05:15:30.0000000 00000000000084EB
is_stmt_prepared test 2025-01-28 05:15:30.0000000 00000000000084F6 Wide str3 1 127 123456789 12.34 12.34
14 changes: 14 additions & 0 deletions EmployeeFullNames.fmt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
14.0
12
1 SQLCHAR 0 50 "\t" 1 FirstName SQL_Latin1_General_CP1_CI_AS
2 SQLCHAR 0 50 "\t" 2 LastName SQL_Latin1_General_CP1_CI_AS
3 SQLCHAR 0 11 "\t" 3 date_ ""
4 SQLCHAR 0 19 "\t" 4 time_ ""
5 SQLCHAR 0 16 "\t" 5 datetime_ ""
6 SQLCHAR 0 20 "\t" 6 wchar_ SQL_Latin1_General_CP1_CI_AS
7 SQLCHAR 0 1 "\t" 7 bool_ ""
8 SQLCHAR 0 5 "\t" 8 tinyint_ ""
9 SQLCHAR 0 21 "\t" 9 bigint_ ""
10 SQLCHAR 0 30 "\t" 10 float_ ""
11 SQLCHAR 0 30 "\t" 11 double_ ""
12 SQLCHAR 0 41 "\r\n" 12 numeric_ ""
4 changes: 4 additions & 0 deletions TestBCP.fmt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
14.0
2
1 SQLCHAR 0 200 "\t" 1 id SQL_Latin1_General_CP1_CI_AS
2 SQLCHAR 0 200 "\r\n" 2 names SQL_Latin1_General_CP1_CI_AS
5 changes: 5 additions & 0 deletions data_unicode.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1 Alice
2 Bob
3 Charlie
4 David
5 Eve
6 changes: 5 additions & 1 deletion mssql_python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@
from .logging_config import setup_logging, get_logger

# Constants
from .constants import ConstantsDDBC
from .constants import ConstantsDDBC, BCPControlOptions

# BCP
from .bcp_options import BCPOptions, ColumnFormat
from .bcp_main import BCPClient

# GLOBALS
# Read-Only
Expand Down
206 changes: 206 additions & 0 deletions mssql_python/bcp_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import logging
from mssql_python.bcp_options import (
BCPOptions,
)
from ddbc_bindings import BCPWrapper
from mssql_python.constants import BCPControlOptions
from typing import Optional # Import Optional for type hints

# Module-level logger named after this module; used for all BCPClient diagnostics.
logger = logging.getLogger(__name__)

# Values of BCPOptions.direction that BCPClient.sql_bulk_copy accepts; any
# other direction is rejected there with a ValueError.
SUPPORTED_DIRECTIONS = ("in", "out")
# User data type substituted for a column whose user_data_type is 0 when
# bulk_mode is "char".
# NOTE(review): presumably mirrors the ODBC SQL_CHAR type code -- confirm
# and move into the shared constants module if one exists.
SQL_CHAR = 1

class BCPClient:
    """
    Client for bulk copy (BCP) operations over an existing connection.

    Builds a native ``BCPWrapper`` from the connection's underlying ``_conn``
    handle and drives it through :meth:`sql_bulk_copy`.
    """

    def __init__(self, connection):
        """
        Initializes the BCPClient with a database connection.

        Args:
            connection: A mssql_python.connection.Connection object whose
                ``_conn`` attribute holds the native ddbc_bindings connection.

        Raises:
            ValueError: If ``connection`` is None.
            TypeError: If ``connection`` has no ``_conn`` attribute.
        """
        logger.info("Initializing BCPClient.")
        if connection is None:
            logger.error("Connection object is None during BCPClient initialization.")
            raise ValueError(
                "A valid connection object is required to initialize BCPClient."
            )

        # The native C++ connection lives in the `_conn` attribute of the
        # Python Connection wrapper; BCPWrapper needs that object, not the wrapper.
        if not hasattr(connection, "_conn"):
            logger.error("The provided Python connection object does not have the expected '_conn' attribute.")
            raise TypeError("The Python Connection object is missing the '_conn' attribute holding the native C++ connection.")

        self.wrapper = BCPWrapper(connection._conn)
        # Diagnostics go through the logger; library code must not write to stdout.
        logger.debug("connection: %s", connection._conn)
        logger.info("BCPClient initialized successfully.")

    def sql_bulk_copy(self, table: str, options: BCPOptions):
        """
        Executes a bulk copy operation to or from the specified table.

        Args:
            table (str): The table (or view) name handed to the wrapper's
                bcp_initialize_operation call.
            options (BCPOptions): Configuration for the bulk copy operation.
                Must be provided. ``options.direction`` selects the operation
                and must be one of ``SUPPORTED_DIRECTIONS``.

        Raises:
            ValueError: If 'table' is empty, or if 'options.direction' is not
                supported by this client.
            TypeError: If 'options' is not an instance of BCPOptions.
            RuntimeError: If the BCPWrapper was not initialized.
        """
        if not table:
            logger.error("Validation failed: 'table' (or query) not provided for sql_bulk_copy.")
            raise ValueError(
                "The 'table' name (or query for queryout) must be provided."
            )

        # Check the type before reading any attribute of `options`, so that a
        # None / wrong-typed argument raises the documented TypeError instead
        # of an AttributeError from a log statement.
        if not isinstance(options, BCPOptions):
            logger.error(f"Validation failed: 'options' is not an instance of BCPOptions. Got type: {type(options)}.")
            raise TypeError("The 'options' argument must be an instance of BCPOptions.")

        logger.info(f"Starting sql_bulk_copy for table/query: '{table}', direction: '{options.direction}'.")

        # BCPOptions validates itself; this is the client's own operational limit.
        if options.direction not in SUPPORTED_DIRECTIONS:
            logger.error(f"Validation failed: Unsupported BCP direction '{options.direction}'. Supported: {SUPPORTED_DIRECTIONS}")
            raise ValueError(
                f"BCPClient currently only supports directions: {', '.join(SUPPORTED_DIRECTIONS)}. "
                f"Got '{options.direction}'."
            )

        logger.debug(f"Using BCPOptions: {options}")

        if not self.wrapper:  # Should be caught by __init__ ideally
            logger.error("BCPWrapper was not initialized before calling sql_bulk_copy.")
            raise RuntimeError("BCPWrapper was not initialized.")

        try:
            logger.info(
                f"Initializing BCP operation: table='{table}', data_file='{options.data_file}', "
                f"error_file='{options.error_file}', direction='{options.direction}'"
            )
            self.wrapper.bcp_initialize_operation(
                table,
                options.data_file,
                options.error_file,
                options.direction,
            )
            logger.debug("BCP operation initialized with BCPWrapper.")

            # TODO: forward the BCP control options (batch_size, max_errors,
            # first_row, last_row, code_page, keep_identity, keep_nulls, hints
            # and the row terminator of columns[0]) through
            # self.wrapper.bcp_control(BCPControlOptions.<OPTION>.value, value).
            # They are currently NOT applied to the operation.

            # A format file takes precedence over programmatic column definitions.
            if options.format_file:
                logger.info(f"Reading format file: '{options.format_file}'")
                self.wrapper.read_format_file(options.format_file)
            elif options.columns:
                self._define_columns(options)
            else:
                logger.info("No format file or explicit column definitions provided. Relying on BCP defaults or server types.")

            logger.info("Executing BCP operation via wrapper.exec_bcp().")
            self.wrapper.exec_bcp()
            logger.info("BCP operation executed successfully.")

        except Exception as e:
            logger.exception(f"An error occurred during BCP operation for table '{table}': {e}")
            raise  # Re-raise the exception after logging
        finally:
            # NOTE(review): the wrapper's finish()/close() calls are disabled,
            # so this block only logs. Confirm whether exec_bcp() already
            # releases the BCP session; otherwise re-enable explicit cleanup.
            if self.wrapper:
                logger.info("Finishing and closing BCPWrapper.")
                logger.debug("BCPWrapper finished and closed.")
            logger.info(f"sql_bulk_copy for table/query: '{table}' completed.")

    def _define_columns(self, options):
        """Registers every ColumnFormat in ``options.columns`` with the wrapper."""
        logger.info(f"Defining {len(options.columns)} columns programmatically.")
        self.wrapper.define_columns(len(options.columns))

        for col_fmt_obj in options.columns:
            logger.debug(f"Defining column format for file column {col_fmt_obj.file_col}: {col_fmt_obj}")

            col_user_type = col_fmt_obj.user_data_type
            # A row terminator set on a column overrides that column's field
            # terminator when both are present.
            terminator_for_colfmt = col_fmt_obj.field_terminator
            if col_fmt_obj.row_terminator is not None:
                terminator_for_colfmt = col_fmt_obj.row_terminator

            if options.bulk_mode == "char":
                # An unspecified (0) user type defaults to SQL_CHAR in char mode.
                if col_user_type == 0:
                    col_user_type = SQL_CHAR
            elif options.bulk_mode == "native":
                # Native mode forces the user type to 0 and passes no terminator.
                col_user_type = 0
                terminator_for_colfmt = None

            self.wrapper.define_column_format(
                file_col_idx=col_fmt_obj.file_col,
                user_data_type=col_user_type,
                indicator_length=col_fmt_obj.prefix_len,
                user_data_length=col_fmt_obj.data_len,
                terminator_bytes=terminator_for_colfmt,
                server_col_idx=col_fmt_obj.server_col,
            )
Loading
Loading