diff options
-rw-r--r-- | pkg/api/api0/api.go | 4 | ||||
-rw-r--r-- | pkg/api/api0/client.go | 93 | ||||
-rw-r--r-- | pkg/api/api0/server.go | 51 | ||||
-rw-r--r-- | pkg/atlas/server.go | 6 |
4 files changed, 102 insertions, 52 deletions
diff --git a/pkg/api/api0/api.go b/pkg/api/api0/api.go index 1c665f9..2e3593c 100644 --- a/pkg/api/api0/api.go +++ b/pkg/api/api0/api.go @@ -29,6 +29,7 @@ import ( "github.com/pg9182/ip2x" "github.com/r2northstar/atlas/pkg/eax" "github.com/r2northstar/atlas/pkg/metricsx" + "github.com/r2northstar/atlas/pkg/nspkt" "github.com/r2northstar/atlas/pkg/origin" "github.com/rs/zerolog/hlog" "golang.org/x/mod/semver" @@ -45,6 +46,9 @@ type Handler struct { // PdataStorage stores player data. It must be non-nil. PdataStorage PdataStorage + // NSPkt handles connectionless packets. It must be non-nil. + NSPkt *nspkt.Listener + // UsernameSource configures the source to use for usernames. UsernameSource UsernameSource diff --git a/pkg/api/api0/client.go b/pkg/api/api0/client.go index e7caaf6..6d90bc2 100644 --- a/pkg/api/api0/client.go +++ b/pkg/api/api0/client.go @@ -11,7 +11,6 @@ import ( "strings" "time" - "github.com/r2northstar/atlas/pkg/a2s" "github.com/r2northstar/atlas/pkg/api/api0/api0gameserver" "github.com/r2northstar/atlas/pkg/eax" "github.com/r2northstar/atlas/pkg/origin" @@ -617,61 +616,65 @@ func (h *Handler) handleClientAuthWithServer(w http.ResponseWriter, r *http.Requ h.m().client_authwithserver_gameserverauth_duration_seconds.UpdateDuration(authStart) } else { - obj := map[string]any{ - "type": "connect", - "token": authToken, - "uid": acct.UID, - "username": acct.Username, - "ip": raddr.Addr().String(), - "time": time.Now().Unix(), - } + var attempts int + if rej, err := func() (string, error) { + obj := map[string]any{ + "type": "connect", + "token": authToken, + "uid": acct.UID, + "username": acct.Username, + "ip": raddr.Addr().String(), + "time": time.Now().Unix(), + } - key := connectStateKey{ - ServerID: srv.ID, - Token: authToken, - } + key := connectStateKey{ + ServerID: srv.ID, + Token: authToken, + } - ch := make(chan string, 1) - h.connect.Store(key, &connectState{ - res: ch, - pdata: pbuf, - }) - defer h.connect.Delete(key) + ch := make(chan string, 1) + h.connect.Store(key, &connectState{ + res: ch, + pdata: pbuf, + }) + defer h.connect.Delete(key) - var attempts int - if rej, err := func() (string, error) { - t := time.NewTicker(time.Millisecond * 250) - defer t.Stop() + ctx, cancel := context.WithCancel(ctx) + defer cancel() x := make(chan error, 1) - for { - attempts++ - go func() { - if err := a2s.AtlasSigreq1(srv.AuthAddr(), time.Second, srv.ServerAuthToken, obj); err != nil { - if !errors.Is(err, a2s.ErrTimeout) { - select { - case x <- err: - default: - } - } - } - }() - select { - case <-ctx.Done(): - err := ctx.Err() - if errors.Is(err, context.DeadlineExceeded) { - // if we timed out, it might be because of an error while sending the packet + go func() { + t := time.NewTicker(time.Millisecond * 250) + defer t.Stop() + + for { + attempts++ + if err := h.NSPkt.SendAtlasSigreq1(srv.AuthAddr(), srv.ServerAuthToken, obj); err != nil { select { - case err = <-x: + case x <- err: default: } } - return "", err - case x := <-ch: - return x, nil - case <-t.C: + select { + case <-ctx.Done(): + return + case <-t.C: + } + } + }() + + select { + case <-ctx.Done(): + err := ctx.Err() + select { + case err = <-x: + // error could be due to an issue sending the packet + default: } + return "", err + case x := <-ch: + return x, nil } }(); err != nil { h.m().client_authwithserver_gameserverauthudp_duration_seconds.UpdateDuration(authStart) diff --git a/pkg/api/api0/server.go b/pkg/api/api0/server.go index f26f7d7..c7f4f46 100644 --- a/pkg/api/api0/server.go +++ b/pkg/api/api0/server.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "math/rand" "net/http" "net/netip" "strconv" @@ -12,7 +13,6 @@ import ( "time" "github.com/pg9182/ip2x" - "github.com/r2northstar/atlas/pkg/a2s" "github.com/r2northstar/atlas/pkg/api/api0/api0gameserver" "github.com/rs/zerolog/hlog" ) @@ -391,18 +391,18 @@ func (h *Handler) handleServerUpsert(w http.ResponseWriter, r *http.Request) { } } - if err := a2s.Probe(s.Addr, time.Until(nsrv.VerificationDeadline)); err != nil { - var code ErrorCode + if err := h.probeUDP(ctx, s.Addr); err != nil { + var obj ErrorObj switch { - case errors.Is(err, a2s.ErrTimeout): + case errors.Is(err, context.DeadlineExceeded): h.m().server_upsert_requests_total.reject_verify_udptimeout(action).Inc() - code = ErrorCode_NO_GAMESERVER_RESPONSE + obj = ErrorCode_NO_GAMESERVER_RESPONSE.MessageObjf("failed to connect to game port") default: h.m().server_upsert_requests_total.reject_verify_udperr(action).Inc() - code = ErrorCode_BAD_GAMESERVER_RESPONSE + obj = ErrorCode_INTERNAL_SERVER_ERROR.MessageObjf("failed to connect to game port: %v", err) } h.m().server_upsert_verify_time_seconds.failure.UpdateDuration(verifyStart) - respFail(w, r, http.StatusBadGateway, code.MessageObjf("failed to connect to game port: %v", err)) + respFail(w, r, http.StatusBadGateway, obj) return } @@ -425,6 +425,43 @@ func (h *Handler) handleServerUpsert(w http.ResponseWriter, r *http.Request) { }) } +func (h *Handler) probeUDP(ctx context.Context, addr netip.AddrPort) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + uid := rand.Uint64() + + x := make(chan error, 1) + go func() { + t := time.NewTicker(time.Second * 3) // note: we don't want to exceed the connectionless rate limit + defer t.Stop() + + for { + if err := h.NSPkt.SendConnect(addr, uid); err != nil { + select { + case x <- err: + default: + } + } + select { + case <-ctx.Done(): + return + case <-t.C: + } + } + }() + + err := h.NSPkt.WaitConnectReply(ctx, addr, uid) + if err != nil { + select { + case err = <-x: + // error could be due to an issue sending the packet + default: + } + } + return err +} + func (h *Handler) handleServerRemove(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodOptions && r.Method != http.MethodDelete { h.m().server_remove_requests_total.http_method_not_allowed.Inc() diff --git a/pkg/atlas/server.go b/pkg/atlas/server.go index c8f39f4..2956401 100644 --- a/pkg/atlas/server.go +++ b/pkg/atlas/server.go @@ -28,6 +28,7 @@ import ( "github.com/r2northstar/atlas/pkg/cloudflare" "github.com/r2northstar/atlas/pkg/eax" "github.com/r2northstar/atlas/pkg/memstore" + "github.com/r2northstar/atlas/pkg/nspkt" "github.com/r2northstar/atlas/pkg/origin" "github.com/r2northstar/atlas/pkg/regionmap" "github.com/rs/zerolog" @@ -270,6 +271,7 @@ func NewServer(c *Config) (*Server, error) { m.Add(hlog.RequestIDHandler("rid", "")) s.API0 = &api0.Handler{ + NSPkt: nspkt.NewListener(), ServerList: api0.NewServerList(c.API0_ServerList_DeadTime, c.API0_ServerList_GhostTime, c.API0_ServerList_VerifyTime, api0.ServerListConfig{ ExperimentalDeterministicServerIDSecret: c.API0_ServerList_ExperimentalDeterministicServerIDSecret, AllowUwuify: c.AllowJokes, @@ -913,6 +915,9 @@ func (s *Server) Run(ctx context.Context) error { } }() } + go func() { + errch <- s.API0.NSPkt.ListenAndServe(netip.AddrPort{}) + }() select { case <-ctx.Done(): @@ -984,6 +989,7 @@ func (s *Server) serveRest(w http.ResponseWriter, r *http.Request) { if internal { ms = append(ms, metrics.WriteProcessMetrics) ms = append(ms, s.API0.WritePrometheus) + ms = append(ms, s.API0.NSPkt.WritePrometheus) } ms = append(ms, s.API0.ServerList.WritePrometheus) if internal && geo { |