Cognition integration provider #167


Status: Draft. Wants to merge 65 commits into base: dev.

65 commits
551c995
build: third party integration first commit
andhreljaKern May 13, 2025
e115cac
chore: update enums
andhreljaKern May 14, 2025
4d9970a
perf: add integration acess
andhreljaKern May 15, 2025
80712fc
perf: rename to integration
andhreljaKern May 15, 2025
fbd5a85
perf: add last_extraction column to integration
andhreljaKern May 16, 2025
0dbad39
perf: update integration delta
andhreljaKern May 16, 2025
458e4dc
perf: update integration access to list types
andhreljaKern May 16, 2025
7d70cd4
perf: add integration_types to integration access
andhreljaKern May 16, 2025
bbf643e
perf: add integration_types to integration access
andhreljaKern May 16, 2025
cc57cb4
perf: rename last_extraction to extract_history
andhreljaKern May 16, 2025
2da4e15
fix: store enum.value instead of enum
andhreljaKern May 16, 2025
edad963
fix: integration.project_id nullable
andhreljaKern May 16, 2025
4e52cb8
fix: nulable column instead of foreignkey
andhreljaKern May 16, 2025
03842fd
fix: enum values
andhreljaKern May 16, 2025
0386f6c
perf: task cancellation
andhreljaKern May 16, 2025
90debd4
fix: keyword arguments
andhreljaKern May 19, 2025
d099974
perf: integration record
andhreljaKern May 19, 2025
6fd3656
perf: add tokenizer
andhreljaKern May 20, 2025
76fd2ff
perf: add update integration access
andhreljaKern May 20, 2025
23099d3
perf: update integration endpoints
andhreljaKern May 20, 2025
5d2d503
perf: add get endpoint
andhreljaKern May 20, 2025
4cd321f
perf: add org_id to integration provider
andhreljaKern May 26, 2025
0b6a9fa
Merge branch 'dev' into cognition-integration-provider
andhreljaKern May 26, 2025
0467f29
perf: add org_id support to integration
andhreljaKern May 26, 2025
888a542
perf: add record delta criteria
andhreljaKern May 26, 2025
8af9e39
fix: task execution finish on failed integration
andhreljaKern May 26, 2025
99494f8
perf: add integration finished_at
andhreljaKern May 26, 2025
5989801
perf: add started_at
andhreljaKern May 26, 2025
bec5f20
fix: started_at - finished_at syntax error
andhreljaKern May 26, 2025
8898a3c
perf: add integration records
andhreljaKern May 27, 2025
4aeea83
Merge branch 'cognition-integration-provider' of github.com:code-kern…
andhreljaKern May 27, 2025
66e8a0c
perf: add integration tables
andhreljaKern May 27, 2025
2ec00db
perf: update integrations delta
andhreljaKern May 27, 2025
614d706
perf: add sharepoint integration
andhreljaKern May 27, 2025
d7c8d6b
perf: update integration objects
andhreljaKern May 29, 2025
359187e
perf: expand IntegrationSharepoint
andhreljaKern May 29, 2025
3f774d1
fix: integration.started_at
andhreljaKern May 29, 2025
03169de
perf: integration data types
andhreljaKern May 29, 2025
18990bb
perf: unique constraint names
andhreljaKern May 29, 2025
25b039a
Reset finished at for new integrations
lumburovskalina Jun 2, 2025
aa4416e
perf: update integration objects
andhreljaKern Jun 3, 2025
1093257
Merge branch 'cognition-integration-provider' of https://github.com/c…
andhreljaKern Jun 3, 2025
40cbf2f
perf: add integration delta deletion
andhreljaKern Jun 3, 2025
ac935a0
Merge branch 'dev' into cognition-integration-provider
andhreljaKern Jun 3, 2025
6b7bc0b
Merge branch 'dev' into cognition-integration-provider
andhreljaKern Jun 3, 2025
6c6f4de
perf: last_synced_at integration column
andhreljaKern Jun 3, 2025
5e52d26
Merge branch 'cognition-integration-provider' of https://github.com/c…
andhreljaKern Jun 3, 2025
c18a6eb
perf: add is_synced column
andhreljaKern Jun 3, 2025
d5afe9b
chore: add typing
andhreljaKern Jun 3, 2025
dc1e5e8
perf: add sync columns
andhreljaKern Jun 3, 2025
00c46d6
perf: add get_all integrations
andhreljaKern Jun 3, 2025
08e04a6
chore: add todo comment
andhreljaKern Jun 3, 2025
fad5cc4
perf: add sharepoint db bo
andhreljaKern Jun 4, 2025
19768dd
perf: integration update
andhreljaKern Jun 4, 2025
7231952
perf: tech discussion feedback
andhreljaKern Jun 5, 2025
7e88d90
perf: get integrations updates
andhreljaKern Jun 9, 2025
57bfe5d
perf: integration updates
andhreljaKern Jun 10, 2025
c67bae4
perf: introduce managers
andhreljaKern Jun 11, 2025
0373a58
chore: typing
andhreljaKern Jun 11, 2025
0bdebd5
perf: access + check for updates
andhreljaKern Jun 11, 2025
ffa35bf
perf: update integration
andhreljaKern Jun 13, 2025
342b221
perf: add delta url to sharepoint integration
andhreljaKern Jun 13, 2025
c9f4791
fix: move delta_url to cognitionintegration
andhreljaKern Jun 13, 2025
02ad8cc
perf: integration updates
andhreljaKern Jun 13, 2025
7c023ee
perf: add updated_by + delta_criteria
andhreljaKern Jun 16, 2025
11 changes: 11 additions & 0 deletions business_objects/monitor.py
@@ -9,6 +9,7 @@
    markdown_file as markdown_file_db_bo,
    file_extraction as file_extraction_db_bo,
    file_transformation as file_transformation_db_bo,
    integration as integration_db_bo,
)

FILE_CACHING_IN_PROGRESS_STATES = [
@@ -197,6 +198,16 @@ def set_parse_cognition_file_task_to_failed(
    general.commit()


def set_integration_task_to_failed(
    integration_id: str,
    with_commit: bool = False,
) -> None:
    integration = integration_db_bo.get_by_id(integration_id)
    if integration:
        integration.state = enums.CognitionMarkdownFileState.FAILED.value
        general.flush_or_commit(with_commit)


def __select_running_information_source_payloads(
    project_id: Optional[str] = None,
    only_running: bool = False,
11 changes: 11 additions & 0 deletions business_objects/project.py
@@ -609,3 +609,14 @@ def get_project_by_project_id_sql(project_id: str) -> Dict[str, Any]:
        return value[0]
    else:
        return None


def get_by_name_and_org_id(name: str, organization_id: str) -> Optional[Project]:
    return (
        session.query(Project)
        .filter(
            Project.name == name,
            Project.organization_id == organization_id,
        )
        .first()
    )
20 changes: 18 additions & 2 deletions business_objects/record.py
@@ -1,6 +1,6 @@
from __future__ import with_statement
from typing import List, Dict, Any, Optional, Tuple, Iterable
from sqlalchemy import cast, Text
from sqlalchemy import cast, Text, String
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.sql.expression import bindparam
from sqlalchemy import update
@@ -609,7 +609,7 @@ def count_missing_tokenized_records(project_id: str) -> int:
    query = f"""
    SELECT COUNT(*)
    FROM (
        {get_records_without_tokenization(project_id, None, query_only=True)}
    ) record_query
    """
    return general.execute_first(query)[0]
@@ -925,3 +925,19 @@ def get_first_no_text_column(project_id: str, record_id: str) -> str:
        WHERE r.project_id = '{project_id}' AND r.id = '{record_id}'
    """
    return general.execute_first(query)[0]


def get_record_ids_by_running_ids(project_id: str, running_ids: List[int]) -> List[str]:
    return [
        row[0]
        for row in (
            session.query(cast(Record.id, String))
            .filter(
                Record.project_id == project_id,
                Record.data[attribute.get_running_id_name(project_id)]
                .as_integer()
                .in_(running_ids),
            )
            .all()
        )
    ]
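The new `get_record_ids_by_running_ids` helper maps user-facing running ids (stored inside each record's JSON `data` under the project's running-id attribute) back to record ids. A pure-Python emulation of that filter over an in-memory list, with illustrative table contents and a hypothetical default attribute name:

```python
# Illustrative stand-in for the record table; "running_id" stands in for
# whatever attribute.get_running_id_name(project_id) returns.
records = [
    {"id": "r1", "project_id": "p1", "data": {"running_id": 1}},
    {"id": "r2", "project_id": "p1", "data": {"running_id": 2}},
    {"id": "r3", "project_id": "p2", "data": {"running_id": 1}},
]


def get_record_ids_by_running_ids(project_id, running_ids, running_id_name="running_id"):
    # Mirrors the SQL: filter on project_id, then on the integer value
    # stored under the running-id key inside the JSON data column.
    return [
        r["id"]
        for r in records
        if r["project_id"] == project_id
        and r["data"].get(running_id_name) in running_ids
    ]


print(get_record_ids_by_running_ids("p1", [1, 2]))  # → ['r1', 'r2']
```

In the real query, the `.as_integer()` cast on the JSON value is what allows the `IN` comparison against a list of Python ints.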
242 changes: 242 additions & 0 deletions cognition_objects/integration.py
@@ -0,0 +1,242 @@
from typing import List, Optional, Dict
import datetime
from fastapi import HTTPException
from sqlalchemy import func

from ..business_objects import general
from ..session import session
from ..models import CognitionIntegration, Project
from ..enums import (
    CognitionMarkdownFileState,
    CognitionIntegrationType,
)


def get_by_id(id: str) -> CognitionIntegration:
    return (
        session.query(CognitionIntegration)
        .filter(CognitionIntegration.id == id)
        .first()
    )


def get_all(
    integration_type: Optional[str] = None,
    exclude_failed: Optional[bool] = False,
    only_synced: Optional[bool] = False,
) -> List[CognitionIntegration]:
    query = session.query(CognitionIntegration)
    if integration_type:
        query = query.filter(CognitionIntegration.type == integration_type)
    if exclude_failed:
        query = query.filter(
            CognitionIntegration.state != CognitionMarkdownFileState.FAILED.value
        )
    if only_synced:
        query = query.filter(CognitionIntegration.is_synced == True)
    return query.order_by(CognitionIntegration.created_at.desc()).all()


def get_all_in_org(
    org_id: str,
    integration_type: Optional[str] = None,
    only_synced: Optional[bool] = False,
) -> List[CognitionIntegration]:
    query = session.query(CognitionIntegration).filter(
        CognitionIntegration.organization_id == org_id
    )
    if integration_type:
        query = query.filter(CognitionIntegration.type == integration_type)
    if only_synced:
        query = query.filter(CognitionIntegration.is_synced == True)
    return query.order_by(CognitionIntegration.created_at.desc()).all()


def get_all_in_org_paginated(
    org_id: str,
    integration_type: Optional[str] = None,
    page: int = 1,
    page_size: int = 10,
) -> List[CognitionIntegration]:
    schema_name = CognitionIntegration.__table__.schema or "public"
    table_name = f"{schema_name}.{CognitionIntegration.__tablename__}"

    # ROW_NUMBER() is 1-based: page n covers rows (n-1)*page_size+1 .. n*page_size
    first_row = (page - 1) * page_size + 1
    last_row = page * page_size

    sql = f"""
    SELECT id FROM (
        SELECT
            ROW_NUMBER () OVER(ORDER BY intg.created_at ASC) rn,
            intg.id
        FROM {table_name} intg
        WHERE intg.organization_id = '{org_id}'
    ) pages
    WHERE rn BETWEEN {first_row} AND {last_row}
    """
    integration_ids = general.execute_all(sql)
    if not integration_ids:
        return []

    query = session.query(CognitionIntegration).filter(
        CognitionIntegration.id.in_([row[0] for row in integration_ids])
    )
    if integration_type:
        query = query.filter(CognitionIntegration.type == integration_type)
    return query.order_by(CognitionIntegration.created_at.desc()).all()


def get_all_by_project_id(project_id: str) -> List[CognitionIntegration]:
    return (
        session.query(CognitionIntegration)
        .filter(
            CognitionIntegration.project_id == project_id,
        )
        .order_by(CognitionIntegration.created_at.desc())
        .all()
    )


def get_last_synced_at(
    org_id: str, integration_type: Optional[str] = None
) -> Optional[datetime.datetime]:
    query = session.query(func.max(CognitionIntegration.last_synced_at)).filter(
        CognitionIntegration.organization_id == org_id
    )
    if integration_type:
        query = query.filter(CognitionIntegration.type == integration_type)
    result = query.first()
    return result[0] if result else None


def count_org_integrations(org_id: str) -> Dict[str, int]:
    counts = (
        session.query(CognitionIntegration.type, func.count(CognitionIntegration.id))
        .filter(
            CognitionIntegration.organization_id == org_id,
        )
        .group_by(CognitionIntegration.type)
        .all()
    )
    return {cognition_type: count for cognition_type, count in counts}


def create(
    org_id: str,
    user_id: str,
    name: str,
    description: str,
    tokenizer: str,
    state: str,
    integration_type: CognitionIntegrationType,
    integration_config: Dict,
    llm_config: Dict,
    started_at: Optional[datetime.datetime] = None,
    created_at: Optional[datetime.datetime] = None,
    finished_at: Optional[datetime.datetime] = None,
    id: Optional[str] = None,
    project_id: Optional[str] = None,
    with_commit: bool = True,
) -> CognitionIntegration:
    if state not in CognitionMarkdownFileState.all():
        raise HTTPException(status_code=400, detail=f"Invalid state: {state}")
    integration: CognitionIntegration = CognitionIntegration(
        id=id,
        organization_id=org_id,
        project_id=project_id,
        created_by=user_id,
        updated_by=user_id,
        created_at=created_at,
        started_at=started_at,
        finished_at=finished_at,
        name=name,
        description=description,
        tokenizer=tokenizer,
        state=state,
        type=integration_type.value,
        config=integration_config,
        llm_config=llm_config,
        delta_criteria={"delta_url": None},
    )
    general.add(integration, with_commit)

    return integration


def update(
    id: str,
    updated_by: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    tokenizer: Optional[str] = None,
    state: Optional[CognitionMarkdownFileState] = None,
    integration_config: Optional[Dict] = None,
    llm_config: Optional[Dict] = None,
    error_message: Optional[str] = None,
    started_at: Optional[datetime.datetime] = None,
    finished_at: Optional[datetime.datetime] = None,
    last_synced_at: Optional[datetime.datetime] = None,
    is_synced: Optional[bool] = None,
    delta_criteria: Optional[Dict[str, str]] = None,
    with_commit: bool = True,
) -> CognitionIntegration:
    integration: CognitionIntegration = get_by_id(id)

    if updated_by is not None:
        integration.updated_by = updated_by
    if name is not None:
        integration.name = name
    if description is not None:
        integration.description = description
    if tokenizer is not None:
        integration.tokenizer = tokenizer
    if state is not None:
        integration.state = state.value
    if integration_config is not None:
        integration.config = integration_config
    if llm_config is not None:
        integration.llm_config = llm_config
    if error_message is not None:
        integration.error_message = error_message
    if started_at is not None:
        integration.started_at = started_at
    if last_synced_at is not None:
        integration.last_synced_at = last_synced_at
    if delta_criteria is not None:
        integration.delta_criteria = delta_criteria

    # intentionally set even when None: a new run resets sync state and finished_at
    integration.is_synced = is_synced
    integration.finished_at = finished_at

    general.add(integration, with_commit)
    return integration


def execution_finished(id: str) -> bool:
    return bool(
        session.query(CognitionIntegration)
        .filter(
            CognitionIntegration.id == id,
            CognitionIntegration.state.in_(
                [
                    CognitionMarkdownFileState.FINISHED.value,
                    CognitionMarkdownFileState.FAILED.value,
                ]
            ),
        )
        .first()
    )


def delete_many(
    ids: List[str], delete_refinery_projects: bool = False, with_commit: bool = True
) -> None:
    integrations = session.query(CognitionIntegration).filter(
        CognitionIntegration.id.in_(ids)
    )
    if delete_refinery_projects:
        session.query(Project).filter(
            Project.id.in_(filter(None, [i.project_id for i in integrations]))
        ).delete(synchronize_session=False)
    integrations.delete(synchronize_session=False)
    general.flush_or_commit(with_commit)
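The `execution_finished` query above treats an integration run as done once its state is either FINISHED or FAILED. A tiny sketch of that terminal-state check with plain strings standing in for the `CognitionMarkdownFileState` enum values:

```python
# Illustrative stand-ins for the enum values used in the diff.
FINISHED, FAILED, RUNNING = "FINISHED", "FAILED", "RUNNING"

# Terminal states: once reached, the run will never progress further,
# mirroring the state.in_([FINISHED, FAILED]) filter in the query.
TERMINAL_STATES = {FINISHED, FAILED}


def execution_finished(state):
    return state in TERMINAL_STATES


print(execution_finished(FAILED))   # → True
print(execution_finished(RUNNING))  # → False
```

Grouping FINISHED and FAILED into one terminal set lets callers poll a single predicate instead of checking each end state separately.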