From 63e108d24d708a8c6796ad002fbbf73bcfb27418 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Fri, 27 Apr 2018 13:17:53 +0200 Subject: [PATCH 1/7] framed transport in kafka mb # Conflicts: # frontera/contrib/messagebus/kafkabus.py --- .../contrib/messagebus/kafka/transport.py | 51 +++++++++++++++++++ frontera/contrib/messagebus/kafkabus.py | 29 ++++++----- tests/test_framed_transport.py | 42 +++++++++++++++ tests/test_message_bus.py | 23 +++++++-- 4 files changed, 128 insertions(+), 17 deletions(-) create mode 100644 frontera/contrib/messagebus/kafka/transport.py create mode 100644 tests/test_framed_transport.py diff --git a/frontera/contrib/messagebus/kafka/transport.py b/frontera/contrib/messagebus/kafka/transport.py new file mode 100644 index 000000000..3707093bf --- /dev/null +++ b/frontera/contrib/messagebus/kafka/transport.py @@ -0,0 +1,51 @@ +from math import ceil + +import hashlib +from cachetools import LRUCache +from msgpack import Packer, unpackb +from random import randint +from six import MAXSIZE +from struct import pack + + +def random_bytes(): + return pack("L", randint(0, MAXSIZE)) + + +class FramedTransport(object): + def __init__(self, max_message_size): + self.max_message_size = max_message_size + self.buffer = LRUCache(10) + self.packer = Packer() + + def read(self, kafka_msg): + frame = unpackb(kafka_msg.value) + seg_id, seg_count, msg_key, msg = frame + if seg_count == 1: + return msg + + buffer = self.buffer.get(msg_key, dict()) + if not buffer: + self.buffer[msg_key] = buffer + buffer[seg_id] = frame + if len(buffer) == seg_count: + msgs = [buffer[seg_id][3] for seg_id in sorted(buffer.keys())] + final_msg = b''.join(msgs) + del self.buffer[msg_key] + return final_msg + return None + + def write(self, key, msg): + if len(msg) < self.max_message_size: + yield self.packer.pack((0, 1, None, msg)) + else: + length = len(msg) + seg_size = self.max_message_size + seg_count = int(ceil(length / float(seg_size))) + h = hashlib.sha1() + h.update(msg) + h.update(random_bytes()) + msg_key = h.digest() + for seg_id in range(seg_count): + seg_msg = msg[seg_id * seg_size: (seg_id + 1) * seg_size] + yield self.packer.pack((seg_id, seg_count, msg_key, seg_msg)) \ No newline at end of file diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py index f4761e052..2932396fb 100644 --- a/frontera/contrib/messagebus/kafkabus.py +++ b/frontera/contrib/messagebus/kafkabus.py @@ -2,23 +2,21 @@ from __future__ import absolute_import from logging import getLogger -from time import sleep import six from kafka import KafkaConsumer, KafkaProducer, TopicPartition from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner from frontera.contrib.messagebus.kafka.offsets_fetcher import OffsetsFetcherAsync +from frontera.contrib.messagebus.kafka.transport import FramedTransport from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseSpiderFeedStream, \ BaseStreamConsumer, BaseScoringLogStream, BaseStreamProducer, BaseStatsLogStream -from twisted.internet.task import LoopingCall -from traceback import format_tb from os.path import join as os_path_join - DEFAULT_BATCH_SIZE = 1024 * 1024 DEFAULT_BUFFER_MEMORY = 130 * 1024 * 1024 DEFAULT_MAX_REQUEST_SIZE = 4 * 1024 * 1024 +MAX_SEGMENT_SIZE = int(DEFAULT_MAX_REQUEST_SIZE * 0.95) logger = getLogger("messagebus.kafka") @@ -59,13 +57,16 @@ def __init__(self, location, enable_ssl, cert_path, topic, group, partition_id): else: self._partitions = 
[TopicPartition(self._topic, pid) for pid in self._consumer.partitions_for_topic(self._topic)] self._consumer.subscribe(topics=[self._topic]) + self._transport = FramedTransport(MAX_SEGMENT_SIZE) def get_messages(self, timeout=0.1, count=1): result = [] while count > 0: try: - m = next(self._consumer) - result.append(m.value) + kafka_msg = next(self._consumer) + msg = self._transport.read(kafka_msg) + if msg is not None: + result.append(msg) count -= 1 except StopIteration: break @@ -89,18 +90,21 @@ def __init__(self, location, enable_ssl, cert_path, topic, compression, **kwargs self._compression = compression self._create(enable_ssl, cert_path, **kwargs) + def _create(self, enable_ssl, cert_path, **kwargs): - max_request_size = kwargs.pop('max_request_size', DEFAULT_MAX_REQUEST_SIZE) + self._transport = FramedTransport(MAX_SEGMENT_SIZE) kwargs.update(_prepare_kafka_ssl_kwargs(cert_path) if enable_ssl else {}) self._producer = KafkaProducer(bootstrap_servers=self._location, retries=5, compression_type=self._compression, - max_request_size=max_request_size, + max_request_size=DEFAULT_MAX_REQUEST_SIZE, **kwargs) + def send(self, key, *messages): for msg in messages: - self._producer.send(self._topic, value=msg) + for kafka_msg in self._transport.write(key, msg): + self._producer.send(self._topic, value=kafka_msg) def flush(self): self._producer.flush() @@ -115,18 +119,19 @@ def __init__(self, location, enable_ssl, cert_path, topic_done, partitioner, com self._topic_done = topic_done self._partitioner = partitioner self._compression = compression - max_request_size = kwargs.pop('max_request_size', DEFAULT_MAX_REQUEST_SIZE) kwargs.update(_prepare_kafka_ssl_kwargs(cert_path) if enable_ssl else {}) + self._transport = FramedTransport(MAX_SEGMENT_SIZE) self._producer = KafkaProducer(bootstrap_servers=self._location, partitioner=partitioner, retries=5, compression_type=self._compression, - max_request_size=max_request_size, + max_request_size=DEFAULT_MAX_REQUEST_SIZE, **kwargs) def send(self, key, *messages): for msg in messages: - self._producer.send(self._topic_done, key=key, value=msg) + for kafka_msg in self._transport.write(key, msg): + self._producer.send(self._topic_done, key=key, value=kafka_msg) def flush(self): self._producer.flush() diff --git a/tests/test_framed_transport.py b/tests/test_framed_transport.py new file mode 100644 index 000000000..9d0c2f76a --- /dev/null +++ b/tests/test_framed_transport.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +from frontera.contrib.messagebus.kafka.transport import FramedTransport +import random +import string +from collections import namedtuple +import unittest + +KafkaMessage = namedtuple("KafkaMessage", ['key', 'value']) + + +def get_blob(size): + s = ''.join(random.choice(string.ascii_letters) for x in range(size)) + return s.encode("latin1") + + +class TestFramedTransport(unittest.TestCase): + def setUp(self): + self.transport = FramedTransport(32768) + + def test_big_message(self): + test_msg = get_blob(1000000) + assert len(test_msg) == 1000000 + framed_msgs = [m for m in self.transport.write(b"key", test_msg)] + assert len(framed_msgs) == 31 + + random.shuffle(framed_msgs) + + for i, msg in enumerate(framed_msgs): + km = KafkaMessage(key=b"key", value=msg) + result = self.transport.read(km) + if i < len(framed_msgs) - 1: + assert result is None + assert result == test_msg # the last one is triggering msg assembling + + def test_common_message(self): + test_msg = get_blob(4096) + framed_msgs = [m for m in self.transport.write(b"key", 
test_msg)] + assert len(framed_msgs) == 1 + + km = KafkaMessage(key=b"key", value=framed_msgs[0]) + result = self.transport.read(km) + assert result == test_msg diff --git a/tests/test_message_bus.py b/tests/test_message_bus.py index 5293c2445..ac6ff0c8b 100644 --- a/tests/test_message_bus.py +++ b/tests/test_message_bus.py @@ -5,8 +5,9 @@ from frontera.contrib.messagebus.kafkabus import MessageBus as KafkaMessageBus from frontera.utils.fingerprint import sha1 from kafka import KafkaClient -from random import randint +from random import randint, choice from time import sleep +import string from six.moves import range import logging from sys import stdout @@ -14,6 +15,11 @@ from w3lib.util import to_bytes +def get_blob(size): + s = ''.join(choice(string.ascii_letters) for x in range(size)) + return s.encode("latin1") + + class MessageBusTester(object): def __init__(self, cls, settings=Settings()): settings.set('SPIDER_FEED_PARTITIONS', 1) @@ -119,7 +125,8 @@ def close(self): class KafkaMessageBusTest(unittest.TestCase): def setUp(self): logging.basicConfig() - handler = logging.StreamHandler(stdout) + #handler = logging.StreamHandler(stdout) + handler = logging.FileHandler("kafka-debug.log") logger = logging.getLogger("kafka") logger.setLevel(logging.INFO) logger.addHandler(handler) @@ -177,7 +184,8 @@ def spider_log_activity(self, messages): if i % 2 == 0: self.sp_sl_p.send(sha1(str(randint(1, 1000))), b'http://helloworld.com/way/to/the/sun/' + b'0') else: - self.sp_sl_p.send(sha1(str(randint(1, 1000))), b'http://way.to.the.sun' + b'0') + msg = b'http://way.to.the.sun' + b'0' if i != messages - 1 else get_blob(10485760) + self.sp_sl_p.send(sha1(str(randint(1, 1000))), msg) self.sp_sl_p.flush() self.logger.debug("spider log activity finished") @@ -190,12 +198,17 @@ def spider_feed_activity(self): def sw_activity(self): c = 0 p = 0 + big_message_passed = False for m in self.sw_sl_c.get_messages(timeout=0.1, count=512): if m.startswith(b'http://helloworld.com/'): p += 1 self.sw_us_p.send(None, b'message' + b'0' + b"," + to_bytes(str(c))) + else: + if len(m) == 10485760: + big_message_passed = True c += 1 assert p > 0 + assert big_message_passed return c def db_activity(self, messages): @@ -218,8 +231,8 @@ def db_activity(self, messages): def test_integration(self): self.spider_log_activity(64) assert self.sw_activity() == 64 - assert self.db_activity(128) == (64, 32) - assert self.spider_feed_activity() == 128 + #assert self.db_activity(128) == (64, 32) + #assert self.spider_feed_activity() == 128 class IPv6MessageBusTester(MessageBusTester): From d8b484c0acb313a8b4321c47277365d5eea653b8 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Wed, 25 Jul 2018 12:30:35 +0500 Subject: [PATCH 2/7] uncommenting --- tests/test_message_bus.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_message_bus.py b/tests/test_message_bus.py index ac6ff0c8b..c78d5393d 100644 --- a/tests/test_message_bus.py +++ b/tests/test_message_bus.py @@ -231,8 +231,8 @@ def db_activity(self, messages): def test_integration(self): self.spider_log_activity(64) assert self.sw_activity() == 64 - #assert self.db_activity(128) == (64, 32) - #assert self.spider_feed_activity() == 128 + assert self.db_activity(128) == (64, 32) + assert self.spider_feed_activity() == 128 class IPv6MessageBusTester(MessageBusTester): From 5234fd7d271ecef579b324de288f134ed1358355 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Wed, 25 Jul 2018 12:36:47 +0500 Subject: [PATCH 3/7] style --- 
frontera/contrib/messagebus/kafka/transport.py | 2 +- frontera/contrib/messagebus/kafkabus.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/frontera/contrib/messagebus/kafka/transport.py b/frontera/contrib/messagebus/kafka/transport.py index 3707093bf..6d7d24ccc 100644 --- a/frontera/contrib/messagebus/kafka/transport.py +++ b/frontera/contrib/messagebus/kafka/transport.py @@ -29,7 +29,7 @@ def read(self, kafka_msg): self.buffer[msg_key] = buffer buffer[seg_id] = frame if len(buffer) == seg_count: - msgs = [buffer[seg_id][3] for seg_id in sorted(buffer.keys())] + msgs = [buffer[_seg_id][3] for _seg_id in sorted(buffer.keys())] final_msg = b''.join(msgs) del self.buffer[msg_key] return final_msg diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py index 2932396fb..6140b7ce4 100644 --- a/frontera/contrib/messagebus/kafkabus.py +++ b/frontera/contrib/messagebus/kafkabus.py @@ -90,7 +90,6 @@ def __init__(self, location, enable_ssl, cert_path, topic, compression, **kwargs self._compression = compression self._create(enable_ssl, cert_path, **kwargs) - def _create(self, enable_ssl, cert_path, **kwargs): self._transport = FramedTransport(MAX_SEGMENT_SIZE) kwargs.update(_prepare_kafka_ssl_kwargs(cert_path) if enable_ssl else {}) @@ -100,7 +99,6 @@ def _create(self, enable_ssl, cert_path, **kwargs): max_request_size=DEFAULT_MAX_REQUEST_SIZE, **kwargs) - def send(self, key, *messages): for msg in messages: for kafka_msg in self._transport.write(key, msg): From 65e236e258acc04a6efc17c46823c7a2272f68c1 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Wed, 25 Jul 2018 13:17:33 +0500 Subject: [PATCH 4/7] setting max message size to 4Mb --- tests/kafka/docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/kafka/docker-compose.yml b/tests/kafka/docker-compose.yml index f9b7a5e5f..e630fae8e 100644 --- a/tests/kafka/docker-compose.yml +++ b/tests/kafka/docker-compose.yml @@ -12,5 +12,6 @@ services: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_CREATE_TOPICS: "test:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_MESSAGE_MAX_BYTES: 4194304 volumes: - /var/run/docker.sock:/var/run/docker.sock \ No newline at end of file From 3949dd79879ebe62309ae5cc765fde8202eca420 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Wed, 25 Jul 2018 13:27:27 +0500 Subject: [PATCH 5/7] setting kafka version to 0.11.0.2 --- tests/kafka/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafka/docker-compose.yml b/tests/kafka/docker-compose.yml index e630fae8e..0dcaf1c01 100644 --- a/tests/kafka/docker-compose.yml +++ b/tests/kafka/docker-compose.yml @@ -5,7 +5,7 @@ services: ports: - "2181:2181" kafka: - image: wurstmeister/kafka + image: wurstmeister/kafka:2.11-0.11.0.2 ports: - "9092:9092" environment: From 18dde04851293be1e410f90e28dd3feec31a048c Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Fri, 27 Jul 2018 17:39:20 +0500 Subject: [PATCH 6/7] creating topics --- tests/kafka/docker-compose.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/kafka/docker-compose.yml b/tests/kafka/docker-compose.yml index 0dcaf1c01..1f0dd710a 100644 --- a/tests/kafka/docker-compose.yml +++ b/tests/kafka/docker-compose.yml @@ -10,8 +10,9 @@ services: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 - KAFKA_CREATE_TOPICS: "test:1:1" + KAFKA_CREATE_TOPICS: "frontier-todo:1:1,frontier-done:1:1,frontier-score:4:1,frontier-stats:4:1" KAFKA_ZOOKEEPER_CONNECT: 
zookeeper:2181 KAFKA_MESSAGE_MAX_BYTES: 4194304 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" volumes: - /var/run/docker.sock:/var/run/docker.sock \ No newline at end of file From f9d842df5205b2f425c218d016d0c1a0dee4f4d6 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Fri, 27 Jul 2018 17:51:41 +0500 Subject: [PATCH 7/7] trying kafka 2.11-0.10.2.1 --- tests/kafka/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafka/docker-compose.yml b/tests/kafka/docker-compose.yml index 1f0dd710a..b06962b63 100644 --- a/tests/kafka/docker-compose.yml +++ b/tests/kafka/docker-compose.yml @@ -5,7 +5,7 @@ services: ports: - "2181:2181" kafka: - image: wurstmeister/kafka:2.11-0.11.0.2 + image: wurstmeister/kafka:2.11-0.10.2.1 ports: - "9092:9092" environment:
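
The framing protocol introduced in PATCH 1/7 is exercised end to end by tests/test_framed_transport.py; the minimal round-trip sketch below mirrors that test, assuming only the FramedTransport class from the patch. The KafkaMessage namedtuple is the same stand-in for a real consumer record that the test uses, since read() only looks at the value attribute:

    from collections import namedtuple

    from frontera.contrib.messagebus.kafka.transport import FramedTransport

    # Stand-in for a Kafka consumer record; read() only touches .value.
    KafkaMessage = namedtuple("KafkaMessage", ["key", "value"])

    transport = FramedTransport(32768)   # max segment size in bytes
    payload = b"x" * 100000              # larger than one segment

    # write() yields one msgpack-packed frame per segment:
    # (seg_id, seg_count, msg_key, chunk)
    frames = list(transport.write(b"key", payload))
    assert len(frames) == 4              # ceil(100000 / 32768.0)

    # read() buffers frames and returns None until all segments are present.
    result = None
    for frame in frames:
        result = transport.read(KafkaMessage(key=b"key", value=frame))

    assert result == payload             # the last frame triggers reassembly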
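
Reassembly state is held in an LRUCache(10), so a consumer buffers at most ten partially received multi-frame messages at a time; if the remaining frames of a message never arrive, its buffer is eventually evicted instead of leaking memory. Frames may also arrive out of order (test_big_message shuffles them deliberately): read() stores each frame under its seg_id and joins the chunks in seg_id order only once seg_count frames are present, while the msg_key (SHA-1 over the payload plus random bytes) keeps frames of concurrent messages from colliding in the buffer.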
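
On sizing: the producers are pinned to max_request_size=DEFAULT_MAX_REQUEST_SIZE (4 * 1024 * 1024 = 4194304 bytes), which matches KAFKA_MESSAGE_MAX_BYTES in tests/kafka/docker-compose.yml, while segments are cut at MAX_SEGMENT_SIZE = int(DEFAULT_MAX_REQUEST_SIZE * 0.95) = 3984588 bytes. The 5% margin is presumably headroom so that a full segment plus its msgpack envelope (seg_id, seg_count, and the 20-byte SHA-1 msg_key) still fits into a single Kafka request; the patches themselves do not state the rationale.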