# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseSpiderFeedStream, \
-    BaseStreamConsumer, BaseScoringLogStream, BaseStreamProducer

-from kafka import KafkaClient, SimpleConsumer, KeyedProducer as KafkaKeyedProducer, SimpleProducer as KafkaSimpleProducer
-from kafka.common import BrokerResponseError, MessageSizeTooLargeError
+from logging import getLogger
+
+from kafka import KafkaClient, SimpleConsumer
+from kafka import KafkaConsumer, KafkaProducer, TopicPartition
+from kafka.common import BrokerResponseError
from kafka.protocol import CODEC_NONE

from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner
from frontera.contrib.messagebus.kafka import OffsetsFetcher
-from logging import getLogger
-from time import sleep
+from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseSpiderFeedStream, \
+    BaseStreamConsumer, BaseScoringLogStream, BaseStreamProducer

logger = getLogger("kafkabus")


-class Consumer(BaseStreamConsumer):
+class DeprecatedConsumer(BaseStreamConsumer):
    """
    Used in DB and SW worker. SW consumes per partition.
    """
@@ -70,68 +71,75 @@ def get_offset(self):
        return 0

+class Consumer(BaseStreamConsumer):
+    """
+    Used in DB and SW worker. SW consumes per partition.
+    """
+    def __init__(self, location, topic, group, partition_id):
+        self._location = location
+        self._group = group
+        self._topic = topic
+        self._partition_ids = [partition_id] if partition_id is not None else None
+
+        self._consumer = KafkaConsumer(
+            bootstrap_servers=self._location,
+            group_id=self._group,
+            max_partition_fetch_bytes=10485760)
+        if self._partition_ids:
+            self._consumer.assign([TopicPartition(self._topic, pid) for pid in self._partition_ids])
+        else:
+            self._consumer.subscribe([self._topic])  # subscribe() expects a list of topics
+
+    def get_messages(self, timeout=0.1, count=1):
+        # single poll per call; poll() takes milliseconds, while timeout is
+        # given in seconds as in the old SimpleConsumer-based API
+        try:
+            batch = self._consumer.poll(timeout_ms=timeout * 1000.0)
+        except Exception as err:
+            logger.warning("Error %s", err)
+            return
+        for records in batch.values():
+            for record in records:
+                yield record.value
+
+    def get_offset(self):
+        return 0
+
+
class SimpleProducer(BaseStreamProducer):
-    def __init__(self, location, topic, codec):
+    def __init__(self, location, topic, compression):
        self._location = location
        self._topic = topic
-        self._codec = codec
+        self._compression = compression
        self._create()

    def _create(self):
-        self._client = KafkaClient(self._location)
-        self._producer = KafkaSimpleProducer(self._client, codec=self._codec)
+        self._producer = KafkaProducer(bootstrap_servers=self._location, retries=5,
+                                       compression_type=self._compression)

    def send(self, key, *messages):
-        self._producer.send_messages(self._topic, *messages)
+        for msg in messages:
+            self._producer.send(self._topic, value=msg)

-    def get_offset(self, partition_id):
-        # Kafka has it's own offset management
-        raise KeyError
+    def flush(self):
+        self._producer.flush()


class KeyedProducer(BaseStreamProducer):
-    def __init__(self, location, topic_done, partitioner_cls, codec):
-        self._prod = None
+    def __init__(self, location, topic_done, partitioner, compression):
        self._location = location
        self._topic_done = topic_done
-        self._partitioner_cls = partitioner_cls
-        self._codec = codec
-
-    def _connect_producer(self):
-        if self._prod is None:
-            try:
-                self._client = KafkaClient(self._location)
-                self._prod = KafkaKeyedProducer(self._client, partitioner=self._partitioner_cls, codec=self._codec)
-            except BrokerResponseError:
-                self._prod = None
-                logger.warning("Could not connect producer to Kafka server")
-                return False
-        return True
+        self._partitioner = partitioner
+        self._compression = compression
+        self._producer = KafkaProducer(bootstrap_servers=self._location, partitioner=partitioner, retries=5,
+                                       compression_type=self._compression)

    def send(self, key, *messages):
-        success = False
-        max_tries = 5
-        if self._connect_producer():
-            n_tries = 0
-            while not success and n_tries < max_tries:
-                try:
-                    self._prod.send_messages(self._topic_done, key, *messages)
-                    success = True
-                except MessageSizeTooLargeError, e:
-                    logger.error(str(e))
-                    break
-                except BrokerResponseError:
-                    n_tries += 1
-                    logger.warning(
-                        "Could not send message. Try {0}/{1}".format(
-                            n_tries, max_tries)
-                    )
-                    sleep(1.0)
-        return success
-
-    def get_offset(self, partition_id):
-        # Kafka has it's own offset management
-        raise KeyError
+        for msg in messages:
+            self._producer.send(self._topic_done, key=key, value=msg)
+
+    def flush(self):
+        self._producer.flush()


class SpiderLogStream(BaseSpiderLogStream):
@@ -140,10 +148,12 @@ def __init__(self, messagebus):
        self._db_group = messagebus.general_group
        self._sw_group = messagebus.sw_group
        self._topic_done = messagebus.topic_done
-        self._codec = messagebus.codec
+        self._compression_type = messagebus.compression_type
+        self._partitions = messagebus.spider_log_partitions

    def producer(self):
-        return KeyedProducer(self._location, self._topic_done, FingerprintPartitioner, self._codec)
+        return KeyedProducer(self._location, self._topic_done, FingerprintPartitioner(self._partitions),
+                             self._compression_type)

    def consumer(self, partition_id, type):
        """
@@ -153,7 +163,7 @@ def consumer(self, partition_id, type):
        :return:
        """
        group = self._sw_group if type == 'sw' else self._db_group
-        return Consumer(self._location, self._topic_done, group, partition_id)
+        return DeprecatedConsumer(self._location, self._topic_done, group, partition_id)


class SpiderFeedStream(BaseSpiderFeedStream):
@@ -164,10 +174,11 @@ def __init__(self, messagebus):
        self._max_next_requests = messagebus.max_next_requests
        self._hostname_partitioning = messagebus.hostname_partitioning
        self._offset_fetcher = OffsetsFetcher(self._location, self._topic, self._general_group)
-        self._codec = messagebus.codec
+        self._compression_type = messagebus.compression_type
+        self._partitions = messagebus.spider_feed_partitions

    def consumer(self, partition_id):
-        return Consumer(self._location, self._topic, self._general_group, partition_id)
+        return DeprecatedConsumer(self._location, self._topic, self._general_group, partition_id)

    def available_partitions(self):
        partitions = []
@@ -178,22 +189,23 @@ def available_partitions(self):
        return partitions

    def producer(self):
-        partitioner = Crc32NamePartitioner if self._hostname_partitioning else FingerprintPartitioner
-        return KeyedProducer(self._location, self._topic, partitioner, self._codec)
+        partitioner = Crc32NamePartitioner(self._partitions) if self._hostname_partitioning \
+            else FingerprintPartitioner(self._partitions)
+        return KeyedProducer(self._location, self._topic, partitioner, self._compression_type)


class ScoringLogStream(BaseScoringLogStream):
    def __init__(self, messagebus):
        self._topic = messagebus.topic_scoring
        self._group = messagebus.general_group
        self._location = messagebus.kafka_location
-        self._codec = messagebus.codec
+        self._compression_type = messagebus.compression_type

    def consumer(self):
-        return Consumer(self._location, self._topic, self._group, partition_id=None)
+        return DeprecatedConsumer(self._location, self._topic, self._group, partition_id=None)

    def producer(self):
-        return SimpleProducer(self._location, self._topic, self._codec)
+        return SimpleProducer(self._location, self._topic, self._compression_type)


class MessageBus(BaseMessageBus):
@@ -206,9 +218,10 @@ def __init__(self, settings):
        self.spider_partition_id = settings.get('SPIDER_PARTITION_ID')
        self.max_next_requests = settings.MAX_NEXT_REQUESTS
        self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
-        codec = settings.get('KAFKA_CODEC')
-        self.codec = codec if codec else CODEC_NONE
+        self.compression_type = settings.get('KAFKA_COMPRESSION')
        self.kafka_location = settings.get('KAFKA_LOCATION')
+        self.spider_log_partitions = settings.get('SPIDER_LOG_PARTITIONS')
+        self.spider_feed_partitions = settings.get('SPIDER_FEED_PARTITIONS')

    def spider_log(self):
        return SpiderLogStream(self)
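
For reference, a minimal usage sketch of how the rewired classes fit together after this change. It is not part of the diff; the frontera.settings.Settings import and the concrete setting values are assumptions for illustration only.

from frontera.settings import Settings

settings = Settings()
settings.set('KAFKA_LOCATION', 'localhost:9092')        # bootstrap servers for KafkaProducer/KafkaConsumer
settings.set('KAFKA_COMPRESSION', 'gzip')               # forwarded as compression_type; None disables compression
settings.set('SPIDER_LOG_PARTITIONS', list(range(2)))   # partition ids handed to FingerprintPartitioner

mb = MessageBus(settings)
stream = mb.spider_log()

producer = stream.producer()                # KeyedProducer writing to topic_done
producer.send(b'fingerprint', b'message')   # key selects the partition, value is the encoded event
producer.flush()                            # KafkaProducer buffers internally, so flush before shutdown

consumer = stream.consumer(partition_id=0, type='sw')   # DeprecatedConsumer in the SW group
for value in consumer.get_messages(timeout=1.0):
    pass  # decode and handle the raw message bytes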