Skip to content

Commit 3b3942f

Browse files
committed
Admin/XMover: Implement suggestions by CodeRabbit
1 parent b288ab8 commit 3b3942f

File tree

15 files changed

+100
-72
lines changed

15 files changed

+100
-72
lines changed

cratedb_toolkit/admin/xmover/analysis/shard.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def __init__(self, client: CrateDBClient):
3838
self.shards: List[ShardInfo] = []
3939

4040
# Initialize session-based caches for performance.
41-
self._zone_conflict_cache: Dict[Tuple[str, int, str], Union[str, None]] = {}
41+
self._zone_conflict_cache: Dict[Tuple[str, str, int, str], Union[str, None]] = {}
4242
self._node_lookup_cache: Dict[str, Union[NodeInfo, None]] = {}
4343
self._target_nodes_cache: Dict[Tuple[float, frozenset[Any], float, float], List[NodeInfo]] = {}
4444
self._cache_hits = 0
@@ -183,8 +183,6 @@ def find_nodes_with_capacity(
183183
free_space_gb = node.available_space_gb
184184
if free_space_gb >= (required_space_gb + min_free_space_gb):
185185
available_nodes.append(node)
186-
else:
187-
continue
188186

189187
# Sort by available space (most space first) - prioritize nodes with more free space
190188
available_nodes.sort(key=lambda n: n.available_space_gb, reverse=True)
@@ -206,7 +204,7 @@ def generate_rebalancing_recommendations(
206204
# Get moveable shards (only healthy ones for actual operations)
207205
moveable_shards = self.find_moveable_shards(constraints.min_size, constraints.max_size, constraints.table_name)
208206

209-
print(
207+
logger.info(
210208
f"Analyzing {len(moveable_shards)} candidate shards "
211209
f"in size range {constraints.min_size}-{constraints.max_size}GB..."
212210
)
@@ -239,12 +237,11 @@ def generate_rebalancing_recommendations(
239237
# Optimize processing: if filtering by source node, only process those shards
240238
if constraints.source_node:
241239
processing_shards = [s for s in moveable_shards if s.node_name == constraints.source_node]
242-
print(f"Focusing on {len(processing_shards)} shards from node {constraints.source_node}")
240+
logger.info(f"Focusing on {len(processing_shards)} shards from node {constraints.source_node}")
243241
else:
244242
processing_shards = moveable_shards
245243

246244
# Generate move recommendations
247-
safe_recommendations = 0 # noqa: F841
248245
total_evaluated = 0
249246

250247
for i, shard in enumerate(processing_shards):
@@ -368,12 +365,12 @@ def generate_rebalancing_recommendations(
368365

369366
if len(processing_shards) > 100:
370367
print() # New line after progress dots
371-
print(f"Generated {len(recommendations)} move recommendations (evaluated {total_evaluated} shards)")
372-
print(f"Performance: {self.get_cache_stats()}")
368+
logger.info(f"Generated {len(recommendations)} move recommendations (evaluated {total_evaluated} shards)")
369+
logger.info(f"Performance: {self.get_cache_stats()}")
373370
return recommendations
374371

375372
def validate_move_safety(
376-
self, recommendation: ShardRelocationResponse, max_disk_usage_percent: float = 90.0
373+
self, recommendation: ShardRelocationResponse, max_disk_usage_percent: float = 90.0, buffer_gb: float = 50.0
377374
) -> Tuple[bool, str]:
378375
"""Validate that a move recommendation is safe to execute"""
379376
# Find target node (with caching)
@@ -388,7 +385,7 @@ def validate_move_safety(
388385
return False, zone_conflict
389386

390387
# Check available space
391-
required_space_gb = recommendation.size_gb + 50 # 50GB buffer
388+
required_space_gb = recommendation.size_gb + buffer_gb
392389
if target_node.available_space_gb < required_space_gb:
393390
return (
394391
False,
@@ -423,7 +420,7 @@ def _check_zone_conflict_cached(self, recommendation: ShardRelocationResponse) -
423420
"""Check zone conflicts with caching"""
424421
# Create cache key: table, shard, target zone
425422
target_zone = self._get_node_zone(recommendation.to_node)
426-
cache_key = (recommendation.table_name, recommendation.shard_id, target_zone)
423+
cache_key = (recommendation.schema_name, recommendation.table_name, recommendation.shard_id, target_zone)
427424

428425
if cache_key in self._zone_conflict_cache:
429426
self._cache_hits += 1
@@ -813,11 +810,14 @@ def plan_node_decommission(self, node_name: str, min_free_space_gb: float = 100.
813810
# Determine feasibility
814811
feasible = len(infeasible_moves) == 0
815812

813+
# Safety margin for cluster capacity after decommission
814+
capacity_safety_margin = 1.2 # 20 % buffer
815+
816816
# Add capacity warnings
817817
if feasible:
818-
# Check if remaining cluster capacity is sufficient after decommission
818+
# Check if the remaining cluster capacity is sufficient after decommission
819819
remaining_capacity = sum(n.available_space_gb for n in self.nodes if n.name != node_name)
820-
if remaining_capacity < total_size_gb * 1.2: # 20% safety margin
820+
if remaining_capacity < total_size_gb * capacity_safety_margin:
821821
warnings.append(
822822
f"Low remaining capacity after decommission. "
823823
f"Only {remaining_capacity:.1f}GB available for {total_size_gb:.1f}GB of data"
@@ -833,7 +833,7 @@ def plan_node_decommission(self, node_name: str, min_free_space_gb: float = 100.
833833
"recommendations": move_plan,
834834
"infeasible_moves": infeasible_moves,
835835
"warnings": warnings,
836-
"estimated_time_hours": len(move_plan) * 0.1, # Rough estimate: 6 minutes per move
836+
"estimated_time_hours": len(move_plan) * 0.1, # Rough estimate: 0.1 hours (6 minutes) per move
837837
"message": "Decommission plan generated" if feasible else "Decommission not currently feasible",
838838
}
839839

cratedb_toolkit/admin/xmover/analysis/table.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from rich.console import Console
1515
from rich.table import Table
1616

17+
from cratedb_toolkit.admin.xmover.model import NodeInfo
1718
from cratedb_toolkit.admin.xmover.util.database import CrateDBClient
1819

1920
logger = logging.getLogger(__name__)
@@ -97,6 +98,9 @@ def find_table_by_name(self, table_name: str) -> Optional[str]:
9798

9899
try:
99100
choice = input("\nSelect table (enter number): ").strip()
101+
if not choice:
102+
rprint("[yellow]No selection made[/yellow]")
103+
return None
100104
idx = int(choice) - 1
101105
if 0 <= idx < len(rows):
102106
schema, table = rows[idx]
@@ -292,14 +296,9 @@ def format_table_health_report(self, table_dist: TableDistribution) -> None:
292296
zone_distribution = {}
293297
for node_name, node_data in table_dist.node_distributions.items():
294298
# Try to get zone info for each node
295-
node_info = next((n for n in all_nodes_info if n.name == node_name), None)
296-
if (
297-
node_info
298-
and hasattr(node_info, "attributes")
299-
and node_info.attributes
300-
and "zone" in node_info.attributes
301-
):
302-
zone = node_info.attributes["zone"]
299+
node_info: Optional[NodeInfo] = next((n for n in all_nodes_info if n.name == node_name), None)
300+
if node_info and node_info.zone:
301+
zone = node_info.zone
303302
if zone not in zone_distribution:
304303
zone_distribution[zone] = {"nodes": 0, "shards": 0, "size": 0}
305304
zone_distribution[zone]["nodes"] += 1

cratedb_toolkit/admin/xmover/analysis/zone.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ def distribution_conflicts(self, shard_details: bool = False, table: Optional[st
135135
health_indicator = "✓" if shard_copy.routing_state == "STARTED" else "⚠"
136136
console.print(
137137
f" {health_indicator} {shard_copy.shard_type} "
138-
f"on {shard_copy.node_name} ({shard_copy.zone}) - {shard_copy.routing_state}"
138+
f"on {shard_copy.node_name} ({shard_copy.zone}) - "
139+
f"{shard_copy.state}/{shard_copy.routing_state}"
139140
)
140141

141142
console.print(analysis_table)

cratedb_toolkit/admin/xmover/cli.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
Command Line Interface.
55
"""
66

7-
import sys
87
import time
98
from typing import Optional
109

@@ -46,11 +45,11 @@ def main(ctx):
4645
if not client.test_connection():
4746
console.print("[red]Error: Could not connect to CrateDB[/red]")
4847
console.print("Please check your CRATE_CONNECTION_STRING in .env file")
49-
sys.exit(1)
48+
raise click.Abort()
5049
ctx.obj["client"] = client
5150
except Exception as e:
5251
console.print(f"[red]Error connecting to CrateDB: {e}[/red]")
53-
sys.exit(1)
52+
raise click.Abort() from e
5453

5554

5655
@main.command()
@@ -170,11 +169,11 @@ def test_connection(ctx, connection_string: Optional[str]):
170169
console.print(f" • {node.name} (zone: {node.zone})")
171170
else:
172171
console.print("[red]✗ Connection failed[/red]")
173-
sys.exit(1)
172+
raise click.Abort()
174173

175174
except Exception as e:
176175
console.print(f"[red]✗ Connection error: {e}[/red]")
177-
sys.exit(1)
176+
raise click.Abort() from e
178177

179178

180179
@main.command()
@@ -525,13 +524,14 @@ def monitor_recovery(
525524
xmover monitor-recovery --watch # Continuous monitoring
526525
xmover monitor-recovery --recovery-type PEER # Only PEER recoveries
527526
"""
527+
effective_recovery_type = None if recovery_type == "all" else recovery_type
528528
recovery_monitor = RecoveryMonitor(
529529
client=ctx.obj["client"],
530530
options=RecoveryOptions(
531531
table=table,
532532
node=node,
533533
refresh_interval=refresh_interval,
534-
recovery_type=recovery_type,
534+
recovery_type=effective_recovery_type,
535535
include_transitioning=include_transitioning,
536536
),
537537
)

cratedb_toolkit/admin/xmover/model.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import dataclasses
21
from dataclasses import dataclass
32
from typing import Dict, Optional
43

@@ -149,6 +148,12 @@ def safety_score(self) -> float:
149148
if "rebalancing" in self.reason.lower():
150149
score += 0.2
151150

151+
# Consider shard size - smaller shards are safer to move
152+
if self.size_gb < 10:
153+
score += 0.1
154+
elif self.size_gb > 100:
155+
score -= 0.2
156+
152157
# Ensure score stays in valid range
153158
return max(0.0, min(1.0, score))
154159

@@ -165,15 +170,15 @@ class DistributionStats:
165170
node_balance_score: float # 0-100, higher is better
166171

167172

168-
@dataclasses.dataclass
173+
@dataclass
169174
class SizeCriteria:
170175
min_size: float = 40.0
171176
max_size: float = 60.0
172177
table_name: Optional[str] = None
173178
source_node: Optional[str] = None
174179

175180

176-
@dataclasses.dataclass
181+
@dataclass
177182
class ShardRelocationConstraints:
178183
min_size: float = SizeCriteria().min_size
179184
max_size: float = SizeCriteria().max_size

cratedb_toolkit/admin/xmover/operational/candidates.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class CandidateFinder:
1414
def __init__(self, analyzer: ShardAnalyzer):
1515
self.analyzer = analyzer
1616

17-
def movement_candidates(self, criteria: SizeCriteria, limit: int):
17+
def movement_candidates(self, criteria: SizeCriteria, limit: int) -> int:
1818
"""
1919
Find shard candidates for movement based on size criteria
2020
@@ -23,7 +23,7 @@ def movement_candidates(self, criteria: SizeCriteria, limit: int):
2323
"""
2424

2525
console.print(
26-
Panel.fit(f"[bold blue]Finding Moveable Shards ({criteria.min_size}-{criteria.max_size}GB)[/bold blue]")
26+
Panel.fit(f"[bold blue]Finding Movable Shards ({criteria.min_size}-{criteria.max_size}GB)[/bold blue]")
2727
)
2828

2929
if criteria.source_node:
@@ -45,7 +45,7 @@ def movement_candidates(self, criteria: SizeCriteria, limit: int):
4545
console.print("[dim]Tip: Try different size ranges or remove --node filter to see all candidates[/dim]")
4646
else:
4747
console.print("[yellow]No moveable shards found in the specified size range.[/yellow]")
48-
return
48+
return 0
4949

5050
# Show limited results
5151
shown_candidates = candidates[:limit]
@@ -82,3 +82,5 @@ def movement_candidates(self, criteria: SizeCriteria, limit: int):
8282

8383
if len(candidates) > limit:
8484
console.print(f"\n[dim]... and {len(candidates) - limit} more candidates[/dim]")
85+
86+
return len(candidates)

cratedb_toolkit/admin/xmover/operational/monitor.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def get_cluster_recovery_status(self) -> List[RecoveryInfo]:
3737
)
3838

3939
# Apply recovery type filter
40-
if self.options.recovery_type is not None:
40+
if self.options.recovery_type is not None and self.options.recovery_type.lower() != "all":
4141
recoveries = [r for r in recoveries if r.recovery_type.upper() == self.options.recovery_type.upper()]
4242

4343
return recoveries
@@ -178,7 +178,6 @@ def start(self, watch: bool, debug: bool = False):
178178

179179
# Track previous state for change detection
180180
previous_recoveries: Dict[str, Dict[str, Any]] = {}
181-
previous_timestamp = None
182181
first_run = True
183182

184183
while True:
@@ -307,7 +306,6 @@ def start(self, watch: bool, debug: bool = False):
307306
elif active_count > 0:
308307
console.print(f"{current_time} | {status} (no changes)")
309308

310-
previous_timestamp = current_time # noqa: F841
311309
first_run = False
312310
time.sleep(self.options.refresh_interval)
313311

cratedb_toolkit/admin/xmover/operational/recommend.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ def validate(self, request: ShardRelocationRequest):
123123
console.print()
124124
console.print("[dim]# Monitor shard health after execution[/dim]")
125125
console.print(
126-
"[dim]# Check with: SELECT * FROM sys.shards "
127-
"WHERE table_name = '{table_name}' AND id = {shard_id};[/dim]"
126+
"[dim]# Check with: SELECT * FROM sys.shards " # noqa: S608
127+
f"WHERE table_name = '{table_name}' AND id = {request.shard_id};[/dim]"
128128
)
129129
else:
130130
console.print("[red]✗ VALIDATION FAILED - Move not safe[/red]")
@@ -323,7 +323,7 @@ def execute(
323323
rec, max_disk_usage_percent=constraints.max_disk_usage
324324
)
325325
if not is_safe:
326-
if "Zone conflict" in safety_msg:
326+
if "zone conflict" in safety_msg.lower():
327327
zone_conflicts += 1
328328
console.print(f"-- Move {i}: SKIPPED - {safety_msg}")
329329
console.print(
@@ -340,7 +340,7 @@ def execute(
340340

341341
# Auto-execution if requested
342342
if auto_execute:
343-
self._execute_recommendations_safely(recommendations, validate)
343+
self._execute_recommendations_safely(constraints, recommendations, validate)
344344

345345
if validate and safe_moves < len(recommendations):
346346
if zone_conflicts > 0:
@@ -352,14 +352,16 @@ def execute(
352352
f"[yellow]Warning: Only {safe_moves} of {len(recommendations)} moves passed safety validation[/yellow]"
353353
)
354354

355-
def _execute_recommendations_safely(self, recommendations, validate: bool):
355+
def _execute_recommendations_safely(self, constraints, recommendations, validate: bool):
356356
"""Execute recommendations with extensive safety measures"""
357357

358358
# Filter to only safe recommendations
359359
safe_recommendations = []
360360
if validate:
361361
for rec in recommendations:
362-
is_safe, safety_msg = self.analyzer.validate_move_safety(rec, max_disk_usage_percent=95.0)
362+
is_safe, safety_msg = self.analyzer.validate_move_safety(
363+
rec, max_disk_usage_percent=constraints.max_disk_usage
364+
)
363365
if is_safe:
364366
safe_recommendations.append(rec)
365367
else:
@@ -423,7 +425,8 @@ def _execute_recommendations_safely(self, recommendations, validate: bool):
423425
# Execute the SQL command
424426
result = self.client.execute_query(sql_command)
425427

426-
if result.get("rowcount", 0) >= 0: # Success indicator for ALTER statements
428+
# ALTER TABLE REROUTE commands don't return rowcount, check for no error instead.
429+
if "error" not in result:
427430
console.print(" [green]✅ SUCCESS[/green] - Move initiated")
428431
successful_moves += 1
429432

@@ -482,7 +485,8 @@ def _wait_for_recovery_capacity(self, max_concurrent_recoveries: int = 5):
482485
while True:
483486
# Check active recoveries (including transitioning)
484487
recoveries = recovery_monitor.get_cluster_recovery_status()
485-
active_count = len([r for r in recoveries if r.overall_progress < 100.0 or r.stage != "DONE"])
488+
# Count recoveries that are actively running (not completed)
489+
active_count = len([r for r in recoveries if r.overall_progress < 100.0])
486490
status = f"{active_count}/{max_concurrent_recoveries}"
487491
if active_count < max_concurrent_recoveries:
488492
if wait_time > 0:

0 commit comments

Comments
 (0)