diff --git a/big_tests/tests/traffic_monitor_SUITE.erl b/big_tests/tests/traffic_monitor_SUITE.erl new file mode 100644 index 00000000000..5d17f284616 --- /dev/null +++ b/big_tests/tests/traffic_monitor_SUITE.erl @@ -0,0 +1,135 @@ +-module(traffic_monitor_SUITE). +-compile([export_all, nowarn_export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("escalus/include/escalus_xmlns.hrl"). +-include_lib("exml/include/exml.hrl"). +-include_lib("jid/include/jid.hrl"). + +%%-------------------------------------------------------------------- +%% Suite configuration +%%-------------------------------------------------------------------- + + +all() -> + [ + {group, regular} + ]. + +groups() -> + [{regular, [], [get_page, tracing_from_websocket]}]. + +suite() -> + escalus:suite(). + +init_per_suite(Config) -> + mongoose_helper:inject_module(?MODULE), + escalus:init_per_suite(Config). + +end_per_suite(Config) -> + escalus:end_per_suite(Config). + +init_per_group(GroupName, Config) when GroupName =:= regular; GroupName =:= async_pools -> + HostType = domain_helper:host_type(), + SecHostType = domain_helper:secondary_host_type(), + Config1 = dynamic_modules:save_modules(HostType, Config), + Config2 = dynamic_modules:save_modules(SecHostType, Config1), + ok = dynamic_modules:ensure_modules(HostType, [{mongoose_traffic, #{}}]), + init_listeners(), + Config2. + +init_per_testcase(get_page, Config) -> + {ok, Client} = fusco:start("http://localhost:5111", []), + init_per_testcase(generic, [{http_client, Client} | Config]); +init_per_testcase(tracing_from_websocket, Config) -> + {ok, Wsc} = gun:open("localhost", 5111, #{transport => tcp, protocols => [http], retry => 1}), + StreamRef = gun:ws_upgrade(Wsc, "/ws-traffic", [], #{}), + receive + {gun_upgrade, Wsc, StreamRef, [<<"websocket">>], _} -> ok + after 1000 -> + ct:fail("gun did not fire") + end, + init_per_testcase(generic, [{ws_client, {Wsc, StreamRef}} | Config]); +init_per_testcase(CaseName, Config) -> + escalus:init_per_testcase(CaseName, Config). + +end_per_testcase(get_page, Config) -> + C = proplists:get_value(http_client, Config), + fusco:disconnect(C), + end_per_testcase(generic, Config); +end_per_testcase(tracing_from_websocket, Config) -> + {C, _} = proplists:get_value(ws_client, Config), + gun:close(C), + end_per_testcase(generic, Config); +end_per_testcase(CaseName, Config) -> + escalus:end_per_testcase(CaseName, Config). + +end_per_group(GroupName, Config) when GroupName =:= regular -> + escalus_fresh:clean(), + dynamic_modules:restore_modules(Config). + +get_page(Config) -> + % just to make sure listener works + C = proplists:get_value(http_client, Config), + {ok, Res} = fusco:request(C, <<"/traffic">>, <<"GET">>, [], [], 5000), + {{<<"200">>, <<"OK">>}, _, _, _, _} = Res, + ok. + +tracing_from_websocket(Config) -> + C = proplists:get_value(ws_client, Config), + send(C, <<"get_status">>, #{}), + receive_status(false), + send(C, <<"trace_flag">>, #{<<"value">> => true}), + receive_status(true), + send(C, <<"get_status">>, #{}), + receive_status(true), + escalus:fresh_story(Config, [{carol, 1}], fun(Alice) -> + {<<"new_trace">>, #{<<"bare_jid">> := <<>>}} = receive_msg(), + {<<"new_trace">>, #{<<"full_jid">> := <<>>, <<"pid">> := BPid}} = receive_msg(), + send(C, <<"get_trace">>, #{<<"pid">> => BPid}), + {<<"get_trace">>, Trace} = receive_msg(), + [_ | _] = maps:get(<<"trace">>, Trace), + Server = escalus_client:server(Alice), + escalus:send(Alice, escalus_stanza:to(escalus_stanza:iq_get(?NS_DISCO_INFO, []), Server)), + {<<"message">>, _} = receive_msg(), + ok + end), + ok. + +init_listeners() -> + Opts = #{connection_type => undefined, + handlers => + [#{host => '_',module => mongoose_traffic, path => "/traffic/[...]"}, + #{host => '_',module => mongoose_traffic_channel, path => "/ws-traffic"}], + ip_address => "0", + ip_tuple => {0,0,0,0}, + ip_version => 4, + module => ejabberd_cowboy, + port => 5111, + proto => tcp, + protocol => #{compress => false}, + transport => #{max_connections => 1024,num_acceptors => 2}}, + distributed_helper:rpc(distributed_helper:mim(), ejabberd_cowboy, + start_listener, [Opts]), + ok. + +receive_status(Expected) -> + {Evt, Data} = receive_msg(), + ?assertEqual(<<"status">>, Evt), + ?assertEqual(Expected, maps:get(<<"trace_flag">>, Data, missing)). + +receive_msg() -> + receive + {gun_ws, _, _, {text, Bin}} -> + {Data} = jiffy:decode(Bin), + Event = proplists:get_value(<<"event">>, Data), + {Payload} = proplists:get_value(<<"payload">>, Data), + {Event, maps:from_list(Payload)} + after 100 -> + ct:fail("message not received") + end. + +send({C, Stream}, Event, Payload) -> + Data = #{event => Event, payload => Payload}, + gun:ws_send(C, Stream, {text, jiffy:encode(Data)}). \ No newline at end of file diff --git a/doc/operation-and-maintenance/Logging-&-monitoring.md b/doc/operation-and-maintenance/Logging-&-monitoring.md index 2105d327200..a4022823ff4 100644 --- a/doc/operation-and-maintenance/Logging-&-monitoring.md +++ b/doc/operation-and-maintenance/Logging-&-monitoring.md @@ -146,3 +146,9 @@ Parts of the names are indexed from `0`. Time-based metrics in MongooseIM are given in **microseconds**, so to display human-readable values in graph's legend, the Y-axis unit has to be edited on the `Axes` tab. [Logger]: https://erlang.org/doc/apps/kernel/logger_chapter.html#handlers + +## Traffic monitor + +For debugging purposes, especially while developing MIM additional modules or client applications, +you may want to use browser-based traffic monitor. It is downloaded as a dependency in installed +in /web/traffic subdirectory. Consult README for details. \ No newline at end of file diff --git a/rebar.config b/rebar.config index bb806f3b31c..5000b0b9e14 100644 --- a/rebar.config +++ b/rebar.config @@ -118,7 +118,8 @@ {jwerl, "1.2.0"}, {cpool, "0.1.0"}, %% Do not upgrade cpool to version 0.1.1, it has bugs {nkpacket, {git, "https://github.com/esl/nkpacket.git", {branch, "mongooseim-ranch-compatibility"}}}, - {nksip, {git, "https://github.com/arcusfelis/nksip.git", {branch, "mu-fix-dialyzer"}}} + {nksip, {git, "https://github.com/arcusfelis/nksip.git", {branch, "mu-fix-dialyzer"}}}, + {traffic_monitor, {git, "https://github.com/bartekgorny/mongooseim_traffic_monitor.git", {branch, "master"}}} ]}. {relx, [{release, { mongooseim, {cmd, "cat VERSION | tr -d '\r\n'"} }, @@ -147,6 +148,8 @@ {copy, "rel/files/scripts", "./"}, {copy, "rel/files/templates", "./"}, {copy, "rel/files/templates.ini", "etc/templates.ini"}, + {mkdir, "web"}, + {copy, "_build/default/lib/traffic_monitor", "web/traffic"}, {template, "rel/files/nodetool", "erts-\{\{erts_vsn\}\}/bin/nodetool"}, diff --git a/rebar.lock b/rebar.lock index d71df625fb2..fdc74461e42 100644 --- a/rebar.lock +++ b/rebar.lock @@ -125,6 +125,10 @@ {<<"thoas">>,{pkg,<<"thoas">>,<<"1.0.0">>},2}, {<<"tirerl">>,{pkg,<<"tirerl">>,<<"1.2.0">>},0}, {<<"tomerl">>,{pkg,<<"tomerl">>,<<"0.5.0">>},0}, + {<<"traffic_monitor">>, + {git,"https://github.com/bartekgorny/mongooseim_traffic_monitor.git", + {ref,"f80a45dad104fd72eed7096c788962dab7139fea"}}, + 0}, {<<"trails">>,{pkg,<<"trails">>,<<"2.3.0">>},0}, {<<"unicode_util_compat">>,{pkg,<<"unicode_util_compat">>,<<"0.7.0">>},1}, {<<"uuid">>,{pkg,<<"uuid_erl">>,<<"2.0.5">>},0}, diff --git a/rel/fed1.vars-toml.config b/rel/fed1.vars-toml.config index de3a8cb9629..c8ae8671af5 100644 --- a/rel/fed1.vars-toml.config +++ b/rel/fed1.vars-toml.config @@ -12,6 +12,7 @@ {http_graphql_api_user_endpoint_port, 5566}. {http_api_endpoint_port, 5294}. {http_api_client_endpoint_port, 8095}. +{traffic_channel_port, 5115}. %% This node is for s2s testing. %% "localhost" host should NOT be defined. diff --git a/rel/files/mongooseim.toml b/rel/files/mongooseim.toml index 8e9a95c4cce..4e62db21684 100644 --- a/rel/files/mongooseim.toml +++ b/rel/files/mongooseim.toml @@ -38,6 +38,18 @@ host = "_" path = "/ws-xmpp" +# [[listen.http]] +# port = {{traffic_channel_port}} +# transport.num_acceptors = 2 +# +# [[listen.http.handlers.mongoose_traffic_channel]] +# host = "_" +# path = "/ws-traffic" +# +# [[listen.http.handlers.mongoose_traffic]] +# host = "_" +# path = "/traffic/[...]" + [[listen.http]] port = {{{https_port}}} transport.num_acceptors = 10 @@ -197,6 +209,8 @@ {{/service_domain_db}} [modules.mod_adhoc] +# [modules.mongoose_traffic] + {{#mod_amp}} [modules.mod_amp] {{{mod_amp}}} diff --git a/rel/mim1.vars-toml.config b/rel/mim1.vars-toml.config index ad4ca503646..97194b0ce69 100644 --- a/rel/mim1.vars-toml.config +++ b/rel/mim1.vars-toml.config @@ -16,6 +16,7 @@ {http_graphql_api_user_endpoint_port, 5561}. {http_api_endpoint_port, 8088}. {http_api_client_endpoint_port, 8089}. +{traffic_channel_port, 5111}. {hosts, "\"localhost\", \"anonymous.localhost\", \"localhost.bis\""}. {host_types, "\"test type\", \"dummy auth\", \"anonymous\""}. diff --git a/rel/mim2.vars-toml.config b/rel/mim2.vars-toml.config index 5dba2d4e757..95514bd00ea 100644 --- a/rel/mim2.vars-toml.config +++ b/rel/mim2.vars-toml.config @@ -14,6 +14,7 @@ {http_graphql_api_admin_endpoint_port, 5552}. {http_graphql_api_domain_admin_endpoint_port, 5542}. {http_graphql_api_user_endpoint_port, 5562}. +{traffic_channel_port, 5112}. {hosts, "\"localhost\", \"anonymous.localhost\", \"localhost.bis\""}. {host_types, "\"test type\", \"dummy auth\""}. diff --git a/rel/mim3.vars-toml.config b/rel/mim3.vars-toml.config index f69c5d4b33e..aeb75b44f84 100644 --- a/rel/mim3.vars-toml.config +++ b/rel/mim3.vars-toml.config @@ -14,6 +14,7 @@ {http_graphql_api_user_endpoint_port, 5563}. {http_api_endpoint_port, 8092}. {http_api_client_endpoint_port, 8193}. +{traffic_channel_port, 5113}. "./vars-toml.config". diff --git a/rel/reg1.vars-toml.config b/rel/reg1.vars-toml.config index 62dff69727a..27bd14bfe70 100644 --- a/rel/reg1.vars-toml.config +++ b/rel/reg1.vars-toml.config @@ -13,6 +13,7 @@ {http_graphql_api_user_endpoint_port, 5564}. {http_api_endpoint_port, 8074}. {http_api_client_endpoint_port, 8075}. +{traffic_channel_port, 5114}. %% This node is for global distribution testing. %% reg is short for region. diff --git a/src/c2s/mongoose_c2s.erl b/src/c2s/mongoose_c2s.erl index 90abd45dcfb..4b5c0ca981f 100644 --- a/src/c2s/mongoose_c2s.erl +++ b/src/c2s/mongoose_c2s.erl @@ -239,6 +239,10 @@ handle_socket_packet(StateData = #c2s_data{parser = Parser}, Packet) -> NextEvent = {next_event, internal, #xmlstreamerror{name = iolist_to_binary(Reason)}}, {keep_state, StateData, NextEvent}; {ok, NewParser, XmlElements} -> + lists:foreach(fun(Element) -> + mongoose_hooks:c2s_debug(no_acc, + {client_to_server, StateData#c2s_data.jid, Element}) + end, XmlElements), Size = iolist_size(Packet), NewStateData = StateData#c2s_data{parser = NewParser}, handle_socket_elements(NewStateData, XmlElements, Size) @@ -1009,9 +1013,11 @@ maybe_send_xml(StateData, Acc, ToSend) -> -spec do_send_element(data(), mongoose_acc:t(), exml:element()) -> mongoose_acc:t(). do_send_element(StateData = #c2s_data{host_type = undefined}, Acc, El) -> + mongoose_hooks:c2s_debug(Acc, {server_to_client, StateData#c2s_data.jid, El}), send_xml(StateData, El), Acc; do_send_element(StateData = #c2s_data{host_type = HostType}, Acc, #xmlel{} = El) -> + mongoose_hooks:c2s_debug(Acc, {server_to_client, StateData#c2s_data.jid, El}), Res = send_xml(StateData, El), Acc1 = mongoose_acc:set(c2s, send_result, Res, Acc), mongoose_hooks:xmpp_send_element(HostType, Acc1, El). diff --git a/src/hooks/mongoose_hooks.erl b/src/hooks/mongoose_hooks.erl index 5776621a288..05ca8e1bb3a 100644 --- a/src/hooks/mongoose_hooks.erl +++ b/src/hooks/mongoose_hooks.erl @@ -47,6 +47,7 @@ -export([get_pep_recipients/2, filter_pep_recipient/3, c2s_stream_features/3, + c2s_debug/2, check_bl_c2s/1, forbidden_session_hook/3, session_opening_allowed_for_user/2]). @@ -535,6 +536,12 @@ sasl2_start(HostType, Acc, Element) -> sasl2_success(HostType, Acc, Params) -> run_hook_for_host_type(sasl2_success, HostType, Acc, Params). +-spec c2s_debug(Acc, Arg) -> mongoose_acc:t() when + Acc :: mongoose_acc:t() | no_acc, + Arg :: mongoose_debug:debug_entry(). +c2s_debug(Acc, Arg) -> + run_global_hook(c2s_debug, Acc, #{arg => Arg}). + -spec check_bl_c2s(IP) -> Result when IP :: inet:ip_address(), Result :: boolean(). diff --git a/src/mongoose_debug.erl b/src/mongoose_debug.erl new file mode 100644 index 00000000000..200bdece83f --- /dev/null +++ b/src/mongoose_debug.erl @@ -0,0 +1,67 @@ +-module(mongoose_debug). + +%% The most simple use case possible is: +%% - add [modules.mongoose_debug] to mongooseim.toml +%% - from erlang shell, run recon_trace:calls([{mongoose_debug, traffic, '_'}], 100, [{scope, local}]). +%% - watch all the traffic coming in and out + +-behaviour(gen_mod). + +-include("mongoose.hrl"). +-include("jlib.hrl"). +-include_lib("exml/include/exml_stream.hrl"). + +-type debug_entry() :: {client_to_server, jid:jid() | undefined, exml:element()}| {server_to_client, jid:jid(), exml:element()}. +-type direction() :: client_to_server | server_to_client. +-export_type([debug_entry/0, direction/0]). + +%% API +-export([start/2, stop/1]). +-export([trace_traffic/3]). +-export([supported_features/0]). + +-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok. +start(Host, _Opts) -> + gen_hook:add_handlers(hooks(Host)), + ok. + +-spec stop(mongooseim:host_type()) -> ok. +stop(Host) -> + gen_hook:delete_handlers(hooks(Host)), + ok. + +hooks(_Host) -> + [{c2s_debug, global, fun ?MODULE:trace_traffic/3, #{}, 50}]. + + +-spec trace_traffic(mongoose_acc:t(), #{arg => debug_entry()}, term()) -> + {ok, mongoose_acc:t()}. +trace_traffic(Acc, #{arg := {client_to_server, From, El}}, _) -> + Sfrom = binary_to_list(maybe_jid_to_binary(From)), + Sto = binary_to_list(get_attr(El, <<"to">>)), + St = exml:to_binary(El), + Marker = " C >>>> MiM ", + traffic(Sfrom, Marker, Sto, St), + {ok, Acc}; +trace_traffic(Acc, #{arg := {server_to_client, To, El}}, _) -> + Sto = binary_to_list(maybe_jid_to_binary(To)), + Sfrom = binary_to_list(get_attr(El, <<"from">>)), + St = exml:to_binary(El), + Marker = " C <<<< MiM ", + traffic(Sfrom, Marker, Sto, St), + {ok, Acc}. + +traffic(_Sender, _Marker, _Recipient, _Stanza) -> ok. + +-spec supported_features() -> [atom()]. +supported_features() -> [dynamic_domains]. + +maybe_jid_to_binary(undefined) -> <<" ">>; +maybe_jid_to_binary(J) -> jid:to_binary(J). + +get_attr(#xmlstreamstart{attrs = AttrList}, AttrName) -> + proplists:get_value(AttrName, AttrList, <<" ">>); +get_attr(#xmlstreamend{}, _) -> + <<" ">>; +get_attr(El, AttrName) -> + exml_query:attr(El, AttrName, <<" ">>). diff --git a/src/mongoose_traffic.erl b/src/mongoose_traffic.erl new file mode 100644 index 00000000000..c601fb7e487 --- /dev/null +++ b/src/mongoose_traffic.erl @@ -0,0 +1,124 @@ +-module(mongoose_traffic). + +-behaviour(gen_mod). +-behaviour(gen_server). + +-include("mongoose.hrl"). +-include("jlib.hrl"). + +%% gen_mod API +-export([start/2, stop/1]). +-export([supported_features/0]). +%% hook handler +-export([trace_traffic/3]). +%% gen_server +-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). +%% cowboy handler for serving main page +-export([init/2]). +-ignore_xref([init/2]). + +-define(SERVER, ?MODULE). +-type state() :: [pid()]. + +-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok. +start(HostType, Opts) -> + gen_hook:add_handlers(hooks(HostType)), + case maps:get(standalone, Opts, false) of + true -> + gen_server:start_link(?MODULE, [], []); + false -> + case whereis(?MODULE) of + undefined -> + Traffic = {mongoose_traffic, + {gen_server, start_link, [?MODULE, [], []]}, + permanent, 1000, supervisor, [?MODULE]}, + % there has to be another layer + % channel will set up its own traces, someone has to watch and distribute stanzas + ejabberd_sup:start_child(Traffic); + _ -> + ok + end + end. + +-spec stop(mongooseim:host_type()) -> ok. +stop(Host) -> + gen_hook:delete_handlers(hooks(Host)), + supervisor:terminate_child(ejabberd_sup, ?MODULE), + supervisor:delete_child(ejabberd_sup, ?MODULE), + ok. + +hooks(_HostType) -> + [{c2s_debug, global, fun ?MODULE:trace_traffic/3, #{}, 50}]. + +-spec supported_features() -> [atom()]. +supported_features() -> [dynamic_domains]. + +-spec trace_traffic(mongoose_acc:t(), #{arg => mongoose_debug:debug_entry()}, term()) -> + {ok, mongoose_acc:t()}. +trace_traffic(Acc, #{arg := {client_to_server, From, El}}, _) -> + traffic(client_to_server, From, El), + {ok, Acc}; +trace_traffic(Acc, #{arg := {server_to_client, To, El}}, _) -> + traffic(server_to_client, To, El), + {ok, Acc}. + +-spec traffic(mongoose_debug:direction(), jid:jid(), exml:element()) -> + ok. +traffic(Dir, Jid, El) -> + St = iolist_to_binary(fix_and_format(El)), + UserSessionPid = self(), + gen_server:cast(?MODULE, {message, Dir, UserSessionPid, Jid, St}), + ok. + +-spec init(term()) -> {ok, state()}. +init([]) -> + register(?MODULE, self()), + {ok, []}. + +handle_call({register, Pid}, _From, State) -> + monitor(process, Pid), + {reply, ok, [Pid | State]}; +handle_call({unregister, Pid}, _From, State) -> + {reply, ok, lists:delete(Pid, State)}; +handle_call(_, _, State) -> + {reply, ok, State}. + +handle_cast({message, _, _, _, _} = Msg, State) -> + lists:map(fun(Pid) -> Pid ! Msg end, State), + {noreply, State}. + +handle_info({'DOWN', _, _, Pid, _}, State) -> + {noreply, lists:delete(Pid, State)}. + +-spec init(cowboy_req:req(), term()) -> + {ok, cowboy_req:req(), term()}. +init(Req, State) -> + {ok, Cwd} = file:get_cwd(), + Base = Cwd ++ "/web/traffic", + File = case cowboy_req:path_info(Req) of + [] -> "session.html"; + P -> filename:join(P) + end, + Path = filename:join(Base, File), + Size = filelib:file_size(Path), + Req1 = cowboy_req:reply(200, + #{}, + {sendfile, 0, Size, Path}, Req), + {ok, Req1, State}. + +fix_and_format(El) when is_binary(El) -> + El; +fix_and_format({xmlstreamend, _}) -> + <<"">>; +fix_and_format({Tag, Name}) -> + exml:to_pretty_iolist({Tag, Name, []}); +fix_and_format({Tag, Name, Attrs}) -> + exml:to_pretty_iolist({Tag, Name, fix_attrs(Attrs)}); +fix_and_format({Tag, Name, Attrs, Children}) -> + exml:to_pretty_iolist({Tag, Name, fix_attrs(Attrs), Children}). + +fix_attrs(Attrs) -> + lists:filter(fun is_defined/1, Attrs). + +is_defined({_, undefined}) -> false; +is_defined({_, _}) -> true. diff --git a/src/mongoose_traffic_channel.erl b/src/mongoose_traffic_channel.erl new file mode 100644 index 00000000000..1d6e00b9651 --- /dev/null +++ b/src/mongoose_traffic_channel.erl @@ -0,0 +1,268 @@ +%%%=================================================================== +%%% @copyright (C) 2016, Erlang Solutions Ltd. +%%% @doc Module providing support for websockets in MongooseIM +%%% @end +%%%=================================================================== +-module(mongoose_traffic_channel). + +-behaviour(cowboy_websocket). + +%% cowboy_http_websocket_handler callbacks +-export([init/2, + websocket_init/1, + websocket_handle/2, + websocket_info/2, + terminate/3]). + +-include("mongoose.hrl"). +-include("jlib.hrl"). + +-define(LISTENER, ?MODULE). +-define(MAX_ITEMS, 500). +-define(MAX_TRACED, 100). + +-type str_pid() :: binary(). % representation of traced user's session pid +-type str_stanza() :: binary(). + +-record(state, {traces = #{}, + tracing = false, + current = <<>>, + mappings = #{}, + start_times = #{}}). + +-type state() :: #state{traces :: #{str_pid() => queue:queue()}, + tracing :: boolean(), + current :: str_pid(), + mappings :: #{str_pid() => jid:jid()}, + start_times :: #{str_pid() => float()}}. + +%%-------------------------------------------------------------------- +%% Common callbacks for all cowboy behaviours +%%-------------------------------------------------------------------- + +-spec init(cowboy_req:req(), proplists:proplist()) -> + {cowboy_websocket, cowboy_req:req(), proplists:proplist(), map()}. +init(Req, Opts) -> + Peer = cowboy_req:peer(Req), + PeerCert = cowboy_req:cert(Req), + ?LOG_DEBUG("cowboy init: ~p~n", [{Req, Opts}]), + AllModOpts = [{peer, Peer}, {peercert, PeerCert} | Opts], + %% upgrade protocol + {cowboy_websocket, Req, AllModOpts, #{}}. + +terminate(_Reason, _Req, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% cowboy_http_websocket_handler callbacks +%%-------------------------------------------------------------------- + +% Called for every new websocket connection. +websocket_init(Opts) -> + ?LOG_DEBUG("websocket_init: ~p~n", [Opts]), + gen_server:call(mongoose_traffic, {register, self()}), + {ok, #state{}}. + +% Called when a text message arrives. +websocket_handle({text, Msg}, State) -> + case handle(jiffy:decode(Msg), State) of + {Event, State1} -> + {reply, reply(Event), State1}; + {Event, Payload, State1} -> + {reply, reply(Event, Payload), State1} + end; + +websocket_handle({binary, Msg}, State) -> + ?LOG_DEBUG("Received binary: ~p", [Msg]), + {ok, State}; + +% With this callback we can handle other kind of +% messages, like binary. +websocket_handle(Any, State) -> + ?LOG_DEBUG("Received non-text: ~p", [Any]), + {ok, State}. + +% Other messages from the system are handled here. +-spec websocket_info({message, + mongoose_debug:direction(), + pid(), + jid:jid(), + str_stanza()}, + state()) -> + {ok | stop, state()}. +websocket_info({message, _Dir, _P, _J, _Stanza}, #state{tracing = false} = State) -> + {ok, State}; +websocket_info({message, Dir, Pid, Jid, Stanza} = Message, State) -> + Spid = pid_to_binary(Pid), + Now = now_seconds(), + {Traces1, Mappings, IsNewMapping} = record_item(Now, + Dir, + Spid, Jid, + Stanza, + State#state.traces, + State#state.mappings), + State1 = State#state{traces = Traces1}, + State2 = State1#state{mappings = Mappings}, + State3 = maybe_store_start_time(Spid, Now, State2), + case maps:size(Traces1) of + N when N > ?MAX_TRACED -> + force_stop_tracing(State1); + _ -> + maybe_send_to_user(Now, IsNewMapping, Message, State3) + end; +websocket_info(stop, State) -> + {stop, State}; +websocket_info(Info, State) -> + ?LOG_DEBUG("unknown info: ~p", [Info]), + {ok, State}. + +force_stop_tracing(State) -> + State1 = State#state{tracing = false}, + M = reply(<<"error">>, #{<<"reason">> => <<"too_many_traced_procs">>}), + {reply, M, State1}. + +maybe_send_to_user(Now, IsNewMapping, {message, Dir, Pid, Jid, Stanza}, State) -> + Spid = pid_to_binary(Pid), + Announcement = maybe_announce_new(IsNewMapping, Now, Spid, Jid), + Msg = maybe_send_current(Now, Dir, Spid, Stanza, State), + {reply, Announcement ++ Msg, State}. + +maybe_announce_new(true, StartTime, Spid, Jid) -> + {BareJid, FullJid} = case classify_jid(Jid) of + empty -> {<<>>, <<>>}; + bare -> {<<>>, jid:to_bare_binary(Jid)}; + full -> {jid:to_binary(Jid), <<>>} + end, + [reply(<<"new_trace">>, + #{<<"pid">> => Spid, + <<"start_time">> => StartTime, + <<"bare_jid">> => BareJid, + <<"full_jid">> => FullJid})]; +maybe_announce_new(false, _, _, _) -> + []. + +maybe_send_current(Now, Dir, Spid, Stanza, State) -> + case is_current(Spid, State) of + true -> + Tm = Now - maps:get(Spid, State#state.start_times), + M = reply(<<"message">>, #{<<"dir">> => atom_to_binary(Dir, utf8), + <<"time">> => Tm, + <<"stanza">> => Stanza + }), + [M]; + false -> + [] + end. + + +handle({Json}, State) -> + M = maps:from_list(Json), + handle(maps:get(<<"event">>, M), maps:get(<<"payload">>, M), State). + +handle(<<"get_status">>, _, State) -> + return_status(State); +handle(<<"trace_flag">>, {Payload}, State) -> + #{<<"value">> := Flag} = maps:from_list(Payload), + return_status(State#state{tracing = Flag}); +handle(<<"get_trace">>, {Payload}, State) -> + #{<<"pid">> := Pid} = maps:from_list(Payload), + {<<"get_trace">>, + #{<<"pid">> => Pid, <<"trace">> => format_trace(maps:get(Pid, State#state.traces, []), + maps:get(Pid, State#state.start_times))}, + State#state{current = Pid}}; +handle(<<"clear_all">>, _, State) -> + {<<"cleared_all">>, + State#state{traces = #{}, current = <<>>, mappings = #{}, start_times = #{}}}; +handle(<<"heartbeat">>, _, State) -> + {<<"heartbeat_ok">>, + <<>>, + State}; +handle(Event, Payload, State) -> + ?LOG_WARNING(#{what => unknown_event, + text => <<"Traffic monitor sent something I don't understand">>, + event => Event, payload => Payload}), + {<<"error">>, <<"unknown event">>, State}. + + +return_status(State) -> + {<<"status">>, + #{<<"trace_flag">> => State#state.tracing}, + State}. + +reply(Event) -> + reply(Event, #{}). + +reply(Event, Payload) -> + {text, jiffy:encode(#{<<"event">> => Event, <<"payload">> => Payload})}. + + + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec is_current(str_pid(), state()) -> boolean(). +is_current(J, #state{current = J}) -> true; +is_current(_, _) -> false. + +record_item(Time, Dir, Spid, Jid, Stanza, Traces, Mappings) -> + Tr = case maps:get(Spid, Traces, undefined) of + undefined -> + queue:new(); + Q -> Q + end, + IsNew = is_new_mapping(maps:get(Spid, Mappings, undefined), Jid), + Mappings1 = case IsNew of + true -> + maps:put(Spid, Jid, Mappings); + false -> + Mappings + end, + Tr1 = queue:in({Time, Dir, Stanza}, Tr), + Tr2 = case queue:len(Tr1) of + ?MAX_ITEMS -> queue:out(Tr1); + _ -> Tr1 + end, + {maps:put(Spid, Tr2, Traces), Mappings1, IsNew}. + +format_trace([], _StartTime) -> + []; +format_trace(Trace, StartTime) -> + lists:map(fun({Time, Dir, Stanza}) -> + #{<<"dir">> => atom_to_binary(Dir, utf8), + <<"time">> => Time - StartTime, + <<"stanza">> => Stanza} + end, + lists:reverse(queue:to_list(Trace))). + +-spec pid_to_binary(pid()) -> str_pid(). +pid_to_binary(Pid) when is_pid(Pid) -> + [Spid] = io_lib:format("~p", [Pid]), + list_to_binary(Spid). + +-spec classify_jid(binary() | jid:jid()) -> empty | bare | full. +classify_jid(Bin) when is_binary(Bin) -> classify_jid(jid:from_binary(Bin)); +classify_jid(undefined) -> empty; +classify_jid(#jid{lresource = <<>>}) -> bare; +classify_jid(#jid{}) -> full. + +% we map pids to jids, initially we don't know the jid, then we +% know bare jid, and then full jid +% we need to know when it changes so that send an update to client +-spec is_new_mapping(jid:jid(), jid:jid()) -> boolean(). +is_new_mapping(Old, New) -> + case {classify_jid(Old), classify_jid(New)} of + {S, S} -> false; + _ -> true + end. + +now_seconds() -> + os:system_time(microsecond) / 1000000. + +maybe_store_start_time(Spid, Time, #state{start_times = StartTimes} = State) -> + case maps:get(Spid, StartTimes, undefined) of + undefined -> + State#state{start_times = maps:put(Spid, Time, StartTimes)}; + _ -> + State + end. diff --git a/test/mongoose_traffic_SUITE.erl b/test/mongoose_traffic_SUITE.erl new file mode 100644 index 00000000000..3c8159f0825 --- /dev/null +++ b/test/mongoose_traffic_SUITE.erl @@ -0,0 +1,100 @@ +-module(mongoose_traffic_SUITE). +-compile([export_all, nowarn_export_all]). + +-include_lib("kernel/include/logger.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include("jlib.hrl"). + +all() -> + [mongoose_debug, mongoose_traffic]. + +init_per_suite(Config) -> + application:ensure_all_started(exometer_core), + mongoose_config:set_opts(#{all_metrics_are_global => false}), + {ok, _} = application:ensure_all_started(jid), + Config. + +end_per_suite(Config) -> + mongoose_config:erase_opts(), + application:stop(exometer_core), + Config. + +init_per_testcase(_, Config) -> + mongooseim_helper:start_link_loaded_hooks(), + Config. + +end_per_testcase(_, Config) -> + meck:unload(), + Config. + + +%%---------------------------------------------------------------- +%% test cases +%%---------------------------------------------------------------- + +mongoose_debug(_) -> + Me = self(), + Fmtr = fun(I) -> Me ! I, <<"ok">> end, + code:ensure_loaded(mongoose_debug), + % this is how mongoose_debug is meant to be used + 1 = recon_trace:calls({mongoose_debug, traffic, '_'}, 10, [{scope, local}, {formatter, Fmtr}]), + mongoose_debug:start(localhost, []), + call_hooks_in(), + ?assertMatch({trace, _ ,call, {mongoose_debug, + traffic, + ["a@localhost/c"," C >>>> MiM "," ", _]}}, + receive_msg()), + ok. + +mongoose_traffic(_) -> + mongoose_traffic:start(localhost, #{standalone => true}), + gen_server:call(mongoose_traffic, {register, self()}), + call_hooks_in(), + ?assertMatch({message,client_to_server, _, #jid{}, _}, + receive_msg()), + gen_server:call(mongoose_traffic, {unregister, self()}), + call_hooks_in(), + call_hooks_out(), + no_new_msg(), + ok. + +call_hooks_in() -> + Acc = mongoose_acc:new(#{location => ?LOCATION, lserver => localhost}), + From = jid:from_binary(<<"a@localhost/c">>), + El = #xmlel{name = <<"testelement">>}, + mongoose_hooks:c2s_debug(Acc, {client_to_server, From, El}), + ok. + +call_hooks_out() -> + Acc = mongoose_acc:new(#{location => ?LOCATION, lserver => localhost}), + From = jid:from_binary(<<"a@localhost/c">>), + El = #xmlel{name = <<"testelement">>}, + mongoose_hooks:c2s_debug(Acc, {server_to_client, From, El}), + ok. + +receive_msg() -> + receive + M -> M + after 100 -> + ct:fail("message not received", []) + end. + +no_new_msg() -> + receive + M -> ct:fail("unexpected message received: ~p", [M]) + after 100 -> + ok + end. + +get_handlers_for_all_hooks() -> + maps:to_list(persistent_term:get(gen_hook, #{})). + +flush() -> + receive + M -> + ct:pal("received: ~p", [M]), + flush() + after 100 -> + ct:pal("asdf over: ~p", [over]), + ok + end.