Duplicate published doc #700

Merged: 129 commits from duplicate-published-doc into main on Sep 3, 2024
Conversation

speed2exe (Contributor)

No description provided.


if res.rows_affected() != item_count as u64 {
  tracing::warn!(
    "Failed to insert or replace publish collab meta batch, workspace_id: {}, publisher_uuid: {}, rows_affected: {}",
@Horusiath (Collaborator) commented on Aug 29, 2024:

This is a partial failure - some rows might have been updated and some might not:

  1. If you want to be strongly consistent (either all collabs were duplicated or none of them were), you should roll back the transaction at this point.
  2. If you want to be elastic, we should probably return the rows_affected count and inform the user how many collabs we have duplicated.

Also log both {rows_affected} and {item_count} so that we have a clue how many were missed.

speed2exe (Contributor, Author) replied:

@Horusiath iirc, {rows_affected} accounts for both modified and inserted rows. My intent is that every collab is either inserted or modified. I shall modify the code to log both {rows_affected} and {item_count} if they differ.
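A minimal sketch of what that adjusted logging could look like, assuming res, item_count, workspace_id and publisher_uuid are in scope as in the snippet above:

// Sketch only: log both counts when they differ, per the discussion above.
if res.rows_affected() != item_count as u64 {
  tracing::warn!(
    "publish collab meta batch partially applied, workspace_id: {}, publisher_uuid: {}, rows_affected: {}, item_count: {}",
    workspace_id,
    publisher_uuid,
    res.rows_affected(),
    item_count,
  );
}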

}

#[inline]
pub async fn insert_or_replace_publish_collabs<'a, E: Executor<'a, Database = Postgres>>(
Collaborator:

Generally speaking, this might be a very heavy query. We know that individual collabs can be MBs in size. If there's no upper bound on their size/count, we might as well lock the table for an undefined time.

speed2exe (Contributor, Author) replied:

There is an upper bound of 4MB for metadata and 128MB for the blob; that is checked in post_publish_collabs_handler before the request reaches this code. Do you think these upper bounds are reasonable?

Collaborator:

Tbh I think that 128MB could be used to DDoS us, but we can adjust it accordingly in the future.

@speed2exe (Contributor, Author) replied on Aug 30, 2024:

There's also a max limit of 256MB for a single request in our load balancer on this particular endpoint. In addition, this also consumes the workspace storage quota, so it would only be a matter of a few tries before the user's limit is reached and they are blocked.
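For illustration, a hedged sketch of the kind of upper-bound check described here; the constant names, error type, and helper function are assumptions, not the actual code in post_publish_collabs_handler:

// Hypothetical constants; the real handler may enforce the limits differently.
const MAX_METADATA_LEN: usize = 4 * 1024 * 1024; // 4MB metadata, per the discussion
const MAX_BLOB_LEN: usize = 128 * 1024 * 1024; // 128MB blob, per the discussion

// Reject an oversized item before it ever reaches the database layer.
fn check_publish_item_size(meta_len: usize, blob_len: usize) -> Result<(), String> {
  if meta_len > MAX_METADATA_LEN {
    return Err(format!("metadata too large: {} bytes", meta_len));
  }
  if blob_len > MAX_BLOB_LEN {
    return Err(format!("blob too large: {} bytes", blob_len));
  }
  Ok(())
}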

libs/database/src/publish.rs (outdated review thread, resolved)

if let Some(group) = self.group_manager.get_group(&object_id).await {
  let (collab_message_sender, _collab_message_receiver) = futures::channel::mpsc::channel(1);
  let (mut message_by_oid_sender, message_by_oid_receiver) = futures::channel::mpsc::channel(1);
Collaborator:

Use tokio::sync::mpsc::channel: tokio is often more mature than futures. If the generic constraints allow it, you could also use tokio::sync::oneshot.

speed2exe (Contributor, Author) replied:

I have tried, but based on the subscribe function signature, it's not possible to use the tokio channel:

  pub async fn subscribe<Sink, Stream>(
    &self,
    user: &RealtimeUser,
    subscriber_origin: CollabOrigin,
    sink: Sink,
    stream: Stream,
  ) where
    Sink: SinkExt<CollabMessage> + Clone + Send + Sync + Unpin + 'static,
    Stream: StreamExt<Item = MessageByObjectId> + Send + Sync + Unpin + 'static,
    <Sink as futures_util::Sink<CollabMessage>>::Error: std::error::Error + Send + Sync,
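To illustrate the constraint, a standalone sketch (an assumption-free demo, not code from this PR): futures::channel::mpsc::Sender implements futures::Sink and its Receiver implements futures::Stream, which is what the Sink/Stream bounds above require, whereas tokio::sync::mpsc endpoints do not implement those traits directly.

// Sketch: the futures channel endpoints satisfy the Sink/Stream bounds.
use futures::{SinkExt, StreamExt};

async fn demo() {
  let (mut tx, mut rx) = futures::channel::mpsc::channel::<String>(1);
  tx.send("hello".to_string()).await.unwrap(); // Sender implements futures::Sink<String>
  assert_eq!(rx.next().await, Some("hello".to_string())); // Receiver implements futures::Stream
}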

use shared_entity::dto::workspace_dto::FolderView;
use uuid::Uuid;

pub fn collab_folder_to_folder_view(folder: &Folder, depth: u32) -> FolderView {
Collaborator:

This looks like a method that could be implemented directly on FolderView.

speed2exe (Contributor, Author) replied:

FolderView lives in the shared-entity crate so that the structure can be shared between the server and the client. Putting the implementation where you mentioned would mean the shared-entity crate has to depend on collab-folder for Folder, which we want to avoid.
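A small self-contained sketch of the dependency direction being discussed; the modules below are stand-ins for the crates and their fields are invented for illustration:

// Hypothetical modules standing in for the crates named in this thread.
mod shared_entity_like {
  // Shared DTO: knows nothing about the collab types.
  pub struct FolderView {
    pub name: String,
    pub children: Vec<FolderView>,
  }
}

mod collab_folder_like {
  // Server-side data type.
  pub struct Folder {
    pub name: String,
  }
}

// The conversion lives in the server crate, which may depend on both modules,
// so the DTO crate never has to depend on the collab types.
fn collab_folder_to_folder_view_sketch(folder: &collab_folder_like::Folder) -> shared_entity_like::FolderView {
  shared_entity_like::FolderView {
    name: folder.name.clone(),
    children: Vec::new(),
  }
}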

for publish_item in &publish_items {
  check_collab_publish_name(publish_item.meta.publish_name.as_str())?;
}
insert_or_replace_publish_collabs(pg_pool, workspace_id, publisher_uuid, publish_items).await?;
Collaborator:

Since there's no explicit Postgres transaction initialization, I can only assume that every sqlx query executed here will be wrapped and committed independently.

Another issue is that publish_items has no upper-bound check. We basically copy an unlimited number of collabs of unlimited size, with no timeouts.

speed2exe (Contributor, Author) replied:

There's only one sqlx operation for this API call, which is insert_or_replace_publish_collabs, so there's no need for a transaction. As mentioned above, the upper bound is checked when the API is called, before the request reaches this part.
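For reference, a hedged sketch of the explicit-transaction variant the reviewer alludes to, assuming a sqlx PgPool is available; this is illustrative, not the code in this PR:

use sqlx::PgPool;

// Sketch: wrap the batch write in one explicit transaction so a partial
// failure rolls everything back instead of committing independently.
async fn insert_publish_collabs_in_txn(pg_pool: &PgPool) -> Result<(), sqlx::Error> {
  let mut txn = pg_pool.begin().await?;
  // ... run the batched insert-or-replace statements against `&mut *txn` ...
  txn.commit().await?; // dropping `txn` before commit would roll it back
  Ok(())
}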

// new view after deep copy
// this is the root of the document/database duplicated
let mut root_view = match self
  .deep_copy(&mut txn, uuid::Uuid::new_v4().to_string(), publish_view_id)
Collaborator:

It's hard to say how CPU-intensive the deep copy is. Ideally we don't want to hold transactions for too long (as we have an upper limit on concurrent connections in PgPool).

  1. If possible, we could prepare the blobs first, then begin the transaction and insert them.
  2. Alternatively, if pt. 1 uses too much memory, insert the collabs one by one, each in a separate transaction, in their view-parent order.

Maybe it would also be a good idea to add metrics to measure the time spent on duplicating?

speed2exe (Contributor, Author) replied:

I have changed it to accumulate all the collabs to insert without holding on to a Postgres txn, as you mentioned. We already have metrics measuring the frequency and latency of each HTTP endpoint (including this one), and that should suffice for now. When we need more thorough analysis I'll add them to /metrics.
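A minimal sketch of that accumulate-then-insert pattern; the PendingCollab type and function name are placeholders, not the actual code in publish_dup.rs:

// Placeholder type standing in for whatever the deep copy produces.
struct PendingCollab {
  object_id: String,
  encoded: Vec<u8>,
}

// Phase 1 (caller): do the CPU-heavy deep copy with no transaction open,
// accumulating the results into `pending`.
// Phase 2 (here): hold a transaction only for the short write phase.
async fn insert_duplicated_collabs(
  pg_pool: &sqlx::PgPool,
  pending: Vec<PendingCollab>,
) -> Result<(), sqlx::Error> {
  let mut txn = pg_pool.begin().await?;
  for _collab in &pending {
    // ... insert `_collab.encoded` keyed by `_collab.object_id` via `&mut *txn` ...
  }
  txn.commit().await?;
  Ok(())
}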

Additional outdated review threads (resolved) on:
src/biz/workspace/publish_dup.rs (six threads)
services/appflowy-collaborate/src/collab/storage.rs
services/appflowy-collaborate/src/command.rs
services/appflowy-collaborate/src/group/cmd.rs (two threads)
.await?;

collab_storage
  .insert_new_collab_with_transaction(
Contributor:
The reason will be displayed to describe this comment to others. Learn more.

How about using insert_new_collab? It will save the collab in memory and write to disk in the background.

@appflowy merged commit 826546c into main on Sep 3, 2024; 9 checks passed.
@appflowy deleted the duplicate-published-doc branch on September 3, 2024 at 01:12.