From a8132a7e926230f5095e90940d81e6e6dd5545b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 3 Nov 2020 17:50:54 +0100 Subject: [PATCH] CONNECT method WIP --- ebin/cowboy.app | 2 +- src/cowboy_http.erl | 47 +++-- src/cowboy_http2.erl | 19 +- src/cowboy_stream_h.erl | 2 + src/cowboy_tunnel.erl | 404 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 455 insertions(+), 19 deletions(-) create mode 100644 src/cowboy_tunnel.erl diff --git a/ebin/cowboy.app b/ebin/cowboy.app index b44c9512f..fbc654429 100644 --- a/ebin/cowboy.app +++ b/ebin/cowboy.app @@ -1,7 +1,7 @@ {application, 'cowboy', [ {description, "Small, fast, modern HTTP server."}, {vsn, "2.8.0"}, - {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_tracer_h','cowboy_websocket']}, + {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_tracer_h','cowboy_tunnel','cowboy_websocket']}, {registered, [cowboy_sup,cowboy_clock]}, {applications, [kernel,stdlib,crypto,cowlib,ranch]}, {mod, {cowboy_app, []}}, diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 89ba9d8ff..c8578348b 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -448,9 +448,9 @@ parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLine %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests << "OPTIONS * ", Rest/bits >> -> parse_version(Rest, State, <<"OPTIONS">>, undefined, <<"*">>, <<>>); - <<"CONNECT ", _/bits>> -> - error_terminate(501, State, {connection_error, no_error, - 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'}); + %% <<"CONNECT ", _/bits>> -> + %% error_terminate(501, State, {connection_error, no_error, + %% 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'}); <<"TRACE ", _/bits>> -> error_terminate(501, State, {connection_error, no_error, 'The TRACE method is currently not implemented. (RFC7231 4.3.8)'}); @@ -494,6 +494,8 @@ parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method) parse_uri_authority(Rest, State, Method); parse_uri(<< $/, Rest/bits >>, State, Method) -> parse_uri_path(Rest, State, Method, undefined, <<$/>>); +parse_uri(Rest, State, Method = <<"CONNECT">>) -> + parse_uri_authority(Rest, State, Method); parse_uri(_, State, _) -> error_terminate(400, State, {connection_error, protocol_error, 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}). @@ -524,6 +526,8 @@ parse_uri_authority(<>, State, Method, SoFar, Remaining) -> error_terminate(400, State, {connection_error, protocol_error, 'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'}); $/ -> parse_uri_path(Rest, State, Method, SoFar, <<"/">>); + %% $\s when Method =:= <<"CONNECT">> -> + %% parse_version(Rest, State, Method, SoFar, <<>>, <<>>); $\s -> parse_version(Rest, State, Method, SoFar, <<"/">>, <<>>); $? -> parse_uri_query(Rest, State, Method, SoFar, <<"/">>, <<>>); $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<"/">>, <<>>); @@ -571,6 +575,10 @@ parse_version(_, State, _, _, _, _) -> error_terminate(505, State, {connection_error, protocol_error, 'Unsupported HTTP version. (RFC7230 2.6)'}). +before_parse_headers(_Rest, State, <<"CONNECT">>, Authority, Path, Qs, _V) + when Authority =:= undefined; Path =/= <<"/">>; Qs =/= <<>> -> + error_terminate(400, State, {connection_error, protocol_error, + 'The CONNECT method requires a request-target with only host and port. (RFC7231 4.3.6)'}); before_parse_headers(Rest, State, M, A, P, Q, V) -> parse_header(Rest, State#state{in_state=#ps_header{ method=M, authority=A, path=P, qs=Q, version=V}}, #{}). @@ -700,17 +708,18 @@ horse_clean_value_ws_end() -> -endif. request(Buffer, State=#state{transport=Transport, - in_state=PS=#ps_header{authority=Authority, version=Version}}, Headers) -> + in_state=PS=#ps_header{authority=Authority, method=Method, version=Version}}, Headers) -> case maps:get(<<"host">>, Headers, undefined) of undefined when Version =:= 'HTTP/1.1' -> %% @todo Might want to not close the connection on this and next one. error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}}, {stream_error, protocol_error, 'HTTP/1.1 requests must include a host header. (RFC7230 5.4)'}); - undefined -> + undefined when Method =/= <<"CONNECT">> -> request(Buffer, State, Headers, <<>>, default_port(Transport:secure())); - %% @todo When CONNECT requests come in we need to ignore the RawHost - %% and instead use the Authority as the source of host. + undefined when Method =:= <<"CONNECT">> -> + %% Should we forbid CONNECT with HTTP/1.0? + request_parse_host(Buffer, State, Headers, Authority); RawHost when Authority =:= undefined; Authority =:= RawHost -> request_parse_host(Buffer, State, Headers, RawHost); %% RFC7230 does not explicitly ask us to reject requests @@ -725,7 +734,7 @@ request(Buffer, State=#state{transport=Transport, request_parse_host(Buffer, State=#state{transport=Transport, in_state=PS}, Headers, RawHost) -> try cow_http_hd:parse_host(RawHost) of - {Host, undefined} -> + {Host, undefined} when PS#ps_header.method =/= <<"CONNECT">> -> request(Buffer, State, Headers, Host, default_port(Transport:secure())); {Host, Port} when Port > 0, Port =< 65535 -> request(Buffer, State, Headers, Host, Port); @@ -748,9 +757,13 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock proxy_header=ProxyHeader, in_streamid=StreamID, in_state= PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}}, Headers0, Host, Port) -> - Scheme = case Transport:secure() of - true -> <<"https">>; - false -> <<"http">> + Scheme = case Method of + <<"CONNECT">> -> <<>>; + _ -> + case Transport:secure() of + true -> <<"https">>; + false -> <<"http">> + end end, {Headers, HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers0 of #{<<"transfer-encoding">> := TransferEncoding0} -> @@ -1177,6 +1190,18 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor cowboy_children:terminate(Children), flush(Parent), Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState); +commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, + out_state=_OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, + [{takeover, Protocol, InitialState}|_Tail]) -> + %% Takeover without sending any HTTP response. + %% @todo If there's streams opened after this one, fail instead of 101. + State1 = cancel_timeout(State0), + State = #state{streams=Streams} = passive(State1), + #stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams), + stream_call_terminate(StreamID, switch_protocol, StreamState, State), + cowboy_children:terminate(Children), + flush(Parent), + Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState); %% Set options dynamically. commands(State0=#state{overriden_opts=Opts}, StreamID, [{set_options, SetOpts}|Tail]) -> diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 8dc8c3beb..728fb4cb1 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -406,11 +406,11 @@ data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin State0 end. -headers_frame(State, StreamID, IsFin, Headers, - PseudoHeaders=#{method := <<"CONNECT">>}, _) - when map_size(PseudoHeaders) =:= 2 -> - early_error(State, StreamID, IsFin, Headers, PseudoHeaders, 501, - 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'); +%% headers_frame(State, StreamID, IsFin, Headers, +%% PseudoHeaders=#{method := <<"CONNECT">>, authority := Authority}, _) +%% when map_size(PseudoHeaders) =:= 2 -> +%% early_error(State, StreamID, IsFin, Headers, PseudoHeaders, 501, +%% 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'); headers_frame(State, StreamID, IsFin, Headers, PseudoHeaders=#{method := <<"TRACE">>}, _) -> early_error(State, StreamID, IsFin, Headers, PseudoHeaders, 501, @@ -427,13 +427,18 @@ headers_frame(State, StreamID, IsFin, Headers, PseudoHeaders, BodyLen) -> end. headers_frame_parse_host(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert, proxy_header=ProxyHeader}, - StreamID, IsFin, Headers, PseudoHeaders=#{method := Method, scheme := Scheme, path := PathWithQs}, + StreamID, IsFin, Headers, PseudoHeaders=#{method := Method}, BodyLen, Authority) -> + Scheme = maps:get(scheme, PseudoHeaders, <<>>), + PathWithQs = maps:get(path, PseudoHeaders, <<"/">>), try cow_http_hd:parse_host(Authority) of + {_Host, undefined} when Method =:= <<"CONNECT">>, map_size(PseudoHeaders) =:= 2 -> + reset_stream(State, StreamID, {stream_error, protocol_error, + 'The CONNECT method requires :authority with a port to connect to. (RFC7540 8.3)'}); {Host, Port0} -> Port = ensure_port(Scheme, Port0), try cow_http:parse_fullpath(PathWithQs) of - {<<>>, _} -> + {<<>>, _} when method =/= <<"CONNECT">>; map_size(PseudoHeaders) =/= 2 -> reset_stream(State, StreamID, {stream_error, protocol_error, 'The path component must not be empty. (RFC7540 8.1.2.3)'}); {Path, Qs} -> diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index a4a1c1ab5..5ef74eb24 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -256,6 +256,8 @@ info(StreamID, Push={push, _, _, _, _, _, _, _}, State) -> do_info(StreamID, Push, [Push], State); info(StreamID, SwitchProtocol={switch_protocol, _, _, _}, State) -> do_info(StreamID, SwitchProtocol, [SwitchProtocol], State#state{expect=undefined}); +info(StreamID, SwitchProtocol={takeover, _, _}, State) -> + do_info(StreamID, SwitchProtocol, [SwitchProtocol], State#state{expect=undefined}); %% Convert the set_options message to a command. info(StreamID, SetOptions={set_options, _}, State) -> do_info(StreamID, SetOptions, [SetOptions], State); diff --git a/src/cowboy_tunnel.erl b/src/cowboy_tunnel.erl new file mode 100644 index 000000000..6be0fe660 --- /dev/null +++ b/src/cowboy_tunnel.erl @@ -0,0 +1,404 @@ +%% Copyright (c) 2020, Viktor Söderqvist +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% Handles the CONNECT method. In HTTP/2 for a stream; in HTTP/1 it takes over +%% the connection. +-module(cowboy_tunnel). + +-behaviour(cowboy_sub_protocol). +-export([upgrade/4]). +-export([upgrade/5]). + +-export([is_connect_request/1]). +-export([takeover/7]). +-export([loop/2]). + +-export([system_continue/3]). +-export([system_terminate/4]). +-export([system_code_change/4]). + +-type commands() :: [{data, iodata()} + | stop + | {active, boolean()} + | {set_options, map()} + | {shutdown_reason, any()} +]. +-export_type([commands/0]). + +-type call_result(State) :: {commands(), State} | {commands(), State, hibernate}. + +-type terminate_reason() :: normal | stop | timeout + | {error, closed | atom()} + | {crash, error | exit | throw, any()}. + +%% Init/2 and terminate/3 overlap with the cowboy_handler behaviour. If a module +%% is a callback module for both, there will be a compiler warning. + +%% -callback init(Req, any()) +%% -> {ok | module(), Req, any()} +%% | {module(), Req, any(), any()} +%% when Req::cowboy_req:req(). + +-callback tunnel_init(State) -> call_result(State) when State::any(). + +-callback tunnel_handle(nofin|fin, binary(), State) + -> call_result(State) when State::any(). + +-callback tunnel_info(any(), State) -> call_result(State) when State::any(). + +%% -callback terminate(any(), cowboy_req:req(), any()) -> ok. +%% -optional_callbacks([terminate/3]). + +-type opts() :: #{ + active_n => pos_integer(), + idle_timeout => timeout(), + req_filter => fun((cowboy_req:req()) -> map()) +}. +-export_type([opts/0]). + +-record(state, { + parent :: undefined | pid(), + ref :: ranch:ref(), + socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined, + transport = undefined :: module() | undefined, + http_response_sent = false :: boolean(), + opts = #{} :: opts(), + active = true :: boolean(), + handler :: module(), + timeout_ref = undefined :: undefined | reference(), + messages = undefined :: undefined | {atom(), atom(), atom()} + | {atom(), atom(), atom(), atom()}, + hibernate = false :: boolean(), + req = #{} :: map(), + shutdown_reason = normal :: any() +}). + +-spec is_connect_request(cowboy_req:req()) -> boolean(). +is_connect_request(#{method := <<"CONNECT">>, version := 'HTTP/2', + protocol := _}) -> + false; +is_connect_request(#{method := Method}) -> + Method =:= <<"CONNECT">>. + +%% Stream process. + +-spec upgrade(Req, Env, module(), any()) -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). +upgrade(Req, Env, Handler, HandlerState) -> + upgrade(Req, Env, Handler, HandlerState, #{}). + +-spec upgrade(Req, Env, module(), any(), opts()) -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). +%% @todo Immediately crash if a response has already been sent. +upgrade(Req, Env, Handler, HandlerState, Opts) -> + FilteredReq = case maps:get(req_filter, Opts, undefined) of + undefined -> maps:with([method, version, host, port, peer], Req); + FilterFun -> FilterFun(Req) + end, + State = #state{opts=Opts, handler=Handler, req=FilteredReq}, + case is_connect_request(Req) of + true -> + takeover_stream(State, Req, HandlerState, Env); + false -> + {ok, cowboy_req:reply(400, Req), Env} + end. + +-spec takeover_stream(#state{}, Req, any(), Env) -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). +takeover_stream(State, + Req=#{version := 'HTTP/1.1', pid := Pid, streamid := StreamID}, + HandlerState, Env) -> + Pid ! {{Pid, StreamID}, {takeover, ?MODULE, {State, HandlerState}}}, + {ok, Req, Env}; +%% For HTTP/2 we do not let the process die, we instead keep it +%% for the tunnel stream. This is because in HTTP/2 we only +%% have a stream; we don't take over the whole connection. +takeover_stream(State, #{ref := Ref, pid := Pid, streamid := StreamID}, + HandlerState, _Env) -> + takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>, + {State, HandlerState}). + + +%% Connection process. + +-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()}, + module() | undefined, any(), binary(), + {#state{}, any()}) -> no_return(). +takeover(Parent, Ref, Socket, Transport, _Opts, _Buffer, {State0, HandlerState}) -> + %% @todo Remove from ranch as we do for websockets? Better not... + %%ranch:remove_connection(Ref), + Messages = case Transport of + undefined -> undefined; + _ -> Transport:messages() + end, + State = loop_timeout(State0#state{parent=Parent, + ref=Ref, socket=Socket, transport=Transport, + messages=Messages}), + handler_call(State, HandlerState, tunnel_init, undefined). + +send_connect_successful(#state{socket=Stream={Pid, _}, transport=undefined}) -> + Pid ! {Stream, {headers, 200, #{}}}; +send_connect_successful(#state{socket=Socket, transport=Transport}) -> + Transport:send(Socket, cow_http:response(200, 'HTTP/1.1', [])). + +send_connect_error(#state{socket=Stream={Pid, _}, transport=undefined}) -> + Pid ! {Stream, {response, 503, #{}, <<"Failed to connect to peer">>}}; +send_connect_error(#state{socket=Socket, transport=Transport}) -> + Transport:send(Socket, cow_http:response(503, 'HTTP/1.1', [])). + +after_init(State=#state{active=Active}) -> + %% Send 2xx or 5xx response to the CONNECT request + send_connect_successful(State), + case Active of + true -> + %% Enable active,N for HTTP/1.1, and auto read_body for + %% HTTP/2. We must do this only after calling + %% tunnel_init/1 to give the handler a chance to disable + %% active mode immediately. + setopts_active(State), + maybe_read_body(State); + false -> + ok + end. + +%% We have two ways of reading the client data. For HTTP/1.1 +%% we have full control of the socket and can therefore use active,N. +%% For HTTP/2 we are just a stream, and are instead using read_body +%% (automatic mode). Technically HTTP/2 will only go passive after +%% receiving the next data message, while HTTP/1.1 goes passive +%% immediately but there might still be data to be processed in +%% the message queue. + +setopts_active(#state{transport=undefined}) -> + ok; +setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> + N = maps:get(active_n, Opts, 100), + Transport:setopts(Socket, [{active, N}]). + +maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) -> + %% @todo Keep Ref around. + ReadBodyRef = make_ref(), + Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}}, + ok; +maybe_read_body(_) -> + ok. + +active(State) -> + setopts_active(State), + maybe_read_body(State), + State#state{active=true}. + +passive(State=#state{transport=undefined}) -> + %% Unfortunately we cannot currently cancel read_body. + %% But that's OK, we will just stop reading the body + %% after the next message. + State#state{active=false}; +passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) -> + Transport:setopts(Socket, [{active, false}]), + flush_passive(Socket, Messages), + State#state{active=false}. + +flush_passive(Socket, Messages) -> + receive + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + flush_passive(Socket, Messages) + after 0 -> + ok + end. + +before_loop(State=#state{hibernate=true}, HandlerState) -> + proc_lib:hibernate(?MODULE, loop, + [State#state{hibernate=false}, HandlerState]); +before_loop(State, HandlerState) -> + loop(State, HandlerState). + +-spec loop_timeout(#state{}) -> #state{}. +loop_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}) -> + _ = case PrevRef of + undefined -> ignore; + PrevRef -> erlang:cancel_timer(PrevRef) + end, + case maps:get(idle_timeout, Opts, 60000) of + infinity -> + State#state{timeout_ref=undefined}; + Timeout -> + TRef = erlang:start_timer(Timeout, self(), ?MODULE), + State#state{timeout_ref=TRef} + end. + +-spec loop(#state{}, any()) -> no_return(). +loop(State=#state{parent=Parent, socket=Socket, messages=Messages, + timeout_ref=TRef}, HandlerState) -> + receive + %% Socket messages. (HTTP/1.1) + {OK, Socket, Data} when OK =:= element(1, Messages) -> + State2 = loop_timeout(State), + data(State2, HandlerState, nofin, Data); + {Closed, Socket} when Closed =:= element(2, Messages) -> + %% This means client has closed for writing. + %% Continue reading from peer until receiving FIN. + State2 = loop_timeout(State), + data(State2, HandlerState, fin, <<>>); + %%terminate(State, HandlerState, {error, closed}); + {Error, Socket, Reason} when Error =:= element(3, Messages) -> + terminate(State, HandlerState, {error, Reason}); + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + setopts_active(State), + loop(State, HandlerState); + %% Body reading messages. (HTTP/2) + {request_body, _Ref, IsFin, Data} -> + maybe_read_body(State), + State2 = loop_timeout(State), + data(State2, HandlerState, IsFin, Data); + %% Timeouts. + {timeout, TRef, ?MODULE} -> + %% _ = transport_send(State, fin, <<>>), + terminate(State, HandlerState, timeout); + {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> + before_loop(State, HandlerState); + %% System messages. + {'EXIT', Parent, Reason} -> + %% @todo We should exit gracefully. + exit(Reason); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {State, HandlerState}); + %% Calls from supervisor module. + {'$gen_call', From, Call} -> + cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE), + before_loop(State, HandlerState); + Message -> + handler_call(State, HandlerState, tunnel_info, Message) + end. + +data(State, HandlerState, IsFin, Data) -> + handler_call(State, HandlerState, tunnel_handle, {IsFin, Data}). + +handler_call(State=#state{handler=Handler}, HandlerState, Callback, Message) -> + try case Callback of + tunnel_init -> + Handler:tunnel_init(HandlerState); + tunnel_handle -> + {IsFin, Data} = Message, + Handler:tunnel_handle(IsFin, Data, HandlerState); + tunnel_info -> + Handler:tunnel_info(Message, HandlerState) + end of + {Commands, HandlerState2} when is_list(Commands) -> + handler_call_result(State, + HandlerState2, Commands); + {Commands, HandlerState2, hibernate} when is_list(Commands) -> + handler_call_result(State#state{hibernate=true}, + HandlerState2, Commands) + catch Class:Reason:Stacktrace -> + handler_terminate(State, HandlerState, {crash, Class, Reason}), + erlang:raise(Class, Reason, Stacktrace) + end. + +-spec handler_call_result(#state{}, any(), commands()) -> no_return(). +handler_call_result(State0, HandlerState, Commands) -> + case commands(Commands, State0, []) of + {ok, State} -> + before_loop(State, HandlerState); + {stop, State} -> + terminate(State, HandlerState, stop); + {Error = {error, _}, State} -> + terminate(State, HandlerState, Error) + end. + +commands([], State=#state{http_response_sent = false}, Data) -> + after_init(State), + commands([], State#state{http_response_sent = true}, Data); +commands([], State, []) -> + {ok, State}; +commands([], State, Data) -> + Result = transport_send(State, nofin, lists:reverse(Data)), + {Result, State}; +commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_boolean(Active) -> + State = if + Active, not Active0 -> + active(State0); + Active0, not Active -> + passive(State0); + true -> + State0 + end, + commands(Tail, State#state{active=Active}, Data); +commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) -> + State = case SetOpts of + #{idle_timeout := IdleTimeout} -> + loop_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}}); + _ -> + State0 + end, + commands(Tail, State, Data); +commands([{shutdown_reason, ShutdownReason}|Tail], State, Data) -> + commands(Tail, State#state{shutdown_reason=ShutdownReason}, Data); +commands([{data, Data}|Tail], State, DataAcc) -> + commands(Tail, State, [Data|DataAcc]); +commands([stop|_Tail], State, Data) -> + %% @todo Handle the case when http_response_sent = false + _ = transport_send(State, fin, lists:reverse(Data)), + {stop, State}. + +transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) -> + Pid ! {Stream, {data, IsFin, Data}}, + ok; +transport_send(#state{socket=Socket, transport=Transport}, IsFin, Data) -> + Result = Transport:send(Socket, Data), + case IsFin of + fin -> _ = Transport:shutdown(Socket, write); + nofin -> ok + end, + Result. + +transport_close(#state{transport=undefined}) -> + ok; +transport_close(#state{socket=Socket, transport=Transport}) -> + Transport:close(Socket). + +-spec terminate(#state{}, any(), terminate_reason()) -> no_return(). +terminate(State=#state{http_response_sent=false}, HandlerState, Reason) -> + send_connect_error(State), + terminate(State#state{http_response_sent = true}, HandlerState, Reason); +terminate(State=#state{shutdown_reason=Shutdown}, HandlerState, Reason) -> + handler_terminate(State, HandlerState, Reason), + _ = transport_close(State), + case Shutdown of + normal -> exit(normal); + _ -> exit({shutdown, Shutdown}) + end. + +handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) -> + cowboy_handler:terminate(Reason, Req, HandlerState, Handler). + +%% System callbacks. + +-spec system_continue(_, _, {#state{}, any()}) -> no_return(). +system_continue(_, _, {State, HandlerState}) -> + loop(State, HandlerState). + +-spec system_terminate(any(), _, _, {#state{}, any()}) -> no_return(). +system_terminate(Reason, _, _, {State, HandlerState}) -> + %% @todo We should exit gracefully, if possible. + terminate(State, HandlerState, Reason). + +-spec system_code_change(Misc, _, _, _) + -> {ok, Misc} when Misc::{#state{}, any()}. +system_code_change(Misc, _, _, _) -> + {ok, Misc}.