forked from k0sproject/rig
-
Notifications
You must be signed in to change notification settings - Fork 0
/
winfsys.go
436 lines (397 loc) · 12.3 KB
/
winfsys.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
package rig
import (
"bufio"
_ "embed"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
"sync"
"github.com/k0sproject/rig/errstring"
"github.com/k0sproject/rig/exec"
"github.com/k0sproject/rig/log"
ps "github.com/k0sproject/rig/powershell"
)
const bufSize = 32768
var (
// ErrNotRunning is returned when the rigrcp process is not running
ErrNotRunning = errstring.New("rigrcp is not running")
// ErrRcpCommandFailed is returned when a command to the rigrcp process fails
ErrRcpCommandFailed = errstring.New("rigrcp command failed")
)
// rigrcpScript is a helper script for transferring files between local and remote systems
//
//go:embed script/rigrcp.ps1
var rigrcpScript string
var (
_ fs.File = &winfsFile{}
_ fs.ReadDirFile = &winfsDir{}
_ fs.FS = &windowsFsys{}
)
type windowsFsys struct {
conn *Connection
rcp *rigrcp
buf []byte
mu sync.Mutex
}
type seekResponse struct {
Position int64 `json:"position"`
}
type readResponse struct {
Bytes int64 `json:"bytes"`
}
type sumResponse struct {
Sha256 string `json:"sha256"`
}
type rigrcpResponse struct {
Err error `json:"-"`
ErrString string `json:"error"`
Stat *FileInfo `json:"stat"`
Dir []*FileInfo `json:"dir"`
Seek *seekResponse `json:"seek"`
Read *readResponse `json:"read"`
Sum *sumResponse `json:"sum"`
}
func (r *rigrcpResponse) UnmarshalJSON(b []byte) error {
type rigresponse *rigrcpResponse
rr := rigresponse(r)
if err := json.Unmarshal(b, rr); err != nil {
return ErrCommandFailed.Wrapf("failed to unmarshal rigrcp response: %w", err)
}
if r.ErrString != "" {
r.Err = errstring.New(strings.TrimSpace(r.ErrString))
}
return nil
}
// newWindowsFsys returns a new fs.FS implementing filesystem for Windows targets
func newWindowsFsys(conn *Connection, opts ...exec.Option) *windowsFsys {
return &windowsFsys{
conn: conn,
buf: make([]byte, bufSize),
rcp: &rigrcp{conn: conn, opts: opts},
}
}
type rigrcp struct {
conn *Connection
opts []exec.Option
mu sync.Mutex
done chan struct{}
stdin io.WriteCloser
stdout *bufio.Reader
stderr io.WriteCloser
running bool
}
func (rcp *rigrcp) run() error {
log.Debugf("starting rigrcp")
rcp.mu.Lock()
defer rcp.mu.Unlock()
stdinR, stdinW := io.Pipe()
stdoutR, stdoutW := io.Pipe()
rcp.stdout = bufio.NewReader(stdoutR)
rcp.stdin = stdinW
rcp.stderr = os.Stderr
rcp.done = make(chan struct{})
waiter, err := rcp.conn.ExecStreams(ps.CompressedCmd(rigrcpScript), stdinR, stdoutW, rcp.stderr, rcp.opts...)
if err != nil {
return ErrCommandFailed.Wrapf("failed to start rigrcp: %w", err)
}
rcp.running = true
log.Tracef("started rigrcp")
go func() {
err := waiter.Wait()
if err != nil {
log.Errorf("rigrcp: %v", err)
}
log.Debugf("rigrcp exited")
close(rcp.done)
rcp.running = false
}()
return nil
}
func (rcp *rigrcp) command(cmd string) (rigrcpResponse, error) {
var res rigrcpResponse
if !rcp.running {
if err := rcp.run(); err != nil {
return res, err
}
}
rcp.mu.Lock()
defer rcp.mu.Unlock()
resp := make(chan []byte, 1)
go func() {
b, err := rcp.stdout.ReadBytes(0)
if err != nil {
log.Errorf("failed to read response: %v", err)
close(resp)
return
}
resp <- b[:len(b)-1] // drop the zero byte
}()
log.Tracef("writing rigrcp command: %s", cmd)
if _, err := rcp.stdin.Write([]byte(cmd + "\n")); err != nil {
return res, ErrRcpCommandFailed.Wrap(err)
}
select {
case <-rcp.done:
return res, ErrRcpCommandFailed.Wrapf("rigrcp exited")
case data := <-resp:
if data == nil {
return res, nil
}
if len(data) == 0 {
return res, nil
}
if err := json.Unmarshal(data, &res); err != nil {
return res, ErrRcpCommandFailed.Wrapf("failed to unmarshal response: %w", err)
}
log.Tracef("rigrcp response: %+v", res)
if res.Err != nil {
if res.Err.Error() == "eof" {
return res, io.EOF
}
return res, ErrRcpCommandFailed.Wrapf("rigrcp error: %w", res.Err)
}
return res, nil
}
}
// winfsFile is a file on a Windows target. It implements fs.File.
type winfsFile struct {
fsys *windowsFsys
path string
}
// Seek sets the offset for the next Read or Write on the remote file.
// The whence argument controls the interpretation of offset.
// 0 = offset from the beginning of file
// 1 = offset from the current position
// 2 = offset from the end of file
func (f *winfsFile) Seek(offset int64, whence int) (int64, error) {
resp, err := f.fsys.rcp.command(fmt.Sprintf("seek %d %d", offset, whence))
if err != nil {
return -1, &fs.PathError{Op: "seek", Path: name, Err: ErrRcpCommandFailed.Wrapf("failed to seek: %w", err)}
}
if resp.Seek == nil {
return -1, &fs.PathError{Op: "seek", Path: name, Err: ErrRcpCommandFailed.Wrapf("invalid response: %v", resp)}
}
return resp.Seek.Position, nil
}
// winfsDir is a directory on a Windows target. It implements fs.ReadDirFile.
type winfsDir struct {
winfsFile
entries []fs.DirEntry
hw int
}
// ReadDir reads the contents of the directory and returns
// a slice of up to n fs.DirEntry values in directory order.
// Subsequent calls on the same file will yield further DirEntry values.
func (d *winfsDir) ReadDir(n int) ([]fs.DirEntry, error) {
if n == 0 {
return d.winfsFile.fsys.ReadDir(d.path)
}
if d.entries == nil {
entries, err := d.winfsFile.fsys.ReadDir(d.path)
if err != nil {
return nil, err
}
d.entries = entries
d.hw = 0
}
if d.hw >= len(d.entries) {
return nil, io.EOF
}
var min int
if n > len(d.entries)-d.hw {
min = len(d.entries) - d.hw
} else {
min = n
}
old := d.hw
d.hw += min
return d.entries[old:d.hw], nil
}
// CopyFromN copies n bytes from the reader to the opened file on the target.
// The alt io.Writer parameter can be set to a non nil value if a progress bar or such
// is desired.
func (f *winfsFile) CopyFromN(src io.Reader, num int64, alt io.Writer) (int64, error) {
_, err := f.fsys.rcp.command(fmt.Sprintf("w %d", num))
if err != nil {
return 0, &fs.PathError{Op: "copy-to", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("failed to copy: %w", err)}
}
var writer io.Writer
if alt != nil {
writer = io.MultiWriter(f.fsys.rcp.stdin, alt)
} else {
writer = f.fsys.rcp.stdin
}
copied, err := io.CopyN(writer, src, num)
if err != nil {
return copied, &fs.PathError{Op: "copy-to", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("error while copying: %w", err)}
}
return copied, nil
}
// Copy copies the complete remote file from the current file position to the supplied io.Writer.
func (f *winfsFile) Copy(dst io.Writer) (int, error) {
resp, err := f.fsys.rcp.command("r -1")
if errors.Is(err, io.EOF) {
return 0, io.EOF
}
if err != nil {
return 0, &fs.PathError{Op: "read", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("failed to copy: %w", err)}
}
if resp.Read == nil {
return 0, &fs.PathError{Op: "read", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("invalid response: %v", resp)}
}
if resp.Read.Bytes == 0 {
return 0, io.EOF
}
var totalRead int64
for totalRead < resp.Read.Bytes {
f.fsys.mu.Lock()
read, err := f.fsys.rcp.stdout.Read(f.fsys.buf)
totalRead += int64(read)
if err != nil {
f.fsys.mu.Unlock()
return int(totalRead), &fs.PathError{Op: "read", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("failed to read: %w", err)}
}
_, err = dst.Write(f.fsys.buf[:read])
f.fsys.mu.Unlock()
if err != nil {
return int(totalRead), &fs.PathError{Op: "write", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("failed to write: %w", err)}
}
}
return int(totalRead), nil
}
// Write writes len(p) bytes from p to the remote file.
func (f *winfsFile) Write(p []byte) (int, error) {
_, err := f.fsys.rcp.command(fmt.Sprintf("w %d", len(p)))
if errors.Is(err, io.EOF) {
return 0, io.EOF
}
if err != nil {
return 0, &fs.PathError{Op: "write", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("failed to initiate write: %w", err)}
}
written, err := f.fsys.rcp.stdin.Write(p)
if err != nil {
return written, &fs.PathError{Op: "write", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("write error: %w", err)}
}
return written, nil
}
// Read reads up to len(p) bytes from the remote file.
func (f *winfsFile) Read(p []byte) (int, error) {
resp, err := f.fsys.rcp.command(fmt.Sprintf("r %d", len(p)))
if errors.Is(err, io.EOF) {
return 0, io.EOF
}
if err != nil {
return 0, &fs.PathError{Op: "read", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("failed to read: %w", err)}
}
if resp.Read == nil {
return 0, &fs.PathError{Op: "read", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("invalid response: %v", resp)}
}
if resp.Read.Bytes == 0 {
return 0, io.EOF
}
var totalRead int64
for totalRead < resp.Read.Bytes {
read, err := f.fsys.rcp.stdout.Read(p[totalRead:resp.Read.Bytes])
totalRead += int64(read)
if err != nil {
return int(totalRead), &fs.PathError{Op: "read", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("failed to read: %w", err)}
}
}
return int(totalRead), nil
}
// Stat returns the FileInfo for the remote file.
func (f *winfsFile) Stat() (fs.FileInfo, error) {
return f.fsys.Stat(f.path)
}
// Close closes the remote file.
func (f *winfsFile) Close() error {
_, err := f.fsys.rcp.command("c")
if err != nil {
return &fs.PathError{Op: "close", Path: f.path, Err: ErrRcpCommandFailed.Wrapf("failed to close: %w", err)}
}
return nil
}
// Open opens the named file for reading and returns fs.File.
// Use OpenFile to get a file that can be written to or if you need any of the methods not
// available on fs.File interface without type assertion.
func (fsys *windowsFsys) Open(name string) (fs.File, error) {
f, err := fsys.OpenFile(name, ModeRead, 0o644)
if err != nil {
return nil, err
}
return f, nil
}
// OpenFile opens the named remote file with the specified FileMode. perm is ignored on Windows.
func (fsys *windowsFsys) OpenFile(name string, mode FileMode, perm int) (File, error) {
var modeStr string
switch mode {
case ModeRead:
modeStr = "ro"
case ModeWrite:
modeStr = "w"
case ModeReadWrite:
modeStr = "rw"
case ModeAppend:
modeStr = "a"
case ModeCreate:
modeStr = "c"
default:
return nil, &fs.PathError{Op: "open", Path: name, Err: ErrRcpCommandFailed.Wrapf("invalid mode: %d", mode)}
}
log.Debugf("opening remote file %s (mode %s)", name, modeStr, perm)
_, err := fsys.rcp.command(fmt.Sprintf("o %s %s", modeStr, filepath.FromSlash(name)))
if err != nil {
return nil, &fs.PathError{Op: "open", Path: name, Err: fs.ErrNotExist}
}
return &winfsFile{fsys: fsys, path: name}, nil
}
// Stat returns fs.FileInfo for the remote file.
func (fsys *windowsFsys) Stat(name string) (fs.FileInfo, error) {
resp, err := fsys.rcp.command(fmt.Sprintf("stat %s", filepath.FromSlash(name)))
if err != nil {
return nil, &fs.PathError{Op: "stat", Path: name, Err: ErrRcpCommandFailed.Wrapf("failed to stat: %w", err)}
}
if resp.Stat == nil {
return nil, &fs.PathError{Op: "stat", Path: name, Err: ErrRcpCommandFailed.Wrapf("invalid response: %v", resp)}
}
return resp.Stat, nil
}
// Sha256 returns the SHA256 hash of the remote file.
func (fsys *windowsFsys) Sha256(name string) (string, error) {
resp, err := fsys.rcp.command(fmt.Sprintf("sum %s", filepath.FromSlash(name)))
if err != nil {
return "", &fs.PathError{Op: "sum", Path: name, Err: ErrRcpCommandFailed.Wrapf("failed to sum: %w", err)}
}
if resp.Sum == nil {
return "", &fs.PathError{Op: "sum", Path: name, Err: ErrRcpCommandFailed.Wrapf("invalid response: %v", resp)}
}
return resp.Sum.Sha256, nil
}
// ReadDir reads the directory named by dirname and returns a list of directory entries.
func (fsys *windowsFsys) ReadDir(name string) ([]fs.DirEntry, error) {
name = strings.ReplaceAll(name, "/", "\\")
resp, err := fsys.rcp.command(fmt.Sprintf("dir %s", filepath.FromSlash(name)))
if err != nil {
return nil, &fs.PathError{Op: "readdir", Path: name, Err: ErrRcpCommandFailed.Wrapf("failed to readdir: %v: %w", err, fs.ErrNotExist)}
}
if resp.Dir == nil {
return nil, nil
}
entries := make([]fs.DirEntry, len(resp.Dir))
for i, entry := range resp.Dir {
entries[i] = entry
}
return entries, nil
}
// Delete removes the named file or (empty) directory.
func (fsys *windowsFsys) Delete(name string) error {
if err := fsys.conn.Exec(fmt.Sprintf("del %s", ps.DoubleQuote(filepath.FromSlash(name)))); err != nil {
return ErrCommandFailed.Wrapf("delete %s: %w", name, err)
}
return nil
}