@@ -76,4 +76,34 @@ class InputInfoTrackerSuite extends SparkFunSuite with BeforeAndAfter {
76
76
assert(inputInfoTracker.getInfo(Time (0 )).get(streamId1) === None )
77
77
assert(inputInfoTracker.getInfo(Time (1 ))(streamId1) === inputInfo2)
78
78
}
79
+
80
+ test(" merge two InputInfos" ) {
81
+ val inputInfo1_1 = StreamInputInfo (1 , 100L , Map (" ID" -> 1 ))
82
+ val inputInfo1_2 = StreamInputInfo (1 , 200L , Map (" ID" -> 1 ))
83
+ val inputInfo2 = StreamInputInfo (2 , 200L , Map (" ID" -> 2 ))
84
+
85
+ val mergedInfo = inputInfo1_1.merge(inputInfo1_2)
86
+ assert(mergedInfo.inputStreamId == 1 )
87
+ assert(mergedInfo.numRecords == 300L )
88
+ assert(mergedInfo.metadata == Map (" ID" -> 1 ))
89
+
90
+ intercept[IllegalArgumentException ]{
91
+ inputInfo1_1.merge(inputInfo2)
92
+ }
93
+ }
94
+
95
+ test(" test get InputInfo of all specified times" ) {
96
+ val inputInfoTracker = new InputInfoTracker (ssc)
97
+
98
+ val streamId1 = 0
99
+ val inputInfo1 = StreamInputInfo (streamId1, 100L )
100
+ val inputInfo2 = StreamInputInfo (streamId1, 300L )
101
+ inputInfoTracker.reportInfo(Time (0 ), inputInfo1)
102
+ inputInfoTracker.reportInfo(Time (1 ), inputInfo2)
103
+
104
+ val times = Seq (Time (0 ), Time (1 ))
105
+ val mergedInfo = inputInfoTracker.getInfo(times)(streamId1)
106
+ assert(mergedInfo.inputStreamId == 0 )
107
+ assert(mergedInfo.numRecords == 400L )
108
+ }
79
109
}
0 commit comments