Skip to content

Commit

Permalink
Merge pull request #60 from keboola/PS-1936-file-dest
Browse files Browse the repository at this point in the history
PS-1936 Allow writing multiple files to abs workspace
  • Loading branch information
pivnicek authored Mar 29, 2021
2 parents d314922 + e2ea898 commit a1e826e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 17 deletions.
33 changes: 23 additions & 10 deletions kbcstorage/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"""
from kbcstorage.base import Endpoint
from kbcstorage.files import Files
from kbcstorage.jobs import Jobs


def _make_body(mapping, source_key='source'):
Expand Down Expand Up @@ -145,16 +146,16 @@ def load_tables(self, workspace_id, table_mapping, preserve=None):

return self._post(url, data=body)

def load_files(self, workspace_id, file_mapping, preserve=None):
def load_files(self, workspace_id, file_mapping):
"""
Load tabes from storage into a workspace.
Load files from file storage into a workspace.
* only supports abs workspace
writes the matching files to "{destination}/file_name/file_id"
Args:
workspace_id (int or str): The id of the workspace to which to load
the tables.
file_mapping (:obj:`dict`): contains tags: [], destination: string
preserve (bool): If False, drop files, else keep files in workspace.
file_mapping (:obj:`dict`): contains tags: [], destination: string path without trailing /
Raises:
requests.HTTPError: If the API request fails.
Expand All @@ -164,10 +165,22 @@ def load_files(self, workspace_id, file_mapping, preserve=None):
raise Exception('Loading files to workspace is only available for ABS workspaces')
files = Files(self.root_url, self.token)
file_list = files.list(tags=file_mapping['tags'])
inputs = {}
jobs = Jobs(self.root_url, self.token)
jobs_list = []
for file in file_list:
inputs[file['id']] = file_mapping['destination']
body = _make_body(inputs, source_key='dataFileId')
body['preserve'] = preserve
url = '{}/{}/load'.format(self.base_url, workspace['id'])
return self._post(url, data=body)
inputs = {
file['id']: "%s/%s" % (file_mapping['destination'], file['name'])
}
body = _make_body(inputs, source_key='dataFileId')
# always preserve the workspace, otherwise it would be silly
body['preserve'] = 1
url = '{}/{}/load'.format(self.base_url, workspace['id'])
job = self._post(url, data=body)
jobs_list.append(job)

for job in jobs_list:
if not (jobs.block_for_success(job['id'])):
try:
print("Failed to load a file with error: %s" % job['results']['message'])
except IndexError:
print("An unknown error occurred loading data. Job ID %s" % job['id'])
27 changes: 20 additions & 7 deletions tests/functional/test_workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,24 +102,37 @@ def test_load_files_to_workspace(self):
file, path = tempfile.mkstemp(prefix='sapi-test')
os.write(file, bytes('fooBar', 'utf-8'))
os.close(file)
file_id = self.files.upload_file(path, tags=['sapi-client-python-tests', 'file1'])

# We'll put 2 files with the same tag to test multiple results
file1_id = self.files.upload_file(path, tags=['sapi-client-python-tests', 'file1'])
file2_id = self.files.upload_file(path, tags=['sapi-client-python-tests', 'file2'])

file1 = self.files.detail(file1_id)
file2 = self.files.detail(file2_id)
# create a workspace and load the file to it
workspace = self.workspaces.create('abs')
self.workspace_id = workspace['id']
job = self.workspaces.load_files(
self.workspaces.load_files(
workspace['id'],
{'tags': ['sapi-client-python-tests'], 'destination': 'data/in/files'}
{
'tags': ['sapi-client-python-tests'],
'destination': 'data/in/files'
}
)
self.jobs.block_until_completed(job['id'])

# assert that the file was loaded to the workspace
blob_service_client = BlobServiceClient.from_connection_string(workspace['connection']['connectionString'])
blob_client = blob_service_client.get_blob_client(
blob_client_1 = blob_service_client.get_blob_client(
container=workspace['connection']['container'],
blob='data/in/files/%s/%s' % (file1['name'], str(file1['id']))
)
self.assertEqual('fooBar', blob_client_1.download_blob().readall().decode('utf-8'))

blob_client_2 = blob_service_client.get_blob_client(
container=workspace['connection']['container'],
blob='data/in/files/%s' % str(file_id)
blob='data/in/files/%s/%s' % (file2['name'], str(file2['id']))
)
self.assertEqual('fooBar', blob_client.download_blob().readall().decode('utf-8'))
self.assertEqual('fooBar', blob_client_2.download_blob().readall().decode('utf-8'))

def __create_table(self, bucket_id, table_name, row):
file, path = tempfile.mkstemp(prefix='sapi-test')
Expand Down

0 comments on commit a1e826e

Please sign in to comment.