Merge pull request #58 from keboola/PS-1037-abs
PS-1037 Add support for azure stacks
pivnicek authored Mar 12, 2021
2 parents 65c5cc6 + c91a508 commit f994e2b
Showing 6 changed files with 149 additions and 60 deletions.
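
After this change, Files.upload_file() and Files.download() dispatch on the `provider` field ('aws' or 'azure') that Storage returns, so the same calling code works on both stack types, as the diff below shows. A minimal usage sketch — the stack URL and token are placeholders, and the constructor call follows the client's existing endpoint pattern, so treat it as an assumption:

    from kbcstorage.files import Files

    # Hypothetical stack URL and token; an Azure stack would use its own
    # connection URL instead.
    files = Files('https://connection.keboola.com', 'my-storage-token')

    # upload_file() prepares the upload, then routes to S3 or Blob Storage
    # based on the 'provider' field of the prepared file resource.
    file_id = files.upload_file('/tmp/data.csv', tags=['example'])

    # download() likewise picks the AWS or Azure path from the file detail,
    # so the caller never touches boto3 or azure-storage-blob directly.
    local_file = files.download(file_id, '/tmp/downloads')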
2 changes: 2 additions & 0 deletions .flake8
@@ -0,0 +1,2 @@
+[flake8]
+max-line-length = 120
2 changes: 2 additions & 0 deletions .travis.yml
@@ -3,10 +3,12 @@ language: bash
 services:
   - docker
 before_script:
   - docker login --username "$DOCKERHUB_USER" --password "$DOCKERHUB_TOKEN"
   - docker-compose build sapi-python-client
 script:
+  - docker-compose run --rm --entrypoint=flake8 sapi-python-client
   - docker-compose run --rm -e KBC_TEST_TOKEN -e KBC_TEST_API_URL sapi-python-client -m unittest discover
+  - docker-compose run --rm -e KBC_TEST_TOKEN=$KBC_AZ_TEST_TOKEN -e KBC_TEST_API_URL=$KBC_AZ_TEST_API_URL sapi-python-client -m unittest discover
 after_success:
   - docker images
 deploy:
170 changes: 120 additions & 50 deletions kbcstorage/files.py
@@ -6,10 +6,12 @@
 .. _here:
     http://docs.keboola.apiary.io/#reference/files/
 """
+import json
 import os
 import boto3
 import requests
 
+from azure.storage.blob import BlobServiceClient, ContentSettings
 from kbcstorage.base import Endpoint
 
 
@@ -71,36 +73,20 @@ def upload_file(self, file_path, tags=None, is_public=False,
         if compress:
             import gzip
             import shutil
-            with open(file_path, 'rb') as f_in, gzip.open(file_path + '.gz',
-                                                          'wb') as f_out:
+            with open(file_path, 'rb') as f_in, \
+                    gzip.open(file_path + '.gz', 'wb') as f_out:
                 shutil.copyfileobj(f_in, f_out)
             file_path = file_path + '.gz'
         file_name = os.path.basename(file_path)
         size = os.path.getsize(file_path)
         file_resource = self.prepare_upload(file_name, size, tags, is_public,
                                             is_permanent, is_encrypted,
                                             is_sliced, do_notify, True)
-        upload_params = file_resource['uploadParams']
-        key_id = upload_params['credentials']['AccessKeyId']
-        key = upload_params['credentials']['SecretAccessKey']
-        token = upload_params['credentials']['SessionToken']
-        s3 = boto3.resource('s3', aws_access_key_id=key_id,
-                            aws_secret_access_key=key,
-                            aws_session_token=token,
-                            region_name=file_resource['region'])
-
-        s3_object = s3.Object(bucket_name=upload_params['bucket'],
-                              key=upload_params['key'])
-        disposition = 'attachment; filename={};'.format(file_resource['name'])
-        with open(file_path, mode='rb') as file:
-            if is_encrypted:
-                encryption = upload_params['x-amz-server-side-encryption']
-                s3_object.put(ACL=upload_params['acl'], Body=file,
-                              ContentDisposition=disposition,
-                              ServerSideEncryption=encryption)
-            else:
-                s3_object.put(ACL=upload_params['acl'], Body=file,
-                              ContentDisposition=disposition)
+        if file_resource['provider'] == 'azure':
+            self.__upload_to_azure(file_resource, file_path)
+        elif file_resource['provider'] == 'aws':
+            self.__upload_to_aws(file_resource, file_path, is_encrypted)
         return file_resource['id']
 
     def prepare_upload(self, name, size_bytes=None, tags=None, is_public=False,
@@ -193,32 +179,116 @@ def download(self, file_id, local_path):
             os.mkdir(local_path)
         file_info = self.detail(file_id=file_id, federation_token=True)
         local_file = os.path.join(local_path, file_info['name'])
-        s3 = boto3.resource(
-            's3',
-            aws_access_key_id=file_info['credentials']['AccessKeyId'],
-            aws_secret_access_key=file_info['credentials']['SecretAccessKey'],
-            aws_session_token=file_info['credentials']['SessionToken'],
-            region_name=file_info['region']
-        )
-        if file_info['isSliced']:
-            manifest = requests.get(url=file_info['url']).json()
-            file_names = []
-            for entry in manifest["entries"]:
-                full_path = entry["url"]
-                file_name = full_path.rsplit("/", 1)[1]
-                file_names.append(file_name)
-                splitted_path = full_path.split("/")
-                file_key = "/".join(splitted_path[3:])
-                bucket = s3.Bucket(file_info['s3Path']['bucket'])
-                bucket.download_file(file_key, file_name)
-            # merge the downloaded files
-            with open(local_file, mode='wb') as out_file:
-                for file_name in file_names:
-                    with open(file_name, mode='rb') as in_file:
-                        for line in in_file:
-                            out_file.write(line)
-                    os.remove(file_name)
-        else:
-            bucket = s3.Bucket(file_info["s3Path"]["bucket"])
-            bucket.download_file(file_info["s3Path"]["key"], local_file)
+        if file_info['provider'] == 'azure':
+            if file_info['isSliced']:
+                self.__download_sliced_file_from_azure(file_info, local_file)
+            else:
+                self.__download_file_from_azure(file_info, local_file)
+        elif file_info['provider'] == 'aws':
+            s3 = boto3.resource(
+                's3',
+                aws_access_key_id=file_info['credentials']['AccessKeyId'],
+                aws_secret_access_key=file_info['credentials']['SecretAccessKey'],
+                aws_session_token=file_info['credentials']['SessionToken'],
+                region_name=file_info['region']
+            )
+            if file_info['isSliced']:
+                self.__download_sliced_file_from_aws(file_info, local_file, s3)
+            else:
+                self.__download_file_from_aws(file_info, local_file, s3)
         return local_file
 
+    def __upload_to_azure(self, preparation_result, file_path):
+        blob_client = self.__get_blob_client(
+            preparation_result['absUploadParams']['absCredentials']['SASConnectionString'],
+            preparation_result['absUploadParams']['container'],
+            preparation_result['absUploadParams']['blobName']
+        )
+        with open(file_path, "rb") as blob_data:
+            blob_client.upload_blob(
+                blob_data,
+                blob_type='BlockBlob',
+                content_settings=ContentSettings(
+                    content_disposition='attachment;filename="%s"' % (preparation_result['name'])
+                )
+            )
+
+    def __upload_to_aws(self, prepare_result, file_path, is_encrypted):
+        upload_params = prepare_result['uploadParams']
+        key_id = upload_params['credentials']['AccessKeyId']
+        key = upload_params['credentials']['SecretAccessKey']
+        token = upload_params['credentials']['SessionToken']
+        s3 = boto3.resource('s3', aws_access_key_id=key_id,
+                            aws_secret_access_key=key,
+                            aws_session_token=token,
+                            region_name=prepare_result['region'])
+        s3_object = s3.Object(bucket_name=upload_params['bucket'], key=upload_params['key'])
+        disposition = 'attachment; filename={};'.format(prepare_result['name'])
+        with open(file_path, mode='rb') as file:
+            if is_encrypted:
+                encryption = upload_params['x-amz-server-side-encryption']
+                s3_object.put(ACL=upload_params['acl'], Body=file,
+                              ContentDisposition=disposition, ServerSideEncryption=encryption)
+            else:
+                s3_object.put(ACL=upload_params['acl'], Body=file,
+                              ContentDisposition=disposition)
+
+    def __download_file_from_aws(self, file_info, destination, s3):
+        bucket = s3.Bucket(file_info["s3Path"]["bucket"])
+        bucket.download_file(file_info["s3Path"]["key"], destination)
+
+    def __download_sliced_file_from_aws(self, file_info, destination, s3):
+        manifest = requests.get(url=file_info['url']).json()
+        file_names = []
+        for entry in manifest["entries"]:
+            full_path = entry["url"]
+            file_name = full_path.rsplit("/", 1)[1]
+            file_names.append(file_name)
+            splitted_path = full_path.split("/")
+            file_key = "/".join(splitted_path[3:])
+            bucket = s3.Bucket(file_info['s3Path']['bucket'])
+            bucket.download_file(file_key, file_name)
+        self.__merge_split_files(file_names, destination)
+
+    def __download_file_from_azure(self, file_info, destination):
+        blob_client = self.__get_blob_client(
+            file_info['absCredentials']['SASConnectionString'],
+            file_info['absPath']['container'],
+            file_info['absPath']['name']
+        )
+        with open(destination, "wb") as downloaded_blob:
+            download_stream = blob_client.download_blob()
+            downloaded_blob.write(download_stream.readall())
+
+    def __download_sliced_file_from_azure(self, file_info, destination):
+        blob_service_client = BlobServiceClient.from_connection_string(
+            file_info['absCredentials']['SASConnectionString']
+        )
+        container_client = blob_service_client.get_container_client(
+            container=file_info['absPath']['container']
+        )
+        manifest_stream = container_client.download_blob(
+            file_info['absPath']['name'] + 'manifest'
+        )
+        manifest = json.loads(manifest_stream.readall())
+        file_names = []
+        for entry in manifest['entries']:
+            blob_path = entry['url'].split('blob.core.windows.net/%s/' % (file_info['absPath']['container']))[1]
+            full_path = entry["url"]
+            file_name = full_path.rsplit("/", 1)[1]
+            file_names.append(file_name)
+            with open(file_name, "wb") as file_slice:
+                file_slice.write(container_client.download_blob(blob_path).readall())
+        self.__merge_split_files(file_names, destination)
+
+    def __merge_split_files(self, file_names, destination):
+        with open(destination, mode='wb') as out_file:
+            for file_name in file_names:
+                with open(file_name, mode='rb') as in_file:
+                    for line in in_file:
+                        out_file.write(line)
+                os.remove(file_name)
+
+    def __get_blob_client(self, connection_string, container, blob_name):
+        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
+        return blob_service_client.get_blob_client(container=container, blob=blob_name)
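
Both sliced-file branches reassemble the slices via a JSON manifest whose entries each carry a URL. A sketch of the shape __download_sliced_file_from_azure expects, inferred from the parsing code above — the account, container, and slice names are illustrative, not real values:

    # Illustrative manifest; the real one is downloaded from the blob named
    # '<absPath.name>manifest' in the same container.
    manifest = {
        "entries": [
            {"url": "https://acct.blob.core.windows.net/cont-1/slices/data.csv0000_part_00"},
            {"url": "https://acct.blob.core.windows.net/cont-1/slices/data.csv0001_part_00"},
        ]
    }

    container = 'cont-1'
    for entry in manifest['entries']:
        # Same split as the code above: keep everything after
        # 'blob.core.windows.net/<container>/' as the in-container blob path.
        blob_path = entry['url'].split('blob.core.windows.net/%s/' % container)[1]
        file_name = entry['url'].rsplit('/', 1)[1]
        print(blob_path, '->', file_name)  # slices/data.csv0000_part_00 -> data.csv0000_part_00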
1 change: 1 addition & 0 deletions setup.py
@@ -12,6 +12,7 @@
     packages=find_packages(exclude=['tests']),
     install_requires=[
         'boto3',
+        'azure-storage-blob',
         'requests'
     ],
     test_suite='tests',
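
With azure-storage-blob declared here, a fresh install pulls in the Azure SDK alongside boto3. A trivial post-install smoke test (not part of the repository):

    # The two names files.py imports from the new dependency.
    from azure.storage.blob import BlobServiceClient, ContentSettings
    print(BlobServiceClient.__name__, ContentSettings.__name__)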
30 changes: 23 additions & 7 deletions tests/functional/test_files.py
@@ -102,13 +102,29 @@ def test_download_file_credentials(self):
         with self.subTest():
             self.assertEqual(file_id, file_info['id'])
         with self.subTest():
-            self.assertTrue('credentials' in file_info)
-        with self.subTest():
-            self.assertTrue('AccessKeyId' in file_info['credentials'])
-        with self.subTest():
-            self.assertTrue('SecretAccessKey' in file_info['credentials'])
-        with self.subTest():
-            self.assertTrue('SessionToken' in file_info['credentials'])
+            self.assertTrue(file_info['provider'] in ['aws', 'azure'])
+        if file_info['provider'] == 'aws':
+            with self.subTest():
+                self.assertTrue('credentials' in file_info)
+            with self.subTest():
+                self.assertTrue('AccessKeyId' in file_info['credentials'])
+            with self.subTest():
+                self.assertTrue('SecretAccessKey' in file_info['credentials'])
+            with self.subTest():
+                self.assertTrue('SessionToken' in file_info['credentials'])
+        elif file_info['provider'] == 'azure':
+            with self.subTest():
+                self.assertTrue('absCredentials' in file_info)
+            with self.subTest():
+                self.assertTrue('SASConnectionString' in file_info['absCredentials'])
+            with self.subTest():
+                self.assertTrue('expiration' in file_info['absCredentials'])
+            with self.subTest():
+                self.assertTrue('absPath' in file_info)
+            with self.subTest():
+                self.assertTrue('container' in file_info['absPath'])
+            with self.subTest():
+                self.assertTrue('name' in file_info['absPath'])
 
     def test_download_file(self):
         file, path = tempfile.mkstemp(prefix='sapi-test')
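
The new assertions imply a provider-specific shape for the file detail. A hypothetical file_info for a file stored on an Azure stack, reconstructed only from the keys the test checks — all values are invented placeholders:

    file_info = {
        'id': 123456,
        'provider': 'azure',
        'absCredentials': {
            # A SAS-based connection string consumable by
            # BlobServiceClient.from_connection_string().
            'SASConnectionString': 'BlobEndpoint=https://acct.blob.core.windows.net;SharedAccessSignature=sv=...',
            'expiration': '2021-03-12T12:00:00+0000',
        },
        'absPath': {
            'container': 'exp-30-files-1234',
            'name': 'my-file.csv',
        },
    }
    assert file_info['provider'] in ['aws', 'azure']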
4 changes: 1 addition & 3 deletions tests/functional/test_tables.py
@@ -84,9 +84,7 @@ def test_table_detail(self):
         with self.subTest():
             self.assertEqual('some-table', table_info['name'])
         with self.subTest():
-            self.assertEqual('https://connection.keboola.com/v2/storage/'
-                             'tables/in.c-py-test-tables.some-table',
-                             table_info['uri'])
+            self.assertTrue('in.c-py-test-tables.some-table' in table_info['uri'])
         with self.subTest():
             self.assertEqual([], table_info['primaryKey'])
         with self.subTest():
