Skip to content

Commit

Permalink
feat: refactor to treat the schema as an external service
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanildoBarauna committed Apr 14, 2024
1 parent a7c1b95 commit 470583b
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 65 deletions.
4 changes: 2 additions & 2 deletions etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self, *xargs) -> None:
totalInvalidParams += 1

if totalInvalidParams == self.params_count:
raise TypeError("Invalid parameters")
raise TypeError(f"Invalid parameters >>>> {self.params}")

self.ValidParams = self.ValidParamsForCall()
self.pipelineExecute()
Expand All @@ -38,4 +38,4 @@ def pipelineExecute(self):
extraction(self.ValidParams)
# pgLoading()
else:
raise KeyError(f"The informed params are not disponible for extract, see avaliable list in: {ENDPOINT_LIST_AVALIABLE_PARITYS}")
raise KeyError(f"The informed params: {self.params} are not disponible for extract, see avaliable list in: {ENDPOINT_LIST_AVALIABLE_PARITYS}")
75 changes: 27 additions & 48 deletions etl/jobs/extract/ApiToParquetFile.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,14 @@
from etl.jobs.extract import (
pyarrow, requests, loggingInfo, loggingError, loggingWarn
,DefaultOutputFolder, DefaultTimestampStr, CustomBeam
, ENDPOINT_QUOTES_AWESOME_API, WORK_DIR
,DefaultOutputFolder, DefaultTimestampStr, DefaultQuotesAPISchema
,CustomBeam, ENDPOINT_QUOTES_AWESOME_API, WORK_DIR
)

class extraction:
def __init__(self, ValidParams: list) -> None:
    """Store the caller-validated currency-pair params and run the extraction pipeline.

    Args:
        ValidParams: list of currency-pair codes already validated upstream.
    """
    # NOTE(review): constructor has side effects — instantiating the class
    # immediately executes the whole extraction pipeline via PipelineRun().
    self.params = ValidParams
    self.PipelineRun()

def ParquetSchemaLoad(self, element: dict):
    """Derive a pyarrow schema from *element*: one string-typed field per key.

    Args:
        element: a sample record (dict) whose keys name the columns.

    Returns:
        A ``pyarrow.Schema`` where every field is ``pyarrow.string()``.
    """
    fields = [(key, pyarrow.string()) for key in element.keys()]
    built_schema = pyarrow.schema(fields)
    loggingInfo("Schema - 200 OK", WORK_DIR)
    return built_schema

def PipelineRun(self):
response = requests.get(ENDPOINT_QUOTES_AWESOME_API + ','.join(self.params))

Expand All @@ -29,40 +17,31 @@ def PipelineRun(self):
params = self.params
else:
raise ConnectionError(f"endpoint connection: {ENDPOINT_QUOTES_AWESOME_API}. status_code: {response.status_code}")

## For generate schema is necessary extract one currency from dicionary
extract_index_params = [item.replace("-", "") for item in params]

FileSchema = self.ParquetSchemaLoad(json_data[extract_index_params[0]])

FileSchema = DefaultQuotesAPISchema()
output_path = DefaultOutputFolder()
insert_timestamp = DefaultTimestampStr()
beam = CustomBeam.BeamObj()
extracted_files = []

for index, param in enumerate(params):
dic = json_data[param.replace("-", "")]

if dic:
output_path = DefaultOutputFolder()
insert_timestamp = DefaultTimestampStr()
beam = CustomBeam.BeamObj()
extracted_files = []
try:
loggingInfo(f"Starting pipeline {index + 1} of {len(params)} - {param} - Starting!", WORK_DIR)

with CustomBeam.PipelineDirectRunner() as pipe:
input_pcollection = (
pipe
| "Create" >> beam.Create([dic])
| "WriteToParquet"
>> beam.io.WriteToParquet(
file_path_prefix=f"{output_path}{param}-{insert_timestamp}",
file_name_suffix=".parquet",
num_shards=1,
schema=FileSchema,
)
)

loggingInfo(f"Pipeline execution OK >> {index + 1} of {len(params)} - {param} - Extracted!", WORK_DIR)

extracted_files.append(f"{output_path}{param}-{insert_timestamp}-00000-of-00001.parquet")

except Exception as err:
loggingError(f"{param} - Pipeline Execution Error >>> {err}", WORK_DIR)


loggingInfo(f"{index + 1} of {len(params)} - {param} - Starting", WORK_DIR)

with CustomBeam.PipelineDirectRunner() as pipe:
input_pcollection = (
pipe
| "Create" >> beam.Create([dic])
| "WriteToParquet"
>> beam.io.WriteToParquet(
file_path_prefix=f"{output_path}{param}-{insert_timestamp}",
file_name_suffix=".parquet",
num_shards=1,
schema=FileSchema,
)
)

loggingInfo(f"{index + 1} of {len(params)} - {param} - file extracted: {output_path}{param}-{insert_timestamp}", WORK_DIR)

loggingInfo(f"All files extracted in: {output_path}", WORK_DIR)
2 changes: 1 addition & 1 deletion etl/jobs/extract/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import requests
# Custom Logs
from etl.utils.logs import loggingInfo, loggingError, loggingWarn
from etl.utils.common import DefaultTimestampStr, DefaultOutputFolder
from etl.utils.common import DefaultTimestampStr, DefaultOutputFolder, DefaultQuotesAPISchema
from etl.config.beam import CustomBeam
from etl.utils.constants import ENDPOINT_QUOTES_AWESOME_API
import os
Expand Down
23 changes: 22 additions & 1 deletion etl/utils/common.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
import os
from datetime import datetime
import pyarrow

def DefaultOutputFolder():
    """Return the path of the project's data/ output directory, resolved relative to this module."""
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, "../../data/")

def DefaultTimestampStr() -> str:
    """Return the current Unix timestamp, truncated to whole seconds, as a string.

    Used to stamp extracted parquet file names so repeated runs do not collide.
    """
    # Fix: the block contained a duplicated, unreachable `return` statement;
    # a single return of the truncated timestamp is the intended behavior.
    current = datetime.now().timestamp()
    return str(int(current))

def DefaultQuotesAPISchema():
    """Return the fixed pyarrow schema for AwesomeAPI quote records (all fields string-typed)."""
    field_names = (
        "code",
        "codein",
        "name",
        "high",
        "low",
        "varBid",
        "pctChange",
        "bid",
        "ask",
        "timestamp",
        "create_date",
    )
    return pyarrow.schema([(name, pyarrow.string()) for name in field_names])
13 changes: 0 additions & 13 deletions etl/utils/constants.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,2 @@
ENDPOINT_LIST_AVALIABLE_PARITYS = "https://economia.awesomeapi.com.br/json/available"
ENDPOINT_QUOTES_AWESOME_API = "https://economia.awesomeapi.com.br/last/"
# Ordered field names returned by the AwesomeAPI quotes endpoint.
QUOTES_API_SCHEMA = [
    "code",
    # Fix: a missing comma after "codein" caused implicit string concatenation
    # ("codein" "name" -> "codeinname"), silently dropping a column.
    "codein",
    "name",
    "high",
    "low",
    "varBid",
    "pctChange",
    "bid",
    "ask",
    "timestamp",
    "create_date",
]

0 comments on commit 470583b

Please sign in to comment.