Skip to content

Commit 934c855

Browse files
author
Zhen Li
committed
Added max limit on session pool size
1 parent 655921d commit 934c855

File tree

5 files changed

+126
-71
lines changed

5 files changed

+126
-71
lines changed

Neo4j.Driver/Neo4j.Driver.Tck.Tests/TCK/TypeSystem.feature.steps.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ public static void GlobalBeforeScenario()
5959
try { AfterScenario(); } catch { /*Do Nothing*/ }
6060
throw;
6161
}
62-
Driver = GraphDatabase.Driver(Url);
62+
var config = Config.DefaultConfig;
63+
config.MaxSessionPoolSize = Config.InfiniteSessionPoolSize;
64+
Driver = GraphDatabase.Driver(Url, config);
6365
}
6466

6567
[AfterTestRun]

Neo4j.Driver/Neo4j.Driver.Tests/SessionPoolTests.cs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using System.Threading.Tasks;
2020
using FluentAssertions;
2121
using Moq;
22+
using Neo4j.Driver.Exceptions;
2223
using Neo4j.Driver.Internal;
2324
using Xunit;
2425
using Xunit.Abstractions;
@@ -38,11 +39,26 @@ public GetSessionMethod(ITestOutputHelper output)
3839
_output = output;
3940
}
4041

42+
[Fact]
43+
public void ShouldThrowExceptionWhenMaximumPoolSizeReached()
44+
{
45+
var mock = new Mock<IConnection>();
46+
var config = new Config {MaxSessionPoolSize = 2};
47+
var pool = new SessionPool(null, TestUri, config, mock.Object);
48+
pool.GetSession();
49+
pool.GetSession();
50+
pool.NumberOfAvailableSessions.Should().Be(0);
51+
pool.NumberOfInUseSessions.Should().Be(2);
52+
53+
var ex = Xunit.Record.Exception(() => pool.GetSession());
54+
ex.Should().BeOfType<ClientException>();
55+
}
56+
4157
[Fact]
4258
public void ShouldCreateNewSessionWhenQueueIsEmpty()
4359
{
4460
var mock = new Mock<IConnection>();
45-
var pool = new SessionPool(null, TestUri, null, mock.Object);
61+
var pool = new SessionPool(null, TestUri, Config.DefaultConfig, mock.Object);
4662
pool.GetSession();
4763
pool.NumberOfAvailableSessions.Should().Be(0);
4864
pool.NumberOfInUseSessions.Should().Be(1);
@@ -120,7 +136,7 @@ public void ShouldGetNewSessionsWhenBeingUsedConcurrentlyBy(int numberOfThreads)
120136
sessions.Enqueue(mock.Object);
121137
mockSessions.Enqueue(mock);
122138
}
123-
139+
124140
var pool = new SessionPool(sessions, null);
125141

126142
pool.NumberOfAvailableSessions.Should().Be(numberOfThreads);
@@ -161,6 +177,7 @@ public void ShouldGetNewSessionsWhenBeingUsedConcurrentlyBy(int numberOfThreads)
161177
{
162178
mock.Verify(x => x.Reset(), Times.Once);
163179
}
180+
164181
}
165182

166183

Neo4j.Driver/Neo4j.Driver/Config.cs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@ namespace Neo4j.Driver
1919
/// <summary>
2020
/// Use this class to config the <see cref="Driver"/> in a certain way
2121
/// </summary>
22-
public class Config
22+
public class Config
2323
{
24-
private Config() { }
25-
24+
public const int InfiniteSessionPoolSize = 0;
2625
static Config()
2726
{
28-
DefaultConfig = new Config { TlsEnabled = false, Logger = new DebugLogger {Level = LogLevel.Info} };
27+
DefaultConfig = new Config
28+
{
29+
TlsEnabled = false,
30+
Logger = new DebugLogger {Level = LogLevel.Info},
31+
MaxSessionPoolSize = 20
32+
};
2933
}
3034

3135
/// <summary>
@@ -37,9 +41,11 @@ static Config()
3741

3842
public static IConfigBuilder Builder => new ConfigBuilder(new Config());
3943

40-
public bool TlsEnabled { get; private set; }
44+
public bool TlsEnabled { get; set; }
4145

42-
public ILogger Logger { get; private set; }
46+
public ILogger Logger { get; set; }
47+
48+
public int MaxSessionPoolSize { get; set; }
4349

4450
private class ConfigBuilder : IConfigBuilder
4551
{
@@ -62,6 +68,12 @@ public IConfigBuilder WithLogger(ILogger logger)
6268
return this;
6369
}
6470

71+
public IConfigBuilder WithMaxSessionPoolSize(int size)
72+
{
73+
_config.MaxSessionPoolSize = size;
74+
return this;
75+
}
76+
6577
public Config ToConfig()
6678
{
6779
return _config;
@@ -73,6 +85,7 @@ public interface IConfigBuilder
7385
Config ToConfig();
7486
IConfigBuilder WithTlsEnabled(bool enableTls);
7587
IConfigBuilder WithLogger(ILogger logger);
88+
IConfigBuilder WithMaxSessionPoolSize(int size);
7689
}
7790

7891

Neo4j.Driver/Neo4j.Driver/Driver.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,6 @@ public ISession Session()
6060
{
6161
return _sessionPool.GetSession();
6262
}
63+
6364
}
6465
}

Neo4j.Driver/Neo4j.Driver/Internal/SessionPool.cs

Lines changed: 84 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
// limitations under the License.
1717
using System;
1818
using System.Collections.Generic;
19+
using System.Threading;
20+
using Neo4j.Driver.Exceptions;
1921

2022
namespace Neo4j.Driver.Internal
2123
{
@@ -26,6 +28,8 @@ internal class SessionPool : LoggerBase
2628
private readonly Uri _uri;
2729
private readonly Config _config;
2830
private readonly IConnection _connection;
31+
private readonly int _maxSessionPoolSize;
32+
private int _currentPoolSize;
2933

3034
internal int NumberOfInUseSessions => _inUseSessions.Count;
3135
internal int NumberOfAvailableSessions => _availableSessions.Count;
@@ -35,77 +39,92 @@ public SessionPool(ILogger logger, Uri uri, Config config, IConnection connectio
3539
_uri = uri;
3640
_config = config;
3741
_connection = connection;
42+
_maxSessionPoolSize = config.MaxSessionPoolSize;
3843
}
3944

4045
internal SessionPool(
41-
Queue<IPooledSession> availableSessions,
42-
Dictionary<Guid, IPooledSession> inUseDictionary,
43-
Uri uri = null,
44-
IConnection connection = null,
45-
ILogger logger = null)
46-
: this(logger, uri, null, connection)
46+
Queue<IPooledSession> availableSessions,
47+
Dictionary<Guid, IPooledSession> inUseDictionary,
48+
Uri uri = null,
49+
IConnection connection = null,
50+
ILogger logger = null)
51+
: this(logger, uri, Config.DefaultConfig, connection)
4752
{
4853
_availableSessions = availableSessions ?? new Queue<IPooledSession>();
4954
_inUseSessions = inUseDictionary ?? new Dictionary<Guid, IPooledSession>();
5055
}
5156

5257
public ISession GetSession()
5358
{
54-
IPooledSession session = null;
55-
lock (_availableSessions)
59+
return TryExecute(() =>
5660
{
57-
if(_availableSessions.Count != 0)
58-
session = _availableSessions.Dequeue();
59-
}
61+
IPooledSession session = null;
62+
lock (_availableSessions)
63+
{
64+
if (_availableSessions.Count != 0)
65+
session = _availableSessions.Dequeue();
66+
}
6067

61-
if (session == null)
62-
{
63-
session = new Session(_uri, _config, _connection, Release);
68+
if (_maxSessionPoolSize > Config.InfiniteSessionPoolSize && _currentPoolSize >= _maxSessionPoolSize)
69+
{
70+
throw new ClientException($"Maximum session pool size ({_maxSessionPoolSize}) reached.");
71+
}
72+
73+
if (session == null)
74+
{
75+
session = new Session(_uri, _config, _connection, Release);
76+
Interlocked.Increment(ref _currentPoolSize);
77+
lock (_inUseSessions)
78+
{
79+
_inUseSessions.Add(session.Id, session);
80+
}
81+
return session;
82+
}
83+
84+
if (!session.IsHealthy())
85+
{
86+
session.Close();
87+
Interlocked.Decrement(ref _currentPoolSize);
88+
return GetSession();
89+
}
90+
91+
session.Reset();
6492
lock (_inUseSessions)
6593
{
6694
_inUseSessions.Add(session.Id, session);
6795
}
6896
return session;
69-
}
70-
71-
if (!session.IsHealthy())
72-
{
73-
session.Close();
74-
return GetSession();
75-
}
76-
77-
session.Reset();
78-
lock (_inUseSessions)
79-
{
80-
_inUseSessions.Add(session.Id, session);
81-
}
82-
return session;
97+
});
8398
}
8499

85100
public void Release(Guid sessionId)
86101
{
87-
IPooledSession session;
88-
lock (_inUseSessions)
102+
TryExecute(() =>
89103
{
90-
if (!_inUseSessions.ContainsKey(sessionId))
104+
IPooledSession session;
105+
lock (_inUseSessions)
91106
{
92-
return;
93-
}
107+
if (!_inUseSessions.ContainsKey(sessionId))
108+
{
109+
return;
110+
}
94111

95-
session = _inUseSessions[sessionId];
96-
_inUseSessions.Remove(sessionId);
97-
}
112+
session = _inUseSessions[sessionId];
113+
_inUseSessions.Remove(sessionId);
114+
}
98115

99-
if (session.IsHealthy())
100-
{
101-
lock (_availableSessions)
102-
_availableSessions.Enqueue(session);
103-
}
104-
else
105-
{
106-
//release resources by session
107-
session.Close();
108-
}
116+
if (session.IsHealthy())
117+
{
118+
lock (_availableSessions)
119+
_availableSessions.Enqueue(session);
120+
}
121+
else
122+
{
123+
Interlocked.Decrement(ref _currentPoolSize);
124+
//release resources by session
125+
session.Close();
126+
}
127+
});
109128
}
110129

111130
protected override void Dispose(bool isDisposing)
@@ -115,30 +134,33 @@ protected override void Dispose(bool isDisposing)
115134
return;
116135
}
117136

118-
lock (_inUseSessions)
119-
{
120-
var sessions = new List<IPooledSession>(_inUseSessions.Values);
121-
_inUseSessions.Clear();
122-
foreach (var inUseSession in sessions)
137+
TryExecute(() =>
138+
{
139+
lock (_inUseSessions)
123140
{
124-
Logger?.Info($"Disposing In Use Session {inUseSession.Id}");
125-
inUseSession.Close();
141+
var sessions = new List<IPooledSession>(_inUseSessions.Values);
142+
_inUseSessions.Clear();
143+
foreach (var inUseSession in sessions)
144+
{
145+
Logger?.Info($"Disposing In Use Session {inUseSession.Id}");
146+
inUseSession.Close();
147+
}
126148
}
127-
}
128-
lock (_availableSessions)
129-
{
130-
while (_availableSessions.Count > 0)
149+
lock (_availableSessions)
131150
{
132-
var session = _availableSessions.Dequeue();
133-
Logger?.Info($"Disposing Available Session {session.Id}");
134-
session.Close();
151+
while (_availableSessions.Count > 0)
152+
{
153+
var session = _availableSessions.Dequeue();
154+
Logger?.Info($"Disposing Available Session {session.Id}");
155+
session.Close();
156+
}
135157
}
136-
}
137-
158+
});
138159
base.Dispose(true);
139160
}
140161
}
141162

163+
142164
public interface IPooledSession : ISession
143165
{
144166
Guid Id { get; }

0 commit comments

Comments
 (0)