Changeset 77 in code
- Timestamp:
- Mar 4, 2020, 5:22:58 PM (5 years ago)
- Location:
- trunk
- Files:
-
- 2 added
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/cmd/jounce/main.go
r70 r77 34 34 } 35 35 36 db, err := jounce.OpenSQLDB(cfg.SQLDriver, cfg.SQLSource) 37 if err != nil { 38 log.Fatalf("failed to open database: %v", err) 39 } 40 36 41 var ln net.Listener 37 42 if cfg.TLS != nil { … … 54 59 } 55 60 56 srv := jounce.NewServer( )61 srv := jounce.NewServer(db) 57 62 // TODO: load from config/DB 58 63 srv.Hostname = cfg.Hostname 59 64 srv.Debug = debug 60 srv.Upstreams = []jounce.Upstream{{61 Addr: "chat.freenode.net:6697",62 Nick: "jounce",63 Username: "jounce",64 Realname: "jounce",65 Channels: []string{"#jounce"},66 }}67 65 68 66 log.Printf("server listening on %q", cfg.Addr) 69 go srv.Run() 67 go func() { 68 if err := srv.Run(); err != nil { 69 log.Fatal(err) 70 } 71 }() 70 72 log.Fatal(srv.Serve(ln)) 71 73 } -
trunk/config/config.go
r62 r77 15 15 16 16 type Server struct { 17 Addr string 18 Hostname string 19 TLS *TLS 17 Addr string 18 Hostname string 19 TLS *TLS 20 SQLDriver string 21 SQLSource string 20 22 } 21 23 … … 26 28 } 27 29 return &Server{ 28 Addr: ":6667", 29 Hostname: hostname, 30 Addr: ":6667", 31 Hostname: hostname, 32 SQLDriver: "sqlite3", 33 SQLSource: "jounce.db", 30 34 } 31 35 } … … 65 69 } 66 70 srv.TLS = tls 71 case "sql": 72 if err := d.parseParams(&srv.SQLDriver, &srv.SQLSource); err != nil { 73 return nil, err 74 } 67 75 default: 68 76 return nil, fmt.Errorf("unknown directive %q", d.Name) -
trunk/downstream.go
r76 r77 59 59 username string 60 60 realname string 61 upstream *Upstream61 network *network // can be nil 62 62 } 63 63 … … 101 101 func (dc *downstreamConn) forEachUpstream(f func(*upstreamConn)) { 102 102 dc.user.forEachUpstream(func(uc *upstreamConn) { 103 if dc. upstream != nil && uc.upstream != dc.upstream{103 if dc.network != nil && uc.network != dc.network { 104 104 return 105 105 } … … 302 302 func (dc *downstreamConn) register() error { 303 303 username := strings.TrimPrefix(dc.username, "~") 304 var upstreamName string304 var networkName string 305 305 if i := strings.LastIndexAny(username, "/@"); i >= 0 { 306 upstreamName = username[i+1:]306 networkName = username[i+1:] 307 307 } 308 308 if i := strings.IndexAny(username, "/@"); i >= 0 { … … 321 321 } 322 322 323 if upstreamName != "" {324 dc. upstream = dc.user.getUpstream(upstreamName)325 if dc. upstream== nil {326 dc.logger.Printf("failed registration: unknown upstream %q", upstreamName)323 if networkName != "" { 324 dc.network = dc.user.getNetwork(networkName) 325 if dc.network == nil { 326 dc.logger.Printf("failed registration: unknown network %q", networkName) 327 327 dc.SendMessage(&irc.Message{ 328 328 Prefix: dc.srv.prefix(), 329 329 Command: irc.ERR_PASSWDMISMATCH, 330 Params: []string{"*", fmt.Sprintf("Unknown upstream server %q", upstreamName)},330 Params: []string{"*", fmt.Sprintf("Unknown network %q", networkName)}, 331 331 }) 332 332 return nil -
trunk/go.mod
r1 r77 3 3 go 1.13 4 4 5 require gopkg.in/irc.v3 v3.1.0 5 require ( 6 github.com/mattn/go-sqlite3 v2.0.3+incompatible 7 gopkg.in/irc.v3 v3.1.0 8 ) -
trunk/go.sum
r1 r77 1 github.com/mattn/go-sqlite3 v1.13.0 h1:LnJI81JidiW9r7pS/hXe6cFeO5EXNq7KbfvoJLRI69c= 2 github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U= 3 github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= 1 4 gopkg.in/irc.v3 v3.1.0 h1:AeDaEhQ/78gHfpbj/3mSi8FfiNIsFiVrWEgLzOwHWnU= 2 5 gopkg.in/irc.v3 v3.1.0/go.mod h1:qE0DWv0j8Z8wCbFhA9783JBO0bufi3rttcV1Sjin8io= -
trunk/server.go
r75 r77 48 48 } 49 49 50 type network struct { 51 Network 52 user *user 53 conn *upstreamConn 54 } 55 56 func newNetwork(user *user, record *Network) *network { 57 return &network{ 58 Network: *record, 59 user: user, 60 } 61 } 62 63 func (net *network) run() { 64 var lastTry time.Time 65 for { 66 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay { 67 delay := retryConnectMinDelay - dur 68 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr) 69 time.Sleep(delay) 70 } 71 lastTry = time.Now() 72 73 uc, err := connectToUpstream(net) 74 if err != nil { 75 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err) 76 continue 77 } 78 79 uc.register() 80 81 net.user.lock.Lock() 82 net.conn = uc 83 net.user.lock.Unlock() 84 85 if err := uc.readMessages(); err != nil { 86 uc.logger.Printf("failed to handle messages: %v", err) 87 } 88 uc.Close() 89 90 net.user.lock.Lock() 91 net.conn = nil 92 net.user.lock.Unlock() 93 } 94 } 95 50 96 type user struct { 51 username string97 User 52 98 srv *Server 53 99 54 100 lock sync.Mutex 55 upstreamConns []*upstreamConn101 networks []*network 56 102 downstreamConns []*downstreamConn 57 103 } 58 104 59 func newUser(srv *Server, username string) *user {105 func newUser(srv *Server, record *User) *user { 60 106 return &user{ 61 username: username,62 srv: 107 User: *record, 108 srv: srv, 63 109 } 64 110 } … … 66 112 func (u *user) forEachUpstream(f func(uc *upstreamConn)) { 67 113 u.lock.Lock() 68 for _, uc := range u.upstreamConns { 69 if !uc.registered || uc.closed { 114 for _, network := range u.networks { 115 uc := network.conn 116 if uc == nil || !uc.registered || uc.closed { 70 117 continue 71 118 } … … 83 130 } 84 131 85 func (u *user) get Upstream(name string) *Upstream{86 for i, upstream := range u.srv.Upstreams {87 if upstream.Addr == name {88 return &u.srv.Upstreams[i]132 func (u *user) getNetwork(name string) *network { 133 for _, network := range u.networks { 134 if network.Addr == name { 135 return network 89 136 } 90 137 } … … 92 139 } 93 140 94 type Upstream struct { 95 Addr string 96 Nick string 97 Username string 98 Realname string 99 Channels []string 141 func (u *user) run() { 142 networks, err := u.srv.db.ListNetworks(u.Username) 143 if err != nil { 144 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err) 145 return 146 } 147 148 u.lock.Lock() 149 for _, record := range networks { 150 network := newNetwork(u, &record) 151 u.networks = append(u.networks, network) 152 153 go network.run() 154 } 155 u.lock.Unlock() 100 156 } 101 157 … … 105 161 RingCap int 106 162 Debug bool 107 Upstreams []Upstream // TODO: per-user 163 164 db *DB 108 165 109 166 lock sync.Mutex … … 112 169 } 113 170 114 func NewServer( ) *Server {171 func NewServer(db *DB) *Server { 115 172 return &Server{ 116 173 Logger: log.New(log.Writer(), "", log.LstdFlags), 117 174 RingCap: 4096, 118 175 users: make(map[string]*user), 176 db: db, 119 177 } 120 178 } … … 124 182 } 125 183 126 func (s *Server) runUpstream(u *user, upstream *Upstream) { 127 var lastTry time.Time 128 for { 129 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay { 130 delay := retryConnectMinDelay - dur 131 s.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), upstream.Addr) 132 time.Sleep(delay) 133 } 134 lastTry = time.Now() 135 136 uc, err := connectToUpstream(u, upstream) 137 if err != nil { 138 s.Logger.Printf("failed to connect to upstream server %q: %v", upstream.Addr, err) 139 continue 140 } 141 142 uc.register() 143 144 u.lock.Lock() 145 u.upstreamConns = append(u.upstreamConns, uc) 146 u.lock.Unlock() 147 148 if err := uc.readMessages(); err != nil { 149 uc.logger.Printf("failed to handle messages: %v", err) 150 } 151 uc.Close() 152 153 u.lock.Lock() 154 for i := range u.upstreamConns { 155 if u.upstreamConns[i] == uc { 156 u.upstreamConns = append(u.upstreamConns[:i], u.upstreamConns[i+1:]...) 157 break 158 } 159 } 160 u.lock.Unlock() 161 } 162 } 163 164 func (s *Server) Run() { 165 // TODO: multi-user 166 u := newUser(s, "jounce") 184 func (s *Server) Run() error { 185 users, err := s.db.ListUsers() 186 if err != nil { 187 return err 188 } 167 189 168 190 s.lock.Lock() 169 s.users[u.username] = u 191 for _, record := range users { 192 s.Logger.Printf("starting bouncer for user %q", record.Username) 193 u := newUser(s, &record) 194 s.users[u.Username] = u 195 196 go u.run() 197 } 170 198 s.lock.Unlock() 171 199 172 for i := range s.Upstreams { 173 go s.runUpstream(u, &s.Upstreams[i]) 174 } 200 select {} 175 201 } 176 202 -
trunk/upstream.go
r74 r77 26 26 27 27 type upstreamConn struct { 28 upstream *Upstream28 network *network 29 29 logger Logger 30 30 net net.Conn … … 42 42 registered bool 43 43 nick string 44 username string 45 realname string 44 46 closed bool 45 47 modes modeSet … … 48 50 } 49 51 50 func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) { 51 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)} 52 logger.Printf("connecting to server") 53 54 netConn, err := tls.Dial("tcp", upstream.Addr, nil) 52 func connectToUpstream(network *network) (*upstreamConn, error) { 53 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)} 54 55 addr := network.Addr 56 if !strings.ContainsRune(addr, ':') { 57 addr = addr + ":6697" 58 } 59 60 logger.Printf("connecting to TLS server at address %q", addr) 61 netConn, err := tls.Dial("tcp", addr, nil) 55 62 if err != nil { 56 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)63 return nil, fmt.Errorf("failed to dial %q: %v", addr, err) 57 64 } 58 65 … … 61 68 msgs := make(chan *irc.Message, 64) 62 69 uc := &upstreamConn{ 63 upstream: upstream,70 network: network, 64 71 logger: logger, 65 72 net: netConn, 66 73 irc: irc.NewConn(netConn), 67 srv: u.srv,68 user: u,74 srv: network.user.srv, 75 user: network.user, 69 76 messages: msgs, 70 ring: NewRing( u.srv.RingCap),77 ring: NewRing(network.user.srv.RingCap), 71 78 channels: make(map[string]*upstreamChannel), 72 79 history: make(map[string]uint64), … … 103 110 func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) { 104 111 uc.user.forEachDownstream(func(dc *downstreamConn) { 105 if dc. upstream != nil && dc.upstream != uc.upstream{112 if dc.network != nil && dc.network != uc.network { 106 113 return 107 114 } … … 164 171 uc.logger.Printf("connection registered") 165 172 166 for _, ch := range uc.upstream.Channels { 173 channels, err := uc.srv.db.ListChannels(uc.network.ID) 174 if err != nil { 175 uc.logger.Printf("failed to list channels from database: %v", err) 176 break 177 } 178 179 for _, ch := range channels { 167 180 uc.SendMessage(&irc.Message{ 168 181 Command: "JOIN", 169 Params: []string{ch },182 Params: []string{ch.Name}, 170 183 }) 171 184 } … … 372 385 373 386 func (uc *upstreamConn) register() { 374 uc.nick = uc.upstream.Nick 387 uc.nick = uc.network.Nick 388 uc.username = uc.network.Username 389 if uc.username == "" { 390 uc.username = uc.nick 391 } 392 uc.realname = uc.network.Realname 393 if uc.realname == "" { 394 uc.realname = uc.nick 395 } 396 375 397 uc.SendMessage(&irc.Message{ 376 398 Command: "NICK", … … 379 401 uc.SendMessage(&irc.Message{ 380 402 Command: "USER", 381 Params: []string{uc.u pstream.Username, "0", "*", uc.upstream.Realname},403 Params: []string{uc.username, "0", "*", uc.realname}, 382 404 }) 383 405 }
Note:
See TracChangeset
for help on using the changeset viewer.