Skip to content

Commit

Permalink
Add timeout to cowboy_loop
Browse files Browse the repository at this point in the history
LH: I have added a test that does both hibernate and timeout
    and fixed a related issue. I also tweaked the docs and tests.
  • Loading branch information
jdamanalo authored and essen committed Dec 15, 2023
1 parent a72bf41 commit a81dc8a
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 25 deletions.
27 changes: 26 additions & 1 deletion doc/src/guide/loop_handlers.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ for plain HTTP handlers.
The `init/2` function must return a `cowboy_loop` tuple to enable
loop handler behavior. This tuple may optionally contain
the atom `hibernate` to make the process enter hibernation
until a message is received.
until a message is received. Alternatively, the tuple may
optionally contain a positive integer to create a `timeout`
message when the process has not received messages for too
long.

This snippet enables the loop handler:

Expand All @@ -49,6 +52,14 @@ init(Req, State) ->
{cowboy_loop, Req, State, hibernate}.
----

This makes the process time out after 1000ms of idle time.

[source,erlang]
----
init(Req, State) ->
{cowboy_loop, Req, State, 1000}.
----

=== Receive loop

Once initialized, Cowboy will wait for messages to arrive
Expand Down Expand Up @@ -123,3 +134,17 @@ messages received. This is done by returning the atom
`hibernate` as part of the `loop` tuple callbacks normally
return. Just add the atom at the end and Cowboy will hibernate
accordingly.

=== Idle timeout

You may activate timeout events by returning a positive integer
`N` as part of the `loop` tuple callbacks return. The default
value is `infinity`. The `info` callback will be called with the
atom `timeout` unless a message is received within `N` milliseconds:

[source,erlang]
----
info(timeout, Req, State) ->
%% Do something...
{ok, Req, State, 1000}.
----
8 changes: 5 additions & 3 deletions doc/src/manual/cowboy_loop.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ Loop handlers implement the following interface:
----
init(Req, State)
-> {cowboy_loop, Req, State}
| {cowboy_loop, Req, State, hibernate}
| {cowboy_loop, Req, State, hibernate | timeout()}
info(Info, Req, State)
-> {ok, Req, State}
| {ok, Req, State, hibernate}
| {ok, Req, State, hibernate | timeout()}
| {stop, Req, State}
terminate(Reason, Req, State) -> ok %% optional
Expand Down Expand Up @@ -69,7 +69,9 @@ stop::

== Changelog

* *2.0*: Loop handlers no longer need to handle overflow/timeouts.
* *2.11*: A timeout may be returned instead of `hibernate`.
It functions the same way as the `gen_server` timeout.
* *2.0*: Loop handlers no longer need to handle socket events.
* *1.0*: Behavior introduced.

== See also
Expand Down
43 changes: 26 additions & 17 deletions src/cowboy_loop.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

-export([upgrade/4]).
-export([upgrade/5]).
-export([loop/4]).
-export([loop/5]).

-export([system_continue/3]).
-export([system_terminate/4]).
-export([system_code_change/4]).

%% From gen_server.
-define(is_timeout(X), ((X) =:= infinity orelse (is_integer(X) andalso (X) >= 0))).

-callback init(Req, any())
-> {ok | module(), Req, any()}
| {module(), Req, any(), any()}
Expand All @@ -41,40 +44,46 @@
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
upgrade(Req, Env, Handler, HandlerState) ->
loop(Req, Env, Handler, HandlerState).
loop(Req, Env, Handler, HandlerState, infinity).

-spec upgrade(Req, Env, module(), any(), hibernate)
-spec upgrade(Req, Env, module(), any(), hibernate | timeout())
-> {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
upgrade(Req, Env, Handler, HandlerState, hibernate) ->
suspend(Req, Env, Handler, HandlerState).
suspend(Req, Env, Handler, HandlerState);
upgrade(Req, Env, Handler, HandlerState, Timeout) when ?is_timeout(Timeout) ->
loop(Req, Env, Handler, HandlerState, Timeout).

-spec loop(Req, Env, module(), any())
-spec loop(Req, Env, module(), any(), timeout())
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
%% @todo Handle system messages.
loop(Req=#{pid := Parent}, Env, Handler, HandlerState) ->
loop(Req=#{pid := Parent}, Env, Handler, HandlerState, Timeout) ->
receive
%% System messages.
{'EXIT', Parent, Reason} ->
terminate(Req, Env, Handler, HandlerState, Reason);
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
{Req, Env, Handler, HandlerState});
{Req, Env, Handler, HandlerState, Timeout});
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
loop(Req, Env, Handler, HandlerState);
loop(Req, Env, Handler, HandlerState, Timeout);
Message ->
call(Req, Env, Handler, HandlerState, Message)
call(Req, Env, Handler, HandlerState, Timeout, Message)
after Timeout ->
call(Req, Env, Handler, HandlerState, Timeout, timeout)
end.

call(Req0, Env, Handler, HandlerState0, Message) ->
call(Req0, Env, Handler, HandlerState0, Timeout, Message) ->
try Handler:info(Message, Req0, HandlerState0) of
{ok, Req, HandlerState} ->
loop(Req, Env, Handler, HandlerState);
loop(Req, Env, Handler, HandlerState, Timeout);
{ok, Req, HandlerState, hibernate} ->
suspend(Req, Env, Handler, HandlerState);
{ok, Req, HandlerState, NewTimeout} when ?is_timeout(NewTimeout) ->
loop(Req, Env, Handler, HandlerState, NewTimeout);
{stop, Req, HandlerState} ->
terminate(Req, Env, Handler, HandlerState, stop)
catch Class:Reason:Stacktrace ->
Expand All @@ -83,23 +92,23 @@ call(Req0, Env, Handler, HandlerState0, Message) ->
end.

suspend(Req, Env, Handler, HandlerState) ->
{suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState]}.
{suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState, infinity]}.

terminate(Req, Env, Handler, HandlerState, Reason) ->
Result = cowboy_handler:terminate(Reason, Req, HandlerState, Handler),
{ok, Req, Env#{result => Result}}.

%% System callbacks.

-spec system_continue(_, _, {Req, Env, module(), any()})
-spec system_continue(_, _, {Req, Env, module(), any(), timeout()})
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
system_continue(_, _, {Req, Env, Handler, HandlerState}) ->
loop(Req, Env, Handler, HandlerState).
system_continue(_, _, {Req, Env, Handler, HandlerState, Timeout}) ->
loop(Req, Env, Handler, HandlerState, Timeout).

-spec system_terminate(any(), _, _, {Req, Env, module(), any()})
-spec system_terminate(any(), _, _, {Req, Env, module(), any(), timeout()})
-> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env().
system_terminate(Reason, _, _, {Req, Env, Handler, HandlerState}) ->
system_terminate(Reason, _, _, {Req, Env, Handler, HandlerState, _}) ->
terminate(Req, Env, Handler, HandlerState, Reason).

-spec system_code_change(Misc, _, _, _) -> {ok, Misc}
Expand Down
30 changes: 30 additions & 0 deletions test/handlers/loop_handler_timeout_hibernate_h.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
%% This module implements a loop handler that first
%% sets a timeout, then hibernates, then ensures
%% that the timeout initially set no longer triggers.
%% If everything goes fine a 200 is returned. If the
%% timeout triggers again a 299 is.

-module(loop_handler_timeout_hibernate_h).

-export([init/2]).
-export([info/3]).
-export([terminate/3]).

init(Req, _) ->
self() ! message1,
{cowboy_loop, Req, undefined, 100}.

info(message1, Req, State) ->
erlang:send_after(200, self(), message2),
{ok, Req, State, hibernate};
info(message2, Req, State) ->
erlang:send_after(200, self(), message3),
%% Don't set a timeout now.
{ok, Req, State};
info(message3, Req, State) ->
{stop, cowboy_req:reply(200, Req), State};
info(timeout, Req, State) ->
{stop, cowboy_req:reply(<<"299 OK!">>, Req), State}.

terminate(stop, _, _) ->
ok.
23 changes: 23 additions & 0 deletions test/handlers/loop_handler_timeout_info_h.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
%% This module implements a loop handler that changes
%% the timeout value to 500ms after the first message
%% then sends itself another message after 1000ms.
%% It is expected to timeout, that is, reply a 299.

-module(loop_handler_timeout_info_h).

-export([init/2]).
-export([info/3]).
-export([terminate/3]).

init(Req, _) ->
self() ! message,
{cowboy_loop, Req, undefined}.

info(message, Req, State) ->
erlang:send_after(200, self(), message),
{ok, Req, State, 100};
info(timeout, Req, State) ->
{stop, cowboy_req:reply(<<"299 OK!">>, Req), State}.

terminate(stop, _, _) ->
ok.
23 changes: 23 additions & 0 deletions test/handlers/loop_handler_timeout_init_h.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
%% This module implements a loop handler that reads
%% the request query for a timeout value, then sends
%% itself a message after 1000ms. It replies a 200 when
%% the message does not timeout and a 299 otherwise.

-module(loop_handler_timeout_init_h).

-export([init/2]).
-export([info/3]).
-export([terminate/3]).

init(Req, _) ->
#{timeout := Timeout} = cowboy_req:match_qs([{timeout, int}], Req),
erlang:send_after(200, self(), message),
{cowboy_loop, Req, undefined, Timeout}.

info(message, Req, State) ->
{stop, cowboy_req:reply(200, Req), State};
info(timeout, Req, State) ->
{stop, cowboy_req:reply(<<"299 OK!">>, Req), State}.

terminate(stop, _, _) ->
ok.
32 changes: 30 additions & 2 deletions test/loop_handler_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ init_dispatch(_) ->
cowboy_router:compile([{'_', [
{"/long_polling", long_polling_h, []},
{"/loop_body", loop_handler_body_h, []},
{"/loop_timeout", loop_handler_timeout_h, []}
{"/loop_request_timeout", loop_handler_timeout_h, []},
{"/loop_timeout_init", loop_handler_timeout_init_h, []},
{"/loop_timeout_info", loop_handler_timeout_info_h, []},
{"/loop_timeout_hibernate", loop_handler_timeout_hibernate_h, []}
]}]).

%% Tests.
Expand Down Expand Up @@ -79,6 +82,31 @@ long_polling_pipeline(Config) ->
request_timeout(Config) ->
doc("Ensure that the request_timeout isn't applied when a request is ongoing."),
ConnPid = gun_open(Config),
Ref = gun:get(ConnPid, "/loop_timeout", [{<<"accept-encoding">>, <<"gzip">>}]),
Ref = gun:get(ConnPid, "/loop_request_timeout", [{<<"accept-encoding">>, <<"gzip">>}]),
{response, nofin, 200, _} = gun:await(ConnPid, Ref, 10000),
ok.

timeout_hibernate(Config) ->
doc("Ensure that loop handler idle timeouts don't trigger after hibernate is returned."),
ConnPid = gun_open(Config),
Ref = gun:get(ConnPid, "/loop_timeout_hibernate", [{<<"accept-encoding">>, <<"gzip">>}]),
{response, fin, 200, _} = gun:await(ConnPid, Ref),
ok.

timeout_info(Config) ->
doc("Ensure that loop handler idle timeouts trigger on time when set in info/3."),
ConnPid = gun_open(Config),
Ref = gun:get(ConnPid, "/loop_timeout_info", [{<<"accept-encoding">>, <<"gzip">>}]),
{response, fin, 299, _} = gun:await(ConnPid, Ref),
ok.

timeout_init(Config) ->
doc("Ensure that loop handler idle timeouts trigger on time when set in init/2."),
ConnPid = gun_open(Config),
Ref = gun:get(ConnPid, "/loop_timeout_init?timeout=300",
[{<<"accept-encoding">>, <<"gzip">>}]),
{response, fin, 200, _} = gun:await(ConnPid, Ref),
Ref2 = gun:get(ConnPid, "/loop_timeout_init?timeout=100",
[{<<"accept-encoding">>, <<"gzip">>}]),
{response, fin, 299, _} = gun:await(ConnPid, Ref2),
ok.
4 changes: 2 additions & 2 deletions test/sys_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ sys_get_state_loop(Config) ->
timer:sleep(100),
SupPid = get_remote_pid_tcp(Socket),
[{_, Pid, _, _}] = supervisor:which_children(SupPid),
{Req, Env, long_polling_sys_h, undefined} = sys:get_state(Pid),
{Req, Env, long_polling_sys_h, undefined, infinity} = sys:get_state(Pid),
#{pid := _, streamid := _} = Req,
#{dispatch := _} = Env,
ok.
Expand Down Expand Up @@ -784,7 +784,7 @@ sys_replace_state_loop(Config) ->
timer:sleep(100),
SupPid = get_remote_pid_tcp(Socket),
[{_, Pid, _, _}] = supervisor:which_children(SupPid),
{Req, Env, long_polling_sys_h, undefined} = sys:replace_state(Pid, fun(S) -> S end),
{Req, Env, long_polling_sys_h, undefined, infinity} = sys:replace_state(Pid, fun(S) -> S end),
#{pid := _, streamid := _} = Req,
#{dispatch := _} = Env,
ok.
Expand Down

0 comments on commit a81dc8a

Please sign in to comment.