Skip to content

Conversation

@matthewgapp
Copy link
Contributor

@matthewgapp matthewgapp commented Jan 4, 2024

Resolves #39

Decided to have a go at implementing the reducer trait so that custom server dead letters and such can be inserted based on mutation content and the user's specific business logic. Seems to be working fine in my server but haven't tested exhaustively.

Then users can use it like so

struct CustomReducer {
    wasm_reducer: sqlsync::reducer::WasmReducer,
}

impl sqlsync::reducer::Reducer for CustomReducer {
    fn apply(
        &mut self,
        tx: &mut sqlsync::sqlite::Transaction,
        mutation: &[u8],
    ) -> sqlsync::reducer::Result<()> {
        // custom logic goes here
        /*
        match self.customInner.validate(mutation) => { 
            Err(e) => { 
                tx.execute("INSERT INTO errors (error) VALUES (?)", params![e.to_string()])?;
            }
        }
         */

		// can use SQLSync WasmReducer or just use the tx handle yourself?
        self.wasm_reducer.apply(tx, mutation)
    }
}

Most of the lines are changed because of auto formatting.

@matthewgapp matthewgapp changed the title Introduce Reducer Triat Introduce Reducer Trait Jan 5, 2024
@matthewgapp matthewgapp marked this pull request as ready for review January 5, 2024 00:46
@carlsverre carlsverre self-requested a review January 5, 2024 23:08
Copy link
Contributor

@carlsverre carlsverre left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall this looks solid. In order to land this make sure that the following commands run with no error and no compiler warnings (like unused imports):

just build
just unit-test
just test-end-to-end-local-net
just test-end-to-end-local
just package-sqlsync-worker

I'll also figure out how to get your PR's running in github actions.

use crate::error::Result;
use crate::reducer::Reducer;
use crate::replication::{ReplicationDestination, ReplicationError, ReplicationSource};
use crate::reducer::{Reducer, WasmReducer};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused import WasmReducer

storage: J,
timeline_factory: J::Factory,
reducer_wasm_bytes: &[u8],
reducer: R, // reducer_wasm_bytes: &[u8],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove comment


/// CoordinatorDocument knows how to replicate it's storage journal
impl<J: Journal + ReplicationSource> ReplicationSource for CoordinatorDocument<J> {
impl<J: Journal + ReplicationSource, R: Reducer> ReplicationSource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type constraint not needed here


use serde::{Deserialize, Serialize};

/// Log Sequence Number
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the only actual change in this file? Can you revert the other changes since I think they are just due to our rustfmt settings differing. I'll figure out how to enforce consistent rustfmt settings for the whole repo to prevent this in the future.

Comment on lines +40 to +48
pub trait Reducer {
fn apply(&mut self, tx: &mut Transaction, mutation: &[u8]) -> Result<()>;
}

impl Reducer for WasmReducer {
fn apply(&mut self, tx: &mut Transaction, mutation: &[u8]) -> Result<()> {
WasmReducer::apply(self, tx, mutation)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty stoked how clean this ended up being. If you have time, I would prefer to just move the WasmReducer apply impl into this impl, which probably means you'll need to make the reducer generic in local.rs (which looks like it's just a couple lines of changes a'la coordinator). No worries if you leave this, I can do it later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing WasmReducer on line 116 I think.

error[E0107]: struct takes 2 generic arguments but 1 generic argument was supplied
   --> lib/sqlsync/examples/end-to-end-local-net.rs:116:20
    |
116 |     doc: Arc<Mutex<CoordinatorDocument<MemoryJournal>>>,
    |                    ^^^^^^^^^^^^^^^^^^^ ------------- supplied 1 generic argument
    |                    |
    |                    expected 2 generic arguments
    |
note: struct defined here, with 2 generic parameters: `J`, `R`
   --> /home/carl/dev/sqlsync/lib/sqlsync/src/coordinator.rs:25:12
    |
25  | pub struct CoordinatorDocument<J: Journal, R> {
    |            ^^^^^^^^^^^^^^^^^^^ -           -
help: add missing generic argument
    |
116 |     doc: Arc<Mutex<CoordinatorDocument<MemoryJournal, R>>>,
    |                                                     +++
    ```

@carlsverre
Copy link
Contributor

I think if you rebase this on main it should prompt me to run tests on your PRs.

@carlsverre
Copy link
Contributor

I'm going to clean this PR up and merge it now - just pinging incase you are also doing so.

@carlsverre carlsverre merged commit 7f1c10d into orbitinghail:main Jan 6, 2024
@matthewgapp
Copy link
Contributor Author

I'm going to clean this PR up and merge it now - just pinging incase you are also doing so.

Thanks! Sorry for the lint issues. My LSP was falling over yesterday.

Copy link

@Williamug Williamug left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

pluggable reducer

3 participants