Skip to content

Commit 2b6b1a6

Browse files
committed
chore: 上传客户端代码(没怎么测试, 有时间测试下)
1 parent b71307a commit 2b6b1a6

File tree

2 files changed

+160
-0
lines changed

2 files changed

+160
-0
lines changed

README.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,56 @@ func main() {
144144
}
145145
```
146146

147+
## 客户端事件循环示例
148+
149+
```go
150+
package main
151+
152+
import (
153+
"context"
154+
"fmt"
155+
"net"
156+
"github.com/antlabs/pulse"
157+
)
158+
159+
type MyClientHandler struct{}
160+
161+
func (h *MyClientHandler) OnOpen(c *pulse.Conn) {
162+
if err != nil {
163+
fmt.Println("连接失败:", err)
164+
return
165+
}
166+
fmt.Println("连接成功")
167+
c.Write([]byte("hello server!"))
168+
}
169+
170+
func (h *MyClientHandler) OnData(c *pulse.Conn, data []byte) {
171+
fmt.Println("收到数据:", string(data))
172+
}
173+
174+
func (h *MyClientHandler) OnClose(c *pulse.Conn, err error) {
175+
fmt.Println("连接关闭", err)
176+
}
177+
178+
func main() {
179+
conn1, err := net.Dial("tcp", "127.0.0.1:8080")
180+
if err != nil {
181+
panic(err)
182+
}
183+
conn2, err := net.Dial("tcp", "127.0.0.1:8081")
184+
if err != nil {
185+
panic(err)
186+
}
187+
loop := pulse.NewClientEventLoop(
188+
context.Background(),
189+
pulse.WithCallback(&MyClientHandler{}),
190+
)
191+
loop.RegisterConn(conn1)
192+
loop.RegisterConn(conn2)
193+
loop.Serve()
194+
}
195+
```
196+
147197
## 主要概念
148198

149199
### 回调接口

client_event_loop.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package pulse
2+
3+
import (
4+
"context"
5+
"net"
6+
"runtime"
7+
"sync"
8+
"sync/atomic"
9+
10+
"github.com/antlabs/pulse/core"
11+
)
12+
13+
type ClientEventLoop struct {
14+
pollers []core.PollingApi
15+
conns []*core.SafeConns[Conn]
16+
callback Callback
17+
options Options
18+
next uint32 // 用于轮询分配
19+
ctx context.Context
20+
}
21+
22+
func NewClientEventLoop(ctx context.Context, opts ...func(*Options)) *ClientEventLoop {
23+
var options Options
24+
for _, opt := range opts {
25+
opt(&options)
26+
}
27+
n := runtime.NumCPU()
28+
pollers := make([]core.PollingApi, n)
29+
conns := make([]*core.SafeConns[Conn], n)
30+
for i := 0; i < n; i++ {
31+
pollers[i], _ = core.Create(core.TriggerTypeEdge)
32+
conns[i] = &core.SafeConns[Conn]{}
33+
conns[i].Init(core.GetMaxFd())
34+
}
35+
return &ClientEventLoop{
36+
pollers: pollers,
37+
conns: conns,
38+
callback: options.callback,
39+
options: options,
40+
ctx: ctx,
41+
}
42+
}
43+
44+
func (loop *ClientEventLoop) RegisterConn(conn net.Conn) error {
45+
fd, err := core.GetFdFromConn(conn)
46+
if err != nil {
47+
return err
48+
}
49+
if err := conn.Close(); err != nil {
50+
return err
51+
}
52+
idx := atomic.AddUint32(&loop.next, 1) % uint32(len(loop.pollers))
53+
c := newConn(fd, loop.conns[idx], nil, TaskTypeInEventLoop, loop.pollers[idx], 4096, false)
54+
loop.conns[idx].Add(fd, c)
55+
loop.callback.OnOpen(c)
56+
return loop.pollers[idx].AddRead(fd)
57+
}
58+
59+
func (loop *ClientEventLoop) Serve() {
60+
n := len(loop.pollers)
61+
var wg sync.WaitGroup
62+
wg.Add(n)
63+
defer wg.Wait()
64+
65+
for i := 0; i < n; i++ {
66+
go func(idx int) {
67+
defer wg.Done()
68+
buf := make([]byte, loop.options.eventLoopReadBufferSize)
69+
for {
70+
select {
71+
case <-loop.ctx.Done():
72+
return
73+
default:
74+
}
75+
_, err := loop.pollers[idx].Poll(0, func(fd int, state core.State, pollErr error) {
76+
c := loop.conns[idx].GetUnsafe(fd)
77+
if pollErr != nil {
78+
if c != nil {
79+
c.Close()
80+
loop.callback.OnClose(c, pollErr)
81+
}
82+
return
83+
}
84+
if c == nil {
85+
return
86+
}
87+
if state.IsRead() {
88+
c.mu.Lock()
89+
n, err := core.Read(fd, buf)
90+
c.mu.Unlock()
91+
if err != nil {
92+
c.Close()
93+
loop.callback.OnClose(c, err)
94+
return
95+
}
96+
if n == 0 {
97+
c.Close()
98+
loop.callback.OnClose(c, nil)
99+
return
100+
}
101+
loop.callback.OnData(c, buf[:n])
102+
}
103+
})
104+
if err != nil {
105+
break
106+
}
107+
}
108+
}(i)
109+
}
110+
}

0 commit comments

Comments
 (0)