Skip to content

Flow-less summary storage #222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Producer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright 2022 UnitTestBot contributors (utbot.org)
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jacodb.analysis.ifds

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

interface Producer<T> {
fun produce(event: T)
fun subscribe(consumer: Consumer<T>)
}

fun interface Consumer<in T> {
fun consume(event: T)
}

class SyncProducer<T> : Producer<T> {
private val consumers: MutableList<Consumer<T>> = mutableListOf()
private val events: MutableList<T> = mutableListOf()

@Synchronized
override fun produce(event: T) {
for (consumer in consumers) {
consumer.consume(event)
}
events.add(event)
}

@Synchronized
override fun subscribe(consumer: Consumer<T>) {
for (event in events) {
consumer.consume(event)
}
consumers.add(consumer)
}
}

sealed interface ConsList<out T> : Iterable<T>

object Nil : ConsList<Nothing> {
override fun iterator(): Iterator<Nothing> = object : Iterator<Nothing> {
override fun hasNext(): Boolean = false
override fun next(): Nothing {
throw NoSuchElementException()
}
}

override fun toString(): String = javaClass.simpleName
}

data class Cons<out T>(
val value: T,
val tail: ConsList<T>,
) : ConsList<T> {
override fun iterator(): Iterator<T> = Iter(this)

private class Iter<T>(private var list: ConsList<T>) : Iterator<T> {
override fun hasNext(): Boolean = list !is Nil

override fun next(): T = when (val list = list) {
is Nil -> throw NoSuchElementException()
is Cons -> {
val value = list.value
this.list = list.tail
value
}
}
}
}

class NonBlockingQueue<T> {
data class Node<T>(
val value: T,
@Volatile var next: Node<T>? = null,
)

var head: Node<T>? = null
private set
val tail: AtomicReference<Node<T>> = AtomicReference(head)
val size: AtomicInteger = AtomicInteger(0)

fun add(element: T) {
val node = Node(element)
var currentTail: Node<T>?
while (true) {
currentTail = tail.get()
if (tail.compareAndSet(currentTail, node)) break
}
if (currentTail != null) {
currentTail.next = node
} else {
head = node
}
size.incrementAndGet()
}
}

class ConcurrentProducer<T> : Producer<T> {
private var consumers: AtomicReference<ConsList<Consumer<T>>> = AtomicReference(Nil)
private val events: NonBlockingQueue<T> = NonBlockingQueue()

override fun produce(event: T) {
var currentConsumers: ConsList<Consumer<T>>
while (true) {
currentConsumers = consumers.get() ?: continue
if (consumers.compareAndSet(currentConsumers, null)) break
}

events.add(event)

try {
for (consumer in currentConsumers) {
consumer.consume(event)
}
} finally {
check(consumers.compareAndSet(null, currentConsumers))
}
}

override fun subscribe(consumer: Consumer<T>) {
var last: NonBlockingQueue.Node<T>? = null
while (true) {
val start = if (last != null) last.next else events.head
var current = start
while (current != null) {
last = current
consumer.consume(current.value)
current = current.next
}

val currentConsumers = consumers.get() ?: continue
if (!consumers.compareAndSet(currentConsumers, null)) continue
if (events.tail.get() === last) {
val newConsumers = Cons(consumer, currentConsumers)
check(consumers.compareAndSet(null, newConsumers))
break
} else {
check(consumers.compareAndSet(null, currentConsumers))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class UniRunner<Fact, Event>(

// Add edge to worklist:
workList.trySend(edge).getOrThrow()
manager.handleControlEvent(queueIsNotEmpty)

return true
}
Expand All @@ -122,7 +123,6 @@ class UniRunner<Fact, Event>(
val edge = workList.tryReceive().getOrElse {
manager.handleControlEvent(queueIsEmpty)
val edge = workList.receive()
manager.handleControlEvent(queueIsNotEmpty)
edge
}
tabulationAlgorithmStep(edge, this@coroutineScope)
Expand Down
71 changes: 45 additions & 26 deletions jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Summary.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.jacodb.analysis.ifds

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import org.jacodb.api.JcMethod
Expand Down Expand Up @@ -48,56 +47,76 @@ interface Vulnerability<out Fact> : Summary {
/**
* Contains summaries for many methods and allows to update them and subscribe for them.
*/
interface SummaryStorage<T : Summary> {
class SummaryStorageWithFlows<T : Summary> {
private val summaries = ConcurrentHashMap<JcMethod, MutableSet<T>>()
private val outFlows = ConcurrentHashMap<JcMethod, MutableSharedFlow<T>>()

/**
* A list of all methods for which summaries are not empty.
* @return a list with all methods for which there are some summaries.
*/
val knownMethods: List<JcMethod>
val knownMethods: List<JcMethod>
get() = summaries.keys.toList()

private fun getFlow(method: JcMethod): MutableSharedFlow<T> {
return outFlows.computeIfAbsent(method) {
MutableSharedFlow(replay = Int.MAX_VALUE)
}
}

/**
* Adds [fact] to summary of its method.
* Adds a new [fact] to the storage.
*/
fun add(fact: T)
fun add(fact: T) {
val isNew = summaries.computeIfAbsent(fact.method) { ConcurrentHashMap.newKeySet() }.add(fact)
if (isNew) {
val flow = getFlow(fact.method)
check(flow.tryEmit(fact))
}
}

/**
* @return a flow with all facts summarized for the given [method].
* Already received facts, along with the facts that will be sent to this storage later,
* will be emitted to the returned flow.
*/
fun getFacts(method: JcMethod): Flow<T>
fun getFacts(method: JcMethod): SharedFlow<T> {
return getFlow(method)
}

/**
* @return a list will all facts summarized for the given [method] so far.
*/
fun getCurrentFacts(method: JcMethod): List<T>
fun getCurrentFacts(method: JcMethod): List<T> {
return getFacts(method).replayCache
}
}

class SummaryStorageImpl<T : Summary> : SummaryStorage<T> {
class SummaryStorageWithProducers<T : Summary>(
private val useConcurrentProducer: Boolean = false,
) {
private val summaries = ConcurrentHashMap<JcMethod, MutableSet<T>>()
private val outFlows = ConcurrentHashMap<JcMethod, MutableSharedFlow<T>>()

override val knownMethods: List<JcMethod>
get() = summaries.keys.toList()

private fun getFlow(method: JcMethod): MutableSharedFlow<T> {
return outFlows.computeIfAbsent(method) {
MutableSharedFlow(replay = Int.MAX_VALUE)
private val producers = ConcurrentHashMap<JcMethod, Producer<T>>()

private fun getProducer(method: JcMethod): Producer<T> {
return producers.computeIfAbsent(method) {
if (useConcurrentProducer) {
ConcurrentProducer()
} else {
SyncProducer()
}
}
}

override fun add(fact: T) {
fun add(fact: T) {
val isNew = summaries.computeIfAbsent(fact.method) { ConcurrentHashMap.newKeySet() }.add(fact)
if (isNew) {
val flow = getFlow(fact.method)
check(flow.tryEmit(fact))
val producer = getProducer(fact.method)
producer.produce(fact)
}
}

override fun getFacts(method: JcMethod): SharedFlow<T> {
return getFlow(method)
}

override fun getCurrentFacts(method: JcMethod): List<T> {
return getFacts(method).replayCache
fun subscribe(method: JcMethod, handler: (T) -> Unit) {
val producer = getProducer(method)
producer.subscribe(handler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
Expand All @@ -34,7 +32,9 @@ import org.jacodb.analysis.ifds.ControlEvent
import org.jacodb.analysis.ifds.IfdsResult
import org.jacodb.analysis.ifds.Manager
import org.jacodb.analysis.ifds.QueueEmptinessChanged
import org.jacodb.analysis.ifds.SummaryStorageImpl
import org.jacodb.analysis.ifds.SummaryEdge
import org.jacodb.analysis.ifds.SummaryStorageWithFlows
import org.jacodb.analysis.ifds.SummaryStorageWithProducers
import org.jacodb.analysis.ifds.TraceGraph
import org.jacodb.analysis.ifds.UniRunner
import org.jacodb.analysis.ifds.UnitResolver
Expand Down Expand Up @@ -63,8 +63,8 @@ open class TaintManager(
protected val runnerForUnit: MutableMap<UnitType, TaintRunner> = hashMapOf()
private val queueIsEmpty = ConcurrentHashMap<UnitType, Boolean>()

private val summaryEdgesStorage = SummaryStorageImpl<TaintSummaryEdge>()
private val vulnerabilitiesStorage = SummaryStorageImpl<TaintVulnerability>()
private val summaryEdgesStorage = SummaryStorageWithProducers<SummaryEdge<TaintDomainFact>>()
private val vulnerabilitiesStorage = SummaryStorageWithFlows<TaintVulnerability>()

private val stopRendezvous = Channel<Unit>(Channel.RENDEZVOUS)

Expand Down Expand Up @@ -277,10 +277,7 @@ open class TaintManager(
scope: CoroutineScope,
handler: (TaintEdge) -> Unit,
) {
summaryEdgesStorage
.getFacts(method)
.onEach { handler(it.edge) }
.launchIn(scope)
summaryEdgesStorage.subscribe(method) { handler(it.edge) }
}

fun vulnerabilityTraceGraph(vulnerability: TaintVulnerability): TraceGraph<TaintDomainFact> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.jacodb.analysis.ifds.Edge
import org.jacodb.analysis.ifds.Manager
import org.jacodb.analysis.ifds.QueueEmptinessChanged
import org.jacodb.analysis.ifds.Runner
import org.jacodb.analysis.ifds.SummaryStorageImpl
import org.jacodb.analysis.ifds.SummaryStorageWithFlows
import org.jacodb.analysis.ifds.UniRunner
import org.jacodb.analysis.ifds.UnitResolver
import org.jacodb.analysis.ifds.UnitType
Expand Down Expand Up @@ -62,8 +62,7 @@ class UnusedVariableManager(
private val runnerForUnit: MutableMap<UnitType, Runner<UnusedVariableDomainFact>> = hashMapOf()
private val queueIsEmpty = ConcurrentHashMap<UnitType, Boolean>()

private val summaryEdgesStorage = SummaryStorageImpl<UnusedVariableSummaryEdge>()
private val vulnerabilitiesStorage = SummaryStorageImpl<UnusedVariableVulnerability>()
private val summaryEdgesStorage = SummaryStorageWithFlows<UnusedVariableSummaryEdge>()

private val stopRendezvous = Channel<Unit>(Channel.RENDEZVOUS)

Expand Down
41 changes: 41 additions & 0 deletions jacodb-analysis/src/test/resources/additional.json
Original file line number Diff line number Diff line change
Expand Up @@ -603,5 +603,46 @@
}
}
]
},
{
"_": "MethodSource",
"methodInfo": {
"cls": {
"classNameMatcher": {
"_": "NameIsEqualTo",
"name": "Properties"
},
"packageMatcher": {
"_": "NameIsEqualTo",
"name": "java.util"
}
},
"functionName": {
"_": "NameIsEqualTo",
"name": "getProperty"
},
"applyToOverrides": true,
"exclude": [],
"functionLabel": null,
"modifier": -1,
"parametersMatchers": [],
"returnTypeMatcher": {
"_": "AnyTypeMatches"
}
},
"condition": {
"_": "ConstantTrue"
},
"actionsAfter": [
{
"_": "AssignMark",
"position": {
"_": "Result"
},
"mark": {
"name": "PROPERTY"
}
}
]
}
]