3 changes: 3 additions & 0 deletions backend/backend/settings/base.py
@@ -165,6 +165,9 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str |
ATOMIC_REQUESTS = CommonUtils.str_to_bool(
os.environ.get("DJANGO_ATOMIC_REQUESTS", "False")
)
MAX_FILE_SIZE_LIMIT_TO_READ = int(
os.environ.get("MAX_FILE_SIZE_LIMIT_TO_READ", 100 * 1024 * 1024)
) # 100MB limit for full file analysis
# Flag to Enable django admin
ADMIN_ENABLED = False

4 changes: 4 additions & 0 deletions backend/sample.env
@@ -186,3 +186,7 @@ FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND=600 # 10 minutes
# Runner polling timeout
MAX_RUNNER_POLLING_WAIT_SECONDS=10800 # 3 hours
RUNNER_POLLING_INTERVAL_SECONDS=2 # 2 seconds


# Max file size limit to read
MAX_FILE_SIZE_LIMIT_TO_READ=104857600 # 100MB (100 * 1024 * 1024) limit for full file analysis; must be a literal integer since settings parse it with int()
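Note that .env values are plain strings, and the settings module converts this one with int(), which rejects arithmetic expressions. A minimal standalone sketch of that parsing behavior:

import os

os.environ["MAX_FILE_SIZE_LIMIT_TO_READ"] = "104857600"
print(int(os.environ["MAX_FILE_SIZE_LIMIT_TO_READ"]))  # 104857600

os.environ["MAX_FILE_SIZE_LIMIT_TO_READ"] = "100 * 1024 * 1024"
# int() cannot evaluate expressions, so this would raise ValueError at startup:
# int(os.environ["MAX_FILE_SIZE_LIMIT_TO_READ"])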
13 changes: 13 additions & 0 deletions backend/workflow_manager/endpoint_v2/enums.py
@@ -26,3 +26,16 @@ class AllowedFileTypes(Enum):
@classmethod
def is_allowed(cls, mime_type: str) -> bool:
return mime_type in cls._value2member_map_


class InvalidFileTypes(Enum):
OCTET_STREAM = "application/octet-stream"


class UncertainMimeTypes(Enum):
OCTET_STREAM = "application/octet-stream"
ZIP = "application/zip"

@classmethod
def is_uncertain(cls, mime_type: str) -> bool:
return mime_type in cls._value2member_map_
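
For reference, _value2member_map_ is the stdlib Enum's internal mapping from member values to members, so these checks are O(1) dictionary lookups on the string value. A quick standalone sketch of the expected behavior:

from enum import Enum

class UncertainMimeTypes(Enum):
    OCTET_STREAM = "application/octet-stream"
    ZIP = "application/zip"

    @classmethod
    def is_uncertain(cls, mime_type: str) -> bool:
        return mime_type in cls._value2member_map_

assert UncertainMimeTypes.is_uncertain("application/zip")
assert not UncertainMimeTypes.is_uncertain("application/pdf")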
259 changes: 210 additions & 49 deletions backend/workflow_manager/endpoint_v2/source.py
@@ -12,6 +12,7 @@
import magic
from connector_processor.constants import ConnectorKeys
from connector_v2.models import ConnectorInstance
from django.conf import settings
from django.core.files.uploadedfile import UploadedFile
from django.db.models import Q
from utils.user_context import UserContext
@@ -24,7 +24,7 @@
SourceKey,
)
from workflow_manager.endpoint_v2.dto import FileHash, SourceConfig
from workflow_manager.endpoint_v2.enums import AllowedFileTypes
from workflow_manager.endpoint_v2.enums import AllowedFileTypes, UncertainMimeTypes
from workflow_manager.endpoint_v2.exceptions import (
InvalidInputDirectory,
InvalidSourceConnectionType,
@@ -64,7 +65,10 @@ class SourceConnector(BaseConnector):
workflow (Workflow): The workflow associated with the source connector.
"""

READ_CHUNK_SIZE = 4194304 # Chunk size for reading files
READ_CHUNK_SIZE = 4194304 # 4MB chunk size for reading files
MAX_FILE_SIZE_LIMIT_TO_READ = (
settings.MAX_FILE_SIZE_LIMIT_TO_READ
) # limit for full file analysis

def __init__(
self,
Expand Down Expand Up @@ -885,6 +889,169 @@ def load_file(self, input_file_path: str) -> tuple[str, BytesIO]:

return os.path.basename(input_file_path), file_stream

@classmethod
def _detect_mime_type(
cls, file: UploadedFile, first_chunk: bytes
) -> tuple[str, bool]:
"""Enhanced MIME type detection with smart fallback for uncertain types.

Args:
file: The uploaded file
first_chunk: First chunk of file data

Returns:
tuple: (mime_type, is_supported) where is_supported indicates if type is allowed
"""
header_mime_type = file.content_type
logger.info(f"Content-Type header: {header_mime_type} for file {file.name}")

# Primary detection using python-magic with first chunk
mime_type = magic.from_buffer(first_chunk, mime=True)
logger.info(
f"Detected MIME type using Python Magic: {mime_type} for file {file.name}"
)

# Smart fallback: if the type is uncertain and the file is larger than one
# chunk (otherwise the first chunk already covered the whole file) but still
# within the size limit, analyze the full file
if (
UncertainMimeTypes.is_uncertain(mime_type)
and cls.READ_CHUNK_SIZE < file.size <= cls.MAX_FILE_SIZE_LIMIT_TO_READ
):
logger.info(
f"Uncertain MIME type '{mime_type}', trying full file analysis for {file.name}"
)

# Read the full file content, preserving the caller's stream position
# so an in-progress chunks() iteration is not corrupted by the rewind
current_position = file.tell()
file.seek(0)
full_content = file.read()
file.seek(current_position)  # Restore position for subsequent chunk reads

# Analyze full file content
fallback_mime_type = magic.from_buffer(full_content, mime=True)

# Only use the fallback result if it is more specific than the chunk
# result; is_uncertain() compares string values, whereas
# `in UncertainMimeTypes` tests enum members (and raises TypeError for
# plain strings before Python 3.12)
if not UncertainMimeTypes.is_uncertain(fallback_mime_type):
mime_type = fallback_mime_type
logger.info(
f"Full file analysis detected: {mime_type} for file {file.name}"
)
else:
logger.info(
f"Full file analysis still uncertain, keeping: {mime_type} for file {file.name}"
)

# Check if detected type is supported
is_supported = AllowedFileTypes.is_allowed(mime_type)
return mime_type, is_supported
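
A minimal usage sketch of the underlying detection, assuming python-magic's from_buffer and Django's SimpleUploadedFile test helper; the byte payload is illustrative:

import magic
from django.core.files.uploadedfile import SimpleUploadedFile

pdf_bytes = b"%PDF-1.4\n..."  # content carrying a recognizable PDF signature
upload = SimpleUploadedFile("doc.pdf", pdf_bytes, content_type="application/octet-stream")

# magic classifies by content, ignoring the (here deliberately wrong) header
print(magic.from_buffer(pdf_bytes, mime=True))  # application/pdf

# With Django settings configured, the connector would wrap this as:
# mime_type, is_supported = SourceConnector._detect_mime_type(upload, pdf_bytes)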

@classmethod
def _validate_and_store_file(
cls, file: UploadedFile, file_storage, destination_path: str
) -> tuple[str, str, bool]:
"""Validate file MIME type and store file content.

Args:
file: The uploaded file to process
file_storage: File storage instance
destination_path: Path where file should be stored

Returns:
tuple: (file_hash, mime_type, mime_type_detected) where mime_type_detected indicates if type is supported
"""
file_hash = sha256()
first_iteration = True

file.seek(0)

for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE):
if first_iteration:
# Enhanced MIME type detection
mime_type, is_supported = cls._detect_mime_type(file, chunk)

# If unsupported, return early without processing further chunks
if not is_supported:
return "", mime_type, False

first_iteration = False

# Process chunk - hash and store
file_hash.update(chunk)
file_storage.write(path=destination_path, mode="ab", data=chunk)

if first_iteration:
# Empty upload: no chunks were read, so nothing was stored or detected
# ("application/x-empty" is the type libmagic reports for empty input)
return "", "application/x-empty", False

return file_hash.hexdigest(), mime_type, True
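
The position bookkeeping in _detect_mime_type matters here because Django's File.chunks() reads sequentially from the stream's current offset, so rewinding mid-iteration makes the generator re-serve data. A simplified standalone model of that pitfall:

from io import BytesIO

def chunks(f, chunk_size):
    # Simplified model of django.core.files.File.chunks()
    f.seek(0)
    while True:
        data = f.read(chunk_size)
        if not data:
            break
        yield data

f = BytesIO(b"AAAABBBBCCCC")
it = chunks(f, 4)
print(next(it))  # b'AAAA'
f.seek(0)        # an unguarded full-file read would leave the offset here
print(next(it))  # b'AAAA' again: the first chunk would be hashed and stored twice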

@classmethod
def _create_file_hash_object(
cls,
file_path: str,
connection_type,
file_name: str,
file_hash: str,
is_executed: bool,
file_size: int,
mime_type: str,
) -> FileHash:
"""Create FileHash object with provided parameters."""
return FileHash(
file_path=file_path,
source_connection_type=connection_type,
file_name=file_name,
file_hash=file_hash,
is_executed=is_executed,
file_size=file_size,
mime_type=mime_type,
)

@classmethod
def _handle_unsupported_file(
cls,
file_name: str,
mime_type: str,
destination_path: str,
connection_type,
file_size: int,
workflow_log,
) -> FileHash:
"""Handle files with unsupported MIME types."""
log_message = f"Skipping file '{file_name}' to stage due to unsupported MIME type '{mime_type}'"
workflow_log.log_info(logger=logger, message=log_message)

fake_hash = f"temp-hash-{uuid.uuid4().hex}"
return cls._create_file_hash_object(
file_path=destination_path,
connection_type=connection_type,
file_name=file_name,
file_hash=fake_hash,
is_executed=True,
file_size=file_size,
mime_type=mime_type,
)

@classmethod
def _check_duplicate_file(
cls, file_hash: str, unique_file_hashes: set[str], file_name: str, workflow_log
) -> bool:
"""Check if file is duplicate and log if needed."""
if file_hash in unique_file_hashes:
log_message = f"Skipping file '{file_name}' — duplicate detected within the current request. Already staged for processing."
workflow_log.log_info(logger=logger, message=log_message)
return True
Review comment (Contributor) on lines +1034 to +1037:
@muhammad-ali-e didn't we recently discuss to not skip duplicates based on file hash / content and allow its processing?
unique_file_hashes.add(file_hash)
return False
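
For context, this duplicate check keys purely on the streamed SHA-256 digest, and hashing chunk by chunk yields the same digest as hashing the whole payload at once. A quick standalone check:

from hashlib import sha256

payload = b"example content" * 1000
whole = sha256(payload).hexdigest()

streamed = sha256()
for i in range(0, len(payload), 4096):  # chunked, as _validate_and_store_file does
    streamed.update(payload[i : i + 4096])

assert whole == streamed.hexdigest()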

@classmethod
def _is_execution_completed(
cls, use_file_history: bool, workflow, file_hash: str
) -> bool:
"""Check if file execution is completed based on history."""
if not use_file_history:
return False

file_history = FileHistoryHelper.get_file_history(
workflow=workflow, cache_key=file_hash
)
return bool(file_history and file_history.is_completed())

@classmethod
def add_input_file_to_api_storage(
cls,
@@ -919,65 +1086,59 @@ def add_input_file_to_api_storage(
workflow_id=workflow_id, execution_id=execution_id
)
workflow: Workflow = Workflow.objects.get(id=workflow_id)
file_hashes: dict[str, FileHash] = {}
file_hashes_objects: dict[str, FileHash] = {}
unique_file_hashes: set[str] = set()
connection_type = WorkflowEndpoint.ConnectionType.API

for file in file_objs:
file_name = file.name
destination_path = os.path.join(api_storage_dir, file_name)

mime_type = file.content_type
logger.info(f"Detected MIME type: {mime_type} for file {file_name}")
if not AllowedFileTypes.is_allowed(mime_type):
log_message = f"Skipping file '{file_name}' to stage due to unsupported MIME type '{mime_type}'"
workflow_log.log_info(logger=logger, message=log_message)
# Generate a clearly marked temporary hash to avoid reading the file content
# Helps to prevent duplicate entries in file executions
fake_hash = f"temp-hash-{uuid.uuid4().hex}"
file_hash = FileHash(
file_path=destination_path,
source_connection_type=connection_type,
file_name=file_name,
file_hash=fake_hash,
is_executed=True,
file_size=file.size,
mime_type=mime_type,
)
file_hashes.update({file_name: file_hash})
continue

file_system = FileSystem(FileStorageType.API_EXECUTION)
file_storage = file_system.get_file_storage()
file_hash = sha256()
for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE):
file_hash.update(chunk)
file_storage.write(path=destination_path, mode="ab", data=chunk)
file_hash = file_hash.hexdigest()

# Skip duplicate files
if file_hash in unique_file_hashes:
log_message = f"Skipping file '{file_name}' — duplicate detected within the current request. Already staged for processing."
workflow_log.log_info(logger=logger, message=log_message)
continue
unique_file_hashes.add(file_hash)
# Process file chunks and detect MIME type
file_hash, mime_type, mime_type_detected = cls._validate_and_store_file(
file, file_storage, destination_path
)

file_history = None
if use_file_history:
file_history = FileHistoryHelper.get_file_history(
workflow=workflow, cache_key=file_hash
# Handle unsupported files
if not mime_type_detected:
file_hash_object = cls._handle_unsupported_file(
file_name,
mime_type,
destination_path,
connection_type,
file.size,
workflow_log,
)
is_executed = True if file_history and file_history.is_completed() else False
file_hash = FileHash(
file_path=destination_path,
source_connection_type=connection_type,
file_name=file_name,
file_hash=file_hash,
is_executed=is_executed,
file_size=file.size,
mime_type=mime_type,
file_hashes_objects.update({file_name: file_hash_object})
continue

# Check for duplicates
if cls._check_duplicate_file(
file_hash, unique_file_hashes, file_name, workflow_log
):
continue

# Get execution status
is_executed = cls._is_execution_completed(
use_file_history, workflow, file_hash
)
file_hashes.update({file_name: file_hash})
return file_hashes

# Create file hash object
file_hash_object = cls._create_file_hash_object(
destination_path,
connection_type,
file_name,
file_hash,
is_executed,
file.size,
mime_type,
)
file_hashes_objects.update({file_name: file_hash_object})

return file_hashes_objects

@classmethod
def create_endpoint_for_workflow(