diff --git a/src/http/CMakeLists.txt b/src/http/CMakeLists.txt index c80391ef0..ed22ff5c6 100644 --- a/src/http/CMakeLists.txt +++ b/src/http/CMakeLists.txt @@ -3,6 +3,9 @@ add_library(http SHARED) target_sources(http PRIVATE AsyncReq.cc Client.cc + ClientConnPool.cc + ClientConfig.cc + ClientConnPool.cc Exceptions.cc MetaModule.cc Method.cc diff --git a/src/http/Client.cc b/src/http/Client.cc index c4acda94a..7526efa93 100644 --- a/src/http/Client.cc +++ b/src/http/Client.cc @@ -23,12 +23,14 @@ #include "http/Client.h" // Qserv headers +#include "http/ClientConnPool.h" #include "http/Exceptions.h" // Standard headers #include #include #include +#include #include using namespace std; @@ -36,43 +38,22 @@ using json = nlohmann::json; namespace lsst::qserv::http { -string const ClientConfig::category = "worker-http-file-reader"; - -string const ClientConfig::sslVerifyHostKey = "SSL_VERIFYHOST"; -string const ClientConfig::sslVerifyPeerKey = "SSL_VERIFYPEER"; -string const ClientConfig::caPathKey = "CAPATH"; -string const ClientConfig::caInfoKey = "CAINFO"; -string const ClientConfig::caInfoValKey = "CAINFO_VAL"; - -string const ClientConfig::proxySslVerifyHostKey = "PROXY_SSL_VERIFYHOST"; -string const ClientConfig::proxySslVerifyPeerKey = "PROXY_SSL_VERIFYPEER"; -string const ClientConfig::proxyCaPathKey = "PROXY_CAPATH"; -string const ClientConfig::proxyCaInfoKey = "PROXY_CAINFO"; -string const ClientConfig::proxyCaInfoValKey = "PROXY_CAINFO_VAL"; - -string const ClientConfig::proxyKey = "CURLOPT_PROXY"; -string const ClientConfig::noProxyKey = "CURLOPT_NOPROXY"; -string const ClientConfig::httpProxyTunnelKey = "CURLOPT_HTTPPROXYTUNNEL"; - -string const ClientConfig::connectTimeoutKey = "CONNECTTIMEOUT"; -string const ClientConfig::timeoutKey = "TIMEOUT"; -string const ClientConfig::lowSpeedLimitKey = "LOW_SPEED_LIMIT"; -string const ClientConfig::lowSpeedTimeKey = "LOW_SPEED_TIME"; - -string const ClientConfig::asyncProcLimitKey = "ASYNC_PROC_LIMIT"; - -size_t 
forwardToClient(char* ptr, size_t size, size_t nmemb, void* userdata) { +size_t forwardToClient(char* ptr, size_t size, size_t nmemb, void* client) { size_t const nchars = size * nmemb; - Client* reader = reinterpret_cast(userdata); - reader->_store(ptr, nchars); + reinterpret_cast(client)->_store(ptr, nchars); return nchars; } Client::Client(http::Method method, string const& url, string const& data, vector const& headers, - ClientConfig const& clientConfig) - : _method(method), _url(url), _data(data), _headers(headers), _clientConfig(clientConfig) { + ClientConfig const& clientConfig, shared_ptr const& connPool) + : _method(method), + _url(url), + _data(data), + _headers(headers), + _clientConfig(clientConfig), + _connPool(connPool) { _hcurl = curl_easy_init(); - assert(_hcurl != nullptr); // curl_easy_init() failed to allocate memory, etc. + assert(_hcurl != nullptr); } Client::~Client() { @@ -81,110 +62,161 @@ Client::~Client() { } void Client::read(CallbackType const& onDataRead) { - assert(onDataRead != nullptr); // no callback function provided - string const context = "Client::" + string(__func__) + " "; + if (onDataRead == nullptr) { + throw invalid_argument("http::Client::" + string(__func__) + ": no callback provided"); + } _onDataRead = onDataRead; - _errorChecked("curl_easy_setopt(CURLOPT_URL)", curl_easy_setopt(_hcurl, CURLOPT_URL, _url.c_str())); - _errorChecked("curl_easy_setopt(CURLOPT_CUSTOMREQUEST)", - curl_easy_setopt(_hcurl, CURLOPT_CUSTOMREQUEST, nullptr)); + + _setConnOptions(); + _setSslCertOptions(); + _setProxyOptions(); + + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_URL)", + curl_easy_setopt(_hcurl, CURLOPT_URL, _url.c_str())); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_CUSTOMREQUEST)", + curl_easy_setopt(_hcurl, CURLOPT_CUSTOMREQUEST, nullptr)); if (_method == http::Method::GET) { - _errorChecked("curl_easy_setopt(CURLOPT_HTTPGET)", curl_easy_setopt(_hcurl, CURLOPT_HTTPGET, 1L)); + 
_curlEasyErrorChecked("curl_easy_setopt(CURLOPT_HTTPGET)", + curl_easy_setopt(_hcurl, CURLOPT_HTTPGET, 1L)); } else if (_method == http::Method::POST) { - _errorChecked("curl_easy_setopt(CURLOPT_POST)", curl_easy_setopt(_hcurl, CURLOPT_POST, 1L)); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_POST)", curl_easy_setopt(_hcurl, CURLOPT_POST, 1L)); } else { - _errorChecked("curl_easy_setopt(CURLOPT_CUSTOMREQUEST)", - curl_easy_setopt(_hcurl, CURLOPT_CUSTOMREQUEST, http::method2string(_method).data())); + _curlEasyErrorChecked( + "curl_easy_setopt(CURLOPT_CUSTOMREQUEST)", + curl_easy_setopt(_hcurl, CURLOPT_CUSTOMREQUEST, http::method2string(_method).data())); } if (!_data.empty()) { - _errorChecked("curl_easy_setopt(CURLOPT_POSTFIELDS)", - curl_easy_setopt(_hcurl, CURLOPT_POSTFIELDS, _data.c_str())); - _errorChecked("curl_easy_setopt(CURLOPT_POSTFIELDSIZE)", - curl_easy_setopt(_hcurl, CURLOPT_POSTFIELDSIZE, _data.size())); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_POSTFIELDS)", + curl_easy_setopt(_hcurl, CURLOPT_POSTFIELDS, _data.c_str())); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_POSTFIELDSIZE)", + curl_easy_setopt(_hcurl, CURLOPT_POSTFIELDSIZE, _data.size())); } curl_slist_free_all(_hlist); _hlist = nullptr; for (auto& header : _headers) { _hlist = curl_slist_append(_hlist, header.c_str()); } - _errorChecked("curl_easy_setopt(CURLOPT_HTTPHEADER)", - curl_easy_setopt(_hcurl, CURLOPT_HTTPHEADER, _hlist)); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_HTTPHEADER)", + curl_easy_setopt(_hcurl, CURLOPT_HTTPHEADER, _hlist)); + + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_FAILONERROR)", + curl_easy_setopt(_hcurl, CURLOPT_FAILONERROR, 1L)); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_WRITEFUNCTION)", + curl_easy_setopt(_hcurl, CURLOPT_WRITEFUNCTION, forwardToClient)); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_WRITEDATA)", + curl_easy_setopt(_hcurl, CURLOPT_WRITEDATA, this)); + _curlEasyErrorChecked("curl_easy_perform()", 
curl_easy_perform(_hcurl)); +} + +json Client::readAsJson() { + vector data; + this->read([&data](char const* buf, size_t size) { data.insert(data.cend(), buf, buf + size); }); + return json::parse(data); +} + +void Client::_setConnOptions() { + if (_clientConfig.httpVersion != CURL_HTTP_VERSION_NONE) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_HTTP_VERSION)", + curl_easy_setopt(_hcurl, CURLOPT_HTTP_VERSION, _clientConfig.httpVersion)); + } + if (_clientConfig.bufferSize > 0) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_BUFFERSIZE)", + curl_easy_setopt(_hcurl, CURLOPT_BUFFERSIZE, _clientConfig.bufferSize)); + } + if (_clientConfig.connectTimeout > 0) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_CONNECTTIMEOUT)", + curl_easy_setopt(_hcurl, CURLOPT_CONNECTTIMEOUT, _clientConfig.connectTimeout)); + } + if (_clientConfig.timeout > 0) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_TIMEOUT)", + curl_easy_setopt(_hcurl, CURLOPT_TIMEOUT, _clientConfig.timeout)); + } + if (_clientConfig.lowSpeedLimit > 0) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_LOW_SPEED_LIMIT)", + curl_easy_setopt(_hcurl, CURLOPT_LOW_SPEED_LIMIT, _clientConfig.lowSpeedLimit)); + } + if (_clientConfig.lowSpeedTime > 0) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_LOW_SPEED_TIME)", + curl_easy_setopt(_hcurl, CURLOPT_LOW_SPEED_TIME, _clientConfig.lowSpeedTime)); + } + if (_clientConfig.tcpKeepAlive) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_TCP_KEEPALIVE)", + curl_easy_setopt(_hcurl, CURLOPT_TCP_KEEPALIVE, 1L)); + if (_clientConfig.tcpKeepIdle > 0) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_TCP_KEEPIDLE)", + curl_easy_setopt(_hcurl, CURLOPT_TCP_KEEPIDLE, _clientConfig.tcpKeepIdle)); + } + if (_clientConfig.tcpKeepIntvl > 0) { + _curlEasyErrorChecked( + "curl_easy_setopt(CURLOPT_TCP_KEEPINTVL)", + curl_easy_setopt(_hcurl, CURLOPT_TCP_KEEPINTVL, _clientConfig.tcpKeepIntvl)); + } + } + if (_connPool != nullptr) { + 
_curlEasyErrorChecked("curl_easy_setopt(CURLOPT_SHARE)", + curl_easy_setopt(_hcurl, CURLOPT_SHARE, _connPool->shareCurl())); + if (_connPool->maxConnections() > 0) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_MAXCONNECTS)", + curl_easy_setopt(_hcurl, CURLOPT_MAXCONNECTS, _connPool->maxConnections())); + } + } +} - // Optional settings for the peer's cert +void Client::_setSslCertOptions() { if (!_clientConfig.sslVerifyHost) { - _errorChecked("curl_easy_setopt(CURLOPT_SSL_VERIFYHOST)", - curl_easy_setopt(_hcurl, CURLOPT_SSL_VERIFYHOST, 0L)); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_SSL_VERIFYHOST)", + curl_easy_setopt(_hcurl, CURLOPT_SSL_VERIFYHOST, 0L)); } if (_clientConfig.sslVerifyPeer) { if (!_clientConfig.caPath.empty()) { - _errorChecked("curl_easy_setopt(CURLOPT_CAPATH)", - curl_easy_setopt(_hcurl, CURLOPT_CAPATH, _clientConfig.caPath.c_str())); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_CAPATH)", + curl_easy_setopt(_hcurl, CURLOPT_CAPATH, _clientConfig.caPath.c_str())); } if (!_clientConfig.caInfo.empty()) { - _errorChecked("curl_easy_setopt(CURLOPT_CAINFO)", - curl_easy_setopt(_hcurl, CURLOPT_CAINFO, _clientConfig.caInfo.c_str())); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_CAINFO)", + curl_easy_setopt(_hcurl, CURLOPT_CAINFO, _clientConfig.caInfo.c_str())); } } else { - _errorChecked("curl_easy_setopt(CURLOPT_SSL_VERIFYPEER)", - curl_easy_setopt(_hcurl, CURLOPT_SSL_VERIFYPEER, 0L)); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_SSL_VERIFYPEER)", + curl_easy_setopt(_hcurl, CURLOPT_SSL_VERIFYPEER, 0L)); } +} - // Optional settings for the proxy's cert +void Client::_setProxyOptions() { + if (!_clientConfig.proxy.empty()) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_PROXY)", + curl_easy_setopt(_hcurl, CURLOPT_PROXY, _clientConfig.proxy.c_str())); + if (_clientConfig.httpProxyTunnel != 0) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_HTTPPROXYTUNNEL)", + curl_easy_setopt(_hcurl, CURLOPT_HTTPPROXYTUNNEL, 1L)); + } + 
} + if (!_clientConfig.noProxy.empty()) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_NOPROXY)", + curl_easy_setopt(_hcurl, CURLOPT_NOPROXY, _clientConfig.noProxy.c_str())); + } if (!_clientConfig.proxySslVerifyHost) { - _errorChecked("curl_easy_setopt(CURLOPT_PROXY_SSL_VERIFYHOST)", - curl_easy_setopt(_hcurl, CURLOPT_PROXY_SSL_VERIFYHOST, 0L)); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_PROXY_SSL_VERIFYHOST)", + curl_easy_setopt(_hcurl, CURLOPT_PROXY_SSL_VERIFYHOST, 0L)); } if (_clientConfig.proxySslVerifyPeer) { if (!_clientConfig.proxyCaPath.empty()) { - _errorChecked("curl_easy_setopt(CURLOPT_PROXY_CAPATH)", - curl_easy_setopt(_hcurl, CURLOPT_PROXY_CAPATH, _clientConfig.proxyCaPath.c_str())); + _curlEasyErrorChecked( + "curl_easy_setopt(CURLOPT_PROXY_CAPATH)", + curl_easy_setopt(_hcurl, CURLOPT_PROXY_CAPATH, _clientConfig.proxyCaPath.c_str())); } if (!_clientConfig.proxyCaInfo.empty()) { - _errorChecked("curl_easy_setopt(CURLOPT_PROXY_CAINFO)", - curl_easy_setopt(_hcurl, CURLOPT_PROXY_CAINFO, _clientConfig.proxyCaInfo.c_str())); + _curlEasyErrorChecked( + "curl_easy_setopt(CURLOPT_PROXY_CAINFO)", + curl_easy_setopt(_hcurl, CURLOPT_PROXY_CAINFO, _clientConfig.proxyCaInfo.c_str())); } } else { - _errorChecked("curl_easy_setopt(CURLOPT_PROXY_SSL_VERIFYPEER)", - curl_easy_setopt(_hcurl, CURLOPT_PROXY_SSL_VERIFYPEER, 0L)); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_PROXY_SSL_VERIFYPEER)", + curl_easy_setopt(_hcurl, CURLOPT_PROXY_SSL_VERIFYPEER, 0L)); } - - // Optional settings for proxies - if (!_clientConfig.proxy.empty()) { - _errorChecked("curl_easy_setopt(CURLOPT_PROXY)", - curl_easy_setopt(_hcurl, CURLOPT_PROXY, _clientConfig.proxy.c_str())); - if (_clientConfig.httpProxyTunnel != 0) { - _errorChecked("curl_easy_setopt(CURLOPT_HTTPPROXYTUNNEL)", - curl_easy_setopt(_hcurl, CURLOPT_HTTPPROXYTUNNEL, 1L)); - } - } - if (!_clientConfig.noProxy.empty()) { - _errorChecked("curl_easy_setopt(CURLOPT_NOPROXY)", - curl_easy_setopt(_hcurl, 
CURLOPT_NOPROXY, _clientConfig.noProxy.c_str())); - } - - // Optional settings for timing and performance of the transfer - _errorChecked("curl_easy_setopt(CURLOPT_CONNECTTIMEOUT)", - curl_easy_setopt(_hcurl, CURLOPT_CONNECTTIMEOUT, _clientConfig.connectTimeout)); - _errorChecked("curl_easy_setopt(CURLOPT_TIMEOUT)", - curl_easy_setopt(_hcurl, CURLOPT_TIMEOUT, _clientConfig.timeout)); - _errorChecked("curl_easy_setopt(CURLOPT_LOW_SPEED_LIMIT)", - curl_easy_setopt(_hcurl, CURLOPT_LOW_SPEED_LIMIT, _clientConfig.lowSpeedLimit)); - _errorChecked("curl_easy_setopt(CURLOPT_LOW_SPEED_TIME)", - curl_easy_setopt(_hcurl, CURLOPT_LOW_SPEED_TIME, _clientConfig.lowSpeedTime)); - - _errorChecked("curl_easy_setopt(CURLOPT_FAILONERROR)", curl_easy_setopt(_hcurl, CURLOPT_FAILONERROR, 1L)); - _errorChecked("curl_easy_setopt(CURLOPT_WRITEFUNCTION)", - curl_easy_setopt(_hcurl, CURLOPT_WRITEFUNCTION, forwardToClient)); - _errorChecked("curl_easy_setopt(CURLOPT_WRITEDATA)", curl_easy_setopt(_hcurl, CURLOPT_WRITEDATA, this)); - _errorChecked("curl_easy_perform()", curl_easy_perform(_hcurl)); -} - -json Client::readAsJson() { - vector data; - this->read([&data](char const* buf, size_t size) { data.insert(data.cend(), buf, buf + size); }); - return json::parse(data); } -void Client::_errorChecked(string const& scope, CURLcode errnum) { +void Client::_curlEasyErrorChecked(string const& scope, CURLcode errnum) { if (errnum != CURLE_OK) { string errorStr = curl_easy_strerror(errnum); long httpResponseCode = 0; diff --git a/src/http/Client.h b/src/http/Client.h index 66514da27..6d3dc8162 100644 --- a/src/http/Client.h +++ b/src/http/Client.h @@ -23,156 +23,25 @@ // System headers #include +#include #include #include -#include "curl/curl.h" // Third-party headers +#include "curl/curl.h" #include "nlohmann/json.hpp" // Qserv headers +#include "http/ClientConfig.h" #include "http/Method.h" -// This header declarations +// Forward declarations namespace lsst::qserv::http { +class ClientConnPool; +} 
// namespace lsst::qserv::http -/** - * Class ClientConfig encapsulates configuration parameters related to 'libcurl' - * option setter. - */ -class ClientConfig { -public: - /// The folder where the parameters are stored in the persistent configuration. - static std::string const category; - - // ------------------------------------------------ - // Keys for the SSL certs of the final data servers - // ------------------------------------------------ - - /// A flag set with 'CURLOPT_SSL_VERIFYHOST' - static std::string const sslVerifyHostKey; - - /// A flag set with 'CURLOPT_SSL_VERIFYPEER' - static std::string const sslVerifyPeerKey; - - /// A path to a folder (at worker) with certs set with 'CURLOPT_CAPATH'. - static std::string const caPathKey; - - /// A path to an existing cert file (at worker) set with 'CURLOPT_CAINFO'. - static std::string const caInfoKey; - - /// A value of a cert which would have to be pulled from the configuration - /// databases placed into a local file (at worker) be set with 'CURLOPT_CAINFO'. - /// This option is used if it's impossible to preload required certificates - /// at workers, or make them directly readable by worker's ingest services otherwise. - static std::string const caInfoValKey; - - // -------------------------------------------------------- - // Keys for the SSL certs of the intermediate proxy servers - // -------------------------------------------------------- - - /// A flag set with 'CURLOPT_PROXY_SSL_VERIFYHOST' - static std::string const proxySslVerifyHostKey; - - /// A flag set with 'CURLOPT_PROXY_SSL_VERIFYPEER' - static std::string const proxySslVerifyPeerKey; - - /// A path to a folder (at worker) with certs set with 'CURLOPT_PROXY_CAPATH'. - static std::string const proxyCaPathKey; - - /// A path to an existing cert file (at worker) set with 'CURLOPT_PROXY_CAINFO'. 
- static std::string const proxyCaInfoKey; - - /// A value of a cert which would have to be pulled from the configuration - /// databases placed into a local file (at worker) be set with 'CURLOPT_PROXY_CAINFO'. - /// This option is used if it's impossible to preload required certificates - /// at workers, or make them directly readable by worker's ingest services otherwise. - static std::string const proxyCaInfoValKey; - - // ---------------------------------------------------------- - // Configuration parameters of the intermediate proxy servers - // ---------------------------------------------------------- - - /// Set (or reset) the proxy to use for the upcoming request ('CURLOPT_PROXY'). - static std::string const proxyKey; - - /// Set a comma separated list of host names that do not require a proxy to get - /// reached, even if one is specified ('CURLOPT_NOPROXY'). - static std::string const noProxyKey; - - /// Set the tunnel parameter to a desired value to be used by the CURL library - /// when communicating with a proxy ('CURLOPT_HTTPPROXYTUNNEL'). - static std::string const httpProxyTunnelKey; - - // -------------------------------------------------------- - // The group of parameters affecting timing of the requests - // -------------------------------------------------------- - - /// A value of the connection timeout set with 'CURLOPT_CONNECTTIMEOUT'. - /// @note the default value is of the timeout is 300 seconds. Setting a value - /// of this parameter to 0 will reset it to the default. - static std::string const connectTimeoutKey; - - /// Set maximum time the request is allowed to take ('CURLOPT_TIMEOUT'). - /// @note by default, there is no timeout. Setting a value of this parameter - /// to 0 will reset it to the default. - static std::string const timeoutKey; - - /// Set low speed limit in bytes per second ('CURLOPT_LOW_SPEED_LIMIT'). - /// The parameter is normally used together with 'CURLOPT_LOW_SPEED_TIME'. 
- /// @note the default value of the parameter is 0, that puts no limit - /// on the minimally desired data transfer speed. - static std::string const lowSpeedLimitKey; - - /// Set low speed limit time period in seconds ('CURLOPT_LOW_SPEED_TIME'). - /// The parameter is normally used together with 'CURLOPT_LOW_SPEED_LIMIT'. - /// @note the default value of the parameter is 0, that puts no limit - /// on the minimally desired interval for measuring the data transfer speed. - static std::string const lowSpeedTimeKey; - - // ---------------------------------------------------------- - // A group of parameters that impose resource usage limits on - // ingest processing scheduler. - // ---------------------------------------------------------- - - /// The concurrency limit for the number of the asynchronous requests - /// to be processes simultaneously. - static std::string const asyncProcLimitKey; - - // Objects of this class can be trivially constructed, copied or deleted. - // The default state of an object corresponds to not having any of the options - // carried by the class be set when using 'libcurl' API. 
- - ClientConfig() = default; - ClientConfig(ClientConfig const&) = default; - ClientConfig& operator=(ClientConfig const&) = default; - ~ClientConfig() = default; - - // Values of the parameters - - bool sslVerifyHost = true; - bool sslVerifyPeer = true; - std::string caPath; - std::string caInfo; - std::string caInfoVal; - - bool proxySslVerifyHost = true; - bool proxySslVerifyPeer = true; - std::string proxyCaPath; - std::string proxyCaInfo; - std::string proxyCaInfoVal; - - std::string proxy; - std::string noProxy; - long httpProxyTunnel = 0; - - long connectTimeout = 0; ///< corresponds to the default (300 seconds) - long timeout = 0; ///< corresponds to the default (no timeout) - long lowSpeedLimit = 0; ///< corresponds to the default (no limit) - long lowSpeedTime = 0; ///< corresponds to the default (no limit) - - unsigned int asyncProcLimit = 0; ///< corresponds to the default (no limit) -}; +// This header declarations +namespace lsst::qserv::http { /** * Class Client is a simple interface for communicating over the HTTP protocol. @@ -220,10 +89,12 @@ class Client { * @param data Optional data to be sent with a request (depends on the HTTP headers). * @param headers Optional HTTP headers to be send with a request. * @param clientConfig Optional configuration parameters of the reader. + * @param connPool Optional connection pool */ Client(http::Method method, std::string const& url, std::string const& data = std::string(), std::vector const& headers = std::vector(), - ClientConfig const& clientConfig = ClientConfig()); + ClientConfig const& clientConfig = ClientConfig(), + std::shared_ptr const& connPool = nullptr); /** * Begin processing a request. The whole content of the remote data source @@ -251,22 +122,31 @@ class Client { private: /** - * Check for an error condition. - * - * @param scope A location from which the method was called (used for error reporting). - * @param errnum A result reported by the CURL library function. 
- * @throw std::runtime_error If the error-code is not CURL_OK. + * Set connection options as requested in the client configuration. + * @see _curlEasyErrorChecked for exceptions thrown by the method. */ - void _errorChecked(std::string const& scope, CURLcode errnum); + void _setConnOptions(); /** - * Non-member function declaration used for pushing chunks of data retrieved from - * an input stream managed by libcurl into the class's method _store(). + * Set SSL/TLS certificate options as requested in the client configuration. + * @see _curlEasyErrorChecked for exceptions thrown by the method. + */ + void _setSslCertOptions(); + + /** + * Set proxy options as requested in the client configuration. + * @see _curlEasyErrorChecked for exceptions thrown by the method. + */ + void _setProxyOptions(); + + /** + * Check for an error condition. * - * See the implementation of the class for further details on the function. - * See the documentation on lincurl C API for an explanation of the function's parameters. + * @param scope A location from which the method was called (used for error reporting). + * @param errnum A result reported by the CURL library function. + * @throw std::runtime_error If the error-code is not CURLE_OK. */ - friend size_t forwardToClient(char* ptr, size_t size, size_t nmemb, void* userdata); + void _curlEasyErrorChecked(std::string const& scope, CURLcode errnum); /** * This method is invoked by function forwardToClient() on each chunk of data @@ -277,6 +157,16 @@ class Client { */ void _store(char const* ptr, size_t nchars); + /** + * The non-member callback function is used for pushing chunks of data retrieved from + * an input stream managed by libcurl into the class's method _store(). + * + * See the implementation of the class for further details on the function. + * See the documentation on libcurl C API for an explanation of + * the function's parameters.
+ */ + friend std::size_t forwardToClient(char* ptr, std::size_t size, std::size_t nmemb, void* client); + // Input parameters http::Method const _method; @@ -284,6 +174,7 @@ class Client { std::string const _data; std::vector const _headers; ClientConfig const _clientConfig; + std::shared_ptr const _connPool; CallbackType _onDataRead; ///< set by method read() before pulling the data diff --git a/src/http/ClientConfig.cc b/src/http/ClientConfig.cc new file mode 100644 index 000000000..68837b8c3 --- /dev/null +++ b/src/http/ClientConfig.cc @@ -0,0 +1,59 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ + +// Class header +#include "http/ClientConfig.h" + +using namespace std; + +namespace lsst::qserv::http { + +string const ClientConfig::category = "worker-http-file-reader"; + +string const ClientConfig::httpVersionKey = "CURLOPT_HTTP_VERSION"; +string const ClientConfig::bufferSizeKey = "CURLOPT_BUFFERSIZE"; +string const ClientConfig::maxConnectsKey = "CURLOPT_MAXCONNECTS"; +string const ClientConfig::connectTimeoutKey = "CONNECTTIMEOUT"; +string const ClientConfig::timeoutKey = "TIMEOUT"; +string const ClientConfig::lowSpeedLimitKey = "LOW_SPEED_LIMIT"; +string const ClientConfig::lowSpeedTimeKey = "LOW_SPEED_TIME"; +string const ClientConfig::tcpKeepAliveKey = "CURLOPT_TCP_KEEPALIVE"; +string const ClientConfig::tcpKeepIdleKey = "CURLOPT_TCP_KEEPIDLE"; +string const ClientConfig::tcpKeepIntvlKey = "CURLOPT_TCP_KEEPINTVL"; + +string const ClientConfig::sslVerifyHostKey = "SSL_VERIFYHOST"; +string const ClientConfig::sslVerifyPeerKey = "SSL_VERIFYPEER"; +string const ClientConfig::caPathKey = "CAPATH"; +string const ClientConfig::caInfoKey = "CAINFO"; +string const ClientConfig::caInfoValKey = "CAINFO_VAL"; + +string const ClientConfig::proxyKey = "CURLOPT_PROXY"; +string const ClientConfig::noProxyKey = "CURLOPT_NOPROXY"; +string const ClientConfig::httpProxyTunnelKey = "CURLOPT_HTTPPROXYTUNNEL"; +string const ClientConfig::proxySslVerifyHostKey = "PROXY_SSL_VERIFYHOST"; +string const ClientConfig::proxySslVerifyPeerKey = "PROXY_SSL_VERIFYPEER"; +string const ClientConfig::proxyCaPathKey = "PROXY_CAPATH"; +string const ClientConfig::proxyCaInfoKey = "PROXY_CAINFO"; +string const ClientConfig::proxyCaInfoValKey = "PROXY_CAINFO_VAL"; + +string const ClientConfig::asyncProcLimitKey = "ASYNC_PROC_LIMIT"; + +} // namespace lsst::qserv::http diff --git a/src/http/ClientConfig.h b/src/http/ClientConfig.h new file mode 100644 index 000000000..0b4302b42 --- /dev/null +++ b/src/http/ClientConfig.h @@ -0,0 +1,135 @@ +/* + * LSST Data Management System + * + * This 
product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_HTTP_CLIENTCONFIG_H +#define LSST_QSERV_HTTP_CLIENTCONFIG_H + +// System headers +#include + +// Third-party headers +#include "curl/curl.h" + +// This header declarations +namespace lsst::qserv::http { + +/** + * Class ClientConfig encapsulates configuration parameters related to 'libcurl' + * option setter. + */ +class ClientConfig { +public: + /// The folder where the parameters are stored in the persistent configuration. 
+ static std::string const category; + + // The protocol and connection options keys + + static std::string const httpVersionKey; ///< CURLOPT_HTTP_VERSION + static std::string const bufferSizeKey; ///< CURLOPT_BUFFERSIZE + static std::string const maxConnectsKey; ///< CURLOPT_MAXCONNECTS + static std::string const connectTimeoutKey; ///< CURLOPT_CONNECTTIMEOUT + static std::string const timeoutKey; ///< CURLOPT_TIMEOUT + static std::string const lowSpeedLimitKey; ///< CURLOPT_LOW_SPEED_LIMIT + static std::string const lowSpeedTimeKey; ///< CURLOPT_LOW_SPEED_TIME + static std::string const tcpKeepAliveKey; ///< CURLOPT_TCP_KEEPALIVE + static std::string const tcpKeepIdleKey; ///< CURLOPT_TCP_KEEPIDLE + static std::string const tcpKeepIntvlKey; ///< CURLOPT_TCP_KEEPINTVL + + // Keys for the SSL certs of the final data servers + + static std::string const sslVerifyHostKey; ///< CURLOPT_SSL_VERIFYHOST + static std::string const sslVerifyPeerKey; ///< CURLOPT_SSL_VERIFYPEER + static std::string const caPathKey; ///< CURLOPT_CAPATH + static std::string const caInfoKey; ///< CURLOPT_CAINFO + + /// A value of a cert which would have to be pulled from the configuration + /// databases placed into a local file (at worker) be set with 'CURLOPT_CAINFO'. + /// This option is used if it's impossible to preload required certificates + /// at workers, or make them directly readable by worker's ingest services otherwise. 
+ static std::string const caInfoValKey; + + // Configuration parameters of the intermediate proxy servers + + static std::string const proxyKey; ///< CURLOPT_PROXY + static std::string const noProxyKey; ///< CURLOPT_NOPROXY + static std::string const httpProxyTunnelKey; ///< CURLOPT_HTTPPROXYTUNNEL + static std::string const proxySslVerifyHostKey; ///< CURLOPT_PROXY_SSL_VERIFYHOST + static std::string const proxySslVerifyPeerKey; ///< CURLOPT_PROXY_SSL_VERIFYPEER + static std::string const proxyCaPathKey; ///< CURLOPT_PROXY_CAPATH + static std::string const proxyCaInfoKey; ///< CURLOPT_PROXY_CAINFO + + /// A value of a cert which would have to be pulled from the configuration + /// databases placed into a local file (at worker) be set with 'CURLOPT_PROXY_CAINFO'. + /// This option is used if it's impossible to preload required certificates + /// at workers, or make them directly readable by worker's ingest services otherwise. + static std::string const proxyCaInfoValKey; + + /// The concurrency limit for the number of the asynchronous requests + /// to be processed simultaneously. + /// TODO: Move this parameter to the Replication System's Configuration + /// as it doesn't belong here. + static std::string const asyncProcLimitKey; + + // Objects of this class can be trivially constructed, copied or deleted. + // The default state of an object corresponds to not having any of the options + // carried by the class be set when using 'libcurl' API. + + ClientConfig() = default; + ClientConfig(ClientConfig const&) = default; + ClientConfig& operator=(ClientConfig const&) = default; + ~ClientConfig() = default; + + /// The desired version number of the protocol, where CURL_HTTP_VERSION_NONE + /// corresponds to the default behavior of the library, which depends on a version + /// of the library itself.
+ /// https://curl.se/libcurl/c/CURLOPT_HTTP_VERSION.html + long httpVersion = CURL_HTTP_VERSION_NONE; + + long bufferSize = 0; + long maxConnects = 0; + bool tcpKeepAlive = false; + long tcpKeepIdle = 0; + long tcpKeepIntvl = 0; + long connectTimeout = 0; + long timeout = 0; + long lowSpeedLimit = 0; + long lowSpeedTime = 0; + + bool sslVerifyHost = true; + bool sslVerifyPeer = true; + std::string caPath; + std::string caInfo; + std::string caInfoVal; + + std::string proxy; + std::string noProxy; + long httpProxyTunnel = 0; + bool proxySslVerifyHost = true; + bool proxySslVerifyPeer = true; + std::string proxyCaPath; + std::string proxyCaInfo; + std::string proxyCaInfoVal; + + unsigned int asyncProcLimit = 0; ///< Zero corresponds to the default behavior (no limit) +}; + +} // namespace lsst::qserv::http + +#endif // LSST_QSERV_HTTP_CLIENTCONFIG_H diff --git a/src/http/ClientConnPool.cc b/src/http/ClientConnPool.cc new file mode 100644 index 000000000..47b659525 --- /dev/null +++ b/src/http/ClientConnPool.cc @@ -0,0 +1,69 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ + +// Class header +#include "http/ClientConnPool.h" + +// Qserv headers +#include "http/Exceptions.h" + +// Standard headers +#include + +using namespace std; + +namespace lsst::qserv::http { + +mutex ClientConnPool::_accessShareCurlMtx; + +ClientConnPool::ClientConnPool(long maxConnections) : _maxConnections(maxConnections) { + _shareCurl = curl_share_init(); + assert(_shareCurl != nullptr); + _errorChecked("curl_share_setopt(CURLSHOPT_LOCKFUNC)", + curl_share_setopt(_shareCurl, CURLSHOPT_LOCKFUNC, &ClientConnPool::_share_lock_cb)); + _errorChecked("curl_share_setopt(CURLSHOPT_UNLOCKFUNC)", + curl_share_setopt(_shareCurl, CURLSHOPT_UNLOCKFUNC, &ClientConnPool::_share_unlock_cb)); + _errorChecked("curl_share_setopt(CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT)", + curl_share_setopt(_shareCurl, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT)); +} + +ClientConnPool::~ClientConnPool() { curl_share_cleanup(_shareCurl); } + +void ClientConnPool::_share_lock_cb(CURL* handle, curl_lock_data data, curl_lock_access access, + void* userptr) { + ClientConnPool::_accessShareCurlMtx.lock(); +} + +void ClientConnPool::_share_unlock_cb(CURL* handle, curl_lock_data data, curl_lock_access access, + void* userptr) { + ClientConnPool::_accessShareCurlMtx.unlock(); +} + +void ClientConnPool::_errorChecked(string const& scope, CURLSHcode errnum) { + if (errnum != CURLSHE_OK) { + string const errorStr = curl_share_strerror(errnum); + long const httpResponseCode = 0; + http::raiseRetryAllowedError(scope, " error: '" + errorStr + "', errnum: " + to_string(errnum), + httpResponseCode); + } +} + +} // namespace lsst::qserv::http diff --git a/src/http/ClientConnPool.h b/src/http/ClientConnPool.h new file mode 100644 index 000000000..095a39418 --- /dev/null +++ b/src/http/ClientConnPool.h @@ -0,0 +1,85 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_HTTP_CLIENTCONNPOOL_H +#define LSST_QSERV_HTTP_CLIENTCONNPOOL_H + +// System headers +#include +#include + +// Third-party headers +#include "curl/curl.h" + +// This header declarations +namespace lsst::qserv::http { +/** + * Class ClientConnPool is a helper class utilizing the libcurl's context + * sharing mechanism for building a configurable pool of the TCP connections. + * Note that this implementation doesn't directly manage connections. The connections + * are owned and managed by the library itself. A role of the class is to provide + * a synchronization context for acquring/releasing these connections in the multi-thread + * environment. + * + * The implementation is based on: https://curl.se/libcurl/c/libcurl-share.html + */ +class ClientConnPool { +public: + /** + * Initialize the pool + * @param maxConnections The maximum number of connections allowed in the pool. + * If 0 is passed as a value of the parameter then the default pool size + * (depends in an implementation of the loibcurl library) will not be changed. 
+ */ + explicit ClientConnPool(long maxConnections = 0); + ClientConnPool(ClientConnPool const&) = delete; + ClientConnPool& operator=(ClientConnPool const&) = delete; + + ~ClientConnPool(); + + long maxConnections() const { return _maxConnections; } + CURLSH* shareCurl() { return _shareCurl; } + +private: + // These callback functions are required by libcurl to allow easy-based instances + // of the class Client acquire/release connections from the pool. + + static void _share_lock_cb(CURL* handle, curl_lock_data data, curl_lock_access access, void* userptr); + static void _share_unlock_cb(CURL* handle, curl_lock_data data, curl_lock_access access, void* userptr); + + /** + * Check for an error condition. + * + * @param scope A location from which the method was called (used for error reporting). + * @param errnum A result reported by the CURL library function. + * @throw std::runtime_error If the error-code is not CURLSHE_OK. + */ + void _errorChecked(std::string const& scope, CURLSHcode errnum); + + /// The mutex is shared by all instances of the pool. + static std::mutex _accessShareCurlMtx; + + long const _maxConnections = 0; + CURLSH* _shareCurl; +}; + +} // namespace lsst::qserv::http + +#endif // LSST_QSERV_HTTP_CLIENTCONNPOOL_H diff --git a/src/qhttp/Server.cc b/src/qhttp/Server.cc index 836693da9..18e741faf 100644 --- a/src/qhttp/Server.cc +++ b/src/qhttp/Server.cc @@ -245,10 +245,7 @@ void Server::_readRequest(std::shared_ptr socket) { } if (request->version == "HTTP/1.1") { - // Temporary disable this option due to a bug in the implementation - // causing disconnect if running the service within the Docker environment. - // See: DM-27396 - //*reuseSocket = true; + *reuseSocket = true; } if (request->header.count("Content-Length") > 0) {