Skip to content

Commit

Permalink
cria funcao lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
andresionek91 committed Jul 9, 2021
1 parent 71e9fbd commit 3f28d15
Show file tree
Hide file tree
Showing 12 changed files with 728 additions and 66 deletions.
13 changes: 7 additions & 6 deletions mercado_bitcoin/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@


class MercadoBitcoinApi(ABC):

def __init__(self, coin: str) -> None:
self.coin = coin
self.base_endpoint = "https://www.mercadobitcoin.net/api"
Expand Down Expand Up @@ -44,17 +43,19 @@ class TradesApi(MercadoBitcoinApi):
def _get_unix_epoch(self, date: datetime.datetime) -> int:
return int(date.timestamp())

def _get_endpoint(self, date_from: datetime.datetime = None, date_to: datetime.datetime = None) -> str:
def _get_endpoint(
self, date_from: datetime.datetime = None, date_to: datetime.datetime = None
) -> str:
if date_from and not date_to:
unix_date_from = self._get_unix_epoch(date_from)
endpoint = f'{self.base_endpoint}/{self.coin}/{self.type}/{unix_date_from}'
endpoint = f"{self.base_endpoint}/{self.coin}/{self.type}/{unix_date_from}"
elif date_from and date_to:
if date_from > date_to:
raise RuntimeError("date_from cannot be greater than date_to")
unix_date_from = self._get_unix_epoch(date_from)
unix_date_to = self._get_unix_epoch(date_to)
endpoint = f'{self.base_endpoint}/{self.coin}/{self.type}/{unix_date_from}/{unix_date_to}'
endpoint = f"{self.base_endpoint}/{self.coin}/{self.type}/{unix_date_from}/{unix_date_to}"
else:
endpoint = f'{self.base_endpoint}/{self.coin}/{self.type}'
endpoint = f"{self.base_endpoint}/{self.coin}/{self.type}"

return endpoint
return endpoint
63 changes: 63 additions & 0 deletions mercado_bitcoin/checkpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import datetime

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute
import logging

logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)


class CheckpointModel(Model):
class Meta:
table_name = "mercado_bitcoin_ingestor_checkpoints"
region = "us-east-1"

report_id = UnicodeAttribute(hash_key=True)
checkpoint_date = UnicodeAttribute()


class DynamoCheckpoints:
def __init__(self, model: CheckpointModel, report_id: str, default_start_date: datetime.date):
self.default_start_date = default_start_date
self.model = model
self.report_id = report_id
self.create_table()

def create_checkpoint(self, checkpoint_date):
checkpoint = self.model(self.report_id, checkpoint_date=f"{checkpoint_date}")
checkpoint.save()

def update_checkpoint(self, checkpoint_date):
checkpoint = self.model.get(self.report_id)
checkpoint.checkpoint_date = f"{checkpoint_date}"
checkpoint.save()

def create_or_update_checkpoint(self, checkpoint_date):
logger.info(f"Saving checkpoint for {self.report_id}: {checkpoint_date}")
if not self.checkpoint_exist:
self.create_checkpoint(checkpoint_date)
else:
self.update_checkpoint(checkpoint_date)

@property
def checkpoint_exist(self):
try:
return list(self.model.query(self.report_id)) != []
except KeyError:
logger.warning(f"KeyError: {self.report_id}")
return False

def create_table(self):
logger.info(f"Creating dynamo table")
if not self.model.exists():
self.model.create_table(billing_mode="PAY_PER_REQUEST", wait=True)

def get_checkpoint(self):
if self.checkpoint_exist:
checkpoint = list(self.model.query(self.report_id))[0].checkpoint_date
logger.info(f"Checkpoint found for {self.report_id}: {checkpoint}")
return datetime.datetime.strptime(checkpoint, "%Y-%m-%d").date()
else:
logger.info(f"Checkpoint not found for {self.report_id} using default_start_date")
return self.default_start_date
42 changes: 40 additions & 2 deletions mercado_bitcoin/ingestors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from typing import List

from mercado_bitcoin.apis import DaySummaryApi
from mercado_bitcoin.checkpoints import DynamoCheckpoints, CheckpointModel


class DataIngestor(ABC):

def __init__(self, writer, coins: List[str], default_start_date: datetime.date) -> None:
def __init__(
self, writer, coins: List[str], default_start_date: datetime.date
) -> None:
self.default_start_date = default_start_date
self.coins = coins
self.writer = writer
Expand Down Expand Up @@ -38,7 +40,43 @@ def ingest(self) -> None:


class DaySummaryIngestor(DataIngestor):
def ingest(self) -> None:
date = self._load_checkpoint()
if date < datetime.date.today():
for coin in self.coins:
api = DaySummaryApi(coin=coin)
data = api.get_data(date=date)
self.writer(coin=coin, api=api.type).write(data)
self._update_checkpoint(date + datetime.timedelta(days=1))


class AwsDataIngestor(ABC):
def __init__(
self, writer, coins: List[str], default_start_date: datetime.date
) -> None:
self.dynamodb_checkpoint = DynamoCheckpoints(
model=CheckpointModel,
report_id=self.__class__.__name__,
default_start_date=default_start_date,
)
self.default_start_date = default_start_date
self.coins = coins
self.writer = writer
self._checkpoint = self._load_checkpoint()

def _load_checkpoint(self) -> datetime.date:
return self.dynamodb_checkpoint.get_checkpoint()

def _update_checkpoint(self, value):
self._checkpoint = value
self.dynamodb_checkpoint.create_or_update_checkpoint(checkpoint_date=self._checkpoint)

@abstractmethod
def ingest(self) -> None:
pass


class AwsDaySummaryIngestor(AwsDataIngestor):
def ingest(self) -> None:
date = self._load_checkpoint()
if date < datetime.date.today():
Expand Down
19 changes: 19 additions & 0 deletions mercado_bitcoin/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import datetime

from mercado_bitcoin.ingestors import AwsDaySummaryIngestor
from mercado_bitcoin.writers import S3Writter
import logging

logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)


def lambda_handler(event, context):
logger.info(f"{event}")
logger.info(f"{context}")

AwsDaySummaryIngestor(
writer=S3Writter,
coins=["BTC", "ETH", "LTC", "BCH"],
default_start_date=datetime.date(2021, 6, 1),
).ingest()
10 changes: 3 additions & 7 deletions mercado_bitcoin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,20 @@
import time

from schedule import repeat, every, run_pending
from mercado_bitcoin.ingestors import DaySummaryIngestor
from mercado_bitcoin.ingestors import AwsDaySummaryIngestor
from mercado_bitcoin.writers import S3Writter

if __name__ == "__main__":
day_summary_ingestor = DaySummaryIngestor(
day_summary_ingestor = AwsDaySummaryIngestor(
writer=S3Writter,
coins=["BTC", "ETH", "LTC", "BCH"],
default_start_date=datetime.date(2021, 6, 1)
default_start_date=datetime.date(2021, 6, 1),
)


@repeat(every(1).seconds)
def job():
day_summary_ingestor.ingest()


while True:
run_pending()
time.sleep(0.5)


10 changes: 2 additions & 8 deletions mercado_bitcoin/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ def __init__(self, data):


class DataWriter:

def __init__(self, coin: str, api: str) -> None:
self.api = api
self.coin = coin
Expand All @@ -26,7 +25,7 @@ def _write_row(self, row: str) -> None:
with open(self.filename, "a") as f:
f.write(row)

def _write_to_file(self, data: [List, dict]):
def _write_to_file(self, data: [List, dict]):
if isinstance(data, dict):
self._write_row(json.dumps(data) + "\n")
elif isinstance(data, List):
Expand Down Expand Up @@ -56,10 +55,5 @@ def write(self, data: [List, dict]):

def _write_file_to_s3(self):
self.client.put_object(
Body=self.tempfile,
Bucket="belisco-data-lake-raw",
Key=self.key
Body=self.tempfile, Bucket="belisco-data-lake-raw", Key=self.key
)



Loading

0 comments on commit 3f28d15

Please sign in to comment.