Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Triggers endpoint #81

Merged
merged 10 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions kbcstorage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,32 @@ def _post(self, *args, **kwargs):
else:
return r.json()

def _put(self, *args, **kwargs):
"""
Construct a requests PUT call with args and kwargs and process the
results.

Args:
*args: Positional arguments to pass to the post request.
**kwargs: Key word arguments to pass to the post request.

Returns:
body: Response body parsed from json.

Raises:
requests.HTTPError: If the API request fails.
"""
headers = kwargs.pop('headers', {})
headers.update(self._auth_header)
r = requests.put(headers=headers, *args, **kwargs)
try:
r.raise_for_status()
except requests.HTTPError:
# Handle different error codes
raise
else:
return r.json()

def _delete(self, *args, **kwargs):
"""
Construct a requests DELETE call with args and kwargs and process the
Expand Down
8 changes: 5 additions & 3 deletions kbcstorage/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
from kbcstorage.buckets import Buckets
from kbcstorage.components import Components
from kbcstorage.configurations import Configurations
from kbcstorage.tokens import Tokens
from kbcstorage.workspaces import Workspaces
from kbcstorage.files import Files
from kbcstorage.jobs import Jobs
from kbcstorage.tables import Tables
from kbcstorage.files import Files
from kbcstorage.tokens import Tokens
from kbcstorage.triggers import Triggers
from kbcstorage.workspaces import Workspaces


class Client:
Expand Down Expand Up @@ -40,6 +41,7 @@ def __init__(self, api_domain, token, branch_id='default'):
self.configurations = Configurations(self.root_url, self.token, self.branch_id)
self.tokens = Tokens(self.root_url, self.token)
self.branches = Branches(self.root_url, self.token)
self.triggers = Triggers(self.root_url, self.token)

@property
def token(self):
Expand Down
123 changes: 123 additions & 0 deletions kbcstorage/triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Manages calls to the Storage API relating to triggers

Full documentation `here`.

.. _here:
http://docs.keboola.apiary.io/#reference/triggers/
"""
from kbcstorage.base import Endpoint


class Triggers(Endpoint):
"""
Triggers Endpoint
"""

def __init__(self, root_url, token):
"""
Create a Triggers endpoint.

Args:
root_url (:obj:`str`): The base url for the API.
token (:obj:`str`): A storage API key.
"""
super().__init__(root_url, 'triggers', token)

def list(self):
"""
List all triggers in project.

Returns:
response_body: The parsed json from the HTTP response.

Raises:
requests.HTTPError: If the API request fails.
"""

return self._get(self.base_url)

def detail(self, trigger_id):
"""
Retrieves information about a given trigger.

Args:
trigger_id (str): The id of the trigger.

Raises:
requests.HTTPError: If the API request fails.
"""
url = '{}/{}'.format(self.base_url, trigger_id)

return self._get(url)

def create(self, runWithTokenId, component, configurationId, coolDownPeriodMinutes, tableIds):
"""
Create a new trigger.

Args:
runWithTokenId (int): ID of token used for running configured component.
component (str): For now we support only 'orchestration'.
configurationId (int): Id of component configuration.
coolDownPeriodMinutes (int): Minimal cool down period before
firing action again in minutes (min is 1 minute).
tableIds (list[str]) IDs of tables.
Returns:
response_body: The parsed json from the HTTP response.

Raises:
requests.HTTPError: If the API request fails.
"""
# Separating create and link into two distinct functions...
# Need to check args...
body = {
"runWithTokenId": runWithTokenId,
"component": component,
"configurationId": configurationId,
"coolDownPeriodMinutes": coolDownPeriodMinutes,
"tableIds": tableIds
}

return self._post(self.base_url, json=body)

def delete(self, trigger_id):
"""
Delete a trigger referenced by ``trigger_id``.

Args:
trigger_id (int): The id of the trigger to be deleted.

"""
url = '{}/{}'.format(self.base_url, trigger_id)
self._delete(url)

def update(self, trigger_id, runWithTokenId=None, component=None, configurationId=None,
coolDownPeriodMinutes=None, tableIds=None):
"""
Update a trigger referenced by ``trigger_id``.

Args:
runWithTokenId (int): ID of token used for running configured component.
component (str): For now we support only 'orchestration'.
configurationId (int): Id of component configuration.
coolDownPeriodMinutes (int): Minimal cool down period before
firing action again in minutes (min is 1 minute).
tableIds (list[str]) IDs of tables.
Returns:
response_body: The parsed json from the HTTP response.

Raises:
requests.HTTPError: If the API request fails.
"""
url = '{}/{}'.format(self.base_url, trigger_id)
body = {
k: v for k, v in {
"runWithTokenId": runWithTokenId,
"component": component,
"configurationId": configurationId,
"coolDownPeriodMinutes": coolDownPeriodMinutes,
"tableIds": tableIds
}.items()
if v is not None
}
return self._put(url, data=body)
113 changes: 113 additions & 0 deletions tests/functional/test_triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import os
import tempfile
import warnings

from requests import exceptions

from kbcstorage.base import Endpoint
from kbcstorage.configurations import Configurations
from kbcstorage.jobs import Jobs
from kbcstorage.triggers import Triggers
from kbcstorage.tables import Tables
from kbcstorage.buckets import Buckets
from tests.base_test_case import BaseTestCase


class Tokens(Endpoint):
"""
Testing class for obtaining token ID from token.
Kukant marked this conversation as resolved.
Show resolved Hide resolved
"""

def __init__(self, root_url, token):
super().__init__(root_url, 'tokens', token)

def verify(self):
url = '{}/verify'.format(self.base_url)
return self._get(url)


class TestEndpoint(BaseTestCase):
TEST_BUCKET_NAME = "kbc_trigger_test_bucket"
Kukant marked this conversation as resolved.
Show resolved Hide resolved
TEST_BUCKET_ID = f"in.c-{TEST_BUCKET_NAME}"
TEST_TABLE_NAME = "kbc_trigger_test_table"

def setUp(self):
self.root_url = os.getenv('KBC_TEST_API_URL')
self.token = os.getenv('KBC_TEST_TOKEN')

self.triggers = Triggers(self.root_url, self.token)
self.tables = Tables(self.root_url, self.token)
self.buckets = Buckets(self.root_url, self.token)
self.jobs = Jobs(self.root_url, self.token)
self.configurations = Configurations(self.root_url, self.token, 'default')
self.tokens = Tokens(self.root_url, self.token)

self.created_trigger_ids = []
# https://github.com/boto/boto3/issues/454
warnings.simplefilter("ignore", ResourceWarning)

self.clean()
self.token_id = self.tokens.verify()["id"]
self.buckets.create(self.TEST_BUCKET_NAME)
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_file.write(b"a,b,c\n1,2,3\n")
self.table_id = self.tables.create(self.TEST_BUCKET_ID, self.TEST_TABLE_NAME, tmp_file.name)
Kukant marked this conversation as resolved.
Show resolved Hide resolved
self.component = self.TEST_COMPONENT_NAME
self.configuration_id = self.configurations.create(self.TEST_COMPONENT_NAME, 'trigger_test_config')["id"]

def clean(self):
try:
self.buckets.delete(self.TEST_BUCKET_ID, True)
except exceptions.HTTPError as e:
if e.response.status_code != 404:
raise

def tearDown(self):
self.clean()

def test_steps(self):
self.create_trigger()
self.list_triggers()
self.trigger_detail()
self.update_trigger()
self.delete_triggers()

def create_trigger(self):
trigger_id = self.triggers.create(
runWithTokenId=self.token_id,
component=self.component,
configurationId=self.configuration_id,
coolDownPeriodMinutes=10,
tableIds=[self.table_id]
)['id']
self.created_trigger_ids.append(trigger_id)
self.assertEqual(trigger_id, self.triggers.detail(trigger_id)['id'])

def trigger_detail(self):
self.assertTrue(len(self.created_trigger_ids) > 0)
first_id = self.created_trigger_ids[0]
detail = self.triggers.detail(first_id)
self.assertTrue(detail["runWithTokenId"] == int(self.token_id))
self.assertTrue(detail["component"] == self.component)
self.assertTrue(detail["configurationId"] == self.configuration_id)
self.assertTrue(detail["coolDownPeriodMinutes"] == 10)
self.assertTrue([t['tableId'] for t in detail["tables"]] == [self.table_id])
self.assertTrue(detail["id"] == first_id)
Kukant marked this conversation as resolved.
Show resolved Hide resolved

def list_triggers(self):
self.assertTrue(len(self.created_trigger_ids) > 0)
all_triggers = self.triggers.list()
api_trigger_ids = {x["id"] for x in all_triggers}
created_trigger_ids = {x for x in self.created_trigger_ids}
self.assertTrue(created_trigger_ids.issubset(api_trigger_ids))

def update_trigger(self):
self.assertTrue(len(self.created_trigger_ids) > 0)
first_id = self.created_trigger_ids[0]
self.triggers.update(first_id, coolDownPeriodMinutes=100)
detail = self.triggers.detail(first_id)
self.assertTrue(detail["coolDownPeriodMinutes"] == 100)

def delete_triggers(self):
for t_id in self.created_trigger_ids:
self.triggers.delete(t_id)
98 changes: 98 additions & 0 deletions tests/mocks/test_triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import unittest
import responses
from kbcstorage.triggers import Triggers
from .triggers_responses import list_response, detail_response, create_response, update_response


class TestTriggersWithMocks(unittest.TestCase):
def setUp(self):
token = 'dummy_token'
base_url = 'https://connection.keboola.com/'
self.triggers = Triggers(base_url, token)

@responses.activate
def test_list(self):
"""
triggers mocks list correctly.
"""
responses.add(
responses.Response(
method='GET',
url='https://connection.keboola.com/v2/storage/triggers',
json=list_response
)
)
triggers_list = self.triggers.list()
assert isinstance(triggers_list, list)

@responses.activate
def test_detail_by_id(self):
"""
triggers mocks detail by integer id correctly.
"""
responses.add(
responses.Response(
method='GET',
url='https://connection.keboola.com/v2/storage/triggers/3',
json=detail_response
)
)
trigger_id = '3'
trigger_detail = self.triggers.detail(trigger_id)
assert trigger_detail['id'] == '3'

@responses.activate
def test_delete(self):
"""
Triggers mock deletes trigger by id.
"""
responses.add(
responses.Response(
method='DELETE',
url='https://connection.keboola.com/v2/storage/triggers/1',
json={}
)
)
trigger_id = 1
deleted_detail = self.triggers.delete(trigger_id)
assert deleted_detail is None

@responses.activate
def test_update(self):
"""
Triggers mock update trigger by id.
"""
responses.add(
responses.Response(
method='PUT',
url='https://connection.keboola.com/v2/storage/triggers/1',
json=update_response
)
)
trigger_id = 1
updated_detail = self.triggers.update(trigger_id, runWithTokenId=100)
assert updated_detail['id'] == '3'

@responses.activate
def test_create(self):
"""
Triggers mock creates new trigger.
"""
responses.add(
responses.Response(
method='POST',
url='https://connection.keboola.com/v2/storage/triggers',
json=create_response
)
)
created_detail = self.triggers.create(
runWithTokenId=123,
component="orchestration",
configurationId=123,
coolDownPeriodMinutes=20,
tableIds=[
"in.c-test.watched-1",
"in.c-prod.watched-5"
]
)
assert created_detail['runWithTokenId'] == 123
Loading
Loading