
Commit ffffb53

Changed interface to use zip stream. Added more tests.
1 parent 1100b40 commit ffffb53

File tree: 9 files changed, +144 −117 lines

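The change is easiest to see from the caller's side: writeEventLogs now takes a java.util.zip.ZipOutputStream instead of a raw OutputStream, so callers wrap their output before handing it to the provider. A minimal sketch of that pattern, mirroring EventLogDownloadResource.scala below (the downloadLogs helper and rawOut stream are illustrative only, not part of this commit):

import java.io.OutputStream
import java.util.zip.ZipOutputStream

// Hypothetical helper: wrap the raw output in a ZipOutputStream and let the
// history provider add one ZipEntry per event log file.
def downloadLogs(
    provider: ApplicationHistoryProvider,
    appId: String,
    rawOut: OutputStream): Unit = {
  val zipStream = new ZipOutputStream(rawOut)
  try {
    provider.writeEventLogs(appId, None, zipStream) // None = all attempts
  } finally {
    zipStream.close()
  }
}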

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

Lines changed: 2 additions & 2 deletions

@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.OutputStream
+import java.util.zip.ZipOutputStream
 
 import org.apache.spark.SparkException
 import org.apache.spark.ui.SparkUI
@@ -71,6 +71,6 @@ private[history] abstract class ApplicationHistoryProvider {
    * @throws SparkException if the logs for the app id cannot be found.
    */
   @throws(classOf[SparkException])
-  def writeEventLogs(appId: String, attemptId: Option[String], outputStream: OutputStream): Unit
+  def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit
 
 }
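For context, a provider implementing the new signature writes entries directly into the supplied stream. A hedged sketch of what a trivial implementation could look like (the InMemoryHistoryProvider class and its eventLogBytes map are hypothetical, not part of Spark):

import java.util.zip.{ZipEntry, ZipOutputStream}

// Hypothetical provider: each in-memory "log file" becomes one ZipEntry.
class InMemoryHistoryProvider(eventLogBytes: Map[String, Array[Byte]]) {
  def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit = {
    eventLogBytes.foreach { case (name, bytes) =>
      zipStream.putNextEntry(new ZipEntry(name))
      zipStream.write(bytes)
      zipStream.closeEntry()
    }
  }
}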

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 40 additions & 26 deletions

@@ -17,15 +17,14 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{OutputStream, FileOutputStream, File, BufferedInputStream,
-  FileNotFoundException, IOException, InputStream}
+import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
-import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
 
 import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
@@ -225,43 +224,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   override def writeEventLogs(
       appId: String,
       attemptId: Option[String],
-      outputStream: OutputStream): Unit = {
+      zipStream: ZipOutputStream): Unit = {
+
+    /**
+     * This method compresses the files passed in, and writes the compressed data out into the
+     * [[OutputStream]] passed in. Each file is written as a new [[ZipEntry]] with its name being
+     * the name of the file being compressed.
+     */
+    def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = {
+      val fs = FileSystem.get(hadoopConf)
+      val buffer = new Array[Byte](64 * 1024)
+      val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer
+      try {
+        outputStream.putNextEntry(new ZipEntry(entryName))
+        var dataRemaining = true
+        while (dataRemaining) {
+          val length = inputStream.read(buffer)
+          if (length != -1) {
+            outputStream.write(buffer, 0, length)
+          } else {
+            dataRemaining = false
+          }
+        }
+        outputStream.closeEntry()
+      } finally {
+        inputStream.close()
+      }
+    }
 
     applications.get(appId) match {
       case Some(appInfo) =>
-        val dirsToClear = new mutable.ArrayBuffer[File]()
         try {
           // If no attempt is specified, or there is no attemptId for attempts, return all attempts
-          val pathsToZip = appInfo.attempts.filter { attempt =>
+          appInfo.attempts.filter { attempt =>
            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
-          }.map { attempt =>
+          }.foreach { attempt =>
            val logPath = new Path(logDir, attempt.logPath)
+            // If this is a legacy directory, then add the directory to the zipStream and add
+            // each file to that directory.
            if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
-              val localDir = Utils.createTempDir()
-              Utils.chmod700(localDir)
-              dirsToClear += localDir
-              val outputFile = new File(localDir, logPath.getName)
-              val outputStream = new FileOutputStream(outputFile)
              val files = fs.listFiles(logPath, false)
-              val paths = new mutable.ArrayBuffer[Path]()
+              zipStream.putNextEntry(new ZipEntry(attempt.logPath + "/"))
+              zipStream.closeEntry()
              while (files.hasNext) {
-                paths += files.next().getPath
+                val file = files.next().getPath
+                zipFileToStream(file, attempt.logPath + Path.SEPARATOR + file.getName, zipStream)
              }
-              Utils.zipFilesToStream(paths, hadoopConf, outputStream)
-              new Path(outputFile.toURI)
            } else {
-              new Path(logDir, attempt.logPath)
+              zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
            }
          }
-          Utils.zipFilesToStream(pathsToZip, hadoopConf, outputStream)
        } finally {
-          dirsToClear.foreach { dir =>
-            try {
-              Utils.deleteRecursively(dir)
-            } catch {
-              case NonFatal(e) => logWarning(s"Error while attempting to delete $dir.")
-            }
-          }
+          zipStream.close()
        }
      case None => throw new SparkException(s"Logs for $appId not found.")
    }
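The legacy branch above first emits a directory entry (attempt.logPath + "/") and then one entry per file inside it, while new-style logs become single top-level entries. A small sketch of inspecting an archive produced this way (the file name eventLogs.zip is only an example):

import java.io.FileInputStream
import java.util.zip.ZipInputStream

// List the entry names written by writeEventLogs; legacy apps appear as a
// "<log-dir>/" directory entry plus "<log-dir>/<file>" entries, new-style apps as flat entries.
val zis = new ZipInputStream(new FileInputStream("eventLogs.zip"))
try {
  var entry = zis.getNextEntry
  while (entry != null) {
    println(s"${entry.getName} (directory: ${entry.isDirectory})")
    entry = zis.getNextEntry
  }
} finally {
  zis.close()
}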

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 3 additions & 3 deletions

@@ -17,8 +17,8 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.OutputStream
 import java.util.NoSuchElementException
+import java.util.zip.ZipOutputStream
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
 import com.google.common.cache._
@@ -177,8 +177,8 @@ class HistoryServer(
   override def writeEventLogs(
       appId: String,
       attemptId: Option[String],
-      outputStream: OutputStream): Unit = {
-    provider.writeEventLogs(appId, attemptId, outputStream)
+      zipStream: ZipOutputStream): Unit = {
+    provider.writeEventLogs(appId, attemptId, zipStream)
   }
 
   /**

core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala

Lines changed: 2 additions & 2 deletions

@@ -16,7 +16,7 @@
  */
 package org.apache.spark.status.api.v1
 
-import java.io.OutputStream
+import java.util.zip.ZipOutputStream
 import javax.servlet.ServletContext
 import javax.ws.rs._
 import javax.ws.rs.core.{Context, Response}
@@ -208,7 +208,7 @@ private[spark] trait UIRoot {
   def writeEventLogs(
       appId: String,
       attemptId: Option[String],
-      outputStream: OutputStream): Unit = { }
+      zipStream: ZipOutputStream): Unit = { }
 
   /**
    * Get the spark UI with the given appID, and apply a function

core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala

Lines changed: 10 additions & 1 deletion

@@ -17,6 +17,7 @@
 package org.apache.spark.status.api.v1
 
 import java.io.OutputStream
+import java.util.zip.ZipOutputStream
 import javax.ws.rs.{GET, Produces}
 import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
 
@@ -46,7 +47,15 @@ private[v1] class EventLogDownloadResource(
     }
 
     val stream = new StreamingOutput {
-      override def write(output: OutputStream) = hs.writeEventLogs(appId, attemptId, output)
+      override def write(output: OutputStream) = {
+        val zipStream = new ZipOutputStream(output)
+        try {
+          hs.writeEventLogs(appId, attemptId, zipStream)
+        } finally {
+          zipStream.close()
+        }
+
+      }
     }
 
     Response.ok(stream)
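With this resource in place, the zip arrives over the existing REST endpoint (applications/<app-id>/logs, or applications/<app-id>/<attempt-id>/logs for a single attempt, as exercised by HistoryServerSuite below). A rough client-side sketch, assuming a history server on the default port 18080 and the app id used in the tests:

import java.io.{BufferedInputStream, FileOutputStream}
import java.net.URL

// Stream the zipped event logs from the history server to a local file.
val url = new URL("http://localhost:18080/api/v1/applications/local-1430917381535/logs")
val in = new BufferedInputStream(url.openStream())
val out = new FileOutputStream("eventLogs.zip")
try {
  val buffer = new Array[Byte](64 * 1024)
  var n = in.read(buffer)
  while (n != -1) {
    out.write(buffer, 0, n)
    n = in.read(buffer)
  }
} finally {
  in.close()
  out.close()
}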

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 0 additions & 37 deletions

@@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
 import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
-import java.util.zip.{ZipEntry, ZipOutputStream}
 import java.util.concurrent._
 import javax.net.ssl.HttpsURLConnection
 
@@ -777,42 +776,6 @@ private[spark] object Utils extends Logging {
     localRootDirs = null
   }
 
-  /**
-   * This method compresses the files passed in, and writes the compressed data out into the
-   * [[OutputStream]] passed in. Each file is written as a new [[ZipEntry]] with its name being
-   * the name of the file being compressed.
-   */
-  private[spark] def zipFilesToStream(
-      files: Seq[Path],
-      hadoopConf: Configuration,
-      outputStream: OutputStream): Unit = {
-    val fs = FileSystem.get(hadoopConf)
-    val buffer = new Array[Byte](64 * 1024)
-    val zipStream = Some(new ZipOutputStream(outputStream))
-    try {
-      files.foreach { remotePath =>
-        val inputStream = Some(fs.open(remotePath, 1 * 1024 * 1024)) // 1MB Buffer
-        try {
-          zipStream.get.putNextEntry(new ZipEntry(remotePath.getName))
-          var dataRemaining = true
-          while (dataRemaining) {
-            val length = inputStream.get.read(buffer)
-            if (length != -1) {
-              zipStream.get.write(buffer, 0, length)
-            } else {
-              dataRemaining = false
-            }
-          }
-          zipStream.get.closeEntry()
-        } finally {
-          inputStream.foreach(_.close())
-        }
-      }
-    } finally {
-      zipStream.foreach(_.close())
-    }
-  }
-
   /**
    * Shuffle the elements of a collection into a random order, returning the
    * result in a new collection. Unlike scala.util.Random.shuffle, this method

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 8 additions & 5 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.deploy.history
 import java.io.{BufferedOutputStream, FileInputStream, File, FileOutputStream, OutputStreamWriter}
 import java.net.URI
 import java.util.concurrent.TimeUnit
+import java.util.zip.ZipOutputStream
 
 import scala.io.Source
 
@@ -356,13 +357,15 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
       Utils.chmod700(outDir)
       Utils.chmod700(unzipDir)
       val outFile = new File(outDir, s"file$i.zip")
-      val outputStream = new FileOutputStream(outFile)
+      val outputStream = new ZipOutputStream(new FileOutputStream(outFile))
       provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream)
       HistoryTestUtils.unzipToDir(new FileInputStream(outFile), unzipDir)
-      unzipDir.listFiles().foreach { log =>
-        val inFile = logs.find(_.getName == log.getName).get
-        val expStream = new FileInputStream(inFile)
-        val resultStream = new FileInputStream(log)
+      val actualFiles = unzipDir.listFiles()
+      assert(actualFiles.length == 1)
+      actualFiles.foreach { actualFile =>
+        val expFile = logs.find(_.getName == actualFile.getName).get
+        val expStream = new FileInputStream(expFile)
+        val resultStream = new FileInputStream(actualFile)
         try {
           val input = IOUtils.toString(expStream)
           val out = IOUtils.toString(resultStream)

core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala

Lines changed: 52 additions & 28 deletions

@@ -16,14 +16,10 @@
  */
 package org.apache.spark.deploy.history
 
-import java.io.{ BufferedOutputStream, FileOutputStream, File, FileInputStream,
-  FileWriter, InputStream, IOException}
+import java.io.{File, FileInputStream, FileWriter, InputStream, IOException}
 import java.net.{HttpURLConnection, URL}
-import java.util.zip.ZipInputStream
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
-import scala.util.control.NonFatal
-
 import org.apache.commons.io.{FileUtils, IOUtils}
 import org.mockito.Mockito.when
 import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
@@ -153,52 +149,80 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with
   }
 
   test("download all logs for app with multiple attempts") {
-    doDownloadTest(None)
+    doDownloadTest("local-1430917381535", None)
   }
 
   test("download one log for app with multiple attempts") {
-    (1 to 2).foreach { attemptId => doDownloadTest(Some(attemptId)) }
+    (1 to 2).foreach { attemptId => doDownloadTest("local-1430917381535", Some(attemptId)) }
+  }
+
+  test("download legacy logs - all attempts") {
+    doDownloadTest("local-1426533911241", None, legacy = true)
+  }
+
+  test("download legacy logs - single attempts") {
+    (1 to 2). foreach {
+      attemptId => doDownloadTest("local-1426533911241", Some(attemptId), legacy = true)
+    }
   }
 
   // Test that the files are downloaded correctly, and validate them.
-  def doDownloadTest(attemptId: Option[Int]): Unit = {
+  def doDownloadTest(appId: String, attemptId: Option[Int], legacy: Boolean = false): Unit = {
+
+    def validateFile(expStream: FileInputStream, actualStream: FileInputStream): Unit = {
+      try {
+        val expected = IOUtils.toString(expStream)
+        val actual = IOUtils.toString(actualStream)
+        actual should be(expected)
+      } finally {
+        Seq(expStream, actualStream).foreach { s =>
+          Utils.tryWithSafeFinally(s.close())()
+        }
+      }
+    }
 
     val url = attemptId match {
       case Some(id) =>
-        new URL(s"${generateURL("applications/local-1430917381535")}/$id/logs")
+        new URL(s"${generateURL(s"applications/$appId")}/$id/logs")
       case None =>
-        new URL(s"${generateURL("applications/local-1430917381535")}/logs")
+        new URL(s"${generateURL(s"applications/$appId")}/logs")
     }
 
     val (code, inputStream, error) = HistoryServerSuite.connectAndGetInputStream(url)
     code should be (HttpServletResponse.SC_OK)
     inputStream should not be None
     error should be (None)
 
-    def validateFile(fileName: String, tempDir: File): Unit = {
-      val inStream = new FileInputStream(new File(logDir, fileName))
-      val outStream = new FileInputStream(new File(tempDir, fileName))
-      try {
-        val exp = IOUtils.toString(inStream)
-        val input = IOUtils.toString(outStream)
-        input should be(exp)
-      } finally {
-        Seq(inStream, outStream).foreach { s =>
-          Utils.tryWithSafeFinally(s.close())()
-        }
-      }
-    }
-
     val dir = Utils.createTempDir()
     try {
       Utils.chmod700(dir)
       HistoryTestUtils.unzipToDir(inputStream.get, dir)
-      val files = dir.listFiles()
+      val unzippedContent = dir.listFiles()
       attemptId match {
-        case Some(_) => files.length should be (1)
-        case None => files.length should be (2)
+        case Some(_) => unzippedContent.length should be (1)
+        case None => unzippedContent.length should be (2)
+      }
+
+      // If these are legacy files, then each of the unzipped contents is actually a legacy log dir.
+      if (legacy) {
+        unzippedContent.foreach { legacyDir =>
+          assert(legacyDir.isDirectory)
+          val logFiles = legacyDir.listFiles()
+          logFiles.length should be (3)
+          logFiles.foreach { f =>
+            val actualStream = new FileInputStream(f)
+            val expectedStream =
+              new FileInputStream(new File(new File(logDir, legacyDir.getName), f.getName))
+            validateFile(expectedStream, actualStream)
+          }
+        }
+      } else {
+        unzippedContent.foreach { f =>
+          val actualStream = new FileInputStream(f)
+          val expectedStream = new FileInputStream(new File(logDir, f.getName))
+          validateFile(expectedStream, actualStream)
+        }
       }
-      validateFile(files.head.getName, dir)
     } finally {
      Utils.deleteRecursively(dir)
    }