Skip to content

Commit f186072

Browse files
author
朱胜
committed
基本完成异步框架, 还有内存泄露.
1 parent 70cf1bd commit f186072

File tree

13 files changed

+284
-151
lines changed

13 files changed

+284
-151
lines changed

src/couv/channel.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ class channel
2828
bool is_closed() {
2929
return m_closed;
3030
}
31+
32+
operator bool() const {
33+
return !m_closed;
34+
}
3135

3236
void close() {
3337
m_closed = true;
@@ -42,13 +46,15 @@ class channel
4246
m_sem.signal();
4347
}
4448

45-
void receive(T& t) {
46-
if(m_closed)
47-
return;
49+
channel<T>& receive(T& t) {
50+
if(m_closed) {
51+
return *this;
52+
}
4853

4954
m_sem.wait();
5055
t = m_queue.front();
5156
m_queue.pop_front();
57+
return *this;
5258
}
5359

5460
T receive() {

src/couv/coroutine.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ class coroutine_base;
2222
// customer
2323
class coroutine: public coroutine_base
2424
{
25+
public:
26+
typedef void sign_type();
27+
typedef std::function<sign_type> func_type;
2528
public:
2629
coroutine(func_type f);
2730

src/couv/coroutine_base.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
*/
77
#include "couv/coroutine_base.h"
88
#include "couv/scheduler.h"
9-
#include "couv/scheduler.h"
109
#include "log/Logger.h"
1110

1211
namespace couv
@@ -40,8 +39,13 @@ void coroutine_base::resume_coroutine(coroutine_ptr other)
4039
m_active = false;
4140
other->m_active = true;
4241

43-
current_scheduler->set_current(other);
42+
scheduler_t::self()->set_current(other);
4443
swapcontext(&m_ctx, &other->m_ctx);
4544
}
4645

46+
coroutine_ptr coroutine_base::self()
47+
{
48+
return scheduler_t::self()->current();
49+
}
50+
4751
} /* namespace coroutine */

src/couv/coroutine_base.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@ namespace couv
1919

2020
class coroutine_base
2121
{
22-
public:
23-
typedef void sign_type();
24-
typedef std::function<sign_type> func_type;
25-
2622
public:
2723
coroutine_base();
2824
virtual ~coroutine_base();
@@ -35,8 +31,10 @@ class coroutine_base
3531
bool is_active()const { return m_active; }
3632
bool is_blocked()const { return m_blocked; }
3733
bool is_done()const { return m_done; }
38-
void set_blocked(bool block) { m_blocked = block; }
34+
void set_blocked(bool block) { m_blocked = block; }
3935
void set_delegate(delegate_t *d){ m_delegate = d; }
36+
37+
static coroutine_ptr self();
4038
protected:
4139
bool m_active;
4240
bool m_blocked;

src/couv/lock.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ lock_t::~lock_t()
2121

2222
void lock_t::lock()
2323
{
24-
coroutine_ptr current = current_coroutine;
24+
coroutine_ptr current = coroutine_base::self();
2525
if(m_flag) { // wait
2626
if(m_owner != current && m_queue.find(current) == m_queue.end()) {
2727
current->set_blocked(true);
@@ -50,7 +50,7 @@ bool lock_t::try_lock()
5050
{
5151
if(!m_flag) {
5252
m_flag = true;
53-
m_owner = current_coroutine;
53+
m_owner = coroutine_base::self();
5454
return true;
5555
}
5656
return false;

src/couv/scheduler.cpp

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,17 @@
1111

1212
namespace couv
1313
{
14-
//static scheduler_t local_scheduler;
15-
//scheduler_t* current_scheduler = &local_scheduler;
16-
scheduler_t* current_scheduler = nullptr;
14+
15+
scheduler_t* scheduler_t::s_instance = nullptr;
16+
1717

1818
scheduler_t::scheduler_t(bool install)
19+
:m_stop(false)
1920
{
2021
m_root = std::make_shared<coroutine_base>();
2122
m_current = m_root;
2223
if(install) {
23-
current_scheduler = this;
24+
s_instance = this;
2425
}
2526
}
2627

@@ -29,24 +30,47 @@ scheduler_t::~scheduler_t()
2930
logAssert(m_current == m_root);
3031
}
3132

32-
coroutine_ptr scheduler_t::add(coroutine_base::func_type&& f)
33-
{
34-
return add(coroutine_ptr(new coroutine(f)));
35-
}
36-
37-
coroutine_ptr scheduler_t::add(coroutine_ptr r)
33+
bool scheduler_t::add(coroutine_ptr r)
3834
{
3935
logAssert(r != nullptr);
4036

4137
if(m_root == r || r->is_done()) {
42-
return r;
38+
return false;
4339
}
4440

4541
logDebug("add %d", r->id());
4642
m_queue.push_back(r);
43+
return true;
44+
}
45+
46+
coroutine_ptr scheduler_t::add(const func_type& f)
47+
{
48+
coroutine_ptr r = std::make_shared<coroutine>(f);
49+
add(r);
4750
return r;
4851
}
4952

53+
coroutine_ptr scheduler_t::add(func_type&& f)
54+
{
55+
coroutine_ptr r = std::make_shared<coroutine>(std::forward<func_type>(f));
56+
add(r);
57+
return r;
58+
}
59+
60+
scheduler_t& scheduler_t::operator<<(func_type&& f)
61+
{
62+
coroutine_ptr r = std::make_shared<coroutine>(std::forward<func_type>(f));
63+
add(r);
64+
return *this;
65+
}
66+
67+
scheduler_t& scheduler_t::operator<<(const func_type& f)
68+
{
69+
coroutine_ptr r = std::make_shared<coroutine>(f);
70+
add(r);
71+
return *this;
72+
}
73+
5074
void scheduler_t::remove(coroutine_ptr r)
5175
{
5276
logDebug("remove %d", r->id());
@@ -85,7 +109,7 @@ void scheduler_t::run()
85109
{
86110
//logAssert(current_scheduler == nullptr);
87111

88-
current_scheduler = this;
112+
install();
89113

90114
while(!m_queue.empty())
91115
{
@@ -103,16 +127,24 @@ void scheduler_t::run()
103127

104128
if(!found)
105129
{
106-
logWarn("all coroutine blocked \n");
107-
exit(1);
130+
if(!has_more())
131+
{
132+
break;
133+
}
108134
}
109135
else
110136
{
111137
logDebug("OK!");
112138
}
113139
}
114140

115-
current_scheduler = nullptr;
141+
s_instance = nullptr;
142+
}
143+
144+
bool scheduler_t::has_more()
145+
{
146+
logWarn("all coroutine blocked \n");
147+
return false;
116148
}
117149

118150
void scheduler_t::on_start(coroutine_base* r)

src/couv/scheduler.h

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,18 @@
1010

1111
#include <deque>
1212

13-
#include "couv/coroutine_base.h"
1413
#include "couv/delegate.h"
1514
#include "log/Logger.h"
1615

17-
#define current_coroutine couv::current_scheduler->current()
18-
1916
#define yield(...) do{ \
20-
logDebug("yield from %d", current_coroutine->id()); \
21-
couv::current_scheduler->yield_coroutine( __VA_ARGS__ ); \
17+
logDebug("yield from %d", couv::coroutine_base::self()->id()); \
18+
couv::scheduler_t::self()->yield_coroutine( __VA_ARGS__ ); \
2219
} while(0)
2320

24-
#define resume couv::current_scheduler->resume
21+
#define resume couv::scheduler_t::self()->resume
2522

26-
#define go (*couv::current_scheduler)<<[&]
23+
#define go (*couv::scheduler_t::self())<<[&]
24+
#define goo (*couv::scheduler_t::self())<<
2725

2826
namespace couv
2927
{
@@ -33,40 +31,54 @@ namespace couv
3331
*/
3432
class scheduler_t: public delegate_t
3533
{
34+
public:
35+
typedef void sign_type();
36+
typedef std::function<sign_type> func_type;
3637
public:
3738
scheduler_t(bool install=true);
3839
virtual ~scheduler_t();
40+
virtual bool add(coroutine_ptr r);
41+
virtual void remove(coroutine_ptr r);
3942
virtual void run();
40-
41-
coroutine_ptr add(coroutine_base::func_type&& f);
42-
coroutine_ptr add(coroutine_ptr r);
43-
void remove(coroutine_ptr r);
44-
43+
4544
void yield_coroutine(coroutine_ptr r);
4645
void yield_coroutine();
46+
47+
coroutine_ptr add(const func_type& f);
48+
coroutine_ptr add(func_type&& f);
49+
scheduler_t& operator<<(const func_type& f);
50+
scheduler_t& operator<<(func_type&& f);
4751

4852
void set_current(coroutine_ptr r) { m_current = r; }
4953
coroutine_ptr current(){ return m_current; }
50-
51-
scheduler_t& operator<<(coroutine_base::func_type&& f){
52-
add(std::move(f));
53-
return *this;
54+
55+
void stop() { m_stop = true; }
56+
bool is_stop()const { return m_stop; }
57+
58+
void install() { s_instance = this; }
59+
bool is_installed()const { return s_instance == this; }
60+
61+
static scheduler_t* self()
62+
{
63+
logAssert(s_instance != nullptr);
64+
return s_instance;
5465
}
5566

5667
protected:
57-
void resume_coroutine(coroutine_ptr r);
68+
virtual void resume_coroutine(coroutine_ptr r);
69+
virtual bool has_more();
5870

5971
virtual void on_start(coroutine_base* r);
6072
virtual void on_stop(coroutine_base* r);
6173

6274
protected:
75+
static scheduler_t* s_instance;
76+
bool m_stop;
6377
coroutine_ptr m_root;
6478
coroutine_ptr m_current;
6579
std::deque<coroutine_ptr> m_queue;
6680
};
6781

68-
extern scheduler_t* current_scheduler;
69-
7082
} /* namespace coroutine */
7183

7284
#endif /* SRC_COUV_SCHEDULER_H_ */

src/couv/sem.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ sem_t::~sem_t()
2323
void sem_t::wait()
2424
{
2525
if(--m_counter < 0) {
26-
current_coroutine->set_blocked(true);
27-
m_queue.push_back(current_coroutine);
26+
coroutine_base::self()->set_blocked(true);
27+
m_queue.push_back(coroutine_base::self());
2828
yield();
2929
}
3030
}

0 commit comments

Comments
 (0)