[WIP] Added Cassandra-Backend #128
base: master
Changes from all commits
4f9c92a
125622e
7bb2605
6238355
039fd0e
667e65c
559400e
4ffe038
2e95323
c53c3f7
e190256
b1bf7bc
2019a71
ed6b561
fef1d83
5a0967f
29a0bdd
2919e79
@@ -52,3 +52,6 @@ docs/_build/

# PyBuilder
target/

# PyCharm Idea Folder
.idea/
@@ -437,6 +437,96 @@ Default: ``timedelta(days=1)``

Time between document visits, expressed in ``datetime.timedelta`` objects. Changing of this setting will only affect
documents scheduled after the change. All previously queued documents will be crawled with old periodicity.

.. _cassandra-settings:

Cassandra
---------

.. setting:: CASSANDRABACKEND_DROP_ALL_TABLES

CASSANDRABACKEND_DROP_ALL_TABLES
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``False``

Set to ``True`` if you need to drop of all DB tables on backend instantiation (e.g. every Scrapy spider run).

Review comment: I would remove

.. setting:: CASSANDRABACKEND_CLUSTER_IPS

CASSANDRABACKEND_CLUSTER_IPS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``['127.0.0.1']``

Set IPs from Cassandra Cluster. Default is localhost. To assign more than one IP use this Syntax: ``['192.168.0.1', '192.168.0.2']``

Review comment: IPs of Cassandra Cluster. Syntax should be written starting with lower case.

CASSANDRABACKEND_CLUSTER_PORT
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``9042``

Set port from Cassandra Cluster / Nodes

Review comment: of

CASSANDRABACKEND_GENERATE_STATS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``False``

Set this to true if you want to create an extra Table for stats collection. In this table there will be pages crawled, links queued etc. counted up.

Review comment: It's not only about creation, but also updating stats there. So, I would generalize it as

CASSANDRABACKEND_KEYSPACE
^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``frontera``

Sets the Cassandra keyspace.

CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``True``

Creates the keyspace if it does not exist. Set to ``False`` if Frontera shouldn't check on every startup.

CASSANDRABACKEND_CRAWL_ID
^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``default``

Sets an ID in each table for the current crawl. If you want to run another crawl from the beginning in the same tables, set it to another crawl ID. It is a text field.

CASSANDRABACKEND_MODELS
^^^^^^^^^^^^^^^^^^^^^^^

Default::

    {
        'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel',
        'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel',
        'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel',
        'CrawlStatsModel': 'frontera.contrib.backends.cassandra.models.CrawlStatsModel'
    }

This is a mapping of Cassandra models used by the backend. It is mainly used for customization.

Revisiting backend
------------------

.. setting:: CASSANDRABACKEND_REVISIT_INTERVAL

CASSANDRABACKEND_REVISIT_INTERVAL
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Review comment: Would it make more sense to use already existing option

Default: ``timedelta(days=1)``

Time between document visits, expressed in ``datetime.timedelta`` objects. Changing of this setting will only affect
documents scheduled after the change. All previously queued documents will be crawled with old periodicity.

.. _hbase-settings:
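For illustration, the options documented in the hunk above could be pulled together in a Frontera settings module along these lines. This is a hedged sketch based only on the defaults listed above; the ``BACKEND`` dotted path is inferred from ``BASE = CassandraBackend`` in the module added below, and the concrete values are made up for the example, not part of this PR:

    # settings.py -- hypothetical example, not part of this PR
    from datetime import timedelta

    # Assumed dotted path; the diff below defines BASE = CassandraBackend
    # in frontera.contrib.backends.cassandra.
    BACKEND = 'frontera.contrib.backends.cassandra.BASE'

    CASSANDRABACKEND_CLUSTER_IPS = ['192.168.0.1', '192.168.0.2']  # default: ['127.0.0.1']
    CASSANDRABACKEND_CLUSTER_PORT = 9042
    CASSANDRABACKEND_KEYSPACE = 'frontera'
    CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS = True
    CASSANDRABACKEND_DROP_ALL_TABLES = False
    CASSANDRABACKEND_GENERATE_STATS = False
    CASSANDRABACKEND_CRAWL_ID = 'default'

    # Documented under "Revisiting backend" above.
    CASSANDRABACKEND_REVISIT_INTERVAL = timedelta(days=1)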
@@ -0,0 +1,199 @@
from __future__ import absolute_import
from cassandra.cluster import Cluster
from cassandra.cqlengine import connection
from cassandra.query import dict_factory
from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
from cassandra.cqlengine.management import sync_table
from cassandra.cqlengine.management import drop_table
from frontera.core.components import DistributedBackend
from frontera.contrib.backends import CommonBackend
from frontera.contrib.backends.cassandra.components import Metadata, Queue, States
from frontera.utils.misc import load_object
import logging


class CassandraBackend(CommonBackend):
    def __init__(self, manager):
        self.manager = manager
        settings = manager.settings
        cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS')
        cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
        drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
        keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
        keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS')
        models = settings.get('CASSANDRABACKEND_MODELS')
        crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')
        generate_stats = settings.get('CASSANDRABACKEND_GENERATE_STATS')

        self.models = dict([(name, load_object(klass)) for name, klass in models.items()])

        self.cluster = Cluster(
            contact_points=cluster_ips,
            port=cluster_port,
            compression=True,
            default_retry_policy=RetryPolicy(),
            reconnection_policy=ConstantReconnectionPolicy(10, 100)
        )

        self.session = self.cluster.connect()
        self.session.row_factory = dict_factory
        self.session.encoder.mapping[dict] = self.session.encoder.cql_encode_map_collection
        self.crawl_id = crawl_id
        self.generate_stats = generate_stats

        if keyspace_create:
            query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
                       WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, )
            self.session.execute(query)

        self.session.set_keyspace(keyspace)

        connection.set_session(self.session)

        if drop_all_tables:
            for key, value in self.models.iteritems():
                drop_table(value)

        for key, value in self.models.iteritems():
            if (self.generate_stats is False and key != 'CrawlStatsModel') or self.generate_stats is True:
                sync_table(value)

        self._metadata = Metadata(self.session, self.models['MetadataModel'], self.crawl_id, self.generate_stats)
        self._states = States(self.session, self.models['StateModel'],
                              settings.get('STATE_CACHE_SIZE_LIMIT'), self.crawl_id)
        self._queue = self._create_queue(settings)

    def frontier_stop(self):
        self.states.flush()
        self.session.shutdown()

    def _create_queue(self, settings):
        return Queue(self.session, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
                     self.crawl_id, self.generate_stats)

    @property
    def queue(self):
        return self._queue

    @property
    def metadata(self):
        return self._metadata

    @property
    def states(self):
        return self._states

BASE = CassandraBackend

class Distributed(DistributedBackend):
    def __init__(self, manager):
        self.manager = manager
        settings = manager.settings
        cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS')  # Format: ['192.168.0.1', '192.168.0.2']

Review comment: this comment isn't needed here anymore

        cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
        keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
        keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS')  # Default: true
        models = settings.get('CASSANDRABACKEND_MODELS')

        self.cluster = Cluster(cluster_ips, cluster_port)
        self.models = dict([(name, load_object(klass)) for name, klass in models.items()])

        self.session = self.cluster.connect()
        self.session.row_factory = dict_factory

        if keyspace_create:
            query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
                       WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, )
            self.session.execute(query)
        self.session.set_keyspace(keyspace)
        connection.set_session(self.session)

        self._metadata = None
        self._queue = None
        self._states = None

    @classmethod
    def strategy_worker(cls, manager):
        b = cls(manager)
        settings = manager.settings
        drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
        crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')
        model = b.models['StateModel']

        if drop_all_tables:
            drop_table(model)

        sync_table(model)

        b._states = States(b.session, model,
                           settings.get('STATE_CACHE_SIZE_LIMIT'), crawl_id)
        return b

    @classmethod
    def db_worker(cls, manager):
        b = cls(manager)
        settings = manager.settings
        drop = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
        crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')
        generate_stats = settings.get('CASSANDRABACKEND_GENERATE_STATS')

        metadata_m = b.models['MetadataModel']
        queue_m = b.models['QueueModel']
        stats_m = b.models['CrawlStatsModel']
        if drop:
            drop_table(metadata_m)
            drop_table(queue_m)
            drop_table(stats_m)

        sync_table(metadata_m)
        sync_table(queue_m)
        if generate_stats is True:
            sync_table(stats_m)

        b._metadata = Metadata(b.session, metadata_m, crawl_id, generate_stats)
        b._queue = Queue(b.session, queue_m, settings.get('SPIDER_FEED_PARTITIONS'), crawl_id, generate_stats)
        return b

    @property
    def queue(self):
        return self._queue

    @property
    def metadata(self):
        return self._metadata

    @property
    def states(self):
        return self._states

    def frontier_start(self):

Review comment: are you sure all these methods need to be copy/pasted here? IMO, it would be great to extract them to parent class.
Reply: I give it a try :)

        for component in [self.metadata, self.queue, self.states]:
            if component:
                component.frontier_start()

    def frontier_stop(self):
        for component in [self.metadata, self.queue, self.states]:
            if component:
                component.frontier_stop()

    def add_seeds(self, seeds):
        self.metadata.add_seeds(seeds)

    def get_next_requests(self, max_next_requests, **kwargs):
        partitions = kwargs.pop('partitions', [0])  # TODO: Collect from all known partitions
        batch = []
        for partition_id in partitions:
            batch.extend(self.queue.get_next_requests(max_next_requests, partition_id, **kwargs))
        return batch

    def page_crawled(self, response, links):
        self.metadata.page_crawled(response, links)

    def request_error(self, request, error):
        self.metadata.request_error(request, error)

    def finished(self):
        return NotImplementedError
Review comment: I think this is not need, because error message is pretty clear states that module is missing.
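To close, a hedged sketch of how the ``Distributed`` class above might be selected for a distributed run. The dotted path is an assumption inferred from the module shown in this diff, and the comments only restate what ``strategy_worker()`` and ``db_worker()`` do in the code above; none of it is confirmed by the PR itself:

    # distributed_settings.py -- hypothetical sketch, not part of this PR
    BACKEND = 'frontera.contrib.backends.cassandra.Distributed'

    CASSANDRABACKEND_CLUSTER_IPS = ['10.0.0.1', '10.0.0.2', '10.0.0.3']
    CASSANDRABACKEND_KEYSPACE = 'frontera'
    CASSANDRABACKEND_GENERATE_STATS = True  # db_worker() also syncs CrawlStatsModel when True

    # Per the code above: Distributed.strategy_worker() syncs only StateModel and
    # builds the States component, while Distributed.db_worker() syncs the metadata,
    # queue (and optionally stats) tables and builds Metadata and Queue.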