[822] | 1 | package pq
|
---|
| 2 |
|
---|
| 3 | // Package pq is a pure Go Postgres driver for the database/sql package.
|
---|
| 4 | // This module contains support for Postgres LISTEN/NOTIFY.
|
---|
| 5 |
|
---|
| 6 | import (
|
---|
| 7 | "context"
|
---|
| 8 | "database/sql/driver"
|
---|
| 9 | "errors"
|
---|
| 10 | "fmt"
|
---|
| 11 | "sync"
|
---|
| 12 | "sync/atomic"
|
---|
| 13 | "time"
|
---|
| 14 | )
|
---|
| 15 |
|
---|
| 16 | // Notification represents a single notification from the database.
|
---|
| 17 | type Notification struct {
|
---|
| 18 | // Process ID (PID) of the notifying postgres backend.
|
---|
| 19 | BePid int
|
---|
| 20 | // Name of the channel the notification was sent on.
|
---|
| 21 | Channel string
|
---|
| 22 | // Payload, or the empty string if unspecified.
|
---|
| 23 | Extra string
|
---|
| 24 | }
|
---|
| 25 |
|
---|
| 26 | func recvNotification(r *readBuf) *Notification {
|
---|
| 27 | bePid := r.int32()
|
---|
| 28 | channel := r.string()
|
---|
| 29 | extra := r.string()
|
---|
| 30 |
|
---|
| 31 | return &Notification{bePid, channel, extra}
|
---|
| 32 | }
|
---|
| 33 |
|
---|
| 34 | // SetNotificationHandler sets the given notification handler on the given
|
---|
| 35 | // connection. A runtime panic occurs if c is not a pq connection. A nil handler
|
---|
| 36 | // may be used to unset it.
|
---|
| 37 | //
|
---|
| 38 | // Note: Notification handlers are executed synchronously by pq meaning commands
|
---|
| 39 | // won't continue to be processed until the handler returns.
|
---|
| 40 | func SetNotificationHandler(c driver.Conn, handler func(*Notification)) {
|
---|
| 41 | c.(*conn).notificationHandler = handler
|
---|
| 42 | }
|
---|
| 43 |
|
---|
| 44 | // NotificationHandlerConnector wraps a regular connector and sets a notification handler
|
---|
| 45 | // on it.
|
---|
| 46 | type NotificationHandlerConnector struct {
|
---|
| 47 | driver.Connector
|
---|
| 48 | notificationHandler func(*Notification)
|
---|
| 49 | }
|
---|
| 50 |
|
---|
| 51 | // Connect calls the underlying connector's connect method and then sets the
|
---|
| 52 | // notification handler.
|
---|
| 53 | func (n *NotificationHandlerConnector) Connect(ctx context.Context) (driver.Conn, error) {
|
---|
| 54 | c, err := n.Connector.Connect(ctx)
|
---|
| 55 | if err == nil {
|
---|
| 56 | SetNotificationHandler(c, n.notificationHandler)
|
---|
| 57 | }
|
---|
| 58 | return c, err
|
---|
| 59 | }
|
---|
| 60 |
|
---|
| 61 | // ConnectorNotificationHandler returns the currently set notification handler, if any. If
|
---|
| 62 | // the given connector is not a result of ConnectorWithNotificationHandler, nil is
|
---|
| 63 | // returned.
|
---|
| 64 | func ConnectorNotificationHandler(c driver.Connector) func(*Notification) {
|
---|
| 65 | if c, ok := c.(*NotificationHandlerConnector); ok {
|
---|
| 66 | return c.notificationHandler
|
---|
| 67 | }
|
---|
| 68 | return nil
|
---|
| 69 | }
|
---|
| 70 |
|
---|
| 71 | // ConnectorWithNotificationHandler creates or sets the given handler for the given
|
---|
| 72 | // connector. If the given connector is a result of calling this function
|
---|
| 73 | // previously, it is simply set on the given connector and returned. Otherwise,
|
---|
| 74 | // this returns a new connector wrapping the given one and setting the notification
|
---|
| 75 | // handler. A nil notification handler may be used to unset it.
|
---|
| 76 | //
|
---|
| 77 | // The returned connector is intended to be used with database/sql.OpenDB.
|
---|
| 78 | //
|
---|
| 79 | // Note: Notification handlers are executed synchronously by pq meaning commands
|
---|
| 80 | // won't continue to be processed until the handler returns.
|
---|
| 81 | func ConnectorWithNotificationHandler(c driver.Connector, handler func(*Notification)) *NotificationHandlerConnector {
|
---|
| 82 | if c, ok := c.(*NotificationHandlerConnector); ok {
|
---|
| 83 | c.notificationHandler = handler
|
---|
| 84 | return c
|
---|
| 85 | }
|
---|
| 86 | return &NotificationHandlerConnector{Connector: c, notificationHandler: handler}
|
---|
| 87 | }
|
---|
| 88 |
|
---|
| 89 | const (
|
---|
| 90 | connStateIdle int32 = iota
|
---|
| 91 | connStateExpectResponse
|
---|
| 92 | connStateExpectReadyForQuery
|
---|
| 93 | )
|
---|
| 94 |
|
---|
| 95 | type message struct {
|
---|
| 96 | typ byte
|
---|
| 97 | err error
|
---|
| 98 | }
|
---|
| 99 |
|
---|
| 100 | var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
|
---|
| 101 |
|
---|
| 102 | // ListenerConn is a low-level interface for waiting for notifications. You
|
---|
| 103 | // should use Listener instead.
|
---|
| 104 | type ListenerConn struct {
|
---|
| 105 | // guards cn and err
|
---|
| 106 | connectionLock sync.Mutex
|
---|
| 107 | cn *conn
|
---|
| 108 | err error
|
---|
| 109 |
|
---|
| 110 | connState int32
|
---|
| 111 |
|
---|
| 112 | // the sending goroutine will be holding this lock
|
---|
| 113 | senderLock sync.Mutex
|
---|
| 114 |
|
---|
| 115 | notificationChan chan<- *Notification
|
---|
| 116 |
|
---|
| 117 | replyChan chan message
|
---|
| 118 | }
|
---|
| 119 |
|
---|
| 120 | // NewListenerConn creates a new ListenerConn. Use NewListener instead.
|
---|
| 121 | func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
|
---|
| 122 | return newDialListenerConn(defaultDialer{}, name, notificationChan)
|
---|
| 123 | }
|
---|
| 124 |
|
---|
| 125 | func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
|
---|
| 126 | cn, err := DialOpen(d, name)
|
---|
| 127 | if err != nil {
|
---|
| 128 | return nil, err
|
---|
| 129 | }
|
---|
| 130 |
|
---|
| 131 | l := &ListenerConn{
|
---|
| 132 | cn: cn.(*conn),
|
---|
| 133 | notificationChan: c,
|
---|
| 134 | connState: connStateIdle,
|
---|
| 135 | replyChan: make(chan message, 2),
|
---|
| 136 | }
|
---|
| 137 |
|
---|
| 138 | go l.listenerConnMain()
|
---|
| 139 |
|
---|
| 140 | return l, nil
|
---|
| 141 | }
|
---|
| 142 |
|
---|
| 143 | // We can only allow one goroutine at a time to be running a query on the
|
---|
| 144 | // connection for various reasons, so the goroutine sending on the connection
|
---|
| 145 | // must be holding senderLock.
|
---|
| 146 | //
|
---|
| 147 | // Returns an error if an unrecoverable error has occurred and the ListenerConn
|
---|
| 148 | // should be abandoned.
|
---|
| 149 | func (l *ListenerConn) acquireSenderLock() error {
|
---|
| 150 | // we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
|
---|
| 151 | l.senderLock.Lock()
|
---|
| 152 |
|
---|
| 153 | l.connectionLock.Lock()
|
---|
| 154 | err := l.err
|
---|
| 155 | l.connectionLock.Unlock()
|
---|
| 156 | if err != nil {
|
---|
| 157 | l.senderLock.Unlock()
|
---|
| 158 | return err
|
---|
| 159 | }
|
---|
| 160 | return nil
|
---|
| 161 | }
|
---|
| 162 |
|
---|
| 163 | func (l *ListenerConn) releaseSenderLock() {
|
---|
| 164 | l.senderLock.Unlock()
|
---|
| 165 | }
|
---|
| 166 |
|
---|
| 167 | // setState advances the protocol state to newState. Returns false if moving
|
---|
| 168 | // to that state from the current state is not allowed.
|
---|
| 169 | func (l *ListenerConn) setState(newState int32) bool {
|
---|
| 170 | var expectedState int32
|
---|
| 171 |
|
---|
| 172 | switch newState {
|
---|
| 173 | case connStateIdle:
|
---|
| 174 | expectedState = connStateExpectReadyForQuery
|
---|
| 175 | case connStateExpectResponse:
|
---|
| 176 | expectedState = connStateIdle
|
---|
| 177 | case connStateExpectReadyForQuery:
|
---|
| 178 | expectedState = connStateExpectResponse
|
---|
| 179 | default:
|
---|
| 180 | panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
|
---|
| 181 | }
|
---|
| 182 |
|
---|
| 183 | return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
|
---|
| 184 | }
|
---|
| 185 |
|
---|
| 186 | // Main logic is here: receive messages from the postgres backend, forward
|
---|
| 187 | // notifications and query replies and keep the internal state in sync with the
|
---|
| 188 | // protocol state. Returns when the connection has been lost, is about to go
|
---|
| 189 | // away or should be discarded because we couldn't agree on the state with the
|
---|
| 190 | // server backend.
|
---|
| 191 | func (l *ListenerConn) listenerConnLoop() (err error) {
|
---|
| 192 | defer errRecoverNoErrBadConn(&err)
|
---|
| 193 |
|
---|
| 194 | r := &readBuf{}
|
---|
| 195 | for {
|
---|
| 196 | t, err := l.cn.recvMessage(r)
|
---|
| 197 | if err != nil {
|
---|
| 198 | return err
|
---|
| 199 | }
|
---|
| 200 |
|
---|
| 201 | switch t {
|
---|
| 202 | case 'A':
|
---|
| 203 | // recvNotification copies all the data so we don't need to worry
|
---|
| 204 | // about the scratch buffer being overwritten.
|
---|
| 205 | l.notificationChan <- recvNotification(r)
|
---|
| 206 |
|
---|
| 207 | case 'T', 'D':
|
---|
| 208 | // only used by tests; ignore
|
---|
| 209 |
|
---|
| 210 | case 'E':
|
---|
| 211 | // We might receive an ErrorResponse even when not in a query; it
|
---|
| 212 | // is expected that the server will close the connection after
|
---|
| 213 | // that, but we should make sure that the error we display is the
|
---|
| 214 | // one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
|
---|
| 215 | if !l.setState(connStateExpectReadyForQuery) {
|
---|
| 216 | return parseError(r)
|
---|
| 217 | }
|
---|
| 218 | l.replyChan <- message{t, parseError(r)}
|
---|
| 219 |
|
---|
| 220 | case 'C', 'I':
|
---|
| 221 | if !l.setState(connStateExpectReadyForQuery) {
|
---|
| 222 | // protocol out of sync
|
---|
| 223 | return fmt.Errorf("unexpected CommandComplete")
|
---|
| 224 | }
|
---|
| 225 | // ExecSimpleQuery doesn't need to know about this message
|
---|
| 226 |
|
---|
| 227 | case 'Z':
|
---|
| 228 | if !l.setState(connStateIdle) {
|
---|
| 229 | // protocol out of sync
|
---|
| 230 | return fmt.Errorf("unexpected ReadyForQuery")
|
---|
| 231 | }
|
---|
| 232 | l.replyChan <- message{t, nil}
|
---|
| 233 |
|
---|
| 234 | case 'S':
|
---|
| 235 | // ignore
|
---|
| 236 | case 'N':
|
---|
| 237 | if n := l.cn.noticeHandler; n != nil {
|
---|
| 238 | n(parseError(r))
|
---|
| 239 | }
|
---|
| 240 | default:
|
---|
| 241 | return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
|
---|
| 242 | }
|
---|
| 243 | }
|
---|
| 244 | }
|
---|
| 245 |
|
---|
| 246 | // This is the main routine for the goroutine receiving on the database
|
---|
| 247 | // connection. Most of the main logic is in listenerConnLoop.
|
---|
| 248 | func (l *ListenerConn) listenerConnMain() {
|
---|
| 249 | err := l.listenerConnLoop()
|
---|
| 250 |
|
---|
| 251 | // listenerConnLoop terminated; we're done, but we still have to clean up.
|
---|
| 252 | // Make sure nobody tries to start any new queries by making sure the err
|
---|
| 253 | // pointer is set. It is important that we do not overwrite its value; a
|
---|
| 254 | // connection could be closed by either this goroutine or one sending on
|
---|
| 255 | // the connection -- whoever closes the connection is assumed to have the
|
---|
| 256 | // more meaningful error message (as the other one will probably get
|
---|
| 257 | // net.errClosed), so that goroutine sets the error we expose while the
|
---|
| 258 | // other error is discarded. If the connection is lost while two
|
---|
| 259 | // goroutines are operating on the socket, it probably doesn't matter which
|
---|
| 260 | // error we expose so we don't try to do anything more complex.
|
---|
| 261 | l.connectionLock.Lock()
|
---|
| 262 | if l.err == nil {
|
---|
| 263 | l.err = err
|
---|
| 264 | }
|
---|
| 265 | l.cn.Close()
|
---|
| 266 | l.connectionLock.Unlock()
|
---|
| 267 |
|
---|
| 268 | // There might be a query in-flight; make sure nobody's waiting for a
|
---|
| 269 | // response to it, since there's not going to be one.
|
---|
| 270 | close(l.replyChan)
|
---|
| 271 |
|
---|
| 272 | // let the listener know we're done
|
---|
| 273 | close(l.notificationChan)
|
---|
| 274 |
|
---|
| 275 | // this ListenerConn is done
|
---|
| 276 | }
|
---|
| 277 |
|
---|
| 278 | // Listen sends a LISTEN query to the server. See ExecSimpleQuery.
|
---|
| 279 | func (l *ListenerConn) Listen(channel string) (bool, error) {
|
---|
| 280 | return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
|
---|
| 281 | }
|
---|
| 282 |
|
---|
| 283 | // Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery.
|
---|
| 284 | func (l *ListenerConn) Unlisten(channel string) (bool, error) {
|
---|
| 285 | return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
|
---|
| 286 | }
|
---|
| 287 |
|
---|
| 288 | // UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery.
|
---|
| 289 | func (l *ListenerConn) UnlistenAll() (bool, error) {
|
---|
| 290 | return l.ExecSimpleQuery("UNLISTEN *")
|
---|
| 291 | }
|
---|
| 292 |
|
---|
| 293 | // Ping the remote server to make sure it's alive. Non-nil error means the
|
---|
| 294 | // connection has failed and should be abandoned.
|
---|
| 295 | func (l *ListenerConn) Ping() error {
|
---|
| 296 | sent, err := l.ExecSimpleQuery("")
|
---|
| 297 | if !sent {
|
---|
| 298 | return err
|
---|
| 299 | }
|
---|
| 300 | if err != nil {
|
---|
| 301 | // shouldn't happen
|
---|
| 302 | panic(err)
|
---|
| 303 | }
|
---|
| 304 | return nil
|
---|
| 305 | }
|
---|
| 306 |
|
---|
| 307 | // Attempt to send a query on the connection. Returns an error if sending the
|
---|
| 308 | // query failed, and the caller should initiate closure of this connection.
|
---|
| 309 | // The caller must be holding senderLock (see acquireSenderLock and
|
---|
| 310 | // releaseSenderLock).
|
---|
| 311 | func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
|
---|
| 312 | defer errRecoverNoErrBadConn(&err)
|
---|
| 313 |
|
---|
| 314 | // must set connection state before sending the query
|
---|
| 315 | if !l.setState(connStateExpectResponse) {
|
---|
| 316 | panic("two queries running at the same time")
|
---|
| 317 | }
|
---|
| 318 |
|
---|
| 319 | // Can't use l.cn.writeBuf here because it uses the scratch buffer which
|
---|
| 320 | // might get overwritten by listenerConnLoop.
|
---|
| 321 | b := &writeBuf{
|
---|
| 322 | buf: []byte("Q\x00\x00\x00\x00"),
|
---|
| 323 | pos: 1,
|
---|
| 324 | }
|
---|
| 325 | b.string(q)
|
---|
| 326 | l.cn.send(b)
|
---|
| 327 |
|
---|
| 328 | return nil
|
---|
| 329 | }
|
---|
| 330 |
|
---|
| 331 | // ExecSimpleQuery executes a "simple query" (i.e. one with no bindable
|
---|
| 332 | // parameters) on the connection. The possible return values are:
|
---|
| 333 | // 1) "executed" is true; the query was executed to completion on the
|
---|
| 334 | // database server. If the query failed, err will be set to the error
|
---|
| 335 | // returned by the database, otherwise err will be nil.
|
---|
| 336 | // 2) If "executed" is false, the query could not be executed on the remote
|
---|
| 337 | // server. err will be non-nil.
|
---|
| 338 | //
|
---|
| 339 | // After a call to ExecSimpleQuery has returned an executed=false value, the
|
---|
| 340 | // connection has either been closed or will be closed shortly thereafter, and
|
---|
| 341 | // all subsequently executed queries will return an error.
|
---|
| 342 | func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
|
---|
| 343 | if err = l.acquireSenderLock(); err != nil {
|
---|
| 344 | return false, err
|
---|
| 345 | }
|
---|
| 346 | defer l.releaseSenderLock()
|
---|
| 347 |
|
---|
| 348 | err = l.sendSimpleQuery(q)
|
---|
| 349 | if err != nil {
|
---|
| 350 | // We can't know what state the protocol is in, so we need to abandon
|
---|
| 351 | // this connection.
|
---|
| 352 | l.connectionLock.Lock()
|
---|
| 353 | // Set the error pointer if it hasn't been set already; see
|
---|
| 354 | // listenerConnMain.
|
---|
| 355 | if l.err == nil {
|
---|
| 356 | l.err = err
|
---|
| 357 | }
|
---|
| 358 | l.connectionLock.Unlock()
|
---|
| 359 | l.cn.c.Close()
|
---|
| 360 | return false, err
|
---|
| 361 | }
|
---|
| 362 |
|
---|
| 363 | // now we just wait for a reply..
|
---|
| 364 | for {
|
---|
| 365 | m, ok := <-l.replyChan
|
---|
| 366 | if !ok {
|
---|
| 367 | // We lost the connection to server, don't bother waiting for a
|
---|
| 368 | // a response. err should have been set already.
|
---|
| 369 | l.connectionLock.Lock()
|
---|
| 370 | err := l.err
|
---|
| 371 | l.connectionLock.Unlock()
|
---|
| 372 | return false, err
|
---|
| 373 | }
|
---|
| 374 | switch m.typ {
|
---|
| 375 | case 'Z':
|
---|
| 376 | // sanity check
|
---|
| 377 | if m.err != nil {
|
---|
| 378 | panic("m.err != nil")
|
---|
| 379 | }
|
---|
| 380 | // done; err might or might not be set
|
---|
| 381 | return true, err
|
---|
| 382 |
|
---|
| 383 | case 'E':
|
---|
| 384 | // sanity check
|
---|
| 385 | if m.err == nil {
|
---|
| 386 | panic("m.err == nil")
|
---|
| 387 | }
|
---|
| 388 | // server responded with an error; ReadyForQuery to follow
|
---|
| 389 | err = m.err
|
---|
| 390 |
|
---|
| 391 | default:
|
---|
| 392 | return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
|
---|
| 393 | }
|
---|
| 394 | }
|
---|
| 395 | }
|
---|
| 396 |
|
---|
| 397 | // Close closes the connection.
|
---|
| 398 | func (l *ListenerConn) Close() error {
|
---|
| 399 | l.connectionLock.Lock()
|
---|
| 400 | if l.err != nil {
|
---|
| 401 | l.connectionLock.Unlock()
|
---|
| 402 | return errListenerConnClosed
|
---|
| 403 | }
|
---|
| 404 | l.err = errListenerConnClosed
|
---|
| 405 | l.connectionLock.Unlock()
|
---|
| 406 | // We can't send anything on the connection without holding senderLock.
|
---|
| 407 | // Simply close the net.Conn to wake up everyone operating on it.
|
---|
| 408 | return l.cn.c.Close()
|
---|
| 409 | }
|
---|
| 410 |
|
---|
| 411 | // Err returns the reason the connection was closed. It is not safe to call
|
---|
| 412 | // this function until l.Notify has been closed.
|
---|
| 413 | func (l *ListenerConn) Err() error {
|
---|
| 414 | return l.err
|
---|
| 415 | }
|
---|
| 416 |
|
---|
| 417 | var errListenerClosed = errors.New("pq: Listener has been closed")
|
---|
| 418 |
|
---|
| 419 | // ErrChannelAlreadyOpen is returned from Listen when a channel is already
|
---|
| 420 | // open.
|
---|
| 421 | var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")
|
---|
| 422 |
|
---|
| 423 | // ErrChannelNotOpen is returned from Unlisten when a channel is not open.
|
---|
| 424 | var ErrChannelNotOpen = errors.New("pq: channel is not open")
|
---|
| 425 |
|
---|
| 426 | // ListenerEventType is an enumeration of listener event types.
|
---|
| 427 | type ListenerEventType int
|
---|
| 428 |
|
---|
| 429 | const (
|
---|
| 430 | // ListenerEventConnected is emitted only when the database connection
|
---|
| 431 | // has been initially initialized. The err argument of the callback
|
---|
| 432 | // will always be nil.
|
---|
| 433 | ListenerEventConnected ListenerEventType = iota
|
---|
| 434 |
|
---|
| 435 | // ListenerEventDisconnected is emitted after a database connection has
|
---|
| 436 | // been lost, either because of an error or because Close has been
|
---|
| 437 | // called. The err argument will be set to the reason the database
|
---|
| 438 | // connection was lost.
|
---|
| 439 | ListenerEventDisconnected
|
---|
| 440 |
|
---|
| 441 | // ListenerEventReconnected is emitted after a database connection has
|
---|
| 442 | // been re-established after connection loss. The err argument of the
|
---|
| 443 | // callback will always be nil. After this event has been emitted, a
|
---|
| 444 | // nil pq.Notification is sent on the Listener.Notify channel.
|
---|
| 445 | ListenerEventReconnected
|
---|
| 446 |
|
---|
| 447 | // ListenerEventConnectionAttemptFailed is emitted after a connection
|
---|
| 448 | // to the database was attempted, but failed. The err argument will be
|
---|
| 449 | // set to an error describing why the connection attempt did not
|
---|
| 450 | // succeed.
|
---|
| 451 | ListenerEventConnectionAttemptFailed
|
---|
| 452 | )
|
---|
| 453 |
|
---|
| 454 | // EventCallbackType is the event callback type. See also ListenerEventType
|
---|
| 455 | // constants' documentation.
|
---|
| 456 | type EventCallbackType func(event ListenerEventType, err error)
|
---|
| 457 |
|
---|
| 458 | // Listener provides an interface for listening to notifications from a
|
---|
| 459 | // PostgreSQL database. For general usage information, see section
|
---|
| 460 | // "Notifications".
|
---|
| 461 | //
|
---|
| 462 | // Listener can safely be used from concurrently running goroutines.
|
---|
| 463 | type Listener struct {
|
---|
| 464 | // Channel for receiving notifications from the database. In some cases a
|
---|
| 465 | // nil value will be sent. See section "Notifications" above.
|
---|
| 466 | Notify chan *Notification
|
---|
| 467 |
|
---|
| 468 | name string
|
---|
| 469 | minReconnectInterval time.Duration
|
---|
| 470 | maxReconnectInterval time.Duration
|
---|
| 471 | dialer Dialer
|
---|
| 472 | eventCallback EventCallbackType
|
---|
| 473 |
|
---|
| 474 | lock sync.Mutex
|
---|
| 475 | isClosed bool
|
---|
| 476 | reconnectCond *sync.Cond
|
---|
| 477 | cn *ListenerConn
|
---|
| 478 | connNotificationChan <-chan *Notification
|
---|
| 479 | channels map[string]struct{}
|
---|
| 480 | }
|
---|
| 481 |
|
---|
| 482 | // NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
|
---|
| 483 | //
|
---|
| 484 | // name should be set to a connection string to be used to establish the
|
---|
| 485 | // database connection (see section "Connection String Parameters" above).
|
---|
| 486 | //
|
---|
| 487 | // minReconnectInterval controls the duration to wait before trying to
|
---|
| 488 | // re-establish the database connection after connection loss. After each
|
---|
| 489 | // consecutive failure this interval is doubled, until maxReconnectInterval is
|
---|
| 490 | // reached. Successfully completing the connection establishment procedure
|
---|
| 491 | // resets the interval back to minReconnectInterval.
|
---|
| 492 | //
|
---|
| 493 | // The last parameter eventCallback can be set to a function which will be
|
---|
| 494 | // called by the Listener when the state of the underlying database connection
|
---|
| 495 | // changes. This callback will be called by the goroutine which dispatches the
|
---|
| 496 | // notifications over the Notify channel, so you should try to avoid doing
|
---|
| 497 | // potentially time-consuming operations from the callback.
|
---|
| 498 | func NewListener(name string,
|
---|
| 499 | minReconnectInterval time.Duration,
|
---|
| 500 | maxReconnectInterval time.Duration,
|
---|
| 501 | eventCallback EventCallbackType) *Listener {
|
---|
| 502 | return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
|
---|
| 503 | }
|
---|
| 504 |
|
---|
| 505 | // NewDialListener is like NewListener but it takes a Dialer.
|
---|
| 506 | func NewDialListener(d Dialer,
|
---|
| 507 | name string,
|
---|
| 508 | minReconnectInterval time.Duration,
|
---|
| 509 | maxReconnectInterval time.Duration,
|
---|
| 510 | eventCallback EventCallbackType) *Listener {
|
---|
| 511 |
|
---|
| 512 | l := &Listener{
|
---|
| 513 | name: name,
|
---|
| 514 | minReconnectInterval: minReconnectInterval,
|
---|
| 515 | maxReconnectInterval: maxReconnectInterval,
|
---|
| 516 | dialer: d,
|
---|
| 517 | eventCallback: eventCallback,
|
---|
| 518 |
|
---|
| 519 | channels: make(map[string]struct{}),
|
---|
| 520 |
|
---|
| 521 | Notify: make(chan *Notification, 32),
|
---|
| 522 | }
|
---|
| 523 | l.reconnectCond = sync.NewCond(&l.lock)
|
---|
| 524 |
|
---|
| 525 | go l.listenerMain()
|
---|
| 526 |
|
---|
| 527 | return l
|
---|
| 528 | }
|
---|
| 529 |
|
---|
| 530 | // NotificationChannel returns the notification channel for this listener.
|
---|
| 531 | // This is the same channel as Notify, and will not be recreated during the
|
---|
| 532 | // life time of the Listener.
|
---|
| 533 | func (l *Listener) NotificationChannel() <-chan *Notification {
|
---|
| 534 | return l.Notify
|
---|
| 535 | }
|
---|
| 536 |
|
---|
| 537 | // Listen starts listening for notifications on a channel. Calls to this
|
---|
| 538 | // function will block until an acknowledgement has been received from the
|
---|
| 539 | // server. Note that Listener automatically re-establishes the connection
|
---|
| 540 | // after connection loss, so this function may block indefinitely if the
|
---|
| 541 | // connection can not be re-established.
|
---|
| 542 | //
|
---|
| 543 | // Listen will only fail in three conditions:
|
---|
| 544 | // 1) The channel is already open. The returned error will be
|
---|
| 545 | // ErrChannelAlreadyOpen.
|
---|
| 546 | // 2) The query was executed on the remote server, but PostgreSQL returned an
|
---|
| 547 | // error message in response to the query. The returned error will be a
|
---|
| 548 | // pq.Error containing the information the server supplied.
|
---|
| 549 | // 3) Close is called on the Listener before the request could be completed.
|
---|
| 550 | //
|
---|
| 551 | // The channel name is case-sensitive.
|
---|
| 552 | func (l *Listener) Listen(channel string) error {
|
---|
| 553 | l.lock.Lock()
|
---|
| 554 | defer l.lock.Unlock()
|
---|
| 555 |
|
---|
| 556 | if l.isClosed {
|
---|
| 557 | return errListenerClosed
|
---|
| 558 | }
|
---|
| 559 |
|
---|
| 560 | // The server allows you to issue a LISTEN on a channel which is already
|
---|
| 561 | // open, but it seems useful to be able to detect this case to spot for
|
---|
| 562 | // mistakes in application logic. If the application genuinely does't
|
---|
| 563 | // care, it can check the exported error and ignore it.
|
---|
| 564 | _, exists := l.channels[channel]
|
---|
| 565 | if exists {
|
---|
| 566 | return ErrChannelAlreadyOpen
|
---|
| 567 | }
|
---|
| 568 |
|
---|
| 569 | if l.cn != nil {
|
---|
| 570 | // If gotResponse is true but error is set, the query was executed on
|
---|
| 571 | // the remote server, but resulted in an error. This should be
|
---|
| 572 | // relatively rare, so it's fine if we just pass the error to our
|
---|
| 573 | // caller. However, if gotResponse is false, we could not complete the
|
---|
| 574 | // query on the remote server and our underlying connection is about
|
---|
| 575 | // to go away, so we only add relname to l.channels, and wait for
|
---|
| 576 | // resync() to take care of the rest.
|
---|
| 577 | gotResponse, err := l.cn.Listen(channel)
|
---|
| 578 | if gotResponse && err != nil {
|
---|
| 579 | return err
|
---|
| 580 | }
|
---|
| 581 | }
|
---|
| 582 |
|
---|
| 583 | l.channels[channel] = struct{}{}
|
---|
| 584 | for l.cn == nil {
|
---|
| 585 | l.reconnectCond.Wait()
|
---|
| 586 | // we let go of the mutex for a while
|
---|
| 587 | if l.isClosed {
|
---|
| 588 | return errListenerClosed
|
---|
| 589 | }
|
---|
| 590 | }
|
---|
| 591 |
|
---|
| 592 | return nil
|
---|
| 593 | }
|
---|
| 594 |
|
---|
| 595 | // Unlisten removes a channel from the Listener's channel list. Returns
|
---|
| 596 | // ErrChannelNotOpen if the Listener is not listening on the specified channel.
|
---|
| 597 | // Returns immediately with no error if there is no connection. Note that you
|
---|
| 598 | // might still get notifications for this channel even after Unlisten has
|
---|
| 599 | // returned.
|
---|
| 600 | //
|
---|
| 601 | // The channel name is case-sensitive.
|
---|
| 602 | func (l *Listener) Unlisten(channel string) error {
|
---|
| 603 | l.lock.Lock()
|
---|
| 604 | defer l.lock.Unlock()
|
---|
| 605 |
|
---|
| 606 | if l.isClosed {
|
---|
| 607 | return errListenerClosed
|
---|
| 608 | }
|
---|
| 609 |
|
---|
| 610 | // Similarly to LISTEN, this is not an error in Postgres, but it seems
|
---|
| 611 | // useful to distinguish from the normal conditions.
|
---|
| 612 | _, exists := l.channels[channel]
|
---|
| 613 | if !exists {
|
---|
| 614 | return ErrChannelNotOpen
|
---|
| 615 | }
|
---|
| 616 |
|
---|
| 617 | if l.cn != nil {
|
---|
| 618 | // Similarly to Listen (see comment in that function), the caller
|
---|
| 619 | // should only be bothered with an error if it came from the backend as
|
---|
| 620 | // a response to our query.
|
---|
| 621 | gotResponse, err := l.cn.Unlisten(channel)
|
---|
| 622 | if gotResponse && err != nil {
|
---|
| 623 | return err
|
---|
| 624 | }
|
---|
| 625 | }
|
---|
| 626 |
|
---|
| 627 | // Don't bother waiting for resync if there's no connection.
|
---|
| 628 | delete(l.channels, channel)
|
---|
| 629 | return nil
|
---|
| 630 | }
|
---|
| 631 |
|
---|
| 632 | // UnlistenAll removes all channels from the Listener's channel list. Returns
|
---|
| 633 | // immediately with no error if there is no connection. Note that you might
|
---|
| 634 | // still get notifications for any of the deleted channels even after
|
---|
| 635 | // UnlistenAll has returned.
|
---|
| 636 | func (l *Listener) UnlistenAll() error {
|
---|
| 637 | l.lock.Lock()
|
---|
| 638 | defer l.lock.Unlock()
|
---|
| 639 |
|
---|
| 640 | if l.isClosed {
|
---|
| 641 | return errListenerClosed
|
---|
| 642 | }
|
---|
| 643 |
|
---|
| 644 | if l.cn != nil {
|
---|
| 645 | // Similarly to Listen (see comment in that function), the caller
|
---|
| 646 | // should only be bothered with an error if it came from the backend as
|
---|
| 647 | // a response to our query.
|
---|
| 648 | gotResponse, err := l.cn.UnlistenAll()
|
---|
| 649 | if gotResponse && err != nil {
|
---|
| 650 | return err
|
---|
| 651 | }
|
---|
| 652 | }
|
---|
| 653 |
|
---|
| 654 | // Don't bother waiting for resync if there's no connection.
|
---|
| 655 | l.channels = make(map[string]struct{})
|
---|
| 656 | return nil
|
---|
| 657 | }
|
---|
| 658 |
|
---|
| 659 | // Ping the remote server to make sure it's alive. Non-nil return value means
|
---|
| 660 | // that there is no active connection.
|
---|
| 661 | func (l *Listener) Ping() error {
|
---|
| 662 | l.lock.Lock()
|
---|
| 663 | defer l.lock.Unlock()
|
---|
| 664 |
|
---|
| 665 | if l.isClosed {
|
---|
| 666 | return errListenerClosed
|
---|
| 667 | }
|
---|
| 668 | if l.cn == nil {
|
---|
| 669 | return errors.New("no connection")
|
---|
| 670 | }
|
---|
| 671 |
|
---|
| 672 | return l.cn.Ping()
|
---|
| 673 | }
|
---|
| 674 |
|
---|
| 675 | // Clean up after losing the server connection. Returns l.cn.Err(), which
|
---|
| 676 | // should have the reason the connection was lost.
|
---|
| 677 | func (l *Listener) disconnectCleanup() error {
|
---|
| 678 | l.lock.Lock()
|
---|
| 679 | defer l.lock.Unlock()
|
---|
| 680 |
|
---|
| 681 | // sanity check; can't look at Err() until the channel has been closed
|
---|
| 682 | select {
|
---|
| 683 | case _, ok := <-l.connNotificationChan:
|
---|
| 684 | if ok {
|
---|
| 685 | panic("connNotificationChan not closed")
|
---|
| 686 | }
|
---|
| 687 | default:
|
---|
| 688 | panic("connNotificationChan not closed")
|
---|
| 689 | }
|
---|
| 690 |
|
---|
| 691 | err := l.cn.Err()
|
---|
| 692 | l.cn.Close()
|
---|
| 693 | l.cn = nil
|
---|
| 694 | return err
|
---|
| 695 | }
|
---|
| 696 |
|
---|
| 697 | // Synchronize the list of channels we want to be listening on with the server
|
---|
| 698 | // after the connection has been established.
|
---|
| 699 | func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
|
---|
| 700 | doneChan := make(chan error)
|
---|
| 701 | go func(notificationChan <-chan *Notification) {
|
---|
| 702 | for channel := range l.channels {
|
---|
| 703 | // If we got a response, return that error to our caller as it's
|
---|
| 704 | // going to be more descriptive than cn.Err().
|
---|
| 705 | gotResponse, err := cn.Listen(channel)
|
---|
| 706 | if gotResponse && err != nil {
|
---|
| 707 | doneChan <- err
|
---|
| 708 | return
|
---|
| 709 | }
|
---|
| 710 |
|
---|
| 711 | // If we couldn't reach the server, wait for notificationChan to
|
---|
| 712 | // close and then return the error message from the connection, as
|
---|
| 713 | // per ListenerConn's interface.
|
---|
| 714 | if err != nil {
|
---|
| 715 | for range notificationChan {
|
---|
| 716 | }
|
---|
| 717 | doneChan <- cn.Err()
|
---|
| 718 | return
|
---|
| 719 | }
|
---|
| 720 | }
|
---|
| 721 | doneChan <- nil
|
---|
| 722 | }(notificationChan)
|
---|
| 723 |
|
---|
| 724 | // Ignore notifications while synchronization is going on to avoid
|
---|
| 725 | // deadlocks. We have to send a nil notification over Notify anyway as
|
---|
| 726 | // we can't possibly know which notifications (if any) were lost while
|
---|
| 727 | // the connection was down, so there's no reason to try and process
|
---|
| 728 | // these messages at all.
|
---|
| 729 | for {
|
---|
| 730 | select {
|
---|
| 731 | case _, ok := <-notificationChan:
|
---|
| 732 | if !ok {
|
---|
| 733 | notificationChan = nil
|
---|
| 734 | }
|
---|
| 735 |
|
---|
| 736 | case err := <-doneChan:
|
---|
| 737 | return err
|
---|
| 738 | }
|
---|
| 739 | }
|
---|
| 740 | }
|
---|
| 741 |
|
---|
| 742 | // caller should NOT be holding l.lock
|
---|
| 743 | func (l *Listener) closed() bool {
|
---|
| 744 | l.lock.Lock()
|
---|
| 745 | defer l.lock.Unlock()
|
---|
| 746 |
|
---|
| 747 | return l.isClosed
|
---|
| 748 | }
|
---|
| 749 |
|
---|
| 750 | func (l *Listener) connect() error {
|
---|
| 751 | notificationChan := make(chan *Notification, 32)
|
---|
| 752 | cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
|
---|
| 753 | if err != nil {
|
---|
| 754 | return err
|
---|
| 755 | }
|
---|
| 756 |
|
---|
| 757 | l.lock.Lock()
|
---|
| 758 | defer l.lock.Unlock()
|
---|
| 759 |
|
---|
| 760 | err = l.resync(cn, notificationChan)
|
---|
| 761 | if err != nil {
|
---|
| 762 | cn.Close()
|
---|
| 763 | return err
|
---|
| 764 | }
|
---|
| 765 |
|
---|
| 766 | l.cn = cn
|
---|
| 767 | l.connNotificationChan = notificationChan
|
---|
| 768 | l.reconnectCond.Broadcast()
|
---|
| 769 |
|
---|
| 770 | return nil
|
---|
| 771 | }
|
---|
| 772 |
|
---|
| 773 | // Close disconnects the Listener from the database and shuts it down.
|
---|
| 774 | // Subsequent calls to its methods will return an error. Close returns an
|
---|
| 775 | // error if the connection has already been closed.
|
---|
| 776 | func (l *Listener) Close() error {
|
---|
| 777 | l.lock.Lock()
|
---|
| 778 | defer l.lock.Unlock()
|
---|
| 779 |
|
---|
| 780 | if l.isClosed {
|
---|
| 781 | return errListenerClosed
|
---|
| 782 | }
|
---|
| 783 |
|
---|
| 784 | if l.cn != nil {
|
---|
| 785 | l.cn.Close()
|
---|
| 786 | }
|
---|
| 787 | l.isClosed = true
|
---|
| 788 |
|
---|
| 789 | // Unblock calls to Listen()
|
---|
| 790 | l.reconnectCond.Broadcast()
|
---|
| 791 |
|
---|
| 792 | return nil
|
---|
| 793 | }
|
---|
| 794 |
|
---|
| 795 | func (l *Listener) emitEvent(event ListenerEventType, err error) {
|
---|
| 796 | if l.eventCallback != nil {
|
---|
| 797 | l.eventCallback(event, err)
|
---|
| 798 | }
|
---|
| 799 | }
|
---|
| 800 |
|
---|
| 801 | // Main logic here: maintain a connection to the server when possible, wait
|
---|
| 802 | // for notifications and emit events.
|
---|
| 803 | func (l *Listener) listenerConnLoop() {
|
---|
| 804 | var nextReconnect time.Time
|
---|
| 805 |
|
---|
| 806 | reconnectInterval := l.minReconnectInterval
|
---|
| 807 | for {
|
---|
| 808 | for {
|
---|
| 809 | err := l.connect()
|
---|
| 810 | if err == nil {
|
---|
| 811 | break
|
---|
| 812 | }
|
---|
| 813 |
|
---|
| 814 | if l.closed() {
|
---|
| 815 | return
|
---|
| 816 | }
|
---|
| 817 | l.emitEvent(ListenerEventConnectionAttemptFailed, err)
|
---|
| 818 |
|
---|
| 819 | time.Sleep(reconnectInterval)
|
---|
| 820 | reconnectInterval *= 2
|
---|
| 821 | if reconnectInterval > l.maxReconnectInterval {
|
---|
| 822 | reconnectInterval = l.maxReconnectInterval
|
---|
| 823 | }
|
---|
| 824 | }
|
---|
| 825 |
|
---|
| 826 | if nextReconnect.IsZero() {
|
---|
| 827 | l.emitEvent(ListenerEventConnected, nil)
|
---|
| 828 | } else {
|
---|
| 829 | l.emitEvent(ListenerEventReconnected, nil)
|
---|
| 830 | l.Notify <- nil
|
---|
| 831 | }
|
---|
| 832 |
|
---|
| 833 | reconnectInterval = l.minReconnectInterval
|
---|
| 834 | nextReconnect = time.Now().Add(reconnectInterval)
|
---|
| 835 |
|
---|
| 836 | for {
|
---|
| 837 | notification, ok := <-l.connNotificationChan
|
---|
| 838 | if !ok {
|
---|
| 839 | // lost connection, loop again
|
---|
| 840 | break
|
---|
| 841 | }
|
---|
| 842 | l.Notify <- notification
|
---|
| 843 | }
|
---|
| 844 |
|
---|
| 845 | err := l.disconnectCleanup()
|
---|
| 846 | if l.closed() {
|
---|
| 847 | return
|
---|
| 848 | }
|
---|
| 849 | l.emitEvent(ListenerEventDisconnected, err)
|
---|
| 850 |
|
---|
| 851 | time.Sleep(time.Until(nextReconnect))
|
---|
| 852 | }
|
---|
| 853 | }
|
---|
| 854 |
|
---|
| 855 | func (l *Listener) listenerMain() {
|
---|
| 856 | l.listenerConnLoop()
|
---|
| 857 | close(l.Notify)
|
---|
| 858 | }
|
---|