@@ -37,7 +37,7 @@ use graph::{
37
37
info, lazy_static, o, warn, BlockNumber , BlockPtr , CheapClone , Logger , StoreError , ENV_VARS ,
38
38
} ,
39
39
schema:: EntityType ,
40
- slog:: error,
40
+ slog:: { debug , error} ,
41
41
tokio,
42
42
} ;
43
43
use itertools:: Itertools ;
@@ -1121,26 +1121,61 @@ impl Connection {
1121
1121
// we do not ever leave the loop with `self.conn == None`
1122
1122
let mut workers = Workers :: new ( ) ;
1123
1123
while !state. unfinished . is_empty ( ) || workers. has_work ( ) {
1124
+ debug ! ( self . logger, "copy_data_internal: looking for more work" ;
1125
+ "workers" => workers. len( ) , "unfinished" => state. unfinished. len( ) ) ;
1124
1126
// We usually add at least one job here, except if we are out of
1125
1127
// tables to copy. In that case, we go through the `while` loop
1126
1128
// every time one of the tables we are currently copying
1127
1129
// finishes
1128
1130
if let Some ( worker) = self . default_worker ( & mut state, & progress) {
1129
1131
workers. add ( worker) ;
1132
+ debug ! ( self . logger, "copy_data_internal: found more work for default worker" ;
1133
+ "workers" => workers. len( ) , "unfinished" => state. unfinished. len( ) ) ;
1134
+ } else {
1135
+ debug ! ( self . logger, "copy_data_internal: no more work for default worker" ;
1136
+ "workers" => workers. len( ) , "unfinished" => state. unfinished. len( ) ) ;
1130
1137
}
1131
1138
loop {
1132
1139
if workers. len ( ) >= self . workers {
1140
+ debug ! ( self . logger, "copy_data_internal: max workers reached, not looking for extra work" ;
1141
+ "workers" => workers. len( ) , "unfinished" => state. unfinished. len( ) ) ;
1142
+
1133
1143
break ;
1134
1144
}
1135
1145
let Some ( worker) = self . extra_worker ( & mut state, & progress) . await else {
1146
+ debug ! ( self . logger, "copy_data_internal: no more work for extra worker" ;
1147
+ "workers" => workers. len( ) , "unfinished" => state. unfinished. len( ) ) ;
1136
1148
break ;
1137
1149
} ;
1150
+ debug ! ( self . logger, "copy_data_internal: found more work for extra worker" ;
1151
+ "workers" => workers. len( ) , "unfinished" => state. unfinished. len( ) ) ;
1138
1152
workers. add ( worker) ;
1139
1153
}
1140
1154
1141
1155
self . assert_progress ( workers. len ( ) , & state) ?;
1156
+ debug ! ( self . logger, "copy_data_internal: running workers" ;
1157
+ "workers" => workers. len( ) , "unfinished" => state. unfinished. len( ) ) ;
1142
1158
let result = workers. select ( ) . await ;
1143
1159
1160
+ match & result {
1161
+ W :: Ok ( copy_table_worker) => {
1162
+ debug ! ( self . logger, "copy_data_internal: worker finished successfully" ;
1163
+ "workers" => workers. len( ) ,
1164
+ "unfinished" => state. unfinished. len( ) ,
1165
+ "table" => copy_table_worker. table. dst. name. as_str( ) ) ;
1166
+ }
1167
+ W :: Err ( store_error) => {
1168
+ debug ! ( self . logger, "copy_data_internal: worker finished with error" ;
1169
+ "workers" => workers. len( ) ,
1170
+ "unfinished" => state. unfinished. len( ) ,
1171
+ "error" => store_error. to_string( ) ) ;
1172
+ }
1173
+ W :: Wake => {
1174
+ debug ! ( self . logger, "copy_data_internal: waker finished" ;
1175
+ "workers" => workers. len( ) , "unfinished" => state. unfinished. len( ) ) ;
1176
+ }
1177
+ }
1178
+
1144
1179
// Analyze `result` and take another trip through the loop if
1145
1180
// everything is ok; wait for pending workers and return if
1146
1181
// there was an error or if copying was cancelled.
@@ -1187,6 +1222,8 @@ impl Connection {
1187
1222
} ;
1188
1223
}
1189
1224
debug_assert ! ( self . conn. is_some( ) ) ;
1225
+ debug ! ( self . logger, "copy_data_internal: finished all tables" ;
1226
+ "workers" => workers. len( ) , "unfinished" => state. unfinished. len( ) ) ;
1190
1227
1191
1228
// Create indexes for all the attributes that were postponed at the start of
1192
1229
// the copy/graft operations.
0 commit comments