Skip to content

Commit b61b9d9

Browse files
committed
增加 Message.end 用于中止 chain 事务
1 parent 232657c commit b61b9d9

12 files changed

+129
-12
lines changed

fibjs/include/HttpRequest.h

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class HttpRequest: public HttpRequest_base
4040
virtual result_t readAll(obj_ptr<Buffer_base> &retVal, AsyncEvent *ac);
4141
virtual result_t write(Buffer_base *data, AsyncEvent *ac);
4242
virtual result_t get_length(int64_t &retVal);
43+
virtual result_t end();
44+
virtual result_t isEnded(bool& retVal);
4345
virtual result_t clear();
4446
virtual result_t sendTo(Stream_base *stm, AsyncEvent *ac);
4547
virtual result_t readFrom(Stream_base *stm, AsyncEvent *ac);

fibjs/include/HttpResponse.h

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class HttpResponse: public HttpResponse_base
3939
virtual result_t readAll(obj_ptr<Buffer_base> &retVal, AsyncEvent *ac);
4040
virtual result_t write(Buffer_base *data, AsyncEvent *ac);
4141
virtual result_t get_length(int64_t &retVal);
42+
virtual result_t end();
43+
virtual result_t isEnded(bool& retVal);
4244
virtual result_t clear();
4345
virtual result_t sendTo(Stream_base *stm, AsyncEvent *ac);
4446
virtual result_t readFrom(Stream_base *stm, AsyncEvent *ac);

fibjs/include/Message.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ class Message: public Message_base
3131
virtual result_t readAll(obj_ptr<Buffer_base> &retVal, AsyncEvent *ac);
3232
virtual result_t write(Buffer_base *data, AsyncEvent *ac);
3333
virtual result_t get_length(int64_t &retVal);
34+
virtual result_t end();
35+
virtual result_t isEnded(bool& retVal);
3436
virtual result_t clear();
3537
virtual result_t sendTo(Stream_base *stm, AsyncEvent *ac);
3638
virtual result_t readFrom(Stream_base *stm, AsyncEvent *ac);
@@ -40,7 +42,7 @@ class Message: public Message_base
4042
virtual result_t set_lastError(exlib::string newVal);
4143

4244
public:
43-
Message(bool bRep = false) : m_bRep(bRep)
45+
Message(bool bRep = false) : m_bRep(bRep), m_end(false)
4446
{
4547
}
4648

@@ -60,6 +62,7 @@ class Message: public Message_base
6062
obj_ptr<SeekableStream_base> m_body;
6163
exlib::string m_lastError;
6264
bool m_bRep;
65+
bool m_end;
6366
};
6467

6568
} /* namespace fibjs */

fibjs/include/WebSocketMessage.h

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class WebSocketMessage: public WebSocketMessage_base
3737
virtual result_t readAll(obj_ptr<Buffer_base> &retVal, AsyncEvent *ac);
3838
virtual result_t write(Buffer_base *data, AsyncEvent *ac);
3939
virtual result_t get_length(int64_t &retVal);
40+
virtual result_t end();
41+
virtual result_t isEnded(bool& retVal);
4042
virtual result_t clear();
4143
virtual result_t sendTo(Stream_base *stm, AsyncEvent *ac);
4244
virtual result_t readFrom(Stream_base *stm, AsyncEvent *ac);

fibjs/include/ifs/Message.h

+32
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class Message_base : public object_base
4141
virtual result_t readAll(obj_ptr<Buffer_base>& retVal, AsyncEvent* ac) = 0;
4242
virtual result_t write(Buffer_base* data, AsyncEvent* ac) = 0;
4343
virtual result_t get_length(int64_t& retVal) = 0;
44+
virtual result_t end() = 0;
45+
virtual result_t isEnded(bool& retVal) = 0;
4446
virtual result_t clear() = 0;
4547
virtual result_t sendTo(Stream_base* stm, AsyncEvent* ac) = 0;
4648
virtual result_t readFrom(Stream_base* stm, AsyncEvent* ac) = 0;
@@ -67,6 +69,8 @@ class Message_base : public object_base
6769
static void s_readAll(const v8::FunctionCallbackInfo<v8::Value>& args);
6870
static void s_write(const v8::FunctionCallbackInfo<v8::Value>& args);
6971
static void s_get_length(v8::Local<v8::String> property, const v8::PropertyCallbackInfo<v8::Value> &args);
72+
static void s_end(const v8::FunctionCallbackInfo<v8::Value>& args);
73+
static void s_isEnded(const v8::FunctionCallbackInfo<v8::Value>& args);
7074
static void s_clear(const v8::FunctionCallbackInfo<v8::Value>& args);
7175
static void s_sendTo(const v8::FunctionCallbackInfo<v8::Value>& args);
7276
static void s_readFrom(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -99,6 +103,8 @@ namespace fibjs
99103
{"read", s_read, false},
100104
{"readAll", s_readAll, false},
101105
{"write", s_write, false},
106+
{"end", s_end, false},
107+
{"isEnded", s_isEnded, false},
102108
{"clear", s_clear, false},
103109
{"sendTo", s_sendTo, false},
104110
{"readFrom", s_readFrom, false}
@@ -306,6 +312,32 @@ namespace fibjs
306312
METHOD_RETURN();
307313
}
308314

315+
inline void Message_base::s_end(const v8::FunctionCallbackInfo<v8::Value>& args)
316+
{
317+
METHOD_INSTANCE(Message_base);
318+
METHOD_ENTER();
319+
320+
METHOD_OVER(0, 0);
321+
322+
hr = pInst->end();
323+
324+
METHOD_VOID();
325+
}
326+
327+
inline void Message_base::s_isEnded(const v8::FunctionCallbackInfo<v8::Value>& args)
328+
{
329+
bool vr;
330+
331+
METHOD_INSTANCE(Message_base);
332+
METHOD_ENTER();
333+
334+
METHOD_OVER(0, 0);
335+
336+
hr = pInst->isEnded(vr);
337+
338+
METHOD_RETURN();
339+
}
340+
309341
inline void Message_base::s_clear(const v8::FunctionCallbackInfo<v8::Value>& args)
310342
{
311343
METHOD_INSTANCE(Message_base);

fibjs/include/ifs/Message.idl

+8
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ interface Message : object
4343
/*! @brief 消息数据部分的长度 */
4444
readonly Long length;
4545

46+
/*! @brief 设置当前消息处理结束,Chain 处理器不再继续后面的事务 */
47+
end();
48+
49+
/*! @brief 查询当前消息是否结束
50+
@return 结束则返回 true
51+
*/
52+
Boolean isEnded();
53+
4654
/*! @brief 清除消息的内容 */
4755
clear();
4856

fibjs/src/http/HttpRequest.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,16 @@ result_t HttpRequest::set_lastError(exlib::string newVal)
185185
return m_message->set_lastError(newVal);
186186
}
187187

188+
result_t HttpRequest::end()
189+
{
190+
return m_message->end();
191+
}
192+
193+
result_t HttpRequest::isEnded(bool& retVal)
194+
{
195+
return m_message->isEnded(retVal);
196+
}
197+
188198
result_t HttpRequest::clear()
189199
{
190200
m_message = new HttpMessage();

fibjs/src/http/HttpResponse.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,16 @@ result_t HttpResponse::set_lastError(exlib::string newVal)
186186
return m_message->set_lastError(newVal);
187187
}
188188

189+
result_t HttpResponse::end()
190+
{
191+
return m_message->end();
192+
}
193+
194+
result_t HttpResponse::isEnded(bool& retVal)
195+
{
196+
return m_message->isEnded(retVal);
197+
}
198+
189199
result_t HttpResponse::clear()
190200
{
191201
m_message = new HttpMessage(true);

fibjs/src/mq/Chain.cpp

+7-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ result_t Chain::invoke(object_base *v, obj_ptr<Handler_base> &retVal,
4040
{
4141
int32_t i;
4242

43+
m_msg = Message_base::getInstance(m_v);
44+
4345
m_array.resize(pThis->m_array.size());
4446
for (i = 0; i < (int32_t) pThis->m_array.size(); i ++)
4547
m_array[i] = pThis->m_array[i];
@@ -51,8 +53,11 @@ result_t Chain::invoke(object_base *v, obj_ptr<Handler_base> &retVal,
5153
static int32_t invoke(AsyncState *pState, int32_t n)
5254
{
5355
asyncInvoke *pThis = (asyncInvoke *) pState;
56+
bool end = false;
5457

55-
if (pThis->m_pos == (int32_t) pThis->m_array.size())
58+
if (pThis->m_msg)
59+
pThis->m_msg->isEnded(end);
60+
if (end || (pThis->m_pos == (int32_t) pThis->m_array.size()))
5661
return pThis->done(CALL_RETURN_NULL);
5762

5863
pThis->m_pos++;
@@ -63,6 +68,7 @@ result_t Chain::invoke(object_base *v, obj_ptr<Handler_base> &retVal,
6368
private:
6469
std::vector<obj_ptr<Handler_base> > m_array;
6570
obj_ptr<object_base> m_v;
71+
obj_ptr<Message_base> m_msg;
6672
int32_t m_pos;
6773
};
6874

fibjs/src/mq/Message.cpp

+16
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,24 @@ result_t Message_base::_new(obj_ptr<Message_base> &retVal, v8::Local<v8::Object>
111111
return 0;
112112
}
113113

114+
result_t Message::end()
115+
{
116+
m_end = true;
117+
return 0;
118+
}
119+
120+
result_t Message::isEnded(bool& retVal)
121+
{
122+
retVal = m_end;
123+
if (!m_end && m_response)
124+
m_response->isEnded(retVal);
125+
126+
return 0;
127+
}
128+
114129
result_t Message::clear()
115130
{
131+
m_end = false;
116132
m_params.Release();
117133
m_result.clear();
118134
m_value.clear();

fibjs/src/websocket/WebSocketMessage.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,16 @@ result_t WebSocketMessage::set_lastError(exlib::string newVal)
9393
return m_message->set_lastError(newVal);
9494
}
9595

96+
result_t WebSocketMessage::end()
97+
{
98+
return m_message->end();
99+
}
100+
101+
result_t WebSocketMessage::isEnded(bool& retVal)
102+
{
103+
return m_message->isEnded(retVal);
104+
}
105+
96106
result_t WebSocketMessage::clear()
97107
{
98108
m_message = new Message(m_bRep);

test/mq_test.js

+26-10
Original file line numberDiff line numberDiff line change
@@ -135,16 +135,15 @@ describe("mq", () => {
135135
})
136136

137137
describe("chain handler", () => {
138-
it("chain invoke",
139-
() => {
140-
var chain = new mq.Chain([hdlr1, hdlr2,
141-
mq.jsHandler(hdlr3)
142-
]);
138+
it("chain invoke", () => {
139+
var chain = new mq.Chain([hdlr1, hdlr2,
140+
mq.jsHandler(hdlr3)
141+
]);
143142

144-
n = 0;
145-
chain.invoke(v);
146-
assert.equal(7, n);
147-
});
143+
n = 0;
144+
chain.invoke(v);
145+
assert.equal(7, n);
146+
});
148147

149148
it("params", () => {
150149
function chain_params(v, p1, p2) {
@@ -169,7 +168,6 @@ describe("mq", () => {
169168

170169
it("Message", () => {
171170
var handler = new mq.Chain([
172-
173171
(v) => {
174172
return {};
175173
},
@@ -198,6 +196,24 @@ describe("mq", () => {
198196
mq.invoke(handler, req);
199197
});
200198

199+
it("end chain", () => {
200+
var handler = new mq.Chain([
201+
(v) => {
202+
return 1;
203+
},
204+
(v) => {
205+
v.end();
206+
},
207+
(v) => {
208+
return 3;
209+
}
210+
]);
211+
212+
var req = new mq.Message();
213+
mq.invoke(handler, req);
214+
assert.equal(1, req.result);
215+
});
216+
201217
it("memory leak", () => {
202218
var svr = new net.TcpServer(8888, () => {});
203219
ss.push(svr.socket);

0 commit comments

Comments
 (0)