Skip to content

Commit a8f9a62

Browse files
committed
Add NewConnectorListener
1 parent 8446d16 commit a8f9a62

File tree

1 file changed

+32
-4
lines changed

1 file changed

+32
-4
lines changed

notify.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,11 @@ type ListenerConn struct {
119119

120120
// NewListenerConn creates a new ListenerConn. Use NewListener instead.
121121
func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
122-
return newDialListenerConn(defaultDialer{}, name, notificationChan)
122+
return newDialListenerConn(defaultDialer{}, nil, name, notificationChan)
123123
}
124124

125-
func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
126-
cn, err := DialOpen(d, name)
125+
func newDialListenerConn(d Dialer, connector driver.Connector, name string, c chan<- *Notification) (*ListenerConn, error) {
126+
cn, err := getConn(connector, d, name)
127127
if err != nil {
128128
return nil, err
129129
}
@@ -140,6 +140,15 @@ func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*Listen
140140
return l, nil
141141
}
142142

143+
// getConn uses driver.Connector if provided and falls back to Dialer otherwise.
144+
func getConn(c driver.Connector, d Dialer, name string) (driver.Conn, error) {
145+
if c != nil {
146+
return c.Connect(context.Background())
147+
}
148+
149+
return DialOpen(d, name)
150+
}
151+
143152
// We can only allow one goroutine at a time to be running a query on the
144153
// connection for various reasons, so the goroutine sending on the connection
145154
// must be holding senderLock.
@@ -470,6 +479,7 @@ type Listener struct {
470479
maxReconnectInterval time.Duration
471480
dialer Dialer
472481
eventCallback EventCallbackType
482+
connector driver.Connector
473483

474484
lock sync.Mutex
475485
isClosed bool
@@ -502,19 +512,37 @@ func NewListener(name string,
502512
return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
503513
}
504514

515+
// NewConnectorListener is like NewListener but it takes a driver.Connector.
516+
func NewConnectorListener(c driver.Connector,
517+
name string,
518+
minReconnectInterval time.Duration,
519+
maxReconnectInterval time.Duration,
520+
eventCallback EventCallbackType) *Listener {
521+
return listener(c, defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
522+
}
523+
505524
// NewDialListener is like NewListener but it takes a Dialer.
506525
func NewDialListener(d Dialer,
507526
name string,
508527
minReconnectInterval time.Duration,
509528
maxReconnectInterval time.Duration,
510529
eventCallback EventCallbackType) *Listener {
530+
return listener(nil, d, name, minReconnectInterval, maxReconnectInterval, eventCallback)
531+
}
511532

533+
func listener(c driver.Connector,
534+
d Dialer,
535+
name string,
536+
minReconnectInterval time.Duration,
537+
maxReconnectInterval time.Duration,
538+
eventCallback EventCallbackType) *Listener {
512539
l := &Listener{
513540
name: name,
514541
minReconnectInterval: minReconnectInterval,
515542
maxReconnectInterval: maxReconnectInterval,
516543
dialer: d,
517544
eventCallback: eventCallback,
545+
connector: c,
518546

519547
channels: make(map[string]struct{}),
520548

@@ -749,7 +777,7 @@ func (l *Listener) closed() bool {
749777

750778
func (l *Listener) connect() error {
751779
notificationChan := make(chan *Notification, 32)
752-
cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
780+
cn, err := newDialListenerConn(l.dialer, l.connector, l.name, notificationChan)
753781
if err != nil {
754782
return err
755783
}

0 commit comments

Comments
 (0)