Ensure partitioning is consistent across message bus and queue
isra17 committed Jun 21, 2017
1 parent 1dec22f commit 1da2d73
Showing 17 changed files with 192 additions and 130 deletions.
3 changes: 2 additions & 1 deletion docs/source/topics/cluster-setup.rst
@@ -58,7 +58,8 @@ a common modules and import settings from it in component's modules.
         'frontera.contrib.middlewares.fingerprint.DomainFingerprintMiddleware'
     ])

-    QUEUE_HOSTNAME_PARTITIONING = True
+
+    SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner'
     KAFKA_LOCATION = 'localhost:9092' # your Kafka broker host:port
     SCORING_TOPIC = 'frontier-scoring'
     URL_FINGERPRINT_FUNCTION='frontera.utils.fingerprint.hostname_local_fingerprint'
22 changes: 22 additions & 0 deletions docs/source/topics/frontera-settings.rst
@@ -332,6 +332,28 @@ Default: ``0``

 Per-spider setting, pointing the spider to its assigned partition.

+.. setting:: SPIDER_LOG_PARTITIONER
+
+SPIDER_LOG_PARTITIONER
+----------------------
+
+Default: ``frontera.contrib.backends.partitioners.FingerprintPartitioner``
+
+Partitioner used to calculate the :term:`spider log` partition id. This affects how extracted links are distributed
+among the workers. By default, partitioning is based on the request ``fingerprint``; the other available built-in,
+``frontera.contrib.backends.partitioners.Crc32NamePartitioner``, partitions based on the hostname.
+
+.. setting:: SPIDER_FEED_PARTITIONER
+
+SPIDER_FEED_PARTITIONER
+-----------------------
+
+Default: ``frontera.contrib.backends.partitioners.FingerprintPartitioner``
+
+Partitioner used to calculate the :term:`spider feed` partition id. This affects the distribution of requests to the
+spiders. By default, partitioning is based on the request ``fingerprint``; the other available built-in,
+``frontera.contrib.backends.partitioners.Crc32NamePartitioner``, partitions based on the hostname.
+
 .. setting:: STATE_CACHE_SIZE

 STATE_CACHE_SIZE
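Both settings take a dotted path to a Partitioner class, so a deployment can pick the partitioning scheme per stream. A minimal sketch of a settings module using them (the setting names and class paths are the ones documented above; the partition count is an arbitrary example):

    # settings.py -- sketch: hostname-based partitioning on both streams,
    # so all requests for a host land on the same partition in the message
    # bus and in the queue.
    SPIDER_LOG_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner'
    SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner'
    SPIDER_FEED_PARTITIONS = 2  # example: one partition per spider instance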
4 changes: 2 additions & 2 deletions examples/cluster/bc/config/common.py
@@ -14,10 +14,10 @@
 #--------------------------------------------------------
 # Crawl frontier backend
 #--------------------------------------------------------
-QUEUE_HOSTNAME_PARTITIONING = True
+SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner'
 URL_FINGERPRINT_FUNCTION='frontera.utils.fingerprint.hostname_local_fingerprint'

 #MESSAGE_BUS='frontera.contrib.messagebus.kafkabus.MessageBus'
 #KAFKA_LOCATION = 'localhost:9092'
 #SCORING_GROUP = 'scrapy-scoring'
-#SCORING_TOPIC = 'frontier-score'
\ No newline at end of file
+#SCORING_TOPIC = 'frontier-score'
26 changes: 11 additions & 15 deletions frontera/contrib/backends/hbase.py
@@ -5,7 +5,7 @@
 from frontera.core.components import Metadata, Queue, States
 from frontera.core.models import Request
 from frontera.contrib.backends.partitioners import Crc32NamePartitioner
-from frontera.utils.misc import chunks, get_crc32
+from frontera.utils.misc import chunks, get_crc32, load_object
 from frontera.contrib.backends.remote.codecs.msgpack import Decoder, Encoder

 from happybase import Connection
@@ -66,10 +66,9 @@ class HBaseQueue(Queue):

     GET_RETRIES = 3

-    def __init__(self, connection, partitions, table_name, drop=False):
+    def __init__(self, connection, partitioner, table_name, drop=False):
         self.connection = connection
-        self.partitions = [i for i in range(0, partitions)]
-        self.partitioner = Crc32NamePartitioner(self.partitions)
+        self.partitioner = partitioner
         self.logger = logging.getLogger("hbase.queue")
         self.table_name = to_bytes(table_name)
@@ -141,14 +140,9 @@ def get_interval(score, resolution):
         for request, score in batch:
             domain = request.meta[b'domain']
             fingerprint = request.meta[b'fingerprint']
-            if type(domain) == dict:
-                partition_id = self.partitioner.partition(domain[b'name'], self.partitions)
-                host_crc32 = get_crc32(domain[b'name'])
-            elif type(domain) == int:
-                partition_id = self.partitioner.partition_by_hash(domain, self.partitions)
-                host_crc32 = domain
-            else:
-                raise TypeError("domain of unknown type.")
+            key = self.partitioner.get_key(request)
+            partition_id = self.partitioner.partition(key)
+            host_crc32 = domain if type(domain) == int else get_crc32(key)
             item = (unhexlify(fingerprint), host_crc32, self.encoder.encode_request(request), score)
             score = 1 - score  # because of lexicographical sort in HBase
             rk = "%d_%s_%d" % (partition_id, "%0.2f_%0.2f" % get_interval(score, 0.01), random_str)
@@ -404,7 +398,9 @@ def __init__(self, manager):
         self._min_hosts = settings.get('BC_MIN_HOSTS')
         self._max_requests_per_host = settings.get('BC_MAX_REQUESTS_PER_HOST')

-        self.queue_partitions = settings.get('SPIDER_FEED_PARTITIONS')
+        partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS')))
+        partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
+        self.partitioner = partitioner_cls(partitions)
         host = choice(hosts) if type(hosts) in [list, tuple] else hosts
         kwargs = {
             'host': host,
@@ -435,7 +431,7 @@ def db_worker(cls, manager):
         o = cls(manager)
         settings = manager.settings
         drop_all_tables = settings.get('HBASE_DROP_ALL_TABLES')
-        o._queue = HBaseQueue(o.connection, o.queue_partitions,
+        o._queue = HBaseQueue(o.connection, o.partitioner,
                               settings.get('HBASE_QUEUE_TABLE'), drop=drop_all_tables)
         o._metadata = HBaseMetadata(o.connection, settings.get('HBASE_METADATA_TABLE'), drop_all_tables,
                                     settings.get('HBASE_USE_SNAPPY'), settings.get('HBASE_BATCH_SIZE'),
@@ -484,7 +480,7 @@ def get_next_requests(self, max_next_requests, **kwargs):
         next_pages = []
         self.logger.debug("Querying queue table.")
         partitions = set(kwargs.pop('partitions', []))
-        for partition_id in range(0, self.queue_partitions):
+        for partition_id in self.partitioner.partitions:
             if partition_id not in partitions:
                 continue
             results = self.queue.get_next_requests(max_next_requests, partition_id,
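The pattern introduced in __init__ above (turn SPIDER_FEED_PARTITIONS into a list of partition ids, load the class named by SPIDER_FEED_PARTITIONER, instantiate it, and hand the object to the queue) is the same one the memory and SQLAlchemy backends adopt below. In isolation it looks roughly like this (a sketch with example values standing in for the settings object):

    from frontera.utils.misc import load_object

    # Example values; the backends read these from settings.get(...).
    partitions = list(range(4))  # SPIDER_FEED_PARTITIONS = 4
    partitioner_cls = load_object('frontera.contrib.backends.partitioners.Crc32NamePartitioner')
    partitioner = partitioner_cls(partitions)

    # The queue no longer tracks a partition count of its own; it simply
    # iterates the ids owned by the injected partitioner.
    for partition_id in partitioner.partitions:
        print(partition_id)  # 0, 1, 2, 3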
47 changes: 20 additions & 27 deletions frontera/contrib/backends/memory/__init__.py
@@ -7,8 +7,8 @@
 from frontera.core.components import Metadata, Queue, States
 from frontera.core import OverusedBuffer
 from frontera.utils.heap import Heap
-from frontera.contrib.backends.partitioners import Crc32NamePartitioner
 from frontera.utils.url import parse_domain_from_url_fast
+from frontera.utils.misc import load_object
 import six
 from six.moves import map
 from six.moves import range
@@ -52,12 +52,11 @@ def update_score(self, batch):


 class MemoryQueue(Queue):
-    def __init__(self, partitions):
-        self.partitions = [i for i in range(0, partitions)]
-        self.partitioner = Crc32NamePartitioner(self.partitions)
+    def __init__(self, partitioner):
+        self.partitioner = partitioner
         self.logger = logging.getLogger("memory.queue")
         self.heap = {}
-        for partition in self.partitions:
+        for partition in self.partitioner.partitions:
             self.heap[partition] = Heap(self._compare_pages)

     def count(self):
@@ -70,31 +69,26 @@ def schedule(self, batch):
         for fprint, score, request, schedule in batch:
             if schedule:
                 request.meta[b'_scr'] = score
-                _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url)
-                if not hostname:
-                    self.logger.error("Can't get hostname for URL %s, fingerprint %s", request.url, fprint)
-                    partition_id = self.partitions[0]
-                else:
-                    partition_id = self.partitioner.partition(hostname, self.partitions)
+                key = self.partitioner.get_key(request)
+                partition_id = self.partitioner.partition(key)
                 self.heap[partition_id].push(request)

     def _compare_pages(self, first, second):
         return cmp(first.meta[b'_scr'], second.meta[b'_scr'])


 class MemoryDequeQueue(Queue):
-    def __init__(self, partitions, is_fifo=True):
+    def __init__(self, partitioner, is_fifo=True):
         """
         Deque-based queue (see collections module). Efficient queue for LIFO and FIFO strategies.
-        :param partitions: int count of partitions
+        :param partitioner: Partitioner
         :param type: bool, True for FIFO, False for LIFO
         """
-        self.partitions = [i for i in range(0, partitions)]
-        self.partitioner = Crc32NamePartitioner(self.partitions)
+        self.partitioner = partitioner
         self.logger = logging.getLogger("memory.dequequeue")
         self.queues = {}
         self.is_fifo = is_fifo
-        for partition in self.partitions:
+        for partition in self.partitioner.partitions:
             self.queues[partition] = deque()

     def count(self):
@@ -112,12 +106,8 @@ def schedule(self, batch):
         for fprint, score, request, schedule in batch:
             if schedule:
                 request.meta[b'_scr'] = score
-                _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url)
-                if not hostname:
-                    self.logger.error("Can't get hostname for URL %s, fingerprint %s", request.url, fprint)
-                    partition_id = self.partitions[0]
-                else:
-                    partition_id = self.partitioner.partition(hostname, self.partitions)
+                key = self.partitioner.get_key(request)
+                partition_id = self.partitioner.partition(key)
                 self.queues[partition_id].append(request)
@@ -165,6 +155,9 @@ def __init__(self, manager):
         settings = manager.settings
         self._metadata = MemoryMetadata()
         self._states = MemoryStates(settings.get("STATE_CACHE_SIZE"))
+        partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS')))
+        partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
+        self._partitioner = partitioner_cls(partitions)
         self._queue = self._create_queue(settings)
         self._id = 0
@@ -222,27 +215,27 @@ def _compare_pages(self, first, second):

 class MemoryFIFOBackend(MemoryBaseBackend):
     def _create_queue(self, settings):
-        return MemoryDequeQueue(settings.get('SPIDER_FEED_PARTITIONS'))
+        return MemoryDequeQueue(self._partitioner)


 class MemoryLIFOBackend(MemoryBaseBackend):
     def _create_queue(self, settings):
-        return MemoryDequeQueue(settings.get('SPIDER_FEED_PARTITIONS'), is_fifo=False)
+        return MemoryDequeQueue(self._partitioner, is_fifo=False)


 class MemoryDFSBackend(MemoryBaseBackend):
     def _create_queue(self, settings):
-        return MemoryDFSQueue(settings.get('SPIDER_FEED_PARTITIONS'))
+        return MemoryDFSQueue(self._partitioner)


 class MemoryBFSBackend(MemoryBaseBackend):
     def _create_queue(self, settings):
-        return MemoryBFSQueue(settings.get('SPIDER_FEED_PARTITIONS'))
+        return MemoryBFSQueue(self._partitioner)


 class MemoryRandomBackend(MemoryBaseBackend):
     def _create_queue(self, settings):
-        return MemoryRandomQueue(settings.get('SPIDER_FEED_PARTITIONS'))
+        return MemoryRandomQueue(self._partitioner)


 class MemoryDFSOverusedBackend(MemoryDFSBackend):
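All five backends now share one partitioner instance built in MemoryBaseBackend.__init__. A quick sketch of the resulting queue wiring (class names are from this diff; the partitioner type and ids are example choices):

    from frontera.contrib.backends.memory import MemoryDequeQueue
    from frontera.contrib.backends.partitioners import FingerprintPartitioner

    partitioner = FingerprintPartitioner([0, 1])
    fifo_queue = MemoryDequeQueue(partitioner)                 # FIFO by default
    lifo_queue = MemoryDequeQueue(partitioner, is_fifo=False)  # LIFO ordering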
36 changes: 29 additions & 7 deletions frontera/contrib/backends/partitioners.py
@@ -5,22 +5,43 @@

 from frontera.core.components import Partitioner
 from frontera.utils.misc import get_crc32
+from frontera.utils.url import parse_domain_from_url_fast


 class Crc32NamePartitioner(Partitioner):
     def partition(self, key, partitions=None):
+        if not partitions:
+            partitions = self.partitions
         if key is None:
-            return self.partitions[0]
-        value = get_crc32(key)
-        return self.partition_by_hash(value, partitions if partitions else self.partitions)
+            return partitions[0]
+        elif type(key) == int:
+            value = key
+        else:
+            value = get_crc32(key)
+        return self.partition_by_hash(value, partitions)

     def partition_by_hash(self, value, partitions):
         size = len(partitions)
         idx = value % size
         return partitions[idx]

-    def __call__(self, key, all_partitions, available):
-        return self.partition(key, all_partitions)
+    @staticmethod
+    def get_key(request):
+        domain = request.meta.get(b'domain')
+        if domain is not None:
+            if type(domain) == dict:
+                return domain[b'name']
+            elif type(domain) == int:
+                return domain
+            else:
+                raise TypeError("domain of unknown type.")
+
+        try:
+            _, name, _, _, _, _ = parse_domain_from_url_fast(request.url)
+        except Exception:
+            return None
+        else:
+            return name.encode('utf-8', 'ignore')


 class FingerprintPartitioner(Partitioner):
@@ -32,5 +53,6 @@ def partition(self, key, partitions=None):
         idx = value[0] % len(partitions)
         return partitions[idx]

-    def __call__(self, key, all_partitions, available):
-        return self.partition(key, all_partitions)
+    @staticmethod
+    def get_key(request):
+        return request.meta[b'fingerprint']
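In effect, partition assignment is now a two-step contract: get_key() extracts the partitioning key from a request (the hostname from meta[b'domain'], a precomputed crc32 int, or a hostname parsed from the URL as a fallback), and partition() maps that key to an id. A standalone sketch, assuming frontera.core.models.Request accepts a meta mapping and defaults it to an empty dict:

    from frontera.core.models import Request
    from frontera.contrib.backends.partitioners import Crc32NamePartitioner

    partitioner = Crc32NamePartitioner(list(range(2)))

    # Key taken from dict-style domain meta...
    r1 = Request('http://example.com/a', meta={b'domain': {b'name': b'example.com'}})
    # ...and, with no domain meta, parsed from the URL instead.
    r2 = Request('http://example.com/b')

    k1 = partitioner.get_key(r1)  # b'example.com'
    k2 = partitioner.get_key(r2)  # b'example.com'
    assert partitioner.partition(k1) == partitioner.partition(k2)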
17 changes: 11 additions & 6 deletions frontera/contrib/backends/sqlalchemy/__init__.py
@@ -37,6 +37,11 @@ def __init__(self, manager):
             session.execute(table.delete())
             session.commit()
             session.close()
+
+        partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS')))
+        partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
+        self.partitioner = partitioner_cls(partitions)
+
         self._metadata = Metadata(self.session_cls, self.models['MetadataModel'],
                                   settings.get('SQLALCHEMYBACKEND_CACHE_SIZE'))
         self._states = States(self.session_cls, self.models['StateModel'],
@@ -48,7 +53,7 @@ def frontier_stop(self):
         self.engine.dispose()

     def _create_queue(self, settings):
-        return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
+        return Queue(self.session_cls, self.models['QueueModel'], self.partitioner)

     @property
     def queue(self):
@@ -67,23 +72,23 @@ class FIFOBackend(SQLAlchemyBackend):
     component_name = 'SQLAlchemy FIFO Backend'

     def _create_queue(self, settings):
-        return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
+        return Queue(self.session_cls, self.models['QueueModel'], self.partitioner,
                      ordering='created')


 class LIFOBackend(SQLAlchemyBackend):
     component_name = 'SQLAlchemy LIFO Backend'

     def _create_queue(self, settings):
-        return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
+        return Queue(self.session_cls, self.models['QueueModel'], self.partitioner,
                      ordering='created_desc')


 class DFSBackend(SQLAlchemyBackend):
     component_name = 'SQLAlchemy DFS Backend'

     def _create_queue(self, settings):
-        return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
+        return Queue(self.session_cls, self.models['QueueModel'], self.partitioner)

     def _get_score(self, obj):
         return -obj.meta[b'depth']
Expand All @@ -93,7 +98,7 @@ class BFSBackend(SQLAlchemyBackend):
component_name = 'SQLAlchemy BFS Backend'

def _create_queue(self, settings):
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
return Queue(self.session_cls, self.models['QueueModel'], self.partitioner)

def _get_score(self, obj):
return obj.meta[b'depth']
@@ -170,7 +175,7 @@ def db_worker(cls, manager):

         b._metadata = Metadata(b.session_cls, metadata_m,
                                settings.get('SQLALCHEMYBACKEND_CACHE_SIZE'))
-        b._queue = Queue(b.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
+        b._queue = Queue(b.session_cls, queue_m, b.partitioner)
         return b

     @property