22
22
23
23
#include " include/proxy-wasm/context.h"
24
24
#include " include/proxy-wasm/wasm.h"
25
+ #include " src/shared_data.h"
26
+ #include " src/shared_queue.h"
25
27
26
28
#define CHECK_FAIL (_call, _stream_type, _return_open, _return_closed ) \
27
29
if (isFailed()) { \
60
62
61
63
namespace proxy_wasm {
62
64
63
- namespace {
64
-
65
- using CallOnThreadFunction = std::function<void (std::function<void ()>)>;
66
-
67
- class SharedData {
68
- public:
69
- WasmResult get (std::string_view vm_id, const std::string_view key,
70
- std::pair<std::string, uint32_t > *result) {
71
- std::lock_guard<std::mutex> lock (mutex_);
72
- auto map = data_.find (std::string (vm_id));
73
- if (map == data_.end ()) {
74
- return WasmResult::NotFound;
75
- }
76
- auto it = map->second .find (std::string (key));
77
- if (it != map->second .end ()) {
78
- *result = it->second ;
79
- return WasmResult::Ok;
80
- }
81
- return WasmResult::NotFound;
82
- }
83
-
84
- WasmResult set (std::string_view vm_id, std::string_view key, std::string_view value,
85
- uint32_t cas) {
86
- std::lock_guard<std::mutex> lock (mutex_);
87
- std::unordered_map<std::string, std::pair<std::string, uint32_t >> *map;
88
- auto map_it = data_.find (std::string (vm_id));
89
- if (map_it == data_.end ()) {
90
- map = &data_[std::string (vm_id)];
91
- } else {
92
- map = &map_it->second ;
93
- }
94
- auto it = map->find (std::string (key));
95
- if (it != map->end ()) {
96
- if (cas && cas != it->second .second ) {
97
- return WasmResult::CasMismatch;
98
- }
99
- it->second = std::make_pair (std::string (value), nextCas ());
100
- } else {
101
- map->emplace (key, std::make_pair (std::string (value), nextCas ()));
102
- }
103
- return WasmResult::Ok;
104
- }
105
-
106
- uint32_t registerQueue (std::string_view vm_id, std::string_view queue_name, uint32_t context_id,
107
- CallOnThreadFunction call_on_thread, std::string_view vm_key) {
108
- std::lock_guard<std::mutex> lock (mutex_);
109
- auto key = std::make_pair (std::string (vm_id), std::string (queue_name));
110
- auto it = queue_tokens_.insert (std::make_pair (key, static_cast <uint32_t >(0 )));
111
- if (it.second ) {
112
- it.first ->second = nextQueueToken ();
113
- queue_token_set_.insert (it.first ->second );
114
- }
115
- uint32_t token = it.first ->second ;
116
- auto &q = queues_[token];
117
- q.vm_key = std::string (vm_key);
118
- q.context_id = context_id;
119
- q.call_on_thread = std::move (call_on_thread);
120
- // Preserve any existing data.
121
- return token;
122
- }
123
-
124
- uint32_t resolveQueue (std::string_view vm_id, std::string_view queue_name) {
125
- std::lock_guard<std::mutex> lock (mutex_);
126
- auto key = std::make_pair (std::string (vm_id), std::string (queue_name));
127
- auto it = queue_tokens_.find (key);
128
- if (it != queue_tokens_.end ()) {
129
- return it->second ;
130
- }
131
- return 0 ; // N.B. zero indicates that the queue was not found.
132
- }
133
-
134
- WasmResult dequeue (uint32_t token, std::string *data) {
135
- std::lock_guard<std::mutex> lock (mutex_);
136
- auto it = queues_.find (token);
137
- if (it == queues_.end ()) {
138
- return WasmResult::NotFound;
139
- }
140
- if (it->second .queue .empty ()) {
141
- return WasmResult::Empty;
142
- }
143
- *data = it->second .queue .front ();
144
- it->second .queue .pop_front ();
145
- return WasmResult::Ok;
146
- }
147
-
148
- WasmResult enqueue (uint32_t token, std::string_view value) {
149
- std::string vm_key;
150
- uint32_t context_id;
151
- CallOnThreadFunction call_on_thread;
152
-
153
- {
154
- std::lock_guard<std::mutex> lock (mutex_);
155
- auto it = queues_.find (token);
156
- if (it == queues_.end ()) {
157
- return WasmResult::NotFound;
158
- }
159
- Queue *target_queue = &(it->second );
160
- vm_key = target_queue->vm_key ;
161
- context_id = target_queue->context_id ;
162
- call_on_thread = target_queue->call_on_thread ;
163
- target_queue->queue .push_back (std::string (value));
164
- }
165
-
166
- call_on_thread ([vm_key, context_id, token] {
167
- // This code may or may not execute in another thread.
168
- // Make sure that the lock is no longer held here.
169
- auto wasm = getThreadLocalWasm (vm_key);
170
- if (wasm) {
171
- auto context = wasm->wasm ()->getContext (context_id);
172
- if (context) {
173
- context->onQueueReady (token);
174
- }
175
- }
176
- });
177
- return WasmResult::Ok;
178
- }
179
-
180
- uint32_t nextCas () {
181
- auto result = cas_;
182
- cas_++;
183
- if (!cas_) { // 0 is not a valid CAS value.
184
- cas_++;
185
- }
186
- return result;
187
- }
188
-
189
- private:
190
- uint32_t nextQueueToken () {
191
- while (true ) {
192
- uint32_t token = next_queue_token_++;
193
- if (token == 0 ) {
194
- continue ; // 0 is an illegal token.
195
- }
196
- if (queue_token_set_.find (token) == queue_token_set_.end ()) {
197
- return token;
198
- }
199
- }
200
- }
201
-
202
- struct Queue {
203
- std::string vm_key;
204
- uint32_t context_id;
205
- CallOnThreadFunction call_on_thread;
206
- std::deque<std::string> queue;
207
- };
208
-
209
- // TODO: use std::shared_mutex in C++17.
210
- std::mutex mutex_;
211
- uint32_t cas_ = 1 ;
212
- uint32_t next_queue_token_ = 1 ;
213
- std::map<std::string, std::unordered_map<std::string, std::pair<std::string, uint32_t >>> data_;
214
- std::map<uint32_t , Queue> queues_;
215
- struct pair_hash {
216
- template <class T1 , class T2 > std::size_t operator ()(const std::pair<T1, T2> &pair) const {
217
- return std::hash<T1>()(pair.first ) ^ std::hash<T2>()(pair.second );
218
- }
219
- };
220
- std::unordered_map<std::pair<std::string, std::string>, uint32_t , pair_hash> queue_tokens_;
221
- std::unordered_set<uint32_t > queue_token_set_;
222
- };
223
-
224
- SharedData global_shared_data;
225
-
226
- } // namespace
227
-
228
65
DeferAfterCallActions::~DeferAfterCallActions () {
229
66
wasm_->stopNextIteration (false );
230
67
wasm_->doAfterVmCallActions ();
@@ -247,9 +84,8 @@ WasmResult BufferBase::copyTo(WasmBase *wasm, size_t start, size_t length, uint6
247
84
}
248
85
249
86
// Test support.
250
-
251
87
uint32_t resolveQueueForTest (std::string_view vm_id, std::string_view queue_name) {
252
- return global_shared_data .resolveQueue (vm_id, queue_name);
88
+ return global_shared_queue .resolveQueue (vm_id, queue_name);
253
89
}
254
90
255
91
std::string PluginBase::makeLogPrefix () const {
@@ -380,15 +216,15 @@ WasmResult ContextBase::registerSharedQueue(std::string_view queue_name,
380
216
SharedQueueDequeueToken *result) {
381
217
// Get the id of the root context if this is a stream context because onQueueReady is on the
382
218
// root.
383
- *result = global_shared_data .registerQueue (wasm_->vm_id (), queue_name,
384
- isRootContext () ? id_ : parent_context_id_,
385
- wasm_->callOnThreadFunction (), wasm_->vm_key ());
219
+ *result = global_shared_queue .registerQueue (wasm_->vm_id (), queue_name,
220
+ isRootContext () ? id_ : parent_context_id_,
221
+ wasm_->callOnThreadFunction (), wasm_->vm_key ());
386
222
return WasmResult::Ok;
387
223
}
388
224
389
225
WasmResult ContextBase::lookupSharedQueue (std::string_view vm_id, std::string_view queue_name,
390
226
uint32_t *token_ptr) {
391
- uint32_t token = global_shared_data .resolveQueue (vm_id, queue_name);
227
+ uint32_t token = global_shared_queue .resolveQueue (vm_id, queue_name);
392
228
if (isFailed () || !token) {
393
229
return WasmResult::NotFound;
394
230
}
@@ -397,11 +233,11 @@ WasmResult ContextBase::lookupSharedQueue(std::string_view vm_id, std::string_vi
397
233
}
398
234
399
235
WasmResult ContextBase::dequeueSharedQueue (uint32_t token, std::string *data) {
400
- return global_shared_data .dequeue (token, data);
236
+ return global_shared_queue .dequeue (token, data);
401
237
}
402
238
403
239
WasmResult ContextBase::enqueueSharedQueue (uint32_t token, std::string_view value) {
404
- return global_shared_data .enqueue (token, value);
240
+ return global_shared_queue .enqueue (token, value);
405
241
}
406
242
void ContextBase::destroy () {
407
243
if (destroyed_) {
0 commit comments