Azure blob storage support (#2229)

lferran authored Jun 11, 2024
1 parent fcfd567 · commit 828939e
Showing 39 changed files with 1,731 additions and 659 deletions.
3 changes: 3 additions & 0 deletions mypy.ini
@@ -10,6 +10,9 @@ exclude = ^(examples/|scripts/)
 mypy_path = $MYPY_CONFIG_FILE_DIR/mypy_stubs,
     $MYPY_CONFIG_FILE_DIR/nucliadb_sdk/src

+[mypy-azure.*]
+ignore_missing_imports = True
+
 [mypy-opentelemetry.*]
 ignore_missing_imports = True
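The new stanza silences mypy's missing-stub errors for the whole azure namespace. A minimal sketch of what this unblocks, assuming azure-storage-blob is installed without type stubs (the exact client that nucliadb_utils.storages.azure builds on is not shown in this diff):

```python
# Without the [mypy-azure.*] override, mypy flags this import with
# "Cannot find implementation or library stub for module named 'azure'".
from azure.storage.blob.aio import BlobServiceClient

client = BlobServiceClient.from_connection_string("<connection-string>")  # placeholder value
```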
7 changes: 3 additions & 4 deletions nucliadb/src/nucliadb/writer/api/v1/upload.py
@@ -63,7 +63,7 @@
 from nucliadb_models.resource import NucliaDBRoles
 from nucliadb_models.utils import FieldIdString
 from nucliadb_models.writer import CreateResourcePayload, ResourceFileUploaded
-from nucliadb_protos.resources_pb2 import FieldFile, Metadata
+from nucliadb_protos.resources_pb2 import CloudFile, FieldFile, Metadata
 from nucliadb_protos.writer_pb2 import BrokerMessage
 from nucliadb_utils.authentication import requires_one
 from nucliadb_utils.exceptions import LimitsExceededError, SendToProcessError
@@ -458,7 +458,7 @@ async def _tus_patch(
     field: Optional[str] = None,
 ) -> Response:
     """
-    Upload all bytes in the requests and append them in the specifyied offset
+    Upload all bytes in the requests and append them in the specified offset
     """
     if rid is not None:
         await validate_rid_exists_or_raise_error(kbid, rid)
@@ -511,7 +511,6 @@ async def _tus_patch(
     }

     upload_finished = dm.get("size") is not None and dm.offset >= dm.get("size")
-
     if upload_finished:
         rid = dm.get("rid", rid)
         if rid is None:
@@ -802,7 +801,7 @@ async def store_file_on_nuclia_db(
     path: str,
     request: Request,
     bucket: str,
-    source: Source,
+    source: CloudFile.Source.ValueType,
     rid: str,
     field: str,
     content_type: str = "application/octet-stream",
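The signature change above switches the writer to the protobuf enum type. A hedged sketch of building a CloudFile that points at an Azure blob; only uri, size, filename, and source appear in this diff, and the uri value here is a hypothetical path, not one nucliadb produces:

```python
from nucliadb_protos.resources_pb2 import CloudFile

cf = CloudFile(
    uri="kb/my-kb/r/my-resource/f/field1",  # hypothetical object path
    size=1024,
    filename="report.pdf",
    source=CloudFile.Source.AZURE,  # enum value 8, added by this commit
)
assert cf.source == CloudFile.AZURE  # proto enum values are also exposed on the message class
```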
11 changes: 11 additions & 0 deletions nucliadb/src/nucliadb/writer/tus/__init__.py
@@ -21,6 +21,7 @@
 from typing import Optional

 from nucliadb.writer.settings import settings as writer_settings
+from nucliadb.writer.tus.azure import AzureBlobStore, AzureFileStorageManager
 from nucliadb.writer.tus.dm import FileDataManager, RedisFileDataManagerFactory
 from nucliadb.writer.tus.exceptions import ManagerNotAvailable
 from nucliadb.writer.tus.gcs import GCloudBlobStore, GCloudFileStorageManager
@@ -90,6 +91,16 @@ async def initialize():

         DRIVER = TusStorageDriver(backend=storage_backend, manager=storage_manager)

+    elif storage_settings.file_backend == FileBackendConfig.AZURE:
+        if storage_settings.azure_connection_string is None:
+            raise ConfigurationError("AZURE_CONNECTION_STRING env variable not configured")
+
+        storage_backend = AzureBlobStore()
+        await storage_backend.initialize(storage_settings.azure_connection_string)
+        storage_manager = AzureFileStorageManager(storage_backend)
+
+        DRIVER = TusStorageDriver(backend=storage_backend, manager=storage_manager)
+
     elif storage_settings.file_backend == FileBackendConfig.NOT_SET:
         raise ConfigurationError("FILE_BACKEND env variable not configured")
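For illustration, a minimal sketch of what the new branch wires up, bypassing the settings machinery; the connection string is a placeholder, and AzureBlobStore/AzureFileStorageManager are defined in the new module below:

```python
import asyncio

from nucliadb.writer.tus.azure import AzureBlobStore, AzureFileStorageManager

async def main():
    backend = AzureBlobStore()
    await backend.initialize("<azure-connection-string>")  # creates the AzureObjectStore client
    manager = AzureFileStorageManager(backend)
    # ... the writer hands these to TusStorageDriver(backend=..., manager=...) ...
    await backend.finalize()  # closes the underlying client

asyncio.run(main())
```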
109 changes: 109 additions & 0 deletions nucliadb/src/nucliadb/writer/tus/azure.py
@@ -0,0 +1,109 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at info@nuclia.com.
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations

from nucliadb.writer import logger
from nucliadb.writer.tus.dm import FileDataManager
from nucliadb.writer.tus.storage import BlobStore, FileStorageManager
from nucliadb_protos.resources_pb2 import CloudFile
from nucliadb_utils.storages import CHUNK_SIZE
from nucliadb_utils.storages.azure import AzureObjectStore
from nucliadb_utils.storages.exceptions import ObjectNotFoundError
from nucliadb_utils.storages.utils import ObjectMetadata


class AzureBlobStore(BlobStore):
    async def finalize(self):
        if self._object_store is None:
            return
        try:
            await self._object_store.close()
        except Exception:
            logger.exception("Error closing AzureBlobStore")
        self._object_store = None

    async def initialize(self, connection_string: str):
        self.bucket = "nucliadb-{kbid}"
        self.source = CloudFile.Source.AZURE
        self._object_store = AzureObjectStore(connection_string)
        await self._object_store.initialize()

    @property
    def object_store(self) -> AzureObjectStore:
        assert self._object_store is not None
        return self._object_store

    async def check_exists(self, bucket_name: str) -> bool:
        return await self.object_store.bucket_exists(bucket_name)

    async def create_bucket(self, bucket_name: str) -> bool:
        created = await self.object_store.bucket_create(bucket_name)
        return not created


class AzureFileStorageManager(FileStorageManager):
    storage: AzureBlobStore
    chunk_size = CHUNK_SIZE
    min_upload_size = None

    @property
    def object_store(self) -> AzureObjectStore:
        return self.storage.object_store

    async def start(self, dm: FileDataManager, path: str, kbid: str):
        bucket = self.storage.get_bucket_name(kbid)
        if dm.filename == 0:
            filename = "file"
        else:
            filename = dm.filename
        metadata = ObjectMetadata(
            filename=filename,
            content_type=dm.content_type,
            size=dm.size,
        )
        await self.object_store.upload_multipart_start(bucket, path, metadata)
        await dm.update(path=path, bucket=bucket)

    async def delete_upload(self, uri: str, kbid: str) -> None:
        bucket = self.storage.get_bucket_name(kbid)
        try:
            await self.object_store.delete(bucket, uri)
        except ObjectNotFoundError:
            logger.warning(
                "Attempt to delete an upload but not found",
                extra={"uri": uri, "kbid": kbid, "bucket": bucket},
            )

    async def append(self, dm: FileDataManager, iterable, offset: int) -> int:
        bucket = dm.get("bucket")
        assert bucket is not None
        path = dm.get("path")
        assert path is not None
        uploaded_bytes = await self.object_store.upload_multipart_append(bucket, path, iterable)
        await dm.update(offset=offset)
        return uploaded_bytes

    async def finish(self, dm: FileDataManager):
        path = dm.get("path")
        await dm.finish()
        return path

    def validate_intermediate_chunk(self, uploaded_bytes: int):
        pass
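Taken together, the manager implements the three-phase TUS flow. A hedged sketch of the call sequence; in the writer, dm comes from a RedisFileDataManagerFactory and is populated from TUS request headers, and each chunk is an async iterable of bytes:

```python
async def upload_via_azure(manager: AzureFileStorageManager, dm, kbid: str, path: str, chunks):
    # Phase 1: create the multipart upload and persist bucket/path in dm.
    await manager.start(dm, path=path, kbid=kbid)

    # Phase 2: append each chunk; append() returns the number of bytes written.
    offset = 0
    for chunk in chunks:
        offset += await manager.append(dm, chunk, offset)

    # Phase 3: finalize the datamanager state; the object path is returned.
    return await manager.finish(dm)
```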
4 changes: 3 additions & 1 deletion nucliadb/src/nucliadb/writer/tus/gcs.py
@@ -183,9 +183,11 @@ async def start(self, dm: FileDataManager, path: str, kbid: str):
         if self.storage.session is None:
             raise AttributeError()

-        upload_file_id = dm.get("upload_file_id", str(uuid.uuid4()))
+        upload_file_id = dm.get("upload_file_id")
         if upload_file_id is not None:
             await self.delete_upload(upload_file_id, kbid)
+        else:
+            upload_file_id = str(uuid.uuid4())

         bucket = self.storage.get_bucket_name(kbid)
         init_url = "{}&name={}".format(
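The gcs.py change above is behavioral, not cosmetic. A hedged reading, assuming dm.get follows dict.get semantics:

```python
# Before: the default argument is evaluated eagerly, so upload_file_id was
# never None, and delete_upload() ran even for brand-new uploads, trying to
# cancel a resumable upload that never existed.
upload_file_id = dm.get("upload_file_id", str(uuid.uuid4()))

# After: cleanup only runs when a previous upload id was actually stored;
# fresh uploads mint a new uuid without the spurious delete round-trip.
```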
2 changes: 1 addition & 1 deletion nucliadb/src/nucliadb/writer/tus/storage.py
@@ -49,7 +49,7 @@ class FileStorageManager:
     chunk_size: int
     min_upload_size: Optional[int] = None

-    def __init__(self, storage):
+    def __init__(self, storage: BlobStore):
         self.storage = storage

     def iter_data(
10 changes: 10 additions & 0 deletions nucliadb/tests/fixtures.py
@@ -60,6 +60,7 @@
 from nucliadb_utils.settings import FileBackendConfig
 from nucliadb_utils.storages.settings import settings as storage_settings
 from nucliadb_utils.tests import free_port
+from nucliadb_utils.tests.azure import AzuriteFixture
 from nucliadb_utils.utilities import (
     Utility,
     clean_utility,
@@ -175,6 +176,7 @@ async def nucliadb(
         )
     )
     server = await run_async_nucliadb(settings)
+    assert server.started, "Nucliadb server did not start correctly"

     yield settings

@@ -669,6 +671,14 @@ def local_storage_settings(tmpdir):
     }


+@pytest.fixture(scope="function")
+def azure_storage_settings(azurite: AzuriteFixture):
+    return {
+        "file_backend": FileBackendConfig.AZURE,
+        "azure_connection_string": azurite.connection_string,
+    }
+
+
 @pytest.fixture(scope="function")
 def blobstorage_settings(local_storage_settings):
     """
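A hedged sketch of how a test consumes the new fixture; pytest resolves the azurite dependency through the nucliadb_utils.tests.azure plugin registered in the conftest files below, and the test name here is hypothetical:

```python
from nucliadb_utils.settings import FileBackendConfig

def test_azure_settings_shape(azure_storage_settings):
    # The fixture returns plain settings overrides, keyed like the other
    # *_storage_settings fixtures in this file.
    assert azure_storage_settings["file_backend"] == FileBackendConfig.AZURE
    assert azure_storage_settings["azure_connection_string"]  # non-empty Azurite string
```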
1 change: 1 addition & 0 deletions nucliadb/tests/ingest/conftest.py
@@ -26,5 +26,6 @@
     "nucliadb_utils.tests.fixtures",
     "nucliadb_utils.tests.gcs",
     "nucliadb_utils.tests.s3",
+    "nucliadb_utils.tests.azure",
     "nucliadb_telemetry.tests.telemetry",
 ]
1 change: 1 addition & 0 deletions nucliadb/tests/nucliadb/conftest.py
@@ -29,4 +29,5 @@
     "tests.ingest.fixtures",
     "nucliadb_utils.tests.fixtures",
     "nucliadb_utils.tests.gcs",
+    "nucliadb_utils.tests.azure",
 ]
1 change: 1 addition & 0 deletions nucliadb/tests/reader/conftest.py
@@ -27,5 +27,6 @@
     "nucliadb_utils.tests.fixtures",
     "nucliadb_utils.tests.gcs",
     "nucliadb_utils.tests.s3",
+    "nucliadb_utils.tests.azure",
     "nucliadb_utils.tests.asyncbenchmark",
 ]
1 change: 1 addition & 0 deletions nucliadb/tests/search/conftest.py
@@ -27,6 +27,7 @@
     "nucliadb_utils.tests.fixtures",
     "nucliadb_utils.tests.gcs",
     "nucliadb_utils.tests.s3",
+    "nucliadb_utils.tests.azure",
     "nucliadb_utils.tests.nats",
     "nucliadb_utils.tests.asyncbenchmark",
     "nucliadb_utils.tests.indexing",
1 change: 1 addition & 0 deletions nucliadb/tests/standalone/conftest.py
@@ -29,5 +29,6 @@
     "nucliadb_utils.tests.gcs",
     "nucliadb_utils.tests.nats",
     "nucliadb_utils.tests.s3",
+    "nucliadb_utils.tests.azure",
     "nucliadb_utils.tests.indexing",
 ]
3 changes: 3 additions & 0 deletions nucliadb/tests/standalone/integration/test_upload_download.py
@@ -46,6 +46,7 @@ def header_encode(some_string):
         lazy_fixture.lf("gcs_storage_settings"),
         lazy_fixture.lf("s3_storage_settings"),
         lazy_fixture.lf("local_storage_settings"),
+        lazy_fixture.lf("azure_storage_settings"),
     ],
 )
 def blobstorage_settings(request):
@@ -145,6 +146,8 @@ async def test_file_tus_upload_and_download(

     # Make sure the upload is finished on the server side
     assert resp.headers["Tus-Upload-Finished"] == "1"
+    assert "NDB-Resource" in resp.headers
+    assert "NDB-Field" in resp.headers

     # Now download the file
     download_url = f"{kb_path}/{RESOURCE_PREFIX}/{resource}/file/field1/download/field"
1 change: 1 addition & 0 deletions nucliadb/tests/train/conftest.py
@@ -26,4 +26,5 @@
     "nucliadb_utils.tests.fixtures",
     "nucliadb_utils.tests.gcs",
     "nucliadb_utils.tests.s3",
+    "nucliadb_utils.tests.azure",
 ]
1 change: 1 addition & 0 deletions nucliadb/tests/writer/conftest.py
@@ -28,4 +28,5 @@
     "nucliadb_utils.tests.fixtures",
     "nucliadb_utils.tests.gcs",
     "nucliadb_utils.tests.s3",
+    "nucliadb_utils.tests.azure",
 ]
2 changes: 1 addition & 1 deletion nucliadb_node/Cargo.toml
@@ -55,7 +55,7 @@ tracing-opentelemetry = "0.23"
 reqwest = { version = "0.11", features = ["json", "rustls-tls", "blocking"] }
 num_cpus = "1.16.0"
 crossbeam-utils = "0.8.16"
-object_store = { version = "^0.10.1", features = ["gcp", "aws"]}
+object_store = { version = "^0.10.1", features = ["gcp", "aws", "azure"]}
 base64 = "^0.21"


14 changes: 14 additions & 0 deletions nucliadb_node/src/settings.rs
@@ -48,6 +48,7 @@ use base64::engine::general_purpose::STANDARD;
 use base64::Engine;
 use nucliadb_core::NodeResult;
 use object_store::aws::AmazonS3Builder;
+use object_store::azure::MicrosoftAzureBuilder;
 use object_store::gcp::GoogleCloudStorageBuilder;
 use object_store::memory::InMemory;
 use serde::{Deserialize, Deserializer};
@@ -132,6 +133,11 @@ pub fn build_object_store_driver(settings: &EnvSettings) -> Arc<dyn ObjectStore>
             }
             Arc::new(builder.build().unwrap())
         }
+        ObjectStoreType::AZURE => {
+            let builder =
+                MicrosoftAzureBuilder::new().with_allow_http(true).with_url(settings.azure_url.clone().unwrap());
+            Arc::new(builder.build().unwrap())
+        }
         // Any other type is not supported for now
         _ => Arc::new(InMemory::new()),
     }
@@ -151,6 +157,7 @@ pub enum ObjectStoreType {
     NOTSET,
     GCS,
     S3,
+    AZURE,
 }

 impl<'de> Deserialize<'de> for ObjectStoreType {
@@ -162,6 +169,7 @@ impl<'de> Deserialize<'de> for ObjectStoreType {
         match s.as_str() {
             "gcs" => Ok(ObjectStoreType::GCS),
             "s3" => Ok(ObjectStoreType::S3),
+            "azure" => Ok(ObjectStoreType::AZURE),
             _ => {
                 warn!("Invalid object store type: {}. Using default one", s);
                 Ok(ObjectStoreType::NOTSET)
@@ -242,6 +250,7 @@ pub struct EnvSettings {
     pub s3_region_name: String,
     pub s3_indexing_bucket: String,
     pub s3_endpoint: Option<String>,
+    pub azure_url: Option<String>,
 }

 impl EnvSettings {
@@ -322,6 +331,7 @@ impl Default for EnvSettings {
             s3_region_name: Default::default(),
             s3_indexing_bucket: Default::default(),
             s3_endpoint: None,
+            azure_url: Default::default(),
         }
     }
 }
@@ -356,6 +366,10 @@ mod tests {
         let settings = from_pairs(&[("FILE_BACKEND", "s3")]).unwrap();
         assert_eq!(settings.file_backend, super::ObjectStoreType::S3);

+        let azure_url = "https://myaccount.blob.core.windows.net/mycontainer/myblob";
+        let settings = from_pairs(&[("FILE_BACKEND", "azure"), ("azure_url", azure_url)]).unwrap();
+        assert_eq!(settings.file_backend, super::ObjectStoreType::AZURE);
+
         let settings = from_pairs(&[("FILE_BACKEND", "unknown")]).unwrap();
         assert_eq!(settings.file_backend, super::ObjectStoreType::NOTSET);
     }
390 changes: 195 additions & 195 deletions nucliadb_protos/python/src/nucliadb_protos/resources_pb2.py

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions nucliadb_protos/python/src/nucliadb_protos/resources_pb2.pyi
@@ -89,6 +89,7 @@ class CloudFile(google.protobuf.message.Message):
         EMPTY: CloudFile._Source.ValueType # 5
         EXPORT: CloudFile._Source.ValueType # 6
         POSTGRES: CloudFile._Source.ValueType # 7
+        AZURE: CloudFile._Source.ValueType # 8

     class Source(_Source, metaclass=_SourceEnumTypeWrapper): ...
     FLAPS: CloudFile.Source.ValueType # 0
@@ -99,6 +100,7 @@ class CloudFile(google.protobuf.message.Message):
     EMPTY: CloudFile.Source.ValueType # 5
     EXPORT: CloudFile.Source.ValueType # 6
     POSTGRES: CloudFile.Source.ValueType # 7
+    AZURE: CloudFile.Source.ValueType # 8

     URI_FIELD_NUMBER: builtins.int
     SIZE_FIELD_NUMBER: builtins.int
1 change: 1 addition & 0 deletions nucliadb_protos/resources.proto
@@ -20,6 +20,7 @@ message CloudFile {
     EMPTY = 5;
     EXPORT = 6;
     POSTGRES = 7;
+    AZURE = 8;
   }
   Source source = 5;
   string filename = 6;

3 comments on commit 828939e

@github-actions (Contributor)

Benchmark

| Benchmark suite | Current: 828939e | Previous: 0d03d9f | Ratio |
| --- | --- | --- | --- |
| tests/search/unit/search/test_fetch.py::test_highligh_error | 3038.933642005945 iter/sec (stddev: 0.000006822984662528208) | 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) | 0.93 |

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions (Contributor)

Benchmark

| Benchmark suite | Current: 828939e | Previous: 0d03d9f | Ratio |
| --- | --- | --- | --- |
| tests/search/unit/search/test_fetch.py::test_highligh_error | 3143.318018075541 iter/sec (stddev: 0.000004774135904057321) | 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) | 0.90 |

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions (Contributor)

Benchmark

| Benchmark suite | Current: 828939e | Previous: 0d03d9f | Ratio |
| --- | --- | --- | --- |
| tests/search/unit/search/test_fetch.py::test_highligh_error | 3002.311769984327 iter/sec (stddev: 0.000006179196220037763) | 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) | 0.95 |

This comment was automatically generated by workflow using github-action-benchmark.
