forked from phact/dse-kafka-stockticks
-
Notifications
You must be signed in to change notification settings - Fork 2
/
dfjoin
executable file
·40 lines (31 loc) · 2.73 KB
/
dfjoin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
echo "$ dse spark"
dse spark <<- 'EOF'
//Connect to the Graph
val g = spark.dseGraph("s")
//Total customer/account value
//Create Graph Temporary table
g.find("(account)-[owns]->(fund); (fund)-[holds]->(holding); (holding)-[has]->(sec)").filter("account.`~label` = 'account' and owns.`~label` = 'owns' and holds.`~label` = 'holds' and has.`~label` = 'asset' and sec.`~label` = 'security'").select($"sec.symbol", $"account.name", $"holding.quantity").registerTempTable("accounts")
//Join graph data with C* latest
spark.sql("select name, sum (value*quantity) as val from datastax_tickdata_demo.last_tick_data t join accounts a on t.symbol = a.symbol group by name").show
//Join graph data with C* for a particular time
spark.sql("select name, sum (value*quantity) as val from datastax_tickdata_demo.tick_data t join accounts a on t.symbol = a.symbol where t.date >= '2017-09-06 04:05:18.1' and t.date <= '2017-09-06 04:05:18.9' group by name").show
//Value by fund
//Create Graph Temporary table
g.find("(fund)-[holds]->(holding); (holding)-[has]->(sec)").filter("fund.`~label` = 'fund' and holds.`~label` = 'holds' and has.`~label` = 'asset' and sec.`~label` = 'security'").select($"sec.symbol", $"fund.name", $"holding.quantity").registerTempTable("funds")
//Join graph data with C* latest
spark.sql("select name, sum (value*quantity) as val from datastax_tickdata_demo.last_tick_data t join funds a on t.symbol = a.symbol group by name").show
//Join graph data with C* for a particular time
spark.sql("select name, sum (value*quantity) as val from datastax_tickdata_demo.tick_data t join funds a on t.symbol = a.symbol where t.date >= '2017-09-06 04:05:18.1' and t.date <= '2017-09-06 04:05:18.9' group by name").show
//Configuration to connect to S3 appliance
sc.hadoopConfiguration.set("fs.s3a.endpoint", "10.200.181.77:9128");
sc.hadoopConfiguration.set("fs.s3a.access.key", "C74UE8XI0XC9I11EQZJK")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "mDCIrslvtPiOiLjRs3dEBQe3EJnCCZ7CeNz+5R6/")
sc.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")
sc.textFile("s3a://statestreet/full_list").first
val s3df = spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("s3a://statestreet/full_list")
//Define Temp Table
spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("s3a://statestreet/full_list").registerTempTable("history")
//Historical join with S3 appliance
spark.sql("select name, sum (close*quantity) as value from history t join funds a on t.symbol = a.symbol where t.date = '2017/08/14' group by name").show