diff --git a/src/Lucene.Net.Tests/Search/TestControlledRealTimeReopenThread.cs b/src/Lucene.Net.Tests/Search/TestControlledRealTimeReopenThread.cs index cf30ac0c67..e769ce7037 100644 --- a/src/Lucene.Net.Tests/Search/TestControlledRealTimeReopenThread.cs +++ b/src/Lucene.Net.Tests/Search/TestControlledRealTimeReopenThread.cs @@ -1,22 +1,17 @@ using J2N.Threading; using J2N.Threading.Atomic; -using Lucene.Net.Analysis.Standard; -using Lucene.Net.Documents; using Lucene.Net.Index.Extensions; -using Lucene.Net.Store; using Lucene.Net.Util; using NUnit.Framework; using RandomizedTesting.Generators; using System; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; -using Console = Lucene.Net.Util.SystemConsole; using Assert = Lucene.Net.TestFramework.Assert; - +using Console = Lucene.Net.Util.SystemConsole; namespace Lucene.Net.Search { @@ -60,6 +55,7 @@ namespace Lucene.Net.Search using TextField = Lucene.Net.Documents.TextField; using ThreadedIndexingAndSearchingTestCase = Lucene.Net.Index.ThreadedIndexingAndSearchingTestCase; using TrackingIndexWriter = Lucene.Net.Index.TrackingIndexWriter; + //using ThreadInterruptedException = Lucene.Net.Util.ThreadInterruptedException; using Version = Lucene.Net.Util.LuceneVersion; [SuppressCodecs("SimpleText", "Memory", "Direct")] @@ -400,12 +396,10 @@ public virtual void TestThreadStarvationNoDeleteNRTReader() LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal); TrackingIndexWriter writer = new TrackingIndexWriter(_writer); SearcherManager manager = new SearcherManager(_writer, false, null); - Document doc = new Document(); doc.Add(NewTextField("test", "test", Field.Store.YES)); writer.AddDocument(doc); manager.MaybeRefresh(); - var t = new ThreadAnonymousClass(this, latch, signal, writer, manager); t.Start(); _writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through @@ -422,7 +416,6 @@ public virtual void TestThreadStarvationNoDeleteNRTReader() { manager.Release(searcher); } - ControlledRealTimeReopenThread thread = new ControlledRealTimeReopenThread(writer, manager, 0.01, 0.01); thread.Start(); // start reopening if (Verbose) @@ -434,7 +427,6 @@ public virtual void TestThreadStarvationNoDeleteNRTReader() var waiter = new ThreadAnonymousClass2(this, lastGen, thread, finished); waiter.Start(); manager.MaybeRefresh(); - waiter.Join(1000); if (!finished) { @@ -742,259 +734,5 @@ public override void Run() } } } - - - /// - /// This test was purposely written in a way that demonstrates how to use the - /// ControlledRealTimeReopenThread. It contains seperate Asserts for each of - /// several use cases rather then trying to brake these use cases up into - /// seperate unit tests. This combined approach makes the behavior of - /// ControlledRealTimeReopenThread easier to understand. - /// - // LUCENENET specific - An extra test to demonstrate use of ControlledRealTimeReopen. - [Test] - public void TestStraightForwardDemonstration() - { - - RAMDirectory indexDir = new RAMDirectory(); - - Analyzer standardAnalyzer = new StandardAnalyzer(TEST_VERSION_CURRENT); - IndexWriterConfig indexConfig = new IndexWriterConfig(TEST_VERSION_CURRENT, standardAnalyzer); - IndexWriter indexWriter = new IndexWriter(indexDir, indexConfig); - TrackingIndexWriter trackingWriter = new TrackingIndexWriter(indexWriter); - - Document doc = new Document(); - doc.Add(new Int32Field("id", 1, Field.Store.YES)); - doc.Add(new StringField("name", "Doc1", Field.Store.YES)); - trackingWriter.AddDocument(doc); - - SearcherManager searcherManager = new SearcherManager(indexWriter, applyAllDeletes: true, null); - - //Reopen SearcherManager every 1 secs via background thread if no thread waiting for newer generation. - //Reopen SearcherManager after .2 secs if another thread IS waiting on a newer generation. - var controlledRealTimeReopenThread = new ControlledRealTimeReopenThread(trackingWriter, searcherManager, 1, 0.2); - - //Start() will start a seperate thread that will invoke the object's Run(). However, - //calling Run() directly would execute that code on the current thread rather then a new thread - //which would defeat the purpose of using controlledRealTimeReopenThread. This aspect of the API - //is not as intuitive as it could be. ie. Call Start() not Run(). - controlledRealTimeReopenThread.IsBackground = true; //Set as a background thread - controlledRealTimeReopenThread.Name = "Controlled Real Time Reopen Thread"; - controlledRealTimeReopenThread.Priority = (ThreadPriority)Math.Min((int)Thread.CurrentThread.Priority + 2, (int)ThreadPriority.Highest); - controlledRealTimeReopenThread.Start(); - - //An indexSearcher only sees Doc1 - IndexSearcher indexSearcher = searcherManager.Acquire(); - try - { - TopDocs topDocs = indexSearcher.Search(new MatchAllDocsQuery(), 1); - assertEquals(1, topDocs.TotalHits); //There is only one doc - } - finally - { - searcherManager.Release(indexSearcher); - } - - //Add a 2nd document - doc = new Document(); - doc.Add(new Int32Field("id", 2, Field.Store.YES)); - doc.Add(new StringField("name", "Doc2", Field.Store.YES)); - trackingWriter.AddDocument(doc); - - //Demonstrate that we can only see the first doc because we haven't - //waited 1 sec or called WaitForGeneration - indexSearcher = searcherManager.Acquire(); - try - { - TopDocs topDocs = indexSearcher.Search(new MatchAllDocsQuery(), 1); - assertEquals(1, topDocs.TotalHits); //Can see both docs due to auto refresh after 1.1 secs - } - finally - { - searcherManager.Release(indexSearcher); - } - - - //Demonstrate that we can see both docs after we wait a little more - //then 1 sec so that controlledRealTimeReopenThread max interval is exceeded - //and it calls MaybeRefresh - Thread.Sleep(1100); //wait 1.1 secs as ms - indexSearcher = searcherManager.Acquire(); - try - { - TopDocs topDocs = indexSearcher.Search(new MatchAllDocsQuery(), 1); - assertEquals(2, topDocs.TotalHits); //Can see both docs due to auto refresh after 1.1 secs - } - finally - { - searcherManager.Release(indexSearcher); - } - - - //Add a 3rd document - doc = new Document(); - doc.Add(new Int32Field("id", 3, Field.Store.YES)); - doc.Add(new StringField("name", "Doc3", Field.Store.YES)); - long generation = trackingWriter.AddDocument(doc); - - //Demonstrate that if we call WaitForGeneration our wait will be - // .2 secs or less (the min interval we set earlier) and then we will - //see all 3 documents. - Stopwatch stopwatch = Stopwatch.StartNew(); - controlledRealTimeReopenThread.WaitForGeneration(generation); - stopwatch.Stop(); - assertTrue(stopwatch.Elapsed.TotalMilliseconds <= 200 + 30); //30ms is fudged factor to account for call overhead. - - indexSearcher = searcherManager.Acquire(); - try - { - TopDocs topDocs = indexSearcher.Search(new MatchAllDocsQuery(), 1); - assertEquals(3, topDocs.TotalHits); //Can see both docs due to auto refresh after 1.1 secs - } - finally - { - searcherManager.Release(indexSearcher); - } - - controlledRealTimeReopenThread.Dispose(); - searcherManager.Dispose(); - indexWriter.Dispose(); - indexDir.Dispose(); - } - - - - /// - /// In this test multiple threads are created each of which is waiting on the same - /// generation before doing a search. These threads will all stack up on the - /// WaitForGeneration(generation) call. All threads should return from this call - /// in approximately in the time expected, namely the value for targetMinStaleSec passed - /// to ControlledRealTimeReopenThread in it's constructor. - /// - // LUCENENET specific - An extra test to test multithreaded use of ControlledRealTimeReopen. - [Test] - public void TestMultithreadedWaitForGeneration() - { - Thread CreateWorker(int threadNum, ControlledRealTimeReopenThread controlledReopen, long generation, - SearcherManager searcherManager, List outputList) - { - ThreadStart threadStart = delegate - { - - Stopwatch stopwatch = Stopwatch.StartNew(); - controlledReopen.WaitForGeneration(generation); - stopwatch.Stop(); - double milliSecsWaited = stopwatch.Elapsed.TotalMilliseconds; - - int numRecs = 0; - IndexSearcher indexSearcher = searcherManager.Acquire(); - try - { - TopDocs topDocs = indexSearcher.Search(new MatchAllDocsQuery(), 1); - numRecs = topDocs.TotalHits; - } - finally - { - searcherManager.Release(indexSearcher); - } - - lock (outputList) - { - outputList.Add(new ThreadOutput { ThreadNum = threadNum, NumRecs = numRecs, MilliSecsWaited = milliSecsWaited }); - } - - }; - return new Thread(threadStart); - } - - int threadCount = 3; - List outputList = new List(); - - RAMDirectory indexDir = new RAMDirectory(); - Analyzer standardAnalyzer = new StandardAnalyzer(TEST_VERSION_CURRENT); - IndexWriterConfig indexConfig = new IndexWriterConfig(TEST_VERSION_CURRENT, standardAnalyzer); - IndexWriter indexWriter = new IndexWriter(indexDir, indexConfig); - TrackingIndexWriter trackingWriter = new TrackingIndexWriter(indexWriter); - - //Add two documents - Document doc = new Document(); - doc.Add(new Int32Field("id", 1, Field.Store.YES)); - doc.Add(new StringField("name", "Doc1", Field.Store.YES)); - long generation = trackingWriter.AddDocument(doc); - - doc.Add(new Int32Field("id", 2, Field.Store.YES)); - doc.Add(new StringField("name", "Doc3", Field.Store.YES)); - generation = trackingWriter.AddDocument(doc); - - SearcherManager searcherManager = new SearcherManager(indexWriter, applyAllDeletes: true, null); - - //Reopen SearcherManager every 2 secs via background thread if no thread waiting for newer generation. - //Reopen SearcherManager after .2 secs if another thread IS waiting on a newer generation. - double maxRefreshSecs = 2.0; - double minRefreshSecs = .2; - var controlledRealTimeReopenThread = new ControlledRealTimeReopenThread(trackingWriter, searcherManager, maxRefreshSecs, minRefreshSecs); - - //Start() will start a seperate thread that will invoke the object's Run(). However, - //calling Run() directly would execute that code on the current thread rather then a new thread - //which would defeat the purpose of using controlledRealTimeReopenThread. This aspect of the API - //is not as intuitive as it could be. ie. Call Start() not Run(). - controlledRealTimeReopenThread.IsBackground = true; //Set as a background thread - controlledRealTimeReopenThread.Name = "Controlled Real Time Reopen Thread"; - controlledRealTimeReopenThread.Priority = (ThreadPriority)Math.Min((int)Thread.CurrentThread.Priority + 2, (int)ThreadPriority.Highest); - controlledRealTimeReopenThread.Start(); - - - //Create the threads for doing searchers - List threadList = new List(); - for (int i = 1; i <= threadCount; i++) - { - threadList.Add(CreateWorker(i, controlledRealTimeReopenThread, generation, searcherManager, outputList)); - } - - //Start all the threads - foreach (Thread thread in threadList) - { - thread.Start(); - } - - //wait for the threads to finish. - foreach (Thread thread in threadList) - { - thread.Join(); //will wait here until the thread terminates. - } - - //Now make sure that no thread waited longer then our min refresh time - //plus a small fudge factor. Also verify that all threads resported back and - //each saw 2 records. - - //Verify all threads reported back a result. - assertEquals(threadCount, outputList.Count); - - int millisecsPerSec = 1000; - foreach (ThreadOutput output in outputList) - { - //Verify the thread saw exactly 2 docs - assertEquals(2, output.NumRecs); - - //Verify the thread wait time was around what was expected. - Assert.True(output.MilliSecsWaited <= (minRefreshSecs * millisecsPerSec) + 30); //30ms is fudged factor to account for call overhead - } - - controlledRealTimeReopenThread.Dispose(); //will kill and join to the thread - Assert.False(controlledRealTimeReopenThread.IsAlive); //to prove that Dispose really does kill the thread. - - searcherManager.Dispose(); - indexWriter.Dispose(); - indexDir.Dispose(); - - } - - [DebuggerDisplay("ThreadNum:{ThreadNum}, NumRecs:{NumRecs}, MilliSecsWaited:{MilliSecsWaited}")] - public class ThreadOutput - { - public int ThreadNum { get; set; } - public int NumRecs { get; set; } - public double MilliSecsWaited { get; set; } - } } } \ No newline at end of file diff --git a/src/Lucene.Net/Search/ControlledRealTimeReopenThread.cs b/src/Lucene.Net/Search/ControlledRealTimeReopenThread.cs index c11b15f007..961ea847bd 100644 --- a/src/Lucene.Net/Search/ControlledRealTimeReopenThread.cs +++ b/src/Lucene.Net/Search/ControlledRealTimeReopenThread.cs @@ -42,7 +42,6 @@ namespace Lucene.Net.Search public class ControlledRealTimeReopenThread : ThreadJob, IDisposable where T : class { - // LUCENENET: java final converted readonly private readonly ReferenceManager manager; private readonly long targetMaxStaleNS; private readonly long targetMinStaleNS; @@ -51,10 +50,9 @@ public class ControlledRealTimeReopenThread : ThreadJob, IDisposable private long waitingGen; private long searchingGen; private long refreshStartGen; - private bool isDisposed = false; - private readonly EventWaitHandle intrinsic = new ManualResetEvent(false); // LUCENENET specific: used to mimic intrinsic monitor used by java wait and notifyAll keywords. - private readonly EventWaitHandle reopenCond = new AutoResetEvent(false); // LUCENENET NOTE: unlike java, in c# we don't need to lock reopenCond when calling methods on it. + private readonly EventWaitHandle reopenCond = new AutoResetEvent(false); // LUCENENET: marked readonly + private readonly EventWaitHandle available = new AutoResetEvent(false); // LUCENENET: marked readonly /// /// Create , to periodically @@ -77,7 +75,6 @@ public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManag { throw new ArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec.ToString("0.0") + ") < targetMinStaleSec (=" + targetMinStaleSec.ToString("0.0") + ")"); } - this.writer = writer; this.manager = manager; this.targetMaxStaleNS = (long)(1000000000 * targetMaxStaleSec); @@ -108,65 +105,46 @@ private void RefreshDone() { lock (this) { - searchingGen = refreshStartGen; - intrinsic.Set(); // LUCENENET NOTE: Will notify all and remain signaled, so it must be reset in WaitForGeneration + // if we're finishing, , make it out so that all waiting search threads will return + searchingGen = finish ? long.MaxValue : refreshStartGen; + available.Set(); } + reopenCond.Reset(); } /// - /// Kills the thread and releases all resources used by the - /// . Also joins to the - /// thread so that when this method returns the thread is no longer alive. + /// Releases all resources used by the . /// public void Dispose() { - Dispose(disposing: true); + Dispose(true); GC.SuppressFinalize(this); } - - /// - /// Kills the thread and releases all resources used by the - /// . Also joins to the - /// thread so that when this method returns the thread is no longer alive. + /// Releases resources used by the and + /// if overridden in a derived class, optionally releases unmanaged resources. /// - // LUCENENET specific - Support for Dispose(bool) since this is a non-sealed class. + /// true to release both managed and unmanaged resources; + /// false to release only unmanaged resources. + + // LUCENENET specific - implemented proper dispose pattern protected virtual void Dispose(bool disposing) { - lock (this) + if (disposing) { - if (isDisposed) - { - return; - } - - if (disposing) - { + finish = true; + reopenCond.Set(); - finish = true; + Join(); + // LUCENENET NOTE: No need to catch and rethrow same excepton type ThreadInterruptedException - // So thread wakes up and notices it should finish: - reopenCond.Set(); - - Join(); - // LUCENENET NOTE: No need to catch and rethrow same excepton type ThreadInterruptedException - - // Max it out so any waiting search threads will return: - searchingGen = long.MaxValue; - intrinsic.Set(); //LUCENENET NOTE: notify all the waiting threads to wake them up. - - // LUCENENET specific: dispose reset event - reopenCond.Dispose(); - intrinsic.Dispose(); - - } - - isDisposed = true; + // LUCENENET specific: dispose reset event + reopenCond.Dispose(); + available.Dispose(); } } - /// /// Waits for the target generation to become visible in /// the searcher. @@ -201,36 +179,29 @@ public virtual void WaitForGeneration(long targetGen) /// or false if wait time was exceeded public virtual bool WaitForGeneration(long targetGen, int maxMS) { - // LUCENENET NOTE: Porting this method is a bit tricky because the java wait method releases the - // syncronize lock and c# has no similar primitive. So we must handle locking a - // bit differently here to mimic that affect. - long curGen = writer.Generation; if (targetGen > curGen) { throw new ArgumentException("targetGen=" + targetGen + " was never returned by the ReferenceManager instance (current gen=" + curGen + ")"); } - lock (this) - { if (targetGen <= searchingGen) - { return true; + else + { + waitingGen = Math.Max(waitingGen, targetGen); + reopenCond.Set(); + available.Reset(); } - // Need to find waitingGen inside lock as its used to determine - // stale time - waitingGen = Math.Max(waitingGen, targetGen); - reopenCond.Set(); // LUCENENET NOTE: gives Run() an oppertunity to notice one is now waiting if one wasn't before. - intrinsic.Reset(); // LUCENENET specific: required to "close the door". Java's notifyAll keyword didn't need this. - } - long startMS = Time.NanoTime() / 1000000; - while (targetGen > Interlocked.Read(ref searchingGen)) // LUCENENET specific - reading searchingGen not thread safe, so use Interlocked.Read() + + // LUCENENET specific - reading searchingGen not thread safe, so use Interlocked.Read() + while (targetGen > Interlocked.Read(ref searchingGen)) { if (maxMS < 0) { - intrinsic.WaitOne(); + available.WaitOne(); } else { @@ -241,7 +212,7 @@ public virtual bool WaitForGeneration(long targetGen, int maxMS) } else { - intrinsic.WaitOne(TimeSpan.FromMilliseconds(msLeft)); + available.WaitOne(TimeSpan.FromMilliseconds(msLeft)); } } } @@ -253,42 +224,23 @@ public override void Run() { // TODO: maybe use private thread ticktock timer, in // case clock shift messes up nanoTime? - // LUCENENET NOTE: Time.NanoTime() is not affected by clock changes. long lastReopenStartNS = Time.NanoTime(); //System.out.println("reopen: start"); while (!finish) { + bool hasWaiting; - // TODO: try to guestimate how long reopen might - // take based on past data? + lock (this) + hasWaiting = waitingGen > searchingGen; - // Loop until we've waiting long enough before the - // next reopen: - while (!finish) - { + long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS); + long sleepNS = nextReopenStartNS - Time.NanoTime(); + if (sleepNS > 0) try { - // Need lock before finding out if has waiting - bool hasWaiting; - lock (this) - { - // True if we have someone waiting for reopened searcher: - hasWaiting = waitingGen > searchingGen; - } - - long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS); - long sleepNS = nextReopenStartNS - Time.NanoTime(); - - if (sleepNS > 0) - { - reopenCond.WaitOne(TimeSpan.FromMilliseconds(sleepNS / Time.MillisecondsPerNanosecond));//Convert NS to MS - } - else - { - break; - } + reopenCond.WaitOne(TimeSpan.FromMilliseconds(sleepNS / Time.MillisecondsPerNanosecond));//Convert NS to MS } catch (Exception ie) when (ie.IsInterruptedException()) { @@ -296,7 +248,6 @@ public override void Run() return; } - } if (finish) { break; @@ -315,10 +266,9 @@ public override void Run() { throw RuntimeException.Create(ioe); } - } + // this will set the searchingGen so that all waiting threads will exit + RefreshDone(); } - - } } \ No newline at end of file