Во-первых, существует полное отсутствие документации для Cowboy в целом и Websockets в частности, но в целом после расшифровки ее можно использовать отлично. Затем передача этой информации из Erlang в Elixir — это еще один шаг. Благодаря этот пост от 7stud я смог получить функционирующий веб-сокет для целей тестирования, но я не могу заставить его прослушивать и, возможно, отправлять сообщения одновременно. Я думаю, это связано с тем, что получение блокирует поток, который необходим для отправки, и это неразрывно связано с подключением к веб-сокету, поэтому он не может отправлять, пока ожидает получения. Возможно, это понимание ошибочно. Я бы хотел, чтобы меня поправили. Я пытался создать нерест, но безрезультатно, поэтому я думаю, что получение блокирует поток веб-сокета.
def ws do
localhost = 'localhost'
path = '/ws/app/1'
port = 5000
{:ok, _} = :application.ensure_all_started(:gun)
connect_opts = %{
connect_timeout: :timer.minutes(1),
retry: 10,
retry_timeout: 100
}
{:ok, conn_pid} = :gun.open(localhost, port, connect_opts)
IO.inspect(conn_pid, label: "conn_pid")
{:ok, protocol} = :gun.await_up(conn_pid)
IO.inspect(protocol, label: "protocol")
# Set custom header with cookie for device id
stream_ref = :gun.ws_upgrade(conn_pid, path, [{"cookie", "device_id=1235"}])
IO.inspect(stream_ref, label: "stream_ref")
receive do
{:gun_upgrade, ^conn_pid, ^stream_ref, ["websocket"], headers} ->
upgrade_success(conn_pid, headers, stream_ref)
{:gun_response, ^conn_pid, _, _, status, headers} ->
exit({:ws_upgrade_failed, status, headers})
{:gun_error, _conn_pid, _stream_ref, reason} ->
exit({:ws_upgrade_failed, reason})
whatever ->
IO.inspect(whatever, label: "Whatever")
# More clauses here as needed.
after 5000 ->
IO.puts "Took too long!"
:erlang.exit("barf!")
end
:ok
end
def upgrade_success(conn_pid, headers, stream_ref) do
IO.puts("Upgraded #{inspect(conn_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")
IO.inspect(self(), label: "upgrade self")
# This one runs and message is received
run_test(conn_pid)
# This should spawn and therefore not block
listen(conn_pid, stream_ref)
# This never runs
run_test(conn_pid)
end
def listen(conn_pid, stream_ref) do
spawn receive_messages(conn_pid, stream_ref)
end
def receive_messages(conn_pid, stream_ref) do
IO.inspect conn_pid, label: "conn_pid!"
IO.inspect stream_ref, label: "stream_ref!"
IO.inspect(self(), label: "self pid")
receive do
{:gun_ws, ^conn_pid, ^stream_ref, {:text, msg} } ->
IO.inspect(msg, label: "Message from websocket server:")
other_messages ->
IO.inspect(other_messages, label: "Other messages")
after 5000 ->
IO.puts "Receive timed out"
end
receive_messages(conn_pid, stream_ref)
end
def send_message(message, conn_pid) do
:gun.ws_send(conn_pid, {:text, message})
end
def run_test(conn_pid) do
IO.puts "Running test"
message = "{\"type\":\"init\",\"body\":{\"device_id\":1234}}"
send_message(message, conn_pid)
end
def stop(conn_pid) do
:gun.shutdown(conn_pid)
end
it в этом предложении? Сервер или клиент?
@7stud У меня уже есть сервер. Сейчас пытаюсь получить рабочий клиент. Под «сервером» я подразумеваю программу, которая ожидает получения соединений через веб-сокеты от различных клиентов.
Для всех, кто заинтересован, я закончил это, и вы можете взглянуть на это репо, если вы ищете более полный пример. github.com/GamgeeNL/вебсокет-клиент


Receiving data
Gun sends an Erlang message to the owner process for every Websocket message it receives.
Connection
...
Gun connections
...
A Gun connection is an Erlang process that manages a socket to a remote endpoint. This Gun connection is owned by a user process that is called the owner of the connection, and is managed by the supervision tree of the gun application.
The owner process communicates with the Gun connection by calling functions from the module gun. All functions perform their respective operations asynchronously. The Gun connection will send Erlang messages to the owner process whenever needed.
Хотя это специально не упоминается в документах, я почти уверен, что процесс владельца — это процесс, который вызывает gun:open(). Мои попытки также показывают, что процесс-владелец должен вызывать gun:ws_send(). Другими словами, процесс-владелец должен как отправлять сообщения на сервер, так и получать сообщения от сервера.
Следующий код управляет пистолетом с gen_server таким образом, что gen_server отправляет сообщения на сервер и получает сообщения с сервера.
Когда пистолет получает сообщение от http-сервера cowboy, он отправляет сообщение, то есть Pid ! Msg, процессу-владельцу. В следующем коде gen_server создает соединение в обратном вызове init/1, а это означает, что пистолет будет стучать (!) сообщениями, которые он получает от ковбоя в gen_server. gen_server обрабатывает сообщения, отправленные непосредственно в его почтовый ящик, с помощью handle_info().
В handle_cast()gen_server использует пистолет, чтобы отправлять запросы ковбою. Поскольку handle_cast() является асинхронным, это означает, что вы можете отправлять ковбою асинхронные сообщения. И когда пушка получает сообщение от ковбоя, пушка отправляет(!) сообщение на gen_server, а функция gen_server handle_info() обрабатывает сообщение. Внутри handle_info() вызывается gen_server:reply/2 для передачи сообщения клиенту gen_server. В результате клиент gen_server может перейти к предложению получения всякий раз, когда он хочет проверить сообщения сервера, отправленные из пистолета.
-module(client).
-behavior(gen_server).
-export([start_server/0, send_sync/1, send_async/1, get_message/2, go/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]). %%% client functions
-export([sender/1]).
%%% client functions
%%%
start_server() ->
gen_server:start({local, ?MODULE}, ?MODULE, [], []).
send_sync(Requ) ->
gen_server:call(?MODULE, Requ).
send_async(Requ) ->
gen_server:cast(?MODULE, {websocket_request, Requ}).
get_message(WebSocketPid, ClientRef) ->
receive
{ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
io:format("Inside get_message(): Ref = ~w~n", [ClientRef]),
io:format("Client received gun message: ~s~n", [Msg]);
Other ->
io:format("Client received other message: ~w~n", [Other])
end.
receive_loop(WebSocketPid, ClientRef) ->
receive
{ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
io:format("Client received Gun message: ~s~n", [Msg]);
Other ->
io:format("Client received other message: ~w~n", [Other])
end,
receive_loop(WebSocketPid, ClientRef).
go() ->
{ok, GenServerPid} = start_server(),
io:format("[ME]: Inside go(): GenServerPid=~w~n", [GenServerPid]),
[{conn_pid, ConnPid}, {ref, ClientRef}] = send_sync(get_conn_pid),
io:format("[ME]: Inside go(): ConnPid=~w~n", [ConnPid]),
ok = send_async("ABCD"),
get_message(ConnPid, ClientRef),
spawn(?MODULE, sender, [1]),
ok = send_async("XYZ"),
get_message(ConnPid, ClientRef),
receive_loop(ConnPid, ClientRef).
sender(Count) -> %Send messages to handle_info() every 3 secs
send_async(lists:concat(["Hello", Count])),
timer:sleep(3000),
sender(Count+1).
%%%%%% gen_server callbacks
%%%
init(_Arg) ->
{ok, {no_client, ws()}}.
handle_call(get_conn_pid, From = {_ClientPid, ClientRef}, _State = {_Client, WebSocketPid}) ->
io:format("[ME]: Inside handle_call(): From = ~w~n", [From]),
{reply, [{conn_pid, WebSocketPid}, {ref, ClientRef}], _NewState = {From, WebSocketPid} };
handle_call(stop, _From, State) ->
{stop, normal, shutdown_ok, State}; %Calls terminate()
handle_call(_Other, _From, State) ->
{ok, State}.
handle_cast({websocket_request, Msg}, State = {_From, WebSocketPid}) ->
gun:ws_send(WebSocketPid, {text, Msg}), %{text, "It's raining!"}),
{noreply, State}.
handle_info(Msg, State = {From, _WebSocketPid}) ->
io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
gen_server:reply(From, Msg),
{noreply, State}.
terminate(_Reason, _State = {_From, WebSocketPid}) ->
gun:shutdown(WebSocketPid).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%% private functions
%%%
ws() ->
{ok, _} = application:ensure_all_started(gun),
{ok, ConnPid} = gun:open("localhost", 8080),
{ok, _Protocol} = gun:await_up(ConnPid),
gun:ws_upgrade(ConnPid, "/please_upgrade_to_websocket"),
receive
{gun_ws_upgrade, ConnPid, ok, Headers} ->
io:format("[ME]: Inside gun_ws_upgrade receive clause: ~w~n",
[ConnPid]),
upgrade_success_handler(ConnPid, Headers);
{gun_response, ConnPid, _, _, Status, Headers} ->
exit({ws_upgrade_failed, Status, Headers});
{gun_error, _ConnPid, _StreamRef, Reason} ->
exit({ws_upgrade_failed, Reason})
after 1000 ->
exit(timeout)
end.
upgrade_success_handler(ConnPid, _Headers) ->
io:format("[ME]: Inside upgrade_success_handler(): ~w~n", [ConnPid]),
ConnPid.
=======
Упс, ответ ниже показывает, как заставить сервер передавать данные клиенту.
Хорошо, я понял - в эрланге. Этот пример немного замучен. Вам нужно сделать пару вещей:
1) Вам нужно получить pid процесса, выполняющего функции websocket_*, который не совпадает с pid запроса:
Post-upgrade initialization
Cowboy has separate processes for handling the connection and requests. Because Websocket takes over the connection, the Websocket protocol handling occurs in a different process than the request handling.
This is reflected in the different callbacks Websocket handlers have. The init/2 callback is called from the temporary request process and the websocket_ callbacks from the connection process.
This means that some initialization cannot be done from init/2. Anything that would require the current pid, or be tied to the current pid, will not work as intended. The optional websocket_init/1 can be used [to get the pid of the process running the websocket_ callbacks]:
https://ninenines.eu/docs/en/cowboy/2.6/guide/ws_handlers/
Вот код, который я использовал:
init(Req, State) ->
{cowboy_websocket, Req, State}. %Perform websocket setup
websocket_init(State) ->
io:format("[ME]: Inside websocket_init"),
spawn(?MODULE, push, [self(), "Hi, there"]),
{ok, State}.
push(WebSocketHandleProcess, Greeting) ->
timer:sleep(4000),
WebSocketHandleProcess ! {text, Greeting}.
websocket_handle({text, Msg}, State) ->
timer:sleep(10000), %Don't respond to client request just yet.
{
reply,
{text, io_lib:format("Server received: ~s", [Msg]) },
State
};
websocket_handle(_Other, State) -> %Ignore
{ok, State}.
Это отправит сообщение клиенту, пока клиент ожидает ответа на запрос, который клиент ранее отправил на сервер.
2) Если вы отправляете сообщение процессу, выполняющему функции websocket_*:
Pid ! {text, Msg}
тогда это сообщение будет обработано функцией websocket_info(), а не функцией websocket_handle():
websocket_info({text, Text}, State) ->
{reply, {text, Text}, State};
websocket_info(_Other, State) ->
{ok, State}.
Возвращаемое значение функции websocket_info() работает так же, как и возвращаемое значение функции websocket_handle().
Поскольку ваш клиент оружия теперь получает несколько сообщений, клиент оружия должен получать в цикле:
upgrade_success_handler(ConnPid, Headers) ->
io:format("Upgraded ~w. Success!~nHeaders:~n~p~n",
[ConnPid, Headers]),
gun:ws_send(ConnPid, {text, "It's raining!"}),
get_messages(ConnPid). %Move the receive clause into a recursive function
get_messages(ConnPid) ->
receive
{gun_ws, ConnPid, {text, "Greeting: " ++ Greeting} } ->
io:format("~s~n", [Greeting]),
get_messages(ConnPid);
{gun_ws, ConnPid, {text, Msg} } ->
io:format("~s~n", [Msg]),
get_messages(ConnPid)
end.
Спасибо! Правки отражены в моем ответе.
@GenericJam, хороший перевод! Если вы не возражаете, отредактируйте свой вопрос и добавьте теги erlang и gun — это облегчит поиск информации для эрлангистов.
Я добавил erlang и erlang-gun, так как gun относится к базе данных оружия. Я удалил cowboy, так как это не строго ковбойский стиль, и мне пришлось оставить его до 5 тегов.
Спасибо 7stud за пример кода и правки, которые отражены ниже:
Вот моя интерпретация Эликсира, чтобы дать базовый клиент WebSocket для оружия:
defmodule WebsocketTester.Application do
use Application
def start(_type, _args) do
path = '/ws/app/1'
port = 5000
host = 'localhost'
args = %{path: path, port: port, host: host}
children = [
{ WebSocket.Client, args }
]
Supervisor.start_link(children, strategy: :one_for_one, name: WebsocketTester.Supervisor)
end
end
defmodule WebSocket.Client do
use GenServer
def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]},
type: :worker,
restart: :permanent,
shutdown: 500
}
end
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
# GenServer callbacks
def init(args) do
# Set up the websocket connection
# get > upgrade
# Initial state with gun_pid and stream_ref
# %{gun_pid: gun_pid, stream_ref: stream_ref} = ws(args)
{:ok, init_ws(args)}
end
# Give back gun_pid from state
def handle_call(:get_conn, from, %{gun_pid: gun_pid, stream_ref: stream_ref}) do
IO.inspect(gun_pid, label: "handle call gun pid")
{:reply, %{gun_pid: gun_pid, stream_ref: stream_ref}, %{from: from, gun_pid: gun_pid} }
end
# Everything else
def handle_call(other, from, state) do
IO.inspect(other, label: "other call")
IO.inspect(from, label: "from")
{:ok, state}
end
# Client sends message to server.
def handle_cast({:websocket_request, message}, %{gun_pid: gun_pid} = state) do
IO.puts message
IO.inspect(gun_pid, label: "gun_pid")
:gun.ws_send(gun_pid, {:text, message})
{:noreply, state}
end
def handle_info(message, %{from: from} = state) do
IO.inspect(message, label: "Inside handle_info(): ")
GenServer.reply(from, message)
{:noreply, state}
end
def terminate(reason, _state) do
IO.puts "Terminated due to #{reason}."
:ok
end
def code_change(_old_version, state, _extra) do
{:ok, state}
end
## Client functions
# Used for getting gun_pid from state
def send_sync(request) do
GenServer.call(__MODULE__, request)
end
# Send a message async
def send_async(request) do
GenServer.cast(__MODULE__, {:websocket_request, request})
end
# Receive a single message
def get_message(stream_ref, gun_pid) do
receive do
{^stream_ref, {:gun_ws, ^gun_pid, {:text, message} }} ->
IO.puts("Client received gun message: #{message}")
other ->
IO.inspect(other, label: "Client received other message")
end
end
# Receive all messages recursively
def receive_loop(stream_ref, gun_pid) do
IO.puts "Listening"
get_message(stream_ref, gun_pid)
receive_loop(stream_ref, gun_pid)
end
def go() do
# Get the gun_pid from state
%{gun_pid: gun_pid, stream_ref: stream_ref} = send_sync(:get_gun_pid)
IO.inspect(gun_pid, label: "Inside go(): gun_pid = ")
# Send messages manually
:ok = send_async(Jason.encode!(%{type: "info", greet: "yo"}))
# Or to send just text
# :ok = send_async("yo")
# Receive messages manually
get_message(stream_ref, gun_pid)
# Start sending loop
spawn sender 1
# Start listening
receive_loop(stream_ref, gun_pid)
end
# Send messages to handle_info() every 3 secs
def sender(count) do
send_async("count is #{count}")
:timer.sleep(3000)
sender(count+1)
end
## End of client functions
# Initialize the websocket connection
def init_ws(args) do
%{ path: path, port: port, host: host} = args
{:ok, _} = :application.ensure_all_started(:gun)
connect_opts = %{
connect_timeout: :timer.minutes(1),
retry: 10,
retry_timeout: 100
}
{:ok, gun_pid} = :gun.open(host, port, connect_opts)
{:ok, _protocol} = :gun.await_up(gun_pid)
# Set custom header with cookie for device id - set_headers can be left out if you don't want custom headers
stream_ref = :gun.ws_upgrade(gun_pid, path, set_headers("I like cookies"))
receive do
{:gun_upgrade, ^gun_pid, ^stream_ref, ["websocket"], headers} ->
upgrade_success(gun_pid, headers, stream_ref)
{:gun_response, ^gun_pid, _, _, status, headers} ->
exit({:ws_upgrade_failed, status, headers})
{:gun_error, _gun_pid, _stream_ref, reason} ->
exit({:ws_upgrade_failed, reason})
whatever ->
IO.inspect(whatever, label: "Whatever")
# More clauses here as needed.
after 5000 ->
IO.puts "Took too long!"
:erlang.exit("barf!")
end
# stop(gun_pid)
end
def set_headers(cookie_value) do
[{"cookie", "my_cookie=#{cookie_value}"}]
end
# This just returns the gun_pid for further reference which gets stored in the GenServer state.
def upgrade_success(gun_pid, headers, stream_ref) do
IO.puts("Upgraded #{inspect(gun_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")
%{stream_ref: stream_ref, gun_pid: gun_pid}
end
# To stop gun
def stop(gun_pid) do
:gun.shutdown(gun_pid)
end
end
Чтобы использовать это:
iex -S mix
iex> WebSocket.Client.go
Извините, мне пришлось внести некоторые изменения. gen_server:call() создает уникальный реф для идентификации клиента, который gen_server:reply/2 возвращает со своим сообщением. Я игнорировал Ref в предложении получения клиента, но лучше сопоставить этот Ref с образцом, чтобы клиент знал, что сообщение исходит от gen_server, а не от какого-либо другого процесса. Теперь handle_call() возвращает и ConnPid, и ClientRef, что означает, что клиент может использовать ClientRef при получении, чтобы убедиться, что сообщения исходят от gen_server.