|
24 | 24 |
|
25 | 25 | import fnmatch |
26 | 26 | import os |
| 27 | +import fcntl |
27 | 28 |
|
28 | 29 | import intelmq.lib.exceptions as exceptions |
29 | 30 | from intelmq.lib.bot import CollectorBot |
@@ -58,32 +59,41 @@ def process(self): |
58 | 59 | self.logger.debug("Started looking for files.") |
59 | 60 |
|
60 | 61 | if os.path.isdir(self.path): |
61 | | - p = os.path.abspath(self.path) |
| 62 | + path = os.path.abspath(self.path) |
62 | 63 |
|
63 | 64 | # iterate over all files in dir |
64 | | - for f in os.listdir(p): |
65 | | - filename = os.path.join(p, f) |
| 65 | + for file in os.listdir(path): |
| 66 | + filename = os.path.join(path, file) |
66 | 67 | if os.path.isfile(filename): |
67 | | - if fnmatch.fnmatch(f, '*' + self.postfix): |
| 68 | + if fnmatch.fnmatch(file, '*' + self.postfix): |
68 | 69 | self.logger.info("Processing file %r.", filename) |
69 | 70 |
|
70 | 71 | template = self.new_report() |
71 | | - template.add("feed.url", "file://localhost%s" % filename) |
72 | | - template.add("extra.file_name", f) |
73 | | - |
74 | | - with open(filename, 'rb') as fh: |
75 | | - for report in generate_reports(template, fh, self.chunk_size, |
76 | | - self.chunk_replicate_header): |
77 | | - self.send_message(report) |
| 72 | + template.add("feed.url", f"file://localhost{filename}") |
| 73 | + template.add("extra.file_name", file) |
| 74 | + |
| 75 | + try: |
| 76 | + with open(filename, 'rb') as file_handle: |
| 77 | + fcntl.flock(file_handle, fcntl.LOCK_EX | fcntl.LOCK_NB) |
| 78 | + for report in generate_reports(template, file_handle, |
| 79 | + self.chunk_size, |
| 80 | + self.chunk_replicate_header): |
| 81 | + self.send_message(report) |
| 82 | + fcntl.flock(file_handle, fcntl.LOCK_UN) |
| 83 | + except BlockingIOError: |
| 84 | + self.logger.info("File is already being used by another" |
| 85 | + " process, skipping.") |
78 | 86 |
|
79 | 87 | if self.delete_file: |
80 | 88 | try: |
81 | 89 | os.remove(filename) |
82 | 90 | self.logger.debug("Deleted file: %r.", filename) |
83 | 91 | except PermissionError: |
84 | 92 | self.logger.error("Could not delete file %r.", filename) |
85 | | - self.logger.info("Maybe I don't have sufficient rights on that file?") |
86 | | - self.logger.error("Stopping now, to prevent reading this file again.") |
| 93 | + self.logger.info("Maybe I don't have sufficient rights" |
| 94 | + " on that file?") |
| 95 | + self.logger.error("Stopping now, to prevent reading this" |
| 96 | + " file again.") |
87 | 97 | self.stop() |
88 | 98 |
|
89 | 99 |
|
|
0 commit comments