-
Notifications
You must be signed in to change notification settings - Fork 0
/
lib.go
150 lines (140 loc) · 4.32 KB
/
lib.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package wuploader
import (
"context"
"errors"
"sync"
"github.com/gammazero/nexus/client"
"github.com/gammazero/nexus/transport/serialize"
"github.com/gammazero/nexus/wamp"
)
func returnError(err wamp.URI, args ...interface{}) *client.InvokeResult {
return &client.InvokeResult{
Err: err,
Args: args,
}
}
func returnResult(args ...interface{}) *client.InvokeResult {
return &client.InvokeResult{
Args: args,
}
}
// UploadChecker is a callback type which is executed to verify the beginning of a transaction
type UploadChecker func(uploadSize int64, args wamp.List, kwargs, details wamp.Dict) error
// UploadHandler is a callback type which is executed when the file upload has finished and the transaction should be executed.
type UploadHandler func(ctx context.Context, uploaded serialize.BinaryData, args wamp.List, kwargs, details wamp.Dict) *client.InvokeResult
// Uploader is the main interface, allowing the user to create and delete data upload endpoints
type Uploader interface {
Add(endpoint string, begin UploadChecker, handler UploadHandler) error
Destroy(endpoint string) error
Stop()
}
type uploadTxn struct {
buf serialize.BinaryData
pos uint64
size uint64
}
type uploaderImpl struct {
client *client.Client
txns map[wamp.ID]*uploadTxn
lock sync.RWMutex
}
func (u *uploaderImpl) Add(endpoint string, begin UploadChecker, handler UploadHandler) error {
return u.client.Register(endpoint, func(ctx context.Context, args wamp.List, kwargs, details wamp.Dict) *client.InvokeResult {
if len(args) == 0 {
return returnError(wamp.ErrInvalidArgument)
}
act, ok := wamp.AsString(args[0])
if !ok {
return returnError(wamp.ErrInvalidArgument)
}
switch act {
case "start":
if len(args) < 2 {
return returnError(wamp.ErrInvalidArgument)
}
uploadSize, ok := wamp.AsInt64(args[1])
// Files of length 0 will be considered empty files, so allow them being uploaded.
// For files of zero length, we don't need to initialize any storage
// Since in go 'nil' is considered equal a zero-length array
if !ok || uploadSize < 0 {
return returnError(wamp.ErrInvalidArgument)
}
if begin != nil {
if err := begin(uploadSize, args[2:], kwargs, details); err != nil {
return returnError("com.robulab.internal-error", err.Error())
}
}
id := wamp.GlobalID()
u.lock.Lock()
defer u.lock.Unlock()
u.txns[id] = &uploadTxn{
buf: make([]byte, 0, uploadSize),
pos: 0,
size: uint64(uploadSize),
}
return returnResult(id)
case "data":
if len(args) < 3 {
return returnError(wamp.ErrInvalidArgument)
}
txn, tok := wamp.AsID(args[1])
data, bok := args[2].(serialize.BinaryData)
if !tok || !bok {
return returnError(wamp.ErrInvalidArgument)
}
u.lock.RLock()
txnObj, ok := u.txns[txn]
u.lock.RUnlock()
if !ok {
return returnError("com.robulab.no-such-upload", txn)
}
if txnObj.pos+uint64(len(data)) > txnObj.size {
return returnError("com.robulab.invalid-upload-size")
}
txnObj.buf = append(txnObj.buf, data...)
txnObj.pos += uint64(len(data))
return returnResult(txnObj.pos)
case "finish":
if len(args) < 2 {
return returnError(wamp.ErrInvalidArgument)
}
txn, tok := wamp.AsID(args[1])
if !tok {
return returnError(wamp.ErrInvalidArgument)
}
u.lock.Lock()
txnObj, ok := u.txns[txn]
delete(u.txns, txn)
u.lock.Unlock()
if !ok {
return returnError("com.robulab.no-such-upload", txn)
}
if txnObj.pos != txnObj.size || txnObj.pos != uint64(len(txnObj.buf)) {
return returnError("com.robulab.invalid-upload-size")
}
return handler(ctx, txnObj.buf, args[2:], kwargs, details)
}
return &client.InvokeResult{
Err: wamp.ErrNoSuchProcedure,
}
}, wamp.Dict{
wamp.OptDiscloseCaller: true,
wamp.OptInvoke: wamp.InvokeSingle, // Single invocation to ensure it will fail!
})
}
func (u *uploaderImpl) Destroy(endpoint string) error {
return u.client.Unregister(string(endpoint))
}
func (u *uploaderImpl) Stop() {}
// NewUploader creates a new uploader and starts monitoring tasks.
// Free the returned uploader after use using Uploader.Stop()
func NewUploader(c *client.Client) (Uploader, error) {
if c == nil {
return nil, errors.New("client may not be nil")
}
return &uploaderImpl{
client: c,
txns: map[wamp.ID]*uploadTxn{},
lock: sync.RWMutex{},
}, nil
}