-
Notifications
You must be signed in to change notification settings - Fork 30.2k
/
Copy pathnode_messaging.h
181 lines (144 loc) Β· 6.69 KB
/
node_messaging.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#ifndef SRC_NODE_MESSAGING_H_
#define SRC_NODE_MESSAGING_H_
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include "env.h"
#include "node_mutex.h"
#include "sharedarraybuffer_metadata.h"
#include <list>
namespace node {
namespace worker {
class MessagePortData;
class MessagePort;
// Represents a single communication message.
class Message {
public:
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
Message(Message&& other) = default;
Message& operator=(Message&& other) = default;
Message& operator=(const Message&) = delete;
Message(const Message&) = delete;
// Deserialize the contained JS value. May only be called once, and only
// after Serialize() has been called (e.g. by another thread).
v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
v8::Local<v8::Context> context);
// Serialize a JS value, and optionally transfer objects, into this message.
// The Message object retains ownership of all transferred objects until
// deserialization.
v8::Maybe<bool> Serialize(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value> input,
v8::Local<v8::Value> transfer_list);
// Internal method of Message that is called when a new SharedArrayBuffer
// object is encountered in the incoming value's structure.
void AddSharedArrayBuffer(SharedArrayBufferMetadataReference ref);
// Internal method of Message that is called once serialization finishes
// and that transfers ownership of `data` to this message.
void AddMessagePort(std::unique_ptr<MessagePortData>&& data);
private:
MallocedBuffer<char> main_message_buf_;
std::vector<MallocedBuffer<char>> array_buffer_contents_;
std::vector<SharedArrayBufferMetadataReference> shared_array_buffers_;
std::vector<std::unique_ptr<MessagePortData>> message_ports_;
friend class MessagePort;
};
// This contains all data for a `MessagePort` instance that is not tied to
// a specific Environment/Isolate/event loop, for easier transfer between those.
class MessagePortData {
public:
explicit MessagePortData(MessagePort* owner);
~MessagePortData();
MessagePortData(MessagePortData&& other) = delete;
MessagePortData& operator=(MessagePortData&& other) = delete;
MessagePortData(const MessagePortData& other) = delete;
MessagePortData& operator=(const MessagePortData& other) = delete;
// Add a message to the incoming queue and notify the receiver.
// This may be called from any thread.
void AddToIncomingQueue(Message&& message);
// Returns true if and only this MessagePort is currently not entangled
// with another message port.
bool IsSiblingClosed() const;
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePortData* a, MessagePortData* b);
// Removes any possible sibling. This is thread-safe (it acquires both
// `sibling_mutex_` and `mutex_`), and has to be because it is called once
// the corresponding JS handle handle wants to close
// which can happen on either side of a worker.
void Disentangle();
private:
// After disentangling this message port, the owner handle (if any)
// is asynchronously triggered, so that it can close down naturally.
void PingOwnerAfterDisentanglement();
// This mutex protects all fields below it, with the exception of
// sibling_.
mutable Mutex mutex_;
bool receiving_messages_ = false;
std::list<Message> incoming_messages_;
MessagePort* owner_ = nullptr;
// This mutex protects the sibling_ field and is shared between two entangled
// MessagePorts. If both mutexes are acquired, this one needs to be
// acquired first.
std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>();
MessagePortData* sibling_ = nullptr;
friend class MessagePort;
};
// A message port that receives messages from other threads, including
// the uv_async_t handle that is used to notify the current event loop of
// new incoming messages.
class MessagePort : public HandleWrap {
public:
// Create a new MessagePort. The `context` argument specifies the Context
// instance that is used for creating the values emitted from this port.
MessagePort(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Object> wrap);
~MessagePort();
// Create a new message port instance, optionally over an existing
// `MessagePortData` object.
static MessagePort* New(Environment* env,
v8::Local<v8::Context> context,
std::unique_ptr<MessagePortData> data = nullptr);
// Send a message, i.e. deliver it into the sibling's incoming queue.
// If there is no sibling, i.e. this port is closed,
// this message is silently discarded.
void Send(Message&& message);
void Send(const v8::FunctionCallbackInfo<v8::Value>& args);
// Deliver a single message into this port's incoming queue.
void AddToIncomingQueue(Message&& message);
// Start processing messages on this port as a receiving end.
void Start();
// Stop processing messages on this port as a receiving end.
void Stop();
// Stop processing messages on this port as a receiving end,
// and stop the event loop that this port is associated with.
void StopEventLoop();
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePort* a, MessagePort* b);
static void Entangle(MessagePort* a, MessagePortData* b);
// Detach this port's data for transferring. After this, the MessagePortData
// is no longer associated with this handle, although it can still receive
// messages.
std::unique_ptr<MessagePortData> Detach();
bool IsSiblingClosed() const;
size_t self_size() const override;
private:
void OnClose() override;
void OnMessage();
void TriggerAsync();
inline uv_async_t* async();
std::unique_ptr<MessagePortData> data_ = nullptr;
bool stop_event_loop_ = false;
friend class MessagePortData;
};
v8::MaybeLocal<v8::Function> GetMessagePortConstructor(
Environment* env, v8::Local<v8::Context> context);
} // namespace worker
} // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#endif // SRC_NODE_MESSAGING_H_