Add split batches #469

Merged (15 commits), Feb 23, 2024
74 changes: 62 additions & 12 deletions python/langsmith/client.py
@@ -40,6 +40,7 @@
)
from urllib import parse as urllib_parse

+import orjson
import requests
from requests import adapters as requests_adapters
from urllib3.util import Retry
@@ -215,6 +216,24 @@ def _serialize_json(obj: Any, depth: int = 0) -> Any:
return repr(obj)


+def _dumps_json(obj: Any) -> bytes:
+    """Serialize an object to JSON bytes using orjson.
+
+    Parameters
+    ----------
+    obj : Any
+        The object to serialize.
+
+    Returns
+    -------
+    bytes
+        The JSON-formatted bytes.
+    """
+    return orjson.dumps(obj, default=_serialize_json)
Contributor: let's add some of the flags like serialize numpy etc.

Collaborator (Author): is that applied before or after our default serialize json?

Contributor: before I think

Collaborator (Author): ah yeah, otherwise the passthrough options wouldn't make sense. thanks



def close_session(session: requests.Session) -> None:
"""Close the session.

@@ -729,7 +748,7 @@ def _get_cursor_paginated_list(
request_method,
f"{self.api_url}{path}",
request_kwargs={
"data": json.dumps(params_, default=_serialize_json),
"data": _dumps_json(params_),
"headers": self._headers,
"timeout": self.timeout_ms / 1000,
},
@@ -1012,7 +1031,7 @@ def create_run(
"post",
f"{self.api_url}/runs",
request_kwargs={
"data": json.dumps(run_create, default=_serialize_json),
"data": _dumps_json(run_create),
"headers": headers,
"timeout": self.timeout_ms / 1000,
},
@@ -1082,20 +1101,50 @@ def batch_ingest_runs(
)
# filter out runs that are not sampled
if pre_sampled:
-body = {
+raw_body = {
"post": create_dicts,
"patch": update_dicts,
}
else:
-body = {
+raw_body = {
"post": self._filter_for_sampling(create_dicts),
"patch": self._filter_for_sampling(update_dicts, patch=True),
}
-if not body["post"] and not body["patch"]:
+if not raw_body["post"] and not raw_body["patch"]:
return

-self._insert_runtime_env(body["post"])
+self._insert_runtime_env(raw_body["post"])

+if self.info is None:
+    raise ls_utils.LangSmithUserError(
+        "Batch ingest is not supported by your LangSmith server version. "
+        "Please upgrade to a newer version."
+    )
+info = cast(ls_schemas.LangSmithInfo, self.info)
+
+size_limit_bytes = (info.batch_ingest_config or {}).get(
+    "size_limit_bytes"
+) or 20_971_520  # 20 MB max by default
+# Serialize each run to a standalone JSON fragment up front so the batch
+# can be split on byte size without re-serializing anything.
+partial_body = {
+    "post": [_dumps_json(run) for run in raw_body["post"]],
+    "patch": [_dumps_json(run) for run in raw_body["patch"]],
+}
+body_chunks: DefaultDict[str, list] = collections.defaultdict(list)
+body_size = 0
+for key in ["post", "patch"]:
+    body = collections.deque(partial_body[key])
+    while body:
+        if body_size > 0 and body_size + len(body[0]) > size_limit_bytes:
+            # Flush the current chunk before it would exceed the limit.
+            self._post_batch_ingest_runs(orjson.dumps(body_chunks))
+            body_size = 0
+            body_chunks.clear()
+        body_size += len(body[0])
+        body_chunks[key].append(orjson.Fragment(body.popleft()))
+if body_size:
+    self._post_batch_ingest_runs(orjson.dumps(body_chunks))
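Taken on its own, the chunking scheme works like this. The sketch below is a standalone illustration (chunk_payloads is a hypothetical name, not a client method): each run is serialized exactly once, packed greedily into bodies that stay under the byte limit, and orjson.Fragment splices the pre-serialized bytes into the outer document without re-encoding them.

```python
from typing import Iterator, List

import orjson


def chunk_payloads(
    runs: List[dict], size_limit_bytes: int = 20_971_520
) -> Iterator[bytes]:
    """Greedily pack serialized runs into bodies under the size limit."""
    fragments = [orjson.dumps(run) for run in runs]
    chunk: List[orjson.Fragment] = []
    chunk_size = 0
    for frag in fragments:
        if chunk and chunk_size + len(frag) > size_limit_bytes:
            # Emit the full chunk; Fragment embeds the bytes verbatim.
            yield orjson.dumps({"post": chunk})
            chunk, chunk_size = [], 0
        chunk.append(orjson.Fragment(frag))
        chunk_size += len(frag)
    if chunk:
        yield orjson.dumps({"post": chunk})
```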

def _post_batch_ingest_runs(self, body: bytes):
def handle_429(response: requests.Response, attempt: int) -> bool:
# Min of 30 seconds, max of 1 minute
if response.status_code == 429:
@@ -1118,7 +1167,7 @@ def handle_429(response: requests.Response, attempt: int) -> bool:
"post",
f"{self.api_url}/runs/batch",
request_kwargs={
"data": json.dumps(body, default=_serialize_json),
"data": body,
"timeout": self.timeout_ms / 1000,
"headers": {
**self._headers,
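The body of handle_429 above is collapsed in this view. As a purely illustrative sketch of the behavior its comment describes (a minimum 30-second, maximum one-minute wait on HTTP 429), it might look like the following; this is an assumption, not the merged code:

```python
import time

import requests


def handle_429(response: requests.Response, attempt: int) -> bool:
    """Illustrative sketch only: back off on 429, honoring Retry-After."""
    if response.status_code != 429:
        return False  # not rate limited; nothing to handle
    retry_after = float(response.headers.get("retry-after", 30))
    # Clamp the server's hint: min of 30 seconds, max of 1 minute.
    time.sleep(min(max(retry_after, 30.0), 60.0))
    return True  # signal the caller to retry the request
```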
@@ -1212,7 +1261,7 @@ def update_run(
"patch",
f"{self.api_url}/runs/{data['id']}",
request_kwargs={
"data": json.dumps(data, default=_serialize_json),
"data": _dumps_json(data),
"headers": headers,
"timeout": self.timeout_ms / 1000,
},
@@ -1659,7 +1708,7 @@ def create_project(
response = self.session.post(
endpoint,
headers={**self._headers, "Content-Type": "application/json"},
-data=json.dumps(body, default=_serialize_json),
+data=_dumps_json(body),
)
ls_utils.raise_for_status_with_text(response)
return ls_schemas.TracerSession(**response.json(), _host_url=self._host_url)
@@ -1708,7 +1757,7 @@ def update_project(
response = self.session.patch(
endpoint,
headers={**self._headers, "Content-Type": "application/json"},
-data=json.dumps(body, default=_serialize_json),
+data=_dumps_json(body),
)
ls_utils.raise_for_status_with_text(response)
return ls_schemas.TracerSession(**response.json(), _host_url=self._host_url)
@@ -2394,7 +2443,7 @@ def create_examples(
response = self.session.post(
f"{self.api_url}/examples/bulk",
headers={**self._headers, "Content-Type": "application/json"},
-data=json.dumps(examples, default=_serialize_json),
+data=_dumps_json(examples),
)
ls_utils.raise_for_status_with_text(response)

@@ -2914,7 +2963,7 @@ def update_feedback(
response = self.session.patch(
self.api_url + f"/feedback/{_as_uuid(feedback_id, 'feedback_id')}",
headers={**self._headers, "Content-Type": "application/json"},
-data=json.dumps(feedback_update, default=_serialize_json),
+data=_dumps_json(feedback_update),
)
ls_utils.raise_for_status_with_text(response)

@@ -3481,6 +3530,7 @@ def _ensure_ingest_config(
info: Optional[ls_schemas.LangSmithInfo],
) -> ls_schemas.BatchIngestConfig:
default_config = ls_schemas.BatchIngestConfig(
+size_limit_bytes=None,  # Note: this field is not used here
size_limit=100,
scale_up_nthreads_limit=_AUTO_SCALE_UP_NTHREADS_LIMIT,
scale_up_qsize_trigger=_AUTO_SCALE_UP_QSIZE_TRIGGER,
1 change: 1 addition & 0 deletions python/langsmith/schemas.py
@@ -588,6 +588,7 @@ class BatchIngestConfig(TypedDict, total=False):
scale_up_nthreads_limit: int
scale_down_nempty_trigger: int
size_limit: int
+size_limit_bytes: Optional[int]


class LangSmithInfo(BaseModel):
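Because BatchIngestConfig is declared with total=False, servers that predate the field simply omit the key and clients fall back to the 20 MB default. A small sketch of that read path (effective_size_limit_bytes is an illustrative helper, not part of the SDK):

```python
from typing import Optional, TypedDict


class BatchIngestConfig(TypedDict, total=False):
    size_limit: int
    size_limit_bytes: Optional[int]


def effective_size_limit_bytes(config: Optional[BatchIngestConfig]) -> int:
    # A missing config, a missing key, or an explicit None all fall
    # back to the 20 MB default used by the client above.
    return (config or {}).get("size_limit_bytes") or 20_971_520


assert effective_size_limit_bytes(None) == 20_971_520
assert effective_size_limit_bytes({"size_limit_bytes": 1024}) == 1024
```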
61 changes: 60 additions & 1 deletion python/poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions python/pyproject.toml
@@ -28,6 +28,7 @@ langsmith = "langsmith.cli.main:main"
python = ">=3.8.1,<4.0"
pydantic = ">=1,<3"
requests = "^2"
+orjson = "^3.9.14"


[tool.poetry.group.dev.dependencies]
65 changes: 32 additions & 33 deletions python/tests/integration_tests/test_client.py
@@ -219,40 +219,39 @@ def test_create_dataset(

@freeze_time("2023-01-01")
def test_list_datasets(langchain_client: Client) -> None:
-if langchain_client.has_dataset(dataset_name="___TEST dataset1"):
-    langchain_client.delete_dataset(dataset_name="___TEST dataset1")
-if langchain_client.has_dataset(dataset_name="___TEST dataset2"):
-    langchain_client.delete_dataset(dataset_name="___TEST dataset2")
-dataset1 = langchain_client.create_dataset(
-    "___TEST dataset1", data_type=DataType.llm
-)
-dataset2 = langchain_client.create_dataset(
-    "___TEST dataset2", data_type=DataType.kv
-)
-assert dataset1.url is not None
-assert dataset2.url is not None
-datasets = list(
-    langchain_client.list_datasets(dataset_ids=[dataset1.id, dataset2.id])
-)
-assert len(datasets) == 2
-assert dataset1.id in [dataset.id for dataset in datasets]
-assert dataset2.id in [dataset.id for dataset in datasets]
-assert dataset1.data_type == DataType.llm
-assert dataset2.data_type == DataType.kv
-# Sub-filter on data type
-datasets = list(langchain_client.list_datasets(data_type=DataType.llm.value))
-assert len(datasets) > 0
-assert dataset1.id in {dataset.id for dataset in datasets}
-# Sub-filter on name
-datasets = list(
-    langchain_client.list_datasets(
-        dataset_ids=[dataset1.id, dataset2.id], dataset_name="___TEST dataset1"
-    )
-)
-assert len(datasets) == 1
-# Delete datasets
-langchain_client.delete_dataset(dataset_id=dataset1.id)
-langchain_client.delete_dataset(dataset_id=dataset2.id)
+ds1n = "__test_list_datasets1" + uuid4().hex[:4]
+ds2n = "__test_list_datasets2" + uuid4().hex[:4]
+try:
+    dataset1 = langchain_client.create_dataset(ds1n, data_type=DataType.llm)
+    dataset2 = langchain_client.create_dataset(ds2n, data_type=DataType.kv)
+    assert dataset1.url is not None
+    assert dataset2.url is not None
+    datasets = list(
+        langchain_client.list_datasets(dataset_ids=[dataset1.id, dataset2.id])
+    )
+    assert len(datasets) == 2
+    assert dataset1.id in [dataset.id for dataset in datasets]
+    assert dataset2.id in [dataset.id for dataset in datasets]
+    assert dataset1.data_type == DataType.llm
+    assert dataset2.data_type == DataType.kv
+    # Sub-filter on data type
+    datasets = list(langchain_client.list_datasets(data_type=DataType.llm.value))
+    assert len(datasets) > 0
+    assert dataset1.id in {dataset.id for dataset in datasets}
+    # Sub-filter on name
+    datasets = list(
+        langchain_client.list_datasets(
+            dataset_ids=[dataset1.id, dataset2.id], dataset_name=ds1n
+        )
+    )
+    assert len(datasets) == 1
+finally:
+    # Delete datasets
+    for name in [ds1n, ds2n]:
+        try:
+            langchain_client.delete_dataset(dataset_name=name)
+        except LangSmithError:
+            pass


@pytest.mark.skip(reason="This test is flaky")
Expand Down