Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hibernate and more #1667

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions doc/src/manual/cowboy_http.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ opts() :: #{
chunked => boolean(),
connection_type => worker | supervisor,
dynamic_buffer => false | {pos_integer(), pos_integer()},
hibernate => boolean(),
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
Expand Down Expand Up @@ -87,6 +88,10 @@ The dynamic buffer size functionality can be disabled by
setting this option to `false`. Cowboy will also disable
it by default when the `buffer` transport option is configured.

hibernate (false)::

Whether the connection process will hibernate automatically.

http10_keepalive (true)::

Whether keep-alive is enabled for HTTP/1.0 connections.
Expand All @@ -100,7 +105,7 @@ This option can be updated at any time using the

inactivity_timeout (300000)::

Time in ms with nothing received at all before Cowboy closes the connection.
**DEPRECATED** Time in ms with nothing received at all before Cowboy closes the connection.

initial_stream_flow_size (65535)::

Expand Down Expand Up @@ -178,8 +183,9 @@ Ordered list of stream handlers that will handle all stream events.

== Changelog

* *2.13*: The `inactivity_timeout` option was deprecated.
* *2.13*: The `active_n` default value was changed to `1`.
* *2.13*: The `dynamic_buffer` option was added.
* *2.13*: The `dynamic_buffer` and `hibernate` options were added.
* *2.11*: The `reset_idle_timeout_on_send` option was added.
* *2.8*: The `active_n` option was added.
* *2.7*: The `initial_stream_flow_size` and `logger` options were added.
Expand Down
10 changes: 8 additions & 2 deletions doc/src/manual/cowboy_http2.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ opts() :: #{
enable_connect_protocol => boolean(),
goaway_initial_timeout => timeout(),
goaway_complete_timeout => timeout(),
hibernate => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
initial_connection_window_size => 65535..16#7fffffff,
Expand Down Expand Up @@ -122,13 +123,17 @@ goaway_complete_timeout (3000)::
Time in ms to wait for ongoing streams to complete before closing the connection
during a graceful shutdown.

hibernate (false)::

Whether the connection process will hibernate automatically.

idle_timeout (60000)::

Time in ms with no data received before Cowboy closes the connection.

inactivity_timeout (300000)::

Time in ms with nothing received at all before Cowboy closes the connection.
**DEPRECATED** Time in ms with nothing received at all before Cowboy closes the connection.

initial_connection_window_size (65535)::

Expand Down Expand Up @@ -301,8 +306,9 @@ too many `WINDOW_UPDATE` frames.

== Changelog

* *2.13*: The `inactivity_timeout` option was deprecated.
* *2.13*: The `active_n` default value was changed to `1`.
* *2.13*: The `dynamic_buffer` option was added.
* *2.13*: The `dynamic_buffer` and `hibernate` options were added.
* *2.11*: Websocket over HTTP/2 is now considered stable.
* *2.11*: The `reset_idle_timeout_on_send` option was added.
* *2.11*: Add the option `max_cancel_stream_rate` to protect
Expand Down
62 changes: 33 additions & 29 deletions src/cowboy_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-module(cowboy_http).

-export([init/6]).
-export([loop/1]).

-export([system_continue/3]).
-export([system_terminate/4]).
Expand All @@ -32,6 +33,7 @@
dynamic_buffer_initial_average => non_neg_integer(),
dynamic_buffer_initial_size => pos_integer(),
env => cowboy_middleware:env(),
hibernate => boolean(),
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
Expand Down Expand Up @@ -192,7 +194,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
last_streamid=maps:get(max_keepalive, Opts, 1000)},
safe_setopts_active(State),
loop(set_timeout(State, request_timeout)).
before_loop(set_timeout(State, request_timeout)).

-include("cowboy_dynamic_buffer.hrl").

Expand Down Expand Up @@ -223,6 +225,13 @@ flush_passive(Socket, Messages) ->
ok
end.

before_loop(State=#state{opts=#{hibernate := true}}) ->
proc_lib:hibernate(?MODULE, loop, [State]);
before_loop(State) ->
loop(State).

-spec loop(#state{}) -> ok.

loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
last_streamid=LastStreamID}) ->
Expand All @@ -233,7 +242,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
%% we want to process was received fully.
{OK, Socket, Data} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
State1 = maybe_resize_buffer(State, Data),
loop(State1);
before_loop(State1);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
State1 = maybe_resize_buffer(State, Data),
Expand All @@ -246,37 +255,37 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
%% Hardcoded for compatibility with Ranch 1.x.
Passive =:= tcp_passive; Passive =:= ssl_passive ->
safe_setopts_active(State),
loop(State);
before_loop(State);
%% Timeouts.
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
loop(State);
before_loop(State);
{timeout, TimerRef, Reason} ->
timeout(State, Reason);
{timeout, _, _} ->
loop(State);
before_loop(State);
%% System messages.
{'EXIT', Parent, shutdown} ->
Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
loop(initiate_closing(State, Reason));
before_loop(initiate_closing(State, Reason));
{'EXIT', Parent, Reason} ->
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State, StreamID, Msg));
before_loop(info(State, StreamID, Msg));
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
loop(down(State, Pid, Msg));
before_loop(down(State, Pid, Msg));
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
loop(State);
before_loop(State);
%% Unknown messages.
Msg ->
cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts),
loop(State)
before_loop(State)
after InactivityTimeout ->
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.
Expand Down Expand Up @@ -362,12 +371,12 @@ timeout(State, idle_timeout) ->
'Connection idle longer than configuration allows.'}).

parse(<<>>, State) ->
loop(State#state{buffer= <<>>});
before_loop(State#state{buffer= <<>>});
%% Do not process requests that come in after the last request
%% and discard the buffer if any to save memory.
parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
loop(State#state{buffer= <<>>});
before_loop(State#state{buffer= <<>>});
parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
after_parse(parse_request(Buffer, State, EmptyLines));
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
Expand Down Expand Up @@ -442,7 +451,7 @@ after_parse({data, _, IsFin, _, State=#state{buffer=Buffer}}) ->
nofin -> idle_timeout
end));
after_parse({more, State}) ->
loop(set_timeout(State, idle_timeout)).
before_loop(set_timeout(State, idle_timeout)).

update_flow(fin, _, State) ->
%% This function is only called after parsing, therefore we
Expand Down Expand Up @@ -1259,21 +1268,16 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
Protocol:takeover(Parent, Ref, Socket, Transport,
opts_for_upgrade(State), Buffer, InitialState);
%% Set options dynamically.
commands(State0=#state{overriden_opts=Opts},
StreamID, [{set_options, SetOpts}|Tail]) ->
State1 = case SetOpts of
#{idle_timeout := IdleTimeout} ->
set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
commands(State0, StreamID, [{set_options, SetOpts}|Tail]) ->
State = maps:fold(fun
(chunked, Chunked, StateF=#state{overriden_opts=Opts}) ->
StateF#state{overriden_opts=Opts#{chunked => Chunked}};
(idle_timeout, IdleTimeout, StateF=#state{overriden_opts=Opts}) ->
set_timeout(StateF#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
idle_timeout);
_ ->
State0
end,
State = case SetOpts of
#{chunked := Chunked} ->
State1#state{overriden_opts=Opts#{chunked => Chunked}};
_ ->
State1
end,
(_, _, StateF) ->
StateF
end, State0, SetOpts),
commands(State, StreamID, Tail);
%% Stream shutdown.
commands(State, StreamID, [stop|Tail]) ->
Expand Down Expand Up @@ -1622,12 +1626,12 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->

-spec system_continue(_, _, #state{}) -> ok.
system_continue(_, _, State) ->
loop(State).
before_loop(State).

-spec system_terminate(any(), _, _, #state{}) -> no_return().
system_terminate(Reason0, _, _, State) ->
Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
loop(initiate_closing(State, Reason)).
before_loop(initiate_closing(State, Reason)).

-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
system_code_change(Misc, _, _, _) ->
Expand Down
41 changes: 25 additions & 16 deletions src/cowboy_http2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-export([init/6]).
-export([init/10]).
-export([init/12]).
-export([loop/2]).

-export([system_continue/3]).
-export([system_terminate/4]).
Expand All @@ -36,6 +37,7 @@
env => cowboy_middleware:env(),
goaway_initial_timeout => timeout(),
goaway_complete_timeout => timeout(),
hibernate => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
initial_connection_window_size => 65535..16#7fffffff,
Expand Down Expand Up @@ -188,7 +190,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
http2_status=sequence, http2_machine=HTTP2Machine}), 0),
safe_setopts_active(State),
case Buffer of
<<>> -> loop(State, Buffer);
<<>> -> before_loop(State, Buffer);
_ -> parse(State, Buffer)
end.

Expand Down Expand Up @@ -250,7 +252,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
ok = maybe_socket_error(State, Transport:send(Socket, Preface)),
safe_setopts_active(State),
case Buffer of
<<>> -> loop(State, Buffer);
<<>> -> before_loop(State, Buffer);
_ -> parse(State, Buffer)
end.

Expand All @@ -267,6 +269,13 @@ setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
safe_setopts_active(State) ->
ok = maybe_socket_error(State, setopts_active(State)).

before_loop(State=#state{opts=#{hibernate := true}}, Buffer) ->
proc_lib:hibernate(?MODULE, loop, [State, Buffer]);
before_loop(State, Buffer) ->
loop(State, Buffer).

-spec loop(#state{}, binary()) -> ok.

loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
opts=Opts, timer=TimerRef, children=Children}, Buffer) ->
Messages = Transport:messages(),
Expand All @@ -288,11 +297,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
%% Hardcoded for compatibility with Ranch 1.x.
Passive =:= tcp_passive; Passive =:= ssl_passive ->
safe_setopts_active(State),
loop(State, Buffer);
before_loop(State, Buffer);
%% System messages.
{'EXIT', Parent, shutdown} ->
Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
loop(initiate_closing(State, Reason), Buffer);
before_loop(initiate_closing(State, Reason), Buffer);
{'EXIT', Parent, Reason} ->
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
{system, From, Request} ->
Expand All @@ -302,27 +311,27 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
tick_idle_timeout(State, Buffer);
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
loop(State, Buffer);
before_loop(State, Buffer);
{timeout, TRef, {cow_http2_machine, Name}} ->
loop(timeout(State, Name, TRef), Buffer);
before_loop(timeout(State, Name, TRef), Buffer);
{timeout, TimerRef, {goaway_initial_timeout, Reason}} ->
loop(closing(State, Reason), Buffer);
before_loop(closing(State, Reason), Buffer);
{timeout, TimerRef, {goaway_complete_timeout, Reason}} ->
terminate(State, {stop, stop_reason(Reason),
'Graceful shutdown timed out.'});
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State, StreamID, Msg), Buffer);
before_loop(info(State, StreamID, Msg), Buffer);
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
loop(down(State, Pid, Msg), Buffer);
before_loop(down(State, Pid, Msg), Buffer);
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
loop(State, Buffer);
before_loop(State, Buffer);
Msg ->
cowboy:log(warning, "Received stray message ~p.", [Msg], Opts),
loop(State, Buffer)
before_loop(State, Buffer)
after InactivityTimeout ->
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.
Expand All @@ -331,7 +340,7 @@ tick_idle_timeout(State=#state{idle_timeout_num=?IDLE_TIMEOUT_TICKS}, _) ->
terminate(State, {stop, timeout,
'Connection idle longer than configuration allows.'});
tick_idle_timeout(State=#state{idle_timeout_num=TimeoutNum}, Buffer) ->
loop(set_idle_timeout(State, TimeoutNum + 1), Buffer).
before_loop(set_idle_timeout(State, TimeoutNum + 1), Buffer).

set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}, _)
when Status =:= closing_initiated orelse Status =:= closing,
Expand Down Expand Up @@ -372,7 +381,7 @@ parse(State=#state{http2_status=sequence}, Data) ->
{ok, Rest} ->
parse(State#state{http2_status=settings}, Rest);
more ->
loop(State, Data);
before_loop(State, Data);
Error = {connection_error, _, _} ->
terminate(State, Error)
end;
Expand All @@ -391,7 +400,7 @@ parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Stre
more when Status =:= closing, Streams =:= #{} ->
terminate(State, {stop, normal, 'The connection is going away.'});
more ->
loop(State, Data)
before_loop(State, Data)
end.

%% Frame rate flood protection.
Expand Down Expand Up @@ -1379,12 +1388,12 @@ terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->

-spec system_continue(_, _, {#state{}, binary()}) -> ok.
system_continue(_, _, {State, Buffer}) ->
loop(State, Buffer).
before_loop(State, Buffer).

-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
system_terminate(Reason0, _, _, {State, Buffer}) ->
Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
loop(initiate_closing(State, Reason), Buffer).
before_loop(initiate_closing(State, Reason), Buffer).

-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
system_code_change(Misc, _, _, _) ->
Expand Down
Loading