[WIP] Added Cassandra-Backend #128

Open · wants to merge 18 commits into master
3 changes: 3 additions & 0 deletions .gitignore
@@ -52,3 +52,6 @@ docs/_build/

# PyBuilder
target/

# PyCharm Idea Folder
.idea/
3 changes: 3 additions & 0 deletions docs/README
@@ -30,6 +30,9 @@ from this dir::

Documentation will be generated (in HTML format) inside the ``build/html`` dir.

If you get the error ``ImportError: No module named sphinx_rtd_theme``, run::

    sudo pip install sphinx-rtd-theme
Comment (Member): I think this is not needed, because the error message clearly states that the module is missing.



View the documentation
----------------------
90 changes: 90 additions & 0 deletions docs/source/topics/frontera-settings.rst
@@ -437,6 +437,96 @@ Default: ``timedelta(days=1)``
Time between document visits, expressed in ``datetime.timedelta`` objects. Changing of this setting will only affect
documents scheduled after the change. All previously queued documents will be crawled with old periodicity.

.. _cassandra-settings:

Cassandra
---------


.. setting:: CASSANDRABACKEND_DROP_ALL_TABLES

CASSANDRABACKEND_DROP_ALL_TABLES
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``False``

Set to ``True`` if you need to drop all DB tables on backend instantiation (e.g. every Scrapy spider run).
Comment (Member): I would remove "(e.g. every Scrapy spider run)", because this shouldn't be tied to a Scrapy run. Different run modes require the backend to be instantiated in various processes.


.. setting:: CASSANDRABACKEND_CLUSTER_IPS

CASSANDRABACKEND_CLUSTER_IPS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``['127.0.0.1']``

IPs of the Cassandra cluster. The default is localhost. To assign more than one IP, use this syntax: ``['192.168.0.1', '192.168.0.2']``
Comment (Member): "IPs of Cassandra Cluster." Also, "Syntax" should be written starting with lower case.


.. setting:: CASSANDRABACKEND_CLUSTER_PORT

CASSANDRABACKEND_CLUSTER_PORT
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``9042``

Port of the Cassandra cluster nodes.
Comment (Member): "of".



.. setting:: CASSANDRABACKEND_GENERATE_STATS

CASSANDRABACKEND_GENERATE_STATS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``False``

Set this to ``True`` to use an extra table for statistics collection. Pages crawled, links queued, etc. are counted up there.
Comment (Member): It's not only about creation, but also about updating stats there. So I would generalize it as "use an extra table for statistics collection". There is a misspelling in "etv."



.. setting:: CASSANDRABACKEND_KEYSPACE

CASSANDRABACKEND_KEYSPACE
^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``frontera``

The Cassandra keyspace to use.

.. setting:: CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS

CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``True``

Creates the keyspace if it does not exist. Set to ``False`` if Frontera shouldn't check on every startup.


.. setting:: CASSANDRABACKEND_CRAWL_ID

CASSANDRABACKEND_CRAWL_ID
^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``default``

Sets an ID in each table row for the current crawl. To run another crawl from the beginning in the same tables, set a different crawl ID. It is a text field.


.. setting:: CASSANDRABACKEND_MODELS

CASSANDRABACKEND_MODELS
^^^^^^^^^^^^^^^^^^^^^^^

Default::

    {
        'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel',
        'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel',
        'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel',
        'CrawlStatsModel': 'frontera.contrib.backends.cassandra.models.CrawlStatsModel'
    }

This is a mapping of the Cassandra models used by the backends. It is mainly used for customization.
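
Putting these settings together, a minimal Frontera settings sketch for the Cassandra backend might look like the following. The values are illustrative only; the setting names are the ones documented above, and ``BACKEND`` points at the ``BASE`` backend class added in this PR.

```python
# Illustrative Frontera settings for the Cassandra backend (a sketch;
# values are examples, setting names are those documented above).
from datetime import timedelta

BACKEND = 'frontera.contrib.backends.cassandra.BASE'

CASSANDRABACKEND_CLUSTER_IPS = ['192.168.0.1', '192.168.0.2']
CASSANDRABACKEND_CLUSTER_PORT = 9042
CASSANDRABACKEND_KEYSPACE = 'frontera'
CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS = True
CASSANDRABACKEND_DROP_ALL_TABLES = False
CASSANDRABACKEND_GENERATE_STATS = False
CASSANDRABACKEND_CRAWL_ID = 'crawl_2016_01'
CASSANDRABACKEND_REVISIT_INTERVAL = timedelta(days=1)
```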


Revisiting backend
------------------

.. setting:: CASSANDRABACKEND_REVISIT_INTERVAL

CASSANDRABACKEND_REVISIT_INTERVAL
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Comment (Member): Would it make more sense to use the already existing option REVISIT_INTERVAL?

Default: ``timedelta(days=1)``

Time between document visits, expressed in ``datetime.timedelta`` objects. Changing of this setting will only affect
documents scheduled after the change. All previously queued documents will be crawled with old periodicity.
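
For example, to revisit documents every six hours instead of daily, a settings sketch could set:

```python
from datetime import timedelta

# Revisit each document every 6 hours instead of the default timedelta(days=1).
CASSANDRABACKEND_REVISIT_INTERVAL = timedelta(hours=6)
```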

.. _hbase-settings:

28 changes: 28 additions & 0 deletions docs/source/topics/frontier-backends.rst
@@ -266,6 +266,34 @@ Current implementation of revisiting backend has no prioritization. During long
there are no documents available for crawling, but there are documents waiting for their scheduled revisit time.


.. _frontier-backends-cassandra:

Cassandra backends
^^^^^^^^^^^^^^^^^^

This set of :class:`Backend <frontera.core.components.Backend>` objects will use `Cassandra`_ as storage for
:ref:`basic algorithms <frontier-backends-basic-algorithms>`.

Cassandra is a NoSQL column-store database with linear scalability and an SQL-like query language (CQL).

If you need to use your own `declarative cassandra models`_, you can do it by using the
:setting:`CASSANDRABACKEND_MODELS` setting.

This setting uses a dictionary where ``key`` represents the name of the model to define and ``value`` the model to use.

For a complete list of all settings used for Cassandra backends check the :doc:`settings <frontera-settings>` section.
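
As a sketch of such a customization, you could override a single entry of the default mapping and keep the defaults for the rest (``myproject.models.MyMetadataModel`` is a hypothetical name for your own model class):

```python
# Override one model class; the other entries keep the defaults.
# 'myproject.models.MyMetadataModel' is a hypothetical custom class path.
CASSANDRABACKEND_MODELS = {
    'MetadataModel': 'myproject.models.MyMetadataModel',
    'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel',
    'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel',
    'CrawlStatsModel': 'frontera.contrib.backends.cassandra.models.CrawlStatsModel',
}
```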

.. class:: frontera.contrib.backends.cassandra.BASE

Base class for Cassandra :class:`Backend <frontera.core.components.Backend>` objects.
It runs Cassandra in multi-spider, one-worker mode with the FIFO algorithm.

.. class:: frontera.contrib.backends.cassandra.Distributed

Cassandra :class:`Backend <frontera.core.components.Backend>` implementation of the distributed Backend.



HBase backend
^^^^^^^^^^^^^

199 changes: 199 additions & 0 deletions frontera/contrib/backends/cassandra/__init__.py
@@ -0,0 +1,199 @@
from __future__ import absolute_import
from cassandra.cluster import Cluster
from cassandra.cqlengine import connection
from cassandra.query import dict_factory
from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
from cassandra.cqlengine.management import sync_table
from cassandra.cqlengine.management import drop_table
from frontera.core.components import DistributedBackend
from frontera.contrib.backends import CommonBackend
from frontera.contrib.backends.cassandra.components import Metadata, Queue, States
from frontera.utils.misc import load_object
import logging


class CassandraBackend(CommonBackend):
    def __init__(self, manager):
        self.manager = manager
        settings = manager.settings
        cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS')
        cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
        drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
        keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
        keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS')
        models = settings.get('CASSANDRABACKEND_MODELS')
        crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')
        generate_stats = settings.get('CASSANDRABACKEND_GENERATE_STATS')

        self.models = dict([(name, load_object(klass)) for name, klass in models.items()])

        self.cluster = Cluster(
            contact_points=cluster_ips,
            port=cluster_port,
            compression=True,
            default_retry_policy=RetryPolicy(),
            reconnection_policy=ConstantReconnectionPolicy(10, 100)
        )

        self.session = self.cluster.connect()
        self.session.row_factory = dict_factory
        self.session.encoder.mapping[dict] = self.session.encoder.cql_encode_map_collection
        self.crawl_id = crawl_id
        self.generate_stats = generate_stats

        if keyspace_create:
            query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
                       WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, )
            self.session.execute(query)

        self.session.set_keyspace(keyspace)

        connection.set_session(self.session)

        if drop_all_tables:
            # items() works on both Python 2 and 3; iteritems() is Python 2 only.
            for key, value in self.models.items():
                drop_table(value)

        for key, value in self.models.items():
            # Sync every table; skip the stats table unless stats collection is enabled.
            if self.generate_stats or key != 'CrawlStatsModel':
                sync_table(value)

        self._metadata = Metadata(self.session, self.models['MetadataModel'], self.crawl_id, self.generate_stats)
        self._states = States(self.session, self.models['StateModel'],
                              settings.get('STATE_CACHE_SIZE_LIMIT'), self.crawl_id)
        self._queue = self._create_queue(settings)

    def frontier_stop(self):
        self.states.flush()
        self.session.shutdown()

    def _create_queue(self, settings):
        return Queue(self.session, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
                     self.crawl_id, self.generate_stats)

    @property
    def queue(self):
        return self._queue

    @property
    def metadata(self):
        return self._metadata

    @property
    def states(self):
        return self._states


BASE = CassandraBackend


class Distributed(DistributedBackend):
    def __init__(self, manager):
        self.manager = manager
        settings = manager.settings
        cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS')  # Format: ['192.168.0.1', '192.168.0.2']
Comment (Member): This comment isn't needed here anymore.

        cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
        keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
        keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS')  # Default: true
        models = settings.get('CASSANDRABACKEND_MODELS')

        self.cluster = Cluster(cluster_ips, cluster_port)
        self.models = dict([(name, load_object(klass)) for name, klass in models.items()])

        self.session = self.cluster.connect()
        self.session.row_factory = dict_factory

        if keyspace_create:
            query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
                       WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, )
            self.session.execute(query)
        self.session.set_keyspace(keyspace)
        connection.set_session(self.session)

        self._metadata = None
        self._queue = None
        self._states = None

    @classmethod
    def strategy_worker(cls, manager):
        b = cls(manager)
        settings = manager.settings
        drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
        crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')
        model = b.models['StateModel']

        if drop_all_tables:
            drop_table(model)

        sync_table(model)

        b._states = States(b.session, model,
                           settings.get('STATE_CACHE_SIZE_LIMIT'), crawl_id)
        return b

    @classmethod
    def db_worker(cls, manager):
        b = cls(manager)
        settings = manager.settings
        drop = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
        crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')
        generate_stats = settings.get('CASSANDRABACKEND_GENERATE_STATS')

        metadata_m = b.models['MetadataModel']
        queue_m = b.models['QueueModel']
        stats_m = b.models['CrawlStatsModel']
        if drop:
            drop_table(metadata_m)
            drop_table(queue_m)
            drop_table(stats_m)

        sync_table(metadata_m)
        sync_table(queue_m)
        if generate_stats:
            sync_table(stats_m)

        b._metadata = Metadata(b.session, metadata_m, crawl_id, generate_stats)
        b._queue = Queue(b.session, queue_m, settings.get('SPIDER_FEED_PARTITIONS'), crawl_id, generate_stats)
        return b

    @property
    def queue(self):
        return self._queue

    @property
    def metadata(self):
        return self._metadata

    @property
    def states(self):
        return self._states

    def frontier_start(self):
Comment (Member): Are you sure all these methods need to be copy/pasted here? IMO, it would be great to extract them to a parent class.

Comment (Author): I'll give it a try :)
        for component in [self.metadata, self.queue, self.states]:
            if component:
                component.frontier_start()

    def frontier_stop(self):
        for component in [self.metadata, self.queue, self.states]:
            if component:
                component.frontier_stop()

    def add_seeds(self, seeds):
        self.metadata.add_seeds(seeds)

    def get_next_requests(self, max_next_requests, **kwargs):
        partitions = kwargs.pop('partitions', [0])  # TODO: Collect from all known partitions
        batch = []
        for partition_id in partitions:
            batch.extend(self.queue.get_next_requests(max_next_requests, partition_id, **kwargs))
        return batch

    def page_crawled(self, response, links):
        self.metadata.page_crawled(response, links)

    def request_error(self, request, error):
        self.metadata.request_error(request, error)

    def finished(self):
        raise NotImplementedError
