diff --git a/python-sync-actions/Dockerfile b/python-sync-actions/Dockerfile
index f1088b6..6709f14 100644
--- a/python-sync-actions/Dockerfile
+++ b/python-sync-actions/Dockerfile
@@ -3,8 +3,10 @@ ENV PYTHONIOENCODING utf-8
 
 COPY /src /code/src/
 COPY /tests /code/tests/
+COPY /scripts /code/scripts/
 COPY requirements.txt /code/requirements.txt
 COPY flake8.cfg /code/flake8.cfg
+COPY deploy.sh /code/deploy.sh
 
 # install gcc to be able to build packages - e.g. required by regex, dateparser, also required for pandas
 RUN apt-get update && apt-get install -y build-essential curl
diff --git a/python-sync-actions/docker-compose.yml b/python-sync-actions/docker-compose.yml
index 6eaebaa..af85521 100644
--- a/python-sync-actions/docker-compose.yml
+++ b/python-sync-actions/docker-compose.yml
@@ -18,4 +18,25 @@ services:
       - KBC_DATADIR=./data
     command:
       - /bin/sh
-      - /code/scripts/build_n_test.sh
\ No newline at end of file
+      - /code/scripts/build_n_test.sh
+
+  test-calls:
+    # Run examples against mock server
+    build: .
+    volumes:
+      - ./:/code
+      - ./data:/data
+    command: python /code/tests/_test_calls.py
+    links:
+      - mock-server
+  mock-server:
+    container_name: mock-server
+    image: quay.io/keboola/ex-generic-mock-server:latest
+    tty: true
+    stdin_open: true
+    ports:
+      - "8888:80"
+    volumes:
+      - ./tests/calls:/examples/
+    environment:
+      - KBC_EXAMPLES_DIR=/examples/
\ No newline at end of file
diff --git a/python-sync-actions/src/__init__.py b/python-sync-actions/src/__init__.py
new file mode 100644
index 0000000..6aeff72
--- /dev/null
+++ b/python-sync-actions/src/__init__.py
@@ -0,0 +1,3 @@
+import sys
+import os
+sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/../src")
diff --git a/python-sync-actions/src/component.py b/python-sync-actions/src/component.py
index b016baa..66065e0 100644
--- a/python-sync-actions/src/component.py
+++ b/python-sync-actions/src/component.py
@@ -4,19 +4,22 @@
 """
 import json
 import logging
+from io import StringIO
 
 from keboola.component.base import ComponentBase, sync_action
 from keboola.component.exceptions import UserException
 from nested_lookup import nested_lookup
 
 import configuration
+from configuration import Configuration, DataPath
 from actions.curl import build_job_from_curl
 from actions.mapping import infer_mapping
-from configuration import Configuration
 from http_generic.auth import AuthMethodBuilder, AuthBuilderError
 from http_generic.client import GenericHttpClient
 from user_functions import UserFunctions
 
+from placeholders_utils import PlaceholdersUtils
+
 # configuration variables
 KEY_API_TOKEN = '#api_token'
 KEY_PRINT_HELLO = 'print_hello'
@@ -40,47 +43,22 @@ class Component(ComponentBase):
 
     def __init__(self):
         super().__init__()
-        # initialize instance parameters
+        self.log = StringIO()
+        logging.getLogger().addHandler(logging.StreamHandler(self.log))
+
         self.user_functions = UserFunctions()
 
         self._configuration: Configuration = None
         self._client: GenericHttpClient = None
+        self._parent_params = {}
+        self._final_results = []
+        self._final_response = None
 
     def run(self):
         """
-        Main execution code
+        Main component method
         """
-
-        self.init_component()
-        api_cfg = self._configuration.api
-        request_cfg = self._configuration.request_parameters
-        # fix KBC bug
-        user_params = self._configuration.user_parameters
-        # evaluate user_params inside the user params itself
-        user_params = self._fill_in_user_parameters(user_params, user_params)
-
-        # build headers
-        headers = {**api_cfg.default_headers.copy(), **request_cfg.headers.copy()}
-        new_headers = self._fill_in_user_parameters(headers, user_params)
-
-        # build additional parameters
-        query_parameters = {**api_cfg.default_query_parameters.copy(), **request_cfg.query_parameters.copy()}
-        query_parameters = self._fill_in_user_parameters(query_parameters, user_params)
-        ssl_verify = api_cfg.ssl_verification
-        timeout = api_cfg.timeout
-        # additional_params = self._build_request_parameters(additional_params_cfg)
-        request_parameters = {'params': query_parameters,
-                              'headers': new_headers,
-                              'verify': ssl_verify,
-                              'timeout': timeout}
-
-        endpoint_path = request_cfg.endpoint_path
-
-        # use client to send requests / perform actions
-        response = self._client.send_request(method=request_cfg.method, endpoint_path=endpoint_path,
-                                             **request_parameters)
-
-        return self._parse_data(response.json(), self._configuration.data_path)
+        pass
 
     def init_component(self):
@@ -111,7 +89,8 @@ def init_component(self):
     def _fill_in_user_parameters(self, conf_objects: dict, user_param: dict):
         """
         This method replaces user parameter references via attr + parses functions inside user parameters,
-        evaluates them and fills in the resulting values
+        evaluates them and fills in the resulting values
+
         Args:
            conf_objects: Configuration that contains the references via {"attr": "key"} to user parameters or
                          function definitions
@@ -139,6 +118,57 @@ def _fill_in_user_parameters(self, conf_objects: dict, user_param: dict):
                                 'are not present in "user_parameters" field.'.format(non_matched))
         return new_steps
 
+    def _fill_placeholders(self, placeholders, path):
+        """
+        Fill placeholders in the path
+        Args:
+            placeholders: placeholders - names dict to fill
+            path: path with placeholders
+        """
+
+        result_path = path
+        for placeholder in placeholders[0].values():
+            result_path = result_path.replace(f"{{{placeholder.get('placeholder')}}}", str(placeholder.get('value')))
+        return result_path
+
+    def _process_nested_job(self, parent_result, config, parent_results_list, client,
+                            method, **request_parameters) -> list:
+        """
+        Process nested job
+        Args:
+            parent_result: result of the parent job
+            config: configuration of the nested job
+            parent_results_list: list of parent results
+            client: http client
+            method: method to use
+            request_parameters: request parameters
+        """
+
+        for row in parent_result:
+
+            parent_results_ext = parent_results_list + [row]
+
+            placeholders = PlaceholdersUtils.get_params_for_child_jobs(config.get('placeholders', {}),
+                                                                       parent_results_ext, self._parent_params)
+
+            self._parent_params = placeholders[0]
+            row_path = self._fill_placeholders(placeholders, config['endpoint'])
+            response = client.send_request(method=method, endpoint_path=row_path, **request_parameters)
+            child_response = self._parse_data(response.json(), DataPath(config.get('dataType'),
+                                                                        config.get('dataField', '.')))
+            children = config.get('children', [])
+            results = []
+            if children and children[0]:
+                nested_data = self._process_nested_job(child_response, children[0], parent_results_ext,
+                                                       client, method, **request_parameters)
+                results.append(nested_data)
+            else:
+                self._final_results.append(child_response)
+                self._final_response = response
+
+        return results
+
     def _perform_custom_function(self, key: str, function_cfg: dict, user_params: dict):
         """
         Perform custom function recursively (may be nested)
@@ -175,14 +205,27 @@ def _parse_data(self, data, path) -> list:
         Returns:
 
         """
-        keys = path.path.split(path.delimiter)
-        value = data
-        try:
-            for key in keys:
-                value = value[key]
-        except KeyError:
-            return None
-        return value
+
+        if path.path is None:
+            result = data
+        else:
+            if path.delimiter in path.path:
+                keys = path.path.split(path.delimiter)  # Split the path only if delimiter is present
+            else:
+                keys = [path.path]
+
+            value = data
+            try:
+                for key in keys:
+                    value = value[key]
+                result = value
+            except KeyError:
+                result = None
+
+        if not isinstance(result, list):
+            result = [result] if result is not None else []
+
+        return result
 
     @sync_action('load_from_curl')
     def load_from_curl(self) -> dict:
@@ -256,6 +299,54 @@ def infer_mapping(self) -> dict:
         mapping = infer_mapping(data)
         return mapping
 
+    @sync_action('test_request')
+    def test_request(self):
+
+        self.init_component()
+        self._client.login()
+
+        api_cfg = self._configuration.api
+        request_cfg = self._configuration.request_parameters
+        # fix KBC bug
+        user_params = self._configuration.user_parameters
+        # evaluate user_params inside the user params itself
+        user_params = self._fill_in_user_parameters(user_params, user_params)
+
+        # build headers
+        headers = {**api_cfg.default_headers.copy(), **request_cfg.headers.copy()}
+        new_headers = self._fill_in_user_parameters(headers, user_params)
+
+        # build additional parameters
+        query_parameters = {**api_cfg.default_query_parameters.copy(), **request_cfg.query_parameters.copy()}
+        query_parameters = self._fill_in_user_parameters(query_parameters, user_params)
+        ssl_verify = api_cfg.ssl_verification
+        timeout = api_cfg.timeout
+        # additional_params = self._build_request_parameters(additional_params_cfg)
+        request_parameters = {'params': query_parameters,
+                              'headers': new_headers,
+                              'verify': ssl_verify,
+                              'timeout': timeout}
+
+        endpoint_path = request_cfg.endpoint_path
+
+        # use client to send requests / perform actions
+        self._final_response = self._client.send_request(method=request_cfg.method, endpoint_path=endpoint_path,
+                                                          **request_parameters)
+
+        headers = dict(self._final_response.headers)
+
+        result = self._parse_data(self._final_response.json(), self._configuration.data_path)
+
+        if request_cfg.nested_job:
+            parent_results = []
+            self._process_nested_job(result, request_cfg.nested_job[0], parent_results, self._client,
+                                     request_cfg.method, **request_parameters)
+        else:
+            self._final_results = result
+
+        return [self._final_response.status_code, self._final_response.json(),
+                self._final_results, self.log.getvalue()]
+
 
 """
         Main entrypoint
diff --git a/python-sync-actions/src/configuration.py b/python-sync-actions/src/configuration.py
index da3de22..f562017 100644
--- a/python-sync-actions/src/configuration.py
+++ b/python-sync-actions/src/configuration.py
@@ -76,6 +76,7 @@ class ApiRequest(ConfigurationBase):
     headers: dict = field(default_factory=dict)
     query_parameters: dict = field(default_factory=dict)
     continue_on_failure: bool = False
+    nested_job: dict = field(default_factory=dict)
 
 
 @dataclass
@@ -196,7 +197,7 @@ def build_api_request(configuration: dict) -> Tuple[ApiRequest, RequestContent,
     if '_' in job_path:
         parent, child = job_path.split('_')
     else:
-        parent, child = job_path, None  # noqa
+        parent, child = job_path, None  # noqa: F841
 
     jobs_section: list[dict] = configuration.get('config', {}).get('jobs')
     if not jobs_section:
@@ -205,6 +206,8 @@ def build_api_request(configuration: dict) -> Tuple[ApiRequest, RequestContent,
     # TODO: support recursive child-job config. E.g. have chained/list of ApiRequests objects instead of just one
     endpoint_config = jobs_section[int(parent)]
 
+    nested_job = endpoint_config.get('children')
+
     method = endpoint_config.get('method', 'GET')
 
     match method:
@@ -233,7 +236,10 @@
     return (ApiRequest(method=method, endpoint_path=endpoint_path,
                        headers=endpoint_config.get('headers', {}),
-                       query_parameters=endpoint_config.get('params', {})), request_content,
+                       query_parameters=endpoint_config.get('params', {}),
+                       nested_job=nested_job
+                       ),
+            request_content,
             DataPath(path=path, delimiter=delimiter))
 
@@ -266,7 +272,7 @@ def convert(cls, config_parameters: dict) -> Authentication | None:
 
     @classmethod
     def _convert_basic(cls, config_parameters: dict) -> Authentication:
-        username = config_parameters.get('config').get('#username')
+        username = config_parameters.get('config').get('username')
         password = config_parameters.get('config').get('#password')
         if not username or not password:
             raise ValueError('Username or password not found in the BasicAuth configuration')
diff --git a/python-sync-actions/src/placeholders_utils.py b/python-sync-actions/src/placeholders_utils.py
new file mode 100644
index 0000000..56a8627
--- /dev/null
+++ b/python-sync-actions/src/placeholders_utils.py
@@ -0,0 +1,114 @@
+from collections import namedtuple
+from itertools import product
+from typing import List, Dict, Union, Any
+
+
+class UserException(Exception):
+    pass
+
+
+class NoDataFoundException(Exception):
+    pass
+
+
+Placeholder = namedtuple('Placeholder', ['placeholder', 'field', 'value'])
+
+
+class PlaceholdersUtils:
+
+    @staticmethod
+    def get_params_for_child_jobs(placeholders: Dict[str, Any], parent_results: List[Dict[str, Any]],
+                                  parent_params: Dict[str, Any]) -> List[Dict[str, Any]]:
+        params = {}
+        for placeholder, field in placeholders.items():
+            params[placeholder] = PlaceholdersUtils.get_placeholder(placeholder, field, parent_results)
+
+        # Add parent params as well (for 'tagging' child-parent data)
+        # Same placeholder in deeper nesting replaces parent value
+        params = {**parent_params, **params}
+
+        # Create all combinations if there are some parameter values as array.
+        # Each combination will be one child job.
+        return PlaceholdersUtils.get_params_per_child_job(params)
+
+    @staticmethod
+    def get_placeholder(placeholder: str, field: Union[str, Dict[str, Any]],
+                        parent_results: List[Dict[str, Any]]) -> Dict[str, Any]:
+        # Determine the level based on the presence of ':' in the placeholder name
+        level = 0 if ':' not in placeholder else int(placeholder.split(':')[0]) - 1
+
+        # Check function (defined as dict)
+        if not isinstance(field, str):
+            if 'path' not in field:
+                raise UserException(f"The path for placeholder '{placeholder}' must be a string value "
+                                    f"or an object containing 'path' and 'function'.")
+            fn = field.copy()
+            field = fn.pop('path')
+
+        # Get value
+        value = PlaceholdersUtils.get_placeholder_value(str(field), parent_results, level, placeholder)
+
+        # Run function if provided
+        if 'fn' in locals():
+            # Example function to be replaced by actual implementation
+            value = value
+
+        return {
+            'placeholder': placeholder,
+            'field': field,
+            'value': value
+        }
+
+    @staticmethod
+    def get_placeholder_value(field: str, parent_results: List[Dict[str, Any]], level: int, placeholder: str) -> Any:
+        try:
+            if level >= len(parent_results):
+                max_level = 0 if not parent_results else len(parent_results)
+                raise UserException(f'Level {level + 1} not found in parent results! Maximum level: {max_level}')
+
+            # Fetch the value from the parent result using dot-notation path lookup
+            data = get_data_from_path(field, parent_results[level])
+
+            return data
+
+        except NoDataFoundException:
+            raise UserException(
+                f"No value found for {placeholder} in parent result. (level: {level + 1})",
+                None, None, {'parents': parent_results}
+            )
+
+    @staticmethod
+    def get_params_per_child_job(params: Dict[str, Placeholder]) -> List[Dict[str, Any]]:
+        # Flatten parameters to a list of lists
+        flattened = {}
+        for placeholder_name, placeholder in params.items():
+            if isinstance(placeholder['value'], list):
+                flattened[placeholder_name] = [
+                    {'placeholder': placeholder_name, 'field': placeholder['field'], 'value': value}
+                    for value in placeholder['value']
+                ]
+            else:
+                flattened[placeholder_name] = [placeholder]
+
+        # Cartesian product to get all combinations
+        return PlaceholdersUtils.cartesian(flattened)
+
+    @staticmethod
+    def cartesian(input: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
+        # Generate the Cartesian product of input lists
+        keys, values = zip(*input.items())
+        product_list = [dict(zip(keys, combination)) for combination in product(*values)]
+
+        return product_list
+
+
+def get_data_from_path(field: str, data: Dict[str, Any], separator: str = '.', strict: bool = True) -> Any:
+    """Fetch a value from a nested dictionary using a dot-separated path notation."""
+    keys = field.split(separator)
+    for key in keys:
+        if key not in data:
+            if strict:
+                raise NoDataFoundException(f"Key '{key}' not found in data.")
+            return None
+        data = data[key]
+    return data
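
For orientation, a minimal sketch of how the new placeholder expansion is meant to fit together: `PlaceholdersUtils.get_params_for_child_jobs` resolves each placeholder against a parent row, and `Component._fill_placeholders` then substitutes the resolved values into the child endpoint. The endpoint, placeholder names and parent payload below are made up for illustration, and the snippet assumes `python-sync-actions/src` is on `sys.path`; it is not part of the change itself.

```python
# Illustrative only: sample data and endpoint are invented.
from placeholders_utils import PlaceholdersUtils

# One row returned by the parent job (nesting level 1).
parent_results = [{"id": 123, "region": {"code": "eu"}}]

# Placeholder name -> dot-notation path into the parent result,
# the same shape _process_nested_job reads from config['placeholders'].
placeholders = {"user_id": "id", "region_code": "region.code"}

params = PlaceholdersUtils.get_params_for_child_jobs(placeholders, parent_results, {})
print(params[0]["user_id"]["value"])      # 123
print(params[0]["region_code"]["value"])  # eu

# Mirror of Component._fill_placeholders: substitute {placeholder} tokens
# in the child endpoint with the resolved values.
endpoint = "users/{user_id}/regions/{region_code}"
for item in params[0].values():
    endpoint = endpoint.replace(f"{{{item['placeholder']}}}", str(item["value"]))
print(endpoint)  # users/123/regions/eu
```

When a resolved placeholder value is a list, `get_params_per_child_job` expands it via a Cartesian product, so each combination becomes one child request.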
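The dot-path lookup behind the placeholder resolution is `get_data_from_path`: a strict lookup raises `NoDataFoundException` (which `get_placeholder_value` converts to a `UserException`), while `strict=False` returns `None`. A small sketch with an invented payload, again assuming `python-sync-actions/src` is importable:

```python
# Illustrative only: the payload is invented.
from placeholders_utils import NoDataFoundException, get_data_from_path

payload = {"user": {"address": {"city": "Prague"}}}

print(get_data_from_path("user.address.city", payload))         # Prague
print(get_data_from_path("user.phone", payload, strict=False))  # None

try:
    get_data_from_path("user.phone", payload)  # strict by default
except NoDataFoundException as exc:
    print(exc)  # Key 'phone' not found in data.
```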