Skip to content

Commit

Permalink
Merge pull request #150 from clear-code/add-feature-to-take-over-anot…
Browse files Browse the repository at this point in the history
…her-server

socket_manager: add feature to share sockets with another server
  • Loading branch information
kenhys authored Oct 22, 2024
2 parents 5e9d11e + 9a42d6b commit d0a75c6
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 26 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,15 @@ se = ServerEngine.create(MyServer, MyWorker, {
se.run
```

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).
Other features:

- `socket_manager_server = SocketManager::Server.share_sockets_with_another_server(path)`
- It starts a new manager server that shares all UDP/TCP sockets with the existing manager.
- We can use this for live restart for network servers.
- The old process should stop without removing the file for the socket after the new process starts.
- Limitation: This feature would not work well if the process opens new TCP ports frequently.

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).

## Module API

Expand Down
21 changes: 17 additions & 4 deletions lib/serverengine/socket_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,32 @@ def self.open(path = nil)
end
end

def initialize(path)
def self.share_sockets_with_another_server(path)
raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows?
server = new(path, start: false)
server.share_sockets_with_another_server
server
end

def initialize(path, start: true)
@tcp_sockets = {}
@udp_sockets = {}
@mutex = Mutex.new
@path = start_server(path)
@path = start ? start_server(path) : path
end

attr_reader :path
attr_reader :tcp_sockets, :udp_sockets # for tests

def new_client
Client.new(@path)
end

def start
start_server(path)
nil
end

def close
stop_server
nil
Expand Down Expand Up @@ -159,9 +172,9 @@ def process_peer(peer)
res = SocketManager.recv_peer(peer)
return if res.nil?

pid, method, bind, port = *res
pid, method, *opts = res
begin
send_socket(peer, pid, method, bind, port)
send_socket(peer, pid, method, *opts)
rescue => e
SocketManager.send_peer(peer, e)
end
Expand Down
91 changes: 70 additions & 21 deletions lib/serverengine/socket_manager_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,38 @@ def recv_udp(family, peer, sent)
end

module ServerModule
def share_sockets_with_another_server
another_server = UNIXSocket.new(@path)
begin
idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :share_udp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@udp_sockets[key] = another_server.recv_io UDPSocket
idx += 1
end

idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :share_tcp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@tcp_sockets[key] = another_server.recv_io TCPServer
idx += 1
end

SocketManager.send_peer(another_server, [Process.pid, :share_unix])
res = SocketManager.recv_peer(another_server)
raise res if res.is_a?(Exception)
@server = another_server.recv_io UNIXServer

start_server(@path)
ensure
another_server.close
end
end

private

def listen_tcp_new(bind_ip, port)
Expand Down Expand Up @@ -77,15 +109,17 @@ def listen_udp_new(bind_ip, port)
end

def start_server(path)
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)
unless @server
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
end
end

@thread = Thread.new do
Expand All @@ -111,19 +145,34 @@ def stop_server
@thread.join if RUBY_VERSION >= "2.2"
end

def send_socket(peer, pid, method, bind, port)
sock = case method
when :listen_tcp
listen_tcp(bind, port)
when :listen_udp
listen_udp(bind, port)
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end

SocketManager.send_peer(peer, nil)

peer.send_io sock
def send_socket(peer, pid, method, *opts)
case method
when :listen_tcp
bind, port = opts
sock = listen_tcp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :listen_udp
bind, port = opts
sock = listen_udp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :share_tcp
idx, = opts
key = @tcp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@tcp_sockets.values[idx]) if key
when :share_udp
idx, = opts
key = @udp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@udp_sockets.values[idx]) if key
when :share_unix
SocketManager.send_peer(peer, nil)
peer.send_io @server
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end
end
end

Expand Down
168 changes: 168 additions & 0 deletions spec/socket_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
expect(server.path).to be_between(49152, 65535)
end
end

context 'Server.share_sockets_with_another_server' do
it 'not supported' do
server = SocketManager::Server.open(server_path)
expect { SocketManager::Server.share_sockets_with_another_server(server_path) }.to raise_error(NotImplementedError)
ensure
server.close
end
end
else
context 'Server.generate_path' do
it 'returns socket path under /tmp' do
Expand All @@ -76,6 +85,165 @@
expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_')
end
end

context 'Server.share_sockets_with_another_server' do
it 'shares listen sockets to another server' do
server = SocketManager::Server.open(server_path)

client = SocketManager::Client.new(server_path)
tcp1 = client.listen_tcp('127.0.0.1', 55551)
udp1 = client.listen_udp('127.0.0.1', 55561)
udp2 = client.listen_udp('127.0.0.1', 55562)

another_server = SocketManager::Server.share_sockets_with_another_server(server_path)

expect([
another_server.tcp_sockets.keys,
another_server.tcp_sockets.values.map(&:addr),
another_server.udp_sockets.keys,
another_server.udp_sockets.values.map(&:addr),
]).to eq([
server.tcp_sockets.keys,
server.tcp_sockets.values.map(&:addr),
server.udp_sockets.keys,
server.udp_sockets.values.map(&:addr),
])
ensure
tcp1&.close
udp1&.close
udp2&.close
server&.close
another_server&.close
end

it 'takes over TCP sockets without downtime' do
manager_server = SocketManager::Server.open(server_path)
manager_client = SocketManager::Client.new(server_path)

has_server_started = false
# The old server starts listening
thread_server = Thread.new do
server = manager_client.listen_tcp('127.0.0.1', test_port)
has_server_started = true
while socket = server.accept
incr_test_state(:count)
socket.close
end
ensure
server&.close
end

sleep 0.1 until has_server_started

# The client starts sending data
thread_client = Thread.new do
100.times do |i|
socket = TCPSocket.new('127.0.0.1', test_port)
begin
socket.write("Hello #{i}\n")
ensure
socket.close
end
sleep 0.01
end
end

sleep 0.5

# The new server shares the sockets and starts listening in parallel with the old one
thread_new_server = Thread.new do
new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path)
server = manager_client.listen_tcp('127.0.0.1', test_port)
while socket = server.accept
incr_test_state(:count)
socket.close
end
ensure
new_manager_server&.close
server&.close
end

# Stop the old server
sleep 0.1
thread_server.kill
thread_server.join

thread_client.join
wait_for_stop

# Confirm that server switching was completed without data loss
expect(test_state(:count)).to eq(100)
ensure
manager_server&.close
thread_server&.kill
thread_new_server&.kill
thread_server&.join
thread_new_server&.join
end

it 'takes over UDP sockets without downtime' do
manager_server = SocketManager::Server.open(server_path)
manager_client = SocketManager::Client.new(server_path)

has_server_started = false
# The old server starts listening
thread_server = Thread.new do
server = manager_client.listen_udp('127.0.0.1', test_port)
has_server_started = true
while server.recv(10)
incr_test_state(:count)
end
ensure
server&.close
end

sleep 0.1 until has_server_started

# The client starts sending data
thread_client = Thread.new do
100.times do |i|
socket = UDPSocket.new
begin
socket.send("Hello #{i}\n", 0, "127.0.0.1", test_port)
ensure
socket.close
end
sleep 0.01
end
end

sleep 0.5

# The new server shares the sockets and starts listening in parallel with the old one
thread_new_server = Thread.new do
new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path)
server = manager_client.listen_udp('127.0.0.1', test_port)
while server.recv(10)
incr_test_state(:count)
end
ensure
new_manager_server&.close
server&.close
end

# Stop the old server
sleep 0.1
thread_server.kill
thread_server.join

thread_client.join
wait_for_stop

# Confirm that server switching was completed without data loss
expect(test_state(:count)).to eq(100)
ensure
manager_server&.close
thread_server&.kill
thread_new_server&.kill
thread_server&.join
thread_new_server&.join
end
end
end

context 'with thread' do
Expand Down

0 comments on commit d0a75c6

Please sign in to comment.