Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Game.Shared;
using Game.Shared.SaveData;
using Game.Shared.Chat.Client;
using Game.Shared.Realtime.Client;
using Game.Shared.Services;
using Game.Shared.Services.Network;
using Game.Shared.Services.Network.Queue;
Expand Down Expand Up @@ -41,6 +42,8 @@ public class SurvivorGameRunner : ISurvivorGameRunner
private readonly IRequestQueue _requestQueue;
private readonly INetworkService _networkService;
private readonly IChatClient _chatClient;
private readonly IMatchmakingClient _matchmakingClient;
private readonly ILobbyClient _lobbyClient;

private GameObject _gameRootInstance;
private SurvivorGameRootController _gameRootController;
Expand All @@ -61,7 +64,9 @@ public SurvivorGameRunner(
IAuthApiService authApiService,
IRequestQueue requestQueue,
INetworkService networkService,
IChatClient chatClient)
IChatClient chatClient,
IMatchmakingClient matchmakingClient,
ILobbyClient lobbyClient)
{
_container = container;
_sceneService = sceneService;
Expand All @@ -78,6 +83,8 @@ public SurvivorGameRunner(
_requestQueue = requestQueue;
_networkService = networkService;
_chatClient = chatClient;
_matchmakingClient = matchmakingClient;
_lobbyClient = lobbyClient;
}

public async UniTask StartupAsync()
Expand Down Expand Up @@ -204,8 +211,10 @@ public async UniTask ShutdownAsync()
_queueProcessingSubscription?.Dispose();
_queueProcessingSubscription = null;

// チャットクライアント切断
// クライアント切断
if (_chatClient != null) { await _chatClient.DisconnectAsync(); }
if (_matchmakingClient != null) { await _matchmakingClient.DisconnectAsync(); }
if (_lobbyClient != null) { await _lobbyClient.DisconnectAsync(); }

// セーブデータ保存(変更がある場合のみ)
await _saveService.SaveIfDirtyAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public interface ILobbyClient : IDisposable
/// </summary>
Task LeaveLobbyAsync();

/// <summary>
/// Hub 接続を切断しリソースを解放する(非同期)
/// </summary>
Task DisconnectAsync();

/// <summary>
/// ロビー検索(Unary のみ)
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public interface IMatchmakingClient : IDisposable
/// </summary>
Task CancelMatchmakingAsync();

/// <summary>
/// Hub 接続を切断しリソースを解放する(非同期)
/// </summary>
Task DisconnectAsync();

/// <summary>
/// キュー人数取得(Unary のみ)
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,18 @@ private async Task MonitorDisconnectionAsync(CancellationToken cancellationToken
}
}

public async Task DisconnectAsync()
{
_monitorCts?.Cancel();
_monitorCts?.Dispose();
_monitorCts = null;
if (_hub != null)
{
await _hub.DisposeAsync();
_hub = null;
}
}

public void Dispose()
{
if (!_disposed)
Expand All @@ -273,11 +285,17 @@ public void Dispose()
_monitorCts = null;
if (_hub != null)
{
try { _hub.DisposeAsync().GetAwaiter().GetResult(); }
catch (Exception ex) { Debug.LogWarning($"[LobbyClient] Dispose error: {ex.Message}"); }
var hub = _hub;
_hub = null;
_ = DisposeHubSafelyAsync(hub);
}
}
}

private static async Task DisposeHubSafelyAsync(ILobbyHub hub)
{
try { await hub.DisposeAsync(); }
catch (Exception ex) { Debug.LogWarning($"[LobbyClient] Background dispose error: {ex.Message}"); }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,19 @@ private async Task MonitorDisconnectionAsync(CancellationToken cancellationToken
}
}

public async Task DisconnectAsync()
{
_monitorCts?.Cancel();
_monitorCts?.Dispose();
_monitorCts = null;
IsSearching = false;
if (_hub != null)
{
await _hub.DisposeAsync();
_hub = null;
}
}

public void Dispose()
{
if (!_disposed)
Expand All @@ -193,11 +206,17 @@ public void Dispose()
_monitorCts = null;
if (_hub != null)
{
try { _hub.DisposeAsync().GetAwaiter().GetResult(); }
catch (Exception ex) { Debug.LogWarning($"[MatchmakingClient] Dispose error: {ex.Message}"); }
var hub = _hub;
_hub = null;
_ = DisposeHubSafelyAsync(hub);
}
}
}

private static async Task DisposeHubSafelyAsync(IMatchmakingHub hub)
{
try { await hub.DisposeAsync(); }
catch (Exception ex) { Debug.LogWarning($"[MatchmakingClient] Background dispose error: {ex.Message}"); }
}
}
}
30 changes: 19 additions & 11 deletions src/Game.Realtime/Services/LobbyDataService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Game.Library.Shared.Dto;
using Game.Server.Shared.Extensions;
using Game.Server.Shared.Valkey;
using Medallion.Threading;
using StackExchange.Redis;

Expand Down Expand Up @@ -78,7 +79,14 @@ public async Task<bool> AddPlayerAsync(string lobbyId, string userId, string pla
if (!exists) return false;

// 最大人数チェック
var maxPlayers = (int)await db.HashGetAsync($"lobby:{lobbyId}", "maxPlayers");
var maxPlayersValue = await db.HashGetAsync($"lobby:{lobbyId}", "maxPlayers");
if (!maxPlayersValue.HasValue)
{
_logger.LogWarning("maxPlayers field missing for lobby {LobbyId}", lobbyId);
return false;
}

var maxPlayers = maxPlayersValue.ToInt();
var currentCount = await db.HashLengthAsync($"lobby:{lobbyId}:players");
if (currentCount >= maxPlayers) return false;

Expand Down Expand Up @@ -136,12 +144,12 @@ public async Task<bool> RemovePlayerAsync(string lobbyId, string userId)
return new LobbyInfo
{
LobbyId = lobbyId,
LobbyName = dict.GetValueOrDefault("name", ""),
HostUserId = dict.GetValueOrDefault("hostUserId", ""),
GameMode = dict.GetValueOrDefault("gameMode", ""),
LobbyName = dict.GetString("name"),
HostUserId = dict.GetString("hostUserId"),
GameMode = dict.GetString("gameMode"),
CurrentPlayers = checked((int)playerCount),
MaxPlayers = int.TryParse(dict.GetValueOrDefault("maxPlayers", "4"), out var mp) ? mp : 4,
IsPublic = dict.GetValueOrDefault("isPublic", "0") == "1",
MaxPlayers = dict.GetInt("maxPlayers", 4),
IsPublic = dict.GetBool("isPublic"),
};
}

Expand Down Expand Up @@ -200,19 +208,19 @@ public async Task<LobbyInfo[]> SearchPublicAsync(string gameMode, int maxResults

var dict = hash.ToDictionary(h => h.Name.ToString(), h => h.Value);
var playerCount = checked((int)await countTasks[i]);
var mp = int.TryParse(dict.GetValueOrDefault("maxPlayers", "4"), out var v) ? v : 4;
var mp = dict.GetInt("maxPlayers", 4);

if (playerCount < mp)
{
results.Add(new LobbyInfo
{
LobbyId = lobbyIds[i].ToString(),
LobbyName = dict.GetValueOrDefault("name", ""),
HostUserId = dict.GetValueOrDefault("hostUserId", ""),
GameMode = dict.GetValueOrDefault("gameMode", ""),
LobbyName = dict.GetString("name"),
HostUserId = dict.GetString("hostUserId"),
GameMode = dict.GetString("gameMode"),
CurrentPlayers = playerCount,
MaxPlayers = mp,
IsPublic = dict.GetValueOrDefault("isPublic", "0") == "1",
IsPublic = dict.GetBool("isPublic"),
});
}
}
Expand Down
67 changes: 11 additions & 56 deletions src/Game.Server.Shared/Extensions/ValkeyServiceExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Net.Security;
using Google.Apis.Auth.OAuth2;
using Game.Server.Shared.Valkey;
using Medallion.Threading;
using Medallion.Threading.Redis;
using StackExchange.Redis;
Expand All @@ -11,11 +10,9 @@ namespace Game.Server.Shared.Extensions;
/// </summary>
public static class ValkeyServiceExtensions
{
private static readonly TimeSpan _tokenRefreshInterval = TimeSpan.FromMinutes(4);

/// <summary>
/// IConnectionMultiplexer を DI に登録(接続文字列は ConnectionStrings:Valkey から取得)
/// ssl=true が含まれる場合、GCP IAM 認証モードで接続する
/// ssl=true が含まれる場合、GCP IAM 認証モードで非同期接続する(IHostedService 経由)
/// </summary>
public static IServiceCollection AddValkeyConnection(this IServiceCollection services, IConfiguration configuration)
{
Expand All @@ -25,10 +22,17 @@ public static IServiceCollection AddValkeyConnection(this IServiceCollection ser
var options = ConfigurationOptions.Parse(connectionString);
if (options.Ssl)
{
// ConfigurationOptions をシングルトンとして登録(ValkeyConnectionInitializer が使用)
services.AddSingleton(options);

// IHostedService として登録 → StartAsync で非同期接続
services.AddSingleton<ValkeyConnectionInitializer>();
services.AddHostedService(sp => sp.GetRequiredService<ValkeyConnectionInitializer>());

// IConnectionMultiplexer を ValkeyConnectionInitializer から転送
services.AddSingleton<IConnectionMultiplexer>(sp =>
{
var logger = sp.GetRequiredService<ILogger<ConnectionMultiplexer>>();
return ConnectWithAuthAsync(options, logger).GetAwaiter().GetResult();
return sp.GetRequiredService<ValkeyConnectionInitializer>().Multiplexer;
});
}
else
Expand Down Expand Up @@ -73,53 +77,4 @@ public static IServiceCollection AddDistributedLock(this IServiceCollection serv

return services;
}

private static async Task<IConnectionMultiplexer> ConnectWithAuthAsync(ConfigurationOptions options, ILogger logger)
{
// GCP IAM 認証モードで接続する
var credential = await GoogleCredential.GetApplicationDefaultAsync();
var token = await credential.UnderlyingCredential.GetAccessTokenForRequestAsync();

options.User = "default";
options.Password = token;

// GCP Memorystore の内部 CA 証明書を信頼する
options.CertificateValidation += (_, _, _, errors) =>
errors is SslPolicyErrors.None or SslPolicyErrors.RemoteCertificateChainErrors;

var multiplexer = await ConnectionMultiplexer.ConnectAsync(options);
logger.LogInformation("Connected to Valkey/Redis with IAM authentication");

// トークンリフレッシュタイマー(4分間隔、トークン有効期限は1時間)
_ = new Timer(
delegate { _ = RefreshTokenAsync(credential, multiplexer, logger); },
null,
_tokenRefreshInterval,
_tokenRefreshInterval);

multiplexer.ConnectionFailed += (_, args) =>
logger.LogWarning("Valkey connection failed: {FailureType}", args.FailureType);
multiplexer.ConnectionRestored += (_, _) =>
logger.LogInformation("Valkey connection restored");

return multiplexer;
}

private static async Task RefreshTokenAsync(GoogleCredential credential, IConnectionMultiplexer multiplexer, ILogger logger)
{
try
{
var newToken = await credential.UnderlyingCredential.GetAccessTokenForRequestAsync();
foreach (var server in multiplexer.GetServers())
{
await server.ExecuteAsync("AUTH", "default", newToken);
}

logger.LogDebug("Valkey IAM token refreshed");
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to refresh Valkey IAM token");
}
}
}
Loading
Loading