@@ -32,6 +32,8 @@ import org.apache.texera.dao.SqlServer
import org.apache.texera.dao.SqlServer.withTransaction
import org.apache.texera.dao.jooq.generated.enums.PrivilegeEnum
import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET
import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION
import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART
import org.apache.texera.dao.jooq.generated.tables.DatasetUserAccess.DATASET_USER_ACCESS
import org.apache.texera.dao.jooq.generated.tables.DatasetVersion.DATASET_VERSION
import org.apache.texera.dao.jooq.generated.tables.User.USER
@@ -53,25 +55,23 @@ import org.apache.texera.service.util.S3StorageClient.{
MAXIMUM_NUM_OF_MULTIPART_S3_PARTS,
MINIMUM_NUM_OF_MULTIPART_S3_PART
}
import org.jooq.{DSLContext, EnumType}
import org.jooq.exception.DataAccessException
import org.jooq.impl.DSL
import org.jooq.impl.DSL.{inline => inl}
import org.jooq.{DSLContext, EnumType}
import software.amazon.awssdk.services.s3.model.UploadPartResponse

import java.io.{InputStream, OutputStream}
import java.net.{HttpURLConnection, URL, URLDecoder}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.sql.SQLException
import java.util
import java.util.Optional
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION
import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART
import org.jooq.exception.DataAccessException
import software.amazon.awssdk.services.s3.model.UploadPartResponse

import java.sql.SQLException
import scala.util.Try

object DatasetResource {
@@ -80,6 +80,16 @@ object DatasetResource {
.getInstance()
.createDSLContext()

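/**
 * Reads the per-file upload cap from the texera_db.site_settings table (key
 * "single_file_upload_max_size_mib", value in MiB) and converts it to bytes.
 * A missing or unparsable value falls back to defaultMiB, so the default cap is
 * 20 * 1024 * 1024 = 20971520 bytes.
 */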
private def singleFileUploadMaxBytes(ctx: DSLContext, defaultMiB: Long = 20L): Long = {
val limit = ctx
.select(DSL.field("value", classOf[String]))
.from(DSL.table(DSL.name("texera_db", "site_settings")))
.where(DSL.field("key", classOf[String]).eq("single_file_upload_max_size_mib"))
.fetchOneInto(classOf[String])
Try(Option(limit).getOrElse(defaultMiB.toString).trim.toLong)
.getOrElse(defaultMiB) * 1024L * 1024L
}

/**
* Helper function to get the dataset from DB using did
*/
@@ -647,14 +657,16 @@ class DatasetResource {
@QueryParam("ownerEmail") ownerEmail: String,
@QueryParam("datasetName") datasetName: String,
@QueryParam("filePath") filePath: String,
@QueryParam("numParts") numParts: Optional[Integer],
@QueryParam("fileSizeBytes") fileSizeBytes: Optional[Long],
@QueryParam("partSizeBytes") partSizeBytes: Optional[Long],
@Auth user: SessionUser
): Response = {
val uid = user.getUid
val dataset: Dataset = getDatasetBy(ownerEmail, datasetName)

operationType.toLowerCase match {
case "init" => initMultipartUpload(dataset.getDid, filePath, numParts, uid)
case "init" =>
initMultipartUpload(dataset.getDid, filePath, fileSizeBytes, partSizeBytes, uid)
case "finish" => finishMultipartUpload(dataset.getDid, filePath, uid)
case "abort" => abortMultipartUpload(dataset.getDid, filePath, uid)
case _ =>
@@ -715,7 +727,55 @@ class DatasetResource {
if (session == null)
throw new NotFoundException("Upload session not found. Call type=init first.")

val expectedParts = session.getNumPartsRequested
val expectedParts: Int = session.getNumPartsRequested
val fileSizeBytesValue: Long = session.getFileSizeBytes
val partSizeBytesValue: Long = session.getPartSizeBytes

if (fileSizeBytesValue <= 0L) {
[Review thread]
Contributor: After failing to upload, the record should be deleted from the database. I suggest moving the error-catching logic into a function: if any check fails, remove the database records there, so the recycling logic is written only once (see the sketch after the Content-Length check below).

Author: Since this is the endpoint for uploading a part, no record is created in this endpoint at this moment, so no record should be deleted here in case of errors; the current logic relies on all the part rows being created during the init phase.

Author: Regarding the refactor, do you mean each check that throws an exception should have its own function?

throw new WebApplicationException(
s"Upload session has an invalid file size of $fileSizeBytesValue. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
if (partSizeBytesValue <= 0L) {
throw new WebApplicationException(
s"Upload session has an invalid part size of $partSizeBytesValue. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}

// lastPartSize = fileSize - partSize*(expectedParts-1)
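// Worked example (illustrative): fileSizeBytes = 12582912 (12 MiB), partSizeBytes = 5242880
// (5 MiB), expectedParts = 3 → prefixBytes = 2 * 5 MiB = 10485760, lastPartSize = 2097152
// (2 MiB), which passes the 1..partSizeBytesValue range check below.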
val nMinus1: Long = expectedParts.toLong - 1L
if (nMinus1 < 0L) {
throw new WebApplicationException(
s"Upload session has an invalid number of requested parts of $expectedParts. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
if (nMinus1 > 0L && partSizeBytesValue > Long.MaxValue / nMinus1) {
throw new WebApplicationException(
"Overflow while computing last part size",
Response.Status.INTERNAL_SERVER_ERROR
)
}
val prefixBytes: Long = partSizeBytesValue * nMinus1
if (prefixBytes > fileSizeBytesValue) {
throw new WebApplicationException(
s"Upload session is invalid: computed bytes before last part ($prefixBytes) exceed declared file size ($fileSizeBytesValue). Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
val lastPartSize: Long = fileSizeBytesValue - prefixBytes
if (lastPartSize <= 0L || lastPartSize > partSizeBytesValue) {
throw new WebApplicationException(
s"Upload session is invalid: computed last part size ($lastPartSize bytes) must be within 1..$partSizeBytesValue bytes. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}

val allowedSize: Long =
if (partNumber < expectedParts) partSizeBytesValue else lastPartSize
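// (illustrative) continuing the 12 MiB / 5 MiB example: parts 1 and 2 must carry exactly
// 5242880 bytes each, and part 3 exactly 2097152 bytes; the Content-Length check below
// rejects anything else.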

if (partNumber > expectedParts) {
throw new BadRequestException(
s"$partNumber exceeds the requested parts on init: $expectedParts"
@@ -729,10 +789,17 @@
)
}

if (contentLength != allowedSize) {
throw new BadRequestException(
s"Invalid part size for partNumber=$partNumber. " +
s"Expected Content-Length=$allowedSize, got $contentLength."
)
}
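// Illustrative sketch (not part of this change) of the refactor discussed in the review
// thread above: gather the session-sanity checks into one helper so any failure runs the
// shared cleanup ("recycling") logic exactly once. The name validateUploadSession and the
// cleanup callback are hypothetical.
def validateUploadSession(checks: Seq[(Boolean, String)], cleanup: () => Unit): Unit =
  checks.collectFirst { case (ok, msg) if !ok => msg }.foreach { msg =>
    cleanup() // e.g. delete the session row (part rows cascade) and abort the LakeFS upload
    throw new WebApplicationException(
      s"Upload session is invalid: $msg. Restart the upload.",
      Response.Status.INTERNAL_SERVER_ERROR
    )
  }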

val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
if (physicalAddr.isEmpty) {
throw new WebApplicationException(
"Upload session is missing physicalAddress. Re-init the upload.",
"Upload session is missing physicalAddress. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
@@ -743,7 +810,7 @@
catch {
case e: IllegalArgumentException =>
throw new WebApplicationException(
s"Upload session has invalid physicalAddress. Re-init the upload. (${e.getMessage})",
s"Upload session has invalid physicalAddress. Restart the upload. (${e.getMessage})",
Response.Status.INTERNAL_SERVER_ERROR
)
}
@@ -775,7 +842,7 @@
if (partRow == null) {
// Should not happen if init pre-created rows
throw new WebApplicationException(
s"Part row not initialized for part $partNumber. Re-init the upload.",
s"Part row not initialized for part $partNumber. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
@@ -1399,7 +1466,8 @@ class DatasetResource {
private def initMultipartUpload(
did: Integer,
encodedFilePath: String,
numParts: Optional[Integer],
fileSizeBytes: Optional[Long],
partSizeBytes: Optional[Long],
uid: Integer
): Response = {

@@ -1416,12 +1484,63 @@
URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name())
)

val numPartsValue = numParts.toScala.getOrElse {
throw new BadRequestException("numParts is required for initialization")
val fileSizeBytesValue: Long =
fileSizeBytes
.orElseThrow(() =>
new BadRequestException("fileSizeBytes is required for initialization")
)

if (fileSizeBytesValue <= 0L) {
throw new BadRequestException("fileSizeBytes must be > 0")
}
if (numPartsValue < 1 || numPartsValue > MAXIMUM_NUM_OF_MULTIPART_S3_PARTS) {

val partSizeBytesValue: Long =
partSizeBytes
.orElseThrow(() =>
new BadRequestException("partSizeBytes is required for initialization")
)

if (partSizeBytesValue <= 0L) {
throw new BadRequestException("partSizeBytes must be > 0")
}

// singleFileUploadMaxBytes applies to TOTAL bytes (sum of all parts == file size)
val totalMaxBytes: Long = singleFileUploadMaxBytes(ctx)
if (totalMaxBytes <= 0L) {
throw new WebApplicationException(
"singleFileUploadMaxBytes must be > 0",
Response.Status.INTERNAL_SERVER_ERROR
)
}
if (fileSizeBytesValue > totalMaxBytes) {
throw new BadRequestException(
"numParts must be between 1 and " + MAXIMUM_NUM_OF_MULTIPART_S3_PARTS
s"fileSizeBytes=$fileSizeBytesValue exceeds singleFileUploadMaxBytes=$totalMaxBytes"
)
}
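// (illustrative) with the default cap of 20 MiB, an init declaring fileSizeBytes = 25 MiB
// (26214400) is rejected here before any part is uploaded.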

// Compute numParts = ceil(fileSize / partSize) = (fileSize + partSize - 1) / partSize
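// Worked example (illustrative): fileSizeBytes = 12582912 (12 MiB), partSizeBytes =
// 5242880 (5 MiB) → numParts = (12582912 + 5242879) / 5242880 = 3.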
val addend: Long = partSizeBytesValue - 1L
if (addend < 0L || fileSizeBytesValue > Long.MaxValue - addend) {
throw new WebApplicationException(
"Overflow while computing numParts",
Response.Status.INTERNAL_SERVER_ERROR
)
}

val numPartsLong: Long = (fileSizeBytesValue + addend) / partSizeBytesValue
if (numPartsLong < 1L || numPartsLong > MAXIMUM_NUM_OF_MULTIPART_S3_PARTS.toLong) {
throw new BadRequestException(
s"Computed numParts=$numPartsLong is out of range 1..$MAXIMUM_NUM_OF_MULTIPART_S3_PARTS"
)
}
val numPartsValue: Int = numPartsLong.toInt

// S3 multipart constraint: all non-final parts must be >= 5MiB.
// If there is more than one part, partSizeBytesValue is the non-final part size.
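// (illustrative) an init with fileSizeBytes = 8 MiB and partSizeBytes = 4194304 (4 MiB)
// yields numParts = 2 and is rejected here, since 4 MiB is below the S3 minimum.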
if (numPartsValue > 1 && partSizeBytesValue < MINIMUM_NUM_OF_MULTIPART_S3_PART) {
throw new BadRequestException(
s"partSizeBytes=$partSizeBytesValue is too small. " +
s"All non-final parts must be >= $MINIMUM_NUM_OF_MULTIPART_S3_PART bytes."
)
}

@@ -1453,7 +1572,6 @@ class DatasetResource {
val uploadIdStr = presign.getUploadId
val physicalAddr = presign.getPhysicalAddress

// If anything fails after this point, abort LakeFS multipart
try {
val rowsInserted = ctx
.insertInto(DATASET_UPLOAD_SESSION)
Expand All @@ -1462,7 +1580,9 @@ class DatasetResource {
.set(DATASET_UPLOAD_SESSION.UID, uid)
.set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
.set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
.set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, numPartsValue)
.set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Integer.valueOf(numPartsValue))
.set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(fileSizeBytesValue))
.set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(partSizeBytesValue))
.onDuplicateKeyIgnore()
.execute()

@@ -1506,7 +1626,6 @@
Response.ok().build()
} catch {
case e: Exception =>
// rollback will remove session + parts rows; we still must abort LakeFS
try {
LakeFSStorageClient.abortPresignedMultipartUploads(
repositoryName,
@@ -1572,7 +1691,7 @@
val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
if (physicalAddr.isEmpty) {
throw new WebApplicationException(
"Upload session is missing physicalAddress. Re-init the upload.",
"Upload session is missing physicalAddress. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
@@ -1595,7 +1714,7 @@

if (totalCnt != expectedParts) {
throw new WebApplicationException(
s"Part table mismatch: expected $expectedParts rows but found $totalCnt. Re-init the upload.",
s"Part table mismatch: expected $expectedParts rows but found $totalCnt. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
@@ -1646,7 +1765,29 @@
physicalAddr
)

// Cleanup: delete the session; parts are removed by ON DELETE CASCADE
// FINAL SERVER-SIDE SIZE CHECK (do not rely on init)
val actualSizeBytes =
Option(objectStats.getSizeBytes).map(_.longValue()).getOrElse(-1L)

if (actualSizeBytes <= 0L) {
throw new WebApplicationException(
"lakeFS did not return sizeBytes for completed multipart upload",
Response.Status.INTERNAL_SERVER_ERROR
)
}

val maxBytes = singleFileUploadMaxBytes(ctx)
val tooLarge = actualSizeBytes > maxBytes

if (tooLarge) {
try {
LakeFSStorageClient.resetObjectUploadOrDeletion(dataset.getRepositoryName, filePath)
} catch {
case _: Throwable => ()
}
}

// always cleanup session
ctx
.deleteFrom(DATASET_UPLOAD_SESSION)
.where(
@@ -1657,6 +1798,13 @@
)
.execute()

if (tooLarge) {
throw new WebApplicationException(
s"Upload exceeded max size: actualSizeBytes=$actualSizeBytes maxBytes=$maxBytes",
Response.Status.REQUEST_ENTITY_TOO_LARGE
)
}
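// Note on ordering (summary, not new behavior): the oversized object is reverted in
// LakeFS first, the session row is deleted (part rows go with it), and only then is
// 413 REQUEST_ENTITY_TOO_LARGE thrown, forcing the client to start over with a new init.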

Response
.ok(
Map(
Expand Down Expand Up @@ -1716,7 +1864,7 @@ class DatasetResource {
val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
if (physicalAddr.isEmpty) {
throw new WebApplicationException(
"Upload session is missing physicalAddress. Re-init the upload.",
"Upload session is missing physicalAddress. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}