aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/api/api0/api.go4
-rw-r--r--pkg/api/api0/client.go93
-rw-r--r--pkg/api/api0/server.go51
-rw-r--r--pkg/atlas/server.go6
4 files changed, 102 insertions, 52 deletions
diff --git a/pkg/api/api0/api.go b/pkg/api/api0/api.go
index 1c665f9..2e3593c 100644
--- a/pkg/api/api0/api.go
+++ b/pkg/api/api0/api.go
@@ -29,6 +29,7 @@ import (
"github.com/pg9182/ip2x"
"github.com/r2northstar/atlas/pkg/eax"
"github.com/r2northstar/atlas/pkg/metricsx"
+ "github.com/r2northstar/atlas/pkg/nspkt"
"github.com/r2northstar/atlas/pkg/origin"
"github.com/rs/zerolog/hlog"
"golang.org/x/mod/semver"
@@ -45,6 +46,9 @@ type Handler struct {
// PdataStorage stores player data. It must be non-nil.
PdataStorage PdataStorage
+ // NSPkt handles connectionless packets. It must be non-nil.
+ NSPkt *nspkt.Listener
+
// UsernameSource configures the source to use for usernames.
UsernameSource UsernameSource
diff --git a/pkg/api/api0/client.go b/pkg/api/api0/client.go
index e7caaf6..6d90bc2 100644
--- a/pkg/api/api0/client.go
+++ b/pkg/api/api0/client.go
@@ -11,7 +11,6 @@ import (
"strings"
"time"
- "github.com/r2northstar/atlas/pkg/a2s"
"github.com/r2northstar/atlas/pkg/api/api0/api0gameserver"
"github.com/r2northstar/atlas/pkg/eax"
"github.com/r2northstar/atlas/pkg/origin"
@@ -617,61 +616,65 @@ func (h *Handler) handleClientAuthWithServer(w http.ResponseWriter, r *http.Requ
h.m().client_authwithserver_gameserverauth_duration_seconds.UpdateDuration(authStart)
} else {
- obj := map[string]any{
- "type": "connect",
- "token": authToken,
- "uid": acct.UID,
- "username": acct.Username,
- "ip": raddr.Addr().String(),
- "time": time.Now().Unix(),
- }
+ var attempts int
+ if rej, err := func() (string, error) {
+ obj := map[string]any{
+ "type": "connect",
+ "token": authToken,
+ "uid": acct.UID,
+ "username": acct.Username,
+ "ip": raddr.Addr().String(),
+ "time": time.Now().Unix(),
+ }
- key := connectStateKey{
- ServerID: srv.ID,
- Token: authToken,
- }
+ key := connectStateKey{
+ ServerID: srv.ID,
+ Token: authToken,
+ }
- ch := make(chan string, 1)
- h.connect.Store(key, &connectState{
- res: ch,
- pdata: pbuf,
- })
- defer h.connect.Delete(key)
+ ch := make(chan string, 1)
+ h.connect.Store(key, &connectState{
+ res: ch,
+ pdata: pbuf,
+ })
+ defer h.connect.Delete(key)
- var attempts int
- if rej, err := func() (string, error) {
- t := time.NewTicker(time.Millisecond * 250)
- defer t.Stop()
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
x := make(chan error, 1)
- for {
- attempts++
- go func() {
- if err := a2s.AtlasSigreq1(srv.AuthAddr(), time.Second, srv.ServerAuthToken, obj); err != nil {
- if !errors.Is(err, a2s.ErrTimeout) {
- select {
- case x <- err:
- default:
- }
- }
- }
- }()
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if errors.Is(err, context.DeadlineExceeded) {
- // if we timed out, it might be because of an error while sending the packet
+ go func() {
+ t := time.NewTicker(time.Millisecond * 250)
+ defer t.Stop()
+
+ for {
+ attempts++
+ if err := h.NSPkt.SendAtlasSigreq1(srv.AuthAddr(), srv.ServerAuthToken, obj); err != nil {
select {
- case err = <-x:
+ case x <- err:
default:
}
}
- return "", err
- case x := <-ch:
- return x, nil
- case <-t.C:
+ select {
+ case <-ctx.Done():
+ return
+ case <-t.C:
+ }
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ select {
+ case err = <-x:
+ // error could be due to an issue sending the packet
+ default:
}
+ return "", err
+ case x := <-ch:
+ return x, nil
}
}(); err != nil {
h.m().client_authwithserver_gameserverauthudp_duration_seconds.UpdateDuration(authStart)
diff --git a/pkg/api/api0/server.go b/pkg/api/api0/server.go
index f26f7d7..c7f4f46 100644
--- a/pkg/api/api0/server.go
+++ b/pkg/api/api0/server.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "math/rand"
"net/http"
"net/netip"
"strconv"
@@ -12,7 +13,6 @@ import (
"time"
"github.com/pg9182/ip2x"
- "github.com/r2northstar/atlas/pkg/a2s"
"github.com/r2northstar/atlas/pkg/api/api0/api0gameserver"
"github.com/rs/zerolog/hlog"
)
@@ -391,18 +391,18 @@ func (h *Handler) handleServerUpsert(w http.ResponseWriter, r *http.Request) {
}
}
- if err := a2s.Probe(s.Addr, time.Until(nsrv.VerificationDeadline)); err != nil {
- var code ErrorCode
+ if err := h.probeUDP(ctx, s.Addr); err != nil {
+ var obj ErrorObj
switch {
- case errors.Is(err, a2s.ErrTimeout):
+ case errors.Is(err, context.DeadlineExceeded):
h.m().server_upsert_requests_total.reject_verify_udptimeout(action).Inc()
- code = ErrorCode_NO_GAMESERVER_RESPONSE
+ obj = ErrorCode_NO_GAMESERVER_RESPONSE.MessageObjf("failed to connect to game port")
default:
h.m().server_upsert_requests_total.reject_verify_udperr(action).Inc()
- code = ErrorCode_BAD_GAMESERVER_RESPONSE
+ obj = ErrorCode_INTERNAL_SERVER_ERROR.MessageObjf("failed to connect to game port: %v", err)
}
h.m().server_upsert_verify_time_seconds.failure.UpdateDuration(verifyStart)
- respFail(w, r, http.StatusBadGateway, code.MessageObjf("failed to connect to game port: %v", err))
+ respFail(w, r, http.StatusBadGateway, obj)
return
}
@@ -425,6 +425,43 @@ func (h *Handler) handleServerUpsert(w http.ResponseWriter, r *http.Request) {
})
}
+func (h *Handler) probeUDP(ctx context.Context, addr netip.AddrPort) error {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ uid := rand.Uint64()
+
+ x := make(chan error, 1)
+ go func() {
+ t := time.NewTicker(time.Second * 3) // note: we don't want to exceed the connectionless rate limit
+ defer t.Stop()
+
+ for {
+ if err := h.NSPkt.SendConnect(addr, uid); err != nil {
+ select {
+ case x <- err:
+ default:
+ }
+ }
+ select {
+ case <-ctx.Done():
+ return
+ case <-t.C:
+ }
+ }
+ }()
+
+ err := h.NSPkt.WaitConnectReply(ctx, addr, uid)
+ if err != nil {
+ select {
+ case err = <-x:
+ // error could be due to an issue sending the packet
+ default:
+ }
+ }
+ return err
+}
+
func (h *Handler) handleServerRemove(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodOptions && r.Method != http.MethodDelete {
h.m().server_remove_requests_total.http_method_not_allowed.Inc()
diff --git a/pkg/atlas/server.go b/pkg/atlas/server.go
index c8f39f4..2956401 100644
--- a/pkg/atlas/server.go
+++ b/pkg/atlas/server.go
@@ -28,6 +28,7 @@ import (
"github.com/r2northstar/atlas/pkg/cloudflare"
"github.com/r2northstar/atlas/pkg/eax"
"github.com/r2northstar/atlas/pkg/memstore"
+ "github.com/r2northstar/atlas/pkg/nspkt"
"github.com/r2northstar/atlas/pkg/origin"
"github.com/r2northstar/atlas/pkg/regionmap"
"github.com/rs/zerolog"
@@ -270,6 +271,7 @@ func NewServer(c *Config) (*Server, error) {
m.Add(hlog.RequestIDHandler("rid", ""))
s.API0 = &api0.Handler{
+ NSPkt: nspkt.NewListener(),
ServerList: api0.NewServerList(c.API0_ServerList_DeadTime, c.API0_ServerList_GhostTime, c.API0_ServerList_VerifyTime, api0.ServerListConfig{
ExperimentalDeterministicServerIDSecret: c.API0_ServerList_ExperimentalDeterministicServerIDSecret,
AllowUwuify: c.AllowJokes,
@@ -913,6 +915,9 @@ func (s *Server) Run(ctx context.Context) error {
}
}()
}
+ go func() {
+ errch <- s.API0.NSPkt.ListenAndServe(netip.AddrPort{})
+ }()
select {
case <-ctx.Done():
@@ -984,6 +989,7 @@ func (s *Server) serveRest(w http.ResponseWriter, r *http.Request) {
if internal {
ms = append(ms, metrics.WriteProcessMetrics)
ms = append(ms, s.API0.WritePrometheus)
+ ms = append(ms, s.API0.NSPkt.WritePrometheus)
}
ms = append(ms, s.API0.ServerList.WritePrometheus)
if internal && geo {