feat: implement push/pull interface from JAC, file and s3 (docarray#1182)

* refactor: move streaming serialization into separate method

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: add binary io like protocol definition

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: ported push pull to JAC

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: protocol is not in 3.7 typing

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: make mypy happy

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: patch missing waterfall

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: jit import backends

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: implement cache in jinaai pull

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: add hubble dependency to jina group

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: better division of concerns

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: add concept of namespace

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: ignore missing hubble stubs

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: streaming protocol stubs

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: make more general buffered caching reader

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: add tests for hubble pushpull

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: add tests for file backend

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: remove hubble dependency from jina group

This reverts commit b304421.

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: implement push pull for local filesystem

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: test concurrent pushes and pulls in file protocol

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: resolve concurrent pushes and pulls correctly

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: rename text to textdoc

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: added some logging

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: s3 tests

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: s3 pushpull

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: add smart open dependency

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: add smart opens silly python bound

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: update hubble tests (failing)

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: fix delete return in hubble pushpull

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* Revert "fix: add smart open dependency"

This reverts commit cf78c6c.

This reverts commit eb0e52b.

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: add hubble and smart open dependencies

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: mypy fixes

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* ci: allow tests to see jina auth token

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: add progress bars for streaming

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* style: blacken

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: buffer writes to s3

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: mypy no like sequence

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: make progress bar quieter when disabled

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: skip failing tests

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: add tables when listing

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: add jina auth token to uncaped test

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: mock s3 tests with minio container

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: silly error that cost me 2 hours of life

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: use tolerance ratio in file tests

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: add caching to s3 pull

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: add log messages for unused parameters

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: take out unneeded buffering

smart open already buffers

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: pick fastest protocol compression configuration for s3

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: bump tolerance ratio for s3 test

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: reduce code duplication

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: put reader chunk size constant at top of file

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: reduce reader chunk size for memory tests

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: multipart uploads get stuck frequently

lets just do big uploads for now...

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* docs: add docstrings to mixin and file backend

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* docs: add docstring for s3 and hubble backends

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* test: remove unused test

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: use literal in protocol

Co-authored-by: samsja <55492238+samsja@users.noreply.github.com>
Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: protocols dont need to be inherited

Co-authored-by: samsja <55492238+samsja@users.noreply.github.com>
Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: add make mypy happy with the literals

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: literals not in 3.7

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: move mixin out of init file

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: move cache path resolution to utils

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* feat: cache path is only evaluated once

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: loading backends makes more sense as debug log

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* tests: add slow and internet marks

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: pin image tag

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: use abc instead of protocol for typing backends

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: revert - add hubble and smart open dependencies

This reverts commit 1d1d2ee.

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: add hubble and aws dependencies

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: change all push pull mixin methods to class methods

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: misstyped class method self reference

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: rename pushpull to docstore and use more classmethods

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: separate remote backend implementations from mixin

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* fix: missed import refactor

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: change submodule name to store

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: remove list and delete from mixin

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* tests: clear all the garbage in ci account

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* tests: skip test that is broken on ci

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

* refactor: standardize naming to jac

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>

---------

Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>
Co-authored-by: samsja <55492238+samsja@users.noreply.github.com>
Jackmin801 and samsja authored Mar 27, 2023
1 parent 081a03f commit 6cd05f8
Showing 20 changed files with 2,897 additions and 76 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
@@ -119,8 +119,8 @@ jobs:
run: |
poetry run pytest -m "not (tensorflow or benchmark or index)" ${{ matrix.test-path }}
timeout-minutes: 30
# env:
# JINA_AUTH_TOKEN: "${{ secrets.JINA_AUTH_TOKEN }}"
env:
JINA_AUTH_TOKEN: "${{ secrets.JINA_AUTH_TOKEN }}"
# - name: Check codecov file
# id: check_files
# uses: andstor/file-existence-action@v1
@@ -164,6 +164,8 @@ jobs:
run: |
poetry run pytest -m "not (tensorflow or benchmark)" ${{ matrix.test-path }}
timeout-minutes: 30
env:
JINA_AUTH_TOKEN: "${{ secrets.JINA_AUTH_TOKEN }}"


docarray-test-proto3:
3 changes: 2 additions & 1 deletion docarray/array/array/array.py
@@ -19,6 +19,7 @@

from docarray.array.abstract_array import AnyDocumentArray
from docarray.array.array.io import IOMixinArray
from docarray.array.array.pushpull import PushPullMixin
from docarray.array.array.sequence_indexing_mixin import (
IndexingSequenceMixin,
IndexIterType,
@@ -57,7 +58,7 @@ def _delegate_meth(self, *args, **kwargs):


class DocumentArray(
IndexingSequenceMixin[T_doc], IOMixinArray, AnyDocumentArray[T_doc]
IndexingSequenceMixin[T_doc], PushPullMixin, IOMixinArray, AnyDocumentArray[T_doc]
):
"""
DocumentArray is a container of Documents.
128 changes: 82 additions & 46 deletions docarray/array/array/io.py
@@ -16,6 +16,7 @@
Dict,
Generator,
Iterable,
Iterator,
List,
Optional,
Tuple,
@@ -177,37 +178,60 @@ def _write_bytes(
elif protocol == 'pickle-array':
f.write(pickle.dumps(self))
elif protocol in SINGLE_PROTOCOLS:
from rich import filesize

from docarray.utils.progress_bar import _get_progressbar

pbar, t = _get_progressbar(
'Serializing', disable=not show_progress, total=len(self)
)

f.write(self._stream_header)

with pbar:
_total_size = 0
pbar.start_task(t)
for doc in self:
doc_bytes = doc.to_bytes(protocol=protocol, compress=compress)
len_doc_as_bytes = len(doc_bytes).to_bytes(
4, 'big', signed=False
)
all_bytes = len_doc_as_bytes + doc_bytes
f.write(all_bytes)
_total_size += len(all_bytes)
pbar.update(
t,
advance=1,
total_size=str(filesize.decimal(_total_size)),
f.write(
b''.join(
self.to_binary_stream(
protocol=protocol,
compress=compress,
show_progress=show_progress,
)
)
)
else:
raise ValueError(
f'protocol={protocol} is not supported. Can be only {ALLOWED_PROTOCOLS}.'
)

def to_binary_stream(
self,
protocol: str = 'protobuf',
compress: Optional[str] = None,
show_progress: bool = False,
) -> Iterator[bytes]:
from rich import filesize

if show_progress:
from docarray.utils.progress_bar import _get_progressbar

pbar, t = _get_progressbar(
'Serializing', disable=not show_progress, total=len(self)
)
else:
from contextlib import nullcontext

pbar = nullcontext()

yield self._stream_header

with pbar:
if show_progress:
_total_size = 0
pbar.start_task(t)
for doc in self:
doc_bytes = doc.to_bytes(protocol=protocol, compress=compress)
len_doc_as_bytes = len(doc_bytes).to_bytes(4, 'big', signed=False)
all_bytes = len_doc_as_bytes + doc_bytes

yield all_bytes

if show_progress:
_total_size += len(all_bytes)
pbar.update(
t,
advance=1,
total_size=str(filesize.decimal(_total_size)),
)

def to_bytes(
self,
protocol: str = 'protobuf-array',
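
The hunk above swaps the progress bar for `contextlib.nullcontext()` when `show_progress` is false, so the `with` block runs either way and only the update calls are guarded. A minimal sketch of that pattern follows. `CountingProgress` and `stream_chunks` are hypothetical names used for illustration; they are not docarray's `_get_progressbar` helper or part of its API.

```python
from contextlib import nullcontext
from typing import Iterable, Iterator, Optional


class CountingProgress:
    """Hypothetical stand-in for a progress bar (not docarray's helper)."""

    def __init__(self) -> None:
        self.steps = 0

    def __enter__(self) -> "CountingProgress":
        return self

    def __exit__(self, *exc_info) -> None:
        return None

    def advance(self) -> None:
        self.steps += 1


def stream_chunks(
    chunks: Iterable[bytes], progress: Optional[CountingProgress] = None
) -> Iterator[bytes]:
    """Yield chunks unchanged; report progress only if a tracker is given."""
    # nullcontext() is a no-op context manager, so one `with` serves both cases.
    ctx = progress if progress is not None else nullcontext()
    with ctx:
        for chunk in chunks:
            yield chunk
            if progress is not None:
                progress.advance()
```

With this shape the disabled path never constructs or touches a progress object, which is presumably what the "make progress bar quieter when disabled" commit was after.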
@@ -584,7 +608,7 @@ def _load_binary_all(
def _load_binary_stream(
cls: Type[T],
file_ctx: ContextManager[io.BufferedReader],
protocol: Optional[str] = None,
protocol: str = 'protobuf',
compress: Optional[str] = None,
show_progress: bool = False,
) -> Generator['BaseDocument', None, None]:
Expand All @@ -598,37 +622,43 @@ def _load_binary_stream(

from rich import filesize

from docarray import BaseDocument
from docarray.utils.progress_bar import _get_progressbar

with file_ctx as f:
version_numdocs_lendoc0 = f.read(9)
# 1 byte (uint8)
# 8 bytes (uint64)
num_docs = int.from_bytes(version_numdocs_lendoc0[1:9], 'big', signed=False)

pbar, t = _get_progressbar(
'Deserializing', disable=not show_progress, total=num_docs
)
if show_progress:
from docarray.utils.progress_bar import _get_progressbar

pbar, t = _get_progressbar(
'Deserializing', disable=not show_progress, total=num_docs
)
else:
from contextlib import nullcontext

pbar = nullcontext()

with pbar:
_total_size = 0
pbar.start_task(t)
if show_progress:
_total_size = 0
pbar.start_task(t)
for _ in range(num_docs):
# 4 bytes (uint32)
len_current_doc_in_bytes = int.from_bytes(
f.read(4), 'big', signed=False
)
_total_size += len_current_doc_in_bytes
load_protocol: str = protocol or 'protobuf'
yield BaseDocument.from_bytes(
load_protocol: str = protocol
yield cls.document_type.from_bytes(
f.read(len_current_doc_in_bytes),
protocol=load_protocol,
compress=compress,
)
pbar.update(
t, advance=1, total_size=str(filesize.decimal(_total_size))
)
if show_progress:
_total_size += len_current_doc_in_bytes
pbar.update(
t, advance=1, total_size=str(filesize.decimal(_total_size))
)

@classmethod
def load_binary(
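
Read together, `to_binary_stream` and `_load_binary_stream` imply a simple length-prefixed framing: a 9-byte header (1-byte version, 8-byte document count), then for each document a 4-byte big-endian length followed by the payload. The sketch below round-trips that layout with raw `bytes` standing in for `doc.to_bytes(...)`. The function names and the version value `1` are assumptions for illustration, since the contents of `_stream_header` are not shown in this diff.

```python
import io
from typing import BinaryIO, Iterator, List


def write_frames(payloads: List[bytes], version: int = 1) -> Iterator[bytes]:
    """Yield a 9-byte header, then one length-prefixed frame per payload."""
    # Header: 1 byte version (uint8) + 8 bytes payload count (uint64), big-endian.
    yield version.to_bytes(1, 'big', signed=False) + len(payloads).to_bytes(
        8, 'big', signed=False
    )
    for payload in payloads:
        # Frame: 4 bytes length (uint32), then the payload itself.
        yield len(payload).to_bytes(4, 'big', signed=False) + payload


def read_frames(f: BinaryIO) -> Iterator[bytes]:
    """Inverse of write_frames: read the header, then each frame in turn."""
    header = f.read(9)
    num_docs = int.from_bytes(header[1:9], 'big', signed=False)
    for _ in range(num_docs):
        size = int.from_bytes(f.read(4), 'big', signed=False)
        yield f.read(size)


docs = [b'hello', b'', b'\x00\x01\x02']
blob = b''.join(write_frames(docs))
restored = list(read_frames(io.BytesIO(blob)))
```

Because each frame is yielded on its own, a consumer can forward chunks to a file or remote store without holding the whole array in memory. The 4-byte prefix also caps a single serialized document at 2**32 - 1 bytes; `int.to_bytes` raises `OverflowError` beyond that.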
@@ -670,12 +700,18 @@ def load_binary(
else:
raise FileNotFoundError(f'cannot find file {file}')
if streaming:
return cls._load_binary_stream(
file_ctx,
protocol=load_protocol,
compress=load_compress,
show_progress=show_progress,
)
if load_protocol not in SINGLE_PROTOCOLS:
raise ValueError(
f'`streaming` is only available when using {" or ".join(map(lambda x: f"`{x}`", SINGLE_PROTOCOLS))} as protocol, '
f'got {load_protocol}'
)
else:
return cls._load_binary_stream(
file_ctx,
protocol=load_protocol,
compress=load_compress,
show_progress=show_progress,
)
else:
return cls._load_binary_all(
file_ctx, load_protocol, load_compress, show_progress
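
One consequence of validating the protocol in `load_binary` rather than inside `_load_binary_stream` is timing: assuming `load_binary` is an ordinary function that returns the generator, the `ValueError` is raised at call time, whereas a check inside a generator body would be deferred until the first item is requested. The sketch below contrasts the two. `STREAMABLE` is a hypothetical stand-in for `SINGLE_PROTOCOLS`, and the function names are illustrative only.

```python
from typing import Iterator

# Hypothetical stand-in for SINGLE_PROTOCOLS; the real values are not shown here.
STREAMABLE = ('protobuf', 'pickle')


def _stream(protocol: str) -> Iterator[str]:
    yield f'doc via {protocol}'


def load_lazy_check(protocol: str) -> Iterator[str]:
    # Generator function: nothing runs, including this check, until iteration.
    if protocol not in STREAMABLE:
        raise ValueError(f'got {protocol}')
    yield from _stream(protocol)


def load_eager_check(protocol: str) -> Iterator[str]:
    # Ordinary function: the check runs as soon as the function is called.
    if protocol not in STREAMABLE:
        raise ValueError(f'got {protocol}')
    return _stream(protocol)


def raises_value_error(thunk) -> bool:
    try:
        thunk()
    except ValueError:
        return True
    return False


lazy = load_lazy_check('protobuf-array')  # no error is raised here
lazy_fails_on_next = raises_value_error(lambda: next(lazy))
eager_fails_on_call = raises_value_error(lambda: load_eager_check('protobuf-array'))
```

The eager form surfaces a bad `protocol` argument at the call site instead of somewhere inside a later loop, which tends to make the error easier to trace.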