-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathRunner.scala
302 lines (254 loc) · 9.85 KB
/
Runner.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
package akka.dispatch.verification
import akka.actor.{ Actor, ActorRef, DeadLetter }
import akka.actor.{ActorSystem, ExtendedActorSystem}
import akka.actor.Props
import akka.dispatch.verification._
import scala.collection.mutable.Queue
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.Map
import scala.util.Random
import pl.project13.scala.akka.raft.example._
import pl.project13.scala.akka.raft.protocol._
import pl.project13.scala.akka.raft.example.protocol._
import pl.project13.scala.akka.raft._
import pl.project13.scala.akka.raft.model._
import runner.raftchecks._
import runner.raftserialization._
import java.nio._
import akka.actor.FSM
import akka.actor.FSM.Timer
import scalax.collection.mutable.Graph,
scalax.collection.GraphEdge.DiEdge,
scalax.collection.edge.LDiEdge
/**
 * Message fingerprinter for the akka-raft messages used in this experiment.
 *
 * The parent fingerprinter is consulted first; only when it yields nothing
 * are the raft-specific rules below applied.
 */
class RaftMessageFingerprinter extends MessageFingerprinter {
  /**
   * @param msg the message to fingerprint
   * @return the parent's fingerprint when it has one; otherwise a
   *         BasicFingerprint for RequestVote, LeaderIs(Some(_), _) and
   *         ClientMessage; otherwise None
   */
  override def fingerprint(msg: Any) : Option[MessageFingerprint] = {
    // Render an ActorRef by path name only (presumably so the fingerprint
    // does not depend on the rest of the ref -- TODO confirm).
    def nameOf(actor: ActorRef) : String = actor.path.name

    // Raft-specific rules. The fingerprint text is the toString of a tuple.
    def raftFingerprint : Option[MessageFingerprint] = {
      val rendered : Option[String] = msg match {
        case RequestVote(term, candidate, lastLogTerm, lastLogIndex) =>
          Some(("RequestVote", term, nameOf(candidate), lastLogTerm, lastLogIndex).toString)
        case LeaderIs(Some(leader), _) =>
          Some(("LeaderIs", nameOf(leader)).toString)
        case ClientMessage(_, cmd) =>
          // The first field of ClientMessage is left out of the fingerprint.
          Some(("ClientMessage", cmd).toString)
        case _ =>
          None
      }
      rendered.map(text => BasicFingerprint(text))
    }

    super.fingerprint(msg) match {
      case None => raftFingerprint
      case inherited => inherited
    }
  }

  // Does this message trigger a logical clock contained in subsequent
  // messages to be incremented? Only the FSM timer named "election-timer"
  // does.
  override def causesClockIncrement(msg: Any) : Boolean = msg match {
    case Timer("election-timer", _, _, _) => true
    case _ => false
  }

  // Extract a clock value from the contents of this message: the term
  // number carried by the message, or None for messages not listed here.
  override def getLogicalClock(msg: Any) : Option[Long] = msg match {
    case RequestVote(term, _, _, _) => Some(term.termNr)
    case AppendEntries(term, _, _, _, _) => Some(term.termNr)
    case VoteCandidate(term) => Some(term.termNr)
    case DeclineCandidate(term) => Some(term.termNr)
    case response: AppendResponse => Some(response.term.termNr)
    case _ => None
  }
}
/**
 * External message constructor that produces a ClientMessage wrapping
 * AppendWord(word).
 *
 * @param word the word placed inside the AppendWord command
 */
class AppendWordConstuctor(word: String) extends ExternalMessageConstructor {
  /**
   * Builds the message on demand. The first argument of the ClientMessage
   * is the deadLetters ref of the instrumenter's current actor system.
   */
  def apply() : Any = {
    val deadLetters = Instrumenter().actorSystem.deadLetters
    ClientMessage[AppendWord](deadLetters, AppendWord(word))
  }
}
/**
 * Generates Send events carrying AppendWord client messages, addressed to a
 * randomly chosen entry of raft_members.
 *
 * @param raft_members names of the actors that may be picked as destination
 */
class ClientMessageGenerator(raft_members: Seq[String]) extends MessageGenerator {
  // Counter whose decimal rendering is used as the next word; it is bumped
  // by one on every generated message.
  var highestWordUsedSoFar = 0
  val rand = new Random
  // Pool of candidate destinations, filled once from raft_members.
  val destinations = new RandomizedHashSet[String]
  raft_members.foreach { member => destinations.insert(member) }

  /**
   * @param alive not consulted by this generator; the destination is always
   *              drawn from `destinations`
   * @return a Send to a random destination whose payload appends the current
   *         counter value as a word
   */
  def generateMessage(alive: RandomizedHashSet[String]) : Send = {
    val target = destinations.getRandomElement()
    val nextWord = highestWordUsedSoFar.toString
    highestWordUsedSoFar += 1
    Send(target, new AppendWordConstuctor(nextWord))
  }
}
/**
 * Builds the ChangeConfiguration message sent to raft members, leaving out
 * the cluster members whose positions are listed in maskedIndices.
 *
 * @param maskedIndices positions (in the sorted list of non-client,
 *                      non-system actors) to exclude from the configuration
 */
case class BootstrapMessageConstructor(maskedIndices: Set[Int]) extends ExternalMessageConstructor {
  // Members included by the most recent apply(); empty until apply() runs.
  @scala.transient
  var components : Seq[ActorRef] = Seq.empty

  /**
   * Recomputes `components` from the instrumenter's current actor mappings
   * and wraps them in a ChangeConfiguration.
   */
  def apply(): Any = {
    // Every mapped actor except "client" and system actors, in sorted order
    // so that index positions are stable.
    val candidates = Instrumenter().actorMappings.filter {
      case (name, _) => !(name == "client" || ActorTypes.systemActor(name))
    }.values.toSeq.sorted
    // TODO(cs): factor zipWithIndex magic into a static method in ExternalMessageConstructor.
    components = candidates.zipWithIndex.collect {
      case (actor, position) if !maskedIndices.contains(position) => actor
    }
    ChangeConfiguration(ClusterConfiguration(components))
  }

  override def getComponents() = components

  /**
   * @param indices additional positions to mask
   * @return a new constructor masking both `indices` and the positions
   *         already masked by this one
   */
  override def maskComponents(indices: Set[Int]) : ExternalMessageConstructor =
    BootstrapMessageConstructor(indices ++ maskedIndices)
}
/** Helpers shared by the runner: actor construction, bootstrap message, ref fix-up. */
object Init {
  /** Props used to start each raft member (a WordConcatRaftActor). */
  def actorCtor(): Props = Props.create(classOf[WordConcatRaftActor])

  /**
   * ChangeConfiguration message listing every currently mapped actor other
   * than "client" and system actors.
   */
  def startCtor(): Any = {
    val members = Instrumenter().actorMappings.filter {
      case (name, _) => !(name == "client" || ActorTypes.systemActor(name))
    }.values
    ChangeConfiguration(ClusterConfiguration(members))
  }

  // Very important! Need to update the actor refs recorded in the event
  // trace, since they are no longer valid for this new actor system.
  /**
   * @param ref a ref taken from a recorded trace
   * @return the ref looked up under "/user/<name>" in the current actor
   *         system; asserts that the lookup did not resolve to deadLetters
   */
  def updateActorRef(ref: ActorRef) : ActorRef = {
    val refreshed = Instrumenter().actorSystem.actorFor("/user/" + ref.path.name)
    assert(refreshed.path.name != "deadLetters")
    refreshed
  }

  /** True for ChangeConfiguration and ClientMessage, false for anything else. */
  def externalMessageFilter(msg: Any) = msg match {
    case ChangeConfiguration(_) | ClientMessage(_, _) => true
    case _ => false
  }
}
/**
 * Entry point of the akka-raft experiment runner.
 *
 * The execution mode is derived from the command-line arguments via
 * RunnerUtils.getExecutionMode and selects one of three flows:
 *  - FUZZ: fuzz the system, record the experiment and minimize the trace;
 *  - MINIMIZE: run RunnerUtils.runTheGamut on a previously recorded
 *    experiment directory (paths are hard-coded below);
 *  - INTERACTIVE: replay the externals of a recorded MCS through an
 *    InteractiveScheduler and record the resulting trace.
 */
object Main extends App {
  Instrumenter().setLogLevel("ERROR")

  EventTypes.setExternalMessageFilter(Init.externalMessageFilter)
  // The actor system is touched between setPassthrough/unsetPassthrough.
  // NOTE(review): presumably this forces the actor system to be created
  // while instrumentation is bypassed -- confirm against Instrumenter.
  Instrumenter().setPassthrough
  Instrumenter().actorSystem
  Instrumenter().unsetPassthrough

  val raftChecks = new RaftChecks

  val fingerprintFactory = new FingerprintFactory
  fingerprintFactory.registerFingerprinter(new RaftMessageFingerprinter)

  // -- Used for initial fuzzing: --
  val members = (1 to 4) map { i => s"raft-member-$i" }
  // Start every member, then send each one the bootstrap configuration.
  val prefix = Array[ExternalEvent]() ++
    members.map(member =>
      Start(Init.actorCtor, member)) ++
    members.map(member =>
      Send(member, new BootstrapMessageConstructor(Set[Int]())))
  // NOTE(review): the expression below is a separate statement -- no `++`
  // joins it to `prefix`, so the WaitQuiescence it builds is discarded and
  // is NOT part of the fuzzing prefix. Left as-is because appending it would
  // change the generated fuzz tests; confirm whether a `++` went missing.
  Array[ExternalEvent](WaitQuiescence()
  )
  // -- --

  // Resets the invariant checker's state whenever the instrumenter shuts down.
  def shutdownCallback() = {
    raftChecks.clear
  }

  Instrumenter().registerShutdownCallback(shutdownCallback)

  val schedulerConfig = SchedulerConfig(
    messageFingerprinter=fingerprintFactory,
    enableFailureDetector=false,
    enableCheckpointing=true,
    shouldShutdownActorSystem=true,
    filterKnownAbsents=false,
    invariant_check=Some(raftChecks.invariant),
    ignoreTimers=false
  )

  val weights = new FuzzerWeights(kill=0.00, send=0.3, wait_quiescence=0.0,
                                  partition=0.0, unpartition=0)
  val messageGen = new ClientMessageGenerator(members)
  val fuzzer = new Fuzzer(100, weights, messageGen, prefix)

  val mode = RunnerUtils.getExecutionMode(args)

  // Results of the fuzz run; they stay null in every mode other than FUZZ.
  var traceFound: EventTrace = null
  var violationFound: ViolationFingerprint = null
  var depGraph : Graph[Unique, DiEdge] = null
  var initialTrace : Queue[Unique] = null
  var filteredTrace : Queue[Unique] = null

  if (mode == ExecutionMode.FUZZ) {
    def replayerCtor() : ReplayScheduler = new ReplayScheduler(schedulerConfig)
    def randomizationStrategy() : RandomizationStrategy = new SrcDstFIFO

    val tuple = RunnerUtils.fuzz(fuzzer, raftChecks.invariant,
                                 schedulerConfig,
                                 validate_replay=Some(replayerCtor),
                                 maxMessages=Some(3000),  // XXX
                                 invariant_check_interval=30,
                                 randomizationStrategyCtor=randomizationStrategy)
    traceFound = tuple._1
    violationFound = tuple._2
    depGraph = tuple._3
    initialTrace = tuple._4
    filteredTrace = tuple._5
  }

  if (mode == ExecutionMode.FUZZ) {
    // Restrict the found trace to the events that are also in filteredTrace.
    val provenanceTrace = traceFound.intersection(filteredTrace, fingerprintFactory)

    val serializer = new ExperimentSerializer(
      fingerprintFactory,
      new RaftMessageSerializer)

    val dir = serializer.record_experiment("akka-raft-fuzz-long",
      traceFound.filterCheckpointMessages(), violationFound,
      depGraph=Some(depGraph), initialTrace=Some(initialTrace),
      filteredTrace=Some(filteredTrace))

    val (mcs, stats1, verified_mcs, violation) =
      RunnerUtils.stsSchedDDMin(false,
        schedulerConfig,
        provenanceTrace,
        violationFound,
        actorNameProps=Some(ExperimentSerializer.getActorNameProps(traceFound)))

    val mcs_dir = serializer.serializeMCS(dir, mcs, stats1, verified_mcs, violation, false)

    // verified_mcs may be empty; do not call .get on it, otherwise the run
    // dies here (after the MCS was already serialized) and the directory
    // below is never reported.
    val verifiedSize = verified_mcs
      .map(verified => RunnerUtils.getDeliveries(verified).size.toString)
      .getOrElse("n/a (no verified MCS)")
    println("verified_mcs.size: " + verifiedSize)
    println("MCS DIR: " + mcs_dir)
  } else if (mode == ExecutionMode.MINIMIZE) {
    // Hard-coded locations of a previously recorded experiment and its MCS.
    val dir =
    "experiments/akka-raft-fuzz-long_2016_01_03_20_36_00"
    val mcs_dir =
    "experiments/akka-raft-fuzz-long_2016_01_03_20_36_00_DDMin_STSSchedNoPeek"

    val msgSerializer = new RaftMessageSerializer
    val msgDeserializer = new RaftMessageDeserializer(Instrumenter()._actorSystem)

    // Re-run DDMin only while some external Send still carries an AppendWord.
    def shouldRerunDDMin(externals: Seq[ExternalEvent]) =
      externals.exists({
        case s: Send => s.messageCtor.isInstanceOf[AppendWordConstuctor]
        case _ => false
      })

    RunnerUtils.runTheGamut(dir, mcs_dir, schedulerConfig, msgSerializer,
      msgDeserializer, shouldRerunDDMin=shouldRerunDDMin)
  } else if (mode == ExecutionMode.INTERACTIVE) {
    val mcs_dir =
    "experiments/akka-raft-fuzz-long_2016_01_03_20_36_00_DDMin_STSSchedNoPeek"
    val msgSerializer = new RaftMessageSerializer
    val msgDeserializer = new RaftMessageDeserializer(Instrumenter()._actorSystem)
    val deserializer = new ExperimentDeserializer(mcs_dir)
    val violation = deserializer.get_violation(msgDeserializer)
    val externals = deserializer.get_mcs

    println("externals:")
    externals.foreach { e => println(e) }

    // TODO(cs): put me in RunnerUtils, along with recording.
    val sched = new InteractiveScheduler(schedulerConfig)
    Instrumenter().scheduler = sched
    val (trace, maybeViolation) = sched.run(externals)

    val serializer = new ExperimentSerializer(
      fingerprintFactory,
      msgSerializer)

    val new_dir = serializer.record_experiment("akka-raft-interactive",
      trace.filterCheckpointMessages())

    //serializer.recordMinimizationStats(dir, stats)

    println("Found failing trace: " + trace.filterCheckpointMessages().size)
    println("Saved trace at " + new_dir)
  }
}