Changeset 188 in code


Ignore:
Timestamp:
Mar 31, 2020, 4:16:54 PM (5 years ago)
Author:
contact
Message:

Consume ring messages outside of writer goroutine

This fixes out-of-order JOIN and PRIVMSG messages.

Closes: https://todo.sr.ht/~emersion/soju/36

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r185 r188  
    5252}}
    5353
    54 type ringMessage struct {
    55         consumer     *RingConsumer
    56         upstreamConn *upstreamConn
    57 }
    58 
    5954type downstreamConn struct {
    60         id           uint64
    61         net          net.Conn
    62         irc          *irc.Conn
    63         srv          *Server
    64         logger       Logger
    65         outgoing     chan *irc.Message
    66         ringMessages chan ringMessage
    67         closed       chan struct{}
     55        id       uint64
     56        net      net.Conn
     57        irc      *irc.Conn
     58        srv      *Server
     59        logger   Logger
     60        outgoing chan *irc.Message
     61        closed   chan struct{}
    6862
    6963        registered  bool
     
    9084func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn {
    9185        dc := &downstreamConn{
    92                 id:           id,
    93                 net:          netConn,
    94                 irc:          irc.NewConn(netConn),
    95                 srv:          srv,
    96                 logger:       &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
    97                 outgoing:     make(chan *irc.Message, 64),
    98                 ringMessages: make(chan ringMessage),
    99                 closed:       make(chan struct{}),
    100                 caps:         make(map[string]bool),
    101                 ourMessages:  make(map[*irc.Message]struct{}),
     86                id:          id,
     87                net:         netConn,
     88                irc:         irc.NewConn(netConn),
     89                srv:         srv,
     90                logger:      &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
     91                outgoing:    make(chan *irc.Message, 64),
     92                closed:      make(chan struct{}),
     93                caps:        make(map[string]bool),
     94                ourMessages: make(map[*irc.Message]struct{}),
    10295        }
    10396        dc.hostname = netConn.RemoteAddr().String()
     
    258251                        }
    259252                        err = dc.irc.WriteMessage(msg)
    260                 case ringMessage := <-dc.ringMessages:
    261                         consumer, uc := ringMessage.consumer, ringMessage.upstreamConn
    262                         for {
    263                                 msg := consumer.Peek()
    264                                 if msg == nil {
    265                                         break
    266                                 }
    267 
    268                                 dc.lock.Lock()
    269                                 _, ours := dc.ourMessages[msg]
    270                                 delete(dc.ourMessages, msg)
    271                                 dc.lock.Unlock()
    272                                 if ours {
    273                                         // The message comes from our connection, don't echo it
    274                                         // back
    275                                         consumer.Consume()
    276                                         continue
    277                                 }
    278 
    279                                 msg = msg.Copy()
    280                                 switch msg.Command {
    281                                 case "PRIVMSG":
    282                                         msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)
    283                                         msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])
    284                                 default:
    285                                         panic("expected to consume a PRIVMSG message")
    286                                 }
    287                                 if dc.srv.Debug {
    288                                         dc.logger.Printf("sent: %v", msg)
    289                                 }
    290                                 err = dc.irc.WriteMessage(msg)
    291                                 if err != nil {
    292                                         break
    293                                 }
    294                                 consumer.Consume()
    295                         }
    296253                case <-dc.closed:
    297254                        closed = true
     
    775732                                        break
    776733                                }
    777                                 dc.ringMessages <- ringMessage{consumer, uc}
     734
     735                                for {
     736                                        msg := consumer.Peek()
     737                                        if msg == nil {
     738                                                break
     739                                        }
     740
     741                                        dc.lock.Lock()
     742                                        _, ours := dc.ourMessages[msg]
     743                                        delete(dc.ourMessages, msg)
     744                                        dc.lock.Unlock()
     745                                        if ours {
     746                                                // The message comes from our connection, don't echo it
     747                                                // back
     748                                                consumer.Consume()
     749                                                continue
     750                                        }
     751
     752                                        msg = msg.Copy()
     753                                        switch msg.Command {
     754                                        case "PRIVMSG":
     755                                                msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)
     756                                                msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])
     757                                        default:
     758                                                panic("expected to consume a PRIVMSG message")
     759                                        }
     760
     761                                        dc.SendMessage(msg)
     762                                        consumer.Consume()
     763                                }
    778764                        case <-dc.closed:
    779765                                closed = true
Note: See TracChangeset for help on using the changeset viewer.