|
2 | 2 | :is IO.Engine(IO.Action)
|
3 | 3 | :var io IO.CoreEngine
|
4 | 4 | :var _listener (IO.Actor(IO.Action) | None): None
|
5 |
| - :var connect_error OSError: OSError.None |
| 5 | + :var _pending_connect_count I32: 0 |
6 | 6 | :let read_stream: ByteStream.Reader.new
|
7 | 7 | :let write_stream ByteStream.Writer
|
8 | 8 |
|
9 |
| - :new (actor IO.Actor(IO.Action), ticket TCP.Connect.Ticket) |
10 |
| - @io = try ( |
11 |
| - // TODO: The IO package shouldn't expose this unsafe interface that |
12 |
| - // could be used to circumvent the capability security of the TCP package. |
13 |
| - // Instead, the relevant code should be carefully moved to this package. |
14 |
| - IO.CoreEngine.new_tcp_connect!( |
15 |
| - actor |
16 |
| - ticket.host |
17 |
| - ticket.port |
18 |
| - ticket.from_port |
19 |
| - ) |
| 9 | + :fun non _asio_flags |
| 10 | + if Platform.is_windows ( |
| 11 | + AsioEvent.Flags.read_write |
20 | 12 | |
|
21 |
| - @connect_error = OSError.EINVAL |
22 |
| - IO.CoreEngine.new(AsioEvent.ID.null) // an invalid one |
| 13 | + AsioEvent.Flags.read_write_oneshot |
23 | 14 | )
|
| 15 | + |
| 16 | + :: Create a new TCP engine based on an outbound connection. |
| 17 | + :: |
| 18 | + :: The given `ticket` specifies the connection details, and also proves |
| 19 | + :: (via capability security) that the caller has authority to connect. |
| 20 | + :new (actor IO.Actor(IO.Action), ticket TCP.Connect.Ticket) |
| 21 | + // Begin with an "empty" IO core engine - we'll fill it later |
| 22 | + // after one of the attempted TCP connections succeeds. |
| 23 | + @io = IO.CoreEngine.new(AsioEvent.ID.null) |
24 | 24 | @write_stream = ByteStream.Writer.new(@io)
|
25 | 25 |
|
26 |
| - :new accept( |
27 |
| - actor IO.Actor(IO.Action) |
28 |
| - ticket TCP.Accept.Ticket |
29 |
| - ) |
| 26 | + // If IPv4 and IPv6 resolutions are both possible, the runtime will try to |
| 27 | + // connect with both parallel; we'll later adopt whichever succeeds first. |
| 28 | + @_pending_connect_count = _FFI.pony_os_connect_tcp( |
| 29 | + actor |
| 30 | + ticket.host.cstring, ticket.port.cstring, ticket.from_port.cstring |
| 31 | + @_asio_flags |
| 32 | + ) |
| 33 | + |
| 34 | + // If we failed to resolve any valid connection attempts, send the actor |
| 35 | + // a later IO action that will let it know that connection has failed. |
| 36 | + if (@_pending_connect_count == 0) ( |
| 37 | + actor.io_deferred_action(IO.Action.OpenFailed) |
| 38 | + ) |
| 39 | + |
| 40 | + :: Create a new TCP engine based on an accepting an inbound connection. |
| 41 | + :: |
| 42 | + :: The given `ticket` is a single-use capability that originated in a |
| 43 | + :: `TCP.Listen.Engine` that had an incoming connection available to accept. |
| 44 | + :new accept(actor IO.Actor(IO.Action), ticket TCP.Accept.Ticket) |
30 | 45 | actor.io_deferred_action(IO.Action.Opened)
|
31 | 46 | @io = IO.CoreEngine.new(
|
32 | 47 | _FFI.pony_asio_event_create(actor, ticket._fd, @_asio_flags, 0, True)
|
33 | 48 | )
|
34 | 49 | @write_stream = ByteStream.Writer.new(@io)
|
35 | 50 | @_listener = ticket._listener
|
36 | 51 |
|
37 |
| - :fun non _asio_flags |
38 |
| - if Platform.is_windows ( |
39 |
| - AsioEvent.Flags.read_write |
40 |
| - | |
41 |
| - AsioEvent.Flags.read_write_oneshot |
42 |
| - ) |
43 |
| - |
44 | 52 | :fun ref react(event AsioEvent) @
|
45 | 53 | :yields IO.Action
|
| 54 | + // If we haven't adopted an event yet, and this one is ready to be adopted, |
| 55 | + // try to adopt it now, as we expect it is one of our pending connections. |
| 56 | + if (@io.is_waiting_to_open && event.is_writable) ( |
| 57 | + try ( |
| 58 | + @_pending_connect_count -= 1 |
| 59 | + @io.adopt_event!(event) |
| 60 | + yield IO.Action.Opened |
| 61 | + | |
| 62 | + // We failed to adopt it because it was a failed connection attempt. |
| 63 | + // If there are no more pending connection attempts, our last one has |
| 64 | + // failed and we have no choice but to admit final failure. |
| 65 | + if (@_pending_connect_count == 0) ( |
| 66 | + yield IO.Action.OpenFailed |
| 67 | + ) |
| 68 | + |
| 69 | + // Return early because we don't want to do anything with this event |
| 70 | + // after having failed to adopt it already. |
| 71 | + return @ |
| 72 | + ) |
| 73 | + ) |
| 74 | + |
| 75 | + // Now, pass the event to the inner engine and react to its yielded actions. |
46 | 76 | @io.react(event) -> (action |
|
47 | 77 | case action == (
|
48 | 78 | | IO.Action.Closed |
|
|
67 | 97 | yield action
|
68 | 98 | )
|
69 | 99 | )
|
| 100 | + |
70 | 101 | @
|
71 | 102 |
|
72 | 103 | :fun ref close
|
|
0 commit comments