Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ CLI and data enrichment utilities for the [Parallel API](https://docs.parallel.a
- **CLI for Humans & AI Agents** - Works interactively or fully via command-line arguments
- **Web Search** - AI-powered search with domain filtering and date ranges
- **Content Extraction** - Extract clean markdown from any URL
- **Data Enrichment** - Enrich CSV, DuckDB, and BigQuery data with AI
- **Data Enrichment** - Enrich CSV, JSON, DuckDB, and BigQuery data with AI
- **AI-Assisted Planning** - Use natural language to define what data you want
- **Multiple Integrations** - Polars, DuckDB, Snowflake, BigQuery, Spark

Expand All @@ -30,7 +30,7 @@ curl -fsSL https://parallel.ai/install.sh | bash

This automatically detects your platform (macOS/Linux, x64/arm64) and installs to `~/.local/bin`.

> **Note:** The standalone binary supports `search`, `extract`, `research`, and `enrich run` with CLI arguments and CSV files. For YAML config files, interactive planner, DuckDB/BigQuery sources, or deployment commands, use pip install.
> **Note:** The standalone binary supports `search`, `extract`, `research`, and `enrich run` with CLI arguments, CSV files, and JSON files. For YAML config files, interactive planner, DuckDB/BigQuery sources, or deployment commands, use pip install.

### Python Package

Expand Down Expand Up @@ -140,6 +140,14 @@ parallel-cli enrich run \
--target enriched.csv \
--source-columns '[{"name": "company", "description": "Company name"}]' \
--intent "Find the CEO and annual revenue"

# Enrich a JSON file
parallel-cli enrich run \
--source-type json \
--source companies.json \
--target enriched.json \
--source-columns '[{"name": "company", "description": "Company name"}]' \
--enriched-columns '[{"name": "ceo", "description": "CEO name"}]'
```

### 5. Deploy to Cloud Systems
Expand Down Expand Up @@ -245,7 +253,7 @@ run_enrichment_from_dict({
```yaml
source: input.csv
target: output.csv
source_type: csv # csv, duckdb, or bigquery
source_type: csv # csv, json, duckdb, or bigquery
processor: core-fast # lite, base, core, pro, ultra (add -fast for speed)

source_columns:
Expand Down
5 changes: 5 additions & 0 deletions examples/example_file.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[
{"business_name": "Netflix", "web_site": "www.netflix.com"},
{"business_name": "Parallel Web Systems", "web_site": "parallel.ai"},
{"business_name": "Apple", "web_site": "www.apple.com"}
]
13 changes: 13 additions & 0 deletions examples/example_json_schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
source: examples/example_file.json
target: data/output_file.json
source_type: json
processor: core-fast # Options: lite, base, core, pro, ultra, etc. (add -fast for fast variant, default: core-fast)
source_columns:
- name: business_name
description: The name of a business
- name: web_site
description: The business's website
enriched_columns:
- name: estimated_ad_spend_last_quarter
description: The estimated spend in the last quarter by this business in USD
type: int # Specify output type (default: str)
4 changes: 2 additions & 2 deletions parallel_web_tools/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@
# DuckDB requires: pip install parallel-web-tools[duckdb]
# BigQuery requires: pip install parallel-web-tools[bigquery]
# Source backends the CLI exposes.  The standalone binary ships without the
# optional duckdb/bigquery extras, so it only offers the file-based sources.
AVAILABLE_SOURCE_TYPES = (
    ["csv", "json"]
    if _STANDALONE_MODE
    else ["csv", "json", "duckdb", "bigquery"]
)


# =============================================================================
Expand Down
4 changes: 4 additions & 0 deletions parallel_web_tools/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ def _run_processor(parsed_schema: InputSchema, no_wait: bool = False) -> dict |
from parallel_web_tools.processors.csv import process_csv

return process_csv(parsed_schema, no_wait=no_wait)
case SourceType.JSON:
from parallel_web_tools.processors.json import process_json

return process_json(parsed_schema, no_wait=no_wait)
case SourceType.DUCKDB:
from parallel_web_tools.processors.duckdb import process_duckdb

Expand Down
1 change: 1 addition & 0 deletions parallel_web_tools/core/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class SourceType(Enum):
"""Supported data sources."""

CSV = "csv"
JSON = "json"
DUCKDB = "duckdb"
BIGQUERY = "bigquery"

Expand Down
3 changes: 2 additions & 1 deletion parallel_web_tools/processors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Data processors for different source types."""

from parallel_web_tools.processors.csv import process_csv
from parallel_web_tools.processors.json import process_json


def process_duckdb(schema):
Expand All @@ -17,4 +18,4 @@ def process_bigquery(schema):
return _process_bigquery(schema)


__all__ = ["process_csv", "process_duckdb", "process_bigquery"]
__all__ = ["process_csv", "process_json", "process_duckdb", "process_bigquery"]
33 changes: 33 additions & 0 deletions parallel_web_tools/processors/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""JSON processor for data enrichment."""

import json
import logging
from typing import Any

from parallel_web_tools.core import InputSchema, parse_input_and_output_models, run_tasks
from parallel_web_tools.core.batch import create_task_group

logger = logging.getLogger(__name__)


def process_json(schema: InputSchema, no_wait: bool = False) -> dict[str, Any] | None:
    """Process a JSON source file and enrich its rows.

    Reads ``schema.source`` (which must contain a JSON array of objects,
    one object per row), runs the enrichment described by *schema*, and
    writes the enriched rows to ``schema.target`` as pretty-printed JSON.

    Args:
        schema: Parsed enrichment configuration (source/target paths,
            source and enriched column definitions, processor name).
        no_wait: When True, submit the rows as a task group and return its
            info immediately instead of waiting for results.

    Returns:
        The task-group info dict when ``no_wait`` is True, otherwise
        ``None`` (results are written to ``schema.target``).

    Raises:
        ValueError: If the source file does not contain a JSON array.
        json.JSONDecodeError: If the source file is not valid JSON.
        OSError: If the source or target file cannot be opened.
    """
    logger.info("Processing JSON file: %s", schema.source)

    InputModel, OutputModel = parse_input_and_output_models(schema)

    # Read all rows up front (expects a JSON array of objects).
    with open(schema.source, encoding="utf-8") as f:
        data = json.load(f)

    # A top-level object or scalar is almost certainly a mistake — fail
    # early with a clear message instead of handing it to run_tasks.
    if not isinstance(data, list):
        raise ValueError(
            f"Expected a JSON array of objects in {schema.source}, "
            f"got {type(data).__name__}"
        )

    if no_wait:
        # Fire-and-forget: create the task group and return its info so the
        # caller can poll for results later.
        return create_task_group(data, InputModel, OutputModel, schema.processor)

    # Process all rows in batch.
    output_rows = run_tasks(data, InputModel, OutputModel, schema.processor)

    # Write results; keep non-ASCII characters readable instead of
    # \u-escaping them, and pretty-print for human inspection.
    with open(schema.target, "w", encoding="utf-8") as f:
        json.dump(output_rows, f, indent=2, ensure_ascii=False)

    return None
73 changes: 73 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ def test_source_types_include_duckdb_bigquery_when_not_standalone(self, runner):
# When not in standalone mode, all source types are available
assert commands._STANDALONE_MODE is False
assert "csv" in commands.AVAILABLE_SOURCE_TYPES
assert "json" in commands.AVAILABLE_SOURCE_TYPES
assert "duckdb" in commands.AVAILABLE_SOURCE_TYPES
assert "bigquery" in commands.AVAILABLE_SOURCE_TYPES

Expand Down Expand Up @@ -1431,3 +1432,75 @@ def test_enrich_poll_timeout_json_output(self, runner):
json_text = "\n".join(lines[json_start:])
output = json.loads(json_text)
assert output["error"]["type"] == "TimeoutError"


class TestEnrichRunJsonSourceType:
    """Tests for ``enrich run`` with ``--source-type json``."""

    def test_enrich_run_accepts_json_source_type(self, runner):
        """The help text for ``enrich run`` should advertise the json source type."""
        help_result = runner.invoke(main, ["enrich", "run", "--help"])
        assert help_result.exit_code == 0
        assert "json" in help_result.output

    def test_enrich_run_json_source_type_valid(self, runner):
        """``enrich run`` should forward source_type=json to the enrichment entry point."""
        argv = [
            "enrich",
            "run",
            "--source-type",
            "json",
            "--source",
            "input.json",
            "--target",
            "output.json",
            "--source-columns",
            '[{"name": "company", "description": "Company name"}]',
            "--enriched-columns",
            '[{"name": "ceo", "description": "CEO name"}]',
        ]

        with mock.patch("parallel_web_tools.cli.commands.run_enrichment_from_dict") as run_stub:
            run_stub.return_value = None
            outcome = runner.invoke(main, argv)

        assert outcome.exit_code == 0
        run_stub.assert_called_once()
        forwarded_config = run_stub.call_args[0][0]
        assert forwarded_config["source_type"] == "json"

    def test_enrich_plan_accepts_json_source_type(self, runner, tmp_path):
        """``enrich plan`` should write a YAML config carrying source_type=json."""
        import yaml

        config_path = tmp_path / "config.yaml"
        argv = [
            "enrich",
            "plan",
            "-o",
            str(config_path),
            "--source-type",
            "json",
            "--source",
            "input.json",
            "--target",
            "output.json",
            "--source-columns",
            '[{"name": "company", "description": "Company name"}]',
            "--enriched-columns",
            '[{"name": "ceo", "description": "CEO name"}]',
        ]

        outcome = runner.invoke(main, argv)

        assert outcome.exit_code == 0
        assert config_path.exists()

        with open(config_path) as handle:
            written_config = yaml.safe_load(handle)

        assert written_config["source_type"] == "json"
119 changes: 119 additions & 0 deletions tests/test_enrichment.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for the core.batch module."""

import json
import os
from types import SimpleNamespace
from unittest import mock
Expand Down Expand Up @@ -1018,3 +1019,121 @@ def test_handles_failed_runs(self):
assert len(results) == 1
assert results[0]["error"] == "Processing failed"
assert results[0]["input"] == {"company": "X"}


class TestProcessJson:
    """Tests for the JSON processor."""

    @staticmethod
    def _build_schema(source_path, target_path, source_columns, enriched_columns):
        """Construct an InputSchema for a JSON source pointing at the given paths."""
        from parallel_web_tools.core.schema import InputSchema, SourceType

        return InputSchema(
            source=str(source_path),
            target=str(target_path),
            source_type=SourceType.JSON,
            source_columns=source_columns,
            enriched_columns=enriched_columns,
        )

    def test_read_json_array(self, tmp_path):
        """Should read a JSON array of objects."""
        from parallel_web_tools.core.schema import Column
        from parallel_web_tools.processors.json import process_json

        input_path = tmp_path / "input.json"
        output_path = tmp_path / "output.json"
        input_path.write_text(
            json.dumps(
                [
                    {"company": "Google", "website": "google.com"},
                    {"company": "Apple", "website": "apple.com"},
                ]
            )
        )

        schema = self._build_schema(
            input_path,
            output_path,
            [Column("company", "Company name"), Column("website", "Company website")],
            [Column("ceo", "CEO name")],
        )

        with mock.patch("parallel_web_tools.processors.json.run_tasks") as run_stub:
            run_stub.return_value = [
                {"company": "Google", "ceo": "Sundar Pichai"},
                {"company": "Apple", "ceo": "Tim Cook"},
            ]
            outcome = process_json(schema)

        assert outcome is None
        run_stub.assert_called_once()
        # Verify the rows handed to run_tasks match the source file.
        rows_passed = run_stub.call_args[0][0]
        assert len(rows_passed) == 2
        assert rows_passed[0]["company"] == "Google"

    def test_write_json_output(self, tmp_path):
        """Should write enriched results to JSON file with indent=2."""
        from parallel_web_tools.core.schema import Column
        from parallel_web_tools.processors.json import process_json

        input_path = tmp_path / "input.json"
        output_path = tmp_path / "output.json"
        input_path.write_text(json.dumps([{"company": "Google"}]))

        schema = self._build_schema(
            input_path,
            output_path,
            [Column("company", "Company name")],
            [Column("ceo", "CEO name")],
        )

        enriched_rows = [{"company": "Google", "ceo": "Sundar Pichai"}]

        with mock.patch("parallel_web_tools.processors.json.run_tasks") as run_stub:
            run_stub.return_value = enriched_rows
            process_json(schema)

        assert output_path.exists()
        with open(output_path) as handle:
            assert json.load(handle) == enriched_rows
        # Verify indent=2 formatting was applied.
        assert "  " in output_path.read_text()

    def test_no_wait_returns_task_group(self, tmp_path):
        """Should return task group info when no_wait=True."""
        from parallel_web_tools.core.schema import Column
        from parallel_web_tools.processors.json import process_json

        input_path = tmp_path / "input.json"
        output_path = tmp_path / "output.json"
        input_path.write_text(json.dumps([{"company": "Google"}]))

        schema = self._build_schema(
            input_path,
            output_path,
            [Column("company", "Company name")],
            [Column("ceo", "CEO name")],
        )

        with mock.patch("parallel_web_tools.processors.json.create_task_group") as create_stub:
            create_stub.return_value = {"taskgroup_id": "tgrp_json_123"}
            outcome = process_json(schema, no_wait=True)

        assert outcome == {"taskgroup_id": "tgrp_json_123"}
        create_stub.assert_called_once()

    def test_invalid_json_raises_error(self, tmp_path):
        """Should raise error for invalid JSON input files."""
        from parallel_web_tools.core.schema import Column
        from parallel_web_tools.processors.json import process_json

        bad_file = tmp_path / "bad.json"
        bad_file.write_text("not valid json {{{")

        schema = self._build_schema(
            bad_file,
            tmp_path / "output.json",
            [Column("company", "Company name")],
            [Column("ceo", "CEO name")],
        )

        with pytest.raises(json.JSONDecodeError):
            process_json(schema)