Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: download on ipfs client #36

Merged
merged 17 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 98 additions & 8 deletions src/aleph/sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
TypeVar,
Union,
)
from io import BytesIO
from typing import BinaryIO

import aiohttp
from aleph_message.models import (
Expand All @@ -31,6 +33,7 @@
AlephMessage,
ForgetContent,
ForgetMessage,
ItemHash,
ItemType,
Message,
MessageType,
Expand All @@ -42,7 +45,7 @@
)
from aleph_message.models.program import Encoding, ProgramContent
from aleph_message.status import MessageStatus
from pydantic import ValidationError
from pydantic import ValidationError, BaseModel

from aleph.sdk.types import Account, GenericMessage, StorageEnum

Expand Down Expand Up @@ -225,8 +228,17 @@ def get_posts(
end_date=end_date,
)

def download_file(self, file_hash: str) -> bytes:
return self._wrap(self.async_session.download_file, file_hash=file_hash)
def download_file(self, file_hash: str, chunk_size: int = 16 * 1024) -> bytes:
odesenfans marked this conversation as resolved.
Show resolved Hide resolved
return self._wrap(
self.async_session.download_file, file_hash=file_hash, chunk_size=chunk_size
)

def download_file_ipfs(self, file_hash: str, chunk_size: int = 16 * 1024) -> bytes:
return self._wrap(
self.async_session.download_file_ipfs,
file_hash=file_hash,
chunk_size=chunk_size,
)

def watch_messages(
self,
Expand Down Expand Up @@ -608,22 +620,100 @@ async def get_posts(
resp.raise_for_status()
return await resp.json()

async def download_file_to_buffer(
self,
file_hash: str,
output_buffer: BinaryIO,
chunk_size: int,
) -> None:
"""
Download a file from the storage engine and write it to the specified output buffer.
:param file_hash: The hash of the file to retrieve.
:param output_buffer: Writable binary buffer. The file will be written to this buffer.
:param chunk_size: Size of the chunk to download.
"""
IPFS_HASH = ItemHash(file_hash)
odesenfans marked this conversation as resolved.
Show resolved Hide resolved

async with aiohttp.ClientSession() as session:
odesenfans marked this conversation as resolved.
Show resolved Hide resolved
async with self.http_session.get(
f"/api/v0/storage/raw/{file_hash}"
) as response:
if response.status == 200:
response.raise_for_status()
odesenfans marked this conversation as resolved.
Show resolved Hide resolved
while True:
chunk = await response.content.read(chunk_size)
if not chunk:
break
output_buffer.write(chunk)
odesenfans marked this conversation as resolved.
Show resolved Hide resolved
if response.status == 413:
if ItemType.from_hash(IPFS_HASH) == ItemType.ipfs:
return await self.download_file_ipfs_to_buffer(
file_hash, output_buffer, chunk_size
)
else:
raise Exception("Unsupported file hash")
odesenfans marked this conversation as resolved.
Show resolved Hide resolved

async def download_file_ipfs_to_buffer(
self,
file_hash: str,
output_buffer: BinaryIO,
chunk_size: int,
) -> None:
"""
Download a file from the storage engine and write it to the specified output buffer.

:param file_hash: The hash of the file to retrieve.
:param output_buffer: The binary output buffer to write the file data to.
:param chunk_size: Size of chunk we download.
"""
async with aiohttp.ClientSession() as session:
async with session.get(
f"https://ipfs.aleph.im/ipfs/{file_hash}"
) as response:
response.raise_for_status()
while True:
chunk = await response.content.read(chunk_size)
if not chunk:
break
output_buffer.write(chunk)

async def download_file(
self,
file_hash: str,
chunk_size: int = 16 * 1024,
) -> bytes:
"""
Get a file from the storage engine as raw bytes.

Warning: Downloading large files can be slow and memory intensive.

:param file_hash: The hash of the file to retrieve.
:param chunk_size: The size of each chunk to read from the response.
"""
async with self.http_session.get(
f"/api/v0/storage/raw/{file_hash}"
) as response:
response.raise_for_status()
return await response.read()
buffer = BytesIO()
hoh marked this conversation as resolved.
Show resolved Hide resolved
await self.download_file_to_buffer(
file_hash, output_buffer=buffer, chunk_size=chunk_size
)
return buffer.getvalue()

async def download_file_ipfs(
self,
file_hash: str,
chunk_size: int = 16 * 1024,
) -> bytes:
"""
Get a file from the ipfs storage engine as raw bytes.

Warning: Downloading large files can be slow.

:param file_hash: The hash of the file to retrieve.
:param chunk_size: The size of each chunk to read from the response.
"""
buffer = BytesIO()
await self.download_file_ipfs_to_buffer(
file_hash, output_buffer=buffer, chunk_size=chunk_size
)
return buffer.getvalue()

async def get_messages(
self,
Expand Down
33 changes: 33 additions & 0 deletions tests/unit/test_download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import pytest
from aleph.sdk import AlephClient
from aleph.sdk.conf import settings as sdk_settings


@pytest.mark.parametrize(
"file_hash,expected_size",
[
("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH", 5),
("Qmdy5LaAL4eghxE7JD6Ah5o4PJGarjAV9st8az2k52i1vq", 5817703),
],
)
@pytest.mark.asyncio
async def test_download(file_hash: str, expected_size: int):
async with AlephClient(api_server=sdk_settings.API_HOST) as client:
file_content = await client.download_file(file_hash) # File is 5B
file_size = len(file_content)
assert file_size == expected_size


@pytest.mark.parametrize(
"file_hash,expected_size",
[
("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH", 5),
("Qmdy5LaAL4eghxE7JD6Ah5o4PJGarjAV9st8az2k52i1vq", 5817703),
],
)
@pytest.mark.asyncio
async def test_download_ipfs(file_hash: str, expected_size: int):
async with AlephClient(api_server=sdk_settings.API_HOST) as client:
file_content = await client.download_file_ipfs(file_hash) ## 5817703 B FILE
file_size = len(file_content)
assert file_size == expected_size