feature/sharepoint datasource #28
base: master
Conversation
Walkthrough
Adds a new Sharepoint streaming write data source for PySpark: new implementation, package exports, documentation, and optional dependencies for Azure/Microsoft Graph authentication and API access; includes async batching, configurable error handling, and commit reporting.
Changes
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant Spark as PySpark Streaming
    participant Writer as SharepointStreamWriter
    participant Graph as Microsoft Graph Client
    participant SP as SharePoint Lists
    Spark->>Writer: write(iterator)
    loop per batch
        Writer->>Writer: convert rows -> records
        Writer->>Writer: buffer until batch_size
        Writer->>Graph: async push batch (concurrent per-record tasks)
        par concurrent pushes
            Graph->>SP: create list item
            SP-->>Graph: success / failure
        end
        Graph-->>Writer: per-record results
        Writer->>Writer: aggregate (records_written, records_failed)
        alt fail-fast enabled & error
            Writer->>Writer: raise / abort
        end
    end
    Writer-->>Spark: SharepointCommitMessage(records_written, records_failed, batch_id)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 4
🧹 Nitpick comments (4)
pyspark_datasources/sharepoint.py (4)
91-91: Annotate class attribute with `ClassVar`.
Static analysis correctly identifies that `_required_options` is a class-level constant that should be annotated with `typing.ClassVar` to clarify it's not an instance variable.
Apply this diff:
```diff
+from typing import Dict, List, Any, Tuple, ClassVar
+
 class ListResource(SharepointResource):
     """..."""
-    _required_options = ["list_id", "fields"]
+    _required_options: ClassVar[List[str]] = ["list_id", "fields"]
```
119-122: Improve exception handling with chaining.
Exception chaining with `raise ... from e` preserves the original traceback and provides better debugging context. This is especially important for data conversion errors where understanding the root cause is critical.
Apply this diff:
```diff
 except AttributeError as e:
-    raise Exception(f"Field '{rowfield}' not found in row with keys {', '.join(row.asDict().keys())}")
+    raise Exception(f"Field '{rowfield}' not found in row with keys {', '.join(row.asDict().keys())}") from e
 except Exception as e:
-    raise Exception(f"Conversion failed for field '{rowfield}' --> '{listfield}': {str(e)}")
+    raise Exception(f"Conversion failed for field '{rowfield}' --> '{listfield}': {str(e)}") from e
```
272-274: Unused parameters are acceptable for interface compliance.
The `schema` and `overwrite` parameters are part of the `DataSource.streamWriter()` interface contract. While unused in this implementation, they must be present. Consider adding a docstring note explaining why they're not used.
Optionally add a docstring to clarify:
```python
def streamWriter(self, schema: StructType, overwrite: bool) -> "SharepointStreamWriter":
    """Create a stream writer for Sharepoint datasource integration.

    Args:
        schema: Input schema (unused - inferred from data)
        overwrite: Overwrite flag (unused - append-only datasource)
    """
    return SharepointStreamWriter(options=self.options, datasource=self.name())
```
284-285: Annotate class attributes with `ClassVar`.
Similar to `ListResource`, these class-level constants should be annotated with `typing.ClassVar`.
Apply this diff:
```diff
+from typing import Dict, List, Any, Tuple, ClassVar
+
 class SharepointStreamWriter(DataSourceStreamWriter):
     """..."""
     _graph_api_version = "1.0"
     _graph_api_url = "https://graph.microsoft.com"
     _auth_url = "https://login.microsoftonline.com"
-    _required_options = ["auth.client_id", "auth.client_secret", "auth.tenant_id", "site_id"]
-    _supported_resources = {"list": ListResource}
+    _required_options: ClassVar[List[str]] = ["auth.client_id", "auth.client_secret", "auth.tenant_id", "site_id"]
+    _supported_resources: ClassVar[Dict[str, type]] = {"list": ListResource}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`poetry.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (5)
- `README.md` (1 hunks)
- `docs/data-sources-guide.md` (2 hunks)
- `pyproject.toml` (2 hunks)
- `pyspark_datasources/__init__.py` (1 hunks)
- `pyspark_datasources/sharepoint.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
pyspark_datasources/__init__.py (1)
pyspark_datasources/sharepoint.py (2)
- `SharepointResource` (14-63)
- `SharepointDataSource` (149-274)
🪛 LanguageTool
README.md
[uncategorized] ~60-~60: The official name of this software platform is spelled with a capital “H”.
Context: ...nstall pyspark-data-sources[faker]| |github` | Batch | Read GitHub pull requ...
(GITHUB)
🪛 Ruff (0.14.1)
pyspark_datasources/sharepoint.py
91-91: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
119-119: Local variable e is assigned to but never used
Remove assignment to unused variable e
(F841)
120-120: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
120-120: Create your own exception
(TRY002)
120-120: Avoid specifying long messages outside the exception class
(TRY003)
121-121: Do not catch blind exception: Exception
(BLE001)
122-122: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
122-122: Create your own exception
(TRY002)
122-122: Avoid specifying long messages outside the exception class
(TRY003)
122-122: Use explicit conversion flag
Replace with conversion flag
(RUF010)
136-136: Use explicit conversion flag
Replace with conversion flag
(RUF010)
272-272: Unused method argument: schema
(ARG002)
272-272: Unused method argument: overwrite
(ARG002)
284-284: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
285-285: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
303-303: Avoid specifying long messages outside the exception class
(TRY003)
303-303: Use explicit conversion flag
Replace with conversion flag
(RUF010)
309-312: Avoid specifying long messages outside the exception class
(TRY003)
324-327: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
324-327: Avoid specifying long messages outside the exception class
(TRY003)
345-345: Do not catch blind exception: Exception
(BLE001)
346-346: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
347-347: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
347-347: Avoid specifying long messages outside the exception class
(TRY003)
347-347: Use explicit conversion flag
Replace with conversion flag
(RUF010)
366-366: Do not catch blind exception: Exception
(BLE001)
368-368: Use explicit conversion flag
Replace with conversion flag
(RUF010)
382-382: Create your own exception
(TRY002)
402-402: Do not catch blind exception: Exception
(BLE001)
403-403: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
403-403: Create your own exception
(TRY002)
403-403: Avoid specifying long messages outside the exception class
(TRY003)
403-403: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🔇 Additional comments (12)
docs/data-sources-guide.md (2)
15-15: LGTM! The TOC entry is correctly formatted and numbered.
426-477: Comprehensive documentation with clear examples. The documentation provides thorough installation instructions, usage examples, and feature descriptions that align well with the implementation. The examples clearly demonstrate authentication, batching, and error handling options.
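Since the guide's snippets are not quoted in this thread, the sketch below shows roughly what such a streaming write could look like. Only the option keys (auth.client_id, auth.client_secret, auth.tenant_id, site_id, list_id, fields, batch_size, fail_fast) come from this review; the short format name "sharepoint", the fields mapping syntax, and the placeholder values are assumptions, and a resource-selection option may also be required since the writer maps "list" to ListResource.
```python
# Hedged usage sketch: option keys come from this review; the format
# short name "sharepoint" and the "fields" mapping syntax are assumptions.
from pyspark.sql import SparkSession
from pyspark_datasources import SharepointDataSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(SharepointDataSource)

(
    spark.readStream.format("rate").load()
    .writeStream
    .format("sharepoint")                            # assumed short name
    .option("auth.client_id", "<client-id>")
    .option("auth.client_secret", "<client-secret>")
    .option("auth.tenant_id", "<tenant-id>")
    .option("site_id", "<site-id>")
    .option("list_id", "<list-id>")
    .option("fields", "value:Title")                 # assumed rowfield:listfield syntax
    .option("batch_size", "20")
    .option("fail_fast", "true")
    .option("checkpointLocation", "/tmp/sharepoint_checkpoint")
    .start()
)
```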
pyproject.toml (1)
34-35: LGTM once the dependency declarations are fixed. The extras group definitions are correct. Once lines 23-24 are updated to use `optional = true`, these extras declarations will work as intended.
pyspark_datasources/sharepoint.py (7)
1-11: LGTM! Import organization and logger setup are appropriate.
125-138: LGTM! Good pattern for async API calls. The inline imports avoid Spark serialization issues, and the error handling pattern (log warning then re-raise) is appropriate for this context.
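For readers unfamiliar with the pattern, this is a rough illustration of what "inline imports plus log-then-re-raise" means here; the function shape and names are assumptions rather than the PR's code, though the fluent call should match the msgraph-sdk API for creating a list item.
```python
# Illustrative sketch only: importing msgraph models inside the coroutine
# keeps the enclosing writer object picklable when Spark serializes it to
# executors; failures are logged for visibility, then re-raised so the
# caller can count them.
import logging

logger = logging.getLogger(__name__)

async def push_record(client, site_id: str, list_id: str, record: dict) -> None:
    # Imported here, not at module top level, so the writer stays serializable.
    from msgraph.generated.models.field_value_set import FieldValueSet
    from msgraph.generated.models.list_item import ListItem

    try:
        item = ListItem(fields=FieldValueSet(additional_data=record))
        await client.sites.by_site_id(site_id).lists.by_list_id(list_id).items.post(item)
    except Exception as e:
        logger.warning(f"Failed to push record: {e}")
        raise
```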
140-147: LGTM! The commit message dataclass appropriately tracks write results.
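A sketch of what such a commit message might look like, assuming a dataclass extending PySpark's WriterCommitMessage with the three fields shown in the sequence diagram above; the PR's actual definition may differ.
```python
# Minimal sketch, not the PR's exact code.
from dataclasses import dataclass
from pyspark.sql.datasource import WriterCommitMessage

@dataclass
class SharepointCommitMessage(WriterCommitMessage):
    records_written: int
    records_failed: int
    batch_id: int
```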
354-362: LGTM! Clean batching pattern. The nested `flush_buffer` function with `nonlocal` variables is a clean pattern for managing batched writes.
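As a reference for the pattern being praised, here is a minimal stand-alone sketch; convert_row and push_batch are hypothetical stand-ins for the PR's helpers, and only the nested-closure/nonlocal shape is the point.
```python
# Sketch of the buffering pattern: a nested flush_buffer() closure mutates
# counters in the enclosing function via `nonlocal`, then clears the buffer.
import asyncio

def write_rows(iterator, convert_row, push_batch, batch_size: int = 20):
    buffer: list = []
    records_written = 0
    records_failed = 0

    def flush_buffer() -> None:
        nonlocal records_written, records_failed
        if not buffer:
            return
        written, failed = asyncio.run(push_batch(list(buffer)))
        records_written += written
        records_failed += failed
        buffer.clear()

    for row in iterator:
        buffer.append(convert_row(row))
        if len(buffer) >= batch_size:
            flush_buffer()
    flush_buffer()  # flush whatever is left once the iterator is exhausted
    return records_written, records_failed
```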
363-377: LGTM! Robust record processing logic. The conversion loop correctly handles errors, buffers records efficiently, and ensures all records are flushed.
379-386: LGTM! Appropriate error handling with fail-fast control. The error aggregation and conditional failure behavior based on `fail_fast` provides users with control over error tolerance.
388-415: LGTM! Solid async pattern with proper concurrency control. The use of `asyncio.Semaphore` to limit concurrent API calls and `asyncio.gather(..., return_exceptions=True)` to collect results is a robust pattern for batch processing with the Microsoft Graph API.
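A compact sketch of that concurrency pattern, with push_record standing in for the hypothetical per-record coroutine sketched earlier; only the semaphore-plus-gather shape is asserted here.
```python
# Semaphore caps in-flight Graph calls; gather(..., return_exceptions=True)
# lets every record finish so successes and failures can be counted.
import asyncio

async def push_batch(records, push_record, max_concurrency: int):
    semaphore = asyncio.Semaphore(max_concurrency)

    async def push_with_limit(record):
        # At most max_concurrency calls are in flight at once.
        async with semaphore:
            await push_record(record)

    results = await asyncio.gather(
        *(push_with_limit(r) for r in records),
        return_exceptions=True,
    )
    failures = sum(1 for r in results if isinstance(r, Exception))
    return len(records) - failures, failures
```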
pyspark_datasources/__init__.py (1)
13-13: LGTM! Appropriate public API exports. Exposing both `SharepointResource` (for extensibility) and `SharepointDataSource` (for usage) is consistent with the project's public API design.
README.md (1)
57-68: LGTM! Table entry is accurate and well-formatted. The Sharepoint datasource entry correctly describes it as a Stream Write datasource with the appropriate install tag.
```python
try:
    credentials = ClientSecretCredential(
        tenant_id=self.tenant_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    client = GraphServiceClient(credentials=credentials, scopes=self.scopes)
    logger.info(f"Connected to Microsoft Graph API (batch {batch_id})")

except Exception as e:
    logger.error(f"Could not connect to Microsoft Graph API (batch {batch_id})")
    raise ConnectionError(f"Microsoft Graph API connection failed: {str(e)}")
```
🧩 Analysis chain
Consider security best practices for credential handling.
While the current credential handling is functional, ensure that:
- Credentials are not logged in case of errors (currently safe)
- The `ClientSecretCredential` object properly clears secrets from memory
- Users are advised to use secret management services in production
The inline import pattern and error handling are appropriate.
Optionally verify Azure Identity library's credential handling:
🌐 Web query:
Does azure-identity ClientSecretCredential securely handle client secrets in memory?
💡 Result:
Short answer: No — ClientSecretCredential does not provide OS-level protected/locked memory for the client secret. It accepts and retains the secret as a normal in-process string/object and passes it to MSAL/token endpoints; there’s no special secure-memory handling or automatic zeroing. Use certificates/managed identity or external secret stores to reduce exposure. [1][2][3][4]
Sources:
- azure-identity ClientSecretCredential implementation (shows secret accepted as a normal parameter). [1]
- MSAL / Microsoft docs on client secrets and recommended alternatives (certificates, managed identities). [2][3]
- Community discussion noting memory/network exposure and recommending alternatives. [4]
References:
[1] azure.identity._credentials.client_secret source/docs.
[2] MSAL Python - Client credentials (Microsoft Learn).
[3] Azure Identity docs / guidance on using certificates or managed identity.
[4] StackOverflow discussion re: memory exposure of client secrets.
Address critical security vulnerability in credential handling.
ClientSecretCredential does not provide OS-level protected/locked memory for the client secret and retains it as a normal in-process string/object with no automatic zeroing. The current implementation at lines 337-341 directly passes client_secret to ClientSecretCredential, creating memory exposure risk.
Required: Replace with certificates or managed identity approaches to eliminate the credential exposure in memory. For Azure deployments, use either:
- Client certificate credentials
- Managed identity (if running on Azure infrastructure)
- External secret store with token-based authentication
This is a security boundary issue, not a best-practice recommendation.
🤖 Prompt for AI Agents
In pyspark_datasources/sharepoint.py around lines 336 to 347, the code currently
passes a plaintext client_secret into ClientSecretCredential which retains that
secret in process memory; replace this with a certificate- or identity-based
flow to avoid in-memory secrets. Concretely: stop passing client_secret; instead
load a client certificate from a secure store (Key Vault or file protected by
OS-level permissions) and instantiate ClientCertificateCredential with
tenant_id, client_id and the certificate, or use ManagedIdentityCredential /
DefaultAzureCredential when running on Azure so no long-lived secret is
constructed; if you must retrieve credentials from Key Vault, fetch only
short-lived tokens and do not store the secret on module scope (use ephemeral
variables and zero references promptly); update GraphServiceClient creation to
use the new credential object and remove any persistent client_secret variables
from the module.
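For reference, here is a hedged sketch of the secretless alternatives the prompt describes; it is not the PR's code. ClientCertificateCredential and DefaultAzureCredential are existing azure-identity classes, while the certificate path, scopes variable, and placeholder IDs are assumptions for illustration.
```python
# Sketch of secretless authentication options for the Graph client.
from azure.identity import ClientCertificateCredential, DefaultAzureCredential
from msgraph import GraphServiceClient

SCOPES = ["https://graph.microsoft.com/.default"]

# Option 1: certificate-based service principal (no client secret held in memory).
cert_credential = ClientCertificateCredential(
    tenant_id="<tenant-id>",
    client_id="<client-id>",
    certificate_path="/secure/path/sp-cert.pem",  # protected by OS permissions or fetched from Key Vault
)

# Option 2: managed identity / environment-based chain when running on Azure.
mi_credential = DefaultAzureCredential()

client = GraphServiceClient(credentials=cert_credential, scopes=SCOPES)
```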
Actionable comments posted: 2
♻️ Duplicate comments (3)
pyproject.toml (1)
23-24: LGTM! Dependencies correctly configured. The optional dependency syntax is now correct, using `optional = true` as required by Poetry.
pyspark_datasources/sharepoint.py (2)
61-63: LGTM! Option validation correctly handles both dotted and non-dotted names. The fix using `rsplit(".", 1)[-1]` correctly extracts the attribute name for both formats (e.g., "auth.client_id" → "client_id" and "list_id" → "list_id").
191-196: LGTM! Docstring correctly identifies the datasource. The documentation example properly says "Register the Sharepoint Datasource" (previously had a copy-paste error from Salesforce).
🧹 Nitpick comments (2)
pyspark_datasources/sharepoint.py (2)
107-123: Consider exception chaining for better debugging. The exception handling here is functional, but adding exception chaining with `raise ... from e` would preserve the full stack trace for debugging.
Apply this diff if you want to improve exception chaining:
```diff
 except AttributeError as e:
-    raise Exception(f"Field '{rowfield}' not found in row with keys {', '.join(row.asDict().keys())}")
+    raise Exception(f"Field '{rowfield}' not found in row with keys {', '.join(row.asDict().keys())}") from e
 except Exception as e:
-    raise Exception(f"Conversion failed for field '{rowfield}' --> '{listfield}': {str(e)}")
+    raise Exception(f"Conversion failed for field '{rowfield}' --> '{listfield}': {str(e)}") from e
```
284-285: Consider adding ClassVar annotations for class-level constants. The `_required_options` and `_supported_resources` attributes are class-level constants and should be annotated with `typing.ClassVar` for better type clarity.
Apply this diff:
```diff
+from typing import Dict, List, Any, Tuple, ClassVar
+
 class SharepointStreamWriter(DataSourceStreamWriter):
     """Stream writer implementation for Sharepoint datasource integration."""
     _graph_api_version = "1.0"
     _graph_api_url = "https://graph.microsoft.com"
     _auth_url = "https://login.microsoftonline.com"
-    _required_options = ["auth.client_id", "auth.client_secret", "auth.tenant_id", "site_id"]
-    _supported_resources = {"list": ListResource}
+    _required_options: ClassVar[List[str]] = ["auth.client_id", "auth.client_secret", "auth.tenant_id", "site_id"]
+    _supported_resources: ClassVar[Dict[str, type]] = {"list": ListResource}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `pyproject.toml` (2 hunks)
- `pyspark_datasources/sharepoint.py` (1 hunks)
🧰 Additional context used
🪛 Ruff (0.14.1)
pyspark_datasources/sharepoint.py
91-91: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
119-119: Local variable e is assigned to but never used
Remove assignment to unused variable e
(F841)
120-120: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
120-120: Create your own exception
(TRY002)
120-120: Avoid specifying long messages outside the exception class
(TRY003)
121-121: Do not catch blind exception: Exception
(BLE001)
122-122: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
122-122: Create your own exception
(TRY002)
122-122: Avoid specifying long messages outside the exception class
(TRY003)
122-122: Use explicit conversion flag
Replace with conversion flag
(RUF010)
136-136: Use explicit conversion flag
Replace with conversion flag
(RUF010)
272-272: Unused method argument: schema
(ARG002)
272-272: Unused method argument: overwrite
(ARG002)
284-284: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
285-285: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
303-303: Avoid specifying long messages outside the exception class
(TRY003)
303-303: Use explicit conversion flag
Replace with conversion flag
(RUF010)
309-312: Avoid specifying long messages outside the exception class
(TRY003)
324-327: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
324-327: Avoid specifying long messages outside the exception class
(TRY003)
345-345: Do not catch blind exception: Exception
(BLE001)
346-346: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
347-347: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
347-347: Avoid specifying long messages outside the exception class
(TRY003)
347-347: Use explicit conversion flag
Replace with conversion flag
(RUF010)
366-366: Do not catch blind exception: Exception
(BLE001)
368-368: Use explicit conversion flag
Replace with conversion flag
(RUF010)
382-382: Create your own exception
(TRY002)
402-402: Do not catch blind exception: Exception
(BLE001)
403-403: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
403-403: Create your own exception
(TRY002)
403-403: Avoid specifying long messages outside the exception class
(TRY003)
403-403: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🔇 Additional comments (7)
pyproject.toml (1)
34-35: LGTM! Extras groups properly configured. The new `sharepoint` extras group and updated `all` group correctly reference the optional dependencies, allowing users to install SharePoint support with `pip install pyspark-data-sources[sharepoint]`.
pyspark_datasources/sharepoint.py (6)
1-11: LGTM! Clean import structure. The imports are well-organized and all necessary dependencies are present.
125-137: LGTM! Async push implementation with proper error handling. The inline imports avoid serialization issues, and the exception handling appropriately logs failures before re-raising.
140-147: LGTM! Commit message structure is appropriate. The dataclass properly extends `WriterCommitMessage` with fields needed to track write operations.
349-386: LGTM! Write method implements robust buffering and error handling. The implementation properly:
- Buffers records to control memory usage
- Tracks successes and failures separately
- Supports both fail-fast and tolerant modes
- Flushes remaining records after iteration
The exception handling approach is appropriate for a data pipeline where you want to continue processing even if some records fail.
388-415: LGTM! Concurrent record pushing with proper concurrency control. The implementation correctly:
- Uses a semaphore to limit concurrent API calls to batch_size
- Leverages `asyncio.gather` with `return_exceptions=True` to process all records even if some fail
- Aggregates results to separate successes from failures
The async approach efficiently handles I/O-bound operations against the Microsoft Graph API.
336-347: Verify authentication method aligns with your deployment model and production requirements.
Microsoft advises against using long-lived client secrets in production due to leakage and rotation risk. For this SharePoint connector in a production context:
- If running on Azure infrastructure, use managed identity (ManagedIdentityCredential or DefaultAzureCredential)—it is secretless and requires no credential rotation.
- If managed identity isn't available, use ClientCertificateCredential with a service principal certificate instead of a client secret.
- ClientSecretCredential is acceptable only for short-lived test/dev scenarios.
Before merging, confirm whether:
- This code will run in Azure (enabling managed identity) or external infrastructure (requiring certificate-based auth)
- You intend to address this before production deployment
added sharepoint datasource
Summary by CodeRabbit
New Features
Documentation