Skip to content

Commit

Permalink
Handle socket errors in HTTP/1.1 and HTTP/2
Browse files Browse the repository at this point in the history
Doing so will let us notice when the connection is gone instead
of waiting for timeouts, at least in the cases where the remote
socket was closed properly. Timeouts are still needed in case
of TCP half-open problems.

This change means that the order of stream handler commands is
more important than before because socket errors may occur
during the processing of commands.
  • Loading branch information
essen committed Dec 12, 2023
1 parent 3f5f326 commit efb681d
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 127 deletions.
9 changes: 9 additions & 0 deletions doc/src/manual/cowboy_stream.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ the `early_error/5` callback must return a response command.
// @todo The logger option and the {log, Level, Format, Args}
// options need to be documented and tested.

The order in which the commands are given matters. For example,
when sending a response and at the same time creating a new child
process, the first command should be the `spawn` and the second the
`response`. The reason for that is that the sending of the response
may result in a socket error which leads to the termination of
the connection before the rest of the commands are executed.

The following commands are defined:

[[inform_command]]
Expand Down Expand Up @@ -236,6 +243,8 @@ will end successfully as far as the client is concerned.
To indicate that an error occurred, either use `error_response`
before stopping, or use `internal_error`.

No other command can be executed after the `stop` command.

=== internal_error

Stop the stream with an error.
Expand Down
163 changes: 97 additions & 66 deletions src/cowboy_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,11 @@
-spec init(pid(), ranch:ref(), inet:socket(), module(),
ranch_proxy_header:proxy_info(), cowboy:opts()) -> ok.
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
Peer0 = Transport:peername(Socket),
Sock0 = Transport:sockname(Socket),
Cert1 = case Transport:name() of
{ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket),
'A socket error occurred when retrieving the peer name.'),
{ok, Sock} = maybe_socket_error(undefined, Transport:sockname(Socket),
'A socket error occurred when retrieving the sock name.'),
CertResult = case Transport:name() of
ssl ->
case ssl:peercert(Socket) of
{error, no_peercert} ->
Expand All @@ -170,36 +172,29 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
_ ->
{ok, undefined}
end,
case {Peer0, Sock0, Cert1} of
{{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
State = #state{
parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
last_streamid=maps:get(max_keepalive, Opts, 1000)},
setopts_active(State),
loop(set_timeout(State, request_timeout));
{{error, Reason}, _, _} ->
terminate(undefined, {socket_error, Reason,
'A socket error occurred when retrieving the peer name.'});
{_, {error, Reason}, _} ->
terminate(undefined, {socket_error, Reason,
'A socket error occurred when retrieving the sock name.'});
{_, _, {error, Reason}} ->
terminate(undefined, {socket_error, Reason,
'A socket error occurred when retrieving the client TLS certificate.'})
end.
{ok, Cert} = maybe_socket_error(undefined, CertResult,
'A socket error occurred when retrieving the client TLS certificate.'),
State = #state{
parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
last_streamid=maps:get(max_keepalive, Opts, 1000)},
safe_setopts_active(State),
loop(set_timeout(State, request_timeout)).

setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
N = maps:get(active_n, Opts, 100),
Transport:setopts(Socket, [{active, N}]).

safe_setopts_active(State) ->
ok = maybe_socket_error(State, setopts_active(State)).

active(State) ->
setopts_active(State),
safe_setopts_active(State),
State#state{active=true}.

passive(State=#state{socket=Socket, transport=Transport}) ->
Transport:setopts(Socket, [{active, false}]),
ok = maybe_socket_error(State, Transport:setopts(Socket, [{active, false}])),
Messages = Transport:messages(),
flush_passive(Socket, Messages),
State#state{active=false}.
Expand Down Expand Up @@ -234,7 +229,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
{Passive, Socket} when Passive =:= element(4, Messages);
%% Hardcoded for compatibility with Ranch 1.x.
Passive =:= tcp_passive; Passive =:= ssl_passive ->
setopts_active(State),
safe_setopts_active(State),
loop(State);
%% Timeouts.
{timeout, Ref, {shutdown, Pid}} ->
Expand Down Expand Up @@ -953,6 +948,11 @@ info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
end.

%% Commands.
%%
%% The order in which the commands are given matters. Cowboy may
%% stop processing commands after the 'stop' command or when an
%% error occurred, such as a socket error. Critical commands such
%% as 'spawn' should always be given first.

commands(State, _, []) ->
State;
Expand Down Expand Up @@ -1013,8 +1013,8 @@ commands(State=#state{socket=Socket, transport=Transport, out_state=wait, stream
#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
_ = case Version of
'HTTP/1.1' ->
Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1',
headers_to_list(Headers)));
ok = maybe_socket_error(State, Transport:send(Socket,
cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))));
%% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2)
'HTTP/1.0' ->
ok
Expand All @@ -1037,10 +1037,10 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea
%% @todo 204 and 304 responses must not include a response body. (RFC7230 3.3.1, RFC7230 3.3.2)
case Body of
{sendfile, _, _, _} ->
Transport:send(Socket, Response),
ok = maybe_socket_error(State, Transport:send(Socket, Response)),
sendfile(State, Body);
_ ->
Transport:send(Socket, [Response, Body])
ok = maybe_socket_error(State, Transport:send(Socket, [Response, Body]))
end,
commands(State, StreamID, Tail);
%% Send response headers and initiate chunked encoding or streaming.
Expand Down Expand Up @@ -1079,7 +1079,8 @@ commands(State0=#state{socket=Socket, transport=Transport,
_ -> maps:remove(<<"trailer">>, Headers1)
end,
{State, Headers} = connection(State1, Headers2, StreamID, Version),
Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
ok = maybe_socket_error(State, Transport:send(Socket,
cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)))),
commands(State, StreamID, Tail);
%% Send a response body chunk.
%% @todo We need to kill the stream if it tries to send data before headers.
Expand All @@ -1098,27 +1099,33 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out
Stream0=#stream{method= <<"HEAD">>} ->
Stream0;
Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked ->
Transport:send(Socket, <<"0\r\n\r\n">>),
ok = maybe_socket_error(State0,
Transport:send(Socket, <<"0\r\n\r\n">>)),
Stream0;
Stream0 when Size =:= 0 ->
Stream0;
Stream0 when is_tuple(Data), OutState =:= chunked ->
Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]),
ok = maybe_socket_error(State0,
Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>])),
sendfile(State0, Data),
Transport:send(Socket,
case IsFin of
fin -> <<"\r\n0\r\n\r\n">>;
nofin -> <<"\r\n">>
end),
ok = maybe_socket_error(State0,
Transport:send(Socket,
case IsFin of
fin -> <<"\r\n0\r\n\r\n">>;
nofin -> <<"\r\n">>
end)
),
Stream0;
Stream0 when OutState =:= chunked ->
Transport:send(Socket, [
integer_to_binary(Size, 16), <<"\r\n">>, Data,
case IsFin of
fin -> <<"\r\n0\r\n\r\n">>;
nofin -> <<"\r\n">>
end
]),
ok = maybe_socket_error(State0,
Transport:send(Socket, [
integer_to_binary(Size, 16), <<"\r\n">>, Data,
case IsFin of
fin -> <<"\r\n0\r\n\r\n">>;
nofin -> <<"\r\n">>
end
])
),
Stream0;
Stream0 when OutState =:= streaming ->
#stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize} = Stream0,
Expand All @@ -1130,7 +1137,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out
is_tuple(Data) ->
sendfile(State0, Data);
true ->
Transport:send(Socket, Data)
ok = maybe_socket_error(State0, Transport:send(Socket, Data))
end,
Stream0#stream{local_sent_size=SentSize}
end,
Expand All @@ -1144,13 +1151,16 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s
StreamID, [{trailers, Trailers}|Tail]) ->
case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of
trailers ->
Transport:send(Socket, [
<<"0\r\n">>,
cow_http:headers(maps:to_list(Trailers)),
<<"\r\n">>
]);
ok = maybe_socket_error(State,
Transport:send(Socket, [
<<"0\r\n">>,
cow_http:headers(maps:to_list(Trailers)),
<<"\r\n">>
])
);
no_trailers ->
Transport:send(Socket, <<"0\r\n\r\n">>);
ok = maybe_socket_error(State,
Transport:send(Socket, <<"0\r\n\r\n">>));
not_chunked ->
ok
end,
Expand Down Expand Up @@ -1238,10 +1248,12 @@ sendfile(State=#state{socket=Socket, transport=Transport, opts=Opts},
{sendfile, Offset, Bytes, Path}) ->
try
%% When sendfile is disabled we explicitly use the fallback.
_ = case maps:get(sendfile, Opts, true) of
true -> Transport:sendfile(Socket, Path, Offset, Bytes);
false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
end,
{ok, _} = maybe_socket_error(State,
case maps:get(sendfile, Opts, true) of
true -> Transport:sendfile(Socket, Path, Offset, Bytes);
false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
end
),
ok
catch _:_ ->
terminate(State, {socket_error, sendfile_crash,
Expand Down Expand Up @@ -1420,28 +1432,31 @@ error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=StreamStat
early_error(StatusCode, State, Reason, PartialReq) ->
early_error(StatusCode, State, Reason, PartialReq, #{}).

early_error(StatusCode0, #state{socket=Socket, transport=Transport,
early_error(StatusCode0, State=#state{socket=Socket, transport=Transport,
opts=Opts, in_streamid=StreamID}, Reason, PartialReq, RespHeaders0) ->
RespHeaders1 = RespHeaders0#{<<"content-length">> => <<"0">>},
Resp = {response, StatusCode0, RespHeaders1, <<>>},
try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
{response, StatusCode, RespHeaders, RespBody} ->
Transport:send(Socket, [
cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)),
%% @todo We shouldn't send the body when the method is HEAD.
%% @todo Technically we allow the sendfile tuple.
RespBody
])
ok = maybe_socket_error(State,
Transport:send(Socket, [
cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)),
%% @todo We shouldn't send the body when the method is HEAD.
%% @todo Technically we allow the sendfile tuple.
RespBody
])
)
catch Class:Exception:Stacktrace ->
cowboy:log(cowboy_stream:make_error_log(early_error,
[StreamID, Reason, PartialReq, Resp, Opts],
Class, Exception, Stacktrace), Opts),
%% We still need to send an error response, so send what we initially
%% wanted to send. It's better than nothing.
Transport:send(Socket, cow_http:response(StatusCode0,
'HTTP/1.1', maps:to_list(RespHeaders1)))
end,
ok.
ok = maybe_socket_error(State,
Transport:send(Socket, cow_http:response(StatusCode0,
'HTTP/1.1', maps:to_list(RespHeaders1)))
)
end.

initiate_closing(State=#state{streams=[]}, Reason) ->
terminate(State, Reason);
Expand All @@ -1450,6 +1465,19 @@ initiate_closing(State=#state{streams=[_Stream|Streams],
terminate_all_streams(State, Streams, Reason),
State#state{last_streamid=OutStreamID}.

%% Function replicated in cowboy_http2.
maybe_socket_error(State, {error, closed}) ->
terminate(State, {socket_error, closed, 'The socket has been closed.'});
maybe_socket_error(State, Reason) ->
maybe_socket_error(State, Reason, 'An error has occurred on the socket.').

maybe_socket_error(_, Result = ok, _) ->
Result;
maybe_socket_error(_, Result = {ok, _}, _) ->
Result;
maybe_socket_error(State, {error, Reason}, Human) ->
terminate(State, {socket_error, Reason, Human}).

-spec terminate(_, _) -> no_return().
terminate(undefined, Reason) ->
exit({shutdown, Reason});
Expand Down Expand Up @@ -1484,6 +1512,9 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
terminate_linger_before_loop(State, TimerRef, Messages) ->
%% We may already be in active mode when we do this
%% but it's OK because we are shutting down anyway.
%%
%% We specially handle the socket error to terminate
%% when an error occurs.
case setopts_active(State) of
ok ->
terminate_linger_loop(State, TimerRef, Messages);
Expand Down
Loading

0 comments on commit efb681d

Please sign in to comment.