-
Notifications
You must be signed in to change notification settings - Fork 576
feat: Add connector-ops skill and SharePoint/OneDrive connector #1722
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
5ebec9c
3c0ccf0
4808b63
2d4944b
01a4361
23174cb
5640c1e
b9f43bf
bdc961f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,204 @@ | ||
| """ | ||
| Database Connector Template | ||
|
|
||
| Replace placeholders: | ||
| - {ClassName}: PascalCase class name (e.g., PostgreSQL, MongoDB) | ||
| - {connector_name}: lowercase connector name (e.g., postgresql, mongodb) | ||
| - {display_name}: Display name (e.g., "PostgreSQL", "MongoDB") | ||
| - {description}: Brief description | ||
| - {uuid}: Generated UUID (use uuid4()) | ||
| - {icon_name}: Icon filename (e.g., "Postgresql.png") | ||
| - {connection_lib}: Python library for connections (e.g., psycopg2, pymongo) | ||
| - {default_port}: Default port used when settings has no "port" value (e.g., 5432, 27017) | ||
| """ | ||
|
|
||
| import os | ||
| from typing import Any | ||
|
|
||
| from unstract.connectors.databases.unstract_db import UnstractDB | ||
| from unstract.connectors.exceptions import ConnectorError | ||
|
|
||
|
|
||
| class {ClassName}(UnstractDB): | ||
| """ | ||
| {display_name} database connector. | ||
|
|
||
| {description} | ||
| """ | ||
|
|
||
| def __init__(self, settings: dict[str, Any]): | ||
| super().__init__("{display_name}") | ||
|
|
||
| # Connection URL mode | ||
| self.connection_url = settings.get("connection_url", "") | ||
|
|
||
| # Individual parameters mode | ||
| self.host = settings.get("host", "") | ||
| self.port = settings.get("port", "{default_port}") | ||
| self.database = settings.get("database", "") | ||
| self.user = settings.get("user", "") | ||
| self.password = settings.get("password", "") | ||
|
|
||
| # Optional settings | ||
| self.schema = settings.get("schema", "") | ||
| self.ssl_enabled = settings.get("sslEnabled", False) | ||
|
|
||
| @staticmethod | ||
| def get_id() -> str: | ||
| return "{connector_name}|{uuid}" | ||
|
|
||
| @staticmethod | ||
| def get_name() -> str: | ||
| return "{display_name}" | ||
|
|
||
| @staticmethod | ||
| def get_description() -> str: | ||
| return "{description}" | ||
|
|
||
| @staticmethod | ||
| def get_icon() -> str: | ||
| return "/icons/connector-icons/{icon_name}" | ||
|
|
||
| @staticmethod | ||
| def get_json_schema() -> str: | ||
| schema_path = os.path.join( | ||
| os.path.dirname(__file__), | ||
| "static", | ||
| "json_schema.json" | ||
| ) | ||
| with open(schema_path, "r") as f: | ||
| return f.read() | ||
|
|
||
| @staticmethod | ||
| def can_write() -> bool: | ||
| return True | ||
|
|
||
| @staticmethod | ||
| def can_read() -> bool: | ||
| return True | ||
|
|
||
| @staticmethod | ||
| def requires_oauth() -> bool: | ||
| return False | ||
|
|
||
| @staticmethod | ||
| def python_social_auth_backend() -> str: | ||
| return "" | ||
|
|
||
| def get_engine(self) -> Any: | ||
| """ | ||
| Return database connection. | ||
|
|
||
| Returns: | ||
| Database connection object | ||
| """ | ||
| # Import here for fork safety | ||
| import {connection_lib} | ||
|
|
||
| try: | ||
| conn_params = { | ||
| # TCP keepalive for long-running queries | ||
| "connect_timeout": 30, | ||
| } | ||
|
|
||
| if self.connection_url: | ||
| # URL mode | ||
| conn_params["dsn"] = self.connection_url | ||
| else: | ||
| # Individual params mode | ||
| conn_params.update({ | ||
| "host": self.host, | ||
| "port": int(self.port), | ||
| "database": self.database, | ||
| "user": self.user, | ||
| "password": self.password, | ||
| }) | ||
|
|
||
| # Add SSL if enabled | ||
| if self.ssl_enabled: | ||
| conn_params["ssl"] = True | ||
| # Add more SSL options as needed: | ||
| # conn_params["ssl_ca"] = self.ssl_ca | ||
| # conn_params["ssl_cert"] = self.ssl_cert | ||
| # conn_params["ssl_key"] = self.ssl_key | ||
|
|
||
| return {connection_lib}.connect(**conn_params) | ||
|
|
||
| except Exception as e: | ||
| raise ConnectorError( | ||
| f"Failed to connect to {display_name}: {str(e)}", | ||
| treat_as_user_message=True | ||
| ) from e | ||
|
|
||
| def test_credentials(self) -> bool: | ||
| """ | ||
| Test database credentials. | ||
|
|
||
| Returns: | ||
| True if connection successful | ||
|
|
||
| Raises: | ||
| ConnectorError: If connection fails | ||
| """ | ||
| try: | ||
| conn = self.get_engine() | ||
| # Execute simple test query | ||
| with conn.cursor() as cursor: | ||
| cursor.execute("SELECT 1") | ||
| conn.close() | ||
| return True | ||
| except Exception as e: | ||
| raise ConnectorError( | ||
| f"Connection test failed: {str(e)}", | ||
| treat_as_user_message=True | ||
| ) from e | ||
|
|
||
| def execute(self, query: str) -> list[tuple]: | ||
| """ | ||
| Execute SQL query. | ||
|
|
||
| Args: | ||
| query: SQL query string | ||
|
|
||
| Returns: | ||
| List of result tuples | ||
| """ | ||
| conn = self.get_engine() | ||
| try: | ||
| with conn.cursor() as cursor: | ||
| cursor.execute(query) | ||
| if cursor.description: # SELECT query | ||
| return cursor.fetchall() | ||
| else: # INSERT/UPDATE/DELETE | ||
| conn.commit() | ||
| return [] | ||
| finally: | ||
| conn.close() | ||
|
|
||
| def sql_to_db_mapping(self, value: Any, column_name: str | None = None) -> str: | ||
| """ | ||
| Map Python types to database types. | ||
|
|
||
| Args: | ||
| value: Python value to map | ||
| column_name: Optional column name hint | ||
|
|
||
| Returns: | ||
| Database type string | ||
| """ | ||
| if value is None: | ||
| return "TEXT" | ||
|
|
||
| if isinstance(value, bool): | ||
| return "BOOLEAN" | ||
| elif isinstance(value, int): | ||
| return "INTEGER" | ||
| elif isinstance(value, float): | ||
| return "DOUBLE PRECISION" | ||
| elif isinstance(value, dict): | ||
| return "JSON" # or JSONB for PostgreSQL | ||
| elif isinstance(value, list): | ||
| return "JSON" | ||
| elif isinstance(value, bytes): | ||
| return "BYTEA" | ||
| else: | ||
| return "TEXT" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,190 @@ | ||
| """ | ||
| Filesystem Connector Template | ||
|
|
||
| Replace placeholders: | ||
| - {ClassName}: PascalCase class name (e.g., MinioFS, AzureBlobFS) | ||
| - {connector_name}: lowercase connector name (e.g., minio, azure_blob) | ||
| - {display_name}: Display name (e.g., "Minio", "Azure Blob Storage") | ||
| - {description}: Brief description | ||
| - {uuid}: Generated UUID (use uuid4()) | ||
| - {icon_name}: Icon filename (e.g., "Minio.png") | ||
| - {fsspec_class}: fsspec filesystem class (e.g., S3FileSystem, AzureBlobFileSystem) | ||
| - {fsspec_module}: Module path for fsspec class (e.g., s3fs, adlfs) | ||
| """ | ||
|
|
||
| import os | ||
| import threading | ||
| from typing import Any | ||
|
|
||
| from fsspec import AbstractFileSystem | ||
|
|
||
| from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem | ||
| from unstract.connectors.exceptions import ConnectorError | ||
|
|
||
|
|
||
| class {ClassName}(UnstractFileSystem): | ||
| """ | ||
| {display_name} filesystem connector. | ||
|
|
||
| {description} | ||
| """ | ||
|
|
||
| def __init__(self, settings: dict[str, Any]): | ||
| super().__init__("{display_name}") | ||
|
|
||
| # Store settings - DO NOT initialize clients in __init__ | ||
| # (prevents gRPC issues with Celery fork) | ||
| self._settings = settings | ||
|
|
||
| # Authentication settings | ||
| self.access_key = settings.get("access_key", "") | ||
| self.secret_key = settings.get("secret_key", "") | ||
| self.endpoint_url = settings.get("endpoint_url", "") | ||
| self.bucket = settings.get("bucket", "") | ||
| self.region = settings.get("region", "") | ||
|
|
||
| # Lazy initialization | ||
| self._fs = None | ||
| self._fs_lock = threading.Lock() | ||
|
|
||
| @staticmethod | ||
| def get_id() -> str: | ||
| return "{connector_name}|{uuid}" | ||
|
|
||
| @staticmethod | ||
| def get_name() -> str: | ||
| return "{display_name}" | ||
|
|
||
| @staticmethod | ||
| def get_description() -> str: | ||
| return "{description}" | ||
|
|
||
| @staticmethod | ||
| def get_icon() -> str: | ||
| return "/icons/connector-icons/{icon_name}" | ||
|
|
||
| @staticmethod | ||
| def get_json_schema() -> str: | ||
| schema_path = os.path.join( | ||
| os.path.dirname(__file__), | ||
| "static", | ||
| "json_schema.json" | ||
| ) | ||
| with open(schema_path, "r") as f: | ||
| return f.read() | ||
|
|
||
| @staticmethod | ||
| def can_write() -> bool: | ||
| return True | ||
|
|
||
| @staticmethod | ||
| def can_read() -> bool: | ||
| return True | ||
|
|
||
| @staticmethod | ||
| def requires_oauth() -> bool: | ||
| return False | ||
|
|
||
| @staticmethod | ||
| def python_social_auth_backend() -> str: | ||
| return "" | ||
|
|
||
| def get_fsspec_fs(self) -> AbstractFileSystem: | ||
| """ | ||
| Return fsspec filesystem instance. | ||
|
|
||
| Uses lazy initialization with thread safety for Celery compatibility. | ||
|
|
||
| Returns: | ||
| fsspec AbstractFileSystem instance | ||
| """ | ||
| if self._fs is None: | ||
| with self._fs_lock: | ||
| if self._fs is None: | ||
| # Import here for fork safety | ||
| from {fsspec_module} import {fsspec_class} | ||
|
|
||
| try: | ||
| self._fs = {fsspec_class}( | ||
| key=self.access_key, | ||
| secret=self.secret_key, | ||
| endpoint_url=self.endpoint_url, | ||
| # Add more options as needed: | ||
| # client_kwargs={"region_name": self.region}, | ||
| ) | ||
| except Exception as e: | ||
| raise ConnectorError( | ||
| f"Failed to initialize filesystem: {str(e)}", | ||
| treat_as_user_message=True | ||
| ) from e | ||
|
|
||
| return self._fs | ||
|
|
||
| def test_credentials(self) -> bool: | ||
| """ | ||
| Test filesystem credentials. | ||
|
|
||
| Returns: | ||
| True if connection successful | ||
|
|
||
| Raises: | ||
| ConnectorError: If connection fails | ||
| """ | ||
| try: | ||
| fs = self.get_fsspec_fs() | ||
| # Try to list the bucket/root to verify access | ||
| fs.ls(self.bucket or "/") | ||
| return True | ||
| except Exception as e: | ||
| raise ConnectorError( | ||
| f"Credential test failed: {str(e)}", | ||
| treat_as_user_message=True | ||
| ) from e | ||
|
|
||
| def extract_metadata_file_hash(self, metadata: dict[str, Any]) -> str | None: | ||
| """ | ||
| Extract unique file hash from fsspec metadata. | ||
|
|
||
| Different storage systems use different keys for file hashes: | ||
| - S3/Minio: ETag | ||
| - Azure: content_md5 | ||
| - GCS: md5Hash | ||
|
|
||
| Args: | ||
| metadata: File metadata from fsspec | ||
|
|
||
| Returns: | ||
| File hash string or None | ||
| """ | ||
| # Try common hash fields | ||
| hash_fields = ["ETag", "etag", "md5Hash", "content_md5", "contentHash"] | ||
|
|
||
| for field in hash_fields: | ||
| if field in metadata: | ||
| value = metadata[field] | ||
| # Remove quotes from ETags | ||
| if isinstance(value, str): | ||
| return value.strip('"') | ||
| return str(value) | ||
|
|
||
| return None | ||
|
|
||
| def is_dir_by_metadata(self, metadata: dict[str, Any]) -> bool: | ||
| """ | ||
| Check if path is a directory from metadata. | ||
|
|
||
| Args: | ||
| metadata: File metadata from fsspec | ||
|
|
||
| Returns: | ||
| True if path is a directory | ||
| """ | ||
| # Different storage systems indicate directories differently | ||
| if metadata.get("type") == "directory": | ||
| return True | ||
| if metadata.get("StorageClass") == "DIRECTORY": | ||
| return True | ||
| if metadata.get("size") == 0 and metadata.get("name", "").endswith("/"): | ||
| return True | ||
|
|
||
| return False | ||
|
Comment on lines
+144
to
+190
Contributor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Template missing required abstract method `extract_modified_date` from `UnstractFileSystem`.
Based on the `UnstractFileSystem` base class, this template should also implement `extract_modified_date`.
🔎 Proposed addition after `is_dir_by_metadata`
```python
def extract_modified_date(self, metadata: dict[str, Any]) -> datetime | None:
    """
    Extract last modified date from fsspec metadata.

    Args:
        metadata: File metadata from fsspec

    Returns:
        Datetime object or None if not available
    """
    from datetime import datetime, timezone

    # Try common date fields
    date_fields = ["LastModified", "last_modified", "mtime", "updated", "timeModified"]

    for field in date_fields:
        if field in metadata:
            value = metadata[field]
            if isinstance(value, datetime):
                return value.replace(tzinfo=timezone.utc) if value.tzinfo is None else value
            if isinstance(value, (int, float)):
                # Unix timestamp
                return datetime.fromtimestamp(value, tz=timezone.utc)
            if isinstance(value, str):
                try:
                    return datetime.fromisoformat(value.replace("Z", "+00:00"))
                except ValueError:
                    continue

    return None
```
🤖 Prompt for AI Agents |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Template missing required abstract methods from `UnstractDB`.
Based on the `UnstractDB` base class, this template should also implement:
- `prepare_multi_column_migration(self, table_name: str, column_name: str) -> str | list`
- `get_create_table_base_query(self, table: str) -> str`

These are required for table migration and creation operations.
🔎 Proposed additions after `sql_to_db_mapping`
🤖 Prompt for AI Agents