Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
467 changes: 467 additions & 0 deletions .claude/skills/connector-ops/SKILL.md

Large diffs are not rendered by default.

204 changes: 204 additions & 0 deletions .claude/skills/connector-ops/assets/templates/database.py.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
"""
Database Connector Template

Replace placeholders:
- {ClassName}: PascalCase class name (e.g., PostgreSQL, MongoDB)
- {connector_name}: lowercase connector name (e.g., postgresql, mongodb)
- {display_name}: Display name (e.g., "PostgreSQL", "MongoDB")
- {description}: Brief description
- {uuid}: Generated UUID (use uuid4())
- {icon_name}: Icon filename (e.g., "Postgresql.png")
- {connection_lib}: Python library for connections (e.g., psycopg2, pymongo)
"""

import os
from typing import Any

from unstract.connectors.databases.unstract_db import UnstractDB
from unstract.connectors.exceptions import ConnectorError


class {ClassName}(UnstractDB):
"""
{display_name} database connector.

{description}
"""

def __init__(self, settings: dict[str, Any]):
super().__init__("{display_name}")

# Connection URL mode
self.connection_url = settings.get("connection_url", "")

# Individual parameters mode
self.host = settings.get("host", "")
self.port = settings.get("port", "{default_port}")
self.database = settings.get("database", "")
self.user = settings.get("user", "")
self.password = settings.get("password", "")

# Optional settings
self.schema = settings.get("schema", "")
self.ssl_enabled = settings.get("sslEnabled", False)

@staticmethod
def get_id() -> str:
return "{connector_name}|{uuid}"

@staticmethod
def get_name() -> str:
return "{display_name}"

@staticmethod
def get_description() -> str:
return "{description}"

@staticmethod
def get_icon() -> str:
return "/icons/connector-icons/{icon_name}"

@staticmethod
def get_json_schema() -> str:
schema_path = os.path.join(
os.path.dirname(__file__),
"static",
"json_schema.json"
)
with open(schema_path, "r") as f:
return f.read()

@staticmethod
def can_write() -> bool:
return True

@staticmethod
def can_read() -> bool:
return True

@staticmethod
def requires_oauth() -> bool:
return False

@staticmethod
def python_social_auth_backend() -> str:
return ""

def get_engine(self) -> Any:
"""
Return database connection.

Returns:
Database connection object
"""
# Import here for fork safety
import {connection_lib}

try:
conn_params = {
# TCP keepalive for long-running queries
"connect_timeout": 30,
}

if self.connection_url:
# URL mode
conn_params["dsn"] = self.connection_url
else:
# Individual params mode
conn_params.update({
"host": self.host,
"port": int(self.port),
"database": self.database,
"user": self.user,
"password": self.password,
})

# Add SSL if enabled
if self.ssl_enabled:
conn_params["ssl"] = True
# Add more SSL options as needed:
# conn_params["ssl_ca"] = self.ssl_ca
# conn_params["ssl_cert"] = self.ssl_cert
# conn_params["ssl_key"] = self.ssl_key

return {connection_lib}.connect(**conn_params)

except Exception as e:
raise ConnectorError(
f"Failed to connect to {display_name}: {str(e)}",
treat_as_user_message=True
) from e

def test_credentials(self) -> bool:
"""
Test database credentials.

Returns:
True if connection successful

Raises:
ConnectorError: If connection fails
"""
try:
conn = self.get_engine()
# Execute simple test query
with conn.cursor() as cursor:
cursor.execute("SELECT 1")
conn.close()
return True
except Exception as e:
raise ConnectorError(
f"Connection test failed: {str(e)}",
treat_as_user_message=True
) from e

def execute(self, query: str) -> list[tuple]:
"""
Execute SQL query.

Args:
query: SQL query string

Returns:
List of result tuples
"""
conn = self.get_engine()
try:
with conn.cursor() as cursor:
cursor.execute(query)
if cursor.description: # SELECT query
return cursor.fetchall()
else: # INSERT/UPDATE/DELETE
conn.commit()
return []
finally:
conn.close()

def sql_to_db_mapping(self, value: Any, column_name: str | None = None) -> str:
"""
Map Python types to database types.

Args:
value: Python value to map
column_name: Optional column name hint

Returns:
Database type string
"""
if value is None:
return "TEXT"

if isinstance(value, bool):
return "BOOLEAN"
elif isinstance(value, int):
return "INTEGER"
elif isinstance(value, float):
return "DOUBLE PRECISION"
elif isinstance(value, dict):
return "JSON" # or JSONB for PostgreSQL
elif isinstance(value, list):
return "JSON"
elif isinstance(value, bytes):
return "BYTEA"
else:
return "TEXT"
Comment on lines +177 to +204
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Template missing required abstract methods from UnstractDB.

Based on the UnstractDB base class, this template should also implement:

  • prepare_multi_column_migration(self, table_name: str, column_name: str) -> str | list
  • get_create_table_base_query(self, table: str) -> str

These are required for table migration and creation operations.

🔎 Proposed additions after `sql_to_db_mapping`
    def prepare_multi_column_migration(
        self, table_name: str, column_name: str
    ) -> str | list:
        """
        Prepare SQL for column migration.

        Args:
            table_name: Name of the table
            column_name: Name of the column to migrate

        Returns:
            SQL statement(s) for migration
        """
        # Adapt for specific database:
        # PostgreSQL: ALTER TABLE ... ADD COLUMN ...
        return f"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS {column_name}_v2 TEXT"

    def get_create_table_base_query(self, table: str) -> str:
        """
        Get base CREATE TABLE query.

        Args:
            table: Table name

        Returns:
            Base CREATE TABLE SQL
        """
        # Adapt for specific database syntax
        return f"CREATE TABLE IF NOT EXISTS {table}"
🤖 Prompt for AI Agents
In .claude/skills/connector-ops/assets/templates/database.py.template around
lines 177 to 204, the template class is missing two required implementations
from UnstractDB: prepare_multi_column_migration(self, table_name: str,
column_name: str) -> str | list and get_create_table_base_query(self, table:
str) -> str; add these two methods below sql_to_db_mapping with matching
signatures, simple database-agnostic default implementations (e.g.,
prepare_multi_column_migration returns an ALTER TABLE ... ADD COLUMN IF NOT
EXISTS {column_name}_v2 TEXT string or a list of such statements, and
get_create_table_base_query returns a CREATE TABLE IF NOT EXISTS {table} base
string), ensure docstrings match other methods and return types follow the
annotated Union[str, list] where appropriate.

190 changes: 190 additions & 0 deletions .claude/skills/connector-ops/assets/templates/filesystem.py.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
"""
Filesystem Connector Template

Replace placeholders:
- {ClassName}: PascalCase class name (e.g., MinioFS, AzureBlobFS)
- {connector_name}: lowercase connector name (e.g., minio, azure_blob)
- {display_name}: Display name (e.g., "Minio", "Azure Blob Storage")
- {description}: Brief description
- {uuid}: Generated UUID (use uuid4())
- {icon_name}: Icon filename (e.g., "Minio.png")
- {fsspec_class}: fsspec filesystem class (e.g., S3FileSystem, AzureBlobFileSystem)
- {fsspec_module}: Module path for fsspec class (e.g., s3fs, adlfs)
"""

import os
import threading
from typing import Any

from fsspec import AbstractFileSystem

from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem
from unstract.connectors.exceptions import ConnectorError


class {ClassName}(UnstractFileSystem):
"""
{display_name} filesystem connector.

{description}
"""

def __init__(self, settings: dict[str, Any]):
super().__init__("{display_name}")

# Store settings - DO NOT initialize clients in __init__
# (prevents gRPC issues with Celery fork)
self._settings = settings

# Authentication settings
self.access_key = settings.get("access_key", "")
self.secret_key = settings.get("secret_key", "")
self.endpoint_url = settings.get("endpoint_url", "")
self.bucket = settings.get("bucket", "")
self.region = settings.get("region", "")

# Lazy initialization
self._fs = None
self._fs_lock = threading.Lock()

@staticmethod
def get_id() -> str:
return "{connector_name}|{uuid}"

@staticmethod
def get_name() -> str:
return "{display_name}"

@staticmethod
def get_description() -> str:
return "{description}"

@staticmethod
def get_icon() -> str:
return "/icons/connector-icons/{icon_name}"

@staticmethod
def get_json_schema() -> str:
schema_path = os.path.join(
os.path.dirname(__file__),
"static",
"json_schema.json"
)
with open(schema_path, "r") as f:
return f.read()

@staticmethod
def can_write() -> bool:
return True

@staticmethod
def can_read() -> bool:
return True

@staticmethod
def requires_oauth() -> bool:
return False

@staticmethod
def python_social_auth_backend() -> str:
return ""

def get_fsspec_fs(self) -> AbstractFileSystem:
"""
Return fsspec filesystem instance.

Uses lazy initialization with thread safety for Celery compatibility.

Returns:
fsspec AbstractFileSystem instance
"""
if self._fs is None:
with self._fs_lock:
if self._fs is None:
# Import here for fork safety
from {fsspec_module} import {fsspec_class}

try:
self._fs = {fsspec_class}(
key=self.access_key,
secret=self.secret_key,
endpoint_url=self.endpoint_url,
# Add more options as needed:
# client_kwargs={"region_name": self.region},
)
except Exception as e:
raise ConnectorError(
f"Failed to initialize filesystem: {str(e)}",
treat_as_user_message=True
) from e

return self._fs

def test_credentials(self) -> bool:
"""
Test filesystem credentials.

Returns:
True if connection successful

Raises:
ConnectorError: If connection fails
"""
try:
fs = self.get_fsspec_fs()
# Try to list the bucket/root to verify access
fs.ls(self.bucket or "/")
return True
except Exception as e:
raise ConnectorError(
f"Credential test failed: {str(e)}",
treat_as_user_message=True
) from e

def extract_metadata_file_hash(self, metadata: dict[str, Any]) -> str | None:
"""
Extract unique file hash from fsspec metadata.

Different storage systems use different keys for file hashes:
- S3/Minio: ETag
- Azure: content_md5
- GCS: md5Hash

Args:
metadata: File metadata from fsspec

Returns:
File hash string or None
"""
# Try common hash fields
hash_fields = ["ETag", "etag", "md5Hash", "content_md5", "contentHash"]

for field in hash_fields:
if field in metadata:
value = metadata[field]
# Remove quotes from ETags
if isinstance(value, str):
return value.strip('"')
return str(value)

return None

def is_dir_by_metadata(self, metadata: dict[str, Any]) -> bool:
"""
Check if path is a directory from metadata.

Args:
metadata: File metadata from fsspec

Returns:
True if path is a directory
"""
# Different storage systems indicate directories differently
if metadata.get("type") == "directory":
return True
if metadata.get("StorageClass") == "DIRECTORY":
return True
if metadata.get("size") == 0 and metadata.get("name", "").endswith("/"):
return True

return False
Comment on lines +144 to +190
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Template missing extract_modified_date abstract method.

Based on the UnstractFileSystem base class, this template should also implement extract_modified_date(self, metadata: dict[str, Any]) -> datetime | None. This method is required for file sorting operations.

🔎 Proposed addition after `is_dir_by_metadata`
    def extract_modified_date(self, metadata: dict[str, Any]) -> datetime | None:
        """
        Extract last modified date from fsspec metadata.

        Args:
            metadata: File metadata from fsspec

        Returns:
            Datetime object or None if not available
        """
        from datetime import datetime, timezone

        # Try common date fields
        date_fields = ["LastModified", "last_modified", "mtime", "updated", "timeModified"]

        for field in date_fields:
            if field in metadata:
                value = metadata[field]
                if isinstance(value, datetime):
                    return value.replace(tzinfo=timezone.utc) if value.tzinfo is None else value
                if isinstance(value, (int, float)):
                    # Unix timestamp
                    return datetime.fromtimestamp(value, tz=timezone.utc)
                if isinstance(value, str):
                    try:
                        return datetime.fromisoformat(value.replace("Z", "+00:00"))
                    except ValueError:
                        continue

        return None
🤖 Prompt for AI Agents
In .claude/skills/connector-ops/assets/templates/filesystem.py.template around
lines 144 to 190, the template class is missing the required abstract method
extract_modified_date(self, metadata: dict[str, Any]) -> datetime | None; add
this method after is_dir_by_metadata that imports datetime and timezone, checks
common date fields (e.g., "LastModified", "last_modified", "mtime", "updated",
"timeModified"), handles values that are datetime (ensure tzinfo is set to UTC
if missing), numeric unix timestamps (convert with datetime.fromtimestamp(...,
tz=timezone.utc)), and ISO8601 strings (try datetime.fromisoformat after
replacing "Z" with "+00:00"), returning None if no valid date found.

Loading
Loading