Skip to content

Commit

Permalink
Rework and improve the decompress stream handler
Browse files Browse the repository at this point in the history
The read buffer was changed into an iovec to avoid doing
too many binary concatenations and allocations.

Decompression happens transparently: when decoding gzip,
the content-encoding header is removed (we only decode
when "gzip" is the only encoding so nothing remains).

We always add a content_decoded key to the Req object.
This key contains a list of codings that were decoded,
in the reverse order in which they were. Currently it
can only be empty or contain <<"gzip">> but future
improvements or user handlers may see it contain more
values.

The option to disable decompression was renamed to
decompress_enabled and defaults to true.

It is no longer possible to enable/disable decompression
in the middle of reading the body: this ensures that the
data we pass forward is always valid.

Various smaller improvements were made to the code,
tests and manual pages.
  • Loading branch information
essen committed Jan 4, 2024
1 parent 3ed1b24 commit fd9711d
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 198 deletions.
6 changes: 3 additions & 3 deletions doc/src/guide/streams.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ enabled by default. It is a good example for writing your
own handlers that will modify responses.

link:man:cowboy_decompress_h(3)[cowboy_decompress_h] will
automatically decompress request bodies when possible.
It is not enabled by default. It is a good example for
writing your own handlers that will modify requests.

link:man:cowboy_metrics_h(3)[cowboy_metrics_h] gathers
metrics about a stream then passes them to a configurable
Expand Down
1 change: 1 addition & 0 deletions doc/src/manual/cowboy_app.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Stream handlers:

* link:man:cowboy_stream_h(3)[cowboy_stream_h(3)] - Default stream handler
* link:man:cowboy_compress_h(3)[cowboy_compress_h(3)] - Compress stream handler
* link:man:cowboy_decompress_h(3)[cowboy_decompress_h(3)] - Decompress stream handler
* link:man:cowboy_metrics_h(3)[cowboy_metrics_h(3)] - Metrics stream handler
* link:man:cowboy_tracer_h(3)[cowboy_tracer_h(3)] - Tracer stream handler

Expand Down
34 changes: 23 additions & 11 deletions doc/src/manual/cowboy_decompress_h.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,26 @@ cowboy_decompress_h - Decompress stream handler
== Description

The module `cowboy_decompress_h` decompresses request bodies
automatically when the server supports it.

The only compression algorithm currently supported is the
gzip algorithm. Another limitation is that decompression
is only attempted when gzip is the only content-encoding
in the request.

This stream handler always adds a field to the Req object
with the name `content_decoded` which is treated as a
list of decoded content-encoding values. Currently this
list may only contain the `<<"gzip">>` binary if content
was decoded; or be empty otherwise.

== Options

[source,erlang]
----
opts() :: #{
decompress_enabled => boolean(),
decompress_ratio_limit => non_neg_integer()
}
----

Expand All @@ -28,17 +36,21 @@ The default value is given next to the option name:

decompress_ratio_limit (20)::
The max ratio of the compressed and decompressed body
before it is rejected with a `413 Payload Too Large`
error response.
+
This option can be updated at any time using the
`set_options` stream handler command.

decompress_enabled (true)::

Whether the handler is enabled by default.
+
This option can be updated using the `set_options`
stream handler command. This allows disabling
decompression for the current stream. Attempts
to enable or disable decompression after starting
to read the body will be ignored.

== Events

Expand Down
146 changes: 93 additions & 53 deletions src/cowboy_decompress_h.erl
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
%% Copyright (c) 2024, jdamanalo <[email protected]>
%% Copyright (c) 2024, Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(cowboy_decompress_h).
-behavior(cowboy_stream).

Expand All @@ -9,21 +24,27 @@

-record(state, {
next :: any(),
enabled :: boolean(),
ratio_limit :: non_neg_integer() | undefined,
ignore = false :: boolean(),
compress = undefined :: undefined | gzip,
inflate = undefined :: undefined | zlib:zstream(),
is_reading = false :: boolean(),
read_body_buffer = <<>> :: binary(),

%% We use a list of binaries to avoid doing unnecessary
%% memory allocations when inflating. We convert to binary
%% when we propagate the data. The data must be reversed
%% before converting to binary or inflating: this is done
%% via the buffer_to_binary/buffer_to_iovec functions.
read_body_buffer = [] :: [binary()],
read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()}
}).

-spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
-> {cowboy_stream:commands(), #state{}}.
init(StreamID, Req, Opts) ->
init(StreamID, Req0, Opts) ->
Enabled = maps:get(decompress_enabled, Opts, true),
RatioLimit = maps:get(decompress_ratio_limit, Opts, 20),
Ignore = maps:get(decompress_ignore, Opts, false),
State = check_req(Req),
{Req, State} = check_and_update_req(Req0),
Inflate = case State#state.compress of
undefined ->
undefined;
Expand All @@ -33,48 +54,46 @@ init(StreamID, Req, Opts) ->
Z
end,
{Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
fold(Commands, State#state{next=Next, ratio_limit=RatioLimit, ignore=Ignore,
inflate=Inflate}).
fold(Commands, State#state{next=Next, enabled=Enabled,
ratio_limit=RatioLimit, inflate=Inflate}).

-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
-> {cowboy_stream:commands(), State} when State::#state{}.
data(StreamID, IsFin, Data, State=#state{next=Next0, inflate=undefined}) ->
{Commands, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
fold(Commands, State#state{next=Next, read_body_is_fin=IsFin});
data(StreamID, IsFin, Data, State=#state{next=Next0, ignore=true, read_body_buffer=Buffer}) ->
data(StreamID, IsFin, Data, State=#state{next=Next0, enabled=false, read_body_buffer=Buffer}) ->
{Commands, Next} = cowboy_stream:data(StreamID, IsFin,
<< Buffer/binary, Data/binary >>, Next0),
buffer_to_binary([Data|Buffer]), Next0),
fold(Commands, State#state{next=Next, read_body_is_fin=IsFin});
data(StreamID, IsFin, Data, State0=#state{next=Next0, ratio_limit=RatioLimit,
inflate=Z, is_reading=true, read_body_buffer=Buffer0}) ->
Buffer = << Buffer0/binary, Data/binary >>,
case inflate(Z, RatioLimit, Buffer) of
{error, Type} ->
Status = case Type of
data -> 400;
size -> 413
inflate=Z, is_reading=true, read_body_buffer=Buffer}) ->
case inflate(Z, RatioLimit, buffer_to_iovec([Data|Buffer])) of
{error, ErrorType} ->
zlib:close(Z),
Status = case ErrorType of
data_error -> 400;
size_error -> 413
end,
Commands = [
{error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>},
stop
],
fold(Commands, State0#state{inflate=undefined});
fold(Commands, State0#state{inflate=undefined, read_body_buffer=[]});
{ok, Inflated} ->
State = case IsFin of
nofin ->
State0;
fin ->
zlib:inflateEnd(Z),
zlib:close(Z),
State0#state{inflate=undefined}
end,
{Commands, Next} = cowboy_stream:data(StreamID, IsFin, Inflated, Next0),
fold(Commands, State#state{next=Next, read_body_buffer= <<>>,
fold(Commands, State#state{next=Next, read_body_buffer=[],
read_body_is_fin=IsFin})
end;
data(_, IsFin, Data, State=#state{read_body_buffer=Buffer0}) ->
Buffer = << Buffer0/binary, Data/binary >>,
{[], State#state{read_body_buffer=Buffer, read_body_is_fin=IsFin}}.
data(_, IsFin, Data, State=#state{read_body_buffer=Buffer}) ->
{[], State#state{read_body_buffer=[Data|Buffer], read_body_is_fin=IsFin}}.

-spec info(cowboy_stream:streamid(), any(), State)
-> {cowboy_stream:commands(), State} when State::#state{}.
Expand All @@ -86,12 +105,19 @@ info(StreamID, Info={CommandTag, _, _, _, _}, State=#state{next=Next0, read_body
{Commands0, Next1} = cowboy_stream:info(StreamID, Info, Next0),
{Commands, Next} = data(StreamID, IsFin, <<>>, State#state{next=Next1, is_reading=true}),
fold(Commands ++ Commands0, Next);
info(StreamID, Info={set_options, Opts}, State=#state{next=Next0,
ignore=Ignore0, ratio_limit=RatioLimit0}) ->
Ignore = maps:get(decompress_ignore, Opts, Ignore0),
info(StreamID, Info={set_options, Opts}, State0=#state{next=Next0,
enabled=Enabled0, ratio_limit=RatioLimit0, is_reading=IsReading}) ->
Enabled = maps:get(decompress_enabled, Opts, Enabled0),
RatioLimit = maps:get(decompress_ratio_limit, Opts, RatioLimit0),
{Commands, Next} = cowboy_stream:info(StreamID, Info, Next0),
fold(Commands, State#state{next=Next, ignore=Ignore, ratio_limit=RatioLimit});
%% We can't change the enabled setting after we start reading,
%% otherwise the data becomes garbage. Changing the setting
%% is not treated as an error, it is just ignored.
State = case IsReading of
true -> State0;
false -> State0#state{enabled=Enabled}
end,
fold(Commands, State#state{next=Next, ratio_limit=RatioLimit});
info(StreamID, Info, State=#state{next=Next0}) ->
{Commands, Next} = cowboy_stream:info(StreamID, Info, Next0),
fold(Commands, State#state{next=Next}).
Expand All @@ -112,31 +138,49 @@ early_error(StreamID, Reason, PartialReq, Resp, Opts) ->

%% Internal.

check_req(Req) ->
%% Check whether the request needs content decoding, and if it does
%% whether it fits our criteria for decoding. We also update the
%% Req to indicate whether content was decoded.
%%
%% We always set the content_decoded value in the Req because it
%% indicates whether content decoding was attempted.
%%
%% A malformed content-encoding header results in no decoding.
check_and_update_req(Req=#{headers := Headers}) ->
ContentDecoded = maps:get(content_decoded, Req, []),
try cowboy_req:parse_header(<<"content-encoding">>, Req) of
undefined ->
#state{compress=undefined};
Encodings ->
case [E || E=(<<"gzip">>) <- Encodings] of
[] ->
#state{compress=undefined};
_ ->
#state{compress=gzip}
end
catch
_:_ ->
#state{compress=undefined}
%% We only automatically decompress when gzip is the only
%% encoding used. Since it's the only encoding used, we
%% can remove the header entirely before passing the Req
%% forward.
[<<"gzip">>] ->
{Req#{
headers => maps:remove(<<"content-encoding">>, Headers),
content_decoded => [<<"gzip">>|ContentDecoded]
}, #state{compress=gzip}};
_ ->
{Req#{content_decoded => ContentDecoded},
#state{compress=undefined}}
catch _:_ ->
{Req#{content_decoded => ContentDecoded},
#state{compress=undefined}}
end.

%% The read buffer accumulates chunks newest-first; produce an
%% iovec with the chunks back in arrival order for zlib.
buffer_to_iovec(Buffer) ->
    lists:foldl(fun(Bin, Acc) -> [Bin|Acc] end, [], Buffer).

%% Flatten the newest-first read buffer into a single binary
%% in arrival order, for propagating data as one chunk.
buffer_to_binary(Buffer) ->
    InOrder = lists:foldl(fun(Bin, Acc) -> [Bin|Acc] end, [], Buffer),
    iolist_to_binary(InOrder).

fold(Commands, State) ->
fold(Commands, State, []).

fold([], State, Acc) ->
{lists:reverse(Acc), State};
fold([{response, Status, Headers0, Body}|Tail], State=#state{ignore=false}, Acc) ->
fold([{response, Status, Headers0, Body}|Tail], State=#state{enabled=true}, Acc) ->
Headers = add_accept_encoding(Headers0),
fold(Tail, State, [{response, Status, Headers, Body}|Acc]);
fold([{headers, Status, Headers0} | Tail], State=#state{ignore=false}, Acc) ->
fold([{headers, Status, Headers0} | Tail], State=#state{enabled=true}, Acc) ->
Headers = add_accept_encoding(Headers0),
fold(Tail, State, [{headers, Status, Headers}|Acc]);
fold([Command|Tail], State, Acc) ->
Expand All @@ -146,7 +190,7 @@ add_accept_encoding(Headers=#{<<"accept-encoding">> := AcceptEncoding}) ->
try cow_http_hd:parse_accept_encoding(iolist_to_binary(AcceptEncoding)) of
List ->
case lists:keyfind(<<"gzip">>, 1, List) of
%% gzip is excluded but this handler is not ignored; we replace.
%% gzip is excluded but this handler is enabled; we replace.
{_, 0} ->
Replaced = lists:keyreplace(<<"gzip">>, 1, List, {<<"gzip">>, 1000}),
Codings = build_accept_encoding(Replaced),
Expand All @@ -167,18 +211,20 @@ add_accept_encoding(Headers=#{<<"accept-encoding">> := AcceptEncoding}) ->
end
end
catch _:_ ->
%% The accept-encoding header is invalid. Probably empty. We replace it with ours.
Headers#{<<"accept-encoding">> => <<"gzip">>}
end;
add_accept_encoding(Headers) ->
Headers#{<<"accept-encoding">> => <<"gzip">>}.

%% From cowlib, maybe expose?
%% @todo From cowlib, maybe expose?
%% Render a q-value expressed in thousandths (0..1000) as iodata,
%% zero-padding the fractional part to three digits:
%% 0 -> <<"0">>, 5 -> "0.005", 50 -> "0.050", 500 -> "0.500", 1000 -> <<"1">>.
qvalue_to_iodata(Q) ->
    if
        Q =:= 0 -> <<"0">>;
        Q < 10 -> [<<"0.00">>, integer_to_binary(Q)];
        Q < 100 -> [<<"0.0">>, integer_to_binary(Q)];
        Q < 1000 -> [<<"0.">>, integer_to_binary(Q)];
        Q =:= 1000 -> <<"1">>
    end.

%% @todo Should be added to Cowlib.
build_accept_encoding([{ContentCoding, Q}|Tail]) ->
Weight = iolist_to_binary(qvalue_to_iodata(Q)),
Acc = <<ContentCoding/binary, ";q=", Weight/binary>>,
Expand All @@ -195,20 +241,14 @@ inflate(Z, RatioLimit, Data) ->
try
{Status, Output} = zlib:safeInflate(Z, Data),
Size = iolist_size(Output),
do_inflate(Z, Size, byte_size(Data) * RatioLimit, Status, [Output])
do_inflate(Z, Size, iolist_size(Data) * RatioLimit, Status, [Output])
catch
error:data_error ->
zlib:close(Z),
{error, data}
{error, data_error}
end.

do_inflate(Z, Size, Limit, Status, _) when Size > Limit ->
case Status of
continue -> ok;
finished -> zlib:inflateEnd(Z)
end,
zlib:close(Z),
{error, size};
do_inflate(_, Size, Limit, _, _) when Size > Limit ->
{error, size_error};
do_inflate(Z, Size0, Limit, continue, Acc) ->
{Status, Output} = zlib:safeInflate(Z, []),
Size = Size0 + iolist_size(Output),
Expand Down
Loading

0 comments on commit fd9711d

Please sign in to comment.