-
Notifications
You must be signed in to change notification settings - Fork 19
Add support for Salesforce sink #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
2798e28
8a3ec8b
a0f51a2
03fda11
478fcca
510ff30
c9fe38a
340ef03
8e4e8c9
9889a72
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| # SalesforceDataSource | ||
|
|
||
| > Requires the [`simple-salesforce`](https://github.com/simple-salesforce/simple-salesforce) library. You can install it manually: `pip install simple-salesforce` | ||
| > or use `pip install pyspark-data-sources[salesforce]`. | ||
|
|
||
| ::: pyspark_datasources.salesforce.SalesforceDataSource |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,294 @@ | ||||||||||||||||||||
| import logging | ||||||||||||||||||||
| from dataclasses import dataclass | ||||||||||||||||||||
| from typing import Dict, List, Any | ||||||||||||||||||||
|
|
||||||||||||||||||||
| from pyspark.sql.types import StructType | ||||||||||||||||||||
| from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage | ||||||||||||||||||||
|
|
||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| @dataclass | ||||||||||||||||||||
| class SalesforceCommitMessage(WriterCommitMessage): | ||||||||||||||||||||
| """Commit message for Salesforce write operations.""" | ||||||||||||||||||||
| records_written: int | ||||||||||||||||||||
| batch_id: int | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| class SalesforceDataSource(DataSource): | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| A Salesforce streaming data source for PySpark to write data to Salesforce objects. | ||||||||||||||||||||
| This data source enables writing streaming data from Spark to Salesforce using the | ||||||||||||||||||||
| Salesforce REST API. It supports common Salesforce objects like Account, Contact, | ||||||||||||||||||||
| Opportunity, and custom objects. | ||||||||||||||||||||
| Name: `salesforce` | ||||||||||||||||||||
| Notes | ||||||||||||||||||||
| ----- | ||||||||||||||||||||
| - Requires the `simple-salesforce` library for Salesforce API integration | ||||||||||||||||||||
| - Only supports streaming write operations (not read operations) | ||||||||||||||||||||
| - Uses Salesforce username/password/security token authentication | ||||||||||||||||||||
| - Supports streaming processing for efficient API usage | ||||||||||||||||||||
| Parameters | ||||||||||||||||||||
| ---------- | ||||||||||||||||||||
| username : str | ||||||||||||||||||||
| Salesforce username (email address) | ||||||||||||||||||||
| password : str | ||||||||||||||||||||
| Salesforce password | ||||||||||||||||||||
| security_token : str | ||||||||||||||||||||
| Salesforce security token (obtained from Salesforce setup) | ||||||||||||||||||||
| salesforce_object : str, optional | ||||||||||||||||||||
| Target Salesforce object name (default: "Account") | ||||||||||||||||||||
|
Comment on lines
+46
to
+47
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where can I get a list of objects? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's on salesforce UI. We can also add features to pull objects list and their schemas from salesforce |
||||||||||||||||||||
| batch_size : str, optional | ||||||||||||||||||||
| Number of records to process per batch (default: "200") | ||||||||||||||||||||
| instance_url : str, optional | ||||||||||||||||||||
| Custom Salesforce instance URL (auto-detected if not provided) | ||||||||||||||||||||
| Examples | ||||||||||||||||||||
| -------- | ||||||||||||||||||||
| Register the data source: | ||||||||||||||||||||
| >>> from pyspark_datasources import SalesforceDataSource | ||||||||||||||||||||
| >>> spark.dataSource.register(SalesforceDataSource) | ||||||||||||||||||||
| Write streaming data to Salesforce Accounts: | ||||||||||||||||||||
| >>> from pyspark.sql import SparkSession | ||||||||||||||||||||
| >>> from pyspark.sql.functions import col, lit | ||||||||||||||||||||
| >>> | ||||||||||||||||||||
| >>> spark = SparkSession.builder.appName("SalesforceExample").getOrCreate() | ||||||||||||||||||||
| >>> spark.dataSource.register(SalesforceDataSource) | ||||||||||||||||||||
| >>> | ||||||||||||||||||||
| >>> # Create sample streaming data | ||||||||||||||||||||
| >>> streaming_df = spark.readStream.format("rate").load() | ||||||||||||||||||||
| >>> account_data = streaming_df.select( | ||||||||||||||||||||
| ... col("value").cast("string").alias("Name"), | ||||||||||||||||||||
| ... lit("Technology").alias("Industry"), | ||||||||||||||||||||
| ... (col("value") * 100000).cast("double").alias("AnnualRevenue") | ||||||||||||||||||||
| ... ) | ||||||||||||||||||||
| >>> | ||||||||||||||||||||
| >>> # Write to Salesforce | ||||||||||||||||||||
| >>> query = account_data.writeStream \\ | ||||||||||||||||||||
| ... .format("salesforce") \\ | ||||||||||||||||||||
| ... .option("username", "your-username@company.com") \\ | ||||||||||||||||||||
| ... .option("password", "your-password") \\ | ||||||||||||||||||||
| ... .option("security_token", "your-security-token") \\ | ||||||||||||||||||||
| ... .option("salesforce_object", "Account") \\ | ||||||||||||||||||||
| ... .option("batch_size", "100") \\ | ||||||||||||||||||||
| ... .start() | ||||||||||||||||||||
| Write to Salesforce Contacts: | ||||||||||||||||||||
| >>> contact_data = streaming_df.select( | ||||||||||||||||||||
| ... col("value").cast("string").alias("FirstName"), | ||||||||||||||||||||
| ... lit("Doe").alias("LastName"), | ||||||||||||||||||||
| ... lit("contact@example.com").alias("Email") | ||||||||||||||||||||
| ... ) | ||||||||||||||||||||
| >>> | ||||||||||||||||||||
| >>> query = contact_data.writeStream \\ | ||||||||||||||||||||
| ... .format("salesforce") \\ | ||||||||||||||||||||
| ... .option("username", "your-username@company.com") \\ | ||||||||||||||||||||
| ... .option("password", "your-password") \\ | ||||||||||||||||||||
| ... .option("security_token", "your-security-token") \\ | ||||||||||||||||||||
| ... .option("salesforce_object", "Contact") \\ | ||||||||||||||||||||
| ... .start() | ||||||||||||||||||||
| Write to custom Salesforce objects: | ||||||||||||||||||||
| >>> custom_data = streaming_df.select( | ||||||||||||||||||||
| ... col("value").cast("string").alias("Custom_Field__c"), | ||||||||||||||||||||
| ... lit("Custom Value").alias("Another_Field__c") | ||||||||||||||||||||
| ... ) | ||||||||||||||||||||
| >>> | ||||||||||||||||||||
| >>> query = custom_data.writeStream \\ | ||||||||||||||||||||
| ... .format("salesforce") \\ | ||||||||||||||||||||
| ... .option("username", "your-username@company.com") \\ | ||||||||||||||||||||
| ... .option("password", "your-password") \\ | ||||||||||||||||||||
| ... .option("security_token", "your-security-token") \\ | ||||||||||||||||||||
| ... .option("salesforce_object", "Custom_Object__c") \\ | ||||||||||||||||||||
| ... .start() | ||||||||||||||||||||
| """ | ||||||||||||||||||||
|
|
||||||||||||||||||||
| @classmethod | ||||||||||||||||||||
| def name(cls) -> str: | ||||||||||||||||||||
| """Return the short name for this data source.""" | ||||||||||||||||||||
| return "salesforce" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def schema(self) -> str: | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| Define the default schema for Salesforce Account objects. | ||||||||||||||||||||
| This schema can be overridden by users when creating their DataFrame. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| return """ | ||||||||||||||||||||
| Name STRING NOT NULL, | ||||||||||||||||||||
| Industry STRING, | ||||||||||||||||||||
| Phone STRING, | ||||||||||||||||||||
| Website STRING, | ||||||||||||||||||||
| AnnualRevenue DOUBLE, | ||||||||||||||||||||
| NumberOfEmployees INT, | ||||||||||||||||||||
| BillingStreet STRING, | ||||||||||||||||||||
| BillingCity STRING, | ||||||||||||||||||||
| BillingState STRING, | ||||||||||||||||||||
| BillingPostalCode STRING, | ||||||||||||||||||||
| BillingCountry STRING | ||||||||||||||||||||
| """ | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def streamWriter(self, schema: StructType, overwrite: bool) -> "SalesforceStreamWriter": | ||||||||||||||||||||
| """Create a stream writer for Salesforce integration.""" | ||||||||||||||||||||
| return SalesforceStreamWriter(schema, self.options) | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| class SalesforceStreamWriter(DataSourceStreamWriter): | ||||||||||||||||||||
| """Stream writer implementation for Salesforce integration.""" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def __init__(self, schema: StructType, options: Dict[str, str]): | ||||||||||||||||||||
| self.schema = schema | ||||||||||||||||||||
| self.options = options | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Extract Salesforce configuration | ||||||||||||||||||||
| self.username = options.get("username") | ||||||||||||||||||||
| self.password = options.get("password") | ||||||||||||||||||||
| self.security_token = options.get("security_token") | ||||||||||||||||||||
| self.instance_url = options.get("instance_url") | ||||||||||||||||||||
| self.salesforce_object = options.get("salesforce_object", "Account") | ||||||||||||||||||||
| self.batch_size = int(options.get("batch_size", "200")) | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
Comment on lines
+210
to
+216
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Security: Avoid storing sensitive credentials as instance variables. Storing passwords and security tokens as instance variables could expose them in logs, stack traces, or memory dumps. Consider extracting credentials only when needed in the Apply this diff to improve security: def __init__(self, schema: StructType, options: Dict[str, str]):
self.schema = schema
self.options = options
- # Extract Salesforce configuration
- self.username = options.get("username")
- self.password = options.get("password")
- self.security_token = options.get("security_token")
- self.instance_url = options.get("instance_url")
self.salesforce_object = options.get("salesforce_object", "Account")
self.batch_size = int(options.get("batch_size", "200"))
# Validate required options
- if not all([self.username, self.password, self.security_token]):
+ username = options.get("username")
+ password = options.get("password")
+ security_token = options.get("security_token")
+ if not all([username, password, security_token]):
raise ValueError(
"Salesforce username, password, and security_token are required. "
"Set them using .option() method in your streaming query."
)Then update the # In write method, after line 186:
username = self.options.get("username")
password = self.options.get("password")
security_token = self.options.get("security_token")
instance_url = self.options.get("instance_url")🤖 Prompt for AI Agents |
||||||||||||||||||||
| # Validate required options | ||||||||||||||||||||
| if not all([self.username, self.password, self.security_token]): | ||||||||||||||||||||
| raise ValueError( | ||||||||||||||||||||
| "Salesforce username, password, and security_token are required. " | ||||||||||||||||||||
| "Set them using .option() method in your streaming query." | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| logger.info(f"Initializing Salesforce writer for object '{self.salesforce_object}'") | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def write(self, iterator) -> SalesforceCommitMessage: | ||||||||||||||||||||
| """Write data to Salesforce.""" | ||||||||||||||||||||
| # Import here to avoid serialization issues | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| from simple_salesforce import Salesforce | ||||||||||||||||||||
| except ImportError: | ||||||||||||||||||||
| raise ImportError( | ||||||||||||||||||||
| "simple-salesforce library is required for Salesforce integration. " | ||||||||||||||||||||
| "Install it with: pip install simple-salesforce" | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
Comment on lines
+232
to
+235
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add exception chaining for better error traceability. When re-raising exceptions, use Apply this diff: except ImportError:
raise ImportError(
"simple-salesforce library is required for Salesforce integration. "
"Install it with: pip install simple-salesforce"
- )
+ ) from None📝 Committable suggestion
Suggested change
🧰 Tools🪛 Ruff (0.12.2)175-178: Within an (B904) 🤖 Prompt for AI Agents |
||||||||||||||||||||
|
|
||||||||||||||||||||
| from pyspark import TaskContext | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Get task context for batch identification | ||||||||||||||||||||
| context = TaskContext.get() | ||||||||||||||||||||
| batch_id = context.taskAttemptId() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Connect to Salesforce | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| sf_kwargs = { | ||||||||||||||||||||
| 'username': self.username, | ||||||||||||||||||||
| 'password': self.password, | ||||||||||||||||||||
| 'security_token': self.security_token | ||||||||||||||||||||
| } | ||||||||||||||||||||
| if self.instance_url: | ||||||||||||||||||||
| sf_kwargs['instance_url'] = self.instance_url | ||||||||||||||||||||
|
|
||||||||||||||||||||
| sf = Salesforce(**sf_kwargs) | ||||||||||||||||||||
| logger.info(f"✓ Connected to Salesforce (batch {batch_id})") | ||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||
| logger.error(f"Failed to connect to Salesforce: {str(e)}") | ||||||||||||||||||||
| raise ConnectionError(f"Salesforce connection failed: {str(e)}") | ||||||||||||||||||||
|
Comment on lines
+256
to
+257
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add exception chaining to preserve error context. Apply this diff: except Exception as e:
logger.error(f"Failed to connect to Salesforce: {str(e)}")
- raise ConnectionError(f"Salesforce connection failed: {str(e)}")
+ raise ConnectionError(f"Salesforce connection failed: {str(e)}") from e📝 Committable suggestion
Suggested change
🧰 Tools🪛 Ruff (0.12.2)200-200: Within an (B904) 🤖 Prompt for AI Agents |
||||||||||||||||||||
|
|
||||||||||||||||||||
| # Convert rows to Salesforce records | ||||||||||||||||||||
| records = [] | ||||||||||||||||||||
| for row in iterator: | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| record = self._convert_row_to_salesforce_record(row) | ||||||||||||||||||||
| if record: # Only add non-empty records | ||||||||||||||||||||
| records.append(record) | ||||||||||||||||||||
|
||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||
| logger.warning(f"Failed to convert row to Salesforce record: {str(e)}") | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if not records: | ||||||||||||||||||||
| logger.info(f"No valid records to write in batch {batch_id}") | ||||||||||||||||||||
| return SalesforceCommitMessage(records_written=0, batch_id=batch_id) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Write records to Salesforce | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| records_written = self._write_to_salesforce(sf, records, batch_id) | ||||||||||||||||||||
| logger.info(f"✅ Batch {batch_id}: Successfully wrote {records_written} records") | ||||||||||||||||||||
| return SalesforceCommitMessage(records_written=records_written, batch_id=batch_id) | ||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||
| logger.error(f"❌ Batch {batch_id}: Failed to write records: {str(e)}") | ||||||||||||||||||||
| raise | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def _convert_row_to_salesforce_record(self, row) -> Dict[str, Any]: | ||||||||||||||||||||
| """Convert a Spark Row to a Salesforce record format.""" | ||||||||||||||||||||
| record = {} | ||||||||||||||||||||
|
|
||||||||||||||||||||
| for field in self.schema.fields: | ||||||||||||||||||||
| field_name = field.name | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| # Use getattr for safe field access | ||||||||||||||||||||
| value = getattr(row, field_name, None) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if value is not None: | ||||||||||||||||||||
| # Convert value based on field type | ||||||||||||||||||||
| if hasattr(value, 'isoformat'): # datetime objects | ||||||||||||||||||||
| record[field_name] = value.isoformat() | ||||||||||||||||||||
| elif isinstance(value, (int, float)): | ||||||||||||||||||||
| record[field_name] = value | ||||||||||||||||||||
| else: | ||||||||||||||||||||
| record[field_name] = str(value) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||
| logger.warning(f"Failed to convert field '{field_name}': {str(e)}") | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return record | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def _write_to_salesforce(self, sf, records: List[Dict[str, Any]], batch_id: int) -> int: | ||||||||||||||||||||
| """Write records to Salesforce using REST API.""" | ||||||||||||||||||||
| success_count = 0 | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Get the Salesforce object API | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| sf_object = getattr(sf, self.salesforce_object) | ||||||||||||||||||||
| except AttributeError: | ||||||||||||||||||||
| raise ValueError(f"Salesforce object '{self.salesforce_object}' not found") | ||||||||||||||||||||
|
||||||||||||||||||||
| except AttributeError: | |
| raise ValueError(f"Salesforce object '{self.salesforce_object}' not found") | |
| except AttributeError: | |
| raise ValueError(f"Salesforce object '{self.salesforce_object}' not found") from None |
🧰 Tools
🪛 Ruff (0.12.2)
257-257: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
🤖 Prompt for AI Agents
In pyspark_datasources/salesforce.py around lines 256 to 257, the ValueError
raised in the except block for AttributeError lacks exception chaining. Modify
the raise statement to include "from e" where "e" is the caught AttributeError,
ensuring consistent exception chaining for better error traceability.
Uh oh!
There was an error while loading. Please reload this page.