Changeset 103 in code


Ignore:
Timestamp:
Mar 16, 2020, 11:44:59 AM (5 years ago)
Author:
contact
Message:

Per-user dispatcher goroutine

This allows message handlers to read upstream/downstream connection
information without causing any race condition.

References: https://todo.sr.ht/~emersion/soju/1

Location:
trunk
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r102 r103  
    192192}
    193193
    194 func (dc *downstreamConn) readMessages() error {
     194func (dc *downstreamConn) readMessages(ch chan<- downstreamIncomingMessage) error {
    195195        dc.logger.Printf("new connection")
    196196
     
    207207                }
    208208
    209                 err = dc.handleMessage(msg)
    210                 if ircErr, ok := err.(ircError); ok {
    211                         ircErr.Message.Prefix = dc.srv.prefix()
    212                         dc.SendMessage(ircErr.Message)
    213                 } else if err != nil {
    214                         return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
    215                 }
    216 
    217                 if dc.isClosed() {
    218                         return nil
    219                 }
     209                ch <- downstreamIncomingMessage{msg, dc}
    220210        }
    221211
     
    485475}
    486476
     477func (dc *downstreamConn) runUntilRegistered() error {
     478        for !dc.registered {
     479                msg, err := dc.irc.ReadMessage()
     480                if err == io.EOF {
     481                        break
     482                } else if err != nil {
     483                        return fmt.Errorf("failed to read IRC command: %v", err)
     484                }
     485
     486                err = dc.handleMessage(msg)
     487                if ircErr, ok := err.(ircError); ok {
     488                        ircErr.Message.Prefix = dc.srv.prefix()
     489                        dc.SendMessage(ircErr.Message)
     490                } else if err != nil {
     491                        return fmt.Errorf("failed to handle IRC command %q: %v", msg, err)
     492                }
     493        }
     494
     495        return nil
     496}
     497
    487498func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
    488499        switch msg.Command {
  • trunk/server.go

    r101 r103  
    115115                        s.lock.Unlock()
    116116
    117                         if err := dc.readMessages(); err != nil {
    118                                 dc.logger.Printf("failed to handle messages: %v", err)
     117                        if err := dc.runUntilRegistered(); err != nil {
     118                                dc.logger.Print(err)
     119                        } else {
     120                                if err := dc.readMessages(dc.user.downstreamIncoming); err != nil {
     121                                        dc.logger.Print(err)
     122                                }
    119123                        }
    120124                        dc.Close()
  • trunk/upstream.go

    r102 r103  
    660660}
    661661
    662 func (uc *upstreamConn) readMessages() error {
     662func (uc *upstreamConn) readMessages(ch chan<- upstreamIncomingMessage) error {
    663663        for {
    664664                msg, err := uc.irc.ReadMessage()
     
    673673                }
    674674
    675                 if err := uc.handleMessage(msg); err != nil {
    676                         uc.logger.Printf("failed to handle message %q: %v", msg, err)
    677                 }
     675                ch <- upstreamIncomingMessage{msg, uc}
    678676        }
    679677
  • trunk/user.go

    r101 r103  
    44        "sync"
    55        "time"
     6
     7        "gopkg.in/irc.v3"
    68)
     9
     10type upstreamIncomingMessage struct {
     11        msg *irc.Message
     12        uc  *upstreamConn
     13}
     14
     15type downstreamIncomingMessage struct {
     16        msg *irc.Message
     17        dc  *downstreamConn
     18}
    719
    820type network struct {
     
    4153                net.user.lock.Unlock()
    4254
    43                 if err := uc.readMessages(); err != nil {
     55                if err := uc.readMessages(net.user.upstreamIncoming); err != nil {
    4456                        uc.logger.Printf("failed to handle messages: %v", err)
    4557                }
     
    5668        srv *Server
    5769
     70        upstreamIncoming   chan upstreamIncomingMessage
     71        downstreamIncoming chan downstreamIncomingMessage
     72
    5873        lock            sync.Mutex
    5974        networks        []*network
     
    6378func newUser(srv *Server, record *User) *user {
    6479        return &user{
    65                 User: *record,
    66                 srv:  srv,
     80                User:               *record,
     81                srv:                srv,
     82                upstreamIncoming:   make(chan upstreamIncomingMessage, 64),
     83                downstreamIncoming: make(chan downstreamIncomingMessage, 64),
    6784        }
    6885}
     
    120137        }
    121138        u.lock.Unlock()
     139
     140        for {
     141                select {
     142                case upstreamMsg := <-u.upstreamIncoming:
     143                        msg, uc := upstreamMsg.msg, upstreamMsg.uc
     144                        if err := uc.handleMessage(msg); err != nil {
     145                                uc.logger.Printf("failed to handle message %q: %v", msg, err)
     146                        }
     147                case downstreamMsg := <-u.downstreamIncoming:
     148                        msg, dc := downstreamMsg.msg, downstreamMsg.dc
     149                        err := dc.handleMessage(msg)
     150                        if ircErr, ok := err.(ircError); ok {
     151                                ircErr.Message.Prefix = dc.srv.prefix()
     152                                dc.SendMessage(ircErr.Message)
     153                        } else if err != nil {
     154                                dc.logger.Printf("failed to handle message %q: %v", msg, err)
     155                                dc.Close()
     156                        }
     157                }
     158        }
    122159}
    123160
Note: See TracChangeset for help on using the changeset viewer.