Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opal Client add proxy config #729 #735

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions packages/opal-client/opal_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ class OpalClientConfig(Confi):
"http://localhost:8181",
description="The URL of the policy store (e.g., OPA agent).",
)

POLICY_STORE_PROXY_URL = confi.str(
"POLICY_STORE_PROXY_URL",
None,
description="The URL of the proxy to use for the policy store.",
)
POLICY_STORE_AUTH_TYPE = confi.enum(
"POLICY_STORE_AUTH_TYPE",
PolicyStoreAuth,
Expand Down Expand Up @@ -96,7 +100,21 @@ class OpalClientConfig(Confi):
},
description="Retry options when connecting to the base data source (e.g. an external API server which returns data snapshot)",
)

DATA_FETCHER_PROXY_URL = confi.str(
"DATA_FETCHER_PROXY_URL",
None,
description="The URL of the proxy to use for the data fetcher.",
)
POLICY_PROXY_URL = confi.str(
"POLICY_PROXY_URL",
None,
description="The URL of the proxy to use for the policy.",
)
LIMITER_SERVER_PROXY_URL = confi.str(
"LIMITER_PROXY_URL",
None,
description="The URL of the proxy to use for the limiter.",
)
POLICY_STORE_POLICY_PATHS_TO_IGNORE = confi.list(
"POLICY_STORE_POLICY_PATHS_TO_IGNORE",
[],
Expand Down Expand Up @@ -389,4 +407,4 @@ def on_load(self):
self.DATA_UPDATER_CONN_RETRY = self.DATA_STORE_CONN_RETRY


opal_client_config = OpalClientConfig(prefix="OPAL_")
opal_client_config = OpalClientConfig(prefix="OPAL_")
6 changes: 5 additions & 1 deletion packages/opal-client/opal_client/data/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def __init__(
self._polling_update_tasks = []
self._on_connect_callbacks = on_connect or []
self._on_disconnect_callbacks = on_disconnect or []
self._aiohttp_client_session_args = {}
if opal_client_config.DATA_FETCHER_PROXY_URL:
_aiohttp_client_session_args['proxy'] = opal_client_config.DATA_FETCHER_PROXY_URL

async def __aenter__(self):
await self.start()
Expand Down Expand Up @@ -183,7 +186,8 @@ async def get_policy_data_config(self, url: str = None) -> DataSourceConfig:
url = self._data_sources_config_url
logger.info("Getting data-sources configuration from '{source}'", source=url)
try:
async with ClientSession(headers=self._extra_headers) as session:
_aiohttp_client_session_args['headers'] = self._extra_headers
async with ClientSession(**self._aiohttp_client_session_args) as session:
response = await session.get(url, **self._ssl_context_kwargs)
if response.status == 200:
return DataSourceConfig.parse_obj(await response.json())
Expand Down
5 changes: 4 additions & 1 deletion packages/opal-client/opal_client/limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ def __init__(self, backend_url=None, token=None):
if self._custom_ssl_context is not None
else {}
)
self._aiohttp_client_session_args = {}
if opal_client_config.LIMITER_SERVER_PROXY_URL:
_aiohttp_client_session_args['proxy'] = opal_client_config.LIMITER_SERVER_PROXY_URL

@retry(wait=wait_random_exponential(max=10), stop=stop.stop_never, reraise=True)
async def wait_for_server_ready(self):
logger.info("Trying to get server's load limit pass")
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
async with session.get(
self._loadlimit_endpoint_url,
Expand Down
5 changes: 4 additions & 1 deletion packages/opal-client/opal_client/policy/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def __init__(self, backend_url=None, token=None):
if self._custom_ssl_context is not None
else {}
)
self._aiohttp_client_session_args = {}
if opal_client_config.POLICY_PROXY_URL:
_aiohttp_client_session_args['proxy'] = opal_client_config.POLICY_PROXY_URL

@property
def policy_endpoint_url(self):
Expand Down Expand Up @@ -85,7 +88,7 @@ async def _fetch_policy_bundle(
params = {"path": directories}
if base_hash is not None:
params["base_hash"] = base_hash
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
logger.info(
"Fetching policy bundle from {url}",
url=self._policy_endpoint_url,
Expand Down
23 changes: 11 additions & 12 deletions packages/opal-client/opal_client/policy_store/cedar_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def __init__(
self._had_successful_policy_transaction = False
self._most_recent_data_transaction: Optional[StoreTransaction] = None
self._most_recent_policy_transaction: Optional[StoreTransaction] = None
self._aiohttp_client_session_args = {}
if opal_client_config.POLICY_STORE_PROXY_URL:
_aiohttp_client_session_args['proxy'] = opal_client_config.POLICY_STORE_PROXY_URL

if auth_type == PolicyStoreAuth.TOKEN:
if self._token is None:
Expand Down Expand Up @@ -75,7 +78,7 @@ async def set_policy(
f"Ignoring setting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE."
)
return
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()
async with session.put(
Expand All @@ -99,7 +102,7 @@ async def set_policy(
@fail_silently()
@retry(**RETRY_CONFIG)
async def get_policy(self, policy_id: str) -> Optional[str]:
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand All @@ -116,7 +119,7 @@ async def get_policy(self, policy_id: str) -> Optional[str]:
@fail_silently()
@retry(**RETRY_CONFIG)
async def get_policies(self) -> Optional[Dict[str, str]]:
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand All @@ -140,8 +143,7 @@ async def delete_policy(self, policy_id: str, transaction_id: Optional[str] = No
f"Ignoring deleting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE."
)
return

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand Down Expand Up @@ -175,8 +177,7 @@ async def set_policy_data(
logger.warning(
"OPAL client was instructed to put something that is not a list on Cedar. This will probably not work."
)

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()
async with session.put(
Expand Down Expand Up @@ -204,8 +205,7 @@ async def delete_policy_data(
):
if path != "":
raise ValueError("Cedar can only change the entire data structure at once.")

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand Down Expand Up @@ -237,8 +237,7 @@ async def get_data(self, path: str) -> Dict:
raise ValueError("Cedar can only change the entire data structure at once.")
try:
headers = await self._get_auth_headers()

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
async with session.get(
f"{self._cedar_url}/data", headers=headers
) as cedar_response:
Expand Down Expand Up @@ -312,4 +311,4 @@ async def set_policies(
for module_id in deleted_modules:
print(module_id)
await self.delete_policy(policy_id=module_id)
self._policy_version = bundle.hash
self._policy_version = bundle.hash
32 changes: 14 additions & 18 deletions packages/opal-client/opal_client/policy_store/opa_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ def __init__(
self._last_failed_policy_transaction: Optional[StoreTransaction] = None
self._last_data_transaction: Optional[StoreTransaction] = None
self._last_failed_data_transaction: Optional[StoreTransaction] = None
self._aiohttp_client_session_args = {}
if opal_client_config.POLICY_STORE_PROXY_URL:
self._aiohttp_client_session_args['proxy'] = opal_client_config.POLICY_STORE_PROXY_URL

@property
def ready(self) -> bool:
Expand Down Expand Up @@ -410,8 +413,7 @@ async def get_policy_version(self) -> Optional[str]:
@retry(**RETRY_CONFIG)
async def _get_oauth_token(self):
logger.info("Retrieving a new OAuth access_token.")

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
async with session.post(
self._oauth_server,
Expand Down Expand Up @@ -475,7 +477,7 @@ async def set_policy(
f"Ignoring setting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE."
)
return
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand All @@ -500,7 +502,7 @@ async def set_policy(
@fail_silently()
@retry(**RETRY_CONFIG)
async def get_policy(self, policy_id: str) -> Optional[str]:
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand All @@ -518,7 +520,7 @@ async def get_policy(self, policy_id: str) -> Optional[str]:
@fail_silently()
@retry(**RETRY_CONFIG)
async def get_policies(self) -> Optional[Dict[str, str]]:
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand All @@ -544,8 +546,7 @@ async def delete_policy(self, policy_id: str, transaction_id: Optional[str] = No
f"Ignoring deleting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE."
)
return

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand Down Expand Up @@ -758,8 +759,7 @@ async def set_policy_data(
"OPAL client was instructed to put a list on OPA's root document. In OPA the root document must be an object so the original value was wrapped."
)
policy_data = {"items": policy_data}

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()
data = json.dumps(exclude_none_fields(policy_data))
Expand Down Expand Up @@ -799,8 +799,7 @@ async def patch_policy_data(
"OPAL client was instructed to put a list on OPA's root document. In OPA the root document must be an object so the original value was wrapped."
)
policy_data = {"items": policy_data}

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()
headers["Content-Type"] = "application/json-patch+json"
Expand Down Expand Up @@ -833,8 +832,7 @@ async def delete_policy_data(
path = self._safe_data_module_path(path)
if not path:
return await self.set_policy_data({})

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand Down Expand Up @@ -871,8 +869,7 @@ async def get_data(self, path: str) -> Dict:
path = "/" + path
try:
headers = await self._get_auth_headers()

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
async with session.get(
f"{self._opa_url}/data{path}",
headers=headers,
Expand Down Expand Up @@ -901,8 +898,7 @@ async def get_data_with_input(self, path: str, input: BaseModel) -> Dict:
path = path[1:]
try:
headers = await self._get_auth_headers()

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
async with session.post(
f"{self._opa_url}/data/{path}",
data=json.dumps(opa_input),
Expand Down Expand Up @@ -968,4 +964,4 @@ async def full_import(self, reader: AsyncTextIOWrapper) -> None:
]
)

await self.set_policy_data(import_data["data"])
await self.set_policy_data(import_data["data"])