Skip to content

Commit

Permalink
refactor(backend): move DownloadRepository to non Reactive API
Browse files Browse the repository at this point in the history
Related to #231
  • Loading branch information
davinkevin committed Aug 3, 2024
1 parent 8bbeff5 commit 089c39b
Show file tree
Hide file tree
Showing 11 changed files with 392 additions and 446 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ class DownloadConfig {

@Bean
fun onStartupCleanInvalidDownloadingItemsState(download: DownloadRepository) = CommandLineRunner {
download
.resetToWaitingStateAllDownloadingItems()
.block()
download.resetToWaitingStateAllDownloadingItems()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package com.github.davinkevin.podcastserver.download

import com.github.davinkevin.podcastserver.database.Tables.DOWNLOADING_ITEM
import com.github.davinkevin.podcastserver.database.Tables.ITEM
import com.github.davinkevin.podcastserver.database.enums.ItemStatus.*
import com.github.davinkevin.podcastserver.database.enums.DownloadingState
import com.github.davinkevin.podcastserver.database.enums.ItemStatus
import com.github.davinkevin.podcastserver.database.enums.ItemStatus.*
import com.github.davinkevin.podcastserver.database.tables.Item
import com.github.davinkevin.podcastserver.entity.fromDb
import com.github.davinkevin.podcastserver.entity.toDb
Expand All @@ -15,7 +15,6 @@ import org.jooq.DSLContext
import org.jooq.Record9
import org.jooq.impl.DSL.*
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono
import java.net.URI
import java.nio.file.Path
Expand All @@ -27,13 +26,13 @@ import java.util.*
*/
class DownloadRepository(private val query: DSLContext) {

fun initQueue(fromDate: OffsetDateTime, withMaxNumberOfTry: Int): Mono<Void> {
fun initQueue(fromDate: OffsetDateTime, withMaxNumberOfTry: Int) {
val positionInQueue = rowNumber()
.over().orderBy(ITEM.PUB_DATE) +
select(coalesce(max(DOWNLOADING_ITEM.POSITION), 0))
.from(DOWNLOADING_ITEM).asField<Int>()

return query.insertInto(DOWNLOADING_ITEM, DOWNLOADING_ITEM.ITEM_ID, DOWNLOADING_ITEM.POSITION)
query.insertInto(DOWNLOADING_ITEM, DOWNLOADING_ITEM.ITEM_ID, DOWNLOADING_ITEM.POSITION)
.select(
select(ITEM.ID, positionInQueue)
.from(ITEM)
Expand All @@ -44,24 +43,24 @@ class DownloadRepository(private val query: DSLContext) {
.orderBy(ITEM.PUB_DATE.asc())
)
.toMono()
.then()
.block()
}

fun addItemToQueue(id: UUID): Mono<Void> {
fun addItemToQueue(id: UUID) {
val positionInQueue = select(coalesce(max(DOWNLOADING_ITEM.POSITION), 0)).from(DOWNLOADING_ITEM).asField<Int>()

return query.insertInto(DOWNLOADING_ITEM, DOWNLOADING_ITEM.ITEM_ID, DOWNLOADING_ITEM.POSITION)
query.insertInto(DOWNLOADING_ITEM, DOWNLOADING_ITEM.ITEM_ID, DOWNLOADING_ITEM.POSITION)
.select(
select(ITEM.ID, positionInQueue + 1)
.from(ITEM)
.where(ITEM.ID.eq(id))
.and(ITEM.ID.notIn(select(DOWNLOADING_ITEM.ITEM_ID).from(DOWNLOADING_ITEM)))
)
.toMono()
.then()
.block()
}

fun findAllToDownload(limit: Int): Flux<DownloadingItem> {
fun findAllToDownload(limit: Int): List<DownloadingItem> {
val position = rowNumber().over().orderBy(DOWNLOADING_ITEM.POSITION)

val snapshot = name("downloading_snapshot").`as`(
Expand All @@ -72,26 +71,30 @@ class DownloadRepository(private val query: DSLContext) {

val item = DOWNLOADING_ITEM.item()

return Flux.from(
val items = Flux.from(
query.with(snapshot)
.select(
item.ID, item.TITLE, item.STATUS, item.URL, item.NUMBER_OF_FAIL,
item.podcast().ID, item.podcast().TITLE,
item.cover().ID, item.cover().URL
)
.from(snapshot
.innerJoin(DOWNLOADING_ITEM)
.on(snapshot.field(DOWNLOADING_ITEM.ITEM_ID)!!.eq(DOWNLOADING_ITEM.ITEM_ID))
.from(
snapshot
.innerJoin(DOWNLOADING_ITEM)
.on(snapshot.field(DOWNLOADING_ITEM.ITEM_ID)!!.eq(DOWNLOADING_ITEM.ITEM_ID))
)
.where(snapshot.field(DOWNLOADING_ITEM.STATE)!!.eq(DownloadingState.WAITING))
.orderBy(snapshot.field(position))
)
.map(::toDownloadingItem)
.collectList()
.block()!!

return items.map(::toDownloadingItem)
}

fun findAllDownloading(): Flux<DownloadingItem> {
fun findAllDownloading(): List<DownloadingItem> {
val item = DOWNLOADING_ITEM.item()
return Flux.from(
val items = Flux.from(
query
.select(
item.ID, item.TITLE, item.STATUS, item.URL, item.NUMBER_OF_FAIL,
Expand All @@ -102,12 +105,14 @@ class DownloadRepository(private val query: DSLContext) {
.where(DOWNLOADING_ITEM.STATE.eq(DownloadingState.DOWNLOADING))
.orderBy(DOWNLOADING_ITEM.POSITION.asc())
)
.map(::toDownloadingItem)
.collectList().block()!!

return items.map(::toDownloadingItem)
}

fun findAllWaiting(): Flux<DownloadingItem> {
fun findAllWaiting(): List<DownloadingItem> {
val item = DOWNLOADING_ITEM.item()
return Flux.from(
val items = Flux.from(
query
.select(
item.ID, item.TITLE, item.STATUS, item.URL, item.NUMBER_OF_FAIL,
Expand All @@ -118,30 +123,31 @@ class DownloadRepository(private val query: DSLContext) {
.where(DOWNLOADING_ITEM.STATE.eq(DownloadingState.WAITING))
.orderBy(DOWNLOADING_ITEM.POSITION.asc())
)
.map(::toDownloadingItem)
.collectList().block()!!

return items.map(::toDownloadingItem)
}

fun startItem(id: UUID): Mono<Void> {
return query
fun startItem(id: UUID) {
query
.update(DOWNLOADING_ITEM)
.set(DOWNLOADING_ITEM.STATE, DownloadingState.DOWNLOADING)
.where(DOWNLOADING_ITEM.ITEM_ID.eq(id))
.toMono()
.then()
.block()
}

fun remove(id: UUID, hasToBeStopped: Boolean): Mono<Void> {
val stop = if (hasToBeStopped) stopItem(id) else Mono.empty()

return query
fun remove(id: UUID, hasToBeStopped: Boolean) {
query
.deleteFrom(DOWNLOADING_ITEM)
.where(DOWNLOADING_ITEM.ITEM_ID.eq(id))
.toMono()
.then(stop.then())
}
.block()

fun moveItemInQueue(id: UUID, position: Int): Mono<Void> {
if (hasToBeStopped) stopItem(id)
}

fun moveItemInQueue(id: UUID, position: Int) {
val numberOfDownloadingItem = select(coalesce(max(DOWNLOADING_ITEM.POSITION), 0))
.from(DOWNLOADING_ITEM)
.where(DOWNLOADING_ITEM.STATE.eq(DownloadingState.DOWNLOADING))
Expand Down Expand Up @@ -179,35 +185,37 @@ class DownloadRepository(private val query: DSLContext) {
.and(value(true).eq(isMovingDown))
.returning()

return query
query
.with("updateMoveUp").`as`(updateMoveUp)
.with("updateMoveDown").`as`(updateMoveDown)
.update(DOWNLOADING_ITEM)
.set(DOWNLOADING_ITEM.POSITION, numberOfDownloadingItem + position + 1 )
.where(DOWNLOADING_ITEM.ITEM_ID.eq(id))
.toMono()
.then()
.block()
}

fun stopItem(id: UUID): Mono<Int> = Mono.defer {
query
fun stopItem(id: UUID): Int {
return query
.update(ITEM)
.set(ITEM.STATUS, STOPPED)
.where(ITEM.ID.eq(id))
.toMono()
.block() ?: 0
}

fun updateDownloadItem(item: DownloadingItem): Mono<Int> = Mono.defer {
query
fun updateDownloadItem(item: DownloadingItem): Int {
return query
.update(ITEM)
.set(ITEM.STATUS, item.status.toDb())
.set(ITEM.NUMBER_OF_FAIL, item.numberOfFail)
.where(ITEM.ID.eq(item.id))
.toMono()
.block() ?: 0
}

fun finishDownload(id: UUID, length: Long, mimeType: String, fileName: Path, downloadDate: OffsetDateTime): Mono<Int> = Mono.defer {
query
fun finishDownload(id: UUID, length: Long, mimeType: String, fileName: Path, downloadDate: OffsetDateTime): Int {
return query
.update(ITEM)
.set(ITEM.STATUS, FINISH)
.set(ITEM.LENGTH, length)
Expand All @@ -216,14 +224,16 @@ class DownloadRepository(private val query: DSLContext) {
.set(ITEM.DOWNLOAD_DATE, downloadDate)
.where(ITEM.ID.eq(id))
.toMono()
.block() ?: 0
}

fun resetToWaitingStateAllDownloadingItems(): Mono<Int> = Mono.defer {
query.
fun resetToWaitingStateAllDownloadingItems(): Int {
return query.
update(DOWNLOADING_ITEM)
.set(DOWNLOADING_ITEM.STATE, DownloadingState.WAITING)
.where(DOWNLOADING_ITEM.STATE.eq(DownloadingState.DOWNLOADING))
.toMono()
.block() ?: 0
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@ class ItemDownloadManager (

val queue: List<DownloadingItem>
get() = repository.findAllWaiting()
.collectList()
.block()!!

val downloading: List<DownloadingItem>
get() = repository.findAllDownloading()
.collectList()
.block()!!

var limitParallelDownload: Int
get() = downloadExecutor.corePoolSize
Expand All @@ -41,14 +37,13 @@ class ItemDownloadManager (
}

fun launchDownload() {
repository.initQueue(parameters.limitDownloadDate(), parameters.numberOfTry).block()
repository.initQueue(parameters.limitDownloadDate(), parameters.numberOfTry)
manageInBackground()
}

private fun manageDownload() {
val allToDownload = repository
.findAllToDownload(limitParallelDownload)
.collectList().block()!!

allToDownload.forEach(::launchDownloadFor)

Expand All @@ -63,7 +58,7 @@ class ItemDownloadManager (

downloaders[item.id] = downloader

repository.startItem(item.id).block()
repository.startItem(item.id)
downloadExecutor.execute(downloader)
}

Expand All @@ -72,18 +67,18 @@ class ItemDownloadManager (
}

fun addItemToQueue(id: UUID) {
repository.addItemToQueue(id).block()
repository.addItemToQueue(id)
manageInBackground()
}

fun removeItemFromQueue(id: UUID, stopItem: Boolean) {
repository.remove(id, stopItem).block()
repository.remove(id, stopItem)
manageInBackground()
}

fun removeACurrentDownload(id: UUID) {
downloaders.remove(id)
repository.remove(id, false).block()
repository.remove(id, false)
manageInBackground()
}

Expand All @@ -94,7 +89,7 @@ class ItemDownloadManager (
return
}

repository.remove(id, false).block()
repository.remove(id, false)
manageInBackground()
}

Expand All @@ -108,7 +103,6 @@ class ItemDownloadManager (

private fun convertAndSendWaitingQueue() {
val list = repository.findAllWaiting()
.collectList().block()!!

template.sendWaitingQueue(list)
}
Expand All @@ -118,7 +112,7 @@ class ItemDownloadManager (
}

fun moveItemInQueue(id: UUID, position: Int) {
repository.moveItemInQueue(id, position).block()
repository.moveItemInQueue(id, position)

convertAndSendWaitingQueueInBackground()
}
Expand Down
Loading

0 comments on commit 089c39b

Please sign in to comment.