Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
686 changes: 678 additions & 8 deletions docs/concepts/data_handler/README.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ dependencies = [
"types-pyyaml>=6.0,<7.0",
"fasttext-wheel (>=0.9.2,<0.10.0)",
"litellm (>=1.79.3,<2.0.0)",
"pysnc (>=1.1.0,<1.2.0)",
"boto3 (>=1.40.71,<2.0.0)",
"google-auth (>=2.43.0,<3.0.0)",
"google-cloud-aiplatform (>=1.128.0,<2.0.0)",
Expand Down Expand Up @@ -143,6 +144,7 @@ sentence-transformers = "^5.1"
soundfile = "^0.13"
types-pyyaml = "^6.0"
litellm = "^1.79.3"
pysnc = ">=1.1.0,<1.2.0"
boto3 = "^1.40.71"
google-auth = "^2.43.0"
google-cloud-aiplatform = "^1.128.0"
Expand Down
10 changes: 9 additions & 1 deletion sygra/core/base_task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sygra.core.dataset.dataset_processor import DatasetProcessor
from sygra.core.dataset.file_handler import FileHandler
from sygra.core.dataset.huggingface_handler import HuggingFaceHandler
from sygra.core.dataset.servicenow_handler import ServiceNowHandler
from sygra.core.graph.graph_config import GraphConfig
from sygra.core.graph.langgraph.graph_builder import LangGraphBuilder
from sygra.core.graph.sygra_state import SygraState
Expand Down Expand Up @@ -436,7 +437,7 @@ def _generate_empty_dataset(self) -> list[dict]:
logger.info(f"Generating {num_records} empty records")
return [{} for _ in range(num_records)]

def _get_data_reader(self) -> Union[HuggingFaceHandler, FileHandler]:
def _get_data_reader(self) -> Union[HuggingFaceHandler, FileHandler, ServiceNowHandler]:
"""Get appropriate data reader based on source type"""
if self.source_config is None:
raise ValueError("source_config must be set to get a data reader")
Expand All @@ -445,6 +446,8 @@ def _get_data_reader(self) -> Union[HuggingFaceHandler, FileHandler]:
return HuggingFaceHandler(self.source_config)
elif self.source_config.type == DataSourceType.DISK_FILE:
return FileHandler(self.source_config)
elif self.source_config.type == DataSourceType.SERVICENOW:
return ServiceNowHandler(self.source_config)
else:
raise ValueError(f"Unsupported data source type: {self.source_config.type}")

Expand Down Expand Up @@ -731,6 +734,11 @@ def execute(self):
source_config=self.source_config,
output_config=self.output_config,
).write(data)
elif self.output_config.type == OutputType.SERVICENOW:
ServiceNowHandler(
source_config=None,
output_config=self.output_config,
).write(data)
else:
if self.output_config.file_path is None:
raise ValueError("file_path must be set for output_config")
Expand Down
78 changes: 74 additions & 4 deletions sygra/core/dataset/dataset_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ class DataSourceType(Enum):
Attributes:
HUGGINGFACE: HuggingFace dataset source
DISK_FILE: Local file system source
SERVICENOW: ServiceNow table source
"""

HUGGINGFACE = "hf"
DISK_FILE = "disk"
SERVICENOW = "servicenow"


class TransformConfig(BaseModel):
Expand All @@ -44,13 +46,15 @@ class OutputType(Enum):
JSONL: JSON Lines file output
CSV: CSV file output
PARQUET: Parquet file output
SERVICENOW: ServiceNow table output
"""

HUGGINGFACE = "hf"
JSON = "json"
JSONL = "jsonl"
CSV = "csv"
PARQUET = "parquet"
SERVICENOW = "servicenow"
NONE = None


Expand All @@ -69,8 +73,14 @@ class ShardConfig(BaseModel):
class DataSourceConfig(BaseModel):
"""Configuration for data sources.

This class provides configuration options for both HuggingFace datasets
and local file system sources, including transformation specifications.
This class provides configuration options for HuggingFace datasets,
local file system sources, and ServiceNow tables, including transformation specs.

For ServiceNow sources:
- Connection credentials (instance, username, password) are read from
environment variables: SNOW_INSTANCE, SNOW_USERNAME, SNOW_PASSWORD
- Only query details (table, filters, fields, etc.) need to be specified
- Credentials set in the config are optional; when provided, they override the environment variables

Attributes:
type (DataSourceType): Type of data source
Expand All @@ -83,6 +93,9 @@ class DataSourceConfig(BaseModel):
file_format (Optional[str]): Format for local files
file_path (Optional[str]): Path to local file
encoding (str): Character encoding for text files
table (Optional[str]): ServiceNow table name for queries
filters (Optional[dict]): Filters for ServiceNow queries
fields (Optional[list[str]]): Fields to retrieve from ServiceNow
transformations (Optional[list[TransformConfig]]): List of transformations to apply
"""

Expand All @@ -101,6 +114,27 @@ class DataSourceConfig(BaseModel):
file_path: Optional[str] = None
encoding: str = "utf-8"

# For ServiceNow tables
instance: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
oauth_client_id: Optional[str] = None
oauth_client_secret: Optional[str] = None
table: Optional[str] = None
query: Optional[str] = None
filters: Optional[dict[str, Any]] = None
fields: Optional[list[str]] = None
limit: Optional[int] = None
batch_size: int = 100
order_by: Optional[str] = None
order_desc: bool = False
display_value: str = "all"
exclude_reference_link: bool = True
proxy: Optional[str] = None
verify_ssl: Optional[bool] = None
cert: Optional[str] = None
auto_retry: bool = True

# Transformation functions
transformations: Optional[list[TransformConfig]] = None

Expand Down Expand Up @@ -169,8 +203,14 @@ def from_dict(cls, config: dict[str, Any]) -> "DataSourceConfig":
class OutputConfig(BaseModel):
"""Configuration for data output operations.

This class provides configuration options for both HuggingFace datasets
and local file system outputs.
This class provides configuration options for HuggingFace datasets,
local file system outputs, and ServiceNow tables.

For ServiceNow outputs:
- Connection credentials (instance, username, password) are read from
environment variables: SNOW_INSTANCE, SNOW_USERNAME, SNOW_PASSWORD
- Only operation details (table, operation, key_field) need to be specified
- Credentials set in the config are optional; when provided, they override the environment variables

Attributes:
type (OutputType): Type of output
Expand All @@ -183,6 +223,9 @@ class OutputConfig(BaseModel):
filename (Optional[str]): Output filename
file_path (Optional[str]): Output file path
encoding (str): Character encoding for text files
table (Optional[str]): ServiceNow table name for output
operation (str): ServiceNow operation (insert/update/upsert)
key_field (str): Field to match for update/upsert operations
"""

type: Optional[OutputType] = None
Expand All @@ -196,6 +239,20 @@ class OutputConfig(BaseModel):
file_path: Optional[str] = None
encoding: str = "utf-8"

# For ServiceNow output
instance: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
oauth_client_id: Optional[str] = None
oauth_client_secret: Optional[str] = None
table: Optional[str] = None
operation: str = "insert" # insert, update, or upsert
key_field: str = "sys_id" # Field to match for update/upsert
proxy: Optional[str] = None
verify_ssl: Optional[bool] = None
cert: Optional[str] = None
auto_retry: bool = True

@classmethod
def from_dict(cls, config: dict[str, Any]) -> "OutputConfig":
"""Create configuration from dictionary.
Expand All @@ -217,4 +274,17 @@ def from_dict(cls, config: dict[str, Any]) -> "OutputConfig":
filename=config.get("filename"),
file_path=config.get("file_path"),
encoding=config.get("encoding", "utf-8"),
# ServiceNow fields
instance=config.get("instance"),
username=config.get("username"),
password=config.get("password"),
oauth_client_id=config.get("oauth_client_id"),
oauth_client_secret=config.get("oauth_client_secret"),
table=config.get("table"),
operation=config.get("operation", "insert"),
key_field=config.get("key_field", "sys_id"),
proxy=config.get("proxy"),
verify_ssl=config.get("verify_ssl"),
cert=config.get("cert"),
auto_retry=config.get("auto_retry", True),
)
Loading