Conversation


@dnks0 dnks0 commented Oct 23, 2025

Added SharePoint datasource

  • supports streaming writes of structured data to SharePoint lists (see the usage sketch below)
  • adds a flexible Resource definition to model resources to be pushed to SharePoint
  • future resource types (e.g. Excel, PDF) to be implemented
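A minimal sketch of registering and writing with this datasource, assuming the short format name `sharepoint` and the option names discussed in this PR (`auth.client_id`, `auth.client_secret`, `auth.tenant_id`, `site_id`, `list_id`, `fields`); the format name, the `fields` mapping syntax, and any resource-type selector option are assumptions here, so treat docs/data-sources-guide.md as the authoritative example:

```python
from pyspark.sql import SparkSession

from pyspark_datasources import SharepointDataSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(SharepointDataSource)

# Any streaming DataFrame works; the rate source is used here only for illustration.
stream_df = spark.readStream.format("rate").load()

query = (
    stream_df.writeStream
    .format("sharepoint")                                # assumed short name
    .option("auth.client_id", "<client-id>")
    .option("auth.client_secret", "<client-secret>")
    .option("auth.tenant_id", "<tenant-id>")
    .option("site_id", "<site-id>")
    .option("list_id", "<list-id>")
    .option("fields", '{"value": "Title"}')              # row field -> list column (mapping format assumed)
    .option("checkpointLocation", "/tmp/sharepoint_checkpoint")
    .start()
)
```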

Summary by CodeRabbit

  • New Features

    • Added SharePoint as a streaming write-only data source for PySpark, enabling writes to SharePoint lists with batching, authentication options, error handling, and checkpoint support.
  • Documentation

    • Added a comprehensive SharePoint data source guide with installation, streaming write usage, options, and examples.
    • Updated README data sources table formatting and added the new SharePoint install tag.


coderabbitai bot commented Oct 23, 2025

Walkthrough

Adds a new Sharepoint streaming write data source for PySpark: new implementation, package exports, documentation, and optional dependencies for Azure/Microsoft Graph authentication and API access; includes async batching, configurable error handling, and commit reporting.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| **Documentation Updates**<br>`README.md`, `docs/data-sources-guide.md` | Reformatted the Available Data Sources table and added the `pyspark.datasource.sharepoint` entry. Added full SharepointDataSource documentation (installation, streaming write usage, options, features, examples). |
| **Dependency Configuration**<br>`pyproject.toml` | Added optional dependencies `azure-identity` (^1.25.1) and `msgraph-sdk` (^1.47.0). Added a `sharepoint` extras group for them and included them in the existing `all` extras. |
| **Public API Exports**<br>`pyspark_datasources/__init__.py` | Exported `SharepointResource` and `SharepointDataSource` at package level (imported from `.sharepoint`). |
| **Sharepoint Integration Implementation**<br>`pyspark_datasources/sharepoint.py` | New module implementing Sharepoint streaming writes: abstract `SharepointResource`, concrete `ListResource`, `SharepointDataSource`, `SharepointStreamWriter`, async Graph API client usage, batching, concurrency-controlled pushes, configurable fail-fast behavior, and `SharepointCommitMessage`. |

Sequence Diagram(s)

sequenceDiagram
    participant Spark as PySpark Streaming
    participant Writer as SharepointStreamWriter
    participant Graph as Microsoft Graph Client
    participant SP as SharePoint Lists

    Spark->>Writer: write(iterator)
    loop per batch
        Writer->>Writer: convert rows -> records
        Writer->>Writer: buffer until batch_size
        Writer->>Graph: async push batch (concurrent per-record tasks)
        par concurrent pushes
            Graph->>SP: create list item
            SP-->>Graph: success / failure
        end
        Graph-->>Writer: per-record results
        Writer->>Writer: aggregate (records_written, records_failed)
        alt fail-fast enabled & error
            Writer->>Writer: raise / abort
        end
    end
    Writer-->>Spark: SharepointCommitMessage(records_written, records_failed, batch_id)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐇 I hopped a stream of rows tonight,
Batching, pushing with async light,
Lists receive each tiny song,
Commit messages hum along,
A rabbit cheers: Sharepoint writes take flight! 🎉

Pre-merge checks

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 68.42%, below the required threshold of 80.00%. | Run `@coderabbitai generate docstrings` to improve docstring coverage. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped: CodeRabbit's high-level summary is enabled. |
| Title Check | ✅ Passed | The PR title "feature/sharepoint datasource" directly corresponds to the main change: a new SharePoint datasource supporting streaming writes to SharePoint lists. The title is specific and concise, so a developer scanning the repository history can clearly identify the change. |

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (4)
pyspark_datasources/sharepoint.py (4)

91-91: Annotate class attribute with ClassVar.

Static analysis correctly identifies that _required_options is a class-level constant that should be annotated with typing.ClassVar to clarify it's not an instance variable.

Apply this diff:

+from typing import Dict, List, Any, Tuple, ClassVar
+
 class ListResource(SharepointResource):
     """..."""
 
-    _required_options = ["list_id", "fields"]
+    _required_options: ClassVar[List[str]] = ["list_id", "fields"]

119-122: Improve exception handling with chaining.

Exception chaining with raise ... from e preserves the original traceback and provides better debugging context. This is especially important for data conversion errors where understanding the root cause is critical.

Apply this diff:

             except AttributeError as e:
-                raise Exception(f"Field '{rowfield}' not found in row with keys {', '.join(row.asDict().keys())}")
+                raise Exception(f"Field '{rowfield}' not found in row with keys {', '.join(row.asDict().keys())}") from e
             except Exception as e:
-                raise Exception(f"Conversion failed for field '{rowfield}' --> '{listfield}': {str(e)}")
+                raise Exception(f"Conversion failed for field '{rowfield}' --> '{listfield}': {str(e)}") from e

272-274: Unused parameters are acceptable for interface compliance.

The schema and overwrite parameters are part of the DataSource.streamWriter() interface contract. While unused in this implementation, they must be present. Consider adding a docstring note explaining why they're not used.

Optionally add a docstring to clarify:

def streamWriter(self, schema: StructType, overwrite: bool) -> "SharepointStreamWriter":
    """Create a stream writer for Sharepoint datasource integration.
    
    Args:
        schema: Input schema (unused - inferred from data)
        overwrite: Overwrite flag (unused - append-only datasource)
    """
    return SharepointStreamWriter(options=self.options, datasource=self.name())

284-285: Annotate class attributes with ClassVar.

Similar to ListResource, these class-level constants should be annotated with typing.ClassVar.

Apply this diff:

+from typing import Dict, List, Any, Tuple, ClassVar
+
 class SharepointStreamWriter(DataSourceStreamWriter):
     """..."""
 
     _graph_api_version = "1.0"
     _graph_api_url = "https://graph.microsoft.com"
     _auth_url = "https://login.microsoftonline.com"
 
-    _required_options = ["auth.client_id", "auth.client_secret", "auth.tenant_id", "site_id"]
-    _supported_resources = {"list": ListResource}
+    _required_options: ClassVar[List[str]] = ["auth.client_id", "auth.client_secret", "auth.tenant_id", "site_id"]
+    _supported_resources: ClassVar[Dict[str, type]] = {"list": ListResource}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4e877ce and 94786a7.

⛔ Files ignored due to path filters (1)
  • poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • README.md (1 hunks)
  • docs/data-sources-guide.md (2 hunks)
  • pyproject.toml (2 hunks)
  • pyspark_datasources/__init__.py (1 hunks)
  • pyspark_datasources/sharepoint.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
pyspark_datasources/__init__.py (1)
pyspark_datasources/sharepoint.py (2)
  • SharepointResource (14-63)
  • SharepointDataSource (149-274)
🪛 LanguageTool
README.md

[uncategorized] ~60-~60: The official name of this software platform is spelled with a capital “H”.
Context: ...nstall pyspark-data-sources[faker]| |github` | Batch | Read GitHub pull requ...

(GITHUB)

🪛 Ruff (0.14.1)
pyspark_datasources/sharepoint.py

91-91: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


119-119: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)


120-120: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


120-120: Create your own exception

(TRY002)


120-120: Avoid specifying long messages outside the exception class

(TRY003)


121-121: Do not catch blind exception: Exception

(BLE001)


122-122: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


122-122: Create your own exception

(TRY002)


122-122: Avoid specifying long messages outside the exception class

(TRY003)


122-122: Use explicit conversion flag

Replace with conversion flag

(RUF010)


136-136: Use explicit conversion flag

Replace with conversion flag

(RUF010)


272-272: Unused method argument: schema

(ARG002)


272-272: Unused method argument: overwrite

(ARG002)


284-284: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


285-285: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


303-303: Avoid specifying long messages outside the exception class

(TRY003)


303-303: Use explicit conversion flag

Replace with conversion flag

(RUF010)


309-312: Avoid specifying long messages outside the exception class

(TRY003)


324-327: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


324-327: Avoid specifying long messages outside the exception class

(TRY003)


345-345: Do not catch blind exception: Exception

(BLE001)


346-346: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


347-347: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


347-347: Avoid specifying long messages outside the exception class

(TRY003)


347-347: Use explicit conversion flag

Replace with conversion flag

(RUF010)


366-366: Do not catch blind exception: Exception

(BLE001)


368-368: Use explicit conversion flag

Replace with conversion flag

(RUF010)


382-382: Create your own exception

(TRY002)


402-402: Do not catch blind exception: Exception

(BLE001)


403-403: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


403-403: Create your own exception

(TRY002)


403-403: Avoid specifying long messages outside the exception class

(TRY003)


403-403: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🔇 Additional comments (12)
docs/data-sources-guide.md (2)

15-15: LGTM!

The TOC entry is correctly formatted and numbered.


426-477: Comprehensive documentation with clear examples.

The documentation provides thorough installation instructions, usage examples, and feature descriptions that align well with the implementation. The examples clearly demonstrate authentication, batching, and error handling options.

pyproject.toml (1)

34-35: LGTM once the dependency declarations are fixed.

The extras group definitions are correct. Once lines 23-24 are updated to use optional = true, these extras declarations will work as intended.

pyspark_datasources/sharepoint.py (7)

1-11: LGTM!

Import organization and logger setup are appropriate.


125-138: LGTM! Good pattern for async API calls.

The inline imports avoid Spark serialization issues, and the error handling pattern (log warning then re-raise) is appropriate for this context.
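For context, a sketch of the inline-import pattern described here, using the documented msgraph-sdk call for creating a list item; the function name, signature, and messages are illustrative rather than the module's actual code:

```python
import logging

logger = logging.getLogger(__name__)


async def push_list_item(client, site_id: str, list_id: str, record: dict) -> None:
    """Create one SharePoint list item via Microsoft Graph."""
    # Imports stay local to the call so objects holding this coroutine remain
    # picklable when Spark serializes the writer to executors.
    from msgraph.generated.models.field_value_set import FieldValueSet
    from msgraph.generated.models.list_item import ListItem

    item = ListItem(fields=FieldValueSet(additional_data=record))
    try:
        await client.sites.by_site_id(site_id).lists.by_list_id(list_id).items.post(item)
    except Exception as e:
        # Log a warning, then re-raise so the caller can aggregate per-record failures.
        logger.warning(f"Failed to push record to list {list_id}: {e}")
        raise
```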


140-147: LGTM!

The commit message dataclass appropriately tracks write results.
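For reference, a minimal sketch of what such a commit message might look like; the field names follow the sequence diagram above, and the real definition lives in pyspark_datasources/sharepoint.py:

```python
from dataclasses import dataclass

from pyspark.sql.datasource import WriterCommitMessage


@dataclass
class SharepointCommitMessage(WriterCommitMessage):
    """Per-partition write result reported back to the driver on commit."""

    records_written: int
    records_failed: int
    batch_id: int
```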


354-362: LGTM! Clean batching pattern.

The nested flush_buffer function with nonlocal variables is a clean pattern for managing batched writes.
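A standalone sketch of this batching pattern under assumed names (`push_batch`, `batch_size`); it shows the flush-on-threshold shape being described, not the writer's actual code:

```python
import asyncio
from typing import Any, Callable, Dict, Iterator, List, Tuple


def write_with_buffering(
    rows: Iterator[Dict[str, Any]],
    batch_size: int,
    push_batch: Callable[[List[Dict[str, Any]]], Any],  # async: batch -> (written, failed)
) -> Tuple[int, int]:
    records_written = 0
    records_failed = 0
    buffer: List[Dict[str, Any]] = []

    def flush_buffer() -> None:
        # `nonlocal` lets the nested helper update the enclosing counters
        # and reset the shared buffer instead of threading state around.
        nonlocal records_written, records_failed, buffer
        if buffer:
            written, failed = asyncio.run(push_batch(buffer))
            records_written += written
            records_failed += failed
            buffer = []

    for record in rows:
        buffer.append(record)
        if len(buffer) >= batch_size:
            flush_buffer()
    flush_buffer()  # flush whatever remains once the iterator is exhausted
    return records_written, records_failed
```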


363-377: LGTM! Robust record processing logic.

The conversion loop correctly handles errors, buffers records efficiently, and ensures all records are flushed.


379-386: LGTM! Appropriate error handling with fail-fast control.

The error aggregation and conditional failure behavior based on fail_fast provides users with control over error tolerance.


388-415: LGTM! Solid async pattern with proper concurrency control.

The use of asyncio.Semaphore to limit concurrent API calls and asyncio.gather(..., return_exceptions=True) to collect results is a robust pattern for batch processing with the Microsoft Graph API.
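A minimal, Graph-agnostic sketch of that semaphore-plus-gather pattern (`push_one` stands in for the per-record coroutine):

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple


async def push_records(
    records: List[Dict],
    push_one: Callable[[Dict], Awaitable[None]],
    max_concurrency: int,
) -> Tuple[int, int]:
    semaphore = asyncio.Semaphore(max_concurrency)

    async def push_with_limit(record: Dict) -> None:
        # The semaphore caps how many API calls are in flight at once.
        async with semaphore:
            await push_one(record)

    # return_exceptions=True keeps one failing record from cancelling the rest.
    results = await asyncio.gather(
        *(push_with_limit(r) for r in records), return_exceptions=True
    )
    failed = sum(1 for r in results if isinstance(r, Exception))
    return len(results) - failed, failed
```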

pyspark_datasources/__init__.py (1)

13-13: LGTM! Appropriate public API exports.

Exposing both SharepointResource (for extensibility) and SharepointDataSource (for usage) is consistent with the project's public API design.

README.md (1)

57-68: LGTM! Table entry is accurate and well-formatted.

The Sharepoint datasource entry correctly describes it as a Stream Write datasource with the appropriate install tag.

Comment on lines +336 to +347
try:
credentials = ClientSecretCredential(
tenant_id=self.tenant_id,
client_id=self.client_id,
client_secret=self.client_secret,
)
client = GraphServiceClient(credentials=credentials, scopes=self.scopes)
logger.info(f"Connected to Microsoft Graph API (batch {batch_id})")

except Exception as e:
logger.error(f"Could not connect to Microsoft Graph API (batch {batch_id})")
raise ConnectionError(f"Microsoft Graph API connection failed: {str(e)}")

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Consider security best practices for credential handling.

While the current credential handling is functional, ensure that:

  1. Credentials are not logged in case of errors (currently safe)
  2. The ClientSecretCredential object properly clears secrets from memory
  3. Users are advised to use secret management services in production

The inline import pattern and error handling are appropriate.

Optionally verify Azure Identity library's credential handling:


🌐 Web query:

Does azure-identity ClientSecretCredential securely handle client secrets in memory?

💡 Result:

Short answer: No — ClientSecretCredential does not provide OS-level protected/locked memory for the client secret. It accepts and retains the secret as a normal in-process string/object and passes it to MSAL/token endpoints; there’s no special secure-memory handling or automatic zeroing. Use certificates/managed identity or external secret stores to reduce exposure. [1][2][3][4]

Sources:

  • azure-identity ClientSecretCredential implementation (shows secret accepted as a normal parameter). [1]
  • MSAL / Microsoft docs on client secrets and recommended alternatives (certificates, managed identities). [2][3]
  • Community discussion noting memory/network exposure and recommending alternatives. [4]

References:
[1] azure.identity._credentials.client_secret source/docs.
[2] MSAL Python - Client credentials (Microsoft Learn).
[3] Azure Identity docs / guidance on using certificates or managed identity.
[4] StackOverflow discussion re: memory exposure of client secrets.


Address critical security vulnerability in credential handling.

ClientSecretCredential does not provide OS-level protected/locked memory for the client secret and retains it as a normal in-process string/object with no automatic zeroing. The current implementation at lines 337-341 directly passes client_secret to ClientSecretCredential, creating memory exposure risk.

Required: Replace with certificates or managed identity approaches to eliminate the credential exposure in memory. For Azure deployments, use either:

  • Client certificate credentials
  • Managed identity (if running on Azure infrastructure)
  • External secret store with token-based authentication

This is a security boundary issue, not a best-practice recommendation.
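A sketch of the suggested alternatives, not the connector's current implementation; the tenant/client IDs and certificate path are placeholders:

```python
from azure.identity import ClientCertificateCredential, ManagedIdentityCredential
from msgraph import GraphServiceClient

SCOPES = ["https://graph.microsoft.com/.default"]

# Option 1: certificate-based service principal (no client secret held in memory).
credential = ClientCertificateCredential(
    tenant_id="<tenant-id>",
    client_id="<client-id>",
    certificate_path="/path/to/cert.pem",  # e.g. provisioned from Key Vault at deploy time
)

# Option 2: managed identity when running on Azure infrastructure (secretless).
# credential = ManagedIdentityCredential()

client = GraphServiceClient(credentials=credential, scopes=SCOPES)
```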

🧰 Tools
🪛 Ruff (0.14.1)

345-345: Do not catch blind exception: Exception

(BLE001)


346-346: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


347-347: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


347-347: Avoid specifying long messages outside the exception class

(TRY003)


347-347: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
In pyspark_datasources/sharepoint.py around lines 336 to 347, the code currently
passes a plaintext client_secret into ClientSecretCredential which retains that
secret in process memory; replace this with a certificate- or identity-based
flow to avoid in-memory secrets. Concretely: stop passing client_secret; instead
load a client certificate from a secure store (Key Vault or file protected by
OS-level permissions) and instantiate ClientCertificateCredential with
tenant_id, client_id and the certificate, or use ManagedIdentityCredential /
DefaultAzureCredential when running on Azure so no long-lived secret is
constructed; if you must retrieve credentials from Key Vault, fetch only
short-lived tokens and do not store the secret on module scope (use ephemeral
variables and zero references promptly); update GraphServiceClient creation to
use the new credential object and remove any persistent client_secret variables
from the module.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (3)
pyproject.toml (1)

23-24: LGTM! Dependencies correctly configured.

The optional dependency syntax is now correct, using optional = true as required by Poetry.

pyspark_datasources/sharepoint.py (2)

61-63: LGTM! Option validation correctly handles both dotted and non-dotted names.

The fix using rsplit(".", 1)[-1] correctly extracts the attribute name for both formats (e.g., "auth.client_id" → "client_id" and "list_id" → "list_id").
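For illustration (standard-library behavior, nothing connector-specific):

```python
>>> "auth.client_id".rsplit(".", 1)[-1]
'client_id'
>>> "list_id".rsplit(".", 1)[-1]
'list_id'
```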


191-196: LGTM! Docstring correctly identifies the datasource.

The documentation example properly says "Register the Sharepoint Datasource" (previously had copy-paste error from Salesforce).

🧹 Nitpick comments (2)
pyspark_datasources/sharepoint.py (2)

107-123: Consider exception chaining for better debugging.

The exception handling here is functional, but adding exception chaining with raise ... from e would preserve the full stack trace for debugging.

Apply this diff if you want to improve exception chaining:

         except AttributeError as e:
-            raise Exception(f"Field '{rowfield}' not found in row with keys {', '.join(row.asDict().keys())}")
+            raise Exception(f"Field '{rowfield}' not found in row with keys {', '.join(row.asDict().keys())}") from e
         except Exception as e:
-            raise Exception(f"Conversion failed for field '{rowfield}' --> '{listfield}': {str(e)}")
+            raise Exception(f"Conversion failed for field '{rowfield}' --> '{listfield}': {str(e)}") from e

284-285: Consider adding ClassVar annotations for class-level constants.

The _required_options and _supported_resources attributes are class-level constants and should be annotated with typing.ClassVar for better type clarity.

Apply this diff:

+from typing import Dict, List, Any, Tuple, ClassVar
+
 class SharepointStreamWriter(DataSourceStreamWriter):
     """Stream writer implementation for Sharepoint datasource integration."""
 
     _graph_api_version = "1.0"
     _graph_api_url = "https://graph.microsoft.com"
     _auth_url = "https://login.microsoftonline.com"
 
-    _required_options = ["auth.client_id", "auth.client_secret", "auth.tenant_id", "site_id"]
-    _supported_resources = {"list": ListResource}
+    _required_options: ClassVar[List[str]] = ["auth.client_id", "auth.client_secret", "auth.tenant_id", "site_id"]
+    _supported_resources: ClassVar[Dict[str, type]] = {"list": ListResource}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 94786a7 and d8d5904.

📒 Files selected for processing (2)
  • pyproject.toml (2 hunks)
  • pyspark_datasources/sharepoint.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.14.1)
pyspark_datasources/sharepoint.py

91-91: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


119-119: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)


120-120: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


120-120: Create your own exception

(TRY002)


120-120: Avoid specifying long messages outside the exception class

(TRY003)


121-121: Do not catch blind exception: Exception

(BLE001)


122-122: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


122-122: Create your own exception

(TRY002)


122-122: Avoid specifying long messages outside the exception class

(TRY003)


122-122: Use explicit conversion flag

Replace with conversion flag

(RUF010)


136-136: Use explicit conversion flag

Replace with conversion flag

(RUF010)


272-272: Unused method argument: schema

(ARG002)


272-272: Unused method argument: overwrite

(ARG002)


284-284: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


285-285: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


303-303: Avoid specifying long messages outside the exception class

(TRY003)


303-303: Use explicit conversion flag

Replace with conversion flag

(RUF010)


309-312: Avoid specifying long messages outside the exception class

(TRY003)


324-327: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


324-327: Avoid specifying long messages outside the exception class

(TRY003)


345-345: Do not catch blind exception: Exception

(BLE001)


346-346: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


347-347: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


347-347: Avoid specifying long messages outside the exception class

(TRY003)


347-347: Use explicit conversion flag

Replace with conversion flag

(RUF010)


366-366: Do not catch blind exception: Exception

(BLE001)


368-368: Use explicit conversion flag

Replace with conversion flag

(RUF010)


382-382: Create your own exception

(TRY002)


402-402: Do not catch blind exception: Exception

(BLE001)


403-403: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


403-403: Create your own exception

(TRY002)


403-403: Avoid specifying long messages outside the exception class

(TRY003)


403-403: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🔇 Additional comments (7)
pyproject.toml (1)

34-35: LGTM! Extras groups properly configured.

The new sharepoint extras group and updated all group correctly reference the optional dependencies, allowing users to install SharePoint support with pip install pyspark-data-sources[sharepoint].

pyspark_datasources/sharepoint.py (6)

1-11: LGTM! Clean import structure.

The imports are well-organized and all necessary dependencies are present.


125-137: LGTM! Async push implementation with proper error handling.

The inline imports avoid serialization issues, and the exception handling appropriately logs failures before re-raising.


140-147: LGTM! Commit message structure is appropriate.

The dataclass properly extends WriterCommitMessage with fields needed to track write operations.


349-386: LGTM! Write method implements robust buffering and error handling.

The implementation properly:

  • Buffers records to control memory usage
  • Tracks successes and failures separately
  • Supports both fail-fast and tolerant modes
  • Flushes remaining records after iteration

The exception handling approach is appropriate for a data pipeline where you want to continue processing even if some records fail.


388-415: LGTM! Concurrent record pushing with proper concurrency control.

The implementation correctly:

  • Uses a semaphore to limit concurrent API calls to batch_size
  • Leverages asyncio.gather with return_exceptions=True to process all records even if some fail
  • Aggregates results to separate successes from failures

The async approach efficiently handles I/O-bound operations against the Microsoft Graph API.


336-347: Verify that the authentication method aligns with your deployment model and production requirements.

Microsoft advises against using long-lived client secrets in production due to leakage and rotation risk. For this SharePoint connector in a production context:

  • If running on Azure infrastructure, use managed identity (ManagedIdentityCredential or DefaultAzureCredential)—it is secretless and requires no credential rotation.
  • If managed identity isn't available, use ClientCertificateCredential with a service principal certificate instead of a client secret.
  • ClientSecretCredential is acceptable only for short-lived test/dev scenarios.

Before merging, confirm whether:

  1. This code will run in Azure (enabling managed identity) or external infrastructure (requiring certificate-based auth)
  2. You intend to address this before production deployment
