-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathKobSplit.py
244 lines (172 loc) · 9.04 KB
/
KobSplit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# -------------------------------------------------------------------------------------------------------------------------------------------
# Class : KobSplit
# Copyright : © KobbySoft Ltd
# Description : File splitter and merge program
# Author : Kobby Awadzi
# History : July 2024 - Created.
# Notes : Splits files into specified chunk sizes. Merges chunks back together into one file.
# --------------------------------------------------------------------------------------------------------------------------------------------
from pathlib import Path
import os
import fnmatch
import math
# constants
MIN_CHUNK_SIZE = 1024 * 10            # default chunk size used by split_file, in bytes
SPLIT_FILE_SUFFIX = ".KOB"            # placed between the original file name and the chunk number
DEFAULT_READ_BUFFER_SIZE = 1024 * 10  # default number of bytes moved per read/write call


class KobSplit:
    """Split a file into numbered chunk files and merge such chunks back together.

    Chunk files are named ``<original name>.KOB<NN>`` where ``NN`` is the
    1-based chunk number, zero-padded to at least two digits.
    """

    # when True, progress messages are printed to stdout
    log_to_screen = False
    # number of chunk files created by the most recent split_file call
    _files_created = 0

    def __init__(self, buffer_size=DEFAULT_READ_BUFFER_SIZE):
        """Create a splitter.

        buffer_size: bytes moved per read/write call; an integer number of
        bytes or a string with a kb/mb/gb suffix (for example "64kb").

        Raises ValueError if buffer_size cannot be parsed or is not positive.
        """
        self.read_buffer_size = self.__get_size_value(buffer_size)

    # -------------------------------------------------------------------------------------
    # split file routine
    # -------------------------------------------------------------------------------------
    def split_file(self, input_file_name, destination, file_chunk_size=MIN_CHUNK_SIZE):
        """Split input_file_name into chunk files written to destination.

        input_file_name: path of the file to split.
        destination: directory receiving the chunks; created if missing.
        file_chunk_size: size of each chunk (the last one may be smaller);
        an integer number of bytes or a string with a kb/mb/gb suffix.

        Every chunk except the last holds exactly file_chunk_size bytes,
        whatever the read buffer size is. An empty input file produces a
        single empty chunk so that it can still be merged back.

        Raises ValueError if the input file does not exist, if the chunk size
        is invalid, or if destination already holds chunks for this file name.
        """
        chunk_size = self.__get_size_value(file_chunk_size)
        file_name = os.path.basename(input_file_name)

        # check if file to split exists
        if not Path(input_file_name).is_file():
            raise ValueError(f'file {input_file_name} does not exist.')

        # check if we already have any split files in output directory.
        # Directory entries are bare names, so compare against the base name
        # of the input (not its full path); a plain prefix test also keeps
        # wildcard characters in the file name from being read as a pattern.
        output_directory = Path(destination)
        if output_directory.is_dir():
            chunk_prefix = os.path.normcase(file_name + SPLIT_FILE_SUFFIX)
            for existing_name in os.listdir(output_directory):
                if os.path.normcase(existing_name).startswith(chunk_prefix):
                    raise ValueError(f'there are already split files for {input_file_name} in folder {output_directory}')

        # work out number of chunks and hence the zero-padding of the suffix
        file_size = os.path.getsize(input_file_name)
        chunk_total = max(1, -(-file_size // chunk_size))  # ceiling division, at least one chunk
        pad_width = max(2, len(str(chunk_total)))

        self._files_created = 0  # count only the files of this call
        with open(input_file_name, "rb") as source:
            for chunk_index in range(1, chunk_total + 1):
                chunk_path = os.path.join(
                    destination,
                    file_name + SPLIT_FILE_SUFFIX + str(chunk_index).rjust(pad_width, "0"),
                )
                if file_size == 0:
                    # nothing to read: still emit one (empty) chunk
                    self.__write_chunk(b"", chunk_path)
                    break
                room_left = chunk_size
                while room_left > 0:
                    # never read past the chunk boundary, so chunk sizes do
                    # not depend on the read buffer size
                    read_buffer = source.read(min(self.read_buffer_size, room_left))
                    if not read_buffer:
                        break
                    self.__write_chunk(read_buffer, chunk_path)
                    room_left -= len(read_buffer)

        if self.log_to_screen:
            print(f"finished. {self._files_created} files created.")

    # -------------------------------------------------------------------------------------
    # merge file routine
    # -------------------------------------------------------------------------------------
    def merge_file(self, output_file_name, split_files_directory):
        """Concatenate the chunk files in split_files_directory into output_file_name.

        output_file_name: path of the merged file; must not exist yet. Its
        parent directory is created if missing.
        split_files_directory: directory holding exactly one set of chunks.

        Chunks are written in ascending chunk-number order.

        Raises ValueError if the output file already exists, the directory
        does not exist, it holds no chunk files, or it holds chunks of more
        than one original file.
        """
        output_file = Path(output_file_name)
        file_mask = "*" + SPLIT_FILE_SUFFIX + "*"

        # check if merged output file already exists
        if output_file.is_file():
            raise ValueError(f'file {output_file} already exists.')

        input_directory = Path(split_files_directory)
        if not input_directory.is_dir():
            raise ValueError(f'Directory {input_directory} does not exist.')

        # check if we have merge files in source directory
        file_names = fnmatch.filter(os.listdir(input_directory), file_mask)
        if not file_names:
            raise ValueError(f'no files matching {file_mask} in directory {input_directory}')

        # check if we have more than one set of split files in merge folder
        first_stem = Path(file_names[0]).stem
        if any(Path(name).stem != first_stem for name in file_names):
            raise ValueError(f'there are more than one set of split files in directory {input_directory}')

        # numeric order, so that e.g. .KOB100 comes after .KOB99
        file_names.sort(key=self.__chunk_sort_key)

        # create output directory if needed (only once the inputs are known
        # to be valid, so a failed call leaves nothing behind)
        directory = os.path.dirname(output_file_name)
        if directory:
            os.makedirs(directory, exist_ok=True)

        # loop through merge files and write to merged output file
        if self.log_to_screen:
            print(f'creating {output_file_name}...')
        with open(output_file_name, 'wb') as merged:
            for current_file in file_names:
                current_file = os.path.join(input_directory, current_file)
                if self.log_to_screen:
                    print(f'reading {current_file}...')
                with open(current_file, 'rb') as chunk:
                    while True:
                        read_buffer = chunk.read(self.read_buffer_size)
                        if not read_buffer:
                            break
                        merged.write(read_buffer)
        if self.log_to_screen:
            print(f"finished. [{output_file_name}] created.")

    # helper giving the sort position of a chunk file name
    @staticmethod
    def __chunk_sort_key(chunk_name):
        """Return a sort key ordering chunk names by their numeric suffix.

        Names whose suffix carries no number sort after all numbered ones,
        in alphabetical order.
        """
        digits = Path(chunk_name).suffix[len(SPLIT_FILE_SUFFIX):]
        if digits.isdecimal():
            return (0, int(digits), chunk_name)
        return (1, 0, chunk_name)

    # helper function to write chunks to current file
    def __write_chunk(self, write_buffer, file_name):
        """Append write_buffer to file_name, creating the file (and its directory) if needed.

        Creating a new file increments _files_created.
        """
        if Path(file_name).is_file():
            mode = 'ab'
        else:
            if self.log_to_screen:
                print(f'creating {file_name}...')
            self._files_created = self._files_created + 1
            directory = os.path.dirname(file_name)
            if directory:
                os.makedirs(directory, exist_ok=True)
            mode = 'wb'
        with open(file_name, mode) as chunk_file:
            chunk_file.write(write_buffer)

    # check size values. used to indicate read_buffer_size and chunk_size.
    # allows specification as a number of bytes or with kb, mb and gb suffixed.
    def __get_size_value(self, size_string) -> int:
        """Convert a size specification to a positive number of bytes.

        size_string: an int, or a string such as "4096", "10kb", "5 MB" or
        "1gb" (suffix is case-insensitive, units are powers of 1024).

        Raises ValueError if the value cannot be parsed or is not positive.
        """
        kilo_bytes = 1024
        units = (
            ("kb", kilo_bytes),
            ("mb", kilo_bytes ** 2),
            ("gb", kilo_bytes ** 3),
        )
        # normalise first so that ints and strings follow the same path
        text = str(size_string).strip().lower()
        multiplier = 1
        for suffix, factor in units:
            if text.endswith(suffix):
                text = text[:-len(suffix)].strip()
                multiplier = factor
                break
        if not text.isdecimal():
            raise ValueError(f'Invalid size parameter {size_string}')
        result = int(text) * multiplier
        if result <= 0:
            # a zero size would divide by zero when splitting or never advance when reading
            raise ValueError(f'Invalid size parameter {size_string}')
        return result