diff --git a/.github/workflows/auto-tests.yml b/.github/workflows/auto-tests.yml index e8fdeed..a0133ef 100644 --- a/.github/workflows/auto-tests.yml +++ b/.github/workflows/auto-tests.yml @@ -2,7 +2,7 @@ name: 🔁 Pytest ⏳x60 on: schedule: - - cron: "0 * * * *" + - cron: "*/60 * * * *" jobs: test: @@ -14,9 +14,7 @@ jobs: with: enable_cache: true cache_prefix: "venv-codeboxapi" - - name: Sync rye - run: rye sync - - name: Run Pytest + - run: rye sync + - run: rye run pytest env: CODEBOX_API_KEY: ${{ secrets.CODEBOX_API_KEY }} - run: rye run pytest diff --git a/.github/workflows/code-check.yml b/.github/workflows/code-check.yml index 7954a61..78152b5 100644 --- a/.github/workflows/code-check.yml +++ b/.github/workflows/code-check.yml @@ -3,23 +3,23 @@ name: ☑️ CodeCheck on: [push] jobs: - pre-commit: - strategy: - matrix: - python-version: ['3.9', '3.10', '3.11'] - runs-on: ubuntu-latest - steps: + pre-commit: + strategy: + matrix: + python-version: ["3.9", "3.10", "3.11"] + runs-on: ubuntu-latest + steps: - uses: actions/checkout@v2 - uses: eifinger/setup-rye@v1 with: enable-cache: true - cache-prefix: 'venv-codeboxapi' + cache-prefix: "venv-codeboxapi" - name: pin version run: rye pin ${{ matrix.python-version }} - name: Sync rye run: rye sync - - name: Run pre-commit - run: rye run pre-commit run --all-files + - name: Run ruff + run: rye run ruff check - name: Run tests env: CODEBOX_API_KEY: ${{ secrets.CODEBOX_API_KEY }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml deleted file mode 100644 index 99aa3ad..0000000 --- a/.pre-commit-config.yaml +++ /dev/null @@ -1,23 +0,0 @@ -repos: - -- repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.5.0 - hooks: - - id: check-yaml - - id: end-of-file-fixer - - id: trailing-whitespace - -- repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.8.0 - hooks: - - id: mypy - args: [--ignore-missing-imports, --follow-imports=skip] - additional_dependencies: [types-requests] - -- repo: 
https://github.com/astral-sh/ruff-pre-commit - rev: v0.1.8 - hooks: - - id: ruff - args: [ --fix ] - - id: ruff-format - types_or: [ python, pyi, jupyter ] diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 12901f9..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "python.analysis.typeCheckingMode": "basic" -} diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..29fe4d4 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,14 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "type": "shell", + "command": "bash ./scripts/dev-setup.sh", + "group": "build", + "label": "dev-setup", + "runOptions": { + "runOn": "default" + } + } + ] +} \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..34394e9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM ghcr.io/astral-sh/uv as uv + +FROM --platform=amd64 python:3.11-slim as build + +ENV VIRTUAL_ENV=/.venv PATH="/.venv/bin:$PATH" + +COPY --from=uv /uv /uv +COPY README.md pyproject.toml src / + +RUN --mount=type=cache,target=/root/.cache/uv \ + --mount=from=uv,source=/uv,target=/uv \ + /uv venv /.venv && /uv pip install -e .[all] \ + && rm -rf README.md pyproject.toml src + +# FROM --platform=amd64 python:3.11-slim as runtime + +ENV PORT=8069 +EXPOSE $PORT + +CMD ["/.venv/bin/python", "-m", "codeboxapi.api"] diff --git a/docs/settings.md b/docs/settings.md index e5da440..4ee927a 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -5,6 +5,7 @@ The configuration settings are encapsulated within the `CodeBoxSettings` class, which inherits from Pydantic's `BaseSettings` class. `codeboxapi/config.py` + ```python class CodeBoxSettings(BaseSettings): ... @@ -22,7 +23,7 @@ class CodeBoxSettings(BaseSettings): ### CodeBox API Settings -- `CODEBOX_API_KEY: Optional[str] = None` +- `CODEBOX_API_KEY: str | None = None` The API key for CodeBox. 
- `CODEBOX_BASE_URL: str = "https://codeboxapi.com/api/v1"` diff --git a/examples/async_example.py b/examples/async_example.py new file mode 100644 index 0000000..e9fcdd4 --- /dev/null +++ b/examples/async_example.py @@ -0,0 +1,38 @@ +from codeboxapi import CodeBox + +codebox = CodeBox() + + +async def async_examples(): + # 1. Async Code Execution + result = await codebox.aexec("print('Async Hello!')") + print(result.text) + + # 2. Async File Operations + await codebox.aupload("async_file.txt", b"Async content") + + downloaded = await codebox.adownload("async_file.txt") + print("File content:", downloaded.get_content()) + + # 3. All Sync Methods are also available Async + await codebox.ainstall("requests") + + # 4. Async Streaming + async for chunk in codebox.astream_exec(""" + for i in range(3): + print(f"Async chunk {i}") + import time + time.sleep(1) + """): + print(chunk.content, end="") + + # 5. Async Streaming Download + async for chunk in codebox.astream_download("async_file.txt"): + assert isinstance(chunk, bytes) + print(chunk.decode()) + + +if __name__ == "__main__": + import asyncio + + asyncio.run(async_examples()) diff --git a/examples/async_file_io.py b/examples/async_file_io.py deleted file mode 100644 index a4a0043..0000000 --- a/examples/async_file_io.py +++ /dev/null @@ -1,44 +0,0 @@ -import requests # type: ignore -from codeboxapi import CodeBox - - -async def main(): - async with CodeBox() as codebox: - # upload dataset csv - csv_bytes = requests.get( - "https://archive.ics.uci.edu/" - "ml/machine-learning-databases/iris/iris.data" - ).content - await codebox.aupload("iris.csv", csv_bytes) - - # install openpyxl for excel conversion - await codebox.ainstall("pandas") - await codebox.ainstall("openpyxl") - - # convert dataset csv to excel - output = await codebox.arun( - "import pandas as pd\n\n" - "df = pd.read_csv('iris.csv', header=None)\n\n" - "df.to_excel('iris.xlsx', index=False)\n" - "'iris.xlsx'" - ) - - if output.type == "image/png": - 
print("This should not happen") - - elif output.type == "error": - print("Error: ", output.content) - - else: - files = await codebox.alist_files() - print("Available files: ", files) - - file = files[0].name - content = await codebox.adownload(file) - print("Content: ", content) - - -if __name__ == "__main__": - import asyncio - - asyncio.run(main()) diff --git a/examples/async_plot_dataset.py b/examples/async_plot_dataset.py deleted file mode 100755 index c556489..0000000 --- a/examples/async_plot_dataset.py +++ /dev/null @@ -1,72 +0,0 @@ -import os - -import requests -from codeboxapi import CodeBox - - -async def main(): - async with CodeBox() as codebox: - # download the iris dataset - csv_bytes = requests.get( - "https://archive.ics.uci.edu/" - "ml/machine-learning-databases/iris/iris.data" - ).content - - # upload the dataset to the codebox - await codebox.aupload("iris.csv", csv_bytes) - - # install the required packages - await codebox.ainstall("matplotlib") - await codebox.ainstall("pandas") - - # dataset analysis code - code = ( - "import pandas as pd\n" - "import matplotlib.pyplot as plt\n\n" - "df = pd.read_csv('iris.csv', header=None)\n" - "df.columns = ['sepal_length', 'sepal_width'," - "'petal_length', 'petal_width', 'class']\n\n" - "color_dict = {'Iris-setosa': 0, 'Iris-versicolor': 1, " - "'Iris-virginica': 2}\n\n" - "df['color'] = df['class'].map(color_dict)\n\n" - "df.plot.scatter(x='sepal_length', y='sepal_width', " - "c='color', colormap='viridis')\n" - "plt.show()" - ) - - # run the code - output = await codebox.arun(code) - print(output.type) - - if output.type == "image/png" and os.environ.get("CODEBOX_TEST") == "False": - try: - from PIL import Image # type: ignore - except ImportError: - print( - "Please install it with " - '`pip install "codeboxapi[image_support]"`' - " to display images." 
- ) - exit(1) - - # Convert the image content ( bytes) into an image - import base64 - from io import BytesIO - - img_bytes = base64.b64decode(output.content) - img_buffer = BytesIO(img_bytes) - - # Display the image - img = Image.open(img_buffer) - img.show() - - elif output.type == "error": - # error output - print("Error:") - print(output.content) - - -if __name__ == "__main__": - import asyncio - - asyncio.run(main()) diff --git a/examples/big_upload.py b/examples/big_upload.py deleted file mode 100644 index bcccebe..0000000 --- a/examples/big_upload.py +++ /dev/null @@ -1,37 +0,0 @@ -from codeboxapi import CodeBox - - -def url_upload(codebox, url: str) -> None: - codebox.run( - """ -import requests - -def download_file_from_url(url: str) -> None: - response = requests.get(url, stream=True) - response.raise_for_status() - file_name = url.split('/')[-1] - with open('./' + file_name, 'wb') as file: - for chunk in response.iter_content(chunk_size=8192): - if chunk: - file.write(chunk) - """ - ) - print(codebox.run(f"download_file_from_url('{url}')")) - - -with CodeBox() as codebox: - url_upload( - codebox, - "https://codeboxapistorage.blob.core.windows.net/bucket/data-test.arrow", - ) - print(codebox.list_files()) - - url_upload( - codebox, - "https://codeboxapistorage.blob.core.windows.net/bucket/data-train.arrow", - ) - print(codebox.list_files()) - - codebox.run("import os") - print(codebox.run("print(os.listdir())")) - print(codebox.run("print([(f, os.path.getsize(f)) for f in os.listdir('.')])")) diff --git a/examples/big_upload_from_url.py b/examples/big_upload_from_url.py new file mode 100644 index 0000000..750a1eb --- /dev/null +++ b/examples/big_upload_from_url.py @@ -0,0 +1,38 @@ +from codeboxapi import CodeBox + + +def url_upload(codebox: CodeBox, url: str) -> None: + codebox.exec( + """ +import requests + +def download_file_from_url(url: str) -> None: + response = requests.get(url, stream=True) + response.raise_for_status() + file_name = 
url.split('/')[-1] + with open('./' + file_name, 'wb') as file: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + file.write(chunk) + """ + ) + print(codebox.exec(f"download_file_from_url('{url}')")) + + +codebox = CodeBox() + +url_upload( + codebox, + "https://codeboxapistorage.blob.core.windows.net/bucket/data-test.arrow", +) +print(codebox.list_files()) + +url_upload( + codebox, + "https://codeboxapistorage.blob.core.windows.net/bucket/data-train.arrow", +) +print(codebox.list_files()) + +codebox.exec("import os") +print(codebox.exec("print(os.listdir())")) +print(codebox.exec("print([(f, os.path.getsize(f)) for f in os.listdir('.')])")) diff --git a/examples/custom_factory.py.todo b/examples/custom_factory.py.todo new file mode 100644 index 0000000..e69de29 diff --git a/examples/docker_parallel_execution.py b/examples/docker_parallel_execution.py new file mode 100644 index 0000000..9ed911f --- /dev/null +++ b/examples/docker_parallel_execution.py @@ -0,0 +1,84 @@ +import asyncio +import time +from pathlib import Path + +from codeboxapi import CodeBox + + +async def train_model(codebox: CodeBox, data_split: int) -> dict: + """Train a model on a subset of data.""" + + file = Path("examples/assets/advertising.csv") + assert file.exists(), "Dataset file does not exist" + + # Upload dataset + await codebox.aupload(file.name, file.read_bytes()) + + # Install required packages + await codebox.ainstall("pandas") + await codebox.ainstall("scikit-learn") + + # Training code with different data splits + code = f""" +import pandas as pd +from sklearn.model_selection import train_test_split +from sklearn.linear_model import LinearRegression +from sklearn.metrics import mean_squared_error, r2_score + +# Load and prepare data +data = pd.read_csv('advertising.csv') +X = data[['TV', 'Radio', 'Newspaper']] +y = data['Sales'] + +# Split with different random states for different data subsets +X_train, X_test, y_train, y_test = train_test_split( + X, y, 
test_size=0.3, random_state={data_split} +) + +# Train model +model = LinearRegression() +model.fit(X_train, y_train) + +# Evaluate +y_pred = model.predict(X_test) +mse = mean_squared_error(y_test, y_pred) +r2 = r2_score(y_test, y_pred) + +print(f"Split {data_split}:") +print(f"MSE: {{mse:.4f}}") +print(f"R2: {{r2:.4f}}") +print(f"Coefficients: {{model.coef_.tolist()}}") +""" + result = await codebox.aexec(code) + return {"split": data_split, "output": result.text, "errors": result.errors} + + +async def main(): + # Create multiple Docker instances + num_parallel = 4 + codeboxes = [CodeBox(api_key="docker") for _ in range(num_parallel)] + + # Create tasks for different data splits + tasks = [] + for i, codebox in enumerate(codeboxes): + task = asyncio.create_task(train_model(codebox, i)) + tasks.append(task) + + # Execute and time the parallel processing + start_time = time.perf_counter() + results = await asyncio.gather(*tasks) + end_time = time.perf_counter() + + # Print results + print(f"\nParallel execution completed in {end_time - start_time:.2f} seconds\n") + for result in results: + if not result["errors"]: + print(f"Results for {result['split']}:") + print(result["output"]) + print("-" * 50) + else: + print(f"Error in split {result['split']}:", result["errors"]) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/file_conversion.py b/examples/file_conversion.py new file mode 100644 index 0000000..10a75c0 --- /dev/null +++ b/examples/file_conversion.py @@ -0,0 +1,33 @@ +import httpx +from codeboxapi import CodeBox + +codebox = CodeBox() + +# upload dataset csv +csv_bytes = httpx.get( + "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data" +).content +codebox.upload("iris.csv", csv_bytes) + +# install openpyxl for excel conversion +codebox.install("pandas") +codebox.install("openpyxl") + +# convert dataset csv to excel +output = codebox.exec( + "import pandas as pd\n\n" + "df = pd.read_csv('iris.csv', 
header=None)\n\n" + "df.to_excel('iris.xlsx', index=False)\n" + "'iris.xlsx'" +) + +# check output type +if output.images: + print("This should not happen") +elif output.errors: + print("Error: ", output.errors) +else: + # all files inside the codebox + for file in codebox.list_files(): + print("File: ", file.path) + print("File Size: ", file.get_size()) diff --git a/examples/file_io.py b/examples/file_io.py deleted file mode 100644 index e9c3061..0000000 --- a/examples/file_io.py +++ /dev/null @@ -1,30 +0,0 @@ -import requests -from codeboxapi import CodeBox - -with CodeBox() as codebox: - # upload dataset csv - csv_bytes = requests.get( - "https://archive.ics.uci.edu/" "ml/machine-learning-databases/iris/iris.data" - ).content - codebox.upload("iris.csv", csv_bytes) - - # convert dataset csv to excel - output = codebox.run( - "import pandas as pd\n\n" - "df = pd.read_csv('iris.csv', header=None)\n\n" - "df.to_excel('iris.xlsx', index=False)\n" - "'iris.xlsx'" - ) - - # check output type - if output.type == "image/png": - print("This should not happen") - elif output.type == "error": - print("Error: ", output.content) - else: - # all files inside the codebox - for file in codebox.list_files(): - print("File: ", file.name) - print("Content is None: ", file.content is None) - content = codebox.download(file.name) - print("Content: ", content) diff --git a/examples/getting_started.py b/examples/getting_started.py new file mode 100644 index 0000000..419719d --- /dev/null +++ b/examples/getting_started.py @@ -0,0 +1,81 @@ +from codeboxapi import CodeBox + +# Initialize CodeBox +codebox = CodeBox(api_key="local") # or get your API key at https://codeboxapi.com + +# Basic Examples +# ------------- + +# 1. Simple Code Execution +result = codebox.exec("print('Hello World!')") +print(result.text) # Output: Hello World! + +# 2. 
File Operations +# Upload a file +codebox.upload("example.txt", b"Hello from CodeBox!") + +# Download a file +downloaded = codebox.download("example.txt") +content = downloaded.get_content() # Returns b"Hello from CodeBox!" +print("Content:\n", content, sep="") + +# List files +files = codebox.list_files() +print("\nFiles:\n", "\n".join(repr(f) for f in files), sep="") + +# 3. Package Management +# Install packages +codebox.install("pandas") + +# List installed packages +packages = codebox.list_packages() +print("\nFirst 10 packages:\n", "\n".join(packages[:10]), sep="") + +# 4. Variable Management +# Execute code that creates variables +codebox.exec(""" +x = 42 +data = [1, 2, 3] +name = "Alice" +""") + +# Show all variables +variables = codebox.show_variables() +print("\nVariables:\n", "\n".join(f"{k}={v}" for k, v in variables.items()), sep="") + +# 5. Plotting with Matplotlib +plot_code = """ +import matplotlib.pyplot as plt +plt.figure(figsize=(10, 5)) +plt.plot([1, 2, 3, 4], [1, 4, 2, 3]) +plt.title('My Plot') +plt.show() +""" +result = codebox.exec(plot_code) +# result.images will contain the plot as bytes + +# 6. Streaming Output +# Useful for long-running operations +for chunk in codebox.stream_exec(""" +for i in range(5): + print(f"Processing item {i}") + import time + time.sleep(1) +"""): + # will not print when using "local" as api_key + # due to stdout being captured in the background + print(chunk.content, end="") + +# 7. Bash Commands +# Execute bash commands +codebox.exec("ls -la", kernel="bash") +codebox.exec("pwd", kernel="bash") + +# Create and run Python scripts via bash +codebox.exec("echo \"print('Running from file')\" > script.py", kernel="bash") +codebox.exec("python script.py", kernel="bash") + +# 8. 
Error Handling +result = codebox.exec("1/0") +if result.errors: + print("\nError occurred:", result.errors[0]) diff --git a/examples/langchain_agent.py.todo b/examples/langchain_agent.py.todo new file mode 100644 index 0000000..e69de29 diff --git a/examples/local_docker.py.todo b/examples/local_docker.py.todo new file mode 100644 index 0000000..e69de29 diff --git a/examples/parallel.py b/examples/parallel.py deleted file mode 100644 index 3853fcb..0000000 --- a/examples/parallel.py +++ /dev/null @@ -1,19 +0,0 @@ -import asyncio - -from codeboxapi import CodeBox - - -async def main(): - await asyncio.gather(*(spawn_codebox() for _ in range(10))) - - -async def spawn_codebox(): - async with CodeBox() as codebox: - await codebox.arun("a = 'Hello World!'") - a = await codebox.arun("a") - assert a == "Hello World!" - print("Success!") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/examples/plot_dataset.py b/examples/plot_dataset.py index 51d9da0..a293cf8 100644 --- a/examples/plot_dataset.py +++ b/examples/plot_dataset.py @@ -1,48 +1,34 @@ -import os +import base64 +from io import BytesIO from pathlib import Path -import requests +import httpx from codeboxapi import CodeBox +from PIL import Image -with CodeBox() as codebox: - # download the iris dataset - csv_bytes = requests.get( - "https://archive.ics.uci.edu/" "ml/machine-learning-databases/iris/iris.data" - ).content - - # upload the dataset to the codebox - o = codebox.upload("iris.csv", csv_bytes) - - # dataset analysis code - file_path = Path("examples/assets/dataset_code.txt") - - # run the code - output = codebox.run(file_path=file_path) - print(output.type) - - if output.type == "image/png" and os.environ.get("CODEBOX_TEST") == "False": - try: - from PIL import Image # type: ignore - except ImportError: - print( - "Please install it with " - '`pip install "codeboxapi[image_support]"`' - " to display images." 
- ) - exit(1) - - # Convert the image content ( bytes) into an image - import base64 - from io import BytesIO - - img_bytes = base64.b64decode(output.content) - img_buffer = BytesIO(img_bytes) - - # Display the image - img = Image.open(img_buffer) - img.show() - - elif output.type == "error": - # error output - print("Error:") - print(output.content) +codebox = CodeBox(api_key="local") + +# download the iris dataset +iris_csv_bytes = httpx.get( + "https://archive.ics.uci.edu/" "ml/machine-learning-databases/iris/iris.data" +).content + +# upload the dataset to the codebox +codebox.upload("iris.csv", iris_csv_bytes) + +# dataset analysis code +file_path = Path("examples/assets/dataset_code.txt") + +# run the code +output = codebox.exec(file_path) + +if output.images: + img_bytes = base64.b64decode(output.images[0]) + img_buffer = BytesIO(img_bytes) + + # Display the image + img = Image.open(img_buffer) + img.show() + +elif output.errors: + print("Error:", output.errors) diff --git a/examples/session_restoring.py b/examples/session_restoring.py deleted file mode 100644 index 5788026..0000000 --- a/examples/session_restoring.py +++ /dev/null @@ -1,22 +0,0 @@ -from codeboxapi import CodeBox - - -def session_restoring(): - session = CodeBox() - session.start() - - session_id = session.session_id - print(session_id) - assert session_id is not None - - session.run('hello = "Hello World!"') - - del session - - print(CodeBox.from_id(session_id=session_id).run("print(hello)")) - - CodeBox.from_id(session_id=session_id).stop() - - -if __name__ == "__main__": - session_restoring() diff --git a/examples/simple_codeinterpreter.py.todo b/examples/simple_codeinterpreter.py.todo new file mode 100644 index 0000000..e69de29 diff --git a/examples/stream_chunk_timing.py b/examples/stream_chunk_timing.py new file mode 100644 index 0000000..7a08422 --- /dev/null +++ b/examples/stream_chunk_timing.py @@ -0,0 +1,44 @@ +import asyncio +import time + +from codeboxapi import CodeBox, 
ExecChunk + + +def sync_stream_exec(cb: CodeBox) -> None: + chunks: list[tuple[ExecChunk, float]] = [] + t0 = time.perf_counter() + for chunk in cb.stream_exec( + "import time;\nfor i in range(3): time.sleep(1); print(i)" + ): + chunks.append((chunk, time.perf_counter() - t0)) + + for chunk, t in chunks: + print(f"{t:.5f}: {chunk}") + + +async def async_stream_exec(cb: CodeBox) -> None: + chunks: list[tuple[ExecChunk, float]] = [] + t0 = time.perf_counter() + async for chunk in cb.astream_exec( + "import time;\nfor i in range(3): time.sleep(1); print(i)" + ): + chunks.append((chunk, time.perf_counter() - t0)) + + for chunk, t in chunks: + print(f"{t:.5f}: {chunk}") + + +print("remote") +cb = CodeBox() +sync_stream_exec(cb) +asyncio.run(async_stream_exec(cb)) + +print("local") +local = CodeBox(api_key="local") +sync_stream_exec(local) +asyncio.run(async_stream_exec(local)) + +print("docker") +docker = CodeBox(api_key="docker") +sync_stream_exec(docker) +asyncio.run(async_stream_exec(docker)) diff --git a/pyproject.toml b/pyproject.toml index 33e41a4..1850e26 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "codeboxapi" -version = "0.1.21" +version = "0.2.0" description = "CodeBox gives you an easy scalable and isolated python interpreter for your LLM Agents." 
keywords = [ "codeboxapi", @@ -8,15 +8,14 @@ keywords = [ "codebox-api", "language model", "codeinterpreterapi", + "code sandbox", + "codebox", + "agent infrastructure", + "agents", + "agent sandbox", ] authors = [{ name = "Shroominic", email = "contact@shroominic.com" }] -dependencies = [ - "pydantic>=2", - "pydantic-settings>=2", - "requests>=2", - "aiohttp>=3.9", - "websockets>=12", -] +dependencies = ["httpx>=0.27"] readme = "README.md" requires-python = ">= 3.9" license = { text = "MIT" } @@ -34,22 +33,50 @@ build-backend = "hatchling.build" managed = true dev-dependencies = [ "codeboxapi[all]", - "mypy>=1.8", - "ruff>=0.1", - "pytest>=7.4", - "pre-commit>=3.5", - "neoteroi-mkdocs>=1", - "mkdocs-material>=9", - "types-requests>=2.31", - "matplotlib>=3.8.2", - "pip", - "pytest-asyncio>=0.23.7", + "codeboxapi[docs]", + "codeboxapi[pytest]", + "codeboxapi[serve]", + "ruff", + "mypy", + "types-aiofiles>=24", ] [project.optional-dependencies] -all = ["jupyter-kernel-gateway>=2.5, <3", "Pillow>=10"] -local_support = ["jupyter-kernel-gateway>=2.5, <3"] -image_support = ["Pillow>=10"] +docs = ["neoteroi-mkdocs", "mkdocs-material"] +pytest = ["pytest-asyncio"] +local = ["jupyter-client", "ipykernel", "uv", "aiofiles"] +vision = ["Pillow"] +serve = ["fastapi[standard]"] +data-science = [ + "codeboxapi[local]", + "codeboxapi[vision]", + "pandas", + "numpy", + "matplotlib", + "seaborn", + "scikit-learn", + "uv", + "bokeh", + "dash", + "matplotlib", + "networkx", + "numpy", + "openpyxl", + "pandas", + "pillow", + "plotly", + "python-docx", + "scikit-learn", + "scipy", + "seaborn", + "statsmodels", + "sympy", + "yfinance", +] +all = ["codeboxapi[data-science]", "codeboxapi[serve]"] + +[project.scripts] +codeboxapi-serve = "codeboxapi.api:serve" [tool.hatch.metadata] allow-direct-references = true @@ -60,5 +87,9 @@ packages = ["src/codeboxapi"] [tool.pytest.ini_options] filterwarnings = "ignore::DeprecationWarning" -[tool.ruff] +[tool.ruff.lint] select = ["E", "F", "I"] 
+ignore = ["E701"] + +[tool.ruff.format] +preview = true diff --git a/requirements.lock b/requirements.lock index cb83081..7a47e04 100644 --- a/requirements.lock +++ b/requirements.lock @@ -6,46 +6,23 @@ # features: [] # all-features: false # with-sources: false +# generate-hashes: false -e file:. -aiohttp==3.9.5 +anyio==4.4.0 + # via httpx +certifi==2024.7.4 + # via httpcore + # via httpx +h11==0.14.0 + # via httpcore +httpcore==1.0.5 + # via httpx +httpx==0.27.0 # via codeboxapi -aiosignal==1.3.1 - # via aiohttp -annotated-types==0.6.0 - # via pydantic -attrs==23.2.0 - # via aiohttp -certifi==2024.2.2 - # via requests -charset-normalizer==3.3.2 - # via requests -frozenlist==1.4.1 - # via aiohttp - # via aiosignal idna==3.7 - # via requests - # via yarl -multidict==6.0.5 - # via aiohttp - # via yarl -pydantic==2.7.1 - # via codeboxapi - # via pydantic-settings -pydantic-core==2.18.2 - # via pydantic -pydantic-settings==2.2.1 - # via codeboxapi -python-dotenv==1.0.1 - # via pydantic-settings -requests==2.31.0 - # via codeboxapi -typing-extensions==4.11.0 - # via pydantic - # via pydantic-core -urllib3==2.2.1 - # via requests -websockets==12.0 - # via codeboxapi -yarl==1.9.4 - # via aiohttp + # via anyio + # via httpx +sniffio==1.3.1 + # via anyio + # via httpx diff --git a/roadmap.todo b/roadmap.todo index d5ed364..9baea17 100644 --- a/roadmap.todo +++ b/roadmap.todo @@ -1,21 +1,15 @@ -[ ] - gpu support - [ ] - different python versions -[ ] - node/bun engine - -[ ] - metadata kw storage - -[ ] - different container sizes (cpu/ram) +[ ] - gpu support -[ ] - variable timeout +[ ] - js kernel -[ ] - request only mode with dynamic filesystem +[ ] - metadata for kw storage -[ ] - general dynamic filesystem +[ ] - request only mode -[ ] - requirements on startup +[ ] - s3 filesystems -[ ] - chromiumbox + vectorbox +[ ] - add pypi requirements to factory -[ ] - gitbox +[ ] - visual-chromium-box diff --git a/scripts/build.sh b/scripts/build.sh new file mode 100755 index 
0000000..5cd1503 --- /dev/null +++ b/scripts/build.sh @@ -0,0 +1,16 @@ +if [ -z "$1" ]; then + echo "Error: No version supplied" + echo "Usage: $0 <version>" + exit 1 +fi + +VERSION=$1 + +docker build -t codebox . + +# todo move container to separate codeboxapi account +docker tag codebox shroominic/codebox:$VERSION +docker tag codebox shroominic/codebox:latest + +docker push shroominic/codebox:$VERSION +docker push shroominic/codebox:latest diff --git a/scripts/dev-setup.sh b/scripts/dev-setup.sh new file mode 100644 index 0000000..3af3f57 --- /dev/null +++ b/scripts/dev-setup.sh @@ -0,0 +1,23 @@ +ruff check . --fix + +# check if uv is installed or install it +if ! command -v uv &> /dev/null +then + echo "uv not found, installing..." + pip install uv +else + echo "uv is already installed" +fi + +# check if venv exists or create it +if [ ! -d ".venv" ]; then + echo "Creating virtual environment..." + uv venv +else + echo "Virtual environment already exists" +fi + +# Install dependencies +echo "Installing dependencies..." +uv pip install -r pyproject.toml + diff --git a/src/codeboxapi/__init__.py b/src/codeboxapi/__init__.py index 7b85236..12ade69 100644 --- a/src/codeboxapi/__init__.py +++ b/src/codeboxapi/__init__.py @@ -5,12 +5,8 @@ The package includes modules for configuring the client, setting the API key, and interacting with Codebox instances. 
""" -from codeboxapi.box.codebox import CodeBox -from codeboxapi.config import settings -from codeboxapi.utils import set_api_key -__all__ = [ - "CodeBox", - "set_api_key", - "settings", -] +from .codebox import CodeBox +from .types import ExecChunk, ExecResult, RemoteFile + +__all__ = ["CodeBox", "ExecChunk", "ExecResult", "RemoteFile"] diff --git a/src/codeboxapi/api.py b/src/codeboxapi/api.py new file mode 100644 index 0000000..6c4c559 --- /dev/null +++ b/src/codeboxapi/api.py @@ -0,0 +1,102 @@ +import asyncio +from contextlib import asynccontextmanager +from datetime import UTC, datetime, timedelta +from os import getenv, path +from typing import AsyncGenerator, Literal + +from fastapi import Body, Depends, FastAPI, HTTPException, UploadFile +from fastapi.responses import FileResponse, StreamingResponse +from pydantic import BaseModel + +from .local import LocalBox + +codebox = LocalBox() +last_interaction = datetime.now(UTC) + + +@asynccontextmanager +async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]: + async def timeout(): + if (_timeout := getenv("CODEBOX_TIMEOUT", "15")).lower() == "none": + return + while last_interaction + timedelta(minutes=float(_timeout)) > datetime.now(UTC): + await asyncio.sleep(1) + exit(0) + + t = asyncio.create_task(timeout()) + yield + t.cancel() + + +async def get_codebox() -> AsyncGenerator[LocalBox, None]: + global codebox, last_interaction + last_interaction = datetime.now(UTC) + yield codebox + + +app = FastAPI(title="Codebox API", lifespan=lifespan) +app.get("/")(lambda: {"status": "ok"}) + + +class ExecBody(BaseModel): + code: str + kernel: Literal["ipython", "bash"] = "ipython" + timeout: int | None = None + cwd: str | None = None + + +@app.post("/exec") +async def exec( + exec: ExecBody, codebox: LocalBox = Depends(get_codebox) +) -> StreamingResponse: + async def event_stream() -> AsyncGenerator[str, None]: + async for chunk in codebox.astream_exec( + exec.code, exec.kernel, exec.timeout, exec.cwd + ): # 
protocol is content + yield f"<{chunk.type}>{chunk.content}" + + return StreamingResponse(event_stream()) + + +@app.get("/files/download/{file_name}") +async def download( + file_name: str, + timeout: int | None = None, + codebox: LocalBox = Depends(get_codebox), +) -> FileResponse: + async with asyncio.timeout(timeout): + file_path = path.join(codebox.cwd, file_name) + return FileResponse( + path=file_path, media_type="application/octet-stream", filename=file_name + ) + + +@app.post("/files/upload") +async def upload( + file: UploadFile, + timeout: int | None = None, + codebox: LocalBox = Depends(get_codebox), +) -> None: + if not file.filename: + raise HTTPException(status_code=400, detail="A file name is required") + async with asyncio.timeout(timeout): + await codebox.aupload(file.filename, file.file) + + +@app.post("/code/execute") +async def deprecated_exec( + body: dict = Body(), codebox: LocalBox = Depends(get_codebox) +) -> dict: + """deprecated: use /exec instead""" + ex = await codebox.aexec(body["properties"]["code"]) + return {"properties": {"stdout": ex.text, "stderr": ex.errors, "result": ex.text}} + + +def serve(): + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=int(getenv("PORT", 8069))) + + +if __name__ == "__main__": + serve() diff --git a/src/codeboxapi/box/__init__.py b/src/codeboxapi/box/__init__.py deleted file mode 100644 index b5a258d..0000000 --- a/src/codeboxapi/box/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -""" -This module contains the box classes that are used to run code in a sandboxed -environment. The `BaseBox` class is the base class for all box classes. -The `LocalBox` class is used to run code in a local testing environment. The -`CodeBox` class is used to run code in a remote sandboxed environment. 
-""" - -from .basebox import BaseBox -from .codebox import CodeBox -from .localbox import LocalBox - -__all__ = [ - "BaseBox", - "CodeBox", - "LocalBox", -] diff --git a/src/codeboxapi/box/basebox.py b/src/codeboxapi/box/basebox.py deleted file mode 100644 index d57fee3..0000000 --- a/src/codeboxapi/box/basebox.py +++ /dev/null @@ -1,118 +0,0 @@ -"""Abstract Base Class for Isolated Execution Environments (CodeBox's)""" - -from abc import ABC, abstractmethod -from datetime import datetime -from os import PathLike -from typing import List, Optional -from uuid import UUID - -from codeboxapi.schema import CodeBoxFile, CodeBoxOutput, CodeBoxStatus - - -class BaseBox(ABC): - """CodeBox Abstract Base Class""" - - def __init__(self, session_id: Optional[UUID] = None) -> None: - """Initialize the CodeBox instance""" - self.session_id = session_id - self.last_interaction = datetime.now() - - @abstractmethod - def start(self) -> CodeBoxStatus: - """Startup the CodeBox instance""" - - @abstractmethod - async def astart(self) -> CodeBoxStatus: - """Async Startup the CodeBox instance""" - - @abstractmethod - def status(self) -> CodeBoxStatus: - """Get the current status of the CodeBox instance""" - - @abstractmethod - async def astatus(self) -> CodeBoxStatus: - """Async Get the current status of the CodeBox instance""" - - @abstractmethod - def run( - self, code: Optional[str] = None, file_path: Optional[PathLike] = None - ) -> CodeBoxOutput: - """Execute python code inside the CodeBox instance""" - - @abstractmethod - async def arun( - self, code: str, file_path: Optional[PathLike] = None - ) -> CodeBoxOutput: - """Async Execute python code inside the CodeBox instance""" - - @abstractmethod - def upload( - self, file_name: str, content: bytes, timeout: int = 600 - ) -> CodeBoxStatus: - """Upload a file as bytes to the CodeBox instance""" - - @abstractmethod - async def aupload( - self, file_name: str, content: bytes, timeout: int = 600 - ) -> CodeBoxStatus: - """Async Upload a 
file as bytes to the CodeBox instance""" - - @abstractmethod - def download(self, file_name: str) -> CodeBoxFile: - """Download a file as CodeBoxFile schema""" - - @abstractmethod - async def adownload(self, file_name: str) -> CodeBoxFile: - """Async Download a file as CodeBoxFile schema""" - - @abstractmethod - def install(self, package_name: str) -> CodeBoxStatus: - """Install a python package to the venv""" - - @abstractmethod - async def ainstall(self, package_name: str) -> CodeBoxStatus: - """Async Install a python package to the venv""" - - @abstractmethod - def list_files(self) -> List[CodeBoxFile]: - """List all available files inside the CodeBox instance""" - - @abstractmethod - async def alist_files(self) -> List[CodeBoxFile]: - """Async List all available files inside the CodeBox instance""" - - @abstractmethod - def restart(self) -> CodeBoxStatus: - """Restart the jupyter kernel inside the CodeBox instance""" - - @abstractmethod - async def arestart(self) -> CodeBoxStatus: - """Async Restart the jupyter kernel inside the CodeBox instance""" - - @abstractmethod - def stop(self) -> CodeBoxStatus: - """Terminate the CodeBox instance""" - - @abstractmethod - async def astop(self) -> CodeBoxStatus: - """Async Terminate the CodeBox instance""" - - def __enter__(self) -> "BaseBox": - self.start() - return self - - async def __aenter__(self) -> "BaseBox": - await self.astart() - return self - - def __exit__(self, exc_type, exc_value, traceback) -> None: - self.stop() - - async def __aexit__(self, exc_type, exc_value, traceback) -> None: - await self.astop() - - def __repr__(self) -> str: - return f"<{self.__class__.__name__} id={self.session_id}>" - - def __str__(self) -> str: - return self.__repr__() diff --git a/src/codeboxapi/box/codebox.py b/src/codeboxapi/box/codebox.py deleted file mode 100644 index 3f69444..0000000 --- a/src/codeboxapi/box/codebox.py +++ /dev/null @@ -1,324 +0,0 @@ -""" -CodeBox API Wrapper -~~~~~~~~~~~~~~~~~~~ - -A basic wrapper for the 
CodeBox API. - -Usage ------ - -.. code-block:: python - - from codeboxapi import CodeBox - - with CodeBox() as codebox: - codebox.status() - codebox.run(code="print('Hello World!')") - codebox.install("python-package") - codebox.upload("test.txt", b"Hello World!") - codebox.list_files() - codebox.download("test.txt") - -.. code-block:: python - - from codeboxapi import CodeBox - - async with CodeBox() as codebox: - await codebox.astatus() - await codebox.arun(code="print('Hello World!')") - await codebox.ainstall("python-package") - await codebox.aupload("test.txt", b"Hello World!") - await codebox.alist_files() - await codebox.adownload("test.txt") - -""" - -from datetime import datetime -from os import PathLike -from typing import Any, Dict, List, Optional, Union -from uuid import UUID - -from aiohttp import ClientSession - -from codeboxapi.box.basebox import BaseBox -from codeboxapi.config import settings -from codeboxapi.schema import CodeBoxFile, CodeBoxOutput, CodeBoxStatus -from codeboxapi.utils import abase_request, base_request - - -class CodeBox(BaseBox): - """ - Sandboxed Python Interpreter - """ - - def __new__(cls, *args, **kwargs): - if ( - kwargs.pop("local", False) - or settings.CODEBOX_API_KEY is None - or settings.CODEBOX_API_KEY == "local" - ): - from .localbox import LocalBox - - return LocalBox(*args, **kwargs) - - return super().__new__(cls) - - def __init__(self, /, **kwargs) -> None: - super().__init__() - self.session_id: Optional[UUID] = kwargs.pop("session_id", None) - self.aiohttp_session: Optional[ClientSession] = None - - @classmethod - def from_id(cls, session_id: Union[int, UUID], **kwargs) -> "CodeBox": - kwargs["session_id"] = ( - UUID(int=session_id) if isinstance(session_id, int) else session_id - ) - return cls(**kwargs) - - def _update(self) -> None: - """Update last interaction time""" - self.last_interaction = datetime.now() - - def codebox_request(self, method, endpoint, *args, **kwargs) -> Dict[str, Any]: - """Basic 
request to the CodeBox API""" - self._update() - if self.session_id is None: - raise RuntimeError("Make sure to start your CodeBox before using it.") - return base_request( - method, f"/codebox/{self.session_id.int}" + endpoint, *args, **kwargs - ) - - async def acodebox_request( - self, method, endpoint, *args, **kwargs - ) -> Dict[str, Any]: - """Basic async request to the CodeBox API""" - self._update() - if self.aiohttp_session is None: - self.aiohttp_session = ClientSession() - if self.session_id is None: - raise RuntimeError("Make sure to start your CodeBox before using it.") - return await abase_request( - self.aiohttp_session, - method, - f"/codebox/{self.session_id.int}" + endpoint, - *args, - **kwargs, - ) - - def start(self) -> CodeBoxStatus: - if self.session_id is not None: - if settings.VERBOSE: - print(f"{self} is already started!") - return CodeBoxStatus(status="started") - self.session_id = UUID( - int=base_request( - method="GET", - endpoint="/codebox/start", - )["id"] - ) - if settings.VERBOSE: - print(f"{self} started!") - return CodeBoxStatus(status="started") - - async def astart(self) -> CodeBoxStatus: - self.aiohttp_session = ClientSession() - if self.session_id is not None: - if settings.VERBOSE: - print(f"{self} is already started!") - return CodeBoxStatus(status="started") - self.session_id = UUID( - int=( - await abase_request( - self.aiohttp_session, method="GET", endpoint="/codebox/start" - ) - )["id"] - ) - if settings.VERBOSE: - print(f"{self} started!") - return CodeBoxStatus(status="started") - - def status(self): - return CodeBoxStatus( - **self.codebox_request( - method="GET", - endpoint="/", - ) - ) - - async def astatus(self): - return CodeBoxStatus( - **await self.acodebox_request( - method="GET", - endpoint="/", - ) - ) - - def run( - self, code: Optional[str] = None, file_path: Optional[PathLike] = None - ) -> CodeBoxOutput: - if not code and not file_path: # R0801 - raise ValueError("Code or file_path must be specified!") - 
- if code and file_path: - raise ValueError("Can only specify code or the file to read_from!") - - if file_path: - with open(file_path, "r", encoding="utf-8") as f: - code = f.read() - - return CodeBoxOutput( - **self.codebox_request( - method="POST", - endpoint="/run", - body={"code": code}, - ) - ) - - async def arun( - self, code: str, file_path: Optional[PathLike] = None - ) -> CodeBoxOutput: - if file_path: # TODO: Implement this - raise NotImplementedError( - "Reading from FilePath is not supported in async mode yet!" - ) - - return CodeBoxOutput( - **await self.acodebox_request( - method="POST", - endpoint="/run", - body={"code": code}, - ) - ) - - def upload( - self, file_name: str, content: bytes, timeout: int = 600 - ) -> CodeBoxStatus: - return CodeBoxStatus( - **self.codebox_request( - method="POST", - endpoint="/upload", - files={"file": (file_name, content)}, - timeout=timeout, - ) - ) - - async def aupload( - self, file_name: str, content: bytes, timeout: int = 600 - ) -> CodeBoxStatus: - return CodeBoxStatus( - **await self.acodebox_request( - method="POST", - endpoint="/upload", - files={"file": (file_name, content)}, - timeout=timeout, - ) - ) - - def download(self, file_name: str) -> CodeBoxFile: - return CodeBoxFile( - **self.codebox_request( - method="GET", - endpoint="/download", - body={"file_name": file_name}, - ) - ) - - async def adownload(self, file_name: str) -> CodeBoxFile: - return CodeBoxFile( - **await self.acodebox_request( - method="GET", - endpoint="/download", - body={"file_name": file_name}, - ) - ) - - def install(self, package_name: str) -> CodeBoxStatus: - return CodeBoxStatus( - **self.codebox_request( - method="POST", - endpoint="/install", - body={ - "package_name": package_name, - }, - ) - ) - - async def ainstall(self, package_name: str) -> CodeBoxStatus: - return CodeBoxStatus( - **await self.acodebox_request( - method="POST", - endpoint="/install", - body={ - "package_name": package_name, - }, - ) - ) - - def 
list_files(self) -> List[CodeBoxFile]: - return [ - CodeBoxFile(name=file_name, content=None) - for file_name in ( - self.codebox_request( - method="GET", - endpoint="/files", - ) - )["files"] - ] - - async def alist_files(self) -> List[CodeBoxFile]: - return [ - CodeBoxFile(name=file_name, content=None) - for file_name in ( - await self.acodebox_request( - method="GET", - endpoint="/files", - ) - )["files"] - ] - - def restart(self) -> CodeBoxStatus: - return CodeBoxStatus( - **self.codebox_request( - method="POST", - endpoint="/restart", - ) - ) - - async def arestart(self) -> CodeBoxStatus: - return CodeBoxStatus( - **await self.acodebox_request( - method="POST", - endpoint="/restart", - ) - ) - - def stop(self) -> CodeBoxStatus: - status = CodeBoxStatus( - **self.codebox_request( - method="POST", - endpoint="/stop", - ) - ) - self.session_id = None - return status - - async def astop(self) -> CodeBoxStatus: - status = CodeBoxStatus( - **await self.acodebox_request( - method="POST", - endpoint="/stop", - ) - ) - self.session_id = None - if self.aiohttp_session: - await self.aiohttp_session.close() - self.aiohttp_session = None - return status - - def __del__(self): - if self.aiohttp_session: - import asyncio - - loop = asyncio.new_event_loop() - loop.run_until_complete(self.aiohttp_session.close()) - self.aiohttp_session = None diff --git a/src/codeboxapi/box/localbox.py b/src/codeboxapi/box/localbox.py deleted file mode 100644 index 2eb427d..0000000 --- a/src/codeboxapi/box/localbox.py +++ /dev/null @@ -1,645 +0,0 @@ -""" -Local implementation of CodeBox. -This is useful for testing and development.c -In case you don't put an api_key, -this is the default CodeBox. 
-""" - -import asyncio -import json -import os -import signal -import subprocess -import sys -import time -from asyncio.subprocess import Process -from importlib.metadata import PackageNotFoundError, distribution -from pathlib import Path -from typing import List, Optional, Union -from uuid import uuid4 - -import aiohttp -import requests -from websockets.client import WebSocketClientProtocol -from websockets.client import connect as ws_connect -from websockets.exceptions import ConnectionClosedError -from websockets.sync.client import ClientConnection -from websockets.sync.client import connect as ws_connect_sync - -from codeboxapi.box import BaseBox -from codeboxapi.schema import CodeBoxFile, CodeBoxOutput, CodeBoxStatus - -from ..config import settings - - -class LocalBox(BaseBox): - """ - LocalBox is a CodeBox implementation that runs code locally. - This is useful for testing and development. - """ - - _instance: Optional["LocalBox"] = None - _jupyter_pids: List[int] = [] - - def __new__(cls, *args, **kwargs): - if not cls._instance: - cls._instance = super().__new__(cls) - else: - if settings.SHOW_INFO: - print( - "INFO: Using a LocalBox which is not fully isolated\n" - " and not scalable across multiple users.\n" - " Make sure to use a CODEBOX_API_KEY in production.\n" - " Set envar SHOW_INFO=False to not see this again.\n" - ) - return cls._instance - - def __init__(self, /, **kwargs) -> None: - super().__init__(session_id=kwargs.pop("session_id", None)) - self.port: int = 8888 - self.kernel_id: Optional[dict] = None - self.ws: Union[WebSocketClientProtocol, ClientConnection, None] = None - self.jupyter: Union[Process, subprocess.Popen, None] = None - self.aiohttp_session: Optional[aiohttp.ClientSession] = None - - def start(self) -> CodeBoxStatus: - self.session_id = uuid4() - os.makedirs(".codebox", exist_ok=True) - self._check_port() - if settings.VERBOSE: - print("Starting kernel...") - out = None - else: - out = subprocess.PIPE - self._check_installed() 
- try: - python = Path(sys.executable).absolute() - self.jupyter = subprocess.Popen( - [ - python, - "-m", - "jupyter", - "kernelgateway", - "--KernelGatewayApp.ip='0.0.0.0'", - f"--KernelGatewayApp.port={self.port}", - ], - stdout=out, - stderr=out, - cwd=".codebox", - ) - self._jupyter_pids.append(self.jupyter.pid) - except FileNotFoundError: - raise ModuleNotFoundError( - "Jupyter Kernel Gateway not found, please install it with:\n" - "`pip install jupyter_kernel_gateway`\n" - "to use the LocalBox." - ) - while True: - try: - response = requests.get(self.kernel_url, timeout=270) - if response.status_code == 200: - break - except requests.exceptions.ConnectionError: - pass - if settings.VERBOSE: - print("Waiting for kernel to start...") - time.sleep(1) - self._connect() - return CodeBoxStatus(status="started") - - def _connect(self) -> None: - # Implement retry logic for kernel connection - for attempt in range(5): - try: - response = requests.post( - f"{self.kernel_url}/kernels", - headers={"Content-Type": "application/json"}, - timeout=60, - ) - if response.status_code == 201: - self.kernel_id = response.json().get("id", None) - if self.kernel_id: - break - except requests.RequestException as e: - print(f"Could not connect to kernel. {e}") - time.sleep(5) # Wait for 5 seconds before retrying - - if self.kernel_id is None: - raise Exception("Could not start kernel after multiple attempts") - - # Connect to WebSocket with retry logic - for attempt in range(5): - try: - self.ws = ws_connect_sync( - f"{self.ws_url}/kernels/{self.kernel_id}/channels", - open_timeout=60, - close_timeout=60, - ) - break # Break the loop if connection is successful - except (ConnectionClosedError, TimeoutError) as e: - print(f"Attempt {attempt + 1}: WebSocket connection failed. 
Error: {e}") - time.sleep(5) # Wait for 5 seconds before retrying - - if not self.ws: - raise Exception("Could not connect to WebSocket after multiple attempts") - - def _check_port(self) -> None: - try: - response = requests.get(f"http://localhost:{self.port}", timeout=270) - except requests.exceptions.ConnectionError: - pass - else: - if response.status_code == 200: - self.port += 1 - self._check_port() - - def _check_installed(self) -> None: - try: - distribution("jupyter-kernel-gateway") - except PackageNotFoundError: - print( - "Make sure 'jupyter-kernel-gateway' is installed " - "when using without a CODEBOX_API_KEY.\n" - "You can install it with 'pip install jupyter-kernel-gateway'." - ) - raise - - async def astart(self) -> CodeBoxStatus: - self.session_id = uuid4() - os.makedirs(".codebox", exist_ok=True) - self.aiohttp_session = aiohttp.ClientSession() - await self._acheck_port() - if settings.VERBOSE: - print("Starting kernel...") - out = None - else: - out = asyncio.subprocess.PIPE - self._check_installed() - python = Path(sys.executable).absolute() - try: - self.jupyter = await asyncio.create_subprocess_exec( - python, - "-m", - "jupyter", - "kernelgateway", - "--KernelGatewayApp.ip='0.0.0.0'", - f"--KernelGatewayApp.port={self.port}", - stdout=out, - stderr=out, - cwd=".codebox", - ) - self._jupyter_pids.append(self.jupyter.pid) - except Exception as e: - print(e) - raise ModuleNotFoundError( - "Jupyter Kernel Gateway not found, please install it with:\n" - "`pip install jupyter_kernel_gateway`\n" - "to use the LocalBox." 
- ) - while True: - try: - response = await self.aiohttp_session.get(self.kernel_url) - if response.status == 200: - break - except aiohttp.ClientConnectorError: - pass - except aiohttp.ServerDisconnectedError: - pass - if settings.VERBOSE: - print("Waiting for kernel to start...") - await asyncio.sleep(1) - await self._aconnect() - return CodeBoxStatus(status="started") - - async def _aconnect(self) -> None: - if self.aiohttp_session is None: - timeout = aiohttp.ClientTimeout(total=270) - self.aiohttp_session = aiohttp.ClientSession(timeout=timeout) - - # Implement retry logic for kernel connection - for attempt in range(5): - try: - response = await self.aiohttp_session.post( - f"{self.kernel_url}/kernels", - headers={"Content-Type": "application/json"}, - ) - if response.status == 201: - self.kernel_id = (await response.json()).get("id", None) - if self.kernel_id: - break - except aiohttp.ClientError as e: - print(f"Attempt {attempt + 1}: Could not connect to kernel. Error: {e}") - await asyncio.sleep(5) # Wait for 5 seconds before retrying - - if self.kernel_id is None: - raise Exception("Could not start kernel after multiple attempts") - - # Connect to WebSocket with increased timeout and retry logic - for attempt in range(5): - try: - self.ws = await ws_connect( - f"{self.ws_url}/kernels/{self.kernel_id}/channels", - timeout=60, - open_timeout=60, - close_timeout=60, - ) - break # Break the loop if connection is successful - except asyncio.TimeoutError as e: - print( - f"Attempt {attempt + 1}: WebSocket connection timeout. 
Error: {e}" - ) - await asyncio.sleep(5) # Wait for 5 seconds before retrying - - if not self.ws: - raise Exception("Could not connect to WebSocket after multiple attempts") - - async def _acheck_port(self) -> None: - try: - if self.aiohttp_session is None: - self.aiohttp_session = aiohttp.ClientSession() - response = await self.aiohttp_session.get(f"http://localhost:{self.port}") - except aiohttp.ClientConnectorError: - pass - except aiohttp.ServerDisconnectedError: - pass - else: - if response.status == 200: - self.port += 1 - await self._acheck_port() - - def status(self) -> CodeBoxStatus: - if not self.kernel_id: - self._connect() - - return CodeBoxStatus( - status="running" - if self.kernel_id - and requests.get(self.kernel_url, timeout=270).status_code == 200 - else "stopped" - ) - - async def astatus(self) -> CodeBoxStatus: - if not self.kernel_id: - await self._aconnect() - return CodeBoxStatus( - status="running" - if self.kernel_id - and self.aiohttp_session - and (await self.aiohttp_session.get(self.kernel_url)).status == 200 - else "stopped" - ) - - def run( - self, - code: Optional[str] = None, - file_path: Optional[os.PathLike] = None, - retry=3, - ) -> CodeBoxOutput: - if not code and not file_path: - raise ValueError("Code or file_path must be specified!") - - if code and file_path: - raise ValueError("Can only specify code or the file to read_from!") - - if file_path: - with open(file_path, "r", encoding="utf-8") as f: - code = f.read() - - # run code in jupyter kernel - if retry <= 0: - raise RuntimeError("Could not connect to kernel") - if not self.ws: - self._connect() - if not self.ws: - raise RuntimeError("Jupyter not running. 
Make sure to start it first.") - - if settings.VERBOSE: - print("Running code:\n", code) - - # send code to kernel - self.ws.send( - json.dumps( - { - "header": { - "msg_id": (msg_id := uuid4().hex), - "msg_type": "execute_request", - }, - "parent_header": {}, - "metadata": {}, - "content": { - "code": code, - "silent": False, - "store_history": True, - "user_expressions": {}, - "allow_stdin": False, - "stop_on_error": True, - }, - "channel": "shell", - "buffers": [], - } - ) - ) - result = "" - while True: - try: - if isinstance(self.ws, WebSocketClientProtocol): - raise RuntimeError("Mixing asyncio and sync code is not supported") - received_msg = json.loads(self.ws.recv()) - except ConnectionClosedError: - self.start() - return self.run(code, file_path, retry - 1) - - msg_header = received_msg.get("header", {}) - msg_parent_header = received_msg.get("parent_header", {}) - msg_content = received_msg.get("content", {}) - msg_data = msg_content.get("data", {}) - - if ( - msg_header["msg_type"] == "stream" - and msg_parent_header["msg_id"] == msg_id - ): - msg = msg_content["text"].strip() - if "Requirement already satisfied:" in msg: - continue - result += msg + "\n" - if settings.VERBOSE: - print("Output:\n", result) - - elif ( - msg_header["msg_type"] == "execute_result" - and msg_parent_header["msg_id"] == msg_id - ): - result += msg_data["text/plain"].strip() + "\n" - if settings.VERBOSE: - print("Output:\n", result) - - elif msg_header["msg_type"] == "display_data": - if "image/png" in msg_data: - return CodeBoxOutput( - type="image/png", - content=msg_data["image/png"], - ) - if "text/plain" in msg_data: - return CodeBoxOutput( - type="text", - content=msg_data["text/plain"], - ) - return CodeBoxOutput( - type="error", - content="Could not parse output", - ) - elif ( - msg_header["msg_type"] == "status" - and msg_parent_header["msg_id"] == msg_id - and msg_content["execution_state"] == "idle" - ): - if len(result) > 500: - result = "[...]\n" + result[-500:] - 
return CodeBoxOutput( - type="text", content=result or "code run successfully (no output)" - ) - - elif ( - msg_header["msg_type"] == "error" - and msg_parent_header["msg_id"] == msg_id - ): - error = f"{msg_content['ename']}: " f"{msg_content['evalue']}" - if settings.VERBOSE: - print("Error:\n", error) - return CodeBoxOutput(type="error", content=error) - - async def arun( - self, - code: str, - file_path: Optional[os.PathLike] = None, - retry=3, - ) -> CodeBoxOutput: - if file_path: - raise NotImplementedError( - "Reading from file is not supported in async mode" - ) - - # run code in jupyter kernel - if retry <= 0: - raise RuntimeError("Could not connect to kernel") - if not self.ws: - await self._aconnect() - - if settings.VERBOSE: - print("Running code:\n", code) - - if not isinstance(self.ws, WebSocketClientProtocol): - raise RuntimeError("Mixing asyncio and sync code is not supported") - - await self.ws.send( - json.dumps( - { - "header": { - "msg_id": (msg_id := uuid4().hex), - "msg_type": "execute_request", - }, - "parent_header": {}, - "metadata": {}, - "content": { - "code": code, - "silent": False, - "store_history": True, - "user_expressions": {}, - "allow_stdin": False, - "stop_on_error": True, - }, - "channel": "shell", - "buffers": [], - } - ) - ) - result = "" - while True: - try: - received_msg = json.loads(await self.ws.recv()) - except ConnectionClosedError: - await self.astart() - return await self.arun(code, file_path, retry - 1) - - msg_header = received_msg.get("header", {}) - msg_parent_header = received_msg.get("parent_header", {}) - msg_content = received_msg.get("content", {}) - msg_data = msg_content.get("data", {}) - - if ( - msg_header["msg_type"] == "stream" - and msg_parent_header["msg_id"] == msg_id - ): - msg = msg_content["text"].strip() - if "Requirement already satisfied:" in msg: - continue - result += msg + "\n" - if settings.VERBOSE: - print("Output:\n", result) - - elif ( - msg_header["msg_type"] == "execute_result" - and 
msg_parent_header["msg_id"] == msg_id - ): - result += msg_data["text/plain"].strip() + "\n" - if settings.VERBOSE: - print("Output:\n", result) - - elif msg_header["msg_type"] == "display_data": - if "image/png" in msg_data: - return CodeBoxOutput( - type="image/png", - content=msg_data["image/png"], - ) - if "text/plain" in msg_data: - return CodeBoxOutput( - type="text", - content=msg_data["text/plain"], - ) - elif ( - msg_header["msg_type"] == "status" - and msg_parent_header["msg_id"] == msg_id - and msg_content["execution_state"] == "idle" - ): - if len(result) > 500: - result = "[...]\n" + result[-500:] - return CodeBoxOutput( - type="text", content=result or "code run successfully (no output)" - ) - - elif ( - msg_header["msg_type"] == "error" - and msg_parent_header["msg_id"] == msg_id - ): - error = f"{msg_content['ename']}: " f"{msg_content['evalue']}" - if settings.VERBOSE: - print("Error:\n", error) - return CodeBoxOutput(type="error", content=error) - - def upload( - self, file_name: str, content: bytes, timeout: int = 900 - ) -> CodeBoxStatus: - os.makedirs(".codebox", exist_ok=True) - with open(os.path.join(".codebox", file_name), "wb") as f: - f.write(content) - - return CodeBoxStatus(status=f"{file_name} uploaded successfully") - - async def aupload( - self, file_name: str, content: bytes, timeout: int = 900 - ) -> CodeBoxStatus: - return await asyncio.to_thread(self.upload, file_name, content, timeout) - - def download(self, file_name: str) -> CodeBoxFile: - with open(os.path.join(".codebox", file_name), "rb") as f: - content = f.read() - - return CodeBoxFile(name=file_name, content=content) - - async def adownload(self, file_name: str) -> CodeBoxFile: - return await asyncio.to_thread(self.download, file_name) - - def install(self, package_name: str) -> CodeBoxStatus: - result = self.run(f"!pip install -q {package_name}") - if result.type == "error": - raise RuntimeError(f"Failed to install {package_name}") - - return 
CodeBoxStatus(status=f"{package_name} installed successfully") - - async def ainstall(self, package_name: str) -> CodeBoxStatus: - result = await self.arun(f"!pip install -q {package_name}") - if result.type == "error": - raise RuntimeError(f"Failed to install {package_name}") - - return CodeBoxStatus(status=f"{package_name} installed successfully") - - def list_files(self) -> List[CodeBoxFile]: - return [ - CodeBoxFile(name=file_name, content=None) - for file_name in os.listdir(".codebox") - ] - - async def alist_files(self) -> List[CodeBoxFile]: - return await asyncio.to_thread(self.list_files) - - def restart(self) -> CodeBoxStatus: - # self.stop() - # self.start() - return CodeBoxStatus(status="restarted") - - async def arestart(self) -> CodeBoxStatus: - # await self.astop() - # await self.astart() - return CodeBoxStatus(status="restarted") - - def stop(self) -> CodeBoxStatus: - try: - if self.jupyter is not None: - if isinstance(self.jupyter, subprocess.Popen): - self.jupyter.terminate() - self.jupyter.wait() - self.jupyter = None - time.sleep(2) - elif isinstance(self.jupyter, Process): - self.jupyter.terminate() - self.jupyter = None - time.sleep(5) - else: - for pid in self._jupyter_pids: - os.kill(pid, signal.SIGTERM) - except ProcessLookupError: - pass - - if self.ws is not None: - try: - if isinstance(self.ws, ClientConnection): - self.ws.close() - else: - loop = asyncio.new_event_loop() - loop.run_until_complete(self.ws.close()) - except ConnectionClosedError: - pass - self.ws = None - - return CodeBoxStatus(status="stopped") - - async def astop(self) -> CodeBoxStatus: - if self.jupyter is not None: - self.jupyter.terminate() - await asyncio.create_subprocess_exec("kill", "-9", str(self.jupyter.pid)) - await asyncio.sleep(5) - self.jupyter = None - - if self.ws is not None: - try: - if isinstance(self.ws, WebSocketClientProtocol): - await self.ws.close() - else: - self.ws.close() - except ConnectionClosedError: - pass - self.ws = None - - if 
self.aiohttp_session is not None: - await self.aiohttp_session.close() - self.aiohttp_session = None - - return CodeBoxStatus(status="stopped") - - @property - def kernel_url(self) -> str: - """Return the url of the kernel.""" - return f"http://localhost:{self.port}/api" - - @property - def ws_url(self) -> str: - """Return the url of the websocket.""" - return f"ws://localhost:{self.port}/api" - - def __del__(self): - self.stop() - - if self.aiohttp_session is not None: - loop = asyncio.new_event_loop() - loop.run_until_complete(self.aiohttp_session.close()) - self.aiohttp_session = None diff --git a/src/codeboxapi/codebox.py b/src/codeboxapi/codebox.py new file mode 100644 index 0000000..32d36ed --- /dev/null +++ b/src/codeboxapi/codebox.py @@ -0,0 +1,331 @@ +""" +CodeBox API +~~~~~~~~~~~ + +The main class for the CodeBox API. + +Usage +----- + +.. code-block:: python + + from codeboxapi import CodeBox + + codebox = CodeBox.create(api_key="local") + + codebox.healthcheck() + codebox.exec("print('Hello World!')") + codebox.upload("test.txt", "This is test file content!") + codebox.exec("!pip install matplotlib", kernel="bash") + codebox.list_files() + codebox.download("test.txt") + +.. 
code-block:: python + + from codeboxapi import CodeBox + + codebox = CodeBox.create(api_key="local") + + await codebox.ahealthcheck() + await codebox.aexec("print('Hello World!')") + await codebox.ainstall("matplotlib") + await codebox.aupload("test.txt", "This is test file content!") + await codebox.alist_files() + await codebox.adownload("test.txt") + +""" + +import os +import typing as t +from importlib import import_module + +import anyio + +from .utils import async_flatten_exec_result, deprecated, flatten_exec_result, syncify + +if t.TYPE_CHECKING: + from .types import CodeBoxOutput, ExecChunk, ExecResult, RemoteFile + + +class CodeBox: + def __new__( + cls, + session_id: str | None = None, + api_key: str | t.Literal["local", "docker"] | None = None, + factory_id: str | t.Literal["default"] | None = None, + ) -> "CodeBox": + """ + Creates a CodeBox session + """ + api_key = api_key or os.getenv("CODEBOX_API_KEY", "local") + factory_id = factory_id or os.getenv("CODEBOX_FACTORY_ID", "default") + if api_key == "local": + return import_module("codeboxapi.local").LocalBox() + + if api_key == "docker": + return import_module("codeboxapi.docker").DockerBox() + return import_module("codeboxapi.remote").RemoteBox() + + def __init__( + self, + session_id: str | None = None, + api_key: str | t.Literal["local", "docker"] | None = None, + factory_id: str | t.Literal["default"] | None = None, + **_: bool, + ) -> None: + self.session_id = session_id or "local" + self.api_key = api_key or os.getenv("CODEBOX_API_KEY", "local") + self.factory_id = factory_id or os.getenv("CODEBOX_FACTORY_ID", "default") + + # SYNC + + def exec( + self, + code: str | os.PathLike, + kernel: t.Literal["ipython", "bash"] = "ipython", + timeout: float | None = None, + cwd: str | None = None, + ) -> "ExecResult": + """Execute code inside the CodeBox instance""" + return flatten_exec_result(self.stream_exec(code, kernel, timeout, cwd)) + + def stream_exec( + self, + code: str | os.PathLike, + kernel: 
t.Literal["ipython", "bash"] = "ipython", + timeout: float | None = None, + cwd: str | None = None, + ) -> t.Generator["ExecChunk", None, None]: + """Executes the code and streams the result.""" + raise NotImplementedError("Abstract method, please use a subclass.") + + def upload( + self, + remote_file_path: str, + content: t.BinaryIO | bytes | str, + timeout: float | None = None, + ) -> "RemoteFile": + """Upload a file to the CodeBox instance""" + raise NotImplementedError("Abstract method, please use a subclass.") + + def stream_download( + self, + remote_file_path: str, + timeout: float | None = None, + ) -> t.Generator[bytes, None, None]: + """Download a file as open BinaryIO. Make sure to close the file after use.""" + raise NotImplementedError("Abstract method, please use a subclass.") + + # ASYNC + + async def aexec( + self, + code: str | os.PathLike, + kernel: t.Literal["ipython", "bash"] = "ipython", + timeout: float | None = None, + cwd: str | None = None, + ) -> "ExecResult": + """Async Execute python code inside the CodeBox instance""" + return await async_flatten_exec_result( + self.astream_exec(code, kernel, timeout, cwd) + ) + + def astream_exec( + self, + code: str | os.PathLike, + kernel: t.Literal["ipython", "bash"] = "ipython", + timeout: float | None = None, + cwd: str | None = None, + ) -> t.AsyncGenerator["ExecChunk", None]: + """Async Stream Chunks of Execute python code inside the CodeBox instance""" + raise NotImplementedError("Abstract method, please use a subclass.") + + async def aupload( + self, + remote_file_path: str, + content: t.BinaryIO | bytes | str, + timeout: float | None = None, + ) -> "RemoteFile": + """Async Upload a file to the CodeBox instance""" + raise NotImplementedError("Abstract method, please use a subclass.") + + async def adownload( + self, + remote_file_path: str, + timeout: float | None = None, + ) -> "RemoteFile": + return [f for f in (await self.alist_files()) if f.path in remote_file_path][0] + + def 
astream_download( + self, + remote_file_path: str, + timeout: float | None = None, + ) -> t.AsyncGenerator[bytes, None]: + """Async Download a file as BinaryIO. Make sure to close the file after use.""" + raise NotImplementedError("Abstract method, please use a subclass.") + + # HELPER METHODS + + async def ahealthcheck(self) -> t.Literal["healthy", "error"]: + return ( + "healthy" + if "ok" in (await self.aexec("echo ok", kernel="bash")).text + else "error" + ) + + async def ainstall(self, *packages: str) -> str: + # todo make sure it always uses the correct python venv + await self.aexec( + "uv pip install " + " ".join(packages), + kernel="bash", + ) + return " ".join(packages) + " installed successfully" + + async def alist_files(self) -> list["RemoteFile"]: + from .types import RemoteFile + + files = ( + await self.aexec( + "find . -type f -exec du -h {} + | awk '{print $2, $1}' | sort", + kernel="bash", + ) + ).text.splitlines() + return [ + RemoteFile( + path=parts[0].removeprefix("./"), + remote=self, + _size=self._parse_size(parts[1]), + ) + for file in files + if (parts := file.split(" ")) and len(parts) == 2 + ] + + def _parse_size(self, size_str: str) -> int: + """Convert human-readable size to bytes.""" + units = {"K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4} + try: + number = float(size_str[:-1]) + unit = size_str[-1].upper() + return int(number * units.get(unit, 1)) + except ValueError: + return -1 + + async def alist_packages(self) -> list[str]: + return ( + await self.aexec( + "uv pip list | tail -n +3 | cut -d ' ' -f 1", + kernel="bash", + ) + ).text.splitlines() + + async def ashow_variables(self) -> dict[str, str]: + vars = [ + line.strip() for line in (await self.aexec("%who")).text.strip().split() + ] + return {v: (await self.aexec(f"print({v}, end='')")).text for v in vars} + + async def arestart(self) -> None: + """Restart the Jupyter kernel""" + await self.aexec(r"%restart") + + async def akeep_alive(self, minutes: int = 15) -> None: 
+ """Keep the CodeBox instance alive for a certain amount of minutes""" + + async def ping(cb: CodeBox, d: int) -> None: + for _ in range(d): + await cb.ahealthcheck() + await anyio.sleep(60) + + async with anyio.create_task_group() as tg: + tg.start_soon(ping, self, minutes) + + # SYNCIFY + + def download( + self, remote_file_path: str, timeout: float | None = None + ) -> "RemoteFile": + return syncify(self.adownload)(remote_file_path, timeout) + + def healthcheck(self) -> str: + return syncify(self.ahealthcheck)() + + def install(self, *packages: str) -> str: + return syncify(self.ainstall)(*packages) + + def list_files(self) -> list["RemoteFile"]: + return syncify(self.alist_files)() + + def list_packages(self) -> list[str]: + return syncify(self.alist_packages)() + + def show_variables(self) -> dict[str, str]: + return syncify(self.ashow_variables)() + + def restart(self) -> None: + return syncify(self.arestart)() + + def keep_alive(self, minutes: int = 15) -> None: + return syncify(self.akeep_alive)(minutes) + + # DEPRECATED + + @deprecated( + "There is no need anymore to explicitly start a CodeBox instance.\n" + "When calling any method you will get assigned a new session.\n" + "The `.start` method is deprecated. Use `.healthcheck` instead." + ) + async def astart(self) -> t.Literal["started", "error"]: + return "started" if (await self.ahealthcheck()) == "healthy" else "error" + + @deprecated( + "The `.stop` method is deprecated. " + "The session will be closed automatically after the last interaction.\n" + "(default timeout: 15 minutes)" + ) + async def astop(self) -> t.Literal["stopped"]: + return "stopped" + + @deprecated( + "The `.run` method is deprecated. 
Use `.exec` instead.", + ) + async def arun(self, code: str | os.PathLike) -> "CodeBoxOutput": + from .types import CodeBoxOutput + + exec_result = await self.aexec(code, kernel="ipython") + if exec_result.images: + return CodeBoxOutput(type="image/png", content=exec_result.images[0]) + if exec_result.errors: + return CodeBoxOutput(type="stderr", content=exec_result.errors[0]) + return CodeBoxOutput(type="stdout", content=exec_result.text) + + @deprecated( + "The `.status` method is deprecated. Use `.healthcheck` instead.", + ) + async def astatus(self) -> t.Literal["started", "running", "stopped"]: + return "running" if await self.ahealthcheck() == "healthy" else "stopped" + + @deprecated( + "The `.start` method is deprecated. Use `.healthcheck` instead.", + ) + def start(self) -> t.Literal["started", "error"]: + return syncify(self.astart)() + + @deprecated( + "The `.stop` method is deprecated. " + "The session will be closed automatically after the last interaction.\n" + "(default timeout: 15 minutes)" + ) + def stop(self) -> t.Literal["stopped"]: + return syncify(self.astop)() + + @deprecated( + "The `.run` method is deprecated. Use `.exec` instead.", + ) + def run(self, code: str | os.PathLike) -> "CodeBoxOutput": + return syncify(self.arun)(code) + + @deprecated( + "The `.status` method is deprecated. 
Use `.healthcheck` instead.", + ) + def status(self) -> t.Literal["started", "running", "stopped"]: + return syncify(self.astatus)() diff --git a/src/codeboxapi/config.py b/src/codeboxapi/config.py deleted file mode 100644 index 02ebb2e..0000000 --- a/src/codeboxapi/config.py +++ /dev/null @@ -1,28 +0,0 @@ -""" -CodeBox API Config: -Automatically loads environment variables from .env file -""" - -from typing import Optional - -from dotenv import load_dotenv -from pydantic_settings import BaseSettings - -# .env file -load_dotenv("./.env") - - -class CodeBoxSettings(BaseSettings): - """ - CodeBox API Config - """ - - VERBOSE: bool = False - SHOW_INFO: bool = True - - CODEBOX_API_KEY: Optional[str] = None - CODEBOX_BASE_URL: str = "https://codeboxapi.com/api/v1" - CODEBOX_TIMEOUT: int = 20 - - -settings = CodeBoxSettings() diff --git a/src/codeboxapi/docker.py b/src/codeboxapi/docker.py new file mode 100644 index 0000000..da69af7 --- /dev/null +++ b/src/codeboxapi/docker.py @@ -0,0 +1,74 @@ +import socket +import subprocess +import time + +import httpx + +from .remote import RemoteBox + + +def get_free_port(port_or_range: int | tuple[int, int]) -> int: + if isinstance(port_or_range, int): + port = port_or_range + else: + start, end = port_or_range + port = start + + while port <= (end if isinstance(port_or_range, tuple) else port): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(("localhost", port)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return port + except OSError: + port += 1 + + raise OSError("No free ports available on the specified port or range.") + + +class DockerBox(RemoteBox): + def __init__( + self, + port_or_range: int | tuple[int, int] = 8069, + image: str = "shroominic/codebox:latest", + timeout: float = 3, # minutes + start_container: bool = True, + **_, + ) -> None: + if start_container: + self.port = get_free_port(port_or_range) + subprocess.run( + [ + "docker", + "run", + "-d", + "--rm", + "-e", + 
f"CODEBOX_TIMEOUT={timeout}", + "-p", + f"{self.port}:8069", + image, + ], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + else: + assert isinstance(port_or_range, int) + self.port = port_or_range + self.session_id = str(self.port) + self.base_url = f"http://localhost:{self.port}" + self.client = httpx.Client(base_url=self.base_url) + self.aclient = httpx.AsyncClient(base_url=self.base_url) + self.api_key = "docker" + self.factory_id = image + self.session_id = str(self.port) + self._wait_for_startup() + + def _wait_for_startup(self) -> None: + while True: + try: + self.client.get("/") + break + except httpx.HTTPError: + time.sleep(1) diff --git a/src/codeboxapi/errors.py b/src/codeboxapi/errors.py deleted file mode 100644 index d6d1232..0000000 --- a/src/codeboxapi/errors.py +++ /dev/null @@ -1,29 +0,0 @@ -""" -CodeBox API -Error Classes -""" - - -class CodeBoxError(Exception): - """ - Represents an api error returned from the CodeBox API. - """ - - def __init__( - self, - http_status: int = 0, - json_body: dict = {}, - headers: dict = {}, - **kwargs, - ): - super().__init__(**kwargs) - - self.http_status = http_status - self.json_body = json_body - self.headers = headers - - def __str__(self): - return f"{self.http_status}: {self.json_body}" - - def __repr__(self): - return f"" diff --git a/src/codeboxapi/local.py b/src/codeboxapi/local.py new file mode 100644 index 0000000..515a84c --- /dev/null +++ b/src/codeboxapi/local.py @@ -0,0 +1,368 @@ +""" +Local implementation of CodeBox. +This is useful for testing and development. +In case you don't put an api_key, +this is the default CodeBox. 
+""" + +import asyncio +import base64 +import io +import os +import re +import subprocess +import sys +import tempfile as tmpf +import threading +import time +import typing as t +from builtins import print as important_print +from queue import Queue + +from IPython.core.interactiveshell import ExecutionResult, InteractiveShell + +from .codebox import CodeBox +from .types import ExecChunk, RemoteFile +from .utils import check_installed, raise_timeout, resolve_pathlike, run_inside + +IMAGE_PATTERN = r"(.*?)" + + +class LocalBox(CodeBox): + """ + LocalBox is a CodeBox implementation that runs code locally using IPython. + This is useful for testing and development. + + Only one instance can exist at a time. For parallel execution, use: + - DockerBox for local parallel execution + - Get an API key at codeboxapi.com for hosted parallel execution + """ + + _instance: t.Optional["LocalBox"] = None + + def __new__(cls, *args, **kwargs) -> "LocalBox": + if cls._instance: + raise RuntimeError( + "Only one LocalBox instance can exist at a time.\n" + "For parallel execution use:\n" + "- Use DockerBox for local parallel execution\n" + "- Get an API key at https://codeboxapi.com for secure remote execution" + ) + + # This is a hack to ignore the CodeBox.__new__ factory method + instance = object.__new__(cls) + cls._instance = instance + return instance + + def __init__( + self, + session_id: str | None = None, + codebox_cwd: str = ".codebox", + **kwargs, + ) -> None: + self.api_key = "local" + self.factory_id = "local" + self.session_id = session_id or "" + os.makedirs(codebox_cwd, exist_ok=True) + self.cwd = os.path.abspath(codebox_cwd) + check_installed("ipython") + self.shell = InteractiveShell.instance() + self.shell.enable_gui = lambda x: None # type: ignore + self._patch_matplotlib_show() + + def _patch_matplotlib_show(self) -> None: + import matplotlib + + matplotlib.use("Agg") + import matplotlib.pyplot as plt + + def custom_show(close=True): + fig = plt.gcf() + buf = 
io.BytesIO() + fig.savefig(buf, format="png") + buf.seek(0) + img_str = base64.b64encode(buf.getvalue()).decode("utf-8") + important_print(IMAGE_PATTERN.replace("(.*?)", img_str)) + if close: + plt.close(fig) + + plt.show = custom_show + + def stream_exec( + self, + code: str | os.PathLike, + kernel: t.Literal["ipython", "bash"] = "ipython", + timeout: float | None = None, + cwd: str | None = None, + ) -> t.Generator[ExecChunk, None, None]: + with raise_timeout(timeout): + code = resolve_pathlike(code) + + if kernel == "ipython": + with run_inside(cwd or self.cwd): + old_stdout, old_stderr = sys.stdout, sys.stderr + temp_output, temp_error = sys.stdout, sys.stderr = ( + io.StringIO(), + io.StringIO(), + ) + queue = Queue[ExecChunk | None]() + _result: list[ExecutionResult] = [] + + def _run_cell(c: str, result: list[ExecutionResult]) -> None: + time.sleep(0.001) + result.append(self.shell.run_cell(c)) + + run_cell = threading.Thread(target=_run_cell, args=(code, _result)) + try: + + def stream_chunks(_out: io.StringIO, _err: io.StringIO) -> None: + while run_cell.is_alive(): + time.sleep(0.001) + if output := _out.getvalue(): + # todo make this more efficient? + sys.stdout = _out = io.StringIO() + + if "" in output: + image_matches = re.findall( + IMAGE_PATTERN, output + ) + for img_str in image_matches: + queue.put( + ExecChunk(type="img", content=img_str) + ) + output = re.sub(IMAGE_PATTERN, "", output) + + if output: + if output.startswith("Out["): + # todo better disable logging somehow + output = re.sub( + r"Out\[(.*?)\]: ", "", output.strip() + ) + queue.put(ExecChunk(type="txt", content=output)) + + if error := _err.getvalue(): + # todo make this more efficient? 
+ sys.stderr = _err = io.StringIO() + queue.put(ExecChunk(type="err", content=error)) + + queue.put(None) + + stream = threading.Thread( + target=stream_chunks, args=(temp_output, temp_error) + ) + + run_cell.start() + stream.start() + + while True: + time.sleep(0.001) + if queue.qsize() > 0: + if chunk := queue.get(): + yield chunk + else: + break + + result = _result[0] + if result.error_before_exec: + yield ExecChunk( + type="err", + content=str(result.error_before_exec).replace( + "\\n", "\n" + ), + ) + elif result.error_in_exec: + yield ExecChunk( + type="err", + content=str(result.error_in_exec).replace("\\n", "\n"), + ) + elif result.result is not None: + yield ExecChunk(type="txt", content=str(result.result)) + + finally: + sys.stdout = old_stdout + sys.stderr = old_stderr + run_cell._stop() # type: ignore + + elif kernel == "bash": + # todo maybe implement using queue + process = subprocess.Popen( + code, + cwd=cwd or self.cwd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + if process.stdout: + for c in process.stdout: + yield ExecChunk(content=c.decode(), type="txt") + if process.stderr: + for c in process.stderr: + yield ExecChunk(content=c.decode(), type="err") + + else: + raise ValueError(f"Unsupported kernel: {kernel}") + + async def astream_exec( + self, + code: str | os.PathLike, + kernel: t.Literal["ipython", "bash"] = "ipython", + timeout: float | None = None, + cwd: str | None = None, + ) -> t.AsyncGenerator[ExecChunk, None]: + async with asyncio.timeout(timeout): + code = resolve_pathlike(code) + + if kernel == "ipython": + with run_inside(cwd or self.cwd): + old_stdout, old_stderr = sys.stdout, sys.stderr + temp_output, temp_error = sys.stdout, sys.stderr = ( + io.StringIO(), + io.StringIO(), + ) + + run_cell = asyncio.create_task( + asyncio.to_thread(self.shell.run_cell, code) + ) + try: + while not run_cell.done(): + await asyncio.sleep(0.001) + if output := temp_output.getvalue(): + # todo make this more efficient? 
+ sys.stdout = temp_output = io.StringIO() + + if "" in output: + image_matches = re.findall(IMAGE_PATTERN, output) + for img_str in image_matches: + yield ExecChunk(type="img", content=img_str) + output = re.sub(IMAGE_PATTERN, "", output) + + if output: + if output.startswith("Out["): + # todo better disable logging somehow + output = re.sub( + r"Out\[(.*?)\]: ", "", output.strip() + ) + yield ExecChunk(type="txt", content=output) + + if error := temp_error.getvalue(): + sys.stderr = temp_error = io.StringIO() + yield ExecChunk(type="err", content=error) + + result = await run_cell + if result.error_before_exec: + yield ExecChunk( + type="err", content=str(result.error_before_exec) + ) + elif result.error_in_exec: + yield ExecChunk( + type="err", content=str(result.error_in_exec) + ) + elif result.result is not None: + yield ExecChunk(type="txt", content=str(result.result)) + finally: + sys.stdout = old_stdout + sys.stderr = old_stderr + run_cell.cancel() + + elif kernel == "bash": + process = await asyncio.create_subprocess_shell( + code, + cwd=cwd or self.cwd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # todo yield at the same time and not after each other + if process.stdout: + async for chunk in process.stdout: + yield ExecChunk(content=chunk.decode(), type="txt") + + if process.stderr: + async for err in process.stderr: + yield ExecChunk(content=err.decode(), type="err") + else: + raise ValueError(f"Unsupported kernel: {kernel}") + + def upload( + self, + remote_file_path: str, + content: t.BinaryIO | bytes | str, + timeout: float | None = None, + ) -> "RemoteFile": + from .types import RemoteFile + + with raise_timeout(timeout): + file_path = os.path.join(self.cwd, remote_file_path) + with open(file_path, "wb") as file: + if isinstance(content, str): + file.write(content.encode()) + elif isinstance(content, bytes): + file.write(content) + elif isinstance(content, tmpf.SpooledTemporaryFile): + file.write(content.read()) + elif 
isinstance(content, (t.BinaryIO, io.BytesIO)): + file.write(content.read()) + else: + raise TypeError("Unsupported content type") + return RemoteFile(path=remote_file_path, remote=self) + + async def aupload( + self, + file_name: str, + content: t.BinaryIO | bytes | str | tmpf.SpooledTemporaryFile, + timeout: float | None = None, + ) -> RemoteFile: + import aiofiles.os + + from .types import RemoteFile + + async with asyncio.timeout(timeout): + file_path = os.path.join(self.cwd, file_name) + async with aiofiles.open(file_path, "wb") as file: + if isinstance(content, str): + await file.write(content.encode()) + elif isinstance(content, tmpf.SpooledTemporaryFile): + await file.write(content.read()) + elif isinstance(content, (t.BinaryIO, io.BytesIO)): + try: + while chunk := content.read(8192): + await file.write(chunk) + except ValueError as e: + if "I/O operation on closed file" in str(e): + # If the file is closed, we can't reopen it + # Instead, we'll raise a more informative error + raise ValueError( + "The provided file object is closed and cannot be read" + ) from e + else: + raise + elif isinstance(content, bytes): + await file.write(content) + else: + print(type(content), content.__dict__) + raise TypeError("Unsupported content type") + return RemoteFile(path=file_path, remote=self) + + def stream_download( + self, + remote_file_path: str, + timeout: float | None = None, + ) -> t.Generator[bytes, None, None]: + with raise_timeout(timeout): + with open(os.path.join(self.cwd, remote_file_path), "rb") as f: + while chunk := f.read(8192): + yield chunk + + async def astream_download( + self, + remote_file_path: str, + timeout: float | None = None, + ) -> t.AsyncGenerator[bytes, None]: + import aiofiles + + async with asyncio.timeout(timeout): + async with aiofiles.open( + os.path.join(self.cwd, remote_file_path), "rb" + ) as f: + while chunk := await f.read(8192): + yield chunk diff --git a/src/codeboxapi/py.typed b/src/codeboxapi/py.typed new file mode 100644 
index 0000000..e69de29 diff --git a/src/codeboxapi/remote.py b/src/codeboxapi/remote.py new file mode 100644 index 0000000..bec7462 --- /dev/null +++ b/src/codeboxapi/remote.py @@ -0,0 +1,167 @@ +import re +import typing as t +from os import PathLike, getenv +from uuid import uuid4 + +import anyio +import httpx + +from .codebox import CodeBox +from .types import ExecChunk, RemoteFile +from .utils import raise_error, resolve_pathlike + + +class RemoteBox(CodeBox): + """ + Sandboxed Python Interpreter + """ + + def __new__(cls, *args, **kwargs) -> "RemoteBox": + # This is a hack to ignore the CodeBox.__new__ factory method. + return object.__new__(cls) + + def __init__( + self, + session_id: str | None = None, + api_key: str | t.Literal["local", "docker"] | None = None, + factory_id: str | t.Literal["default"] | None = None, + base_url: str | None = None, + ) -> None: + self.session_id = session_id or uuid4().hex + self.factory_id = factory_id or getenv("CODEBOX_FACTORY_ID", "default") + assert self.factory_id is not None + self.api_key = ( + api_key + or getenv("CODEBOX_API_KEY") + or raise_error("CODEBOX_API_KEY is required") + ) + self.base_url = base_url or getenv( + "CODEBOX_BASE_URL", "https://codeboxapi.com/api/v2" + ) + self.url = f"{self.base_url}/codebox/{self.session_id}" + self.headers = { + "Factory-Id": self.factory_id, + "Authorization": f"Bearer {self.api_key}", + } + self.client = httpx.Client(base_url=self.url, headers=self.headers) + self.aclient = httpx.AsyncClient(base_url=self.url, headers=self.headers) + + def stream_exec( + self, + code: str | PathLike, + kernel: t.Literal["ipython", "bash"] = "ipython", + timeout: float | None = None, + cwd: str | None = None, + ) -> t.Generator[ExecChunk, None, None]: + code = resolve_pathlike(code) + with self.client.stream( + method="POST", + url="/exec", + timeout=timeout, + json={"code": code, "kernel": kernel, "cwd": cwd}, + ) as response: + response.raise_for_status() + buffer = "" + for chunk in 
response.iter_text(): + buffer += chunk + while match := re.match( + r"<(txt|img|err)>(.*?)", buffer, re.DOTALL + ): + _, end = match.span() + t, c = match.groups() + yield ExecChunk(type=t, content=c) # type: ignore[arg-type] + buffer = buffer[end:] + + async def astream_exec( + self, + code: str | PathLike, + kernel: t.Literal["ipython", "bash"] = "ipython", + timeout: float | None = None, + cwd: str | None = None, + ) -> t.AsyncGenerator[ExecChunk, None]: + code = resolve_pathlike(code) + try: + async with self.aclient.stream( + method="POST", + url="/exec", + timeout=timeout, + json={"code": code, "kernel": kernel, "cwd": cwd}, + ) as response: + response.raise_for_status() + buffer = "" + async for chunk in response.aiter_text(): + buffer += chunk + while match := re.match( + r"<(txt|img|err)>(.*?)", buffer, re.DOTALL + ): + _, end = match.span() + t, c = match.groups() + yield ExecChunk(type=t, content=c) # type: ignore[arg-type] + buffer = buffer[end:] + except RuntimeError as e: + if "loop is closed" not in str(e): + raise e + await anyio.sleep(0.1) + async for c in self.astream_exec(code, kernel, timeout, cwd): + yield c + + def upload( + self, + file_name: str, + content: t.BinaryIO | bytes | str, + timeout: float | None = None, + ) -> "RemoteFile": + from .types import RemoteFile + + if isinstance(content, str): + content = content.encode("utf-8") + self.client.post( + url="/files/upload", + files={"file": (file_name, content)}, + timeout=timeout, + ).raise_for_status() + return RemoteFile(path=file_name, remote=self) + + async def aupload( + self, + remote_file_path: str, + content: t.BinaryIO | bytes | str, + timeout: float | None = None, + ) -> "RemoteFile": + from .types import RemoteFile + + if isinstance(content, str): + content = content.encode("utf-8") + response = await self.aclient.post( + url="/files/upload", + files={"file": (remote_file_path, content)}, + timeout=timeout, + ) + response.raise_for_status() + return 
RemoteFile(path=remote_file_path, remote=self) + + def stream_download( + self, + remote_file_path: str, + timeout: float | None = None, + ) -> t.Generator[bytes, None, None]: + with self.client.stream( + method="GET", + url=f"/files/download/{remote_file_path}", + timeout=timeout, + ) as response: + for chunk in response.iter_bytes(): + yield chunk + + async def astream_download( + self, + remote_file_path: str, + timeout: float | None = None, + ) -> t.AsyncGenerator[bytes, None]: + async with self.aclient.stream( + method="GET", + url=f"/files/download/{remote_file_path}", + timeout=timeout, + ) as response: + async for chunk in response.aiter_bytes(): + yield chunk diff --git a/src/codeboxapi/schema.py b/src/codeboxapi/schema.py deleted file mode 100644 index 7ea45da..0000000 --- a/src/codeboxapi/schema.py +++ /dev/null @@ -1,68 +0,0 @@ -""" -This file contains the schema for the CodeBox API. -It is used to validate the data returned from the API. -It also helps with type hinting and provides a nice -interface for interacting with the API. -""" - -from typing import Optional - -from pydantic import BaseModel - - -class CodeBoxStatus(BaseModel): - """ - Represents the status of a CodeBox instance. - """ - - status: str - - def __str__(self): - return self.status - - def __repr__(self): - return f"Status({self.status})" - - def __eq__(self, other): - return self.__str__() == other.__str__() - - -class CodeBoxOutput(BaseModel): - """ - Represents the code execution output of a CodeBox instance. - """ - - type: str - content: str - - def __str__(self): - return self.content - - def __repr__(self): - return f"{self.type}({self.content})" - - def __eq__(self, other): - return self.__str__() == other.__str__() - - -class CodeBoxFile(BaseModel): - """ - Represents a file returned from a CodeBox instance. 
- """ - - name: str - content: Optional[bytes] = None - - @classmethod - def from_path(cls, path: str) -> "CodeBoxFile": - if not path.startswith("/"): - path = f"./{path}" - with open(path, "rb") as f: - path = path.split("/")[-1] - return cls(name=path, content=f.read()) - - def __str__(self): - return self.name - - def __repr__(self): - return f"File({self.name})" diff --git a/src/codeboxapi/types.py b/src/codeboxapi/types.py new file mode 100644 index 0000000..0350564 --- /dev/null +++ b/src/codeboxapi/types.py @@ -0,0 +1,128 @@ +import typing as t +from dataclasses import dataclass + +import aiofiles + +from .codebox import CodeBox + + +@dataclass +class RemoteFile: + path: str + remote: CodeBox + _size: int | None = None + _content: bytes | None = None + + @property + def name(self) -> str: + return self.path.split("/")[-1] + + def get_content(self) -> bytes: + if self._content is None: + self._content = b"".join(self.remote.stream_download(self.path)) + return self._content + + async def aget_content(self) -> bytes: + if self._content is None: + self._content = b"" + async for chunk in self.remote.astream_download(self.path): + self._content += chunk + return self._content + + def get_size(self) -> int: + if self._size is None: + self._size = len(self.get_content()) + return self._size + + async def aget_size(self) -> int: + if self._size is None: + self._size = len(await self.aget_content()) + return self._size + + def save(self, local_path: str) -> None: + with open(local_path, "wb") as f: + for chunk in self.remote.stream_download(self.path): + f.write(chunk) + + async def asave(self, local_path: str) -> None: + async with aiofiles.open(local_path, "wb") as f: + async for chunk in self.remote.astream_download(self.path): + await f.write(chunk) + + def __str__(self) -> str: + return self.name + + def __repr__(self) -> str: + if self._size is None: + return f"RemoteFile({self.path})" + return f"RemoteFile({self.path}, {self._size} bytes)" + + +@dataclass 
+class ExecChunk: + """ + A chunk of output from an execution. + The type is one of: + - txt: text output + - img: image output + - err: error output + """ + + type: t.Literal["txt", "img", "err"] + content: str + + +@dataclass +class ExecResult: + chunks: list[ExecChunk] + + @property + def text(self) -> str: + return "".join(chunk.content for chunk in self.chunks if chunk.type == "txt") + + @property + def images(self) -> list[str]: + return [chunk.content for chunk in self.chunks if chunk.type == "img"] + + @property + def errors(self) -> list[str]: + return [chunk.content for chunk in self.chunks if chunk.type == "err"] + + +@dataclass +class CodeBoxOutput: + """Deprecated CodeBoxOutput class""" + + content: str + type: t.Literal["stdout", "stderr", "image/png", "error"] + + def __str__(self) -> str: + return self.content + + def __eq__(self, other: object) -> bool: + if isinstance(other, str): + return self.content == other + if isinstance(other, CodeBoxOutput): + return self.content == other.content and self.type == other.type + return False + + +class CodeBoxFile: + """Deprecated CodeBoxFile class""" + + def __init__(self, name: str, content: bytes | None = None) -> None: + from .utils import deprecated + + deprecated( + "The CodeBoxFile class is deprecated. Use RemoteFile for file handling " + "or plain bytes for content instead." 
+ )(lambda: None)() + self.name = name + self.content = content + + @classmethod + def from_path(cls, path: str) -> "CodeBoxFile": + import os + + with open(path, "rb") as f: + return cls(name=os.path.basename(path), content=f.read()) diff --git a/src/codeboxapi/utils.py b/src/codeboxapi/utils.py index e3c0b7c..620eeb7 100644 --- a/src/codeboxapi/utils.py +++ b/src/codeboxapi/utils.py @@ -1,202 +1,145 @@ -"""Utility functions for API requests""" +import os +import signal +import typing as t +from contextlib import contextmanager +from functools import partial, reduce, wraps +from importlib.metadata import PackageNotFoundError, distribution +from warnings import warn + +import anyio +from anyio._core._eventloop import threadlocals + +if t.TYPE_CHECKING: + from .types import ExecChunk, ExecResult + +T = t.TypeVar("T") +P = t.ParamSpec("P") + + +def deprecated(message: str) -> t.Callable[[t.Callable[P, T]], t.Callable[P, T]]: + def decorator(func: t.Callable[P, T]) -> t.Callable[P, T]: + @wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + if os.getenv("IGNORE_DEPRECATION_WARNINGS", "false").lower() == "true": + return func(*args, **kwargs) + warn( + f"{func.__name__} is deprecated. 
{message}", + DeprecationWarning, + stacklevel=2, + ) + return func(*args, **kwargs) -import json -from asyncio import sleep as asleep -from io import BytesIO -from time import sleep -from typing import Optional + return wrapper -import requests -from aiohttp import ClientError, ClientResponse, ClientSession, ClientTimeout, FormData -from aiohttp.payload import BytesIOPayload + return decorator -from codeboxapi.config import settings -from codeboxapi.errors import CodeBoxError +def resolve_pathlike(file: str | os.PathLike) -> str: + if isinstance(file, os.PathLike): + with open(file, "r") as f: + return f.read() + return file -def build_request_data( - method: str, - endpoint: str, - body: Optional[dict] = None, - files: Optional[dict] = None, -) -> dict: - """ - Builds a request data dictionary for the requests library. - """ - return { - "method": method, - "url": settings.CODEBOX_BASE_URL + endpoint, - "headers": { - "Authorization": f"Bearer {settings.CODEBOX_API_KEY}", - }, - "json": body, - "files": files, - } - - -def handle_response(response: requests.Response): - """ - Handles a response from the requests library. - """ - handlers = { - "application/json": lambda r: json.loads(r.content.decode()), - "application/octet-stream": lambda r: { - "content": BytesIO(r.content).read(), - "name": r.headers["Content-Disposition"].split("=")[1], - }, - # Add other content type handlers here - } - handler = handlers.get( - response.headers["Content-Type"].split(";")[0], lambda r: r.content.decode() - ) - if response.status_code != 200: - try: - json_body = response.json() - except Exception: - json_body = {"": response.text} - raise CodeBoxError( - http_status=response.status_code, - json_body=json_body, - headers=dict(response.headers.items()), - ) - return handler(response) - - -async def handle_response_async(response: ClientResponse) -> dict: - """ - Handles a response from the aiohttp library. 
- """ - async def json_handler(r: ClientResponse) -> dict: - return json.loads(await r.text()) - - async def file_handler(r: ClientResponse) -> dict: - return { - "content": await r.read(), - "name": r.headers["Content-Disposition"].split("=")[1], - } - - async def default_handler(r: ClientResponse) -> dict: - return {"content": await r.text()} - - handlers = { - "application/json": json_handler, - "application/octet-stream": file_handler, - # Add other content type handlers here - } - if response.status != 200: - try: - json_body = await response.json() - except Exception: - json_body = {"": await response.text()} - - raise CodeBoxError( - http_status=response.status, - json_body=json_body, - headers=dict(response.headers.items()), - ) - handler = handlers.get( - response.headers["Content-Type"].split(";")[0], default_handler - ) - return await handler(response) - - -def base_request( - method: str, - endpoint: str, - body: Optional[dict] = None, - files: Optional[dict] = None, - timeout: int = 420, - retries: int = 3, - backoff_factor: float = 0.3, -) -> dict: +def reduce_bytes(async_gen: t.Iterator[bytes]) -> bytes: + return reduce(lambda x, y: x + y, async_gen) + + +def flatten_exec_result( + result: "ExecResult" | t.Iterator["ExecChunk"], +) -> "ExecResult": + from .types import ExecResult + + if not isinstance(result, ExecResult): + result = ExecResult(chunks=[c for c in result]) + # todo todo todo todo todo todo + # remove empty text chunks + # merge text chunks + # remove empty stream chunks + # merge stream chunks + # remove empty error chunks + # merge error chunks + # ... + return result + + +async def async_flatten_exec_result( + async_gen: t.AsyncGenerator["ExecChunk", None], +) -> "ExecResult": + # todo todo todo todo todo todo + # remove empty text chunks + # merge text chunks + # remove empty stream chunks + # merge stream chunks + # remove empty error chunks + # merge error chunks + # ... 
+ from .types import ExecResult + + return ExecResult(chunks=[c async for c in async_gen]) + + +def syncify( + async_function: t.Callable[P, t.Coroutine[t.Any, t.Any, T]], +) -> t.Callable[P, T]: """ - Makes a request to the CodeBox API with retry logic. - - Args: - - method: HTTP method as a string. - - endpoint: API endpoint as a string. - - body: Optional dictionary containing the JSON body. - - files: Optional dictionary containing file data. - - retries: Maximum number of retries on failure. - - backoff_factor: Multiplier for delay between retries (exponential backoff). - - Returns: - - A dictionary response from the API. + Take an async function and create a regular one that receives the same keyword and + positional arguments, and that when called, calls the original async function in + the main async loop from the worker thread using `anyio.to_thread.run()`. """ - request_data = build_request_data(method, endpoint, body, files) - for attempt in range(retries): - try: - response = requests.request(**request_data, timeout=timeout) - return handle_response(response) - except requests.RequestException as e: - if attempt < retries - 1: - sleep_time = backoff_factor * (2**attempt) - sleep(sleep_time) - else: - raise e - raise CodeBoxError(http_status=500, json_body={"error": "Max retries exceeded"}) - - -async def abase_request( - session: ClientSession, - method: str, - endpoint: str, - body: Optional[dict] = None, - files: Optional[dict] = None, - timeout: int = 420, - retries: int = 3, - backoff_factor: float = 0.3, -) -> dict: + + @wraps(async_function) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + partial_f = partial(async_function, *args, **kwargs) + + if not getattr(threadlocals, "current_async_backend", None): + return anyio.run(partial_f) + return anyio.from_thread.run(partial_f) + + return wrapper + + +def check_installed(package: str) -> None: """ - Makes an asynchronous request to the CodeBox API with retry functionality. 
- - Args: - - session: The aiohttp ClientSession. - - method: HTTP method as a string. - - endpoint: API endpoint as a string. - - body: Optional dictionary containing the JSON body. - - files: Optional dictionary containing file data. - - retries: Maximum number of retries on failure. - - backoff_factor: Multiplier for delay between retries (exponential backoff). - - Returns: - - A dictionary response from the API. + Check if the given package is installed. """ - request_data = build_request_data(method, endpoint, body, files) - if files is not None: - data = FormData() - for key, file_tuple in files.items(): - filename, fileobject = file_tuple[ - :2 - ] # Get the filename and fileobject from the tuple - payload = BytesIOPayload(BytesIO(fileobject)) - data.add_field( - key, payload, filename=filename - ) # Use the filename from the tuple - request_data.pop("files") - request_data.pop("json") - request_data["data"] = data - else: - request_data.pop("files") - - for attempt in range(retries): - try: - response = await session.request( - **request_data, timeout=ClientTimeout(total=timeout) + try: + distribution(package) + except PackageNotFoundError: + if os.getenv("DEBUG", "false").lower() == "true": + print( + f"\nMake sure '{package}' is installed " + "when using without a CODEBOX_API_KEY.\n" + f"You can install it with 'pip install {package}'.\n" ) - return await handle_response_async(response) - except ClientError as e: - if attempt < retries - 1: - sleep_time = backoff_factor * (2**attempt) - await asleep(sleep_time) - else: - raise e - raise CodeBoxError(http_status=500, json_body={"error": "Max retries exceeded"}) + raise -def set_api_key(api_key: str) -> None: - """ - Manually set the CODEBOX_API_KEY. 
- """ - settings.CODEBOX_API_KEY = api_key +@contextmanager +def raise_timeout(timeout: float | None = None): + def timeout_handler(signum, frame): + raise TimeoutError("Execution timed out") + + if timeout is not None: + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(int(timeout)) + + try: + yield + finally: + if timeout is not None: + signal.alarm(0) + + +@contextmanager +def run_inside(directory: str): + old_cwd = os.getcwd() + os.chdir(directory) + try: + yield + finally: + os.chdir(old_cwd) + + +def raise_error(message: str) -> t.NoReturn: + raise Exception(message) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..b773253 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,20 @@ +import os + +import pytest +from codeboxapi import CodeBox + +LOCALBOX = CodeBox(api_key="local") + + +@pytest.fixture( + scope="session", + params=["local", "docker", os.getenv("CODEBOX_API_KEY")], +) +def codebox(request: pytest.FixtureRequest) -> CodeBox: + if request.param == "local": + return LOCALBOX + + if request.param == "docker" and os.system("docker ps > /dev/null 2>&1") != 0: + pytest.skip("Docker is not running") + + return CodeBox(api_key=request.param) diff --git a/tests/parametric_test.py b/tests/parametric_test.py deleted file mode 100644 index 872532b..0000000 --- a/tests/parametric_test.py +++ /dev/null @@ -1,175 +0,0 @@ -import asyncio -import time -from typing import Callable - -import pytest -from codeboxapi import CodeBox -from codeboxapi.schema import CodeBoxFile, CodeBoxOutput - -AssertFunctionType = Callable[[CodeBoxOutput, list[CodeBoxFile]], bool] - -code_1 = """ -import pandas as pd -# Read the CSV file -df = pd.read_csv('iris.csv') - -# Save the DataFrame to an Excel file -df.to_excel('iris.xlsx', index=False) -""" - - -def assert_function_1(_, files): - return any(".xlsx" in file.name for file in files) - - -code_2 = """ -import pandas as pd -from sklearn.model_selection import train_test_split -from 
sklearn.linear_model import LinearRegression -from sklearn.metrics import mean_squared_error - -# Load the dataset -data = pd.read_csv('advertising.csv') - -# Split the data into features (X) and target (y) -X = data[['TV']] -y = data['Sales'] - -# Split the data into train and test sets -X_train, X_test, y_train, y_test = train_test_split( - X, y, test_size=0.3, random_state=42 -) - -# Train the model -model = LinearRegression() -model.fit(X_train, y_train) - -# Make predictions on the test set -y_pred = model.predict(X_test) - -# Calculate Mean Squared Error -mse = mean_squared_error(y_test, y_pred) - -mse -""" - - -def assert_function_2(output, _): - # np.float64(5.179525402166653)\n - if "np.float64" in output.content: - return 4.0 <= float(output.content.split("(")[1].split(")")[0]) <= 7.0 - return 4.0 <= float(output.content) <= 7.0 - - -# Helper function to build parameters with defaults -def param(code, assert_function, files=[], num_samples=2, local=False, packages=[]): - return ( - code, - assert_function, - files, - num_samples, - local, - packages, - ) - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "code, assert_function, files, num_samples, local, packages", - [ - param( - code_1, - assert_function_1, - files=[CodeBoxFile.from_path("examples/assets/iris.csv")], - ), - param( - code_1, - assert_function_1, - files=[CodeBoxFile.from_path("examples/assets/iris.csv")], - num_samples=1, - local=True, - packages=["pandas", "openpyxl"], - ), - param( - code_2, - assert_function_2, - files=[CodeBoxFile.from_path("examples/assets/advertising.csv")], - ), - param( - code_2, - assert_function_2, - files=[CodeBoxFile.from_path("examples/assets/advertising.csv")], - num_samples=10, - ), # For remote CodeBox, the time taken to run 10 samples - # should be around the same as 2 samples (the above case). 
- param( - code_2, - assert_function_2, - files=[CodeBoxFile.from_path("examples/assets/advertising.csv")], - num_samples=1, - local=True, - packages=["pandas", "scikit-learn"], - ), - ], -) -async def test_boxes_async( - code: str, - assert_function: AssertFunctionType, - files: list[CodeBoxFile], - num_samples: int, - local: bool, - packages: list[str], - capsys: pytest.CaptureFixture, -) -> None: - codeboxes = [CodeBox(local=local) for _ in range(num_samples)] - - start_time = time.perf_counter() - tasks = [ - run_async(codebox, code, assert_function, files, packages) - for codebox in codeboxes - ] - results = await asyncio.gather(*tasks) - end_time = time.perf_counter() - with capsys.disabled(): - print(f"Time taken: {end_time - start_time:.2f} seconds") - - assert all(results), "Failed to run codeboxes" - - -async def run_async( - codebox: CodeBox, - code: str, - assert_function: AssertFunctionType, - files: list[CodeBoxFile], - packages: list[str], -) -> bool: - try: - assert await codebox.astart() == "started" - - assert await codebox.astatus() == "running" - - orginal_files = await codebox.alist_files() - for file in files: - assert file.content is not None - await codebox.aupload(file.name, file.content) - - codebox_files = await codebox.alist_files() - assert set( - [file.name for file in files] + [file.name for file in orginal_files] - ) == set([file.name for file in codebox_files]) - - assert all( - [ - package_name in str(await codebox.ainstall(package_name)) - for package_name in packages - ] - ) - - output: CodeBoxOutput = await codebox.arun(code) - codebox_files_output = await codebox.alist_files() - assert assert_function(output, codebox_files_output) - - finally: - assert await codebox.astop() == "stopped" - - return True diff --git a/tests/run_all_examples.py b/tests/run_all_examples.py deleted file mode 100644 index 0d4257c..0000000 --- a/tests/run_all_examples.py +++ /dev/null @@ -1,49 +0,0 @@ -import asyncio -import os -import sys -from 
pathlib import Path - -from dotenv import load_dotenv - - -async def run_example(file: Path, local: bool = False): - process = await asyncio.create_subprocess_exec( - Path(sys.executable).absolute(), - file.absolute(), - env={"CODEBOX_API_KEY": "local" if local else os.environ["CODEBOX_API_KEY"]}, - ) - await process.wait() - - if process.returncode != 0: - raise Exception(f"Example {file} failed with return code {process.returncode}") - - -async def run_examples(): - if os.environ.get("CODEBOX_API_KEY") is None: - return print("Skipping remote examples because CODEBOX_API_KEY is not set") - - await asyncio.gather( - *[ - asyncio.create_task(run_example(file)) - for file in list(Path("examples").glob("**/*.py")) - ] - ) - - -async def run_examples_local(): - for file in list(Path("examples").glob("**/*.py")): - await run_example(file, local=True) - - -# TODO: fix using pytest -def run_all_examples(): - """Integration test for running the examples.""" - load_dotenv() - os.environ["CODEBOX_TEST"] = "True" - # TODO: Use ENV variable to reuse the same remote codebox - asyncio.run(run_examples()) - asyncio.run(run_examples_local()) - - -if __name__ == "__main__": - run_all_examples() diff --git a/tests/general_test.py b/tests/test_v01.py similarity index 57% rename from tests/general_test.py rename to tests/test_v01.py index b6d58b6..dfe960f 100644 --- a/tests/general_test.py +++ b/tests/test_v01.py @@ -1,96 +1,94 @@ -import asyncio - +import pytest from codeboxapi import CodeBox -def test_codebox(): - codebox = CodeBox() - assert run_sync(codebox), "Failed to run sync codebox remotely" - assert asyncio.run(run_async(codebox)), "Failed to run async codebox remotely" - - -def test_localbox(): - codebox = CodeBox(local=True) - assert run_sync(codebox), "Failed to run sync codebox locally" - assert asyncio.run(run_async(codebox)), "Failed to run async codebox locally" - - -def run_sync(codebox: CodeBox) -> bool: +def test_sync(codebox: CodeBox) -> None: try: assert 
codebox.start() == "started" + print("Started") assert codebox.status() == "running" + print("Running") - assert codebox.run("print('Hello World!')") == "Hello World!\n" + codebox.run("x = 'Hello World!'") + assert codebox.run("print(x)") == "Hello World!\n" + print("Printed") file_name = "test_file.txt" assert file_name in str(codebox.upload(file_name, b"Hello World!")) + print("Uploaded") assert file_name in str( codebox.run("import os;\nprint(os.listdir(os.getcwd())); ") ) assert file_name in str(codebox.list_files()) - assert codebox.download(file_name).content == b"Hello World!" + assert codebox.download(file_name).get_content() == b"Hello World!" + print("Downloaded") + + assert "matplotlib" in str(codebox.install("matplotlib")) - package_name = "matplotlib" - assert package_name in str(codebox.install(package_name)) assert ( "error" != codebox.run("import matplotlib; print(matplotlib.__version__)").type ) + print("Installed") o = codebox.run( "import matplotlib.pyplot as plt;" "plt.plot([1, 2, 3, 4], [1, 4, 2, 3]); plt.show()" ) assert o.type == "image/png" + print("Plotted") finally: assert codebox.stop() == "stopped" - - return True + print("Stopped") -async def run_async(codebox: CodeBox) -> bool: +@pytest.mark.asyncio +async def test_async(codebox: CodeBox) -> None: try: assert await codebox.astart() == "started" + print("Started") assert await codebox.astatus() == "running" + print("Running") - assert await codebox.arun("print('Hello World!')") == "Hello World!\n" + await codebox.arun("x = 'Hello World!'") + assert (await codebox.arun("print(x)")) == "Hello World!\n" + print("Printed") file_name = "test_file.txt" assert file_name in str(await codebox.aupload(file_name, b"Hello World!")) + print("Uploaded") assert file_name in str( await codebox.arun("import os;\nprint(os.listdir(os.getcwd())); ") ) - assert file_name in str(codebox.list_files()) + assert file_name in str(await codebox.alist_files()) + + assert (await 
codebox.adownload(file_name)).get_content() == b"Hello World!" + print("Downloaded") - assert (await codebox.adownload(file_name)).content == b"Hello World!" + assert "matplotlib" in str(await codebox.ainstall("matplotlib")) - package_name = "matplotlib" - assert package_name in str(await codebox.ainstall(package_name)) assert ( "error" != ( await codebox.arun("import matplotlib; print(matplotlib.__version__)") ).type ) + print("Installed") o = await codebox.arun( "import matplotlib.pyplot as plt;" "plt.plot([1, 2, 3, 4], [1, 4, 2, 3]); plt.show()" ) assert o.type == "image/png" + print("Plotted") + finally: assert await codebox.astop() == "stopped" - - return True - - -if __name__ == "__main__": - test_codebox() - test_localbox() + print("Stopped") diff --git a/tests/test_v02.py b/tests/test_v02.py new file mode 100644 index 0000000..bc6a37a --- /dev/null +++ b/tests/test_v02.py @@ -0,0 +1,329 @@ +import time + +import pytest +from codeboxapi import CodeBox, ExecChunk, ExecResult, RemoteFile + + +def test_sync_codebox_lifecycle(codebox: CodeBox): + assert codebox.healthcheck() == "healthy", "CodeBox should be healthy" + + result = codebox.exec("print('Hello World!')") + assert isinstance(result, ExecResult), "Exec should return an ExecResult" + assert result.text.strip() == "Hello World!", "Exec should print 'Hello World!'" + assert not result.errors, "Exec should not produce errors" + + file_name = "test_file.txt" + file_content = b"Hello World!" 
+ codebox.upload(file_name, file_content) + + downloaded_file = codebox.download(file_name) + assert isinstance( + downloaded_file, RemoteFile + ), "Download should return a RemoteFile" + assert ( + downloaded_file.get_content() == file_content + ), "Downloaded content should match uploaded content" + + install_result = codebox.install("matplotlib") + assert "matplotlib" in install_result, "Matplotlib should be installed successfully" + + exec_result = codebox.exec("import matplotlib; print(matplotlib.__version__)") + assert exec_result.errors == [], "Importing matplotlib should not produce errors" + assert exec_result.text.strip() != "", "Matplotlib version should be printed" + + plot_result = codebox.exec( + "import matplotlib.pyplot as plt; " + "plt.figure(figsize=(10, 5)); " + "plt.plot([1, 2, 3, 4], [1, 4, 2, 3]); " + "plt.title('Test Plot'); " + "plt.xlabel('X-axis'); " + "plt.ylabel('Y-axis'); " + "plt.show()" + ) + assert plot_result.images, "Plot execution should produce an image" + assert ( + len(plot_result.images) == 1 + ), "Plot execution should produce exactly one image" + + +@pytest.mark.asyncio +async def test_async_codebox_lifecycle(codebox: CodeBox): + assert await codebox.ahealthcheck() == "healthy", "CodeBox should be healthy" + + result = await codebox.aexec("print('Hello World!')") + assert isinstance(result, ExecResult), "Exec should return an ExecResult" + assert result.text.strip() == "Hello World!", "Exec should print 'Hello World!'" + assert not result.errors, "Exec should not produce errors" + + file_name = "test_file.txt" + file_content = b"Hello World!" 
+ await codebox.aupload(file_name, file_content) + + downloaded_file = await codebox.adownload(file_name) + assert isinstance( + downloaded_file, RemoteFile + ), "Download should return a RemoteFile" + assert ( + downloaded_file.get_content() == file_content + ), "Downloaded content should match uploaded content" + + install_result = await codebox.ainstall("matplotlib") + assert "matplotlib" in install_result, "Matplotlib should be installed successfully" + + exec_result = await codebox.aexec( + "import matplotlib; print(matplotlib.__version__)" + ) + assert exec_result.errors == [], "Importing matplotlib should not produce errors" + assert exec_result.text.strip() != "", "Matplotlib version should be printed" + + plot_result = await codebox.aexec( + "import matplotlib.pyplot as plt; " + "plt.figure(figsize=(10, 5)); " + "plt.plot([1, 2, 3, 4], [1, 4, 2, 3]); " + "plt.title('Test Plot'); " + "plt.xlabel('X-axis'); " + "plt.ylabel('Y-axis'); " + "plt.show()" + ) + assert plot_result.images, "Plot execution should produce an image" + assert ( + len(plot_result.images) == 1 + ), "Plot execution should produce exactly one image" + + +def test_sync_list_operations(codebox: CodeBox): + codebox.exec("x = 1; y = 'test'; z = [1, 2, 3]") + variables = codebox.show_variables() + assert "x" in variables.keys(), "Variable 'x' should be listed" + assert "1" in variables["x"], "Variable 'x' should contain value '1'" + assert "y" in variables.keys(), "Variable 'y' should be listed" + assert "test" in variables["y"], "Variable 'y' should contain value 'test'" + assert "z" in variables.keys(), "Variable 'z' should be listed" + assert "[1, 2, 3]" in variables["z"], "Variable 'z' should contain value '[1, 2, 3]" + + files = codebox.list_files() + assert isinstance(files, list), "list_files should return a list" + assert all( + isinstance(f, RemoteFile) for f in files + ), "All items in list_files should be RemoteFile instances" + + packages = codebox.list_packages() + assert 
isinstance(packages, list), "list_packages should return a list" + assert len(packages) > 0, "There should be at least one package installed" + assert any( + "matplotlib" in pkg for pkg in packages + ), "Matplotlib should be in the list of packages" + + +@pytest.mark.asyncio +async def test_async_list_operations(codebox: CodeBox): + await codebox.aexec("x = 1; y = 'test'; z = [1, 2, 3]") + variables = await codebox.ashow_variables() + assert "x" in variables.keys(), "Variable 'x' should be listed" + assert "1" in variables["x"], "Variable 'x' should contain value '1'" + assert "y" in variables.keys(), "Variable 'y' should be listed" + assert "test" in variables["y"], "Variable 'y' should contain value 'test'" + assert "z" in variables.keys(), "Variable 'z' should be listed" + assert ( + "[1, 2, 3]" in variables["z"] + ), "Variable 'z' should contain value '[1, 2, 3]'" + + files = await codebox.alist_files() + assert isinstance(files, list), "list_files should return a list" + assert all( + isinstance(f, RemoteFile) for f in files + ), "All items in list_files should be RemoteFile instances" + + packages = await codebox.alist_packages() + assert isinstance(packages, list), "list_packages should return a list" + assert len(packages) > 0, "There should be at least one package installed" + assert any( + "matplotlib" in pkg for pkg in packages + ), "Matplotlib should be in the list of packages" + + +def test_sync_stream_exec(codebox: CodeBox): + chunks: list[tuple[ExecChunk, float]] = [] + t0 = time.perf_counter() + sleep = 0.01 if codebox.api_key == "local" else 1 + for chunk in codebox.stream_exec( + f"import time;\nfor i in range(3): time.sleep({sleep}); print(i)" + ): + chunks.append((chunk, time.perf_counter() - t0)) + + assert len(chunks) == 3, "iterating over stream_exec should produce 3 chunks" + assert all( + isinstance(chunk[0], ExecChunk) for chunk in chunks + ), "All items should be ExecChunk instances" + assert all( + chunk[0].type == "txt" for chunk in 
chunks + ), "All chunks should be of type 'txt'" + assert [chunk[0].content.strip() for chunk in chunks] == [ + "0", + "1", + "2", + ], "Chunks should contain correct content" + # Verify chunks arrive with delay + assert all( + chunks[i][1] < chunks[i + 1][1] for i in range(len(chunks) - 1) + ), "Chunks should arrive with delay" + # Verify chunks don't arrive all at once + assert any( + chunks[i + 1][1] - chunks[i][1] > 0.005 for i in range(len(chunks) - 1) + ), "At least some chunks should have noticeable delay between them" + + +@pytest.mark.asyncio +async def test_sync_stream_exec_ipython(codebox: CodeBox): + chunks = [] + t0 = time.perf_counter() + sleep = 0.01 if codebox.api_key == "local" else 1 + for chunk in codebox.stream_exec( + f"python -u -c 'import time\nfor i in range(3): time.sleep({sleep}); print(i)'", + kernel="bash", + ): + chunks.append((chunk, time.perf_counter() - t0)) + + assert len(chunks) == 3, "iterating over stream_exec should produce 3 chunks" + assert all( + isinstance(chunk[0], ExecChunk) for chunk in chunks + ), "All items should be ExecChunk instances" + assert all( + chunk[0].type == "txt" for chunk in chunks + ), "All chunks should be of type 'txt'" + assert [chunk[0].content.strip() for chunk in chunks] == [ + "0", + "1", + "2", + ], "Chunks should contain correct content" + # Verify chunks arrive with delay + assert all( + chunks[i][1] < chunks[i + 1][1] for i in range(len(chunks) - 1) + ), "Chunks should arrive with delay" + # Verify chunks don't arrive all at once + assert any( + chunks[i + 1][1] - chunks[i][1] > 0.005 for i in range(len(chunks) - 1) + ), "At least some chunks should have noticeable delay between them" + + +@pytest.mark.asyncio +async def test_async_stream_exec_ipython(codebox: CodeBox): + chunks: list[tuple[ExecChunk, float]] = [] + t0 = time.perf_counter() + sleep = 0.01 if codebox.api_key == "local" else 1 + async for chunk in codebox.astream_exec( + f"import time;\nfor i in range(3): time.sleep({sleep}); 
print(i)", + ): + chunks.append((chunk, time.perf_counter() - t0)) + + assert len(chunks) == 3, "iterating over stream_exec should produce 3 chunks" + assert all( + isinstance(chunk[0], ExecChunk) for chunk in chunks + ), "All items should be ExecChunk instances" + assert all( + chunk[0].type == "txt" for chunk in chunks + ), "All chunks should be of type 'txt'" + assert [chunk[0].content.strip() for chunk in chunks] == [ + "0", + "1", + "2", + ], "Chunks should contain correct content" + # Verify chunks arrive with delay + assert all( + chunks[i][1] < chunks[i + 1][1] for i in range(len(chunks) - 1) + ), "Chunks should arrive with delay" + # Verify chunks don't arrive all at once + assert any( + chunks[i + 1][1] - chunks[i][1] > 0.005 for i in range(len(chunks) - 1) + ), "At least some chunks should have noticeable delay between them" + + +@pytest.mark.asyncio +async def test_async_stream_exec_bash(codebox: CodeBox): + chunks = [] + t0 = time.perf_counter() + sleep = 0.01 if codebox.api_key == "local" else 1 + async for chunk in codebox.astream_exec( + f"python -u -c 'import time\nfor i in range(3): time.sleep({sleep}); print(i)'", + kernel="bash", + ): + chunks.append((chunk, time.perf_counter() - t0)) + + assert len(chunks) == 3, "iterating over stream_exec should produce 3 chunks" + assert all( + isinstance(chunk[0], ExecChunk) for chunk in chunks + ), "All items should be ExecChunk instances" + assert all( + chunk[0].type == "txt" for chunk in chunks + ), "All chunks should be of type 'txt'" + assert [chunk[0].content.strip() for chunk in chunks] == [ + "0", + "1", + "2", + ], "Chunks should contain correct content" + # Verify chunks arrive with delay + assert all( + chunks[i][1] < chunks[i + 1][1] for i in range(len(chunks) - 1) + ), "Chunks should arrive with delay" + # Verify chunks don't arrive all at once + assert any( + chunks[i + 1][1] - chunks[i][1] > 0.005 for i in range(len(chunks) - 1) + ), "At least some chunks should have noticeable delay between 
them" + + +def test_sync_error_handling(codebox: CodeBox): + result = codebox.exec("1/0") + assert result.errors, "Execution should produce an error" + error = result.errors[0].lower() + assert ( + "division" in error and "zero" in error + ), "Error should be a ZeroDivisionError" + + +@pytest.mark.asyncio +async def test_async_error_handling(codebox: CodeBox): + result = await codebox.aexec("1/0") + assert result.errors, "Execution should produce an error" + error = result.errors[0].lower() + assert ( + "division" in error and "zero" in error + ), "Error should be a ZeroDivisionError" + + +def test_sync_bash_commands(codebox: CodeBox): + result = codebox.exec("echo ok", kernel="bash") + assert "ok" in result.text, "Execution should contain 'ok'" + result = codebox.exec("echo \"print('Hello!')\" > test.py", kernel="bash") + assert result.text.strip() == "", "Execution result should be empty" + assert "test.py" in [file.path for file in codebox.list_files()] + result = codebox.exec("python test.py", kernel="bash") + assert result.text.strip() == "Hello!", "Execution result should be 'Hello!'" + + +@pytest.mark.asyncio +async def test_async_bash_commands(codebox: CodeBox): + result = await codebox.aexec("echo ok", kernel="bash") + assert "ok" in result.text, "Execution should contain 'ok'" + result = await codebox.aexec("echo 'print(\"Hello!\")' > test.py", kernel="bash") + assert result.text.strip() == "", "Execution result should be empty" + assert "test.py" in [file.path for file in await codebox.alist_files()] + result = await codebox.aexec("python test.py", kernel="bash") + assert result.text.strip() == "Hello!", "Execution result should be 'Hello!'" + + +def test_local_box_singleton(): + from codeboxapi.local import LocalBox + + with pytest.raises(RuntimeError) as exc_info: + _ = LocalBox() + + assert "Only one LocalBox instance can exist at a time" in str(exc_info.value) + + with pytest.raises(RuntimeError) as exc_info: + _ = CodeBox(api_key="local") + + assert 
"codeboxapi.com" in str(exc_info.value) + + +if __name__ == "__main__": + pytest.main([__file__])