Commit 460ecd8

Rashid Schami authored and committed

Add job_search input stream

1 parent f4da24a

5 files changed: +37 −19 lines


job_url_scraper/config.py

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
 KAFKA = 'kafka:9092'
+KAFKA_INPUT_TOPIC = 'job_search'
 KAFKA_OUTPUT_TOPIC = 'job_urls'
 KAFKA_ERROR_TOPIC = 'errors'

job_url_scraper/helper.py

Lines changed: 11 additions & 1 deletion
@@ -3,7 +3,8 @@
 from contextlib import closing
 from bs4 import BeautifulSoup
 from retry import retry
-from kafka import KafkaProducer
+import json
+from kafka import KafkaProducer, KafkaConsumer
 from config import KAFKA


@@ -15,6 +16,15 @@ def get_producer():
     )


+@retry(tries=5, delay=30)
+def get_consumer(topic):
+    return KafkaConsumer(
+        topic,
+        bootstrap_servers=[KAFKA],
+        value_deserializer=lambda x: json.loads(x.decode('ascii'))
+    )
+
+
 @retry(tries=3, delay=5)
 def get_html(url):
     with closing(get(url, stream=True)) as response:
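
Iterating the KafkaConsumer returned by get_consumer blocks until records arrive, and each record's .value has already been deserialized into a dict by the lambda above. A minimal usage sketch (hypothetical, not part of this commit):

    from helper import get_consumer

    # Retries the broker connection up to 5 times, 30 s apart,
    # then blocks waiting for messages on the topic.
    consumer = get_consumer('job_search')
    for message in consumer:
        print(message.value['query'], message.value['city'])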

job_url_scraper/job_url_scraper.py

Lines changed: 5 additions & 3 deletions
@@ -1,11 +1,13 @@
 import re
 from helper import get_html
 from time import sleep
+from urllib.parse import quote_plus


 class JobUrlScraper:
-    def __init__(self, query='software+developer', throttle_seconds=5):
-        self.query = query
+    def __init__(self, query, city, throttle_seconds=5):
+        self.query = quote_plus(query)
+        self.city = quote_plus(city)
         self.search_count = self._get_search_count()
         self.throttle_seconds = throttle_seconds

@@ -15,7 +17,7 @@ def _get_search_count(self):
         return int(re.findall(r'(\d+.?\d*) Jobs', search_count_text)[0].replace('.', ''))

     def _get_search_page_url(self, start=0):
-        return f'https://de.indeed.com/Jobs?q={self.query}&l=Berlin&sort=date&limit=50&radius=25&filter=0&start={start}'
+        return f'https://de.indeed.com/Jobs?q={self.query}&l={self.city}&sort=date&limit=50&radius=25&filter=0&start={start}'

     def get_all_job_urls(self):
         for url in self._get_all_search_page_urls():
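
For reference, quote_plus from the standard library encodes spaces as '+' and '#' as '%23', which is exactly the URL shape the updated test below expects:

    from urllib.parse import quote_plus

    quote_plus('software developer c#')  # -> 'software+developer+c%23'
    quote_plus('Berlin')                 # -> 'Berlin'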

job_url_scraper/main.py

Lines changed: 17 additions & 14 deletions
@@ -1,21 +1,24 @@
-from helper import get_producer
-from config import KAFKA_ERROR_TOPIC, KAFKA_OUTPUT_TOPIC
+from helper import get_producer, get_consumer
+from config import KAFKA_ERROR_TOPIC, KAFKA_OUTPUT_TOPIC, KAFKA_INPUT_TOPIC
 from job_url_scraper import JobUrlScraper
 import traceback


 if __name__ == '__main__':
     producer = get_producer()
+    consumer = get_consumer(KAFKA_INPUT_TOPIC)

-    try:
-        for url in JobUrlScraper().get_all_job_urls():
-            producer.send(KAFKA_OUTPUT_TOPIC, url)
-    except Exception as e:
-        tb = traceback.format_exc()
-        producer.send(KAFKA_ERROR_TOPIC,
-                      value=f'ERROR: {e}\n{tb}\n',
-                      key='JobUrlScraper'
-                      )
-        raise
-    finally:
-        producer.flush()
+    for message in consumer:
+        try:
+            for url in JobUrlScraper(query=message.value['query'],
+                                     city=message.value['city']).get_all_job_urls():
+                producer.send(KAFKA_OUTPUT_TOPIC, url)
+        except Exception as e:
+            tb = traceback.format_exc()
+            producer.send(KAFKA_ERROR_TOPIC,
+                          value=f'ERROR: {e}\n{tb}\n',
+                          key='JobUrlScraper'
+                          )
+            raise
+        finally:
+            producer.flush()
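
With this change the scraper no longer runs a single hard-coded search at startup; it idles until a JSON message with 'query' and 'city' keys arrives on the job_search topic. A sketch of how a client might trigger a search (hypothetical producer, assuming the same broker address and the ASCII/JSON encoding that get_consumer expects):

    import json
    from kafka import KafkaProducer

    # Serializer mirrors get_consumer's value_deserializer.
    producer = KafkaProducer(
        bootstrap_servers=['kafka:9092'],
        value_serializer=lambda v: json.dumps(v).encode('ascii')
    )
    producer.send('job_search', {'query': 'software developer', 'city': 'Berlin'})
    producer.flush()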

job_url_scraper/test/test_job_url_scraper.py

Lines changed: 3 additions & 1 deletion
@@ -14,12 +14,14 @@ def setUp(self):

     def test_get_all_job_urls(self):
         def get_html(url):
-            if url == 'https://de.indeed.com/Jobs?q=software+developer&l=Berlin&sort=date&limit=50&radius=25&filter=0&start=0':
+            if url == 'https://de.indeed.com/Jobs?q=software+developer+c%23&l=Berlin&sort=date&limit=50&radius=25&filter=0&start=0':
                 return self.main_page
             raise RequestException(f'Error getting {url}')

         with patch('job_url_scraper.get_html') as mocked_get:
             mocked_get.side_effect = get_html
             job_urls = [url for url in JobUrlScraper(
+                query='software developer c#',
+                city='Berlin',
                 throttle_seconds=0).get_all_job_urls()]
             self.assertEqual(job_urls, self.expected_job_urls)
