Introduce file-based database export and import #758


Merged 27 commits into typedb:master from 3.x-export-import on Jun 10, 2025

Conversation

@farost (Member) commented May 28, 2025

Usage and product changes

Introduce interfaces to export databases into schema definition and data files and to import databases using these files. Database import supports files exported from both TypeDB 2.x and TypeDB 3.x.

Both operations are blocking and may take a significant amount of time to execute for large databases. Use parallel connections to continue operating with the server and its other databases.

Usage examples in Rust:

// export
let db = driver.databases().get(db_name).await.unwrap();
db.export_to_file(schema_file_path, data_file_path).await.unwrap();

// import
let schema = read_to_string(schema_file_path).unwrap();
driver.databases().import_from_file(db_name2, schema, data_file_path).await.unwrap();

Usage examples in Python:

# export
database = driver.databases.get(db_name)
database.export_to_file(schema_file_path, data_file_path)

# import
with open(schema_file_path, 'r', encoding='utf-8') as f:
    schema = f.read()
driver.databases.import_from_file(db_name2, schema, data_file_path)

Usage examples in Java:

// export
Database database = driver.databases().get(dbName);
database.exportToFile(schemaFilePath, dataFilePath);

// import
String schema = Files.readString(Path.of(schemaFilePath));
driver.databases().importFromFile(dbName2, schema, dataFilePath);

Implementation

Implemented the updated protocol.

As both operations work with streaming, the implementation is similar to transactions. The behavior is split into the file processing logic and the networking (specialized for sync and async modes). The exposed interfaces present only the file-based versions, but additional interfaces for direct work with streams can be added in future updates.

In Rust, paths are accepted as Rust `Path`s. Other languages, which work through the C interface, pass paths as plain strings for transmission through the C layer.

Database export

Implemented through the database interface; accepts two target file paths for export. No specific naming format is required (so the files are not necessarily `.typeql` or `.typedb`). If any of the target files already exists, an error is returned.

The export operation consists of these steps:

  • prepare the output files
  • open a unidirectional GRPC stream from the server to the client
  • "block" on server response listening until an error or a "done" message is received (blocking is implemented through a loop which resolves a listen promise provided by the network layer)
  • if there is a schema message, write it to the schema file and flush it right away
  • if there is a data items message, encode the items and write them to the data file
  • in case of an error, the output files are deleted (we own them as we create them at the beginning)

The network layer is basically just a task listening for the GRPC stream and transmitting the converted messages to the processing loop.
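The steps above can be sketched as a minimal processing loop. This is an illustration only: the `ExportMsg` enum, the `next` callback, and the string error type are assumptions standing in for the driver's actual protocol and error types. Note the use of `create_new`, which mirrors the "error if the target file already exists" behavior.

```rust
use std::fs::{self, OpenOptions};
use std::io::{BufWriter, Write};

// Hypothetical message shape for the server's unidirectional export stream.
enum ExportMsg {
    Schema(String),
    Items(Vec<Vec<u8>>), // already-encoded data items
    Done,
}

// Drive the export: write schema and data as messages arrive; on any error,
// delete the output files (we own them, since we created them at the start).
fn run_export(
    mut next: impl FnMut() -> Result<ExportMsg, String>,
    schema_path: &str,
    data_path: &str,
) -> Result<(), String> {
    let result = (|| -> Result<(), String> {
        // create_new fails if the file already exists.
        let mut schema_file = OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(schema_path)
            .map_err(|e| e.to_string())?;
        let data_file = OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(data_path)
            .map_err(|e| e.to_string())?;
        let mut data_writer = BufWriter::new(data_file);
        loop {
            match next()? {
                ExportMsg::Schema(schema) => {
                    // Write the schema and flush it right away.
                    schema_file.write_all(schema.as_bytes()).map_err(|e| e.to_string())?;
                    schema_file.flush().map_err(|e| e.to_string())?;
                }
                ExportMsg::Items(items) => {
                    for item in &items {
                        data_writer.write_all(item).map_err(|e| e.to_string())?;
                    }
                }
                ExportMsg::Done => return data_writer.flush().map_err(|e| e.to_string()),
            }
        }
    })();
    if result.is_err() {
        // We own the output files, so remove them on error.
        let _ = fs::remove_file(schema_path);
        let _ = fs::remove_file(data_path);
    }
    result
}
```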

Database import

Implemented through the database_manager interface and accepts a database name, a schema definition query string (which can be read from the exported schema file), and an exported data file. As with export, there are no naming requirements.

The import operation consists of these steps:

  • open the input file
  • open a bidirectional GRPC stream between the server and the client, send the initial request with the database's name and schema
  • eagerly start reading and decoding data items from the input file one by one, storing up to 250 items in the buffer (this number can be easily changed)
  • once the buffer is full, attempt to send the items: this operation first checks for a potential early error signal from the server, then sends the batch and returns to processing the rest of the file
  • once the file is read, send a "done" message and block until the server responds with either an error or its "done" message

The network layer consists of a blocking task for client-side requests and a listening task waiting for a one-shot signal from the server (either an error or a "done" message). Errors can be received at any point during processing, while "done" is expected only after a client-side "done" request. When a response is received, either an async or a sync sink receives this message, which should be checked before any client-side network operation to ensure proper interruption.
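The buffering steps above can be sketched as follows. This is a simplified, self-contained illustration: `ImportStream`, its in-memory `sent_batches` recorder, and the string error type are stand-ins, not the driver's real gRPC stream handle.

```rust
// Hypothetical decoded data item.
type Item = Vec<u8>;

const ITEM_BATCH_SIZE: usize = 250;

// Stand-in for the stream handle described above: in the real driver,
// send_items first checks for an early server error, then transmits over gRPC.
struct ImportStream {
    sent_batches: Vec<Vec<Item>>,
    done_sent: bool,
}

impl ImportStream {
    fn send_items(&mut self, items: Vec<Item>) -> Result<(), String> {
        // Real driver: check the one-shot error sink, then send the batch.
        self.sent_batches.push(items);
        Ok(())
    }

    fn done(&mut self) -> Result<(), String> {
        // Real driver: send "done", then block until the server responds
        // with either an error or its own "done".
        self.done_sent = true;
        Ok(())
    }
}

// Read items one by one, buffer up to ITEM_BATCH_SIZE, flush full batches,
// then flush the remainder and send "done".
fn run_import(
    items: impl Iterator<Item = Result<Item, String>>,
    stream: &mut ImportStream,
) -> Result<(), String> {
    let mut item_buffer = Vec::with_capacity(ITEM_BATCH_SIZE);
    for item in items {
        item_buffer.push(item?);
        if item_buffer.len() >= ITEM_BATCH_SIZE {
            // split_off(0) empties the buffer but keeps its capacity.
            stream.send_items(item_buffer.split_off(0))?;
        }
    }
    if !item_buffer.is_empty() {
        stream.send_items(item_buffer)?;
    }
    stream.done()
}
```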

@@ -186,6 +194,20 @@ error_messages! { ConceptError
2: "Cannot get concept from a concept row by index '{index}'.",
}

error_messages! { MigrationError
Member Author:

I understood that it's probably more correct to have a separate error. I may convert it to DatabaseError or smth. It's certainly not a connection error.

Self { export_transmitter }
}

pub(crate) fn listen<'a>(&'a mut self) -> impl Promise<'a, Result<DatabaseExportAnswer>> + 'a {
Member Author:

Listen to exported items, similarly to how we listen for transaction responses.

Member:

listen normally refers to a loop. This is more like accept or even next.

Member Author:

next is good, thanks

Self { import_transmitter }
}

pub(crate) fn items(&mut self, items: Vec<migration::Item>) -> Result {
Member Author:

Send a bucket of migration items.

Member:

send_items then. Or even just send, the items are the argument.

Member Author:

done is also "sent", so I don't like the send variant. Let it be send_items

}
}

data_writer.flush()?;
Member Author:

I wonder if I need to flush the exported file from time to time hmmm

Member:

Is there a benefit? If the driver crashes you'd have to restart the export process anyway.

Member Author:

True. Yeah, I think it knows how to optimally write itself. I was more concerned about buffers for a moment.

let item = item?;
item_buffer.push(item);
if item_buffer.len() >= ITEM_BATCH_SIZE {
import_stream.items(item_buffer.split_off(0))?;
Member Author:

split_off preserves the capacity of the buffer

Member:

Another option is mem::replace(&mut item_buffer, Vec::with_capacity(ITEM_BATCH_SIZE)).
Neither is particularly readable, but split_off is at least shorter.

Member Author:

I like split_off more, but I think it's a question of what you are used to. I used split_off in both the driver and the server, so I may get us more used to it =)
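For reference, the capacity point being discussed can be checked with a standalone snippet (illustrative only, not driver code):

```rust
// Vec::split_off(0) moves all elements into a new Vec and leaves the
// original empty with its previous capacity unchanged, so a batch buffer
// drained this way never reallocates between batches.
fn drain_all(buffer: &mut Vec<u32>) -> Vec<u32> {
    buffer.split_off(0)
}
```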

_phantom_data: PhantomData<M>,
}

impl<M: Message + Default, R: BufRead> ProtoMessageIterator<M, R> {
Member Author:

Manual decoding of import data files. All of prost's helpers expect an impl of bytes::Buf, which requires an in-memory buffer and cannot be used on a file reader (e.g., a version where I consume the whole file into memory and then process it is 20-30 times smaller in code volume). At the same time, it's unclear how many bytes to read from the file without more manual operations.

The idea is the following: as the data is delimited by length, we need to read the length first, then read exactly length bytes to decode the item. Then, all the other bytes should be used for the next item, so they should be preserved in the buffer.
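That idea can be sketched as a standalone length-delimited reader. This is a simplified illustration of varint-length framing (the prefix used by protobuf's length-delimited encoding); the real ProtoMessageIterator additionally buffers reads and decodes the bytes into prost messages.

```rust
use std::io::Read;

// Read one LEB128 varint (the length prefix) from the reader.
// Returns Ok(None) on a clean EOF at a message boundary.
fn read_varint(reader: &mut impl Read) -> std::io::Result<Option<u64>> {
    let mut value = 0u64;
    let mut shift = 0;
    let mut byte = [0u8; 1];
    loop {
        match reader.read(&mut byte)? {
            0 if shift == 0 => return Ok(None), // EOF before a new item
            0 => return Err(std::io::ErrorKind::UnexpectedEof.into()),
            _ => {}
        }
        value |= u64::from(byte[0] & 0x7f) << shift;
        if byte[0] & 0x80 == 0 {
            return Ok(Some(value));
        }
        shift += 7;
    }
}

// Read the next length-delimited message body: the length first, then
// exactly `len` bytes. Any remaining bytes stay in the reader for the
// next item.
fn next_message(reader: &mut impl Read) -> std::io::Result<Option<Vec<u8>>> {
    let Some(len) = read_varint(reader)? else {
        return Ok(None);
    };
    let mut buf = vec![0u8; len as usize];
    reader.read_exact(&mut buf)?;
    Ok(Some(buf))
}
```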

@farost changed the title Introduce file-based export and import → Introduce file-based database export and import on May 29, 2025
let schema_ref: &str = schema.as_ref();
let data_file_path = data_file_path.as_ref();
self.run_failsafe(name, |server_connection, name| async move {
let mut import_stream = server_connection.import_database(name, schema_ref.to_string()).await?;
Member Author:

We don't wait for a server response here and proceed to read the file and send its items. However, if an error is returned, we will stop on one of the .items(...) calls.

}
}

pub(crate) fn try_creating_export_file(path: impl AsRef<Path>) -> Result<File> {
Member Author:

No forced filename format.

@farost marked this pull request as ready for review on May 29, 2025 10:28

farost commented May 29, 2025

CI is red because there is no server artifact yet.


// Makes sure that the shutdown signal is sent when it's dropped. Can try sending redundant shutdown
// signals after another shutdown source: the channel is expected to be closed (no-op).
pub(crate) struct ShutdownGuard<T: Default> {
Member Author:

Previously, we always counted on the server to drop the stream. However, for import, we are the owners of the stream, and we should make sure to terminate it for both

  1. GRPC connection dropping because of the server or the network
  2. Our error leading to resource deallocation <- this struct helps specifically with this case
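A minimal sketch of such a drop guard (illustrative only; the actual struct sits on the driver's own channel types rather than std mpsc):

```rust
use std::sync::mpsc::Sender;

// Makes sure a shutdown signal is sent when the guard is dropped, e.g. when
// an error path deallocates the import state. If the channel was already
// closed by another shutdown source, the send is simply ignored (no-op).
struct ShutdownGuard<T: Default> {
    sender: Sender<T>,
}

impl<T: Default> Drop for ShutdownGuard<T> {
    fn drop(&mut self) {
        // A closed channel means shutdown already happened elsewhere.
        let _ = self.sender.send(T::default());
    }
}
```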

@dmitrii-ubskii (Member) left a comment:

Naming and style comments aside, looks good.

Comment on lines 520 to 521
use crate::util::{create_temp_dir, TempDir};

Member:

Confused auto-import

Member Author:

Thanks T_T

use crate::{error::try_release_arc, iterator::iterator_arc_next};
use crate::{error::try_release_arc, iterator::iterator_arc_next, memory::borrow};
Member:

Unused?


Comment on lines 103 to 108
DatabaseExportStream {
response_source: Streaming<database::export::Server>,
},
DatabaseExportAnswer {
response_source: Streaming<database::export::Server>,
},
Member:

Is Response::DatabaseExportAnswer used? It seems to be a doublet of DatabaseExportStream.

Member Author:

Yep, it got redundant after a couple of protocol refactoring iterations. Good catch

}
}

async fn listen(
Member:

Same as listen above, this should be renamed


#[cfg_attr(not(feature = "sync"), doc = "driver.databases().import_file(name, schema, data_path).await;")]
/// ```
#[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
pub async fn import_file(
Member:

import_database_from_file

Member Author (Jun 9, 2025):

`database` is clearly redundant, we don't call `create` `create_database` instead. `from` is fine, I will use it, alright. Although it feels redundant.


}
}

pub(crate) fn try_creating_export_file(path: impl AsRef<Path>) -> Result<File> {
Member:

try_create_... and so on, ing is against convention.

@farost force-pushed the 3.x-export-import branch from 3fe2be9 to b536c8b on June 10, 2025 14:43
@farost merged commit 6db6f94 into typedb:master on Jun 10, 2025
8 checks passed
@farost deleted the 3.x-export-import branch on June 10, 2025 15:07
farost added a commit to typedb/typedb-console that referenced this pull request Jun 10, 2025
## Usage and product changes
Add database export and database import operations. 

Database export saves the database's schema and data on the client machine as two files at the provided locations:
```
# database export <name> <schema file location> <data file location>
database export my-database export/my-database.typeql export/my-database.typedb
```

Database import uses the exported files to create a new database with
equivalent schema and data:
```
# database import <name> <schema file location> <data file location>
database import their-database export/my-database.typeql export/my-database.typedb
```

## Implementation
Add new commands similar to other `database` interfaces. Prepare data to
call the respective [typedb-driver
APIs](typedb/typedb-driver#758).