Skip to content

Commit

Permalink
Merge pull request #81 from keboola/triggers
Browse files Browse the repository at this point in the history
Add Triggers endpoint
  • Loading branch information
tomasfejfar authored May 9, 2024
2 parents 40001aa + 2b04f70 commit e30f83a
Show file tree
Hide file tree
Showing 6 changed files with 427 additions and 3 deletions.
26 changes: 26 additions & 0 deletions kbcstorage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,32 @@ def _post(self, *args, **kwargs):
else:
return r.json()

def _put(self, *args, **kwargs):
"""
Construct a requests PUT call with args and kwargs and process the
results.
Args:
*args: Positional arguments to pass to the post request.
**kwargs: Key word arguments to pass to the post request.
Returns:
body: Response body parsed from json.
Raises:
requests.HTTPError: If the API request fails.
"""
headers = kwargs.pop('headers', {})
headers.update(self._auth_header)
r = requests.put(headers=headers, *args, **kwargs)
try:
r.raise_for_status()
except requests.HTTPError:
# Handle different error codes
raise
else:
return r.json()

def _delete(self, *args, **kwargs):
"""
Construct a requests DELETE call with args and kwargs and process the
Expand Down
8 changes: 5 additions & 3 deletions kbcstorage/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
from kbcstorage.buckets import Buckets
from kbcstorage.components import Components
from kbcstorage.configurations import Configurations
from kbcstorage.tokens import Tokens
from kbcstorage.workspaces import Workspaces
from kbcstorage.files import Files
from kbcstorage.jobs import Jobs
from kbcstorage.tables import Tables
from kbcstorage.files import Files
from kbcstorage.tokens import Tokens
from kbcstorage.triggers import Triggers
from kbcstorage.workspaces import Workspaces


class Client:
Expand Down Expand Up @@ -40,6 +41,7 @@ def __init__(self, api_domain, token, branch_id='default'):
self.configurations = Configurations(self.root_url, self.token, self.branch_id)
self.tokens = Tokens(self.root_url, self.token)
self.branches = Branches(self.root_url, self.token)
self.triggers = Triggers(self.root_url, self.token)

@property
def token(self):
Expand Down
123 changes: 123 additions & 0 deletions kbcstorage/triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Manages calls to the Storage API relating to triggers
Full documentation `here`.
.. _here:
http://docs.keboola.apiary.io/#reference/triggers/
"""
from kbcstorage.base import Endpoint


class Triggers(Endpoint):
"""
Triggers Endpoint
"""

def __init__(self, root_url, token):
"""
Create a Triggers endpoint.
Args:
root_url (:obj:`str`): The base url for the API.
token (:obj:`str`): A storage API key.
"""
super().__init__(root_url, 'triggers', token)

def list(self):
"""
List all triggers in project.
Returns:
response_body: The parsed json from the HTTP response.
Raises:
requests.HTTPError: If the API request fails.
"""

return self._get(self.base_url)

def detail(self, trigger_id):
"""
Retrieves information about a given trigger.
Args:
trigger_id (str): The id of the trigger.
Raises:
requests.HTTPError: If the API request fails.
"""
url = '{}/{}'.format(self.base_url, trigger_id)

return self._get(url)

def create(self, runWithTokenId, component, configurationId, coolDownPeriodMinutes, tableIds):
"""
Create a new trigger.
Args:
runWithTokenId (int): ID of token used for running configured component.
component (str): For now we support only 'orchestration'.
configurationId (int): Id of component configuration.
coolDownPeriodMinutes (int): Minimal cool down period before
firing action again in minutes (min is 1 minute).
tableIds (list[str]) IDs of tables.
Returns:
response_body: The parsed json from the HTTP response.
Raises:
requests.HTTPError: If the API request fails.
"""
# Separating create and link into two distinct functions...
# Need to check args...
body = {
"runWithTokenId": runWithTokenId,
"component": component,
"configurationId": configurationId,
"coolDownPeriodMinutes": coolDownPeriodMinutes,
"tableIds": tableIds
}

return self._post(self.base_url, json=body)

def delete(self, trigger_id):
"""
Delete a trigger referenced by ``trigger_id``.
Args:
trigger_id (int): The id of the trigger to be deleted.
"""
url = '{}/{}'.format(self.base_url, trigger_id)
self._delete(url)

def update(self, trigger_id, runWithTokenId=None, component=None, configurationId=None,
coolDownPeriodMinutes=None, tableIds=None):
"""
Update a trigger referenced by ``trigger_id``.
Args:
runWithTokenId (int): ID of token used for running configured component.
component (str): For now we support only 'orchestration'.
configurationId (int): Id of component configuration.
coolDownPeriodMinutes (int): Minimal cool down period before
firing action again in minutes (min is 1 minute).
tableIds (list[str]) IDs of tables.
Returns:
response_body: The parsed json from the HTTP response.
Raises:
requests.HTTPError: If the API request fails.
"""
url = '{}/{}'.format(self.base_url, trigger_id)
body = {
k: v for k, v in {
"runWithTokenId": runWithTokenId,
"component": component,
"configurationId": configurationId,
"coolDownPeriodMinutes": coolDownPeriodMinutes,
"tableIds": tableIds
}.items()
if v is not None
}
return self._put(url, data=body)
100 changes: 100 additions & 0 deletions tests/functional/test_triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import os
import tempfile
import warnings

from requests import exceptions

from kbcstorage.buckets import Buckets
from kbcstorage.configurations import Configurations
from kbcstorage.jobs import Jobs
from kbcstorage.tables import Tables
from kbcstorage.tokens import Tokens
from kbcstorage.triggers import Triggers
from tests.base_test_case import BaseTestCase


class TestEndpoint(BaseTestCase):
TEST_BUCKET_NAME = "trigger_test_bucket"
TEST_TABLE_NAME = "trigger_test_table"

def setUp(self):
self.root_url = os.getenv('KBC_TEST_API_URL')
self.token = os.getenv('KBC_TEST_TOKEN')

self.triggers = Triggers(self.root_url, self.token)
self.tables = Tables(self.root_url, self.token)
self.buckets = Buckets(self.root_url, self.token)
self.jobs = Jobs(self.root_url, self.token)
self.configurations = Configurations(self.root_url, self.token, 'default')
self.tokens = Tokens(self.root_url, self.token)

self.created_trigger_ids = []
# https://github.com/boto/boto3/issues/454
warnings.simplefilter("ignore", ResourceWarning)

self.clean()
self.token_id = self.tokens.verify()["id"]
self.test_bucket_id = self.buckets.create(self.TEST_BUCKET_NAME)['id']
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_file.write(b"a,b,c\n1,2,3\n")
self.table_id = self.tables.create(self.test_bucket_id, self.TEST_TABLE_NAME, tmp_file.name)
self.component = self.TEST_COMPONENT_NAME
self.configuration_id = self.configurations.create(self.TEST_COMPONENT_NAME, 'trigger_test_config')["id"]

def clean(self):
try:
if hasattr(self, "test_bucket_id"):
self.buckets.delete(self.test_bucket_id, True)
except exceptions.HTTPError as e:
if e.response.status_code != 404:
raise

def tearDown(self):
self.clean()

def test_steps(self):
self.create_trigger()
self.list_triggers()
self.trigger_detail()
self.update_trigger()
self.delete_triggers()

def create_trigger(self):
trigger_id = self.triggers.create(
runWithTokenId=self.token_id,
component=self.component,
configurationId=self.configuration_id,
coolDownPeriodMinutes=10,
tableIds=[self.table_id]
)['id']
self.created_trigger_ids.append(trigger_id)
self.assertEqual(trigger_id, self.triggers.detail(trigger_id)['id'])

def trigger_detail(self):
self.assertGreater(len(self.created_trigger_ids), 0)
first_id = self.created_trigger_ids[0]
detail = self.triggers.detail(first_id)
self.assertEqual(detail["runWithTokenId"], int(self.token_id))
self.assertEqual(detail["component"], self.component)
self.assertEqual(detail["configurationId"], self.configuration_id)
self.assertEqual(detail["coolDownPeriodMinutes"], 10)
self.assertEqual([t['tableId'] for t in detail["tables"]], [self.table_id])
self.assertEqual(detail["id"], first_id)

def list_triggers(self):
self.assertGreater(len(self.created_trigger_ids), 0)
all_triggers = self.triggers.list()
api_trigger_ids = {x["id"] for x in all_triggers}
created_trigger_ids = {x for x in self.created_trigger_ids}
self.assertTrue(created_trigger_ids.issubset(api_trigger_ids))

def update_trigger(self):
self.assertGreater(len(self.created_trigger_ids), 0)
first_id = self.created_trigger_ids[0]
self.triggers.update(first_id, coolDownPeriodMinutes=100)
detail = self.triggers.detail(first_id)
self.assertEqual(detail["coolDownPeriodMinutes"], 100)

def delete_triggers(self):
for t_id in self.created_trigger_ids:
self.triggers.delete(t_id)
98 changes: 98 additions & 0 deletions tests/mocks/test_triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import unittest
import responses
from kbcstorage.triggers import Triggers
from .triggers_responses import list_response, detail_response, create_response, update_response


class TestTriggersWithMocks(unittest.TestCase):
def setUp(self):
token = 'dummy_token'
base_url = 'https://connection.keboola.com/'
self.triggers = Triggers(base_url, token)

@responses.activate
def test_list(self):
"""
triggers mocks list correctly.
"""
responses.add(
responses.Response(
method='GET',
url='https://connection.keboola.com/v2/storage/triggers',
json=list_response
)
)
triggers_list = self.triggers.list()
assert isinstance(triggers_list, list)

@responses.activate
def test_detail_by_id(self):
"""
triggers mocks detail by integer id correctly.
"""
responses.add(
responses.Response(
method='GET',
url='https://connection.keboola.com/v2/storage/triggers/3',
json=detail_response
)
)
trigger_id = '3'
trigger_detail = self.triggers.detail(trigger_id)
assert trigger_detail['id'] == '3'

@responses.activate
def test_delete(self):
"""
Triggers mock deletes trigger by id.
"""
responses.add(
responses.Response(
method='DELETE',
url='https://connection.keboola.com/v2/storage/triggers/1',
json={}
)
)
trigger_id = 1
deleted_detail = self.triggers.delete(trigger_id)
assert deleted_detail is None

@responses.activate
def test_update(self):
"""
Triggers mock update trigger by id.
"""
responses.add(
responses.Response(
method='PUT',
url='https://connection.keboola.com/v2/storage/triggers/1',
json=update_response
)
)
trigger_id = 1
updated_detail = self.triggers.update(trigger_id, runWithTokenId=100)
assert updated_detail['id'] == '3'

@responses.activate
def test_create(self):
"""
Triggers mock creates new trigger.
"""
responses.add(
responses.Response(
method='POST',
url='https://connection.keboola.com/v2/storage/triggers',
json=create_response
)
)
created_detail = self.triggers.create(
runWithTokenId=123,
component="orchestration",
configurationId=123,
coolDownPeriodMinutes=20,
tableIds=[
"in.c-test.watched-1",
"in.c-prod.watched-5"
]
)
assert created_detail['runWithTokenId'] == 123
Loading

0 comments on commit e30f83a

Please sign in to comment.