diff --git a/.gitignore b/.gitignore
index 287b0b569..e74280063 100644
--- a/.gitignore
+++ b/.gitignore
@@ -52,3 +52,6 @@ docs/_build/
 
 # PyBuilder
 target/
+
+# PyCharm Idea Folder
+.idea/
\ No newline at end of file
diff --git a/docs/README b/docs/README
index 3d9114563..fd04c32f5 100644
--- a/docs/README
+++ b/docs/README
@@ -30,6 +30,9 @@ from this dir::
 
 Documentation will be generated (in HTML format) inside the ``build/html``
 dir.
 
+If you get the error "ImportError: No module named sphinx_rtd_theme", run:
+    sudo pip install sphinx-rtd-theme
+
 View the documentation
 ----------------------
diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst
index 13ee93318..28d105bd7 100644
--- a/docs/source/topics/frontera-settings.rst
+++ b/docs/source/topics/frontera-settings.rst
@@ -437,6 +437,96 @@ Default: ``timedelta(days=1)``
 Time between document visits, expressed in ``datetime.timedelta`` objects. Changing of this setting will only affect
 documents scheduled after the change. All previously queued documents will be crawled with old periodicity.
 
+.. _cassandra-settings:
+
+Cassandra
+---------
+
+
+.. setting:: CASSANDRABACKEND_DROP_ALL_TABLES
+
+CASSANDRABACKEND_DROP_ALL_TABLES
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``False``
+
+Set to ``True`` if you need to drop all DB tables on backend instantiation (e.g. on every Scrapy spider run).
+
+.. setting:: CASSANDRABACKEND_CLUSTER_IPS
+
+CASSANDRABACKEND_CLUSTER_IPS
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``['127.0.0.1']``
+
+IPs of the Cassandra cluster nodes. Defaults to localhost. To assign more than one IP use this syntax:
+``['192.168.0.1', '192.168.0.2']``
+
+CASSANDRABACKEND_CLUSTER_PORT
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``9042``
+
+Port of the Cassandra cluster nodes.
+
+
+CASSANDRABACKEND_GENERATE_STATS
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``False``
+
+Set to ``True`` if you want an extra table for stats collection, in which pages crawled, links queued etc. are
+counted up.
+
+
+CASSANDRABACKEND_KEYSPACE
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``frontera``
+
+Cassandra keyspace to use.
+
+CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``True``
+
+Creates the keyspace if it does not exist. Set to ``False`` if Frontera should not check this on every startup.
+
+
+CASSANDRABACKEND_CRAWL_ID
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``default``
+
+Sets an ID in each table for the current crawl. If you want to run another crawl from the beginning in the same
+tables, set it to another crawl ID. It is a text field.
+
+
+CASSANDRABACKEND_MODELS
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Default::
+
+    {
+        'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel',
+        'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel',
+        'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel',
+        'CrawlStatsModel': 'frontera.contrib.backends.cassandra.models.CrawlStatsModel'
+    }
+
+This is the mapping of Cassandra models used by the backends. It is mainly used for customization.
+
+
+Revisiting backend
+------------------
+
+.. setting:: CASSANDRABACKEND_REVISIT_INTERVAL
+
+CASSANDRABACKEND_REVISIT_INTERVAL
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``timedelta(days=1)``
+
+Time between document visits, expressed in ``datetime.timedelta`` objects. Changing of this setting will only affect
+documents scheduled after the change. All previously queued documents will be crawled with old periodicity.
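+
+A settings module using the Cassandra backend might combine these options roughly as follows (the backend path,
+host IPs, crawl ID and interval are illustrative values)::
+
+    from datetime import timedelta
+
+    BACKEND = 'frontera.contrib.backends.cassandra.revisiting.Backend'
+
+    CASSANDRABACKEND_CLUSTER_IPS = ['192.168.0.1', '192.168.0.2']
+    CASSANDRABACKEND_CLUSTER_PORT = 9042
+    CASSANDRABACKEND_KEYSPACE = 'frontera'
+    CASSANDRABACKEND_CRAWL_ID = 'my_crawl'
+    CASSANDRABACKEND_GENERATE_STATS = True
+    CASSANDRABACKEND_REVISIT_INTERVAL = timedelta(days=3)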
 
 .. _hbase-settings:
diff --git a/docs/source/topics/frontier-backends.rst b/docs/source/topics/frontier-backends.rst
index 37c67bf68..f02af4c9b 100644
--- a/docs/source/topics/frontier-backends.rst
+++ b/docs/source/topics/frontier-backends.rst
@@ -266,6 +266,34 @@ Current implementation of revisiting backend has no prioritization. During long
 there are no documents available for crawling, but there are documents waiting for their scheduled revisit time.
 
+.. _frontier-backends-cassandra:
+
+Cassandra backends
+^^^^^^^^^^^^^^^^^^
+
+This set of :class:`Backend <frontera.core.components.Backend>` objects will use `Cassandra`_ as storage for
+:ref:`basic algorithms <frontier-backends-basic-algorithms>`.
+
+Cassandra is a NoSQL column-store database with linear scalability and an SQL-like query language.
+
+If you need to use your own `declarative cassandra models`_, you can do it by using the
+:setting:`CASSANDRABACKEND_MODELS` setting.
+
+This setting uses a dictionary where ``key`` represents the name of the model to define and ``value`` the model to use.
+
+For a complete list of all settings used for Cassandra backends check the :doc:`settings <frontera-settings>` section.
+
+.. class:: frontera.contrib.backends.cassandra.BASE
+
+    Base class for Cassandra :class:`Backend <frontera.core.components.Backend>` objects.
+    It runs Cassandra in multi-spider, single-worker mode with the FIFO algorithm.
+
+.. class:: frontera.contrib.backends.cassandra.Distributed
+
+    Cassandra :class:`Backend <frontera.core.components.Backend>` implementation of the distributed backend.
+
+
 HBase backend
 ^^^^^^^^^^^^^
diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py
new file mode 100644
index 000000000..ff4134f6f
--- /dev/null
+++ b/frontera/contrib/backends/cassandra/__init__.py
@@ -0,0 +1,199 @@
+from __future__ import absolute_import
+import logging
+from cassandra.cluster import Cluster
+from cassandra.cqlengine import connection
+from cassandra.query import dict_factory
+from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
+from cassandra.cqlengine.management import sync_table
+from cassandra.cqlengine.management import drop_table
+from frontera.core.components import DistributedBackend
+from frontera.contrib.backends import CommonBackend
+from frontera.contrib.backends.cassandra.components import Metadata, Queue, States
+from frontera.utils.misc import load_object
+
+
+class CassandraBackend(CommonBackend):
+    def __init__(self, manager):
+        self.manager = manager
+        settings = manager.settings
+        cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS')
+        cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
+        drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
+        keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
+        keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS')
+        models = settings.get('CASSANDRABACKEND_MODELS')
+        crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')
+        generate_stats = settings.get('CASSANDRABACKEND_GENERATE_STATS')
+
+        self.models = dict([(name, load_object(klass)) for name, klass in models.items()])
+
+        self.cluster = Cluster(
+            contact_points=cluster_ips,
+            port=cluster_port,
+            compression=True,
+            default_retry_policy=RetryPolicy(),
+            reconnection_policy=ConstantReconnectionPolicy(10, 100)
+        )
+
+        self.session = self.cluster.connect()
+        self.session.row_factory = dict_factory
+        self.session.encoder.mapping[dict] = self.session.encoder.cql_encode_map_collection
+        self.crawl_id = crawl_id
+        self.generate_stats = generate_stats
+
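+        # The keyspace below is created with a hard-coded SimpleStrategy and a replication factor of 3;
+        # adjust the CREATE KEYSPACE statement if the cluster topology needs a different replication setup.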
+        if keyspace_create:
+            query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
+                       WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}""" % (keyspace, )
+            self.session.execute(query)
+
+        self.session.set_keyspace(keyspace)
+
+        connection.set_session(self.session)
+
+        if drop_all_tables:
+            for key, value in self.models.items():
+                drop_table(value)
+
+        for key, value in self.models.items():
+            if (self.generate_stats is False and key != 'CrawlStatsModel') or self.generate_stats is True:
+                sync_table(value)
+
+        self._metadata = Metadata(self.session, self.models['MetadataModel'], self.crawl_id, self.generate_stats)
+        self._states = States(self.session, self.models['StateModel'],
+                              settings.get('STATE_CACHE_SIZE_LIMIT'), self.crawl_id)
+        self._queue = self._create_queue(settings)
+
+    def frontier_stop(self):
+        self.states.flush()
+        self.session.shutdown()
+
+    def _create_queue(self, settings):
+        return Queue(self.session, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
+                     self.crawl_id, self.generate_stats)
+
+    @property
+    def queue(self):
+        return self._queue
+
+    @property
+    def metadata(self):
+        return self._metadata
+
+    @property
+    def states(self):
+        return self._states
+
+
+BASE = CassandraBackend
+
+
+class Distributed(DistributedBackend):
+    def __init__(self, manager):
+        self.manager = manager
+        settings = manager.settings
+        cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS')  # Format: ['192.168.0.1', '192.168.0.2']
+        cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
+        keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
+        keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS')  # Default: True
+        models = settings.get('CASSANDRABACKEND_MODELS')
+
+        self.cluster = Cluster(cluster_ips, cluster_port)
+        self.models = dict([(name, load_object(klass)) for name, klass in models.items()])
+
+        self.session = self.cluster.connect()
+        self.session.row_factory = dict_factory
+
+        if keyspace_create:
+            query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
+                       WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}""" % (keyspace, )
+            self.session.execute(query)
+        self.session.set_keyspace(keyspace)
+        connection.set_session(self.session)
+
+        self._metadata = None
+        self._queue = None
+        self._states = None
+
+    @classmethod
+    def strategy_worker(cls, manager):
+        b = cls(manager)
+        settings = manager.settings
+        drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
+        crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')
+        model = b.models['StateModel']
+
+        if drop_all_tables:
+            drop_table(model)
+
+        sync_table(model)
+
+        b._states = States(b.session, model,
+                           settings.get('STATE_CACHE_SIZE_LIMIT'), crawl_id)
+        return b
+
+    @classmethod
+    def db_worker(cls, manager):
+        b = cls(manager)
+        settings = manager.settings
+        drop = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
+        crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')
+        generate_stats = settings.get('CASSANDRABACKEND_GENERATE_STATS')
+
+        metadata_m = b.models['MetadataModel']
+        queue_m = b.models['QueueModel']
+        stats_m = b.models['CrawlStatsModel']
+        if drop:
+            drop_table(metadata_m)
+            drop_table(queue_m)
+            drop_table(stats_m)
+
+        sync_table(metadata_m)
+        sync_table(queue_m)
+        if generate_stats is True:
+            sync_table(stats_m)
+
+        b._metadata = Metadata(b.session, metadata_m, crawl_id, generate_stats)
+        b._queue = Queue(b.session, queue_m, settings.get('SPIDER_FEED_PARTITIONS'), crawl_id, generate_stats)
+        return b
+
+    @property
+    def queue(self):
+        return self._queue
+
+    @property
+    def metadata(self):
+        return self._metadata
+
+    @property
+    def states(self):
+        return self._states
+
+    def frontier_start(self):
+        for component in [self.metadata, self.queue, self.states]:
+            if component:
+                component.frontier_start()
+
+    def frontier_stop(self):
+        for component in [self.metadata, self.queue, self.states]:
+            if component:
+                component.frontier_stop()
+
+    def add_seeds(self, seeds):
+        self.metadata.add_seeds(seeds)
+
+    def get_next_requests(self, max_next_requests, **kwargs):
+        partitions = kwargs.pop('partitions', [0])  # TODO: Collect from all known partitions
+        batch = []
+        for partition_id in partitions:
+            batch.extend(self.queue.get_next_requests(max_next_requests, partition_id, **kwargs))
+        return batch
+
+    def page_crawled(self, response, links):
+        self.metadata.page_crawled(response, links)
+
+    def request_error(self, request, error):
+        self.metadata.request_error(request, error)
+
+    def finished(self):
+        raise NotImplementedError
diff --git a/frontera/contrib/backends/cassandra/components.py b/frontera/contrib/backends/cassandra/components.py
new file mode 100644
index 000000000..9584a8629
--- /dev/null
+++ b/frontera/contrib/backends/cassandra/components.py
@@ -0,0 +1,314 @@
+# -*- coding: utf-8 -*-
+import logging
+from datetime import datetime
+from time import time
+from frontera.contrib.backends.partitioners import Crc32NamePartitioner
+from frontera.contrib.backends.memory import MemoryStates
+from frontera.core.components import Metadata as BaseMetadata, Queue as BaseQueue
+from frontera.core.models import Request, Response
+from frontera.utils.misc import get_crc32, chunks
+from frontera.utils.url import parse_domain_from_url_fast
+from cassandra.concurrent import execute_concurrent_with_args
+from frontera.contrib.backends.cassandra.models import Meta
+
+
+class Metadata(BaseMetadata):
+    def __init__(self, session, model_cls, crawl_id, generate_stats):
+        self.session = session
+        self.model = model_cls
+        self.table = 'MetadataModel'
+        self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Metadata")
+        self.crawl_id = crawl_id
+        self.generate_stats = generate_stats
+        self.counter_cls = CassandraCount(crawl_id, self.session, generate_stats)
+
+    def frontier_stop(self):
+        pass
+
+    def add_seeds(self, seeds):
+        cql_items = []
+        for seed in seeds:
+            query = self.session.prepare(
+                "INSERT INTO metadata (crawl, fingerprint, url, created_at, meta, headers, cookies, method, depth) "
+                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
+            meta = Meta(domain=seed.meta['domain'], fingerprint=seed.meta['fingerprint'],
+                        origin_is_frontier=seed.meta['origin_is_frontier'],
+                        scrapy_callback=seed.meta['scrapy_callback'], scrapy_errback=seed.meta['scrapy_errback'],
+                        scrapy_meta=seed.meta['scrapy_meta'])
+            cql_i = (self.crawl_id, seed.meta['fingerprint'], seed.url, datetime.utcnow(), meta,
+                     seed.headers, seed.cookies, seed.method, 0)
+            cql_items.append(cql_i)
+        if len(seeds) > 0:
+            execute_concurrent_with_args(self.session, query, cql_items, concurrency=400)
+            self.counter_cls.cass_count({"seed_urls": len(seeds)})
+
+    def request_error(self, page, error):
+        query_page = self.session.prepare(
+            "UPDATE metadata SET error = ? WHERE crawl = ? AND fingerprint = ?")
+        self.session.execute(query_page, (error, self.crawl_id, page.meta['fingerprint']))
+        self.counter_cls.cass_count({"error": 1})
+
+    def page_crawled(self, response, links):
+        query_page = self.session.prepare(
+            "UPDATE metadata SET fetched_at = ?, headers = ?, method = ?, cookies = ?, status_code = ? "
+            "WHERE crawl = ? AND fingerprint = ?")
+        self.session.execute_async(query_page, (datetime.utcnow(), response.request.headers, response.request.method,
+                                                response.request.cookies, response.status_code, self.crawl_id,
+                                                response.meta['fingerprint']))
+        depth = 0
+        page_res = self.model.objects.filter(crawl=self.crawl_id, fingerprint=response.meta['fingerprint'])
+        if page_res[0].depth > 0:
+            depth = page_res[0].depth
+
+        query = self.session.prepare(
+            "INSERT INTO metadata (crawl, fingerprint, created_at, method, url, depth) VALUES (?, ?, ?, ?, ?, ?)")
+        cql_items = []
+        for link in links:
+            if response.meta['fingerprint'] != link.meta['fingerprint']:
+                cql_i = (self.crawl_id, link.meta['fingerprint'], datetime.utcnow(), link.method, link.url, depth + 1)
+                cql_items.append(cql_i)
+        execute_concurrent_with_args(self.session, query, cql_items, concurrency=400)
+        self.counter_cls.cass_count({"pages_crawled": 1, "links_found": len(cql_items)})
+
+    def update_score(self, batch):
+        query = self.session.prepare("UPDATE metadata SET score = ? WHERE crawl = ? AND fingerprint = ?")
+        cql_items = []
+        for fprint, score, request, schedule in batch:
+            cql_i = (score, self.crawl_id, fprint)
+            cql_items.append(cql_i)
+        execute_concurrent_with_args(self.session, query, cql_items, concurrency=400)
+        self.counter_cls.cass_count({"scored_urls": len(cql_items)})
+
+
+class States(MemoryStates):
+
+    def __init__(self, session, model_cls, cache_size_limit, crawl_id):
+        super(States, self).__init__(cache_size_limit)
+        self.session = session
+        self.model = model_cls
+        self.table = 'StateModel'
+        self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.States")
+        self.crawl_id = crawl_id
+
+    def frontier_stop(self):
+        pass
+
+    def fetch(self, fingerprints):
+        to_fetch = [f for f in fingerprints if f not in self._cache]
+        self.logger.debug("cache size %s", len(self._cache))
+        self.logger.debug("to fetch %d from %d", len(to_fetch), len(fingerprints))
+
+        for chunk in chunks(to_fetch, 128):
+            for state in self.model.objects.filter(crawl=self.crawl_id, fingerprint__in=chunk):
+                self._cache[state.fingerprint] = state.state
+
+    def flush(self, force_clear=False):
+        query = self.session.prepare("INSERT INTO states (crawl, fingerprint, state) VALUES (?, ?, ?)")
+        cql_items = []
+        for fingerprint, state_val in self._cache.items():
+            cql_i = (self.crawl_id, fingerprint, state_val)
+            cql_items.append(cql_i)
+        execute_concurrent_with_args(self.session, query, cql_items, concurrency=20000)
+        super(States, self).flush(force_clear)
+
+
+class Queue(BaseQueue):
+    def __init__(self, session, queue_cls, partitions, crawl_id, generate_stats, ordering='default'):
+        self.session = session
+        self.queue_model = queue_cls
+        self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Queue")
+        self.partitions = [i for i in range(0, partitions)]
+        self.partitioner = Crc32NamePartitioner(self.partitions)
+        self.ordering = ordering
+        self.crawl_id = crawl_id
+        self.counter_cls = CassandraCount(crawl_id, self.session, generate_stats)
+
+    def frontier_stop(self):
+        pass
+
+    def _order_by(self):
+        if self.ordering == 'created':
+            return "created_at"
+        return "created_at"
+
+    def get_next_requests(self, max_n_requests, partition_id, **kwargs):
+        """
+        Dequeues new batch of requests for crawling.
+
+        :param max_n_requests: maximum number of requests to return
+        :param partition_id: partition id
+        :return: list of :class:`Request <frontera.core.models.Request>` objects.
+        """
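+        # Rows are read in score order for this partition and deleted from the queue table right after
+        # being turned into Request objects, so a dequeued batch is not re-issued on failure.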
+ """ + results = [] + try: + dequeued_urls = 0 + cql_ditems = [] + d_query = self.session.prepare("DELETE FROM queue WHERE crawl = ? AND fingerprint = ? AND partition_id = ? " + "AND score = ? AND created_at = ?") + for item in self.queue_model.objects.filter(crawl=self.crawl_id, partition_id=partition_id).\ + order_by("partition_id", "score", self._order_by()).limit(max_n_requests): + method = 'GET' if not item.method else item.method + + meta_dict2 = dict((name, getattr(item.meta, name)) for name in dir(item.meta) + if not name.startswith('__')) + # TODO: How the result can be an dict not an object -> Objects get error while encodeing for Message Bus + # If I take meta_dict2 direct to Request i get the same error message + + meta_dict = dict() + meta_dict["fingerprint"] = meta_dict2["fingerprint"] + meta_dict["domain"] = meta_dict2["domain"] + meta_dict["origin_is_frontier"] = meta_dict2["origin_is_frontier"] + meta_dict["scrapy_callback"] = meta_dict2["scrapy_callback"] + meta_dict["scrapy_errback"] = meta_dict2["scrapy_errback"] + meta_dict["scrapy_meta"] = meta_dict2["scrapy_meta"] + meta_dict["score"] = meta_dict2["score"] + meta_dict["jid"] = meta_dict2["jid"] + + r = Request(item.url, method=method, meta=meta_dict, headers=item.headers, cookies=item.cookies) + r.meta['fingerprint'] = item.fingerprint + r.meta['score'] = item.score + results.append(r) + + cql_d = (item.crawl, item.fingerprint, item.partition_id, item.score, item.created_at) + cql_ditems.append(cql_d) + dequeued_urls += 1 + + if dequeued_urls > 0: + execute_concurrent_with_args(self.session, d_query, cql_ditems, concurrency=200) + + self.counter_cls.cass_count({"dequeued_urls": dequeued_urls}) + + except Exception, exc: + self.logger.exception(exc) + + return results + + def schedule(self, batch): + query = self.session.prepare("INSERT INTO queue (crawl, fingerprint, score, partition_id, host_crc32, url, " + "created_at, meta, depth, headers, method, cookies) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + cql_items = [] + for fprint, score, request, schedule in batch: + if schedule: + _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url) + if not hostname: + self.logger.error("Can't get hostname for URL %s, fingerprint %s" % (request.url, fprint)) + partition_id = self.partitions[0] + host_crc32 = 0 + else: + partition_id = self.partitioner.partition(hostname, self.partitions) + host_crc32 = get_crc32(hostname) + created_at = time()*1E+6 + + if "domain" not in request.meta: + request.meta["domain"] = {} + if "origin_is_frontier" not in request.meta: + request.meta["origin_is_frontier"] = '' + if "scrapy_callback" not in request.meta: + request.meta["scrapy_callback"] = None + if "scrapy_errback" not in request.meta: + request.meta["scrapy_errback"] = None + if "scrapy_meta" not in request.meta: + request.meta["scrapy_meta"] = {} + if "score" not in request.meta: + request.meta["score"] = 0 + if "jid" not in request.meta: + request.meta["jid"] = 0 + + meta = Meta(domain=request.meta['domain'], fingerprint=fprint, + origin_is_frontier=request.meta['origin_is_frontier'], + scrapy_callback=request.meta['scrapy_callback'], + scrapy_errback=request.meta['scrapy_errback'], scrapy_meta=request.meta['scrapy_meta']) + + cql_i = (self.crawl_id, fprint, score, partition_id, host_crc32, request.url, created_at, meta, 0, + request.headers, request.method, request.cookies) + cql_items.append(cql_i) + + request.meta['state'] = States.QUEUED + + execute_concurrent_with_args(self.session, query, cql_items, 
+        self.counter_cls.cass_count({"queued_urls": len(cql_items)})
+
+    def count(self):
+        count = self.queue_model.objects.filter(crawl=self.crawl_id).count()
+        return count
+
+    def cass_count(self, counts, generate_stats):
+        if generate_stats is True:
+            for row, count in counts.items():
+                count_page = self.session.prepare("UPDATE crawlstats SET " + row + " = " + row + " + ? WHERE crawl = ?")
+                self.session.execute_async(count_page, (count, self.crawl_id))
+
+
+class BroadCrawlingQueue(Queue):
+    GET_RETRIES = 3
+
+    def get_next_requests(self, max_n_requests, partition_id, **kwargs):
+        """
+        Dequeues new batch of requests for crawling.
+
+        Priorities, from highest to lowest:
+         - max_requests_per_host
+         - max_n_requests
+         - min_hosts & min_requests
+
+        :param max_n_requests:
+        :param partition_id:
+        :param kwargs: min_requests, min_hosts, max_requests_per_host
+        :return: list of :class:`Request <frontera.core.models.Request>` objects.
+        """
+        min_requests = kwargs.pop("min_requests", None)
+        min_hosts = kwargs.pop("min_hosts", None)
+        max_requests_per_host = kwargs.pop("max_requests_per_host", None)
+        assert(max_n_requests > min_requests)
+
+        queue = {}
+        limit = max_n_requests
+        tries = 0
+        count = 0
+        while tries < self.GET_RETRIES:
+            tries += 1
+            limit *= 5.5 if tries > 1 else 1.0
+            self.logger.debug("Try %d, limit %d, last attempt: requests %d, hosts %d" %
+                              (tries, limit, count, len(queue.keys())))
+            queue.clear()
+            count = 0
+            for item in self.queue_model.objects.filter(crawl=self.crawl_id, partition_id=partition_id).\
+                    order_by("crawl", "score", self._order_by()).limit(limit):
+                if item.host_crc32 not in queue:
+                    queue[item.host_crc32] = []
+                if max_requests_per_host is not None and len(queue[item.host_crc32]) > max_requests_per_host:
+                    continue
+                queue[item.host_crc32].append(item)
+                count += 1
+                if count > max_n_requests:
+                    break
+            if min_hosts is not None and len(queue.keys()) < min_hosts:
+                continue
+            if min_requests is not None and count < min_requests:
+                continue
+            break
+        self.logger.debug("Finished: tries %d, hosts %d, requests %d" % (tries, len(queue.keys()), count))
+
+        results = []
+        for items in queue.values():
+            for item in items:
+                method = 'GET' if not item.method else item.method
+                results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers,
+                                       cookies=item.cookies))
+                item.delete()
+        return results
+
+
+class CassandraCount(object):
+
+    def __init__(self, crawl_id, session, generate_stats):
+        self.generate_stats = generate_stats
+        self.session = session
+        self.crawl_id = crawl_id
+
+    def cass_count(self, counts):
+        if self.generate_stats is True:
+            for row, count in counts.items():
+                count_page = self.session.prepare("UPDATE crawlstats SET " + row + " = " + row + " + ? WHERE crawl = ?")
+                self.session.execute_async(count_page, (count, self.crawl_id))
\ No newline at end of file
diff --git a/frontera/contrib/backends/cassandra/models.py b/frontera/contrib/backends/cassandra/models.py
new file mode 100644
index 000000000..9065477d0
--- /dev/null
+++ b/frontera/contrib/backends/cassandra/models.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+import uuid
+from cassandra.cqlengine.models import Model
+from cassandra.cqlengine.usertype import UserType
+from cassandra.cqlengine.columns import Map, Text, Float, Integer, DateTime, UserDefinedType, Counter, Boolean, \
+    SmallInt, BigInt
+
+
+class Meta(UserType):
+    domain = Map(Text(), Text(), required=False)
+    fingerprint = Text()
+    origin_is_frontier = Boolean()
+    scrapy_callback = Text()
+    scrapy_errback = Text()
+    scrapy_meta = Map(Text(), Text(), required=False)
+    score = Float(required=False)
+    jid = Integer(required=False)
+
+
+class MetadataModel(Model):
+    __table_name__ = 'metadata'
+
+    crawl = Text(primary_key=True)
+    fingerprint = Text(primary_key=True)
+    url = Text(index=True)
+    depth = Integer()
+    created_at = DateTime()
+    fetched_at = DateTime(required=False)
+    status_code = Integer(required=False)
+    score = Float(required=False)
+    error = Text(required=False)
+    meta = UserDefinedType(Meta)
+    headers = Map(Text(), Text(), required=False)
+    cookies = Map(Text(), Text(), required=False)
+    method = Text(required=False)
+
+    @classmethod
+    def query(cls, session):
+        return session.query(cls)
+
+    def __repr__(self):
+        return '<Metadata:%s (%s)>' % (self.url, self.fingerprint)
+
+
+class StateModel(Model):
+    __table_name__ = 'states'
+
+    crawl = Text(primary_key=True)
+    fingerprint = Text(primary_key=True)
+    state = SmallInt(index=True)
+
+    @classmethod
+    def query(cls, session):
+        return session.query(cls)
+
+    def __repr__(self):
+        return '<State:%s=%s>' % (self.fingerprint, self.state)
+
+
+class QueueModel(Model):
+    __table_name__ = 'queue'
+
+    crawl = Text(primary_key=True)
+    partition_id = Integer(primary_key=True)
+    score = Float(primary_key=True)
+    created_at = BigInt(primary_key=True)
+    fingerprint = Text(primary_key=True)
+    url = Text()
+    host_crc32 = Integer()
+    meta = UserDefinedType(Meta)
+    headers = Map(Text(), Text(), required=False)
+    cookies = Map(Text(), Text(), required=False)
+    method = Text(required=False)
+    depth = SmallInt(required=False)
+
+    @classmethod
+    def query(cls, session):
+        return session.query(cls)
+
+    def __repr__(self):
+        return '<Queue:%s (%s)>' % (self.url, self.id)
+
+
+class CrawlStatsModel(Model):
+    __table_name__ = 'crawlstats'
+
+    crawl = Text(primary_key=True)
+    pages_crawled = Counter()
+    links_found = Counter()
+    errors = Counter()
+    seed_urls = Counter()
+    scored_urls = Counter()
+    queued_urls = Counter()
+    dequeued_urls = Counter()
+
+    @classmethod
+    def query(cls, session):
+        return session.query(cls)
+
+    def __repr__(self):
+        return '<CrawlStats:%s>' % self.crawl
diff --git a/frontera/contrib/backends/cassandra/revisiting.py b/frontera/contrib/backends/cassandra/revisiting.py
new file mode 100644
index 000000000..51ce212fc
--- /dev/null
+++ b/frontera/contrib/backends/cassandra/revisiting.py
@@ -0,0 +1,139 @@
+# -*- coding: utf-8 -*-
+import logging
+import json
+from datetime import datetime, timedelta
+from time import time, sleep
+
+from frontera import Request
+from frontera.contrib.backends.partitioners import Crc32NamePartitioner
+from frontera.contrib.backends.cassandra import CassandraBackend
+from cassandra.cqlengine import columns
+from cassandra.cqlengine.models import Model
+from frontera.core.components import Queue as BaseQueue, States
+from frontera.utils.misc import get_crc32
+from frontera.utils.url import parse_domain_from_url_fast
+
+
+class RevisitingQueueModel(Model):
+    __table_name__ = 'revisiting_queue'
+
+    crawl_at = columns.DateTime(required=True, default=datetime.utcnow, index=True)
+
+
+def retry_and_rollback(func):
+    def func_wrapper(self, *args, **kwargs):
+        tries = 5
+        while True:
+            try:
+                return func(self, *args, **kwargs)
+            except Exception as exc:
+                self.logger.exception(exc)
+                sleep(5)
+                tries -= 1
+                if tries > 0:
+                    self.logger.info("Tries left %i" % tries)
+                    continue
+                else:
+                    raise exc
+    return func_wrapper
+
+
+class RevisitingQueue(BaseQueue):
+    def __init__(self, session, queue_cls, partitions):
+        self.session = session
+        self.queue_model = queue_cls
+        self.logger = logging.getLogger("frontera.contrib.backends.cassandra.revisiting.RevisitingQueue")
+        self.partitions = [i for i in range(0, partitions)]
+        self.partitioner = Crc32NamePartitioner(self.partitions)
+
+    def frontier_stop(self):
+        pass
+
+    def get_next_requests(self, max_n_requests, partition_id, **kwargs):
+        results = []
+        try:
+            for item in self.queue_model.objects.filter(crawl_at=datetime.utcnow(), partition_id=partition_id).\
+                    limit(max_n_requests):
+                method = 'GET' if not item.method else item.method
+                results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers,
+                                       cookies=item.cookies))
+                item.delete()
+        except Exception as exc:
+            self.logger.exception(exc)
+        return results
+
+    @retry_and_rollback
+    def schedule(self, batch):
+        for fprint, score, request, schedule_at in batch:
+            if schedule_at:
+                _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url)
+                if not hostname:
+                    self.logger.error("Can't get hostname for URL %s, fingerprint %s" % (request.url, fprint))
+                    partition_id = self.partitions[0]
+                    host_crc32 = 0
+                else:
+                    partition_id = self.partitioner.partition(hostname, self.partitions)
+                    host_crc32 = get_crc32(hostname)
+                created_at = time() * 1E+6
+                q = self._create_queue(request, fprint, score, partition_id, host_crc32, created_at)
+
+                q.save()
+                request.meta['state'] = States.QUEUED
+
+    def _create_queue(self, obj, fingerprint, score, partition_id, host_crc32, created_at):
+        db_queue = self.queue_model()
+        db_queue.fingerprint = fingerprint
+        db_queue.score = score
+        db_queue.partition_id = partition_id
+        db_queue.host_crc32 = host_crc32
+        db_queue.url = obj.url
+        db_queue.created_at = created_at
+
+        new_dict = {}
+        for kmeta, vmeta in obj.meta.items():
+            if type(vmeta) is dict:
+                new_dict[kmeta] = json.dumps(vmeta)
+            else:
+                new_dict[kmeta] = str(vmeta)
+
+        db_queue.meta = new_dict
+        db_queue.depth = 0
+
+        db_queue.headers = obj.headers
+        db_queue.method = obj.method
+        db_queue.cookies = obj.cookies
+
+        return db_queue
+
+    @retry_and_rollback
+    def count(self):
+        return self.session.query(self.queue_model).count()
+
+
+class Backend(CassandraBackend):
+
+    def _create_queue(self, settings):
+        self.interval = settings.get("CASSANDRABACKEND_REVISIT_INTERVAL")
+        assert isinstance(self.interval, timedelta)
+        return RevisitingQueue(self.session, RevisitingQueueModel, settings.get('SPIDER_FEED_PARTITIONS'))
+
+    def _schedule(self, requests):
+        batch = []
+        queue_incr = 0
+        for request in requests:
+            if request.meta['state'] in [States.NOT_CRAWLED, None]:
+                schedule_at = datetime.utcnow()
+            elif request.meta['state'] in [States.CRAWLED, States.ERROR]:
+                schedule_at = datetime.utcnow() + self.interval
+            else:  # QUEUED
+                schedule_at = None
+            batch.append((request.meta['fingerprint'], self._get_score(request), request, schedule_at))
+            if schedule_at:
+                queue_incr += 1
+        self.queue.schedule(batch)
+        self.metadata.update_score(batch)
+        self.queue_size += queue_incr
+
+    def page_crawled(self, response, links):
+        super(Backend, self).page_crawled(response, links)
+        self._schedule([response.request])
diff --git a/frontera/contrib/backends/cassandra/test_backend.py b/frontera/contrib/backends/cassandra/test_backend.py
new file mode 100644
index 000000000..1570d4c4c
--- /dev/null
+++ b/frontera/contrib/backends/cassandra/test_backend.py
@@ -0,0 +1,23 @@
+import os
+
+from frontera.tests import backends
+from frontera.tests.test_revisiting_backend import RevisitingBackendTest
+
+
+#----------------------------------------------------
+# Cassandra base classes
+#----------------------------------------------------
+class cassandraFIFO(backends.FIFOBackendTest):
+    backend_class = 'frontera.contrib.backends.cassandra.FIFO'
+
+
+class cassandraLIFO(backends.LIFOBackendTest):
+    backend_class = 'frontera.contrib.backends.cassandra.LIFO'
+
+
+class cassandraRevisiting(RevisitingBackendTest):
+    backend_class = 'frontera.contrib.backends.cassandra.revisiting.Backend'
+
diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py
index 92293636f..81bbf8e5f 100644
--- a/frontera/settings/default_settings.py
+++ b/frontera/settings/default_settings.py
@@ -45,6 +45,23 @@
     'QueueModel': 'frontera.contrib.backends.sqlalchemy.models.QueueModel'
 }
 SQLALCHEMYBACKEND_REVISIT_INTERVAL = timedelta(days=1)
+
+CASSANDRABACKEND_CACHE_SIZE = 10000
+CASSANDRABACKEND_DROP_ALL_TABLES = False
+CASSANDRABACKEND_MODELS = {
+    'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel',
+    'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel',
+    'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel',
+    'CrawlStatsModel': 'frontera.contrib.backends.cassandra.models.CrawlStatsModel'
+}
+CASSANDRABACKEND_REVISIT_INTERVAL = timedelta(days=1)
+CASSANDRABACKEND_CLUSTER_IPS = ['127.0.0.1']
+CASSANDRABACKEND_CLUSTER_PORT = 9042
+CASSANDRABACKEND_KEYSPACE = 'frontera'
+CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS = True
+CASSANDRABACKEND_CRAWL_ID = "default"
+CASSANDRABACKEND_GENERATE_STATS = False
+
 STATE_CACHE_SIZE = 1000000
 STORE_CONTENT = False
 TEST_MODE = False
diff --git a/requirements/tests.txt b/requirements/tests.txt
index 35181c73b..17c03d371 100644
--- a/requirements/tests.txt
+++ b/requirements/tests.txt
@@ -8,5 +8,5 @@ SQLAlchemy>=1.0.0
 cachetools
 pyzmq
 msgpack-python
+cassandra-driver
 kafka-python
-