From 22c8de1a18e80cea072bb807eac881c2846adc80 Mon Sep 17 00:00:00 2001 From: sevenhe Date: Thu, 23 Mar 2023 10:43:56 +0800 Subject: [PATCH] Pick a subchannel which connection is established --- src/grpcbox_channel.erl | 7 ++- src/grpcbox_client.erl | 6 ++- src/grpcbox_subchannel.erl | 75 ++++++++++++++++++++++---------- test/grpcbox_SUITE.erl | 1 + test/grpcbox_channel_SUITE.erl | 79 +++++++++++++++++++++++++++++++--- 5 files changed, 138 insertions(+), 30 deletions(-) diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index cf7cce8..ab98bd7 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -118,6 +118,8 @@ init([Name, Endpoints, Options]) -> gproc_pool:new(Name, BalancerType, [{size, length(Endpoints)}, {auto_size, true}]), + gproc_pool:new({Name, active}, BalancerType, [{size, length(Endpoints)}, + {auto_size, true}]), Data = #data{ pool = Name, encoding = Encoding, @@ -173,10 +175,12 @@ handle_event(_, _, Data) -> {keep_state, Data}. terminate({shutdown, force_delete}, _State, #data{pool=Name}) -> - gproc_pool:force_delete(Name); + gproc_pool:force_delete(Name), + gproc_pool:force_delete({Name, active}); terminate(Reason, _State, #data{pool=Name}) -> [grpcbox_subchannel:stop(Pid, Reason) || {_Channel, Pid} <- gproc_pool:active_workers(Name)], gproc_pool:delete(Name), + gproc_pool:delete({Name, active}), ok. insert_interceptors(Name, Interceptors) -> @@ -212,6 +216,7 @@ insert_stream_interceptor(Name, _Type, Interceptors) -> start_workers(Pool, StatsHandler, Encoding, Endpoints) -> [begin gproc_pool:add_worker(Pool, Endpoint), + gproc_pool:add_worker({Pool, active}, Endpoint), {ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, Endpoint, Encoding, StatsHandler), Pid diff --git a/src/grpcbox_client.erl b/src/grpcbox_client.erl index 42bb025..4dd4ffd 100644 --- a/src/grpcbox_client.erl +++ b/src/grpcbox_client.erl @@ -48,7 +48,11 @@ get_channel(Options, Type) -> Channel = maps:get(channel, Options, default_channel), Key = maps:get(key, Options, undefined), - grpcbox_channel:pick(Channel, Type, Key). + PickStrategy = maps:get(pick_strategy, Options, undefined), + case PickStrategy of + active_worker -> grpcbox_channel:pick({Channel, active}, Type, Key); + undefined -> grpcbox_channel:pick(Channel, Type, Key) + end. unary(Ctx, Service, Method, Input, Def, Options) -> unary(Ctx, filename:join([<<>>, Service, Method]), Input, Def, Options). diff --git a/src/grpcbox_subchannel.erl b/src/grpcbox_subchannel.erl index 93e5a10..60a50fb 100644 --- a/src/grpcbox_subchannel.erl +++ b/src/grpcbox_subchannel.erl @@ -14,6 +14,8 @@ ready/3, disconnected/3]). +-define(RECONNECT_INTERVAL, 5000). + -record(data, {name :: any(), endpoint :: grpcbox_channel:endpoint(), channel :: grpcbox_channel:t(), @@ -42,13 +44,13 @@ stop(Pid, Reason) -> init([Name, Channel, Endpoint, Encoding, StatsHandler]) -> process_flag(trap_exit, true), - gproc_pool:connect_worker(Channel, Name), - {ok, disconnected, #data{name=Name, - conn=undefined, - info=info_map(Endpoint, Encoding, StatsHandler), - endpoint=Endpoint, - channel=Channel}}. + Data = #data{name=Name, + conn=undefined, + info=info_map(Endpoint, Encoding, StatsHandler), + endpoint=Endpoint, + channel=Channel}, + {ok, disconnected, Data, [{next_event, internal, connect}]}. info_map({http, Host, 80, _}, Encoding, StatsHandler) -> #{authority => list_to_binary(Host), @@ -72,20 +74,28 @@ callback_mode() -> ready({call, From}, conn, #data{conn=Conn, info=Info}) -> {keep_state_and_data, [{reply, From, {ok, Conn, Info}}]}; +ready(info, {'EXIT', Pid, _}, Data=#data{conn=Pid, name=Name, channel=Channel}) -> + gproc_pool:disconnect_worker({Channel, active}, Name), + {next_state, disconnected, Data#data{conn=undefined}, [{next_event, internal, connect}]}; +ready(info, {timeout, connect}, _Data) -> + keep_state_and_data; ready(EventType, EventContent, Data) -> handle_event(EventType, EventContent, Data). +disconnected(internal, connect, Data) -> + do_connect(Data); +disconnected(info, {timeout, connect}, Data) -> + do_connect(Data); disconnected({call, From}, conn, Data) -> connect(Data, From, [postpone]); +disconnected(info, {'EXIT', _, _}, #data{conn=undefined}) -> + erlang:send_after(?RECONNECT_INTERVAL, self(), {timeout, connect}), + keep_state_and_data; disconnected(EventType, EventContent, Data) -> handle_event(EventType, EventContent, Data). handle_event({call, From}, info, #data{info=Info}) -> {keep_state_and_data, [{reply, From, Info}]}; -handle_event(info, {'EXIT', Pid, _}, Data=#data{conn=Pid}) -> - {next_state, disconnected, Data#data{conn=undefined}}; -handle_event(info, {'EXIT', _, econnrefused}, #data{conn=undefined}) -> - keep_state_and_data; handle_event({call, From}, shutdown, _) -> {stop_and_reply, normal, {reply, From, ok}}; handle_event(_, _, _) -> @@ -96,6 +106,7 @@ terminate(_Reason, _State, #data{conn=undefined, channel=Channel}) -> gproc_pool:disconnect_worker(Channel, Name), gproc_pool:remove_worker(Channel, Name), + gproc_pool:remove_worker({Channel, active}, Name), ok; terminate(normal, _State, #data{conn=Pid, name=Name, @@ -103,29 +114,37 @@ terminate(normal, _State, #data{conn=Pid, h2_connection:stop(Pid), gproc_pool:disconnect_worker(Channel, Name), gproc_pool:remove_worker(Channel, Name), + gproc_pool:disconnect_worker({Channel, active}, Name), + gproc_pool:remove_worker({Channel, active}, Name), ok; terminate(Reason, _State, #data{conn=Pid, name=Name, channel=Channel}) -> gproc_pool:disconnect_worker(Channel, Name), gproc_pool:remove_worker(Channel, Name), + gproc_pool:disconnect_worker({Channel, active}, Name), + gproc_pool:remove_worker({Channel, active}, Name), exit(Pid, Reason), ok. -connect(Data=#data{conn=undefined, - endpoint={Transport, Host, Port, EndpointOptions}}, From, Actions) -> - % Get and delete non-ssl options from endpoint options, these are passed as connection settings - ConnectTimeout = proplists:get_value(connect_timeout, EndpointOptions, 5000), - TcpUserTimeout = proplists:get_value(tcp_user_timeout, EndpointOptions, 0), - EndpointOptions2 = proplists:delete(connect_timeout, EndpointOptions), - EndpointOptions3 = proplists:delete(tcp_user_timeout, EndpointOptions2), +do_connect(Data=#data{name=Name, channel=Channel, + conn=undefined, endpoint=Endpoint}) -> + case start_h2_client(Endpoint) of + {ok, Pid} -> + gproc_pool:connect_worker({Channel, active}, Name), + {next_state, ready, Data#data{conn=Pid}}; + {error, _} -> + erlang:send_after(?RECONNECT_INTERVAL, self(), {timeout, connect}), + {next_state, disconnected, Data#data{conn=undefined}} + end. + +connect(Data=#data{name=Name, channel=Channel, + conn=undefined, endpoint=Endpoint}, + From, Actions) -> + case start_h2_client(Endpoint) of - case h2_client:start_link(Transport, Host, Port, options(Transport, EndpointOptions3), - #{garbage_on_end => true, - stream_callback_mod => grpcbox_client_stream, - connect_timeout => ConnectTimeout, - tcp_user_timeout => TcpUserTimeout}) of {ok, Pid} -> + gproc_pool:connect_worker({Channel, active}, Name), {next_state, ready, Data#data{conn=Pid}, Actions}; {error, _}=Error -> {next_state, disconnected, Data#data{conn=undefined}, [{reply, From, Error}]} @@ -138,3 +157,15 @@ options(https, Options) -> [{client_preferred_next_protocols, {client, [<<"h2">>]}} | Options]; options(http, Options) -> Options. + +start_h2_client({Transport, Host, Port, EndpointOptions}) -> + % Get and delete non-ssl options from endpoint options, these are passed as connection settings + ConnectTimeout = proplists:get_value(connect_timeout, EndpointOptions, 5000), + TcpUserTimeout = proplists:get_value(tcp_user_timeout, EndpointOptions, 0), + EndpointOptions2 = proplists:delete(connect_timeout, EndpointOptions), + EndpointOptions3 = proplists:delete(tcp_user_timeout, EndpointOptions2), + h2_client:start_link(Transport, Host, Port, options(Transport, EndpointOptions3), + #{garbage_on_end => true, + stream_callback_mod => grpcbox_client_stream, + connect_timeout => ConnectTimeout, + tcp_user_timeout => TcpUserTimeout}). diff --git a/test/grpcbox_SUITE.erl b/test/grpcbox_SUITE.erl index 81c1c1a..cab1be3 100644 --- a/test/grpcbox_SUITE.erl +++ b/test/grpcbox_SUITE.erl @@ -343,6 +343,7 @@ end_per_testcase(_, _Config) -> initially_down_service(_Config) -> Point = #{latitude => 409146138, longitude => -746188906}, Ctx = ctx:with_deadline_after(ctx:new(), 5, second), + ct:sleep(100), ?assertMatch({error, econnrefused}, routeguide_route_guide_client:get_feature(Ctx, Point)), grpcbox:start_server(#{grpc_opts => #{service_protos => [route_guide_pb], diff --git a/test/grpcbox_channel_SUITE.erl b/test/grpcbox_channel_SUITE.erl index 5a20a62..b6f8135 100644 --- a/test/grpcbox_channel_SUITE.erl +++ b/test/grpcbox_channel_SUITE.erl @@ -1,19 +1,37 @@ -module(grpcbox_channel_SUITE). -export([all/0, - init_per_suite/1, - end_per_suite/1, - add_and_remove_endpoints/1]). + init_per_suite/1, + end_per_suite/1, + add_and_remove_endpoints/1, + add_and_remove_endpoints_active_workers/1, + pick_worker_strategy/1, + pick_active_worker_strategy/1 + ]). -include_lib("eunit/include/eunit.hrl"). all() -> [ - add_and_remove_endpoints + add_and_remove_endpoints, + add_and_remove_endpoints_active_workers, + pick_worker_strategy, + pick_active_worker_strategy ]. init_per_suite(_Config) -> - application:set_env(grpcbox, servers, []), + application:set_env(grpcbox, client, #{channel => []}), + GrpcOptions = #{service_protos => [route_guide_pb], services => #{'routeguide.RouteGuide' => routeguide_route_guide}}, + Servers = [#{grpc_opts => GrpcOptions, + listen_opts => #{port => 18080, ip => {127,0,0,1}}}, + #{grpc_opts => GrpcOptions, + listen_opts => #{port => 18081, ip => {127,0,0,1}}}, + #{grpc_opts => GrpcOptions, + listen_opts => #{port => 18082, ip => {127,0,0,1}}}, + #{grpc_opts => GrpcOptions, + listen_opts => #{port => 18083, ip => {127,0,0,1}}}], + application:set_env(grpcbox, servers, Servers), application:ensure_all_started(grpcbox), + ct:sleep(1000), grpcbox_channel_sup:start_link(), grpcbox_channel_sup:start_child(default_channel, [{http, "127.0.0.1", 18080, []}], #{}), grpcbox_channel_sup:start_child(random_channel, @@ -32,8 +50,57 @@ end_per_suite(_Config) -> application:stop(grpcbox), ok. + add_and_remove_endpoints(_Config) -> grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}]), ?assertEqual(4, length(gproc_pool:active_workers(default_channel))), grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}, {https, "127.0.0.1", 18083, []}]), - ?assertEqual(7, length(gproc_pool:active_workers(default_channel))). + ?assertEqual(7, length(gproc_pool:active_workers(default_channel))), + grpcbox_channel:remove_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}], normal), + ?assertEqual(4, length(gproc_pool:active_workers(default_channel))), + grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 18080, []}, {https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}], normal), + ?assertEqual(2, length(gproc_pool:active_workers(default_channel))). + +add_and_remove_endpoints_active_workers(_Config) -> + grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}]), + ct:sleep(1000), + ?assertEqual(4, length(gproc_pool:active_workers({default_channel, active}))), + grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}, {https, "127.0.0.1", 18083, []}]), + ct:sleep(1000), + ?assertEqual(4, length(gproc_pool:active_workers({default_channel, active}))), + grpcbox_channel:remove_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}], normal), + ct:sleep(1000), + ?assertEqual(1, length(gproc_pool:active_workers({default_channel, active}))), + grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}, {https, "127.0.0.1", 18083, []}], normal), + ct:sleep(1000), + ?assertEqual(1, length(gproc_pool:active_workers({default_channel, active}))). + +pick_worker_strategy(_Config) -> + ?assertEqual(ok, pick_worker(default_channel)), + ?assertEqual(ok, pick_worker(random_channel)), + ?assertEqual(ok, pick_worker(direct_channel, 1)), + ?assertEqual(ok, pick_worker(hash_channel, 1)), + ?assertEqual(error, pick_worker(default_channel, 1)), + ?assertEqual(error, pick_worker(random_channel, 1)), + ?assertEqual(error, pick_worker(direct_channel)), + ?assertEqual(error, pick_worker(hash_channel)), + ok. + +pick_active_worker_strategy(_Config) -> + ct:sleep(1000), + ?assertEqual(ok, pick_worker({default_channel, active})), + ?assertEqual(ok, pick_worker({random_channel, active})), + ?assertEqual(ok, pick_worker({direct_channel, active}, 1)), + ?assertEqual(ok, pick_worker({hash_channel, active}, 1)), + ?assertEqual(error, pick_worker({default_channel, active}, 1)), + ?assertEqual(error, pick_worker({random_channel, active}, 1)), + ?assertEqual(error, pick_worker({direct_channel, active})), + ?assertEqual(error, pick_worker({hash_channel, active})), + ok. + +pick_worker(Name, N) -> + {R, _} = grpcbox_channel:pick(Name, unary, N), + R. +pick_worker(Name) -> + {R, _} = grpcbox_channel:pick(Name, unary, undefined), + R.