Azure Redis Cache - pool of ConnectionMultiplexer objects
You can also accomplish this in an easier way by using StackExchange.Redis.Extensions.
Sample code:
using StackExchange.Redis;
using StackExchange.Redis.Extensions.Core.Abstractions;
using StackExchange.Redis.Extensions.Core.Configuration;
using System;
using System.Collections.Concurrent;
using System.Linq;
namespace Pool.Redis
{
    /// <summary>
    /// Pool of lazily opened <see cref="ConnectionMultiplexer"/> instances.
    /// <see cref="GetConnection"/> opens the unopened slots first and afterwards hands out
    /// the multiplexer that currently reports the fewest outstanding operations.
    /// </summary>
    public class RedisConnectionPool : IRedisCacheConnectionPoolManager
    {
        // Per-instance on purpose: as a static field the pool was replaced whenever another
        // RedisConnectionPool was constructed (or disposed), orphaning already-open multiplexers.
        private ConcurrentBag<Lazy<ConnectionMultiplexer>> connections;
        private readonly RedisConfiguration redisConfiguration;

        /// <summary>
        /// Creates the pool with <c>redisConfiguration.PoolSize</c> slots. No connection is opened
        /// until a slot is first handed out by <see cref="GetConnection"/>.
        /// </summary>
        /// <param name="redisConfiguration">Supplies the pool size and the connection options.</param>
        /// <exception cref="ArgumentNullException"><paramref name="redisConfiguration"/> is null.</exception>
        public RedisConnectionPool(RedisConfiguration redisConfiguration)
        {
            if (redisConfiguration == null)
            {
                throw new ArgumentNullException(nameof(redisConfiguration));
            }

            this.redisConfiguration = redisConfiguration;
            Initialize();
        }

        /// <summary>
        /// Returns a multiplexer from the pool: the first slot that has not been opened yet,
        /// or, once every slot is open, the one with the lowest <c>TotalOutstanding</c> counter.
        /// </summary>
        /// <returns>The selected connection multiplexer.</returns>
        /// <exception cref="InvalidOperationException">The pool has no slots (pool size of zero or less).</exception>
        public IConnectionMultiplexer GetConnection()
        {
            // Work on one snapshot so the "is everything open?" decision and the selection see the
            // same set of slots, and so a concurrent Dispose() swapping the bag cannot interfere.
            Lazy<ConnectionMultiplexer>[] snapshot = connections.ToArray();
            if (snapshot.Length == 0)
            {
                throw new InvalidOperationException(
                    "The Redis connection pool is empty; RedisConfiguration.PoolSize must be greater than zero.");
            }

            // FirstOrDefault instead of Count()/First(): another thread may open the last unopened
            // slot at any moment, and that must not turn into a "sequence contains no elements" error.
            Lazy<ConnectionMultiplexer> unopened = snapshot.FirstOrDefault(lazy => !lazy.IsValueCreated);
            if (unopened != null)
            {
                return unopened.Value;
            }

            return snapshot
                .OrderBy(lazy => lazy.Value.GetCounters().TotalOutstanding)
                .First()
                .Value;
        }

        /// <summary>
        /// Builds a fresh set of lazy slots; each slot connects with
        /// <c>redisConfiguration.ConfigurationOptions</c> on first use.
        /// </summary>
        private void Initialize()
        {
            // Fill a local bag and publish it in one assignment so readers never observe a
            // half-filled (or empty) pool.
            var freshPool = new ConcurrentBag<Lazy<ConnectionMultiplexer>>();
            for (int i = 0; i < redisConfiguration.PoolSize; i++)
            {
                freshPool.Add(new Lazy<ConnectionMultiplexer>(
                    () => ConnectionMultiplexer.Connect(redisConfiguration.ConfigurationOptions)));
            }

            connections = freshPool;
        }

        /// <summary>
        /// Disposes every multiplexer that was actually opened, then resets the pool to a new
        /// set of unopened slots.
        /// </summary>
        public void Dispose()
        {
            // Only slots whose value exists are touched; reading .Value on the others would
            // open a connection just to close it again.
            foreach (Lazy<ConnectionMultiplexer> slot in connections.ToArray())
            {
                if (slot.IsValueCreated)
                {
                    slot.Value.Dispose();
                }
            }

            Initialize();
        }
    }
}
where RedisConfiguration is something like this:
// Builds the RedisConfiguration for the connection pool. Host, connect timeout and password are
// read from appSettings; every lookup is followed by .ToString(), so a lookup that yields null
// (presumably a missing key — verify) fails right here with a NullReferenceException.
return new RedisConfiguration()
{
AbortOnConnectFail = true,
Hosts = new RedisHost[] {
new RedisHost()
{
Host = ConfigurationManager.AppSettings["RedisCacheAddress"].ToString(),
// NOTE(review): 6380 combined with Ssl = true below looks like a TLS endpoint — confirm against the target server.
Port = 6380
},
},
// NOTE(review): the unit of RedisTimeout is not visible here (presumably milliseconds) — confirm.
ConnectTimeout = Convert.ToInt32(ConfigurationManager.AppSettings["RedisTimeout"].ToString()),
Database = 0,
Ssl = true,
Password = ConfigurationManager.AppSettings["RedisCachePassword"].ToString(),
ServerEnumerationStrategy = new ServerEnumerationStrategy()
{
Mode = ServerEnumerationStrategy.ModeOptions.All,
TargetRole = ServerEnumerationStrategy.TargetRoleOptions.Any,
UnreachableServerAction = ServerEnumerationStrategy.UnreachableServerActionOptions.Throw
},
// Number of lazy multiplexer slots the pool's Initialize() loop creates.
PoolSize = 50
};
If you're using StackExchange.Redis, according to this GitHub issue, you can use the TotalOutstanding value reported by the connection multiplexer's GetCounters() method to pick the least-loaded connection.
Here is an implementation I came up with that is working correctly:
/// <summary>Number of slots allocated for the pool the first time InitConnectionPool runs.</summary>
public static int POOL_SIZE = 100;

/// <summary>Gate taken while the pool is being filled and while a connection is being selected.</summary>
private static readonly object lockPookRoundRobin = new object();

/// <summary>The pool itself; stays null until InitConnectionPool has allocated it.</summary>
private static Lazy<Context>[] lazyConnection;
/// <summary>
/// Allocates the pool array on the first call and fills every empty slot with a lazy
/// <c>Context</c> factory. It is invoked from each constructor; repeated calls are harmless
/// because existing slots are left untouched.
/// </summary>
private static void InitConnectionPool()
{
    lock (lockPookRoundRobin)
    {
        if (lazyConnection == null)
        {
            lazyConnection = new Lazy<Context>[POOL_SIZE];
        }

        // Bound the loop by the array that actually exists: POOL_SIZE is a mutable public field,
        // and if it was raised after the first allocation, indexing up to POOL_SIZE would run
        // past the end of the array.
        for (int i = 0; i < lazyConnection.Length; i++)
        {
            if (lazyConnection[i] == null)
            {
                lazyConnection[i] = new Lazy<Context>(() => new Context("YOUR_CONNECTION_STRING", new CachingFramework.Redis.Serializers.JsonSerializer()));
            }
        }
    }
}
/// <summary>
/// Picks the context to hand out next: the first slot whose context has not been created yet,
/// or, once all slots are created, the one whose multiplexer reports the lowest
/// <c>TotalOutstanding</c> counter.
/// </summary>
/// <returns>The selected context.</returns>
/// <exception cref="InvalidOperationException">The pool has not been initialized or has no slots.</exception>
private static Context GetLeastLoadedConnection()
{
    if (lazyConnection == null || lazyConnection.Length == 0)
    {
        throw new InvalidOperationException("The connection pool is empty or has not been initialized.");
    }

    // Fill the pool first. Searching for the first uncreated slot (rather than indexing by the
    // number of created ones) does not rely on slots having been created in index order.
    foreach (Lazy<Context> slot in lazyConnection)
    {
        if (!slot.IsValueCreated)
        {
            return slot.Value;
        }
    }

    // Every slot is live: select the minimum in one pass. Each counter is read exactly once,
    // because a "compute Min, then search for == Min" pair can find no match when the live
    // counter changes between the two reads.
    Lazy<Context> leastLoaded = lazyConnection[0];
    long lowestOutstanding = leastLoaded.Value.GetConnectionMultiplexer().GetCounters().TotalOutstanding;
    for (int i = 1; i < lazyConnection.Length; i++)
    {
        long outstanding = lazyConnection[i].Value.GetConnectionMultiplexer().GetCounters().TotalOutstanding;
        if (outstanding < lowestOutstanding)
        {
            lowestOutstanding = outstanding;
            leastLoaded = lazyConnection[i];
        }
    }

    return leastLoaded.Value;
}
/// <summary>
/// Gets a context from the pool; the selection is made while holding the pool lock.
/// </summary>
private static Context Connection
{
    get
    {
        Context selected;
        lock (lockPookRoundRobin)
        {
            selected = GetLeastLoadedConnection();
        }

        return selected;
    }
}
/// <summary>
/// Creates the service and makes sure the shared static connection pool has been set up
/// by calling InitConnectionPool().
/// </summary>
public RedisCacheService()
{
InitConnectionPool();
}