Skip to content

Commit 49966d4

Browse files
committed
Rudimentary socket connections are working
Really basic, hacky, janky socket connections are working: I can run `ChorexExamples.start_server()` and then run `nc localhost 4242`, transmit 100 bytes, and get a response back! Amazing!
1 parent afe5b9f commit 49966d4

File tree

11 files changed

+97
-61
lines changed

11 files changed

+97
-61
lines changed

lib/chorex_examples.ex

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,12 @@ defmodule ChorexExamples do
1515
def hello do
1616
:world
1717
end
18+
19+
def start_server() do
20+
Chorex.start(
21+
Tcp.ListenerChor.Chorex,
22+
%{Listener => Tcp.ListenerImpl, AccepterPool => Tcp.AccepterPoolImpl},
23+
[%{port: 4242, user_options: []}]
24+
)
25+
end
1826
end

lib/http/Client.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Http.Client do
44
def get_headers() do
55
receive do
66
# This has to come in from elsewhere
7-
{:socket, s} ->
7+
{:socket, s} ->
88
read_headers(s)
99
end
1010
end
@@ -13,14 +13,14 @@ defmodule Http.Client do
1313
end
1414

1515
def finish_request(nil) do
16-
# close request
16+
# close request
1717
end
1818

1919
#
2020
# Helper functions
2121
#
2222

23-
def read_headers(sock) do
23+
def read_headers(_sock) do
2424
# ...
2525
end
2626
end

lib/http/chor.ex

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,19 @@ defmodule Http.Chor do
22
import Chorex
33

44
defchor [Client, Server] do
5-
Client.get_headers() ~> Server.(headers)
6-
if Server.accept?(headers) do
7-
Server[L] ~> Client
8-
Server.accept_length(headers) ~> Client.(body_length)
9-
Client.get_body(body_length) ~> Server.(body)
10-
Server.handle(headers, body) ~> Client.(response)
11-
Client.finish_request(response)
12-
else
13-
Server[R] ~> Client
14-
Client.finish_request(nil)
5+
def run(_) do
6+
Client.get_headers() ~> Server.(headers)
7+
8+
if Server.accept?(headers) do
9+
Server[L] ~> Client
10+
Server.accept_length(headers) ~> Client.(body_length)
11+
Client.get_body(body_length) ~> Server.(body)
12+
Server.handle(headers, body) ~> Client.(response)
13+
Client.finish_request(response)
14+
else
15+
Server[R] ~> Client
16+
Client.finish_request(nil)
17+
end
1518
end
1619
end
1720
end

lib/http/server.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ defmodule Http.Server do
55
end
66

77
def accept?(_headers) do
8-
false
8+
false
99
end
1010

1111
def accept_length(_headers) do
12-
42
12+
42
1313
end
1414
end

lib/tcp/accepter_pool_impl.ex

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
11
defmodule Tcp.AccepterPoolImpl do
22
use Tcp.ListenerChor.Chorex, :accepterpool
33

4-
def spawn_handler(_socket) do
5-
# startup instance of the handler choreography
4+
def accept_and_handle_connection(listen_socket) do
5+
IO.inspect(listen_socket, label: "[accepter_pool] socket")
6+
7+
{:ok, socket} = :gen_tcp.accept(listen_socket)
8+
9+
# startup instance of the handler choreography
10+
Chorex.start(
11+
Tcp.HandlerChor.Chorex,
12+
%{Handler => Tcp.HandlerImpl, TcpClient => Tcp.ClientImpl},
13+
[socket]
14+
)
615
end
716
end

lib/tcp/client_impl.ex

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
defmodule Tcp.ClientImpl do
22
use Tcp.HandlerChor.Chorex, :tcpclient
33

4-
def send_over_socket(_sock, _msg) do
4+
def read(sock) do
5+
:gen_tcp.recv(sock, 100)
6+
end
7+
8+
def send_over_socket(sock, msg) do
9+
IO.inspect(msg, label: "[client] msg")
10+
:gen_tcp.send(sock, msg)
511
end
612
end

lib/tcp/handler_chor.ex

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,49 +2,20 @@ defmodule Tcp.HandlerChor do
22
import Chorex
33

44
defchor [Handler, TcpClient] do
5-
def loop(TcpClient.(sock)) do
6-
with Handler.(resp) <- Handler.run() do
5+
def run(TcpClient.(sock)) do
6+
TcpClient.read(sock) ~> Handler.(msg)
7+
with Handler.(resp) <- Handler.run(msg) do
78
if Handler.continue?(resp) do
89
Handler[L] ~> TcpClient
910
Handler.fmt_reply(resp) ~> TcpClient.(resp)
1011
TcpClient.send_over_socket(sock, resp)
11-
loop(TcpClient.(sock))
12+
run(TcpClient.(sock))
1213
else
1314
Handler[R] ~> TcpClient
1415
Handler.fmt_reply(resp) ~> TcpClient.(resp)
1516
TcpClient.send_over_socket(sock, resp)
1617
end
1718
end
1819
end
19-
20-
def init(TcpClient.(sock)) do
21-
loop(TcpClient.(sock))
22-
end
2320
end
24-
25-
# quote do
26-
# defchor [Handler, TcpClient] do
27-
# def loop(TcpClient.(sock)) do
28-
# with Handler.(resp) <- Handler.run() do
29-
# if Handler.continue?(resp) do
30-
# Handler[L] ~> TcpClient
31-
# Handler.fmt_reply(resp) ~> TcpClient.(resp)
32-
# TcpClient.send_over_socket(sock, resp)
33-
# loop(TcpClient.(sock))
34-
# else
35-
# Handler[R] ~> TcpClient
36-
# Handler.fmt_reply(resp) ~> TcpClient.(resp)
37-
# TcpClient.send_over_socket(sock, resp)
38-
# end
39-
# end
40-
# end
41-
42-
# def init(TcpClient.(sock)) do
43-
# loop(TcpClient.(sock))
44-
# end
45-
# end
46-
# end
47-
# |> Macro.expand_once(__ENV__)
48-
# |> Macro.to_string()
49-
# |> IO.puts()
5021
end

lib/tcp/handler_impl.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
defmodule Tcp.HandlerImpl do
22
use Tcp.HandlerChor.Chorex, :handler
33

4-
def run() do
4+
def run(msg) do
5+
IO.inspect(msg, label: "[handler] msg")
6+
{:halt, "you did it!"}
57
end
68

79
def continue?({:continue, _resp}), do: true
810
def continue?({:halt, _resp}), do: false
911

1012
def fmt_reply({_status, resp}), do: resp
11-
1213
end

lib/tcp/listener_chor.ex

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,29 @@ defmodule Tcp.ListenerChor do
22
import Chorex
33

44
defchor [Listener, AccepterPool] do
5-
def loop(Listener.(_x)) do
6-
Listener.await_connection() ~> AccepterPool.(socket)
7-
AccepterPool.spawn_handler(socket)
8-
loop(Listener.(nil))
5+
def run(Listener.(config)) do
6+
Listener.get_listener_socket(config) ~> AccepterPool.({:ok, socket})
7+
loop(AccepterPool.(socket))
98
end
109

11-
loop(Listener.(nil))
10+
def loop(AccepterPool.(listen_socket)) do
11+
AccepterPool.accept_and_handle_connection(listen_socket)
12+
loop(AccepterPool.(listen_socket))
13+
end
1214
end
15+
16+
# # Tcp.ListenerChor.Chorex
17+
# defchor [Listener, AccepterPool] do
18+
# alias Tcp.HandlerChor
19+
20+
# def run(Listener.(config)) do
21+
# Listener.await_connection(config) ~> AccepterPool.({:ok, socket})
22+
23+
# sub_chor(HandlerChor.Chorex,
24+
# supervisor: AcceptorPool,
25+
# args: [AccepterPool.(socket)])
26+
27+
# run(Listener.(config))
28+
# end
29+
# end
1330
end

lib/tcp/listener_impl.ex

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,28 @@
11
defmodule Tcp.ListenerImpl do
22
use Tcp.ListenerChor.Chorex, :listener
33

4-
def await_connection() do
5-
42
4+
@hardcoded_options [mode: :binary, active: false]
5+
6+
def get_listener_socket(config) do
7+
default_options = [
8+
backlog: 1024,
9+
nodelay: true,
10+
send_timeout: 30_000,
11+
send_timeout_close: true,
12+
reuseaddr: true
13+
]
14+
15+
opts =
16+
Enum.uniq_by(
17+
@hardcoded_options ++ config[:user_options] ++ default_options,
18+
fn
19+
{key, _} when is_atom(key) -> key
20+
key when is_atom(key) -> key
21+
end
22+
)
23+
24+
# Hopefully returns {:ok, :inet.socket()}
25+
:gen_tcp.listen(config[:port], opts)
26+
|> IO.inspect(label: "listener socket")
627
end
728
end

0 commit comments

Comments
 (0)