source: code/trunk/vendor/github.com/lib/pq/notify.go@ 822

Last change on this file since 822 was 822, checked in by yakumo.izuru, 22 months ago

Prefer immortal.run over runit and rc.d, use vendored modules
for convenience.

Signed-off-by: Izuru Yakumo <yakumo.izuru@…>

File size: 25.0 KB
Line 
1package pq
2
3// Package pq is a pure Go Postgres driver for the database/sql package.
4// This module contains support for Postgres LISTEN/NOTIFY.
5
6import (
7 "context"
8 "database/sql/driver"
9 "errors"
10 "fmt"
11 "sync"
12 "sync/atomic"
13 "time"
14)
15
// Notification represents a single notification from the database. Values
// of this type are built by recvNotification from a message read off the
// connection and are delivered to callers by pointer.
type Notification struct {
	// Process ID (PID) of the notifying postgres backend.
	BePid int
	// Name of the channel the notification was sent on.
	Channel string
	// Payload, or the empty string if unspecified.
	Extra string
}
25
26func recvNotification(r *readBuf) *Notification {
27 bePid := r.int32()
28 channel := r.string()
29 extra := r.string()
30
31 return &Notification{bePid, channel, extra}
32}
33
34// SetNotificationHandler sets the given notification handler on the given
35// connection. A runtime panic occurs if c is not a pq connection. A nil handler
36// may be used to unset it.
37//
38// Note: Notification handlers are executed synchronously by pq meaning commands
39// won't continue to be processed until the handler returns.
40func SetNotificationHandler(c driver.Conn, handler func(*Notification)) {
41 c.(*conn).notificationHandler = handler
42}
43
// NotificationHandlerConnector wraps a regular connector and sets a notification handler
// on it. Values are created by ConnectorWithNotificationHandler.
type NotificationHandlerConnector struct {
	// The wrapped connector; its methods are promoted through embedding,
	// except Connect, which this type overrides.
	driver.Connector
	// Handler installed on every connection returned by Connect. May be nil.
	notificationHandler func(*Notification)
}
50
51// Connect calls the underlying connector's connect method and then sets the
52// notification handler.
53func (n *NotificationHandlerConnector) Connect(ctx context.Context) (driver.Conn, error) {
54 c, err := n.Connector.Connect(ctx)
55 if err == nil {
56 SetNotificationHandler(c, n.notificationHandler)
57 }
58 return c, err
59}
60
61// ConnectorNotificationHandler returns the currently set notification handler, if any. If
62// the given connector is not a result of ConnectorWithNotificationHandler, nil is
63// returned.
64func ConnectorNotificationHandler(c driver.Connector) func(*Notification) {
65 if c, ok := c.(*NotificationHandlerConnector); ok {
66 return c.notificationHandler
67 }
68 return nil
69}
70
71// ConnectorWithNotificationHandler creates or sets the given handler for the given
72// connector. If the given connector is a result of calling this function
73// previously, it is simply set on the given connector and returned. Otherwise,
74// this returns a new connector wrapping the given one and setting the notification
75// handler. A nil notification handler may be used to unset it.
76//
77// The returned connector is intended to be used with database/sql.OpenDB.
78//
79// Note: Notification handlers are executed synchronously by pq meaning commands
80// won't continue to be processed until the handler returns.
81func ConnectorWithNotificationHandler(c driver.Connector, handler func(*Notification)) *NotificationHandlerConnector {
82 if c, ok := c.(*NotificationHandlerConnector); ok {
83 c.notificationHandler = handler
84 return c
85 }
86 return &NotificationHandlerConnector{Connector: c, notificationHandler: handler}
87}
88
// Protocol states of a ListenerConn, stored in ListenerConn.connState. The
// only transitions setState allows form the cycle
// idle -> expect response -> expect ReadyForQuery -> idle.
const (
	// No query is in flight.
	connStateIdle int32 = iota
	// A query has been sent and its response has not arrived yet.
	connStateExpectResponse
	// The response to the query has arrived; ReadyForQuery is still due.
	connStateExpectReadyForQuery
)
94
// message is what the receiving goroutine hands to the goroutine waiting in
// ExecSimpleQuery through ListenerConn.replyChan.
type message struct {
	// Message type byte as received from the server ('E' or 'Z').
	typ byte
	// The parsed server error for 'E' messages; nil for 'Z'.
	err error
}
99
// errListenerConnClosed is the error recorded on, and returned by, a
// ListenerConn that has been closed via Close.
var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
101
// ListenerConn is a low-level interface for waiting for notifications. You
// should use Listener instead.
//
// Each ListenerConn owns one goroutine (listenerConnMain) that reads from the
// connection; queries are sent by whichever goroutine holds senderLock.
type ListenerConn struct {
	// guards cn and err
	connectionLock sync.Mutex
	cn             *conn
	// First error that caused (or will cause) the connection to be
	// abandoned; nil while the connection is usable.
	err error

	// One of the connState* constants; updated atomically by setState.
	connState int32

	// the sending goroutine will be holding this lock
	senderLock sync.Mutex

	// Receives every notification read from the server. It is closed by
	// listenerConnMain once the connection is finished.
	notificationChan chan<- *Notification

	// Carries query replies from the receiving goroutine to
	// ExecSimpleQuery. It is closed by listenerConnMain when the connection
	// is finished.
	replyChan chan message
}
119
120// NewListenerConn creates a new ListenerConn. Use NewListener instead.
121func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
122 return newDialListenerConn(defaultDialer{}, name, notificationChan)
123}
124
125func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
126 cn, err := DialOpen(d, name)
127 if err != nil {
128 return nil, err
129 }
130
131 l := &ListenerConn{
132 cn: cn.(*conn),
133 notificationChan: c,
134 connState: connStateIdle,
135 replyChan: make(chan message, 2),
136 }
137
138 go l.listenerConnMain()
139
140 return l, nil
141}
142
143// We can only allow one goroutine at a time to be running a query on the
144// connection for various reasons, so the goroutine sending on the connection
145// must be holding senderLock.
146//
147// Returns an error if an unrecoverable error has occurred and the ListenerConn
148// should be abandoned.
149func (l *ListenerConn) acquireSenderLock() error {
150 // we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
151 l.senderLock.Lock()
152
153 l.connectionLock.Lock()
154 err := l.err
155 l.connectionLock.Unlock()
156 if err != nil {
157 l.senderLock.Unlock()
158 return err
159 }
160 return nil
161}
162
// releaseSenderLock releases senderLock. It must only be called by the
// goroutine that obtained the lock through a successful acquireSenderLock.
func (l *ListenerConn) releaseSenderLock() {
	l.senderLock.Unlock()
}
166
167// setState advances the protocol state to newState. Returns false if moving
168// to that state from the current state is not allowed.
169func (l *ListenerConn) setState(newState int32) bool {
170 var expectedState int32
171
172 switch newState {
173 case connStateIdle:
174 expectedState = connStateExpectReadyForQuery
175 case connStateExpectResponse:
176 expectedState = connStateIdle
177 case connStateExpectReadyForQuery:
178 expectedState = connStateExpectResponse
179 default:
180 panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
181 }
182
183 return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
184}
185
// Main logic is here: receive messages from the postgres backend, forward
// notifications and query replies and keep the internal state in sync with the
// protocol state. Returns when the connection has been lost, is about to go
// away or should be discarded because we couldn't agree on the state with the
// server backend.
//
// Every return path inside the loop returns a non-nil error.
// NOTE(review): errRecoverNoErrBadConn is defined elsewhere; presumably it
// turns a panic raised while receiving into the returned error -- confirm.
func (l *ListenerConn) listenerConnLoop() (err error) {
	defer errRecoverNoErrBadConn(&err)

	// A single read buffer is reused for every message.
	r := &readBuf{}
	for {
		t, err := l.cn.recvMessage(r)
		if err != nil {
			return err
		}

		switch t {
		case 'A':
			// recvNotification copies all the data so we don't need to worry
			// about the scratch buffer being overwritten.
			l.notificationChan <- recvNotification(r)

		case 'T', 'D':
			// only used by tests; ignore

		case 'E':
			// We might receive an ErrorResponse even when not in a query; it
			// is expected that the server will close the connection after
			// that, but we should make sure that the error we display is the
			// one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
			if !l.setState(connStateExpectReadyForQuery) {
				return parseError(r)
			}
			// The error answers the in-flight query; pass it to the sender.
			l.replyChan <- message{t, parseError(r)}

		case 'C', 'I':
			if !l.setState(connStateExpectReadyForQuery) {
				// protocol out of sync
				return fmt.Errorf("unexpected CommandComplete")
			}
			// ExecSimpleQuery doesn't need to know about this message

		case 'Z':
			if !l.setState(connStateIdle) {
				// protocol out of sync
				return fmt.Errorf("unexpected ReadyForQuery")
			}
			// Tell the sender that its query has run to completion.
			l.replyChan <- message{t, nil}

		case 'S':
			// ignore
		case 'N':
			// Hand the message to the notice handler, if one is set.
			if n := l.cn.noticeHandler; n != nil {
				n(parseError(r))
			}
		default:
			return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
		}
	}
}
245
246// This is the main routine for the goroutine receiving on the database
247// connection. Most of the main logic is in listenerConnLoop.
248func (l *ListenerConn) listenerConnMain() {
249 err := l.listenerConnLoop()
250
251 // listenerConnLoop terminated; we're done, but we still have to clean up.
252 // Make sure nobody tries to start any new queries by making sure the err
253 // pointer is set. It is important that we do not overwrite its value; a
254 // connection could be closed by either this goroutine or one sending on
255 // the connection -- whoever closes the connection is assumed to have the
256 // more meaningful error message (as the other one will probably get
257 // net.errClosed), so that goroutine sets the error we expose while the
258 // other error is discarded. If the connection is lost while two
259 // goroutines are operating on the socket, it probably doesn't matter which
260 // error we expose so we don't try to do anything more complex.
261 l.connectionLock.Lock()
262 if l.err == nil {
263 l.err = err
264 }
265 l.cn.Close()
266 l.connectionLock.Unlock()
267
268 // There might be a query in-flight; make sure nobody's waiting for a
269 // response to it, since there's not going to be one.
270 close(l.replyChan)
271
272 // let the listener know we're done
273 close(l.notificationChan)
274
275 // this ListenerConn is done
276}
277
278// Listen sends a LISTEN query to the server. See ExecSimpleQuery.
279func (l *ListenerConn) Listen(channel string) (bool, error) {
280 return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
281}
282
283// Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery.
284func (l *ListenerConn) Unlisten(channel string) (bool, error) {
285 return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
286}
287
288// UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery.
289func (l *ListenerConn) UnlistenAll() (bool, error) {
290 return l.ExecSimpleQuery("UNLISTEN *")
291}
292
293// Ping the remote server to make sure it's alive. Non-nil error means the
294// connection has failed and should be abandoned.
295func (l *ListenerConn) Ping() error {
296 sent, err := l.ExecSimpleQuery("")
297 if !sent {
298 return err
299 }
300 if err != nil {
301 // shouldn't happen
302 panic(err)
303 }
304 return nil
305}
306
307// Attempt to send a query on the connection. Returns an error if sending the
308// query failed, and the caller should initiate closure of this connection.
309// The caller must be holding senderLock (see acquireSenderLock and
310// releaseSenderLock).
311func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
312 defer errRecoverNoErrBadConn(&err)
313
314 // must set connection state before sending the query
315 if !l.setState(connStateExpectResponse) {
316 panic("two queries running at the same time")
317 }
318
319 // Can't use l.cn.writeBuf here because it uses the scratch buffer which
320 // might get overwritten by listenerConnLoop.
321 b := &writeBuf{
322 buf: []byte("Q\x00\x00\x00\x00"),
323 pos: 1,
324 }
325 b.string(q)
326 l.cn.send(b)
327
328 return nil
329}
330
// ExecSimpleQuery executes a "simple query" (i.e. one with no bindable
// parameters) on the connection. The possible return values are:
//  1. "executed" is true; the query was executed to completion on the
//     database server. If the query failed, err will be set to the error
//     returned by the database, otherwise err will be nil.
//  2. If "executed" is false, the query could not be executed on the remote
//     server. err will be non-nil.
//
// After a call to ExecSimpleQuery has returned an executed=false value, the
// connection has either been closed or will be closed shortly thereafter, and
// all subsequently executed queries will return an error.
//
// The call holds senderLock for its whole duration, so concurrent callers are
// serialized.
func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
	if err = l.acquireSenderLock(); err != nil {
		return false, err
	}
	defer l.releaseSenderLock()

	err = l.sendSimpleQuery(q)
	if err != nil {
		// We can't know what state the protocol is in, so we need to abandon
		// this connection.
		l.connectionLock.Lock()
		// Set the error pointer if it hasn't been set already; see
		// listenerConnMain.
		if l.err == nil {
			l.err = err
		}
		l.connectionLock.Unlock()
		// Close the underlying network connection.
		l.cn.c.Close()
		return false, err
	}

	// now we just wait for a reply..
	for {
		m, ok := <-l.replyChan
		if !ok {
			// We lost the connection to server, don't bother waiting for a
			// a response. err should have been set already.
			l.connectionLock.Lock()
			err := l.err
			l.connectionLock.Unlock()
			return false, err
		}
		switch m.typ {
		case 'Z':
			// sanity check
			if m.err != nil {
				panic("m.err != nil")
			}
			// done; err might or might not be set
			return true, err

		case 'E':
			// sanity check
			if m.err == nil {
				panic("m.err == nil")
			}
			// server responded with an error; ReadyForQuery to follow
			err = m.err

		default:
			return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
		}
	}
}
396
397// Close closes the connection.
398func (l *ListenerConn) Close() error {
399 l.connectionLock.Lock()
400 if l.err != nil {
401 l.connectionLock.Unlock()
402 return errListenerConnClosed
403 }
404 l.err = errListenerConnClosed
405 l.connectionLock.Unlock()
406 // We can't send anything on the connection without holding senderLock.
407 // Simply close the net.Conn to wake up everyone operating on it.
408 return l.cn.c.Close()
409}
410
411// Err returns the reason the connection was closed. It is not safe to call
412// this function until l.Notify has been closed.
413func (l *ListenerConn) Err() error {
414 return l.err
415}
416
// errListenerClosed is returned by Listener methods that are called after
// the Listener has been closed.
var errListenerClosed = errors.New("pq: Listener has been closed")

// ErrChannelAlreadyOpen is returned from Listen when a channel is already
// open.
var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")

// ErrChannelNotOpen is returned from Unlisten when a channel is not open.
var ErrChannelNotOpen = errors.New("pq: channel is not open")
425
// ListenerEventType is an enumeration of listener event types. Events are
// delivered to the EventCallbackType callback given to NewListener.
type ListenerEventType int

const (
	// ListenerEventConnected is emitted only when the database connection
	// has been established for the first time. The err argument of the
	// callback will always be nil.
	ListenerEventConnected ListenerEventType = iota

	// ListenerEventDisconnected is emitted after a database connection has
	// been lost, either because of an error or because Close has been
	// called. The err argument will be set to the reason the database
	// connection was lost.
	ListenerEventDisconnected

	// ListenerEventReconnected is emitted after a database connection has
	// been re-established after connection loss. The err argument of the
	// callback will always be nil. After this event has been emitted, a
	// nil pq.Notification is sent on the Listener.Notify channel.
	ListenerEventReconnected

	// ListenerEventConnectionAttemptFailed is emitted after a connection
	// to the database was attempted, but failed. The err argument will be
	// set to an error describing why the connection attempt did not
	// succeed.
	ListenerEventConnectionAttemptFailed
)
453
// EventCallbackType is the event callback type. The callback receives the
// kind of event and, for the event types that carry one, the associated
// error (nil otherwise). See also ListenerEventType constants'
// documentation.
type EventCallbackType func(event ListenerEventType, err error)
457
// Listener provides an interface for listening to notifications from a
// PostgreSQL database. For general usage information, see section
// "Notifications".
//
// Listener can safely be used from concurrently running goroutines.
type Listener struct {
	// Channel for receiving notifications from the database. In some cases a
	// nil value will be sent. See section "Notifications" above.
	Notify chan *Notification

	// Connection string used for every (re)connection attempt.
	name string
	// Bounds of the reconnect back-off.
	minReconnectInterval time.Duration
	maxReconnectInterval time.Duration
	// Dialer used to open connections.
	dialer Dialer
	// Optional callback informed about connection state changes; may be nil.
	eventCallback EventCallbackType

	// lock is held by the exported methods while they read or modify the
	// fields below.
	lock sync.Mutex
	// Set by Close.
	isClosed bool
	// Condition variable on lock; broadcast when a connection has been
	// established and when the Listener is closed.
	reconnectCond *sync.Cond
	// The current connection, or nil while disconnected.
	cn *ListenerConn
	// Receiving end of the channel on which cn delivers notifications.
	connNotificationChan <-chan *Notification
	// Set of channel names the Listener wants to be listening on.
	channels map[string]struct{}
}
481
482// NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
483//
484// name should be set to a connection string to be used to establish the
485// database connection (see section "Connection String Parameters" above).
486//
487// minReconnectInterval controls the duration to wait before trying to
488// re-establish the database connection after connection loss. After each
489// consecutive failure this interval is doubled, until maxReconnectInterval is
490// reached. Successfully completing the connection establishment procedure
491// resets the interval back to minReconnectInterval.
492//
493// The last parameter eventCallback can be set to a function which will be
494// called by the Listener when the state of the underlying database connection
495// changes. This callback will be called by the goroutine which dispatches the
496// notifications over the Notify channel, so you should try to avoid doing
497// potentially time-consuming operations from the callback.
498func NewListener(name string,
499 minReconnectInterval time.Duration,
500 maxReconnectInterval time.Duration,
501 eventCallback EventCallbackType) *Listener {
502 return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
503}
504
505// NewDialListener is like NewListener but it takes a Dialer.
506func NewDialListener(d Dialer,
507 name string,
508 minReconnectInterval time.Duration,
509 maxReconnectInterval time.Duration,
510 eventCallback EventCallbackType) *Listener {
511
512 l := &Listener{
513 name: name,
514 minReconnectInterval: minReconnectInterval,
515 maxReconnectInterval: maxReconnectInterval,
516 dialer: d,
517 eventCallback: eventCallback,
518
519 channels: make(map[string]struct{}),
520
521 Notify: make(chan *Notification, 32),
522 }
523 l.reconnectCond = sync.NewCond(&l.lock)
524
525 go l.listenerMain()
526
527 return l
528}
529
530// NotificationChannel returns the notification channel for this listener.
531// This is the same channel as Notify, and will not be recreated during the
532// life time of the Listener.
533func (l *Listener) NotificationChannel() <-chan *Notification {
534 return l.Notify
535}
536
537// Listen starts listening for notifications on a channel. Calls to this
538// function will block until an acknowledgement has been received from the
539// server. Note that Listener automatically re-establishes the connection
540// after connection loss, so this function may block indefinitely if the
541// connection can not be re-established.
542//
543// Listen will only fail in three conditions:
544// 1) The channel is already open. The returned error will be
545// ErrChannelAlreadyOpen.
546// 2) The query was executed on the remote server, but PostgreSQL returned an
547// error message in response to the query. The returned error will be a
548// pq.Error containing the information the server supplied.
549// 3) Close is called on the Listener before the request could be completed.
550//
551// The channel name is case-sensitive.
552func (l *Listener) Listen(channel string) error {
553 l.lock.Lock()
554 defer l.lock.Unlock()
555
556 if l.isClosed {
557 return errListenerClosed
558 }
559
560 // The server allows you to issue a LISTEN on a channel which is already
561 // open, but it seems useful to be able to detect this case to spot for
562 // mistakes in application logic. If the application genuinely does't
563 // care, it can check the exported error and ignore it.
564 _, exists := l.channels[channel]
565 if exists {
566 return ErrChannelAlreadyOpen
567 }
568
569 if l.cn != nil {
570 // If gotResponse is true but error is set, the query was executed on
571 // the remote server, but resulted in an error. This should be
572 // relatively rare, so it's fine if we just pass the error to our
573 // caller. However, if gotResponse is false, we could not complete the
574 // query on the remote server and our underlying connection is about
575 // to go away, so we only add relname to l.channels, and wait for
576 // resync() to take care of the rest.
577 gotResponse, err := l.cn.Listen(channel)
578 if gotResponse && err != nil {
579 return err
580 }
581 }
582
583 l.channels[channel] = struct{}{}
584 for l.cn == nil {
585 l.reconnectCond.Wait()
586 // we let go of the mutex for a while
587 if l.isClosed {
588 return errListenerClosed
589 }
590 }
591
592 return nil
593}
594
595// Unlisten removes a channel from the Listener's channel list. Returns
596// ErrChannelNotOpen if the Listener is not listening on the specified channel.
597// Returns immediately with no error if there is no connection. Note that you
598// might still get notifications for this channel even after Unlisten has
599// returned.
600//
601// The channel name is case-sensitive.
602func (l *Listener) Unlisten(channel string) error {
603 l.lock.Lock()
604 defer l.lock.Unlock()
605
606 if l.isClosed {
607 return errListenerClosed
608 }
609
610 // Similarly to LISTEN, this is not an error in Postgres, but it seems
611 // useful to distinguish from the normal conditions.
612 _, exists := l.channels[channel]
613 if !exists {
614 return ErrChannelNotOpen
615 }
616
617 if l.cn != nil {
618 // Similarly to Listen (see comment in that function), the caller
619 // should only be bothered with an error if it came from the backend as
620 // a response to our query.
621 gotResponse, err := l.cn.Unlisten(channel)
622 if gotResponse && err != nil {
623 return err
624 }
625 }
626
627 // Don't bother waiting for resync if there's no connection.
628 delete(l.channels, channel)
629 return nil
630}
631
632// UnlistenAll removes all channels from the Listener's channel list. Returns
633// immediately with no error if there is no connection. Note that you might
634// still get notifications for any of the deleted channels even after
635// UnlistenAll has returned.
636func (l *Listener) UnlistenAll() error {
637 l.lock.Lock()
638 defer l.lock.Unlock()
639
640 if l.isClosed {
641 return errListenerClosed
642 }
643
644 if l.cn != nil {
645 // Similarly to Listen (see comment in that function), the caller
646 // should only be bothered with an error if it came from the backend as
647 // a response to our query.
648 gotResponse, err := l.cn.UnlistenAll()
649 if gotResponse && err != nil {
650 return err
651 }
652 }
653
654 // Don't bother waiting for resync if there's no connection.
655 l.channels = make(map[string]struct{})
656 return nil
657}
658
659// Ping the remote server to make sure it's alive. Non-nil return value means
660// that there is no active connection.
661func (l *Listener) Ping() error {
662 l.lock.Lock()
663 defer l.lock.Unlock()
664
665 if l.isClosed {
666 return errListenerClosed
667 }
668 if l.cn == nil {
669 return errors.New("no connection")
670 }
671
672 return l.cn.Ping()
673}
674
675// Clean up after losing the server connection. Returns l.cn.Err(), which
676// should have the reason the connection was lost.
677func (l *Listener) disconnectCleanup() error {
678 l.lock.Lock()
679 defer l.lock.Unlock()
680
681 // sanity check; can't look at Err() until the channel has been closed
682 select {
683 case _, ok := <-l.connNotificationChan:
684 if ok {
685 panic("connNotificationChan not closed")
686 }
687 default:
688 panic("connNotificationChan not closed")
689 }
690
691 err := l.cn.Err()
692 l.cn.Close()
693 l.cn = nil
694 return err
695}
696
// Synchronize the list of channels we want to be listening on with the server
// after the connection has been established.
//
// A LISTEN is issued on cn for every channel in l.channels from a helper
// goroutine, while this function drains notificationChan until the helper
// reports its result. Returns nil if every LISTEN succeeded, the server's
// error if one was rejected, or cn.Err() if the server could not be reached.
func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
	doneChan := make(chan error)
	go func(notificationChan <-chan *Notification) {
		for channel := range l.channels {
			// If we got a response, return that error to our caller as it's
			// going to be more descriptive than cn.Err().
			gotResponse, err := cn.Listen(channel)
			if gotResponse && err != nil {
				doneChan <- err
				return
			}

			// If we couldn't reach the server, wait for notificationChan to
			// close and then return the error message from the connection, as
			// per ListenerConn's interface.
			if err != nil {
				for range notificationChan {
				}
				doneChan <- cn.Err()
				return
			}
		}
		doneChan <- nil
	}(notificationChan)

	// Ignore notifications while synchronization is going on to avoid
	// deadlocks. We have to send a nil notification over Notify anyway as
	// we can't possibly know which notifications (if any) were lost while
	// the connection was down, so there's no reason to try and process
	// these messages at all.
	for {
		select {
		case _, ok := <-notificationChan:
			if !ok {
				// The channel has been closed. A nil channel is never ready
				// in a select, so from now on only doneChan is waited on.
				notificationChan = nil
			}

		case err := <-doneChan:
			return err
		}
	}
}
741
742// caller should NOT be holding l.lock
743func (l *Listener) closed() bool {
744 l.lock.Lock()
745 defer l.lock.Unlock()
746
747 return l.isClosed
748}
749
750func (l *Listener) connect() error {
751 notificationChan := make(chan *Notification, 32)
752 cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
753 if err != nil {
754 return err
755 }
756
757 l.lock.Lock()
758 defer l.lock.Unlock()
759
760 err = l.resync(cn, notificationChan)
761 if err != nil {
762 cn.Close()
763 return err
764 }
765
766 l.cn = cn
767 l.connNotificationChan = notificationChan
768 l.reconnectCond.Broadcast()
769
770 return nil
771}
772
773// Close disconnects the Listener from the database and shuts it down.
774// Subsequent calls to its methods will return an error. Close returns an
775// error if the connection has already been closed.
776func (l *Listener) Close() error {
777 l.lock.Lock()
778 defer l.lock.Unlock()
779
780 if l.isClosed {
781 return errListenerClosed
782 }
783
784 if l.cn != nil {
785 l.cn.Close()
786 }
787 l.isClosed = true
788
789 // Unblock calls to Listen()
790 l.reconnectCond.Broadcast()
791
792 return nil
793}
794
795func (l *Listener) emitEvent(event ListenerEventType, err error) {
796 if l.eventCallback != nil {
797 l.eventCallback(event, err)
798 }
799}
800
// Main logic here: maintain a connection to the server when possible, wait
// for notifications and emit events.
//
// Each iteration of the outer loop covers the life of one connection:
// connect (retrying with back-off), announce the connection, forward
// notifications until the connection is lost, clean up, and start over.
// The function returns once the Listener has been closed.
func (l *Listener) listenerConnLoop() {
	// Zero until the first connection has been established; afterwards the
	// earliest time at which the next reconnect may be attempted.
	var nextReconnect time.Time

	reconnectInterval := l.minReconnectInterval
	for {
		// Keep trying to connect, doubling the wait after each failure up
		// to maxReconnectInterval.
		for {
			err := l.connect()
			if err == nil {
				break
			}

			if l.closed() {
				return
			}
			l.emitEvent(ListenerEventConnectionAttemptFailed, err)

			time.Sleep(reconnectInterval)
			reconnectInterval *= 2
			if reconnectInterval > l.maxReconnectInterval {
				reconnectInterval = l.maxReconnectInterval
			}
		}

		if nextReconnect.IsZero() {
			// First connection of this Listener.
			l.emitEvent(ListenerEventConnected, nil)
		} else {
			// A nil notification tells the consumer that notifications may
			// have been missed while the connection was down.
			l.emitEvent(ListenerEventReconnected, nil)
			l.Notify <- nil
		}

		reconnectInterval = l.minReconnectInterval
		nextReconnect = time.Now().Add(reconnectInterval)

		// Forward notifications until the connection's channel is closed.
		for {
			notification, ok := <-l.connNotificationChan
			if !ok {
				// lost connection, loop again
				break
			}
			l.Notify <- notification
		}

		err := l.disconnectCleanup()
		if l.closed() {
			return
		}
		l.emitEvent(ListenerEventDisconnected, err)

		// Wait out whatever remains of the minimum interval measured from
		// the moment the lost connection was established.
		time.Sleep(time.Until(nextReconnect))
	}
}
854
// listenerMain is the body of the goroutine started by NewDialListener. It
// runs listenerConnLoop and, once that returns, closes the Notify channel.
func (l *Listener) listenerMain() {
	l.listenerConnLoop()
	close(l.Notify)
}
Note: See TracBrowser for help on using the repository browser.