Cloud import support in backend (#29996)
GitOrigin-RevId: 7282c13d1c61758256de1b7d8a4341d74f40c46a
nipunn1313 authored and Convex, Inc. committed Sep 20, 2024
1 parent 57abce4 commit 24bc769
Showing 5 changed files with 65 additions and 7 deletions.
9 changes: 5 additions & 4 deletions crates/application/src/lib.rs
@@ -80,6 +80,7 @@ use common::{
CursorMs,
EnvVarName,
EnvVarValue,
FullyQualifiedObjectKey,
FunctionCaller,
IndexId,
IndexName,
@@ -259,7 +260,6 @@ use storage::{
ClientDrivenUploadPartToken,
ClientDrivenUploadToken,
Storage,
StorageCacheKey,
StorageExt,
StorageGetStream,
Upload,
@@ -461,7 +461,8 @@ pub struct Application<RT: Runtime> {
files_storage: Arc<dyn Storage>,
modules_storage: Arc<dyn Storage>,
search_storage: Arc<dyn Storage>,
exports_storage: Arc<dyn Storage>,
// TODO not pub
pub exports_storage: Arc<dyn Storage>,
snapshot_imports_storage: Arc<dyn Storage>,
usage_tracking: UsageCounter,
key_broker: KeyBroker,
@@ -1360,8 +1361,8 @@ impl<RT: Runtime> Application<RT> {
}

/// Returns the cloud export key - fully qualified to the instance.
pub async fn cloud_export_key(&self, zip_export_key: ObjectKey) -> StorageCacheKey {
self.exports_storage.cache_key(&zip_export_key)
pub async fn cloud_export_key(&self, zip_export_key: ObjectKey) -> FullyQualifiedObjectKey {
self.exports_storage.fully_qualified_key(&zip_export_key)
}

pub async fn update_environment_variables(
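As a rough illustration of the API change above (not part of the commit): a caller that previously received a StorageCacheKey from cloud_export_key now gets a FullyQualifiedObjectKey, which carries enough information to locate the zip export from outside this instance. The wrapper function below is hypothetical, and the imports of Application, Runtime, ObjectKey, and FullyQualifiedObjectKey are assumed from the crates shown in this diff.

// Hypothetical caller of the updated helper. The returned key is qualified
// against the exports bucket, per the doc comment on FullyQualifiedObjectKey.
async fn export_location<RT: Runtime>(
    app: &Application<RT>,
    zip_export_key: ObjectKey,
) -> FullyQualifiedObjectKey {
    app.cloud_export_key(zip_export_key).await
}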
22 changes: 22 additions & 0 deletions crates/application/src/snapshot_import.rs
@@ -55,6 +55,7 @@ use common::{
schemas::DatabaseSchema,
types::{
FieldName,
FullyQualifiedObjectKey,
MemberId,
ObjectKey,
StorageUuid,
@@ -1316,6 +1317,27 @@ pub async fn upload_import_file<RT: Runtime>(
.await
}

pub async fn start_cloud_import<RT: Runtime>(
application: &Application<RT>,
identity: Identity,
source_object_key: FullyQualifiedObjectKey,
) -> anyhow::Result<()> {
let object_key: ObjectKey = application
.exports_storage
.copy_object(source_object_key)
.await?;
store_uploaded_import(
application,
identity,
ImportFormat::Zip,
ImportMode::Replace,
ComponentPath::root(),
object_key,
)
.await?;
Ok(())
}

pub async fn store_uploaded_import<RT: Runtime>(
application: &Application<RT>,
identity: Identity,
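A hedged sketch of how the new entry point might be wired together with the export side. The wrapper function and the two Application handles are hypothetical; the example assumes the items introduced in this diff (cloud_export_key returning a FullyQualifiedObjectKey, and start_cloud_import) plus the usual imports of Application, Identity, and Runtime.

// Export on one instance, import on another: cloud_export_key qualifies the
// zip's key against the source instance's exports bucket, and start_cloud_import
// copies it into the destination's bucket before running a Replace import at the
// root component.
async fn import_from_other_instance<RT: Runtime>(
    source_app: &Application<RT>,
    dest_app: &Application<RT>,
    identity: Identity,
    zip_export_key: ObjectKey,
) -> anyhow::Result<()> {
    let qualified: FullyQualifiedObjectKey = source_app.cloud_export_key(zip_export_key).await;
    start_cloud_import(dest_app, identity, qualified).await
}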
5 changes: 4 additions & 1 deletion crates/common/src/types/mod.rs
@@ -76,7 +76,10 @@ pub use index::{
INDEX_BY_ID_DESCRIPTOR,
};
pub use maybe_value::MaybeValue;
pub use object_key::ObjectKey;
pub use object_key::{
FullyQualifiedObjectKey,
ObjectKey,
};
pub use table::TableStats;
#[cfg(any(test, feature = "testing"))]
pub use timestamp::unchecked_repeatable_ts;
5 changes: 5 additions & 0 deletions crates/common/src/types/object_key.rs
@@ -23,6 +23,11 @@ pub struct ObjectKey(
String,
);

/// Fully qualified object key. For s3, in the format
/// {bucket}/{prefix}-{object_key}
#[derive(Debug, Clone, derive_more::From, derive_more::Into)]
pub struct FullyQualifiedObjectKey(String);

impl TryFrom<ObjectKey> for ConvexString {
type Error = anyhow::Error;

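A minimal sketch of what the derive_more attributes on the new type provide: a plain String converts into the newtype and back. The example string is made up; the real S3 layout is the {bucket}/{prefix}-{object_key} format described by the doc comment above.

use common::types::FullyQualifiedObjectKey;

fn main() {
    // String -> FullyQualifiedObjectKey and back, via derive_more::From / derive_more::Into.
    let qualified: FullyQualifiedObjectKey =
        "exports-bucket/instance-prefix-abc123".to_string().into();
    let raw: String = qualified.clone().into();
    assert_eq!(raw, "exports-bucket/instance-prefix-abc123");
}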
31 changes: 29 additions & 2 deletions crates/storage/src/lib.rs
@@ -21,7 +21,10 @@ use std::{
Write,
},
mem,
path::PathBuf,
path::{
Path,
PathBuf,
},
pin::Pin,
sync::Arc,
task::{
@@ -37,7 +40,10 @@ use bytes::Bytes;
use common::{
errors::report_error,
runtime::Runtime,
types::ObjectKey,
types::{
FullyQualifiedObjectKey,
ObjectKey,
},
};
use futures::{
channel::{
@@ -165,10 +171,16 @@ pub trait Storage: Send + Sync + Debug {
key: &ObjectKey,
bytes_range: std::ops::Range<u64>,
) -> BoxFuture<'static, anyhow::Result<StorageGetStream>>;
/// Copy from source storage (potentially different bucket) into current
/// bucket
async fn copy_object(&self, source: FullyQualifiedObjectKey) -> anyhow::Result<ObjectKey>;
fn storage_type_proto(&self) -> pb::searchlight::StorageType;
/// Return a cache key suitable for the given ObjectKey, even in
/// a multi-tenant cache.
fn cache_key(&self, key: &ObjectKey) -> StorageCacheKey;
/// Return a fully qualified key, including info on bucket name
/// and suitable for access in multi-tenant scenario
fn fully_qualified_key(&self, key: &ObjectKey) -> FullyQualifiedObjectKey;
}

pub struct ObjectAttributes {
@@ -998,6 +1010,12 @@ impl<RT: Runtime> Storage for LocalDirStorage<RT> {
StorageCacheKey(path.to_string_lossy().to_string())
}

fn fully_qualified_key(&self, key: &ObjectKey) -> FullyQualifiedObjectKey {
let key = self.path_for_key(key.clone());
let path = self.dir.join(key);
path.to_string_lossy().to_string().into()
}

fn get_small_range(
&self,
key: &ObjectKey,
@@ -1039,6 +1057,15 @@
}))
}

async fn copy_object(&self, source: FullyQualifiedObjectKey) -> anyhow::Result<ObjectKey> {
let source: String = source.into();
let source_path = Path::new(&source);
let key: ObjectKey = self.rt.new_uuid_v4().to_string().try_into()?;
let dest_path = self.dir.join(self.path_for_key(key.clone()));
fs::copy(source_path, dest_path)?;
Ok(key)
}

fn storage_type_proto(&self) -> pb::searchlight::StorageType {
pb::searchlight::StorageType {
storage_type: Some(pb::searchlight::storage_type::StorageType::Local(
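Taken together, the two new Storage methods let one store's object be copied into another store behind a uniform interface: fully_qualified_key produces a location that is meaningful outside the owning store, and copy_object materializes that location under a fresh ObjectKey. For LocalDirStorage the qualified key is simply an absolute path and the copy is a plain fs::copy into a UUID-named key. The helper below is a hedged, backend-agnostic sketch, not part of the commit; it assumes the trait's async methods remain callable through generics (e.g. via async_trait) and that Storage, ObjectKey, and FullyQualifiedObjectKey are imported as in this diff.

// Hypothetical helper: qualify a key against the source store, then let the
// destination store copy it in under a new ObjectKey of its own.
async fn copy_between_stores<S: Storage + ?Sized, D: Storage + ?Sized>(
    source: &S,
    dest: &D,
    key: &ObjectKey,
) -> anyhow::Result<ObjectKey> {
    let qualified: FullyQualifiedObjectKey = source.fully_qualified_key(key);
    dest.copy_object(qualified).await
}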