Skip to content

Commit

Permalink
chore: fix some quality issues
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Jan 6, 2023
1 parent e2277dd commit 3966315
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
*/
public class DefaultFileSystemMonitor implements FileSystemMonitor {

// TODO: Timeout should be user-configurable
private static final long TASK_CONFIGURATION_DEFAULT_TIMEOUT = 15000L;

private static final Logger LOG = LoggerFactory.getLogger(DefaultFileSystemMonitor.class);
Expand Down Expand Up @@ -112,7 +111,7 @@ public class DefaultFileSystemMonitor implements FileSystemMonitor {
*/
public DefaultFileSystemMonitor(final Long allowTasksReconfigurationAfterTimeoutMs,
final FileSystemListing<?> fsListening,
final GenericFileCleanupPolicy cleanPolicy,
final GenericFileCleanupPolicy<?, ?> cleanPolicy,
final Predicate<FileObjectStatus> cleanablePredicate,
final SourceOffsetPolicy offsetPolicy,
final StateBackingStore<FileObject> store,
Expand Down Expand Up @@ -195,7 +194,7 @@ private void recoverPreviouslyCompletedSources() {
.map(it -> it.getValue().withKey(FileObjectKey.of(it.getKey())))
.filter(it -> cleanablePredicate.test(it.status()))
.forEach(cleanable::add);
LOG.info("Finished recovering previously completed files : " + cleanable);
LOG.info("Finished recovering previously completed files : {}", cleanable);
}

private boolean readStatesToEnd(final Duration timeout) {
Expand Down Expand Up @@ -361,6 +360,7 @@ public List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule) {
wait(Math.max(0, TASK_CONFIGURATION_DEFAULT_TIMEOUT - (now - started)));
}
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
now = Time.SYSTEM.milliseconds();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private List<List<String>> partitionAndGet(int maxTasks) {
private Map<String, String> createTaskConfig(final int taskId,
final int taskCount,
final long taskConfigGen,
final List<String> URIs) {
final List<String> uris) {
final Map<String, String> taskConfig = new HashMap<>(configProperties);
taskConfig.put(SourceTaskConfig.TASK_GENERATION_ID, String.valueOf(taskConfigGen));
if (connectorConfig.isFileListingTaskDelegationEnabled()) {
Expand All @@ -216,7 +216,7 @@ private Map<String, String> createTaskConfig(final int taskId,
taskConfig.put(TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_CONFIG, "true");
} else {
taskConfig.put(SourceTaskConfig.FILE_URIS_PROVIDER_CONFIG, DefaultTaskFileURIProvider.class.getName());
taskConfig.put(DefaultTaskFileURIProvider.Config.FILE_OBJECT_URIS_CONFIG, String.join(",", URIs));
taskConfig.put(DefaultTaskFileURIProvider.Config.FILE_OBJECT_URIS_CONFIG, String.join(",", uris));
taskConfig.put(TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_CONFIG, "false");
}
return taskConfig;
Expand All @@ -242,6 +242,7 @@ private void closeResources() {
fsMonitorThread.join(DEFAULT_MAX_TIMEOUT);
} catch (InterruptedException e) {
LOG.warn("Failed to close file-system monitoring thread. Error: {}", e.getMessage());
Thread.currentThread().interrupt();
}
}
if (partitioner != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void run() {
}
} catch (InterruptedException e) {
LOG.error("Unexpected InterruptedException, ignoring: ", e);
Thread.currentThread().interrupt();
} finally {
monitor.close();
LOG.info("Stopped filesystem monitoring thread.");
Expand All @@ -110,6 +111,7 @@ void shutdown(final long timeoutMs) {
this.waitingLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
LOG.error("Timeout : scan loop is not terminated yet.");
Thread.currentThread().interrupt();
}
}
}

0 comments on commit 3966315

Please sign in to comment.