diff --git a/du_io_uring.cpp b/du_io_uring.cpp index 4684f86..523b1df 100644 --- a/du_io_uring.cpp +++ b/du_io_uring.cpp @@ -13,10 +13,21 @@ #include #include - -#define BUF_SIZE 16384*2 - -std::deque queue; +#include +#include + +#define BUF_SIZE 16384 + +struct file{ + enum class State {OPEN, STAT}; + int parent_fd; + int fd = -1; + int ret; + std::string name; + State state; + struct statx stat; +}; +std::deque queue; static size_t size; static struct io_uring ring; @@ -29,9 +40,9 @@ static void drain_cqes() int count = 0; io_uring_for_each_cqe (&ring, head, cqe) { - //TODO: handle error open file because of limit of open file descriptors - if(cqe->res>0) - queue.push_back(cqe->res); + file* f = (file*)io_uring_cqe_get_data(cqe); + f->ret=cqe->res; + queue.push_back(f); count++; } @@ -64,32 +75,19 @@ static struct io_uring_sqe *get_sqe() return sqe; } -static void schedule_open(int parent_fd, const char* name) +static void schedule_open(file* f) { + f->state=file::State::OPEN; struct io_uring_sqe* sqe = get_sqe(); - io_uring_prep_openat(sqe, parent_fd, name, O_NOFOLLOW | O_NONBLOCK, O_RDONLY); + io_uring_prep_openat(sqe, f->parent_fd, f->name.c_str(), O_NOFOLLOW | O_NONBLOCK, O_RDONLY); + io_uring_sqe_set_data(sqe, f); } -struct statx stat_buf[BUF_SIZE]; -static void schedule_statx(int parent_fd, const char* name) +static void schedule_statx(file* f) { + f->state=file::State::STAT; struct io_uring_sqe* sqe = get_sqe(); - io_uring_prep_statx(sqe, parent_fd, name, AT_SYMLINK_NOFOLLOW,STATX_MODE | STATX_BLOCKS, &stat_buf[sqes_in_flight-1]); -} -static void parse_file(int dirfd, const char* name){ - struct statx statbuf; - //printf("fd:%d\n",fd); - //TODO: use AT_EMPTY_PATH on statx - if (statx(dirfd, name, AT_SYMLINK_NOFOLLOW,STATX_MODE | STATX_BLOCKS, &statbuf) != 0){ - printf("error statx %s\n",name); - return; - } - if(S_ISDIR(statbuf.stx_mode)){ - size += statbuf.stx_blocks*512; - schedule_open(dirfd,name); - } - else if(S_ISREG(statbuf.stx_mode)) - size += statbuf.stx_blocks*512; - return; + io_uring_prep_statx(sqe, f->parent_fd, f->name.c_str(), AT_SYMLINK_NOFOLLOW,STATX_MODE | STATX_BLOCKS, &f->stat); + io_uring_sqe_set_data(sqe, f); } static char buff[BUF_SIZE]; @@ -98,6 +96,7 @@ static void parse_directory(int fd){ int nread = syscall(SYS_getdents64, fd, buff, BUF_SIZE); if (nread == -1){ perror("getdents"); + fprintf(stderr, "(%d): %s\n",fd, strerror(errno)); break; } @@ -111,56 +110,17 @@ static void parse_directory(int fd){ continue; if(strcmp(d->d_name,"..")==0) continue; - schedule_statx(fd,d->d_name); - } - while (sqes_in_flight) { - int ret = io_uring_submit_and_wait(&ring, sqes_in_flight); - if (ret < 0 && errno != EBUSY) { - perror("io_uring_submit_and_wait"); - exit(EXIT_FAILURE); - } - - uint32_t head; - struct io_uring_cqe *cqe; - - int count = 0; - io_uring_for_each_cqe (&ring, head, cqe) { - count++; - } - sqes_in_flight -= count; - io_uring_cq_advance(&ring, count); - } - int count=0; - for(int i=0;id_reclen; - if(strcmp(d->d_name,".")==0) - continue; - if(strcmp(d->d_name,"..")==0) - continue; - if(S_ISDIR(stat_buf[count].stx_mode)){ - size += stat_buf[count].stx_blocks*512; - schedule_open(fd,d->d_name); - } - else if(S_ISREG(stat_buf[count].stx_mode)) - size += stat_buf[count].stx_blocks*512; - count++; - } - while (sqes_in_flight) { - int ret = io_uring_submit_and_wait(&ring, sqes_in_flight); - if (ret < 0 && errno != EBUSY) { - perror("io_uring_submit_and_wait"); - exit(EXIT_FAILURE); - } - - drain_cqes(); + file* f = new file; + f->parent_fd=fd; + f->name=d->d_name; + schedule_statx(f); } } } static void submit_wait_until_complete(){ while (sqes_in_flight) { - int ret = io_uring_submit_and_wait(&ring, sqes_in_flight); + int ret = io_uring_submit_and_wait(&ring, 1); if (ret < 0 && errno != EBUSY) { perror("io_uring_submit_and_wait"); exit(EXIT_FAILURE); @@ -168,8 +128,26 @@ static void submit_wait_until_complete(){ drain_cqes(); while(!queue.empty()){ - parse_directory(queue.front()); + file* f = queue.front(); queue.pop_front(); + if(f->state==file::State::STAT){ + if(f->ret==0){ + if(S_ISDIR(f->stat.stx_mode)){ + size += f->stat.stx_blocks*512; + schedule_open(f); + } + else if(S_ISREG(f->stat.stx_mode)) + size += f->stat.stx_blocks*512; + } + }else{ + if(f->ret<0){ + fprintf(stderr, "error opening "); + fprintf(stderr, ": %s\n", strerror(-ret)); + continue; + } + f->fd=f->ret; + parse_directory(f->fd); + } } } } @@ -216,9 +194,11 @@ int main(int argc, char** argv){ for(int i=1;iparent_fd=AT_FDCWD; + f->name=filename; + schedule_statx(f); submit_wait_until_complete(); printf("%s\t%s\n",humanSize(size), filename);