@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.hadoop.mapred.{JobConf, JobID}

- import org.apache.spark.{SparkConf, TaskContext}
+ import org.apache.spark.{CarmelConcurrentModifiedException, SparkConf, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.OutputMetrics
import org.apache.spark.internal.Logging
@@ -102,12 +102,29 @@ object SparkHadoopWriterUtils extends Logging {
    new Path(parent, s"_WRITING_$diff")
  }

+  def withLock[T](fs: FileSystem, path: Path, waitTime: Int, info: String)(f: => T): T = {
+    var lockFileOut: Option[FSDataOutputStream] = None
+    try {
+      lockFileOut = tryToLockFile(fs,
+        path, true, waitTime, info)
+      if (waitTime < 0 || lockFileOut.nonEmpty) {
+        f
+      } else {
+        throw CarmelConcurrentModifiedException(path)
+      }
+    } finally {
+      if (waitTime >= 0 && lockFileOut.nonEmpty) {
+        releaseLockFile(fs, lockFileOut.get, path)
+      }
+    }
+  }
+
  def tryToLockFile(fs: FileSystem, path: Path, retry: Boolean,
      waitTime: Int, info: String = ""): Option[FSDataOutputStream] = {
    if (waitTime < 0) return null
    val startTime = System.currentTimeMillis()
    log.info(s"[compact info] $info try to lock path $path")
-   val loc = tryToLockFileInternal(fs, path, retry, waitTime)
+   val loc = tryToLockFileInternal(fs, path, retry, waitTime, info)
    val durationMS = System.currentTimeMillis() - startTime
    if (loc.isDefined) {
      log.info(s"[compact info] $info got lock in $durationMS ms for path $path")
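Side note: the new `withLock` helper is a bracket-style wrapper over the existing lock-file primitives. A minimal usage sketch follows; the filesystem, path, wait time, and job tag are illustrative assumptions, not part of this patch:

// Hypothetical caller: runs the block only while the lock file is held,
// relying on withLock's finally clause to release it afterwards.
val fs = FileSystem.get(new Configuration())
val tablePath = new Path("/warehouse/db/tbl")  // assumed target directory
val fileCount = SparkHadoopWriterUtils.withLock(fs, tablePath, 60000, "compact-job") {
  fs.listStatus(tablePath).length              // critical section
}

If the lock cannot be acquired within `waitTime`, `withLock` throws `CarmelConcurrentModifiedException(path)`; a negative `waitTime` skips locking and runs the block directly.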
@@ -125,11 +142,15 @@ object SparkHadoopWriterUtils extends Logging {
  }

  private def tryToLockFileInternal(fs: FileSystem, path: Path, retry: Boolean,
-     waitTime: Int): Option[FSDataOutputStream] = {
+     waitTime: Int, info: String): Option[FSDataOutputStream] = {
    if (waitTime < 0) return null
    val targetPath = getLockFilePath(path)
    val sleepTime = 1000
-   var retryNum = waitTime / sleepTime + 1
+   var retryNum = if (retry) {
+     waitTime / sleepTime + 1
+   } else {
+     1
+   }
    var ret: Option[FSDataOutputStream] = None
    while (retryNum > 0) {
      try {
@@ -142,11 +163,13 @@
        }
      } catch {
        case ex: Throwable =>
-         logError(s"Failed to lock file $path in $retryNum attempt: ${ex.getMessage}")
+         logError(s"[compact info] $info Failed to lock file $path in $retryNum" +
+           s" attempt: ${ex.getMessage}")
      }

      if (retryNum > 1) {
-       logWarning(s"Failed to lock file for $targetPath, ${retryNum - 1} retries left, " +
+       logWarning(s"[compact info] $info Failed to lock file for $targetPath, " +
+         s"${retryNum - 1} retries left, " +
          s"sleep for $sleepTime ms")
        Thread.sleep(sleepTime)
      }
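Also worth noting: with `retry` now honored, the attempt budget is `waitTime / sleepTime + 1` (one probe per second of wait, plus the initial attempt) when retrying, and exactly one attempt otherwise. A small self-contained sketch of that arithmetic, assuming the hard-coded 1000 ms sleep from the patch:

// Mirrors the retryNum computation introduced above (illustrative helper, not in the patch).
def lockAttempts(retry: Boolean, waitTime: Int, sleepTime: Int = 1000): Int =
  if (retry) waitTime / sleepTime + 1 else 1

assert(lockAttempts(retry = true, waitTime = 5000) == 6)   // initial try + 5 one-second retries
assert(lockAttempts(retry = false, waitTime = 5000) == 1)  // single attempt when retry is off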