Skip to content

Commit

Permalink
(#14) Remove deprecated code
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-tessier committed Apr 1, 2021
1 parent 196b9a5 commit d9a737e
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 194 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
build:
python setup.py sdist
test:
test_3:
python3 -m unittest
test_2:
python2 -m unittest discover -p 'test_*.py'
package_upload_test:
twine upload --verbose --repository-url https://test.pypi.org/legacy/ dist/*
package_upload:
Expand Down
3 changes: 2 additions & 1 deletion paper_cup/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .paper_cup import PaperCup, ConsumePC, PublishPC
from .client import SNSClient, SQSClient
from .paper_cup import PaperCup
from .decorators import retry
70 changes: 34 additions & 36 deletions paper_cup/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import boto3
from botocore.exceptions import NoCredentialsError
from paper_cup.decorators import retry
from .decorators import retry


class SNSClient:
Expand All @@ -12,21 +12,29 @@ class SNSClient:
CUSTOM_RETRY_RULE = {'tries': RETRY_TRIES, 'delay': RETRY_DELAY, 'backoff': RETRY_BACKOFF}

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def __init__(self, endpoint_url, region='ap-northeast-1', aws_access_key_id=None, aws_secret_access_key=None):
def __init__(self, endpoint_url, region, aws_access_key_id=None, aws_secret_access_key=None):
"""Constructor with already set in options."""

session = boto3.Session(region_name=region, aws_access_key_id=aws_secret_access_key, aws_secret_access_key=aws_secret_access_key)
self.sns = session.client('sns', endpoint_url=endpoint_url)
self._sns_client = session.client('sns', endpoint_url=endpoint_url)

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def create_topic(self, topic_name):
"""Create SNS topic by name."""
return self._sns_client.create_topic(Name=topic_name)['TopicArn']

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def delete_topic(self, topic_name):
"""Delete a given SNS topic from it arn."""
return self._sns_client.delete_topic(TopicArn=self.get_topic_arn(topic_name))

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def get_topic_arn(self, topic_name, token=None):
"""Get sns topic by name and paginate until is found."""
sns = self.sns

if token:
list_topics = sns.list_topics(NextToken=token)
list_topics = self._sns_client.list_topics(NextToken=token)
else:
list_topics = sns.list_topics()
list_topics = self._sns_client.list_topics()

next_token = list_topics.get('NextToken')
topic_list = list_topics['Topics']
Expand All @@ -36,37 +44,27 @@ def get_topic_arn(self, topic_name, token=None):
return topic['TopicArn']
else:
if next_token:
self.get_topic(token=next_token)
return self.get_topic_arn(topic_name=topic_name, token=next_token)
else:
raise NotImplementedError('SNS topic not found!')

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def publish(self, message, topic_arn):
def publish(self, message, topic_name):
"""Send message to all subscriber of this topic."""
return self.sns.publish(
TopicArn=topic_arn,
return self._sns_client.publish(
TopicArn=self.get_topic_arn(topic_name),
Message=message,
)

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def subscribe(self, topic_arn, queue_arn):
"""Subscribe to topic."""
return self.sns.subscribe(
def add_sqs_subscription(self, topic_name, queue_arn):
"""Subscribe the sqs queue to the topic."""
return self._sns_client.subscribe(
Protocol='sqs',
TopicArn=topic_arn,
TopicArn=self.get_topic_arn(topic_name),
Endpoint=queue_arn,
)

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def create_topic(self, topic_name):
"""Create SNS topic by name."""
return self.sns.create_topic(Name=topic_name)['TopicArn']

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def delete_topic(self, topic_arn):
"""Delete a given SNS topic from it arn."""
return self.sns.delete_topic(TopicArn=topic_arn)


class SQSClient:
"""Common sqs usages."""
Expand All @@ -77,34 +75,34 @@ class SQSClient:
CUSTOM_RETRY_RULE = {'tries': RETRY_TRIES, 'delay': RETRY_DELAY, 'backoff': RETRY_BACKOFF}

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def __init__(self, endpoint_url, region='ap-northeast-1', aws_access_key_id=None, aws_secret_access_key=None):
def __init__(self, endpoint_url, region, aws_access_key_id=None, aws_secret_access_key=None):
"""Constructor with already set in options."""
session = boto3.Session(region_name=region, aws_access_key_id=aws_secret_access_key, aws_secret_access_key=aws_secret_access_key)
self.sqs = session.client('sqs', endpoint_url=endpoint_url)
self.sqs_obj = session.resource('sqs', endpoint_url=endpoint_url)
self._sqs_client = session.client('sqs', endpoint_url=endpoint_url)
self._sqs_resource = session.resource('sqs', endpoint_url=endpoint_url)

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def create_queue(self, queue_name):
"""Create SQS queue by name."""
return self.sqs.create_queue(QueueName=queue_name)
return self._sqs_client.create_queue(QueueName=queue_name)

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def delete_queue(self, queue_url):
def delete_queue(self, queue_name):
"""Delete a given SQS queue from it url."""
return self.sqs.delete_queue(QueueUrl=queue_url)
return self._sqs_client.delete_queue(QueueUrl=self.get_queue_url(queue_name))

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def get_queue_by_name(self, queue_name):
"""Get SQS queue object by name."""
return self.sqs_obj.get_queue_by_name(QueueName=queue_name)
return self._sqs_resource.get_queue_by_name(QueueName=queue_name)

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def get_queue_url(self, queue_name):
"""Get SQS queue url by it name."""
return self.sqs.get_queue_url(QueueName=queue_name)['QueueUrl']
return self._sqs_client.get_queue_url(QueueName=queue_name)['QueueUrl']

@retry(NoCredentialsError, **CUSTOM_RETRY_RULE)
def get_queue_arn(self, queue_url):
"""Get SQS queue arn by it url."""
sqs_queue_attrs = self.sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])['Attributes']
def get_queue_arn(self, queue_name):
"""Get SQS queue arn by it name."""
sqs_queue_attrs = self._sqs_client.get_queue_attributes(QueueUrl=self.get_queue_url(queue_name), AttributeNames=['All'])['Attributes']
return sqs_queue_attrs['QueueArn']
126 changes: 56 additions & 70 deletions paper_cup/paper_cup.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,63 @@
import json

from .client import SNSClient, SQSClient


class PaperCup(object):
"""Publisher and subscribe settings."""
PC_ENABLE = True
PC_TOPIC = 'topic' # Publisher (SNS)
PC_QUEUE = 'queue' # Consumer (SQS)
PC_LISTEN = ['service'] # list of service name for message to process ex: cas listen only 'booking' so for cas PC_LISTEN = ['booking']
PC_SENDER = 'service' # name of the app that send the message ex: 'booking'
PC_SERVICE_LISTEN = ['service'] # list of service name for message to process ex: ['my_other_app']
PC_SERVICE_SENDER = 'service' # name of the app that send the message ex: 'my_app'

PC_DEFAULT_CONSUME_CLIENT = 'SQS'
PC_DEFAULT_PUBLISH_CLIENT = 'SNS'

PC_SUPPORTED_PUBLISH_CLIENT = ['SNS', 'SQS']
PC_SUPPORTED_CONSUME_CLIENT = ['SQS']

PC_SNS_TOPIC = 'topic'
PC_SQS_QUEUE = 'queue'

# default values set for test
PC_AWS_ACCESS_KEY_ID = 'test'
PC_AWS_SECRET_ACCESS_KEY_ID = 'test'
PC_AWS_LOCAL_ENDPOINT = 'http://192.168.56.1:9010' # we use moto

PC_AWS_REGION = 'ap-northeast-1'

# set default attribut values
sns = False
sqs = False
sns_client = False
sqs_client = False


class PublishPC(PaperCup):
"""Public class for Publisher."""

def __init__(self):
"""Deprecated As we never need Publish and Consume in same time."""
def __init__(self, *args, **kwargs):
""""""
if self.PC_ENABLE:
self.sns = SNSClient(self.PC_AWS_LOCAL_ENDPOINT, aws_access_key_id=self.PC_AWS_ACCESS_KEY_ID, aws_secret_access_key=self.PC_AWS_SECRET_ACCESS_KEY_ID)
self.topic_arn = self.sns.get_topic_arn(self.PC_TOPIC)
client = kwargs.get('client', PaperCup.PC_DEFAULT_PUBLISH_CLIENT)
assert(client in PaperCup.PC_SUPPORTED_PUBLISH_CLIENT)

self.sqs = SQSClient(self.PC_AWS_LOCAL_ENDPOINT, aws_access_key_id=self.PC_AWS_ACCESS_KEY_ID, aws_secret_access_key=self.PC_AWS_SECRET_ACCESS_KEY_ID)
if client == 'SNS':
self.sns_client = SNSClient(self.PC_AWS_LOCAL_ENDPOINT, region=self.PC_AWS_REGION, aws_access_key_id=self.PC_AWS_ACCESS_KEY_ID, aws_secret_access_key=self.PC_AWS_SECRET_ACCESS_KEY_ID)
elif client == 'SQS':
self.sqs_client = SQSClient(self.PC_AWS_LOCAL_ENDPOINT, region=self.PC_AWS_REGION, aws_access_key_id=self.PC_AWS_ACCESS_KEY_ID, aws_secret_access_key=self.PC_AWS_SECRET_ACCESS_KEY_ID)
self.sqs_client.queue = self.sqs_client.get_queue_by_name(self.PC_SQS_QUEUE)

def publish(self, message, action):
"""Send message to sns."""
if self.sns:
if self.sns_client:
message = self._add_more_data(message, action)
message = json.dumps(message)
self.sns.publish(message, self.topic_arn)
self.sns_client.sns.publish(message, self.PC_SNS_TOPIC)

def _add_more_data(self, message, action):
"""Add necessary data to detemine the consumer and action function."""
# we expect the publish class name as PublishUser and it's cosumer will be ConsumeUser
class_name = self.__class__.__name__
message['consumer_action_class'] = class_name.replace('Publish', 'Consume')
message['action'] = action
message['sender'] = self.PC_SENDER
message['sender'] = self.PC_SERVICE_SENDER
return message

def bulk_publish(self, list_message, list_action):
Expand All @@ -50,30 +66,42 @@ def bulk_publish(self, list_message, list_action):
"""
message = ''
msg_list = []
if self.sns:
if self.sns_client:
for i, one_message in enumerate(list_message):
full_message = self._add_more_data(one_message, list_action[i])
temp_message = json.dumps(full_message)
# check max size of the message to publish under the limit
if (len(message) + len(temp_message)) > 256000:
self.sns.publish(message, self.topic_arn)
self.sns_client.publish(message, self.PC_SNS_TOPIC)
msg_list = [full_message]
else:
msg_list.append(one_message)
message = json.dumps(msg_list)

if message:
self.sns.publish(message, self.topic_arn)
self.sns_client.publish(message, self.PC_SNS_TOPIC)

def run(self):
"""Deprecated as bad naming.
Read the message in queue and use the class that will handle the action. (Consume the message)
"""
if self.sqs:
queue = self.sqs.get_queue_by_name(self.PC_QUEUE)

class ConsumePC(PaperCup):
"""Public class for Consume."""

def __init__(self, *args, **kwargs):
""""""
if self.PC_ENABLE:
client = kwargs.get('client', PaperCup.PC_DEFAULT_CONSUME_CLIENT)
assert(client in PaperCup.PC_SUPPORTED_CONSUME_CLIENT)

if client == 'SQS':
self.sqs_client = SQSClient(self.PC_AWS_LOCAL_ENDPOINT, region=self.PC_AWS_REGION, aws_access_key_id=self.PC_AWS_ACCESS_KEY_ID, aws_secret_access_key=self.PC_AWS_SECRET_ACCESS_KEY_ID)
self.sqs_client.queue = self.sqs_client.get_queue_by_name(self.PC_SQS_QUEUE)

def consume(self):
"""Read the message in queue and use the class that will handle the action."""
if self.sqs_client.queue:
# get all the consumer classes that will handle actions
action_classes = {cls.__name__: cls() for cls in self.__class__.__subclasses__() if 'Consume' in cls.__name__}
messages = queue.receive_messages(WaitTimeSeconds=20, MaxNumberOfMessages=10, VisibilityTimeout=30)
messages = self.sqs_client.queue.receive_messages(WaitTimeSeconds=20, MaxNumberOfMessages=10, VisibilityTimeout=30)

while messages:
for message in messages:
body = json.loads(message.body)
Expand All @@ -87,15 +115,15 @@ def run(self):

message.delete()

messages = queue.receive_messages(MaxNumberOfMessages=10, VisibilityTimeout=30)
messages = self.sqs_client.queue.receive_messages(MaxNumberOfMessages=10, VisibilityTimeout=30)

def _consume_msg(self, msg, action_classes):
"""Common call to consume a message."""
"""Common call to consume the queue."""
action = msg.get('action')
sender = msg.get('sender')

# check that the consumer listen from the sender and do the action
if sender in self.PC_LISTEN:
if sender in self.PC_SERVICE_LISTEN:
consumer_action_class = msg.get('consumer_action_class')
action_class = action_classes.get(consumer_action_class)
# only handle the action with consumers
Expand All @@ -106,45 +134,3 @@ def _do_action(self, message, action):
"""Call the action of the consumer class."""
method = getattr(self, action)
method(message)


class PublishPC(PaperCup):
"""Public class for Publisher."""

def __init__(self, *args, **kwargs):
""""""
if self.PC_ENABLE:
self.sns = SNSClient(self.PC_AWS_LOCAL_ENDPOINT, aws_access_key_id=self.PC_AWS_ACCESS_KEY_ID, aws_secret_access_key=self.PC_AWS_SECRET_ACCESS_KEY_ID)
self.topic_arn = self.sns.get_topic_arn(self.PC_TOPIC)


class ConsumePC(PaperCup):
"""Public class for Consume ."""

def __init__(self, *args, **kwargs):
""""""
if self.PC_ENABLE:
self.sqs = SQSClient(self.PC_AWS_LOCAL_ENDPOINT, aws_access_key_id=self.PC_AWS_ACCESS_KEY_ID, aws_secret_access_key=self.PC_AWS_SECRET_ACCESS_KEY_ID)
self.sqs_queue = self.sqs.get_queue_by_name(self.PC_QUEUE)

def consume(self):
"""Read the message in queue and use the class that will handle the action. (Consume the message)"""
if self.sqs_queue:
# get all the consumer classes that will handle actions
action_classes = {cls.__name__: cls() for cls in self.__class__.__subclasses__() if 'Consume' in cls.__name__}
messages = self.sqs_queue.receive_messages(WaitTimeSeconds=20, MaxNumberOfMessages=10, VisibilityTimeout=30)

while messages:
for message in messages:
body = json.loads(message.body)
msg = json.loads(body['Message'])

if isinstance(msg, list):
for one_msg in msg:
self._consume_msg(one_msg, action_classes)
else:
self._consume_msg(msg, action_classes)

message.delete()

messages = self.sqs_queue.receive_messages(MaxNumberOfMessages=10, VisibilityTimeout=30)
Loading

0 comments on commit d9a737e

Please sign in to comment.