Skip to content

Commit

Permalink
New version of test request actions from bb
Browse files Browse the repository at this point in the history
  • Loading branch information
davidesner committed May 10, 2024
1 parent bfe9803 commit 40b8bb4
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 47 deletions.
2 changes: 2 additions & 0 deletions python-sync-actions/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ ENV PYTHONIOENCODING utf-8

COPY /src /code/src/
COPY /tests /code/tests/
COPY /scripts /code/scripts/
COPY requirements.txt /code/requirements.txt
COPY flake8.cfg /code/flake8.cfg
COPY deploy.sh /code/deploy.sh

# install gcc to be able to build packages - e.g. required by regex, dateparser, also required for pandas
RUN apt-get update && apt-get install -y build-essential curl
Expand Down
23 changes: 22 additions & 1 deletion python-sync-actions/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,25 @@ services:
- KBC_DATADIR=./data
command:
- /bin/sh
- /code/scripts/build_n_test.sh
- /code/scripts/build_n_test.sh

test-calls:
# Run examples against mock server
build: .
volumes:
- ./:/code
- ./data:/data
command: python /code/tests/_test_calls.py
links:
- mock-server
mock-server:
container_name: mock-server
image: quay.io/keboola/ex-generic-mock-server:latest
tty: true
stdin_open: true
ports:
- "8888:80"
volumes:
- ./tests/calls:/examples/
environment:
- KBC_EXAMPLES_DIR=/examples/
3 changes: 3 additions & 0 deletions python-sync-actions/src/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import sys
import os
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/../src")
177 changes: 134 additions & 43 deletions python-sync-actions/src/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@
"""
import json
import logging
from io import StringIO

from keboola.component.base import ComponentBase, sync_action
from keboola.component.exceptions import UserException
from nested_lookup import nested_lookup

import configuration
from configuration import Configuration, DataPath
from actions.curl import build_job_from_curl
from actions.mapping import infer_mapping
from configuration import Configuration
from http_generic.auth import AuthMethodBuilder, AuthBuilderError
from http_generic.client import GenericHttpClient
from user_functions import UserFunctions

from placeholders_utils import PlaceholdersUtils

# configuration variables
KEY_API_TOKEN = '#api_token'
KEY_PRINT_HELLO = 'print_hello'
Expand All @@ -40,47 +43,22 @@ class Component(ComponentBase):

def __init__(self):
    """
    Set up the component and the per-run state used by the sync actions.
    """
    super().__init__()

    # mirror everything logged through the root logger into an in-memory buffer
    self.log = StringIO()
    buffer_handler = logging.StreamHandler(self.log)
    logging.getLogger().addHandler(buffer_handler)

    self.user_functions = UserFunctions()

    # populated later, outside of the constructor
    self._configuration: Configuration = None
    self._client: GenericHttpClient = None

    # state accumulated while processing nested (child) jobs
    self._parent_params = {}
    self._final_results = []
    self._final_response = None

def run(self):
"""
Main execution code
Main component method
"""

self.init_component()
api_cfg = self._configuration.api
request_cfg = self._configuration.request_parameters
# fix KBC bug
user_params = self._configuration.user_parameters
# evaluate user_params inside the user params itself
user_params = self._fill_in_user_parameters(user_params, user_params)

# build headers
headers = {**api_cfg.default_headers.copy(), **request_cfg.headers.copy()}
new_headers = self._fill_in_user_parameters(headers, user_params)

# build additional parameters
query_parameters = {**api_cfg.default_query_parameters.copy(), **request_cfg.query_parameters.copy()}
query_parameters = self._fill_in_user_parameters(query_parameters, user_params)
ssl_verify = api_cfg.ssl_verification
timeout = api_cfg.timeout
# additional_params = self._build_request_parameters(additional_params_cfg)
request_parameters = {'params': query_parameters,
'headers': new_headers,
'verify': ssl_verify,
'timeout': timeout}

endpoint_path = request_cfg.endpoint_path

# use client to send requests / perform actions
response = self._client.send_request(method=request_cfg.method, endpoint_path=endpoint_path,
**request_parameters)

return self._parse_data(response.json(), self._configuration.data_path)
pass

def init_component(self):

Expand Down Expand Up @@ -111,7 +89,8 @@ def init_component(self):
def _fill_in_user_parameters(self, conf_objects: dict, user_param: dict):
"""
This method replaces user parameter references via attr + parses functions inside user parameters,
evaluates them and fills in the resulting values
evaluates them and fills in the resulting values
Args:
conf_objects: Configuration that contains the references via {"attr": "key"} to user parameters or function
definitions
Expand Down Expand Up @@ -139,6 +118,57 @@ def _fill_in_user_parameters(self, conf_objects: dict, user_param: dict):
'are not present in "user_parameters" field.'.format(non_matched))
return new_steps

def _fill_placeholders(self, placeholders, path):
"""
Fill placeholders in the path
Args:
placeholders: placeholders - names dict to fill
path: path with placeholders
row: row with data
"""

result_path = path
for key, dict in placeholders[0].items():
result_path = result_path.replace(f"{{{dict.get('placeholder')}}}", str(dict.get('value')))
return result_path

def _process_nested_job(self, parent_result, config, parent_results_list, client,
method, **request_parameters) -> list:
"""
Process nested job
Args:
parent_result: result of the parent job
config: configuration of the nested job
parent_results_list: list of parent results
client: http client
method: method to use
request_parameters: request parameters
"""

for row in parent_result:

parent_results_ext = parent_results_list + [row]

placeholders = PlaceholdersUtils.get_params_for_child_jobs(config.get('placeholders', {}),
parent_results_ext, self._parent_params)

self._parent_params = placeholders[0]
row_path = self._fill_placeholders(placeholders, config['endpoint'])
response = client.send_request(method=method, endpoint_path=row_path, **request_parameters)
child_response = self._parse_data(response.json(), DataPath(config.get('dataType'),
config.get('dataField', '.')))
children = config.get('children', [])
results = []
if children[0] if children else None:
nested_data = self._process_nested_job(child_response, children[0], parent_results_ext,
client, method, **request_parameters)
results.append(nested_data)
else:
self._final_results.append(child_response)
self._final_response = response

return results

def _perform_custom_function(self, key: str, function_cfg: dict, user_params: dict):
"""
Perform custom function recursively (may be nested)
Expand Down Expand Up @@ -175,14 +205,27 @@ def _parse_data(self, data, path) -> list:
Returns:
"""
keys = path.path.split(path.delimiter)
value = data
try:
for key in keys:
value = value[key]
except KeyError:
return None
return value

if path.path is None:
result = data
else:
if path.delimiter in path.path:
keys = path.path.split(path.delimiter) # Split the path only if delimiter is present
else:
keys = [path.path]

value = data
try:
for key in keys:
value = value[key]
result = value
except KeyError:
result = None

if not isinstance(result, list):
result = [result] if result is not None else []

return result

@sync_action('load_from_curl')
def load_from_curl(self) -> dict:
Expand Down Expand Up @@ -256,6 +299,54 @@ def infer_mapping(self) -> dict:
mapping = infer_mapping(data)
return mapping

@sync_action('test_request')
def test_request(self):
    """
    Send the configured request (and its first nested job, if any) and report the outcome.

    Returns:
        list of four items: status code of the final response, JSON body of the
        final response, the collected results and the captured log output.
    """
    self.init_component()
    self._client.login()

    api = self._configuration.api
    request = self._configuration.request_parameters

    # fix KBC bug
    raw_user_params = self._configuration.user_parameters
    # user parameters may reference each other, so they are resolved against themselves first
    resolved_params = self._fill_in_user_parameters(raw_user_params, raw_user_params)

    # headers: request level values override the API defaults
    merged_headers = {**api.default_headers.copy(), **request.headers.copy()}
    final_headers = self._fill_in_user_parameters(merged_headers, resolved_params)

    # query parameters: request level values override the API defaults
    merged_query = {**api.default_query_parameters.copy(), **request.query_parameters.copy()}
    final_query = self._fill_in_user_parameters(merged_query, resolved_params)

    verify_ssl = api.ssl_verification
    request_timeout = api.timeout
    request_parameters = {
        'params': final_query,
        'headers': final_headers,
        'verify': verify_ssl,
        'timeout': request_timeout,
    }

    path = request.endpoint_path

    # the response of the top level request; replaced by leaf responses of nested jobs
    self._final_response = self._client.send_request(method=request.method, endpoint_path=path,
                                                     **request_parameters)

    # kept from the original flow; the value is not used further
    response_headers = dict(self._final_response.headers)  # noqa: F841

    parsed = self._parse_data(self._final_response.json(), self._configuration.data_path)

    if not request.nested_job:
        self._final_results = parsed
    else:
        ancestors = []
        self._process_nested_job(parsed, request.nested_job[0], ancestors, self._client,
                                 request.method, **request_parameters)

    status = self._final_response.status_code
    body = self._final_response.json()
    return [status, body, self._final_results, self.log.getvalue()]


"""
Main entrypoint
Expand Down
12 changes: 9 additions & 3 deletions python-sync-actions/src/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class ApiRequest(ConfigurationBase):
headers: dict = field(default_factory=dict)
query_parameters: dict = field(default_factory=dict)
continue_on_failure: bool = False
nested_job: dict = field(default_factory=dict)


@dataclass
Expand Down Expand Up @@ -196,7 +197,7 @@ def build_api_request(configuration: dict) -> Tuple[ApiRequest, RequestContent,
if '_' in job_path:
parent, child = job_path.split('_')
else:
parent, child = job_path, None # noqa
parent, child = job_path, None # noqa: F841

jobs_section: list[dict] = configuration.get('config', {}).get('jobs')
if not jobs_section:
Expand All @@ -205,6 +206,8 @@ def build_api_request(configuration: dict) -> Tuple[ApiRequest, RequestContent,
# TODO: support recursive child-job config. E.g. have chained/list of ApiRequests objects instead of just one
endpoint_config = jobs_section[int(parent)]

nested_job = endpoint_config.get('children')

method = endpoint_config.get('method', 'GET')

match method:
Expand Down Expand Up @@ -233,7 +236,10 @@ def build_api_request(configuration: dict) -> Tuple[ApiRequest, RequestContent,
return (ApiRequest(method=method,
endpoint_path=endpoint_path,
headers=endpoint_config.get('headers', {}),
query_parameters=endpoint_config.get('params', {})), request_content,
query_parameters=endpoint_config.get('params', {}),
nested_job=nested_job
),
request_content,
DataPath(path=path, delimiter=delimiter))


Expand Down Expand Up @@ -266,7 +272,7 @@ def convert(cls, config_parameters: dict) -> Authentication | None:

@classmethod
def _convert_basic(cls, config_parameters: dict) -> Authentication:
username = config_parameters.get('config').get('#username')
username = config_parameters.get('config').get('username')
password = config_parameters.get('config').get('#password')
if not username or not password:
raise ValueError('Username or password not found in the BasicAuth configuration')
Expand Down
Loading

0 comments on commit 40b8bb4

Please sign in to comment.