Update to storage-manager release #343

Merged · 5 commits · Jul 31, 2018
Changes from all commits
3 changes: 2 additions & 1 deletion storage_manager/README.md
@@ -178,7 +178,8 @@ In this example, we extend the one we've seen above in 3., adding a `having` cla
{ "name": "col1" },
{ "name": "col2" },
{
"max": { "name": "col3", "alias": "max_col3" }
"max": { "name": "col3" },
"alias": "max_col3"
}, {
"count": { "name": "*" }
}
5 changes: 4 additions & 1 deletion storage_manager/app/api/DatasetControllerAPI.scala
@@ -48,7 +48,10 @@ trait DatasetControllerAPI {
format: String,
@ApiParam(value = "the method used to perform the data conversions", required = false, defaultValue = "quick")
@QueryParam("method")
method: String): Action[AnyContent]
method: String,
@ApiParam(value = "the maximum number of rows returned by this request", required = false)
@QueryParam("limit")
limit: Option[Int]): Action[AnyContent]

@ApiOperation(
value = "Get a dataset based on the dataset id",
44 changes: 24 additions & 20 deletions storage_manager/app/config/FileExportConfig.scala
@@ -28,6 +28,7 @@ import scala.concurrent.duration._
* @param sizeThreshold the minimum dataset size in KB before a download is carried out by export
* @param exportTimeout the maximum amount of time an export is allowed to take before it is killed
* @param exportPath the base path where to store export results
* @param keepAliveTimeout the minimum idle time to wait before triggering a keep-alive job
* @param livyHost the hostname (:port) for the livy server
* @param livyAuth the auth information for the livy client
* @param livyAppJars which additional jars to add to the livy client for upload to the server
@@ -39,6 +40,7 @@ case class FileExportConfig(numSessions: Int,
sizeThreshold: Int,
exportTimeout: FiniteDuration,
exportPath: String,
keepAliveTimeout: FiniteDuration,
livyHost: String,
livyAuth: Option[String],
livySSL: Boolean,
@@ -80,27 +82,29 @@ object FileExportConfig {
}

private def readExportValues = for {
numSessions <- Read.int { "num_sessions" } default 1
sizeThreshold <- Read.int { "size_threshold" } default 5120
exportTimeout <- Read.time { "timeout" } default 10.minutes
exportPath <- Read.string { "export_path" }.!
livyHost <- Read.string { "livy.host" }.!
livyAuth <- Read.string { "livy.auth" }
livySSL <- Read.boolean { "livy.ssl" } default true
livyAppJars <- Read.strings { "livy.jars" } default List.empty[String]
livyProperties <- readLivyConfig ~> readLivyProperties()
cleanup <- FileExportCleanupConfig.read
numSessions <- Read.int { "num_sessions" } default 1
sizeThreshold <- Read.int { "size_threshold" } default 5120
exportTimeout <- Read.time { "timeout" } default 10.minutes
exportPath <- Read.string { "export_path" }.!
keepAliveTimeout <- Read.time { "keep_alive_timeout" } default 30.minutes
livyHost <- Read.string { "livy.host" }.!
livyAuth <- Read.string { "livy.auth" }
livySSL <- Read.boolean { "livy.ssl" } default true
livyAppJars <- Read.strings { "livy.jars" } default List.empty[String]
livyProperties <- readLivyConfig ~> readLivyProperties()
cleanup <- FileExportCleanupConfig.read
} yield FileExportConfig(
numSessions = numSessions,
sizeThreshold = sizeThreshold,
exportTimeout = exportTimeout,
exportPath = exportPath,
livyHost = livyHost,
livyAuth = livyAuth,
livySSL = livySSL,
livyAppJars = livyAppJars,
livyProperties = livyProperties,
cleanup = cleanup
numSessions = numSessions,
sizeThreshold = sizeThreshold,
exportTimeout = exportTimeout,
exportPath = exportPath,
keepAliveTimeout = keepAliveTimeout,
livyHost = livyHost,
livyAuth = livyAuth,
livySSL = livySSL,
livyAppJars = livyAppJars,
livyProperties = livyProperties,
cleanup = cleanup
)

/**
12 changes: 7 additions & 5 deletions storage_manager/app/controllers/DatasetController.scala
@@ -75,8 +75,10 @@ class DatasetController @Inject()(configuration: Configuration,
case Failure(error) => throw ConfigReadException(s"Unable to configure [impala-jdbc]", error)
}

private def defaultLimit = Read.int { "daf.row_limit" }.read(configuration) getOrElse None

protected val datasetService = new DatasetService(configuration.underlying)
protected val queryService = new JdbcQueryService(impalaConfig) with ImpalaTransactorInstance
protected val queryService = new JdbcQueryService(impalaConfig, defaultLimit) with ImpalaTransactorInstance
protected val downloadService = new DownloadService(kuduMaster)
protected val fileExportService = new FileExportService(exportConfig, kuduMaster)

@@ -87,8 +89,8 @@ class DatasetController @Inject()(configuration: Configuration,
params <- DatasetParams.fromCatalog(catalog)
} yield params

private def retrieveBulkData(uri: String, auth: String, userId: String, targetFormat: FileDataFormat, method: DownloadMethod) = retrieveCatalog(auth, uri) match {
case Success(params) => download(params, userId, targetFormat, method)
private def retrieveBulkData(uri: String, auth: String, userId: String, targetFormat: FileDataFormat, method: DownloadMethod, limit: Option[Int]) = retrieveCatalog(auth, uri) match {
case Success(params) => download(params, userId, targetFormat, method, limit)
case Failure(error) => Future.failed { error }
}

@@ -116,11 +118,11 @@ class DatasetController @Inject()(configuration: Configuration,
} yield Ok { schema.prettyJson } as JSON
}

def getDataset(uri: String, format: String = "json", method: String = "quick"): Action[AnyContent] = Actions.basic.securedAsync { (_, auth, userId) =>
def getDataset(uri: String, format: String = "json", method: String = "quick", limit: Option[Int] = None): Action[AnyContent] = Actions.basic.securedAsync { (_, auth, userId) =>
for {
targetFormat <- checkTargetFormat[Future](format)
downloadMethod <- checkDownloadMethod[Future](method)
result <- retrieveBulkData(uri, auth, userId, targetFormat, downloadMethod)
result <- retrieveBulkData(uri, auth, userId, targetFormat, downloadMethod, limit)
} yield result
}

6 changes: 3 additions & 3 deletions storage_manager/app/controllers/DatasetExport.scala
@@ -27,9 +27,9 @@ import scala.util.{ Failure, Success }

trait DatasetExport { this: DatasetController =>

protected def prepareDirect(params: DatasetParams, targetFormat: FileDataFormat) = targetFormat match {
case JsonFileFormat => datasetService.jsonData(params)
case CsvFileFormat => datasetService.csvData(params)
protected def prepareDirect(params: DatasetParams, targetFormat: FileDataFormat, limit: Option[Int]) = targetFormat match {
case JsonFileFormat => datasetService.jsonData(params, limit)
case CsvFileFormat => datasetService.csvData(params, limit)
case _ => Failure { new IllegalArgumentException("Unable to prepare download; only CSV and JSON are permitted") }
}

14 changes: 7 additions & 7 deletions storage_manager/app/controllers/DownloadExecution.scala
@@ -54,18 +54,18 @@ trait DownloadExecution { this: DatasetController with DatasetExport with FileSy
case Failure(error) => Future.failed { error }
}

private def doQuickFile(params: DatasetParams, targetFormat: FileDataFormat) = prepareDirect(params, targetFormat).map { respond(_, params.name, targetFormat) }.~>[Future]
private def doQuickFile(params: DatasetParams, targetFormat: FileDataFormat, limit: Option[Int]) = prepareDirect(params, targetFormat, limit).map { respond(_, params.name, targetFormat) }.~>[Future]

private def quickFileDownload(params: FileDatasetParams, userId: String, targetFormat: FileDataFormat) = retrieveFileInfo(params.path, userId) match {
case Success(pathInfo) if pathInfo.estimatedSize <= exportConfig.sizeThreshold => doQuickFile(params, targetFormat)
private def quickFileDownload(params: FileDatasetParams, userId: String, targetFormat: FileDataFormat, limit: Option[Int]) = retrieveFileInfo(params.path, userId) match {
case Success(pathInfo) if pathInfo.estimatedSize <= exportConfig.sizeThreshold => doQuickFile(params, targetFormat, limit)
case Success(pathInfo) => failQuickDownload(params, targetFormat)
case Failure(error) => Future.failed { error }
}

// API

protected def quickDownload(params: DatasetParams, userId: String, targetFormat: FileDataFormat) = params match {
case fileParams: FileDatasetParams => quickFileDownload(fileParams, userId, targetFormat)
protected def quickDownload(params: DatasetParams, userId: String, targetFormat: FileDataFormat, limit: Option[Int] = None) = params match {
case fileParams: FileDatasetParams => quickFileDownload(fileParams, userId, targetFormat, limit)
case kuduParams: KuduDatasetParams => failQuickDownload(kuduParams, targetFormat) // no quick download option for kudu
}

@@ -74,8 +74,8 @@ trait DownloadExecution { this: DatasetController with DatasetExport with FileSy
case fileParams: FileDatasetParams => doFileExport(fileParams, userId, targetFormat).map { respond(_, fileParams.name, targetFormat) }
}

protected def download(params: DatasetParams, userId: String, targetFormat: FileDataFormat, method: DownloadMethod) = method match {
case QuickDownloadMethod => quickDownload(params, userId, targetFormat)
protected def download(params: DatasetParams, userId: String, targetFormat: FileDataFormat, method: DownloadMethod, limit: Option[Int] = None) = method match {
case QuickDownloadMethod => quickDownload(params, userId, targetFormat, limit)
case BatchDownloadMethod => batchDownload(params, userId, targetFormat)
}

13 changes: 6 additions & 7 deletions storage_manager/app/controllers/KuduController.scala
@@ -26,15 +26,14 @@ class KuduController(sparkSession: SparkSession, master: String) {

val alogger: Logger = LoggerFactory.getLogger(this.getClass)

def readData(table: String): Try[DataFrame] = {
Try{sparkSession
def readData(table: String): Try[DataFrame] = Try{
sparkSession
.sqlContext
.read
.options(Map("kudu.master" -> master, "kudu.table" -> table)).kudu
} recoverWith {
case ex =>
alogger.error(s"Exception ${ex.getMessage}\n ${ex.getStackTrace.mkString("\n")} ")
Failure(ex)
}
}.recoverWith {
case ex =>
alogger.error(s"Exception ${ex.getMessage}\n ${ex.getStackTrace.mkString("\n")} ")
Failure(ex)
}
}
27 changes: 19 additions & 8 deletions storage_manager/app/controllers/PhysicalDatasetController.scala
@@ -20,31 +20,38 @@ import cats.syntax.show.toShow
import com.typesafe.config.Config
import daf.dataset.{ DatasetParams, FileDatasetParams, KuduDatasetParams }
import daf.filesystem.fileFormatShow
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{ DataFrame, SparkSession }
import org.apache.spark.SparkConf
import org.slf4j.{ Logger, LoggerFactory }

class PhysicalDatasetController(sparkSession: SparkSession,
kuduMaster: String,
defaultLimit: Int = 1000,
defaultLimit: Option[Int] = None,
defaultChunkSize: Int = 0) {

lazy val kuduController = new KuduController(sparkSession, kuduMaster)
lazy val hdfsController = new HDFSController(sparkSession)

val logger: Logger = LoggerFactory.getLogger(this.getClass)

def kudu(params: KuduDatasetParams, limit: Int = defaultLimit) = {
private def addLimit(dataframe: DataFrame, limit: Option[Int]) = (limit, defaultLimit) match {
case (None, None) => dataframe
case (None, Some(value)) => dataframe.limit { value }
case (Some(value), None) => dataframe.limit { value }
case (Some(value), Some(default)) => dataframe.limit { math.min(value, default) }
Member commented:

Why take the min here? Wouldn't it be a little misleading for the user to get fewer rows than the parameter they submitted (if the defaultLimit is smaller than the limit)?

Collaborator (Author) replied:

The min is a defensive measure: the optional default limit set in the configuration defines the absolute maximum number of rows that the quick APIs are allowed to return.

The problem is that in certain situations it will not be clear whether the number of rows returned is the actual result size or just the server's maximum, but I don't think there is another way to set a bound, and there is also no clear way to relay this information to the user.

Note that there is always a check on the size of the data before we run the quick methods, so in theory this is an optional last line of defence on the server's part.
}
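For illustration only (this sketch is not part of the diff), the capping rule discussed in the thread above behaves roughly as follows; effectiveLimit is a hypothetical name used just for this example:

// Combine the caller's requested limit with the configured server-side ceiling.
def effectiveLimit(requested: Option[Int], serverDefault: Option[Int]): Option[Int] =
  (requested, serverDefault) match {
    case (Some(r), Some(d)) => Some(math.min(r, d)) // requests above the ceiling are silently capped at d
    case (Some(r), None)    => Some(r)              // no ceiling configured: honour the request as-is
    case (None, ceiling)    => ceiling              // no request: fall back to the ceiling, if any
  }

// e.g. effectiveLimit(Some(5000), Some(1000)) == Some(1000)
//      effectiveLimit(Some(200),  Some(1000)) == Some(200)
//      effectiveLimit(None,       Some(1000)) == Some(1000)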

def kudu(params: KuduDatasetParams, limit: Option[Int] = None) = {
logger.debug { s"Reading data from kudu table [${params.table}]" }
kuduController.readData(params.table).map { _ limit math.min(limit, defaultLimit) }
kuduController.readData(params.table).map { addLimit(_, limit) }
}

def hdfs(params: FileDatasetParams, limit: Int = defaultLimit) = {
def hdfs(params: FileDatasetParams, limit: Option[Int] = None) = {
logger.debug { s"Reading data from hdfs at path [${params.path}]" }
hdfsController.readData(params.path, params.format.show, params.param("separator")).map { _ limit math.min(limit, defaultLimit) }
hdfsController.readData(params.path, params.format.show, params.param("separator")).map { addLimit(_, limit) }
}

def get(params: DatasetParams, limit: Int = defaultLimit) = params match {
def get(params: DatasetParams, limit: Option[Int] = None) = params match {
case kuduParams: KuduDatasetParams => kudu(kuduParams, limit)
case hdfsParams: FileDatasetParams => hdfs(hdfsParams, limit)
}
@@ -80,8 +87,12 @@ object PhysicalDatasetController {

val kuduMaster = configuration.getString("kudu.master")

val defaultLimit = if (configuration hasPath "daf.row_limit") Some {
configuration.getInt("daf.row_limit")
} else None

System.setProperty("sun.security.krb5.debug", "true")

new PhysicalDatasetController(sparkSession, kuduMaster)
new PhysicalDatasetController(sparkSession, kuduMaster, defaultLimit)
}
}
8 changes: 4 additions & 4 deletions storage_manager/app/daf/dataset/DatasetService.scala
@@ -28,11 +28,11 @@ class DatasetService(config: Config) {

private val storageClient = PhysicalDatasetController(config)

def schema(params: DatasetParams): Try[StructType] = storageClient.get(params, 1).map { _.schema }
def schema(params: DatasetParams): Try[StructType] = storageClient.get(params, Some(1)).map { _.schema }

def data(params: DatasetParams): Try[DataFrame] = storageClient.get(params)
def data(params: DatasetParams, limit: Option[Int]): Try[DataFrame] = storageClient.get(params, limit)

def jsonData(params: DatasetParams) = data(params).map { json }
def jsonData(params: DatasetParams, limit: Option[Int]) = data(params, limit).map { json }

def json(dataFrame: DataFrame) = wrapJson {
Source[String] { dataFrame.toJSON.collect().toVector }
Expand All @@ -49,7 +49,7 @@ class DatasetService(config: Config) {
// }

// TODO: split code without breaking Spark task serialization
def csvData(params: DatasetParams) = data(params).map { csv }
def csvData(params: DatasetParams, limit: Option[Int]) = data(params, limit).map { csv }

def csv(dataFrame: DataFrame) = Source[String] {
dataFrame.schema.fieldNames.map { h => s""""$h"""" }.mkString(",") +:
25 changes: 21 additions & 4 deletions storage_manager/app/daf/dataset/export/FileExportActor.scala
@@ -20,14 +20,16 @@ import java.io.File
import java.net.URI
import java.util.{ Properties, UUID }

import akka.actor.{ Actor, Props }
import akka.actor.{ Actor, Props, ReceiveTimeout }
import config.FileExportConfig
import daf.dataset.ExtraParams
import daf.filesystem._
import org.apache.commons.net.util.Base64
import org.apache.hadoop.fs.{ FileSystem, FileUtil }
import org.apache.livy.LivyClientFactory
import org.slf4j.LoggerFactory

import scala.concurrent.duration.FiniteDuration
import scala.util.{ Failure, Success, Try }

/**
@@ -50,6 +52,8 @@ import scala.util.{ Failure, Success, Try }
* @param livyProps the `Properties` instance used to configure the livy client
* @param kuduMaster the connection string for the Kudu cluster master
* @param exportPath string representing the base path where to put the exported data
* @param keepAliveTimeout the minimum amount of idle time to wait before triggering a keep-alive, an inexpensive
* job that is submitted when livy has been idle, in order to keep the livy session alive
* @param fileSystem the `FileSystem` instance used for interaction
*/
class FileExportActor(livyFactory: LivyClientFactory,
Expand All @@ -60,8 +64,11 @@ class FileExportActor(livyFactory: LivyClientFactory,
livyProps: Properties,
kuduMaster: String,
exportPath: String,
keepAliveTimeout: FiniteDuration,
fileSystem: FileSystem) extends Actor {

private val logger = LoggerFactory.getLogger("it.gov.daf.ExportActor")

private val livyClientScheme = if (livySSL) "https" else "http"

private val livyUrl = livyAuth.map { auth => new String(Base64.decodeBase64(auth)) } match {
@@ -110,8 +117,14 @@ class FileExportActor(livyFactory: LivyClientFactory,
livyClient.run { QueryExportJob.create(query, outputPath, toFormat, params) }.get
}

private def keepAlive() = Try { livyClient.run { KeepAliveJob.init }.get } match {
case Success(duration) => logger.info { s"Refreshed livy session within [$duration] milliseconds after [${keepAliveTimeout.toMinutes}] minutes of inactivity" }
case Failure(error) => logger.warn(s"Failed to refresh session after [${keepAliveTimeout.toMinutes}] minutes of inactivity", error)
}

override def preStart() = {
livyAppJarURIs.foreach { livyClient.uploadJar(_).get }
context.setReceiveTimeout { keepAliveTimeout }
}

override def postStop() = {
@@ -123,6 +136,7 @@ class FileExportActor(livyFactory: LivyClientFactory,
case ExportFile(path, from, to, params) => sender ! submitFileExport(path, outputPath(path), from, to, params)
case ExportTable(name, to, params) => sender ! submitKuduExport(name, outputTable(name), to, params)
case ExportQuery(query, to, params) => sender ! submitQueryExport(query, outputQuery, to, params)
case ReceiveTimeout => keepAlive()
}

}
@@ -139,7 +153,8 @@ object FileExportActor {
exportServiceConfig.livyAppJars,
exportServiceConfig.livyProperties,
kuduMaster,
exportServiceConfig.exportPath
exportServiceConfig.exportPath,
exportServiceConfig.keepAliveTimeout
)

def props(livyFactory: LivyClientFactory,
@@ -149,7 +164,8 @@ object FileExportActor {
livyAppJars: Seq[String],
livyProps: Properties,
kuduMaster: String,
exportPath: String)(implicit fileSystem: FileSystem): Props = Props {
exportPath: String,
keepAliveTimeout: FiniteDuration)(implicit fileSystem: FileSystem): Props = Props {
new FileExportActor(
livyFactory,
livyHost,
@@ -159,6 +175,7 @@ object FileExportActor {
livyProps,
kuduMaster,
exportPath,
keepAliveTimeout,
fileSystem
)
}
@@ -169,4 +186,4 @@ sealed trait ExportMessage

case class ExportFile(path: String, sourceFormat: FileDataFormat, targetFormat: FileDataFormat, extraParams: Map[String, String] = Map.empty[String, String]) extends ExportMessage
case class ExportTable(name: String, targetFormat: FileDataFormat, extraParams: Map[String, String] = Map.empty[String, String]) extends ExportMessage
case class ExportQuery(query: String, targetFormat: FileDataFormat, extraParams: Map[String, String] = Map.empty[String, String]) extends ExportMessage
case class ExportQuery(query: String, targetFormat: FileDataFormat, extraParams: Map[String, String] = Map.empty[String, String]) extends ExportMessage