@@ -3,12 +3,16 @@ package dsfs
import (
	"encoding/json"
	"fmt"
+ 	// "io/ioutil"
+ 	"time"

	"github.com/ipfs/go-datastore"
- 	// "github.com/libp2p/go-libp2p-crypto"
+ 	"github.com/libp2p/go-libp2p-crypto"
+ 	"github.com/mr-tron/base58/base58"
	"github.com/qri-io/cafs"
	"github.com/qri-io/cafs/memfs"
	"github.com/qri-io/dataset"
+ 	"github.com/qri-io/dataset/validate"
)

// LoadDataset reads a dataset from a cafs and dereferences structure, transform, and commitMsg if they exist,
@@ -18,17 +22,7 @@ func LoadDataset(store cafs.Filestore, path datastore.Key) (*dataset.Dataset, error) {
	if err != nil {
		return nil, fmt.Errorf("error loading dataset: %s", err.Error())
	}
-
- 	if err := DerefDatasetMetadata(store, ds); err != nil {
- 		return nil, err
- 	}
- 	if err := DerefDatasetStructure(store, ds); err != nil {
- 		return nil, err
- 	}
- 	if err := DerefDatasetTransform(store, ds); err != nil {
- 		return nil, err
- 	}
- 	if err := DerefDatasetCommit(store, ds); err != nil {
+ 	if err := DerefDataset(store, ds); err != nil {
		return nil, err
	}

@@ -65,6 +59,23 @@ func LoadDatasetRefs(store cafs.Filestore, path datastore.Key) (*dataset.Dataset, error) {
	return ds, nil
}

+ // DerefDataset attempts to fully dereference a dataset
+ func DerefDataset(store cafs.Filestore, ds *dataset.Dataset) error {
+ 	if err := DerefDatasetMetadata(store, ds); err != nil {
+ 		return err
+ 	}
+ 	if err := DerefDatasetStructure(store, ds); err != nil {
+ 		return err
+ 	}
+ 	if err := DerefDatasetTransform(store, ds); err != nil {
+ 		return err
+ 	}
+ 	if err := DerefDatasetCommit(store, ds); err != nil {
+ 		return err
+ 	}
+ 	return nil
+ }
+
// DerefDatasetStructure derferences a dataset's structure element if required
// should be a no-op if ds.Structure is nil or isn't a reference
func DerefDatasetStructure(store cafs.Filestore, ds *dataset.Dataset) error {
@@ -125,111 +136,72 @@ func DerefDatasetCommit(store cafs.Filestore, ds *dataset.Dataset) error {
	return nil
}

- // CreateDatasetParams defines parmeters for the CreateDataset function
- // type CreateDatasetParams struct {
- // 	// Store is where we're going to
- // 	Store cafs.Filestore
- // 	//
- // 	Dataset *dataset.Dataset
- // 	DataFile cafs.File
- // 	PrivKey crypto.PrivKey
- // }
-
- // CreateDataset is the canonical method for getting a dataset pointer & it's data into a store
- // func CreateDataset(p *CreateDatasetParams) (path datastore.Key, err error) {
- // 	// TODO - need a better strategy for huge files
- // 	data, err := ioutil.ReadAll(rdr)
- // 	if err != nil {
- // 		return fmt.Errorf("error reading file: %s", err.Error())
- // 	}
-
- // 	if err = PrepareDataset(p.Store, p.Dataset, p.DataFile); err != nil {
- // 		return
- // 	}
-
- // 	// Ensure that dataset is well-formed
- // 	// format, err := detect.ExtensionDataFormat(filename)
- // 	// if err != nil {
- // 	// 	return fmt.Errorf("error detecting format extension: %s", err.Error())
- // 	// }
- // 	// if err = validate.DataFormat(format, bytes.NewReader(data)); err != nil {
- // 	// 	return fmt.Errorf("invalid data format: %s", err.Error())
- // 	// }
-
- // 	// TODO - check for errors in dataset and warn user if errors exist
-
- // 	datakey, err := store.Put(memfs.NewMemfileBytes("data."+st.Format.String(), data), false)
- // 	if err != nil {
- // 		return fmt.Errorf("error putting data file in store: %s", err.Error())
- // 	}
-
- // 	ds.Timestamp = time.Now().In(time.UTC)
- // 	if ds.Title == "" {
- // 		ds.Title = name
- // 	}
- // 	ds.Data = datakey.String()
-
- // 	if err := validate.Dataset(ds); err != nil {
- // 		return err
- // 	}
-
- // 	dskey, err := SaveDataset(store, ds, true)
- // 	if err != nil {
- // 		return fmt.Errorf("error saving dataset: %s", err.Error())
- // 	}
- // }
+ // CreateDataset places a new dataset in the store. Admittedly, this isn't a simple process.
+ // Store is where we're going to
+ // Dataset to be saved
+ // Pin the dataset if the underlying store supports the pinning interface
+ func CreateDataset(store cafs.Filestore, ds *dataset.Dataset, df cafs.File, pk crypto.PrivKey, pin bool) (path datastore.Key, err error) {
+ 	if err = DerefDataset(store, ds); err != nil {
+ 		return
+ 	}
+ 	if err = validate.Dataset(ds); err != nil {
+ 		return
+ 	}
+ 	if err = prepareDataset(store, ds, df, pk); err != nil {
+ 		return
+ 	}
+ 	path, err = WriteDataset(store, ds, df, pin)
+ 	if err != nil {
+ 		err = fmt.Errorf("error writing dataset: %s", err.Error())
+ 	}
+ 	return
+ }
+
+ // timestamp is a function for getting commit timestamps
+ // we replace this with a static function for testing purposes
+ var timestamp = func() time.Time {
+ 	return time.Now()
+ }

// prepareDataset modifies a dataset in preparation for adding to a dsfs
- // func PrepareDataset(store cafs.Filestore, ds *dataset.Dataset, data cafs.File) error {
-
- // 	st, err := detect.FromReader(data.FileName(), data)
- // 	if err != nil {
- // 		return fmt.Errorf("error determining dataset schema: %s", err.Error())
- // 	}
- // 	if ds.Structure == nil {
- // 		ds.Structure = &dataset.Structure{}
- // 	}
- // 	ds.Structure.Assign(st, ds.Structure)
-
- // 	// Ensure that dataset contains valid field names
- // 	if err = validate.Structure(st); err != nil {
- // 		return fmt.Errorf("invalid structure: %s", err.Error())
- // 	}
- // 	if err := validate.DataFormat(st.Format, bytes.NewReader(data)); err != nil {
- // 		return fmt.Errorf("invalid data format: %s", err.Error())
- // 	}
-
- // 	// generate abstract form of dataset
- // 	ds.Abstract = dataset.Abstract(ds)
-
- // 	if ds.AbstractTransform != nil {
- // 		// convert abstract transform to abstract references
- // 		for name, ref := range ds.AbstractTransform.Resources {
- // 			// data, _ := ref.MarshalJSON()
- // 			// fmt.Println(string(data))
- // 			if ref.Abstract != nil {
- // 				ds.AbstractTransform.Resources[name] = ref.Abstract
- // 			} else {
-
- // 				absf, err := JSONFile(PackageFileAbstract.String(), dataset.Abstract(ref))
- // 				if err != nil {
- // 					return err
- // 				}
- // 				path, err := store.Put(absf, true)
- // 				if err != nil {
- // 					return err
- // 				}
- // 				ds.AbstractTransform.Resources[name] = dataset.NewDatasetRef(path)
- // 			}
- // 		}
- // 	}
-
- // 	return nil
- // }
-
- // SaveDataset writes a dataset to a cafs, replacing subcomponents of a dataset with hash references
- // during the write process. Directory structure is according to PackageFile naming conventions
- func SaveDataset(store cafs.Filestore, ds *dataset.Dataset, pin bool) (datastore.Key, error) {
+ func prepareDataset(store cafs.Filestore, ds *dataset.Dataset, df cafs.File, privKey crypto.PrivKey) error {
+ 	if df == nil {
+ 		return fmt.Errorf("data file is required")
+ 	}
+
+ 	// TODO - need a better strategy for huge files. I think that strategy is to split
+ 	// the reader into multiple consumers that are all performing their task on a stream
+ 	// of byte slices
+ 	// data, err := ioutil.ReadAll(df)
+ 	// if err != nil {
+ 	// 	return fmt.Errorf("error reading file: %s", err.Error())
+ 	// }
+
+ 	// generate abstract form of dataset
+ 	ds.Abstract = dataset.Abstract(ds)
+
+ 	// datakey, err := store.Put(memfs.NewMemfileBytes("data."+ds.Structure.Format.String(), data), false)
+ 	// if err != nil {
+ 	// 	return fmt.Errorf("error putting data file in store: %s", err.Error())
+ 	// }
+
+ 	ds.Commit.Timestamp = timestamp()
+ 	signedBytes, err := privKey.Sign(ds.Commit.SignableBytes())
+ 	if err != nil {
+ 		return fmt.Errorf("error signing commit title: %s", err.Error())
+ 	}
+ 	ds.Commit.Signature = base58.Encode(signedBytes)
+
+ 	// TODO - make sure file ending matches
+ 	// "data."+ds.Structure.Format.String()
+ 	return nil
+ }
+
+ // WriteDataset writes a dataset to a cafs, replacing subcomponents of a dataset with path references
+ // during the write process. Directory structure is according to PackageFile naming conventions.
+ // This method is currently exported, but 99% of use cases should use CreateDataset instead of this
+ // lower-level function
+ func WriteDataset(store cafs.Filestore, ds *dataset.Dataset, dataFile cafs.File, pin bool) (datastore.Key, error) {
	// assign to a new dataset instance to avoid clobbering input dataset
	cp := &dataset.Dataset{}
	cp.Assign(ds)
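For orientation, here is a rough usage sketch of the new CreateDataset entry point introduced above. It is not part of this commit; the memfs.NewMapstore constructor, the RSA key generation, and the inline CSV bytes are illustrative assumptions about how a caller (for example, a test) might wire the pieces together.

	store := memfs.NewMapstore()                             // assumed in-memory cafs.Filestore from the memfs package
	priv, _, err := crypto.GenerateKeyPair(crypto.RSA, 1024) // throwaway signing key for the sketch
	if err != nil {
		// handle key generation error
	}
	// ds is a *dataset.Dataset that already has a Structure and a Commit populated
	df := memfs.NewMemfileBytes("data.csv", []byte("a,b,c\n1,2,3")) // placeholder data file
	path, err := CreateDataset(store, ds, df, priv, true)
	if err != nil {
		// handle create error
	}
	fmt.Println("dataset written at", path.String())
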
@@ -247,11 +219,14 @@ func SaveDataset(store cafs.Filestore, ds *dataset.Dataset, pin bool) (datastore.Key, error) {
	}

	if ds.AbstractTransform != nil {
- 		// ensure all dataset references are abstract
- 		for key, r := range ds.AbstractTransform.Resources {
- 			if !r.IsEmpty() {
- 				return datastore.NewKey(""), fmt.Errorf("abstract transform resource '%s' is not a reference", key)
+ 		// convert abstract transform to abstract references
+ 		for name, ref := range ds.AbstractTransform.Resources {
+ 			absrf, err := JSONFile(fmt.Sprintf("ref_%s.json", name), dataset.Abstract(ref))
+ 			if err != nil {
+ 				return datastore.NewKey(""), fmt.Errorf("error marshaling dataset resource '%s' to json: %s", name, err.Error())
			}
+ 			fileTasks++
+ 			adder.AddFile(absrf)
		}
		abstff, err := JSONFile(PackageFileAbstractTransform.String(), ds.AbstractTransform)
		if err != nil {
@@ -279,11 +254,17 @@ func SaveDataset(store cafs.Filestore, ds *dataset.Dataset, pin bool) (datastore.Key, error) {
	// if err != nil {
	// 	return datastore.NewKey(""), fmt.Errorf("error marshaling dataset to json: %s", err.Error())
	// }
-
	// fileTasks++
	// adder.AddFile(dsf)
	// addedDataset = true
	// }
+ 	// data, err := store.Get(datastore.NewKey(ds.Data))
+ 	// if err != nil {
+ 	// 	return datastore.NewKey(""), fmt.Errorf("error getting dataset raw data: %s", err.Error())
+ 	// }
+
+ 	fileTasks++
+ 	adder.AddFile(dataFile)

	if ds.Transform != nil {
		// all resources must be references
@@ -348,6 +329,16 @@ func SaveDataset(store cafs.Filestore, ds *dataset.Dataset, pin bool) (datastore.Key, error) {
			ds.Metadata = dataset.NewMetadataRef(ao.Path)
		case PackageFileCommit.String():
			ds.Commit = dataset.NewCommitRef(ao.Path)
+ 		case dataFile.FileName():
+ 			ds.DataPath = ao.Path.String()
+ 		default:
+ 			if ds.AbstractTransform != nil {
+ 				for name := range ds.AbstractTransform.Resources {
+ 					if ao.Name == fmt.Sprintf("ref_%s.json", name) {
+ 						ds.AbstractTransform.Resources[name] = dataset.NewDatasetRef(ao.Path)
+ 					}
+ 				}
+ 			}
		}

		fileTasks--
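One consequence of routing commit timestamps through the package-level timestamp variable is that tests can pin it to a constant, making Commit.Timestamp, and therefore the base58-encoded signature produced by prepareDataset, deterministic. A minimal sketch of such an override, assuming it lives in a _test.go file inside the dsfs package:

	// sketch only: freeze commit timestamps so signatures are reproducible across test runs
	func init() {
		fixed := time.Date(2001, 1, 1, 1, 1, 1, 1, time.UTC)
		timestamp = func() time.Time {
			return fixed
		}
	}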