Skip to content

Commit 92b95c3

Browse files
committed
Add a notebook for Kafka.
1 parent 1e3e14a commit 92b95c3

File tree

2 files changed

+162
-0
lines changed

2 files changed

+162
-0
lines changed

JupyterNotebooks/Untitled.ipynb

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": 4,
6+
"id": "e01b33a3-519e-48fe-9f0d-4563a1ecd0ac",
7+
"metadata": {},
8+
"outputs": [
9+
{
10+
"name": "stdout",
11+
"output_type": "stream",
12+
"text": [
13+
"Requirement already satisfied: tweepy in c:\\users\\rishi\\downloads\\installed\\python311\\lib\\site-packages (4.14.0)\n",
14+
"Requirement already satisfied: oauthlib<4,>=3.2.0 in c:\\users\\rishi\\downloads\\installed\\python311\\lib\\site-packages (from tweepy) (3.2.2)\n",
15+
"Requirement already satisfied: requests<3,>=2.27.0 in c:\\users\\rishi\\downloads\\installed\\python311\\lib\\site-packages (from tweepy) (2.31.0)\n",
16+
"Requirement already satisfied: requests-oauthlib<2,>=1.2.0 in c:\\users\\rishi\\downloads\\installed\\python311\\lib\\site-packages (from tweepy) (1.3.1)\n",
17+
"Requirement already satisfied: charset-normalizer<4,>=2 in c:\\users\\rishi\\downloads\\installed\\python311\\lib\\site-packages (from requests<3,>=2.27.0->tweepy) (3.3.2)\n",
18+
"Requirement already satisfied: idna<4,>=2.5 in c:\\users\\rishi\\downloads\\installed\\python311\\lib\\site-packages (from requests<3,>=2.27.0->tweepy) (3.6)\n",
19+
"Requirement already satisfied: urllib3<3,>=1.21.1 in c:\\users\\rishi\\downloads\\installed\\python311\\lib\\site-packages (from requests<3,>=2.27.0->tweepy) (2.2.1)\n",
20+
"Requirement already satisfied: certifi>=2017.4.17 in c:\\users\\rishi\\downloads\\installed\\python311\\lib\\site-packages (from requests<3,>=2.27.0->tweepy) (2024.2.2)\n",
21+
"Note: you may need to restart the kernel to use updated packages.\n"
22+
]
23+
},
24+
{
25+
"name": "stderr",
26+
"output_type": "stream",
27+
"text": [
28+
"DEPRECATION: Loading egg at c:\\users\\rishi\\downloads\\installed\\python311\\lib\\site-packages\\vboxapi-1.0-py3.11.egg is deprecated. pip 24.3 will enforce this behaviour change. A possible replacement is to use pip for package installation.. Discussion can be found at https://github.com/pypa/pip/issues/12330\n"
29+
]
30+
}
31+
],
32+
"source": [
33+
"pip install tweepy"
34+
]
35+
},
36+
{
37+
"cell_type": "code",
38+
"execution_count": 1,
39+
"id": "7e84f27f-b52f-44b8-b541-ac3b39cb575f",
40+
"metadata": {},
41+
"outputs": [
42+
{
43+
"ename": "ImportError",
44+
"evalue": "cannot import name 'StreamListener' from 'tweepy.streaming' (C:\\Users\\rishi\\Downloads\\installed\\Python311\\Lib\\site-packages\\tweepy\\streaming.py)",
45+
"output_type": "error",
46+
"traceback": [
47+
"\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
48+
"\u001b[1;31mImportError\u001b[0m Traceback (most recent call last)",
49+
"Cell \u001b[1;32mIn[1], line 2\u001b[0m\n\u001b[0;32m 1\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mtweepy\u001b[39;00m\n\u001b[1;32m----> 2\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mtweepy\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mstreaming\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m StreamListener\n\u001b[0;32m 3\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mtweepy\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m OAuthHandler\n\u001b[0;32m 4\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mtweepy\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m Stream\n",
50+
"\u001b[1;31mImportError\u001b[0m: cannot import name 'StreamListener' from 'tweepy.streaming' (C:\\Users\\rishi\\Downloads\\installed\\Python311\\Lib\\site-packages\\tweepy\\streaming.py)"
51+
]
52+
}
53+
],
54+
"source": [
55+
"import tweepy\n",
56+
"from tweepy.streaming import StreamListener\n",
57+
"from tweepy import OAuthHandler\n",
58+
"from tweepy import Stream\n",
59+
"from kafka import SimpleProducer, KafkaClient\n",
60+
"import time\n",
61+
"\n",
62+
"access_token = \"748200460067045376-zYxRRyxiPIywcw2IV50IQiIxzQVN5FZ\"\n",
63+
"access_token_secret = \"c6dRkeRbgPqtbWAOTz0OfOMpBvhZS6KqWFjtEqHBEv7me\"\n",
64+
"consumer_key = \"ukUrCrJdd6MQQd0HQBzCDwcLq\"\n",
65+
"consumer_secret = \"VCf2wU1MhedUFnQeCwffzstdVkF7rbURzoNNDAdPPvbWfDtggP\"\n",
66+
"kafka_endpoint = \"ip-20-0-32-4.ap-south-1.compute.internal:9092\"\n",
67+
"kafka_topic = \"rk_hadoop\"\n",
68+
"twitter_hash_tag = \"RamNavami\"\n",
69+
"time_limit = 10\n",
70+
"\n",
71+
"class StdOutListener(StreamListener):\n",
72+
"\tdef __init__(self, time_limit=time_limit):\n",
73+
"\t\tself.start_time = time.time()\n",
74+
"\t\tself.limit = time_limit\n",
75+
"\t\tsuper(StdOutListener, self).__init__()\n",
76+
"\tdef on_data(self, data):\n",
77+
"\t\tif (time.time() - self.start_time) < self.limit:\n",
78+
" #msg = json.loads(data)\n",
79+
"\t\t\tproducer.send_messages(kafka_topic, data.encode('utf-8'))\n",
80+
"\t\t\tprint (data)\n",
81+
"\t\t\treturn True\n",
82+
"\t\texit(0)\n",
83+
"\tdef on_error(self, status):\n",
84+
"\t\tprint (status)\n",
85+
"\n",
86+
"kafka = KafkaClient(kafka_endpoint)\n",
87+
"producer = SimpleProducer(kafka)\n",
88+
"l = StdOutListener()\n",
89+
"auth = OAuthHandler(consumer_key, consumer_secret)\n",
90+
"auth.set_access_token(access_token, access_token_secret)\n",
91+
"stream = Stream(auth, l)\n",
92+
"stream.filter(track=twitter_hash_tag)"
93+
]
94+
},
95+
{
96+
"cell_type": "code",
97+
"execution_count": null,
98+
"id": "92d11f6c-365d-4b21-9a23-f5e19dc2ac88",
99+
"metadata": {},
100+
"outputs": [],
101+
"source": []
102+
}
103+
],
104+
"metadata": {
105+
"kernelspec": {
106+
"display_name": "Python 3 (ipykernel)",
107+
"language": "python",
108+
"name": "python3"
109+
},
110+
"language_info": {
111+
"codemirror_mode": {
112+
"name": "ipython",
113+
"version": 3
114+
},
115+
"file_extension": ".py",
116+
"mimetype": "text/x-python",
117+
"name": "python",
118+
"nbconvert_exporter": "python",
119+
"pygments_lexer": "ipython3",
120+
"version": "3.11.4"
121+
}
122+
},
123+
"nbformat": 4,
124+
"nbformat_minor": 5
125+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from tweepy.streaming import StreamListener
2+
from tweepy import OAuthHandler
3+
from tweepy import Stream
4+
from kafka import SimpleProducer, KafkaClient
5+
import time
6+
7+
access_token = "748200460067045376-zYxRRyxiPIywcw2IV50IQiIxzQVN5FZ"
8+
access_token_secret = "c6dRkeRbgPqtbWAOTz0OfOMpBvhZS6KqWFjtEqHBEv7me"
9+
consumer_key = "ukUrCrJdd6MQQd0HQBzCDwcLq"
10+
consumer_secret = "VCf2wU1MhedUFnQeCwffzstdVkF7rbURzoNNDAdPPvbWfDtggP"
11+
kafka_endpoint = "ip-20-0-32-4.ap-south-1.compute.internal:9092"
12+
kafka_topic = "rk_hadoop"
13+
twitter_hash_tag = "RamNavami"
14+
time_limit = 10
15+
16+
class StdOutListener(StreamListener):
17+
def __init__(self, time_limit=time_limit):
18+
self.start_time = time.time()
19+
self.limit = time_limit
20+
super(StdOutListener, self).__init__()
21+
def on_data(self, data):
22+
if (time.time() - self.start_time) < self.limit:
23+
#msg = json.loads(data)
24+
producer.send_messages(kafka_topic, data.encode('utf-8'))
25+
print (data)
26+
return True
27+
exit(0)
28+
def on_error(self, status):
29+
print (status)
30+
31+
kafka = KafkaClient(kafka_endpoint)
32+
producer = SimpleProducer(kafka)
33+
l = StdOutListener()
34+
auth = OAuthHandler(consumer_key, consumer_secret)
35+
auth.set_access_token(access_token, access_token_secret)
36+
stream = Stream(auth, l)
37+
stream.filter(track=twitter_hash_tag)

0 commit comments

Comments
 (0)