Skip to content
This repository was archived by the owner on Aug 29, 2024. It is now read-only.

Commit 255fafb

Browse files
committed
Use native I/O when the scheduler is supported.
1 parent e1b0b6c commit 255fafb

File tree

7 files changed

+119
-92
lines changed

7 files changed

+119
-92
lines changed

.github/workflows/async-v1.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ jobs:
1212
- ubuntu
1313

1414
ruby:
15-
- 2.7
15+
- "2.7"
1616

1717
env:
1818
BUNDLE_GEMFILE: gems/async-v1.rb

.github/workflows/async-v2.yaml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
name: Async v2
2+
3+
on: [push, pull_request]
4+
5+
jobs:
6+
test:
7+
runs-on: ${{matrix.os}}-latest
8+
9+
strategy:
10+
matrix:
11+
os:
12+
- ubuntu
13+
14+
ruby:
15+
- "3.1.1"
16+
17+
env:
18+
BUNDLE_GEMFILE: gems/async-v2.rb
19+
20+
steps:
21+
- uses: actions/checkout@v2
22+
- uses: ruby/setup-ruby@v1
23+
with:
24+
ruby-version: ${{matrix.ruby}}
25+
bundler-cache: true
26+
27+
- name: Run tests
28+
timeout-minutes: 5
29+
run: bundle exec rspec

gems/async-v2.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2023, by Samuel Williams.
5+
6+
source 'https://rubygems.org'
7+
8+
gemspec path: "../"
9+
10+
gem 'async', '~> 2.0'

lib/async/io/generic.rb

Lines changed: 56 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -84,71 +84,75 @@ def wrap(*args)
8484

8585
wraps ::IO, :external_encoding, :internal_encoding, :autoclose?, :autoclose=, :pid, :stat, :binmode, :flush, :set_encoding, :set_encoding_by_bom, :to_path, :to_io, :to_i, :reopen, :fileno, :fsync, :fdatasync, :sync, :sync=, :tell, :seek, :rewind, :path, :pos, :pos=, :eof, :eof?, :close_on_exec?, :close_on_exec=, :closed?, :close_read, :close_write, :isatty, :tty?, :binmode?, :sysseek, :advise, :ioctl, :fcntl, :nread, :ready?, :pread, :pwrite, :pathconf
8686

87-
# Read the specified number of bytes from the input stream. This is fast path.
88-
# @example
89-
# data = io.sysread(512)
90-
wrap_blocking_method :sysread, :read_nonblock
91-
92-
alias readpartial read_nonblock
93-
94-
# Read `length` bytes of data from the underlying I/O. If length is unspecified, read everything.
95-
def read(length = nil, buffer = nil)
96-
if buffer
97-
buffer.clear
98-
else
99-
buffer = String.new
100-
end
87+
if Async::Scheduler.supported?
88+
def_delegators :@io, :read, :write, :sysread, :syswrite, :read_nonblock, :write_nonblock, :readpartial
89+
else
90+
# Read the specified number of bytes from the input stream. This is fast path.
91+
# @example
92+
# data = io.sysread(512)
93+
wrap_blocking_method :sysread, :read_nonblock
94+
95+
alias readpartial read_nonblock
10196

102-
if length
103-
return String.new(encoding: Encoding::BINARY) if length <= 0
97+
# Read `length` bytes of data from the underlying I/O. If length is unspecified, read everything.
98+
def read(length = nil, buffer = nil)
99+
if buffer
100+
buffer.clear
101+
else
102+
buffer = String.new
103+
end
104104

105-
# Fast path:
106-
if buffer = self.sysread(length, buffer)
105+
if length
106+
return "" if length <= 0
107107

108-
# Slow path:
109-
while buffer.bytesize < length
108+
# Fast path:
109+
if buffer = self.sysread(length, buffer)
110+
110111
# Slow path:
111-
if chunk = self.sysread(length - buffer.bytesize)
112-
buffer << chunk
113-
else
114-
break
112+
while buffer.bytesize < length
113+
# Slow path:
114+
if chunk = self.sysread(length - buffer.bytesize)
115+
buffer << chunk
116+
else
117+
break
118+
end
115119
end
120+
121+
return buffer
122+
else
123+
return nil
124+
end
125+
else
126+
buffer = self.sysread(BLOCK_SIZE, buffer)
127+
128+
while chunk = self.sysread(BLOCK_SIZE)
129+
buffer << chunk
116130
end
117131

118132
return buffer
119-
else
120-
return nil
121133
end
122-
else
123-
buffer = self.sysread(BLOCK_SIZE, buffer)
124-
125-
while chunk = self.sysread(BLOCK_SIZE)
126-
buffer << chunk
127-
end
128-
129-
return buffer
130134
end
131-
end
132-
133-
# Write entire buffer to output stream. This is fast path.
134-
# @example
135-
# io.syswrite("Hello World")
136-
wrap_blocking_method :syswrite, :write_nonblock
137-
138-
def write(buffer)
139-
# Fast path:
140-
written = self.syswrite(buffer)
141-
remaining = buffer.bytesize - written
142135

143-
while remaining > 0
144-
# Slow path:
145-
length = self.syswrite(buffer.byteslice(written, remaining))
136+
# Write entire buffer to output stream. This is fast path.
137+
# @example
138+
# io.syswrite("Hello World")
139+
wrap_blocking_method :syswrite, :write_nonblock
140+
141+
def write(buffer)
142+
# Fast path:
143+
written = self.syswrite(buffer)
144+
remaining = buffer.bytesize - written
146145

147-
remaining -= length
148-
written += length
146+
while remaining > 0
147+
# Slow path:
148+
length = self.syswrite(buffer.byteslice(written, remaining))
149+
150+
remaining -= length
151+
written += length
152+
end
153+
154+
return written
149155
end
150-
151-
return written
152156
end
153157

154158
def << buffer

lib/async/io/stream.rb

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,20 @@ def eof!
234234

235235
private
236236

237+
def sysread(size, buffer)
238+
while true
239+
result = @io.read_nonblock(size, buffer, exception: false)
240+
241+
if result == :wait_readable
242+
@io.wait_readable
243+
elsif result == :wait_writable
244+
@io.wait_writable
245+
else
246+
return result
247+
end
248+
end
249+
end
250+
237251
# Fills the buffer from the underlying stream.
238252
def fill_read_buffer(size = @block_size)
239253
# We impose a limit because the underlying `read` system call can fail if we request too much data in one go.
@@ -245,19 +259,23 @@ def fill_read_buffer(size = @block_size)
245259
flush
246260

247261
if @read_buffer.empty?
248-
if @io.read_nonblock(size, @read_buffer, exception: false)
262+
if sysread(size, @read_buffer)
249263
# Console.logger.debug(self, name: "read") {@read_buffer.inspect}
250264
return true
251265
end
252266
else
253-
if chunk = @io.read_nonblock(size, @input_buffer, exception: false)
267+
if chunk = sysread(size, @input_buffer)
254268
@read_buffer << chunk
255269
# Console.logger.debug(self, name: "read") {@read_buffer.inspect}
256270

257271
return true
258272
end
259273
end
260274

275+
@eof = true
276+
return false
277+
278+
rescue EOFError
261279
# else for both cases above:
262280
@eof = true
263281
return false

lib/async/io/tcp_socket.rb

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,55 +12,21 @@ module Async
1212
module IO
1313
# Asynchronous TCP socket wrapper.
1414
class TCPSocket < IPSocket
15-
wraps ::TCPSocket
15+
wraps ::TCPSocket, :gets, :puts
1616

1717
def initialize(remote_host, remote_port = nil, local_host = nil, local_port = nil)
1818
if remote_host.is_a? ::TCPSocket
1919
super(remote_host)
2020
else
21-
remote_address = Addrinfo.tcp(remote_host, remote_port)
22-
local_address = Addrinfo.tcp(local_host, local_port) if local_host
23-
24-
# We do this unusual dance to avoid leaking an "open" socket instance.
25-
socket = Socket.connect(remote_address, local_address: local_address)
26-
fd = socket.fcntl(Fcntl::F_DUPFD)
27-
Console.logger.debug(self) {"Connected to #{remote_address.inspect}: #{fd}"}
28-
socket.close
29-
30-
super(::TCPSocket.for_fd(fd))
31-
32-
# The equivalent blocking operation. Unfortunately there is no trivial way to make this non-blocking.
33-
# super(::TCPSocket.new(remote_host, remote_port, local_host, local_port))
21+
super(::TCPSocket.new(remote_host, remote_port, local_host, local_port))
3422
end
35-
36-
@stream = Stream.new(self)
3723
end
3824

3925
class << self
4026
alias open new
4127
end
4228

43-
def close
44-
@stream.flush
45-
super
46-
end
47-
4829
include Peer
49-
50-
attr :stream
51-
52-
# The way this buffering works is pretty atrocious.
53-
def_delegators :@stream, :gets, :puts
54-
55-
def sysread(size, buffer = nil)
56-
data = @stream.read_partial(size)
57-
58-
if buffer
59-
buffer.replace(data)
60-
end
61-
62-
return data
63-
end
6430
end
6531

6632
# Asynchronous TCP server wrappper.

spec/async/io/tcp_socket_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
# Shared port for localhost network tests.
1616
let(:server_address) {Async::IO::Address.tcp("localhost", 6788)}
17-
let(:data) {"The quick brown fox jumped over the lazy dog."}
17+
let(:data) {"The quick brown fox jumped over the lazy dog.\n"}
1818

1919
describe Async::IO::TCPServer do
2020
it_should_behave_like Async::IO::Generic

0 commit comments

Comments
 (0)