Framed transport for Kafka message bus #329
Open: sibiryakov wants to merge 7 commits into master from framed-transport
Commits:
63e108d framed transport in kafka mb (sibiryakov)
d8b484c uncommenting (sibiryakov)
5234fd7 style (sibiryakov)
65e236e setting max message size to 4Mb (sibiryakov)
3949dd7 setting kafka version to 0.11.0.2 (sibiryakov)
18dde04 creating topics (sibiryakov)
f9d842d trying kafka 2.11-0.10.2.1 (sibiryakov)
@@ -0,0 +1,51 @@
from math import ceil

import hashlib
from cachetools import LRUCache
from msgpack import Packer, unpackb
from random import randint
from six import MAXSIZE
from struct import pack


def random_bytes():
    return pack("L", randint(0, MAXSIZE))


class FramedTransport(object):
    def __init__(self, max_message_size):
        self.max_message_size = max_message_size
        self.buffer = LRUCache(10)
        self.packer = Packer()

    def read(self, kafka_msg):
        frame = unpackb(kafka_msg.value)
        seg_id, seg_count, msg_key, msg = frame
        if seg_count == 1:
            return msg

        buffer = self.buffer.get(msg_key, dict())
        if not buffer:
            self.buffer[msg_key] = buffer
        buffer[seg_id] = frame
        if len(buffer) == seg_count:
            msgs = [buffer[_seg_id][3] for _seg_id in sorted(buffer.keys())]
            final_msg = b''.join(msgs)
            del self.buffer[msg_key]
            return final_msg
        return None

    def write(self, key, msg):
        if len(msg) < self.max_message_size:
            yield self.packer.pack((0, 1, None, msg))
        else:
            length = len(msg)
            seg_size = self.max_message_size
            seg_count = int(ceil(length / float(seg_size)))
            h = hashlib.sha1()
            h.update(msg)
            h.update(random_bytes())
            msg_key = h.digest()
            for seg_id in range(seg_count):
                seg_msg = msg[seg_id * seg_size: (seg_id + 1) * seg_size]
                yield self.packer.pack((seg_id, seg_count, msg_key, seg_msg))
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
from frontera.contrib.messagebus.kafka.transport import FramedTransport
import random
import string
from collections import namedtuple
import unittest

KafkaMessage = namedtuple("KafkaMessage", ['key', 'value'])


def get_blob(size):
    s = ''.join(random.choice(string.ascii_letters) for x in range(size))
    return s.encode("latin1")


class TestFramedTransport(unittest.TestCase):
    def setUp(self):
        self.transport = FramedTransport(32768)

    def test_big_message(self):
        test_msg = get_blob(1000000)
        assert len(test_msg) == 1000000
        framed_msgs = [m for m in self.transport.write(b"key", test_msg)]
        assert len(framed_msgs) == 31

        random.shuffle(framed_msgs)

        for i, msg in enumerate(framed_msgs):
            km = KafkaMessage(key=b"key", value=msg)
            result = self.transport.read(km)
            if i < len(framed_msgs) - 1:
                assert result is None
        assert result == test_msg  # the last one is triggering msg assembling

    def test_common_message(self):
        test_msg = get_blob(4096)
        framed_msgs = [m for m in self.transport.write(b"key", test_msg)]
        assert len(framed_msgs) == 1

        km = KafkaMessage(key=b"key", value=framed_msgs[0])
        result = self.transport.read(km)
        assert result == test_msg
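The count of 31 asserted in test_big_message follows from the ceiling division in write: 1000000 bytes split into 32768-byte segments. A minimal sketch of that arithmetic, independent of Kafka and msgpack; `split_segments` is a hypothetical helper name used only for illustration and is not part of the PR.

```python
from math import ceil


def split_segments(msg, seg_size):
    """Slice msg into consecutive chunks of at most seg_size bytes."""
    # Same ceiling division as in FramedTransport.write.
    seg_count = int(ceil(len(msg) / float(seg_size)))
    return [msg[i * seg_size:(i + 1) * seg_size] for i in range(seg_count)]


segments = split_segments(b"x" * 1000000, 32768)
print(len(segments))      # 31: 30 full segments plus one partial segment
print(len(segments[-1]))  # 16960 bytes left over for the last segment
```

Joining the segments in order of their index restores the original payload, which is what read does once all seg_count frames for a key have arrived.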
You have an error here. Look at the `six` sources: `sys.maxsize` is 64 bit in Python 3 (9223372036854775807). As you can see from six, `MAXSIZE` can be 64 bit on both Python 2 and 3. According to the documentation (Python 2 and 3), the packing symbol `L` is used to pack a 32 bit uint. You should change it to `Q`, which is used to pack a 64 bit uint.
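The sizes mentioned in this comment can be checked with `struct.calcsize`. One caveat, stated here as background rather than as part of the review: the 4-byte size of `L` is the standard size, which applies when the format string has a byte-order prefix such as `<` or `=`. Without a prefix, as in the PR, `struct` uses the platform's native `unsigned long`, which is commonly 8 bytes on 64-bit Linux and 4 bytes on Windows. A small sketch, using `<` as an assumed prefix:

```python
import struct

# Standard sizes (explicit byte-order prefix) do not depend on the platform.
print(struct.calcsize("<L"))  # 4
print(struct.calcsize("<Q"))  # 8

# Native size of "L" (no prefix) varies by platform, so no value is claimed here.
print(struct.calcsize("L"))

# A 64-bit value such as sys.maxsize on a 64-bit interpreter
# does not fit into a standard-size "L".
value = 2 ** 63 - 1
try:
    struct.pack("<L", value)
    fits = True
except struct.error:
    fits = False
print(fits)  # False

packed = struct.pack("<Q", value)
print(len(packed))  # 8
```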
Another option is to change the `MAXSIZE` variable to the literal `4_294_967_295`.
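The two suggestions can be sketched side by side. The names `random_bytes_64`, `random_bytes_32`, `MAX_INT64` and `MAX_UINT32` are hypothetical, and the explicit `<` prefix is an assumption made here so that the packed size is the same on every platform; the PR itself uses the native format `"L"`. The literal is written without underscores because underscores in numeric literals require Python 3.6 or later, while the PR imports `six` and so presumably also targets Python 2.

```python
from random import randint
from struct import pack

MAX_UINT32 = 4294967295  # 2**32 - 1, largest value a standard-size "L" can hold
MAX_INT64 = 2 ** 63 - 1  # value of sys.maxsize on a 64-bit interpreter


def random_bytes_64():
    """First suggestion: keep the 64-bit range and pack with Q."""
    return pack("<Q", randint(0, MAX_INT64))


def random_bytes_32():
    """Second suggestion: keep L and cap the range at 32 bits."""
    return pack("<L", randint(0, MAX_UINT32))


print(len(random_bytes_64()))  # 8
print(len(random_bytes_32()))  # 4
```

Either variant only salts the SHA-1 digest used as the message key, so the choice affects how many random bytes are mixed in, not the framing format.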