Skip to content

Commit

Permalink
Extract 'BlockingBridge' for IO operations (#1933)
Browse files Browse the repository at this point in the history
  • Loading branch information
nulls authored Mar 5, 2023
1 parent 88868be commit 2c6d10c
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.saveourtool.save.backend.configs

import com.saveourtool.save.service.LogService
import com.saveourtool.save.service.LokiLogService
import com.saveourtool.save.utils.BlockingBridge
import org.springframework.boot.actuate.autoconfigure.metrics.orm.jpa.HibernateMetricsAutoConfiguration
import org.springframework.boot.autoconfigure.ImportAutoConfiguration
import org.springframework.boot.autoconfigure.domain.EntityScan
Expand All @@ -23,4 +24,10 @@ class ApplicationConfiguration {
*/
@Bean
fun logService(configProperties: ConfigProperties): LogService = LokiLogService.createOrStub(configProperties.loki)

/**
* @return [BlockingBridge]
*/
@Bean
fun blockingBridge(): BlockingBridge = BlockingBridge.default
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.saveourtool.save.entities.Project
import com.saveourtool.save.entities.toEntity
import com.saveourtool.save.storage.concatS3Key
import com.saveourtool.save.storage.key.AbstractS3KeyDtoManager
import com.saveourtool.save.utils.BlockingBridge
import com.saveourtool.save.utils.orNotFound

import org.springframework.data.repository.findByIdOrNull
Expand All @@ -24,11 +25,13 @@ import kotlinx.datetime.toJavaLocalDateTime
class FileS3KeyManager(
configProperties: ConfigProperties,
fileRepository: FileRepository,
blockingBridge: BlockingBridge,
private val projectService: ProjectService,
private val executionService: ExecutionService,
) : AbstractS3KeyDtoManager<FileDto, File, FileRepository>(
concatS3Key(configProperties.s3Storage.prefix, "storage"),
fileRepository,
blockingBridge,
) {
override fun createNewEntityFromDto(dto: FileDto): File = dto.toEntity {
projectService.findByNameAndOrganizationNameAndCreatedStatus(dto.projectCoordinates.projectName, dto.projectCoordinates.organizationName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import com.saveourtool.save.entities.TestsSourceSnapshot.Companion.toEntity
import com.saveourtool.save.storage.concatS3Key
import com.saveourtool.save.storage.key.AbstractS3KeyDtoManager
import com.saveourtool.save.test.TestsSourceSnapshotDto
import com.saveourtool.save.utils.BlockingBridge
import com.saveourtool.save.utils.getByIdOrNotFound

import org.springframework.stereotype.Component

/**
Expand All @@ -21,12 +23,14 @@ import org.springframework.stereotype.Component
class TestsSourceSnapshotS3KeyManager(
configProperties: ConfigProperties,
testsSourceSnapshotRepository: TestsSourceSnapshotRepository,
blockingBridge: BlockingBridge,
private val testSuitesSourceRepository: TestSuitesSourceRepository,
private val testSuitesService: TestSuitesService,
private val executionService: ExecutionService,
) : AbstractS3KeyDtoManager<TestsSourceSnapshotDto, TestsSourceSnapshot, TestsSourceSnapshotRepository>(
concatS3Key(configProperties.s3Storage.prefix, "tests-source-snapshot"),
testsSourceSnapshotRepository,
blockingBridge
) {
override fun createNewEntityFromDto(dto: TestsSourceSnapshotDto): TestsSourceSnapshot = dto.toEntity { testSuitesSourceRepository.getByIdOrNotFound(it) }

Expand Down
1 change: 1 addition & 0 deletions save-backend/src/main/resources/application-dev.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ backend.s3-storage.presigned-endpoint=http://host.docker.internal:9000
backend.s3-storage.bucketName=cnb
backend.s3-storage.credentials.accessKeyId=admin
backend.s3-storage.credentials.secretAccessKey=adminadmin
backend.s3-storage.createBucketIfNotExists=true
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher

import kotlinx.coroutines.withContext

/**
* S3 implementation of [StorageCoroutines]
*
Expand Down Expand Up @@ -171,7 +169,7 @@ class DefaultStorageCoroutines<K : Any>(

private suspend fun <R> S3KeyManager<K>.callAsSuspend(function: S3KeyManager<K>.() -> R): R =
if (s3KeyManager is AbstractS3KeyDatabaseManager<*, *, *>) {
withContext(s3KeyManager.ioDispatcher) {
s3KeyManager.blockingBridge.blockingToSuspend {
function(this@callAsSuspend)
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,10 @@ class DefaultStorageProjectReactor<K : Any>(

private fun createNewS3Key(key: K): Mono<String> = s3KeyManager.callAsMono { createNewS3Key(key) }

private fun <R : Any> S3KeyManager<K>.callAsMono(function: S3KeyManager<K>.() -> R?): Mono<R> = { function(this) }
.toMono()
.let {
private fun <R : Any> S3KeyManager<K>.callAsMono(function: S3KeyManager<K>.() -> R?): Mono<R> =
if (s3KeyManager is AbstractS3KeyDatabaseManager<*, *, *>) {
it.subscribeOn(s3KeyManager.ioScheduler)
s3KeyManager.blockingBridge.blockingToMono { function(this) }
} else {
it
{ function(this) }.toMono()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,23 @@ package com.saveourtool.save.storage.key

import com.saveourtool.save.spring.entity.BaseEntity
import com.saveourtool.save.spring.repository.BaseEntityRepository
import com.saveourtool.save.utils.BlockingBridge
import com.saveourtool.save.utils.orNotFound

import org.springframework.data.repository.findByIdOrNull
import org.springframework.transaction.annotation.Transactional
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers

/**
* Implementation of [S3KeyManager] which stores keys in database
*
* @param prefix a common prefix for all keys in S3 storage for this storage
* @property repository repository for [E]
* @property ioScheduler
* @property ioDispatcher
* @property blockingBridge
*/
abstract class AbstractS3KeyDatabaseManager<K : Any, E : BaseEntity, R : BaseEntityRepository<E>>(
prefix: String,
protected val repository: R,
val ioScheduler: Scheduler = Schedulers.boundedElastic(),
val ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
val blockingBridge: BlockingBridge,
) : S3KeyManager<K> {
/**
* [S3KeyManager] with [Long] as key (it's [ID][com.saveourtool.save.spring.entity.BaseEntity.requiredId])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.saveourtool.save.storage.key
import com.saveourtool.save.entities.DtoWithId
import com.saveourtool.save.spring.entity.BaseEntityWithDtoWithId
import com.saveourtool.save.spring.repository.BaseEntityRepository
import com.saveourtool.save.utils.BlockingBridge
import com.saveourtool.save.utils.orNotFound
import org.springframework.data.repository.findByIdOrNull

Expand All @@ -11,11 +12,13 @@ import org.springframework.data.repository.findByIdOrNull
*
* @param prefix a common prefix for all keys in S3 storage for this storage
* @param repository repository for [E] which is entity for [K]
* @param blockingBridge
*/
abstract class AbstractS3KeyDtoManager<K : DtoWithId, E : BaseEntityWithDtoWithId<K>, R : BaseEntityRepository<E>>(
prefix: String,
repository: R,
) : AbstractS3KeyDatabaseManager<K, E, R>(prefix, repository) {
blockingBridge: BlockingBridge,
) : AbstractS3KeyDatabaseManager<K, E, R>(prefix, repository, blockingBridge) {
override fun E.toKey(): K = toDto()

override fun K.toEntity(): E = createNewEntityFromDto(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.saveourtool.save.storage.key

import com.saveourtool.save.spring.entity.BaseEntity
import com.saveourtool.save.spring.repository.BaseEntityRepository
import com.saveourtool.save.utils.BlockingBridge
import com.saveourtool.save.utils.orNotFound
import org.springframework.data.repository.findByIdOrNull

Expand All @@ -10,13 +11,16 @@ import org.springframework.data.repository.findByIdOrNull
*
* @param prefix a common prefix for all keys in S3 storage for this storage
* @param repository repository for [E]
* @param blockingBridge
*/
abstract class AbstractS3KeyEntityManager<E : BaseEntity, R : BaseEntityRepository<E>>(
prefix: String,
repository: R,
blockingBridge: BlockingBridge,
) : AbstractS3KeyDatabaseManager<E, E, R>(
prefix,
repository,
blockingBridge,
) {
override fun E.toKey(): E = this

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.saveourtool.save.utils

import org.jetbrains.annotations.NonBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import reactor.kotlin.core.publisher.toMono

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

/**
* A bridge for blocking (IO) operations
*
* @property ioScheduler [Scheduler] for IO operations for [Mono] and [Flux]
* @property ioDispatcher [CoroutineDispatcher] for IO operations in suspend function
*/
class BlockingBridge(
val ioScheduler: Scheduler = Schedulers.boundedElastic(),
val ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
) {
/**
* Taking from https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
*
* @param supplier blocking operation like JDBC
* @return [Mono] from result of blocking operation [T]
* @see blockingToFlux
*/
@NonBlocking
fun <T : Any> blockingToMono(supplier: () -> T?): Mono<T> = supplier.toMono().subscribeOn(ioScheduler)

/**
* @param supplier blocking operation like JDBC
* @return [Flux] from result of blocking operation [List] of [T]
* @see blockingToMono
*/
@NonBlocking
fun <T> blockingToFlux(supplier: () -> Iterable<T>): Flux<T> = blockingToMono(supplier).flatMapIterable { it }

/**
* @param supplier blocking operation like JDBC
* @return suspend result of blocking operation [T]
* @see blockingToMono
*/
@NonBlocking
suspend fun <T> blockingToSuspend(supplier: () -> T): T = withContext(ioDispatcher) {
supplier()
}

companion object {
/**
* A default instance of [BlockingBridge]
*/
val default: BlockingBridge = BlockingBridge()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.kotlin.core.publisher.switchIfEmpty
import reactor.kotlin.core.publisher.switchIfEmptyDeferred
import reactor.kotlin.core.publisher.toMono

import java.io.InputStream
import java.io.SequenceInputStream
Expand Down Expand Up @@ -256,20 +255,21 @@ fun ByteReadChannel.toByteArrayFlow(): Flow<ByteArray> = flow {
* @see blockingToFlux
* @see ResponseSpec.blockingBodyToMono
* @see ResponseSpec.blockingToBodilessEntity
* @see BlockingBridge
*/
@NonBlocking
fun <T : Any> blockingToMono(supplier: () -> T?): Mono<T> = supplier.toMono()
.subscribeOn(Schedulers.boundedElastic())
fun <T : Any> blockingToMono(supplier: () -> T?): Mono<T> = BlockingBridge.default.blockingToMono(supplier)

/**
* @param supplier blocking operation like JDBC
* @return [Flux] from result of blocking operation [List] of [T]
* @see blockingToMono
* @see ResponseSpec.blockingBodyToMono
* @see ResponseSpec.blockingToBodilessEntity
* @see BlockingBridge
*/
@NonBlocking
fun <T> blockingToFlux(supplier: () -> Iterable<T>): Flux<T> = blockingToMono(supplier).flatMapIterable { it }
fun <T> blockingToFlux(supplier: () -> Iterable<T>): Flux<T> = BlockingBridge.default.blockingToFlux(supplier)

/**
* @param interval how long to wait between checks
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.saveourtool.save.demo.config

import com.saveourtool.save.utils.BlockingBridge
import io.fabric8.kubernetes.client.ConfigBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.KubernetesClientBuilder
Expand Down Expand Up @@ -28,4 +29,10 @@ class Beans {
.build())
.build()
}

/**
* @return [BlockingBridge]
*/
@Bean
fun blockingBridge(): BlockingBridge = BlockingBridge.default
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.saveourtool.save.demo.entity.Dependency
import com.saveourtool.save.demo.repository.DependencyRepository
import com.saveourtool.save.storage.concatS3Key
import com.saveourtool.save.storage.key.AbstractS3KeyEntityManager
import com.saveourtool.save.utils.BlockingBridge
import org.springframework.stereotype.Component

/**
Expand All @@ -14,9 +15,11 @@ import org.springframework.stereotype.Component
class DependencyKeyManager(
configProperties: ConfigProperties,
repository: DependencyRepository,
blockingBridge: BlockingBridge,
) : AbstractS3KeyEntityManager<Dependency, DependencyRepository>(
prefix = concatS3Key(configProperties.s3Storage.prefix, "deps"),
repository = repository,
blockingBridge = blockingBridge,
) {
override fun findByContent(key: Dependency): Dependency? =
repository.findByDemoAndVersionAndFileId(key.demo, key.version, key.fileId)
Expand Down

0 comments on commit 2c6d10c

Please sign in to comment.