Skip to content

Commit

Permalink
[pub sub] - Event subscription using event filter (MystenLabs#2589)
Browse files Browse the repository at this point in the history
* filter impl and improvement for pub sub

* * Add sender and digest to delete and publish event
* some refactoring

* * Add sender, digest, module, package, function and recipient to new_object event

* remove tx digest from Event

* added unit test for filter and event_handler

* pubsub docs

* Make direct edits to new pubsub page

* address PR comment
Fix test

* rename instigator -> sender

* remove destination_addr

* address PR comment
fix test

* remove function from Event and event filters
address PR comments

* return EventEnvelope in subscription

* fixup after rebase

* populate transaction digest in event envelope

* fix test

* Make second round of edits to new pubsub doc

* address PR comment

Co-authored-by: Clay-Mysten <100217682+Clay-Mysten@users.noreply.github.com>
  • Loading branch information
patrickkuo and Clay-Mysten authored Jun 27, 2022
1 parent b112a06 commit 0a30a71
Show file tree
Hide file tree
Showing 25 changed files with 2,064 additions and 880 deletions.
2 changes: 2 additions & 0 deletions crates/generate-json-rpc-spec/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use sui_json_rpc::SuiRpcModule;
use sui_json_rpc_api::rpc_types::{
GetObjectDataResponse, SuiObjectInfo, TransactionEffectsResponse, TransactionResponse,
};
use sui_json_rpc_api::EventApiOpenRpc;
use sui_json_rpc_api::QuorumDriverApiClient;
use sui_json_rpc_api::RpcReadApiClient;
use sui_json_rpc_api::RpcTransactionBuilderClient;
Expand Down Expand Up @@ -82,6 +83,7 @@ async fn main() {
open_rpc.add_module(ReadApi::rpc_doc_module());
open_rpc.add_module(FullNodeApi::rpc_doc_module());
open_rpc.add_module(BcsApiImpl::rpc_doc_module());
open_rpc.add_module(EventApiOpenRpc::module_doc());

match options.action {
Action::Print => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ created: object(108)
written: object(107)

task 6 'run'. lines 18-18:
events: MoveEvent(MoveObject { type_: StructTag { address: sui, module: Identifier("object_basics"), name: Identifier("NewValueEvent"), type_params: [] }, contents: [20, 0, 0, 0, 0, 0, 0, 0] })
events: MoveEvent { package_id: sui, transaction_module: Identifier("object_basics"), sender: B, type_: StructTag { address: sui, module: Identifier("object_basics"), name: Identifier("NewValueEvent"), type_params: [] }, contents: [20, 0, 0, 0, 0, 0, 0, 0] }
written: object(105), object(108), object(109)

task 7 'run'. lines 20-20:
Expand Down
101 changes: 66 additions & 35 deletions crates/sui-adapter/src/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use std::{
borrow::Borrow,
collections::{BTreeMap, BTreeSet, HashSet},
convert::TryFrom,
fmt::Debug,
};

use crate::bytecode_rewriter::ModuleHandleRewriter;
use anyhow::Result;
use move_binary_format::{
access::ModuleAccess,
binary_views::BinaryIndexedView,
errors::PartialVMResult,
file_format::{CompiledModule, LocalIndex, SignatureToken, StructHandleIndex},
};
use move_core_types::{
account_address::AccountAddress,
identifier::Identifier,
language_storage::{ModuleId, StructTag, TypeTag},
resolver::{ModuleResolver, ResourceResolver},
};
pub use move_vm_runtime::move_vm::MoveVM;
use move_vm_runtime::{native_functions::NativeFunctionTable, session::SerializedReturnValues};

use sui_framework::EventType;
use sui_types::{
base_types::*,
Expand All @@ -27,22 +41,8 @@ use sui_verifier::{
verifier,
};

use move_core_types::{
account_address::AccountAddress,
identifier::Identifier,
language_storage::{ModuleId, StructTag, TypeTag},
resolver::{ModuleResolver, ResourceResolver},
};
use move_vm_runtime::{native_functions::NativeFunctionTable, session::SerializedReturnValues};
use std::{
borrow::Borrow,
collections::{BTreeMap, BTreeSet, HashSet},
convert::TryFrom,
fmt::Debug,
};

use crate::bytecode_rewriter::ModuleHandleRewriter;
use crate::object_root_ancestor_map::ObjectRootAncestorMap;
pub use move_vm_runtime::move_vm::MoveVM;

pub fn new_move_vm(natives: NativeFunctionTable) -> Result<MoveVM, SuiError> {
MoveVM::new(natives).map_err(|_| SuiError::ExecutionInvariantViolation)
Expand Down Expand Up @@ -197,6 +197,7 @@ fn execute_internal<
.collect();
process_successful_execution(
state_view,
module_id,
by_value_object_map,
mutable_refs,
events,
Expand Down Expand Up @@ -227,7 +228,10 @@ pub fn publish<E: Debug, S: ResourceResolver<Error = E> + ModuleResolver<Error =

let package_id = generate_package_id(&mut modules, ctx)?;
let vm = verify_and_link(state_view, &modules, package_id, natives, gas_status)?;
state_view.log_event(Event::Publish { package_id });
state_view.log_event(Event::Publish {
sender: ctx.sender(),
package_id,
});
store_package_and_init_modules(state_view, &vm, modules, ctx, gas_status)
}

Expand Down Expand Up @@ -394,6 +398,7 @@ fn process_successful_execution<
S: ResourceResolver<Error = E> + ModuleResolver<Error = E> + Storage,
>(
state_view: &mut S,
module_id: &ModuleId,
mut by_value_objects: BTreeMap<ObjectID, (object::Owner, SequenceNumber)>,
mutable_refs: Vec<(ObjectID, Vec<u8>)>,
events: Vec<MoveEvent>,
Expand Down Expand Up @@ -438,12 +443,14 @@ fn process_successful_execution<
_ => unreachable!(),
};
handle_transfer(
ctx.sender(),
new_owner,
type_,
event_bytes,
tx_digest,
&mut by_value_objects,
state_view,
module_id,
&mut object_owner_map,
&newly_generated_ids,
)
Expand All @@ -467,7 +474,12 @@ fn process_successful_execution<
return Err(ExecutionErrorKind::DeleteObjectOwnedObject.into());
}
Some(_) => {
state_view.log_event(Event::DeleteObject(*obj_id));
state_view.log_event(Event::delete_object(
module_id.address(),
module_id.name(),
ctx.sender(),
*obj_id,
));
state_view.delete_object(obj_id, id.version(), DeleteKind::Normal)
}
None => {
Expand All @@ -478,7 +490,12 @@ fn process_successful_execution<
// it will also have version `v+1`, leading to a violation of the invariant that any
// object_id and version pair must be unique. Hence for any object that's just unwrapped,
// we force incrementing its version number again to make it `v+2` before writing to the store.
state_view.log_event(Event::DeleteObject(*obj_id));
state_view.log_event(Event::delete_object(
module_id.address(),
module_id.name(),
ctx.sender(),
*obj_id,
));
state_view.delete_object(
obj_id,
id.version().increment(),
Expand All @@ -500,7 +517,13 @@ fn process_successful_execution<
}
EventType::User => {
match type_ {
TypeTag::Struct(s) => state_view.log_event(Event::move_event(s, event_bytes)),
TypeTag::Struct(s) => state_view.log_event(Event::move_event(
module_id.address(),
module_id.name(),
ctx.sender(),
s,
event_bytes,
)),
_ => unreachable!(
"Native function emit_event<T> ensures that T is always bound to structs"
),
Expand All @@ -525,12 +548,14 @@ fn handle_transfer<
E: Debug,
S: ResourceResolver<Error = E> + ModuleResolver<Error = E> + Storage,
>(
sender: SuiAddress,
recipient: Owner,
type_: TypeTag,
contents: Vec<u8>,
tx_digest: TransactionDigest,
by_value_objects: &mut BTreeMap<ObjectID, (object::Owner, SequenceNumber)>,
state_view: &mut S,
module_id: &ModuleId,
object_owner_map: &mut BTreeMap<SuiAddress, SuiAddress>,
newly_generated_ids: &HashSet<ObjectID>,
) -> Result<(), ExecutionError> {
Expand All @@ -557,7 +582,13 @@ fn handle_transfer<
if old_object.is_none() {
// Newly created object
if newly_generated_ids.contains(&obj_id) {
state_view.log_event(Event::NewObject(obj_id));
state_view.log_event(Event::new_object(
module_id.address(),
module_id.name(),
sender,
recipient,
obj_id,
));
} else {
// When an object was wrapped at version `v`, we added an record into `parent_sync`
// with version `v+1` along with OBJECT_DIGEST_WRAPPED. Now when the object is unwrapped,
Expand All @@ -569,21 +600,21 @@ fn handle_transfer<
} else if let Some((_, old_obj_ver)) = old_object {
// Some kind of transfer since there's an old object
// Add an event for the transfer

match recipient {
Owner::AddressOwner(addr) => state_view.log_event(Event::TransferObject {
object_id: obj_id,
version: old_obj_ver,
destination_addr: addr,
type_: TransferType::ToAddress,
}),
Owner::ObjectOwner(new_owner) => state_view.log_event(Event::TransferObject {
let transfer_type = match recipient {
Owner::AddressOwner(_) => Some(TransferType::ToAddress),
Owner::ObjectOwner(_) => Some(TransferType::ToObject),
_ => None,
};
if let Some(type_) = transfer_type {
state_view.log_event(Event::TransferObject {
package_id: ObjectID::from(*module_id.address()),
transaction_module: Identifier::from(module_id.name()),
sender,
recipient,
object_id: obj_id,
version: old_obj_ver,
destination_addr: new_owner,
type_: TransferType::ToObject,
}),
_ => {}
type_,
})
}
}
let obj = Object::new_move(move_obj, recipient, tx_digest);
Expand Down
87 changes: 0 additions & 87 deletions crates/sui-core/src/event_filter.rs

This file was deleted.

Loading

0 comments on commit 0a30a71

Please sign in to comment.