Skip to content

Thread pool #138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions LICENSE.BSL1.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Boost Software License - Version 1.0 - August 17th, 2003

Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:

The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
66 changes: 63 additions & 3 deletions source/dhtslib/file/file.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import std.array : array;
import dhtslib.memory;
import dhtslib.file.iterator;
import dhtslib : initKstring;
import dhtslib.threadpool;
import htslib;

enum HtslibFileFormatMode
Expand Down Expand Up @@ -108,6 +109,8 @@ struct HtslibFile
/// if it has been loaded
off_t headerOffset = -1;

ThreadPool pool;

/// allow HtslibFile to be used as
/// underlying ptr type
alias getFilePtr this;
Expand Down Expand Up @@ -172,10 +175,19 @@ struct HtslibFile
return newFile;
}

/// set extra multithreading
void setExtraThreads(int extra)
/// sets number of threads via global thread pool
/// if number is -1 sets to number of CPUs
void setThreads(int threads)
{
enforceGlobalThreadPool(threads);
this.setThreadPool(globalPool);
}

/// set multithreading pool
void setThreadPool(ThreadPool pool)
{
hts_set_threads(this.fp, extra);
this.pool = pool;
hts_set_thread_pool(this.fp, &this.pool.htsPool);
}

/// get file offset
Expand Down Expand Up @@ -577,6 +589,7 @@ debug(dhtslib_unittest) unittest

auto f = HtslibFile("/tmp/htslibfile.test.sam");
f.loadHeader;
f.setThreads(4);
auto h = f.bamHdr;
auto read = f.readRecord!Bam1();
assert(fromStringz(bam_get_qname(read)) == "HS18_09653:4:1315:19857:61712");
Expand Down Expand Up @@ -613,32 +626,39 @@ debug(dhtslib_unittest) unittest
import std.path:buildPath,dirName;
auto fn = buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf.gz");
{
ThreadPool pool = ThreadPool(8);
auto f = HtslibFile(fn);
f.loadHeader;
f.setThreadPool(pool);
auto h = f.bcfHdr;
auto read = f.readRecord!Bcf1();

f = HtslibFile("/tmp/htslibfile.test.vcf", HtslibFileWriteMode.Vcf);
f.setThreadPool(pool);
f.setHeader(h);
f.writeHeader;
f.write(read);

f = HtslibFile("/tmp/htslibfile.test.bcf", HtslibFileWriteMode.Bcf);
f.setThreadPool(pool);
f.setHeader(h);
f.writeHeader;
f.write(read);

f = HtslibFile("/tmp/htslibfile.test.ubcf", HtslibFileWriteMode.UncompressedBcf);
f.setThreadPool(pool);
f.setHeader(h);
f.writeHeader;
f.write(read);

f = HtslibFile("/tmp/htslibfile.test.vcf.gz", HtslibFileWriteMode.GzippedVcf);
f.setThreadPool(pool);
f.setHeader(h);
f.writeHeader;
f.write(read);

f = HtslibFile("/tmp/htslibfile.test.vcf.bgz", HtslibFileWriteMode.BgzippedVcf);
f.setThreadPool(pool);
f.setHeader(h);
f.writeHeader;
f.write(read);
Expand Down Expand Up @@ -676,4 +696,44 @@ debug(dhtslib_unittest) unittest
read = f.readRecord!Bcf1();
assert(read.pos == 3000149);
}
}

debug(dhtslib_unittest) unittest
{
hts_log_info(__FUNCTION__, "Testing HtslibFile parallel processing");
import std.path:buildPath,dirName;
auto fn = buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf.gz");
{

ThreadPool pool = ThreadPool(8);
auto rdr = HtslibFile(fn);
rdr.setThreadPool(pool);
rdr.loadHeader;
auto h = rdr.bcfHdr;

auto wtr = HtslibFile("/tmp/htslibfile.test_parallel.vcf", HtslibFileWriteMode.Vcf);
wtr.setThreadPool(pool);
wtr.setHeader(h);
wtr.writeHeader;

foreach(x; rdr.byRecord!Bcf1().parallelMap!((x) { x.pos++; return x;})(&pool)) {
wtr.write(x);
}
}

{
auto f = HtslibFile(fn);
f.loadHeader;
auto h = f.bcfHdr;
auto read = f.readRecord!Bcf1();
assert(read.pos == 3000149);

auto f2 = HtslibFile("/tmp/htslibfile.test_parallel.vcf");
f2.loadHeader;
h = f2.bcfHdr;
auto read2 = f2.readRecord!Bcf1();
assert(read2.pos == 3000150);

}

}
2 changes: 1 addition & 1 deletion source/dhtslib/file/iterator.d
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ if(is(T == Bam1) || is(T == Bcf1) || is(T == Kstring))
/// If you keep the result around it should be duplicated
T front()
{
return rec;
return rec.dup;
}

/// popFront to move range forward
Expand Down
26 changes: 20 additions & 6 deletions source/dhtslib/memory.d
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void hts_log_errorNoGC(const(char)[] ctx)( string msg) @trusted @nogc nothrow
/// pointer and reference counts it and then
/// destroys with destroyFun when it goes
/// truly out of scope
struct SafeHtslibPtr(T, alias destroyFun)
struct SafeHtslibPtr(T, alias destroyFun, alias dupFun = void)
if(!isPointer!T && isSomeFunction!destroyFun)
{
@safe @nogc nothrow:
Expand Down Expand Up @@ -77,6 +77,12 @@ if(!isPointer!T && isSomeFunction!destroyFun)
return ptr;
}

static if(isSomeFunction!dupFun) {
auto dup() @trusted return scope {
return typeof(this)(dupFun(this.ptr));
}
}

/// dtor that respects scope
~this() @trusted return scope
{
Expand Down Expand Up @@ -106,19 +112,19 @@ if(!isPointer!T && isSomeFunction!destroyFun)

/// reference counted bam1_t wrapper
/// can be used directly as a bam1_t *
alias Bam1 = SafeHtslibPtr!(bam1_t, bam_destroy1);
alias Bam1 = SafeHtslibPtr!(bam1_t, bam_destroy1, bam_dup1);

/// reference counted bam_hdr_t wrapper
/// can be used directly as a bam_hdr_t *
alias BamHdr = SafeHtslibPtr!(bam_hdr_t, bam_hdr_destroy);
alias BamHdr = SafeHtslibPtr!(bam_hdr_t, bam_hdr_destroy, bam_hdr_dup);

/// reference counted bcf1_t wrapper
/// can be used directly as a bcf1_t *
alias Bcf1 = SafeHtslibPtr!(bcf1_t, bcf_destroy);
alias Bcf1 = SafeHtslibPtr!(bcf1_t, bcf_destroy, bcf_dup);

/// reference counted bcf_hdr_t wrapper
/// can be used directly as a bcf_hdr_t *
alias BcfHdr = SafeHtslibPtr!(bcf_hdr_t, bcf_hdr_destroy);
alias BcfHdr = SafeHtslibPtr!(bcf_hdr_t, bcf_hdr_destroy, bcf_hdr_dup);

/// reference counted htsFile wrapper
/// can be used directly as a htsFile *
Expand Down Expand Up @@ -150,10 +156,18 @@ alias Faidx = SafeHtslibPtr!(faidx_t, fai_destroy);

/// reference counted Kstring wrapper
/// can be used directly as a kstring_t *
alias Kstring = SafeHtslibPtr!(kstring_t, ks_free);
alias Kstring = SafeHtslibPtr!(kstring_t, ks_free, ks_dup);

alias HtsItrMulti = HtsItr;

/// reference counted hts_tpool wrapper
/// can be used directly as a hts_tpool *
alias HtsTPool = SafeHtslibPtr!(hts_tpool, hts_tpool_destroy);

/// reference counted hts_tpool_process wrapper
/// can be used directly as a hts_tpool_process *
alias HtsProcess = SafeHtslibPtr!(hts_tpool_process, hts_tpool_process_destroy);

debug(dhtslib_unittest) unittest
{
auto rc1 = Bam1(bam_init1);
Expand Down
Loading