Skip to content

Commit

Permalink
Add more subscription options (#3686)
Browse files Browse the repository at this point in the history
* Added "includeTransactions" option to eth_subscribe.

* Fixed test.

* Changed test name.
  • Loading branch information
kjazgar authored Dec 14, 2021
1 parent 0ed12d2 commit b4d20c1
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ public void Setup()
_blockTree.FindHeader(Arg.Any<BlockParameter>(), true).Returns(toBlock);
}

private JsonRpcResult GetBlockAddedToMainResult(BlockReplacementEventArgs blockReplacementEventArgs, out string subscriptionId)
private JsonRpcResult GetBlockAddedToMainResult(BlockReplacementEventArgs blockReplacementEventArgs, out string subscriptionId, Filter filter = null)
{
NewHeadSubscription newHeadSubscription = new(_jsonRpcDuplexClient, _blockTree, _logManager);
NewHeadSubscription newHeadSubscription = new(_jsonRpcDuplexClient, _blockTree, _logManager, filter);

JsonRpcResult jsonRpcResult = new();

Expand Down Expand Up @@ -130,9 +130,9 @@ private List<JsonRpcResult> GetLogsSubscriptionResult(Filter filter, BlockEventA
return jsonRpcResults;
}

private JsonRpcResult GetNewPendingTransactionsResult(TxEventArgs txEventArgs, out string subscriptionId)
private JsonRpcResult GetNewPendingTransactionsResult(TxEventArgs txEventArgs, out string subscriptionId, Filter filter = null)
{
NewPendingTransactionsSubscription newPendingTransactionsSubscription = new(_jsonRpcDuplexClient, _txPool, _logManager);
NewPendingTransactionsSubscription newPendingTransactionsSubscription = new(_jsonRpcDuplexClient, _txPool, _logManager, filter);
JsonRpcResult jsonRpcResult = new();

ManualResetEvent manualResetEvent = new(false);
Expand Down Expand Up @@ -226,6 +226,32 @@ public void NewHeadSubscription_creating_result()
expectedResult.Should().Be(serialized);
}

[Test]
public void NewHeadSubscription_with_includeTransactions_arg()
{
string serialized = RpcTest.TestSerializedRequest(_subscribeRpcModule, "eth_subscribe", "newHeads","{\"includeTransactions\":true}");
var expectedResult = string.Concat("{\"jsonrpc\":\"2.0\",\"result\":\"", serialized.Substring(serialized.Length - 44,34), "\",\"id\":67}");
expectedResult.Should().Be(serialized);
}

[Test]
public void NewHeadSubscription_on_BlockAddedToMain_event2()
{
Block block = Build.A.Block.WithDifficulty(1991).WithExtraData(new byte[] {3, 5, 8}).TestObject;
BlockReplacementEventArgs blockReplacementEventArgs = new(block);
Filter filter = new()
{
IncludeTransactions = true
};

JsonRpcResult jsonRpcResult = GetBlockAddedToMainResult(blockReplacementEventArgs, out var subscriptionId, filter);

jsonRpcResult.Response.Should().NotBeNull();
string serialized = _jsonSerializer.Serialize(jsonRpcResult.Response);
var expectedResult = string.Concat("{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscription\",\"params\":{\"subscription\":\"", subscriptionId, "\",\"result\":{\"author\":\"0x0000000000000000000000000000000000000000\",\"difficulty\":\"0x7c7\",\"extraData\":\"0x030508\",\"gasLimit\":\"0x3d0900\",\"gasUsed\":\"0x0\",\"hash\":\"0x2e3c1c2a507dc3071a16300858d4e75390e5f43561515481719a1e0dadf22585\",\"logsBloom\":\"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000\",\"miner\":\"0x0000000000000000000000000000000000000000\",\"mixHash\":\"0x2ba5557a4c62a513c7e56d1bf13373e0da6bec016755483e91589fe1c6d212e2\",\"nonce\":\"0x00000000000003e8\",\"number\":\"0x0\",\"parentHash\":\"0xff483e972a04a9a62bb4b7d04ae403c615604e4090521ecc5bb7af67f71be09c\",\"receiptsRoot\":\"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421\",\"sha3Uncles\":\"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347\",\"size\":\"0x200\",\"stateRoot\":\"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421\",\"totalDifficulty\":\"0x0\",\"timestamp\":\"0xf4240\",\"transactions\":[],\"transactionsRoot\":\"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421\",\"uncles\":[]}}}");
expectedResult.Should().Be(serialized);
}

[Test]
public void NewHeadSubscription_on_BlockAddedToMain_event()
{
Expand Down Expand Up @@ -737,6 +763,26 @@ public void NewPendingTransactionsSubscription_on_NewPending_event_with_null_tra
expectedResult.Should().Be(serialized);
}

[Test]
public void NewPendingTransactionsSubscription_on_NewPending_with_includeTransactions_param()
{
Transaction transaction = Build.A.Transaction.TestObject;
transaction.Hash = null;
TxEventArgs txEventArgs = new(transaction);

Filter filter = new()
{
IncludeTransactions = true
};

JsonRpcResult jsonRpcResult = GetNewPendingTransactionsResult(txEventArgs, out var subscriptionId, filter);

jsonRpcResult.Response.Should().NotBeNull();
string serialized = _jsonSerializer.Serialize(jsonRpcResult.Response);
var expectedResult = string.Concat("{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscription\",\"params\":{\"subscription\":\"",subscriptionId,"\",\"result\":{\"nonce\":\"0x0\",\"blockHash\":null,\"blockNumber\":null,\"transactionIndex\":null,\"to\":\"0x0000000000000000000000000000000000000000\",\"value\":\"0x1\",\"gasPrice\":\"0x1\",\"gas\":\"0x5208\",\"data\":\"0x\",\"input\":\"0x\",\"type\":\"0x0\"}}}");
expectedResult.Should().Be(serialized);
}

[Test]
public void DroppedPendingTransactionsSubscription_creating_result()
{
Expand Down
13 changes: 13 additions & 0 deletions src/Nethermind/Nethermind.JsonRpc/Modules/Eth/Filter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class Filter : IJsonRpcParam
public BlockParameter ToBlock { get; set; }
public object? Address { get; set; }
public IEnumerable<object?> Topics { get; set; }
public bool IncludeTransactions { get; set; }

private readonly IJsonSerializer _jsonSerializer = new EthereumJsonSerializer();

Expand All @@ -39,6 +40,7 @@ public void FromJson(string jsonValue)
ToBlock = BlockParameterConverter.GetBlockParameter(filter["toBlock"]?.ToObject<string>());
Address = GetAddress(filter["address"]);
Topics = GetTopics(filter["topics"] as JArray);
IncludeTransactions = GetIncludeTransactions(filter["includeTransactions"]);
}

private static object? GetAddress(JToken? token) => GetSingleOrMany(token);
Expand Down Expand Up @@ -68,5 +70,16 @@ public void FromJson(string jsonValue)
return token.ToObject<string>();
}
}

private static bool GetIncludeTransactions(JToken? token)
{
switch (token)
{
case null:
return false;
default:
return token.ToObject<bool>();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ namespace Nethermind.JsonRpc.Modules.Subscribe
public class NewHeadSubscription : Subscription
{
private readonly IBlockTree _blockTree;
private readonly bool _includeTransactions;

public NewHeadSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, IBlockTree? blockTree, ILogManager? logManager)

public NewHeadSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, IBlockTree? blockTree, ILogManager? logManager, Filter? filter = null)
: base(jsonRpcDuplexClient)
{
_blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
_logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));

_includeTransactions = filter?.IncludeTransactions ?? false;

_blockTree.BlockAddedToMain += OnBlockAddedToMain;
if(_logger.IsTrace) _logger.Trace($"NewHeads subscription {Id} will track BlockAddedToMain");
}
Expand All @@ -42,7 +45,7 @@ private void OnBlockAddedToMain(object? sender, BlockReplacementEventArgs e)
{
ScheduleAction(() =>
{
JsonRpcResult result = CreateSubscriptionMessage(new BlockForRpc(e.Block, false));
JsonRpcResult result = CreateSubscriptionMessage(new BlockForRpc(e.Block, _includeTransactions));
JsonRpcDuplexClient.SendJsonRpcResult(result);
if(_logger.IsTrace) _logger.Trace($"NewHeads subscription {Id} printed new block");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

using System;
using System.Threading.Tasks;
using Nethermind.JsonRpc.Data;
using Nethermind.JsonRpc.Modules.Eth;
using Nethermind.Logging;
using Nethermind.TxPool;

Expand All @@ -25,12 +27,14 @@ namespace Nethermind.JsonRpc.Modules.Subscribe
public class NewPendingTransactionsSubscription : Subscription
{
private readonly ITxPool _txPool;
private readonly bool _includeTransactions;

public NewPendingTransactionsSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, ITxPool? txPool, ILogManager? logManager)
public NewPendingTransactionsSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, ITxPool? txPool, ILogManager? logManager, Filter? filter = null)
: base(jsonRpcDuplexClient)
{
_txPool = txPool ?? throw new ArgumentNullException(nameof(txPool));
_logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
_includeTransactions = filter?.IncludeTransactions ?? false;

_txPool.NewPending += OnNewPending;
if(_logger.IsTrace) _logger.Trace($"NewPendingTransactions subscription {Id} will track NewPendingTransactions");
Expand All @@ -40,7 +44,7 @@ private void OnNewPending(object? sender, TxEventArgs e)
{
ScheduleAction(() =>
{
JsonRpcResult result = CreateSubscriptionMessage(e.Transaction.Hash);
JsonRpcResult result = CreateSubscriptionMessage(_includeTransactions ? new TransactionForRpc(e.Transaction) : e.Transaction.Hash);
JsonRpcDuplexClient.SendJsonRpcResult(result);
if(_logger.IsTrace) _logger.Trace($"NewPendingTransactions subscription {Id} printed hash of NewPendingTransaction.");
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ public Subscription CreateSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient,
switch (subscriptionType)
{
case SubscriptionType.NewHeads:
return new NewHeadSubscription(jsonRpcDuplexClient, _blockTree, _logManager);
return new NewHeadSubscription(jsonRpcDuplexClient, _blockTree, _logManager, filter);
case SubscriptionType.Logs:
return new LogsSubscription(jsonRpcDuplexClient, _receiptStorage, _filterStore, _blockTree, _logManager, filter);
case SubscriptionType.NewPendingTransactions:
return new NewPendingTransactionsSubscription(jsonRpcDuplexClient, _txPool, _logManager);
return new NewPendingTransactionsSubscription(jsonRpcDuplexClient, _txPool, _logManager, filter);
case SubscriptionType.DroppedPendingTransactions:
return new DroppedPendingTransactionsSubscription(jsonRpcDuplexClient, _txPool, _logManager);
case SubscriptionType.Syncing:
Expand Down

0 comments on commit b4d20c1

Please sign in to comment.