Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sqlalchemy queue component partition_id #353

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/source/topics/frontera-settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,15 @@ Default: ``5.0``
(in progress + queued requests in that slot) / max allowed concurrent downloads per slot before slot is considered
overused. This affects only Scrapy scheduler."

.. setting:: QUEUE_HOSTNAME_PARTITIONING

QUEUE_HOSTNAME_PARTITIONING
---------------------------

Default: ``False``

Whether to use the request's hostname as the queue partitioning key. When ``False`` (the default), the request fingerprint is used instead.

.. setting:: REQUEST_MODEL

REQUEST_MODEL
Expand Down
2 changes: 1 addition & 1 deletion frontera/contrib/backends/sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _init_db_worker(self, manager):
self.check_and_create_tables(drop, clear_content, (metadata_m, queue_m))
self._metadata = Metadata(self.session_cls, metadata_m,
settings.get('SQLALCHEMYBACKEND_CACHE_SIZE'))
self._queue = Queue(self.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
self._queue = Queue(self.session_cls, queue_m, settings)

@classmethod
def strategy_worker(cls, manager):
Expand Down
7 changes: 5 additions & 2 deletions frontera/contrib/backends/sqlalchemy/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,14 @@ def flush(self):


class Queue(BaseQueue):
def __init__(self, session_cls, queue_cls, partitions, ordering='default'):
def __init__(self, session_cls, queue_cls, settings, ordering='default'):
partitions = settings.get('SPIDER_FEED_PARTITIONS')
self.session = session_cls()
self.queue_model = queue_cls
self.logger = logging.getLogger("sqlalchemy.queue")
self.partitions = [i for i in range(0, partitions)]
self.partitioner = Crc32NamePartitioner(self.partitions)
self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
self.ordering = ordering

def frontier_stop(self):
Expand Down Expand Up @@ -202,7 +204,8 @@ def schedule(self, batch):
partition_id = self.partitions[0]
host_crc32 = 0
else:
partition_id = self.partitioner.partition(hostname, self.partitions)
partition_key = hostname if self.hostname_partitioning else to_native_str(fprint)
partition_id = self.partitioner.partition(partition_key, self.partitions)
host_crc32 = get_crc32(hostname)
q = self.queue_model(fingerprint=to_native_str(fprint), score=score, url=request.url, meta=request.meta,
headers=request.headers, cookies=request.cookies, method=to_native_str(request.method),
Expand Down
8 changes: 6 additions & 2 deletions tests/contrib/backends/test_backends.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from frontera.core.components import States
from frontera.core.models import Request
from frontera import Settings
from happybase import Connection
from frontera.contrib.backends.hbase import HBaseState, HBaseQueue
from frontera.contrib.backends.sqlalchemy import States as SQLAlchemyStates, Queue as SQLAlchemyQueue
Expand Down Expand Up @@ -88,11 +89,14 @@ def queue(request):
return

if request.param == "sqlalchemy":
settings = Settings()
settings.SPIDER_FEED_PARTITIONS = 2
settings.QUEUE_HOSTNAME_PARTITIONING = True
engine = create_engine('sqlite:///:memory:', echo=False)
session_cls = sessionmaker()
session_cls.configure(bind=engine)
QueueModel.__table__.create(bind=engine)
sqla_queue = SQLAlchemyQueue(session_cls, QueueModel, 2)
sqla_queue = SQLAlchemyQueue(session_cls, QueueModel, settings)
yield sqla_queue
sqla_queue.frontier_stop()
engine.dispose()
Expand All @@ -114,4 +118,4 @@ def test_queue(queue):
assert set([r.url for r in queue.get_next_requests(10, 0, min_requests=3, min_hosts=1,
max_requests_per_host=10)]) == set([r3.url])
assert set([r.url for r in queue.get_next_requests(10, 1, min_requests=3, min_hosts=1,
max_requests_per_host=10)]) == set([r1.url, r2.url])
max_requests_per_host=10)]) == set([r1.url, r2.url])