Merge pull request #80 from keboola/feature-gcp-files
added support for files on gcp
kudj authored May 3, 2024
2 parents 6d5d3ca + 3d64ce0 commit 40001aa
Showing 4 changed files with 78 additions and 1 deletion.
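The commit adds a third storage backend to the client: file uploads and downloads are now routed to Google Cloud Storage when a project's file storage runs on GCP, alongside the existing AWS and Azure paths. A minimal usage sketch of the new path (assuming the usual Files(root_url, token) constructor; the token, file paths, and tag are placeholders):

    from kbcstorage.files import Files

    # Placeholder token; the URL is the GCP stack also used in the CI config below.
    files = Files('https://connection.europe-west3.gcp.keboola.com', 'your-storage-token')

    # upload_file asks the Storage API to prepare a file resource; when the
    # resource reports provider == 'gcp', the new __upload_to_gcp branch runs.
    file_id = files.upload_file('/tmp/data.csv', tags=['example'])

    # download dispatches on provider too, building a GCS client from the
    # temporary gcsCredentials the API returns.
    files.download(file_id, '/tmp/downloaded.csv')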
20 changes: 20 additions & 0 deletions .github/workflows/push.yml
@@ -13,6 +13,8 @@ env:
  KBC_TEST_API_URL: https://connection.keboola.com
  KBC_AZ_TEST_TOKEN: ${{ secrets.KBC_AZ_TEST_TOKEN }}
  KBC_AZ_TEST_API_URL: https://connection.north-europe.azure.keboola.com
  KBC_GCP_TEST_TOKEN: ${{ secrets.KBC_GCP_SNOW_TEST_TOKEN }}
  KBC_GCP_TEST_API_URL: https://connection.europe-west3.gcp.keboola.com
  APP_IMAGE: sapi-python-client

jobs:
@@ -86,10 +88,28 @@ jobs:
        run: |
          docker-compose run --rm -e KBC_TEST_TOKEN=$KBC_AZ_TEST_TOKEN -e KBC_TEST_API_URL=$KBC_AZ_TEST_API_URL -e SKIP_ABS_TESTS=1 ci -m unittest --verbose
  tests_gcp:
    name: Run tests (GCP)
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Check out the repo
        uses: actions/checkout@v3

      - name: Download image
        uses: ishworkh/docker-image-artifact-download@v1
        with:
          image: "sapi-python-client"

      - name: Run Tests
        run: |
          docker-compose run --rm -e KBC_TEST_TOKEN=$KBC_GCP_TEST_TOKEN -e KBC_TEST_API_URL=$KBC_GCP_TEST_API_URL -e SKIP_ABS_TESTS=1 ci -m unittest --verbose
  deploy_to_pypi:
    needs:
      - tests_aws
      - tests_azure
      - tests_gcp
    runs-on: ubuntu-latest
    if: startsWith(github.ref, 'refs/tags/') && needs.build.outputs.is_semantic_tag == 'true'
    steps:
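The tests_gcp job mirrors the AWS and Azure jobs: it maps the backend-specific secrets onto the generic KBC_TEST_TOKEN / KBC_TEST_API_URL names, so the same test suite runs unchanged against every backend, and deploy_to_pypi now gates on all three. A sketch of how test setup can stay backend-agnostic under this convention (the client construction is an assumption, not shown in this diff):

    import os

    from kbcstorage.files import Files

    # The CI job exports backend-specific secrets under generic names, so this
    # setup is identical for the AWS, Azure, and GCP runs.
    files = Files(os.environ['KBC_TEST_API_URL'], os.environ['KBC_TEST_TOKEN'])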
55 changes: 55 additions & 0 deletions kbcstorage/files.py
@@ -13,6 +13,8 @@

from azure.storage.blob import BlobServiceClient, ContentSettings
from kbcstorage.base import Endpoint
from google.oauth2 import credentials
from google.cloud import storage as GCPStorage


class Files(Endpoint):
@@ -86,6 +88,8 @@ def upload_file(self, file_path, tags=None, is_public=False,
            self.__upload_to_azure(file_resource, file_path)
        elif file_resource['provider'] == 'aws':
            self.__upload_to_aws(file_resource, file_path, is_encrypted)
        elif file_resource['provider'] == 'gcp':
            self.__upload_to_gcp(file_resource, file_path)

        return file_resource['id']

@@ -196,6 +200,16 @@ def download(self, file_id, local_path):
                self.__download_sliced_file_from_aws(file_info, local_file, s3)
            else:
                self.__download_file_from_aws(file_info, local_file, s3)
        elif file_info['provider'] == 'gcp':
            storage_client = self.__get_gcp_client(
                file_info['gcsCredentials']['access_token'],
                file_info['gcsCredentials']['projectId'],
            )

            if file_info['isSliced']:
                self.__download_sliced_file_from_gcp(file_info, local_file, storage_client)
            else:
                self.__download_file_from_gcp(file_info, local_file, storage_client)
        return local_file

    def __upload_to_azure(self, preparation_result, file_path):
@@ -233,6 +247,18 @@ def __upload_to_aws(self, prepare_result, file_path, is_encrypted):
            s3_object.put(ACL=upload_params['acl'], Body=file,
                          ContentDisposition=disposition)

    def __upload_to_gcp(self, preparation_result, file_path):
        storage_client = self.__get_gcp_client(
            preparation_result['gcsUploadParams']['access_token'],
            preparation_result['gcsUploadParams']['projectId']
        )

        bucket = storage_client.bucket(preparation_result['gcsUploadParams']['bucket'])
        blob = bucket.blob(preparation_result['gcsUploadParams']['key'])

        with open(file_path, "rb") as blob_data:
            blob.upload_from_file(blob_data)

    def __download_file_from_aws(self, file_info, destination, s3):
        bucket = s3.Bucket(file_info["s3Path"]["bucket"])
        bucket.download_file(file_info["s3Path"]["key"], destination)
@@ -281,6 +307,30 @@ def __download_sliced_file_from_azure(self, file_info, destination):
                file_slice.write(container_client.download_blob(blob_path).readall())
        self.__merge_split_files(file_names, destination)

    def __download_file_from_gcp(self, file_info, destination, storage_client):

        bucket = storage_client.bucket(file_info['gcsPath']['bucket'])
        blob = bucket.blob(file_info['gcsPath']['key'])
        blob.download_to_filename(destination)

    def __download_sliced_file_from_gcp(self, file_info, destination, storage_client):
        manifest = requests.get(url=file_info['url']).json()
        file_names = []
        for entry in manifest["entries"]:
            full_path = entry["url"]
            file_name = full_path.rsplit("/", 1)[1]
            file_names.append(file_name)
            splitted_path = full_path.split("/")
            file_key = "/".join(splitted_path[3:])
            bucket = storage_client.bucket(file_info['gcsPath']['bucket'])

            blob = bucket.blob(file_key)

            with open(file_name, "wb") as file_slice:
                file_slice.write(blob.download_as_bytes())

        self.__merge_split_files(file_names, destination)

    def __merge_split_files(self, file_names, destination):
        with open(destination, mode='wb') as out_file:
            for file_name in file_names:
@@ -292,3 +342,8 @@ def __merge_split_files(self, file_names, destination):
    def __get_blob_client(self, connection_string, container, blob_name):
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        return blob_service_client.get_blob_client(container=container, blob=blob_name)

    def __get_gcp_client(self, token, project):
        creds = credentials.Credentials(token=token)
        gcp_storage_client = GCPStorage.Client(credentials=creds, project=project)
        return gcp_storage_client
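__get_gcp_client builds a google-cloud-storage client from the short-lived OAuth access token the Storage API returns, rather than from a service-account key file. A standalone sketch of the same pattern, including the path arithmetic __download_sliced_file_from_gcp uses to turn manifest URLs into object keys (all values are illustrative, and the gs://<bucket>/<key> URL shape is what the index-3 join implies rather than something shown in this diff):

    from google.cloud import storage
    from google.oauth2 import credentials

    # A bare OAuth2 access token (as in gcsCredentials / gcsUploadParams) is
    # enough for a temporary client; both values here are placeholders.
    creds = credentials.Credentials(token='ya29.ephemeral-token')
    client = storage.Client(credentials=creds, project='my-gcp-project')

    # Splitting 'gs://<bucket>/<key>' on '/' gives
    # ['gs:', '', '<bucket>', <key segments>...], so joining from index 3
    # recovers the object key.
    full_path = 'gs://example-bucket/exp-30/files/slice_part_00'
    file_key = '/'.join(full_path.split('/')[3:])  # 'exp-30/files/slice_part_00'

    client.bucket('example-bucket').blob(file_key).download_to_filename('/tmp/slice_part_00')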
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -26,6 +26,8 @@ dependencies = [
    "requests",
    "responses",
    "python-dotenv",
    "google-cloud-storage==2.16.0",
    "google-auth==2.29.0"
]
dynamic = ["version"]

2 changes: 1 addition & 1 deletion tests/functional/test_files.py
@@ -100,7 +100,7 @@ def test_download_file_credentials(self):
        with self.subTest():
            self.assertEqual(file_id, file_info['id'])
        with self.subTest():
-            self.assertTrue(file_info['provider'] in ['aws', 'azure'])
+            self.assertTrue(file_info['provider'] in ['aws', 'azure', 'gcp'])
        if file_info['provider'] == 'aws':
            with self.subTest():
                self.assertTrue('credentials' in file_info)
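The diff touches only the provider assertion; the AWS branch below it goes on to check the returned credentials. A hedged sketch of an analogous GCP check, using the gcsCredentials fields that files.py reads (an illustration, not part of this commit):

    if file_info['provider'] == 'gcp':
        with self.subTest():
            # files.py consumes gcsCredentials['access_token'] and
            # gcsCredentials['projectId'], so assert they come back.
            self.assertTrue('gcsCredentials' in file_info)
            self.assertTrue('access_token' in file_info['gcsCredentials'])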
