Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout to cowboy_loop #1608

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion doc/src/guide/loop_handlers.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ for plain HTTP handlers.
The `init/2` function must return a `cowboy_loop` tuple to enable
loop handler behavior. This tuple may optionally contain
the atom `hibernate` to make the process enter hibernation
until a message is received.
until a message is received. Alternatively, the tuple may
optionally contain a positive integer to make the process
handle timeouts.

This snippet enables the loop handler:

Expand All @@ -49,6 +51,14 @@ init(Req, State) ->
{cowboy_loop, Req, State, hibernate}.
----

This makes the process timeout after 1000ms of idle time:

[source,erlang]
----
init(Req, State) ->
{cowboy_loop, Req, State, 1000}.
----

=== Receive loop

Once initialized, Cowboy will wait for messages to arrive
Expand Down Expand Up @@ -123,3 +133,10 @@ messages received. This is done by returning the atom
`hibernate` as part of the `loop` tuple callbacks normally
return. Just add the atom at the end and Cowboy will hibernate
accordingly.

=== Timeout

You may activate timeout events by returning a positive integer
`N` as part of the `loop` tuple callbacks return. The default
value is `infinity`. The `info` callback will be called with the
atom `timeout` unless a message is received within `N` milliseconds.
4 changes: 2 additions & 2 deletions doc/src/manual/cowboy_loop.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ Loop handlers implement the following interface:
----
init(Req, State)
-> {cowboy_loop, Req, State}
| {cowboy_loop, Req, State, hibernate}
| {cowboy_loop, Req, State, hibernate | timeout()}

info(Info, Req, State)
-> {ok, Req, State}
| {ok, Req, State, hibernate}
| {ok, Req, State, hibernate | timeout()}
| {stop, Req, State}

terminate(Reason, Req, State) -> ok %% optional
Expand Down
47 changes: 28 additions & 19 deletions src/cowboy_loop.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

-export([upgrade/4]).
-export([upgrade/5]).
-export([loop/4]).
-export([loop/5]).

-export([system_continue/3]).
-export([system_terminate/4]).
-export([system_code_change/4]).

%% From gen_server.
-define(is_timeout(X), ((X) =:= infinity orelse (is_integer(X) andalso (X) >= 0))).

-callback init(Req, any())
-> {ok | module(), Req, any()}
| {module(), Req, any(), any()}
Expand All @@ -41,65 +44,71 @@
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
upgrade(Req, Env, Handler, HandlerState) ->
loop(Req, Env, Handler, HandlerState).
loop(Req, Env, Handler, HandlerState, infinity).

-spec upgrade(Req, Env, module(), any(), hibernate)
-spec upgrade(Req, Env, module(), any(), hibernate | timeout())
-> {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
upgrade(Req, Env, Handler, HandlerState, hibernate) ->
suspend(Req, Env, Handler, HandlerState).
suspend(Req, Env, Handler, HandlerState, infinity);
upgrade(Req, Env, Handler, HandlerState, Timeout) when ?is_timeout(Timeout) ->
loop(Req, Env, Handler, HandlerState, Timeout).

-spec loop(Req, Env, module(), any())
-spec loop(Req, Env, module(), any(), timeout())
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
%% @todo Handle system messages.
loop(Req=#{pid := Parent}, Env, Handler, HandlerState) ->
loop(Req=#{pid := Parent}, Env, Handler, HandlerState, Timeout) ->
receive
%% System messages.
{'EXIT', Parent, Reason} ->
terminate(Req, Env, Handler, HandlerState, Reason);
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
{Req, Env, Handler, HandlerState});
{Req, Env, Handler, HandlerState, Timeout});
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
loop(Req, Env, Handler, HandlerState);
loop(Req, Env, Handler, HandlerState, Timeout);
Message ->
call(Req, Env, Handler, HandlerState, Message)
call(Req, Env, Handler, HandlerState, Timeout, Message)
after Timeout ->
call(Req, Env, Handler, HandlerState, Timeout, timeout)
end.

call(Req0, Env, Handler, HandlerState0, Message) ->
call(Req0, Env, Handler, HandlerState0, Timeout, Message) ->
try Handler:info(Message, Req0, HandlerState0) of
{ok, Req, HandlerState} ->
loop(Req, Env, Handler, HandlerState);
loop(Req, Env, Handler, HandlerState, Timeout);
{ok, Req, HandlerState, hibernate} ->
suspend(Req, Env, Handler, HandlerState);
suspend(Req, Env, Handler, HandlerState, Timeout);
jdamanalo marked this conversation as resolved.
Show resolved Hide resolved
{ok, Req, HandlerState, NewTimeout} when ?is_timeout(NewTimeout) ->
loop(Req, Env, Handler, HandlerState, NewTimeout);
{stop, Req, HandlerState} ->
terminate(Req, Env, Handler, HandlerState, stop)
catch Class:Reason:Stacktrace ->
cowboy_handler:terminate({crash, Class, Reason}, Req0, HandlerState0, Handler),
erlang:raise(Class, Reason, Stacktrace)
end.

suspend(Req, Env, Handler, HandlerState) ->
{suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState]}.
suspend(Req, Env, Handler, HandlerState, Timeout) ->
{suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState, Timeout]}.

terminate(Req, Env, Handler, HandlerState, Reason) ->
Result = cowboy_handler:terminate(Reason, Req, HandlerState, Handler),
{ok, Req, Env#{result => Result}}.

%% System callbacks.

-spec system_continue(_, _, {Req, Env, module(), any()})
-spec system_continue(_, _, {Req, Env, module(), any(), timeout()})
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
system_continue(_, _, {Req, Env, Handler, HandlerState}) ->
loop(Req, Env, Handler, HandlerState).
system_continue(_, _, {Req, Env, Handler, HandlerState, Timeout}) ->
loop(Req, Env, Handler, HandlerState, Timeout).

-spec system_terminate(any(), _, _, {Req, Env, module(), any()})
-spec system_terminate(any(), _, _, {Req, Env, module(), any(), timeout()})
-> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env().
system_terminate(Reason, _, _, {Req, Env, Handler, HandlerState}) ->
system_terminate(Reason, _, _, {Req, Env, Handler, HandlerState, _}) ->
terminate(Req, Env, Handler, HandlerState, Reason).

-spec system_code_change(Misc, _, _, _) -> {ok, Misc}
Expand Down
23 changes: 23 additions & 0 deletions test/handlers/loop_idle_timeout_h.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
%% This module implements a loop handler that reads
%% the request query for a timeout value, then sends
%% itself a message after 1000ms. It replies a 200 when
%% the message does not timeout and a 299 otherwise.

-module(loop_idle_timeout_h).

-export([init/2]).
-export([info/3]).
-export([terminate/3]).

init(Req, _) ->
#{timeout := Timeout} = cowboy_req:match_qs([{timeout, int}], Req),
erlang:send_after(1000, self(), message),
{cowboy_loop, Req, 2, Timeout}.

info(message, Req, State) ->
{stop, cowboy_req:reply(200, Req), State};
info(timeout, Req, State) ->
{stop, cowboy_req:reply(<<"299 OK!">>, Req), State}.

terminate(stop, _, _) ->
ok.
23 changes: 23 additions & 0 deletions test/handlers/loop_new_timeout_h.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
%% This module implements a loop handler that changes
%% the timeout value to 500ms after the first message
%% then sends itself another message after 1000ms.
%% It is expected to timeout, that is, reply a 299.

-module(loop_new_timeout_h).

-export([init/2]).
-export([info/3]).
-export([terminate/3]).

init(Req, _) ->
self() ! message,
{cowboy_loop, Req, 2}.

info(message, Req, State) ->
erlang:send_after(1000, self(), message),
{ok, Req, State, 500};
info(timeout, Req, State) ->
{stop, cowboy_req:reply(<<"299 OK!">>, Req), State}.

terminate(stop, _, _) ->
ok.
23 changes: 22 additions & 1 deletion test/loop_handler_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ init_dispatch(_) ->
cowboy_router:compile([{'_', [
{"/long_polling", long_polling_h, []},
{"/loop_body", loop_handler_body_h, []},
{"/loop_timeout", loop_handler_timeout_h, []}
{"/loop_timeout", loop_handler_timeout_h, []},
{"/loop_idle", loop_idle_timeout_h, []},
{"/loop_idle_new", loop_new_timeout_h, []}
]}]).

%% Tests.
Expand Down Expand Up @@ -82,3 +84,22 @@ request_timeout(Config) ->
Ref = gun:get(ConnPid, "/loop_timeout", [{<<"accept-encoding">>, <<"gzip">>}]),
{response, nofin, 200, _} = gun:await(ConnPid, Ref, 10000),
ok.

idle_timeout(Config) ->
doc("Check idle timeout."),
ConnPid = gun_open(Config),

Ref = gun:get(ConnPid, "/loop_idle?timeout=2000", [{<<"accept-encoding">>, <<"gzip">>}]),
{response, fin, 200, _} = gun:await(ConnPid, Ref),

Ref2 = gun:get(ConnPid, "/loop_idle?timeout=500", [{<<"accept-encoding">>, <<"gzip">>}]),
{response, fin, 299, _} = gun:await(ConnPid, Ref2),

ok.

new_timeout(Config) ->
doc("Check that the new timeout gets set."),
ConnPid = gun_open(Config),
Ref = gun:get(ConnPid, "/loop_idle_new", [{<<"accept-encoding">>, <<"gzip">>}]),
{response, fin, 299, _} = gun:await(ConnPid, Ref),
ok.
4 changes: 2 additions & 2 deletions test/sys_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ sys_get_state_loop(Config) ->
timer:sleep(100),
SupPid = get_remote_pid_tcp(Socket),
[{_, Pid, _, _}] = supervisor:which_children(SupPid),
{Req, Env, long_polling_sys_h, undefined} = sys:get_state(Pid),
{Req, Env, long_polling_sys_h, undefined, infinity} = sys:get_state(Pid),
#{pid := _, streamid := _} = Req,
#{dispatch := _} = Env,
ok.
Expand Down Expand Up @@ -784,7 +784,7 @@ sys_replace_state_loop(Config) ->
timer:sleep(100),
SupPid = get_remote_pid_tcp(Socket),
[{_, Pid, _, _}] = supervisor:which_children(SupPid),
{Req, Env, long_polling_sys_h, undefined} = sys:replace_state(Pid, fun(S) -> S end),
{Req, Env, long_polling_sys_h, undefined, infinity} = sys:replace_state(Pid, fun(S) -> S end),
#{pid := _, streamid := _} = Req,
#{dispatch := _} = Env,
ok.
Expand Down
Loading