diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst
index d9db1c7cc..51564c04c 100644
--- a/docs/source/topics/frontera-settings.rst
+++ b/docs/source/topics/frontera-settings.rst
@@ -310,6 +310,15 @@ Default: ``5.0``
 (in progress + queued requests in that slot) / max allowed concurrent downloads per slot before slot is considered
 overused. This affects only Scrapy scheduler."
 
+.. setting:: QUEUE_HOSTNAME_PARTITIONING
+
+QUEUE_HOSTNAME_PARTITIONING
+---------------------------
+
+Default: ``False``
+
+Whether to use the hostname as the partitioning key (the request fingerprint is used by default).
+
 .. setting:: REQUEST_MODEL
 
 REQUEST_MODEL
diff --git a/frontera/contrib/backends/sqlalchemy/__init__.py b/frontera/contrib/backends/sqlalchemy/__init__.py
index 9b46d2efe..a37b711c4 100644
--- a/frontera/contrib/backends/sqlalchemy/__init__.py
+++ b/frontera/contrib/backends/sqlalchemy/__init__.py
@@ -58,7 +58,7 @@ def _init_db_worker(self, manager):
         self.check_and_create_tables(drop, clear_content, (metadata_m, queue_m))
         self._metadata = Metadata(self.session_cls, metadata_m,
                                   settings.get('SQLALCHEMYBACKEND_CACHE_SIZE'))
-        self._queue = Queue(self.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
+        self._queue = Queue(self.session_cls, queue_m, settings)
 
     @classmethod
     def strategy_worker(cls, manager):
diff --git a/frontera/contrib/backends/sqlalchemy/components.py b/frontera/contrib/backends/sqlalchemy/components.py
index 160cf7a2d..9d4eaeab0 100644
--- a/frontera/contrib/backends/sqlalchemy/components.py
+++ b/frontera/contrib/backends/sqlalchemy/components.py
@@ -148,12 +148,14 @@ def flush(self):
 
 
 class Queue(BaseQueue):
-    def __init__(self, session_cls, queue_cls, partitions, ordering='default'):
+    def __init__(self, session_cls, queue_cls, settings, ordering='default'):
+        partitions = settings.get('SPIDER_FEED_PARTITIONS')
         self.session = session_cls()
         self.queue_model = queue_cls
         self.logger = logging.getLogger("sqlalchemy.queue")
         self.partitions = [i for i in range(0, partitions)]
         self.partitioner = Crc32NamePartitioner(self.partitions)
+        self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
         self.ordering = ordering
 
     def frontier_stop(self):
@@ -202,7 +204,8 @@ def schedule(self, batch):
                     partition_id = self.partitions[0]
                     host_crc32 = 0
                 else:
-                    partition_id = self.partitioner.partition(hostname, self.partitions)
+                    partition_key = hostname if self.hostname_partitioning else to_native_str(fprint)
+                    partition_id = self.partitioner.partition(partition_key, self.partitions)
                     host_crc32 = get_crc32(hostname)
                 q = self.queue_model(fingerprint=to_native_str(fprint), score=score, url=request.url, meta=request.meta,
                                      headers=request.headers, cookies=request.cookies, method=to_native_str(request.method),
diff --git a/tests/contrib/backends/test_backends.py b/tests/contrib/backends/test_backends.py
index dfdc3b798..cce5c951a 100644
--- a/tests/contrib/backends/test_backends.py
+++ b/tests/contrib/backends/test_backends.py
@@ -1,6 +1,7 @@
 import pytest
 from frontera.core.components import States
 from frontera.core.models import Request
+from frontera import Settings
 from happybase import Connection
 from frontera.contrib.backends.hbase import HBaseState, HBaseQueue
 from frontera.contrib.backends.sqlalchemy import States as SQLAlchemyStates, Queue as SQLAlchemyQueue
@@ -88,11 +89,14 @@ def queue(request):
         return
 
     if request.param == "sqlalchemy":
+        settings = Settings()
+        settings.SPIDER_FEED_PARTITIONS = 2
+        settings.QUEUE_HOSTNAME_PARTITIONING = True
         engine = create_engine('sqlite:///:memory:', echo=False)
         session_cls = sessionmaker()
         session_cls.configure(bind=engine)
         QueueModel.__table__.create(bind=engine)
-        sqla_queue = SQLAlchemyQueue(session_cls, QueueModel, 2)
+        sqla_queue = SQLAlchemyQueue(session_cls, QueueModel, settings)
         yield sqla_queue
         sqla_queue.frontier_stop()
         engine.dispose()
@@ -114,4 +118,4 @@ def test_queue(queue):
     assert set([r.url for r in queue.get_next_requests(10, 0, min_requests=3, min_hosts=1,
                                                         max_requests_per_host=10)]) == set([r3.url])
     assert set([r.url for r in queue.get_next_requests(10, 1, min_requests=3, min_hosts=1,
-                                                        max_requests_per_host=10)]) == set([r1.url, r2.url])
\ No newline at end of file
+                                                        max_requests_per_host=10)]) == set([r1.url, r2.url])
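With this patch the SQLAlchemy queue picks its partition key from either the URL hostname (when ``QUEUE_HOSTNAME_PARTITIONING`` is enabled) or the request fingerprint (the default). A rough sketch of the effect, assuming a CRC32-modulo scheme similar to what ``Crc32NamePartitioner`` does; the helper names and fingerprints below are illustrative, not frontera code:

    # Illustrative sketch only: approximates how the partition key choice
    # affects which spider partition a scheduled request lands in.
    from zlib import crc32


    def pick_partition(key, partitions):
        # Map a string key onto one of the available partition ids.
        return partitions[crc32(key.encode('utf-8')) % len(partitions)]


    def partition_for(hostname, fingerprint, partitions, hostname_partitioning=False):
        # QUEUE_HOSTNAME_PARTITIONING=True keeps all requests of one host together;
        # fingerprint partitioning spreads requests of the same host across partitions.
        key = hostname if hostname_partitioning else fingerprint
        return pick_partition(key, partitions)


    partitions = [0, 1]
    # Same host, two different (made-up) fingerprints:
    print(partition_for('example.com', 'a94a8fe5', partitions, hostname_partitioning=True))   # same id
    print(partition_for('example.com', 'de9f2c7f', partitions, hostname_partitioning=True))   # same id
    print(partition_for('example.com', 'a94a8fe5', partitions, hostname_partitioning=False))  # may differ
    print(partition_for('example.com', 'de9f2c7f', partitions, hostname_partitioning=False))  # may differ

Hostname partitioning is useful when per-host politeness must be enforced by a single consumer, while fingerprint partitioning spreads a large site across all spider partitions; this is why the test fixture enables it explicitly before constructing the queue.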