43 changes: 43 additions & 0 deletions surfsense_backend/app/tasks/celery_tasks/connector_tasks.py
@@ -1,6 +1,7 @@
"""Celery tasks for connector indexing."""

import logging
import traceback

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
@@ -11,6 +12,36 @@
logger = logging.getLogger(__name__)


def _handle_greenlet_error(e: Exception, task_name: str, connector_id: int) -> None:
"""
Handle greenlet_spawn errors with detailed logging for debugging.

The 'greenlet_spawn has not been called' error occurs when:
1. SQLAlchemy lazy-loads a relationship outside of an async context
2. A sync operation is called from an async context (or vice versa)
3. Session objects are accessed after the session is closed

This helper logs detailed context to help identify the root cause.
"""
error_str = str(e)
if "greenlet_spawn has not been called" in error_str:
logger.error(
f"GREENLET ERROR in {task_name} for connector {connector_id}: {error_str}\n"
f"This error typically occurs when SQLAlchemy tries to lazy-load a relationship "
f"outside of an async context. Check for:\n"
f"1. Accessing relationship attributes (e.g., document.chunks, connector.search_space) "
f"without using selectinload() or joinedload()\n"
f"2. Accessing model attributes after the session is closed\n"
f"3. Passing ORM objects between different async contexts\n"
f"Stack trace:\n{traceback.format_exc()}"
)
else:
logger.error(
f"Error in {task_name} for connector {connector_id}: {error_str}\n"
f"Stack trace:\n{traceback.format_exc()}"
)


def get_celery_session_maker():
"""
Create a new async session maker for Celery tasks.
@@ -46,6 +77,9 @@ def index_slack_messages_task(
connector_id, search_space_id, user_id, start_date, end_date
)
)
except Exception as e:
_handle_greenlet_error(e, "index_slack_messages", connector_id)
raise
finally:
loop.close()

@@ -89,6 +123,9 @@ def index_notion_pages_task(
connector_id, search_space_id, user_id, start_date, end_date
)
)
except Exception as e:
_handle_greenlet_error(e, "index_notion_pages", connector_id)
raise
finally:
loop.close()

@@ -347,6 +384,9 @@ def index_google_calendar_events_task(
connector_id, search_space_id, user_id, start_date, end_date
)
)
except Exception as e:
_handle_greenlet_error(e, "index_google_calendar_events", connector_id)
raise
finally:
loop.close()

@@ -696,6 +736,9 @@ def index_crawled_urls_task(
connector_id, search_space_id, user_id, start_date, end_date
)
)
except Exception as e:
_handle_greenlet_error(e, "index_crawled_urls", connector_id)
raise
finally:
loop.close()

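The docstring above attributes these failures to lazy loads that happen outside the async context. Below is a minimal, self-contained sketch of the eager-loading pattern it recommends; it is not part of this PR, and the Connector/Document models, the in-memory SQLite URL, and the aiosqlite driver are illustrative assumptions rather than the project's actual models or database.

```python
import asyncio

from sqlalchemy import ForeignKey, select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    mapped_column,
    relationship,
    selectinload,
)


class Base(DeclarativeBase):
    pass


class Connector(Base):
    __tablename__ = "connectors"
    id: Mapped[int] = mapped_column(primary_key=True)
    documents: Mapped[list["Document"]] = relationship(back_populates="connector")


class Document(Base):
    __tablename__ = "documents"
    id: Mapped[int] = mapped_column(primary_key=True)
    connector_id: Mapped[int] = mapped_column(ForeignKey("connectors.id"))
    connector: Mapped[Connector] = relationship(back_populates="documents")


async def load_connector_with_documents(connector_id: int) -> Connector:
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    session_maker = async_sessionmaker(engine, expire_on_commit=False)
    async with session_maker() as session:
        session.add(Connector(id=1, documents=[Document(id=1)]))
        await session.commit()

        # Eager-load the relationship while still inside the async context.
        # A later lazy load from plain sync code is what raises MissingGreenlet
        # ("greenlet_spawn has not been called").
        result = await session.execute(
            select(Connector)
            .where(Connector.id == connector_id)
            .options(selectinload(Connector.documents))
        )
        connector = result.scalar_one()

    await engine.dispose()
    return connector


if __name__ == "__main__":
    connector = asyncio.run(load_connector_with_documents(1))
    print(len(connector.documents))  # already loaded, no lazy load needed
```

With expire_on_commit=False and selectinload(), no attribute access after the async work finishes needs further database IO, which is exactly the situation the helper's log message asks reviewers to check for.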
48 changes: 48 additions & 0 deletions surfsense_backend/app/tasks/connector_indexers/base.py
@@ -28,6 +28,34 @@ def get_current_timestamp() -> datetime:
return datetime.now(UTC)


def parse_date_flexible(date_str: str) -> datetime:
"""
Parse date from multiple common formats.

Args:
date_str: Date string to parse

Returns:
Parsed datetime object

Raises:
ValueError: If unable to parse the date string
"""
formats = ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]

for fmt in formats:
try:
return datetime.strptime(date_str.rstrip("Z"), fmt)
except ValueError:
continue

# Try ISO format as fallback
try:
return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
except ValueError as exc:
raise ValueError(f"Unable to parse date: {date_str}") from exc


async def check_duplicate_document_by_hash(
session: AsyncSession, content_hash: str
) -> Document | None:
@@ -159,6 +187,26 @@ def calculate_date_range(
)
end_date_str = end_date if end_date else calculated_end_date.strftime("%Y-%m-%d")

# FIX: Ensure end_date is at least 1 day after start_date to avoid
# "start_date must be strictly before end_date" errors when dates are the same
# (e.g., when last_indexed_at is today)
if start_date_str == end_date_str:
logger.info(
f"Start date ({start_date_str}) equals end date ({end_date_str}), "
"adjusting end date to next day to ensure valid date range"
)
# Parse end_date and add 1 day
try:
end_dt = parse_date_flexible(end_date_str)
except ValueError:
logger.warning(
f"Could not parse end_date '{end_date_str}', using current date"
)
end_dt = datetime.now()
end_dt = end_dt + timedelta(days=1)
end_date_str = end_dt.strftime("%Y-%m-%d")
logger.info(f"Adjusted end date to {end_date_str}")

return start_date_str, end_date_str


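As a quick standalone illustration of the two additions above: the sketch below mirrors parse_date_flexible and the same-day adjustment in calculate_date_range so the accepted formats and the one-day bump are easy to verify. The ensure_valid_range name is hypothetical, used only for this sketch.

```python
from datetime import datetime, timedelta


def parse_date_flexible(date_str: str) -> datetime:
    """Mirror of the helper above: try known formats, then ISO 8601."""
    for fmt in ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"):
        try:
            return datetime.strptime(date_str.rstrip("Z"), fmt)
        except ValueError:
            continue
    try:
        return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
    except ValueError as exc:
        raise ValueError(f"Unable to parse date: {date_str}") from exc


def ensure_valid_range(start_date_str: str, end_date_str: str) -> tuple[str, str]:
    """Bump the end date by one day when it equals the start date."""
    if start_date_str == end_date_str:
        end_dt = parse_date_flexible(end_date_str) + timedelta(days=1)
        end_date_str = end_dt.strftime("%Y-%m-%d")
    return start_date_str, end_date_str


# Expected behaviour under the assumptions above:
assert parse_date_flexible("2024-06-01T12:30:00Z") == datetime(2024, 6, 1, 12, 30)
assert ensure_valid_range("2024-06-01", "2024-06-01") == ("2024-06-01", "2024-06-02")
assert ensure_valid_range("2024-06-01", "2024-06-03") == ("2024-06-01", "2024-06-03")
```

The bump keeps same-day incremental runs (for example when last_indexed_at is today) from producing a start date equal to the end date, which is what triggers the "start_date must be strictly before end_date" errors mentioned in the comment.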
Google Calendar connector indexer (file header not shown in this excerpt)
Expand Up @@ -27,6 +27,7 @@
get_connector_by_id,
get_current_timestamp,
logger,
parse_date_flexible,
update_connector_last_indexed,
)

@@ -217,6 +218,26 @@ async def index_google_calendar_events(
start_date_str = start_date
end_date_str = end_date

# FIX: Ensure end_date is at least 1 day after start_date to avoid
# "start_date must be strictly before end_date" errors when dates are the same
# (e.g., when last_indexed_at is today)
if start_date_str == end_date_str:
logger.info(
f"Start date ({start_date_str}) equals end date ({end_date_str}), "
"adjusting end date to next day to ensure valid date range"
)
# Parse end_date and add 1 day
try:
end_dt = parse_date_flexible(end_date_str)
except ValueError:
logger.warning(
f"Could not parse end_date '{end_date_str}', using current date"
)
end_dt = datetime.now()
end_dt = end_dt + timedelta(days=1)
end_date_str = end_dt.strftime("%Y-%m-%d")
logger.info(f"Adjusted end date to {end_date_str}")

await task_logger.log_task_progress(
log_entry,
f"Fetching Google Calendar events from {start_date_str} to {end_date_str}",
43 changes: 37 additions & 6 deletions surfsense_backend/app/tasks/connector_indexers/notion_indexer.py
@@ -196,13 +196,44 @@ async def index_notion_pages(
"Recommend reconnecting with OAuth."
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to get Notion pages for connector {connector_id}",
str(e),
{"error_type": "PageFetchError"},
error_str = str(e)
# Check if this is an unsupported block type error (transcription, ai_block, etc.)
# These are known Notion API limitations and should be logged as warnings, not errors
unsupported_block_errors = [
"transcription is not supported",
"ai_block is not supported",
"is not supported via the API",
]
is_unsupported_block_error = any(
err in error_str.lower() for err in unsupported_block_errors
)
logger.error(f"Error fetching Notion pages: {e!s}", exc_info=True)

if is_unsupported_block_error:
# Log as warning since this is a known Notion API limitation
logger.warning(
f"Notion API limitation for connector {connector_id}: {error_str}. "
"This is a known issue with Notion AI blocks (transcription, ai_block) "
"that are not accessible via the Notion API."
)
await task_logger.log_task_failure(
log_entry,
f"Failed to get Notion pages: Notion API limitation",
f"{error_str} - This page contains Notion AI content (transcription/ai_block) that cannot be accessed via the API.",
{"error_type": "UnsupportedBlockType", "is_known_limitation": True},
)
else:
# Log as error for other failures
logger.error(
f"Error fetching Notion pages for connector {connector_id}: {error_str}",
exc_info=True,
)
await task_logger.log_task_failure(
log_entry,
f"Failed to get Notion pages for connector {connector_id}",
str(e),
{"error_type": "PageFetchError"},
)

await notion_client.close()
return 0, f"Failed to get Notion pages: {e!s}"

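Because the new branch keys off substring matches, a tiny standalone sketch makes the intended classification easy to verify. The helper name below is hypothetical; the marker strings are the ones used above, kept lower-case so they match the lower-cased error text.

```python
UNSUPPORTED_BLOCK_ERRORS = (
    "transcription is not supported",
    "ai_block is not supported",
    "is not supported via the api",
)


def is_known_notion_limitation(error_message: str) -> bool:
    """Return True when the error text matches a known Notion API limitation."""
    lowered = error_message.lower()
    return any(marker in lowered for marker in UNSUPPORTED_BLOCK_ERRORS)


assert is_known_notion_limitation(
    "Block type transcription is not supported via the API."
)
assert not is_known_notion_limitation("401 Unauthorized: invalid token")
```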
Web crawler (crawled URLs) connector indexer (file header not shown in this excerpt)
@@ -108,10 +108,15 @@ async def index_crawled_urls(
api_key = connector.config.get("FIRECRAWL_API_KEY")

# Get URLs from connector config
urls = parse_webcrawler_urls(connector.config.get("INITIAL_URLS"))
raw_initial_urls = connector.config.get("INITIAL_URLS")
urls = parse_webcrawler_urls(raw_initial_urls)

# DEBUG: Log connector config details for troubleshooting empty URL issues
logger.info(
f"Starting crawled web page indexing for connector {connector_id} with {len(urls)} URLs"
f"Starting crawled web page indexing for connector {connector_id} with {len(urls)} URLs. "
f"Connector name: {connector.name}, "
f"INITIAL_URLS type: {type(raw_initial_urls).__name__}, "
f"INITIAL_URLS value: {repr(raw_initial_urls)[:200] if raw_initial_urls else 'None'}"
)

# Initialize webcrawler client
@@ -128,11 +133,18 @@

# Validate URLs
if not urls:
# DEBUG: Log detailed connector config for troubleshooting
logger.error(
f"No URLs provided for indexing. Connector ID: {connector_id}, "
f"Connector name: {connector.name}, "
f"Config keys: {list(connector.config.keys()) if connector.config else 'None'}, "
f"INITIAL_URLS raw value: {repr(raw_initial_urls)}"
)
await task_logger.log_task_failure(
log_entry,
"No URLs provided for indexing",
"Empty URL list",
{"error_type": "ValidationError"},
f"Empty URL list. INITIAL_URLS value: {repr(raw_initial_urls)[:100]}",
{"error_type": "ValidationError", "connector_name": connector.name},
)
return 0, "No URLs provided for indexing"

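The extra logging above is aimed at diagnosing INITIAL_URLS values that arrive in an unexpected shape (missing, a JSON-encoded string, or a plain comma-separated string). Purely as an illustration of the kind of normalization that logging helps to verify, here is a defensive sketch; this is not the project's parse_webcrawler_urls, whose implementation is not shown in this diff.

```python
import json


def normalize_initial_urls(raw: object) -> list[str]:
    """Accept a list, a JSON-encoded list, or a comma-separated string."""
    if raw is None:
        return []
    if isinstance(raw, list):
        return [str(url).strip() for url in raw if str(url).strip()]
    if isinstance(raw, str):
        text = raw.strip()
        if text.startswith("["):
            try:
                return [str(url).strip() for url in json.loads(text)]
            except (json.JSONDecodeError, TypeError):
                return []
        return [part.strip() for part in text.split(",") if part.strip()]
    return []


assert normalize_initial_urls(None) == []
assert normalize_initial_urls(["https://example.com "]) == ["https://example.com"]
assert normalize_initial_urls('["https://example.com"]') == ["https://example.com"]
assert normalize_initial_urls("https://a.com, https://b.com") == [
    "https://a.com",
    "https://b.com",
]
```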