Skip to content

Commit

Permalink
Merge pull request #59 from keboola/PS-1858-workspace
Browse files Browse the repository at this point in the history
PS-1858 support loading files and tables to file workspaces
  • Loading branch information
pivnicek authored Mar 17, 2021
2 parents f994e2b + d5529f3 commit d314922
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ before_script:
- docker-compose build sapi-python-client
script:
- docker-compose run --rm --entrypoint=flake8 sapi-python-client
- docker-compose run --rm -e KBC_TEST_TOKEN -e KBC_TEST_API_URL sapi-python-client -m unittest discover
- docker-compose run --rm -e KBC_TEST_TOKEN -e KBC_TEST_API_URL -e SKIP_ABS_TEST=1 sapi-python-client -m unittest discover
- docker-compose run --rm -e KBC_TEST_TOKEN=$KBC_AZ_TEST_TOKEN -e KBC_TEST_API_URL=$KBC_AZ_TEST_API_URL sapi-python-client -m unittest discover
after_success:
- docker images
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.6
FROM python:3.8

WORKDIR /code
COPY . /code/
Expand Down
34 changes: 31 additions & 3 deletions kbcstorage/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
http://docs.keboola.apiary.io/#reference/workspaces/
"""
from kbcstorage.base import Endpoint
from kbcstorage.files import Files


def _make_body(mapping):
def _make_body(mapping, source_key='source'):
"""
Given a dict mapping Keboola tables to aliases, construct the body of
the HTTP request to load said tables.
Expand All @@ -22,7 +23,7 @@ def _make_body(mapping):
body = {}
template = 'input[{0}][{1}]'
for i, (k, v) in enumerate(mapping.items()):
body[template.format(i, 'source')] = k
body[template.format(i, source_key)] = k
body[template.format(i, 'destination')] = v

return body
Expand Down Expand Up @@ -76,7 +77,7 @@ def create(self, backend=None, timeout=None):
Args:
backend (:obj:`str`): The type of engine for the workspace.
'redshift' or 'snowflake'. Default redshift.
'redshift', 'snowflake' or 'synapse'. Defaults to the project's default backend.
timeout (int): The timeout, in seconds, for SQL statements.
Only supported by snowflake backends.
Expand Down Expand Up @@ -143,3 +144,30 @@ def load_tables(self, workspace_id, table_mapping, preserve=None):
url = '{}/{}/load'.format(self.base_url, workspace_id)

return self._post(url, data=body)

def load_files(self, workspace_id, file_mapping, preserve=None):
"""
Load tabes from storage into a workspace.
* only supports abs workspace
Args:
workspace_id (int or str): The id of the workspace to which to load
the tables.
file_mapping (:obj:`dict`): contains tags: [], destination: string
preserve (bool): If False, drop files, else keep files in workspace.
Raises:
requests.HTTPError: If the API request fails.
"""
workspace = self.detail(workspace_id)
if (workspace['type'] != 'file' and workspace['connection']['backend'] != 'abs'):
raise Exception('Loading files to workspace is only available for ABS workspaces')
files = Files(self.root_url, self.token)
file_list = files.list(tags=file_mapping['tags'])
inputs = {}
for file in file_list:
inputs[file['id']] = file_mapping['destination']
body = _make_body(inputs, source_key='dataFileId')
body['preserve'] = preserve
url = '{}/{}/load'.format(self.base_url, workspace['id'])
return self._post(url, data=body)
131 changes: 131 additions & 0 deletions tests/functional/test_workspaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import csv
import os
import tempfile
import unittest
import warnings

from azure.storage.blob import BlobServiceClient
from requests import exceptions
from kbcstorage.buckets import Buckets
from kbcstorage.jobs import Jobs
from kbcstorage.files import Files
from kbcstorage.tables import Tables
from kbcstorage.workspaces import Workspaces


class TestWorkspaces(unittest.TestCase):
def setUp(self):
self.workspaces = Workspaces(os.getenv('KBC_TEST_API_URL'), os.getenv('KBC_TEST_TOKEN'))
self.buckets = Buckets(os.getenv('KBC_TEST_API_URL'), os.getenv('KBC_TEST_TOKEN'))
self.jobs = Jobs(os.getenv('KBC_TEST_API_URL'), os.getenv('KBC_TEST_TOKEN'))
self.tables = Tables(os.getenv('KBC_TEST_API_URL'), os.getenv('KBC_TEST_TOKEN'))
self.files = Files(os.getenv('KBC_TEST_API_URL'), os.getenv('KBC_TEST_TOKEN'))
try:
file_list = self.files.list(tags=['sapi-client-python-tests'])
for file in file_list:
self.files.delete(file['id'])
except exceptions.HTTPError as e:
if e.response.status_code != 404:
raise
try:
self.buckets.delete('in.c-py-test-buckets', force=True)
except exceptions.HTTPError as e:
if e.response.status_code != 404:
raise
# https://github.com/boto/boto3/issues/454
warnings.simplefilter("ignore", ResourceWarning)

def tearDown(self):
try:
if hasattr(self, 'workspace_id'):
self.workspaces.delete(self.workspace_id)
except exceptions.HTTPError as e:
if e.response.status_code != 404:
raise
try:
self.buckets.delete('in.c-py-test-tables', force=True)
except exceptions.HTTPError as e:
if e.response.status_code != 404:
raise

def test_create_workspace(self):
workspace = self.workspaces.create()
self.workspace_id = workspace['id']
with self.subTest():
self.assertTrue('id' in workspace)
with self.subTest():
self.assertTrue('type' in workspace)
self.assertTrue(workspace['type'] in ['table', 'file'])
with self.subTest():
self.assertTrue('name' in workspace)
with self.subTest():
self.assertTrue('component' in workspace)
with self.subTest():
self.assertTrue('configurationId' in workspace)
with self.subTest():
self.assertTrue('created' in workspace)
with self.subTest():
self.assertTrue('connection' in workspace)
with self.subTest():
self.assertTrue('backend' in workspace['connection'])
with self.subTest():
self.assertTrue('creatorToken' in workspace)

def test_load_tables_to_workspace(self):
bucket_id = self.buckets.create('py-test-tables')['id']
table1_id = self.__create_table(bucket_id, 'test-table-1', {'col1': 'ping', 'col2': 'pong'})
table2_id = self.__create_table(bucket_id, 'test-table-2', {'col1': 'king', 'col2': 'kong'})
workspace = self.workspaces.create()
self.workspace_id = workspace['id']
job = self.workspaces.load_tables(
workspace['id'],
{table1_id: 'destination_1', table2_id: 'destination_2'}
)
self.jobs.block_until_completed(job['id'])

job = self.tables.create_raw(
bucket_id,
'back-and-forth-table',
data_workspace_id=workspace['id'],
data_table_name='destination_1'
)
self.jobs.block_until_completed(job['id'])

new_table = self.tables.detail(bucket_id + '.back-and-forth-table')
self.assertEqual('back-and-forth-table', new_table['name'])

# test load files into an abs workspace
def test_load_files_to_workspace(self):
if (os.getenv('SKIP_ABS_TEST')):
self.skipTest('Skipping ABS test because env var SKIP_ABS_TESTS was set')
# put a test file to storage
file, path = tempfile.mkstemp(prefix='sapi-test')
os.write(file, bytes('fooBar', 'utf-8'))
os.close(file)
file_id = self.files.upload_file(path, tags=['sapi-client-python-tests', 'file1'])

# create a workspace and load the file to it
workspace = self.workspaces.create('abs')
self.workspace_id = workspace['id']
job = self.workspaces.load_files(
workspace['id'],
{'tags': ['sapi-client-python-tests'], 'destination': 'data/in/files'}
)
self.jobs.block_until_completed(job['id'])

# assert that the file was loaded to the workspace
blob_service_client = BlobServiceClient.from_connection_string(workspace['connection']['connectionString'])
blob_client = blob_service_client.get_blob_client(
container=workspace['connection']['container'],
blob='data/in/files/%s' % str(file_id)
)
self.assertEqual('fooBar', blob_client.download_blob().readall().decode('utf-8'))

def __create_table(self, bucket_id, table_name, row):
file, path = tempfile.mkstemp(prefix='sapi-test')
with open(path, 'w') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=['col1', 'col2'],
lineterminator='\n', delimiter=',', quotechar='"')
writer.writeheader()
writer.writerow(row)
return self.tables.create(name=table_name, file_path=path, bucket_id=bucket_id)
19 changes: 19 additions & 0 deletions tests/mocks/test_workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,22 @@ def test_reset_password_for_inexistent_workspace(self):
with self.assertRaises(HTTPError) as error_context:
self.ws.reset_password(workspace_id)
assert error_context.exception.args[0] == msg

@responses.activate
def test_load_files_to_invalid_workspace(self):
"""
Raises exception when mock loading_files to invalid workspace
"""
msg = ('Loading files to workspace is only available for ABS workspaces')
responses.add(
responses.Response(
method='GET',
url='https://connection.keboola.com/v2/storage/workspaces/1',
json=detail_response
)
)
workspace_id = '1'
try:
self.ws.load_files(workspace_id, {'tags': ['sapi-client-python-tests'], 'destination': 'data/in/files'})
except Exception as ex:
assert str(ex) == msg
2 changes: 2 additions & 0 deletions tests/mocks/workspace_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
detail_response = {
"id": 1,
"name": "boring_wozniak",
"type": "table",
"component": "wr-db",
"configurationId": "aws-1",
"created": "2016-05-17T11:11:20+0200",
Expand All @@ -51,6 +52,7 @@
create_response = {
"id": 234,
"name": "boring_wozniak",
"type": "table",
"component": "wr-db",
"configurationId": "aws-1",
"created": "2016-05-17T11:11:20+0200",
Expand Down

0 comments on commit d314922

Please sign in to comment.