1- module FSharp.Compiler.Service.Tests.GraphProcessing
1+ /// Parallel processing of graph of work items with dependencies
2+ module FSharp.Compiler.Service.Tests.GraphProcessing
23
3- open System
4- open System.Collections .Concurrent
54open System.Collections .Generic
65open System.Threading
6+ open FSharp.Compiler .Service .Tests .Graph
77
88/// Used for processing
99type NodeInfo < 'Item > =
@@ -12,12 +12,12 @@ type NodeInfo<'Item> =
1212 Deps : 'Item []
1313 TransitiveDeps : 'Item []
1414 Dependants : 'Item []
15- ProcessedDepsCount : int
1615 }
1716type Node < 'Item , 'State , 'Result > =
1817 {
1918 Info : NodeInfo < 'Item >
20- Result : ( 'State * 'Result ) option
19+ mutable ProcessedDepsCount : int
20+ mutable Result : ( 'State * 'Result ) option
2121 }
2222
2323// TODO Do we need to suppress some error logging if we
@@ -66,98 +66,74 @@ let combineResults
6666 let state = Array.fold folder firstState resultsToAdd
6767 state
6868
69-
70- // TODO Test this version
71- /// Untested version that uses MailboxProcessor.
72- /// See http://www.fssnip.net/nX/title/Limit-degree-of-parallelism-using-an-agent for implementation
73- let processInParallelUsingMailbox
74- ( firstItems : 'Item [])
75- ( work : 'Item -> Async < 'Item []>)
76- ( parallelism : int )
77- ( notify : int -> unit )
78- ( ct : CancellationToken )
79- : unit
80- =
81- let processedCountLock = Object()
82- let mutable processedCount = 0
83- let agent = Parallel.threadingLimitAgent 10 ct
84- let rec processItem item =
85- async {
86- let! toSchedule = work item
87- let pc = lock processedCountLock ( fun () -> processedCount <- processedCount + 1 ; processedCount)
88- notify pc
89- toSchedule |> Array.iter ( fun x -> agent.Post( Parallel.Start( processItem x)))
90- }
91- firstItems |> Array.iter ( fun x -> agent.Post( Parallel.Start( processItem x)))
92- ()
93-
94- // TODO Could replace with MailboxProcessor+Tasks/Asyncs instead of BlockingCollection + Threads
95- // See http://www.fssnip.net/nX/title/Limit-degree-of-parallelism-using-an-agent
96- let processInParallel
97- ( firstItems : 'Item [])
98- ( work : 'Item -> 'Item [])
99- ( parallelism : int )
100- ( stop : int -> bool )
101- ( ct : CancellationToken )
102- : unit
103- =
104- let bc = new BlockingCollection< 'Item>()
105- firstItems |> Array.iter bc.Add
106- let processedCountLock = Object()
107- let mutable processedCount = 0
108- let processItem item =
109- let toSchedule = work item
110- let processedCount = lock processedCountLock ( fun () -> processedCount <- processedCount + 1 ; processedCount)
111- toSchedule |> Array.iter bc.Add
112- processedCount
113-
114- // TODO Could avoid workers with some semaphores
115- let workerWork () : unit =
116- for node in bc.GetConsumingEnumerable( ct) do
117- if not ct.IsCancellationRequested then // improve
118- let processedCount = processItem node
119- if stop processedCount then
120- bc.CompleteAdding()
121-
122- Array.Parallel.map workerWork |> ignore // use cancellation
123- ()
124-
125- let processGraph
126- ( graph : FileGraph )
127- ( doWork : 'Item -> 'State -> 'Result * 'State )
69+ // TODO Could be replaced with a simpler recursive approach with memoised per-item results
70+ let processGraph < 'Item , 'State , 'Result when 'Item : equality >
71+ ( graph : Graph < 'Item >)
72+ ( doWork : 'Item -> 'State -> 'State * 'Result )
12873 ( folder : 'State -> 'Result -> 'State )
12974 ( parallelism : int )
13075 : 'State
13176 =
132- let transitiveDeps = graph |> calcTransitiveGraph
133- let dependants = graph |> reverseGraph
134- let nodes = graph.Keys |> Seq.map ...
135- let leaves = nodes |> Seq.filter ...
77+ let transitiveDeps = graph |> Graph.transitive
78+ let dependants = graph |> Graph.reverse
79+ let makeNode ( item : 'Item ) : Node < 'Item , 'State , 'Result > =
80+ let info =
81+ {
82+ Item = item
83+ Deps = graph[ item]
84+ TransitiveDeps = transitiveDeps[ item]
85+ Dependants = dependants[ item]
86+ }
87+ {
88+ Info = info
89+ Result = None
90+ ProcessedDepsCount = 0
91+ }
92+
93+ let nodes =
94+ graph.Keys
95+ |> Seq.map ( fun item -> item, makeNode item)
96+ |> readOnlyDict
97+ let lookup item = nodes[ item]
98+ let lookupMany items = items |> Array.map lookup
99+
100+ let leaves =
101+ nodes.Values
102+ |> Seq.filter ( fun n -> n.Info.Deps.Length = 0 )
103+ |> Seq.toArray
104+
136105 let work
137106 ( node : Node < 'Item , 'State , 'Result >)
138107 : Node < 'Item , 'State , 'Result >[]
139108 =
140- let inputState = combineResults node.Deps node.TransitiveDeps folder
141- let res = doWork node.Info.Item
142- node.Result <- res
109+ let deps = lookupMany node.Info.Deps
110+ let transitiveDeps = lookupMany node.Info.TransitiveDeps
111+ let inputState = combineResults deps transitiveDeps folder
112+ let res = doWork node.Info.Item inputState
113+ node.Result <- Some res
114+ // Need to double-check that only one dependency schedules this dependant
143115 let unblocked =
144116 node.Info.Dependants
117+ |> lookupMany
145118 |> Array.filter ( fun x ->
146119 let pdc =
147120 lock x ( fun () ->
148- x.Info .ProcessedDepsCount++
149- x.Info.PrcessedDepsCount
150- )
121+ x.ProcessedDepsCount <- x .ProcessedDepsCount + 1
122+ x.ProcessedDepsCount
123+ )
151124 pdc = node.Info.Deps.Length
152125 )
153- |> Array.map ( fun x -> nodes[ x])
154- unblocked
126+ unblocked
127+
128+ use cts = new CancellationTokenSource()
155129
156- processInParallel
130+ Parallel. processInParallel
157131 leaves
158132 work
159133 parallelism
160- ( fun processedCount -> processedCount = nodes.Length)
134+ ( fun processedCount -> processedCount = nodes.Count)
135+ cts.Token
161136
162- let state = combineResults nodes nodes addCheckResultsToTcState
137+ let nodesArray = nodes.Values |> Seq.toArray
138+ let state = combineResults nodesArray nodesArray folder
163139 state
0 commit comments