diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index 430a735..de46254 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -5,13 +5,16 @@ -export([start_link/3, is_ready/1, pick/2, - stop/1]). + stop/1, + add_channel/3, + delete_channel/1]). -export([init/1, callback_mode/0, terminate/3, connected/3, idle/3]). +-include_lib("stdlib/include/ms_transform.hrl"). -include("grpcbox.hrl"). -define(CHANNEL(Name), {via, gproc, {n, l, {?MODULE, Name}}}). @@ -45,6 +48,16 @@ stats_handler :: module() | undefined, refresh_interval :: timer:time()}). +%% @doc add a new channel +-spec add_channel(name(), [endpoint()], options()) -> {ok, pid()}. +add_channel(Name, Endpoints, Options) -> + grpcbox_channel_sup:start_child(Name, Endpoints, Options). + +%% @doc Delete a channel +-spec delete_channel(pid()) -> any(). +delete_channel(Pid) when is_pid(Pid) -> + ok = supervisor:terminate_child(grpcbox_channel_sup, Pid). + -spec start_link(name(), [endpoint()], options()) -> {ok, pid()}. start_link(Name, Endpoints, Options) -> gen_statem:start_link(?CHANNEL(Name), ?MODULE, [Name, Endpoints, Options], []). @@ -129,6 +142,12 @@ handle_event(_, _, Data) -> {keep_state, Data}. terminate(_Reason, _State, #data{pool=Name}) -> + [ets:delete(?CHANNELS_TAB, Key) || Key <- + ets:select(?CHANNELS_TAB, ets:fun2ms(fun + ({{N, V}, _}) when N == Name -> + {N, V} + end)) + ], gproc_pool:force_delete(Name), ok. diff --git a/test/grpcbox_SUITE.erl b/test/grpcbox_SUITE.erl index 69c5bb3..37ec262 100644 --- a/test/grpcbox_SUITE.erl +++ b/test/grpcbox_SUITE.erl @@ -477,8 +477,8 @@ multiple_servers(_Config) -> unary(_Config), unary(_Config). -bidirectional(_Config) -> - {ok, S} = routeguide_route_guide_client:route_chat(ctx:new()), +bidirectional(Config) -> + {ok, S} = routeguide_route_guide_client:route_chat(ctx:new(), proplists:get_value(options, Config, #{})), %% send 2 before receiving since the server only sends what it already had in its list of messages for the %% location of your last send. ok = grpcbox_client:send(S, #{location => #{latitude => 1, longitude => 1}, message => <<"hello there">>}), @@ -561,10 +561,30 @@ stress_test(Config) -> stress_test(Config, Count) -> lists:foreach(fun - (Ref) -> + (ProcId) -> Parent = self(), spawn(fun() -> - stress_test_function(fun bidirectional/1, Config, Ref, Parent) end) + Channel = erlang:list_to_atom("proc_" ++ erlang:integer_to_list(ProcId)), + erlang:register(Channel, self()), + {ok, _Pid} = grpcbox_channel:add_channel( + Channel, + [{http, "localhost", 8080, []}], + #{} + ), + lists:foldl(fun + (_, not_ready) -> + timer:sleep(10), + grpcbox_channel:is_ready(Channel); + (_,Acc) -> + Acc + end, not_ready, lists:seq(1, 100)), + + stress_test_function(fun bidirectional/1, + [{options,#{channel => Channel}} | Config], + ProcId, Parent), + ok + %% grpcbox_channel:delete_channel(Pid) + end) end, lists:seq(1, Count)), Loop = fun Loop(LoopCount) ->