Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug Fix: Re-ported ControlledRealTimeReopenThread and added 2 new tests #513

Merged
merged 7 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 264 additions & 2 deletions src/Lucene.Net.Tests/Search/TestControlledRealTimeReopenThread.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
using J2N.Threading;
using J2N.Threading.Atomic;
using Lucene.Net.Analysis.Standard;
using Lucene.Net.Documents;
using Lucene.Net.Index.Extensions;
using Lucene.Net.Store;
using Lucene.Net.Util;
using NUnit.Framework;
using RandomizedTesting.Generators;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Assert = Lucene.Net.TestFramework.Assert;
using Console = Lucene.Net.Util.SystemConsole;
using Assert = Lucene.Net.TestFramework.Assert;


namespace Lucene.Net.Search
{
Expand Down Expand Up @@ -55,7 +60,6 @@ namespace Lucene.Net.Search
using TextField = Lucene.Net.Documents.TextField;
using ThreadedIndexingAndSearchingTestCase = Lucene.Net.Index.ThreadedIndexingAndSearchingTestCase;
using TrackingIndexWriter = Lucene.Net.Index.TrackingIndexWriter;
//using ThreadInterruptedException = Lucene.Net.Util.ThreadInterruptedException;
using Version = Lucene.Net.Util.LuceneVersion;

[SuppressCodecs("SimpleText", "Memory", "Direct")]
Expand Down Expand Up @@ -396,10 +400,12 @@ public virtual void TestThreadStarvationNoDeleteNRTReader()
LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal);
TrackingIndexWriter writer = new TrackingIndexWriter(_writer);
SearcherManager manager = new SearcherManager(_writer, false, null);

Document doc = new Document();
doc.Add(NewTextField("test", "test", Field.Store.YES));
writer.AddDocument(doc);
manager.MaybeRefresh();

var t = new ThreadAnonymousClass(this, latch, signal, writer, manager);
t.Start();
_writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
Expand All @@ -416,6 +422,7 @@ public virtual void TestThreadStarvationNoDeleteNRTReader()
{
manager.Release(searcher);
}

ControlledRealTimeReopenThread<IndexSearcher> thread = new ControlledRealTimeReopenThread<IndexSearcher>(writer, manager, 0.01, 0.01);
thread.Start(); // start reopening
if (Verbose)
Expand All @@ -427,6 +434,7 @@ public virtual void TestThreadStarvationNoDeleteNRTReader()
var waiter = new ThreadAnonymousClass2(this, lastGen, thread, finished);
waiter.Start();
manager.MaybeRefresh();

waiter.Join(1000);
if (!finished)
{
Expand Down Expand Up @@ -734,5 +742,259 @@ public override void Run()
}
}
}


/// <summary>
/// This test was purposely written in a way that demonstrates how to use the
/// ControlledRealTimeReopenThread. It contains seperate Asserts for each of
/// several use cases rather then trying to brake these use cases up into
/// seperate unit tests. This combined approach makes the behavior of
/// ControlledRealTimeReopenThread easier to understand.
/// </summary>
// LUCENENET specific - An extra test to demonstrate use of ControlledRealTimeReopen.
[Test]
public void TestStraightForwardDemonstration()
{

RAMDirectory indexDir = new RAMDirectory();

Analyzer standardAnalyzer = new StandardAnalyzer(TEST_VERSION_CURRENT);
IndexWriterConfig indexConfig = new IndexWriterConfig(TEST_VERSION_CURRENT, standardAnalyzer);
IndexWriter indexWriter = new IndexWriter(indexDir, indexConfig);
TrackingIndexWriter trackingWriter = new TrackingIndexWriter(indexWriter);

Document doc = new Document();
doc.Add(new Int32Field("id", 1, Field.Store.YES));
doc.Add(new StringField("name", "Doc1", Field.Store.YES));
trackingWriter.AddDocument(doc);

SearcherManager searcherManager = new SearcherManager(indexWriter, applyAllDeletes: true, null);

//Reopen SearcherManager every 1 secs via background thread if no thread waiting for newer generation.
//Reopen SearcherManager after .2 secs if another thread IS waiting on a newer generation.
var controlledRealTimeReopenThread = new ControlledRealTimeReopenThread<IndexSearcher>(trackingWriter, searcherManager, 1, 0.2);

//Start() will start a seperate thread that will invoke the object's Run(). However,
//calling Run() directly would execute that code on the current thread rather then a new thread
//which would defeat the purpose of using controlledRealTimeReopenThread. This aspect of the API
//is not as intuitive as it could be. ie. Call Start() not Run().
controlledRealTimeReopenThread.IsBackground = true; //Set as a background thread
controlledRealTimeReopenThread.Name = "Controlled Real Time Reopen Thread";
controlledRealTimeReopenThread.Priority = (ThreadPriority)Math.Min((int)Thread.CurrentThread.Priority + 2, (int)ThreadPriority.Highest);
controlledRealTimeReopenThread.Start();

//An indexSearcher only sees Doc1
IndexSearcher indexSearcher = searcherManager.Acquire();
try
{
TopDocs topDocs = indexSearcher.Search(new MatchAllDocsQuery(), 1);
assertEquals(1, topDocs.TotalHits); //There is only one doc
}
finally
{
searcherManager.Release(indexSearcher);
}

//Add a 2nd document
doc = new Document();
doc.Add(new Int32Field("id", 2, Field.Store.YES));
doc.Add(new StringField("name", "Doc2", Field.Store.YES));
trackingWriter.AddDocument(doc);

//Demonstrate that we can only see the first doc because we haven't
//waited 1 sec or called WaitForGeneration
indexSearcher = searcherManager.Acquire();
try
{
TopDocs topDocs = indexSearcher.Search(new MatchAllDocsQuery(), 1);
assertEquals(1, topDocs.TotalHits); //Can see both docs due to auto refresh after 1.1 secs
}
finally
{
searcherManager.Release(indexSearcher);
}


//Demonstrate that we can see both docs after we wait a little more
//then 1 sec so that controlledRealTimeReopenThread max interval is exceeded
//and it calls MaybeRefresh
Thread.Sleep(1100); //wait 1.1 secs as ms
indexSearcher = searcherManager.Acquire();
try
{
TopDocs topDocs = indexSearcher.Search(new MatchAllDocsQuery(), 1);
assertEquals(2, topDocs.TotalHits); //Can see both docs due to auto refresh after 1.1 secs
}
finally
{
searcherManager.Release(indexSearcher);
}


//Add a 3rd document
doc = new Document();
doc.Add(new Int32Field("id", 3, Field.Store.YES));
doc.Add(new StringField("name", "Doc3", Field.Store.YES));
long generation = trackingWriter.AddDocument(doc);

//Demonstrate that if we call WaitForGeneration our wait will be
// .2 secs or less (the min interval we set earlier) and then we will
//see all 3 documents.
Stopwatch stopwatch = Stopwatch.StartNew();
controlledRealTimeReopenThread.WaitForGeneration(generation);
stopwatch.Stop();
assertTrue(stopwatch.Elapsed.TotalMilliseconds <= 200 + 30); //30ms is fudged factor to account for call overhead.

indexSearcher = searcherManager.Acquire();
try
{
TopDocs topDocs = indexSearcher.Search(new MatchAllDocsQuery(), 1);
assertEquals(3, topDocs.TotalHits); //Can see both docs due to auto refresh after 1.1 secs
}
finally
{
searcherManager.Release(indexSearcher);
}

controlledRealTimeReopenThread.Dispose();
searcherManager.Dispose();
indexWriter.Dispose();
indexDir.Dispose();
}



/// <summary>
/// In this test multiple threads are created each of which is waiting on the same
/// generation before doing a search. These threads will all stack up on the
/// WaitForGeneration(generation) call. All threads should return from this call
/// in approximately in the time expected, namely the value for targetMinStaleSec passed
/// to ControlledRealTimeReopenThread in it's constructor.
/// </summary>
// LUCENENET specific - An extra test to test multithreaded use of ControlledRealTimeReopen.
[Test]
public void TestMultithreadedWaitForGeneration()
{
Thread CreateWorker(int threadNum, ControlledRealTimeReopenThread<IndexSearcher> controlledReopen, long generation,
SearcherManager searcherManager, List<ThreadOutput> outputList)
{
ThreadStart threadStart = delegate
{

Stopwatch stopwatch = Stopwatch.StartNew();
controlledReopen.WaitForGeneration(generation);
stopwatch.Stop();
double milliSecsWaited = stopwatch.Elapsed.TotalMilliseconds;

int numRecs = 0;
IndexSearcher indexSearcher = searcherManager.Acquire();
try
{
TopDocs topDocs = indexSearcher.Search(new MatchAllDocsQuery(), 1);
numRecs = topDocs.TotalHits;
}
finally
{
searcherManager.Release(indexSearcher);
}

lock (outputList)
{
outputList.Add(new ThreadOutput { ThreadNum = threadNum, NumRecs = numRecs, MilliSecsWaited = milliSecsWaited });
}

};
return new Thread(threadStart);
}

int threadCount = 3;
List<ThreadOutput> outputList = new List<ThreadOutput>();

RAMDirectory indexDir = new RAMDirectory();
Analyzer standardAnalyzer = new StandardAnalyzer(TEST_VERSION_CURRENT);
IndexWriterConfig indexConfig = new IndexWriterConfig(TEST_VERSION_CURRENT, standardAnalyzer);
IndexWriter indexWriter = new IndexWriter(indexDir, indexConfig);
TrackingIndexWriter trackingWriter = new TrackingIndexWriter(indexWriter);

//Add two documents
Document doc = new Document();
doc.Add(new Int32Field("id", 1, Field.Store.YES));
doc.Add(new StringField("name", "Doc1", Field.Store.YES));
long generation = trackingWriter.AddDocument(doc);

doc.Add(new Int32Field("id", 2, Field.Store.YES));
doc.Add(new StringField("name", "Doc3", Field.Store.YES));
generation = trackingWriter.AddDocument(doc);

SearcherManager searcherManager = new SearcherManager(indexWriter, applyAllDeletes: true, null);

//Reopen SearcherManager every 2 secs via background thread if no thread waiting for newer generation.
//Reopen SearcherManager after .2 secs if another thread IS waiting on a newer generation.
double maxRefreshSecs = 2.0;
double minRefreshSecs = .2;
var controlledRealTimeReopenThread = new ControlledRealTimeReopenThread<IndexSearcher>(trackingWriter, searcherManager, maxRefreshSecs, minRefreshSecs);

//Start() will start a seperate thread that will invoke the object's Run(). However,
//calling Run() directly would execute that code on the current thread rather then a new thread
//which would defeat the purpose of using controlledRealTimeReopenThread. This aspect of the API
//is not as intuitive as it could be. ie. Call Start() not Run().
controlledRealTimeReopenThread.IsBackground = true; //Set as a background thread
controlledRealTimeReopenThread.Name = "Controlled Real Time Reopen Thread";
controlledRealTimeReopenThread.Priority = (ThreadPriority)Math.Min((int)Thread.CurrentThread.Priority + 2, (int)ThreadPriority.Highest);
controlledRealTimeReopenThread.Start();


//Create the threads for doing searchers
List<Thread> threadList = new List<Thread>();
for (int i = 1; i <= threadCount; i++)
{
threadList.Add(CreateWorker(i, controlledRealTimeReopenThread, generation, searcherManager, outputList));
}

//Start all the threads
foreach (Thread thread in threadList)
{
thread.Start();
}

//wait for the threads to finish.
foreach (Thread thread in threadList)
{
thread.Join(); //will wait here until the thread terminates.
}

//Now make sure that no thread waited longer then our min refresh time
//plus a small fudge factor. Also verify that all threads resported back and
//each saw 2 records.

//Verify all threads reported back a result.
assertEquals(threadCount, outputList.Count);

int millisecsPerSec = 1000;
foreach (ThreadOutput output in outputList)
{
//Verify the thread saw exactly 2 docs
assertEquals(2, output.NumRecs);

//Verify the thread wait time was around what was expected.
Assert.True(output.MilliSecsWaited <= (minRefreshSecs * millisecsPerSec) + 30); //30ms is fudged factor to account for call overhead
}

controlledRealTimeReopenThread.Dispose(); //will kill and join to the thread
Assert.False(controlledRealTimeReopenThread.IsAlive); //to prove that Dispose really does kill the thread.

searcherManager.Dispose();
indexWriter.Dispose();
indexDir.Dispose();

}

[DebuggerDisplay("ThreadNum:{ThreadNum}, NumRecs:{NumRecs}, MilliSecsWaited:{MilliSecsWaited}")]
public class ThreadOutput
{
public int ThreadNum { get; set; }
public int NumRecs { get; set; }
public double MilliSecsWaited { get; set; }
}
}
}
Loading