problem63
You have been given a MySQL DB with the following details.
user=retail_dba
password=cloudera
database=retail_db
table=retail_db.orders
table=retail_db.order_items
jdbc URL = jdbc:mysql://quickstart:3306/retail_db
Columns of orders table: (order_id, order_date, order_customer_id, order_status)
Columns of order_items table: (order_item_id, order_item_order_id, order_item_product_id, order_item_quantity, order_item_subtotal, order_item_product_price)
Please accomplish the following activities.
1. Copy "retail_db.orders" and "retail_db.order_items" tables to HDFS in the respective directories p92_orders and p92_order_items.
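A minimal sqoop sketch for the import; sqoop's default output is comma-delimited text, which matches the split(",") parsing below:

sqoop import --connect jdbc:mysql://quickstart:3306/retail_db \
  --username retail_dba --password cloudera \
  --table orders --target-dir p92_orders

sqoop import --connect jdbc:mysql://quickstart:3306/retail_db \
  --username retail_dba --password cloudera \
  --table order_items --target-dir p92_order_items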
scala> val orders = sc.textFile("p92_orders").map(_.split(","))
orders: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:27
scala> val orderItems = sc.textFile("p92_order_items").map(_.split(","))
orderItems: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at map at <console>:27
2. Join these datasets on order_id in Spark.
scala> val ordersPair = orders.keyBy(line => line(0))
ordersPair: org.apache.spark.rdd.RDD[(String, Array[String])] = MapPartitionsRDD[8] at keyBy at <console>:29
scala> ordersPair.first
res0: (String, Array[String]) = (1,Array(1, 2013-07-25 00:00:00.0, 11599, CLOSED))
scala> val orderItemsPair = orderItems.keyBy(line => line(1))
scala> val joinedData = ordersPair.join(orderItemsPair)
joinedData: org.apache.spark.rdd.RDD[(String, (Array[String], Array[String]))] = MapPartitionsRDD[12] at join at <console>:35
scala> joinedData.first
res2: (String, (Array[String], Array[String])) = (2828,(Array(2828, 2013-08-10 00:00:00.0, 4952, SUSPECTED_FRAUD),Array(7097, 2828, 403, 1, 129.99, 129.99)))
3. Calculate total revenue per day and per customer.
scala> val data = joinedData.map(line => (line._2._1(1), line._2._1(2), line._2._2(4)))
data: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[23] at map at <console>:37
scala> data.first
res13: (String, String, String) = (2013-08-10 00:00:00.0,4952,129.99)
(line._3 is converted to Float so that reduceByKey sums the revenue values rather than concatenating strings)
scala> val keyByDateCustomer = data.map(line => ((line._1, line._2), line._3.toFloat))
keyByDateCustomer: org.apache.spark.rdd.RDD[((String, String), Float)] = MapPartitionsRDD[31] at map at <console>:39
scala> val result = keyByDateCustomer.reduceByKey(_+_)
result: org.apache.spark.rdd.RDD[((String, String), Float)] = ShuffledRDD[32] at reduceByKey at <console>:41
scala> result.first
res20: ((String, String), Float) = ((2013-08-01 00:00:00.0,5806),639.88)
scala> val result = keyByDateCustomer.reduceByKey(_+_).sortByKey()
result: org.apache.spark.rdd.RDD[((String, String), Float)] = ShuffledRDD[42] at sortByKey at <console>:41
scala> result.collect
res26: Array[((String, String), Float)] = Array(((2013-07-25 00:00:00.0,10039),100.0), ((2013-07-25 00:00:00.0,10430),150.0), ((2013-07-25 00:00:00.0,10500),399.98), ((2013-07-25 00:00:00.0,10519),699.89), ((2013-07-25 00:00:00.0,10784),124.95), ((2013-07-25 00:00:00.0,10920),759.96), ((2013-07-25 00:00:00.0,11196),200.0), ((2013-07-25 00:00:00.0,11318),1129.8601), ((2013-07-25 00:00:00.0,1143),869.89), ((2013-07-25 00:00:00.0,11441),829.97), ((2013-07-25 00:00:00.0,1148),899.92004), ((2013-07-25 00:00:00.0,11542),1099.93), ((2013-07-25 00:00:00.0,11586),359.96002), ((2013-07-25 00:00:00.0,11589),739.91), ((2013-07-25 00:00:00.0,11599),299.98), ((2013-07-25 00:00:00.0,11644),619.95), ((2013-07-25 00:00:00.0,11809),629.96), ((2013-07-25 00:00:00.0,11941),1649.8), ((2013-07-25 00:00:00.0,...
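The pairs above give revenue per (day, customer). Revenue per day alone follows the same pattern; a minimal sketch reusing the data RDD defined above:

scala> val revenuePerDay = data.map(line => (line._1, line._3.toFloat)).reduceByKey(_ + _)
scala> revenuePerDay.sortByKey().take(5)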
4. Calculate the maximum revenue customer per day.
scala> val keyByAmount = keyByDateCustomer.map(line => ((line._1._1, line._2), line._1._2))
keyByAmount: org.apache.spark.rdd.RDD[((String, Float), String)] = MapPartitionsRDD[44] at map at <console>:41
scala> val maxRevByDate = keyByAmount.sortByKey(false)
maxRevByDate: org.apache.spark.rdd.RDD[((String, Float), String)] = ShuffledRDD[47] at sortByKey at <console>:43
scala> maxRevByDate.collect
res27: Array[((String, Float), String)] = Array(((2014-07-24 00:00:00.0,499.95),8758), ((2014-07-24 00:00:00.0,499.95),3728), ((2014-07-24 00:00:00.0,499.95),11397), ((2014-07-24 00:00:00.0,499.95),2621), ((2014-07-24 00:00:00.0,499.95),4896), ((2014-07-24 00:00:00.0,499.95),1218), ((2014-07-24 00:00:00.0,399.99),4380), ((2014-07-24 00:00:00.0,399.98),10170), ((2014-07-24 00:00:00.0,399.98),11508), ((2014-07-24 00:00:00.0,399.98),4263), ((2014-07-24 00:00:00.0,399.98),10022), ((2014-07-24 00:00:00.0,399.98),5446), ((2014-07-24 00:00:00.0,399.98),9920), ((2014-07-24 00:00:00.0,399.98),8834), ((2014-07-24 00:00:00.0,399.98),11274), ((2014-07-24 00:00:00.0,399.98),11274), ((2014-07-24 00:00:00.0,399.98),11711), ((2014-07-24 00:00:00.0,399.98),5764), ((2014-07-24 00:00:00.0,399.98),2989), (...
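The sort above ranks individual order-item amounts, not customer totals. To pick the single highest-revenue customer per day from the aggregated totals, one option is a sketch over the result RDD computed in step 3:

scala> val maxCustomerByDate = result.map { case ((date, cust), amt) => (date, (cust, amt)) }.reduceByKey((a, b) => if (a._2 >= b._2) a else b)
scala> maxCustomerByDate.sortByKey().take(5)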