forked from scylladb/scylla-code-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
insert_data_into_scylla+elastic.py
101 lines (73 loc) · 3.19 KB
/
insert_data_into_scylla+elastic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#! /usr/bin/env python
# -*- coding: latin-1 -*-
#
### Using elasticsearch-py ###
import csv
from cassandra.cluster import Cluster
from elasticsearch import Elasticsearch
import random
import argparse
import concurrent.futures
from cassandra import ConsistencyLevel
from cassandra.concurrent import execute_concurrent_with_args
## Script args and Help
def _split_hosts(value):
    """Split a comma-separated host list, ignoring blanks and surrounding spaces."""
    # "a, b" would otherwise yield " b", and "a,,b" / "a," would yield "" --
    # neither is a usable contact point.
    return [host.strip() for host in value.split(',') if host.strip()]


parser = argparse.ArgumentParser(
    add_help=True,
    description="Load ./catalog.csv into Scylla (catalog.apparel) and "
                "Elasticsearch (catalog index).",
)
parser.add_argument('-s', action="store", dest="SCYLLA_IP", default="127.0.0.1",
                    help="comma-separated Scylla contact points (default: %(default)s)")
parser.add_argument('-e', action="store", dest="ES_IP", default="127.0.0.1",
                    help="comma-separated Elasticsearch hosts (default: %(default)s)")
opts = parser.parse_args()
# Lists of hosts handed to Cluster(...) and Elasticsearch(...) in insert_data().
SCYLLA_IP = _split_hosts(opts.SCYLLA_IP)
ES_IP = _split_hosts(opts.ES_IP)
## Define KS + Table
# CQL DDL run once by insert_data() before loading; IF NOT EXISTS makes
# both statements safe to re-run against an existing cluster.
create_ks = (
    "CREATE KEYSPACE IF NOT EXISTS catalog "
    "WITH replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};"
)
# Partition key is sku; color and size are clustering columns.
create_t1 = (
    "CREATE TABLE IF NOT EXISTS catalog.apparel ("
    "sku text, brand text, group text, sub_group text, "
    "color text, size text, gender text, "
    "PRIMARY KEY ((sku),color,size));"
)
## Loading the data
def load_data(filename):
    """Read a CSV file into a list of row dicts.

    The first row is the header. Every following non-blank row becomes a
    dict mapping the lower-cased header name to that row's field value
    (values are strings, exactly as produced by ``csv.reader``).

    Args:
        filename: path of the CSV file to read.

    Returns:
        A ``(headers, data)`` tuple. ``headers`` is the header row as it
        appears in the file (original case); ``data`` is the list of row
        dicts. An empty file yields ``([], [])``. Fields beyond the number
        of headers are ignored; short rows simply lack the missing keys.
    """
    data = []
    # newline="" lets the csv module handle line endings itself, as its
    # documentation requires; without it, newlines embedded inside quoted
    # fields are not interpreted correctly.
    with open(filename, "r", newline="") as f:
        reader = csv.reader(f)
        # Default of [] avoids leaking StopIteration on an empty file.
        headers = next(reader, [])
        # Lower-case once, not once per field per row.
        keys = [h.lower() for h in headers]
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines; emitting {} would
                # later fail in insert_data on d["sku"].
                continue
            data.append(dict(zip(keys, row)))
    return headers, data
## Insert the data
def insert_data(headers, data, consistency_level=None):
    """Create the schema, then write every row to Scylla and Elasticsearch.

    Each row is inserted into ``catalog.apparel`` with a prepared statement
    and indexed as an ``apparel`` document in the ``catalog`` Elasticsearch
    index. The index is refreshed once at the end.

    Args:
        headers: CSV header row as returned by ``load_data``. Not used here;
            kept so existing callers keep working.
        data: list of row dicts keyed by lower-cased column name. Each dict
            must contain ``sku``; the table columns (``sku``, ``brand``,
            ``group``, ``sub_group``, ``color``, ``size``, ``gender``) are
            bound from it by name.
        consistency_level: ``cassandra.ConsistencyLevel`` for the Scylla
            writes. ``None`` (the default) means ``ConsistencyLevel.QUORUM``.
    """
    if consistency_level is None:
        # A fixed level keeps the write guarantee predictable run to run
        # (it used to be chosen at random between ONE and QUORUM).
        consistency_level = ConsistencyLevel.QUORUM

    ## Connect to Scylla cluster and create schema
    print("")
    print("## Connecting to Scylla cluster -> Creating schema")
    cluster = Cluster(SCYLLA_IP)
    session = cluster.connect()
    try:
        session.execute(create_ks)
        session.execute(create_t1)

        ## Connect to Elasticsearch
        print("")
        print("## Connecting to Elasticsearch -> Creating 'Catalog' index")
        es = Elasticsearch(ES_IP)
        ## Create Elasticsearch index. Ignore 400 = IF NOT EXIST
        es.indices.create(index="catalog", ignore=400)

        ## Prepared CQL statement
        print("")
        print("## Preparing CQL statement")
        # One bind marker per column, so every marker is named after a column
        # and can be filled from the row dict. (The former extra
        # "using TIMESTAMP ?" marker had no matching key in the dict.)
        cql = "INSERT INTO catalog.apparel (sku,brand,group,sub_group,color,size,gender) VALUES (?,?,?,?,?,?,?)"
        cql_prepared = session.prepare(cql)
        cql_prepared.consistency_level = consistency_level

        print("")
        print("## Insert csv content into Scylla and Elasticsearch")
        for d in data:
            # Session.execute() is synchronous: it waits for the result
            # before returning, so each write completes before the next.
            session.execute(cql_prepared, d)
            # NOTE(review): the Scylla primary key is (sku, color, size) but
            # the document id is only sku, so CSV rows sharing a sku replace
            # each other in Elasticsearch -- confirm sku is unique per row.
            es.index(index="catalog", doc_type="apparel", id=d["sku"], body=d)

        ## After all the inserts, make a refresh, just in case
        print("")
        print("## Inserts completed, refreshing index")
        es.indices.refresh(index="catalog")
        print("")
    finally:
        # Release the driver's connections and background threads.
        cluster.shutdown()
if __name__ == "__main__":
    # Script entry point: parse the sample catalog from the working
    # directory and feed its (headers, rows) pair straight into the loader.
    insert_data(*load_data("./catalog.csv"))