Skip to content

Commit

Permalink
Fix parallel discovery to ensure discovery runs to completion if any (#…
Browse files Browse the repository at this point in the history
…866)

concurrent discovery manager is starved.
  • Loading branch information
codito authored and Faizan2304 committed Jun 15, 2017
1 parent 322bf71 commit f59a108
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client.Parallel
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Client;
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Engine;
using System.Linq;

/// <summary>
/// ParallelProxyDiscoveryManager that manages parallel discovery
Expand All @@ -19,6 +20,7 @@ internal class ParallelProxyDiscoveryManager : ParallelOperationManager<IProxyDi
#region DiscoverySpecificData

private int discoveryCompletedClients = 0;
private int availableTestSources = -1;

private DiscoveryCriteria actualDiscoveryCriteria;

Expand Down Expand Up @@ -55,14 +57,19 @@ public void Initialize()
}

/// <inheritdoc/>
void IProxyDiscoveryManager.DiscoverTests(DiscoveryCriteria discoveryCriteria, ITestDiscoveryEventsHandler eventHandler)
public void DiscoverTests(DiscoveryCriteria discoveryCriteria, ITestDiscoveryEventsHandler eventHandler)
{
this.actualDiscoveryCriteria = discoveryCriteria;

// Set the enumerator for parallel yielding of sources
// Whenever a concurrent executor becomes free, it picks up the next source using this enumerator
this.sourceEnumerator = discoveryCriteria.Sources.GetEnumerator();
this.availableTestSources = discoveryCriteria.Sources.Count();

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("ParallelProxyDiscoveryManager: Start discovery. Total sources: " + this.availableTestSources);
}
this.DiscoverTestsPrivate(eventHandler);
}

Expand All @@ -86,14 +93,50 @@ public void Close()
public bool HandlePartialDiscoveryComplete(IProxyDiscoveryManager proxyDiscoveryManager, long totalTests, IEnumerable<TestCase> lastChunk, bool isAborted)
{
var allDiscoverersCompleted = false;
lock (this.discoveryStatusLockObject)
{
// Each concurrent Executor calls this method
// So, we need to keep track of total discoverycomplete calls
this.discoveryCompletedClients++;

// If there are no more sources/testcases, a parallel executor is truly done with discovery
allDiscoverersCompleted = this.discoveryCompletedClients == this.availableTestSources;

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("ParallelProxyDiscoveryManager: HandlePartialDiscoveryComplete: Total completed clients = {0}, Discovery complete = {1}.", this.discoveryCompletedClients, allDiscoverersCompleted);
}
}

// Discovery is completed. Schedule the clean up for managers and handlers.
if (allDiscoverersCompleted)
{
// Reset enumerators
this.sourceEnumerator = null;

this.currentDiscoveryDataAggregator = null;
this.currentDiscoveryEventsHandler = null;

// In Case of Abort, clean old one and create a new ProxyDiscoveryManager in place of old one.
// Dispose concurrent executors
// Do not do the cleanuptask in the current thread as we will unncessarily add to discovery time
this.lastParallelDiscoveryCleanUpTask = Task.Run(() => this.UpdateParallelLevel(0));

return true;
}

// Discovery is not complete.
// First, clean up the used proxy discovery manager if the last run was aborted
// or this run doesn't support shared hosts (netcore tests)
if (!this.SharedHosts || isAborted)
{
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("ParallelProxyDiscoveryManager: HandlePartialDiscoveryComplete: Replace discovery manager. Shared: {0}, Aborted: {1}.", this.SharedHosts, isAborted);
}

this.RemoveManager(proxyDiscoveryManager);

proxyDiscoveryManager = this.CreateNewConcurrentManager();

var parallelEventsHandler = new ParallelDiscoveryEventsHandler(
proxyDiscoveryManager,
this.currentDiscoveryEventsHandler,
Expand All @@ -102,34 +145,8 @@ public bool HandlePartialDiscoveryComplete(IProxyDiscoveryManager proxyDiscovery
this.AddManager(proxyDiscoveryManager, parallelEventsHandler);
}

// If there are no more sources/testcases, a parallel executor is truly done with discovery
if (!this.DiscoverTestsOnConcurrentManager(proxyDiscoveryManager))
{
lock (this.discoveryStatusLockObject)
{
// Each concurrent Executor calls this method
// So, we need to keep track of total discoverycomplete calls
this.discoveryCompletedClients++;
allDiscoverersCompleted = this.discoveryCompletedClients == this.GetConcurrentManagersCount();
}

// verify that all executors are done with the discovery and there are no more sources/testcases to execute
if (allDiscoverersCompleted)
{
// Reset enumerators
this.sourceEnumerator = null;

this.currentDiscoveryDataAggregator = null;
this.currentDiscoveryEventsHandler = null;

// Dispose concurrent executors
// Do not do the cleanuptask in the current thread as we will unncessarily add to discovery time
this.lastParallelDiscoveryCleanUpTask = Task.Run(() =>
{
this.UpdateParallelLevel(0);
});
}
}
// Second, let's attempt to trigger discovery for the next source.
this.DiscoverTestsOnConcurrentManager(proxyDiscoveryManager);

return allDiscoverersCompleted;
}
Expand All @@ -150,9 +167,10 @@ protected override void DisposeInstance(IProxyDiscoveryManager managerInstance)
{
managerInstance.Close();
}
catch (Exception)
catch (Exception ex)
{
// ignore any exceptions
EqtTrace.Error("ParallelProxyDiscoveryManager: Failed to dispose discoveyr manager. Exception: " + ex);
}
}
}
Expand All @@ -169,6 +187,11 @@ private void DiscoverTestsPrivate(ITestDiscoveryEventsHandler discoveryEventsHan
{
try
{
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("ProxyParallelDiscoveryManager: Wait for last cleanup to complete.");
}

this.lastParallelDiscoveryCleanUpTask.Wait();
}
catch (Exception ex)
Expand Down Expand Up @@ -198,8 +221,7 @@ private void DiscoverTestsPrivate(ITestDiscoveryEventsHandler discoveryEventsHan
this.currentDiscoveryDataAggregator);

this.UpdateHandlerForManager(concurrentManager, parallelEventsHandler);

Task.Run(() => this.DiscoverTestsOnConcurrentManager(concurrentManager));
this.DiscoverTestsOnConcurrentManager(concurrentManager);
}
}

Expand All @@ -208,25 +230,46 @@ private void DiscoverTestsPrivate(ITestDiscoveryEventsHandler discoveryEventsHan
/// Each concurrent discoverer calls this method, once its completed working on previous data
/// </summary>
/// <param name="ProxyDiscoveryManager">Proxy discovery manager instance.</param>
/// <returns>True, if discovery triggered</returns>
private bool DiscoverTestsOnConcurrentManager(IProxyDiscoveryManager proxyDiscoveryManager)
private void DiscoverTestsOnConcurrentManager(IProxyDiscoveryManager proxyDiscoveryManager)
{
DiscoveryCriteria discoveryCriteria = null;

string nextSource = null;
if (this.TryFetchNextSource(this.sourceEnumerator, out nextSource))
// Peek to see if we have sources to trigger a discovery
if (this.TryFetchNextSource(this.sourceEnumerator, out string nextSource))
{
EqtTrace.Info("ProxyParallelDiscoveryManager: Triggering test discovery for next source: {0}", nextSource);
discoveryCriteria = new DiscoveryCriteria(new List<string>() { nextSource }, this.actualDiscoveryCriteria.FrequencyOfDiscoveredTestsEvent, this.actualDiscoveryCriteria.DiscoveredTestEventTimeout, this.actualDiscoveryCriteria.RunSettings);
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("ProxyParallelDiscoveryManager: Triggering test discovery for next source: {0}", nextSource);
}

// Kick off another discovery task for the next source
var discoveryCriteria = new DiscoveryCriteria(new[] { nextSource }, this.actualDiscoveryCriteria.FrequencyOfDiscoveredTestsEvent, this.actualDiscoveryCriteria.DiscoveredTestEventTimeout, this.actualDiscoveryCriteria.RunSettings);
Task.Run(() =>
{
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("ParallelProxyDiscoveryManager: Discovery started.");
}
proxyDiscoveryManager.DiscoverTests(discoveryCriteria, this.GetHandlerForGivenManager(proxyDiscoveryManager));
})
.ContinueWith(t =>
{
// Just in case, the actual discovery couldn't start for an instance. Ensure that
// we call discovery complete since we have already fetched a source. Otherwise
// discovery will not terminate
if (EqtTrace.IsWarningEnabled)
{
EqtTrace.Warning("ParallelProxyDiscoveryManager: Failed to trigger discovery. Exception: " + t.Exception);
}
this.GetHandlerForGivenManager(proxyDiscoveryManager).HandleDiscoveryComplete(0, Enumerable.Empty<TestCase>(), true);
},
TaskContinuationOptions.OnlyOnFaulted);
}

if (discoveryCriteria != null)
if (EqtTrace.IsVerboseEnabled)
{
proxyDiscoveryManager.DiscoverTests(discoveryCriteria, this.GetHandlerForGivenManager(proxyDiscoveryManager));
return true;
EqtTrace.Verbose("ProxyParallelDiscoveryManager: No sources available for discovery.");
}

return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ internal class ProxyDataCollectionManager : IProxyDataCollectionManager
private string settingsXml;
private int connectionTimeout;


/// <summary>
/// Initializes a new instance of the <see cref="ProxyDataCollectionManager"/> class.
/// </summary>
Expand Down
Loading

0 comments on commit f59a108

Please sign in to comment.