Duplicate published doc #700
Conversation
libs/database/src/publish.rs (Outdated)
```rust
if res.rows_affected() != item_count as u64 {
  tracing::warn!(
    "Failed to insert or replace publish collab meta batch, workspace_id: {}, publisher_uuid: {}, rows_affected: {}",
```
This is a partial failure: some rows might have been updated, some might not.
- If you want to be strongly consistent (either all collabs are duplicated or none of them are), you should roll back the transaction at this point (see the sketch after this comment).
- If you want to be elastic, we should probably return the `rows_affected` count and inform the user how many collabs we duplicated.
Also log both `rows_affected` and `item_count` so that we have a clue how many were missed.
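A minimal sketch of the strongly consistent option, assuming an explicit sqlx transaction and an `anyhow`-based error path (the helper name and signature are assumptions, not the project's code):

```rust
use sqlx::{Postgres, Transaction};

// Hypothetical helper: verify the whole batch was written, otherwise roll
// the transaction back so either all collabs are published or none are.
async fn ensure_all_rows_written(
    txn: Transaction<'_, Postgres>,
    rows_affected: u64,
    item_count: u64,
) -> anyhow::Result<()> {
    if rows_affected != item_count {
        // rollback() consumes the transaction and undoes every statement in it
        txn.rollback().await?;
        anyhow::bail!("publish batch incomplete: {rows_affected} of {item_count} rows affected");
    }
    txn.commit().await?;
    Ok(())
}
```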
@Horusiath IIRC `rows_affected` accounts for both modified and inserted rows. My intent is that every collab is either inserted or modified. I shall modify the code to log both `rows_affected` and `item_count` if they differ.
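A sketch of that adjusted warning, completing the truncated snippet above so both counts appear in the log line:

```rust
if res.rows_affected() != item_count as u64 {
    tracing::warn!(
        "Failed to insert or replace publish collab meta batch, workspace_id: {}, publisher_uuid: {}, rows_affected: {}, item_count: {}",
        workspace_id,
        publisher_uuid,
        res.rows_affected(),
        item_count,
    );
}
```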
```rust
#[inline]
pub async fn insert_or_replace_publish_collabs<'a, E: Executor<'a, Database = Postgres>>(
```
Generally speaking this might be a very heavy query. We know that individual collabs can be MBs in size. If there's no upper bound on their size/count, we might as well lock the table for an undefined amount of time.
There is an upper bound: 4MB for metadata, 128MB for the blob. That is checked in `post_publish_collabs_handler` when the request comes in. Do you think these upper bounds are reasonable?
Tbh, I think that 128MB could be used to DDoS us, but we can adjust it accordingly in the future.
There's also a max limit of 256MB for a single request in our load balancer on this particular endpoint. In addition, this also consumes the workspace storage quota, so it would only be a matter of a few tries before the user's limit is reached and they are blocked.
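For reference, a hedged sketch of the kind of upper-bound check described in this thread (the constant names, error type, and function are assumptions, not the actual handler code):

```rust
// Hypothetical limits matching the values discussed above.
const MAX_METADATA_SIZE: usize = 4 * 1024 * 1024; // 4MB per metadata
const MAX_BLOB_SIZE: usize = 128 * 1024 * 1024; // 128MB per blob

// Reject oversized items before any database work starts.
fn check_publish_item_size(meta_len: usize, blob_len: usize) -> Result<(), String> {
    if meta_len > MAX_METADATA_SIZE {
        return Err(format!("metadata too large: {meta_len} bytes"));
    }
    if blob_len > MAX_BLOB_SIZE {
        return Err(format!("blob too large: {blob_len} bytes"));
    }
    Ok(())
}
```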
```rust
if let Some(group) = self.group_manager.get_group(&object_id).await {
  let (collab_message_sender, _collab_message_receiver) = futures::channel::mpsc::channel(1);
  let (mut message_by_oid_sender, message_by_oid_receiver) = futures::channel::mpsc::channel(1);
```
Use `tokio::sync::mpsc::channel`: tokio is often more mature than futures. If the generic constraints allow it, you could also use `tokio::sync::oneshot`.
I have tried, but based on the `subscribe` function signature, it's not possible to use the tokio channel:

```rust
pub async fn subscribe<Sink, Stream>(
  &self,
  user: &RealtimeUser,
  subscriber_origin: CollabOrigin,
  sink: Sink,
  stream: Stream,
) where
  Sink: SinkExt<CollabMessage> + Clone + Send + Sync + Unpin + 'static,
  Stream: StreamExt<Item = MessageByObjectId> + Send + Sync + Unpin + 'static,
  <Sink as futures_util::Sink<CollabMessage>>::Error: std::error::Error + Send + Sync,
```
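An aside for context, not from the thread: tokio's `mpsc::Sender` does not implement the `futures_util::Sink` trait that this signature demands, which is the core incompatibility. The Stream half could be adapted with `tokio_stream::wrappers::ReceiverStream`, but the Sink half would need `tokio_util::sync::PollSender`, whose `Clone` and error-type bounds may not line up with the constraints above. A minimal, self-contained illustration of the Stream-side adapter, assuming the `tokio-stream` crate:

```rust
use futures_util::StreamExt;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::channel::<String>(1);
    tx.send("hello".to_string()).await.unwrap();
    drop(tx); // close the channel so the stream terminates

    // ReceiverStream wraps the tokio receiver in a futures-compatible Stream.
    let mut stream = ReceiverStream::new(rx);
    while let Some(msg) = stream.next().await {
        println!("{msg}");
    }
}
```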
```rust
use shared_entity::dto::workspace_dto::FolderView;
use uuid::Uuid;

pub fn collab_folder_to_folder_view(folder: &Folder, depth: u32) -> FolderView {
```
This looks like a method that could be implemented directly on `FolderView`.
Apparently `FolderView` is in the `shared-entity` crate to share structures between server and client. Putting the implementation where you mentioned would mean that the `shared-entity` crate has to depend on `collab-folder` for `Folder`, which we want to avoid.
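One Rust-level constraint worth noting (an editorial aside, not from the thread): because of the orphan rule, a `From` conversion between these two foreign types could not live in the server crate anyway; it would have to go into `shared-entity` or `collab-folder`, forcing exactly the dependency described above. Illustrative only:

```rust
// This impl would only compile inside shared-entity (which defines
// FolderView) or collab-folder (which defines Folder), so one crate would
// have to depend on the other. The free function in the diff sidesteps
// that coupling entirely.
impl From<(&Folder, u32)> for FolderView {
    fn from((folder, depth): (&Folder, u32)) -> Self {
        // ...same traversal as collab_folder_to_folder_view (body elided)...
        todo!()
    }
}
```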
```rust
for publish_item in &publish_items {
  check_collab_publish_name(publish_item.meta.publish_name.as_str())?;
}
insert_or_replace_publish_collabs(pg_pool, workspace_id, publisher_uuid, publish_items).await?;
```
Since there's no explicit Postgres transaction initialization, I can only assume that every sqlx query executed here will be wrapped and committed independently.
Another issue is that publish_items has no upper bound check: we basically copy an unlimited number of collabs of unlimited size, with no timeouts.
There's only one sqlx operation for this API call, which is `insert_or_replace_publish_collabs`, so there's no need for a transaction. As mentioned above, the upper bound is checked when the API is called, before it reaches this part.
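For completeness, a hedged sketch of what the explicit-transaction variant would look like with sqlx, should several statements ever need to commit atomically (names follow the snippet above; this is not the current code):

```rust
// Begin an explicit transaction, run statements against it, commit once.
// The function is generic over Executor, so a &mut Transaction works too.
let mut txn = pg_pool.begin().await?;
insert_or_replace_publish_collabs(&mut *txn, workspace_id, publisher_uuid, publish_items)
    .await?;
// ...any further statements would share the same transaction here...
txn.commit().await?;
```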
src/biz/workspace/publish_dup.rs (Outdated)
```rust
// new view after deep copy
// this is the root of the document/database duplicated
let mut root_view = match self
  .deep_copy(&mut txn, uuid::Uuid::new_v4().to_string(), publish_view_id)
```
It's hard to say how CPU intensive the deep copy is. Ideally we don't want to hold transactions for too long (as we have an upper limit on concurrent connections in PgPool).
- If possible, we could prepare the blobs first, then begin the transaction and insert them.
- Alternatively, if pt. 1 uses too much memory, insert collabs one by one, each in a separate transaction, in their view parent order.
Maybe it would be a good idea to also add metrics to measure the time spent on duplicating?
I have changed it to accumulate all the collabs to insert without holding on to a Postgres txn, as you mentioned. We already have metrics measuring the frequency and latency of each HTTP endpoint (including this one); that should suffice for now. When we need more thorough analysis, I'll add more to /metrics.
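A rough sketch of the pattern the author describes, with all helper names hypothetical: do the CPU-heavy copy with no transaction open, then keep the write transaction short.

```rust
// Phase 1: deep-copy everything in memory; no database connection is held.
let mut prepared = Vec::with_capacity(views.len());
for view in views {
    prepared.push(deep_copy_view(&view)?); // hypothetical helper
}

// Phase 2: a short-lived transaction used only for the inserts.
let mut txn = pg_pool.begin().await?;
for collab in prepared {
    insert_collab(&mut *txn, &collab).await?; // hypothetical helper
}
txn.commit().await?;
```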
```rust
  .await?;

collab_storage
  .insert_new_collab_with_transaction(
```
How about using `insert_new_collab`? It will save the collab in memory and write to disk in the background.