Skip to content
This repository has been archived by the owner on Nov 17, 2022. It is now read-only.

Commit

Permalink
update & fix DevP2P::Peer sub protocols handling logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jjyr committed May 10, 2018
1 parent 01d4fbc commit 1ceecac
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 15 deletions.
18 changes: 11 additions & 7 deletions lib/ethruby/devp2p/actor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@ def queue
end
end

class Error < StandardError
end

# stop actor
class StopError < StandardError
class StopError < Error
end

class StateError < StandardError
class StateError < Error
end

attr_accessor :executor
Expand Down Expand Up @@ -110,6 +113,8 @@ def call(method, *args)

# start actor
def start
raise Error.new("must set executor before start") unless executor

@running = true
executor.post do
start_loop
Expand All @@ -124,7 +129,7 @@ def start
# actor.wait
#
def send_stop
self << :raise_stop_error
self << [:raise_error, StopError.new]
end

# wait until an error occurs
Expand All @@ -137,7 +142,7 @@ def wait
def start_loop
loop_callback do
# check inbox
next if @inbox.empty?
next Thread.pass if @inbox.empty?
msg = @inbox.pop

# extract sync or async call
Expand Down Expand Up @@ -183,9 +188,8 @@ def loop_callback
yield
end

private
def raise_stop_error
raise StopError.new
def raise_error(e)
raise e
end
end

Expand Down
22 changes: 16 additions & 6 deletions lib/ethruby/devp2p/peer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ module DevP2P
# represent a connected remote node
class Peer

class DiscoverError < StandardError
end
class UnknownMessageCodeError < StandardError
end

Expand Down Expand Up @@ -54,17 +56,23 @@ def read_loop
msg.received_at = Time.now
handle(msg)
end
rescue StandardError => e
self << [:raise_error, e]
end

def start_protocols
@protocols.each do |protocol|
protocol.executor ||= executor
protocol.start(self, @protocol_io_hash[protocol.name])
end
end

def handle(msg)
if msg.code == RLPX::MESSAGES[:ping]
#TODO send pong
elsif msg.code == RLPX::MESSAGES[:discover]
reason = RLP.decode(msg.payload).ord
raise DiscoverError.new("receive error discovery message, reason: #{reason}")
else
# send msg to sub protocol
if (protocol = find_protocol_by_msg_code(msg.code)).nil?
Expand All @@ -76,9 +84,11 @@ def handle(msg)

private
def find_protocol_by_msg_code(code)
@protocols.find do |protocol|
code >= protocol.offset && code <= protocol.offset + protocol.length
end
@protocol_io_hash.values.find do |protocol_io|
offset = protocol_io.offset
protocol = protocol_io.protocol
code >= offset && code < offset + protocol.length
end.protocol
end

# return protocol_io_hash
Expand All @@ -88,7 +98,7 @@ def make_protocol_io_hash(protocols, caps, io)
offset = RLPX::BASE_PROTOCOL_LENGTH
result = {}
# [name, version] as key
protocols_hash = protocols.map {|protocol| [protocol.name, protocol.version]}.to_h
protocols_hash = protocols.map {|protocol| [[protocol.name, protocol.version], protocol]}.to_h
sorted_caps = caps.sort_by {|c| [c.name, c.version]}

sorted_caps.each do |cap|
Expand All @@ -97,7 +107,7 @@ def make_protocol_io_hash(protocols, caps, io)
# ignore same name old protocols
if (old = result[cap.name])
result.delete(cap.name)
offset -= old.length
offset -= old.protocol.length
end
result[cap.name] = ProtocolIO.new(protocol, offset, io)
# move offset, to support next protocol
Expand All @@ -108,4 +118,4 @@ def make_protocol_io_hash(protocols, caps, io)
end

end
end
end
12 changes: 10 additions & 2 deletions lib/ethruby/devp2p/protocol.rb
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
# frozen_string_literal: true

require_relative 'actor'

module ETH
module DevP2P

# protocol represent DevP2P sub protocols
class Protocol
attr_accessor :name, :version, :length, :node_info, :peer_info

include Actor

attr_reader :name, :version, :length
attr_accessor :node_info, :peer_info

def initialize(name:, version:, length:)
@name = name
@version = version
@length = length
super()
end

# start protocol handling
def start
def start(peer, io)
super()
end
end

Expand Down
1 change: 1 addition & 0 deletions spec/ethruby/crypto_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require 'spec_helper'
require 'ethruby/crypto'
require 'ethruby/key'

Expand Down
1 change: 1 addition & 0 deletions spec/ethruby/devp2p/actor_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require 'spec_helper'
require 'concurrent'
require 'ethruby/devp2p/actor'

Expand Down
102 changes: 102 additions & 0 deletions spec/ethruby/devp2p/peer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# frozen_string_literal: true

require 'spec_helper'
require 'ethruby/devp2p/peer'
require 'ethruby/devp2p/protocol'
require 'ethruby/devp2p/rlpx/protocol_handshake'
require 'concurrent'

RSpec.describe ETH::DevP2P::Peer do
let(:executor) {Concurrent::CachedThreadPool.new}
let(:actor) {my_actor.new(executor: executor)}

after {executor.kill}

# mock connection
let(:connection) do
Class.new do
attr_reader :queue

def initialize
@queue = []
end

def read_msg
raise StandardError if queue.empty?
queue.shift
end
end.new
end

let(:mock_protocol) do
Class.new(ETH::DevP2P::Protocol) do
attr_reader :histories, :peer, :protocol_io

def initialize(*args)
super(*args)
@histories = []
end

def start(peer, io)
@peer = peer
@protocol_io = io
super
end

def handle_msg(msg)
histories << msg
end
end
end

it 'handle msg by code' do
protocol_1 = mock_protocol.new(name: 'eth', version: 63, length: 17)
protocol_2 = mock_protocol.new(name: 'eth', version: 62, length: 8)
protocol_3 = mock_protocol.new(name: 'hello', version: 1, length: 16)

caps = [
ETH::DevP2P::RLPX::Cap.new(name: 'eth', version: 63),
ETH::DevP2P::RLPX::Cap.new(name: 'eth', version: 62),
ETH::DevP2P::RLPX::Cap.new(name: 'hello', version: 1),
]
handshake = ETH::DevP2P::RLPX::ProtocolHandshake.new(version: 4, name: 'test', caps: caps, id: 0)


msg_1 = ETH::DevP2P::RLPX::Message.new(code: 16, payload: "test_1".b, size: 6)
msg_2 = ETH::DevP2P::RLPX::Message.new(code: 32, payload: "test_2".b, size: 6)
msg_3 = ETH::DevP2P::RLPX::Message.new(code: 33, payload: "test_hello".b, size: 10)

# send messages to connection
connection.queue << msg_1
connection.queue << msg_2
connection.queue << msg_3

peer = ETH::DevP2P::Peer.new(connection, handshake, [protocol_1, protocol_2, protocol_3])
peer.executor = executor
peer.start

# peer read all messages
expect {peer.wait}.to raise_error(StandardError)

# 'eth' protocol
protocol_1.send_stop
protocol_1.wait
expect(protocol_1.peer).to be peer
expect(protocol_1.protocol_io.io).to be connection
expect(protocol_1.histories).to eq [msg_1, msg_2]

# old 'eth' protocol
protocol_2.send_stop
protocol_2.wait
expect(protocol_2.peer).to be peer
expect(protocol_2.protocol_io.io).to be connection
expect(protocol_2.histories).to eq []

# 'hello' protocol
protocol_3.send_stop
protocol_3.wait
expect(protocol_3.peer).to be peer
expect(protocol_3.protocol_io.io).to be connection
expect(protocol_3.histories).to eq [msg_3]
end
end
2 changes: 2 additions & 0 deletions spec/ethruby_spec.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'spec_helper'

RSpec.describe ETH do
it "has a version number" do
expect(ETH::VERSION).not_to be nil
Expand Down
4 changes: 4 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@
config.expect_with :rspec do |c|
c.syntax = :expect
end

# set concurrent ruby logger
require 'concurrent'
Concurrent.use_simple_logger(level = Logger::DEBUG)
end

0 comments on commit 1ceecac

Please sign in to comment.