Skip to content

Commit 3246237

Browse files
committed
more async tasks
1 parent 81f42b7 commit 3246237

File tree

4 files changed

+311
-0
lines changed

4 files changed

+311
-0
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package cub.async.answers
2+
3+
import java.lang.Exception
4+
import java.util.concurrent.Executors
5+
import java.util.concurrent.TimeUnit
6+
import kotlin.concurrent.thread
7+
8+
class AsyncFrameworkResultClass {
9+
private val threadPool = Executors.newCachedThreadPool()
10+
11+
fun <T> async(call1: () -> T, call2: (Result<T>) -> Unit) {
12+
val runnable = Runnable {
13+
val res = try {
14+
val t = call1()
15+
Success(t)
16+
} catch (e: Exception) {
17+
Error(e)
18+
}
19+
threadPool.submit({ call2(res) })
20+
}
21+
threadPool.submit(runnable)
22+
}
23+
24+
fun close() {
25+
threadPool.shutdown()
26+
threadPool.awaitTermination(1, TimeUnit.HOURS)
27+
}
28+
}
29+
30+
sealed class Result<R>
31+
32+
class Success<R>(val r: R) : Result<R>()
33+
34+
class Error<R>(val e: Exception) : Result<R>()
35+
36+
class AsyncFrameworkCallbackClass {
37+
private val threadPool = Executors.newCachedThreadPool()
38+
39+
fun <T> async(call1: () -> T, call2: AsyncCallback<T>.() -> Unit) {
40+
val runnable = Runnable {
41+
val nextCallback = AsyncCallback<T>()
42+
nextCallback.call2()
43+
val trueCall2 = try {
44+
val t = call1()
45+
Runnable { nextCallback._onSuccess(t) }
46+
} catch (e: Exception) {
47+
Runnable { nextCallback._onError(e) }
48+
}
49+
threadPool.submit(trueCall2)
50+
}
51+
threadPool.submit(runnable)
52+
}
53+
54+
fun close() {
55+
threadPool.shutdown()
56+
threadPool.awaitTermination(1, TimeUnit.HOURS)
57+
}
58+
59+
inner class AsyncCallback<R> {
60+
internal var _onSuccess: (R) -> Unit = { }
61+
internal var _onError: (Exception) -> Unit = { throw it }
62+
63+
fun onSuccess(callback: (R) -> Unit) {
64+
_onSuccess = callback
65+
}
66+
67+
fun onError(callback: (Exception) -> Unit) {
68+
_onError = callback
69+
}
70+
}
71+
}
72+
73+
fun callback() {
74+
val a = AsyncFrameworkCallbackClass()
75+
a.async({ 123 }) {
76+
onSuccess { res ->
77+
a.async({ res.toDouble() }) {
78+
onSuccess {
79+
println(it)
80+
thread { a.close() }
81+
}
82+
}
83+
}
84+
onError {
85+
println("ERROR!")
86+
}
87+
}
88+
}
89+
90+
fun result() {
91+
val a = AsyncFrameworkResultClass()
92+
a.async({ 123 }) { res ->
93+
when (res) {
94+
is Success -> {
95+
a.async({ res.r.toDouble() }) { res2 ->
96+
when (res2) {
97+
is Success -> {
98+
println(res2.r)
99+
thread { a.close() }
100+
}
101+
else -> { }
102+
}
103+
}
104+
}
105+
is Error -> {
106+
println("ERROR!")
107+
}
108+
}
109+
}
110+
}
111+
112+
fun main() {
113+
callback()
114+
result()
115+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package cub.async.answers
2+
3+
import cub.async.tasks.*
4+
import kotlinx.coroutines.*
5+
import kotlinx.coroutines.channels.BufferOverflow
6+
import kotlinx.coroutines.channels.Channel
7+
import kotlinx.coroutines.flow.MutableSharedFlow
8+
import kotlinx.coroutines.sync.Mutex
9+
import kotlinx.coroutines.sync.withLock
10+
import java.lang.Exception
11+
12+
private suspend fun <T> theRetry(arg: Int, block: suspend (Int, Int) -> T): T {
13+
repeat(4) { // try 4 times
14+
try {
15+
return withTimeout(500) { // with timeout
16+
block(arg, it)
17+
}
18+
} catch (e: TimeoutCancellationException) { /* retry */ }
19+
}
20+
return block(arg, 5) // last time just invoke without timeout
21+
}
22+
23+
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
24+
fun List<Int>.coroutineSumThread(): Int {
25+
val sumContext = newSingleThreadContext("sum")
26+
var sum = 0
27+
runBlocking {
28+
supervisorScope {
29+
forEach { i ->
30+
launch(Dispatchers.Default + CoroutineExceptionHandler { _, e -> }) {
31+
val res1 = async { theRetry(i, ::f1) }
32+
val res2 = async { theRetry(i, ::f2) }
33+
val res = f3(res1.await(), res2.await())
34+
withContext(sumContext) {
35+
sum += res
36+
}
37+
}
38+
}
39+
}
40+
}
41+
return sum
42+
}
43+
44+
fun List<Int>.coroutineSumMutex(): Int {
45+
val mutex = Mutex()
46+
var sum = 0
47+
runBlocking {
48+
supervisorScope {
49+
forEach { i ->
50+
launch(Dispatchers.Default + CoroutineExceptionHandler { _, e -> }) {
51+
val res1 = async { theRetry(i, ::f1) }
52+
val res2 = async { theRetry(i, ::f2) }
53+
val res = f3(res1.await(), res2.await())
54+
mutex.withLock {
55+
sum += res
56+
}
57+
}
58+
}
59+
}
60+
}
61+
return sum
62+
}
63+
64+
fun List<Int>.coroutineSumChannel(): Int {
65+
val channel = Channel<Int>(capacity = 10, onBufferOverflow = BufferOverflow.SUSPEND)
66+
var sum = 0
67+
runBlocking {
68+
coroutineScope {
69+
launch {
70+
for (msg in channel) {
71+
sum += msg
72+
}
73+
}
74+
supervisorScope {
75+
forEach { i ->
76+
launch(Dispatchers.Default + CoroutineExceptionHandler { _, e -> }) {
77+
val res1 = async { theRetry(i, ::f1) }
78+
val res2 = async { theRetry(i, ::f2) }
79+
val res = f3(res1.await(), res2.await())
80+
channel.send(res)
81+
}
82+
}
83+
}
84+
// will be called when supervisorScope is done
85+
channel.close()
86+
}
87+
}
88+
return sum
89+
}
90+
91+
fun List<Int>.coroutineSumSharedFlow(): Int {
92+
val flow = MutableSharedFlow<Int>(onBufferOverflow = BufferOverflow.SUSPEND)
93+
var sum = 0
94+
runBlocking {
95+
coroutineScope {
96+
val collectorJob = launch {
97+
flow.collect {
98+
sum += it
99+
}
100+
}
101+
supervisorScope {
102+
forEach { i ->
103+
launch(Dispatchers.Default + CoroutineExceptionHandler { _, e -> }) {
104+
val res1 = async { theRetry(i, ::f1) }
105+
val res2 = async { theRetry(i, ::f2) }
106+
val res = f3(res1.await(), res2.await())
107+
flow.emit(res)
108+
}
109+
}
110+
}
111+
collectorJob.cancel()
112+
}
113+
}
114+
return sum
115+
}
116+
117+
fun main() {
118+
val data = getData()
119+
val answer = data.map {
120+
try {
121+
val v1 = f1(it, 23)
122+
val v2 = f2(it, 23)
123+
f3(v1, v2)
124+
} catch (e: Exception) {
125+
0
126+
}
127+
}.sum()
128+
testSolution(answer, data, List<Int>::coroutineSumThread, "Special thread")
129+
testSolution(answer, data, List<Int>::coroutineSumMutex, "Mutex")
130+
testSolution(answer, data, List<Int>::coroutineSumChannel, "Channel")
131+
testSolution(answer, data, List<Int>::coroutineSumSharedFlow, "Flow")
132+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package cub.async.tasks
2+
3+
/*
4+
* Same as Task 2, but now we want to handle errors somehow
5+
*/
6+
7+
interface AsyncerFramework
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package cub.async.tasks
2+
3+
import java.lang.Exception
4+
import java.lang.RuntimeException
5+
import kotlin.random.Random
6+
import kotlin.system.measureTimeMillis
7+
8+
// For each int in list you have to call f1 and f2 with timeout of 500 until success or exception.
9+
// Then combine 2 resulting ints and get the sum.
10+
11+
fun getData() = List(10) { Random.nextInt(100) }
12+
13+
fun f1(i: Int, attempt: Int): Int {
14+
if (attempt == 23)
15+
return i * 2
16+
val waitTime: Long = if (attempt != 3) 2000 else 200
17+
Thread.sleep(waitTime)
18+
return i * 2
19+
}
20+
21+
fun f2(i: Int, attempt: Int): Int {
22+
if (i % 3 == 0) throw RuntimeException("Something went wrong")
23+
if (attempt == 23)
24+
return i * 3
25+
val waitTime: Long = if (attempt != 3) 2000 else 200
26+
Thread.sleep(waitTime)
27+
return i * 3
28+
}
29+
30+
fun f3(i: Int, j: Int) = i % j
31+
32+
fun testSolution(expected: Int, data: List<Int>, solution: List<Int>.() -> Int, name: String = "Solution") {
33+
print("$name. ")
34+
val t = measureTimeMillis {
35+
val actual = data.solution()
36+
if (actual == expected) {
37+
print("Correct. ")
38+
} else {
39+
print("WRONG! Expected: $expected; Actual: $actual. ")
40+
}
41+
}
42+
println("${t}ms")
43+
}
44+
45+
fun main() {
46+
val data = getData()
47+
val answer = data.map {
48+
try {
49+
val v1 = f1(it, 23)
50+
val v2 = f2(it, 23)
51+
f3(v1, v2)
52+
} catch (e: Exception) {
53+
0
54+
}
55+
}.sum()
56+
// testSolution(answer, data, TODO)
57+
}

0 commit comments

Comments
 (0)