Commit b1473a7: Make TabixReader take a hadoop configuration in the constructor (hail-is#5033)

* Make TabixReader take a hadoop configuration

This fixes a bug in import_vcfs: reading the tabix indices and generating partitions is parallelized, so TabixReader can no longer rely on fetching the Hadoop configuration from the global HailContext (see the removed HailContext.get.hadoopConf below); callers now pass a configuration to the constructor.
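A minimal sketch of the pattern behind the fix (illustrative names, not Hail's exact code): a reader that reaches for a driver-side singleton breaks once it is constructed inside parallel tasks, so the configuration becomes an explicit constructor argument instead.

import org.apache.hadoop.conf.Configuration

// Old shape: the configuration came from a global singleton, which is
// only populated on the Spark driver, so constructing a reader inside a
// parallelized task dereferenced an unset global.
object DriverOnlyContext { var hadoopConf: Configuration = _ }

class OldStyleReader(val filePath: String) {
  private val hConf = DriverOnlyContext.hadoopConf // null off the driver
}

// New shape, matching this commit: the caller supplies the configuration,
// so each parallel task can be handed its own copy explicitly.
class NewStyleReader(val filePath: String, private val hConf: Configuration)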

* Add cluster test for import_vcfs

Also reorganize the cluster tests to live under the python directory and make it easier to add new scripts: hail-ci-build.sh now submits every python/cluster-tests/**.py script in a loop (see below) instead of naming each one.

* Make partitions json actually subset the files

Before, the partition intervals selected nothing; the intervals in the new cluster test below now cover actual regions of the input gVCFs.
chrisvittal authored and danking committed Dec 21, 2018
1 parent 2d76494 commit b1473a7
Showing 7 changed files with 29 additions and 11 deletions.
8 changes: 3 additions & 5 deletions hail/hail-ci-build.sh
@@ -241,11 +241,9 @@ test_gcp() {
     --zip gs://hail-ci-0-1/temp/$SOURCE_SHA/$TARGET_SHA/hail.zip \
     --vep
 
-    time cluster submit ${CLUSTER_NAME} \
-        cluster-sanity-check.py
-
-    time cluster submit ${CLUSTER_NAME} \
-        cluster-vep-check.py
+    for script in python/cluster-tests/**.py; do
+        time cluster submit ${CLUSTER_NAME} $script
+    done
 
     time cluster stop ${CLUSTER_NAME} --async
     touch ${GCP_SUCCESS}
22 changes: 22 additions & 0 deletions hail/python/cluster-tests/cluster-read-vcfs-check.py
@@ -0,0 +1,22 @@
+import json
+import hail as hl
+
+gvcfs = ['gs://hail-ci/gvcfs/HG00096.g.vcf.gz',
+         'gs://hail-ci/gvcfs/HG00268.g.vcf.gz']
+hl.init(default_reference='GRCh38')
+parts = [
+    {'start': {'locus': {'contig': 'chr20', 'position': 17821257}},
+     'end': {'locus': {'contig': 'chr20', 'position': 18708366}},
+     'includeStart': True,
+     'includeEnd': True},
+    {'start': {'locus': {'contig': 'chr20', 'position': 18708367}},
+     'end': {'locus': {'contig': 'chr20', 'position': 19776611}},
+     'includeStart': True,
+     'includeEnd': True},
+    {'start': {'locus': {'contig': 'chr20', 'position': 19776612}},
+     'end': {'locus': {'contig': 'chr20', 'position': 21144633}},
+     'includeStart': True,
+     'includeEnd': True},
+]
+parts_str = json.dumps(parts)
+vcfs = hl.import_vcfs(gvcfs, parts_str)
2 files renamed without changes (presumably cluster-sanity-check.py and cluster-vep-check.py, moved under hail/python/cluster-tests/ per the build-script change above).
4 changes: 1 addition & 3 deletions hail/src/main/scala/is/hail/io/tabix/TabixReader.scala
@@ -95,7 +95,7 @@ object TabixReader {
 
 }
 
-class TabixReader(val filePath: String, private val idxFilePath: Option[String] = None) {
+class TabixReader(val filePath: String, private val hConf: hd.conf.Configuration, private val idxFilePath: Option[String] = None) {
   import TabixReader._
 
   val indexPath: String = idxFilePath match {
@@ -108,8 +108,6 @@ class TabixReader(val filePath: String, private val idxFilePath: Option[String]
     }
   }
 
-  private val hConf = HailContext.get.hadoopConf
-
   val index: Tabix = hConf.readFile(indexPath) { is =>
     var buf = new Array[Byte](4)
     is.read(buf, 0, 4) // read magic bytes "TBI\1"
2 changes: 1 addition & 1 deletion hail/src/main/scala/is/hail/io/vcf/LoadVCF.scala
@@ -1214,7 +1214,7 @@ case class VCFsReader(
     entryType = genotypeSignature)
 
   val partitions = {
-    val r = new TabixReader(file)
+    val r = new TabixReader(file, hConf)
     localRangeBounds.zipWithIndex.map { case (b, i) =>
       if (!(b.includesStart && b.includesEnd))
         fatal("range bounds must be inclusive")
4 changes: 2 additions & 2 deletions hail/src/test/scala/is/hail/io/TabixSuite.scala
@@ -17,8 +17,8 @@ class TabixSuite extends SparkSuite {
   val vcfGzFile = vcfFile + ".gz"
   val vcfGzTbiFile = vcfGzFile + ".tbi"
 
-  lazy val reader = new TabixReader(vcfGzFile)
   lazy val bcConf = hc.sc.broadcast(new SerializableHadoopConfiguration(hc.hadoopConf))
+  lazy val reader = new TabixReader(vcfGzFile, hc.hadoopConf)
 
   @BeforeTest def initialize() {
     hc // reference to initialize
@@ -96,7 +96,7 @@
     val vcfFile = "src/test/resources/sample.vcf.bgz"
     val chr = "20"
     val htsjdkrdr = new HtsjdkTabixReader(vcfFile)
-    val hailrdr = new TabixReader(vcfFile)
+    val hailrdr = new TabixReader(vcfFile, hc.hadoopConf)
     val tid = hailrdr.chr2tid(chr)
 
     for ((start, end) <-
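The bcConf line in the test above hints at how the new constructor is meant to be used from inside Spark jobs: Hadoop's Configuration is not Serializable, so it gets wrapped and broadcast before shipping to executors. A hedged sketch, assuming Hail's SerializableHadoopConfiguration lives in is.hail.utils and exposes the wrapped Configuration as a value field (both assumptions, inferred from the test code, not confirmed by this diff):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import is.hail.io.tabix.TabixReader
import is.hail.utils.SerializableHadoopConfiguration // assumed package

def readIndicesInParallel(sc: SparkContext, files: Array[String], hConf: Configuration): Unit = {
  // Wrap the non-serializable Configuration so it can travel to executors.
  val bcConf = sc.broadcast(new SerializableHadoopConfiguration(hConf))
  sc.parallelize(files).foreach { file =>
    // Each task builds its reader from the broadcast copy instead of
    // reaching for HailContext.get -- the bug this commit fixes.
    val reader = new TabixReader(file, bcConf.value.value) // .value: assumed field
    // ... query reader ...
  }
}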
