Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Framed transport for Kafka message bus #329

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions frontera/contrib/messagebus/kafka/transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from math import ceil

import hashlib
from cachetools import LRUCache
from msgpack import Packer, unpackb
from random import randint
from six import MAXSIZE
from struct import pack


def random_bytes():
    """Return 8 random bytes used to salt segment message keys.

    Uses the "Q" (unsigned 64-bit) struct format: six.MAXSIZE is
    2**63 - 1 on 64-bit CPython, which does not fit the "L" format
    on platforms where a native long is 4 bytes (Windows, 32-bit
    builds) and there raises struct.error.
    """
    return pack("Q", randint(0, MAXSIZE))
Comment on lines +11 to +12

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have an error here. Look at six sources

if PY3:
    string_types = str,
    integer_types = int,
    class_types = type,
    text_type = str
    binary_type = bytes

    MAXSIZE = sys.maxsize
else:
    string_types = basestring,
    integer_types = (int, long)
    class_types = (type, types.ClassType)
    text_type = unicode
    binary_type = str

    if sys.platform.startswith("java"):
        # Jython always uses 32 bits.
        MAXSIZE = int((1 << 31) - 1)
    else:
        # It's possible to have sizeof(long) != sizeof(Py_ssize_t).
        class X(object):

            def __len__(self):
                return 1 << 31
        try:
            len(X())
        except OverflowError:
            # 32-bit
            MAXSIZE = int((1 << 31) - 1)
        else:
            # 64-bit
            MAXSIZE = int((1 << 63) - 1)
        del X

sys.maxsize is 64-bit in Python 3 (9223372036854775807). As you can see from six, MAXSIZE can be 64-bit on both Python 2 and 3. According to the struct documentation (Python 2 and 3), the format character L packs a 32-bit unsigned int in standard (byte-order-prefixed) mode; in native mode its size is platform-dependent and is only 4 bytes on Windows and 32-bit builds, so pack("L", MAXSIZE) raises struct.error there. You should change it to Q, which packs a 64-bit unsigned int on every platform.

Suggested change
def random_bytes():
return pack("L", randint(0, MAXSIZE))
def random_bytes():
return pack('Q', randint(0, MAXSIZE))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option is to keep "L" and replace MAXSIZE with the literal 4294967295 (2**32 - 1) — note that the underscore form 4_294_967_295 requires Python 3.6+, while this project still supports Python 2 via six, and capping at 32 bits also halves the salt's entropy.



class FramedTransport(object):
    """Segments large payloads into msgpack frames and reassembles them.

    Each frame is a packed tuple ``(seg_id, seg_count, msg_key, payload)``.
    Messages smaller than ``max_message_size`` travel as a single frame
    with a null key; larger ones are split into equally sized segments
    sharing a salted SHA-1 key used for reassembly on the consumer side.
    """

    def __init__(self, max_message_size):
        # Largest payload carried in one frame before segmentation kicks in.
        self.max_message_size = max_message_size
        # Per-key partial reassembly state; bounded so that abandoned
        # (never-completed) messages are eventually evicted.
        self.buffer = LRUCache(10)
        self.packer = Packer()

    def read(self, kafka_msg):
        """Consume one framed Kafka message.

        Returns the fully reassembled payload, or None while segments of a
        multi-frame message are still outstanding.
        """
        seg_id, seg_count, msg_key, payload = unpackb(kafka_msg.value)
        if seg_count == 1:
            # Unsegmented message — nothing to buffer.
            return payload
        pending = self.buffer.get(msg_key, dict())
        if not pending:
            # First segment seen for this key: register the buffer.
            self.buffer[msg_key] = pending
        pending[seg_id] = (seg_id, seg_count, msg_key, payload)
        if len(pending) != seg_count:
            return None
        # All segments arrived: stitch payloads back together in order.
        assembled = b''.join(pending[i][3] for i in sorted(pending))
        del self.buffer[msg_key]
        return assembled

    def write(self, key, msg):
        """Yield one or more packed frames carrying *msg*."""
        total = len(msg)
        if total < self.max_message_size:
            # Fast path: single frame, no reassembly key required.
            yield self.packer.pack((0, 1, None, msg))
            return
        seg_size = self.max_message_size
        seg_count = int(ceil(total / float(seg_size)))
        digest = hashlib.sha1()
        digest.update(msg)
        # Salt the digest so identical payloads sent twice get distinct keys.
        digest.update(random_bytes())
        msg_key = digest.digest()
        for seg_id in range(seg_count):
            start = seg_id * seg_size
            yield self.packer.pack(
                (seg_id, seg_count, msg_key, msg[start:start + seg_size]))
27 changes: 15 additions & 12 deletions frontera/contrib/messagebus/kafkabus.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@
from __future__ import absolute_import

from logging import getLogger
from time import sleep

import six
from kafka import KafkaConsumer, KafkaProducer, TopicPartition

from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner
from frontera.contrib.messagebus.kafka.offsets_fetcher import OffsetsFetcherAsync
from frontera.contrib.messagebus.kafka.transport import FramedTransport
from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseSpiderFeedStream, \
BaseStreamConsumer, BaseScoringLogStream, BaseStreamProducer, BaseStatsLogStream
from twisted.internet.task import LoopingCall
from traceback import format_tb
from os.path import join as os_path_join


DEFAULT_BATCH_SIZE = 1024 * 1024
DEFAULT_BUFFER_MEMORY = 130 * 1024 * 1024
DEFAULT_MAX_REQUEST_SIZE = 4 * 1024 * 1024
MAX_SEGMENT_SIZE = int(DEFAULT_MAX_REQUEST_SIZE * 0.95)

logger = getLogger("messagebus.kafka")

Expand Down Expand Up @@ -59,13 +57,16 @@ def __init__(self, location, enable_ssl, cert_path, topic, group, partition_id):
else:
self._partitions = [TopicPartition(self._topic, pid) for pid in self._consumer.partitions_for_topic(self._topic)]
self._consumer.subscribe(topics=[self._topic])
self._transport = FramedTransport(MAX_SEGMENT_SIZE)

def get_messages(self, timeout=0.1, count=1):
result = []
while count > 0:
try:
m = next(self._consumer)
result.append(m.value)
kafka_msg = next(self._consumer)
msg = self._transport.read(kafka_msg)
if msg is not None:
result.append(msg)
count -= 1
except StopIteration:
break
Expand All @@ -90,17 +91,18 @@ def __init__(self, location, enable_ssl, cert_path, topic, compression, **kwargs
self._create(enable_ssl, cert_path, **kwargs)

def _create(self, enable_ssl, cert_path, **kwargs):
max_request_size = kwargs.pop('max_request_size', DEFAULT_MAX_REQUEST_SIZE)
self._transport = FramedTransport(MAX_SEGMENT_SIZE)
kwargs.update(_prepare_kafka_ssl_kwargs(cert_path) if enable_ssl else {})
self._producer = KafkaProducer(bootstrap_servers=self._location,
retries=5,
compression_type=self._compression,
max_request_size=max_request_size,
max_request_size=DEFAULT_MAX_REQUEST_SIZE,
**kwargs)

def send(self, key, *messages):
for msg in messages:
self._producer.send(self._topic, value=msg)
for kafka_msg in self._transport.write(key, msg):
self._producer.send(self._topic, value=kafka_msg)

def flush(self):
self._producer.flush()
Expand All @@ -115,18 +117,19 @@ def __init__(self, location, enable_ssl, cert_path, topic_done, partitioner, com
self._topic_done = topic_done
self._partitioner = partitioner
self._compression = compression
max_request_size = kwargs.pop('max_request_size', DEFAULT_MAX_REQUEST_SIZE)
kwargs.update(_prepare_kafka_ssl_kwargs(cert_path) if enable_ssl else {})
self._transport = FramedTransport(MAX_SEGMENT_SIZE)
self._producer = KafkaProducer(bootstrap_servers=self._location,
partitioner=partitioner,
retries=5,
compression_type=self._compression,
max_request_size=max_request_size,
max_request_size=DEFAULT_MAX_REQUEST_SIZE,
**kwargs)

def send(self, key, *messages):
for msg in messages:
self._producer.send(self._topic_done, key=key, value=msg)
for kafka_msg in self._transport.write(key, msg):
self._producer.send(self._topic_done, key=key, value=kafka_msg)

def flush(self):
self._producer.flush()
Expand Down
6 changes: 4 additions & 2 deletions tests/kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ services:
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
image: wurstmeister/kafka:2.11-0.10.2.1
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_CREATE_TOPICS: "frontier-todo:1:1,frontier-done:1:1,frontier-score:4:1,frontier-stats:4:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_MESSAGE_MAX_BYTES: 4194304
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
42 changes: 42 additions & 0 deletions tests/test_framed_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
from frontera.contrib.messagebus.kafka.transport import FramedTransport
import random
import string
from collections import namedtuple
import unittest

KafkaMessage = namedtuple("KafkaMessage", ['key', 'value'])


def get_blob(size):
    """Return *size* random bytes drawn from the ASCII letters."""
    letters = string.ascii_letters
    text = "".join(random.choice(letters) for _ in range(size))
    return text.encode("latin1")


class TestFramedTransport(unittest.TestCase):
    """Exercises FramedTransport segmentation and reassembly."""

    def setUp(self):
        # 32 KiB segment size: a 1,000,000-byte payload yields 31 frames.
        self.transport = FramedTransport(32768)

    def test_big_message(self):
        payload = get_blob(1000000)
        self.assertEqual(len(payload), 1000000)
        frames = list(self.transport.write(b"key", payload))
        self.assertEqual(len(frames), 31)

        # Segment delivery order must not matter for reassembly.
        random.shuffle(frames)

        result = None
        last = len(frames) - 1
        for idx, frame in enumerate(frames):
            km = KafkaMessage(key=b"key", value=frame)
            result = self.transport.read(km)
            if idx < last:
                self.assertIsNone(result)
        # Only the final frame completes reassembly.
        self.assertEqual(result, payload)

    def test_common_message(self):
        payload = get_blob(4096)
        frames = list(self.transport.write(b"key", payload))
        self.assertEqual(len(frames), 1)

        km = KafkaMessage(key=b"key", value=frames[0])
        self.assertEqual(self.transport.read(km), payload)
19 changes: 16 additions & 3 deletions tests/test_message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@
from frontera.contrib.messagebus.kafkabus import MessageBus as KafkaMessageBus
from frontera.utils.fingerprint import sha1
from kafka import KafkaClient
from random import randint
from random import randint, choice
from time import sleep
import string
from six.moves import range
import logging
from sys import stdout
import unittest
from w3lib.util import to_bytes


def get_blob(size):
    """Return *size* bytes of random ASCII letters (latin1-encoded)."""
    picked = [choice(string.ascii_letters) for _ in range(size)]
    return "".join(picked).encode("latin1")


class MessageBusTester(object):
def __init__(self, cls, settings=Settings()):
settings.set('SPIDER_FEED_PARTITIONS', 1)
Expand Down Expand Up @@ -119,7 +125,8 @@ def close(self):
class KafkaMessageBusTest(unittest.TestCase):
def setUp(self):
logging.basicConfig()
handler = logging.StreamHandler(stdout)
#handler = logging.StreamHandler(stdout)
handler = logging.FileHandler("kafka-debug.log")
logger = logging.getLogger("kafka")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
Expand Down Expand Up @@ -177,7 +184,8 @@ def spider_log_activity(self, messages):
if i % 2 == 0:
self.sp_sl_p.send(sha1(str(randint(1, 1000))), b'http://helloworld.com/way/to/the/sun/' + b'0')
else:
self.sp_sl_p.send(sha1(str(randint(1, 1000))), b'http://way.to.the.sun' + b'0')
msg = b'http://way.to.the.sun' + b'0' if i != messages - 1 else get_blob(10485760)
self.sp_sl_p.send(sha1(str(randint(1, 1000))), msg)
self.sp_sl_p.flush()
self.logger.debug("spider log activity finished")

Expand All @@ -190,12 +198,17 @@ def spider_feed_activity(self):
def sw_activity(self):
c = 0
p = 0
big_message_passed = False
for m in self.sw_sl_c.get_messages(timeout=0.1, count=512):
if m.startswith(b'http://helloworld.com/'):
p += 1
self.sw_us_p.send(None, b'message' + b'0' + b"," + to_bytes(str(c)))
else:
if len(m) == 10485760:
big_message_passed = True
c += 1
assert p > 0
assert big_message_passed
return c

def db_activity(self, messages):
Expand Down