16
16
// limitations under the License.
17
17
using System ;
18
18
using System . Collections . Generic ;
19
+ using System . Threading ;
20
+ using Neo4j . Driver . Exceptions ;
19
21
20
22
namespace Neo4j . Driver . Internal
21
23
{
@@ -26,6 +28,8 @@ internal class SessionPool : LoggerBase
26
28
private readonly Uri _uri ;
27
29
private readonly Config _config ;
28
30
private readonly IConnection _connection ;
31
+ private readonly int _maxSessionPoolSize ;
32
+ private int _currentPoolSize ;
29
33
30
34
internal int NumberOfInUseSessions => _inUseSessions . Count ;
31
35
internal int NumberOfAvailableSessions => _availableSessions . Count ;
@@ -35,77 +39,92 @@ public SessionPool(ILogger logger, Uri uri, Config config, IConnection connectio
35
39
_uri = uri ;
36
40
_config = config ;
37
41
_connection = connection ;
42
+ _maxSessionPoolSize = config . MaxSessionPoolSize ;
38
43
}
39
44
40
45
internal SessionPool (
41
- Queue < IPooledSession > availableSessions ,
42
- Dictionary < Guid , IPooledSession > inUseDictionary ,
43
- Uri uri = null ,
44
- IConnection connection = null ,
45
- ILogger logger = null )
46
- : this ( logger , uri , null , connection )
46
+ Queue < IPooledSession > availableSessions ,
47
+ Dictionary < Guid , IPooledSession > inUseDictionary ,
48
+ Uri uri = null ,
49
+ IConnection connection = null ,
50
+ ILogger logger = null )
51
+ : this ( logger , uri , Config . DefaultConfig , connection )
47
52
{
48
53
_availableSessions = availableSessions ?? new Queue < IPooledSession > ( ) ;
49
54
_inUseSessions = inUseDictionary ?? new Dictionary < Guid , IPooledSession > ( ) ;
50
55
}
51
56
52
57
public ISession GetSession ( )
53
58
{
54
- IPooledSession session = null ;
55
- lock ( _availableSessions )
59
+ return TryExecute ( ( ) =>
56
60
{
57
- if ( _availableSessions . Count != 0 )
58
- session = _availableSessions . Dequeue ( ) ;
59
- }
61
+ IPooledSession session = null ;
62
+ lock ( _availableSessions )
63
+ {
64
+ if ( _availableSessions . Count != 0 )
65
+ session = _availableSessions . Dequeue ( ) ;
66
+ }
60
67
61
- if ( session == null )
62
- {
63
- session = new Session ( _uri , _config , _connection , Release ) ;
68
+ if ( _maxSessionPoolSize > Config . InfiniteSessionPoolSize && _currentPoolSize >= _maxSessionPoolSize )
69
+ {
70
+ throw new ClientException ( $ "Maximum session pool size ({ _maxSessionPoolSize } ) reached.") ;
71
+ }
72
+
73
+ if ( session == null )
74
+ {
75
+ session = new Session ( _uri , _config , _connection , Release ) ;
76
+ Interlocked . Increment ( ref _currentPoolSize ) ;
77
+ lock ( _inUseSessions )
78
+ {
79
+ _inUseSessions . Add ( session . Id , session ) ;
80
+ }
81
+ return session ;
82
+ }
83
+
84
+ if ( ! session . IsHealthy ( ) )
85
+ {
86
+ session . Close ( ) ;
87
+ Interlocked . Decrement ( ref _currentPoolSize ) ;
88
+ return GetSession ( ) ;
89
+ }
90
+
91
+ session . Reset ( ) ;
64
92
lock ( _inUseSessions )
65
93
{
66
94
_inUseSessions . Add ( session . Id , session ) ;
67
95
}
68
96
return session ;
69
- }
70
-
71
- if ( ! session . IsHealthy ( ) )
72
- {
73
- session . Close ( ) ;
74
- return GetSession ( ) ;
75
- }
76
-
77
- session . Reset ( ) ;
78
- lock ( _inUseSessions )
79
- {
80
- _inUseSessions . Add ( session . Id , session ) ;
81
- }
82
- return session ;
97
+ } ) ;
83
98
}
84
99
85
100
public void Release ( Guid sessionId )
86
101
{
87
- IPooledSession session ;
88
- lock ( _inUseSessions )
102
+ TryExecute ( ( ) =>
89
103
{
90
- if ( ! _inUseSessions . ContainsKey ( sessionId ) )
104
+ IPooledSession session ;
105
+ lock ( _inUseSessions )
91
106
{
92
- return ;
93
- }
107
+ if ( ! _inUseSessions . ContainsKey ( sessionId ) )
108
+ {
109
+ return ;
110
+ }
94
111
95
- session = _inUseSessions [ sessionId ] ;
96
- _inUseSessions . Remove ( sessionId ) ;
97
- }
112
+ session = _inUseSessions [ sessionId ] ;
113
+ _inUseSessions . Remove ( sessionId ) ;
114
+ }
98
115
99
- if ( session . IsHealthy ( ) )
100
- {
101
- lock ( _availableSessions )
102
- _availableSessions . Enqueue ( session ) ;
103
- }
104
- else
105
- {
106
- //release resources by session
107
- session . Close ( ) ;
108
- }
116
+ if ( session . IsHealthy ( ) )
117
+ {
118
+ lock ( _availableSessions )
119
+ _availableSessions . Enqueue ( session ) ;
120
+ }
121
+ else
122
+ {
123
+ Interlocked . Decrement ( ref _currentPoolSize ) ;
124
+ //release resources by session
125
+ session . Close ( ) ;
126
+ }
127
+ } ) ;
109
128
}
110
129
111
130
protected override void Dispose ( bool isDisposing )
@@ -115,30 +134,33 @@ protected override void Dispose(bool isDisposing)
115
134
return ;
116
135
}
117
136
118
- lock ( _inUseSessions )
119
- {
120
- var sessions = new List < IPooledSession > ( _inUseSessions . Values ) ;
121
- _inUseSessions . Clear ( ) ;
122
- foreach ( var inUseSession in sessions )
137
+ TryExecute ( ( ) =>
138
+ {
139
+ lock ( _inUseSessions )
123
140
{
124
- Logger ? . Info ( $ "Disposing In Use Session { inUseSession . Id } ") ;
125
- inUseSession . Close ( ) ;
141
+ var sessions = new List < IPooledSession > ( _inUseSessions . Values ) ;
142
+ _inUseSessions . Clear ( ) ;
143
+ foreach ( var inUseSession in sessions )
144
+ {
145
+ Logger ? . Info ( $ "Disposing In Use Session { inUseSession . Id } ") ;
146
+ inUseSession . Close ( ) ;
147
+ }
126
148
}
127
- }
128
- lock ( _availableSessions )
129
- {
130
- while ( _availableSessions . Count > 0 )
149
+ lock ( _availableSessions )
131
150
{
132
- var session = _availableSessions . Dequeue ( ) ;
133
- Logger ? . Info ( $ "Disposing Available Session { session . Id } ") ;
134
- session . Close ( ) ;
151
+ while ( _availableSessions . Count > 0 )
152
+ {
153
+ var session = _availableSessions . Dequeue ( ) ;
154
+ Logger ? . Info ( $ "Disposing Available Session { session . Id } ") ;
155
+ session . Close ( ) ;
156
+ }
135
157
}
136
- }
137
-
158
+ } ) ;
138
159
base . Dispose ( true ) ;
139
160
}
140
161
}
141
162
163
+
142
164
public interface IPooledSession : ISession
143
165
{
144
166
Guid Id { get ; }
0 commit comments