1
1
import datetime
2
2
from typing import List
3
3
4
- from sqlalchemy import select
4
+ from sqlalchemy import select , update
5
5
from sqlalchemy .ext .asyncio import AsyncSession
6
+ from sqlalchemy .orm import selectinload
6
7
7
8
from dstack ._internal .core .models .profiles import parse_duration
8
9
from dstack ._internal .core .models .volumes import VolumeStatus
9
10
from dstack ._internal .server .db import get_session_ctx
10
11
from dstack ._internal .server .models import VolumeModel
11
12
from dstack ._internal .server .services .locking import get_locker
12
- from dstack ._internal .server .services .volumes import delete_volumes , get_volume_configuration
13
+ from dstack ._internal .server .services .volumes import get_volume_configuration
13
14
from dstack ._internal .utils .common import get_current_datetime
14
15
from dstack ._internal .utils .logging import get_logger
15
16
19
20
async def process_idle_volumes ():
20
21
lock , lockset = get_locker ().get_lockset (VolumeModel .__tablename__ )
21
22
async with get_session_ctx () as session :
23
+ # Take lock, select IDs, add to lockset, release lock
22
24
async with lock :
23
25
res = await session .execute (
24
- select (VolumeModel )
26
+ select (VolumeModel . id )
25
27
.where (
26
28
VolumeModel .status == VolumeStatus .ACTIVE ,
27
29
VolumeModel .deleted == False ,
@@ -31,12 +33,21 @@ async def process_idle_volumes():
31
33
.limit (10 )
32
34
.with_for_update (skip_locked = True )
33
35
)
34
- volumes = list (res . unique () .scalars ().all ())
35
- if not volumes :
36
+ volume_ids = list (res .scalars ().all ())
37
+ if not volume_ids :
36
38
return
37
- for volume in volumes :
38
- await session .refresh (volume , ["project" , "attachments" ])
39
- lockset .add (volume .id )
39
+ for volume_id in volume_ids :
40
+ lockset .add (volume_id )
41
+
42
+ # Load volumes with related attributes in one query
43
+ res = await session .execute (
44
+ select (VolumeModel )
45
+ .where (VolumeModel .id .in_ (volume_ids ))
46
+ .options (selectinload (VolumeModel .project ))
47
+ .options (selectinload (VolumeModel .attachments ))
48
+ .execution_options (populate_existing = True )
49
+ )
50
+ volumes = list (res .unique ().scalars ().all ())
40
51
41
52
try :
42
53
to_delete = []
@@ -45,11 +56,11 @@ async def process_idle_volumes():
45
56
to_delete .append (volume )
46
57
47
58
if to_delete :
48
- await _delete_volumes (session , to_delete )
59
+ await _delete_idle_volumes (session , to_delete )
49
60
50
61
finally :
51
- for volume in volumes :
52
- lockset .discard (volume . id )
62
+ for volume_id in volume_ids :
63
+ lockset .discard (volume_id )
53
64
54
65
55
66
def _should_delete_volume (volume : VolumeModel ) -> bool :
@@ -89,16 +100,21 @@ def _get_idle_time(volume: VolumeModel) -> datetime.timedelta:
89
100
return max (idle_time , datetime .timedelta (0 ))
90
101
91
102
92
- async def _delete_volumes (session : AsyncSession , volumes : List [VolumeModel ]):
93
- by_project = {}
103
async def _delete_idle_volumes(session: AsyncSession, volumes: List[VolumeModel]):
    """Soft-delete the given idle volumes, then commit the session once.

    Each volume row is flagged with ``deleted=True`` and a ``deleted_at``
    timestamp via a direct UPDATE statement. The ``delete_volumes`` service
    function is deliberately not used here, to avoid locking conflicts.

    A failure while processing one volume is logged and swallowed, so the
    remaining volumes are still attempted.

    Args:
        session: The async database session the updates are executed on.
        volumes: The volume models that should be marked as deleted.
    """
    for idle_volume in volumes:
        try:
            # Build the statement inside the ``try`` so that any error while
            # reading the volume's attributes is logged like a failed UPDATE.
            mark_deleted_stmt = (
                update(VolumeModel)
                .where(VolumeModel.id == idle_volume.id)
                .values(deleted=True, deleted_at=get_current_datetime())
            )
            await session.execute(mark_deleted_stmt)
            logger.info("Marked idle volume %s for deletion", idle_volume.name)
        except Exception:
            logger.exception("Failed to mark volume %s for deletion", idle_volume.name)

    # One commit for the whole batch, issued after all volumes were attempted.
    await session.commit()
0 commit comments