Skip to content

Commit

Permalink
Make FielterStore and id's thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
LukaszRozmej committed Dec 7, 2021
1 parent 5bfaae6 commit 088d858
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using FluentAssertions;
using Nethermind.Blockchain.Filters;
using Nethermind.Blockchain.Filters.Topics;
Expand Down Expand Up @@ -98,8 +99,8 @@ public void Can_get_filters_by_type()
LogFilter filter2 = store.CreateLogFilter(new BlockParameter(1), new BlockParameter(2));
store.SaveFilter(filter2);

LogFilter[] logFilters = store.GetFilters<LogFilter>();
BlockFilter[] blockFilters = store.GetFilters<BlockFilter>();
LogFilter[] logFilters = store.GetFilters<LogFilter>().ToArray();
BlockFilter[] blockFilters = store.GetFilters<BlockFilter>().ToArray();

Assert.AreEqual(1, logFilters.Length, "log filters length");
Assert.AreEqual(1, logFilters[0].Id, "log filters ids");
Expand Down
72 changes: 23 additions & 49 deletions src/Nethermind/Nethermind.Blockchain/Filters/FilterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,35 +84,28 @@ private void OnTransactionProcessed(object sender, TxProcessedEventArgs e)

private void OnNewPendingTransaction(object sender, TxPool.TxEventArgs e)
{
var filters = _filterStore.GetFilters<PendingTransactionFilter>();
if (filters == null || filters.Length == 0)
IEnumerable<PendingTransactionFilter> filters = _filterStore.GetFilters<PendingTransactionFilter>();
foreach (PendingTransactionFilter filter in filters)
{
return;
}

for (var i = 0; i < filters.Length; i++)
{
var filterId = filters[i].Id;
var transactions = _pendingTransactions.GetOrAdd(filterId, _ => new List<Keccak>());
int filterId = filter.Id;
List<Keccak> transactions = _pendingTransactions.GetOrAdd(filterId, _ => new List<Keccak>());
transactions.Add(e.Transaction.Hash);
if (_logger.IsDebug) _logger.Debug($"Filter with id: '{filterId}' contains {transactions.Count} transactions.");

}
}

private void OnRemovedPendingTransaction(object sender, TxPool.TxEventArgs e)
{
var filters = _filterStore.GetFilters<PendingTransactionFilter>();
if (filters == null || filters.Length == 0)
{
return;
}
IEnumerable<PendingTransactionFilter> filters = _filterStore.GetFilters<PendingTransactionFilter>();

for (var i = 0; i < filters.Length; i++)
foreach (PendingTransactionFilter filter in filters)
{
var filterId = filters[i].Id;
var transactions = _pendingTransactions.GetOrAdd(filterId, _ => new List<Keccak>());
int filterId = filter.Id;
List<Keccak> transactions = _pendingTransactions.GetOrAdd(filterId, _ => new List<Keccak>());
transactions.Remove(e.Transaction.Hash);
if (_logger.IsDebug) _logger.Debug($"Filter with id: '{filterId}' contains {transactions.Count} transactions.");

}
}

Expand Down Expand Up @@ -187,15 +180,13 @@ private void AddReceipts(params TxReceipt[] txReceipts)
return;
}

var filters = _filterStore.GetFilters<LogFilter>();
if (filters == null || filters.Length == 0)
IEnumerable<LogFilter> filters = _filterStore.GetFilters<LogFilter>();
foreach (LogFilter filter in filters)
{
return;
}

for (var i = 0; i < txReceipts.Length; i++)
{
StoreLogs(filters, txReceipts[i], ref _logIndex);
for (int i = 0; i < txReceipts.Length; i++)
{
StoreLogs(filter, txReceipts[i], ref _logIndex);
}
}
}

Expand All @@ -206,20 +197,11 @@ private void AddBlock(Block block)
throw new ArgumentNullException(nameof(block));
}

var filters = _filterStore.GetFilters<BlockFilter>();
if (filters == null || filters.Length == 0)
IEnumerable<BlockFilter> filters = _filterStore.GetFilters<BlockFilter>();

foreach (BlockFilter filter in filters)
{
return;
}

StoreBlock(filters, block);
}

private void StoreBlock(BlockFilter[] filters, Block block)
{
for (var i = 0; i < filters.Length; i++)
{
StoreBlock(filters[i], block);
StoreBlock(filter, block);
}
}

Expand All @@ -230,28 +212,20 @@ private void StoreBlock(BlockFilter filter, Block block)
throw new InvalidOperationException("Cannot filter on blocks without calculated hashes");
}

var blocks = _blockHashes.GetOrAdd(filter.Id, i => new List<Keccak>());
List<Keccak> blocks = _blockHashes.GetOrAdd(filter.Id, i => new List<Keccak>());
blocks.Add(block.Hash);
if (_logger.IsDebug) _logger.Debug($"Filter with id: '{filter.Id}' contains {blocks.Count} blocks.");
}

private void StoreLogs(LogFilter[] filters, TxReceipt txReceipt, ref long logIndex)
{
for (var i = 0; i < filters.Length; i++)
{
StoreLogs(filters[i], txReceipt, ref logIndex);
}
}

private void StoreLogs(LogFilter filter, TxReceipt txReceipt, ref long logIndex)
{
if (txReceipt.Logs == null || txReceipt.Logs.Length == 0)
{
return;
}

var logs = _logs.GetOrAdd(filter.Id, i => new List<FilterLog>());
for (var i = 0; i < txReceipt.Logs.Length; i++)
List<FilterLog> logs = _logs.GetOrAdd(filter.Id, i => new List<FilterLog>());
for (int i = 0; i < txReceipt.Logs.Length; i++)
{
var logEntry = txReceipt.Logs[i];
var filterLog = CreateLog(filter, txReceipt, logEntry, logIndex++, i);
Expand Down
59 changes: 32 additions & 27 deletions src/Nethermind/Nethermind.Blockchain/Filters/FilterStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using Nethermind.Blockchain.Filters.Topics;
using Nethermind.Blockchain.Find;
using Nethermind.Core;
Expand All @@ -28,7 +29,8 @@ namespace Nethermind.Blockchain.Filters
{
public class FilterStore : IFilterStore
{
private int _nextFilterId;
private int _currentFilterId = -1;
private object _locker = new object();

private readonly ConcurrentDictionary<int, FilterBase> _filters = new();

Expand All @@ -51,34 +53,22 @@ public FilterType GetFilterType(int filterId)
}
}

public T[] GetFilters<T>() where T : FilterBase
{
return _filters.Select(f => f.Value).OfType<T>().ToArray();
}
public IEnumerable<T> GetFilters<T>() where T : FilterBase =>
_filters.Select(f => f.Value).OfType<T>();

public BlockFilter CreateBlockFilter(long startBlockNumber, bool setId = true)
{
var filterId = setId ? GetFilterId() : 0;
var blockFilter = new BlockFilter(filterId, startBlockNumber);
return blockFilter;
}
public BlockFilter CreateBlockFilter(long startBlockNumber, bool setId = true) =>
new(GetFilterId(setId), startBlockNumber);

public PendingTransactionFilter CreatePendingTransactionFilter(bool setId = true)
{
var filterId = setId ? GetFilterId() : 0;
var pendingTransactionFilter = new PendingTransactionFilter(filterId);
return pendingTransactionFilter;
}
public PendingTransactionFilter CreatePendingTransactionFilter(bool setId = true) =>
new(GetFilterId(setId));

public LogFilter CreateLogFilter(BlockParameter fromBlock, BlockParameter toBlock,
object? address = null, IEnumerable<object?>? topics = null, bool setId = true)
{
var filterId = setId ? GetFilterId() : 0;
var filter = new LogFilter(filterId, fromBlock, toBlock,
GetAddress(address), GetTopicsFilter(topics));

return filter;
}
object? address = null, IEnumerable<object?>? topics = null, bool setId = true) =>
new(GetFilterId(setId),
fromBlock,
toBlock,
GetAddress(address),
GetTopicsFilter(topics));

public void RemoveFilter(int filterId)
{
Expand All @@ -95,11 +85,26 @@ public void SaveFilter(FilterBase filter)
throw new InvalidOperationException($"Filter with ID {filter.Id} already exists");
}

_nextFilterId = Math.Max(filter.Id + 1, _nextFilterId);
lock (_locker)
{
_currentFilterId = Math.Max(filter.Id, _currentFilterId);
}

_filters[filter.Id] = filter;
}

private int GetFilterId() => _nextFilterId++;
private int GetFilterId(bool generateId)
{
if (generateId)
{
lock (_locker)
{
return ++_currentFilterId;
}
}

return 0;
}

private TopicsFilter GetTopicsFilter(IEnumerable<object?>? topics = null)
{
Expand Down
10 changes: 7 additions & 3 deletions src/Nethermind/Nethermind.Blockchain/Filters/IFilterStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ namespace Nethermind.Blockchain.Filters
public interface IFilterStore
{
bool FilterExists(int filterId);
T[] GetFilters<T>() where T : FilterBase;
IEnumerable<T> GetFilters<T>() where T : FilterBase;
BlockFilter CreateBlockFilter(long startBlockNumber, bool setId = true);
PendingTransactionFilter CreatePendingTransactionFilter(bool setId = true);

LogFilter CreateLogFilter(BlockParameter fromBlock, BlockParameter toBlock, object address = null,
IEnumerable<object> topics = null, bool setId = true);
LogFilter CreateLogFilter(
BlockParameter fromBlock,
BlockParameter toBlock,
object? address = null,
IEnumerable<object>? topics = null,
bool setId = true);

void SaveFilter(FilterBase filter);
void RemoveFilter(int filterId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public bool FilterExists(int filterId)
return false;
}

public T[] GetFilters<T>() where T : FilterBase
public IEnumerable<T> GetFilters<T>() where T : FilterBase
{
return Array.Empty<T>();
}
Expand All @@ -48,7 +48,7 @@ public PendingTransactionFilter CreatePendingTransactionFilter(bool setId = true
throw new InvalidOperationException($"{nameof(NullFilterStore)} does not support filter creation");
}

public LogFilter CreateLogFilter(BlockParameter fromBlock, BlockParameter toBlock, object address = null, IEnumerable<object> topics = null, bool setId = true)
public LogFilter CreateLogFilter(BlockParameter fromBlock, BlockParameter toBlock, object? address = null, IEnumerable<object>? topics = null, bool setId = true)
{
throw new InvalidOperationException($"{nameof(NullFilterStore)} does not support filter creation");
}
Expand Down

0 comments on commit 088d858

Please sign in to comment.