Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/observer filters #5570

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 215 additions & 19 deletions testnet/stacks-node/src/event_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

#[cfg(test)]
use std::cell::RefCell;
rdeioris marked this conversation as resolved.
Show resolved Hide resolved
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
Expand Down Expand Up @@ -320,7 +322,7 @@ impl RewardSetEventPayload {
}

#[cfg(test)]
static TEST_EVENT_OBSERVER_SKIP_RETRY: std::sync::Mutex<Option<bool>> = std::sync::Mutex::new(None);
thread_local! { static TEST_EVENT_OBSERVER_SKIP_RETRY: RefCell<bool> = RefCell::new(false); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use Cell here instead of RefCell?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also can you add doc comment to this variable? Skip retry of what? I'm not familiar with this code so I don't know

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, why does it have to be thread-local? If it doesn't have to be thread-local, can we just use the TestFlag struct?

Copy link
Contributor Author

@rdeioris rdeioris Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hstove tests are executed in parallel so there is a race condition between the set and clear of TEST_EVENT_OBSERVER_SKIP_RETRY (this caused the 3 related tests to often fail when executed together)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that makes sense, thanks!


impl EventObserver {
fn init_db(db_path: &str) -> Result<Connection, db_error> {
Expand Down Expand Up @@ -430,11 +432,7 @@ impl EventObserver {
Self::send_payload_directly(&payload, &url, timeout);

#[cfg(test)]
if TEST_EVENT_OBSERVER_SKIP_RETRY
.lock()
.unwrap()
.unwrap_or(false)
{
if TEST_EVENT_OBSERVER_SKIP_RETRY.with(|v| *v.borrow()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pervasively, instead of testing a global variable container for a global that should only be compiled for test code, can you create a helper function with an informative name, and a test and non-test body? Like this:

use std::sync::LazyLock;
use crate::run_loop::neon::TestFlag;

const TEST_EVENT_OBSERVER_SKIP_RETRY : LazyLock<TestFlag> = LazyLock::new(Testflag::default);

/// Add a helpful docstring here
#[cfg(any(test, feature = "testing"))]
fn fault_injection_skip_observer_entry() -> bool {
   TEST_EVENT_OBSERVER_SKIP_RETRY.get()
}

#[cfg(not(any(test, feature = "testing")))]
fn fault_injection_skip_observer_entry() -> bool {
   false
}

warn!("Fault injection: delete_payload");
return;
}
Expand Down Expand Up @@ -499,11 +497,7 @@ impl EventObserver {
}

#[cfg(test)]
if TEST_EVENT_OBSERVER_SKIP_RETRY
.lock()
.unwrap()
.unwrap_or(false)
{
if TEST_EVENT_OBSERVER_SKIP_RETRY.with(|v| *v.borrow()) {
warn!("Fault injection: skipping retry of payload");
return;
}
Expand Down Expand Up @@ -1272,6 +1266,12 @@ impl EventDispatcher {
block_timestamp: Option<u64>,
coinbase_height: u64,
) {
let interested_observers =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this get passed into create_dispatch_matrix_and_event_vector? and used there? Otherwise, I think this is still sending to observers that should have been filtered out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think this new code can just be removed.

self.filter_observers(&self.block_proposal_observers_lookup, true);
if interested_observers.is_empty() {
return;
}
Comment on lines +1269 to +1273
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong filter here - you're filtering for only the block_proposal listeners, but this is for /new_block. Also, there isn't a filter for /new_block - it's sent by default (and needs to stay that way to be backwards compatible).


let all_receipts = receipts.to_owned();
let (dispatch_matrix, events) = self.create_dispatch_matrix_and_event_vector(&all_receipts);

Expand Down Expand Up @@ -1592,7 +1592,8 @@ impl EventDispatcher {
}

pub fn process_new_attachments(&self, attachments: &[(AttachmentInstance, Attachment)]) {
let interested_observers: Vec<_> = self.registered_observers.iter().enumerate().collect();
let interested_observers = self.filter_observers(&self.mempool_observers_lookup, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the correct hash set to use for attachment events. None of the others look like a better option either. Maybe we should use any_event_observers_lookup, or maybe we should add a new one. I'm not entirely sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specific issue (from the sbtc team that is actively using the event system) is about this /attachments/new event. Can we have a new event id dedicated to it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be another copy/paste bug. There doesn't actually appear to be an attachments event key. I think it's best to probably filter for only the * event key here (self.any_event_observers_lookup), but I'm actually not sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jinx


if interested_observers.is_empty() {
return;
}
Expand All @@ -1603,7 +1604,7 @@ impl EventDispatcher {
serialized_attachments.push(payload);
}

for (_, observer) in interested_observers.iter() {
for observer in interested_observers.iter() {
observer.send_new_attachments(&json!(serialized_attachments));
}
}
Expand Down Expand Up @@ -1702,15 +1703,20 @@ mod test {
use std::time::Instant;

use clarity::vm::costs::ExecutionCost;
use clarity::vm::ClarityVersion;
use stacks::burnchains::{PoxConstants, Txid};
use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockHeader};
use stacks::chainstate::stacks::db::{StacksBlockHeaderTypes, StacksHeaderInfo};
use stacks::chainstate::stacks::events::StacksBlockEventData;
use stacks::chainstate::stacks::StacksBlock;
use stacks::chainstate::stacks::{
SinglesigHashMode, SinglesigSpendingCondition, StacksBlock, TransactionAuth,
TransactionPublicKeyEncoding, TransactionSpendingCondition, TransactionVersion,
};
use stacks::types::chainstate::BlockHeaderHash;
use stacks::util::secp256k1::MessageSignature;
use stacks_common::bitvec::BitVec;
use stacks_common::types::chainstate::{BurnchainHeaderHash, StacksBlockId};
use stacks_common::util::hash::Hash160;
use tempfile::tempdir;
use tiny_http::{Method, Response, Server, StatusCode};

Expand Down Expand Up @@ -2338,18 +2344,15 @@ mod test {

// Disable retrying so that it sends the payload only once
// and that payload will be ignored by the test server.
TEST_EVENT_OBSERVER_SKIP_RETRY.lock().unwrap().replace(true);
TEST_EVENT_OBSERVER_SKIP_RETRY.with(|v| *v.borrow_mut() = true);

info!("Sending payload 1");

// Send the payload
observer.send_payload(&payload, "/test");

// Re-enable retrying
TEST_EVENT_OBSERVER_SKIP_RETRY
.lock()
.unwrap()
.replace(false);
TEST_EVENT_OBSERVER_SKIP_RETRY.with(|v| *v.borrow_mut() = false);

info!("Sending payload 2");

Expand All @@ -2360,4 +2363,197 @@ mod test {
rx.recv_timeout(Duration::from_secs(5))
.expect("Server did not receive request in time");
}

#[test]
fn test_event_dispatcher_tx_filtering() {
let dir = tempdir().unwrap();
let working_dir = dir.path().to_path_buf();

// Create a mock server
let mut server = mockito::Server::new();
let _m = server
.mock("POST", "/new_mempool_tx")
.with_status(200)
.expect_at_most(1)
.create();

let mut event_dispatcher = EventDispatcher::new();
let config = EventObserverConfig {
endpoint: server.url().strip_prefix("http://").unwrap().to_string(),
events_keys: vec![EventKeyType::MemPoolTransactions],
timeout_ms: 1000,
};
event_dispatcher.register_observer(&config, working_dir);

let sig = SinglesigSpendingCondition {
hash_mode: SinglesigHashMode::P2WPKH,
signer: Hash160([0u8; 20]),
nonce: 0,
tx_fee: 0,
key_encoding: TransactionPublicKeyEncoding::Compressed,
signature: MessageSignature([0u8; 65]),
};
let tx = StacksTransaction::new(
TransactionVersion::Testnet,
TransactionAuth::Standard(TransactionSpendingCondition::Singlesig(sig)),
TransactionPayload::new_smart_contract("test", "test", Some(ClarityVersion::Clarity3))
.unwrap(),
);
event_dispatcher.process_new_mempool_txs(vec![tx]);

_m.assert();
}

#[test]
fn test_event_dispatcher_tx_filtering_empty() {
let dir = tempdir().unwrap();
let working_dir = dir.path().to_path_buf();

// Create a mock server
let mut server = mockito::Server::new();
let _m = server
.mock("POST", "/new_mempool_tx")
.with_status(200)
.expect_at_most(0)
.create();

let mut event_dispatcher = EventDispatcher::new();
let config = EventObserverConfig {
endpoint: server.url().strip_prefix("http://").unwrap().to_string(),
events_keys: vec![],
timeout_ms: 1000,
};
event_dispatcher.register_observer(&config, working_dir);

let sig = SinglesigSpendingCondition {
hash_mode: SinglesigHashMode::P2WPKH,
signer: Hash160([0u8; 20]),
nonce: 0,
tx_fee: 0,
key_encoding: TransactionPublicKeyEncoding::Compressed,
signature: MessageSignature([0u8; 65]),
};
let tx = StacksTransaction::new(
TransactionVersion::Testnet,
TransactionAuth::Standard(TransactionSpendingCondition::Singlesig(sig)),
TransactionPayload::new_smart_contract("test", "test", Some(ClarityVersion::Clarity3))
.unwrap(),
);
event_dispatcher.process_new_mempool_txs(vec![tx]);

_m.assert();
}

#[test]
fn test_event_dispatcher_tx_filtering_wrong() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add short comments above these tests? It takes me a bit to register exactly what's being tested in each of these. The code is good, it's just helpful to have a short explainer.

let dir = tempdir().unwrap();
let working_dir = dir.path().to_path_buf();

// Create a mock server
let mut server = mockito::Server::new();
let _m = server
.mock("POST", "/new_mempool_tx")
.with_status(200)
.expect_at_most(0)
.create();

let mut event_dispatcher = EventDispatcher::new();
let config = EventObserverConfig {
endpoint: server.url().strip_prefix("http://").unwrap().to_string(),
events_keys: vec![EventKeyType::BlockProposal],
timeout_ms: 1000,
};
event_dispatcher.register_observer(&config, working_dir);

let sig = SinglesigSpendingCondition {
hash_mode: SinglesigHashMode::P2WPKH,
signer: Hash160([0u8; 20]),
nonce: 0,
tx_fee: 0,
key_encoding: TransactionPublicKeyEncoding::Compressed,
signature: MessageSignature([0u8; 65]),
};

let tx = StacksTransaction::new(
TransactionVersion::Testnet,
TransactionAuth::Standard(TransactionSpendingCondition::Singlesig(sig)),
TransactionPayload::new_smart_contract("test", "test", Some(ClarityVersion::Clarity3))
.unwrap(),
);
event_dispatcher.process_new_mempool_txs(vec![tx]);

_m.assert();
}

#[test]
fn test_event_dispatcher_nakamoto_block_filtering() {
let dir = tempdir().unwrap();
let working_dir = dir.path().to_path_buf();

// Create a mock server
let mut server = mockito::Server::new();
let _m = server
.mock("POST", "/new_block")
.with_status(200)
.expect_at_most(1)
.create();

let mut event_dispatcher = EventDispatcher::new();
let config = EventObserverConfig {
endpoint: server.url().strip_prefix("http://").unwrap().to_string(),
events_keys: vec![EventKeyType::BlockProposal],
timeout_ms: 1000,
};
event_dispatcher.register_observer(&config, working_dir);

let nakamoto_block = NakamotoBlock {
header: NakamotoBlockHeader::empty(),
txs: vec![],
};
event_dispatcher.process_mined_nakamoto_block_event(
0,
&nakamoto_block,
0,
&ExecutionCost::max_value(),
vec![],
);

_m.assert();
}

#[test]
fn test_event_dispatcher_nakamoto_block_filtering_empty() {
let dir = tempdir().unwrap();
let working_dir = dir.path().to_path_buf();

// Create a mock server
let mut server = mockito::Server::new();
let _m = server
.mock("POST", "/new_mempool_tx")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned above - I think we need to always send /new_block proposals, so this test might be moot. Regardless, the path here should be /new_block.

Assuming we do make it so that /new_block is always sent, it would be good to modify this test such that we ensure /new_block is called event with an empty event_keys.

.with_status(200)
.expect_at_most(0)
.create();

let mut event_dispatcher = EventDispatcher::new();
let config = EventObserverConfig {
endpoint: server.url().strip_prefix("http://").unwrap().to_string(),
events_keys: vec![],
timeout_ms: 1000,
};
event_dispatcher.register_observer(&config, working_dir);

let nakamoto_block = NakamotoBlock {
header: NakamotoBlockHeader::empty(),
txs: vec![],
};
event_dispatcher.process_mined_nakamoto_block_event(
0,
&nakamoto_block,
0,
&ExecutionCost::max_value(),
vec![],
);

_m.assert();
}
}
Loading