Использование клиента Cowboy Websocket для тестирования с помощью Elixir

Во-первых, существует полное отсутствие документации для Cowboy в целом и Websockets в частности, но в целом после расшифровки ее можно использовать отлично. Затем передача этой информации из Erlang в Elixir — это еще один шаг. Благодаря этот пост от 7stud я смог получить функционирующий веб-сокет для целей тестирования, но я не могу заставить его прослушивать и, возможно, отправлять сообщения одновременно. Я думаю, это связано с тем, что получение блокирует поток, который необходим для отправки, и это неразрывно связано с подключением к веб-сокету, поэтому он не может отправлять, пока ожидает получения. Возможно, это понимание ошибочно. Я бы хотел, чтобы меня поправили. Я пытался создать нерест, но безрезультатно, поэтому я думаю, что получение блокирует поток веб-сокета.

def ws do
    localhost = 'localhost'
    path = '/ws/app/1'
    port = 5000

    {:ok, _} = :application.ensure_all_started(:gun)
    connect_opts = %{
      connect_timeout: :timer.minutes(1),
      retry: 10,
      retry_timeout: 100
    }

    {:ok, conn_pid} = :gun.open(localhost, port, connect_opts)
    IO.inspect(conn_pid, label: "conn_pid")
    {:ok, protocol} = :gun.await_up(conn_pid)
    IO.inspect(protocol, label: "protocol")
    # Set custom header with cookie for device id
    stream_ref = :gun.ws_upgrade(conn_pid, path, [{"cookie", "device_id=1235"}])
    IO.inspect(stream_ref, label: "stream_ref")
    receive do
      {:gun_upgrade, ^conn_pid, ^stream_ref, ["websocket"], headers} ->
              upgrade_success(conn_pid, headers, stream_ref)
      {:gun_response, ^conn_pid, _, _, status, headers} ->
              exit({:ws_upgrade_failed, status, headers})
      {:gun_error, _conn_pid, _stream_ref, reason} ->
              exit({:ws_upgrade_failed, reason})
      whatever ->
        IO.inspect(whatever, label: "Whatever")
      # More clauses here as needed.
    after 5000 ->
        IO.puts "Took too long!"
        :erlang.exit("barf!")
    end
    :ok
  end

  def upgrade_success(conn_pid, headers, stream_ref) do
    IO.puts("Upgraded #{inspect(conn_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")

    IO.inspect(self(), label: "upgrade self")
    # This one runs and message is received
    run_test(conn_pid)
    # This should spawn and therefore not block
    listen(conn_pid, stream_ref)
    # This never runs
    run_test(conn_pid)
  end

  def listen(conn_pid, stream_ref) do
    spawn receive_messages(conn_pid, stream_ref)
  end
  def receive_messages(conn_pid, stream_ref) do
    IO.inspect conn_pid, label: "conn_pid!"
    IO.inspect stream_ref, label: "stream_ref!"
    IO.inspect(self(), label: "self pid")
    receive do
      {:gun_ws, ^conn_pid, ^stream_ref, {:text, msg} } ->
          IO.inspect(msg, label: "Message from websocket server:")
      other_messages ->
        IO.inspect(other_messages, label: "Other messages")
    after 5000 ->
      IO.puts "Receive timed out"
    end
    receive_messages(conn_pid, stream_ref)
  end

  def send_message(message, conn_pid) do
    :gun.ws_send(conn_pid, {:text, message})
  end

  def run_test(conn_pid) do
    IO.puts "Running test"
    message = "{\"type\":\"init\",\"body\":{\"device_id\":1234}}"
    send_message(message, conn_pid)
  end

  def stop(conn_pid) do
    :gun.shutdown(conn_pid)
  end
Я думаю, это связано с тем, что получение блокирует поток, который необходим для отправки, и это неразрывно связано с подключением к веб-сокету, поэтому он не может отправлять, пока ожидает получения. Возможно, это понимание ошибочно. -- Да, я думаю, что это должно быть ошибочно, из-за того, что здесь указано: ninenines.eu/docs/en/cowboy/2.1/guide/ws_protocol. С помощью Websocket клиент и сервер могут отправлять кадры в любое время без каких-либо ограничений.
7stud 21.06.2019 19:20
Мне удалось запустить работающий веб-сокет для целей тестирования, но я не могу заставить его слушать и, возможно, отправлять сообщения одновременно. -- Что означает it в этом предложении? Сервер или клиент?
7stud 21.06.2019 23:21

@7stud У меня уже есть сервер. Сейчас пытаюсь получить рабочий клиент. Под «сервером» я подразумеваю программу, которая ожидает получения соединений через веб-сокеты от различных клиентов.

GenericJam 24.06.2019 09:37

Для всех, кто заинтересован, я закончил это, и вы можете взглянуть на это репо, если вы ищете более полный пример. github.com/GamgeeNL/вебсокет-клиент

GenericJam 14.11.2019 20:25
Альтернативные WebSockets для netty/java: удвоение пропускной способности небольших сообщений
Альтернативные WebSockets для netty/java: удвоение пропускной способности небольших сообщений
Этот пост - краткая презентация netty-websocket-http1 - альтернативной netty/java реализации RFC6455 - протокола WebSocket.
Очень простая установка Websocket с помощью Deno без каких-либо пакетов
Очень простая установка Websocket с помощью Deno без каких-либо пакетов
Здесь мы рассмотрим, как можно использовать Websocket в Deno и развернуть его в Deno deploy. Мы будем слушать Websocket, а также отправлять сообщения.
0
4
1 443
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Из документы на оружие:

Receiving data

Gun sends an Erlang message to the owner process for every Websocket message it receives.

а также:

Connection

...

Gun connections

...

A Gun connection is an Erlang process that manages a socket to a remote endpoint. This Gun connection is owned by a user process that is called the owner of the connection, and is managed by the supervision tree of the gun application.

The owner process communicates with the Gun connection by calling functions from the module gun. All functions perform their respective operations asynchronously. The Gun connection will send Erlang messages to the owner process whenever needed.

Хотя это специально не упоминается в документах, я почти уверен, что процесс владельца — это процесс, который вызывает gun:open(). Мои попытки также показывают, что процесс-владелец должен вызывать gun:ws_send(). Другими словами, процесс-владелец должен как отправлять сообщения на сервер, так и получать сообщения от сервера.

Следующий код управляет пистолетом с gen_server таким образом, что gen_server отправляет сообщения на сервер и получает сообщения с сервера.

Когда пистолет получает сообщение от http-сервера cowboy, он отправляет сообщение, то есть Pid ! Msg, процессу-владельцу. В следующем коде gen_server создает соединение в обратном вызове init/1, а это означает, что пистолет будет стучать (!) сообщениями, которые он получает от ковбоя в gen_server. gen_server обрабатывает сообщения, отправленные непосредственно в его почтовый ящик, с помощью handle_info().

В handle_cast()gen_server использует пистолет, чтобы отправлять запросы ковбою. Поскольку handle_cast() является асинхронным, это означает, что вы можете отправлять ковбою асинхронные сообщения. И когда пушка получает сообщение от ковбоя, пушка отправляет(!) сообщение на gen_server, а функция gen_server handle_info() обрабатывает сообщение. Внутри handle_info() вызывается gen_server:reply/2 для передачи сообщения клиенту gen_server. В результате клиент gen_server может перейти к предложению получения всякий раз, когда он хочет проверить сообщения сервера, отправленные из пистолета.

-module(client).
-behavior(gen_server).
-export([start_server/0, send_sync/1, send_async/1, get_message/2, go/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]).  %%% client functions
-export([sender/1]).

%%% client functions
%%%

start_server() ->
    gen_server:start({local, ?MODULE}, ?MODULE, [], []).

send_sync(Requ) ->
    gen_server:call(?MODULE, Requ).

send_async(Requ) -> 
    gen_server:cast(?MODULE, {websocket_request, Requ}).

get_message(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
            io:format("Inside get_message(): Ref = ~w~n", [ClientRef]),
            io:format("Client received gun message: ~s~n", [Msg]);
        Other ->
            io:format("Client received other message: ~w~n", [Other])
    end.

receive_loop(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
            io:format("Client received Gun message: ~s~n", [Msg]);
        Other ->
            io:format("Client received other message: ~w~n", [Other])
    end,
    receive_loop(WebSocketPid, ClientRef).

go() ->
    {ok, GenServerPid} = start_server(),
    io:format("[ME]: Inside go(): GenServerPid=~w~n", [GenServerPid]),

    [{conn_pid, ConnPid}, {ref, ClientRef}] = send_sync(get_conn_pid),
    io:format("[ME]: Inside go(): ConnPid=~w~n", [ConnPid]),

    ok = send_async("ABCD"),
    get_message(ConnPid, ClientRef),

    spawn(?MODULE, sender, [1]),

    ok = send_async("XYZ"),
    get_message(ConnPid, ClientRef),

    receive_loop(ConnPid, ClientRef).

sender(Count) -> %Send messages to handle_info() every 3 secs
    send_async(lists:concat(["Hello", Count])),
    timer:sleep(3000),
    sender(Count+1).

%%%%%% gen_server callbacks
%%%

init(_Arg) ->
    {ok, {no_client, ws()}}.

handle_call(get_conn_pid, From = {_ClientPid, ClientRef}, _State = {_Client, WebSocketPid}) ->
    io:format("[ME]: Inside handle_call(): From = ~w~n", [From]),
    {reply, [{conn_pid, WebSocketPid}, {ref, ClientRef}], _NewState = {From, WebSocketPid} };
handle_call(stop, _From, State) ->
    {stop, normal, shutdown_ok, State}; %Calls terminate()
handle_call(_Other, _From, State) ->
    {ok, State}.

handle_cast({websocket_request, Msg}, State = {_From, WebSocketPid}) ->
    gun:ws_send(WebSocketPid, {text, Msg}), %{text, "It's raining!"}),
    {noreply, State}.

handle_info(Msg, State = {From, _WebSocketPid}) ->
    io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
    gen_server:reply(From, Msg),
    {noreply, State}.

terminate(_Reason, _State = {_From, WebSocketPid}) -> 
    gun:shutdown(WebSocketPid).


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%%%% private functions
%%%

ws() ->
    {ok, _} = application:ensure_all_started(gun),
    {ok, ConnPid} = gun:open("localhost", 8080),
    {ok, _Protocol} = gun:await_up(ConnPid),

    gun:ws_upgrade(ConnPid, "/please_upgrade_to_websocket"),

    receive
        {gun_ws_upgrade, ConnPid, ok, Headers} ->
            io:format("[ME]: Inside gun_ws_upgrade receive clause: ~w~n", 
                      [ConnPid]),
            upgrade_success_handler(ConnPid, Headers);
        {gun_response, ConnPid, _, _, Status, Headers} ->
            exit({ws_upgrade_failed, Status, Headers});
        {gun_error, _ConnPid, _StreamRef, Reason} ->
            exit({ws_upgrade_failed, Reason})
    after 1000 ->
        exit(timeout)
    end.


upgrade_success_handler(ConnPid, _Headers) ->
    io:format("[ME]: Inside upgrade_success_handler(): ~w~n", [ConnPid]),  
    ConnPid.

=======

Упс, ответ ниже показывает, как заставить сервер передавать данные клиенту.

Хорошо, я понял - в эрланге. Этот пример немного замучен. Вам нужно сделать пару вещей:

1) Вам нужно получить pid процесса, выполняющего функции websocket_*, который не совпадает с pid запроса:

Post-upgrade initialization

Cowboy has separate processes for handling the connection and requests. Because Websocket takes over the connection, the Websocket protocol handling occurs in a different process than the request handling.

This is reflected in the different callbacks Websocket handlers have. The init/2 callback is called from the temporary request process and the websocket_ callbacks from the connection process.

This means that some initialization cannot be done from init/2. Anything that would require the current pid, or be tied to the current pid, will not work as intended. The optional websocket_init/1 can be used [to get the pid of the process running the websocket_ callbacks]:

https://ninenines.eu/docs/en/cowboy/2.6/guide/ws_handlers/

Вот код, который я использовал:

init(Req, State) ->
    {cowboy_websocket, Req, State}.  %Perform websocket setup

websocket_init(State) ->
    io:format("[ME]: Inside websocket_init"),
    spawn(?MODULE, push, [self(), "Hi, there"]),
    {ok, State}.

push(WebSocketHandleProcess, Greeting) ->
    timer:sleep(4000),
    WebSocketHandleProcess ! {text, Greeting}.

websocket_handle({text, Msg}, State) ->
    timer:sleep(10000), %Don't respond to client request just yet.
    {
     reply, 
     {text, io_lib:format("Server received: ~s", [Msg]) },
     State
    };
websocket_handle(_Other, State) ->  %Ignore
    {ok, State}.

Это отправит сообщение клиенту, пока клиент ожидает ответа на запрос, который клиент ранее отправил на сервер.

2) Если вы отправляете сообщение процессу, выполняющему функции websocket_*:

Pid ! {text, Msg}

тогда это сообщение будет обработано функцией websocket_info(), а не функцией websocket_handle():

websocket_info({text, Text}, State) ->
    {reply, {text, Text}, State};
websocket_info(_Other, State) ->
    {ok, State}.

Возвращаемое значение функции websocket_info() работает так же, как и возвращаемое значение функции websocket_handle().

Поскольку ваш клиент оружия теперь получает несколько сообщений, клиент оружия должен получать в цикле:

upgrade_success_handler(ConnPid, Headers) ->
    io:format("Upgraded ~w. Success!~nHeaders:~n~p~n", 
              [ConnPid, Headers]),

    gun:ws_send(ConnPid, {text, "It's raining!"}),

    get_messages(ConnPid).  %Move the receive clause into a recursive function

get_messages(ConnPid) ->
    receive
        {gun_ws, ConnPid, {text, "Greeting: " ++ Greeting} } ->
            io:format("~s~n", [Greeting]),
            get_messages(ConnPid);

        {gun_ws, ConnPid, {text, Msg} } ->
            io:format("~s~n", [Msg]),
            get_messages(ConnPid)
    end.

Спасибо! Правки отражены в моем ответе.

GenericJam 26.06.2019 11:35

@GenericJam, хороший перевод! Если вы не возражаете, отредактируйте свой вопрос и добавьте теги erlang и gun — это облегчит поиск информации для эрлангистов.

7stud 26.06.2019 20:01

Я добавил erlang и erlang-gun, так как gun относится к базе данных оружия. Я удалил cowboy, так как это не строго ковбойский стиль, и мне пришлось оставить его до 5 тегов.

GenericJam 27.06.2019 11:34
Ответ принят как подходящий

Спасибо 7stud за пример кода и правки, которые отражены ниже:

Вот моя интерпретация Эликсира, чтобы дать базовый клиент WebSocket для оружия:

defmodule WebsocketTester.Application do

  use Application

  def start(_type, _args) do

    path = '/ws/app/1'

    port = 5000

    host = 'localhost'

    args = %{path: path, port: port, host: host}

    children = [
      { WebSocket.Client, args }
    ]
    Supervisor.start_link(children, strategy: :one_for_one, name: WebsocketTester.Supervisor)
  end
end

defmodule WebSocket.Client do

  use GenServer

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  # GenServer callbacks

  def init(args) do
    # Set up the websocket connection
    # get > upgrade
    # Initial state with gun_pid and stream_ref
    # %{gun_pid: gun_pid, stream_ref: stream_ref} = ws(args)
    {:ok, init_ws(args)}
  end

  # Give back gun_pid from state
  def handle_call(:get_conn, from, %{gun_pid: gun_pid, stream_ref: stream_ref}) do
    IO.inspect(gun_pid, label: "handle call gun pid")
    {:reply, %{gun_pid: gun_pid, stream_ref: stream_ref}, %{from: from, gun_pid: gun_pid} }
  end
  # Everything else
  def handle_call(other, from, state) do
    IO.inspect(other, label: "other call")
    IO.inspect(from, label: "from")
    {:ok, state}
  end
  # Client sends message to server.
  def handle_cast({:websocket_request, message}, %{gun_pid: gun_pid} = state) do
    IO.puts message
    IO.inspect(gun_pid, label: "gun_pid")
    :gun.ws_send(gun_pid, {:text, message})
    {:noreply, state}
  end

  def handle_info(message, %{from: from} = state) do
    IO.inspect(message, label: "Inside handle_info(): ")
    GenServer.reply(from, message)
    {:noreply, state}
  end

  def terminate(reason, _state) do
    IO.puts "Terminated due to #{reason}."
    :ok
  end


  def code_change(_old_version, state, _extra) do
    {:ok, state}
  end

  ## Client functions
  # Used for getting gun_pid from state
  def send_sync(request) do
    GenServer.call(__MODULE__, request)
  end

  # Send a message async
  def send_async(request) do
    GenServer.cast(__MODULE__, {:websocket_request, request})
  end

  # Receive a single message
  def get_message(stream_ref, gun_pid) do
      receive do
          {^stream_ref, {:gun_ws, ^gun_pid, {:text, message} }} ->
              IO.puts("Client received gun message: #{message}")
          other ->
            IO.inspect(other, label: "Client received other message")
      end
  end

  # Receive all messages recursively
  def receive_loop(stream_ref, gun_pid) do
    IO.puts "Listening"
      get_message(stream_ref, gun_pid)
      receive_loop(stream_ref, gun_pid)
  end

  def go() do
    # Get the gun_pid from state
    %{gun_pid: gun_pid, stream_ref: stream_ref} = send_sync(:get_gun_pid)
    IO.inspect(gun_pid, label: "Inside go(): gun_pid = ")
    # Send messages manually
    :ok = send_async(Jason.encode!(%{type: "info", greet: "yo"}))
    # Or to send just text
    # :ok = send_async("yo")

    # Receive messages manually
    get_message(stream_ref, gun_pid)

    # Start sending loop
    spawn sender 1

    # Start listening
    receive_loop(stream_ref, gun_pid)
  end

  # Send messages to handle_info() every 3 secs
  def sender(count) do
      send_async("count is #{count}")
      :timer.sleep(3000)
      sender(count+1)
  end

  ## End of client functions

  # Initialize the websocket connection
  def init_ws(args) do

    %{ path: path, port: port, host: host} = args

    {:ok, _} = :application.ensure_all_started(:gun)
    connect_opts = %{
      connect_timeout: :timer.minutes(1),
      retry: 10,
      retry_timeout: 100
    }

    {:ok, gun_pid} = :gun.open(host, port, connect_opts)
    {:ok, _protocol} = :gun.await_up(gun_pid)
    # Set custom header with cookie for device id - set_headers can be left out if you don't want custom headers
    stream_ref = :gun.ws_upgrade(gun_pid, path, set_headers("I like cookies"))
    receive do
      {:gun_upgrade, ^gun_pid, ^stream_ref, ["websocket"], headers} ->
            upgrade_success(gun_pid, headers, stream_ref)
      {:gun_response, ^gun_pid, _, _, status, headers} ->
              exit({:ws_upgrade_failed, status, headers})
      {:gun_error, _gun_pid, _stream_ref, reason} ->
              exit({:ws_upgrade_failed, reason})
      whatever ->
        IO.inspect(whatever, label: "Whatever")
      # More clauses here as needed.
    after 5000 ->
        IO.puts "Took too long!"
        :erlang.exit("barf!")
    end
    # stop(gun_pid)
  end


  def set_headers(cookie_value) do
    [{"cookie", "my_cookie=#{cookie_value}"}]
  end

  # This just returns the gun_pid for further reference which gets stored in the GenServer state.
  def upgrade_success(gun_pid, headers, stream_ref) do
    IO.puts("Upgraded #{inspect(gun_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")
    %{stream_ref: stream_ref, gun_pid: gun_pid}
  end

  # To stop gun
  def stop(gun_pid) do
    :gun.shutdown(gun_pid)
  end

end

Чтобы использовать это:

iex -S mix
iex> WebSocket.Client.go

Извините, мне пришлось внести некоторые изменения. gen_server:call() создает уникальный реф для идентификации клиента, который gen_server:reply/2 возвращает со своим сообщением. Я игнорировал Ref в предложении получения клиента, но лучше сопоставить этот Ref с образцом, чтобы клиент знал, что сообщение исходит от gen_server, а не от какого-либо другого процесса. Теперь handle_call() возвращает и ConnPid, и ClientRef, что означает, что клиент может использовать ClientRef при получении, чтобы убедиться, что сообщения исходят от gen_server.

7stud 25.06.2019 20:24

Другие вопросы по теме