33using System . Collections . Generic ;
44using System . Threading ;
55using System . Threading . Tasks ;
6+ using Microsoft . Extensions . Logging ;
67
78namespace OmniSharp . Extensions . JsonRpc
89{
910 public class ProcessScheduler : IScheduler
1011 {
11- private readonly BlockingCollection < ( RequestProcessType type , Func < Task > request ) > _queue ;
12+ private readonly ILogger < ProcessScheduler > _logger ;
13+ private readonly BlockingCollection < ( RequestProcessType type , string name , Func < Task > request ) > _queue ;
1214 private readonly CancellationTokenSource _cancel ;
1315 private readonly Thread _thread ;
1416
15- public ProcessScheduler ( )
17+ public ProcessScheduler ( ILoggerFactory loggerFactory )
1618 {
17- _queue = new BlockingCollection < ( RequestProcessType type , Func < Task > request ) > ( ) ;
19+ _logger = loggerFactory . CreateLogger < ProcessScheduler > ( ) ;
20+ _queue = new BlockingCollection < ( RequestProcessType type , string name , Func < Task > request ) > ( ) ;
1821 _cancel = new CancellationTokenSource ( ) ;
1922 _thread = new Thread ( ProcessRequestQueue ) { IsBackground = true , Name = "ProcessRequestQueue" } ;
2023 }
@@ -24,9 +27,9 @@ public void Start()
2427 _thread . Start ( ) ;
2528 }
2629
27- public void Add ( RequestProcessType type , Func < Task > request )
30+ public void Add ( RequestProcessType type , string name , Func < Task > request )
2831 {
29- _queue . Add ( ( type , request ) ) ;
32+ _queue . Add ( ( type , name , request ) ) ;
3033 }
3134
3235 private Task Start ( Func < Task > request )
@@ -42,7 +45,7 @@ private List<Task> RemoveCompleteTasks(List<Task> list)
4245 if ( list . Count == 0 ) return list ;
4346
4447 var result = new List < Task > ( ) ;
45- foreach ( var t in list )
48+ foreach ( var t in list )
4649 {
4750 if ( t . IsFaulted )
4851 {
@@ -69,20 +72,32 @@ private void ProcessRequestQueue()
6972 {
7073 if ( _queue . TryTake ( out var item , Timeout . Infinite , token ) )
7174 {
72- var ( type , request ) = item ;
73- if ( type == RequestProcessType . Serial )
75+ var ( type , name , request ) = item ;
76+ try
7477 {
75- Task . WaitAll ( waitables . ToArray ( ) , token ) ;
76- Start ( request ) . Wait ( token ) ;
78+ if ( type == RequestProcessType . Serial )
79+ {
80+ Task . WaitAll ( waitables . ToArray ( ) , token ) ;
81+ Start ( request ) . Wait ( token ) ;
82+ }
83+ else if ( type == RequestProcessType . Parallel )
84+ {
85+ waitables . Add ( Start ( request ) ) ;
86+ }
87+ else
88+ throw new NotImplementedException ( "Only Serial and Parallel execution types can be handled currently" ) ;
89+ waitables = RemoveCompleteTasks ( waitables ) ;
90+ Interlocked . Exchange ( ref _TestOnly_NonCompleteTaskCount , waitables . Count ) ;
7791 }
78- else if ( type == RequestProcessType . Parallel )
92+ catch ( OperationCanceledException ex ) when ( ex . CancellationToken == token )
7993 {
80- waitables . Add ( Start ( request ) ) ;
94+ throw ;
95+ }
96+ catch ( Exception e )
97+ {
98+ // TODO: Create proper event ids
99+ _logger . LogCritical ( Events . UnhandledRequest , e , "Unhandled exception executing request {Name}" , name ) ;
81100 }
82- else
83- throw new NotImplementedException ( "Only Serial and Parallel execution types can be handled currently" ) ;
84- waitables = RemoveCompleteTasks ( waitables ) ;
85- Interlocked . Exchange ( ref _TestOnly_NonCompleteTaskCount , waitables . Count ) ;
86101 }
87102 }
88103 }
@@ -112,4 +127,9 @@ public void Dispose()
112127 _cancel . Dispose ( ) ;
113128 }
114129 }
130+
131+ static class Events
132+ {
133+ public static EventId UnhandledRequest = new EventId ( 1337_100 ) ;
134+ }
115135}
0 commit comments