- Data Generator
- PostgreSQL Database 9.4 (Third Party Software)
- Replication
- Logical Decoding
- Kafka Producer (using the Logical Decoding SQL Interface, Python)
- Kafka Consumer
- Analytics Web App (calls consumer.py SalesConsumer class)
- Dashboard (D3.js, Datamaps)
Set up the Azure VM
- Option: Ubuntu Server 14.04 LTS
- Size: Standard DS11 v2 (2 cores, 14 GB memory)
- Location: East US
- DNS Name: auimbigdata2.eastus.cloudapp.azure.com
- Static IP: Yes
- Use SSH Key: Yes
- Open TCP Port 8080 for HTTP
Install PostgreSQL 9.4
$ sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt/ trusty-pgdg main" >> /etc/apt/sources.list.d/pgdg.list'
$ wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
$ sudo apt-get update
$ sudo apt-get upgrade
$ sudo apt-get install postgresql-9.4 postgresql-server-dev-9.4 postgresql-contrib-9.4
Create the salesbi user and database, and set up replication
$ sudo su - postgres
$ createuser -U postgres -d -e -E -l -P -r -s --replication salesbi
$ exit
$ sudo adduser salesbi
$ sudo su - salesbi
$ createdb salesbi
Add the following lines to /etc/postgresql/9.4/main/postgresql.conf:
wal_level = logical
max_wal_senders = 8
wal_keep_segments = 4
max_replication_slots = 4
Add the following lines to /etc/postgresql/9.4/main/pg_hba.conf:
local replication all trust
host replication all 127.0.0.1/32 trust
host replication all ::1/128 trust
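To double-check the pg_hba.conf entries above, the following minimal sketch lists the records whose database field is the `replication` keyword. It assumes the simple whitespace-separated layout used here (no address/netmask pairs, quoted fields, or include directives); `parse_hba_line` and `replication_entries` are hypothetical helper names, not part of PostgreSQL.

```python
def parse_hba_line(line):
    """Return a dict for one pg_hba.conf record, or None for blank/comment lines.

    Sketch only: assumes 'local DATABASE USER METHOD' or
    'host DATABASE USER ADDRESS METHOD' with no netmask column or options.
    """
    line = line.split("#", 1)[0].strip()
    if not line:
        return None
    fields = line.split()
    if fields[0] == "local":
        return {"type": "local", "database": fields[1], "user": fields[2],
                "address": None, "method": fields[3]}
    return {"type": fields[0], "database": fields[1], "user": fields[2],
            "address": fields[3], "method": fields[4]}


def replication_entries(lines):
    """Return the records whose DATABASE field is the 'replication' keyword."""
    records = (parse_hba_line(line) for line in lines)
    return [r for r in records if r and r["database"] == "replication"]
```

For example, `replication_entries(open("/etc/postgresql/9.4/main/pg_hba.conf"))` should return the three records added above.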
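Install Psycopg (PostgreSQL adapter for Python)
Stopping and restarting PostgreSQL here also applies the postgresql.conf and pg_hba.conf changes above; a new wal_level takes effect only after a server restart.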
$ sudo service postgresql stop
$ sudo apt-get install python-psycopg2
$ sudo service postgresql start
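After the restart, the replication settings can be confirmed from Python. This is a sketch that assumes an open psycopg2 connection to the salesbi database; `REQUIRED_SETTINGS`, `fetch_settings` and `mismatched_settings` are hypothetical names. It reads each value with `SHOW` and reports any setting that differs from the postgresql.conf values listed above.

```python
# Values from the postgresql.conf lines above, as SHOW reports them (strings).
REQUIRED_SETTINGS = {
    "wal_level": "logical",
    "max_wal_senders": "8",
    "wal_keep_segments": "4",
    "max_replication_slots": "4",
}


def fetch_settings(conn, names):
    """Read current setting values with SHOW using an open psycopg2 connection."""
    values = {}
    cur = conn.cursor()
    for name in names:
        # SHOW does not accept bind parameters, so names must come from a
        # fixed list such as REQUIRED_SETTINGS, never from user input.
        cur.execute("SHOW " + name)
        values[name] = cur.fetchone()[0]
    cur.close()
    return values


def mismatched_settings(current, required=REQUIRED_SETTINGS):
    """Return {name: (expected, actual)} for every setting that differs."""
    return {name: (expected, current.get(name))
            for name, expected in required.items()
            if current.get(name) != expected}

# Example (assumes peer authentication as the salesbi OS user):
#   import psycopg2
#   conn = psycopg2.connect(dbname="salesbi")
#   print(mismatched_settings(fetch_settings(conn, REQUIRED_SETTINGS)))
```

An empty dict means the server is running with the intended logical replication configuration.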
Install Kafka
Install the dependencies (a Java runtime and ZooKeeper):
$ sudo apt-get update
$ sudo apt-get install default-jre
$ sudo apt-get install zookeeperd
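Before installing Kafka, it can help to confirm that ZooKeeper is answering. The sketch below sends ZooKeeper's `ruok` four-letter command, to which a healthy server replies `imok`. It assumes the default client port 2181 on the same VM; newer ZooKeeper releases may require whitelisting four-letter commands before they answer. `zookeeper_is_ok` is a hypothetical helper name.

```python
import socket


def zookeeper_is_ok(host="127.0.0.1", port=2181, timeout=3.0):
    """Return True if the ZooKeeper server at host:port answers 'ruok' with 'imok'."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except (socket.error, socket.timeout):
        return False  # nothing listening, or unreachable
    try:
        sock.sendall(b"ruok")
        reply = b""
        # The server closes the connection after answering, so read until EOF.
        while True:
            chunk = sock.recv(16)
            if not chunk:
                break
            reply += chunk
    except (socket.error, socket.timeout):
        return False
    finally:
        sock.close()
    return reply == b"imok"
```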
Create a kafka user:
$ sudo su -
$ useradd kafka -m
$ passwd kafka
$ adduser kafka sudo
Install the Kafka binaries:
$ su - kafka
$ mkdir -p ~/Downloads
$ wget "http://mirror.cc.columbia.edu/pub/software/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz" -O ~/Downloads/kafka.tgz
$ mkdir -p ~/kafka && cd ~/kafka
$ tar -xvzf ~/Downloads/kafka.tgz --strip 1
Add the following lines to ~/kafka/config/server.properties:
delete.topic.enable=true
auto.create.topics.enable=true
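If the setup is scripted, appending these lines blindly can leave duplicate keys on a second run. A minimal sketch, assuming plain `key=value` lines (no `:` separators or line continuations), sets each key in place and appends only the missing ones; `set_properties` is a hypothetical helper name.

```python
def set_properties(path, updates):
    """Set key=value pairs in a .properties file, replacing existing keys
    and appending missing ones. Comment lines are preserved."""
    with open(path) as f:
        lines = f.read().splitlines()
    remaining = dict(updates)
    out = []
    for line in lines:
        stripped = line.strip()
        if stripped and not stripped.startswith(("#", "!")) and "=" in stripped:
            key = stripped.split("=", 1)[0].strip()
            if key in remaining:
                # Rewrite the existing entry with the new value.
                out.append("%s=%s" % (key, remaining.pop(key)))
                continue
        out.append(line)
    # Keys not found in the file are appended at the end.
    for key, value in remaining.items():
        out.append("%s=%s" % (key, value))
    with open(path, "w") as f:
        f.write("\n".join(out) + "\n")
```

For example: `set_properties(os.path.expanduser("~/kafka/config/server.properties"), {"delete.topic.enable": "true", "auto.create.topics.enable": "true"})`. Running it twice leaves the file unchanged.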
Start the Kafka server:
$ nohup ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties > ~/kafka/kafka.log 2>&1 &
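Because the broker is started in the background, a later step can run before it is accepting connections. This sketch polls a TCP port until something is listening. It assumes the default broker port 9092 from the stock server.properties. It checks only that the port is open, not that the broker is fully healthy. `wait_for_port` is a hypothetical helper name.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=1.0):
    """Poll until a TCP connection to host:port succeeds or timeout expires.

    Returns True once something is listening, False on timeout.
    """
    deadline = time.time() + timeout
    while True:
        try:
            sock = socket.create_connection((host, port), timeout=interval)
        except (socket.error, socket.timeout):
            if time.time() >= deadline:
                return False
            time.sleep(interval)
        else:
            sock.close()
            return True
```

For example, `wait_for_port("localhost", 9092)` before starting the producer.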
Install and prepare a Python virtualenv
Install virtualenv:
$ sudo apt-get install python-virtualenv
Create the project virtualenv:
$ sudo su - salesbi
$ mkdir webapi
$ cd webapi
$ virtualenv venv
Activate the virtualenv:
$ . venv/bin/activate
Install the required Python packages for the data pipeline
Install Flask, Flask-SocketIO, eventlet, Flask-Moment and Flask-Bootstrap:
(venv)$ pip install Flask
(venv)$ pip install flask-socketio
(venv)$ pip install eventlet
(venv)$ pip install flask-moment
(venv)$ pip install flask-bootstrap
Install the psycopg2 PostgreSQL adapter for Python:
(venv)$ pip install psycopg2
Install the Python Kafka client:
(venv)$ pip install kafka-python
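A quick way to confirm that the packages above landed in the active virtualenv is to try importing each one. The pip-name-to-module mapping below is a sketch based on the import names these packages normally use. `REQUIRED_MODULES` and `missing_packages` are hypothetical names.

```python
import importlib

# pip package name -> module name used in "import ..."
REQUIRED_MODULES = {
    "Flask": "flask",
    "flask-socketio": "flask_socketio",
    "eventlet": "eventlet",
    "flask-moment": "flask_moment",
    "flask-bootstrap": "flask_bootstrap",
    "psycopg2": "psycopg2",
    "kafka-python": "kafka",
}


def missing_packages(required=REQUIRED_MODULES):
    """Return the pip names whose modules cannot be imported by this interpreter."""
    missing = []
    for pip_name, module_name in sorted(required.items()):
        try:
            importlib.import_module(module_name)
        except ImportError:
            missing.append(pip_name)
    return missing
```

Running `print(missing_packages())` inside the activated virtualenv should print an empty list.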
Run the database scripts
$ sudo su - salesbi
$ psql
salesbi=# \i generator/create_insert_scripts.sql
salesbi=# \q
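The same script could also be loaded from Python instead of psql. This sketch assumes the script contains only SQL statements plus, at most, psql meta-commands such as `\set`. Those are stripped because only psql understands them. Constructs like `COPY ... FROM stdin` are not handled. Unlike `\i`, it runs the whole script in one transaction. `read_sql_script` and `run_sql_script` are hypothetical helper names.

```python
def read_sql_script(path):
    """Return the script text without psql meta-command lines (those starting
    with a backslash), which the server itself would reject."""
    with open(path) as f:
        lines = [line for line in f if not line.lstrip().startswith("\\")]
    return "".join(lines)


def run_sql_script(conn, path):
    """Execute a whole SQL script in one transaction on an open psycopg2 connection."""
    cur = conn.cursor()
    try:
        # With no parameters, psycopg2 sends the text unchanged, so several
        # ';'-separated statements run in a single execute call.
        cur.execute(read_sql_script(path))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```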
Start the Kafka producer
(venv)$ python salesbi-kafka-postgres-producer.py transaction_slot 5
where transaction_slot is the name of the PostgreSQL replication slot and 5 is the number of seconds to sleep between pg_logical_slot_get_changes calls.
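The two positional arguments could be parsed as in the following sketch. The actual producer script may simply read `sys.argv`. The argument names `slot_name` and `sleep_seconds` are hypothetical.

```python
import argparse


def parse_args(argv=None):
    """Parse the producer's two positional arguments (slot name, sleep seconds)."""
    parser = argparse.ArgumentParser(
        description="Stream logical decoding changes from PostgreSQL to Kafka.")
    parser.add_argument("slot_name",
                        help="replication slot name, also used as the Kafka topic")
    parser.add_argument("sleep_seconds", type=float,
                        help="seconds to sleep between pg_logical_slot_get_changes calls")
    return parser.parse_args(argv)
```

With the command line above, `parse_args(["transaction_slot", "5"])` yields `slot_name="transaction_slot"` and `sleep_seconds=5.0`.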
This producer does the following:
- create a new replication slot with the given slot name
- loop continuously until interrupted with Ctrl-C (KeyboardInterrupt)
- in each loop iteration:
  - read the logical changes from the replication slot
  - format the data as JSON
  - send the JSON data to the Kafka topic with the same name as the replication slot (Kafka is configured to auto-create topics, so the topic need not exist beforehand)
  - sleep for the specified number of seconds
- when the loop ends, drop the replication slot and close the database connection
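The loop described above can be sketched with PostgreSQL 9.4's logical decoding SQL functions. This is an illustration, not the author's producer. It assumes the `test_decoding` output plugin, the example plugin shipped in postgresql-contrib-9.4, though the real script may use another. It also assumes a JSON layout of location, xid and data. Slot creation fails if a slot with that name already exists. The Kafka side is injected as a `send(topic, message)` callable. The connection, Kafka client and sleep function are passed in as parameters so the logic can be tested without a live server. `run_producer` and `changes_to_json` are hypothetical names.

```python
import json
import time

# 9.4 SQL interface; 'test_decoding' is an assumed output plugin.
CREATE_SLOT = "SELECT * FROM pg_create_logical_replication_slot(%s, 'test_decoding')"
# In 9.4 the result columns are (location, xid, data); 10+ renames location to lsn.
GET_CHANGES = "SELECT location, xid, data FROM pg_logical_slot_get_changes(%s, NULL, NULL)"
DROP_SLOT = "SELECT pg_drop_replication_slot(%s)"


def changes_to_json(rows):
    """Format (location, xid, data) rows as JSON strings, one per change."""
    return [json.dumps({"location": str(loc), "xid": xid, "data": data})
            for loc, xid, data in rows]


def run_producer(conn, send, slot_name, sleep_seconds,
                 max_iterations=None, sleep=time.sleep):
    """Create the slot, poll it until interrupted, then drop it.

    conn: open psycopg2 connection.
    send(topic, message): publishes one message to Kafka.
    max_iterations: stop after this many polls (None = loop until Ctrl-C).
    """
    # Autocommit runs each statement in its own transaction instead of
    # holding one transaction open for the whole loop.
    conn.autocommit = True
    cur = conn.cursor()
    cur.execute(CREATE_SLOT, (slot_name,))
    iterations = 0
    try:
        while max_iterations is None or iterations < max_iterations:
            # get_changes consumes what it returns, so changes are not re-read.
            cur.execute(GET_CHANGES, (slot_name,))
            for message in changes_to_json(cur.fetchall()):
                send(slot_name, message)  # topic named after the slot
            iterations += 1
            sleep(sleep_seconds)
    except KeyboardInterrupt:
        pass
    finally:
        cur.execute(DROP_SLOT, (slot_name,))
        cur.close()
        conn.close()
```

With a kafka-python version that provides `KafkaProducer`, `send` could be `lambda topic, msg: producer.send(topic, msg.encode("utf-8"))`, where `producer = KafkaProducer(bootstrap_servers="localhost:9092")`. Using `get_changes` rather than `peek_changes` means each change is consumed once and is not replayed after a restart.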
Run the data generator
(venv)$ python generator/generator.py
Start the webapi server
(venv)$ export PYTHONPATH=$PYTHONPATH:/home/salesbi:.
(venv)$ python webapi.py
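On the web side, Kafka messages ultimately have to reach the dashboard. The following is a minimal sketch of that hand-off, not the app's SalesConsumer class. It decodes each JSON message value and passes it to an `emit(event, payload)` callable. The event name `sales_update` and the helper `forward_messages` are hypothetical. Any iterable of objects with a bytes `.value` attribute works, which includes a kafka-python `KafkaConsumer`.

```python
import json


def forward_messages(messages, emit, event_name="sales_update"):
    """Decode JSON message values and pass each one to emit(event_name, payload).

    messages: iterable of objects whose .value holds UTF-8 JSON bytes.
    Returns the number of messages forwarded.
    """
    count = 0
    for message in messages:
        try:
            payload = json.loads(message.value.decode("utf-8"))
        except ValueError:
            continue  # skip values that are not valid UTF-8 JSON
        emit(event_name, payload)
        count += 1
    return count
```

One possible wiring, assuming Flask-SocketIO's `start_background_task` and kafka-python's `KafkaConsumer`, is `socketio.start_background_task(forward_messages, KafkaConsumer("transaction_slot", bootstrap_servers="localhost:9092"), socketio.emit)`.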
See it in action
Watch it on YouTube.