42
42
import org .slf4j .LoggerFactory ;
43
43
44
44
/**
45
- * {@link TaskHandler} responsible for deleting all of the files in a manifest and the manifest
46
- * itself. Since data files may be present in multiple manifests across different snapshots, we
47
- * assume a data file that doesn't exist is missing because it was already deleted by another task.
45
+ * {@link TaskHandler} responsible for deleting table files: 1. Manifest files: It contains all the
46
+ * files in a manifest and the manifest itself. Since data files may be present in multiple
47
+ * manifests across different snapshots, we assume a data file that doesn't exist is missing because
48
+ * it was already deleted by another task. 2. Table metadata files: It contains previous metadata
49
+ * and statistics files, which are grouped and deleted in batch
48
50
*/
51
+ // TODO: Rename this class since we introducing metadata cleanup here
49
52
public class ManifestFileCleanupTaskHandler implements TaskHandler {
50
53
public static final int MAX_ATTEMPTS = 3 ;
51
54
public static final int FILE_DELETION_RETRY_MILLIS = 100 ;
@@ -62,66 +65,119 @@ public ManifestFileCleanupTaskHandler(
62
65
63
66
@ Override
64
67
public boolean canHandleTask (TaskEntity task ) {
65
- return task .getTaskType () == AsyncTaskType .FILE_CLEANUP ;
68
+ return task .getTaskType () == AsyncTaskType .MANIFEST_FILE_CLEANUP
69
+ || task .getTaskType () == AsyncTaskType .METADATA_FILE_BATCH_CLEANUP ;
66
70
}
67
71
68
72
@ Override
69
73
public boolean handleTask (TaskEntity task ) {
70
74
ManifestCleanupTask cleanupTask = task .readData (ManifestCleanupTask .class );
71
- ManifestFile manifestFile = decodeManifestData (cleanupTask .getManifestFileData ());
72
75
TableIdentifier tableId = cleanupTask .getTableId ();
73
76
try (FileIO authorizedFileIO = fileIOSupplier .apply (task )) {
74
-
75
- // if the file doesn't exist, we assume that another task execution was successful, but failed
76
- // to drop the task entity. Log a warning and return success
77
- if (!TaskUtils .exists (manifestFile .path (), authorizedFileIO )) {
77
+ if (task .getTaskType () == AsyncTaskType .MANIFEST_FILE_CLEANUP ) {
78
+ ManifestFile manifestFile = decodeManifestData (cleanupTask .getManifestFileData ());
79
+ return cleanUpManifestFile (manifestFile , authorizedFileIO , tableId );
80
+ } else if (task .getTaskType () == AsyncTaskType .METADATA_FILE_BATCH_CLEANUP ) {
81
+ return cleanUpMetadataFiles (cleanupTask .getMetadataFiles (), authorizedFileIO , tableId );
82
+ } else {
78
83
LOGGER
79
84
.atWarn ()
80
- .addKeyValue ("manifestFile" , manifestFile .path ())
81
85
.addKeyValue ("tableId" , tableId )
82
- .log ("Manifest cleanup task scheduled, but manifest file doesn't exist" );
83
- return true ;
84
- }
85
-
86
- ManifestReader <DataFile > dataFiles = ManifestFiles .read (manifestFile , authorizedFileIO );
87
- List <CompletableFuture <Void >> dataFileDeletes =
88
- StreamSupport .stream (
89
- Spliterators .spliteratorUnknownSize (dataFiles .iterator (), Spliterator .IMMUTABLE ),
90
- false )
91
- .map (
92
- file ->
93
- tryDelete (
94
- tableId , authorizedFileIO , manifestFile , file .path ().toString (), null , 1 ))
95
- .toList ();
96
- LOGGER .debug (
97
- "Scheduled {} data files to be deleted from manifest {}" ,
98
- dataFileDeletes .size (),
99
- manifestFile .path ());
100
- try {
101
- // wait for all data files to be deleted, then wait for the manifest itself to be deleted
102
- CompletableFuture .allOf (dataFileDeletes .toArray (CompletableFuture []::new ))
103
- .thenCompose (
104
- (v ) -> {
105
- LOGGER
106
- .atInfo ()
107
- .addKeyValue ("manifestFile" , manifestFile .path ())
108
- .log ("All data files in manifest deleted - deleting manifest" );
109
- return tryDelete (
110
- tableId , authorizedFileIO , manifestFile , manifestFile .path (), null , 1 );
111
- })
112
- .get ();
113
- return true ;
114
- } catch (InterruptedException e ) {
115
- LOGGER .error (
116
- "Interrupted exception deleting data files from manifest {}" , manifestFile .path (), e );
117
- throw new RuntimeException (e );
118
- } catch (ExecutionException e ) {
119
- LOGGER .error ("Unable to delete data files from manifest {}" , manifestFile .path (), e );
86
+ .log ("Unknown task type {}" , task .getTaskType ());
120
87
return false ;
121
88
}
122
89
}
123
90
}
124
91
92
+ private boolean cleanUpManifestFile (
93
+ ManifestFile manifestFile , FileIO fileIO , TableIdentifier tableId ) {
94
+ // if the file doesn't exist, we assume that another task execution was successful, but
95
+ // failed to drop the task entity. Log a warning and return success
96
+ if (!TaskUtils .exists (manifestFile .path (), fileIO )) {
97
+ LOGGER
98
+ .atWarn ()
99
+ .addKeyValue ("manifestFile" , manifestFile .path ())
100
+ .addKeyValue ("tableId" , tableId )
101
+ .log ("Manifest cleanup task scheduled, but manifest file doesn't exist" );
102
+ return true ;
103
+ }
104
+
105
+ ManifestReader <DataFile > dataFiles = ManifestFiles .read (manifestFile , fileIO );
106
+ List <CompletableFuture <Void >> dataFileDeletes =
107
+ StreamSupport .stream (
108
+ Spliterators .spliteratorUnknownSize (dataFiles .iterator (), Spliterator .IMMUTABLE ),
109
+ false )
110
+ .map (file -> tryDelete (tableId , fileIO , manifestFile , file .path ().toString (), null , 1 ))
111
+ .toList ();
112
+ LOGGER .debug (
113
+ "Scheduled {} data files to be deleted from manifest {}" ,
114
+ dataFileDeletes .size (),
115
+ manifestFile .path ());
116
+ try {
117
+ // wait for all data files to be deleted, then wait for the manifest itself to be deleted
118
+ CompletableFuture .allOf (dataFileDeletes .toArray (CompletableFuture []::new ))
119
+ .thenCompose (
120
+ (v ) -> {
121
+ LOGGER
122
+ .atInfo ()
123
+ .addKeyValue ("manifestFile" , manifestFile .path ())
124
+ .log ("All data files in manifest deleted - deleting manifest" );
125
+ return tryDelete (tableId , fileIO , manifestFile , manifestFile .path (), null , 1 );
126
+ })
127
+ .get ();
128
+ return true ;
129
+ } catch (InterruptedException e ) {
130
+ LOGGER .error (
131
+ "Interrupted exception deleting data files from manifest {}" , manifestFile .path (), e );
132
+ throw new RuntimeException (e );
133
+ } catch (ExecutionException e ) {
134
+ LOGGER .error ("Unable to delete data files from manifest {}" , manifestFile .path (), e );
135
+ return false ;
136
+ }
137
+ }
138
+
139
+ private boolean cleanUpMetadataFiles (
140
+ List <String > metadataFiles , FileIO fileIO , TableIdentifier tableId ) {
141
+ List <String > validFiles =
142
+ metadataFiles .stream ().filter (file -> TaskUtils .exists (file , fileIO )).toList ();
143
+ if (validFiles .isEmpty ()) {
144
+ LOGGER
145
+ .atWarn ()
146
+ .addKeyValue ("metadataFiles" , metadataFiles .toString ())
147
+ .addKeyValue ("tableId" , tableId )
148
+ .log ("Table metadata cleanup task scheduled, but the none of the file in batch exists" );
149
+ return true ;
150
+ }
151
+ if (validFiles .size () < metadataFiles .size ()) {
152
+ List <String > missingFiles =
153
+ metadataFiles .stream ().filter (file -> !TaskUtils .exists (file , fileIO )).toList ();
154
+ LOGGER
155
+ .atWarn ()
156
+ .addKeyValue ("metadataFiles" , metadataFiles .toString ())
157
+ .addKeyValue ("missingFiles" , missingFiles )
158
+ .addKeyValue ("tableId" , tableId )
159
+ .log (
160
+ "Table metadata cleanup task scheduled, but {} files in the batch are missing" ,
161
+ missingFiles .size ());
162
+ }
163
+
164
+ // Schedule the deletion for each file asynchronously
165
+ List <CompletableFuture <Void >> deleteFutures =
166
+ validFiles .stream ().map (file -> tryDelete (tableId , fileIO , null , file , null , 1 )).toList ();
167
+
168
+ try {
169
+ // Wait for all delete operations to finish
170
+ CompletableFuture <Void > allDeletes =
171
+ CompletableFuture .allOf (deleteFutures .toArray (new CompletableFuture [0 ]));
172
+ allDeletes .join ();
173
+ } catch (Exception e ) {
174
+ LOGGER .error ("Exception detected during metadata file deletion" , e );
175
+ return false ;
176
+ }
177
+
178
+ return true ;
179
+ }
180
+
125
181
private static ManifestFile decodeManifestData (String manifestFileData ) {
126
182
try {
127
183
return ManifestFiles .decode (Base64 .decodeBase64 (manifestFileData ));
@@ -134,16 +190,16 @@ private CompletableFuture<Void> tryDelete(
134
190
TableIdentifier tableId ,
135
191
FileIO fileIO ,
136
192
ManifestFile manifestFile ,
137
- String dataFile ,
193
+ String file ,
138
194
Throwable e ,
139
195
int attempt ) {
140
196
if (e != null && attempt <= MAX_ATTEMPTS ) {
141
197
LOGGER
142
198
.atWarn ()
143
- .addKeyValue ("dataFile " , dataFile )
199
+ .addKeyValue ("file " , file )
144
200
.addKeyValue ("attempt" , attempt )
145
201
.addKeyValue ("error" , e .getMessage ())
146
- .log ("Error encountered attempting to delete data file" );
202
+ .log ("Error encountered attempting to delete file" );
147
203
}
148
204
if (attempt > MAX_ATTEMPTS && e != null ) {
149
205
return CompletableFuture .failedFuture (e );
@@ -155,27 +211,27 @@ private CompletableFuture<Void> tryDelete(
155
211
// file's existence, but then it is deleted before we have a chance to
156
212
// send the delete request. In such a case, we <i>should</i> retry
157
213
// and find
158
- if (TaskUtils .exists (dataFile , fileIO )) {
159
- fileIO .deleteFile (dataFile );
214
+ if (TaskUtils .exists (file , fileIO )) {
215
+ fileIO .deleteFile (file );
160
216
} else {
161
217
LOGGER
162
218
.atInfo ()
163
- .addKeyValue ("dataFile " , dataFile )
164
- .addKeyValue ("manifestFile" , manifestFile .path ())
219
+ .addKeyValue ("file " , file )
220
+ .addKeyValue ("manifestFile" , manifestFile != null ? manifestFile .path () : "" )
165
221
.addKeyValue ("tableId" , tableId )
166
- .log ("Manifest cleanup task scheduled, but data file doesn't exist" );
222
+ .log ("table file cleanup task scheduled, but data file doesn't exist" );
167
223
}
168
224
},
169
225
executorService )
170
226
.exceptionallyComposeAsync (
171
227
newEx -> {
172
228
LOGGER
173
229
.atWarn ()
174
- .addKeyValue ("dataFile" , dataFile )
175
- .addKeyValue ("tableIdentifer " , tableId )
176
- .addKeyValue ("manifestFile" , manifestFile .path ())
230
+ .addKeyValue ("dataFile" , file )
231
+ .addKeyValue ("tableIdentifier " , tableId )
232
+ .addKeyValue ("manifestFile" , manifestFile != null ? manifestFile .path () : "" )
177
233
.log ("Exception caught deleting data file from manifest" , newEx );
178
- return tryDelete (tableId , fileIO , manifestFile , dataFile , newEx , attempt + 1 );
234
+ return tryDelete (tableId , fileIO , manifestFile , file , newEx , attempt + 1 );
179
235
},
180
236
CompletableFuture .delayedExecutor (
181
237
FILE_DELETION_RETRY_MILLIS , TimeUnit .MILLISECONDS , executorService ));
@@ -185,12 +241,18 @@ private CompletableFuture<Void> tryDelete(
185
241
public static final class ManifestCleanupTask {
186
242
private TableIdentifier tableId ;
187
243
private String manifestFileData ;
244
+ private List <String > metadataFiles ;
188
245
189
246
public ManifestCleanupTask (TableIdentifier tableId , String manifestFileData ) {
190
247
this .tableId = tableId ;
191
248
this .manifestFileData = manifestFileData ;
192
249
}
193
250
251
+ public ManifestCleanupTask (TableIdentifier tableId , List <String > metadataFiles ) {
252
+ this .tableId = tableId ;
253
+ this .metadataFiles = metadataFiles ;
254
+ }
255
+
194
256
public ManifestCleanupTask () {}
195
257
196
258
public TableIdentifier getTableId () {
@@ -209,17 +271,26 @@ public void setManifestFileData(String manifestFileData) {
209
271
this .manifestFileData = manifestFileData ;
210
272
}
211
273
274
+ public List <String > getMetadataFiles () {
275
+ return metadataFiles ;
276
+ }
277
+
278
+ public void setMetadataFiles (List <String > metadataFiles ) {
279
+ this .metadataFiles = metadataFiles ;
280
+ }
281
+
212
282
@ Override
213
283
public boolean equals (Object object ) {
214
284
if (this == object ) return true ;
215
285
if (!(object instanceof ManifestCleanupTask that )) return false ;
216
286
return Objects .equals (tableId , that .tableId )
217
- && Objects .equals (manifestFileData , that .manifestFileData );
287
+ && Objects .equals (manifestFileData , that .manifestFileData )
288
+ && Objects .equals (metadataFiles , that .metadataFiles );
218
289
}
219
290
220
291
@ Override
221
292
public int hashCode () {
222
- return Objects .hash (tableId , manifestFileData );
293
+ return Objects .hash (tableId , manifestFileData , metadataFiles );
223
294
}
224
295
}
225
296
}
0 commit comments