 package org.apache.spark.sql.hive.thriftserver
 
+import java.io._
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Await, Promise}
 import scala.sys.process.{Process, ProcessLogger}
 
-import java.io._
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 
 class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -53,23 +51,20 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
       """.stripMargin.split("\\s+").toSeq ++ extraArgs
   }
 
-    // AtomicInteger is needed because stderr and stdout of the forked process are handled in
-    // different threads.
-    val next = new AtomicInteger(0)
+    var next = 0
     val foundAllExpectedAnswers = Promise.apply[Unit]()
     val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
     val buffer = new ArrayBuffer[String]()
+    val lock = new Object
 
-    def captureOutput(source: String)(line: String) {
+    def captureOutput(source: String)(line: String): Unit = lock.synchronized {
       buffer += s"$source> $line"
-      // If we haven't found all expected answers...
-      if (next.get() < expectedAnswers.size) {
-        // If another expected answer is found...
-        if (line.startsWith(expectedAnswers(next.get()))) {
-          // If all expected answers have been found...
-          if (next.incrementAndGet() == expectedAnswers.size) {
-            foundAllExpectedAnswers.trySuccess(())
-          }
+      // If we haven't found all expected answers and another expected answer comes up...
+      if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
+        next += 1
+        // If all expected answers have been found...
+        if (next == expectedAnswers.size) {
+          foundAllExpectedAnswers.trySuccess(())
        }
      }
    }
@@ -88,8 +83,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
           |=======================
           |Spark SQL CLI command line: ${command.mkString(" ")}
           |
-          |Executed query ${next.get()} "${queries(next.get())}",
-          |But failed to capture expected output "${expectedAnswers(next.get())}" within $timeout.
+          |Executed query $next "${queries(next)}",
+          |But failed to capture expected output "${expectedAnswers(next)}" within $timeout.
          |
          |${buffer.mkString("\n")}
          |===========================
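
Note for reviewers skimming the hunks above: the patch drops the AtomicInteger cursor and instead guards a plain `var next` (and the shared `buffer`) with `lock.synchronized` inside `captureOutput`, since stdout and stderr of the forked CLI process are consumed on different threads. Below is a minimal, standalone sketch of that pattern, not part of the patch; the object name ExpectedOutputWatcher, the feedLine helper, and the sample output lines are made up for illustration.

// Standalone sketch (assumed names, not from CliSuite) of a plain var guarded by
// lock.synchronized, with a Promise completed once all expected prefixes are seen in order.
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object ExpectedOutputWatcher extends App {
  // Expected answers must appear in this order; in this demo they all come from stdout.
  val expectedAnswers = Seq("spark-sql>", "OK")

  var next = 0                              // plain var instead of an AtomicInteger
  val buffer = new ArrayBuffer[String]()    // shared log of every captured line
  val lock = new Object                     // guards both `next` and `buffer`
  val foundAllExpectedAnswers = Promise[Unit]()

  // Called from two reader threads, hence the synchronized block.
  def feedLine(source: String)(line: String): Unit = lock.synchronized {
    buffer += s"$source> $line"
    if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
      next += 1
      if (next == expectedAnswers.size) {
        foundAllExpectedAnswers.trySuccess(())
      }
    }
  }

  // Simulate a forked process whose stdout and stderr are drained on separate threads.
  val stdoutThread = new Thread(new Runnable {
    def run(): Unit = Seq("spark-sql> SELECT 1;", "OK").foreach(feedLine("stdout"))
  })
  val stderrThread = new Thread(new Runnable {
    def run(): Unit = Seq("Time taken: 0.1 seconds").foreach(feedLine("stderr"))
  })
  stdoutThread.start()
  stderrThread.start()

  Await.result(foundAllExpectedAnswers.future, 10.seconds)
  stdoutThread.join()
  stderrThread.join()
  println(buffer.mkString("\n"))
}

The design point mirrors the diff: since every read and write of `next` and `buffer` goes through the same lock, the simpler var is as safe as the AtomicInteger it replaces, and the startsWith check and the increment now happen atomically together, which the get()/incrementAndGet() pair did not guarantee.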