diff --git a/etl/__init__.py b/etl/__init__.py
index 25be4b6..0085f2c 100644
--- a/etl/__init__.py
+++ b/etl/__init__.py
@@ -16,7 +16,7 @@ def __init__(self, *xargs) -> None:
                 totalInvalidParams += 1
 
         if totalInvalidParams == self.params_count:
-            raise TypeError("Invalid parameters")
+            raise TypeError(f"Invalid parameters >>>> {self.params}")
 
         self.ValidParams = self.ValidParamsForCall()
         self.pipelineExecute()
@@ -38,4 +38,4 @@ def pipelineExecute(self):
             extraction(self.ValidParams)
             # pgLoading()
         else:
-            raise KeyError(f"The informed params are not disponible for extract, see avaliable list in: {ENDPOINT_LIST_AVALIABLE_PARITYS}")
\ No newline at end of file
+            raise KeyError(f"The given params {self.params} are not available for extraction; see the list of available pairs at: {ENDPOINT_LIST_AVALIABLE_PARITYS}")
\ No newline at end of file
diff --git a/etl/jobs/extract/ApiToParquetFile.py b/etl/jobs/extract/ApiToParquetFile.py
index 107e9f4..f9a393c 100644
--- a/etl/jobs/extract/ApiToParquetFile.py
+++ b/etl/jobs/extract/ApiToParquetFile.py
@@ -1,7 +1,7 @@
 from etl.jobs.extract import (
     pyarrow, requests, loggingInfo, loggingError, loggingWarn
-    ,DefaultOutputFolder, DefaultTimestampStr, CustomBeam
-    , ENDPOINT_QUOTES_AWESOME_API, WORK_DIR
+    ,DefaultOutputFolder, DefaultTimestampStr, DefaultQuotesAPISchema
+    ,CustomBeam, ENDPOINT_QUOTES_AWESOME_API, WORK_DIR
 )
 
 class extraction:
@@ -9,18 +9,6 @@ def __init__(self, ValidParams: list) -> None:
         self.params = ValidParams
         self.PipelineRun()
 
-    def ParquetSchemaLoad(self, element: dict):
-        api_header = list(element.keys())
-        schema = []
-
-        for field in api_header:
-            schema += [(field, pyarrow.string())]
-
-        beam_schema = pyarrow.schema(schema)
-        loggingInfo("Schema - 200 OK", WORK_DIR)
-
-        return beam_schema
-
     def PipelineRun(self):
         response = requests.get(ENDPOINT_QUOTES_AWESOME_API + ','.join(self.params))
 
@@ -29,40 +17,31 @@ def PipelineRun(self):
             params = self.params
         else:
             raise ConnectionError(f"endpoint connection: {ENDPOINT_QUOTES_AWESOME_API}. status_code: {response.status_code}")
-
-        ## To generate the schema it is necessary to extract one currency from the dictionary
-        extract_index_params = [item.replace("-", "") for item in params]
-
-        FileSchema = self.ParquetSchemaLoad(json_data[extract_index_params[0]])
+
+        FileSchema = DefaultQuotesAPISchema()
+        output_path = DefaultOutputFolder()
+        insert_timestamp = DefaultTimestampStr()
+        beam = CustomBeam.BeamObj()
+        extracted_files = []
 
         for index, param in enumerate(params):
             dic = json_data[param.replace("-", "")]
-
-            if dic:
-                output_path = DefaultOutputFolder()
-                insert_timestamp = DefaultTimestampStr()
-                beam = CustomBeam.BeamObj()
-                extracted_files = []
-
-                try:
-                    loggingInfo(f"Starting pipeline {index + 1} of {len(params)} - {param} - Starting!", WORK_DIR)
-
-                    with CustomBeam.PipelineDirectRunner() as pipe:
-                        input_pcollection = (
-                            pipe
-                            | "Create" >> beam.Create([dic])
-                            | "WriteToParquet"
-                            >> beam.io.WriteToParquet(
-                                file_path_prefix=f"{output_path}{param}-{insert_timestamp}",
-                                file_name_suffix=".parquet",
-                                num_shards=1,
-                                schema=FileSchema,
-                            )
-                        )
-
-                    loggingInfo(f"Pipeline execution OK >> {index + 1} of {len(params)} - {param} - Extracted!", WORK_DIR)
-
-                    extracted_files.append(f"{output_path}{param}-{insert_timestamp}-00000-of-00001.parquet")
-
-                except Exception as err:
-                    loggingError(f"{param} - Pipeline Execution Error >>> {err}", WORK_DIR)
-
\ No newline at end of file
+
+            loggingInfo(f"{index + 1} of {len(params)} - {param} - Starting", WORK_DIR)
+
+            with CustomBeam.PipelineDirectRunner() as pipe:
+                input_pcollection = (
+                    pipe
+                    | "Create" >> beam.Create([dic])
+                    | "WriteToParquet"
+                    >> beam.io.WriteToParquet(
+                        file_path_prefix=f"{output_path}{param}-{insert_timestamp}",
+                        file_name_suffix=".parquet",
+                        num_shards=1,
+                        schema=FileSchema,
+                    )
+                )
+
+            loggingInfo(f"{index + 1} of {len(params)} - {param} - file extracted: {output_path}{param}-{insert_timestamp}", WORK_DIR)
+
+        loggingInfo(f"All files extracted in: {output_path}", WORK_DIR)
\ No newline at end of file
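Note: for reference, a minimal standalone sketch of the write path PipelineRun now uses, with a fixed pyarrow schema handed straight to Beam's WriteToParquet. The schema fields, record values, and output prefix below are illustrative placeholders, not values from this repo; CustomBeam.PipelineDirectRunner() appears to wrap the default DirectRunner, which a plain beam.Pipeline() also uses.

# Hedged sketch only: assumes apache_beam and pyarrow are installed;
# names and values are illustrative, not taken from this repository.
import apache_beam as beam
import pyarrow

schema = pyarrow.schema([("bid", pyarrow.string()), ("ask", pyarrow.string())])
record = {"bid": "5.01", "ask": "5.02"}  # hypothetical quote payload

with beam.Pipeline() as pipe:  # DirectRunner by default
    _ = (
        pipe
        | "Create" >> beam.Create([record])
        | "WriteToParquet" >> beam.io.WriteToParquet(
            file_path_prefix="/tmp/USD-BRL-1700000000",  # hypothetical prefix
            file_name_suffix=".parquet",
            num_shards=1,  # one shard -> one deterministic output file per pair
            schema=schema,
        )
    )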
diff --git a/etl/jobs/extract/__init__.py b/etl/jobs/extract/__init__.py
index d700529..0534003 100644
--- a/etl/jobs/extract/__init__.py
+++ b/etl/jobs/extract/__init__.py
@@ -3,7 +3,7 @@
 import requests
 # Custom Logs
 from etl.utils.logs import loggingInfo, loggingError, loggingWarn
-from etl.utils.common import DefaultTimestampStr, DefaultOutputFolder
+from etl.utils.common import DefaultTimestampStr, DefaultOutputFolder, DefaultQuotesAPISchema
 from etl.config.beam import CustomBeam
 from etl.utils.constants import ENDPOINT_QUOTES_AWESOME_API
 import os
diff --git a/etl/utils/common.py b/etl/utils/common.py
index 6938890..f459ba1 100644
--- a/etl/utils/common.py
+++ b/etl/utils/common.py
@@ -1,9 +1,30 @@
 import os
 from datetime import datetime
+import pyarrow
 
 def DefaultOutputFolder():
     return os.path.join(os.path.dirname(__file__), "../../data/")
 
 def DefaultTimestampStr() -> str:
     current = datetime.now().timestamp()
-    return str(int(current))
\ No newline at end of file
+    return str(int(current))
+
+def DefaultQuotesAPISchema():
+    api_header = [
+        "code",
+        "codein",
+        "name",
+        "high",
+        "low",
+        "varBid",
+        "pctChange",
+        "bid",
+        "ask",
+        "timestamp",
+        "create_date"]
+
+    schema = []
+    for field in api_header:
+        schema += [(field, pyarrow.string())]
+
+    return pyarrow.schema(schema)
\ No newline at end of file
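Note: DefaultQuotesAPISchema() should come back as an all-string pyarrow schema over the eleven AwesomeAPI quote fields. The comprehension below is an equivalent construction for comparison, not the code in this diff.

# Equivalent construction for eyeballing DefaultQuotesAPISchema's output;
# a list comprehension is a slightly more idiomatic way to build the pairs.
import pyarrow

fields = ["code", "codein", "name", "high", "low", "varBid",
          "pctChange", "bid", "ask", "timestamp", "create_date"]
expected = pyarrow.schema([(name, pyarrow.string()) for name in fields])
print(expected)  # eleven columns, every one typed string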
"https://economia.awesomeapi.com.br/json/available" ENDPOINT_QUOTES_AWESOME_API = "https://economia.awesomeapi.com.br/last/" -QUOTES_API_SCHEMA = [ - "code", - "codein" - "name", - "high", - "low", - "varBid", - "pctChange", - "bid", - "ask", - "timestamp", - "create_date" -]