Skip to content

Commit

Permalink
Fix sqlalchemy queue component partition_id
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbalarini committed Mar 25, 2019
1 parent e1a4ca9 commit c36b645
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 5 deletions.
9 changes: 9 additions & 0 deletions docs/source/topics/frontera-settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,15 @@ Default: ``5.0``
(in progress + queued requests in that slot) / max allowed concurrent downloads per slot before slot is considered
overused. This affects only Scrapy scheduler."

.. setting:: QUEUE_HOSTNAME_PARTITIONING

QUEUE_HOSTNAME_PARTITIONING
--------------------

Default: ``False``

Wheter to use the hostname as a partitioning scheme or not (uses the fingerprint as default).

.. setting:: REQUEST_MODEL

REQUEST_MODEL
Expand Down
2 changes: 1 addition & 1 deletion frontera/contrib/backends/sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _init_db_worker(self, manager):
self.check_and_create_tables(drop, clear_content, (metadata_m, queue_m))
self._metadata = Metadata(self.session_cls, metadata_m,
settings.get('SQLALCHEMYBACKEND_CACHE_SIZE'))
self._queue = Queue(self.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
self._queue = Queue(self.session_cls, queue_m, settings)

@classmethod
def strategy_worker(cls, manager):
Expand Down
7 changes: 5 additions & 2 deletions frontera/contrib/backends/sqlalchemy/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,14 @@ def flush(self):


class Queue(BaseQueue):
def __init__(self, session_cls, queue_cls, partitions, ordering='default'):
def __init__(self, session_cls, queue_cls, settings, ordering='default'):
partitions = settings.get('SPIDER_FEED_PARTITIONS')
self.session = session_cls()
self.queue_model = queue_cls
self.logger = logging.getLogger("sqlalchemy.queue")
self.partitions = [i for i in range(0, partitions)]
self.partitioner = Crc32NamePartitioner(self.partitions)
self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
self.ordering = ordering

def frontier_stop(self):
Expand Down Expand Up @@ -202,7 +204,8 @@ def schedule(self, batch):
partition_id = self.partitions[0]
host_crc32 = 0
else:
partition_id = self.partitioner.partition(hostname, self.partitions)
partition_key = hostname if self.hostname_partitioning else to_native_str(fprint)
partition_id = self.partitioner.partition(partition_key, self.partitions)
host_crc32 = get_crc32(hostname)
q = self.queue_model(fingerprint=to_native_str(fprint), score=score, url=request.url, meta=request.meta,
headers=request.headers, cookies=request.cookies, method=to_native_str(request.method),
Expand Down
8 changes: 6 additions & 2 deletions tests/contrib/backends/test_backends.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from frontera.core.components import States
from frontera.core.models import Request
from frontera import Settings
from happybase import Connection
from frontera.contrib.backends.hbase import HBaseState, HBaseQueue
from frontera.contrib.backends.sqlalchemy import States as SQLAlchemyStates, Queue as SQLAlchemyQueue
Expand Down Expand Up @@ -88,11 +89,14 @@ def queue(request):
return

if request.param == "sqlalchemy":
settings = Settings()
settings.SPIDER_FEED_PARTITIONS = 2
settings.QUEUE_HOSTNAME_PARTITIONING = True
engine = create_engine('sqlite:///:memory:', echo=False)
session_cls = sessionmaker()
session_cls.configure(bind=engine)
QueueModel.__table__.create(bind=engine)
sqla_queue = SQLAlchemyQueue(session_cls, QueueModel, 2)
sqla_queue = SQLAlchemyQueue(session_cls, QueueModel, settings)
yield sqla_queue
sqla_queue.frontier_stop()
engine.dispose()
Expand All @@ -114,4 +118,4 @@ def test_queue(queue):
assert set([r.url for r in queue.get_next_requests(10, 0, min_requests=3, min_hosts=1,
max_requests_per_host=10)]) == set([r3.url])
assert set([r.url for r in queue.get_next_requests(10, 1, min_requests=3, min_hosts=1,
max_requests_per_host=10)]) == set([r1.url, r2.url])
max_requests_per_host=10)]) == set([r1.url, r2.url])

0 comments on commit c36b645

Please sign in to comment.