[/
Copyright Oliver Kowalke 2009.
Distributed under the Boost Software License, Version 1.0.
(See accompanying file LICENSE_1_0.txt or copy at
http://www.boost.org/LICENSE_1_0.txt)
]
[section:intro Introduction]
[heading Definition]
In computer science a routine is defined as a sequence of operations.
Executing routines forms a parent-child relationship in which the child
always terminates before the parent.

Coroutines are a generalization of routines.
The principal difference between coroutines and routines is that a coroutine
can explicitly suspend and later resume its progress via additional operations
while preserving its local state, i.e. a coroutine is a kind of continuation.

A continuation is an object representing a suspended execution (registers,
stack). Each coroutine has its own stack and therefore its own local variables,
sub-routine calls etc.

In this sense coroutines are a language concept.
[heading How it works]
Functions foo() and bar() are supposed to alternate their execution (leave and
enter function body).
[$../../../../libs/coroutine/doc/images/foo_bar.png [align center]]
If coroutines were called like ordinary routines, the stack would grow with
every call and would never shrink. A jump into the middle of a coroutine
would not be possible, because the return address would be on top of the
stack entries.
The solution is that each coroutine has its own stack and control-block
(__fcontext__ from __boost_context__).
Before a coroutine gets suspended, the non-volatile registers (including stack
and instruction/program pointer) of the currently active coroutine are stored in
that coroutine's control-block.
The registers of the newly activated coroutine must be restored from its
associated control-block before it can continue with its work.
[$../../../../libs/coroutine/doc/images/foo_bar_seq.png [align center]]
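The switch described above can be sketched with the POSIX functions
`getcontext`, `makecontext` and `swapcontext`. This is an assumption made to
keep the sketch self-contained: `ucontext_t` plays the role of the
control-block here, whereas the library itself relies on __fcontext__. The
names `demo`, `trace`, `prepare` and `run_foo_bar` are hypothetical.

```cpp
#include <ucontext.h>   // POSIX: getcontext, makecontext, swapcontext

#include <cassert>
#include <string>
#include <vector>

namespace demo {

// control-blocks: each one stores the saved registers of one flow of control
ucontext_t main_ctx, foo_ctx, bar_ctx;

// records the order in which the function bodies are entered and left
std::vector< std::string > trace;

void foo()
{
    trace.push_back( "foo: 1");
    swapcontext( & foo_ctx, & bar_ctx);     // suspend foo(), enter bar()
    trace.push_back( "foo: 2");
    swapcontext( & foo_ctx, & bar_ctx);     // suspend foo(), resume bar()
    trace.push_back( "foo: 3");
    // returning from foo() activates uc_link, i.e. main_ctx
}

void bar()
{
    trace.push_back( "bar: 1");
    swapcontext( & bar_ctx, & foo_ctx);     // suspend bar(), resume foo()
    trace.push_back( "bar: 2");
    swapcontext( & bar_ctx, & foo_ctx);     // suspend bar(), resume foo()
    // not reached in this sketch: foo() finishes first
}

// prepares a control-block that starts fn on its own stack
void prepare( ucontext_t & ctx, std::vector< char > & stack, void ( * fn)() )
{
    int rc = getcontext( & ctx);
    assert( 0 == rc);
    (void) rc;
    ctx.uc_stack.ss_sp = & stack[0];
    ctx.uc_stack.ss_size = stack.size();
    ctx.uc_link = & main_ctx;               // continue here when fn returns
    makecontext( & ctx, fn, 0);
}

std::vector< std::string > run_foo_bar()
{
    trace.clear();
    std::vector< char > foo_stack( 64 * 1024), bar_stack( 64 * 1024);
    prepare( foo_ctx, foo_stack, & foo);
    prepare( bar_ctx, bar_stack, & bar);
    swapcontext( & main_ctx, & foo_ctx);    // save main, enter foo()
    return trace;
}

}
```

Calling `demo::run_foo_bar()` should yield the entries `foo: 1`, `bar: 1`,
`foo: 2`, `bar: 2`, `foo: 3`: every `swapcontext` continues the other function
exactly where it was suspended, and neither stack grows with the number of
switches.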
The context switch requires no system privileges and provides cooperative
multitasking at the language level. Coroutines provide quasi parallelism.
When a program is supposed to do several things at the same time, coroutines
help to do this much more simply and elegantly than with only a single flow of
control.
The advantages show particularly clearly with recursive functions, such as
the traversal of binary trees (see example 'same fringe').
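The recursive case can be illustrated with a small 'same fringe' sketch. It is
not the example shipped with the library: it assumes the POSIX `ucontext`
functions in place of a coroutine class, and `node`, `leaf_walker` and
`same_fringe` are hypothetical names. The point is that the traversal stays an
ordinary recursive function even though the leaves are handed out one at a time.

```cpp
#include <ucontext.h>   // POSIX: getcontext, makecontext, swapcontext

#include <cassert>
#include <vector>

namespace fringe {

// hypothetical tree type: a node without children is a leaf
struct node
{
    int         value;
    node const* left;
    node const* right;
};

// walks a tree recursively on its own stack and delivers one leaf per next()
class leaf_walker
{
public:
    explicit leaf_walker( node const* root) :
        root_( root), stack_( 64 * 1024), value_( 0), done_( false)
    {
        int rc = getcontext( & walker_);
        assert( 0 == rc);
        (void) rc;
        walker_.uc_stack.ss_sp = & stack_[0];
        walker_.uc_stack.ss_size = stack_.size();
        walker_.uc_link = & caller_;    // continue in next() when the walk ends
        makecontext( & walker_, & leaf_walker::entry, 0);
    }

    // resumes the walk; returns false once every leaf has been delivered
    bool next( int & value)
    {
        if ( done_) return false;
        starting_ = this;               // only read by entry() on the first resume
        swapcontext( & caller_, & walker_);
        if ( done_) return false;
        value = value_;
        return true;
    }

private:
    // a control-block must stay at its address, so copying is disabled
    leaf_walker( leaf_walker const&);
    leaf_walker & operator=( leaf_walker const&);

    static void entry()
    {
        leaf_walker * self = starting_;
        self->walk( self->root_);
        self->done_ = true;
    }

    void walk( node const* n)
    {
        if ( ! n) return;
        if ( ! n->left && ! n->right)
        {
            value_ = n->value;
            swapcontext( & walker_, & caller_);  // suspend inside the recursion
            return;
        }
        walk( n->left);
        walk( n->right);
    }

    static leaf_walker *    starting_;
    node const*             root_;
    std::vector< char >     stack_;
    ucontext_t              walker_;
    ucontext_t              caller_;
    int                     value_;
    bool                    done_;
};

leaf_walker * leaf_walker::starting_ = 0;

// two trees have the same fringe if their leaves, read left to right, are equal
bool same_fringe( node const* a, node const* b)
{
    leaf_walker wa( a), wb( b);
    int va = 0, vb = 0;
    for (;;)
    {
        bool has_a = wa.next( va);
        bool has_b = wb.next( vb);
        if ( has_a != has_b) return false;  // one fringe is longer
        if ( ! has_a) return true;          // both fringes are exhausted
        if ( va != vb) return false;
    }
}

}
```

With a single flow of control the same comparison needs either two complete
leaf lists or a hand-written explicit stack per tree. Here each walker keeps
its position in the recursion on its own stack, and the comparison can stop at
the first difference.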
[heading Example: asio::io_stream with std::stream]
This section demonstrates how stackful coroutines help to use standard C++
IO-streams together with an IO demultiplexer such as __io_service__ (using
non-blocking IO).

    int main( int argc, char * argv[])
    {
        ...
        {
            boost::asio::io_service io_service;
            io_service.post(
                boost::bind(
                    & server::start,
                    server::create(
                        io_service, port) ) );
            io_service.run();
        }
        ...
    }

__server__ accepts connection-requests made by clients, creates an instance of
type __session__ for each new connection and invokes __start__ on it.

    class server : public boost::enable_shared_from_this< server >
    {
    private:
        boost::asio::io_service         &   io_service_;
        boost::asio::ip::tcp::acceptor      acceptor_;

        void handle_accept_( session * new_session, boost::system::error_code const& error)
        {
            if ( ! error)
            {
                // start asynchronous read
                new_session->start();

                // start asynchronous accept
                start();
            }
        }

        // a port number needs the full range of unsigned short
        server( boost::asio::io_service & io_service, unsigned short port) :
            io_service_( io_service),
            acceptor_(
                io_service_,
                boost::asio::ip::tcp::endpoint( boost::asio::ip::tcp::v4(), port) )
        {}

    public:
        typedef boost::shared_ptr< server > ptr_t;

        static ptr_t create( boost::asio::io_service & io_service, unsigned short port)
        { return ptr_t( new server( io_service, port) ); }

        void start()
        {
            // create new session which gets started if asynchronous
            // accept completes
            session * new_session( new session( io_service_) );
            acceptor_.async_accept(
                new_session->socket(),
                boost::bind( & server::handle_accept_, this->shared_from_this(),
                    new_session, boost::asio::placeholders::error) );
        }
    };

Each __session__ communicates with the connected client and handles its
requests.
The application protocol in this example uses TCP sockets as the channel and
'newline' to separate the messages in the byte stream.
An application protocol is a set of rules for the order in which messages are
exchanged.
['std::istream] is used to extract the messages from the character stream.
The message 'exit' terminates the session.

    class session : private boost::noncopyable
    {
    private:
        void handle_read_( boost::system::error_code const& ec, std::size_t)
        {
            // terminate session if waiting for data failed
            if ( ec)
            {
                destroy_();
                return;
            }

            // create (blocking) stream-buffer reading from socket
            inbuf buf( socket_);
            std::istream s( & buf);

            // messages are separated by 'newline'
            std::string msg;
            std::getline( s, msg);
            std::cout << msg << std::endl;

            // terminate session for message 'exit'
            // else do asynchronous read
            if ( "exit" == msg)
                io_service_.post(
                    boost::bind(
                        & session::destroy_, this) );
            else
                start();
        }

        void destroy_()
        { delete this; }

        boost::asio::io_service         &   io_service_;
        boost::asio::ip::tcp::socket        socket_;

    public:
        // the socket is not connected yet, so no remote endpoint is printed
        session( boost::asio::io_service & io_service) :
            io_service_( io_service),
            socket_( io_service_)
        { std::cout << "session()" << std::endl; }

        ~session()
        { std::cout << "~session()" << std::endl; }

        boost::asio::ip::tcp::socket & socket()
        { return socket_; }

        void start()
        {
            // register on io_service for asynchronous read;
            // null_buffers() asks only for a readiness notification (an
            // assumption of this listing), the bytes are read later by
            // the stream-buffer
            socket_.async_read_some(
                boost::asio::null_buffers(),
                boost::bind(
                    & session::handle_read_, this, _1, _2) );
        }
    };

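The framing rule of this protocol (messages separated by 'newline', 'exit'
ends the session) can be tried without any networking. The following minimal
sketch reads from an arbitrary `std::istream`; `read_messages` and
`read_messages_example` are hypothetical names and a `std::istringstream`
stands in for the socket.

```cpp
#include <cassert>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// reads newline-separated messages until 'exit' arrives or the stream ends;
// returns the messages received before 'exit'
std::vector< std::string > read_messages( std::istream & s)
{
    std::vector< std::string > handled;
    std::string msg;
    while ( std::getline( s, msg) && "exit" != msg)
        handled.push_back( msg);
    return handled;
}

// usage: bytes following 'exit' are left untouched in the stream
void read_messages_example()
{
    std::istringstream in( "hello\nworld\nexit\nignored\n");
    std::vector< std::string > handled = read_messages( in);
    assert( 2 == handled.size() );
    assert( "hello" == handled[0]);
    assert( "world" == handled[1]);
}
```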
Function __getline__ returns only after a 'newline' has been read from the socket.
Therefore the application blocks until a 'newline' is received by the socket.
The stream-buffer used by the stream maintains an internal buffer which gets
(re-)filled by its function __underflow__. __underflow__ performs the
read-operation on the socket. The C++ IO-streams framework does not provide an
easy way to create a continuation which represents reading bytes from the socket.

Coroutines help in this case to keep the application non-blocking even if no
'newline' has been received yet.
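How a stream-buffer refills its internal buffer through `underflow()` can be
seen in isolation with the following sketch. It assumes a hypothetical class
`chunk_buf` which takes its bytes from a list of strings; each string stands
in for what one read-operation on a socket might deliver.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <istream>
#include <streambuf>
#include <string>
#include <vector>

// hypothetical stream-buffer: every refill takes bytes from the next chunk
class chunk_buf : public std::streambuf
{
public:
    explicit chunk_buf( std::vector< std::string > const& chunks) :
        chunks_( chunks), next_( 0), refills_( 0)
    { setg( buffer_, buffer_, buffer_); }   // empty get area: first read refills

    // number of times the internal buffer was (re-)filled
    std::size_t refills() const
    { return refills_; }

protected:
    virtual int_type underflow()
    {
        if ( gptr() < egptr() )
            return traits_type::to_int_type( * gptr() );

        // skip chunks that were consumed completely
        while ( next_ < chunks_.size() && chunks_[next_].empty() )
            ++next_;
        if ( next_ == chunks_.size() )
            return traits_type::eof();      // nothing left to "receive"

        // copy at most one buffer full of bytes, like a single socket read
        std::string & chunk = chunks_[next_];
        std::size_t n = std::min( chunk.size(), sizeof( buffer_) );
        std::memcpy( buffer_, chunk.data(), n);
        chunk.erase( 0, n);
        ++refills_;
        setg( buffer_, buffer_, buffer_ + n);
        return traits_type::to_int_type( * gptr() );
    }

private:
    std::vector< std::string >  chunks_;
    std::size_t                 next_;
    std::size_t                 refills_;
    char                        buffer_[16];
};

// usage: a single message may span several refills
void chunk_buf_example()
{
    std::vector< std::string > chunks;
    chunks.push_back( "hel");
    chunks.push_back( "lo\nwor");
    chunks.push_back( "ld\n");
    chunk_buf buf( chunks);
    std::istream s( & buf);
    std::string msg;
    std::getline( s, msg);
    assert( "hello" == msg);
    std::getline( s, msg);
    assert( "world" == msg);
    assert( 3 == buf.refills() );
}
```

`std::getline` keeps calling `underflow()` until it has seen a 'newline'. If
`underflow()` had to wait for a socket at that point, the whole thread would
wait with it.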
Class ['session] creates a coroutine which uses __handle_read__ as
__coro_fn__. On a newly created ['session], ['start()] is called, which starts
the coroutine. In the __coro_fn__ __handle_read__ the messages are received
via __getline__ in a loop until 'exit' is delivered.

    class session : private boost::noncopyable
    {
    private:
        void handle_read_( coro_t::caller_type & ca)
        {
            // create stream-buffer with coroutine
            inbuf buf( socket_, coro_, ca);
            std::istream s( & buf);

            std::string msg;
            do
            {
                // read message
                // the io_service is not blocked if no newline was received yet
                std::getline( s, msg);
                std::cout << msg << std::endl;
            // stop on 'exit' or if the stream failed (e.g. connection closed)
            } while ( s && msg != "exit");

            io_service_.post(
                boost::bind(
                    & session::destroy_, this) );
        }

        void destroy_()
        { delete this; }

        coro_t                              coro_;
        boost::asio::io_service         &   io_service_;
        boost::asio::ip::tcp::socket        socket_;

    public:
        // the socket is not connected yet, so no remote endpoint is printed
        session( boost::asio::io_service & io_service) :
            coro_(),
            io_service_( io_service),
            socket_( io_service_)
        { std::cout << "session()" << std::endl; }

        ~session()
        { std::cout << "~session()" << std::endl; }

        boost::asio::ip::tcp::socket & socket()
        { return socket_; }

        void start()
        {
            // create and start a coroutine
            // handle_read_() is used as coroutine-function
            coro_ = coro_t( boost::bind( & session::handle_read_, this, _1) );
        }
    };

The stream-buffer is created with __coro_caller__ and handles suspend/resume of this
code path depending on whether bytes can be read from the socket.

    class inbuf : public std::streambuf,
                  private boost::noncopyable
    {
    private:
        static const std::streamsize    pb_size;

        enum
        { bf_size = 16 };

        int fetch_()
        {
            // keep up to pb_size already consumed characters as putback area
            std::streamsize num = std::min(
                static_cast< std::streamsize >( gptr() - eback() ), pb_size);

            std::memmove(
                buffer_ + ( pb_size - num),
                gptr() - num, num);

            // read bytes from the socket into internal buffer 'buffer_'
            // use coro_t::operator() as callback, invoked once some
            // bytes have been read into 'buffer_'
            s_.async_read_some(
                boost::asio::buffer( buffer_ + pb_size, bf_size - pb_size),
                boost::bind( & coro_t::operator(), & coro_, _1, _2) );
            // suspend this coroutine
            ca_();

            // coroutine was resumed by boost::asio::io_service
            boost::system::error_code ec;
            std::size_t n = 0;

            // fetch the arguments passed to the callback
            boost::tie( ec, n) = ca_.get();

            // check if an error occurred
            if ( ec)
            {
                setg( 0, 0, 0);
                return -1;
            }

            setg( buffer_ + pb_size - num, buffer_ + pb_size, buffer_ + pb_size + n);
            return n;
        }

        boost::asio::ip::tcp::socket    &   s_;
        coro_t                          &   coro_;
        coro_t::caller_type             &   ca_;
        char                                buffer_[bf_size];

    protected:
        virtual int underflow()
        {
            if ( gptr() < egptr() )
                return traits_type::to_int_type( * gptr() );

            if ( 0 > fetch_() )
                return traits_type::eof();
            else
                return traits_type::to_int_type( * gptr() );
        }

    public:
        inbuf(
                boost::asio::ip::tcp::socket & s,
                coro_t & coro,
                coro_t::caller_type & ca) :
            s_( s), coro_( coro), ca_( ca), buffer_()
        { setg( buffer_ + pb_size, buffer_ + pb_size, buffer_ + pb_size); }
    };
    const std::streamsize inbuf::pb_size = 4;

__fetch__ uses __coro_op__ as the callback for the asynchronous read-operation
on the socket and suspends itself (['ca_()] jumps back to __start__). Once bytes
are available in the socket receive buffer, __io_service__ copies them to the
internal buffer ['buffer_] and invokes the callback, which resumes the coroutine
(['ca_()] returns).
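The interplay of suspending in `underflow()` and resuming from a callback can
be reproduced without a network. The following sketch rests on several
assumptions: the POSIX `ucontext` functions replace the coroutine class, the
hypothetical function `run` replaces __io_service__, and a list of strings
replaces the bytes arriving on the socket. All names in namespace `sketch` are
hypothetical.

```cpp
#include <ucontext.h>   // POSIX: getcontext, makecontext, swapcontext

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <istream>
#include <streambuf>
#include <string>
#include <vector>

namespace sketch {

ucontext_t loop_ctx, reader_ctx;        // control-blocks of event loop and reader
std::string pending;                    // bytes "received" but not yet consumed
bool closed = false;                    // true once no more bytes will arrive
bool finished = false;                  // true once reader() is done
std::vector< std::string > messages;    // messages extracted by reader()

// stream-buffer which suspends the reader instead of blocking
class suspending_buf : public std::streambuf
{
protected:
    virtual int_type underflow()
    {
        if ( gptr() < egptr() )
            return traits_type::to_int_type( * gptr() );
        // nothing to read: jump back to the event loop
        while ( pending.empty() && ! closed)
            swapcontext( & reader_ctx, & loop_ctx);
        if ( pending.empty() )
            return traits_type::eof();
        // resumed by the event loop: bytes are available now
        std::size_t n = std::min( pending.size(), sizeof( buffer_) );
        std::memcpy( buffer_, pending.data(), n);
        pending.erase( 0, n);
        setg( buffer_, buffer_, buffer_ + n);
        return traits_type::to_int_type( * gptr() );
    }

private:
    char buffer_[16];
};

// coroutine-function: plain sequential stream code
void reader()
{
    {
        suspending_buf buf;
        std::istream s( & buf);
        std::string msg;
        while ( std::getline( s, msg) && "exit" != msg)
            messages.push_back( msg);
    }
    finished = true;
    // returning activates uc_link, i.e. loop_ctx
}

// delivers the chunks one by one and resumes the reader after each delivery;
// returns how often the reader was resumed with new bytes
std::size_t run( std::vector< std::string > const& chunks)
{
    pending.clear();
    messages.clear();
    closed = finished = false;

    std::vector< char > stack( 256 * 1024);
    int rc = getcontext( & reader_ctx);
    assert( 0 == rc);
    (void) rc;
    reader_ctx.uc_stack.ss_sp = & stack[0];
    reader_ctx.uc_stack.ss_size = stack.size();
    reader_ctx.uc_link = & loop_ctx;
    makecontext( & reader_ctx, & reader, 0);

    std::size_t resumes = 0;
    swapcontext( & loop_ctx, & reader_ctx);     // start; returns at first suspend
    for ( std::size_t i = 0; i < chunks.size() && ! finished; ++i)
    {
        pending += chunks[i];                   // "bytes arrived on the socket"
        ++resumes;
        swapcontext( & loop_ctx, & reader_ctx); // "callback": resume the reader
    }
    if ( ! finished)
    {
        closed = true;                          // "connection closed"
        swapcontext( & loop_ctx, & reader_ctx); // reader sees EOF and finishes
    }
    return resumes;
}

}
```

`reader()` contains no callback and no state machine. While it waits inside
`std::getline`, control is back in `run`, which remains free to do other work
between two deliveries.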
[endsect]