forked from auimendoza/realtime-kafka-project
-
Notifications
You must be signed in to change notification settings - Fork 0
/
salesbi-kafka-postgres-producer.py
123 lines (100 loc) · 2.85 KB
/
salesbi-kafka-postgres-producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import json
import psycopg2
import re, sys, time, traceback
from datetime import datetime
from kafka import KafkaProducer
con = None
cur = None
def startreplication (slot):
# create slot
plugin = 'test_decoding'
cur.execute(
'''
SELECT pg_create_logical_replication_slot(%s, %s)
'''
, (slot, plugin))
print slot, 'created.'
def getlogs():
draw = []
d = []
commits = {}
try:
cur.execute(
'''
SELECT pg_logical_slot_get_changes(%s, NULL, NULL, %s, %s)
''', (slot, 'include-timestamp', 'on'))
draw = cur.fetchall()
except:
traceback.print_exc()
for line in draw:
logd = {}
# each line looks like this:
# ('(0/173BAF8,771,"COMMIT 771 (at 2016-06-03 19:08:10.77748+00)")',)
log = re.sub('[(^\()(\)$]','',line[0]).split(',')
logid = log[1]
logtxt = re.sub('[(^\")(\"$)]','',log[2])
tokens = re.sub('\[[\w ]+\]','',logtxt).strip().split()
# "COMMIT 771 (at 2016-06-03 19:08:10.77748+00)"
if tokens[0] == 'COMMIT':
tstamp = ' '.join([tokens[3], tokens[4]])
uspattern = '(?<=\.)\d+(?=\+.)'
#us = re.search('(?<=\.)\d+(?=\+.)', tstamp).group(0).zfill(6)
us = re.search(uspattern, tstamp).group(0).zfill(6)
commits[logid] = re.sub('\+\d+$', '', re.sub(uspattern, us, tstamp))
# "table public.transaction: INSERT: order_date[date]:\'2016-06-03\' order_time[integer]:1010 sales_rep_id[character]:\'1-001\' product_id[character varying]:\'N001\' unit_sold[integer]:137"
elif tokens[0] == 'table':
logd['logid'] = logid
logd['table'] = re.sub(':$','',tokens[1].replace('public.',''))
for t in tokens[3:] :
logd[t.split(':')[0]] = re.sub('[(^\')(\'$)]','',t.split(':')[1])
d.append(logd)
return (commits, d)
def stopreplication(slot):
# drop slot
try:
cur.execute(
'''
SELECT pg_drop_replication_slot(%s)
'''
, (slot,))
print slot, 'dropped.'
except:
None
def usage():
print 'Usage:'
print ' ', sys.argv[0], 'slotname', 'interval_in_sec'
sys.exit(1)
if __name__ == '__main__':
if len(sys.argv) != 3:
usage()
slot = sys.argv[1]
topic = slot
interval = sys.argv[2]
try:
con = psycopg2.connect(database='salesbi', user='salesbi')
cur = con.cursor()
startreplication(slot)
producer = KafkaProducer()
while True:
c, d = getlogs()
mcnt = 0
for di in d:
if len(di) == 0:
continue
logid = di['logid']
di['timestamp'] = c[logid]
producer.send(topic, json.dumps(di))
mcnt += 1
#print str(len(d)), 'logs processed.'
print str(mcnt), 'messages sent'
time.sleep(float(interval))
except KeyboardInterrupt, ke:
None
except:
traceback.print_exc()
sys.exit(1)
finally:
stopreplication(slot)
if con:
con.close()
sys.exit(0)