Introduce file-based database export and import #758
Conversation
@@ -186,6 +194,20 @@ error_messages! { ConceptError
    2: "Cannot get concept from a concept row by index '{index}'.",
}

error_messages! { MigrationError
I understood that it's probably more correct to have a separate error. I may convert it to DatabaseError or something. It's certainly not a connection error.
Self { export_transmitter }
}

pub(crate) fn listen<'a>(&'a mut self) -> impl Promise<'a, Result<DatabaseExportAnswer>> + 'a {
Listen to exported items, similarly to how we listen for transaction responses.
`listen` normally refers to a loop. This is more like `accept` or even `next`.
`next` is good, thanks
Self { import_transmitter }
}

pub(crate) fn items(&mut self, items: Vec<migration::Item>) -> Result {
Send a bucket of migration items.
`send_items` then. Or even just `send`; the items are the argument.
`done` is also "sent", so I don't like the `send` variant. Let it be `send_items`.
}
}

data_writer.flush()?;
I wonder if I need to flush the exported file from time to time hmmm
Is there a benefit? If the driver crashes you'd have to restart the export process anyway.
True. Yeah, I think it knows how to optimally write itself. I was more concerned about buffers for a moment.
let item = item?;
item_buffer.push(item);
if item_buffer.len() >= ITEM_BATCH_SIZE {
    import_stream.items(item_buffer.split_off(0))?;
`split_off` preserves the capacity of the buffer
Another option is `mem::replace(&mut item_buffer, Vec::with_capacity(ITEM_BATCH_SIZE))`. Neither is particularly readable, but `split_off` is at least shorter.
I like `split_off` more, but I think it's a question of what you are used to. I used `split_off` in both the driver and the server, so I may get us more used to it =)
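As a side note on the pattern under discussion: a minimal, self-contained sketch of the batch-draining loop (hypothetical item type and batch size, not the PR's actual code) showing that `split_off(0)` drains the buffer while keeping its capacity for the next batch:

```rust
fn main() {
    const ITEM_BATCH_SIZE: usize = 3; // hypothetical; the real constant may differ

    let mut item_buffer: Vec<u32> = Vec::with_capacity(ITEM_BATCH_SIZE);
    let mut batches: Vec<Vec<u32>> = Vec::new(); // stands in for import_stream.items(...)

    for item in 1..=7u32 {
        item_buffer.push(item);
        if item_buffer.len() >= ITEM_BATCH_SIZE {
            // split_off(0) returns the accumulated items as a new Vec and
            // leaves the original empty with its previous capacity unchanged.
            let cap_before = item_buffer.capacity();
            batches.push(item_buffer.split_off(0));
            assert_eq!(item_buffer.capacity(), cap_before);
        }
    }
    if !item_buffer.is_empty() {
        batches.push(item_buffer.split_off(0)); // trailing partial batch
    }
    println!("{:?}", batches); // [[1, 2, 3], [4, 5, 6], [7]]
}
```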
_phantom_data: PhantomData<M>,
}

impl<M: Message + Default, R: BufRead> ProtoMessageIterator<M, R> {
Manual decoding of import data files. All of `prost`'s helpers expect an impl of `bytes::Buf`, which requires an in-memory buffer and cannot be used on a file reader (e.g., a version where I consume the whole file and then process it is 20-30 times smaller in code volume). At the same time, it's unclear how many bytes to read from the file without more manual operations.
The idea is the following: as the data is length-delimited, we need to read the length first, then read exactly `length` bytes to decode the item. All the remaining bytes should be used for the next item, so they should be preserved in the buffer.
let schema_ref: &str = schema.as_ref();
let data_file_path = data_file_path.as_ref();
self.run_failsafe(name, |server_connection, name| async move {
    let mut import_stream = server_connection.import_database(name, schema_ref.to_string()).await?;
We don't wait for a server response here and proceed to read the file and send its items. However, if an error is returned, we will stop on one of the `.items(...)` calls.
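This deferred-error behavior can be illustrated with a plain channel (a simplified stand-in for the GRPC stream, not the PR's code): closing the receiving side is only observed on the next send, not when the sender is created.

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    // Simulate the server side failing early: the receiver is dropped...
    drop(rx);
    // ...but the client only notices on its next send, not at open time.
    assert!(tx.send(vec![1, 2, 3]).is_err());
    println!("error surfaced on a later send");
}
```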
rust/src/database/migration.rs (Outdated)
}
}

pub(crate) fn try_creating_export_file(path: impl AsRef<Path>) -> Result<File> {
No forced filename format.
CI is red because there is no server artifact yet.

// Makes sure that the shutdown signal is sent when it's dropped. Can try sending redundant shutdown
// signals after another shutdown source: the channel is expected to be closed (no-op).
pub(crate) struct ShutdownGuard<T: Default> {
Previously, we always counted on the server to drop the stream. However, for import, we are the owners of the stream, and we should make sure to terminate it for both:
- GRPC connection dropping because of the server or the network
- our error leading to resource deallocation <- this struct helps specifically with this case
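A minimal sketch of the drop-based guard idea (hypothetical struct layout and channel type; the PR's `ShutdownGuard` may differ): whichever way the owning scope exits, including an early `?` return or a panic unwind, the shutdown signal is sent, and a send into an already-closed channel is simply ignored.

```rust
use std::sync::mpsc::Sender;

// Hypothetical sketch: sends a default shutdown message when dropped.
struct ShutdownGuard<T: Default> {
    transmitter: Sender<T>,
}

impl<T: Default> Drop for ShutdownGuard<T> {
    fn drop(&mut self) {
        // Best-effort: if the channel is already closed (another shutdown
        // source got there first), this send errors and we ignore it (no-op).
        let _ = self.transmitter.send(T::default());
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<()>();
    {
        let _guard = ShutdownGuard { transmitter: tx };
        // ... work that may fail and unwind ...
    } // guard dropped here, shutdown signal sent
    assert!(rx.recv().is_ok());
    println!("shutdown signal received");
}
```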
Naming and style comments aside, looks good.
rust/tests/behaviour/steps/lib.rs (Outdated)
use crate::util::{create_temp_dir, TempDir};
Confused auto-import
Thanks T_T
c/src/database_manager.rs (Outdated)
- use crate::{error::try_release_arc, iterator::iterator_arc_next};
+ use crate::{error::try_release_arc, iterator::iterator_arc_next, memory::borrow};
Unused?
rust/src/connection/message.rs (Outdated)
DatabaseExportStream {
    response_source: Streaming<database::export::Server>,
},
DatabaseExportAnswer {
    response_source: Streaming<database::export::Server>,
},
Is `Response::DatabaseExportAnswer` used? It seems to be a duplicate of `DatabaseExportStream`.
Yep, it got redundant after a couple of protocol refactoring iterations. Good catch
}
}

async fn listen(
Same as `listen` above, this should be renamed.
#[cfg_attr(not(feature = "sync"), doc = "driver.databases().import_file(name, schema, data_path).await;")]
/// ```
#[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
pub async fn import_file(
import_database_from_file
_database_ is clearly redundant, we don't call `create` `create_database` instead. `from` is fine, I will use it, alright. Although it feels redundant.
rust/src/database/migration.rs (Outdated)
}
}

pub(crate) fn try_creating_export_file(path: impl AsRef<Path>) -> Result<File> {
`try_create_...` and so on; `-ing` is against convention.
Force-pushed from 3fe2be9 to b536c8b
## Usage and product changes

Add database export and database import operations. Database export saves the database information (its schema and data) on the client machine as two files at provided locations:

```
# database export <name> <schema file location> <data file location>
database export my-database export/my-database.typeql export/my-database.typedb
```

Database import uses the exported files to create a new database with equivalent schema and data:

```
# database import <name> <schema file location> <data file location>
database import their-database export/my-database.typeql export/my-database.typedb
```

## Implementation

Add new commands similar to other `database` interfaces. Prepare data to call the respective [typedb-driver APIs](typedb/typedb-driver#758).
Usage and product changes
Introduce interfaces to export databases into schema definition and data files and to import databases using these files. Database import supports files exported from both TypeDB 2.x and TypeDB 3.x.
Both operations are blocking and may take a significant amount of time to execute for large databases. Use parallel connections to continue operating with the server and its other databases.
Usage examples in Rust:
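(The original example blocks are collapsed in this view. A hedged sketch of the import side only, based on the doc example visible in the `import_file` diff above; the `TypeDBDriver` type name and error handling here are assumptions, not the final API:)

```rust
// Sketch only (not runnable standalone): assumes a connected TypeDB `driver`
// and the async API. Based on the doc example in the diff:
//   driver.databases().import_file(name, schema, data_path).await;
async fn import_example(driver: &TypeDBDriver) {
    // Read the schema definition produced by a previous export.
    let schema = std::fs::read_to_string("export/my-database.typeql").expect("schema file");
    // Create a new database from the exported schema and data file.
    driver
        .databases()
        .import_file("their-database", schema, "export/my-database.typedb")
        .await
        .expect("import");
}
```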
Usage examples in Python:
Usage examples in Java:
Implementation
Implemented the updated protocol.
As both operations work with streaming, the implementation is similar to transactions. The behavior is split into the file processing logic and the networking (specialized for sync and async modes). The exposed interfaces present only the file-based versions, but additional interfaces for direct work with streams can be added in future updates.
In Rust, paths are accepted as Rust `Path`s. In other languages working through C interfaces, paths are plain strings for transmission through the C layer.

Database export

Implemented through the `database` interface and accepts two target files for export. No specific naming format is required (so it's not necessarily `.typeql` or `.typedb`). If any of the target files already exists, an error is returned.

The export operation consists of these steps:
(`listen` promise presented by the network layer)

The network layer is basically just a task listening for the GRPC stream and transmitting the converted messages to the processing loop.
Database import
Implemented through the `database_manager` interface and accepts a database name, a schema definition query string (can be read from the exported file), and an exported data file. There are no naming requirements either.

The import operation consists of these steps:
The network layer consists of a blocking task for client-side requests and a listening task waiting for a one-shot signal from the server (either an error or a "done" message). Errors can be received at any point during processing, while "done" is expected only after a client-side "done" request. When a response is received, either an async or a sync sink receives this message, which should be checked before any client-side network operations to ensure proper interruption.
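The one-shot "error or done" signal can be sketched with a bounded channel (hypothetical types; the real implementation's sink differs between sync and async modes): the listening task resolves the signal once, and the client checks it before each request.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // One-shot terminal signal from the server: Ok(()) for "done",
    // Err(..) for a server-side error.
    let (signal_tx, signal_rx) = mpsc::sync_channel::<Result<(), String>>(1);

    // Listening task: waits for the server's terminal message.
    let listener = thread::spawn(move || {
        // Pretend the server answered "done".
        signal_tx.send(Ok(())).unwrap();
    });
    listener.join().unwrap();

    // Client side: before the next request, check whether the signal fired.
    match signal_rx.try_recv() {
        Ok(Ok(())) => println!("server reported done"),
        Ok(Err(e)) => println!("server reported error: {e}"),
        Err(_) => println!("no terminal signal yet; safe to keep sending"),
    }
}
```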