2 changes: 1 addition & 1 deletion README.md
@@ -47,7 +47,7 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
| [KaggleDataSource](pyspark_datasources/kaggle.py) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
| [SimpleJsonDataSource](pyspark_datasources/simplejson.py) | `simplejson` | Write JSON data to Databricks DBFS | `databricks-sdk` |
| [OpenSkyDataSource](pyspark_datasources/opensky.py) | `opensky` | Read from OpenSky Network. | None |
| [SalesforceDataSource](pyspark_datasources/salesforce.py) | `salesforce` | Streaming sink for writing data to Salesforce | `simple-salesforce` |
| [SalesforceDataSource](pyspark_datasources/salesforce.py) | `pyspark.datasource.salesforce` | Streaming datasource for writing data to Salesforce | `simple-salesforce` |

See more here: https://allisonwang-db.github.io/pyspark-data-sources/.
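
For context, here is a minimal sketch of a streaming write using the renamed format string. It assumes an active `spark` session and placeholder credentials; any option not shown in this diff (such as the target-object setting) follows the project documentation rather than being confirmed here.

```python
# Minimal sketch: register the data source and write a stream with the new format name.
from pyspark_datasources.salesforce import SalesforceDataSource

spark.dataSource.register(SalesforceDataSource)

# Any streaming DataFrame works; a rate source keeps the sketch self-contained.
account_data = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", 1)
    .load()
    .selectExpr("concat('Account ', value) AS Name")
)

query = (
    account_data.writeStream.format("pyspark.datasource.salesforce")  # previously "salesforce"
    .option("username", "your-username@company.com")
    .option("password", "your-password")
    .option("security_token", "your-security-token")
    # Target-object and other write options are not shown in this diff; see the docs above.
    .option("checkpointLocation", "/tmp/salesforce_checkpoint")
    .start()
)
```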

2 changes: 1 addition & 1 deletion docs/index.md
@@ -38,6 +38,6 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
| [HuggingFaceDatasets](./datasources/huggingface.md) | `huggingface` | Read datasets from the HuggingFace Hub | `datasets` |
| [StockDataSource](./datasources/stock.md) | `stock` | Read stock data from Alpha Vantage | None |
| [SimpleJsonDataSource](./datasources/simplejson.md) | `simplejson` | Write JSON data to Databricks DBFS | `databricks-sdk` |
| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
| [SalesforceDataSource](./datasources/salesforce.md) | `pyspark.datasource.salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
| [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None |
| [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Salesforce Sink Example
Salesforce Datasource Example

This example demonstrates how to use the SalesforceDataSource as a streaming sink
This example demonstrates how to use the SalesforceDataSource as a streaming datasource
to write data from various sources to Salesforce objects.

Requirements:
@@ -64,11 +64,11 @@ def example_1_rate_source_to_accounts():
)

try:
# Register Salesforce sink
# Register Salesforce datasource
from pyspark_datasources.salesforce import SalesforceDataSource

spark.dataSource.register(SalesforceDataSource)
print("✅ Salesforce sink registered")
print("✅ Salesforce datasource registered")

# Create streaming data from rate source
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 2).load()
@@ -84,7 +84,7 @@ def example_1_rate_source_to_accounts():

# Write to Salesforce
query = (
account_data.writeStream.format("salesforce")
account_data.writeStream.format("pyspark.datasource.salesforce")
.option("username", username)
.option("password", password)
.option("security_token", security_token)
@@ -135,7 +135,7 @@ def example_2_csv_to_contacts():
)

try:
# Register Salesforce sink
# Register Salesforce datasource
from pyspark_datasources.salesforce import SalesforceDataSource

spark.dataSource.register(SalesforceDataSource)
@@ -177,7 +177,7 @@ def example_2_csv_to_contacts():

# Write to Salesforce with custom schema
query = (
streaming_df.writeStream.format("salesforce")
streaming_df.writeStream.format("pyspark.datasource.salesforce")
.option("username", username)
.option("password", password)
.option("security_token", security_token)
@@ -279,9 +279,9 @@ def example_3_checkpoint_demonstration():
col("industry").alias("Industry"),
col("revenue").alias("AnnualRevenue"),
)

query1 = (
account_df1.writeStream.format("salesforce")
account_df1.writeStream.format("pyspark.datasource.salesforce")
.option("username", username)
.option("password", password)
.option("security_token", security_token)
@@ -328,7 +328,7 @@ def example_3_checkpoint_demonstration():
)

query2 = (
account_df2.writeStream.format("salesforce")
account_df2.writeStream.format("pyspark.datasource.salesforce")
.option("username", username)
.option("password", password)
.option("security_token", security_token)
@@ -415,8 +415,8 @@ def example_4_custom_object():

# Example code (commented out since custom object may not exist)
print("""
query = custom_data.writeStream \\
.format("salesforce") \\
query = custom_data.writeStream \\
.format("pyspark.datasource.salesforce") \\
.option("username", username) \\
.option("password", password) \\
.option("security_token", security_token) \\
@@ -438,8 +438,8 @@ def example_4_custom_object():

def main():
"""Run all examples"""
print("🚀 Salesforce Sink Examples")
print("This demonstrates various ways to use the Salesforce streaming sink")
print("🚀 Salesforce Datasource Examples")
print("This demonstrates various ways to use the Salesforce streaming datasource")

try:
# Run examples
@@ -452,7 +452,7 @@ def main():
print("✅ All examples completed!")
print("=" * 60)
print("\n💡 Key takeaways:")
print(" - Salesforce sink supports various input sources (rate, CSV, etc.)")
print(" - Salesforce datasource supports various input sources (rate, CSV, etc.)")
print(" - Checkpoint functionality enables exactly-once processing")
print(" - Custom schemas allow flexibility for different Salesforce objects")
print(" - Batch processing optimizes Salesforce API usage")
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pyspark-data-sources"
version = "0.1.9"
version = "0.1.10"
description = "Custom Spark data sources for reading and writing data in Apache Spark, using the Python Data Source API"
authors = ["allisonwang-db <allison.wang@databricks.com>"]
license = "Apache License 2.0"
37 changes: 21 additions & 16 deletions pyspark_datasources/salesforce.py
@@ -18,23 +18,28 @@ class SalesforceCommitMessage(WriterCommitMessage):

class SalesforceDataSource(DataSource):
"""
A Salesforce streaming sink for PySpark to write data to Salesforce objects.
A Salesforce streaming datasource for PySpark to write data to Salesforce objects.

This data sink enables writing streaming data from Spark to Salesforce using the
This datasource enables writing streaming data from Spark to Salesforce using the
Salesforce REST API. It supports common Salesforce objects like Account, Contact,
Opportunity, and custom objects.

Note: This is a write-only sink, not a full bidirectional data source.
Note: This is a write-only datasource, not a full bidirectional data source.

Name: `pyspark.datasource.salesforce`

Notes
-----
- Requires the `simple-salesforce` library for Salesforce API integration
- **Write-only sink**: Only supports streaming write operations (no read operations)
- **Write-only datasource**: Only supports streaming write operations (no read operations)
- Uses Salesforce username/password/security token authentication
- Supports batch writing with Salesforce Composite Tree API for efficient processing
- Implements exactly-once semantics through Spark's checkpoint mechanism
- If a streaming write job fails and is resumed from the checkpoint,
it will not overwrite records already written in Salesforce;
it resumes from the last committed offset.
However, if records were written to Salesforce but not yet committed at the time of failure,
duplicate records may occur after recovery.

Parameters
----------
@@ -57,7 +62,7 @@ class SalesforceDataSource(DataSource):

Examples
--------
Register the Salesforce sink:
Register the Salesforce datasource:

>>> from pyspark_datasources import SalesforceDataSource
>>> spark.dataSource.register(SalesforceDataSource)
@@ -78,9 +83,9 @@ class SalesforceDataSource(DataSource):
... (col("value") * 100000).cast("double").alias("AnnualRevenue")
... )
>>>
>>> # Write to Salesforce using the sink
>>> # Write to Salesforce using the datasource
>>> query = account_data.writeStream \\
... .format("salesforce") \\
... .format("pyspark.datasource.salesforce") \\
... .option("username", "your-username@company.com") \\
... .option("password", "your-password") \\
... .option("security_token", "your-security-token") \\
@@ -98,7 +103,7 @@ class SalesforceDataSource(DataSource):
... )
>>>
>>> query = contact_data.writeStream \\
... .format("salesforce") \\
... .format("pyspark.datasource.salesforce") \\
... .option("username", "your-username@company.com") \\
... .option("password", "your-password") \\
... .option("security_token", "your-security-token") \\
@@ -114,7 +119,7 @@ class SalesforceDataSource(DataSource):
... )
>>>
>>> query = custom_data.writeStream \\
... .format("salesforce") \\
... .format("pyspark.datasource.salesforce") \\
... .option("username", "your-username@company.com") \\
... .option("password", "your-password") \\
... .option("security_token", "your-security-token") \\
@@ -128,7 +133,7 @@ class SalesforceDataSource(DataSource):
>>> contact_schema = "FirstName STRING NOT NULL, LastName STRING NOT NULL, Email STRING, Phone STRING"
>>>
>>> query = contact_data.writeStream \\
... .format("salesforce") \\
... .format("pyspark.datasource.salesforce") \\
... .option("username", "your-username@company.com") \\
... .option("password", "your-password") \\
... .option("security_token", "your-security-token") \\
@@ -148,7 +153,7 @@ class SalesforceDataSource(DataSource):
... )
>>>
>>> query = opportunity_data.writeStream \\
... .format("salesforce") \\
... .format("pyspark.datasource.salesforce") \\
... .option("username", "your-username@company.com") \\
... .option("password", "your-password") \\
... .option("security_token", "your-security-token") \\
@@ -159,7 +164,7 @@ class SalesforceDataSource(DataSource):

Key Features:

- **Write-only sink**: Designed specifically for writing data to Salesforce
- **Write-only datasource**: Designed specifically for writing data to Salesforce
- **Batch processing**: Uses Salesforce Composite Tree API for efficient bulk writes
- **Exactly-once semantics**: Integrates with Spark's checkpoint mechanism
- **Error handling**: Graceful fallback to individual record creation if batch fails
@@ -168,8 +173,8 @@ class SalesforceDataSource(DataSource):

@classmethod
def name(cls) -> str:
"""Return the short name for this Salesforce sink."""
return "salesforce"
"""Return the short name for this Salesforce datasource."""
return "pyspark.datasource.salesforce"

def schema(self) -> str:
"""
Expand All @@ -196,12 +201,12 @@ def schema(self) -> str:
"""

def streamWriter(self, schema: StructType, overwrite: bool) -> "SalesforceStreamWriter":
"""Create a stream writer for Salesforce sink integration."""
"""Create a stream writer for Salesforce datasource integration."""
return SalesforceStreamWriter(schema, self.options)


class SalesforceStreamWriter(DataSourceStreamWriter):
"""Stream writer implementation for Salesforce sink integration."""
"""Stream writer implementation for Salesforce datasource integration."""

def __init__(self, schema: StructType, options: Dict[str, str]):
self.schema = schema
4 changes: 2 additions & 2 deletions tests/test_data_sources.py
@@ -78,7 +78,7 @@ def test_salesforce_datasource_registration(spark):
spark.dataSource.register(SalesforceDataSource)

# Test that the datasource is registered with correct name
assert SalesforceDataSource.name() == "salesforce"
assert SalesforceDataSource.name() == "pyspark.datasource.salesforce"

# Test that the data source is streaming-only (no batch writer)
from pyspark.sql.functions import lit
@@ -91,7 +91,7 @@ def test_salesforce_datasource_registration(spark):
lit(50000.0).alias("AnnualRevenue"),
)

df.write.format("salesforce").mode("append").save()
df.write.format("pyspark.datasource.salesforce").mode("append").save()
assert False, "Should have raised error - Salesforce DataSource only supports streaming"
except Exception as e:
# This is expected - Salesforce DataSource only supports streaming writes