99 drop_keyspace , drop_table ,
1010 sync_table )
1111
12- from frontera .contrib .backends .cassandra import CassandraBackend
12+ from frontera .contrib .backends .cassandra import CassandraBackend , Distributed
1313from frontera .contrib .backends .cassandra .models import (FifoOrLIfoQueueModel ,
1414 MetadataModel ,
1515 QueueModel , StateModel )
2929r4 = r3 .copy ()
3030
3131
32- class BaseCassandraTest (object ):
32+ class CassandraConfig (object ):
3333
3434 def setUp (self ):
3535 settings = Settings ()
@@ -53,7 +53,7 @@ def _set_global_connection(self, hosts, port, timeout):
5353 connection .session .default_timeout = timeout
5454
5555
56- class TestCassandraBackendModels (BaseCassandraTest , unittest .TestCase ):
56+ class TestCassandraBackendModels (CassandraConfig , unittest .TestCase ):
5757
5858 def test_pickled_fields (self ):
5959 sync_table (MetadataModel )
@@ -131,7 +131,25 @@ def assert_db_values(self, model, _filter, fields):
131131 self .assertEqual (stored_value , original_value )
132132
133133
134- class TestCassandraBackend (BaseCassandraTest , unittest .TestCase ):
134+ class TestCassandraBackend (CassandraConfig , unittest .TestCase ):
135+
def init_backend(self):
    """Build a new ``CassandraBackend`` from ``self.manager``.

    The created instance is stored on ``self.backend`` so the
    ``metadata``/``states``/``queue`` accessors can reach it.
    """
    backend = CassandraBackend(self.manager)
    self.backend = backend
138+
@property
def metadata(self):
    """Return the ``metadata`` component of a freshly initialised backend.

    Every access calls ``init_backend`` first, so a new backend object
    is created each time this property is read.
    """
    self.init_backend()
    backend = self.backend
    return backend.metadata
143+
@property
def states(self):
    """Return the ``states`` component of a freshly initialised backend.

    Every access calls ``init_backend`` first, so a new backend object
    is created each time this property is read.
    """
    self.init_backend()
    backend = self.backend
    return backend.states
148+
@property
def queue(self):
    """Return the ``queue`` component of a freshly initialised backend.

    Every access calls ``init_backend`` first, so a new backend object
    is created each time this property is read.
    """
    self.init_backend()
    backend = self.backend
    return backend.queue
135153
136154 def _get_tables (self ):
137155 query = 'SELECT table_name FROM system_schema.tables WHERE keyspace_name = \' {}\' ' .format (self .keyspace )
@@ -141,7 +159,7 @@ def _get_tables(self):
141159 def test_tables_created (self ):
142160 tables_before = self ._get_tables ()
143161 self .assertEqual (tables_before , [])
144- CassandraBackend ( self .manager )
162+ self .init_backend ( )
145163 tables_after = self ._get_tables ()
146164 self .assertEqual (set (tables_after ), set (['metadata' , 'states' , 'queue' ]))
147165
@@ -158,14 +176,14 @@ def _get_state_data():
158176 rows_before = _get_state_data ()
159177 self .assertEqual (rows_before .count (), 1 )
160178 self .manager .settings .CASSANDRABACKEND_DROP_ALL_TABLES = True
161- CassandraBackend (self .manager )
162- self .assertEqual (set (tables_before ), set (['metadata' , 'states' , 'queue' ]))
179+ self .init_backend ()
180+ tables_after = self ._get_tables ()
181+ self .assertEqual (set (tables_after ), set (['metadata' , 'states' , 'queue' ]))
163182 rows_after = _get_state_data ()
164183 self .assertEqual (rows_after .count (), 0 )
165184
166185 def test_metadata (self ):
167- b = CassandraBackend (self .manager )
168- metadata = b .metadata
186+ metadata = self .metadata
169187 metadata .add_seeds ([r1 , r2 , r3 ])
170188 meta_qs = MetadataModel .objects .all ()
171189 self .assertEqual (set ([r1 .url , r2 .url , r3 .url ]), set ([m .url for m in meta_qs ]))
@@ -183,10 +201,9 @@ def test_metadata(self):
183201 self .assertEqual (meta_qs .count (), 3 )
184202
185203 def test_state (self ):
186- b = CassandraBackend (self .manager )
187- state = b .states
204+ state = self .states
188205 state .set_states ([r1 , r2 , r3 ])
189- self .assertEqual ([r .meta [b'state' ] for r in [r1 , r2 , r3 ]], [States .NOT_CRAWLED ]* 3 )
206+ self .assertEqual ([r .meta [b'state' ] for r in [r1 , r2 , r3 ]], [States .NOT_CRAWLED ] * 3 )
190207 state .update_cache ([r1 , r2 , r3 ])
191208 self .assertDictEqual (state ._cache , {b'10' : States .NOT_CRAWLED ,
192209 b'11' : States .NOT_CRAWLED ,
@@ -209,11 +226,11 @@ def test_state(self):
209226
210227 def test_queue (self ):
211228 self .manager .settings .SPIDER_FEED_PARTITIONS = 2
212- b = CassandraBackend (self .manager )
213- queue = b .queue
229+ queue = self .queue
214230 batch = [('10' , 0.5 , r1 , True ), ('11' , 0.6 , r2 , True ),
215231 ('12' , 0.7 , r3 , True )]
216232 queue .schedule (batch )
233+ self .assertEqual (queue .count (), 3 )
217234 self .assertEqual (set ([r .url for r in queue .get_next_requests (10 , 0 ,
218235 min_requests = 3 ,
219236 min_hosts = 1 ,
@@ -224,10 +241,34 @@ def test_queue(self):
224241 min_hosts = 1 ,
225242 max_requests_per_host = 10 )]),
226243 set ([r1 .url , r2 .url ]))
244+ self .assertEqual (queue .count (), 0 )
245+
246+
class TestCassandraDistributedBackend(TestCassandraBackend):
    """Variant of ``TestCassandraBackend`` that targets ``Distributed``.

    Only backend construction and the three component accessors are
    overridden; everything else is inherited from the parent class.
    """

    def init_backend(self):
        """Create the ``Distributed`` backend plus its two worker objects.

        Stores the backend on ``self.backend`` and the results of
        ``strategy_worker(manager)`` / ``db_worker(manager)`` on
        ``self.strategy_worker`` / ``self.db_worker``.
        """
        manager = self.manager
        self.backend = Distributed(manager)
        self.strategy_worker = self.backend.strategy_worker(manager)
        self.db_worker = self.backend.db_worker(manager)

    @property
    def metadata(self):
        """Re-initialise the backend and return the DB worker's ``metadata``."""
        self.init_backend()
        worker = self.db_worker
        return worker.metadata

    @property
    def states(self):
        """Re-initialise the backend and return the strategy worker's ``states``."""
        self.init_backend()
        worker = self.strategy_worker
        return worker.states

    @property
    def queue(self):
        """Re-initialise the backend and return the DB worker's ``queue``."""
        self.init_backend()
        worker = self.db_worker
        return worker.queue
227268
228269
229270class BaseCassandraIntegrationTests (object ):
230- obj = BaseCassandraTest ()
271+ obj = CassandraConfig ()
231272
232273 def setup_backend (self , method ):
233274 self .obj .setUp ()
0 commit comments