diff options
-rw-r--r-- | pkg/api/api0/api.go | 16 | ||||
-rw-r--r-- | pkg/api/api0/client.go | 149 | ||||
-rw-r--r-- | pkg/api/api0/metrics.go | 32 | ||||
-rw-r--r-- | pkg/api/api0/server.go | 141 | ||||
-rw-r--r-- | pkg/api/api0/serverlist.go | 8 |
5 files changed, 299 insertions, 47 deletions
diff --git a/pkg/api/api0/api.go b/pkg/api/api0/api.go index 79766e5..1c665f9 100644 --- a/pkg/api/api0/api.go +++ b/pkg/api/api0/api.go @@ -22,6 +22,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/klauspost/compress/gzip" @@ -107,6 +108,19 @@ type Handler struct { metricsInit sync.Once metricsObj apiMetrics + + connect sync.Map // [connectStateKey]*connectState +} + +type connectStateKey struct { + ServerID string + Token string +} + +type connectState struct { + res chan<- string // buffer 1 + pdata []byte + gotPdata atomic.Bool } // ServeHTTP routes requests to Handler. @@ -135,6 +149,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.handleServerUpsert(w, r) case "/server/remove_server": h.handleServerRemove(w, r) + case "/server/connect": + h.handleServerConnect(w, r) case "/accounts/write_persistence": h.handleAccountsWritePersistence(w, r) case "/accounts/get_username": diff --git a/pkg/api/api0/client.go b/pkg/api/api0/client.go index 2dbad76..e7caaf6 100644 --- a/pkg/api/api0/client.go +++ b/pkg/api/api0/client.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/r2northstar/atlas/pkg/a2s" "github.com/r2northstar/atlas/pkg/api/api0/api0gameserver" "github.com/r2northstar/atlas/pkg/eax" "github.com/r2northstar/atlas/pkg/origin" @@ -490,6 +491,16 @@ func (h *Handler) handleClientAuthWithServer(w http.ResponseWriter, r *http.Requ return } + raddr, err := netip.ParseAddrPort(r.RemoteAddr) + if err != nil { + hlog.FromRequest(r).Error(). + Err(err). + Msgf("failed to parse remote ip %q", r.RemoteAddr) + h.m().client_authwithself_requests_total.fail_other_error.Inc() + respFail(w, r, http.StatusInternalServerError, ErrorCode_INTERNAL_SERVER_ERROR.MessageObj()) + return + } + uidQ := r.URL.Query().Get("id") if uidQ == "" { h.m().client_authwithserver_requests_total.reject_bad_request.Inc() @@ -572,38 +583,126 @@ func (h *Handler) handleClientAuthWithServer(w http.ResponseWriter, r *http.Requ ctx, cancel := context.WithTimeout(r.Context(), time.Second*5) defer cancel() - if err := api0gameserver.AuthenticateIncomingPlayer(ctx, srv.AuthAddr(), acct.UID, acct.Username, authToken, srv.ServerAuthToken, pbuf); err != nil { - h.m().client_authwithserver_gameserverauth_duration_seconds.UpdateDuration(authStart) - if errors.Is(err, context.DeadlineExceeded) { - err = fmt.Errorf("request timed out") - } - var rej api0gameserver.ConnectionRejectedError - switch { - case errors.As(err, &rej): - h.m().client_authwithserver_requests_total.reject_gameserver.Inc() - respFail(w, r, http.StatusForbidden, ErrorCode_CONNECTION_REJECTED.MessageObjf("%s", rej.Reason())) - case errors.Is(err, api0gameserver.ErrAuthFailed): - h.m().client_authwithserver_requests_total.reject_gameserverauth.Inc() - respFail(w, r, http.StatusInternalServerError, ErrorCode_JSON_PARSE_ERROR.MessageObj()) // this is kind of misleading... but it's what the original master server did - case errors.Is(err, api0gameserver.ErrInvalidResponse): - hlog.FromRequest(r).Error(). - Err(err). - Msgf("failed to make gameserver auth request") - h.m().client_authwithserver_requests_total.fail_gameserverauth.Inc() - respFail(w, r, http.StatusInternalServerError, ErrorCode_BAD_GAMESERVER_RESPONSE.MessageObj()) - default: - if !errors.Is(err, context.Canceled) { + if srv.AuthPort != 0 { + if err := api0gameserver.AuthenticateIncomingPlayer(ctx, srv.AuthAddr(), acct.UID, acct.Username, authToken, srv.ServerAuthToken, pbuf); err != nil { + h.m().client_authwithserver_gameserverauth_duration_seconds.UpdateDuration(authStart) + if errors.Is(err, context.DeadlineExceeded) { + err = fmt.Errorf("request timed out") + } + var rej api0gameserver.ConnectionRejectedError + switch { + case errors.As(err, &rej): + h.m().client_authwithserver_requests_total.reject_gameserver.Inc() + respFail(w, r, http.StatusForbidden, ErrorCode_CONNECTION_REJECTED.MessageObjf("%s", rej.Reason())) + case errors.Is(err, api0gameserver.ErrAuthFailed): + h.m().client_authwithserver_requests_total.reject_gameserverauth.Inc() + respFail(w, r, http.StatusInternalServerError, ErrorCode_JSON_PARSE_ERROR.MessageObj()) // this is kind of misleading... but it's what the original master server did + case errors.Is(err, api0gameserver.ErrInvalidResponse): hlog.FromRequest(r).Error(). Err(err). Msgf("failed to make gameserver auth request") h.m().client_authwithserver_requests_total.fail_gameserverauth.Inc() + respFail(w, r, http.StatusInternalServerError, ErrorCode_BAD_GAMESERVER_RESPONSE.MessageObj()) + default: + if !errors.Is(err, context.Canceled) { + hlog.FromRequest(r).Error(). + Err(err). + Msgf("failed to make gameserver auth request") + h.m().client_authwithserver_requests_total.fail_gameserverauth.Inc() + } + respFail(w, r, http.StatusInternalServerError, ErrorCode_INTERNAL_SERVER_ERROR.MessageObj()) } - respFail(w, r, http.StatusInternalServerError, ErrorCode_INTERNAL_SERVER_ERROR.MessageObj()) + return } - return - } - h.m().client_authwithserver_gameserverauth_duration_seconds.UpdateDuration(authStart) + h.m().client_authwithserver_gameserverauth_duration_seconds.UpdateDuration(authStart) + } else { + obj := map[string]any{ + "type": "connect", + "token": authToken, + "uid": acct.UID, + "username": acct.Username, + "ip": raddr.Addr().String(), + "time": time.Now().Unix(), + } + + key := connectStateKey{ + ServerID: srv.ID, + Token: authToken, + } + + ch := make(chan string, 1) + h.connect.Store(key, &connectState{ + res: ch, + pdata: pbuf, + }) + defer h.connect.Delete(key) + + var attempts int + if rej, err := func() (string, error) { + t := time.NewTicker(time.Millisecond * 250) + defer t.Stop() + + x := make(chan error, 1) + + for { + attempts++ + go func() { + if err := a2s.AtlasSigreq1(srv.AuthAddr(), time.Second, srv.ServerAuthToken, obj); err != nil { + if !errors.Is(err, a2s.ErrTimeout) { + select { + case x <- err: + default: + } + } + } + }() + select { + case <-ctx.Done(): + err := ctx.Err() + if errors.Is(err, context.DeadlineExceeded) { + // if we timed out, it might be because of an error while sending the packet + select { + case err = <-x: + default: + } + } + return "", err + case x := <-ch: + return x, nil + case <-t.C: + } + } + }(); err != nil { + h.m().client_authwithserver_gameserverauthudp_duration_seconds.UpdateDuration(authStart) + switch { + case errors.Is(err, context.DeadlineExceeded): + hlog.FromRequest(r).Error(). + Err(err). + Msgf("failed to make gameserver udp auth request") + h.m().client_authwithserver_requests_total.fail_gameserverauthudp.Inc() + respFail(w, r, http.StatusGatewayTimeout, ErrorCode_NO_GAMESERVER_RESPONSE.MessageObj()) + default: + if !errors.Is(err, context.Canceled) { + hlog.FromRequest(r).Error(). + Err(err). + Msgf("failed to make gameserver udp auth request") + h.m().client_authwithserver_requests_total.fail_gameserverauthudp.Inc() + respFail(w, r, http.StatusInternalServerError, ErrorCode_INTERNAL_SERVER_ERROR.MessageObj()) + } + } + return + } else if rej != "" { + h.m().client_authwithserver_gameserverauthudp_duration_seconds.UpdateDuration(authStart) + h.m().client_authwithserver_gameserverauthudp_attempts.Update(float64(attempts)) + h.m().client_authwithserver_requests_total.reject_gameserver.Inc() + respFail(w, r, http.StatusForbidden, ErrorCode_CONNECTION_REJECTED.MessageObjf("%s", rej)) + return + } + + h.m().client_authwithserver_gameserverauthudp_duration_seconds.UpdateDuration(authStart) + h.m().client_authwithserver_gameserverauthudp_attempts.Update(float64(attempts)) + } } acct.LastServerID = srv.ID diff --git a/pkg/api/api0/metrics.go b/pkg/api/api0/metrics.go index 271b61b..33c47b6 100644 --- a/pkg/api/api0/metrics.go +++ b/pkg/api/api0/metrics.go @@ -96,13 +96,16 @@ type apiMetrics struct { reject_gameserverauth *metrics.Counter reject_gameserver *metrics.Counter fail_gameserverauth *metrics.Counter + fail_gameserverauthudp *metrics.Counter fail_storage_error_account *metrics.Counter fail_storage_error_pdata *metrics.Counter fail_other_error *metrics.Counter http_method_not_allowed *metrics.Counter } - client_authwithserver_gameserverauth_duration_seconds *metrics.Histogram - client_authwithself_requests_total struct { + client_authwithserver_gameserverauth_duration_seconds *metrics.Histogram + client_authwithserver_gameserverauthudp_duration_seconds *metrics.Histogram + client_authwithserver_gameserverauthudp_attempts *metrics.Histogram + client_authwithself_requests_total struct { success *metrics.Counter reject_bad_request *metrics.Counter reject_versiongate *metrics.Counter @@ -159,6 +162,18 @@ type apiMetrics struct { fail_other_error *metrics.Counter http_method_not_allowed *metrics.Counter } + server_connect_requests_total struct { + success *metrics.Counter + success_reject *metrics.Counter + success_pdata *metrics.Counter + reject_unauthorized_ip *metrics.Counter + reject_server_not_found *metrics.Counter + reject_invalid_connection_token *metrics.Counter + reject_must_get_pdata *metrics.Counter + reject_bad_request *metrics.Counter + fail_other_error *metrics.Counter + http_method_not_allowed *metrics.Counter + } player_pdata_requests_total struct { success func(filter string) *metrics.Counter reject_bad_request *metrics.Counter @@ -268,11 +283,14 @@ func (h *Handler) m() *apiMetrics { mo.client_authwithserver_requests_total.reject_gameserverauth = mo.set.NewCounter(`atlas_api0_client_authwithserver_requests_total{result="reject_gameserverauth"}`) mo.client_authwithserver_requests_total.reject_gameserver = mo.set.NewCounter(`atlas_api0_client_authwithserver_requests_total{result="reject_gameserver"}`) mo.client_authwithserver_requests_total.fail_gameserverauth = mo.set.NewCounter(`atlas_api0_client_authwithserver_requests_total{result="fail_gameserverauth"}`) + mo.client_authwithserver_requests_total.fail_gameserverauthudp = mo.set.NewCounter(`atlas_api0_client_authwithserver_requests_total{result="fail_gameserverauthudp"}`) mo.client_authwithserver_requests_total.fail_storage_error_account = mo.set.NewCounter(`atlas_api0_client_authwithserver_requests_total{result="fail_storage_error_account"}`) mo.client_authwithserver_requests_total.fail_storage_error_pdata = mo.set.NewCounter(`atlas_api0_client_authwithserver_requests_total{result="fail_storage_error_pdata"}`) mo.client_authwithserver_requests_total.fail_other_error = mo.set.NewCounter(`atlas_api0_client_authwithserver_requests_total{result="fail_other_error"}`) mo.client_authwithserver_requests_total.http_method_not_allowed = mo.set.NewCounter(`atlas_api0_client_authwithserver_requests_total{result="http_method_not_allowed"}`) mo.client_authwithserver_gameserverauth_duration_seconds = mo.set.NewHistogram(`atlas_api0_client_authwithserver_gameserverauth_duration_seconds`) + mo.client_authwithserver_gameserverauthudp_duration_seconds = mo.set.NewHistogram(`atlas_api0_client_authwithserver_gameserverauthudp_duration_seconds`) + mo.client_authwithserver_gameserverauthudp_attempts = mo.set.NewHistogram(`atlas_api0_client_authwithserver_gameserverauthudp_attempts`) mo.client_authwithself_requests_total.success = mo.set.NewCounter(`atlas_api0_client_authwithself_requests_total{result="success"}`) mo.client_authwithself_requests_total.reject_bad_request = mo.set.NewCounter(`atlas_api0_client_authwithself_requests_total{result="reject_bad_request"}`) mo.client_authwithself_requests_total.reject_versiongate = mo.set.NewCounter(`atlas_api0_client_authwithself_requests_total{result="reject_versiongate"}`) @@ -432,6 +450,16 @@ func (h *Handler) m() *apiMetrics { mo.server_remove_requests_total.reject_server_not_found = mo.set.NewCounter(`atlas_api0_server_remove_requests_total{result="reject_server_not_found"}`) mo.server_remove_requests_total.fail_other_error = mo.set.NewCounter(`atlas_api0_server_remove_requests_total{result="fail_other_error"}`) mo.server_remove_requests_total.http_method_not_allowed = mo.set.NewCounter(`atlas_api0_server_remove_requests_total{result="http_method_not_allowed"}`) + mo.server_connect_requests_total.success = mo.set.NewCounter(`atlas_api0_server_connect_requests_total{result="success"}`) + mo.server_connect_requests_total.success_reject = mo.set.NewCounter(`atlas_api0_server_connect_requests_total{result="success_reject"}`) + mo.server_connect_requests_total.success_pdata = mo.set.NewCounter(`atlas_api0_server_connect_requests_total{result="success_pdata"}`) + mo.server_connect_requests_total.reject_unauthorized_ip = mo.set.NewCounter(`atlas_api0_server_connect_requests_total{result="reject_unauthorized_ip"}`) + mo.server_connect_requests_total.reject_server_not_found = mo.set.NewCounter(`atlas_api0_server_connect_requests_total{result="reject_server_not_found"}`) + mo.server_connect_requests_total.reject_invalid_connection_token = mo.set.NewCounter(`atlas_api0_server_connect_requests_total{result="reject_invalid_connection_token"}`) + mo.server_connect_requests_total.reject_must_get_pdata = mo.set.NewCounter(`atlas_api0_server_connect_requests_total{result="reject_must_get_pdata"}`) + mo.server_connect_requests_total.reject_bad_request = mo.set.NewCounter(`atlas_api0_server_connect_requests_total{result="reject_bad_request"}`) + mo.server_connect_requests_total.fail_other_error = mo.set.NewCounter(`atlas_api0_server_connect_requests_total{result="fail_other_error"}`) + mo.server_connect_requests_total.http_method_not_allowed = mo.set.NewCounter(`atlas_api0_server_connect_requests_total{result="http_method_not_allowed"}`) mo.player_pdata_requests_total.success = func(filter string) *metrics.Counter { if filter == "" { panic("invalid filter") diff --git a/pkg/api/api0/server.go b/pkg/api/api0/server.go index 93608b0..f26f7d7 100644 --- a/pkg/api/api0/server.go +++ b/pkg/api/api0/server.go @@ -140,6 +140,8 @@ func (h *Handler) handleServerUpsert(w http.ResponseWriter, r *http.Request) { respFail(w, r, http.StatusBadRequest, ErrorCode_BAD_REQUEST.MessageObjf("authPort param is required")) return } + } else if v == "udp" { + s.AuthPort = 0 } else if n, err := strconv.ParseUint(v, 10, 16); err != nil { h.m().server_upsert_requests_total.reject_bad_request(action).Inc() respFail(w, r, http.StatusBadRequest, ErrorCode_BAD_REQUEST.MessageObjf("authPort param is invalid: %v", err)) @@ -368,23 +370,25 @@ func (h *Handler) handleServerUpsert(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithDeadline(r.Context(), nsrv.VerificationDeadline) defer cancel() - if err := api0gameserver.Verify(ctx, s.AuthAddr()); err != nil { - var code ErrorCode - switch { - case errors.Is(err, context.DeadlineExceeded): - err = fmt.Errorf("request timed out") - code = ErrorCode_NO_GAMESERVER_RESPONSE - h.m().server_upsert_requests_total.reject_verify_authtimeout(action).Inc() - case errors.Is(err, api0gameserver.ErrInvalidResponse): - code = ErrorCode_BAD_GAMESERVER_RESPONSE - h.m().server_upsert_requests_total.reject_verify_authresp(action).Inc() - default: - code = ErrorCode_NO_GAMESERVER_RESPONSE - h.m().server_upsert_requests_total.reject_verify_autherr(action).Inc() + if nsrv.AuthPort != 0 { + if err := api0gameserver.Verify(ctx, s.AuthAddr()); err != nil { + var code ErrorCode + switch { + case errors.Is(err, context.DeadlineExceeded): + err = fmt.Errorf("request timed out") + code = ErrorCode_NO_GAMESERVER_RESPONSE + h.m().server_upsert_requests_total.reject_verify_authtimeout(action).Inc() + case errors.Is(err, api0gameserver.ErrInvalidResponse): + code = ErrorCode_BAD_GAMESERVER_RESPONSE + h.m().server_upsert_requests_total.reject_verify_authresp(action).Inc() + default: + code = ErrorCode_NO_GAMESERVER_RESPONSE + h.m().server_upsert_requests_total.reject_verify_autherr(action).Inc() + } + h.m().server_upsert_verify_time_seconds.failure.UpdateDuration(verifyStart) + respFail(w, r, http.StatusBadGateway, code.MessageObjf("failed to connect to auth port: %v", err)) + return } - h.m().server_upsert_verify_time_seconds.failure.UpdateDuration(verifyStart) - respFail(w, r, http.StatusBadGateway, code.MessageObjf("failed to connect to auth port: %v", err)) - return } if err := a2s.Probe(s.Addr, time.Until(nsrv.VerificationDeadline)); err != nil { @@ -401,6 +405,7 @@ func (h *Handler) handleServerUpsert(w http.ResponseWriter, r *http.Request) { respFail(w, r, http.StatusBadGateway, code.MessageObjf("failed to connect to game port: %v", err)) return } + h.m().server_upsert_verify_time_seconds.success.UpdateDuration(verifyStart) if !h.ServerList.VerifyServer(nsrv.ID) { @@ -474,3 +479,107 @@ func (h *Handler) handleServerRemove(w http.ResponseWriter, r *http.Request) { "success": true, }) } + +func (h *Handler) handleServerConnect(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodOptions && r.Method != http.MethodGet && r.Method != http.MethodPost { + h.m().server_connect_requests_total.http_method_not_allowed.Inc() + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return + } + + w.Header().Set("Cache-Control", "private, no-cache, no-store") + w.Header().Set("Expires", "0") + w.Header().Set("Pragma", "no-cache") + + if r.Method == http.MethodOptions { + w.Header().Set("Allow", "OPTIONS, GET, POST") + w.WriteHeader(http.StatusNoContent) + return + } + + raddr, err := netip.ParseAddrPort(r.RemoteAddr) + if err != nil { + hlog.FromRequest(r).Error(). + Err(err). + Msgf("failed to parse remote ip %q", r.RemoteAddr) + h.m().server_connect_requests_total.fail_other_error.Inc() + respFail(w, r, http.StatusInternalServerError, ErrorCode_INTERNAL_SERVER_ERROR.MessageObj()) + return + } + + var serverId string + if v := r.URL.Query().Get("serverId"); v == "" { + h.m().server_connect_requests_total.reject_bad_request.Inc() + respFail(w, r, http.StatusBadRequest, ErrorCode_BAD_REQUEST.MessageObjf("serverId param is required")) + return + } else { + serverId = v + } + + srv := h.ServerList.GetServerByID(serverId) + if srv == nil { + h.m().server_connect_requests_total.reject_server_not_found.Inc() + respFail(w, r, http.StatusForbidden, ErrorCode_UNAUTHORIZED_GAMESERVER.MessageObjf("no such game server")) + return + } + if srv.Addr.Addr() != raddr.Addr() { + h.m().server_connect_requests_total.reject_unauthorized_ip.Inc() + respFail(w, r, http.StatusForbidden, ErrorCode_UNAUTHORIZED_GAMESERVER.MessageObj()) + return + } + + var state *connectState + if v := r.URL.Query().Get("token"); v == "" { + h.m().server_connect_requests_total.reject_invalid_connection_token.Inc() + respFail(w, r, http.StatusBadRequest, ErrorCode_BAD_REQUEST.MessageObjf("connection token is required")) + return + } else if v, ok := h.connect.Load(connectStateKey{ + ServerID: srv.ID, + Token: v, + }); !ok { + h.m().server_connect_requests_total.reject_invalid_connection_token.Inc() + respFail(w, r, http.StatusBadRequest, ErrorCode_BAD_REQUEST.MessageObjf("no such connection token (has it already been used?)")) + return + } else { + state = v.(*connectState) + } + + if r.Method == http.MethodGet { + state.gotPdata.Store(true) + h.m().server_connect_requests_total.success_pdata.Inc() + respMaybeCompress(w, r, http.StatusOK, state.pdata) + return + } + + var reject string + if v := r.URL.Query()["reject"]; len(v) != 1 { + h.m().server_connect_requests_total.reject_invalid_connection_token.Inc() + respFail(w, r, http.StatusBadRequest, ErrorCode_BAD_REQUEST.MessageObjf("reject is required (if no rejection reason, set to an empty string)")) + return + } else { + reject = v[0] + } + if n := 256; len(reject) > n { + reject = reject[:n] + } + + if reject == "" && !state.gotPdata.Load() { + h.m().server_connect_requests_total.reject_must_get_pdata.Inc() + respFail(w, r, http.StatusBadRequest, ErrorCode_BAD_REQUEST.MessageObjf("must get pdata before accepting connection")) + return + } + + select { + case state.res <- reject: + default: + } + + if reject == "" { + h.m().server_connect_requests_total.success.Inc() + } else { + h.m().server_connect_requests_total.success_reject.Inc() + } + respJSON(w, r, http.StatusOK, map[string]any{ + "success": true, + }) +} diff --git a/pkg/api/api0/serverlist.go b/pkg/api/api0/serverlist.go index 89b2e05..a1b3258 100644 --- a/pkg/api/api0/serverlist.go +++ b/pkg/api/api0/serverlist.go @@ -79,7 +79,7 @@ type Server struct { Order uint64 ID string // unique, must not be modified after creation Addr netip.AddrPort // unique, must not be modified after creation - AuthPort uint16 // unique with Addr.Addr(), must not be modified after creation + AuthPort uint16 // if zero, reuse game Addr for UDP-based auth, otherwise unique with Addr.Addr(), must not be modified after creation LauncherVersion string // for metrics @@ -112,6 +112,9 @@ type ServerModInfo struct { // AuthAddr returns the auth address for the server. func (s Server) AuthAddr() netip.AddrPort { + if s.AuthPort == 0 { + return s.Addr + } return netip.AddrPortFrom(s.Addr.Addr(), s.AuthPort) } @@ -904,9 +907,6 @@ func (s *ServerList) ServerHybridUpdatePut(u *ServerUpdate, c *Server, l ServerL if !nsrv.Addr.IsValid() { return nil, fmt.Errorf("addr is missing") } - if nsrv.AuthPort == 0 { - return nil, fmt.Errorf("authport is missing") - } // error if there's an existing server with a matching auth addr (note: // same ip as gameserver, different port) but different gameserver addr |