From 4f9c92a444ba0df161f38fb1da99575ad9f1192e Mon Sep 17 00:00:00 2001 From: wpxgit Date: Fri, 26 Feb 2016 17:10:45 +0000 Subject: [PATCH 01/17] Initial Commit with Cassandra Backend --- .idea/dictionaries/osboxes.xml | 8 + .idea/vcs.xml | 6 + .../contrib/backends/cassandra/__init__.py | 219 ++++++++++++++ .../contrib/backends/cassandra/components.py | 269 ++++++++++++++++++ frontera/contrib/backends/cassandra/models.py | 70 +++++ .../contrib/backends/cassandra/revisiting.py | 122 ++++++++ .../backends/cassandra/test_backend.py | 209 ++++++++++++++ frontera/settings/default_settings.py | 18 ++ 8 files changed, 921 insertions(+) create mode 100644 .idea/dictionaries/osboxes.xml create mode 100644 .idea/vcs.xml create mode 100644 frontera/contrib/backends/cassandra/__init__.py create mode 100644 frontera/contrib/backends/cassandra/components.py create mode 100644 frontera/contrib/backends/cassandra/models.py create mode 100644 frontera/contrib/backends/cassandra/revisiting.py create mode 100644 frontera/contrib/backends/cassandra/test_backend.py diff --git a/.idea/dictionaries/osboxes.xml b/.idea/dictionaries/osboxes.xml new file mode 100644 index 000000000..77ef20cf7 --- /dev/null +++ b/.idea/dictionaries/osboxes.xml @@ -0,0 +1,8 @@ + + + + cassandrabackend + keyspace + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 000000000..94a25f7f4 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py new file mode 100644 index 000000000..fb87824a2 --- /dev/null +++ b/frontera/contrib/backends/cassandra/__init__.py @@ -0,0 +1,219 @@ +from __future__ import absolute_import + +from cassandra.cluster import Cluster +from cassandra.cqlengine import connection +from cassandra.query import dict_factory +from cassandra.cqlengine.management import sync_table +from cassandra.cqlengine.management import drop_table + +from frontera.core.components import DistributedBackend +from frontera.contrib.backends import CommonBackend +from frontera.contrib.backends.cassandra.components import Metadata, Queue, States +from frontera.utils.misc import load_object + + +class CassandraBackend(CommonBackend): + def __init__(self, manager): + self.manager = manager + settings = manager.settings + cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS') # Format: ['192.168.0.1', '192.168.0.2'] + cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT') + drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES') + keyspace = settings.get('CASSANDRABACKEND_KEYSPACE') + keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS') # Default: true + models = settings.get('CASSANDRA_MODELS') + + self.cluster = Cluster(cluster_ips, cluster_port) + self.models = dict([(name, load_object(klass)) for name, klass in models.items()]) + + self.session_cls = self.cluster.connect() + self.session_cls.row_factory = dict_factory + + if keyspace_create: + query = """CREATE KEYSPACE IF NOT EXISTS \"%s\" + WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, ) + self.session_cls.execute(query) + + self.session_cls.set_keyspace(keyspace) + + connection.set_session(self.session_cls) + + if drop_all_tables: + for key, value in self.models.iteritems(): + drop_table(key) + + for key, value in self.models.iteritems(): + sync_table(key) + + self._metadata = Metadata(self.session_cls, 
self.models['MetadataModel'], + settings.get('CASSANDRABACKEND_CACHE_SIZE')) + self._states = States(self.session_cls, self.models['StateModel'], + settings.get('STATE_CACHE_SIZE_LIMIT')) + self._queue = self._create_queue(settings) + + def frontier_stop(self): + super(CassandraBackend, self).frontier_stop() + self.session_cls.shutdown() + + def _create_queue(self, settings): + return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) + + @property + def queue(self): + return self._queue + + @property + def metadata(self): + return self._metadata + + @property + def states(self): + return self._states + + +class FIFOBackend(CassandraBackend): + component_name = 'Cassandra FIFO Backend' + + def _create_queue(self, settings): + return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), + ordering='created') + + +class LIFOBackend(CassandraBackend): + component_name = 'SQLAlchemy LIFO Backend' + + def _create_queue(self, settings): + return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), + ordering='created_desc') + + +class DFSBackend(CassandraBackend): + component_name = 'Cassandra DFS Backend' + + def _create_queue(self, settings): + return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) + + def _get_score(self, obj): + return -obj.meta['depth'] + + +class BFSBackend(CassandraBackend): + component_name = 'Cassandra BFS Backend' + + def _create_queue(self, settings): + return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) + + def _get_score(self, obj): + return obj.meta['depth'] + + +BASE = CommonBackend +LIFO = LIFOBackend +FIFO = FIFOBackend +DFS = DFSBackend +BFS = BFSBackend + + +class Distributed(DistributedBackend): + def __init__(self, manager): + self.manager = manager + settings = manager.settings + cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS') # Format: ['192.168.0.1', '192.168.0.2'] + cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT') + keyspace = settings.get('CASSANDRABACKEND_KEYSPACE') + keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS') # Default: true + models = settings.get('CASSANDRA_MODELS') + + self.cluster = Cluster(cluster_ips, cluster_port) + self.models = dict([(name, load_object(klass)) for name, klass in models.items()]) + + self.session_cls = self.cluster.connect() + self.session_cls.row_factory = dict_factory + + if keyspace_create: + query = """CREATE KEYSPACE IF NOT EXISTS \"%s\" + WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, ) + self.session_cls.execute(query) + + self.session_cls.set_keyspace(keyspace) + + connection.set_session(self.session_cls) + + self._metadata = None + self._queue = None + self._states = None + + @classmethod + def strategy_worker(cls, manager): + b = cls(manager) + settings = manager.settings + drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES') + model = b.models['StateModel'] + + if drop_all_tables: + model.__table__.drop(bind=b.session_cls) + model.__table__.create(bind=b.session_cls) + + b._states = States(b.session_cls, model, + settings.get('STATE_CACHE_SIZE_LIMIT')) + return b + + @classmethod + def db_worker(cls, manager): + b = cls(manager) + settings = manager.settings + drop = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES') + + metadata_m = b.models['MetadataModel'] + queue_m = b.models['QueueModel'] + if 
drop: + metadata_m.__table__.drop(bind=b.session_cls) + queue_m.__table__.drop(bind=b.session_cls) + metadata_m.__table__.create(bind=b.session_cls) + queue_m.__table__.create(bind=b.session_cls) + + b._metadata = Metadata(b.session_cls, metadata_m, + settings.get('SQLALCHEMYBACKEND_CACHE_SIZE')) + b._queue = Queue(b.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS')) + return b + + @property + def queue(self): + return self._queue + + @property + def metadata(self): + return self._metadata + + @property + def states(self): + return self._states + + def frontier_start(self): + for component in [self.metadata, self.queue, self.states]: + if component: + component.frontier_start() + + def frontier_stop(self): + for component in [self.metadata, self.queue, self.states]: + if component: + component.frontier_stop() + + def add_seeds(self, seeds): + self.metadata.add_seeds(seeds) + + def get_next_requests(self, max_next_requests, **kwargs): + partitions = kwargs.pop('partitions', [0]) # TODO: Collect from all known partitions + batch = [] + for partition_id in partitions: + batch.extend(self.queue.get_next_requests(max_next_requests, partition_id, **kwargs)) + return batch + + def page_crawled(self, response, links): + self.metadata.page_crawled(response, links) + + def request_error(self, request, error): + self.metadata.request_error(request, error) + + def finished(self): + return NotImplementedError diff --git a/frontera/contrib/backends/cassandra/components.py b/frontera/contrib/backends/cassandra/components.py new file mode 100644 index 000000000..0325b47cf --- /dev/null +++ b/frontera/contrib/backends/cassandra/components.py @@ -0,0 +1,269 @@ +# -*- coding: utf-8 -*- +import logging +from datetime import datetime +from time import time, sleep + +from cachetools import LRUCache +from frontera.contrib.backends.partitioners import Crc32NamePartitioner +from frontera.contrib.backends.memory import MemoryStates +from frontera.core.components import Metadata as BaseMetadata, Queue as BaseQueue +from frontera.core.models import Request, Response +from frontera.utils.misc import get_crc32, chunks +from frontera.utils.url import parse_domain_from_url_fast + + +def retry_and_rollback(func): + def func_wrapper(self, *args, **kwargs): + tries = 5 + while True: + try: + return func(self, *args, **kwargs) + except Exception, exc: + self.logger.exception(exc) + self.session.rollback() + sleep(5) + tries -= 1 + if tries > 0: + self.logger.info("Tries left %i" % tries) + continue + else: + raise exc + return func_wrapper + + +class Metadata(BaseMetadata): + def __init__(self, session_cls, model_cls, cache_size): + self.session = session_cls(expire_on_commit=False) # FIXME: Should be explicitly mentioned in docs + self.model = model_cls + self.table = 'MetadataModel' + self.cache = LRUCache(cache_size) + self.logger = logging.getLogger("frontera.contrib.backends.sqlalchemy.components.Metadata") + + def frontier_stop(self): + self.session.close() + + @retry_and_rollback + def add_seeds(self, seeds): + for seed in seeds: + o = self._create_page(seed) + self.cache[o.fingerprint] = self.session.merge(o) + self.session.commit() + + @retry_and_rollback + def request_error(self, page, error): + m = self._modify_page(page) if page.meta['fingerprint'] in self.cache else self._create_page(page) + m.error = error + self.cache[m.fingerprint] = self.session.merge(m) + self.session.commit() + + @retry_and_rollback + def page_crawled(self, response, links): + r = self._modify_page(response) if 
response.meta['fingerprint'] in self.cache else self._create_page(response) + self.cache[r.fingerprint] = self.session.merge(r) + for link in links: + if link.meta['fingerprint'] not in self.cache: + self.cache[link.meta['fingerprint']] = self.session.merge(self._create_page(link)) + self.session.commit() + + def _modify_page(self, obj): + db_page = self.cache[obj.meta['fingerprint']] + db_page.fetched_at = datetime.utcnow() + if isinstance(obj, Response): + db_page.headers = obj.request.headers + db_page.method = obj.request.method + db_page.cookies = obj.request.cookies + db_page.status_code = obj.status_code + return db_page + + def _create_page(self, obj): + db_page = self.model() + db_page.fingerprint = obj.meta['fingerprint'] + db_page.url = obj.url + db_page.created_at = datetime.utcnow() + db_page.meta = obj.meta + db_page.depth = 0 + + if isinstance(obj, Request): + db_page.headers = obj.headers + db_page.method = obj.method + db_page.cookies = obj.cookies + elif isinstance(obj, Response): + db_page.headers = obj.request.headers + db_page.method = obj.request.method + db_page.cookies = obj.request.cookies + db_page.status_code = obj.status_code + return db_page + + @retry_and_rollback + def update_score(self, batch): + for fprint, score, request, schedule in batch: + m = self.model(fingerprint=fprint, score=score) + self.session.merge(m) + self.session.commit() + + +class States(MemoryStates): + + def __init__(self, session_cls, model_cls, cache_size_limit): + super(States, self).__init__(cache_size_limit) + self.session = session_cls() + self.model = model_cls + self.table = 'StateModel' + self.logger = logging.getLogger("frontera.contrib.backends.sqlalchemy.components.States") + + @retry_and_rollback + def frontier_stop(self): + self.flush() + self.session.close() + + @retry_and_rollback + def fetch(self, fingerprints): + to_fetch = [f for f in fingerprints if f not in self._cache] + self.logger.debug("cache size %s" % len(self._cache)) + self.logger.debug("to fetch %d from %d" % (len(to_fetch), len(fingerprints))) + + for chunk in chunks(to_fetch, 128): + for state in self.session.query(self.model).filter(self.model.fingerprint.in_(chunk)): + self._cache[state.fingerprint] = state.state + + @retry_and_rollback + def flush(self, force_clear=False): + for fingerprint, state_val in self._cache.iteritems(): + state = self.model(fingerprint=fingerprint, state=state_val) + self.session.merge(state) + self.session.commit() + self.logger.debug("State cache has been flushed.") + super(States, self).flush(force_clear) + + +class Queue(BaseQueue): + def __init__(self, session_cls, queue_cls, partitions, ordering='default'): + self.session = session_cls() + self.queue_model = queue_cls + self.logger = logging.getLogger("frontera.contrib.backends.sqlalchemy.components.Queue") + self.partitions = [i for i in range(0, partitions)] + self.partitioner = Crc32NamePartitioner(self.partitions) + self.ordering = ordering + + def frontier_stop(self): + self.session.close() + + def _order_by(self, query): + if self.ordering == 'created': + return query.order_by(self.queue_model.created_at) + if self.ordering == 'created_desc': + return query.order_by(self.queue_model.created_at.desc()) + return query.order_by(self.queue_model.score, self.queue_model.created_at) # TODO: remove second parameter, + # it's not necessary for proper crawling, but needed for tests + + def get_next_requests(self, max_n_requests, partition_id, **kwargs): + """ + Dequeues new batch of requests for crawling. 
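+        Each dequeued request is deleted from the queue in the same
+        transaction; on error the transaction is rolled back and the
+        partial batch collected so far is returned.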
+ + :param max_n_requests: maximum number of requests to return + :param partition_id: partition id + :return: list of :class:`Request ` objects. + """ + results = [] + try: + for item in self._order_by(self.session.query(self.queue_model).filter_by(partition_id=partition_id)).\ + limit(max_n_requests): + method = 'GET' if not item.method else item.method + r = Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies) + r.meta['fingerprint'] = item.fingerprint + r.meta['score'] = item.score + results.append(r) + self.session.delete(item) + self.session.commit() + except Exception, exc: + self.logger.exception(exc) + self.session.rollback() + return results + + @retry_and_rollback + def schedule(self, batch): + to_save = [] + for fprint, score, request, schedule in batch: + if schedule: + _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url) + if not hostname: + self.logger.error("Can't get hostname for URL %s, fingerprint %s" % (request.url, fprint)) + partition_id = self.partitions[0] + host_crc32 = 0 + else: + partition_id = self.partitioner.partition(hostname, self.partitions) + host_crc32 = get_crc32(hostname) + q = self.queue_model(fingerprint=fprint, score=score, url=request.url, meta=request.meta, + headers=request.headers, cookies=request.cookies, method=request.method, + partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6) + to_save.append(q) + request.meta['state'] = States.QUEUED + self.session.bulk_save_objects(to_save) + self.session.commit() + + @retry_and_rollback + def count(self): + return self.session.query(self.queue_model).count() + + +class BroadCrawlingQueue(Queue): + + GET_RETRIES = 3 + + @retry_and_rollback + def get_next_requests(self, max_n_requests, partition_id, **kwargs): + """ + Dequeues new batch of requests for crawling. + + Priorities, from highest to lowest: + - max_requests_per_host + - max_n_requests + - min_hosts & min_requests + + :param max_n_requests: + :param partition_id: + :param kwargs: min_requests, min_hosts, max_requests_per_host + :return: list of :class:`Request ` objects. 
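+        min_requests and min_hosts act as lower bounds: the query is
+        retried with a progressively larger limit (up to GET_RETRIES
+        attempts) until both thresholds are met or retries are exhausted.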
+ """ + min_requests = kwargs.pop("min_requests", None) + min_hosts = kwargs.pop("min_hosts", None) + max_requests_per_host = kwargs.pop("max_requests_per_host", None) + assert(max_n_requests > min_requests) + + queue = {} + limit = max_n_requests + tries = 0 + count = 0 + while tries < self.GET_RETRIES: + tries += 1 + limit *= 5.5 if tries > 1 else 1.0 + self.logger.debug("Try %d, limit %d, last attempt: requests %d, hosts %d" % + (tries, limit, count, len(queue.keys()))) + queue.clear() + count = 0 + for item in self._order_by(self.session.query(self.queue_model).filter_by(partition_id=partition_id)).\ + limit(limit): + if item.host_crc32 not in queue: + queue[item.host_crc32] = [] + if max_requests_per_host is not None and len(queue[item.host_crc32]) > max_requests_per_host: + continue + queue[item.host_crc32].append(item) + count += 1 + if count > max_n_requests: + break + if min_hosts is not None and len(queue.keys()) < min_hosts: + continue + if min_requests is not None and count < min_requests: + continue + break + self.logger.debug("Finished: tries %d, hosts %d, requests %d" % (tries, len(queue.keys()), count)) + + results = [] + for items in queue.itervalues(): + for item in items: + method = 'GET' if not item.method else item.method + results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers + , cookies=item.cookies)) + self.session.delete(item) + self.session.commit() + return results diff --git a/frontera/contrib/backends/cassandra/models.py b/frontera/contrib/backends/cassandra/models.py new file mode 100644 index 000000000..9c3ede542 --- /dev/null +++ b/frontera/contrib/backends/cassandra/models.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +import uuid +from cassandra.cqlengine import columns +from cassandra.cqlengine.models import Model + + +class MetadataModel(Model): + __table_name__ = 'metadata' + + meta_id = columns.UUID(primary_key=True, default=uuid.uuid4) + fingerprint = columns.Text(primary_key=True) + url = columns.Text(index=True) + depth = columns.Integer() + created_at = columns.DateTime() + fetched_at = columns.DateTime(required=False) + status_code = columns.Text(required=False) + score = columns.Float(required=False) + error = columns.Text(required=False) + meta = columns.Map(required=False) + headers = columns.Map(required=False) + cookies = columns.Map(required=False) + method = columns.Text(required=False) + + @classmethod + def query(cls, session): + return session.query(cls) + + def __repr__(self): + return '' % (self.url, self.fingerprint) + + +class StateModel(Model): + __table_name__ = 'states' + + state_id = columns.UUID(primary_key=True, default=uuid.uuid4) + fingerprint = columns.Text(primary_key=True) + state = columns.SmallInt() + + @classmethod + def query(cls, session): + return session.query(cls) + + def __repr__(self): + return '' % (self.fingerprint, self.state) + + +class QueueModelMixin(object): + id = columns.UUID(primary_key=True, default=uuid.uuid4) + partition_id = columns.Integer(index=True) + score = columns.Float(index=True) + url = columns.Text() + fingerprint = columns.Text() + host_crc32 = columns.Integer() + meta = columns.Map(required=False) + headers = columns.Map(required=False) + cookies = columns.Map(required=False) + method = columns.Text(required=False) + created_at = columns.BigInt(index=True, required=False) + depth = columns.SmallInt(required=False) + + +class QueueModel(QueueModelMixin, Model): + __table_name__ = 'queue' + + @classmethod + def query(cls, session): + return session.query(cls) 
+ + def __repr__(self): + return '' % (self.url, self.id) diff --git a/frontera/contrib/backends/cassandra/revisiting.py b/frontera/contrib/backends/cassandra/revisiting.py new file mode 100644 index 000000000..db58d308a --- /dev/null +++ b/frontera/contrib/backends/cassandra/revisiting.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +import logging +from datetime import datetime, timedelta +from time import time, sleep + +from sqlalchemy import Column, DateTime + +from frontera import Request +from frontera.contrib.backends.partitioners import Crc32NamePartitioner +from frontera.contrib.backends.sqlalchemy import SQLAlchemyBackend +from frontera.contrib.backends.sqlalchemy.models import QueueModelMixin, DeclarativeBase +from frontera.core.components import Queue as BaseQueue, States +from frontera.utils.misc import get_crc32 +from frontera.utils.url import parse_domain_from_url_fast + + +class RevisitingQueueModel(QueueModelMixin, DeclarativeBase): + __tablename__ = 'revisiting_queue' + + crawl_at = Column(DateTime, nullable=False) + + +def retry_and_rollback(func): + def func_wrapper(self, *args, **kwargs): + tries = 5 + while True: + try: + return func(self, *args, **kwargs) + except Exception, exc: + self.logger.exception(exc) + self.session.rollback() + sleep(5) + tries -= 1 + if tries > 0: + self.logger.info("Tries left %i" % tries) + continue + else: + raise exc + return func_wrapper + + +class RevisitingQueue(BaseQueue): + def __init__(self, session_cls, queue_cls, partitions): + self.session = session_cls() + self.queue_model = queue_cls + self.logger = logging.getLogger("frontera.contrib.backends.sqlalchemy.revisiting.RevisitingQueue") + self.partitions = [i for i in range(0, partitions)] + self.partitioner = Crc32NamePartitioner(self.partitions) + + def frontier_stop(self): + self.session.close() + + def get_next_requests(self, max_n_requests, partition_id, **kwargs): + results = [] + try: + for item in self.session.query(self.queue_model).\ + filter(RevisitingQueueModel.crawl_at <= datetime.utcnow(), + RevisitingQueueModel.partition_id == partition_id).\ + limit(max_n_requests): + method = 'GET' if not item.method else item.method + results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies)) + self.session.delete(item) + self.session.commit() + except Exception, exc: + self.logger.exception(exc) + self.session.rollback() + return results + + @retry_and_rollback + def schedule(self, batch): + to_save = [] + for fprint, score, request, schedule_at in batch: + if schedule_at: + _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url) + if not hostname: + self.logger.error("Can't get hostname for URL %s, fingerprint %s" % (request.url, fprint)) + partition_id = self.partitions[0] + host_crc32 = 0 + else: + partition_id = self.partitioner.partition(hostname, self.partitions) + host_crc32 = get_crc32(hostname) + q = self.queue_model(fingerprint=fprint, score=score, url=request.url, meta=request.meta, + headers=request.headers, cookies=request.cookies, method=request.method, + partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6, + crawl_at=schedule_at) + to_save.append(q) + request.meta['state'] = States.QUEUED + self.session.bulk_save_objects(to_save) + self.session.commit() + + @retry_and_rollback + def count(self): + return self.session.query(self.queue_model).count() + + +class Backend(SQLAlchemyBackend): + + def _create_queue(self, settings): + self.interval = 
settings.get("SQLALCHEMYBACKEND_REVISIT_INTERVAL") + assert isinstance(self.interval, timedelta) + return RevisitingQueue(self.session_cls, RevisitingQueueModel, settings.get('SPIDER_FEED_PARTITIONS')) + + def _schedule(self, requests): + batch = [] + queue_incr = 0 + for request in requests: + if request.meta['state'] in [States.NOT_CRAWLED, None]: + schedule_at = datetime.utcnow() + elif request.meta['state'] in [States.CRAWLED, States.ERROR]: + schedule_at = datetime.utcnow() + self.interval + else: # QUEUED + schedule_at = None + batch.append((request.meta['fingerprint'], self._get_score(request), request, schedule_at)) + if schedule_at: + queue_incr += 1 + self.queue.schedule(batch) + self.metadata.update_score(batch) + self.queue_size += queue_incr + + def page_crawled(self, response, links): + super(Backend, self).page_crawled(response, links) + self._schedule([response.request]) \ No newline at end of file diff --git a/frontera/contrib/backends/cassandra/test_backend.py b/frontera/contrib/backends/cassandra/test_backend.py new file mode 100644 index 000000000..45f8c6795 --- /dev/null +++ b/frontera/contrib/backends/cassandra/test_backend.py @@ -0,0 +1,209 @@ +import os + +import pymysql +from psycopg2 import connect +from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT + +from frontera.tests import backends +from frontera.tests.test_revisiting_backend import RevisitingBackendTest + + +#---------------------------------------------------- +# SQAlchemy base classes +#---------------------------------------------------- +class SQLAlchemyFIFO(backends.FIFOBackendTest): + backend_class = 'frontera.contrib.backends.sqlalchemy.FIFO' + + +class SQLAlchemyLIFO(backends.LIFOBackendTest): + backend_class = 'frontera.contrib.backends.sqlalchemy.LIFO' + + +class SQLAlchemyDFS(backends.DFSBackendTest): + backend_class = 'frontera.contrib.backends.sqlalchemy.DFS' + + +class SQLAlchemyBFS(backends.BFSBackendTest): + backend_class = 'frontera.contrib.backends.sqlalchemy.BFS' + + +class SQLAlchemyRevisiting(RevisitingBackendTest): + backend_class = 'frontera.contrib.backends.sqlalchemy.revisiting.Backend' + + +#---------------------------------------------------- +# SQLite Memory +#---------------------------------------------------- +class SQLiteMemory(backends.BackendTest): + + def get_settings(self): + settings = super(SQLiteMemory, self).get_settings() + settings.SQLALCHEMYBACKEND_ENGINE = 'sqlite:///:memory:' + return settings + + +class TestSQLiteMemoryFIFO(SQLAlchemyFIFO, SQLiteMemory): + pass + + +class TestSQLiteMemoryLIFO(SQLAlchemyLIFO, SQLiteMemory): + pass + + +class TestSQLiteMemoryDFS(SQLAlchemyDFS, SQLiteMemory): + pass + + +class TestSQLiteMemoryBFS(SQLAlchemyBFS, SQLiteMemory): + pass + + +class TestSQLiteMemoryRevisiting(SQLAlchemyRevisiting): + pass + + +#---------------------------------------------------- +# SQLite File +#---------------------------------------------------- +class SQLiteFile(backends.BackendTest): + + SQLITE_DB_NAME = 'backend_test.db' + + def get_settings(self): + settings = super(SQLiteFile, self).get_settings() + settings.SQLALCHEMYBACKEND_ENGINE = 'sqlite:///' + self.SQLITE_DB_NAME + return settings + + def setup_backend(self, method): + self._delete_test_db() + + def teardown_backend(self, method): + self._delete_test_db() + + def _delete_test_db(self): + try: + os.remove(self.SQLITE_DB_NAME) + except OSError: + pass + + +class TestSQLiteFileFIFO(SQLAlchemyFIFO, SQLiteFile): + pass + + +class TestSQLiteFileLIFO(SQLAlchemyLIFO, SQLiteFile): + pass + + 
+class TestSQLiteFileDFS(SQLAlchemyDFS, SQLiteFile): + pass + + +class TestSQLiteFileBFS(SQLAlchemyBFS, SQLiteFile): + pass + + +#---------------------------------------------------- +# DB Backend test base +#---------------------------------------------------- +class DBBackendTest(object): + + DB_DATABASE = 'backend_test' + DB_ENGINE = None + DB_HOST = None + DB_USER = None + DB_PASSWORD = None + + def get_settings(self): + settings = super(DBBackendTest, self).get_settings() + settings.SQLALCHEMYBACKEND_ENGINE = self.DB_ENGINE + return settings + + def setup_backend(self, method): + self._delete_database() + self._create_database() + + def teardown_backend(self, method): + self._delete_database() + + def _delete_database(self): + self._execute_sql("DROP DATABASE IF EXISTS %s;" % self.DB_DATABASE) + + def _create_database(self): + self._execute_sql("CREATE DATABASE %s;" % self.DB_DATABASE) + + def _execute_sql(self, sql): + raise NotImplementedError + + +#---------------------------------------------------- +# Mysql +#---------------------------------------------------- +class Mysql(DBBackendTest): + + DB_ENGINE = 'mysql://travis:@localhost/backend_test' + DB_HOST = 'localhost' + DB_USER = 'travis' + DB_PASSWORD = '' + + def _execute_sql(self, sql): + conn = pymysql.connect(host=self.DB_HOST, + user=self.DB_USER, + passwd=self.DB_PASSWORD) + cur = conn.cursor() + cur.execute(sql) + cur.close() + conn.close() + + +class TestMysqlFIFO(Mysql, SQLAlchemyFIFO): + pass + + +class TestMysqlLIFO(Mysql, SQLAlchemyLIFO): + pass + + +class TestMysqlDFS(Mysql, SQLAlchemyDFS): + pass + + +class TestMysqlBFS(Mysql, SQLAlchemyBFS): + pass + + +#---------------------------------------------------- +# Postgres +#---------------------------------------------------- +class Postgres(DBBackendTest): + + DB_ENGINE = 'postgres://postgres@localhost/backend_test' + DB_HOST = 'localhost' + DB_USER = 'postgres' + DB_PASSWORD = '' + + def _execute_sql(self, sql): + conn = connect(host=self.DB_HOST, + user=self.DB_USER, + password=self.DB_PASSWORD) + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + cur = conn.cursor() + cur.execute(sql) + cur.close() + conn.close() + + +class TestPostgresFIFO(Postgres, SQLAlchemyFIFO): + pass + + +class TestPostgresLIFO(Postgres, SQLAlchemyLIFO): + pass + + +class TestPostgresDFS(Postgres, SQLAlchemyDFS): + pass + + +class TestPostgresBFS(Postgres, SQLAlchemyBFS): + pass diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py index 619692afb..7b38aece7 100644 --- a/frontera/settings/default_settings.py +++ b/frontera/settings/default_settings.py @@ -45,6 +45,24 @@ 'QueueModel': 'frontera.contrib.backends.sqlalchemy.models.QueueModel' } SQLALCHEMYBACKEND_REVISIT_INTERVAL = timedelta(days=1) + + +CASSANDRABACKEND_CACHE_SIZE = 10000 +CASSANDRABACKEND_CLEAR_CONTENT = False +CASSANDRABACKEND_DROP_ALL_TABLES = False +CASSANDRABACKEND_ENGINE_ECHO = False +CASSANDRABACKEND_MODELS = { + 'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel', + 'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel', + 'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel' +} +CASSANDRABACKEND_REVISIT_INTERVAL = timedelta(days=1) +CASSANDRABACKEND_CLUSTER_IPS = ['127.0.0.1'] +CASSANDRABACKEND_CLUSTER_PORT = 9042 +CASSANDRABACKEND_KEYSPACE = 'frontera' +CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS = True + + STATE_CACHE_SIZE = 1000000 STORE_CONTENT = False TEST_MODE = False From 125622ee1cab4ac00810bdf4b4fef23744034db1 
Mon Sep 17 00:00:00 2001
From: wpxgit
Date: Mon, 7 Mar 2016 11:36:57 +0000
Subject: [PATCH 02/17] Created Cassandra Backend based on SQLAlchemy files

---
 .../contrib/backends/cassandra/__init__.py    |  35 ++--
 .../contrib/backends/cassandra/components.py  | 149 ++++++++-----
 frontera/contrib/backends/cassandra/models.py |  33 ++-
 .../contrib/backends/cassandra/revisiting.py  |  69 +++---
 .../backends/cassandra/test_backend.py        | 198 +-----------------
 frontera/settings/default_settings.py         |  10 +-
 6 files changed, 191 insertions(+), 303 deletions(-)

diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py
index fb87824a2..47ff9327c 100644
--- a/frontera/contrib/backends/cassandra/__init__.py
+++ b/frontera/contrib/backends/cassandra/__init__.py
@@ -1,4 +1,5 @@
 from __future__ import absolute_import
+import logging
 
 from cassandra.cluster import Cluster
 from cassandra.cqlengine import connection
@@ -21,13 +22,14 @@ def __init__(self, manager):
         drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
         keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
         keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS')  # Default: true
-        models = settings.get('CASSANDRA_MODELS')
+        models = settings.get('CASSANDRABACKEND_MODELS')
 
         self.cluster = Cluster(cluster_ips, cluster_port)
         self.models = dict([(name, load_object(klass)) for name, klass in models.items()])
 
         self.session_cls = self.cluster.connect()
         self.session_cls.row_factory = dict_factory
+        self.session_cls.encoder.mapping[dict] = self.session_cls.encoder.cql_encode_map_collection
 
         if keyspace_create:
             query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
@@ -40,10 +42,10 @@ def __init__(self, manager):
 
         if drop_all_tables:
             for key, value in self.models.iteritems():
-                drop_table(key)
+                drop_table(value)
 
         for key, value in self.models.iteritems():
-            sync_table(key)
+            sync_table(value)
 
         self._metadata = Metadata(self.session_cls, self.models['MetadataModel'],
                                   settings.get('CASSANDRABACKEND_CACHE_SIZE'))
@@ -52,8 +54,9 @@ def __init__(self, manager):
         self._queue = self._create_queue(settings)
 
     def frontier_stop(self):
-        super(CassandraBackend, self).frontier_stop()
+        self.states.flush()
         self.session_cls.shutdown()
+        # super(CassandraBackend, self).frontier_stop()
 
     def _create_queue(self, settings):
         return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
@@ -80,7 +83,7 @@ def _create_queue(self, settings):
 
 class LIFOBackend(CassandraBackend):
-    component_name = 'SQLAlchemy LIFO Backend'
+    component_name = 'Cassandra LIFO Backend'
 
     def _create_queue(self, settings):
         return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
@@ -122,7 +125,8 @@ def __init__(self, manager):
         cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
         keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
         keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS')  # Default: true
-        models = settings.get('CASSANDRA_MODELS')
+        models = settings.get('CASSANDRABACKEND_MODELS')
+        logging.warning('init_dist_be')
 
         self.cluster = Cluster(cluster_ips, cluster_port)
         self.models = dict([(name, load_object(klass)) for name, klass in models.items()])
@@ -134,9 +138,7 @@ def __init__(self, manager):
             query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
                        WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, )
             self.session_cls.execute(query)
-
         self.session_cls.set_keyspace(keyspace)
-
connection.set_session(self.session_cls) self._metadata = None @@ -151,8 +153,9 @@ def strategy_worker(cls, manager): model = b.models['StateModel'] if drop_all_tables: - model.__table__.drop(bind=b.session_cls) - model.__table__.create(bind=b.session_cls) + drop_table(model) + + sync_table(model) b._states = States(b.session_cls, model, settings.get('STATE_CACHE_SIZE_LIMIT')) @@ -167,13 +170,14 @@ def db_worker(cls, manager): metadata_m = b.models['MetadataModel'] queue_m = b.models['QueueModel'] if drop: - metadata_m.__table__.drop(bind=b.session_cls) - queue_m.__table__.drop(bind=b.session_cls) - metadata_m.__table__.create(bind=b.session_cls) - queue_m.__table__.create(bind=b.session_cls) + drop_table(metadata_m) + drop_table(queue_m) + + sync_table(metadata_m) + sync_table(queue_m) b._metadata = Metadata(b.session_cls, metadata_m, - settings.get('SQLALCHEMYBACKEND_CACHE_SIZE')) + settings.get('CASSANDRABACKEND_CACHE_SIZE')) b._queue = Queue(b.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS')) return b @@ -200,6 +204,7 @@ def frontier_stop(self): component.frontier_stop() def add_seeds(self, seeds): + logging.warning('add_seeds_top %s' % seeds) self.metadata.add_seeds(seeds) def get_next_requests(self, max_next_requests, **kwargs): diff --git a/frontera/contrib/backends/cassandra/components.py b/frontera/contrib/backends/cassandra/components.py index 0325b47cf..6695b5b98 100644 --- a/frontera/contrib/backends/cassandra/components.py +++ b/frontera/contrib/backends/cassandra/components.py @@ -2,13 +2,14 @@ import logging from datetime import datetime from time import time, sleep +import json from cachetools import LRUCache from frontera.contrib.backends.partitioners import Crc32NamePartitioner from frontera.contrib.backends.memory import MemoryStates from frontera.core.components import Metadata as BaseMetadata, Queue as BaseQueue from frontera.core.models import Request, Response -from frontera.utils.misc import get_crc32, chunks +from frontera.utils.misc import get_crc32 from frontera.utils.url import parse_domain_from_url_fast @@ -20,7 +21,6 @@ def func_wrapper(self, *args, **kwargs): return func(self, *args, **kwargs) except Exception, exc: self.logger.exception(exc) - self.session.rollback() sleep(5) tries -= 1 if tries > 0: @@ -33,37 +33,39 @@ def func_wrapper(self, *args, **kwargs): class Metadata(BaseMetadata): def __init__(self, session_cls, model_cls, cache_size): - self.session = session_cls(expire_on_commit=False) # FIXME: Should be explicitly mentioned in docs + self.session = session_cls self.model = model_cls self.table = 'MetadataModel' self.cache = LRUCache(cache_size) - self.logger = logging.getLogger("frontera.contrib.backends.sqlalchemy.components.Metadata") + self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Metadata") def frontier_stop(self): - self.session.close() + pass @retry_and_rollback def add_seeds(self, seeds): for seed in seeds: o = self._create_page(seed) - self.cache[o.fingerprint] = self.session.merge(o) - self.session.commit() + self.cache[o.fingerprint] = o + o.save() @retry_and_rollback def request_error(self, page, error): m = self._modify_page(page) if page.meta['fingerprint'] in self.cache else self._create_page(page) m.error = error - self.cache[m.fingerprint] = self.session.merge(m) - self.session.commit() + self.cache[m.fingerprint] = m + m.update() @retry_and_rollback def page_crawled(self, response, links): - r = self._modify_page(response) if response.meta['fingerprint'] in self.cache else 
self._create_page(response) - self.cache[r.fingerprint] = self.session.merge(r) + r = self._modify_page(response) + self.cache[r.fingerprint] = r + r.save() for link in links: if link.meta['fingerprint'] not in self.cache: - self.cache[link.meta['fingerprint']] = self.session.merge(self._create_page(link)) - self.session.commit() + l = self._create_page(link) + self.cache[link.meta['fingerprint']] = l + l.save() def _modify_page(self, obj): db_page = self.cache[obj.meta['fingerprint']] @@ -80,7 +82,14 @@ def _create_page(self, obj): db_page.fingerprint = obj.meta['fingerprint'] db_page.url = obj.url db_page.created_at = datetime.utcnow() - db_page.meta = obj.meta + new_dict = {} + for kmeta, vmeta in obj.meta.iteritems(): + if type(vmeta) is dict: + new_dict[kmeta] = json.dumps(vmeta) + else: + new_dict[kmeta] = str(vmeta) + + db_page.meta = new_dict db_page.depth = 0 if isinstance(obj, Request): @@ -98,23 +107,21 @@ def _create_page(self, obj): def update_score(self, batch): for fprint, score, request, schedule in batch: m = self.model(fingerprint=fprint, score=score) - self.session.merge(m) - self.session.commit() + m.update() class States(MemoryStates): def __init__(self, session_cls, model_cls, cache_size_limit): super(States, self).__init__(cache_size_limit) - self.session = session_cls() + self.session = session_cls self.model = model_cls self.table = 'StateModel' - self.logger = logging.getLogger("frontera.contrib.backends.sqlalchemy.components.States") + self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.States") @retry_and_rollback def frontier_stop(self): - self.flush() - self.session.close() + pass @retry_and_rollback def fetch(self, fingerprints): @@ -122,39 +129,37 @@ def fetch(self, fingerprints): self.logger.debug("cache size %s" % len(self._cache)) self.logger.debug("to fetch %d from %d" % (len(to_fetch), len(fingerprints))) - for chunk in chunks(to_fetch, 128): - for state in self.session.query(self.model).filter(self.model.fingerprint.in_(chunk)): + for chunk in to_fetch: + for state in self.model.objects.filter(fingerprint=chunk): self._cache[state.fingerprint] = state.state @retry_and_rollback def flush(self, force_clear=False): for fingerprint, state_val in self._cache.iteritems(): state = self.model(fingerprint=fingerprint, state=state_val) - self.session.merge(state) - self.session.commit() + state.save() self.logger.debug("State cache has been flushed.") super(States, self).flush(force_clear) class Queue(BaseQueue): def __init__(self, session_cls, queue_cls, partitions, ordering='default'): - self.session = session_cls() + self.session = session_cls self.queue_model = queue_cls - self.logger = logging.getLogger("frontera.contrib.backends.sqlalchemy.components.Queue") + self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Queue") self.partitions = [i for i in range(0, partitions)] self.partitioner = Crc32NamePartitioner(self.partitions) self.ordering = ordering def frontier_stop(self): - self.session.close() + pass - def _order_by(self, query): + def _order_by(self): if self.ordering == 'created': - return query.order_by(self.queue_model.created_at) + return "created_at" if self.ordering == 'created_desc': - return query.order_by(self.queue_model.created_at.desc()) - return query.order_by(self.queue_model.score, self.queue_model.created_at) # TODO: remove second parameter, - # it's not necessary for proper crawling, but needed for tests + return "-created_at" + return "created_at" def get_next_requests(self, 
max_n_requests, partition_id, **kwargs): """ @@ -166,23 +171,24 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs): """ results = [] try: - for item in self._order_by(self.session.query(self.queue_model).filter_by(partition_id=partition_id)).\ + for item in self.queue_model.objects.filter(partition_id=partition_id).order_by("score", self._order_by()).\ limit(max_n_requests): + item.meta = self.gen_meta(item.meta) method = 'GET' if not item.method else item.method r = Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies) r.meta['fingerprint'] = item.fingerprint r.meta['score'] = item.score + self.queue_model.delete(item) results.append(r) - self.session.delete(item) - self.session.commit() + item.delete() + except Exception, exc: self.logger.exception(exc) - self.session.rollback() + return results @retry_and_rollback def schedule(self, batch): - to_save = [] for fprint, score, request, schedule in batch: if schedule: _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url) @@ -193,21 +199,64 @@ def schedule(self, batch): else: partition_id = self.partitioner.partition(hostname, self.partitions) host_crc32 = get_crc32(hostname) - q = self.queue_model(fingerprint=fprint, score=score, url=request.url, meta=request.meta, - headers=request.headers, cookies=request.cookies, method=request.method, - partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6) - to_save.append(q) + created_at = time()*1E+6 + q = self._create_queue(request, fprint, score, partition_id, host_crc32, created_at) + + q.save() request.meta['state'] = States.QUEUED - self.session.bulk_save_objects(to_save) - self.session.commit() + + def _create_queue(self, obj, fingerprint, score, partition_id, host_crc32, created_at): + db_queue = self.queue_model() + db_queue.fingerprint = fingerprint + db_queue.score = score + db_queue.partition_id = partition_id + db_queue.host_crc32 = host_crc32 + db_queue.url = obj.url + db_queue.created_at = created_at + + new_dict = {} + for kmeta, vmeta in obj.meta.iteritems(): + if type(vmeta) is dict: + new_dict[kmeta] = json.dumps(vmeta) + else: + new_dict[kmeta] = str(vmeta) + + db_queue.meta = new_dict + db_queue.depth = 0 + + db_queue.headers = obj.headers + db_queue.method = obj.method + db_queue.cookies = obj.cookies + + return db_queue @retry_and_rollback def count(self): - return self.session.query(self.queue_model).count() + return self.queue_model.objects.count() + def gen_meta(self, meta): + ret_meta = {} + for kmeta, vmeta in meta.iteritems(): + try: + json_object = json.loads(vmeta) + except ValueError, e: + ret_meta[kmeta] = vmeta + else: + ret_meta[kmeta] = json_object + + if ret_meta['scrapy_callback'] == "None": + ret_meta['scrapy_callback'] = None + if ret_meta['scrapy_errback'] == "None": + ret_meta['scrapy_errback'] = None + if ret_meta['state'] == "None": + ret_meta['state'] = None + if ret_meta['origin_is_frontier'] == "True": + ret_meta['origin_is_frontier'] = True + ret_meta['depth'] = int(ret_meta['depth']) + return ret_meta -class BroadCrawlingQueue(Queue): +class BroadCrawlingQueue(Queue): GET_RETRIES = 3 @retry_and_rollback @@ -241,8 +290,9 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs): (tries, limit, count, len(queue.keys()))) queue.clear() count = 0 - for item in self._order_by(self.session.query(self.queue_model).filter_by(partition_id=partition_id)).\ + for item in self.queue_model.objects.filter(partition_id=partition_id).order_by(self._order_by()).\ 
limit(limit): + item.meta = self.gen_meta(item.meta) if item.host_crc32 not in queue: queue[item.host_crc32] = [] if max_requests_per_host is not None and len(queue[item.host_crc32]) > max_requests_per_host: @@ -262,8 +312,7 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs): for items in queue.itervalues(): for item in items: method = 'GET' if not item.method else item.method - results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers - , cookies=item.cookies)) - self.session.delete(item) - self.session.commit() + results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers, + cookies=item.cookies)) + item.delete() return results diff --git a/frontera/contrib/backends/cassandra/models.py b/frontera/contrib/backends/cassandra/models.py index 9c3ede542..afd61eab3 100644 --- a/frontera/contrib/backends/cassandra/models.py +++ b/frontera/contrib/backends/cassandra/models.py @@ -7,18 +7,17 @@ class MetadataModel(Model): __table_name__ = 'metadata' - meta_id = columns.UUID(primary_key=True, default=uuid.uuid4) fingerprint = columns.Text(primary_key=True) url = columns.Text(index=True) depth = columns.Integer() created_at = columns.DateTime() fetched_at = columns.DateTime(required=False) - status_code = columns.Text(required=False) + status_code = columns.Integer(required=False) score = columns.Float(required=False) error = columns.Text(required=False) - meta = columns.Map(required=False) - headers = columns.Map(required=False) - cookies = columns.Map(required=False) + meta = columns.Map(columns.Text(), columns.Text(), required=False) + headers = columns.Map(columns.Text(), columns.Text(), required=False) + cookies = columns.Map(columns.Text(), columns.Text(), required=False) method = columns.Text(required=False) @classmethod @@ -33,8 +32,8 @@ class StateModel(Model): __table_name__ = 'states' state_id = columns.UUID(primary_key=True, default=uuid.uuid4) - fingerprint = columns.Text(primary_key=True) - state = columns.SmallInt() + fingerprint = columns.Text(primary_key=True, index=True) + state = columns.SmallInt(index=True) @classmethod def query(cls, session): @@ -44,24 +43,22 @@ def __repr__(self): return '' % (self.fingerprint, self.state) -class QueueModelMixin(object): +class QueueModel(Model): + __table_name__ = 'queue' + + partition_id = columns.Integer(primary_key=True) + score = columns.Float(primary_key=True) + created_at = columns.BigInt(primary_key=True) id = columns.UUID(primary_key=True, default=uuid.uuid4) - partition_id = columns.Integer(index=True) - score = columns.Float(index=True) url = columns.Text() fingerprint = columns.Text() host_crc32 = columns.Integer() - meta = columns.Map(required=False) - headers = columns.Map(required=False) - cookies = columns.Map(required=False) + meta = columns.Map(columns.Text(), columns.Text(), required=False) + headers = columns.Map(columns.Text(), columns.Text(), required=False) + cookies = columns.Map(columns.Text(), columns.Text(), required=False) method = columns.Text(required=False) - created_at = columns.BigInt(index=True, required=False) depth = columns.SmallInt(required=False) - -class QueueModel(QueueModelMixin, Model): - __table_name__ = 'queue' - @classmethod def query(cls, session): return session.query(cls) diff --git a/frontera/contrib/backends/cassandra/revisiting.py b/frontera/contrib/backends/cassandra/revisiting.py index db58d308a..453349093 100644 --- a/frontera/contrib/backends/cassandra/revisiting.py +++ 
b/frontera/contrib/backends/cassandra/revisiting.py @@ -1,23 +1,23 @@ # -*- coding: utf-8 -*- import logging +import json from datetime import datetime, timedelta from time import time, sleep -from sqlalchemy import Column, DateTime - from frontera import Request from frontera.contrib.backends.partitioners import Crc32NamePartitioner -from frontera.contrib.backends.sqlalchemy import SQLAlchemyBackend -from frontera.contrib.backends.sqlalchemy.models import QueueModelMixin, DeclarativeBase +from frontera.contrib.backends.cassandra import CassandraBackend +from cassandra.cqlengine import columns +from cassandra.cqlengine.models import Model from frontera.core.components import Queue as BaseQueue, States from frontera.utils.misc import get_crc32 from frontera.utils.url import parse_domain_from_url_fast -class RevisitingQueueModel(QueueModelMixin, DeclarativeBase): - __tablename__ = 'revisiting_queue' +class RevisitingQueueModel(Model): + __table_name__ = 'revisiting_queue' - crawl_at = Column(DateTime, nullable=False) + crawl_at = columns.DateTime(required=True, default=datetime.now(), index=True) def retry_and_rollback(func): @@ -28,7 +28,6 @@ def func_wrapper(self, *args, **kwargs): return func(self, *args, **kwargs) except Exception, exc: self.logger.exception(exc) - self.session.rollback() sleep(5) tries -= 1 if tries > 0: @@ -48,27 +47,23 @@ def __init__(self, session_cls, queue_cls, partitions): self.partitioner = Crc32NamePartitioner(self.partitions) def frontier_stop(self): - self.session.close() + pass def get_next_requests(self, max_n_requests, partition_id, **kwargs): results = [] try: - for item in self.session.query(self.queue_model).\ - filter(RevisitingQueueModel.crawl_at <= datetime.utcnow(), - RevisitingQueueModel.partition_id == partition_id).\ + for item in self.queue_model.objects.filter(crawl_at=datetime.utcnow(), partition_id=partition_id).\ limit(max_n_requests): method = 'GET' if not item.method else item.method - results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies)) - self.session.delete(item) - self.session.commit() + results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers, + cookies=item.cookies)) + item.delete() except Exception, exc: self.logger.exception(exc) - self.session.rollback() return results @retry_and_rollback def schedule(self, batch): - to_save = [] for fprint, score, request, schedule_at in batch: if schedule_at: _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url) @@ -79,21 +74,43 @@ def schedule(self, batch): else: partition_id = self.partitioner.partition(hostname, self.partitions) host_crc32 = get_crc32(hostname) - q = self.queue_model(fingerprint=fprint, score=score, url=request.url, meta=request.meta, - headers=request.headers, cookies=request.cookies, method=request.method, - partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6, - crawl_at=schedule_at) - to_save.append(q) + created_at = time()*1E+6 + q = self._create_queue(request, fprint, score, partition_id, host_crc32, created_at) + + q.save() request.meta['state'] = States.QUEUED - self.session.bulk_save_objects(to_save) - self.session.commit() + + def _create_queue(self, obj, fingerprint, score, partition_id, host_crc32, created_at): + db_queue = self.queue_model() + db_queue.fingerprint = fingerprint + db_queue.score = score + db_queue.partition_id = partition_id + db_queue.host_crc32 = host_crc32 + db_queue.url = obj.url + db_queue.created_at = created_at + + new_dict = {} 
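+        # meta/headers/cookies are Map(Text, Text) columns (see models.py),
+        # so nested dict values are JSON-encoded and all other values are
+        # coerced to str below; the Queue component in components.py
+        # reverses this via gen_meta() when reading requests back.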
+        for kmeta, vmeta in obj.meta.iteritems():
+            if type(vmeta) is dict:
+                new_dict[kmeta] = json.dumps(vmeta)
+            else:
+                new_dict[kmeta] = str(vmeta)
+
+        db_queue.meta = new_dict
+        db_queue.depth = 0
+
+        db_queue.headers = obj.headers
+        db_queue.method = obj.method
+        db_queue.cookies = obj.cookies
+
+        return db_queue
 
     @retry_and_rollback
     def count(self):
         return self.session.query(self.queue_model).count()
 
 
-class Backend(SQLAlchemyBackend):
+class Backend(CassandraBackend):
 
     def _create_queue(self, settings):
         self.interval = settings.get("SQLALCHEMYBACKEND_REVISIT_INTERVAL")
@@ -119,4 +136,4 @@ def _schedule(self, requests):
 
     def page_crawled(self, response, links):
         super(Backend, self).page_crawled(response, links)
-        self._schedule([response.request])
\ No newline at end of file
+        self._schedule([response.request])
diff --git a/frontera/contrib/backends/cassandra/test_backend.py b/frontera/contrib/backends/cassandra/test_backend.py
index 45f8c6795..60d281bfb 100644
--- a/frontera/contrib/backends/cassandra/test_backend.py
+++ b/frontera/contrib/backends/cassandra/test_backend.py
@@ -1,6 +1,5 @@
 import os
 
-import pymysql
 from psycopg2 import connect
 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 
@@ -11,199 +10,22 @@
 #----------------------------------------------------
 # SQLAlchemy base classes
 #----------------------------------------------------
-class SQLAlchemyFIFO(backends.FIFOBackendTest):
-    backend_class = 'frontera.contrib.backends.sqlalchemy.FIFO'
+class cassandraFIFO(backends.FIFOBackendTest):
+    backend_class = 'frontera.contrib.backends.cassandra.FIFO'
 
 
-class SQLAlchemyLIFO(backends.LIFOBackendTest):
-    backend_class = 'frontera.contrib.backends.sqlalchemy.LIFO'
+class cassandraLIFO(backends.LIFOBackendTest):
+    backend_class = 'frontera.contrib.backends.cassandra.LIFO'
 
 
-class SQLAlchemyDFS(backends.DFSBackendTest):
-    backend_class = 'frontera.contrib.backends.sqlalchemy.DFS'
+class cassandraDFS(backends.DFSBackendTest):
+    backend_class = 'frontera.contrib.backends.cassandra.DFS'
 
 
-class SQLAlchemyBFS(backends.BFSBackendTest):
-    backend_class = 'frontera.contrib.backends.sqlalchemy.BFS'
+class cassandraBFS(backends.BFSBackendTest):
+    backend_class = 'frontera.contrib.backends.cassandra.BFS'
 
 
-class SQLAlchemyRevisiting(RevisitingBackendTest):
-    backend_class = 'frontera.contrib.backends.sqlalchemy.revisiting.Backend'
+class cassandraRevisiting(RevisitingBackendTest):
+    backend_class = 'frontera.contrib.backends.cassandra.revisiting.Backend'
-
-
-#----------------------------------------------------
-# SQLite Memory
-#----------------------------------------------------
-class SQLiteMemory(backends.BackendTest):
-
-    def get_settings(self):
-        settings = super(SQLiteMemory, self).get_settings()
-        settings.SQLALCHEMYBACKEND_ENGINE = 'sqlite:///:memory:'
-        return settings
-
-
-class TestSQLiteMemoryFIFO(SQLAlchemyFIFO, SQLiteMemory):
-    pass
-
-
-class TestSQLiteMemoryLIFO(SQLAlchemyLIFO, SQLiteMemory):
-    pass
-
-
-class TestSQLiteMemoryDFS(SQLAlchemyDFS, SQLiteMemory):
-    pass
-
-
-class TestSQLiteMemoryBFS(SQLAlchemyBFS, SQLiteMemory):
-    pass
-
-
-class TestSQLiteMemoryRevisiting(SQLAlchemyRevisiting):
-    pass
-
-
-#----------------------------------------------------
-# SQLite File
-#----------------------------------------------------
-class SQLiteFile(backends.BackendTest):
-
-    SQLITE_DB_NAME = 'backend_test.db'
-
-    def get_settings(self):
-        settings = super(SQLiteFile, self).get_settings()
-        settings.SQLALCHEMYBACKEND_ENGINE = 'sqlite:///' + self.SQLITE_DB_NAME
-        return settings
-
-
-    
def setup_backend(self, method): - self._delete_test_db() - - def teardown_backend(self, method): - self._delete_test_db() - - def _delete_test_db(self): - try: - os.remove(self.SQLITE_DB_NAME) - except OSError: - pass - - -class TestSQLiteFileFIFO(SQLAlchemyFIFO, SQLiteFile): - pass - - -class TestSQLiteFileLIFO(SQLAlchemyLIFO, SQLiteFile): - pass - - -class TestSQLiteFileDFS(SQLAlchemyDFS, SQLiteFile): - pass - - -class TestSQLiteFileBFS(SQLAlchemyBFS, SQLiteFile): - pass - - -#---------------------------------------------------- -# DB Backend test base -#---------------------------------------------------- -class DBBackendTest(object): - - DB_DATABASE = 'backend_test' - DB_ENGINE = None - DB_HOST = None - DB_USER = None - DB_PASSWORD = None - - def get_settings(self): - settings = super(DBBackendTest, self).get_settings() - settings.SQLALCHEMYBACKEND_ENGINE = self.DB_ENGINE - return settings - - def setup_backend(self, method): - self._delete_database() - self._create_database() - - def teardown_backend(self, method): - self._delete_database() - - def _delete_database(self): - self._execute_sql("DROP DATABASE IF EXISTS %s;" % self.DB_DATABASE) - - def _create_database(self): - self._execute_sql("CREATE DATABASE %s;" % self.DB_DATABASE) - - def _execute_sql(self, sql): - raise NotImplementedError - - -#---------------------------------------------------- -# Mysql -#---------------------------------------------------- -class Mysql(DBBackendTest): - - DB_ENGINE = 'mysql://travis:@localhost/backend_test' - DB_HOST = 'localhost' - DB_USER = 'travis' - DB_PASSWORD = '' - - def _execute_sql(self, sql): - conn = pymysql.connect(host=self.DB_HOST, - user=self.DB_USER, - passwd=self.DB_PASSWORD) - cur = conn.cursor() - cur.execute(sql) - cur.close() - conn.close() - - -class TestMysqlFIFO(Mysql, SQLAlchemyFIFO): - pass - - -class TestMysqlLIFO(Mysql, SQLAlchemyLIFO): - pass - - -class TestMysqlDFS(Mysql, SQLAlchemyDFS): - pass - - -class TestMysqlBFS(Mysql, SQLAlchemyBFS): - pass - - -#---------------------------------------------------- -# Postgres -#---------------------------------------------------- -class Postgres(DBBackendTest): - - DB_ENGINE = 'postgres://postgres@localhost/backend_test' - DB_HOST = 'localhost' - DB_USER = 'postgres' - DB_PASSWORD = '' - - def _execute_sql(self, sql): - conn = connect(host=self.DB_HOST, - user=self.DB_USER, - password=self.DB_PASSWORD) - conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) - cur = conn.cursor() - cur.execute(sql) - cur.close() - conn.close() - - -class TestPostgresFIFO(Postgres, SQLAlchemyFIFO): - pass - - -class TestPostgresLIFO(Postgres, SQLAlchemyLIFO): - pass - - -class TestPostgresDFS(Postgres, SQLAlchemyDFS): - pass - - -class TestPostgresBFS(Postgres, SQLAlchemyBFS): - pass diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py index 7b38aece7..15fe0b674 100644 --- a/frontera/settings/default_settings.py +++ b/frontera/settings/default_settings.py @@ -48,9 +48,7 @@ CASSANDRABACKEND_CACHE_SIZE = 10000 -CASSANDRABACKEND_CLEAR_CONTENT = False CASSANDRABACKEND_DROP_ALL_TABLES = False -CASSANDRABACKEND_ENGINE_ECHO = False CASSANDRABACKEND_MODELS = { 'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel', 'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel', @@ -78,7 +76,7 @@ LOGGER = 'frontera.logger.FrontierLogger' LOGGING_ENABLED = True -LOGGING_EVENTS_ENABLED = False +LOGGING_EVENTS_ENABLED = True LOGGING_EVENTS_INCLUDE_METADATA = True 
LOGGING_EVENTS_INCLUDE_DOMAIN = True
 LOGGING_EVENTS_INCLUDE_DOMAIN_FIELDS = ['name', 'netloc', 'scheme', 'sld', 'tld', 'subdomain']
@@ -86,19 +84,19 @@
     "frontera.logger.handlers.EVENTS",
 ]
 
-LOGGING_MANAGER_ENABLED = False
+LOGGING_MANAGER_ENABLED = True
 LOGGING_MANAGER_LOGLEVEL = logging.DEBUG
 LOGGING_MANAGER_HANDLERS = [
     "frontera.logger.handlers.CONSOLE_MANAGER",
 ]
 
-LOGGING_BACKEND_ENABLED = False
+LOGGING_BACKEND_ENABLED = True
 LOGGING_BACKEND_LOGLEVEL = logging.DEBUG
 LOGGING_BACKEND_HANDLERS = [
     "frontera.logger.handlers.CONSOLE_BACKEND",
 ]
 
-LOGGING_DEBUGGING_ENABLED = False
+LOGGING_DEBUGGING_ENABLED = True
 LOGGING_DEBUGGING_LOGLEVEL = logging.DEBUG
 LOGGING_DEBUGGING_HANDLERS = [
     "frontera.logger.handlers.CONSOLE_DEBUGGING",

From 7bb2605bdced9bf40d0d823dca2edd46f5f84d2c Mon Sep 17 00:00:00 2001
From: wpxgit
Date: Wed, 30 Mar 2016 15:00:05 +0100
Subject: [PATCH 03/17] Changed Cassandra Backend to run in Distributed Mode,
 made many performance enhancements
---
 .idea/dictionaries/osboxes.xml                |   1 +
 .../contrib/backends/cassandra/__init__.py    |  41 +--
 .../contrib/backends/cassandra/components.py  | 237 ++++++++++--------
 frontera/contrib/backends/cassandra/models.py |  88 ++++---
 frontera/settings/default_settings.py         |   4 +-
 5 files changed, 212 insertions(+), 159 deletions(-)

diff --git a/.idea/dictionaries/osboxes.xml b/.idea/dictionaries/osboxes.xml
index 77ef20cf7..900c5e91e 100644
--- a/.idea/dictionaries/osboxes.xml
+++ b/.idea/dictionaries/osboxes.xml
@@ -3,6 +3,7 @@
       cassandrabackend
       keyspace
+      scrapy
 \ No newline at end of file

diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py
index 47ff9327c..7646f9391 100644
--- a/frontera/contrib/backends/cassandra/__init__.py
+++ b/frontera/contrib/backends/cassandra/__init__.py
@@ -1,12 +1,9 @@
 from __future__ import absolute_import
-import logging
-
 from cassandra.cluster import Cluster
 from cassandra.cqlengine import connection
 from cassandra.query import dict_factory
 from cassandra.cqlengine.management import sync_table
 from cassandra.cqlengine.management import drop_table
-
 from frontera.core.components import DistributedBackend
 from frontera.contrib.backends import CommonBackend
 from frontera.contrib.backends.cassandra.components import Metadata, Queue, States
@@ -23,6 +20,7 @@ def __init__(self, manager):
         keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
         keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS')  # Default: true
         models = settings.get('CASSANDRABACKEND_MODELS')
+        crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')
 
         self.cluster = Cluster(cluster_ips, cluster_port)
         self.models = dict([(name, load_object(klass)) for name, klass in models.items()])
@@ -30,6 +28,7 @@ def __init__(self, manager):
         self.session_cls = self.cluster.connect()
         self.session_cls.row_factory = dict_factory
         self.session_cls.encoder.mapping[dict] = self.session_cls.encoder.cql_encode_map_collection
+        self.crawl_id = crawl_id
 
         if keyspace_create:
             query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
@@ -48,18 +47,18 @@ def __init__(self, manager):
             sync_table(value)
 
         self._metadata = Metadata(self.session_cls, self.models['MetadataModel'],
-                                  settings.get('CASSANDRABACKEND_CACHE_SIZE'))
+                                  settings.get('CASSANDRABACKEND_CACHE_SIZE'), crawl_id=self.crawl_id)
         self._states = States(self.session_cls, self.models['StateModel'],
-                              settings.get('STATE_CACHE_SIZE_LIMIT'))
+                              settings.get('STATE_CACHE_SIZE_LIMIT'), crawl_id=self.crawl_id)
         self._queue = self._create_queue(settings)
 
     def 
frontier_stop(self): self.states.flush() self.session_cls.shutdown() - # super(CassandraBackend, self).frontier_stop() def _create_queue(self, settings): - return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) + return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), + crawl_id=self.crawl_id) @property def queue(self): @@ -89,32 +88,9 @@ def _create_queue(self, settings): return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), ordering='created_desc') - -class DFSBackend(CassandraBackend): - component_name = 'Cassandra DFS Backend' - - def _create_queue(self, settings): - return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) - - def _get_score(self, obj): - return -obj.meta['depth'] - - -class BFSBackend(CassandraBackend): - component_name = 'Cassandra BFS Backend' - - def _create_queue(self, settings): - return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) - - def _get_score(self, obj): - return obj.meta['depth'] - - BASE = CommonBackend LIFO = LIFOBackend FIFO = FIFOBackend -DFS = DFSBackend -BFS = BFSBackend class Distributed(DistributedBackend): @@ -126,7 +102,6 @@ def __init__(self, manager): keyspace = settings.get('CASSANDRABACKEND_KEYSPACE') keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS') # Default: true models = settings.get('CASSANDRABACKEND_MODELS') - logging.warning('init_dist_be') self.cluster = Cluster(cluster_ips, cluster_port) self.models = dict([(name, load_object(klass)) for name, klass in models.items()]) @@ -169,12 +144,15 @@ def db_worker(cls, manager): metadata_m = b.models['MetadataModel'] queue_m = b.models['QueueModel'] + stats_m = b.models['CrawlStatsModel'] if drop: drop_table(metadata_m) drop_table(queue_m) + drop_table(stats_m) sync_table(metadata_m) sync_table(queue_m) + sync_table(stats_m) b._metadata = Metadata(b.session_cls, metadata_m, settings.get('CASSANDRABACKEND_CACHE_SIZE')) @@ -204,7 +182,6 @@ def frontier_stop(self): component.frontier_stop() def add_seeds(self, seeds): - logging.warning('add_seeds_top %s' % seeds) self.metadata.add_seeds(seeds) def get_next_requests(self, max_next_requests, **kwargs): diff --git a/frontera/contrib/backends/cassandra/components.py b/frontera/contrib/backends/cassandra/components.py index 6695b5b98..aa38103f9 100644 --- a/frontera/contrib/backends/cassandra/components.py +++ b/frontera/contrib/backends/cassandra/components.py @@ -2,15 +2,15 @@ import logging from datetime import datetime from time import time, sleep -import json - from cachetools import LRUCache from frontera.contrib.backends.partitioners import Crc32NamePartitioner from frontera.contrib.backends.memory import MemoryStates from frontera.core.components import Metadata as BaseMetadata, Queue as BaseQueue from frontera.core.models import Request, Response -from frontera.utils.misc import get_crc32 +from frontera.utils.misc import get_crc32, chunks from frontera.utils.url import parse_domain_from_url_fast +from cassandra.concurrent import execute_concurrent_with_args +from frontera.contrib.backends.cassandra.models import Meta def retry_and_rollback(func): @@ -24,7 +24,7 @@ def func_wrapper(self, *args, **kwargs): sleep(5) tries -= 1 if tries > 0: - self.logger.info("Tries left %i" % tries) + self.logger.info("Tries left %i", tries) continue else: raise exc @@ -32,65 +32,80 @@ def func_wrapper(self, *args, 
**kwargs): class Metadata(BaseMetadata): - def __init__(self, session_cls, model_cls, cache_size): + def __init__(self, session_cls, model_cls, cache_size, crawl_id='default'): self.session = session_cls self.model = model_cls self.table = 'MetadataModel' self.cache = LRUCache(cache_size) self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Metadata") + self.crawl_id = crawl_id def frontier_stop(self): pass @retry_and_rollback def add_seeds(self, seeds): + cql_items = [] for seed in seeds: o = self._create_page(seed) self.cache[o.fingerprint] = o - o.save() + + query = self.session.prepare( + "INSERT INTO metadata (crawl, fingerprint, url, created_at, meta, headers, cookies, method, depth) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)") + meta = Meta(domain=seed.meta['domain'], fingerprint=seed.meta['fingerprint'], + origin_is_frontier=seed.meta['origin_is_frontier'], + scrapy_callback=seed.meta['scrapy_callback'], scrapy_errback=seed.meta['scrapy_errback'], + scrapy_meta=seed.meta['scrapy_meta']) + cql_i = (self.crawl_id, seed.meta['fingerprint'], seed.url, datetime.utcnow(), meta, + seed.headers, seed.cookies, seed.method, o.depth) + cql_items.append(cql_i) + if len(seeds) > 0: + execute_concurrent_with_args(self.session, query, cql_items, concurrency=400) + self.cass_count({"seed_urls": len(seeds)}) @retry_and_rollback def request_error(self, page, error): - m = self._modify_page(page) if page.meta['fingerprint'] in self.cache else self._create_page(page) + m = self._create_page(page) m.error = error self.cache[m.fingerprint] = m - m.update() + query_page = self.session.prepare( + "UPDATE metadata SET error = ? WHERE fingerprint = ?") + self.session.execute(query_page, (error, page.meta['fingerprint'])) + self.cass_count({"error": 1}) @retry_and_rollback def page_crawled(self, response, links): - r = self._modify_page(response) - self.cache[r.fingerprint] = r - r.save() + query_page = self.session.prepare( + "UPDATE metadata SET fetched_at = ?, headers = ?, method = ?, cookies = ?, status_code = ? " + "WHERE crawl= ? 
AND fingerprint = ?") + self.session.execute_async(query_page, (datetime.utcnow(), response.request.headers, response.request.method, + response.request.cookies, response.status_code, self.crawl_id, + response.meta['fingerprint'])) + + query = self.session.prepare( + "INSERT INTO metadata (crawl, fingerprint, created_at, method, url, depth) VALUES (?, ?, ?, ?, ?, ?)") + cql_items = [] for link in links: if link.meta['fingerprint'] not in self.cache: + link.depth = self.cache[response.meta['fingerprint']].depth+1 l = self._create_page(link) self.cache[link.meta['fingerprint']] = l - l.save() - - def _modify_page(self, obj): - db_page = self.cache[obj.meta['fingerprint']] - db_page.fetched_at = datetime.utcnow() - if isinstance(obj, Response): - db_page.headers = obj.request.headers - db_page.method = obj.request.method - db_page.cookies = obj.request.cookies - db_page.status_code = obj.status_code - return db_page + cql_i = (self.crawl_id, link.meta['fingerprint'], datetime.utcnow(), link.method, link.url, link.depth) + cql_items.append(cql_i) + execute_concurrent_with_args(self.session, query, cql_items, concurrency=400) + self.cass_count({"pages_crawled": 1, "links_found": len(cql_items)}) def _create_page(self, obj): db_page = self.model() db_page.fingerprint = obj.meta['fingerprint'] db_page.url = obj.url db_page.created_at = datetime.utcnow() - new_dict = {} - for kmeta, vmeta in obj.meta.iteritems(): - if type(vmeta) is dict: - new_dict[kmeta] = json.dumps(vmeta) - else: - new_dict[kmeta] = str(vmeta) - - db_page.meta = new_dict - db_page.depth = 0 + db_page.meta = obj.meta + if hasattr(obj, 'depth'): + db_page.depth = obj.depth + else: + db_page.depth = 0 if isinstance(obj, Request): db_page.headers = obj.headers @@ -105,19 +120,29 @@ def _create_page(self, obj): @retry_and_rollback def update_score(self, batch): + query = self.session.prepare("UPDATE metadata SET score = ? WHERE fingerprint = ?") + cql_items = [] for fprint, score, request, schedule in batch: - m = self.model(fingerprint=fprint, score=score) - m.update() + cql_i = (score, fprint) + cql_items.append(cql_i) + execute_concurrent_with_args(self.session, query, cql_items, concurrency=400) + self.cass_count({"scored_urls": len(cql_items)}) + + def cass_count(self, counts): + for row, count in counts.iteritems(): + count_page = self.session.prepare("UPDATE crawlstats SET "+row+" = "+row+" + ? 
WHERE crawl= ?") + self.session.execute_async(count_page, (count, self.crawl_id)) class States(MemoryStates): - def __init__(self, session_cls, model_cls, cache_size_limit): + def __init__(self, session_cls, model_cls, cache_size_limit, crawl_id='default'): super(States, self).__init__(cache_size_limit) self.session = session_cls self.model = model_cls self.table = 'StateModel' self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.States") + self.crawl_id = crawl_id @retry_and_rollback def frontier_stop(self): @@ -126,30 +151,33 @@ def frontier_stop(self): @retry_and_rollback def fetch(self, fingerprints): to_fetch = [f for f in fingerprints if f not in self._cache] - self.logger.debug("cache size %s" % len(self._cache)) - self.logger.debug("to fetch %d from %d" % (len(to_fetch), len(fingerprints))) + self.logger.debug("cache size %s", len(self._cache)) + self.logger.debug("to fetch %d from %d", (len(to_fetch), len(fingerprints))) - for chunk in to_fetch: - for state in self.model.objects.filter(fingerprint=chunk): + for chunk in chunks(to_fetch, 128): + for state in self.model.objects.filter(crawl=self.crawl_id, fingerprint__in=chunk): self._cache[state.fingerprint] = state.state @retry_and_rollback def flush(self, force_clear=False): + query = self.session.prepare("INSERT INTO states (crawl, fingerprint, state) VALUES (?, ?, ?)") + cql_items = [] for fingerprint, state_val in self._cache.iteritems(): - state = self.model(fingerprint=fingerprint, state=state_val) - state.save() - self.logger.debug("State cache has been flushed.") + cql_i = (self.crawl_id, fingerprint, state_val) + cql_items.append(cql_i) + execute_concurrent_with_args(self.session, query, cql_items, concurrency=20000) super(States, self).flush(force_clear) class Queue(BaseQueue): - def __init__(self, session_cls, queue_cls, partitions, ordering='default'): + def __init__(self, session_cls, queue_cls, partitions, ordering='default', crawl_id='default'): self.session = session_cls self.queue_model = queue_cls self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Queue") self.partitions = [i for i in range(0, partitions)] self.partitioner = Crc32NamePartitioner(self.partitions) self.ordering = ordering + self.crawl_id = crawl_id def frontier_stop(self): pass @@ -171,16 +199,41 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs): """ results = [] try: - for item in self.queue_model.objects.filter(partition_id=partition_id).order_by("score", self._order_by()).\ - limit(max_n_requests): - item.meta = self.gen_meta(item.meta) + dequeued_urls = 0 + cql_ditems = [] + d_query = self.session.prepare("DELETE FROM queue WHERE crawl = ? AND fingerprint = ? AND partition_id = ? " + "AND score = ? 
AND created_at = ?")
+            for item in self.queue_model.objects.filter(partition_id=partition_id, crawl=self.crawl_id).\
+                    order_by("crawl", "score", self._order_by()).limit(max_n_requests):
                 method = 'GET' if not item.method else item.method
-                r = Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies)
+
+                meta_dict2 = dict((name, getattr(item.meta, name)) for name in dir(item.meta) if not name.startswith('__'))
+                # TODO: How can the result be a dict instead of an object? Objects raise an error while encoding for the Message Bus
+                # If I pass meta_dict2 directly to Request I get the same error message
+
+                meta_dict = dict()
+                meta_dict["fingerprint"] = meta_dict2["fingerprint"]
+                meta_dict["domain"] = meta_dict2["domain"]
+                meta_dict["origin_is_frontier"] = meta_dict2["origin_is_frontier"]
+                meta_dict["scrapy_callback"] = meta_dict2["scrapy_callback"]
+                meta_dict["scrapy_errback"] = meta_dict2["scrapy_errback"]
+                meta_dict["scrapy_meta"] = meta_dict2["scrapy_meta"]
+                meta_dict["score"] = meta_dict2["score"]
+                meta_dict["jid"] = meta_dict2["jid"]
+
+                r = Request(item.url, method=method, meta=meta_dict, headers=item.headers, cookies=item.cookies)
                 r.meta['fingerprint'] = item.fingerprint
                 r.meta['score'] = item.score
-                self.queue_model.delete(item)
                 results.append(r)
-                item.delete()
+
+                cql_d = (item.crawl, item.fingerprint, item.partition_id, item.score, item.created_at)
+                cql_ditems.append(cql_d)
+                dequeued_urls += 1
+
+            if dequeued_urls > 0:
+                execute_concurrent_with_args(self.session, d_query, cql_ditems, concurrency=200)
+
+            self.cass_count({"dequeued_urls": dequeued_urls})
         except Exception, exc:
             self.logger.exception(exc)
@@ -189,6 +242,10 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
 
     @retry_and_rollback
     def schedule(self, batch):
+        query = self.session.prepare("INSERT INTO queue (crawl, fingerprint, score, partition_id, host_crc32, url, "
+                                     "created_at, meta, depth, headers, method, cookies) "
+                                     "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
+        cql_items = []
         for fprint, score, request, schedule in batch:
             if schedule:
                 _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url)
@@ -200,60 +257,45 @@ def schedule(self, batch):
                     partition_id = self.partitioner.partition(hostname, self.partitions)
                     host_crc32 = get_crc32(hostname)
                 created_at = time()*1E+6
-                q = self._create_queue(request, fprint, score, partition_id, host_crc32, created_at)
-
-                q.save()
-                request.meta['state'] = States.QUEUED
-
-    def _create_queue(self, obj, fingerprint, score, partition_id, host_crc32, created_at):
-        db_queue = self.queue_model()
-        db_queue.fingerprint = fingerprint
-        db_queue.score = score
-        db_queue.partition_id = partition_id
-        db_queue.host_crc32 = host_crc32
-        db_queue.url = obj.url
-        db_queue.created_at = created_at
-
-        new_dict = {}
-        for kmeta, vmeta in obj.meta.iteritems():
-            if type(vmeta) is dict:
-                new_dict[kmeta] = json.dumps(vmeta)
-            else:
-                new_dict[kmeta] = str(vmeta)
-        db_queue.meta = new_dict
-        db_queue.depth = 0
+                if "domain" not in request.meta:
+                    request.meta["domain"] = {}
+                if "origin_is_frontier" not in request.meta:
+                    request.meta["origin_is_frontier"] = ''
+                if "scrapy_callback" not in request.meta:
+                    request.meta["scrapy_callback"] = None
+                if "scrapy_errback" not in request.meta:
+                    request.meta["scrapy_errback"] = None
+                if "scrapy_meta" not in request.meta:
+                    request.meta["scrapy_meta"] = {}
+                if "score" not in request.meta:
+                    request.meta["score"] = 0
+                if "jid" not in request.meta:
+                    request.meta["jid"] = 0
+
+                meta = 
Meta(domain=request.meta['domain'], fingerprint=fprint, + origin_is_frontier=request.meta['origin_is_frontier'], + scrapy_callback=request.meta['scrapy_callback'], + scrapy_errback=request.meta['scrapy_errback'], scrapy_meta=request.meta['scrapy_meta']) + + cql_i = (self.crawl_id, fprint, score, partition_id, host_crc32, request.url, created_at, meta, 0, + request.headers, request.method, request.cookies) + cql_items.append(cql_i) - db_queue.headers = obj.headers - db_queue.method = obj.method - db_queue.cookies = obj.cookies + request.meta['state'] = States.QUEUED - return db_queue + execute_concurrent_with_args(self.session, query, cql_items, concurrency=400) + self.cass_count({"queued_urls": len(cql_items)}) @retry_and_rollback def count(self): - return self.queue_model.objects.count() + count = self.queue_model.objects.filter(crawl=self.crawl_id).count() + return count - def gen_meta(self, meta): - ret_meta = {} - for kmeta, vmeta in meta.iteritems(): - try: - json_object = json.loads(vmeta) - except ValueError, e: - ret_meta[kmeta] = vmeta - else: - ret_meta[kmeta] = json_object - - if ret_meta['scrapy_callback'] == "None": - ret_meta['scrapy_callback'] = None - if ret_meta['scrapy_errback'] == "None": - ret_meta['scrapy_errback'] = None - if ret_meta['state'] == "None": - ret_meta['state'] = None - if ret_meta['origin_is_frontier'] == "True": - ret_meta['origin_is_frontier'] = True - ret_meta['depth'] = int(ret_meta['depth']) - return ret_meta + def cass_count(self, counts): + for row, count in counts.iteritems(): + count_page = self.session.prepare("UPDATE crawlstats SET " + row + " = " + row + " + ? WHERE crawl= ?") + self.session.execute_async(count_page, (count, self.crawl_id)) class BroadCrawlingQueue(Queue): @@ -290,9 +332,8 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs): (tries, limit, count, len(queue.keys()))) queue.clear() count = 0 - for item in self.queue_model.objects.filter(partition_id=partition_id).order_by(self._order_by()).\ - limit(limit): - item.meta = self.gen_meta(item.meta) + for item in self.queue_model.objects.filter(crawl=self.crawl_id, partition_id=partition_id).\ + order_by("crawl", "score", self._order_by()).limit(limit): if item.host_crc32 not in queue: queue[item.host_crc32] = [] if max_requests_per_host is not None and len(queue[item.host_crc32]) > max_requests_per_host: diff --git a/frontera/contrib/backends/cassandra/models.py b/frontera/contrib/backends/cassandra/models.py index afd61eab3..0f719e0a7 100644 --- a/frontera/contrib/backends/cassandra/models.py +++ b/frontera/contrib/backends/cassandra/models.py @@ -1,24 +1,36 @@ # -*- coding: utf-8 -*- import uuid -from cassandra.cqlengine import columns from cassandra.cqlengine.models import Model +from cassandra.cqlengine.usertype import UserType +from cassandra.cqlengine.columns import * +class Meta(UserType): + domain = Map(Text(), Text(), required=False) + fingerprint = Text() + origin_is_frontier = Boolean() + scrapy_callback = Text() + scrapy_errback = Text() + scrapy_meta = Map(Text(), Text(), required=False) + score = Float(required=False) + jid = Integer(required=False) + class MetadataModel(Model): __table_name__ = 'metadata' - fingerprint = columns.Text(primary_key=True) - url = columns.Text(index=True) - depth = columns.Integer() - created_at = columns.DateTime() - fetched_at = columns.DateTime(required=False) - status_code = columns.Integer(required=False) - score = columns.Float(required=False) - error = columns.Text(required=False) - meta = 
columns.Map(columns.Text(), columns.Text(), required=False) - headers = columns.Map(columns.Text(), columns.Text(), required=False) - cookies = columns.Map(columns.Text(), columns.Text(), required=False) - method = columns.Text(required=False) + crawl = Text(primary_key=True) + fingerprint = Text(primary_key=True) + url = Text(index=True) + depth = Integer() + created_at = DateTime() + fetched_at = DateTime(required=False) + status_code = Integer(required=False) + score = Float(required=False) + error = Text(required=False) + meta = UserDefinedType(Meta) + headers = Map(Text(), Text(), required=False) + cookies = Map(Text(), Text(), required=False) + method = Text(required=False) @classmethod def query(cls, session): @@ -31,9 +43,9 @@ def __repr__(self): class StateModel(Model): __table_name__ = 'states' - state_id = columns.UUID(primary_key=True, default=uuid.uuid4) - fingerprint = columns.Text(primary_key=True, index=True) - state = columns.SmallInt(index=True) + crawl = Text(primary_key=True) + fingerprint = Text(primary_key=True) + state = SmallInt(index=True) @classmethod def query(cls, session): @@ -46,18 +58,18 @@ def __repr__(self): class QueueModel(Model): __table_name__ = 'queue' - partition_id = columns.Integer(primary_key=True) - score = columns.Float(primary_key=True) - created_at = columns.BigInt(primary_key=True) - id = columns.UUID(primary_key=True, default=uuid.uuid4) - url = columns.Text() - fingerprint = columns.Text() - host_crc32 = columns.Integer() - meta = columns.Map(columns.Text(), columns.Text(), required=False) - headers = columns.Map(columns.Text(), columns.Text(), required=False) - cookies = columns.Map(columns.Text(), columns.Text(), required=False) - method = columns.Text(required=False) - depth = columns.SmallInt(required=False) + partition_id = Integer(primary_key=True) + crawl = Text(primary_key=True) + score = Float(primary_key=True) + created_at = BigInt(primary_key=True) + fingerprint = Text(primary_key=True) + url = Text() + host_crc32 = Integer() + meta = UserDefinedType(Meta) + headers = Map(Text(), Text(), required=False) + cookies = Map(Text(), Text(), required=False) + method = Text(required=False) + depth = SmallInt(required=False) @classmethod def query(cls, session): @@ -65,3 +77,23 @@ def query(cls, session): def __repr__(self): return '' % (self.url, self.id) + + +class CrawlStatsModel(Model): + __table_name__ = 'crawlstats' + + crawl = Text(primary_key=True) + pages_crawled = Counter() + links_found = Counter() + errors = Counter() + seed_urls = Counter() + scored_urls = Counter() + queued_urls = Counter() + dequeued_urls = Counter() + + @classmethod + def query(cls, session): + return session.query(cls) + + def __repr__(self): + return '' % (self.url, self.id) diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py index 15fe0b674..3f8063f2f 100644 --- a/frontera/settings/default_settings.py +++ b/frontera/settings/default_settings.py @@ -52,13 +52,15 @@ CASSANDRABACKEND_MODELS = { 'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel', 'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel', - 'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel' + 'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel', + 'CrawlStatsModel': 'frontera.contrib.backends.cassandra.models.CrawlStatsModel' } CASSANDRABACKEND_REVISIT_INTERVAL = timedelta(days=1) CASSANDRABACKEND_CLUSTER_IPS = ['127.0.0.1'] CASSANDRABACKEND_CLUSTER_PORT = 9042 
CASSANDRABACKEND_KEYSPACE = 'frontera'
 CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS = True
+CASSANDRABACKEND_CRAWL_ID="default"
 
 STATE_CACHE_SIZE = 1000000

From 62383554efa91783d34286d0e932cc077e3354ac Mon Sep 17 00:00:00 2001
From: wpxgit
Date: Wed, 30 Mar 2016 16:09:55 +0100
Subject: [PATCH 04/17] Added cassandra-driver to test requirements
---
 requirements/tests.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/requirements/tests.txt b/requirements/tests.txt
index d0de882d3..31f472e49 100644
--- a/requirements/tests.txt
+++ b/requirements/tests.txt
@@ -8,4 +8,5 @@ SQLAlchemy>=1.0.0
 cachetools
 pyzmq
 msgpack-python
+cassandra-driver

From 039fd0ea1fdc2f4b2bcfda004f636b4b65190bbe Mon Sep 17 00:00:00 2001
From: wpxgit
Date: Wed, 30 Mar 2016 16:12:08 +0100
Subject: [PATCH 05/17] Removed non-existent classes from tests
---
 frontera/contrib/backends/cassandra/test_backend.py | 10 +---------
 1 file changed, 1 insertion(+), 9 deletions(-)

diff --git a/frontera/contrib/backends/cassandra/test_backend.py b/frontera/contrib/backends/cassandra/test_backend.py
index 60d281bfb..1570d4c4c 100644
--- a/frontera/contrib/backends/cassandra/test_backend.py
+++ b/frontera/contrib/backends/cassandra/test_backend.py
@@ -8,7 +8,7 @@
 
 #----------------------------------------------------
-# SQAlchemy base classes
+# Cassandra base classes
 #----------------------------------------------------
 class cassandraFIFO(backends.FIFOBackendTest):
     backend_class = 'frontera.contrib.backends.cassandra.FIFO'
@@ -18,14 +18,6 @@ class cassandraLIFO(backends.LIFOBackendTest):
     backend_class = 'frontera.contrib.backends.cassandra.LIFO'
 
-class cassandraDFS(backends.DFSBackendTest):
-    backend_class = 'frontera.contrib.backends.cassandra.DFS'
-
-
-class cassandraBFS(backends.BFSBackendTest):
-    backend_class = 'frontera.contrib.backends.cassandra.BFS'
-
-
 class cassandraRevisiting(RevisitingBackendTest):
     backend_class = 'frontera.contrib.backends.cassandra.revisiting.Backend'

From 667e65c784b755a921bf3f9b03dbba655a5730ca Mon Sep 17 00:00:00 2001
From: wpxgit
Date: Wed, 30 Mar 2016 16:14:49 +0100
Subject: [PATCH 06/17] Changed logging in defaults back to False
---
 frontera/settings/default_settings.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py
index 3f8063f2f..c35472a77 100644
--- a/frontera/settings/default_settings.py
+++ b/frontera/settings/default_settings.py
@@ -78,7 +78,7 @@
 LOGGER = 'frontera.logger.FrontierLogger'
 
 LOGGING_ENABLED = True
-LOGGING_EVENTS_ENABLED = True
+LOGGING_EVENTS_ENABLED = False
 LOGGING_EVENTS_INCLUDE_METADATA = True
 LOGGING_EVENTS_INCLUDE_DOMAIN = True
 LOGGING_EVENTS_INCLUDE_DOMAIN_FIELDS = ['name', 'netloc', 'scheme', 'sld', 'tld', 'subdomain']
@@ -86,19 +86,19 @@
     "frontera.logger.handlers.EVENTS",
 ]
 
-LOGGING_MANAGER_ENABLED = True
+LOGGING_MANAGER_ENABLED = False
 LOGGING_MANAGER_LOGLEVEL = logging.DEBUG
 LOGGING_MANAGER_HANDLERS = [
     "frontera.logger.handlers.CONSOLE_MANAGER",
 ]
 
-LOGGING_BACKEND_ENABLED = True
+LOGGING_BACKEND_ENABLED = False
 LOGGING_BACKEND_LOGLEVEL = logging.DEBUG
 LOGGING_BACKEND_HANDLERS = [
     "frontera.logger.handlers.CONSOLE_BACKEND",
 ]
 
-LOGGING_DEBUGGING_ENABLED = True
+LOGGING_DEBUGGING_ENABLED = False
 LOGGING_DEBUGGING_LOGLEVEL = logging.DEBUG
 LOGGING_DEBUGGING_HANDLERS = [
     "frontera.logger.handlers.CONSOLE_DEBUGGING",

From 559400e1c6a1bb2fbf5f16feb7bd66697381259b Mon Sep 17 00:00:00 2001
From: wpxgit
Date: Fri, 1 Apr 2016 13:22:22 +0100
Subject: [PATCH 07/17] Remove .idea 
---
 .idea/dictionaries/osboxes.xml | 9 ---------
 .idea/vcs.xml                  | 6 ------
 2 files changed, 15 deletions(-)
 delete mode 100644 .idea/dictionaries/osboxes.xml
 delete mode 100644 .idea/vcs.xml

diff --git a/.idea/dictionaries/osboxes.xml b/.idea/dictionaries/osboxes.xml
deleted file mode 100644
index 900c5e91e..000000000
--- a/.idea/dictionaries/osboxes.xml
+++ /dev/null
@@ -1,9 +0,0 @@
-      cassandrabackend
-      keyspace
-      scrapy
-
\ No newline at end of file

diff --git a/.idea/vcs.xml b/.idea/vcs.xml
deleted file mode 100644
index 94a25f7f4..000000000
--- a/.idea/vcs.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-
\ No newline at end of file

From 4ffe03895bf90fe10a2bdb80a9d2f50f1e3cef86 Mon Sep 17 00:00:00 2001
From: wpxgit
Date: Fri, 1 Apr 2016 13:28:06 +0100
Subject: [PATCH 08/17] Changed naming of session from session_cls to session
---
 .../contrib/backends/cassandra/__init__.py    | 40 +++++++++----------
 .../contrib/backends/cassandra/components.py  | 12 +++---
 .../contrib/backends/cassandra/revisiting.py  |  6 +--
 3 files changed, 29 insertions(+), 29 deletions(-)

diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py
index 7646f9391..1ff50ca87 100644
--- a/frontera/contrib/backends/cassandra/__init__.py
+++ b/frontera/contrib/backends/cassandra/__init__.py
@@ -25,19 +25,19 @@ def __init__(self, manager):
         self.cluster = Cluster(cluster_ips, cluster_port)
         self.models = dict([(name, load_object(klass)) for name, klass in models.items()])
 
-        self.session_cls = self.cluster.connect()
-        self.session_cls.row_factory = dict_factory
-        self.session_cls.encoder.mapping[dict] = self.session_cls.encoder.cql_encode_map_collection
+        self.session = self.cluster.connect()
+        self.session.row_factory = dict_factory
+        self.session.encoder.mapping[dict] = self.session.encoder.cql_encode_map_collection
         self.crawl_id = crawl_id
 
         if keyspace_create:
             query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
                        WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, )
-            self.session_cls.execute(query)
+            self.session.execute(query)
 
-        self.session_cls.set_keyspace(keyspace)
+        self.session.set_keyspace(keyspace)
 
-        connection.set_session(self.session_cls)
+        connection.set_session(self.session)
 
         if drop_all_tables:
             for key, value in self.models.iteritems():
@@ -46,18 +46,18 @@ def __init__(self, manager):
             sync_table(value)
 
-        self._metadata = Metadata(self.session_cls, self.models['MetadataModel'],
+        self._metadata = Metadata(self.session, self.models['MetadataModel'],
                                   settings.get('CASSANDRABACKEND_CACHE_SIZE'), crawl_id=self.crawl_id)
-        self._states = States(self.session_cls, self.models['StateModel'],
+        self._states = States(self.session, self.models['StateModel'],
                               settings.get('STATE_CACHE_SIZE_LIMIT'), crawl_id=self.crawl_id)
         self._queue = self._create_queue(settings)
 
     def frontier_stop(self):
         self.states.flush()
-        self.session_cls.shutdown()
+        self.session.shutdown()
 
     def _create_queue(self, settings):
-        return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
+        return Queue(self.session, 
self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), ordering='created') @@ -85,7 +85,7 @@ class LIFOBackend(CassandraBackend): component_name = 'Cassandra LIFO Backend' def _create_queue(self, settings): - return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), + return Queue(self.session, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), ordering='created_desc') BASE = CommonBackend @@ -106,15 +106,15 @@ def __init__(self, manager): self.cluster = Cluster(cluster_ips, cluster_port) self.models = dict([(name, load_object(klass)) for name, klass in models.items()]) - self.session_cls = self.cluster.connect() - self.session_cls.row_factory = dict_factory + self.session = self.cluster.connect() + self.session.row_factory = dict_factory if keyspace_create: query = """CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, ) - self.session_cls.execute(query) - self.session_cls.set_keyspace(keyspace) - connection.set_session(self.session_cls) + self.session.execute(query) + self.session.set_keyspace(keyspace) + connection.set_session(self.session) self._metadata = None self._queue = None @@ -132,7 +132,7 @@ def strategy_worker(cls, manager): sync_table(model) - b._states = States(b.session_cls, model, + b._states = States(b.session, model, settings.get('STATE_CACHE_SIZE_LIMIT')) return b @@ -154,9 +154,9 @@ def db_worker(cls, manager): sync_table(queue_m) sync_table(stats_m) - b._metadata = Metadata(b.session_cls, metadata_m, + b._metadata = Metadata(b.session, metadata_m, settings.get('CASSANDRABACKEND_CACHE_SIZE')) - b._queue = Queue(b.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS')) + b._queue = Queue(b.session, queue_m, settings.get('SPIDER_FEED_PARTITIONS')) return b @property diff --git a/frontera/contrib/backends/cassandra/components.py b/frontera/contrib/backends/cassandra/components.py index aa38103f9..7aaa7161a 100644 --- a/frontera/contrib/backends/cassandra/components.py +++ b/frontera/contrib/backends/cassandra/components.py @@ -32,8 +32,8 @@ def func_wrapper(self, *args, **kwargs): class Metadata(BaseMetadata): - def __init__(self, session_cls, model_cls, cache_size, crawl_id='default'): - self.session = session_cls + def __init__(self, session, model_cls, cache_size, crawl_id='default'): + self.session = session self.model = model_cls self.table = 'MetadataModel' self.cache = LRUCache(cache_size) @@ -136,9 +136,9 @@ def cass_count(self, counts): class States(MemoryStates): - def __init__(self, session_cls, model_cls, cache_size_limit, crawl_id='default'): + def __init__(self, session, model_cls, cache_size_limit, crawl_id='default'): super(States, self).__init__(cache_size_limit) - self.session = session_cls + self.session = session self.model = model_cls self.table = 'StateModel' self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.States") @@ -170,8 +170,8 @@ def flush(self, force_clear=False): class Queue(BaseQueue): - def __init__(self, session_cls, queue_cls, partitions, ordering='default', crawl_id='default'): - self.session = session_cls + def __init__(self, session, queue_cls, partitions, ordering='default', crawl_id='default'): + self.session = session self.queue_model = queue_cls self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Queue") self.partitions = [i for i in range(0, partitions)] diff --git a/frontera/contrib/backends/cassandra/revisiting.py 
b/frontera/contrib/backends/cassandra/revisiting.py
index 453349093..51ce212fc 100644
--- a/frontera/contrib/backends/cassandra/revisiting.py
+++ b/frontera/contrib/backends/cassandra/revisiting.py
@@ -39,8 +39,8 @@ def func_wrapper(self, *args, **kwargs):
 
 class RevisitingQueue(BaseQueue):
-    def __init__(self, session_cls, queue_cls, partitions):
-        self.session = session_cls()
+    def __init__(self, session, queue_cls, partitions):
+        self.session = session()
         self.queue_model = queue_cls
         self.logger = logging.getLogger("frontera.contrib.backends.sqlalchemy.revisiting.RevisitingQueue")
         self.partitions = [i for i in range(0, partitions)]
@@ -115,7 +115,7 @@ class Backend(CassandraBackend):
     def _create_queue(self, settings):
         self.interval = settings.get("SQLALCHEMYBACKEND_REVISIT_INTERVAL")
         assert isinstance(self.interval, timedelta)
-        return RevisitingQueue(self.session_cls, RevisitingQueueModel, settings.get('SPIDER_FEED_PARTITIONS'))
+        return RevisitingQueue(self.session, RevisitingQueueModel, settings.get('SPIDER_FEED_PARTITIONS'))
 
     def _schedule(self, requests):
         batch = []

From 2e953232fdd73072d596201aa384179aa4267b9d Mon Sep 17 00:00:00 2001
From: wpxgit
Date: Fri, 1 Apr 2016 14:49:38 +0100
Subject: [PATCH 09/17] Changed retry logic to cassandra-driver based retry
 logic. Isn't tested because of the complex test setup
---
 .../contrib/backends/cassandra/__init__.py    | 12 ++++++-
 .../contrib/backends/cassandra/components.py  | 31 ++-----------------
 2 files changed, 13 insertions(+), 30 deletions(-)

diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py
index 1ff50ca87..26e040873 100644
--- a/frontera/contrib/backends/cassandra/__init__.py
+++ b/frontera/contrib/backends/cassandra/__init__.py
@@ -2,6 +2,7 @@
 from cassandra.cluster import Cluster
 from cassandra.cqlengine import connection
 from cassandra.query import dict_factory
+from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
 from cassandra.cqlengine.management import sync_table
 from cassandra.cqlengine.management import drop_table
 from frontera.core.components import DistributedBackend
@@ -22,9 +23,16 @@ def __init__(self, manager):
         models = settings.get('CASSANDRABACKEND_MODELS')
         crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')
 
-        self.cluster = Cluster(cluster_ips, cluster_port)
         self.models = dict([(name, load_object(klass)) for name, klass in models.items()])
 
+        self.cluster = Cluster(
+            contact_points=cluster_ips,
+            port=cluster_port,
+            compression=True,
+            default_retry_policy=RetryPolicy(),
+            reconnection_policy=ConstantReconnectionPolicy(10, 100)
+        )
+
         self.session = self.cluster.connect()
         self.session.row_factory = dict_factory
         self.session.encoder.mapping[dict] = self.session.encoder.cql_encode_map_collection
@@ -199,3 +207,5 @@ def request_error(self, request, error):
 
     def finished(self):
         return NotImplementedError
+
+

diff --git a/frontera/contrib/backends/cassandra/components.py b/frontera/contrib/backends/cassandra/components.py
index 7aaa7161a..e1ad789a7 100644
--- a/frontera/contrib/backends/cassandra/components.py
+++ b/frontera/contrib/backends/cassandra/components.py
@@ -13,24 +13,6 @@
 from frontera.contrib.backends.cassandra.models import Meta
 
-def retry_and_rollback(func):
-    def func_wrapper(self, *args, **kwargs):
-        tries = 5
-        while True:
-            try:
-                return func(self, *args, **kwargs)
-            except Exception, exc:
-                self.logger.exception(exc)
-                sleep(5)
-                tries -= 1
-                if tries > 0:
-                    self.logger.info("Tries left %i", tries)
-                    continue
-                else:
-                    raise 
exc
-    return func_wrapper
-
-
 class Metadata(BaseMetadata):
     def __init__(self, session, model_cls, cache_size, crawl_id='default'):
         self.session = session
@@ -43,7 +25,6 @@ def __init__(self, session, model_cls, cache_size, crawl_id='default'):
     def frontier_stop(self):
         pass
 
-    @retry_and_rollback
     def add_seeds(self, seeds):
         cql_items = []
         for seed in seeds:
@@ -64,7 +45,6 @@ def add_seeds(self, seeds):
         execute_concurrent_with_args(self.session, query, cql_items, concurrency=400)
         self.cass_count({"seed_urls": len(seeds)})
 
-    @retry_and_rollback
     def request_error(self, page, error):
         m = self._create_page(page)
         m.error = error
@@ -74,7 +54,6 @@ def request_error(self, page, error):
         self.session.execute(query_page, (error, page.meta['fingerprint']))
         self.cass_count({"error": 1})
 
-    @retry_and_rollback
     def page_crawled(self, response, links):
         query_page = self.session.prepare(
             "UPDATE metadata SET fetched_at = ?, headers = ?, method = ?, cookies = ?, status_code = ? "
@@ -118,7 +97,6 @@ def _create_page(self, obj):
             db_page.status_code = obj.status_code
         return db_page
 
-    @retry_and_rollback
     def update_score(self, batch):
         query = self.session.prepare("UPDATE metadata SET score = ? WHERE fingerprint = ?")
         cql_items = []
@@ -144,11 +122,9 @@ def __init__(self, session, model_cls, cache_size_limit, crawl_id='default'):
         self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.States")
         self.crawl_id = crawl_id
 
-    @retry_and_rollback
     def frontier_stop(self):
         pass
 
-    @retry_and_rollback
     def fetch(self, fingerprints):
         to_fetch = [f for f in fingerprints if f not in self._cache]
         self.logger.debug("cache size %s", len(self._cache))
@@ -158,7 +134,6 @@ def fetch(self, fingerprints):
             for state in self.model.objects.filter(crawl=self.crawl_id, fingerprint__in=chunk):
                 self._cache[state.fingerprint] = state.state
 
-    @retry_and_rollback
     def flush(self, force_clear=False):
         query = self.session.prepare("INSERT INTO states (crawl, fingerprint, state) VALUES (?, ?, ?)")
         cql_items = []
@@ -207,7 +182,8 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
                     order_by("crawl", "score", self._order_by()).limit(max_n_requests):
                 method = 'GET' if not item.method else item.method
 
-                meta_dict2 = dict((name, getattr(item.meta, name)) for name in dir(item.meta) if not name.startswith('__'))
+                meta_dict2 = dict((name, getattr(item.meta, name)) for name in dir(item.meta)
+                                  if not name.startswith('__'))
                 # TODO: How can the result be a dict instead of an object? Objects raise an error while encoding for the Message Bus
                 # If I pass meta_dict2 directly to Request I get the same error message
 
@@ -240,7 +216,6 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
 
-    @retry_and_rollback
     def schedule(self, batch):
         query = self.session.prepare("INSERT INTO queue (crawl, fingerprint, score, partition_id, host_crc32, url, "
                                      "created_at, meta, depth, headers, method, cookies) "
                                      "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
@@ -287,7 +262,6 @@ def schedule(self, batch):
         execute_concurrent_with_args(self.session, query, cql_items, concurrency=400)
         self.cass_count({"queued_urls": len(cql_items)})
 
-    @retry_and_rollback
     def count(self):
         count = self.queue_model.objects.filter(crawl=self.crawl_id).count()
         return count
@@ -301,7 +275,6 @@ def cass_count(self, counts):
 
 class BroadCrawlingQueue(Queue):
     GET_RETRIES = 3
 
-    @retry_and_rollback
     def get_next_requests(self, max_n_requests, partition_id, **kwargs):
         """
         Dequeues new batch of requests for crawling. 
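The patch above swaps the hand-rolled ``retry_and_rollback`` decorator for the driver's own ``RetryPolicy``/``ConstantReconnectionPolicy`` and keeps the hot paths on prepared statements executed concurrently. A minimal standalone sketch of that pattern, assuming only a local Cassandra node and the ``states`` table from ``models.py`` (all names below are illustrative)::

    # Sketch only: assumes a node on 127.0.0.1 and an existing 'frontera'
    # keyspace that already contains the 'states' table synced from models.py.
    from cassandra.cluster import Cluster
    from cassandra.concurrent import execute_concurrent_with_args
    from cassandra.policies import ConstantReconnectionPolicy, RetryPolicy

    cluster = Cluster(
        contact_points=['127.0.0.1'],
        port=9042,
        compression=True,
        default_retry_policy=RetryPolicy(),                      # driver-side retries
        reconnection_policy=ConstantReconnectionPolicy(10, 100)  # retry every 10s, up to 100 times
    )
    session = cluster.connect('frontera')

    # Prepare once, bind many times: the statement is parsed server-side only once.
    insert = session.prepare(
        "INSERT INTO states (crawl, fingerprint, state) VALUES (?, ?, ?)")

    # Pipeline all bound statements instead of paying one round-trip per row.
    args = [('default', 'fp%05d' % i, 1) for i in range(1000)]
    execute_concurrent_with_args(session, insert, args, concurrency=100)

    cluster.shutdown()
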
From c53c3f72695f4243115f59ce325504c5b98d4490 Mon Sep 17 00:00:00 2001
From: wpxgit
Date: Fri, 1 Apr 2016 17:17:21 +0100
Subject: [PATCH 10/17] Added documentation about Cassandra settings. Updated
 docs README to install the RTD theme if missing. Updated code to only run
 the FIFO algorithm because of Cassandra's limited ORDER BY functionality
---
 docs/README                                   |  3 +
 docs/source/topics/frontera-settings.rst      | 82 +++++++++++++++++++
 docs/source/topics/frontier-backends.rst      | 28 +++++++
 .../contrib/backends/cassandra/__init__.py    | 25 +-----
 .../contrib/backends/cassandra/components.py  | 14 ++--
 frontera/contrib/backends/cassandra/models.py |  2 +-
 6 files changed, 123 insertions(+), 31 deletions(-)

diff --git a/docs/README b/docs/README
index 3d9114563..fd04c32f5 100644
--- a/docs/README
+++ b/docs/README
@@ -30,6 +30,9 @@ from this dir::
 Documentation will be generated (in HTML format) inside the ``build/html`` dir.
 
+If you get the error "ImportError: No module named sphinx_rtd_theme" run:
+    sudo pip install sphinx-rtd-theme
+
 View the documentation
 ----------------------

diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst
index 9a11db483..5d18d5c00 100644
--- a/docs/source/topics/frontera-settings.rst
+++ b/docs/source/topics/frontera-settings.rst
@@ -427,6 +427,88 @@ Default: ``timedelta(days=1)``
 Time between document visits, expressed in ``datetime.timedelta`` objects. Changing of this setting will only affect
 documents scheduled after the change. All previously queued documents will be crawled with old periodicity.
 
+.. _cassandra-settings:
+
+Cassandra
+---------
+
+.. setting:: CASSANDRABACKEND_DROP_ALL_TABLES
+
+CASSANDRABACKEND_DROP_ALL_TABLES
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``False``
+
+Set to ``True`` if you need to drop all DB tables on backend instantiation (e.g. every Scrapy spider run).
+
+.. setting:: CASSANDRABACKEND_CLUSTER_IPS
+
+CASSANDRABACKEND_CLUSTER_IPS
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``['127.0.0.1']``
+
+Set the IPs of the Cassandra cluster. The default is localhost. To assign more than one IP, use this syntax:
+``['192.168.0.1', '192.168.0.2']``
+
+CASSANDRABACKEND_CLUSTER_PORT
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``9042``
+
+Set the port of the Cassandra cluster nodes.
+
+
+CASSANDRABACKEND_KEYSPACE
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``frontera``
+
+Set the Cassandra keyspace.
+
+CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``True``
+
+Creates the keyspace if it does not exist. Set to ``False`` if Frontera shouldn't check on every startup.
+
+
+CASSANDRABACKEND_CRAWL_ID
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``default``
+
+Sets an ID in each table for the current crawl. If you want to run another crawl from the beginning in the same
+tables, set it to a different crawl ID. It is a text field.
+
+
+CASSANDRABACKEND_MODELS
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Default::
+
+    {
+        'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel',
+        'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel',
+        'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel',
+        'CrawlStatsModel': 'frontera.contrib.backends.cassandra.models.CrawlStatsModel'
+    }
+
+This is a mapping of the Cassandra models used by backends. It is mainly used for customization.
+
+
+Revisiting backend
+------------------
+
+.. setting:: CASSANDRABACKEND_REVISIT_INTERVAL
+
+CASSANDRABACKEND_REVISIT_INTERVAL
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Default: ``timedelta(days=1)``
+
+Time between document visits, expressed in ``datetime.timedelta`` objects. Changing of this setting will only affect
+documents scheduled after the change. All previously queued documents will be crawled with old periodicity.
 
 .. _hbase-settings:

diff --git a/docs/source/topics/frontier-backends.rst b/docs/source/topics/frontier-backends.rst
index 37c67bf68..f02af4c9b 100644
--- a/docs/source/topics/frontier-backends.rst
+++ b/docs/source/topics/frontier-backends.rst
@@ -266,6 +266,34 @@ Current implementation of revisiting backend has no prioritization. During long
 there are no documents available for crawling, but there are documents waiting for their scheduled revisit time.
 
+.. _frontier-backends-cassandra:
+
+Cassandra backends
+^^^^^^^^^^^^^^^^^^
+
+This set of :class:`Backend ` objects will use `Cassandra`_ as storage for
+:ref:`basic algorithms `.
+
+Cassandra is a NoSQL column-store database with linear scalability and an SQL-like query language.
+
+If you need to use your own `declarative cassandra models`_, you can do it by using the
+:setting:`CASSANDRABACKEND_MODELS` setting.
+
+This setting uses a dictionary where ``key`` represents the name of the model to define and ``value`` the model to use.
+
+For a complete list of all settings used for Cassandra backends check the :doc:`settings ` section.
+
+.. class:: frontera.contrib.backends.cassandra.BASE
+
+   Base class for Cassandra :class:`Backend ` objects.
+   It runs Cassandra in multi-spider, single-worker mode with the FIFO algorithm.
+
+.. class:: frontera.contrib.backends.cassandra.Distributed
+
+   Cassandra :class:`Backend ` implementation of the distributed Backend. 
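+
+For example, a minimal single-process configuration might look like this (the
+cluster address below is illustrative; any reachable node works)::
+
+    BACKEND = 'frontera.contrib.backends.cassandra.BASE'
+
+    CASSANDRABACKEND_CLUSTER_IPS = ['127.0.0.1']
+    CASSANDRABACKEND_CLUSTER_PORT = 9042
+    CASSANDRABACKEND_KEYSPACE = 'frontera'
+    CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS = True
+    # Use a fresh crawl ID to restart a crawl in the same tables.
+    CASSANDRABACKEND_CRAWL_ID = 'my_crawl'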
+ + + HBase backend ^^^^^^^^^^^^^ diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py index 26e040873..3496b0d89 100644 --- a/frontera/contrib/backends/cassandra/__init__.py +++ b/frontera/contrib/backends/cassandra/__init__.py @@ -15,11 +15,11 @@ class CassandraBackend(CommonBackend): def __init__(self, manager): self.manager = manager settings = manager.settings - cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS') # Format: ['192.168.0.1', '192.168.0.2'] + cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS') cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT') drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES') keyspace = settings.get('CASSANDRABACKEND_KEYSPACE') - keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS') # Default: true + keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS') models = settings.get('CASSANDRABACKEND_MODELS') crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID') @@ -80,26 +80,7 @@ def metadata(self): def states(self): return self._states - -class FIFOBackend(CassandraBackend): - component_name = 'Cassandra FIFO Backend' - - def _create_queue(self, settings): - return Queue(self.session, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), - ordering='created') - - -class LIFOBackend(CassandraBackend): - component_name = 'Cassandra LIFO Backend' - - def _create_queue(self, settings): - return Queue(self.session, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), - ordering='created_desc') - -BASE = CommonBackend -LIFO = LIFOBackend -FIFO = FIFOBackend - +BASE = CassandraBackend class Distributed(DistributedBackend): def __init__(self, manager): diff --git a/frontera/contrib/backends/cassandra/components.py b/frontera/contrib/backends/cassandra/components.py index e1ad789a7..6ce4fa8fe 100644 --- a/frontera/contrib/backends/cassandra/components.py +++ b/frontera/contrib/backends/cassandra/components.py @@ -50,8 +50,8 @@ def request_error(self, page, error): m.error = error self.cache[m.fingerprint] = m query_page = self.session.prepare( - "UPDATE metadata SET error = ? WHERE fingerprint = ?") - self.session.execute(query_page, (error, page.meta['fingerprint'])) + "UPDATE metadata SET error = ? WHERE crawl = ? AND fingerprint = ?") + self.session.execute(query_page, (error, self.crawl_id, page.meta['fingerprint'])) self.cass_count({"error": 1}) def page_crawled(self, response, links): @@ -98,10 +98,10 @@ def _create_page(self, obj): return db_page def update_score(self, batch): - query = self.session.prepare("UPDATE metadata SET score = ? WHERE fingerprint = ?") + query = self.session.prepare("UPDATE metadata SET score = ? WHERE crawl = ? AND fingerprint = ?") cql_items = [] for fprint, score, request, schedule in batch: - cql_i = (score, fprint) + cql_i = (score, self.crawl_id, fprint) cql_items.append(cql_i) execute_concurrent_with_args(self.session, query, cql_items, concurrency=400) self.cass_count({"scored_urls": len(cql_items)}) @@ -160,8 +160,6 @@ def frontier_stop(self): def _order_by(self): if self.ordering == 'created': return "created_at" - if self.ordering == 'created_desc': - return "-created_at" return "created_at" def get_next_requests(self, max_n_requests, partition_id, **kwargs): @@ -178,8 +176,8 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs): cql_ditems = [] d_query = self.session.prepare("DELETE FROM queue WHERE crawl = ? AND fingerprint = ? 
AND partition_id = ? " "AND score = ? AND created_at = ?") - for item in self.queue_model.objects.filter(partition_id=partition_id, crawl=self.crawl_id).\ - order_by("crawl", "score", self._order_by()).limit(max_n_requests): + for item in self.queue_model.objects.filter(crawl=self.crawl_id, partition_id=partition_id).\ + order_by("partition_id", "score", self._order_by()).limit(max_n_requests): method = 'GET' if not item.method else item.method meta_dict2 = dict((name, getattr(item.meta, name)) for name in dir(item.meta) diff --git a/frontera/contrib/backends/cassandra/models.py b/frontera/contrib/backends/cassandra/models.py index 0f719e0a7..868328561 100644 --- a/frontera/contrib/backends/cassandra/models.py +++ b/frontera/contrib/backends/cassandra/models.py @@ -58,8 +58,8 @@ def __repr__(self): class QueueModel(Model): __table_name__ = 'queue' - partition_id = Integer(primary_key=True) crawl = Text(primary_key=True) + partition_id = Integer(primary_key=True) score = Float(primary_key=True) created_at = BigInt(primary_key=True) fingerprint = Text(primary_key=True) From e190256cc7c4dfb88c9522c0b2b62b6fef4bbd6e Mon Sep 17 00:00:00 2001 From: wpxgit Date: Fri, 1 Apr 2016 17:34:36 +0100 Subject: [PATCH 11/17] change the usage of crawl_id --- .gitignore | 3 +++ docs/source/conf.py | 9 +++------ frontera/contrib/backends/__init__.py | 3 +++ frontera/contrib/backends/cassandra/__init__.py | 14 ++++++++------ frontera/contrib/backends/cassandra/components.py | 6 +++--- frontera/worker/db.py | 1 + 6 files changed, 21 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index 287b0b569..e74280063 100644 --- a/.gitignore +++ b/.gitignore @@ -52,3 +52,6 @@ docs/_build/ # PyBuilder target/ + +# PyCharm Idea Folder +.idea/ \ No newline at end of file diff --git a/docs/source/conf.py b/docs/source/conf.py index babbfc213..747bb3431 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -266,12 +266,9 @@ import os on_rtd = os.environ.get('READTHEDOCS', None) == 'True' -if on_rtd: - html_theme = 'default' -else: - import sphinx_rtd_theme - html_theme = "sphinx_rtd_theme" - html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] +import sphinx_rtd_theme +html_theme = "sphinx_rtd_theme" +html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] # -- Options for autoclass ------------------------------------------------ # Use class and init docstrings for autoclass directive diff --git a/frontera/contrib/backends/__init__.py b/frontera/contrib/backends/__init__.py index 75f8b8b02..3eee5a12e 100644 --- a/frontera/contrib/backends/__init__.py +++ b/frontera/contrib/backends/__init__.py @@ -3,6 +3,7 @@ from frontera import Backend from frontera.core.components import States +from time import time class CommonBackend(Backend): @@ -47,6 +48,7 @@ def _schedule(self, requests): self.metadata.update_score(batch) self.queue_size += queue_incr + def _get_score(self, obj): return obj.meta.get('score', 1.0) @@ -59,6 +61,7 @@ def get_next_requests(self, max_next_requests, **kwargs): return batch def page_crawled(self, response, links): + start = time() response.meta['state'] = States.CRAWLED self.states.update_cache(response) depth = response.meta.get('depth', 0)+1 diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py index 3496b0d89..a0adf1aa8 100644 --- a/frontera/contrib/backends/cassandra/__init__.py +++ b/frontera/contrib/backends/cassandra/__init__.py @@ -55,9 +55,9 @@ def __init__(self, manager): sync_table(value) self._metadata = 
Metadata(self.session, self.models['MetadataModel'], - settings.get('CASSANDRABACKEND_CACHE_SIZE'), crawl_id=self.crawl_id) + settings.get('CASSANDRABACKEND_CACHE_SIZE'), self.crawl_id) self._states = States(self.session, self.models['StateModel'], - settings.get('STATE_CACHE_SIZE_LIMIT'), crawl_id=self.crawl_id) + settings.get('STATE_CACHE_SIZE_LIMIT'), self.crawl_id) self._queue = self._create_queue(settings) def frontier_stop(self): @@ -66,7 +66,7 @@ def frontier_stop(self): def _create_queue(self, settings): return Queue(self.session, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), - crawl_id=self.crawl_id) + self.crawl_id) @property def queue(self): @@ -114,6 +114,7 @@ def strategy_worker(cls, manager): b = cls(manager) settings = manager.settings drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES') + crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID') model = b.models['StateModel'] if drop_all_tables: @@ -122,7 +123,7 @@ def strategy_worker(cls, manager): sync_table(model) b._states = States(b.session, model, - settings.get('STATE_CACHE_SIZE_LIMIT')) + settings.get('STATE_CACHE_SIZE_LIMIT'), crawl_id) return b @classmethod @@ -130,6 +131,7 @@ def db_worker(cls, manager): b = cls(manager) settings = manager.settings drop = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES') + crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID') metadata_m = b.models['MetadataModel'] queue_m = b.models['QueueModel'] @@ -144,8 +146,8 @@ def db_worker(cls, manager): sync_table(stats_m) b._metadata = Metadata(b.session, metadata_m, - settings.get('CASSANDRABACKEND_CACHE_SIZE')) - b._queue = Queue(b.session, queue_m, settings.get('SPIDER_FEED_PARTITIONS')) + settings.get('CASSANDRABACKEND_CACHE_SIZE'), crawl_id) + b._queue = Queue(b.session, queue_m, settings.get('SPIDER_FEED_PARTITIONS'), crawl_id) return b @property diff --git a/frontera/contrib/backends/cassandra/components.py b/frontera/contrib/backends/cassandra/components.py index 6ce4fa8fe..721dc4147 100644 --- a/frontera/contrib/backends/cassandra/components.py +++ b/frontera/contrib/backends/cassandra/components.py @@ -14,7 +14,7 @@ class Metadata(BaseMetadata): - def __init__(self, session, model_cls, cache_size, crawl_id='default'): + def __init__(self, session, model_cls, cache_size, crawl_id): self.session = session self.model = model_cls self.table = 'MetadataModel' @@ -114,7 +114,7 @@ def cass_count(self, counts): class States(MemoryStates): - def __init__(self, session, model_cls, cache_size_limit, crawl_id='default'): + def __init__(self, session, model_cls, cache_size_limit, crawl_id): super(States, self).__init__(cache_size_limit) self.session = session self.model = model_cls @@ -145,7 +145,7 @@ def flush(self, force_clear=False): class Queue(BaseQueue): - def __init__(self, session, queue_cls, partitions, ordering='default', crawl_id='default'): + def __init__(self, session, queue_cls, partitions, crawl_id, ordering='default'): self.session = session self.queue_model = queue_cls self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Queue") diff --git a/frontera/worker/db.py b/frontera/worker/db.py index 026d9455f..8d76b1cf4 100644 --- a/frontera/worker/db.py +++ b/frontera/worker/db.py @@ -150,6 +150,7 @@ def consume_incoming(self, *args, **kwargs): self.spider_feed.mark_busy(partition_id) finally: consumed += 1 + """ # TODO: Think how it should be implemented in DB-worker only mode. 
if not self.strategy_enabled and self._backend.finished(): From b1bf7bca1411131afb98bc8eb17b2868ba65dee4 Mon Sep 17 00:00:00 2001 From: wpxgit Date: Fri, 1 Apr 2016 17:38:36 +0100 Subject: [PATCH 12/17] Undo changes on conf.py, backend/init, db.py --- docs/source/conf.py | 9 ++++++--- frontera/contrib/backends/__init__.py | 3 --- frontera/worker/db.py | 1 - 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index 747bb3431..babbfc213 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -266,9 +266,12 @@ import os on_rtd = os.environ.get('READTHEDOCS', None) == 'True' -import sphinx_rtd_theme -html_theme = "sphinx_rtd_theme" -html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] +if on_rtd: + html_theme = 'default' +else: + import sphinx_rtd_theme + html_theme = "sphinx_rtd_theme" + html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] # -- Options for autoclass ------------------------------------------------ # Use class and init docstrings for autoclass directive diff --git a/frontera/contrib/backends/__init__.py b/frontera/contrib/backends/__init__.py index 3eee5a12e..75f8b8b02 100644 --- a/frontera/contrib/backends/__init__.py +++ b/frontera/contrib/backends/__init__.py @@ -3,7 +3,6 @@ from frontera import Backend from frontera.core.components import States -from time import time class CommonBackend(Backend): @@ -48,7 +47,6 @@ def _schedule(self, requests): self.metadata.update_score(batch) self.queue_size += queue_incr - def _get_score(self, obj): return obj.meta.get('score', 1.0) @@ -61,7 +59,6 @@ def get_next_requests(self, max_next_requests, **kwargs): return batch def page_crawled(self, response, links): - start = time() response.meta['state'] = States.CRAWLED self.states.update_cache(response) depth = response.meta.get('depth', 0)+1 diff --git a/frontera/worker/db.py b/frontera/worker/db.py index 8d76b1cf4..026d9455f 100644 --- a/frontera/worker/db.py +++ b/frontera/worker/db.py @@ -150,7 +150,6 @@ def consume_incoming(self, *args, **kwargs): self.spider_feed.mark_busy(partition_id) finally: consumed += 1 - """ # TODO: Think how it should be implemented in DB-worker only mode. 
if not self.strategy_enabled and self._backend.finished(): From 2019a71cc07bc601364a6bde9f0914f1b4fa47e5 Mon Sep 17 00:00:00 2001 From: wpxgit Date: Sat, 2 Apr 2016 11:26:19 +0100 Subject: [PATCH 13/17] Changes to get Travis run --- frontera/contrib/backends/cassandra/__init__.py | 1 + frontera/contrib/backends/cassandra/models.py | 3 ++- frontera/settings/default_settings.py | 4 +--- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py index a0adf1aa8..f01364653 100644 --- a/frontera/contrib/backends/cassandra/__init__.py +++ b/frontera/contrib/backends/cassandra/__init__.py @@ -82,6 +82,7 @@ def states(self): BASE = CassandraBackend + class Distributed(DistributedBackend): def __init__(self, manager): self.manager = manager diff --git a/frontera/contrib/backends/cassandra/models.py b/frontera/contrib/backends/cassandra/models.py index 868328561..1e676a568 100644 --- a/frontera/contrib/backends/cassandra/models.py +++ b/frontera/contrib/backends/cassandra/models.py @@ -2,7 +2,7 @@ import uuid from cassandra.cqlengine.models import Model from cassandra.cqlengine.usertype import UserType -from cassandra.cqlengine.columns import * +from cassandra.cqlengine.columns import Map, Text, Float, Integer, DateTime, UserDefinedType, Counter, Boolean, SmallInt class Meta(UserType): @@ -15,6 +15,7 @@ class Meta(UserType): score = Float(required=False) jid = Integer(required=False) + class MetadataModel(Model): __table_name__ = 'metadata' diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py index c35472a77..9e492f080 100644 --- a/frontera/settings/default_settings.py +++ b/frontera/settings/default_settings.py @@ -46,7 +46,6 @@ } SQLALCHEMYBACKEND_REVISIT_INTERVAL = timedelta(days=1) - CASSANDRABACKEND_CACHE_SIZE = 10000 CASSANDRABACKEND_DROP_ALL_TABLES = False CASSANDRABACKEND_MODELS = { @@ -60,8 +59,7 @@ CASSANDRABACKEND_CLUSTER_PORT = 9042 CASSANDRABACKEND_KEYSPACE = 'frontera' CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS = True -CASSANDRABACKEND_CRAWL_ID="default" - +CASSANDRABACKEND_CRAWL_ID = "default" STATE_CACHE_SIZE = 1000000 STORE_CONTENT = False From ed6b561c3ba2a62568b7d94b9927fb82600bb10c Mon Sep 17 00:00:00 2001 From: wpxgit Date: Sat, 2 Apr 2016 11:36:44 +0100 Subject: [PATCH 14/17] Next Try Travis... 
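The explicit column import introduced in PATCH 13 left out BigInt, which QueueModel's created_at column declares, so importing the models presumably failed on Travis with a NameError; this patch adds it. A minimal sanity check in that spirit (a hypothetical sketch, not part of the patch) could assert up front that every column type the models rely on is importable:

    # Hypothetical guard: fail fast if a cqlengine column type is missing.
    from cassandra.cqlengine import columns

    REQUIRED = ['Map', 'Text', 'Float', 'Integer', 'DateTime',
                'UserDefinedType', 'Counter', 'Boolean', 'SmallInt', 'BigInt']
    missing = [name for name in REQUIRED if not hasattr(columns, name)]
    assert not missing, 'cqlengine is missing column types: %s' % missing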
--- frontera/contrib/backends/cassandra/models.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/frontera/contrib/backends/cassandra/models.py b/frontera/contrib/backends/cassandra/models.py index 1e676a568..9065477d0 100644 --- a/frontera/contrib/backends/cassandra/models.py +++ b/frontera/contrib/backends/cassandra/models.py @@ -2,7 +2,8 @@ import uuid from cassandra.cqlengine.models import Model from cassandra.cqlengine.usertype import UserType -from cassandra.cqlengine.columns import Map, Text, Float, Integer, DateTime, UserDefinedType, Counter, Boolean, SmallInt +from cassandra.cqlengine.columns import Map, Text, Float, Integer, DateTime, UserDefinedType, Counter, Boolean, \ + SmallInt, BigInt class Meta(UserType): From fef1d83381e418e866de693460ac1bfee576c3f9 Mon Sep 17 00:00:00 2001 From: wpxgit Date: Sat, 2 Apr 2016 17:49:50 +0100 Subject: [PATCH 15/17] Make counting table optional, own class for counting, update documentation --- docs/source/topics/frontera-settings.rst | 8 ++++ .../contrib/backends/cassandra/__init__.py | 18 ++++--- .../contrib/backends/cassandra/components.py | 47 ++++++++++++------- frontera/settings/default_settings.py | 1 + 4 files changed, 51 insertions(+), 23 deletions(-) diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst index 5d18d5c00..3b7cdf341 100644 --- a/docs/source/topics/frontera-settings.rst +++ b/docs/source/topics/frontera-settings.rst @@ -459,6 +459,14 @@ Default:: ``9042`` Set port from Cassandra Cluster / Nodes +CASSANDRABACKEND_GENERATE_STATS +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Default:: ``False`` + +Set this to ``True`` if you want to create an extra table for stats collection, in which counters such as pages crawled and links queued are accumulated.
+ + CASSANDRABACKEND_KEYSPACE ^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py index f01364653..a88e1cdb6 100644 --- a/frontera/contrib/backends/cassandra/__init__.py +++ b/frontera/contrib/backends/cassandra/__init__.py @@ -9,6 +9,7 @@ from frontera.contrib.backends import CommonBackend from frontera.contrib.backends.cassandra.components import Metadata, Queue, States from frontera.utils.misc import load_object +import logging class CassandraBackend(CommonBackend): @@ -22,6 +23,7 @@ def __init__(self, manager): keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS') models = settings.get('CASSANDRABACKEND_MODELS') crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID') + generate_stats = settings.get('CASSANDRABACKEND_GENERATE_STATS') self.models = dict([(name, load_object(klass)) for name, klass in models.items()]) @@ -37,6 +39,7 @@ def __init__(self, manager): self.session.row_factory = dict_factory self.session.encoder.mapping[dict] = self.session.encoder.cql_encode_map_collection self.crawl_id = crawl_id + self.generate_stats = generate_stats if keyspace_create: query = """CREATE KEYSPACE IF NOT EXISTS \"%s\" @@ -52,10 +55,11 @@ def __init__(self, manager): drop_table(value) for key, value in self.models.iteritems(): - sync_table(value) + if (self.generate_stats is False and key != 'CrawlStatsModel') or self.generate_stats==True: + sync_table(value) self._metadata = Metadata(self.session, self.models['MetadataModel'], - settings.get('CASSANDRABACKEND_CACHE_SIZE'), self.crawl_id) + settings.get('CASSANDRABACKEND_CACHE_SIZE'), self.crawl_id, self.generate_stats) self._states = States(self.session, self.models['StateModel'], settings.get('STATE_CACHE_SIZE_LIMIT'), self.crawl_id) self._queue = self._create_queue(settings) @@ -66,7 +70,7 @@ def frontier_stop(self): def _create_queue(self, settings): return Queue(self.session, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), - self.crawl_id) + self.crawl_id, self.generate_stats) @property def queue(self): @@ -133,6 +137,7 @@ def db_worker(cls, manager): settings = manager.settings drop = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES') crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID') + generate_stats = settings.get('CASSANDRABACKEND_GENERATE_STATS') metadata_m = b.models['MetadataModel'] queue_m = b.models['QueueModel'] @@ -144,11 +149,12 @@ def db_worker(cls, manager): sync_table(metadata_m) sync_table(queue_m) - sync_table(stats_m) + if(generate_stats==True): + sync_table(stats_m) b._metadata = Metadata(b.session, metadata_m, - settings.get('CASSANDRABACKEND_CACHE_SIZE'), crawl_id) - b._queue = Queue(b.session, queue_m, settings.get('SPIDER_FEED_PARTITIONS'), crawl_id) + settings.get('CASSANDRABACKEND_CACHE_SIZE'), crawl_id, generate_stats) + b._queue = Queue(b.session, queue_m, settings.get('SPIDER_FEED_PARTITIONS'), crawl_id, generate_stats) return b @property diff --git a/frontera/contrib/backends/cassandra/components.py b/frontera/contrib/backends/cassandra/components.py index 721dc4147..647350c6b 100644 --- a/frontera/contrib/backends/cassandra/components.py +++ b/frontera/contrib/backends/cassandra/components.py @@ -14,13 +14,15 @@ class Metadata(BaseMetadata): - def __init__(self, session, model_cls, cache_size, crawl_id): + def __init__(self, session, model_cls, cache_size, crawl_id, generate_stats): self.session = session self.model = model_cls self.table = 'MetadataModel' self.cache = 
LRUCache(cache_size) self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Metadata") self.crawl_id = crawl_id + self.generate_stats = generate_stats + self.counter_cls = CassandraCount(crawl_id, self.session, generate_stats) def frontier_stop(self): pass @@ -43,7 +45,7 @@ def add_seeds(self, seeds): cql_items.append(cql_i) if len(seeds) > 0: execute_concurrent_with_args(self.session, query, cql_items, concurrency=400) - self.cass_count({"seed_urls": len(seeds)}) + self.counter_cls.cass_count({"seed_urls": len(seeds)}) def request_error(self, page, error): m = self._create_page(page) @@ -52,7 +54,7 @@ def request_error(self, page, error): query_page = self.session.prepare( "UPDATE metadata SET error = ? WHERE crawl = ? AND fingerprint = ?") self.session.execute(query_page, (error, self.crawl_id, page.meta['fingerprint'])) - self.cass_count({"error": 1}) + self.counter_cls.cass_count({"error": 1}) def page_crawled(self, response, links): query_page = self.session.prepare( @@ -73,7 +75,7 @@ def page_crawled(self, response, links): cql_i = (self.crawl_id, link.meta['fingerprint'], datetime.utcnow(), link.method, link.url, link.depth) cql_items.append(cql_i) execute_concurrent_with_args(self.session, query, cql_items, concurrency=400) - self.cass_count({"pages_crawled": 1, "links_found": len(cql_items)}) + self.counter_cls.cass_count({"pages_crawled": 1, "links_found": len(cql_items)}) def _create_page(self, obj): db_page = self.model() @@ -104,12 +106,7 @@ def update_score(self, batch): cql_i = (score, self.crawl_id, fprint) cql_items.append(cql_i) execute_concurrent_with_args(self.session, query, cql_items, concurrency=400) - self.cass_count({"scored_urls": len(cql_items)}) - - def cass_count(self, counts): - for row, count in counts.iteritems(): - count_page = self.session.prepare("UPDATE crawlstats SET "+row+" = "+row+" + ? 
WHERE crawl= ?") - self.session.execute_async(count_page, (count, self.crawl_id)) + self.counter_cls.cass_count({"scored_urls": len(cql_items)}) class States(MemoryStates): @@ -145,7 +142,7 @@ def flush(self, force_clear=False): class Queue(BaseQueue): - def __init__(self, session, queue_cls, partitions, crawl_id, ordering='default'): + def __init__(self, session, queue_cls, partitions, crawl_id, generate_stats, ordering='default'): self.session = session self.queue_model = queue_cls self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Queue") @@ -153,6 +150,7 @@ def __init__(self, session, queue_cls, partitions, crawl_id, ordering='default') self.partitioner = Crc32NamePartitioner(self.partitions) self.ordering = ordering self.crawl_id = crawl_id + self.counter_cls = CassandraCount(crawl_id, self.session, generate_stats) def frontier_stop(self): pass @@ -207,7 +205,7 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs): if dequeued_urls > 0: execute_concurrent_with_args(self.session, d_query, cql_ditems, concurrency=200) - self.cass_count({"dequeued_urls": dequeued_urls}) + self.counter_cls.cass_count({"dequeued_urls": dequeued_urls}) except Exception, exc: self.logger.exception(exc) @@ -258,16 +256,17 @@ def schedule(self, batch): request.meta['state'] = States.QUEUED execute_concurrent_with_args(self.session, query, cql_items, concurrency=400) - self.cass_count({"queued_urls": len(cql_items)}) + self.counter_cls.cass_count({"queued_urls": len(cql_items)}) def count(self): count = self.queue_model.objects.filter(crawl=self.crawl_id).count() return count - def cass_count(self, counts): - for row, count in counts.iteritems(): - count_page = self.session.prepare("UPDATE crawlstats SET " + row + " = " + row + " + ? WHERE crawl= ?") - self.session.execute_async(count_page, (count, self.crawl_id)) + def cass_count(self, counts, generate_stats): + if generate_stats is True: + for row, count in counts.iteritems(): + count_page = self.session.prepare("UPDATE crawlstats SET "+row+" = "+row+" + ? WHERE crawl= ?") + self.session.execute_async(count_page, (count, self.crawl_id)) class BroadCrawlingQueue(Queue): @@ -328,3 +327,17 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs): cookies=item.cookies)) item.delete() return results + + +class CassandraCount: + + def __init__(self, crawl_id, session, generate_stats): + self.generate_stats = generate_stats + self.session = session + self.crawl_id = crawl_id + + def cass_count(self, counts): + if self.generate_stats is True: + for row, count in counts.iteritems(): + count_page = self.session.prepare("UPDATE crawlstats SET "+row+" = "+row+" + ? WHERE crawl= ?") + self.session.execute_async(count_page, (count, self.crawl_id)) \ No newline at end of file diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py index 9e492f080..1f5a9a680 100644 --- a/frontera/settings/default_settings.py +++ b/frontera/settings/default_settings.py @@ -60,6 +60,7 @@ CASSANDRABACKEND_KEYSPACE = 'frontera' CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS = True CASSANDRABACKEND_CRAWL_ID = "default" +CASSANDRABACKEND_GENERATE_STATS = False STATE_CACHE_SIZE = 1000000 STORE_CONTENT = False From 5a0967fcde8dbba426c74213f6d46fd17c5d55b5 Mon Sep 17 00:00:00 2001 From: wpxgit Date: Sun, 3 Apr 2016 14:47:37 +0100 Subject: [PATCH 16/17] Removed Caching from Metadata. 
Some changes to fit travis ci conventions --- .../contrib/backends/cassandra/__init__.py | 8 ++-- .../contrib/backends/cassandra/components.py | 47 ++++--------------- 2 files changed, 12 insertions(+), 43 deletions(-) diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py index a88e1cdb6..85f29e134 100644 --- a/frontera/contrib/backends/cassandra/__init__.py +++ b/frontera/contrib/backends/cassandra/__init__.py @@ -58,8 +58,7 @@ def __init__(self, manager): if (self.generate_stats is False and key != 'CrawlStatsModel') or self.generate_stats==True: sync_table(value) - self._metadata = Metadata(self.session, self.models['MetadataModel'], - settings.get('CASSANDRABACKEND_CACHE_SIZE'), self.crawl_id, self.generate_stats) + self._metadata = Metadata(self.session, self.models['MetadataModel'], self.crawl_id, self.generate_stats) self._states = States(self.session, self.models['StateModel'], settings.get('STATE_CACHE_SIZE_LIMIT'), self.crawl_id) self._queue = self._create_queue(settings) @@ -149,11 +148,10 @@ def db_worker(cls, manager): sync_table(metadata_m) sync_table(queue_m) - if(generate_stats==True): + if generate_stats is True: sync_table(stats_m) - b._metadata = Metadata(b.session, metadata_m, - settings.get('CASSANDRABACKEND_CACHE_SIZE'), crawl_id, generate_stats) + b._metadata = Metadata(b.session, metadata_m, crawl_id, generate_stats) b._queue = Queue(b.session, queue_m, settings.get('SPIDER_FEED_PARTITIONS'), crawl_id, generate_stats) return b diff --git a/frontera/contrib/backends/cassandra/components.py b/frontera/contrib/backends/cassandra/components.py index 647350c6b..9584a8629 100644 --- a/frontera/contrib/backends/cassandra/components.py +++ b/frontera/contrib/backends/cassandra/components.py @@ -1,8 +1,7 @@ # -*- coding: utf-8 -*- import logging from datetime import datetime -from time import time, sleep -from cachetools import LRUCache +from time import time from frontera.contrib.backends.partitioners import Crc32NamePartitioner from frontera.contrib.backends.memory import MemoryStates from frontera.core.components import Metadata as BaseMetadata, Queue as BaseQueue @@ -14,11 +13,10 @@ class Metadata(BaseMetadata): - def __init__(self, session, model_cls, cache_size, crawl_id, generate_stats): + def __init__(self, session, model_cls, crawl_id, generate_stats): self.session = session self.model = model_cls self.table = 'MetadataModel' - self.cache = LRUCache(cache_size) self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Metadata") self.crawl_id = crawl_id self.generate_stats = generate_stats @@ -30,9 +28,6 @@ def frontier_stop(self): def add_seeds(self, seeds): cql_items = [] for seed in seeds: - o = self._create_page(seed) - self.cache[o.fingerprint] = o - query = self.session.prepare( "INSERT INTO metadata (crawl, fingerprint, url, created_at, meta, headers, cookies, method, depth) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)") @@ -41,16 +36,13 @@ def add_seeds(self, seeds): scrapy_callback=seed.meta['scrapy_callback'], scrapy_errback=seed.meta['scrapy_errback'], scrapy_meta=seed.meta['scrapy_meta']) cql_i = (self.crawl_id, seed.meta['fingerprint'], seed.url, datetime.utcnow(), meta, - seed.headers, seed.cookies, seed.method, o.depth) + seed.headers, seed.cookies, seed.method, 0) cql_items.append(cql_i) if len(seeds) > 0: execute_concurrent_with_args(self.session, query, cql_items, concurrency=400) self.counter_cls.cass_count({"seed_urls": len(seeds)}) def request_error(self, page, error): - m = 
self._create_page(page) - m.error = error - self.cache[m.fingerprint] = m query_page = self.session.prepare( "UPDATE metadata SET error = ? WHERE crawl = ? AND fingerprint = ?") self.session.execute(query_page, (error, self.crawl_id, page.meta['fingerprint'])) @@ -63,42 +55,21 @@ def page_crawled(self, response, links): self.session.execute_async(query_page, (datetime.utcnow(), response.request.headers, response.request.method, response.request.cookies, response.status_code, self.crawl_id, response.meta['fingerprint'])) + depth = 0 + page_res = self.model.objects.filter(crawl=self.crawl_id, fingerprint=response.meta['fingerprint']) + if page_res[0].depth > 0: + depth = page_res[0].depth query = self.session.prepare( "INSERT INTO metadata (crawl, fingerprint, created_at, method, url, depth) VALUES (?, ?, ?, ?, ?, ?)") cql_items = [] for link in links: - if link.meta['fingerprint'] not in self.cache: - link.depth = self.cache[response.meta['fingerprint']].depth+1 - l = self._create_page(link) - self.cache[link.meta['fingerprint']] = l - cql_i = (self.crawl_id, link.meta['fingerprint'], datetime.utcnow(), link.method, link.url, link.depth) + if response.meta['fingerprint'] != link.meta['fingerprint']: + cql_i = (self.crawl_id, link.meta['fingerprint'], datetime.utcnow(), link.method, link.url, depth+1) cql_items.append(cql_i) execute_concurrent_with_args(self.session, query, cql_items, concurrency=400) self.counter_cls.cass_count({"pages_crawled": 1, "links_found": len(cql_items)}) - def _create_page(self, obj): - db_page = self.model() - db_page.fingerprint = obj.meta['fingerprint'] - db_page.url = obj.url - db_page.created_at = datetime.utcnow() - db_page.meta = obj.meta - if hasattr(obj, 'depth'): - db_page.depth = obj.depth - else: - db_page.depth = 0 - - if isinstance(obj, Request): - db_page.headers = obj.headers - db_page.method = obj.method - db_page.cookies = obj.cookies - elif isinstance(obj, Response): - db_page.headers = obj.request.headers - db_page.method = obj.request.method - db_page.cookies = obj.request.cookies - db_page.status_code = obj.status_code - return db_page - def update_score(self, batch): query = self.session.prepare("UPDATE metadata SET score = ? WHERE crawl = ? AND fingerprint = ?") cql_items = [] From 29a0bddd6d580feb7cfcadc69f463d4ee3e9b4cb Mon Sep 17 00:00:00 2001 From: wpxgit Date: Sun, 3 Apr 2016 17:18:32 +0100 Subject: [PATCH 17/17] Code convention changes --- frontera/contrib/backends/cassandra/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py index 85f29e134..ff4134f6f 100644 --- a/frontera/contrib/backends/cassandra/__init__.py +++ b/frontera/contrib/backends/cassandra/__init__.py @@ -55,7 +55,7 @@ def __init__(self, manager): drop_table(value) for key, value in self.models.iteritems(): - if (self.generate_stats is False and key != 'CrawlStatsModel') or self.generate_stats==True: + if (self.generate_stats is False and key != 'CrawlStatsModel') or self.generate_stats is True: sync_table(value) self._metadata = Metadata(self.session, self.models['MetadataModel'], self.crawl_id, self.generate_stats)
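With the LRUCache gone, page_crawled now derives each link's depth from the parent's row in the metadata table instead of from memory. A roughly equivalent raw-CQL sketch of that lookup (an illustration under the backend's dict_factory row factory, not the patch's cqlengine code; since the parent row is written with execute_async just beforehand, the read can in principle race that write):

    # Sketch: fetch the parent page's depth, then store each link at depth + 1.
    rows = session.execute(
        "SELECT depth FROM metadata WHERE crawl = %s AND fingerprint = %s",
        (crawl_id, response.meta['fingerprint']))
    parent_depth = rows[0]['depth'] if rows else 0
    for link in links:
        if link.meta['fingerprint'] != response.meta['fingerprint']:
            insert_link(link, depth=parent_depth + 1)  # insert_link is a hypothetical helper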
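To close the series, a usage sketch for the optional stats table from PATCH 15 (assumptions: a single local node, the default frontera keyspace and port from default_settings.py, and the counter columns that CassandraCount increments — pages_crawled, links_found, seed_urls, scored_urls, queued_urls, dequeued_urls, error):

    # Enable the optional crawlstats table (off by default) and read it back.
    from frontera.settings import Settings
    from cassandra.cluster import Cluster

    settings = Settings()
    settings.set('BACKEND', 'frontera.contrib.backends.cassandra.CassandraBackend')
    settings.set('CASSANDRABACKEND_CLUSTER_IPS', ['127.0.0.1'])
    settings.set('CASSANDRABACKEND_GENERATE_STATS', True)  # also syncs the crawlstats table
    settings.set('CASSANDRABACKEND_CRAWL_ID', 'my-crawl')

    # After (or during) a crawl, the per-crawl counters are plain CQL counter columns:
    session = Cluster(['127.0.0.1'], 9042).connect('frontera')
    for row in session.execute("SELECT * FROM crawlstats WHERE crawl = %s", ('my-crawl',)):
        print(row)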