From f5a6316607f13b71305e5c15bf985d637cff5305 Mon Sep 17 00:00:00 2001 From: "soufian.labed-ext@amundi.com" Date: Fri, 10 Jan 2025 14:56:42 +0100 Subject: [PATCH] add proxy config --- packages/opal-client/opal_client/config.py | 24 ++++++++++++-- .../opal-client/opal_client/data/updater.py | 6 +++- packages/opal-client/opal_client/limiter.py | 5 ++- .../opal-client/opal_client/policy/fetcher.py | 5 ++- .../opal_client/policy_store/cedar_client.py | 23 +++++++------ .../opal_client/policy_store/opa_client.py | 32 ++++++++----------- 6 files changed, 59 insertions(+), 36 deletions(-) diff --git a/packages/opal-client/opal_client/config.py b/packages/opal-client/opal_client/config.py index b5d94fb3a..421838276 100644 --- a/packages/opal-client/opal_client/config.py +++ b/packages/opal-client/opal_client/config.py @@ -30,7 +30,11 @@ class OpalClientConfig(Confi): "http://localhost:8181", description="The URL of the policy store (e.g., OPA agent).", ) - + POLICY_STORE_PROXY_URL = confi.str( + "POLICY_STORE_PROXY_URL", + None, + description="The URL of the proxy to use for the policy store.", + ) POLICY_STORE_AUTH_TYPE = confi.enum( "POLICY_STORE_AUTH_TYPE", PolicyStoreAuth, @@ -96,7 +100,21 @@ class OpalClientConfig(Confi): }, description="Retry options when connecting to the base data source (e.g. an external API server which returns data snapshot)", ) - + DATA_FETCHER_PROXY_URL = confi.str( + "DATA_FETCHER_PROXY_URL", + None, + description="The URL of the proxy to use for the data fetcher.", + ) + POLICY_PROXY_URL = confi.str( + "POLICY_PROXY_URL", + None, + description="The URL of the proxy to use for the policy.", + ) + LIMITER_SERVER_PROXY_URL = confi.str( + "LIMITER_PROXY_URL", + None, + description="The URL of the proxy to use for the limiter.", + ) POLICY_STORE_POLICY_PATHS_TO_IGNORE = confi.list( "POLICY_STORE_POLICY_PATHS_TO_IGNORE", [], @@ -389,4 +407,4 @@ def on_load(self): self.DATA_UPDATER_CONN_RETRY = self.DATA_STORE_CONN_RETRY -opal_client_config = OpalClientConfig(prefix="OPAL_") +opal_client_config = OpalClientConfig(prefix="OPAL_") \ No newline at end of file diff --git a/packages/opal-client/opal_client/data/updater.py b/packages/opal-client/opal_client/data/updater.py index c99ca3884..a122056c2 100644 --- a/packages/opal-client/opal_client/data/updater.py +++ b/packages/opal-client/opal_client/data/updater.py @@ -137,6 +137,9 @@ def __init__( self._polling_update_tasks = [] self._on_connect_callbacks = on_connect or [] self._on_disconnect_callbacks = on_disconnect or [] + self._aiohttp_client_session_args = {} + if opal_client_config.DATA_FETCHER_PROXY_URL: + _aiohttp_client_session_args['proxy'] = opal_client_config.DATA_FETCHER_PROXY_URL async def __aenter__(self): await self.start() @@ -183,7 +186,8 @@ async def get_policy_data_config(self, url: str = None) -> DataSourceConfig: url = self._data_sources_config_url logger.info("Getting data-sources configuration from '{source}'", source=url) try: - async with ClientSession(headers=self._extra_headers) as session: + _aiohttp_client_session_args['headers'] = self._extra_headers + async with ClientSession(**self._aiohttp_client_session_args) as session: response = await session.get(url, **self._ssl_context_kwargs) if response.status == 200: return DataSourceConfig.parse_obj(await response.json()) diff --git a/packages/opal-client/opal_client/limiter.py b/packages/opal-client/opal_client/limiter.py index e3a5f4d00..ff1ec3823 100644 --- a/packages/opal-client/opal_client/limiter.py +++ b/packages/opal-client/opal_client/limiter.py @@ -27,11 +27,14 @@ def __init__(self, backend_url=None, token=None): if self._custom_ssl_context is not None else {} ) + self._aiohttp_client_session_args = {} + if opal_client_config.LIMITER_SERVER_PROXY_URL: + _aiohttp_client_session_args['proxy'] = opal_client_config.LIMITER_SERVER_PROXY_URL @retry(wait=wait_random_exponential(max=10), stop=stop.stop_never, reraise=True) async def wait_for_server_ready(self): logger.info("Trying to get server's load limit pass") - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: async with session.get( self._loadlimit_endpoint_url, diff --git a/packages/opal-client/opal_client/policy/fetcher.py b/packages/opal-client/opal_client/policy/fetcher.py index b7c8c543f..7ea5137d9 100644 --- a/packages/opal-client/opal_client/policy/fetcher.py +++ b/packages/opal-client/opal_client/policy/fetcher.py @@ -57,6 +57,9 @@ def __init__(self, backend_url=None, token=None): if self._custom_ssl_context is not None else {} ) + self._aiohttp_client_session_args = {} + if opal_client_config.POLICY_PROXY_URL: + _aiohttp_client_session_args['proxy'] = opal_client_config.POLICY_PROXY_URL @property def policy_endpoint_url(self): @@ -85,7 +88,7 @@ async def _fetch_policy_bundle( params = {"path": directories} if base_hash is not None: params["base_hash"] = base_hash - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: logger.info( "Fetching policy bundle from {url}", url=self._policy_endpoint_url, diff --git a/packages/opal-client/opal_client/policy_store/cedar_client.py b/packages/opal-client/opal_client/policy_store/cedar_client.py index 3be4cbea7..abd3015e4 100644 --- a/packages/opal-client/opal_client/policy_store/cedar_client.py +++ b/packages/opal-client/opal_client/policy_store/cedar_client.py @@ -43,6 +43,9 @@ def __init__( self._had_successful_policy_transaction = False self._most_recent_data_transaction: Optional[StoreTransaction] = None self._most_recent_policy_transaction: Optional[StoreTransaction] = None + self._aiohttp_client_session_args = {} + if opal_client_config.POLICY_STORE_PROXY_URL: + _aiohttp_client_session_args['proxy'] = opal_client_config.POLICY_STORE_PROXY_URL if auth_type == PolicyStoreAuth.TOKEN: if self._token is None: @@ -75,7 +78,7 @@ async def set_policy( f"Ignoring setting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE." ) return - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() async with session.put( @@ -99,7 +102,7 @@ async def set_policy( @fail_silently() @retry(**RETRY_CONFIG) async def get_policy(self, policy_id: str) -> Optional[str]: - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() @@ -116,7 +119,7 @@ async def get_policy(self, policy_id: str) -> Optional[str]: @fail_silently() @retry(**RETRY_CONFIG) async def get_policies(self) -> Optional[Dict[str, str]]: - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() @@ -140,8 +143,7 @@ async def delete_policy(self, policy_id: str, transaction_id: Optional[str] = No f"Ignoring deleting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE." ) return - - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() @@ -175,8 +177,7 @@ async def set_policy_data( logger.warning( "OPAL client was instructed to put something that is not a list on Cedar. This will probably not work." ) - - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() async with session.put( @@ -204,8 +205,7 @@ async def delete_policy_data( ): if path != "": raise ValueError("Cedar can only change the entire data structure at once.") - - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() @@ -237,8 +237,7 @@ async def get_data(self, path: str) -> Dict: raise ValueError("Cedar can only change the entire data structure at once.") try: headers = await self._get_auth_headers() - - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: async with session.get( f"{self._cedar_url}/data", headers=headers ) as cedar_response: @@ -312,4 +311,4 @@ async def set_policies( for module_id in deleted_modules: print(module_id) await self.delete_policy(policy_id=module_id) - self._policy_version = bundle.hash + self._policy_version = bundle.hash \ No newline at end of file diff --git a/packages/opal-client/opal_client/policy_store/opa_client.py b/packages/opal-client/opal_client/policy_store/opa_client.py index 86caa5d72..f58da958c 100644 --- a/packages/opal-client/opal_client/policy_store/opa_client.py +++ b/packages/opal-client/opal_client/policy_store/opa_client.py @@ -111,6 +111,9 @@ def __init__( self._last_failed_policy_transaction: Optional[StoreTransaction] = None self._last_data_transaction: Optional[StoreTransaction] = None self._last_failed_data_transaction: Optional[StoreTransaction] = None + self._aiohttp_client_session_args = {} + if opal_client_config.POLICY_STORE_PROXY_URL: + self._aiohttp_client_session_args['proxy'] = opal_client_config.POLICY_STORE_PROXY_URL @property def ready(self) -> bool: @@ -410,8 +413,7 @@ async def get_policy_version(self) -> Optional[str]: @retry(**RETRY_CONFIG) async def _get_oauth_token(self): logger.info("Retrieving a new OAuth access_token.") - - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: async with session.post( self._oauth_server, @@ -475,7 +477,7 @@ async def set_policy( f"Ignoring setting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE." ) return - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() @@ -500,7 +502,7 @@ async def set_policy( @fail_silently() @retry(**RETRY_CONFIG) async def get_policy(self, policy_id: str) -> Optional[str]: - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() @@ -518,7 +520,7 @@ async def get_policy(self, policy_id: str) -> Optional[str]: @fail_silently() @retry(**RETRY_CONFIG) async def get_policies(self) -> Optional[Dict[str, str]]: - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() @@ -544,8 +546,7 @@ async def delete_policy(self, policy_id: str, transaction_id: Optional[str] = No f"Ignoring deleting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE." ) return - - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() @@ -758,8 +759,7 @@ async def set_policy_data( "OPAL client was instructed to put a list on OPA's root document. In OPA the root document must be an object so the original value was wrapped." ) policy_data = {"items": policy_data} - - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() data = json.dumps(exclude_none_fields(policy_data)) @@ -799,8 +799,7 @@ async def patch_policy_data( "OPAL client was instructed to put a list on OPA's root document. In OPA the root document must be an object so the original value was wrapped." ) policy_data = {"items": policy_data} - - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() headers["Content-Type"] = "application/json-patch+json" @@ -833,8 +832,7 @@ async def delete_policy_data( path = self._safe_data_module_path(path) if not path: return await self.set_policy_data({}) - - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: try: headers = await self._get_auth_headers() @@ -871,8 +869,7 @@ async def get_data(self, path: str) -> Dict: path = "/" + path try: headers = await self._get_auth_headers() - - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: async with session.get( f"{self._opa_url}/data{path}", headers=headers, @@ -901,8 +898,7 @@ async def get_data_with_input(self, path: str, input: BaseModel) -> Dict: path = path[1:] try: headers = await self._get_auth_headers() - - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session: async with session.post( f"{self._opa_url}/data/{path}", data=json.dumps(opa_input), @@ -968,4 +964,4 @@ async def full_import(self, reader: AsyncTextIOWrapper) -> None: ] ) - await self.set_policy_data(import_data["data"]) + await self.set_policy_data(import_data["data"]) \ No newline at end of file