forked from getlantern/marionette
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener.go
121 lines (100 loc) · 2.79 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package marionette
import (
"context"
"errors"
"net"
"strconv"
"sync"
"github.com/redjack/marionette/mar"
"go.uber.org/zap"
)
// Listener listens on a port and communicates over the marionette protocol.
// It has Accept, Close and Addr methods with the signatures of net.Listener.
type Listener struct {
	// mu guards err.
	mu sync.RWMutex

	ln  net.Listener  // underlying network listener
	doc *mar.Document // MAR document handed to the FSM created per connection

	// newStreams carries streams from onNewStream to Accept.
	newStreams chan *Stream

	// err is the error that stopped the accept loop, if any.
	err error

	once    sync.Once      // ensures closing is closed at most once
	wg      sync.WaitGroup // tracks the accept loop and per-connection goroutines
	closing chan struct{}  // closed by Close
}
// Listen opens a network listener for doc and returns a Listener wrapping it.
//
// The port comes from doc.Port, which must parse as a decimal integer; the
// listener binds to iface on that port using doc.Transport as the network.
// An error is returned if the port cannot be parsed or if the bind fails.
// On success the accept loop is already running in its own goroutine.
func Listen(doc *mar.Document, iface string) (*Listener, error) {
	// The port is taken from the MAR specification.
	// TODO: Handle "ftp_pasv_port".
	portNum, convErr := strconv.Atoi(doc.Port)
	if convErr != nil {
		return nil, errors.New("invalid connection port")
	}

	bindAddr := net.JoinHostPort(iface, strconv.Itoa(portNum))
	Logger.Debug("opening listener", zap.String("transport", doc.Transport), zap.String("bind", bindAddr))

	netLn, listenErr := net.Listen(doc.Transport, bindAddr)
	if listenErr != nil {
		return nil, listenErr
	}

	lis := new(Listener)
	lis.ln = netLn
	lis.doc = doc
	lis.newStreams = make(chan *Stream)
	lis.closing = make(chan struct{})

	// Connection handling runs in the background; wg lets Close wait for it.
	lis.wg.Add(1)
	go func() {
		defer lis.wg.Done()
		lis.accept()
	}()

	return lis, nil
}
// Err reports the error recorded on the listener, or nil if none has been
// recorded. The value is read under the listener's read lock.
func (l *Listener) Err() error {
	l.mu.RLock()
	recorded := l.err
	l.mu.RUnlock()
	return recorded
}
// Addr returns the address reported by the underlying network listener.
func (l *Listener) Addr() net.Addr {
	bound := l.ln.Addr()
	return bound
}
// Close shuts the listener down. It closes the underlying network listener,
// closes the closing channel (at most once, however often Close is called),
// and then blocks until every goroutine tracked by the listener's WaitGroup
// has returned. The result is the error from closing the network listener.
func (l *Listener) Close() error {
	closeErr := l.ln.Close()

	signalClosing := func() { close(l.closing) }
	l.once.Do(signalClosing)

	l.wg.Wait()
	return closeErr
}
// Accept waits for the next stream and returns it as a net.Conn.
//
// When the stream channel has been closed, Accept returns a nil net.Conn
// together with the listener's recorded error (or a generic "listener closed"
// error if none was recorded). The nil is returned as a literal so callers
// never receive a non-nil interface that wraps a nil *Stream. A successfully
// received stream is always returned with a nil error.
func (l *Listener) Accept() (net.Conn, error) {
	stream, ok := <-l.newStreams
	if !ok {
		if err := l.Err(); err != nil {
			return nil, err
		}
		return nil, errors.New("listener closed")
	}
	return stream, nil
}
// accept continually accepts network connections and starts a server-side FSM
// for each one, registering onNewStream as the FSM's new-stream callback.
//
// The loop stops on the first error returned by the underlying listener; that
// error is recorded so Err can report it. The newStreams channel is closed
// only after every FSM goroutine started here has returned, because
// onNewStream sends on that channel and a send on a closed channel panics.
// As a consequence, receivers on newStreams are not released until those
// goroutines have finished.
func (l *Listener) accept() {
	// fsms counts only the goroutines spawned by this loop, so the deferred
	// close waits for exactly the potential senders on newStreams.
	// NOTE(review): this assumes onNewStream is only invoked while
	// fsm.Execute is running — confirm against the FSM/stream set.
	var fsms sync.WaitGroup
	defer func() {
		fsms.Wait()
		close(l.newStreams)
	}()

	for {
		// Wait for next connection.
		conn, err := l.ln.Accept()
		if err != nil {
			l.mu.Lock()
			l.err = err
			l.mu.Unlock()
			return
		}

		// NOTE(review): conn is handed to the FSM and never closed in this
		// file — confirm the FSM takes ownership of it.
		fsm := NewFSM(l.doc, PartyServer)
		fsm.conn = conn
		fsm.streams.LocalAddr = conn.LocalAddr()
		fsm.streams.RemoteAddr = conn.RemoteAddr()
		fsm.streams.OnNewStream = l.onNewStream

		// Run execution in a separate goroutine. The context is never
		// cancelled from here, so the FSM decides when it finishes.
		fsms.Add(1)
		l.wg.Add(1)
		go func() {
			defer l.wg.Done()
			defer fsms.Done()
			l.execute(context.Background(), fsm)
		}()
	}
}
// execute runs fsm under ctx and logs, at debug level, the start of the run,
// any error returned by fsm.Execute, and the end of the run. The error is
// only logged; nothing is returned to the caller.
func (l *Listener) execute(ctx context.Context, fsm *FSM) {
	Logger.Debug("server fsm executing")
	// Deferred so the completion message is emitted last, after any error log.
	defer Logger.Debug("server fsm execution complete")

	runErr := fsm.Execute(ctx)
	if runErr == nil {
		return
	}
	Logger.Debug("server fsm execution error", zap.Error(runErr))
}
// onNewStream is called every time the FSM's stream set creates a new stream.
// It hands the stream to a caller of Accept. If the listener is closing, the
// stream is dropped instead, so the caller is not left blocked forever on a
// channel that nobody is going to read.
func (l *Listener) onNewStream(stream *Stream) {
	Logger.Debug("new server stream")

	// Check closing on its own first: a select with several ready cases picks
	// one at random, and a closing listener should never take new streams.
	select {
	case <-l.closing:
		// NOTE(review): the dropped stream is not closed here — confirm
		// whether the stream set or FSM cleans it up.
		Logger.Debug("dropping new server stream, listener closing")
		return
	default:
	}

	select {
	case l.newStreams <- stream:
	case <-l.closing:
		Logger.Debug("dropping new server stream, listener closing")
	}
}