Skip to content

Commit b45479b

Browse files
committed
Add failover support to RedisMqServer and Basic ClientManager
1 parent 41cb476 commit b45479b

File tree

9 files changed

+270
-48
lines changed

9 files changed

+270
-48
lines changed

src/ServiceStack.Redis/BasicRedisClientManager.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ namespace ServiceStack.Redis
2121
/// Allows the configuration of different ReadWrite and ReadOnly hosts
2222
/// </summary>
2323
public partial class BasicRedisClientManager
24-
: IRedisClientsManager
24+
: IRedisClientsManager, IRedisFailover
2525
{
2626
private List<RedisEndPoint> ReadWriteHosts { get; set; }
2727
private List<RedisEndPoint> ReadOnlyHosts { get; set; }
@@ -40,6 +40,8 @@ public partial class BasicRedisClientManager
4040

4141
public Action<IRedisNativeClient> ConnectionFilter { get; set; }
4242

43+
public List<Action<IRedisClientsManager>> OnFailover { get; private set; }
44+
4345
public BasicRedisClientManager() : this(RedisNativeClient.DefaultHost) { }
4446

4547
public BasicRedisClientManager(params string[] readWriteHosts)
@@ -156,6 +158,19 @@ public void Start()
156158
readOnlyHostsIndex = 0;
157159
}
158160

161+
public void FailoverTo(params string[] readWriteHosts)
162+
{
163+
FailoverTo(readWriteHosts, readWriteHosts);
164+
}
165+
166+
public void FailoverTo(IEnumerable<string> readWriteHosts, IEnumerable<string> readOnlyHosts)
167+
{
168+
ReadWriteHosts = readWriteHosts.ToRedisEndPoints();
169+
ReadOnlyHosts = readOnlyHosts.ToRedisEndPoints();
170+
171+
Start();
172+
}
173+
159174
public void Dispose()
160175
{
161176
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace ServiceStack.Redis
5+
{
6+
public interface IRedisFailover
7+
{
8+
List<Action<IRedisClientsManager>> OnFailover { get; }
9+
10+
void FailoverTo(params string[] readWriteHosts);
11+
}
12+
}

src/ServiceStack.Redis/Messaging/RedisMqHost.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace ServiceStack.Redis.Messaging
2121
/// The Start/Stop methods are idempotent i.e. It's safe to call them repeatedly on multiple threads
2222
/// and the Redis MQ Host will only have Started/Stopped once.
2323
/// </summary>
24+
[Obsolete("RedisMqServer is maintained and preferred over RedisMqHost")]
2425
public class RedisMqHost : IMessageService
2526
{
2627
private static readonly ILog Log = LogManager.GetLogger(typeof(RedisMqHost));

src/ServiceStack.Redis/Messaging/RedisMqServer.cs

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ public RedisMqServer(IRedisClientsManager clientsManager,
114114
this.MessageFactory = new RedisMessageFactory(clientsManager);
115115
this.ErrorHandler = ex => Log.Error("Exception in Redis MQ Server: " + ex.Message, ex);
116116
this.KeepAliveRetryAfterMs = 2000;
117+
118+
var failoverHost = clientsManager as IRedisFailover;
119+
if (failoverHost != null)
120+
{
121+
failoverHost.OnFailover.Add(OnFailover);
122+
}
117123
}
118124

119125
public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn)
@@ -230,11 +236,6 @@ public void Start()
230236
return;
231237
}
232238

233-
foreach (var worker in workers)
234-
{
235-
worker.Start();
236-
}
237-
238239
SleepBackOffMultiplier(Interlocked.CompareExchange(ref noOfContinuousErrors, 0, 0));
239240

240241
StartWorkerThreads();
@@ -266,54 +267,67 @@ public void Start()
266267
}
267268
}
268269

270+
private IRedisClient masterClient;
269271
private void RunLoop()
270272
{
271273
if (Interlocked.CompareExchange(ref status, WorkerStatus.Started, WorkerStatus.Starting) != WorkerStatus.Starting) return;
272274
Interlocked.Increment(ref timesStarted);
273275

274276
try
275277
{
276-
using (var redisClient = clientsManager.GetReadOnlyClient())
278+
//RESET
279+
while (Interlocked.CompareExchange(ref status, 0, 0) == WorkerStatus.Started)
277280
{
278-
//Record that we had a good run...
279-
Interlocked.CompareExchange(ref noOfContinuousErrors, 0, noOfContinuousErrors);
280-
281-
using (var subscription = redisClient.CreateSubscription())
281+
using (var redisClient = clientsManager.GetReadOnlyClient())
282282
{
283-
subscription.OnUnSubscribe = channel => Log.Debug("OnUnSubscribe: " + channel);
283+
masterClient = redisClient;
284284

285-
subscription.OnMessage = (channel, msg) => {
285+
//Record that we had a good run...
286+
Interlocked.CompareExchange(ref noOfContinuousErrors, 0, noOfContinuousErrors);
286287

287-
if (msg == WorkerStatus.StopCommand)
288+
using (var subscription = redisClient.CreateSubscription())
289+
{
290+
subscription.OnUnSubscribe = channel => Log.Debug("OnUnSubscribe: " + channel);
291+
292+
subscription.OnMessage = (channel, msg) =>
288293
{
289-
Log.Debug("Stop Command Issued");
294+
if (msg == WorkerStatus.StopCommand)
295+
{
296+
Log.Debug("Stop Command Issued");
290297

291-
if (Interlocked.CompareExchange(ref status, WorkerStatus.Stopped, WorkerStatus.Started) != WorkerStatus.Started)
292-
Interlocked.CompareExchange(ref status, WorkerStatus.Stopped, WorkerStatus.Stopping);
298+
if (Interlocked.CompareExchange(ref status, WorkerStatus.Stopped, WorkerStatus.Started) != WorkerStatus.Started)
299+
Interlocked.CompareExchange(ref status, WorkerStatus.Stopped, WorkerStatus.Stopping);
293300

294-
Log.Debug("UnSubscribe From All Channels...");
295-
subscription.UnSubscribeFromAllChannels(); //Un block thread.
296-
return;
297-
}
301+
Log.Debug("UnSubscribe From All Channels...");
302+
subscription.UnSubscribeFromAllChannels(); //Un block thread.
303+
return;
304+
}
305+
if (msg == WorkerStatus.ResetCommand)
306+
{
307+
subscription.UnSubscribeFromAllChannels(); //Un block thread.
308+
return;
309+
}
298310

299-
if (!string.IsNullOrEmpty(msg))
300-
{
301-
int[] workerIndexes;
302-
if (queueWorkerIndexMap.TryGetValue(msg, out workerIndexes))
311+
if (!string.IsNullOrEmpty(msg))
303312
{
304-
foreach (var workerIndex in workerIndexes)
313+
int[] workerIndexes;
314+
if (queueWorkerIndexMap.TryGetValue(msg, out workerIndexes))
305315
{
306-
workers[workerIndex].NotifyNewMessage();
316+
foreach (var workerIndex in workerIndexes)
317+
{
318+
workers[workerIndex].NotifyNewMessage();
319+
}
307320
}
308321
}
309-
}
310-
};
322+
};
311323

312-
subscription.SubscribeToChannels(QueueNames.TopicIn); //blocks thread
324+
subscription.SubscribeToChannels(QueueNames.TopicIn); //blocks thread
325+
masterClient = null;
326+
}
313327
}
314-
315-
StopWorkerThreads();
316328
}
329+
330+
StopWorkerThreads();
317331
}
318332
catch (Exception ex)
319333
{
@@ -363,6 +377,37 @@ public void Stop()
363377
}
364378
}
365379

380+
private void OnFailover(IRedisClientsManager clientsManager)
381+
{
382+
try
383+
{
384+
if (masterClient != null)
385+
{
386+
//New thread-safe client with same connection info as connected master
387+
using (var currentlySubscribedClient = ((RedisClient)masterClient).CloneClient())
388+
{
389+
currentlySubscribedClient.PublishMessage(QueueNames.TopicIn, WorkerStatus.ResetCommand);
390+
}
391+
}
392+
else
393+
{
394+
Restart();
395+
}
396+
}
397+
catch (Exception ex)
398+
{
399+
if (this.ErrorHandler != null) this.ErrorHandler(ex);
400+
Log.Warn("Error trying to UnSubscribeFromChannels in OnFailover. Restarting...", ex);
401+
Restart();
402+
}
403+
}
404+
405+
public void Restart()
406+
{
407+
Stop();
408+
Start();
409+
}
410+
366411
public void NotifyAll()
367412
{
368413
Log.Debug("Notifying all worker threads to check for new messages...");

src/ServiceStack.Redis/Messaging/WorkerStatus.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public static class WorkerStatus
1010

1111
//Control Commands
1212
public const string StopCommand = "STOP";
13+
public const string ResetCommand = "RESET";
1314

1415
public static string ToString(int workerStatus)
1516
{

src/ServiceStack.Redis/PooledRedisClientManager.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ namespace ServiceStack.Redis
2626
/// 1 master and multiple replicated read slaves.
2727
/// </summary>
2828
public partial class PooledRedisClientManager
29-
: IRedisClientsManager
29+
: IRedisClientsManager, IRedisFailover
3030
{
3131
private static readonly ILog Log = LogManager.GetLogger(typeof(PooledRedisClientManager));
3232

@@ -47,6 +47,7 @@ public partial class PooledRedisClientManager
4747

4848
private List<RedisEndPoint> ReadWriteHosts { get; set; }
4949
private List<RedisEndPoint> ReadOnlyHosts { get; set; }
50+
public List<Action<IRedisClientsManager>> OnFailover { get; private set; }
5051

5152
private RedisClient[] writeClients = new RedisClient[0];
5253
protected int WritePoolIndex;
@@ -133,6 +134,8 @@ public PooledRedisClientManager(
133134
MaxReadPoolSize = ReadOnlyHosts.Count * PoolSizeMultiplier,
134135
};
135136

137+
this.OnFailover = new List<Action<IRedisClientsManager>>();
138+
136139
// if timeout provided, convert into milliseconds
137140
this.PoolTimeout = poolTimeOutSeconds != null
138141
? poolTimeOutSeconds * 1000
@@ -169,6 +172,21 @@ public void FailoverTo(IEnumerable<string> readWriteHosts, IEnumerable<string> r
169172
}
170173
ReadWriteHosts = readWriteHosts.ToRedisEndPoints();
171174
}
175+
176+
if (this.OnFailover != null)
177+
{
178+
foreach (var callback in OnFailover)
179+
{
180+
try
181+
{
182+
callback(this);
183+
}
184+
catch (Exception ex)
185+
{
186+
Log.Error("Error firing OnFailover callback(): ", ex);
187+
}
188+
}
189+
}
172190
}
173191

174192
protected virtual void OnStart()

src/ServiceStack.Redis/RedisClient.cs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public List<Dictionary<string, string>> GetClientList()
170170
{
171171
var clientList = base.ClientList().FromUtf8Bytes();
172172
var results = new List<Dictionary<string, string>>();
173-
173+
174174
var lines = clientList.Split('\n');
175175
foreach (var line in lines)
176176
{
@@ -279,7 +279,7 @@ public long DecrementValueBy(string key, int count)
279279
return DecrBy(key, count);
280280
}
281281

282-
public long AppendToValue(string key, string value)
282+
public long AppendToValue(string key, string value)
283283
{
284284
return base.Append(key, value.ToUtf8Bytes());
285285
}
@@ -471,7 +471,7 @@ public IRedisSubscription CreateSubscription()
471471
return new RedisSubscription(this);
472472
}
473473

474-
public long PublishMessage(string toChannel, string message)
474+
public long PublishMessage(string toChannel, string message)
475475
{
476476
return base.Publish(toChannel, message.ToUtf8Bytes());
477477
}
@@ -734,6 +734,15 @@ public void Delete<T>(T entity)
734734
}
735735
}
736736

737+
public RedisClient CloneClient()
738+
{
739+
return new RedisClient(Host, Port, Password, Db)
740+
{
741+
SendTimeout = SendTimeout,
742+
ReceiveTimeout = ReceiveTimeout
743+
};
744+
}
745+
737746
/// <summary>
738747
/// Returns key with automatic object id detection in provided value with <typeparam name="T">generic type</typeparam>.
739748
/// </summary>
@@ -770,22 +779,22 @@ internal string UrnKey(Type type, object id)
770779

771780
#region LUA EVAL
772781

773-
public long ExecLuaAsInt(string body, params string[] args)
782+
public long ExecLuaAsInt(string body, params string[] args)
774783
{
775784
return base.EvalInt(body, 0, args.ToMultiByteArray());
776785
}
777786

778-
public long ExecLuaAsInt(string luaBody, string[] keys, string[] args)
787+
public long ExecLuaAsInt(string luaBody, string[] keys, string[] args)
779788
{
780789
return base.EvalInt(luaBody, keys.Length, MergeAndConvertToBytes(keys, args));
781790
}
782791

783-
public long ExecLuaShaAsInt(string sha1, params string[] args)
792+
public long ExecLuaShaAsInt(string sha1, params string[] args)
784793
{
785794
return base.EvalShaInt(sha1, args.Length, args.ToMultiByteArray());
786795
}
787796

788-
public long ExecLuaShaAsInt(string sha1, string[] keys, string[] args)
797+
public long ExecLuaShaAsInt(string sha1, string[] keys, string[] args)
789798
{
790799
return base.EvalShaInt(sha1, keys.Length, MergeAndConvertToBytes(keys, args));
791800
}

src/ServiceStack.Redis/ServiceStack.Redis.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@
155155
<Compile Include="Generic\RedisTypedPipeline.cs" />
156156
<Compile Include="Generic\RedisTypedCommandQueue.cs" />
157157
<Compile Include="IRedisClientFactory.cs" />
158+
<Compile Include="IRedisFailover.cs" />
158159
<Compile Include="Messaging\MessageHandlerWorker.cs" />
159160
<Compile Include="Messaging\RedisMqHost.cs" />
160161
<Compile Include="Messaging\RedisMessageFactory.cs" />

0 commit comments

Comments
 (0)