Skip to content

Commit

Permalink
Add timeout to cowboy_loop
Browse files Browse the repository at this point in the history
  • Loading branch information
jdamanalo committed Mar 9, 2023
1 parent 30ee75c commit 16280aa
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 25 deletions.
19 changes: 18 additions & 1 deletion doc/src/guide/loop_handlers.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ for plain HTTP handlers.
The `init/2` function must return a `cowboy_loop` tuple to enable
loop handler behavior. This tuple may optionally contain
the atom `hibernate` to make the process enter hibernation
until a message is received.
until a message is received. Alternatively, the tuple may
optionally contain a positive integer to make the process
handle timeouts.

This snippet enables the loop handler:

Expand All @@ -49,6 +51,14 @@ init(Req, State) ->
{cowboy_loop, Req, State, hibernate}.
----

This makes the process timeout after 1000ms of idle time:

[source,erlang]
----
init(Req, State) ->
{cowboy_loop, Req, State, 1000}.
----

=== Receive loop

Once initialized, Cowboy will wait for messages to arrive
Expand Down Expand Up @@ -123,3 +133,10 @@ messages received. This is done by returning the atom
`hibernate` as part of the `loop` tuple callbacks normally
return. Just add the atom at the end and Cowboy will hibernate
accordingly.

=== Timeout

You may activate timeout events by returning a positive integer
`N` as part of the `loop` tuple callbacks return. The default
value is `infinity`. The `info` callback will be called with the
atom `timeout` unless a message is received within `N` milliseconds.
4 changes: 2 additions & 2 deletions doc/src/manual/cowboy_loop.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ Loop handlers implement the following interface:
----
init(Req, State)
-> {cowboy_loop, Req, State}
| {cowboy_loop, Req, State, hibernate}
| {cowboy_loop, Req, State, hibernate | timeout()}
info(Info, Req, State)
-> {ok, Req, State}
| {ok, Req, State, hibernate}
| {ok, Req, State, hibernate | timeout()}
| {stop, Req, State}
terminate(Reason, Req, State) -> ok %% optional
Expand Down
47 changes: 28 additions & 19 deletions src/cowboy_loop.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

-export([upgrade/4]).
-export([upgrade/5]).
-export([loop/4]).
-export([loop/5]).

-export([system_continue/3]).
-export([system_terminate/4]).
-export([system_code_change/4]).

%% From gen_server.
-define(is_timeout(X), ((X) =:= infinity orelse (is_integer(X) andalso (X) >= 0))).

-callback init(Req, any())
-> {ok | module(), Req, any()}
| {module(), Req, any(), any()}
Expand All @@ -41,65 +44,71 @@
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
upgrade(Req, Env, Handler, HandlerState) ->
loop(Req, Env, Handler, HandlerState).
loop(Req, Env, Handler, HandlerState, infinity).

-spec upgrade(Req, Env, module(), any(), hibernate)
-spec upgrade(Req, Env, module(), any(), hibernate | timeout())
-> {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
upgrade(Req, Env, Handler, HandlerState, hibernate) ->
suspend(Req, Env, Handler, HandlerState).
suspend(Req, Env, Handler, HandlerState, infinity);
upgrade(Req, Env, Handler, HandlerState, Timeout) when ?is_timeout(Timeout) ->
loop(Req, Env, Handler, HandlerState, Timeout).

-spec loop(Req, Env, module(), any())
-spec loop(Req, Env, module(), any(), timeout())
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
%% @todo Handle system messages.
loop(Req=#{pid := Parent}, Env, Handler, HandlerState) ->
loop(Req=#{pid := Parent}, Env, Handler, HandlerState, Timeout) ->
receive
%% System messages.
{'EXIT', Parent, Reason} ->
terminate(Req, Env, Handler, HandlerState, Reason);
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
{Req, Env, Handler, HandlerState});
{Req, Env, Handler, HandlerState, Timeout});
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
loop(Req, Env, Handler, HandlerState);
loop(Req, Env, Handler, HandlerState, Timeout);
Message ->
call(Req, Env, Handler, HandlerState, Message)
call(Req, Env, Handler, HandlerState, Timeout, Message)
after Timeout ->
call(Req, Env, Handler, HandlerState, Timeout, timeout)
end.

call(Req0, Env, Handler, HandlerState0, Message) ->
call(Req0, Env, Handler, HandlerState0, Timeout, Message) ->
try Handler:info(Message, Req0, HandlerState0) of
{ok, Req, HandlerState} ->
loop(Req, Env, Handler, HandlerState);
loop(Req, Env, Handler, HandlerState, Timeout);
{ok, Req, HandlerState, hibernate} ->
suspend(Req, Env, Handler, HandlerState);
suspend(Req, Env, Handler, HandlerState, Timeout);
{ok, Req, HandlerState, NewTimeout} when ?is_timeout(NewTimeout) ->
loop(Req, Env, Handler, HandlerState, NewTimeout);
{stop, Req, HandlerState} ->
terminate(Req, Env, Handler, HandlerState, stop)
catch Class:Reason:Stacktrace ->
cowboy_handler:terminate({crash, Class, Reason}, Req0, HandlerState0, Handler),
erlang:raise(Class, Reason, Stacktrace)
end.

suspend(Req, Env, Handler, HandlerState) ->
{suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState]}.
suspend(Req, Env, Handler, HandlerState, Timeout) ->
{suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState, Timeout]}.

terminate(Req, Env, Handler, HandlerState, Reason) ->
Result = cowboy_handler:terminate(Reason, Req, HandlerState, Handler),
{ok, Req, Env#{result => Result}}.

%% System callbacks.

-spec system_continue(_, _, {Req, Env, module(), any()})
-spec system_continue(_, _, {Req, Env, module(), any(), timeout()})
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
system_continue(_, _, {Req, Env, Handler, HandlerState}) ->
loop(Req, Env, Handler, HandlerState).
system_continue(_, _, {Req, Env, Handler, HandlerState, Timeout}) ->
loop(Req, Env, Handler, HandlerState, Timeout).

-spec system_terminate(any(), _, _, {Req, Env, module(), any()})
-spec system_terminate(any(), _, _, {Req, Env, module(), any(), timeout()})
-> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env().
system_terminate(Reason, _, _, {Req, Env, Handler, HandlerState}) ->
system_terminate(Reason, _, _, {Req, Env, Handler, HandlerState, _}) ->
terminate(Req, Env, Handler, HandlerState, Reason).

-spec system_code_change(Misc, _, _, _) -> {ok, Misc}
Expand Down
23 changes: 23 additions & 0 deletions test/handlers/loop_idle_timeout_h.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
%% This module implements a loop handler that reads
%% the request query for a timeout value, then sends
%% itself a message after 1000ms. It replies a 200 when
%% the message does not timeout and a 299 otherwise.

-module(loop_idle_timeout_h).

-export([init/2]).
-export([info/3]).
-export([terminate/3]).

init(Req, _) ->
#{timeout := Timeout} = cowboy_req:match_qs([{timeout, int}], Req),
erlang:send_after(1000, self(), message),
{cowboy_loop, Req, 2, Timeout}.

info(message, Req, State) ->
{stop, cowboy_req:reply(200, Req), State};
info(timeout, Req, State) ->
{stop, cowboy_req:reply(<<"299 OK!">>, Req), State}.

terminate(stop, _, _) ->
ok.
23 changes: 23 additions & 0 deletions test/handlers/loop_new_timeout_h.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
%% This module implements a loop handler that changes
%% the timeout value to 500ms after the first message
%% then sends itself another message after 1000ms.
%% It is expected to timeout, that is, reply a 299.

-module(loop_new_timeout_h).

-export([init/2]).
-export([info/3]).
-export([terminate/3]).

init(Req, _) ->
self() ! message,
{cowboy_loop, Req, 2}.

info(message, Req, State) ->
erlang:send_after(1000, self(), message),
{ok, Req, State, 500};
info(timeout, Req, State) ->
{stop, cowboy_req:reply(<<"299 OK!">>, Req), State}.

terminate(stop, _, _) ->
ok.
23 changes: 22 additions & 1 deletion test/loop_handler_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ init_dispatch(_) ->
cowboy_router:compile([{'_', [
{"/long_polling", long_polling_h, []},
{"/loop_body", loop_handler_body_h, []},
{"/loop_timeout", loop_handler_timeout_h, []}
{"/loop_timeout", loop_handler_timeout_h, []},
{"/loop_idle", loop_idle_timeout_h, []},
{"/loop_idle_new", loop_new_timeout_h, []}
]}]).

%% Tests.
Expand Down Expand Up @@ -82,3 +84,22 @@ request_timeout(Config) ->
Ref = gun:get(ConnPid, "/loop_timeout", [{<<"accept-encoding">>, <<"gzip">>}]),
{response, nofin, 200, _} = gun:await(ConnPid, Ref, 10000),
ok.

idle_timeout(Config) ->
doc("Check idle timeout."),
ConnPid = gun_open(Config),

Ref = gun:get(ConnPid, "/loop_idle?timeout=2000", [{<<"accept-encoding">>, <<"gzip">>}]),
{response, fin, 200, _} = gun:await(ConnPid, Ref),

Ref2 = gun:get(ConnPid, "/loop_idle?timeout=500", [{<<"accept-encoding">>, <<"gzip">>}]),
{response, fin, 299, _} = gun:await(ConnPid, Ref2),

ok.

new_timeout(Config) ->
doc("Check that the new timeout gets set."),
ConnPid = gun_open(Config),
Ref = gun:get(ConnPid, "/loop_idle_new", [{<<"accept-encoding">>, <<"gzip">>}]),
{response, fin, 299, _} = gun:await(ConnPid, Ref),
ok.
4 changes: 2 additions & 2 deletions test/sys_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ sys_get_state_loop(Config) ->
timer:sleep(100),
SupPid = get_remote_pid_tcp(Socket),
[{_, Pid, _, _}] = supervisor:which_children(SupPid),
{Req, Env, long_polling_sys_h, undefined} = sys:get_state(Pid),
{Req, Env, long_polling_sys_h, undefined, infinity} = sys:get_state(Pid),
#{pid := _, streamid := _} = Req,
#{dispatch := _} = Env,
ok.
Expand Down Expand Up @@ -776,7 +776,7 @@ sys_replace_state_loop(Config) ->
timer:sleep(100),
SupPid = get_remote_pid_tcp(Socket),
[{_, Pid, _, _}] = supervisor:which_children(SupPid),
{Req, Env, long_polling_sys_h, undefined} = sys:replace_state(Pid, fun(S) -> S end),
{Req, Env, long_polling_sys_h, undefined, infinity} = sys:replace_state(Pid, fun(S) -> S end),
#{pid := _, streamid := _} = Req,
#{dispatch := _} = Env,
ok.
Expand Down

0 comments on commit 16280aa

Please sign in to comment.