Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,30 +113,25 @@ public ClusterGlideClientAdapter getClusterAdapter() {
*/
@Override
protected void cleanupConnectionState() {
// Dont use RESET - we will destroy the configured state
// Use valkey-glide native pipe to selectively clear the state on the backends,
// adapter and connection object do not matter - they are being destroyed
// some state cannot be cleared (like stats) but this is acceptable if pooling to be used

GlideClusterClient nativeClient = (GlideClusterClient) unifiedClient.getNativeClient();

@SuppressWarnings("unchecked")
Callable<Void>[] actions = new Callable[] {
() -> nativeClient.customCommand(new String[]{"UNWATCH"}, SimpleMultiNodeRoute.ALL_NODES).get(),
// TODO: Uncomment when dynamic pubsub is implemented
// () -> nativeClient.customCommand(new String[]{"UNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get(),
// () -> nativeClient.customCommand(new String[]{"PUNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get(),
// () -> nativeClient.customCommand(new String[]{"SUNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get()
};

for (Callable<Void> action : actions) {
// Only send UNWATCH if keys were actually watched
if (!watchedKeys.isEmpty()) {
GlideClusterClient nativeClient = (GlideClusterClient) unifiedClient.getNativeClient();
try {
action.call();
nativeClient.customCommand(new String[]{"UNWATCH"}, SimpleMultiNodeRoute.ALL_NODES).get();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} catch (Exception e) {
// Ignore cleanup errors
}
watchedKeys.clear();
}

// TODO: Add similar checks for pubsub when implemented
// if (hasActiveSubscriptions) {
// nativeClient.customCommand(new String[]{"UNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get();
// nativeClient.customCommand(new String[]{"PUNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get();
// nativeClient.customCommand(new String[]{"SUNSUBSCRIBE"}, SimpleMultiNodeRoute.ALL_NODES).get();
// }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class ValkeyGlideConnection extends AbstractValkeyConnection {
private final AtomicBoolean closed = new AtomicBoolean(false);

private final List<ResultMapper<?, ?>> batchCommandsConverters = new ArrayList<>();
private final Set<byte[]> watchedKeys = new HashSet<>();
protected final Set<byte[]> watchedKeys = new HashSet<>();
private @Nullable Subscription subscription;

// Command interfaces
Expand Down Expand Up @@ -202,30 +202,25 @@ public void close() throws DataAccessException {
* This ensures the next connection gets a clean client without stale state.
*/
protected void cleanupConnectionState() {
// Dont use RESET - we will destroy the configured state
// Use valkey-glide native pipe to selectively clear the state on the backends,
// adapter and connection object do not matter - they are being destroyed
// some state cannot be cleared (like stats) but this is acceptable if pooling to be used

GlideClient nativeClient = (GlideClient) unifiedClient.getNativeClient();

@SuppressWarnings("unchecked")
Callable<Void>[] actions = new Callable[] {
() -> nativeClient.customCommand(new String[]{"UNWATCH"}).get(),
// TODO: Uncomment when dynamic pubsub is implemented
// () -> nativeClient.customCommand(new String[]{"UNSUBSCRIBE"}).get(),
// () -> nativeClient.customCommand(new String[]{"PUNSUBSCRIBE"}).get(),
// () -> nativeClient.customCommand(new String[]{"SUNSUBSCRIBE"}).get()
};

for (Callable<Void> action : actions) {
// Only send UNWATCH if keys were actually watched
if (!watchedKeys.isEmpty()) {
GlideClient nativeClient = (GlideClient) unifiedClient.getNativeClient();
try {
action.call();
nativeClient.customCommand(new String[]{"UNWATCH"}).get();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} catch (Exception e) {
// Ignore cleanup errors
}
watchedKeys.clear();
}

// TODO: Add similar checks for pubsub when implemented
// if (hasActiveSubscriptions) {
// nativeClient.customCommand(new String[]{"UNSUBSCRIBE"}).get();
// nativeClient.customCommand(new String[]{"PUNSUBSCRIBE"}).get();
// nativeClient.customCommand(new String[]{"SUNSUBSCRIBE"}).get();
// }
}

@Override
Expand Down