5
5
import struct
6
6
import tempfile
7
7
from io import BufferedWriter
8
- from typing import Any , BinaryIO , Sequence
8
+ from enum import Enum , auto
9
+ from typing import Any , IO , Sequence
9
10
10
11
import numpy as np
11
12
21
22
TokenType ,
22
23
)
23
24
25
# Ordered progress marker for the sections of a GGUF file.  A GGUF file is
# laid out as: header, then key-value metadata, then tensor-info records,
# then tensor data — the writer methods compare against this state to
# enforce that write order.
class WriterState(Enum):
    EMPTY = auto()    # nothing written yet
    HEADER = auto()   # magic, version and counts written
    KV_DATA = auto()  # key-value metadata written
    TI_DATA = auto()  # tensor-info records written
24
31
class GGUFWriter :
25
32
fout : BufferedWriter
26
- arch : str
27
- offset_tensor = 0
28
- data_alignment = GGUF_DEFAULT_ALIGNMENT
29
- kv_data = b""
30
- kv_data_count = 0
31
- ti_data = b""
32
- ti_data_count = 0
33
- use_temp_file : bool
34
- temp_file : tempfile .SpooledTemporaryFile [bytes ] | None = None
35
- tensors : list [tuple [np .ndarray [Any , Any ], int ]]
33
+ temp_file : tempfile .SpooledTemporaryFile [bytes ] | None
34
+ tensors : list [np .ndarray [Any , Any ]]
36
35
_simple_value_packing = {
37
36
GGUFValueType .UINT8 : "B" ,
38
37
GGUFValueType .INT8 : "b" ,
@@ -60,27 +59,47 @@ def __init__(self, path: os.PathLike[str] | str, arch: str, use_temp_file: bool
60
59
self .fout = open (path , "wb" )
61
60
self .arch = arch
62
61
self .endianess = endianess
63
- self .add_architecture ()
62
+ self .offset_tensor = 0
63
+ self .data_alignment = GGUF_DEFAULT_ALIGNMENT
64
+ self .kv_data = b""
65
+ self .kv_data_count = 0
66
+ self .ti_data = b""
67
+ self .ti_data_count = 0
64
68
self .use_temp_file = use_temp_file
69
+ self .temp_file = None
65
70
self .tensors = []
66
71
print ("gguf: This GGUF file is for {0} Endian only"
67
72
.format ("Big" if self .endianess == GGUFEndian .BIG else "Little" ))
73
+ self .state = WriterState .EMPTY
74
+
75
+ self .add_architecture ()
68
76
69
77
def write_header_to_file (self ) -> None :
78
+ if self .state is not WriterState .EMPTY :
79
+ raise ValueError (f'Expected output file to be empty, got { self .state } ' )
80
+
70
81
self ._write_packed ("<I" , GGUF_MAGIC , skip_pack_prefix = True )
71
82
self ._write_packed ("I" , GGUF_VERSION )
72
83
self ._write_packed ("Q" , self .ti_data_count )
73
84
self ._write_packed ("Q" , self .kv_data_count )
74
85
self .flush ()
75
- # print("tensors " + str( self.ti_data_count) + " kv " + str(self.kv_data_count))
86
+ self .state = WriterState . HEADER
76
87
77
88
def write_kv_data_to_file (self ) -> None :
89
+ if self .state is not WriterState .HEADER :
90
+ raise ValueError (f'Expected output file to contain the header, got { self .state } ' )
91
+
78
92
self .fout .write (self .kv_data )
79
93
self .flush ()
94
+ self .state = WriterState .KV_DATA
80
95
81
96
def write_ti_data_to_file (self ) -> None :
97
+ if self .state is not WriterState .KV_DATA :
98
+ raise ValueError (f'Expected output file to contain KV data, got { self .state } ' )
99
+
82
100
self .fout .write (self .ti_data )
83
101
self .flush ()
102
+ self .state = WriterState .TI_DATA
84
103
85
104
def add_key (self , key : str ) -> None :
86
105
self .add_val (key , GGUFValueType .STRING , add_vtype = False )
@@ -173,6 +192,9 @@ def ggml_pad(x: int, n: int) -> int:
173
192
return ((x + n - 1 ) // n ) * n
174
193
175
194
def add_tensor_info (self , name : str , tensor_shape : Sequence [int ], tensor_dtype : np .dtype [np .float16 ] | np .dtype [np .float32 ], tensor_nbytes : int , raw_dtype : GGMLQuantizationType | None = None ) -> None :
195
+ if self .state is not WriterState .EMPTY :
196
+ raise ValueError (f'Expected output file to be empty, got { self .state } ' )
197
+
176
198
if raw_dtype is None and tensor_dtype not in (np .float32 , np .float16 ):
177
199
raise ValueError ("Only F32 and F16 tensors are supported for now" )
178
200
@@ -203,23 +225,21 @@ def add_tensor(self, name: str, tensor: np.ndarray[Any, Any], raw_shape: Sequenc
203
225
shape : Sequence [int ] = raw_shape if raw_shape is not None else tensor .shape
204
226
self .add_tensor_info (name , shape , tensor .dtype , tensor .nbytes , raw_dtype = raw_dtype )
205
227
206
- pad = GGUFWriter .ggml_pad (tensor .nbytes , self .data_alignment ) - tensor .nbytes
207
-
208
- if self .temp_file is None :
209
- self .tensors .append ((tensor , pad ))
210
- return
228
+ if self .temp_file is None :
229
+ self .tensors .append (tensor )
211
230
212
231
tensor .tofile (self .temp_file )
232
+ self .write_padding (self .temp_file , tensor .nbytes )
213
233
214
- if pad != 0 :
215
- self .temp_file .write (bytes ([0 ] * pad ))
216
-
217
- def write_padding (self , fp : BinaryIO , n : int , align : int | None = None ) -> None :
234
+ def write_padding (self , fp : IO [bytes ], n : int , align : int | None = None ):
218
235
pad = GGUFWriter .ggml_pad (n , align if align is not None else self .data_alignment ) - n
219
236
if pad != 0 :
220
237
fp .write (bytes ([0 ] * pad ))
221
238
222
239
def write_tensor_data (self , tensor : np .ndarray [Any , Any ]) -> None :
240
+ if self .state is not WriterState .TI_DATA :
241
+ raise ValueError (f'Expected output file to contain tensor info, got { self .state } ' )
242
+
223
243
if self .endianess == GGUFEndian .BIG :
224
244
tensor .byteswap (inplace = True )
225
245
self .write_padding (self .fout , self .fout .tell ())
@@ -232,10 +252,13 @@ def write_tensors_to_file(self) -> None:
232
252
self .write_padding (self .fout , self .fout .tell ())
233
253
234
254
if self .temp_file is None :
235
- for (currtensor , currpad ) in self .tensors :
236
- currtensor .tofile (self .fout )
237
- if currpad != 0 :
238
- self .fout .write (bytes ([0 ] * currpad ))
255
+ while True :
256
+ try :
257
+ tensor = self .tensors .pop (0 )
258
+ except IndexError :
259
+ break
260
+ tensor .tofile (self .fout )
261
+ self .write_padding (self .fout , tensor .nbytes )
239
262
return
240
263
241
264
self .temp_file .seek (0 )
0 commit comments