Skip to content

Commit ed896b2

Browse files
committed
refactor connection to use MessagePack instead of JSON
1 parent 3068f7c commit ed896b2

File tree

6 files changed

+197
-53
lines changed

6 files changed

+197
-53
lines changed

bake/async/container/supervisor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def memory_sample(duration: 10, connection_id:)
4444
operation = {do: :memory_sample, duration: duration}
4545

4646
# Use the forward operation to proxy the request to a worker:
47-
return connection.call(do: :forward, operation: operation, connection_id: connection_id)
47+
connection.call(do: :forward, operation: operation, connection_id: connection_id)
4848
end
4949
end
5050

examples/simple/simple.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def setup(container)
3333
Console.info(self, "Exiting...")
3434
end
3535
end
36-
end
36+
end
3737
end
3838

3939
service "sleep" do

lib/async/container/supervisor/connection.rb

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# Copyright, 2025, by Samuel Williams.
55

66
require "json"
7+
require_relative "message_wrapper"
78

89
module Async
910
module Container
@@ -12,6 +13,8 @@ module Supervisor
1213
#
1314
# Handles message passing, call/response patterns, and connection lifecycle.
1415
class Connection
16+
MAX_MESSAGE_SIZE = 2 ** 32 - 1
17+
1518
# Represents a remote procedure call over a connection.
1619
#
1720
# Manages the call lifecycle, response queueing, and completion signaling.
@@ -218,7 +221,7 @@ def initialize(stream, id = 0, **state)
218221
@stream = stream
219222
@id = id
220223
@state = state
221-
224+
@message_wrapper = MessageWrapper.new
222225
@reader = nil
223226
@calls = {}
224227
end
@@ -238,9 +241,15 @@ def next_id
238241

239242
# Write a message to the connection stream.
240243
#
244+
# Uses a length-prefixed protocol: 2-byte length header (big-endian) followed by data.
245+
# This allows MessagePack data to contain newlines without breaking message boundaries.
246+
#
241247
# @parameter message [Hash] The message to write.
242248
def write(**message)
243-
@stream.write(JSON.dump(message) << "\n")
249+
data = @message_wrapper.pack(message)
250+
# Write 4-byte length prefix
251+
data_length = [data.bytesize].pack("N")
252+
@stream.write(data_length + data)
244253
@stream.flush
245254
end
246255

@@ -264,9 +273,26 @@ def call(timeout: nil, **message)
264273
#
265274
# @returns [Hash, nil] The parsed message or nil if stream is closed.
266275
def read
267-
if line = @stream&.gets
268-
JSON.parse(line, symbolize_names: true)
276+
length_data = @stream&.read(4)
277+
return nil unless length_data && length_data.bytesize == 4
278+
279+
# Unpack 32-bit integer
280+
length = length_data.unpack1("N")
281+
282+
# Validate message size
283+
if length > MAX_MESSAGE_SIZE
284+
Console.error(self, "Message too large: #{length} bytes (max: #{MAX_MESSAGE_SIZE})")
285+
return nil
269286
end
287+
288+
# Read the exact amount of data specified
289+
data = @stream.read(length)
290+
291+
unless data && data.bytesize == length
292+
raise EOFError, "Failed to read complete message"
293+
end
294+
295+
@message_wrapper.unpack(data)
270296
end
271297

272298
# Iterate over all messages from the connection.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "msgpack"
7+
8+
module Async
9+
module Container
10+
module Supervisor
11+
class MessageWrapper
12+
def initialize
13+
@factory = MessagePack::Factory.new
14+
15+
register_types
16+
17+
@packer = @factory.packer
18+
end
19+
20+
def pack(message)
21+
@packer.clear
22+
normalized_message = normalize(message)
23+
@packer.pack(normalized_message)
24+
@packer.full_pack
25+
end
26+
27+
def unpack(data)
28+
@factory.unpack(data)
29+
end
30+
31+
private
32+
33+
def normalize(obj)
34+
case obj
35+
when Hash
36+
obj.transform_values{|v| normalize(v)}
37+
when Array
38+
obj.map{|v| normalize(v)}
39+
else
40+
if obj.respond_to?(:as_json)
41+
normalize(obj.as_json)
42+
else
43+
obj
44+
end
45+
end
46+
end
47+
48+
def register_types
49+
@factory.register_type(0x00, Symbol)
50+
51+
@factory.register_type(
52+
0x01,
53+
Exception,
54+
packer: self.method(:pack_exception),
55+
unpacker: self.method(:unpack_exception),
56+
recursive: true,
57+
)
58+
59+
@factory.register_type(
60+
0x02,
61+
Class,
62+
packer: ->(klass) {klass.name},
63+
unpacker: ->(name) {name},
64+
)
65+
66+
@factory.register_type(
67+
MessagePack::Timestamp::TYPE,
68+
Time,
69+
packer: MessagePack::Time::Packer,
70+
unpacker: MessagePack::Time::Unpacker
71+
)
72+
end
73+
74+
def pack_exception(exception)
75+
[exception.class.name, exception.message, exception.backtrace].pack("A*")
76+
end
77+
78+
def unpack_exception(data)
79+
klass, message, backtrace = data.unpack("A*A*A*")
80+
klass = Object.const_get(klass)
81+
82+
exception = klass.new(message)
83+
exception.set_backtrace(backtrace)
84+
85+
return exception
86+
end
87+
end
88+
end
89+
end
90+
end

test/async/container/connection.rb

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55

66
require "async/container/supervisor/connection"
77
require "stringio"
8+
require "msgpack"
89

910
describe Async::Container::Supervisor::Connection do
1011
let(:stream) {StringIO.new}
1112
let(:connection) {Async::Container::Supervisor::Connection.new(stream)}
13+
let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new}
1214

1315
with subject::Call do
1416
let(:test_call) {Async::Container::Supervisor::Connection::Call.new(connection, 1, {do: :test, data: "value"})}
@@ -18,16 +20,16 @@
1820
parsed = JSON.parse(json)
1921

2022
expect(parsed).to have_keys(
21-
"do" => be == "test",
22-
"data" => be == "value"
23-
)
23+
"do" => be == "test",
24+
"data" => be == "value"
25+
)
2426
end
2527

2628
it "can get call message via as_json" do
2729
expect(test_call.as_json).to have_keys(
28-
do: be == :test,
29-
data: be == "value"
30-
)
30+
do: be == :test,
31+
data: be == "value"
32+
)
3133
end
3234

3335
it "can iterate over call responses with each" do
@@ -51,11 +53,11 @@
5153

5254
response = test_call.pop
5355
expect(response).to have_keys(
54-
id: be == 1,
55-
finished: be == true,
56-
failed: be == true,
57-
error: be == "Something went wrong"
58-
)
56+
id: be == 1,
57+
finished: be == true,
58+
failed: be == true,
59+
error: be == "Something went wrong"
60+
)
5961

6062
expect(test_call.closed?).to be == true
6163
end
@@ -74,30 +76,43 @@
7476
end
7577
end
7678

77-
it "writes JSON with newline" do
79+
it "writes length-prefixed MessagePack data" do
7880
connection.write(id: 1, do: :test)
7981

8082
stream.rewind
81-
output = stream.read
8283

83-
# Check it's valid JSON with a newline
84-
expect(output[-1]).to be == "\n"
84+
# Read 2-byte length prefix
85+
length_data = stream.read(4)
86+
expect(length_data.bytesize).to be == 4
8587

86-
parsed = JSON.parse(output.chomp)
88+
length = length_data.unpack1("N")
89+
expect(length).to be > 0
90+
91+
# Read MessagePack data
92+
data = stream.read(length)
93+
expect(data.bytesize).to be == length
94+
95+
# Parse MessagePack
96+
parsed = message_wrapper.unpack(data)
8797
expect(parsed).to have_keys(
88-
"id" => be == 1,
89-
"do" => be == "test"
98+
id: be == 1,
99+
do: be == :test
90100
)
91101
end
92102

93-
it "parses JSON lines" do
94-
stream.string = JSON.dump({id: 1, do: "test"}) << "\n"
103+
it "reads length-prefixed MessagePack data" do
104+
# Create MessagePack data
105+
message = {id: 1, do: "test"}
106+
data = message_wrapper.pack(message)
107+
108+
# Write with length prefix
109+
stream.string = [data.bytesize].pack("N") + data
95110
stream.rewind
96111

97-
message = connection.read
112+
parsed = connection.read
98113

99-
# Connection.read uses symbolize_names: true (keys are symbols, values are as-is)
100-
expect(message).to have_keys(
114+
# Keys are symbols
115+
expect(parsed).to have_keys(
101116
id: be == 1,
102117
do: be == "test"
103118
)

0 commit comments

Comments
 (0)