Skip to content

Commit

Permalink
filter plugin: include all transaction in group (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
shiqizng authored Mar 10, 2023
1 parent 4179564 commit 9c7f525
Show file tree
Hide file tree
Showing 7 changed files with 489 additions and 61 deletions.
2 changes: 2 additions & 0 deletions conduit/plugins/processors/filterprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type SubConfig struct {
type Config struct {
// <code>search-inner</code> configures the filter processor to recursively search inner transactions for expressions.
SearchInner bool `yaml:"search-inner"`
// <code>omit-group-transactions</code> configures the filter processor to return the matched transaction without its grouped transactions.
OmitGroupTransactions bool `yaml:"omit-group-transactions"`
/* <code>filters</code> are a list of SubConfig objects with an operation acting as the string key in the map
filters:
Expand Down
22 changes: 19 additions & 3 deletions conduit/plugins/processors/filterprocessor/fields/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func ValidFieldOperation(input string) bool {
type Filter struct {
Op Operation
Searchers []*Searcher
OmitGroup bool
}

func (f Filter) matches(txn *sdk.SignedTxnWithAD) (bool, error) {
Expand Down Expand Up @@ -55,13 +56,28 @@ func (f Filter) matches(txn *sdk.SignedTxnWithAD) (bool, error) {
// SearchAndFilter searches through the block data and applies the operation to the results
func (f Filter) SearchAndFilter(payset []sdk.SignedTxnInBlock) ([]sdk.SignedTxnInBlock, error) {
var result []sdk.SignedTxnInBlock
for _, txn := range payset {
match, err := f.matches(&txn.SignedTxnWithAD)
firstGroupIdx := 0
for i := 0; i < len(payset); i++ {
if payset[firstGroupIdx].Txn.Group != payset[i].Txn.Group {
firstGroupIdx = i
}
match, err := f.matches(&payset[i].SignedTxnWithAD)
if err != nil {
return nil, err
}
if match {
result = append(result, txn)
// if txn.Group is set and omit group is false
if payset[i].Txn.Group != (sdk.Digest{}) && !f.OmitGroup {
j := firstGroupIdx
// append all txns with same group ID
for ; j < len(payset) && payset[j].Txn.Group == payset[firstGroupIdx].Txn.Group; j++ {
result = append(result, payset[j])
}
// skip txns that are already added, set i to the index of the last added txn
i = j - 1
} else {
result = append(result, payset[i])
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (a *FilterProcessor) Init(ctx context.Context, _ data.InitProvider, cfg plu
ff := fields.Filter{
Op: fields.Operation(key),
Searchers: searcherList,
OmitGroup: a.cfg.OmitGroupTransactions,
}

a.FieldFilters = append(a.FieldFilters, ff)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ func blockData(addr sdk.Address, numInner int) (block data.BlockData, searchTag
SignedTxnWithAD: sdk.SignedTxnWithAD{
SignedTxn: sdk.SignedTxn{
AuthAddr: addr,
Txn: sdk.Transaction{
Header: sdk.Header{
Group: sdk.Digest{1},
},
},
},
},
},
{
SignedTxnWithAD: sdk.SignedTxnWithAD{
SignedTxn: sdk.SignedTxn{
Txn: sdk.Transaction{
Header: sdk.Header{
Group: sdk.Digest{1},
},
},
},
},
},
Expand Down Expand Up @@ -52,21 +68,27 @@ func BenchmarkProcess(b *testing.B) {
addr[0] = 0x01

var table = []struct {
input int
input int
outputlen int
omitGroupTxns bool
}{
{input: 0},
{input: 10},
{input: 100},
{input: 0, outputlen: 1, omitGroupTxns: true},
{input: 10, outputlen: 1, omitGroupTxns: true},
{input: 100, outputlen: 1, omitGroupTxns: true},
{input: 0, outputlen: 2, omitGroupTxns: false},
{input: 10, outputlen: 2, omitGroupTxns: false},
{input: 100, outputlen: 2, omitGroupTxns: false},
}
for _, v := range table {
b.Run(fmt.Sprintf("inner_txn_count_%d", v.input), func(b *testing.B) {
b.Run(fmt.Sprintf("inner_txn_count_%d_omitGrouptxns_%t", v.input, v.omitGroupTxns), func(b *testing.B) {
bd, tag := blockData(addr, v.input)
cfgStr := fmt.Sprintf(`filters:
cfgStr := fmt.Sprintf(`search-inner: true
omit-group-transactions: %t
filters:
- all:
- tag: %s
search-inner: true
expression-type: equal
expression: "%s"`, tag, addr.String())
expression: "%s"`, v.omitGroupTxns, tag, addr.String())

fp := &FilterProcessor{}
err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfgStr), logrus.New())
Expand All @@ -76,7 +98,7 @@ func BenchmarkProcess(b *testing.B) {
{
out, err := fp.Process(bd)
require.NoError(b, err)
require.Len(b, out.Payset, 1)
require.Len(b, out.Payset, v.outputlen)
}

// Ignore the setup cost above.
Expand Down
Loading

0 comments on commit 9c7f525

Please sign in to comment.