Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/crawlee/storage_clients/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
KvsValueType = TypeVar('KvsValueType', default=Any)


class _StorageMetadata(BaseModel):
@docs_group('Data structures')
class StorageMetadata(BaseModel):
"""Represents the base model for storage metadata.

It contains common fields shared across all specific storage types.
Expand All @@ -32,7 +33,7 @@ class _StorageMetadata(BaseModel):


@docs_group('Data structures')
class DatasetMetadata(_StorageMetadata):
class DatasetMetadata(StorageMetadata):
"""Model for a dataset metadata."""

model_config = ConfigDict(populate_by_name=True)
Expand All @@ -41,7 +42,7 @@ class DatasetMetadata(_StorageMetadata):


@docs_group('Data structures')
class KeyValueStoreMetadata(_StorageMetadata):
class KeyValueStoreMetadata(StorageMetadata):
"""Model for a key-value store metadata."""

model_config = ConfigDict(populate_by_name=True)
Expand All @@ -50,7 +51,7 @@ class KeyValueStoreMetadata(_StorageMetadata):


@docs_group('Data structures')
class RequestQueueMetadata(_StorageMetadata):
class RequestQueueMetadata(StorageMetadata):
"""Model for a request queue metadata."""

model_config = ConfigDict(populate_by_name=True)
Expand Down
11 changes: 11 additions & 0 deletions src/crawlee/storages/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
if TYPE_CHECKING:
from crawlee.configuration import Configuration
from crawlee.storage_clients._base import StorageClient
from crawlee.storage_clients.models import StorageMetadata


class Storage(ABC):
Expand All @@ -21,6 +22,16 @@ def id(self) -> str:
def name(self) -> str | None:
"""Get the storage name."""

@property
@abstractmethod
def storage_object(self) -> StorageMetadata:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this be more precise?

Suggested change
def storage_object(self) -> StorageMetadata:
def storage_object(self) -> DatasetMetadata | KeyValueStoreMetadata | RequestQueueMetadata:

and _StorageMetadata could remain private.

I am also ok with having StorageMetadata as a type alias for the union mentioned above. And the _StorageMetadata could be renamed to _CommonStorageMetadata, _BaseStorageMetadata, ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I actually did in the beginning, but I was getting type error in _creation_managment.py

Screenshot 2025-03-03 at 14 52 34

"""Get the full storage object."""

@storage_object.setter
@abstractmethod
def storage_object(self, storage_object: StorageMetadata) -> None:
"""Set the full storage object."""

@classmethod
@abstractmethod
async def open(
Expand Down
10 changes: 5 additions & 5 deletions src/crawlee/storages/_creation_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,19 @@ async def open_storage(
async with _creation_lock:
if id and not is_default_on_memory:
resource_client = _get_resource_client(storage_class, storage_client, id)
storage_info = await resource_client.get()
if not storage_info:
storage_object = await resource_client.get()
if not storage_object:
raise RuntimeError(f'{storage_class.__name__} with id "{id}" does not exist!')

elif is_default_on_memory:
resource_collection_client = _get_resource_collection_client(storage_class, storage_client)
storage_info = await resource_collection_client.get_or_create(name=name, id=id)
storage_object = await resource_collection_client.get_or_create(name=name, id=id)

else:
resource_collection_client = _get_resource_collection_client(storage_class, storage_client)
storage_info = await resource_collection_client.get_or_create(name=name)
storage_object = await resource_collection_client.get_or_create(name=name)

storage = storage_class(id=storage_info.id, name=storage_info.name, storage_client=storage_client)
storage = storage_class.from_storage_object(storage_client=storage_client, storage_object=storage_object)

# Cache the storage by ID and name
_add_to_cache_by_id(storage.id, storage)
Expand Down
29 changes: 28 additions & 1 deletion src/crawlee/storages/_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io
import json
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Literal, TextIO, TypedDict, cast

from typing_extensions import NotRequired, Required, Unpack, override
Expand All @@ -12,7 +13,7 @@
from crawlee._utils.byte_size import ByteSize
from crawlee._utils.docs import docs_group
from crawlee._utils.file import json_dumps
from crawlee.storage_clients.models import DatasetMetadata
from crawlee.storage_clients.models import DatasetMetadata, StorageMetadata

from ._base import Storage
from ._key_value_store import KeyValueStore
Expand Down Expand Up @@ -197,11 +198,27 @@ class Dataset(Storage):
def __init__(self, id: str, name: str | None, storage_client: StorageClient) -> None:
self._id = id
self._name = name
datetime_now = datetime.now(timezone.utc)
self._storage_object = StorageMetadata(
id=id, name=name, accessed_at=datetime_now, created_at=datetime_now, modified_at=datetime_now
)

# Get resource clients from the storage client.
self._resource_client = storage_client.dataset(self._id)
self._resource_collection_client = storage_client.datasets()

@classmethod
def from_storage_object(cls, storage_client: StorageClient, storage_object: StorageMetadata) -> Dataset:
"""Create a new instance of Dataset from a storage metadata object."""
dataset = Dataset(
id=storage_object.id,
name=storage_object.name,
storage_client=storage_client,
)

dataset.storage_object = storage_object
return dataset

@property
@override
def id(self) -> str:
Expand All @@ -212,6 +229,16 @@ def id(self) -> str:
def name(self) -> str | None:
return self._name

@property
@override
def storage_object(self) -> StorageMetadata:
return self._storage_object

@storage_object.setter
@override
def storage_object(self, storage_object: StorageMetadata) -> None:
self._storage_object = storage_object

@override
@classmethod
async def open(
Expand Down
29 changes: 28 additions & 1 deletion src/crawlee/storages/_key_value_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

import asyncio
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, overload

from typing_extensions import override

from crawlee import service_locator
from crawlee._utils.docs import docs_group
from crawlee.events._types import Event, EventPersistStateData
from crawlee.storage_clients.models import KeyValueStoreKeyInfo, KeyValueStoreMetadata
from crawlee.storage_clients.models import KeyValueStoreKeyInfo, KeyValueStoreMetadata, StorageMetadata

from ._base import Storage

Expand Down Expand Up @@ -65,11 +66,27 @@ class KeyValueStore(Storage):
def __init__(self, id: str, name: str | None, storage_client: StorageClient) -> None:
self._id = id
self._name = name
datetime_now = datetime.now(timezone.utc)
self._storage_object = StorageMetadata(
id=id, name=name, accessed_at=datetime_now, created_at=datetime_now, modified_at=datetime_now
)

# Get resource clients from storage client
self._resource_client = storage_client.key_value_store(self._id)
self._autosave_lock = asyncio.Lock()

@classmethod
def from_storage_object(cls, storage_client: StorageClient, storage_object: StorageMetadata) -> KeyValueStore:
"""Create a new instance of KeyValueStore from a storage metadata object."""
key_value_store = KeyValueStore(
id=storage_object.id,
name=storage_object.name,
storage_client=storage_client,
)

key_value_store.storage_object = storage_object
return key_value_store

@property
@override
def id(self) -> str:
Expand All @@ -80,6 +97,16 @@ def id(self) -> str:
def name(self) -> str | None:
return self._name

@property
@override
def storage_object(self) -> StorageMetadata:
return self._storage_object

@storage_object.setter
@override
def storage_object(self, storage_object: StorageMetadata) -> None:
self._storage_object = storage_object

async def get_info(self) -> KeyValueStoreMetadata | None:
"""Get an object containing general information about the key value store."""
return await self._resource_client.get()
Expand Down
29 changes: 28 additions & 1 deletion src/crawlee/storages/_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from crawlee._utils.wait import wait_for_all_tasks_for_finish
from crawlee.events import Event
from crawlee.request_loaders import RequestManager
from crawlee.storage_clients.models import ProcessedRequest, RequestQueueMetadata
from crawlee.storage_clients.models import ProcessedRequest, RequestQueueMetadata, StorageMetadata

from ._base import Storage

Expand Down Expand Up @@ -113,6 +113,11 @@ def __init__(self, id: str, name: str | None, storage_client: StorageClient) ->
self._id = id
self._name = name

datetime_now = datetime.now(timezone.utc)
self._storage_object = StorageMetadata(
id=id, name=name, accessed_at=datetime_now, created_at=datetime_now, modified_at=datetime_now
)

# Get resource clients from storage client
self._resource_client = storage_client.request_queue(self._id)
self._resource_collection_client = storage_client.request_queues()
Expand All @@ -137,6 +142,18 @@ def __init__(self, id: str, name: str | None, storage_client: StorageClient) ->
self._recently_handled: BoundedSet[str] = BoundedSet(max_length=self._RECENTLY_HANDLED_CACHE_SIZE)
self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=self._MAX_CACHED_REQUESTS)

@classmethod
def from_storage_object(cls, storage_client: StorageClient, storage_object: StorageMetadata) -> RequestQueue:
"""Create a new instance of RequestQueue from a storage metadata object."""
request_queue = RequestQueue(
id=storage_object.id,
name=storage_object.name,
storage_client=storage_client,
)

request_queue.storage_object = storage_object
return request_queue

@property
@override
def id(self) -> str:
Expand All @@ -147,6 +164,16 @@ def id(self) -> str:
def name(self) -> str | None:
return self._name

@property
@override
def storage_object(self) -> StorageMetadata:
return self._storage_object

@storage_object.setter
@override
def storage_object(self, storage_object: StorageMetadata) -> None:
self._storage_object = storage_object

@override
@classmethod
async def open(
Expand Down
21 changes: 21 additions & 0 deletions tests/unit/storages/test_dataset.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING

import pytest

from crawlee import service_locator
from crawlee.storage_clients.models import StorageMetadata
from crawlee.storages import Dataset, KeyValueStore

if TYPE_CHECKING:
Expand Down Expand Up @@ -131,3 +134,21 @@ async def test_iterate_items(dataset: Dataset) -> None:
idx += 1

assert idx == desired_item_count


async def test_from_storage_object() -> None:
storage_client = service_locator.get_storage_client()

storage_object = StorageMetadata(
id='dummy-id',
name='dummy-name',
accessed_at=datetime.now(timezone.utc),
created_at=datetime.now(timezone.utc),
modified_at=datetime.now(timezone.utc),
)

dataset = Dataset.from_storage_object(storage_client, storage_object)

assert dataset.id == storage_object.id
assert dataset.name == storage_object.name
assert dataset.storage_object == storage_object
27 changes: 27 additions & 0 deletions tests/unit/storages/test_key_value_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

import pytest

from crawlee import service_locator
from crawlee.events import EventManager
from crawlee.storage_clients.models import StorageMetadata
from crawlee.storages import KeyValueStore

if TYPE_CHECKING:
Expand Down Expand Up @@ -51,6 +53,13 @@ async def test_open() -> None:
await KeyValueStore.open(id='dummy-name')


async def test_open_save_storage_object() -> None:
default_key_value_store = await KeyValueStore.open()

assert default_key_value_store.storage_object is not None
assert default_key_value_store.storage_object.id == default_key_value_store.id


async def test_consistency_accross_two_clients() -> None:
kvs = await KeyValueStore.open(name='my-kvs')
await kvs.set_value('key', 'value')
Expand Down Expand Up @@ -205,3 +214,21 @@ async def increment_counter() -> None:
await asyncio.gather(*tasks)

assert (await key_value_store.get_auto_saved_value('state'))['counter'] == 2


async def test_from_storage_object() -> None:
storage_client = service_locator.get_storage_client()

storage_object = StorageMetadata(
id='dummy-id',
name='dummy-name',
accessed_at=datetime.now(timezone.utc),
created_at=datetime.now(timezone.utc),
modified_at=datetime.now(timezone.utc),
)

key_value_store = KeyValueStore.from_storage_object(storage_client, storage_object)

assert key_value_store.id == storage_object.id
assert key_value_store.name == storage_object.name
assert key_value_store.storage_object == storage_object
21 changes: 20 additions & 1 deletion tests/unit/storages/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
import pytest
from pydantic import ValidationError

from crawlee import Request
from crawlee import Request, service_locator
from crawlee._request import RequestState
from crawlee.storage_clients.models import StorageMetadata
from crawlee.storages import RequestQueue

if TYPE_CHECKING:
Expand Down Expand Up @@ -265,3 +266,21 @@ async def test_cache_requests(request_queue: RequestQueue) -> None:
# After calling fetch_next_request request_1 moved to the end of the cache store.
cached_items = [request_queue._requests_cache.popitem()[0] for _ in range(2)]
assert cached_items == [request_2.id, request_1.id]


async def test_from_storage_object() -> None:
storage_client = service_locator.get_storage_client()

storage_object = StorageMetadata(
id='dummy-id',
name='dummy-name',
accessed_at=datetime.now(timezone.utc),
created_at=datetime.now(timezone.utc),
modified_at=datetime.now(timezone.utc),
)

request_queue = RequestQueue.from_storage_object(storage_client, storage_object)

assert request_queue.id == storage_object.id
assert request_queue.name == storage_object.name
assert request_queue.storage_object == storage_object
Loading