Skip to content

Commit

Permalink
feat: use sqlx row stream to load collab policy (AppFlowy-IO#217)
Browse files Browse the repository at this point in the history
* feat: use sqlx row stream to load collab policy

* fix: access control object type
  • Loading branch information
speed2exe authored Dec 17, 2023
1 parent 1eed034 commit 24ab19f
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 99 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions libs/database-entity/src/pg_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ pub struct AFWorkspaceMemberRow {
pub role: AFRole,
}

#[derive(FromRow)]
pub struct AFCollabMemerAccessLevelRow {
pub uid: i64,
pub oid: String,
pub access_level: AFAccessLevel,
}

#[derive(FromRow, Clone, Debug, Serialize, Deserialize)]
pub struct AFCollabMemberRow {
pub uid: i64,
Expand Down
37 changes: 18 additions & 19 deletions libs/database/src/collab/collab_db_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use database_entity::dto::{
BatchQueryCollab, InsertCollabParams, QueryCollabResult, RawData,
};

use database_entity::pg_row::AFCollabMemerAccessLevelRow;

use app_error::AppError;
use futures_util::stream::BoxStream;
use sqlx::postgres::PgRow;
use sqlx::{Error, Executor, PgPool, Postgres, Row, Transaction};
use std::collections::HashMap;
Expand Down Expand Up @@ -101,7 +104,7 @@ pub async fn insert_into_af_collab(
// Get the permission_id of the Owner
let permission_id: i32 = sqlx::query_scalar!(
r#"
SELECT rp.permission_id
SELECT rp.permission_id
FROM af_role_permissions rp
JOIN af_roles ON rp.role_id = af_roles.id
WHERE af_roles.name = 'Owner';
Expand Down Expand Up @@ -340,7 +343,7 @@ pub async fn upsert_collab_member_with_txn<T: AsRef<str> + Debug>(
.context("Get permission id from access level fail")?;

sqlx::query!(
r#"
r#"
INSERT INTO af_collab_member (uid, oid, permission_id)
VALUES ($1, $2, $3)
ON CONFLICT (uid, oid)
Expand Down Expand Up @@ -382,24 +385,20 @@ pub async fn delete_collab_member(uid: i64, oid: &str, pg_pool: &PgPool) -> Resu
Ok(())
}

#[instrument(level = "info", skip_all, err)]
pub async fn select_all_collab_members(
pub fn select_collab_member_access_level(
pg_pool: &PgPool,
) -> Result<Vec<(String, Vec<AFCollabMember>)>, AppError> {
let collabs: Vec<_> = sqlx::query!("SELECT DISTINCT af_collab_member.oid FROM af_collab_member")
.fetch_all(pg_pool)
.await?
.into_iter()
.map(|r| r.oid)
.collect();

let mut collab_members = Vec::with_capacity(collabs.len());
for oid in collabs {
let members = select_collab_members(&oid, pg_pool).await?;
collab_members.push((oid, members));
}

Ok(collab_members)
) -> BoxStream<'_, sqlx::Result<AFCollabMemerAccessLevelRow>> {
sqlx::query_as!(
AFCollabMemerAccessLevelRow,
r#"
SELECT
uid, oid, access_level
FROM af_collab_member
INNER JOIN af_permissions
ON af_collab_member.permission_id = af_permissions.id
"#
)
.fetch(pg_pool)
}

#[inline]
Expand Down
7 changes: 3 additions & 4 deletions libs/database/src/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,12 @@ pub async fn delete_workspace_members(

pub fn select_workspace_member_perm_stream(
pg_pool: &PgPool,
) -> Result<BoxStream<'_, sqlx::Result<AFWorkspaceMemberPermRow>>, AppError> {
let stream = sqlx::query_as!(
) -> BoxStream<'_, sqlx::Result<AFWorkspaceMemberPermRow>> {
sqlx::query_as!(
AFWorkspaceMemberPermRow,
"SELECT uid, role_id as role, workspace_id FROM af_workspace_member"
)
.fetch(pg_pool);
Ok(stream)
.fetch(pg_pool)
}

/// returns a list of workspace members, sorted by their creation time.
Expand Down
39 changes: 19 additions & 20 deletions src/biz/casbin/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use casbin::Adapter;
use casbin::Filter;
use casbin::Model;
use casbin::Result;
use database::collab::select_all_collab_members;
use database::collab::select_collab_member_access_level;
use database::workspace::select_workspace_member_perm_stream;
use database_entity::dto::AFAccessLevel;
use database_entity::dto::AFCollabMember;
use database_entity::pg_row::AFCollabMemerAccessLevelRow;
use database_entity::pg_row::AFWorkspaceMemberPermRow;
use futures_util::stream::BoxStream;
use sqlx::PgPool;
Expand All @@ -26,21 +26,23 @@ impl PgAdapter {
}
}

fn create_collab_policies(collab_members: Vec<(String, Vec<AFCollabMember>)>) -> Vec<Vec<String>> {
async fn create_collab_policies(
mut stream: BoxStream<'_, sqlx::Result<AFCollabMemerAccessLevelRow>>,
) -> Result<Vec<Vec<String>>> {
let mut policies: Vec<Vec<String>> = Vec::new();
for (oid, members) in collab_members {
for m in members {
let p = [
m.uid.to_string(),
ObjectType::Collab(&oid).to_string(),
i32::from(m.permission.access_level).to_string(),
]
.to_vec();
policies.push(p);
}

while let Some(result) = stream.next().await {
let member_access_lv = result.map_err(|err| AdapterError(Box::new(err)))?;
let policy = [
member_access_lv.uid.to_string(),
ObjectType::Collab(&member_access_lv.oid).to_string(),
i32::from(member_access_lv.access_level).to_string(),
]
.to_vec();
policies.push(policy);
}

policies
Ok(policies)
}

async fn create_workspace_policies(
Expand All @@ -65,18 +67,15 @@ async fn create_workspace_policies(
#[async_trait]
impl Adapter for PgAdapter {
async fn load_policy(&mut self, model: &mut dyn Model) -> Result<()> {
let workspace_member_perm_stream = select_workspace_member_perm_stream(&self.pg_pool)
.map_err(|err| AdapterError(Box::new(err)))?;
let workspace_member_perm_stream = select_workspace_member_perm_stream(&self.pg_pool);
let workspace_policies = create_workspace_policies(workspace_member_perm_stream).await?;

// Policy definition `p` of type `p`. See `model.conf`
model.add_policies("p", "p", workspace_policies);

let collab_members = select_all_collab_members(&self.pg_pool)
.await
.map_err(|err| AdapterError(Box::new(err)))?;
let collab_member_access_lv_stream = select_collab_member_access_level(&self.pg_pool);
let collab_policies = create_collab_policies(collab_member_access_lv_stream).await?;

let collab_policies = create_collab_policies(collab_members);
// Policy definition `p` of type `p`. See `model.conf`
model.add_policies("p", "p", collab_policies);

Expand Down

0 comments on commit 24ab19f

Please sign in to comment.