
Commit

Update the test suite. Stop checking subset of headers (failure will happen later on)
JulienPeloton committed May 24, 2019
1 parent a1c2226 commit 8295516
Showing 6 changed files with 12,110 additions and 135 deletions.
229 changes: 115 additions & 114 deletions src/main/scala/com/astrolabsoftware/sparkfits/FitsRecordReader.scala
@@ -173,132 +173,133 @@ class FitsRecordReader extends RecordReader[LongWritable, Seq[Row]] {
val keyValues = FitsLib.parseHeader(header)

if (keyValues("NAXIS").toInt == 0 & conf.get("mode") == "PERMISSIVE") {
log.warn(s"\nEmpty HDU for ${file}")
log.warn(s"Empty HDU for ${file}")
notValid = true
return
} else if (keyValues("NAXIS").toInt == 0 & conf.get("mode") == "FAILFAST") {
log.warn(s"\nEmpty HDU for ${file}")
log.warn(s"\nUse option('mode', 'PERMISSIVE') if you want to discard all empty HDUs.")
}

// Get the number of rows and the size (B) of one row.
// this is dependent on the HDU type
nrowsLong = fits.hdu.getNRows(keyValues)
rowSizeInt = fits.hdu.getSizeRowBytes(keyValues)
rowSizeLong = rowSizeInt.toLong

// What Hadoop gave us
val start_theo = fileSplit.getStart
val stop_theo = fileSplit.getStart + fileSplit.getLength

// Reject this mapper if the HDFS block lies entirely outside the targeted HDU
notValid = if((start_theo < startstop.dataStart) && (stop_theo < startstop.dataStart)) {
true
} else if ((start_theo >= startstop.dataStop) && (stop_theo >= startstop.dataStop)) {
true
} else {
false
}
if (keyValues("NAXIS").toInt == 0 & conf.get("mode") == "FAILFAST") {
log.warn(s"Empty HDU for ${file}")
log.warn(s"Use option('mode', 'PERMISSIVE') if you want to discard all empty HDUs.")
}
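For reference, the mode checked here comes from the reader options. A minimal sketch of how a user selects it (the path and SparkSession setup are placeholders; only the format and option names are taken from the surrounding code):

```scala
import org.apache.spark.sql.SparkSession

// Placeholder session and path; format/option names as used in this reader.
val spark = SparkSession.builder().appName("spark-fits-example").getOrCreate()

val df = spark.read
  .format("fits")
  .option("hdu", 1)              // index of the HDU to read
  .option("mode", "PERMISSIVE")  // discard empty HDUs instead of failing later
  .load("/path/to/data.fits")
```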

val splitStart_tmp = if (start_theo <= startstop.dataStart && !notValid) {
// Valid block: starting index.
// We are just before the targeted HDU, therefore
// we jump to the beginning of the data block
startstop.dataStart
} else {
start_theo
}
// Get the number of rows and the size (B) of one row.
// this is dependent on the HDU type
nrowsLong = fits.hdu.getNRows(keyValues)
rowSizeInt = fits.hdu.getSizeRowBytes(keyValues)
rowSizeLong = rowSizeInt.toLong

// What Hadoop gave us
val start_theo = fileSplit.getStart
val stop_theo = fileSplit.getStart + fileSplit.getLength

// Reject this mapper if the HDFS block is below the targeted HDU
notValid = if((start_theo < startstop.dataStart) && (stop_theo < startstop.dataStart)) {
true
} else if ((start_theo >= startstop.dataStop) && (stop_theo >= startstop.dataStop)) {
true
} else {
false
}
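Assuming start_theo <= stop_theo, the two-branch test above reduces to a single check: the split is rejected iff it lies entirely before or entirely after the HDU data area. A standalone restatement, with a hypothetical helper name:

```scala
// Equivalent rejection test (illustration only, not part of this commit),
// assuming startTheo <= stopTheo.
def splitIsRejected(startTheo: Long, stopTheo: Long,
                    dataStart: Long, dataStop: Long): Boolean =
  stopTheo < dataStart || startTheo >= dataStop
```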

splitEnd = if (stop_theo <= startstop.dataStop && !notValid) {
// Valid block: ending index (start/end inside)
// We are inside the targeted HDU
stop_theo
} else if (stop_theo > startstop.dataStop && !notValid) {
// Valid block: ending index (start inside, end outside)
// The block starts in the targeted HDU, but ends outside.
// We just move back the final cursor.
startstop.dataStop
} else {
// Not valid anyway
stop_theo
}
val splitStart_tmp = if (start_theo <= startstop.dataStart && !notValid) {
// Valid block: starting index.
// We are just before the targeted HDU, therefore
// we jump to the beginning of the data block
startstop.dataStart
} else {
start_theo
}

// A priori, there is no reason for a random split of the FITS file to start
// at the beginning of a row. Therefore we do the following:
// - The block starts
// - its data is processed record-by-record (see below for the
// processing of the records)
// - at the end of the block, the stop index might be in the middle of a
// row. We do not read this row in the first block, and we stop here.
// - The second block starts at start_1=(end_0)
// - We decrement the starting index to include the previous line not read
// in the first block.
// - its data is processed record-by-record
// - etc.
// Summary: Add the last row if we start the block in the middle of a row.
// We assume that fileSplit.getStart starts at the
// beginning of the data block for the first valid block.

// Here is an attempt to fix a bug when reading images:
//
// I noticed that when an image is split across several HDFS blocks,
// the transition is not handled correctly, and there is typically one line
// missing. After some manual inspection, I found that introducing a shift
// in the starting index helps remove the bug. The shift is a function of
// the HDU index, and depends on whether the primary HDU is empty or not.
// I am by no means convinced that this fix is general, but it works for
// the few examples that I tried. If you face a similar problem,
// or find a general solution, let me know!
var shift = if (primaryfits.empty_hdu) {
FITSBLOCK_SIZE_BYTES * (conf.get("hdu").toInt - 3)
} else {
FITSBLOCK_SIZE_BYTES * (conf.get("hdu").toInt - 1)
}
splitEnd = if (stop_theo <= startstop.dataStop && !notValid) {
// Valid block: ending index (start/end inside)
// We are inside the targeted HDU
stop_theo
} else if (stop_theo > startstop.dataStop && !notValid) {
// Valid block: ending index (start inside, end outside)
// The block starts in the targeted HDU, but ends outside.
// We just move back the final cursor.
startstop.dataStop
} else {
// Not valid anyway
stop_theo
}
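For a split that is not rejected, the two assignments above amount to clamping the Hadoop split onto the HDU data area. A compact restatement with a hypothetical helper:

```scala
// Illustration only (not part of this commit): clamp a valid split to the HDU
// data area. Returns the pair (splitStart_tmp, splitEnd) computed above.
def clampToHdu(startTheo: Long, stopTheo: Long,
               dataStart: Long, dataStop: Long): (Long, Long) =
  (math.max(startTheo, dataStart), math.min(stopTheo, dataStop))
```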

splitStart = if((splitStart_tmp) % rowSizeLong != 0 &&
splitStart_tmp != startstop.dataStart && splitStart_tmp != 0) {
// A priori, there is no reason for a random split of the FITS file to start
// at the beginning of a row. Therefore we do the following:
// - The block starts
// - its data is processed record-by-record (see below for the
// processing of the records)
// - at the end of the block, the stop index might be in the middle of a
// row. We do not read this row in the first block, and we stop here.
// - The second block starts at start_1=(end_0)
// - We decrement the starting index to include the previous line not read
// in the first block.
// - its data is processed record-by-record
// - etc.
// Summary: Add the last row if we start the block in the middle of a row.
// We assume that fileSplit.getStart starts at the
// beginning of the data block for the first valid block.

// Here is an attempt to fix a bug when reading images:
//
// I noticed that when an image is split across several HDFS blocks,
// the transition is not handled correctly, and there is typically one line
// missing. After some manual inspection, I found that introducing a shift
// in the starting index helps remove the bug. The shift is a function of
// the HDU index, and depends on whether the primary HDU is empty or not.
// I am by no means convinced that this fix is general, but it works for
// the few examples that I tried. If you face a similar problem,
// or find a general solution, let me know!
var shift = if (primaryfits.empty_hdu) {
FITSBLOCK_SIZE_BYTES * (conf.get("hdu").toInt - 3)
} else {
FITSBLOCK_SIZE_BYTES * (conf.get("hdu").toInt - 1)
}
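To make the empirical shift concrete, here is a small sketch assuming FITSBLOCK_SIZE_BYTES is the standard 2880-byte FITS block size; the HDU indices are made-up examples:

```scala
// Illustration only: the shift for two hypothetical cases.
val FITSBLOCK_SIZE_BYTES = 2880L  // standard FITS block size, assumed here

// Empty primary HDU, reading HDU index 3: shift = 2880 * (3 - 3) = 0
val shiftEmptyPrimary = FITSBLOCK_SIZE_BYTES * (3 - 3)

// Non-empty primary HDU, reading HDU index 2: shift = 2880 * (2 - 1) = 2880
val shiftNonEmptyPrimary = FITSBLOCK_SIZE_BYTES * (2 - 1)
```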

// Decrement the starting index to fully catch the line we are sitting on
var tmp_byte = 0
do {
tmp_byte = tmp_byte - 1
} while ((splitStart_tmp + tmp_byte + shift) % rowSizeLong != 0)

// Return the offset starting index
splitStart_tmp + tmp_byte
} else splitStart_tmp

// Get the record length in bytes (as an integer!). First check whether the user
// specified a size for the recordLength. If not, set it to max(1 KB, rowSize).
// If the HDU is an image, the recordLength is the row size (NAXIS1 * nbytes)
val recordLengthFromUser = Try{conf.get("recordlength").toInt}
.getOrElse{
if (fits.hduType == "IMAGE") {
rowSizeInt
} else {
// set it to max(1 KB, rowSize)
math.max((1 * 1024 / rowSizeInt) * rowSizeInt, rowSizeInt)
splitStart = if((splitStart_tmp) % rowSizeLong != 0 &&
splitStart_tmp != startstop.dataStart && splitStart_tmp != 0) {

// Decrement the starting index to fully catch the line we are sitting on
var tmp_byte = 0
do {
tmp_byte = tmp_byte - 1
} while ((splitStart_tmp + tmp_byte + shift) % rowSizeLong != 0)

// Return the offset starting index
splitStart_tmp + tmp_byte
} else splitStart_tmp
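As a worked example of the alignment loop above: with 100-byte rows, no shift and a tentative start at byte 1234, the loop backs up 34 bytes so that the split starts on a row boundary at byte 1200. A self-contained sketch, for illustration only:

```scala
// Standalone version of the do/while alignment above (not part of this commit).
def alignToRowStart(splitStartTmp: Long, shift: Long, rowSize: Long): Long = {
  var tmpByte = 0L
  do {
    tmpByte -= 1
  } while ((splitStartTmp + tmpByte + shift) % rowSize != 0)
  splitStartTmp + tmpByte
}

assert(alignToRowStart(1234L, 0L, 100L) == 1200L)  // backs up 34 bytes to a row boundary
```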

// Get the record length in bytes (as an integer!). First check whether the user
// specified a size for the recordLength. If not, set it to max(1 KB, rowSize).
// If the HDU is an image, the recordLength is the row size (NAXIS1 * nbytes)
val recordLengthFromUser = Try{conf.get("recordlength").toInt}
.getOrElse{
if (fits.hduType == "IMAGE") {
rowSizeInt
} else {
// set it to max(1 KB, rowSize)
math.max((1 * 1024 / rowSizeInt) * rowSizeInt, rowSizeInt)
}
}

// For a Table, round the record length down to a whole number of rows
recordLength = (recordLengthFromUser / rowSizeInt) * rowSizeInt

// Make sure that the recordLength is not bigger than the block size!
// This is a guard for small files.
recordLength = if ((recordLength / rowSizeInt) < nrowsLong.toInt) {
// OK less than the total number of lines
recordLength
} else {
// Small files, one record is the entire file.
nrowsLong.toInt * rowSizeLong.toInt
}
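A worked example of the record length logic, with made-up numbers: for a table HDU with 300-byte rows and no user option, the default is max((1024 / 300) * 300, 300) = 900 bytes, i.e. three rows per record; a 2-row table is then capped at 600 bytes, the whole file.

```scala
// Illustration only (not part of this commit): default record length for a
// table HDU with 300-byte rows and no user-supplied "recordlength" option.
val rowSize  = 300
val fromUser = math.max((1 * 1024 / rowSize) * rowSize, rowSize) // 900 bytes
val recLen   = (fromUser / rowSize) * rowSize                    // 900 bytes = 3 rows

// Guard for small files: a 2-row table caps the record at the whole file.
val nrows  = 2
val capped = if (recLen / rowSize < nrows) recLen else nrows * rowSize // 600 bytes
```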

// For a Table, round the record length down to a whole number of rows
recordLength = (recordLengthFromUser / rowSizeInt) * rowSizeInt
// Move to the starting binary index
fits.data.seek(splitStart)

// Make sure that the recordLength is not bigger than the block size!
// This is a guard for small files.
recordLength = if ((recordLength / rowSizeInt) < nrowsLong.toInt) {
// OK less than the total number of lines
recordLength
} else {
// Small files, one record is the entire file.
nrowsLong.toInt * rowSizeLong.toInt
}

// Move to the starting binary index
fits.data.seek(splitStart)

// Set our starting block position
currentPosition = splitStart
}

/**
@@ -224,27 +224,29 @@ class FitsRelation(parameters: Map[String, String], userSchema: Option[StructTyp
val fits_init = new Fits(path_init, conf, indexHDU)

if (fits_init.hdu.implemented) {
val schema_init = getSchema(fits_init)
fits_init.data.close()

for (file <- listOfFitsFiles.slice(1, listOfFitsFiles.size)) {
var path = new Path(file)
val fits = new Fits(path, conf, indexHDU)
val schema = getSchema(fits)
val isOk = schema_init == schema
isOk match {
case true => isOk
case false => {
throw new AssertionError(
"""
You are trying to add HDU data with different structures!
Check that the number of columns, names of columns and element
types are the same. re-run with .option("verbose", true) to
list the files.
""")
// Do not perform checks if the mode is PERMISSIVE.
if (conf.get("mode") != "PERMISSIVE") {
val schema_init = getSchema(fits_init)
fits_init.data.close()
for (file <- listOfFitsFiles.slice(1, listOfFitsFiles.size)) {
var path = new Path(file)
val fits = new Fits(path, conf, indexHDU)
val schema = getSchema(fits)
val isOk = schema_init == schema
isOk match {
case true => isOk
case false => {
throw new AssertionError(
"""
You are trying to add HDU data with different structures!
Check that the number of columns, names of columns and element
types are the same. re-run with .option("verbose", true) to
list all the files.
""")
}
}
fits.data.close()
}
fits.data.close()
}
true
} else {
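The change above skips the schema consistency check entirely in PERMISSIVE mode; otherwise every file must expose the same Spark schema as the first one. A minimal standalone sketch of the same idea (the helper name is hypothetical):

```scala
import org.apache.spark.sql.types.StructType

// Illustration only (not part of this commit): schemas must all match the
// first one unless the user asked for PERMISSIVE mode.
def schemasAreConsistent(schemas: Seq[StructType], mode: String): Boolean =
  if (mode == "PERMISSIVE") true
  else schemas.forall(_ == schemas.head)
```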
@@ -39,7 +39,7 @@ object ReadFits {
.format("fits")
.option("hdu", hdu) // Index of the HDU
.option("verbose", true) // pretty print
.option("recordlength", 1 * 1024) // 1 KB per record
.option("recordlength", 5 * 1024) // 1 KB per record
.load(args(0).toString) // File to load

println("show>")
11,943 changes: 11,943 additions & 0 deletions src/test/resources/dirIm/0_i_am_not_empty.fits

Large diffs are not rendered by default.


