Skip to content

Commit

Permalink
Pick a subchannel which connection is established
Browse files Browse the repository at this point in the history
  • Loading branch information
heshaoqiong committed Jun 5, 2023
1 parent 5c041a3 commit 22c8de1
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 30 deletions.
7 changes: 6 additions & 1 deletion src/grpcbox_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ init([Name, Endpoints, Options]) ->

gproc_pool:new(Name, BalancerType, [{size, length(Endpoints)},
{auto_size, true}]),
gproc_pool:new({Name, active}, BalancerType, [{size, length(Endpoints)},
{auto_size, true}]),
Data = #data{
pool = Name,
encoding = Encoding,
Expand Down Expand Up @@ -173,10 +175,12 @@ handle_event(_, _, Data) ->
{keep_state, Data}.

terminate({shutdown, force_delete}, _State, #data{pool=Name}) ->
gproc_pool:force_delete(Name);
gproc_pool:force_delete(Name),
gproc_pool:force_delete({Name, active});
terminate(Reason, _State, #data{pool=Name}) ->
[grpcbox_subchannel:stop(Pid, Reason) || {_Channel, Pid} <- gproc_pool:active_workers(Name)],
gproc_pool:delete(Name),
gproc_pool:delete({Name, active}),
ok.

insert_interceptors(Name, Interceptors) ->
Expand Down Expand Up @@ -212,6 +216,7 @@ insert_stream_interceptor(Name, _Type, Interceptors) ->
start_workers(Pool, StatsHandler, Encoding, Endpoints) ->
[begin
gproc_pool:add_worker(Pool, Endpoint),
gproc_pool:add_worker({Pool, active}, Endpoint),
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint,
Pool, Endpoint, Encoding, StatsHandler),
Pid
Expand Down
6 changes: 5 additions & 1 deletion src/grpcbox_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@
get_channel(Options, Type) ->
Channel = maps:get(channel, Options, default_channel),
Key = maps:get(key, Options, undefined),
grpcbox_channel:pick(Channel, Type, Key).
PickStrategy = maps:get(pick_strategy, Options, undefined),
case PickStrategy of
active_worker -> grpcbox_channel:pick({Channel, active}, Type, Key);
undefined -> grpcbox_channel:pick(Channel, Type, Key)
end.

unary(Ctx, Service, Method, Input, Def, Options) ->
unary(Ctx, filename:join([<<>>, Service, Method]), Input, Def, Options).
Expand Down
75 changes: 53 additions & 22 deletions src/grpcbox_subchannel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
ready/3,
disconnected/3]).

-define(RECONNECT_INTERVAL, 5000).

-record(data, {name :: any(),
endpoint :: grpcbox_channel:endpoint(),
channel :: grpcbox_channel:t(),
Expand Down Expand Up @@ -42,13 +44,13 @@ stop(Pid, Reason) ->

init([Name, Channel, Endpoint, Encoding, StatsHandler]) ->
process_flag(trap_exit, true),

gproc_pool:connect_worker(Channel, Name),
{ok, disconnected, #data{name=Name,
conn=undefined,
info=info_map(Endpoint, Encoding, StatsHandler),
endpoint=Endpoint,
channel=Channel}}.
Data = #data{name=Name,
conn=undefined,
info=info_map(Endpoint, Encoding, StatsHandler),
endpoint=Endpoint,
channel=Channel},
{ok, disconnected, Data, [{next_event, internal, connect}]}.

info_map({http, Host, 80, _}, Encoding, StatsHandler) ->
#{authority => list_to_binary(Host),
Expand All @@ -72,20 +74,28 @@ callback_mode() ->
ready({call, From}, conn, #data{conn=Conn,
info=Info}) ->
{keep_state_and_data, [{reply, From, {ok, Conn, Info}}]};
ready(info, {'EXIT', Pid, _}, Data=#data{conn=Pid, name=Name, channel=Channel}) ->
gproc_pool:disconnect_worker({Channel, active}, Name),
{next_state, disconnected, Data#data{conn=undefined}, [{next_event, internal, connect}]};
ready(info, {timeout, connect}, _Data) ->
keep_state_and_data;
ready(EventType, EventContent, Data) ->
handle_event(EventType, EventContent, Data).

disconnected(internal, connect, Data) ->
do_connect(Data);
disconnected(info, {timeout, connect}, Data) ->
do_connect(Data);
disconnected({call, From}, conn, Data) ->
connect(Data, From, [postpone]);
disconnected(info, {'EXIT', _, _}, #data{conn=undefined}) ->
erlang:send_after(?RECONNECT_INTERVAL, self(), {timeout, connect}),
keep_state_and_data;
disconnected(EventType, EventContent, Data) ->
handle_event(EventType, EventContent, Data).

handle_event({call, From}, info, #data{info=Info}) ->
{keep_state_and_data, [{reply, From, Info}]};
handle_event(info, {'EXIT', Pid, _}, Data=#data{conn=Pid}) ->
{next_state, disconnected, Data#data{conn=undefined}};
handle_event(info, {'EXIT', _, econnrefused}, #data{conn=undefined}) ->
keep_state_and_data;
handle_event({call, From}, shutdown, _) ->
{stop_and_reply, normal, {reply, From, ok}};
handle_event(_, _, _) ->
Expand All @@ -96,36 +106,45 @@ terminate(_Reason, _State, #data{conn=undefined,
channel=Channel}) ->
gproc_pool:disconnect_worker(Channel, Name),
gproc_pool:remove_worker(Channel, Name),
gproc_pool:remove_worker({Channel, active}, Name),
ok;
terminate(normal, _State, #data{conn=Pid,
name=Name,
channel=Channel}) ->
h2_connection:stop(Pid),
gproc_pool:disconnect_worker(Channel, Name),
gproc_pool:remove_worker(Channel, Name),
gproc_pool:disconnect_worker({Channel, active}, Name),
gproc_pool:remove_worker({Channel, active}, Name),
ok;
terminate(Reason, _State, #data{conn=Pid,
name=Name,
channel=Channel}) ->
gproc_pool:disconnect_worker(Channel, Name),
gproc_pool:remove_worker(Channel, Name),
gproc_pool:disconnect_worker({Channel, active}, Name),
gproc_pool:remove_worker({Channel, active}, Name),
exit(Pid, Reason),
ok.

connect(Data=#data{conn=undefined,
endpoint={Transport, Host, Port, EndpointOptions}}, From, Actions) ->
% Get and delete non-ssl options from endpoint options, these are passed as connection settings
ConnectTimeout = proplists:get_value(connect_timeout, EndpointOptions, 5000),
TcpUserTimeout = proplists:get_value(tcp_user_timeout, EndpointOptions, 0),
EndpointOptions2 = proplists:delete(connect_timeout, EndpointOptions),
EndpointOptions3 = proplists:delete(tcp_user_timeout, EndpointOptions2),
do_connect(Data=#data{name=Name, channel=Channel,
conn=undefined, endpoint=Endpoint}) ->
case start_h2_client(Endpoint) of
{ok, Pid} ->
gproc_pool:connect_worker({Channel, active}, Name),
{next_state, ready, Data#data{conn=Pid}};
{error, _} ->
erlang:send_after(?RECONNECT_INTERVAL, self(), {timeout, connect}),
{next_state, disconnected, Data#data{conn=undefined}}
end.

connect(Data=#data{name=Name, channel=Channel,
conn=undefined, endpoint=Endpoint},
From, Actions) ->
case start_h2_client(Endpoint) of

case h2_client:start_link(Transport, Host, Port, options(Transport, EndpointOptions3),
#{garbage_on_end => true,
stream_callback_mod => grpcbox_client_stream,
connect_timeout => ConnectTimeout,
tcp_user_timeout => TcpUserTimeout}) of
{ok, Pid} ->
gproc_pool:connect_worker({Channel, active}, Name),
{next_state, ready, Data#data{conn=Pid}, Actions};
{error, _}=Error ->
{next_state, disconnected, Data#data{conn=undefined}, [{reply, From, Error}]}
Expand All @@ -138,3 +157,15 @@ options(https, Options) ->
[{client_preferred_next_protocols, {client, [<<"h2">>]}} | Options];
options(http, Options) ->
Options.

start_h2_client({Transport, Host, Port, EndpointOptions}) ->
% Get and delete non-ssl options from endpoint options, these are passed as connection settings
ConnectTimeout = proplists:get_value(connect_timeout, EndpointOptions, 5000),
TcpUserTimeout = proplists:get_value(tcp_user_timeout, EndpointOptions, 0),
EndpointOptions2 = proplists:delete(connect_timeout, EndpointOptions),
EndpointOptions3 = proplists:delete(tcp_user_timeout, EndpointOptions2),
h2_client:start_link(Transport, Host, Port, options(Transport, EndpointOptions3),
#{garbage_on_end => true,
stream_callback_mod => grpcbox_client_stream,
connect_timeout => ConnectTimeout,
tcp_user_timeout => TcpUserTimeout}).
1 change: 1 addition & 0 deletions test/grpcbox_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ end_per_testcase(_, _Config) ->
initially_down_service(_Config) ->
Point = #{latitude => 409146138, longitude => -746188906},
Ctx = ctx:with_deadline_after(ctx:new(), 5, second),
ct:sleep(100),
?assertMatch({error, econnrefused}, routeguide_route_guide_client:get_feature(Ctx, Point)),

grpcbox:start_server(#{grpc_opts => #{service_protos => [route_guide_pb],
Expand Down
79 changes: 73 additions & 6 deletions test/grpcbox_channel_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
-module(grpcbox_channel_SUITE).

-export([all/0,
init_per_suite/1,
end_per_suite/1,
add_and_remove_endpoints/1]).
init_per_suite/1,
end_per_suite/1,
add_and_remove_endpoints/1,
add_and_remove_endpoints_active_workers/1,
pick_worker_strategy/1,
pick_active_worker_strategy/1
]).

-include_lib("eunit/include/eunit.hrl").

all() ->
[
add_and_remove_endpoints
add_and_remove_endpoints,
add_and_remove_endpoints_active_workers,
pick_worker_strategy,
pick_active_worker_strategy
].
init_per_suite(_Config) ->
application:set_env(grpcbox, servers, []),
application:set_env(grpcbox, client, #{channel => []}),
GrpcOptions = #{service_protos => [route_guide_pb], services => #{'routeguide.RouteGuide' => routeguide_route_guide}},
Servers = [#{grpc_opts => GrpcOptions,
listen_opts => #{port => 18080, ip => {127,0,0,1}}},
#{grpc_opts => GrpcOptions,
listen_opts => #{port => 18081, ip => {127,0,0,1}}},
#{grpc_opts => GrpcOptions,
listen_opts => #{port => 18082, ip => {127,0,0,1}}},
#{grpc_opts => GrpcOptions,
listen_opts => #{port => 18083, ip => {127,0,0,1}}}],
application:set_env(grpcbox, servers, Servers),
application:ensure_all_started(grpcbox),
ct:sleep(1000),
grpcbox_channel_sup:start_link(),
grpcbox_channel_sup:start_child(default_channel, [{http, "127.0.0.1", 18080, []}], #{}),
grpcbox_channel_sup:start_child(random_channel,
Expand All @@ -32,8 +50,57 @@ end_per_suite(_Config) ->
application:stop(grpcbox),
ok.


add_and_remove_endpoints(_Config) ->
grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}]),
?assertEqual(4, length(gproc_pool:active_workers(default_channel))),
grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}, {https, "127.0.0.1", 18083, []}]),
?assertEqual(7, length(gproc_pool:active_workers(default_channel))).
?assertEqual(7, length(gproc_pool:active_workers(default_channel))),
grpcbox_channel:remove_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}], normal),
?assertEqual(4, length(gproc_pool:active_workers(default_channel))),
grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 18080, []}, {https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}], normal),
?assertEqual(2, length(gproc_pool:active_workers(default_channel))).

add_and_remove_endpoints_active_workers(_Config) ->
grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}]),
ct:sleep(1000),
?assertEqual(4, length(gproc_pool:active_workers({default_channel, active}))),
grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}, {https, "127.0.0.1", 18083, []}]),
ct:sleep(1000),
?assertEqual(4, length(gproc_pool:active_workers({default_channel, active}))),
grpcbox_channel:remove_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}], normal),
ct:sleep(1000),
?assertEqual(1, length(gproc_pool:active_workers({default_channel, active}))),
grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}, {https, "127.0.0.1", 18083, []}], normal),
ct:sleep(1000),
?assertEqual(1, length(gproc_pool:active_workers({default_channel, active}))).

pick_worker_strategy(_Config) ->
?assertEqual(ok, pick_worker(default_channel)),
?assertEqual(ok, pick_worker(random_channel)),
?assertEqual(ok, pick_worker(direct_channel, 1)),
?assertEqual(ok, pick_worker(hash_channel, 1)),
?assertEqual(error, pick_worker(default_channel, 1)),
?assertEqual(error, pick_worker(random_channel, 1)),
?assertEqual(error, pick_worker(direct_channel)),
?assertEqual(error, pick_worker(hash_channel)),
ok.

pick_active_worker_strategy(_Config) ->
ct:sleep(1000),
?assertEqual(ok, pick_worker({default_channel, active})),
?assertEqual(ok, pick_worker({random_channel, active})),
?assertEqual(ok, pick_worker({direct_channel, active}, 1)),
?assertEqual(ok, pick_worker({hash_channel, active}, 1)),
?assertEqual(error, pick_worker({default_channel, active}, 1)),
?assertEqual(error, pick_worker({random_channel, active}, 1)),
?assertEqual(error, pick_worker({direct_channel, active})),
?assertEqual(error, pick_worker({hash_channel, active})),
ok.

pick_worker(Name, N) ->
{R, _} = grpcbox_channel:pick(Name, unary, N),
R.
pick_worker(Name) ->
{R, _} = grpcbox_channel:pick(Name, unary, undefined),
R.

0 comments on commit 22c8de1

Please sign in to comment.