Skip to content

Commit 403c0b3

Browse files
committed
Merge pull request #127 from fishcakez/simple_notify
Use simple query to listen/unlisten
2 parents 1ecade6 + de0e5e9 commit 403c0b3

File tree

2 files changed

+40
-3
lines changed

2 files changed

+40
-3
lines changed

lib/postgrex/notifications.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ defmodule Postgrex.Notifications do
9090
end
9191

9292
def connect(_, opts) do
93-
case Protocol.connect(opts) do
93+
case Protocol.connect([types: false] ++ opts) do
9494
{:ok, protocol, parameters, _} ->
9595
{:ok, %__MODULE__{protocol: protocol, parameters: parameters}}
9696
{:error, reason} ->
@@ -156,7 +156,7 @@ defmodule Postgrex.Notifications do
156156
defp listener_query(statement, result, from, buffer, s) do
157157
%{protocol: protocol, parameters: parameters} = s
158158

159-
case Protocol.query(protocol, statement, [], buffer) do
159+
case Protocol.simple_query(protocol, statement, buffer) do
160160
{:ok, %Postgrex.Result{}, new_parameters, notifications, buffer} ->
161161
if from, do: Connection.reply(from, result)
162162
notify_listeners(notifications, s)

lib/postgrex/protocol.ex

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@ defmodule Postgrex.Protocol do
2525
custom = opts[:extensions] || []
2626
extensions = custom ++ @default_extensions
2727
ssl? = opts[:ssl] || false
28+
types? = opts[:types] || true
2829

2930
s = %{sock: nil, backend_key: nil, types: nil, timeout: timeout}
3031

31-
types_key = {host, port, Keyword.fetch!(opts, :database), custom}
32+
types_key = if types?, do: {host, port, Keyword.fetch!(opts, :database), custom}
3233
status = %{opts: opts, parameters: %{}, notifications: [],
3334
types_key: types_key, types_ref: nil, extensions: extensions,
3435
extension_info: nil}
@@ -54,6 +55,15 @@ defmodule Postgrex.Protocol do
5455
activate(sock, buffer)
5556
end
5657

58+
@spec simple_query(state, String.t, binary | :active_once) ::
59+
{:ok, Postgrex.Result.t | Postgrex.Error.t, parameters, notifications,
60+
binary} |
61+
{:error, Postgrex.Error.t}
62+
def simple_query(s, statement, buffer) do
63+
status = %{parameters: %{}, notifications: [], ok: :result, sync: :sync}
64+
simple_send(s, status, statement, buffer)
65+
end
66+
5767
@spec query(state, String.t, [any], binary | :active_once) ::
5868
{:ok, Postgrex.Result.t | Postgrex.Error.t |
5969
{:error | :throw | :exit, any, list}, parameters, notifications, binary} |
@@ -243,6 +253,9 @@ defmodule Postgrex.Protocol do
243253

244254
## bootstrap
245255

256+
defp bootstrap(s, %{types_key: nil} = status, buffer) do
257+
bootstrap_ready(s, status, buffer)
258+
end
246259
defp bootstrap(s, %{types_key: types_key} = status, buffer) do
247260
case Postgrex.TypeServer.fetch(types_key) do
248261
{:ok, table} ->
@@ -331,6 +344,30 @@ defmodule Postgrex.Protocol do
331344
end
332345
end
333346

347+
## simple
348+
349+
defp simple_send(%{sock: sock} = s, status, statement, buffer) do
350+
msg = msg_query(statement: statement)
351+
case msg_send(msg, sock) do
352+
:ok -> simple_recv(s, status, buffer)
353+
{:error, _} = err -> err
354+
end
355+
end
356+
357+
defp simple_recv(s, status, buffer) do
358+
%{sock: sock, timeout: timeout} = s
359+
case msg_recv(sock, buffer, timeout) do
360+
{:ok, msg_command_complete(tag: tag), buffer} ->
361+
complete(s, status, %Query{}, [], tag, buffer)
362+
{:ok, msg_error(fields: fields), buffer} ->
363+
sync_recv(s, status, Postgrex.Error.exception(postgres: fields), buffer)
364+
{:ok, msg, buffer} ->
365+
simple_recv(s, handle_msg(status, msg), buffer)
366+
{:error, _} = err ->
367+
err
368+
end
369+
end
370+
334371
## query
335372

336373
defp query_encode(s, status, query, params, buffer) do

0 commit comments

Comments
 (0)