diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index 935e820..adaadf3 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -116,6 +116,8 @@ init([Name, Endpoints, Options]) -> gproc_pool:new(Name, BalancerType, [{size, length(Endpoints)}, {auto_size, true}]), + gproc_pool:new({Name, active}, BalancerType, [{size, length(Endpoints)}, + {auto_size, true}]), Data = #data{ pool = Name, encoding = Encoding, @@ -172,10 +174,12 @@ handle_event(_, _, Data) -> {keep_state, Data}. terminate({shutdown, force_delete}, _State, #data{pool=Name}) -> - gproc_pool:force_delete(Name); + gproc_pool:force_delete(Name), + gproc_pool:force_delete({Name, active}); terminate(Reason, _State, #data{pool=Name}) -> [grpcbox_subchannel:stop(Pid, Reason) || {_Channel, Pid} <- gproc_pool:active_workers(Name)], gproc_pool:delete(Name), + gproc_pool:delete({Name, active}), ok. insert_interceptors(Name, Interceptors) -> diff --git a/src/grpcbox_client.erl b/src/grpcbox_client.erl index 42bb025..4dd4ffd 100644 --- a/src/grpcbox_client.erl +++ b/src/grpcbox_client.erl @@ -48,7 +48,11 @@ get_channel(Options, Type) -> Channel = maps:get(channel, Options, default_channel), Key = maps:get(key, Options, undefined), - grpcbox_channel:pick(Channel, Type, Key). + PickStrategy = maps:get(pick_strategy, Options, undefined), + case PickStrategy of + active_worker -> grpcbox_channel:pick({Channel, active}, Type, Key); + undefined -> grpcbox_channel:pick(Channel, Type, Key) + end. unary(Ctx, Service, Method, Input, Def, Options) -> unary(Ctx, filename:join([<<>>, Service, Method]), Input, Def, Options). diff --git a/src/grpcbox_subchannel.erl b/src/grpcbox_subchannel.erl index 017d867..0dd2c27 100644 --- a/src/grpcbox_subchannel.erl +++ b/src/grpcbox_subchannel.erl @@ -14,6 +14,8 @@ ready/3, disconnected/3]). +-define(RECONNECT_INTERVAL, 5000). + -record(data, {name :: any(), endpoint :: grpcbox_channel:endpoint(), channel :: grpcbox_channel:t(), @@ -42,13 +44,13 @@ stop(Pid, Reason) -> init([Name, Channel, Endpoint, Encoding, StatsHandler]) -> process_flag(trap_exit, true), - gproc_pool:connect_worker(Channel, Name), - {ok, disconnected, #data{name=Name, - conn=undefined, - info=info_map(Endpoint, Encoding, StatsHandler), - endpoint=Endpoint, - channel=Channel}}. + Data = #data{name=Name, + conn=undefined, + info=info_map(Endpoint, Encoding, StatsHandler), + endpoint=Endpoint, + channel=Channel}, + {ok, disconnected, Data, [{next_event, internal, connect}]}. info_map({http, Host, 80, _}, Encoding, StatsHandler) -> #{authority => list_to_binary(Host), @@ -72,20 +74,28 @@ callback_mode() -> ready({call, From}, conn, #data{conn=Conn, info=Info}) -> {keep_state_and_data, [{reply, From, {ok, Conn, Info}}]}; +ready(info, {'EXIT', Pid, _}, Data=#data{conn=Pid, name=Name, channel=Channel}) -> + gproc_pool:disconnect_worker({Channel, active}, Name), + {next_state, disconnected, Data#data{conn=undefined}, [{next_event, internal, connect}]}; +ready(info, {timeout, connect}, _Data) -> + keep_state_and_data; ready(EventType, EventContent, Data) -> handle_event(EventType, EventContent, Data). +disconnected(internal, connect, Data) -> + do_connect(Data); +disconnected(info, {timeout, connect}, Data) -> + do_connect(Data); disconnected({call, From}, conn, Data) -> connect(Data, From, [postpone]); +disconnected(info, {'EXIT', _, _}, #data{conn=undefined}) -> + erlang:send_after(?RECONNECT_INTERVAL, self(), {timeout, connect}), + keep_state_and_data; disconnected(EventType, EventContent, Data) -> handle_event(EventType, EventContent, Data). handle_event({call, From}, info, #data{info=Info}) -> {keep_state_and_data, [{reply, From, Info}]}; -handle_event(info, {'EXIT', Pid, _}, Data=#data{conn=Pid}) -> - {next_state, disconnected, Data#data{conn=undefined}}; -handle_event(info, {'EXIT', _, econnrefused}, #data{conn=undefined}) -> - keep_state_and_data; handle_event({call, From}, shutdown, _) -> {stop_and_reply, normal, {reply, From, ok}}; handle_event(_, _, _) -> @@ -96,6 +106,7 @@ terminate(_Reason, _State, #data{conn=undefined, channel=Channel}) -> gproc_pool:disconnect_worker(Channel, Name), gproc_pool:remove_worker(Channel, Name), + gproc_pool:remove_worker({Channel, active}, Name), ok; terminate(normal, _State, #data{conn=Pid, name=Name, @@ -103,21 +114,39 @@ terminate(normal, _State, #data{conn=Pid, h2_connection:stop(Pid), gproc_pool:disconnect_worker(Channel, Name), gproc_pool:remove_worker(Channel, Name), + gproc_pool:disconnect_worker({Channel, active}, Name), + gproc_pool:remove_worker({Channel, active}, Name), ok; terminate(Reason, _State, #data{conn=Pid, name=Name, channel=Channel}) -> gproc_pool:disconnect_worker(Channel, Name), gproc_pool:remove_worker(Channel, Name), + gproc_pool:disconnect_worker({Channel, active}, Name), + gproc_pool:remove_worker({Channel, active}, Name), exit(Pid, Reason), ok. -connect(Data=#data{conn=undefined, - endpoint={Transport, Host, Port, SSLOptions}}, From, Actions) -> +do_connect(Data=#data{name=Name, channel=Channel, + conn=undefined, endpoint={Transport, Host, Port, SSLOptions}}) -> + case h2_client:start_link(Transport, Host, Port, options(Transport, SSLOptions), + #{garbage_on_end => true, + stream_callback_mod => grpcbox_client_stream}) of + {ok, Pid} -> + gproc_pool:connect_worker({Channel, active}, Name), + {next_state, ready, Data#data{conn=Pid}}; + {error, _} -> + erlang:send_after(?RECONNECT_INTERVAL, self(), {timeout, connect}), + {next_state, disconnected, Data#data{conn=undefined}} + end. + +connect(Data=#data{name=Name, channel=Channel, + conn=undefined, endpoint={Transport, Host, Port, SSLOptions}}, From, Actions) -> case h2_client:start_link(Transport, Host, Port, options(Transport, SSLOptions), #{garbage_on_end => true, stream_callback_mod => grpcbox_client_stream}) of {ok, Pid} -> + gproc_pool:connect_worker({Channel, active}, Name), {next_state, ready, Data#data{conn=Pid}, Actions}; {error, _}=Error -> {next_state, disconnected, Data#data{conn=undefined}, [{reply, From, Error}]} diff --git a/test/grpcbox_channel_SUITE.erl b/test/grpcbox_channel_SUITE.erl index 354ce32..5b4295f 100644 --- a/test/grpcbox_channel_SUITE.erl +++ b/test/grpcbox_channel_SUITE.erl @@ -4,28 +4,42 @@ init_per_suite/1, end_per_suite/1, add_and_remove_endpoints/1, - pick_worker_strategy/1]). + add_and_remove_endpoints_active_workers/1, + pick_worker_strategy/1, + pick_active_worker_strategy/1]). -include_lib("eunit/include/eunit.hrl"). all() -> [ add_and_remove_endpoints, - pick_worker_strategy + add_and_remove_endpoints_active_workers, + pick_worker_strategy, + pick_active_worker_strategy ]. init_per_suite(_Config) -> - application:set_env(grpcbox, servers, []), + GrpcOptions = #{service_protos => [route_guide_pb], services => #{'routeguide.RouteGuide' => routeguide_route_guide}}, + Servers = [#{grpc_opts => GrpcOptions, + listen_opts => #{port => 18080, ip => {127,0,0,1}}}, + #{grpc_opts => GrpcOptions, + listen_opts => #{port => 18081, ip => {127,0,0,1}}}, + #{grpc_opts => GrpcOptions, + listen_opts => #{port => 18082, ip => {127,0,0,1}}}, + #{grpc_opts => GrpcOptions, + listen_opts => #{port => 18083, ip => {127,0,0,1}}}], + application:set_env(grpcbox, servers, Servers), application:ensure_all_started(grpcbox), + ct:sleep(1000), grpcbox_channel_sup:start_link(), - grpcbox_channel_sup:start_child(default_channel, [{https, "127.0.0.1", 8080, #{}}], #{}), + grpcbox_channel_sup:start_child(default_channel, [{http, "127.0.0.1", 18080, #{}}], #{}), grpcbox_channel_sup:start_child(random_channel, - [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], + [{http, "127.0.0.1", 18080, #{}}, {http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}], #{balancer => random}), grpcbox_channel_sup:start_child(hash_channel, - [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], + [{http, "127.0.0.1", 18080, #{}}, {http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}], #{balancer => hash}), grpcbox_channel_sup:start_child(direct_channel, - [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], + [{http, "127.0.0.1", 18080, #{}}, {http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.4", 18084, #{}}], #{ balancer => direct}), _Config. @@ -34,11 +48,30 @@ end_per_suite(_Config) -> application:stop(grpcbox), ok. + add_and_remove_endpoints(_Config) -> - grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}]), + grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}]), + ?assertEqual(4, length(gproc_pool:active_workers(default_channel))), + grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, #{}}, {https, "127.0.0.1", 18082, #{}}, {https, "127.0.0.1", 18083, #{}}]), + ?assertEqual(7, length(gproc_pool:active_workers(default_channel))), + grpcbox_channel:remove_endpoints(default_channel, [{http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}], normal), ?assertEqual(4, length(gproc_pool:active_workers(default_channel))), - grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], normal), - ?assertEqual(1, length(gproc_pool:active_workers(default_channel))). + grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 18080, #{}}, {https, "127.0.0.1", 18081, #{}}, {https, "127.0.0.1", 18082, #{}}], normal), + ?assertEqual(2, length(gproc_pool:active_workers(default_channel))). + +add_and_remove_endpoints_active_workers(_Config) -> + grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}]), + ct:sleep(1000), + ?assertEqual(4, length(gproc_pool:active_workers({default_channel, active}))), + grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, #{}}, {https, "127.0.0.1", 18082, #{}}, {https, "127.0.0.1", 18083, #{}}]), + ct:sleep(1000), + ?assertEqual(4, length(gproc_pool:active_workers({default_channel, active}))), + grpcbox_channel:remove_endpoints(default_channel, [{http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}], normal), + ct:sleep(1000), + ?assertEqual(1, length(gproc_pool:active_workers({default_channel, active}))), + grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 18081, #{}}, {https, "127.0.0.1", 18082, #{}}, {https, "127.0.0.1", 18083, #{}}], normal), + ct:sleep(1000), + ?assertEqual(1, length(gproc_pool:active_workers({default_channel, active}))). pick_worker_strategy(_Config) -> ?assertEqual(ok, pick_worker(default_channel)), @@ -51,6 +84,17 @@ pick_worker_strategy(_Config) -> ?assertEqual(error, pick_worker(hash_channel)), ok. +pick_active_worker_strategy(_Config) -> + ct:sleep(1000), + ?assertEqual(ok, pick_worker({default_channel, active})), + ?assertEqual(ok, pick_worker({random_channel, active})), + ?assertEqual(ok, pick_worker({direct_channel, active}, 1)), + ?assertEqual(ok, pick_worker({hash_channel, active}, 1)), + ?assertEqual(error, pick_worker({default_channel, active}, 1)), + ?assertEqual(error, pick_worker({random_channel, active}, 1)), + ?assertEqual(error, pick_worker({direct_channel, active})), + ?assertEqual(error, pick_worker({hash_channel, active})), + ok. pick_worker(Name, N) -> {R, _} = grpcbox_channel:pick(Name, unary, N), R.