Skip to content

Commit ef5a332

Browse files
committed
WIP Graph processing
1 parent 7522f86 commit ef5a332

File tree

1 file changed

+100
-10
lines changed

1 file changed

+100
-10
lines changed

tests/FSharp.Compiler.Service.Tests2/RunCompiler.fs

Lines changed: 100 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,128 @@
11
module FSharp.Compiler.Service.Tests.RunCompiler
22

3+
open System
34
open System.Collections.Concurrent
45
open System.Threading
56
open System.Threading.Tasks
67
open NUnit.Framework
78

9+
type Node =
10+
{
11+
Idx : int
12+
Deps : int[]
13+
Dependants : int[]
14+
mutable Result : string option
15+
mutable UnprocessedDepsCount : int
16+
_lock : Object
17+
}
18+
819
[<Test>]
920
let runCompiler () =
1021
// let args =
1122
// System.IO.File.ReadAllLines(@"C:\projekty\fsharp\heuristic\tests\FSharp.Compiler.Service.Tests2\args.txt") |> Array.skip 1
1223
// FSharp.Compiler.CommandLineMain.main args |> ignore
1324

14-
let go (idx : int) =
15-
printfn $"{idx} start"
16-
Thread.Sleep(1000)
17-
printfn $"{idx} stop"
25+
let fileDeps =
26+
[|
27+
0, [||] // A
28+
1, [|0|] // B1 -> A
29+
2, [|1|] // B2 -> B1
30+
3, [|0|] // C1 -> A
31+
4, [|3|] // C2 -> C1
32+
5, [|2; 4|] // D -> B2, C2
33+
|]
34+
35+
let fileDependants =
36+
fileDeps
37+
// Collect all edges
38+
|> Array.collect (fun (idx, deps) -> deps |> Array.map (fun dep -> idx, dep))
39+
// Group dependants of the same dependencies together
40+
|> Array.groupBy (fun (idx, dep) -> dep)
41+
// Construct reversed graph
42+
|> Array.map (fun (dep, edges) -> dep, edges |> Array.map fst)
43+
|> dict
44+
// Add nodes that are missing due to having no dependants
45+
|> fun g ->
46+
fileDeps
47+
|> Array.map fst
48+
|> Array.map (fun idx ->
49+
match g.TryGetValue idx with
50+
| true, dependants -> dependants
51+
| false, _ -> [||]
52+
)
53+
54+
let graph =
55+
fileDeps
56+
|> Seq.map (fun (idx, deps) -> idx, {Idx = idx; Deps = deps; Dependants = fileDependants[idx]; Result = None; UnprocessedDepsCount = deps.Length; _lock = Object()})
57+
|> dict
1858

1959
printfn "start"
2060
use q = new BlockingCollection<int>()
21-
let work (idx : int) =
61+
62+
// Add leaves to the queue
63+
let filesWithoutDeps =
64+
graph
65+
|> Seq.filter (fun x -> x.Value.UnprocessedDepsCount = 0)
66+
filesWithoutDeps
67+
|> Seq.iter (fun f -> q.Add(f.Key))
68+
69+
// Keep track of the number of items to be processed
70+
let l = Object()
71+
let mutable unprocessedCount = graph.Count
72+
73+
let decrementProcessedCount () =
74+
lock l (fun () ->
75+
unprocessedCount <- unprocessedCount - 1
76+
printfn $"UnprocessedCount = {unprocessedCount}"
77+
)
78+
79+
let actualWork (idx : int) =
80+
idx.ToString()
81+
82+
// Processing of a single node/file - gives a result
83+
let go (idx : int) =
84+
let node = graph[idx]
85+
printfn $"Start {idx} -> %+A{node.Deps}"
86+
Thread.Sleep(500)
87+
let res = actualWork idx
88+
node.Result <- Some res
89+
printfn $" Stop {idx} work"
90+
91+
// Increment processed deps count for all dependants and schedule those who are now unblocked
92+
node.Dependants
93+
|> Array.iter (fun dependant ->
94+
let node = graph[dependant]
95+
let unprocessedDepsCount =
96+
lock node._lock (fun () ->
97+
node.UnprocessedDepsCount <- node.UnprocessedDepsCount - 1
98+
node.UnprocessedDepsCount
99+
)
100+
printfn $"{idx}'s dependant {dependant} now has {unprocessedDepsCount} unprocessed deps left"
101+
// Dependant is unblocked - schedule it
102+
if unprocessedDepsCount = 0 then
103+
printfn $"Scheduling {dependant}"
104+
q.Add(dependant)
105+
)
106+
107+
printfn $"Quitting {idx}"
108+
decrementProcessedCount ()
109+
()
110+
111+
let workerWork (idx : int) =
22112
printfn $"start worker {idx}"
23113
q.GetConsumingEnumerable()
24114
|> Seq.iter go
25115
printfn $"end worker {idx}"
116+
26117
let maxParallel = 4
27118
printfn "workers"
28119
let workers =
29120
[|1..maxParallel|]
30-
|> Array.map (fun idx -> Task.Factory.StartNew(fun () -> work idx))
121+
|> Array.map (fun idx -> Task.Factory.StartNew(fun () -> workerWork idx))
122+
123+
while unprocessedCount > 0 do
124+
Thread.Sleep(100)
31125

32-
printfn "adding"
33-
for i in [|1..20|] do
34-
q.Add(i)
35-
Thread.Sleep(300)
36126
printfn "CompleteAdding"
37127
q.CompleteAdding()
38128
printfn "waitall"

0 commit comments

Comments
 (0)