mirror of
https://github.com/strukturag/nextcloud-spreed-signaling.git
synced 2025-03-14 11:32:46 +00:00
5738 lines
171 KiB
Go
5738 lines
171 KiB
Go
/**
|
|
* Standalone signaling server for the Nextcloud Spreed app.
|
|
* Copyright (C) 2017 struktur AG
|
|
*
|
|
* @author Joachim Bauch <bauch@struktur.de>
|
|
*
|
|
* @license GNU AGPL version 3 or any later version
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Affero General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU Affero General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
package signaling
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"crypto/ed25519"
|
|
"crypto/elliptic"
|
|
"crypto/rand"
|
|
"crypto/rsa"
|
|
"crypto/x509"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"encoding/pem"
|
|
"errors"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/dlintw/goconf"
|
|
"github.com/golang-jwt/jwt/v5"
|
|
"github.com/gorilla/mux"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/nats-io/nats-server/v2/server"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
const (
|
|
testDefaultUserId = "test-userid"
|
|
authAnonymousUserId = "anonymous-userid"
|
|
|
|
testTimeout = 10 * time.Second
|
|
)
|
|
|
|
var (
|
|
testRoomProperties = []byte("{\"prop1\":\"value1\"}")
|
|
)
|
|
|
|
var (
|
|
clusteredTests = []string{
|
|
"local",
|
|
"clustered",
|
|
}
|
|
|
|
testHelloV2Algorithms = []string{
|
|
"RSA",
|
|
"ECDSA",
|
|
"Ed25519",
|
|
"Ed25519_Nextcloud",
|
|
}
|
|
)
|
|
|
|
// Only used for testing.
|
|
func (h *Hub) getRoom(id string) *Room {
|
|
h.ru.RLock()
|
|
defer h.ru.RUnlock()
|
|
// TODO: The same room might exist on different backends.
|
|
for _, room := range h.rooms {
|
|
if room.Id() == id {
|
|
return room
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func isLocalTest(t *testing.T) bool {
|
|
return strings.HasSuffix(t.Name(), "/local")
|
|
}
|
|
|
|
func getTestConfig(server *httptest.Server) (*goconf.ConfigFile, error) {
|
|
config := goconf.NewConfigFile()
|
|
u, err := url.Parse(server.URL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
config.AddOption("backend", "allowed", u.Host)
|
|
if u.Scheme == "http" {
|
|
config.AddOption("backend", "allowhttp", "true")
|
|
}
|
|
config.AddOption("backend", "secret", string(testBackendSecret))
|
|
config.AddOption("sessions", "hashkey", "12345678901234567890123456789012")
|
|
config.AddOption("sessions", "blockkey", "09876543210987654321098765432109")
|
|
config.AddOption("clients", "internalsecret", string(testInternalSecret))
|
|
config.AddOption("geoip", "url", "none")
|
|
return config, nil
|
|
}
|
|
|
|
func getTestConfigWithMultipleBackends(server *httptest.Server) (*goconf.ConfigFile, error) {
|
|
config, err := getTestConfig(server)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
config.RemoveOption("backend", "allowed")
|
|
config.RemoveOption("backend", "secret")
|
|
config.AddOption("backend", "backends", "backend1, backend2")
|
|
|
|
config.AddOption("backend1", "url", server.URL+"/one")
|
|
config.AddOption("backend1", "secret", string(testBackendSecret))
|
|
|
|
config.AddOption("backend2", "url", server.URL+"/two/")
|
|
config.AddOption("backend2", "secret", string(testBackendSecret))
|
|
return config, nil
|
|
}
|
|
|
|
func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Server) (*goconf.ConfigFile, error)) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) {
|
|
require := require.New(t)
|
|
r := mux.NewRouter()
|
|
registerBackendHandler(t, r)
|
|
|
|
server := httptest.NewServer(r)
|
|
t.Cleanup(func() {
|
|
server.Close()
|
|
})
|
|
|
|
events := getAsyncEventsForTest(t)
|
|
config, err := getConfigFunc(server)
|
|
require.NoError(err)
|
|
h, err := NewHub(config, events, nil, nil, nil, r, "no-version")
|
|
require.NoError(err)
|
|
b, err := NewBackendServer(config, h, "no-version")
|
|
require.NoError(err)
|
|
require.NoError(b.Start(r))
|
|
|
|
go h.Run()
|
|
|
|
t.Cleanup(func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
WaitForHub(ctx, t, h)
|
|
})
|
|
|
|
return h, events, r, server
|
|
}
|
|
|
|
func CreateHubForTest(t *testing.T) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) {
|
|
return CreateHubForTestWithConfig(t, getTestConfig)
|
|
}
|
|
|
|
func CreateHubWithMultipleBackendsForTest(t *testing.T) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) {
|
|
h, events, r, server := CreateHubForTestWithConfig(t, getTestConfigWithMultipleBackends)
|
|
registerBackendHandlerUrl(t, r, "/one")
|
|
registerBackendHandlerUrl(t, r, "/two")
|
|
return h, events, r, server
|
|
}
|
|
|
|
func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Server) (*goconf.ConfigFile, error)) (*Hub, *Hub, *mux.Router, *mux.Router, *httptest.Server, *httptest.Server) {
|
|
require := require.New(t)
|
|
r1 := mux.NewRouter()
|
|
registerBackendHandler(t, r1)
|
|
|
|
server1 := httptest.NewServer(r1)
|
|
t.Cleanup(func() {
|
|
server1.Close()
|
|
})
|
|
|
|
r2 := mux.NewRouter()
|
|
registerBackendHandler(t, r2)
|
|
|
|
server2 := httptest.NewServer(r2)
|
|
t.Cleanup(func() {
|
|
server2.Close()
|
|
})
|
|
|
|
nats1, _ := startLocalNatsServer(t)
|
|
var nats2 *server.Server
|
|
if strings.Contains(t.Name(), "Federation") {
|
|
nats2, _ = startLocalNatsServer(t)
|
|
} else {
|
|
nats2 = nats1
|
|
}
|
|
grpcServer1, addr1 := NewGrpcServerForTest(t)
|
|
grpcServer2, addr2 := NewGrpcServerForTest(t)
|
|
|
|
if strings.Contains(t.Name(), "Federation") {
|
|
// Signaling servers should not form a cluster in federation tests.
|
|
addr1, addr2 = addr2, addr1
|
|
}
|
|
|
|
events1, err := NewAsyncEvents(nats1.ClientURL())
|
|
require.NoError(err)
|
|
t.Cleanup(func() {
|
|
events1.Close()
|
|
})
|
|
config1, err := getConfigFunc(server1)
|
|
require.NoError(err)
|
|
client1, _ := NewGrpcClientsForTest(t, addr2)
|
|
h1, err := NewHub(config1, events1, grpcServer1, client1, nil, r1, "no-version")
|
|
require.NoError(err)
|
|
b1, err := NewBackendServer(config1, h1, "no-version")
|
|
require.NoError(err)
|
|
events2, err := NewAsyncEvents(nats2.ClientURL())
|
|
require.NoError(err)
|
|
t.Cleanup(func() {
|
|
events2.Close()
|
|
})
|
|
config2, err := getConfigFunc(server2)
|
|
require.NoError(err)
|
|
client2, _ := NewGrpcClientsForTest(t, addr1)
|
|
h2, err := NewHub(config2, events2, grpcServer2, client2, nil, r2, "no-version")
|
|
require.NoError(err)
|
|
b2, err := NewBackendServer(config2, h2, "no-version")
|
|
require.NoError(err)
|
|
require.NoError(b1.Start(r1))
|
|
require.NoError(b2.Start(r2))
|
|
|
|
go h1.Run()
|
|
go h2.Run()
|
|
|
|
t.Cleanup(func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
WaitForHub(ctx, t, h1)
|
|
WaitForHub(ctx, t, h2)
|
|
})
|
|
|
|
return h1, h2, r1, r2, server1, server2
|
|
}
|
|
|
|
func CreateClusteredHubsForTest(t *testing.T) (*Hub, *Hub, *httptest.Server, *httptest.Server) {
|
|
h1, h2, _, _, server1, server2 := CreateClusteredHubsForTestWithConfig(t, getTestConfig)
|
|
return h1, h2, server1, server2
|
|
}
|
|
|
|
func WaitForHub(ctx context.Context, t *testing.T, h *Hub) {
|
|
// Wait for any channel messages to be processed.
|
|
time.Sleep(10 * time.Millisecond)
|
|
h.Stop()
|
|
for {
|
|
h.mu.Lock()
|
|
clients := len(h.clients)
|
|
sessions := len(h.sessions)
|
|
remoteSessions := len(h.remoteSessions)
|
|
h.mu.Unlock()
|
|
h.ru.Lock()
|
|
rooms := len(h.rooms)
|
|
h.ru.Unlock()
|
|
readActive := h.readPumpActive.Load()
|
|
writeActive := h.writePumpActive.Load()
|
|
if clients == 0 &&
|
|
rooms == 0 &&
|
|
sessions == 0 &&
|
|
remoteSessions == 0 &&
|
|
readActive == 0 &&
|
|
writeActive == 0 {
|
|
break
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
h.mu.Lock()
|
|
h.ru.Lock()
|
|
dumpGoroutines("", os.Stderr)
|
|
assert.Fail(t, "Error waiting for clients %+v / rooms %+v / sessions %+v / remoteSessions %v / %d read / %d write to terminate: %s", h.clients, h.rooms, h.sessions, h.remoteSessions, readActive, writeActive, ctx.Err())
|
|
h.ru.Unlock()
|
|
h.mu.Unlock()
|
|
return
|
|
default:
|
|
time.Sleep(time.Millisecond)
|
|
}
|
|
}
|
|
}
|
|
|
|
func validateBackendChecksum(t *testing.T, f func(http.ResponseWriter, *http.Request, *BackendClientRequest) *BackendClientResponse) func(http.ResponseWriter, *http.Request) {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
require := require.New(t)
|
|
body, err := io.ReadAll(r.Body)
|
|
require.NoError(err)
|
|
|
|
rnd := r.Header.Get(HeaderBackendSignalingRandom)
|
|
checksum := r.Header.Get(HeaderBackendSignalingChecksum)
|
|
if rnd == "" || checksum == "" {
|
|
require.Fail("No checksum headers found in request to %s", r.URL)
|
|
}
|
|
|
|
if verify := CalculateBackendChecksum(rnd, body, testBackendSecret); verify != checksum {
|
|
require.Fail("Backend checksum verification failed for request to %s", r.URL)
|
|
}
|
|
|
|
var request BackendClientRequest
|
|
require.NoError(json.Unmarshal(body, &request))
|
|
|
|
response := f(w, r, &request)
|
|
if response == nil {
|
|
// Function already returned a response.
|
|
return
|
|
}
|
|
|
|
data, err := json.Marshal(response)
|
|
require.NoError(err)
|
|
|
|
if r.Header.Get("OCS-APIRequest") != "" {
|
|
var ocs OcsResponse
|
|
ocs.Ocs = &OcsBody{
|
|
Meta: OcsMeta{
|
|
Status: "ok",
|
|
StatusCode: http.StatusOK,
|
|
Message: http.StatusText(http.StatusOK),
|
|
},
|
|
Data: data,
|
|
}
|
|
data, err = json.Marshal(ocs)
|
|
require.NoError(err)
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write(data) // nolint
|
|
}
|
|
}
|
|
|
|
func processAuthRequest(t *testing.T, w http.ResponseWriter, r *http.Request, request *BackendClientRequest) *BackendClientResponse {
|
|
require := require.New(t)
|
|
if request.Type != "auth" || request.Auth == nil {
|
|
require.Fail("Expected an auth backend request, got %+v", request)
|
|
}
|
|
|
|
var params TestBackendClientAuthParams
|
|
if len(request.Auth.Params) > 0 {
|
|
require.NoError(json.Unmarshal(request.Auth.Params, ¶ms))
|
|
}
|
|
if params.UserId == "" {
|
|
params.UserId = testDefaultUserId
|
|
} else if params.UserId == authAnonymousUserId {
|
|
params.UserId = ""
|
|
}
|
|
|
|
response := &BackendClientResponse{
|
|
Type: "auth",
|
|
Auth: &BackendClientAuthResponse{
|
|
Version: BackendVersion,
|
|
UserId: params.UserId,
|
|
},
|
|
}
|
|
userdata := map[string]string{
|
|
"displayname": "Displayname " + params.UserId,
|
|
}
|
|
data, err := json.Marshal(userdata)
|
|
require.NoError(err)
|
|
response.Auth.User = data
|
|
return response
|
|
}
|
|
|
|
func processRoomRequest(t *testing.T, w http.ResponseWriter, r *http.Request, request *BackendClientRequest) *BackendClientResponse {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
if request.Type != "room" || request.Room == nil {
|
|
require.Fail("Expected an room backend request, got %+v", request)
|
|
}
|
|
|
|
switch request.Room.RoomId {
|
|
case "test-room-slow":
|
|
time.Sleep(100 * time.Millisecond)
|
|
case "test-room-takeover-room-session":
|
|
// Additional checks for testcase "TestClientTakeoverRoomSession"
|
|
if request.Room.Action == "leave" && request.Room.UserId == "test-userid1" {
|
|
assert.Fail("Should not receive \"leave\" event for first user, received %+v", request.Room)
|
|
}
|
|
case "test-invalid-room":
|
|
response := &BackendClientResponse{
|
|
Type: "error",
|
|
Error: &Error{
|
|
Code: "no_such_room",
|
|
Message: "The user is not invited to this room.",
|
|
},
|
|
}
|
|
return response
|
|
}
|
|
|
|
if strings.Contains(t.Name(), "Federation") {
|
|
// Check additional fields present for federated sessions.
|
|
if strings.Contains(request.Room.SessionId, "@federated") {
|
|
assert.Equal(ActorTypeFederatedUsers, request.Room.ActorType)
|
|
assert.NotEmpty(request.Room.ActorId)
|
|
} else {
|
|
assert.Empty(request.Room.ActorType)
|
|
assert.Empty(request.Room.ActorId)
|
|
}
|
|
}
|
|
|
|
// Allow joining any room.
|
|
response := &BackendClientResponse{
|
|
Type: "room",
|
|
Room: &BackendClientRoomResponse{
|
|
Version: BackendVersion,
|
|
RoomId: request.Room.RoomId,
|
|
Properties: testRoomProperties,
|
|
},
|
|
}
|
|
switch request.Room.RoomId {
|
|
case "test-room-with-sessiondata":
|
|
data := map[string]string{
|
|
"userid": "userid-from-sessiondata",
|
|
}
|
|
tmp, err := json.Marshal(data)
|
|
require.NoError(err)
|
|
response.Room.Session = tmp
|
|
case "test-room-initial-permissions":
|
|
permissions := []Permission{PERMISSION_MAY_PUBLISH_AUDIO}
|
|
response.Room.Permissions = &permissions
|
|
}
|
|
return response
|
|
}
|
|
|
|
var (
|
|
sessionRequestHander struct {
|
|
sync.Mutex
|
|
handlers map[*testing.T]func(*BackendClientSessionRequest)
|
|
}
|
|
)
|
|
|
|
func setSessionRequestHandler(t *testing.T, f func(*BackendClientSessionRequest)) {
|
|
sessionRequestHander.Lock()
|
|
defer sessionRequestHander.Unlock()
|
|
if sessionRequestHander.handlers == nil {
|
|
sessionRequestHander.handlers = make(map[*testing.T]func(*BackendClientSessionRequest))
|
|
}
|
|
if _, found := sessionRequestHander.handlers[t]; !found {
|
|
t.Cleanup(func() {
|
|
sessionRequestHander.Lock()
|
|
defer sessionRequestHander.Unlock()
|
|
|
|
delete(sessionRequestHander.handlers, t)
|
|
})
|
|
}
|
|
sessionRequestHander.handlers[t] = f
|
|
}
|
|
|
|
func clearSessionRequestHandler(t *testing.T) { // nolint
|
|
sessionRequestHander.Lock()
|
|
defer sessionRequestHander.Unlock()
|
|
|
|
delete(sessionRequestHander.handlers, t)
|
|
}
|
|
|
|
func processSessionRequest(t *testing.T, w http.ResponseWriter, r *http.Request, request *BackendClientRequest) *BackendClientResponse {
|
|
if request.Type != "session" || request.Session == nil {
|
|
require.Fail(t, "Expected an session backend request, got %+v", request)
|
|
}
|
|
|
|
sessionRequestHander.Lock()
|
|
defer sessionRequestHander.Unlock()
|
|
if f, found := sessionRequestHander.handlers[t]; found {
|
|
f(request.Session)
|
|
}
|
|
|
|
response := &BackendClientResponse{
|
|
Type: "session",
|
|
Session: &BackendClientSessionResponse{
|
|
Version: BackendVersion,
|
|
RoomId: request.Session.RoomId,
|
|
},
|
|
}
|
|
return response
|
|
}
|
|
|
|
var pingRequests map[*testing.T][]*BackendClientRequest
|
|
|
|
func getPingRequests(t *testing.T) []*BackendClientRequest {
|
|
return pingRequests[t]
|
|
}
|
|
|
|
func clearPingRequests(t *testing.T) {
|
|
delete(pingRequests, t)
|
|
}
|
|
|
|
func storePingRequest(t *testing.T, request *BackendClientRequest) {
|
|
if entries, found := pingRequests[t]; !found {
|
|
if pingRequests == nil {
|
|
pingRequests = make(map[*testing.T][]*BackendClientRequest)
|
|
}
|
|
pingRequests[t] = []*BackendClientRequest{
|
|
request,
|
|
}
|
|
t.Cleanup(func() {
|
|
clearPingRequests(t)
|
|
})
|
|
} else {
|
|
pingRequests[t] = append(entries, request)
|
|
}
|
|
}
|
|
|
|
func processPingRequest(t *testing.T, w http.ResponseWriter, r *http.Request, request *BackendClientRequest) *BackendClientResponse {
|
|
if request.Type != "ping" || request.Ping == nil {
|
|
require.Fail(t, "Expected an ping backend request, got %+v", request)
|
|
}
|
|
|
|
if request.Ping.RoomId == "test-room-with-sessiondata" {
|
|
if entries := request.Ping.Entries; assert.Len(t, entries, 1) {
|
|
assert.Empty(t, entries[0].UserId)
|
|
}
|
|
}
|
|
|
|
storePingRequest(t, request)
|
|
|
|
response := &BackendClientResponse{
|
|
Type: "ping",
|
|
Ping: &BackendClientRingResponse{
|
|
Version: BackendVersion,
|
|
RoomId: request.Ping.RoomId,
|
|
},
|
|
}
|
|
return response
|
|
}
|
|
|
|
func ensureAuthTokens(t *testing.T) (string, string) {
|
|
require := require.New(t)
|
|
if privateKey := os.Getenv("PRIVATE_AUTH_TOKEN_" + t.Name()); privateKey != "" {
|
|
publicKey := os.Getenv("PUBLIC_AUTH_TOKEN_" + t.Name())
|
|
// should not happen, always both keys are created
|
|
require.NotEmpty(publicKey, "public key is empty")
|
|
return privateKey, publicKey
|
|
}
|
|
|
|
var private []byte
|
|
var public []byte
|
|
|
|
if strings.Contains(t.Name(), "ECDSA") {
|
|
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
|
require.NoError(err)
|
|
|
|
private, err = x509.MarshalECPrivateKey(key)
|
|
require.NoError(err)
|
|
private = pem.EncodeToMemory(&pem.Block{
|
|
Type: "ECDSA PRIVATE KEY",
|
|
Bytes: private,
|
|
})
|
|
|
|
public, err = x509.MarshalPKIXPublicKey(&key.PublicKey)
|
|
require.NoError(err)
|
|
public = pem.EncodeToMemory(&pem.Block{
|
|
Type: "ECDSA PUBLIC KEY",
|
|
Bytes: public,
|
|
})
|
|
} else if strings.Contains(t.Name(), "Ed25519") {
|
|
publicKey, privateKey, err := ed25519.GenerateKey(rand.Reader)
|
|
require.NoError(err)
|
|
|
|
private, err = x509.MarshalPKCS8PrivateKey(privateKey)
|
|
require.NoError(err)
|
|
private = pem.EncodeToMemory(&pem.Block{
|
|
Type: "Ed25519 PRIVATE KEY",
|
|
Bytes: private,
|
|
})
|
|
|
|
public, err = x509.MarshalPKIXPublicKey(publicKey)
|
|
require.NoError(err)
|
|
public = pem.EncodeToMemory(&pem.Block{
|
|
Type: "Ed25519 PUBLIC KEY",
|
|
Bytes: public,
|
|
})
|
|
} else {
|
|
key, err := rsa.GenerateKey(rand.Reader, 1024)
|
|
require.NoError(err)
|
|
|
|
private = pem.EncodeToMemory(&pem.Block{
|
|
Type: "RSA PRIVATE KEY",
|
|
Bytes: x509.MarshalPKCS1PrivateKey(key),
|
|
})
|
|
|
|
public, err = x509.MarshalPKIXPublicKey(&key.PublicKey)
|
|
require.NoError(err)
|
|
public = pem.EncodeToMemory(&pem.Block{
|
|
Type: "RSA PUBLIC KEY",
|
|
Bytes: public,
|
|
})
|
|
}
|
|
|
|
privateKey := base64.StdEncoding.EncodeToString(private)
|
|
t.Setenv("PRIVATE_AUTH_TOKEN_"+t.Name(), privateKey)
|
|
publicKey := base64.StdEncoding.EncodeToString(public)
|
|
t.Setenv("PUBLIC_AUTH_TOKEN_"+t.Name(), publicKey)
|
|
return privateKey, publicKey
|
|
}
|
|
|
|
func getPrivateAuthToken(t *testing.T) (key interface{}) {
|
|
private, _ := ensureAuthTokens(t)
|
|
data, err := base64.StdEncoding.DecodeString(private)
|
|
require.NoError(t, err)
|
|
if strings.Contains(t.Name(), "ECDSA") {
|
|
key, err = jwt.ParseECPrivateKeyFromPEM(data)
|
|
} else if strings.Contains(t.Name(), "Ed25519") {
|
|
key, err = jwt.ParseEdPrivateKeyFromPEM(data)
|
|
} else {
|
|
key, err = jwt.ParseRSAPrivateKeyFromPEM(data)
|
|
}
|
|
require.NoError(t, err)
|
|
return key
|
|
}
|
|
|
|
func getPublicAuthToken(t *testing.T) (key interface{}) {
|
|
_, public := ensureAuthTokens(t)
|
|
data, err := base64.StdEncoding.DecodeString(public)
|
|
require.NoError(t, err)
|
|
if strings.Contains(t.Name(), "ECDSA") {
|
|
key, err = jwt.ParseECPublicKeyFromPEM(data)
|
|
} else if strings.Contains(t.Name(), "Ed25519") {
|
|
key, err = jwt.ParseEdPublicKeyFromPEM(data)
|
|
} else {
|
|
key, err = jwt.ParseRSAPublicKeyFromPEM(data)
|
|
}
|
|
require.NoError(t, err)
|
|
return key
|
|
}
|
|
|
|
func registerBackendHandler(t *testing.T, router *mux.Router) {
|
|
registerBackendHandlerUrl(t, router, "/")
|
|
}
|
|
|
|
func registerBackendHandlerUrl(t *testing.T, router *mux.Router, url string) {
|
|
handleFunc := validateBackendChecksum(t, func(w http.ResponseWriter, r *http.Request, request *BackendClientRequest) *BackendClientResponse {
|
|
switch request.Type {
|
|
case "auth":
|
|
return processAuthRequest(t, w, r, request)
|
|
case "room":
|
|
return processRoomRequest(t, w, r, request)
|
|
case "session":
|
|
return processSessionRequest(t, w, r, request)
|
|
case "ping":
|
|
return processPingRequest(t, w, r, request)
|
|
default:
|
|
require.Fail(t, "Unsupported request received: %+v", request)
|
|
return nil
|
|
}
|
|
})
|
|
|
|
router.HandleFunc(url, handleFunc)
|
|
if !strings.HasSuffix(url, "/") {
|
|
url += "/"
|
|
}
|
|
|
|
handleCapabilitiesFunc := func(w http.ResponseWriter, r *http.Request) {
|
|
features := []string{
|
|
"foo",
|
|
"bar",
|
|
}
|
|
if strings.Contains(t.Name(), "V3Api") {
|
|
features = append(features, "signaling-v3")
|
|
}
|
|
if strings.Contains(t.Name(), "Federation") {
|
|
features = append(features, "federation-v2")
|
|
}
|
|
signaling := map[string]interface{}{
|
|
"foo": "bar",
|
|
"baz": 42,
|
|
}
|
|
config := map[string]interface{}{
|
|
"signaling": signaling,
|
|
}
|
|
if strings.Contains(t.Name(), "MultiRoom") {
|
|
signaling[ConfigKeySessionPingLimit] = 2
|
|
}
|
|
useV2 := true
|
|
if os.Getenv("SKIP_V2_CAPABILITIES") != "" {
|
|
useV2 = false
|
|
}
|
|
if (strings.Contains(t.Name(), "V2") && useV2) || strings.Contains(t.Name(), "Federation") {
|
|
key := getPublicAuthToken(t)
|
|
public, err := x509.MarshalPKIXPublicKey(key)
|
|
require.NoError(t, err)
|
|
var pemType string
|
|
if strings.Contains(t.Name(), "ECDSA") {
|
|
pemType = "ECDSA PUBLIC KEY"
|
|
} else if strings.Contains(t.Name(), "Ed25519") {
|
|
pemType = "Ed25519 PUBLIC KEY"
|
|
} else {
|
|
pemType = "RSA PUBLIC KEY"
|
|
}
|
|
|
|
public = pem.EncodeToMemory(&pem.Block{
|
|
Type: pemType,
|
|
Bytes: public,
|
|
})
|
|
if strings.Contains(t.Name(), "Ed25519_Nextcloud") {
|
|
// Simulate Nextcloud which returns the Ed25519 key as base64-encoded data.
|
|
encoded := base64.StdEncoding.EncodeToString(key.(ed25519.PublicKey))
|
|
signaling[ConfigKeyHelloV2TokenKey] = encoded
|
|
} else {
|
|
signaling[ConfigKeyHelloV2TokenKey] = string(public)
|
|
}
|
|
}
|
|
spreedCapa, _ := json.Marshal(map[string]interface{}{
|
|
"features": features,
|
|
"config": config,
|
|
})
|
|
response := &CapabilitiesResponse{
|
|
Version: CapabilitiesVersion{
|
|
Major: 20,
|
|
},
|
|
Capabilities: map[string]json.RawMessage{
|
|
"spreed": spreedCapa,
|
|
},
|
|
}
|
|
|
|
data, err := json.Marshal(response)
|
|
assert.NoError(t, err, "Could not marshal %+v", response)
|
|
|
|
var ocs OcsResponse
|
|
ocs.Ocs = &OcsBody{
|
|
Meta: OcsMeta{
|
|
Status: "ok",
|
|
StatusCode: http.StatusOK,
|
|
Message: http.StatusText(http.StatusOK),
|
|
},
|
|
Data: data,
|
|
}
|
|
data, err = json.Marshal(ocs)
|
|
require.NoError(t, err)
|
|
w.Header().Add("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write(data) // nolint
|
|
}
|
|
router.HandleFunc(url+"ocs/v2.php/cloud/capabilities", handleCapabilitiesFunc)
|
|
|
|
if strings.Contains(t.Name(), "V3Api") {
|
|
router.HandleFunc(url+"ocs/v2.php/apps/spreed/api/v3/signaling/backend", handleFunc)
|
|
} else {
|
|
router.HandleFunc(url+"ocs/v2.php/apps/spreed/api/v1/signaling/backend", handleFunc)
|
|
}
|
|
}
|
|
|
|
func performHousekeeping(hub *Hub, now time.Time) *sync.WaitGroup {
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
hub.performHousekeeping(now)
|
|
wg.Done()
|
|
}()
|
|
return &wg
|
|
}
|
|
|
|
func TestWebsocketFeatures(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
_, _, _, server := CreateHubForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
conn, response, err := websocket.DefaultDialer.DialContext(ctx, getWebsocketUrl(server.URL), nil)
|
|
require.NoError(err)
|
|
defer conn.Close() // nolint
|
|
|
|
serverHeader := response.Header.Get("Server")
|
|
assert.True(strings.HasPrefix(serverHeader, "nextcloud-spreed-signaling/"), "expected valid server header, got \"%s\"", serverHeader)
|
|
features := response.Header.Get("X-Spreed-Signaling-Features")
|
|
featuresList := make(map[string]bool)
|
|
for _, f := range strings.Split(features, ",") {
|
|
f = strings.TrimSpace(f)
|
|
if f != "" {
|
|
_, found := featuresList[f]
|
|
assert.False(found, "duplicate feature id \"%s\" in \"%s\"", f, features)
|
|
featuresList[f] = true
|
|
}
|
|
}
|
|
if len(featuresList) <= 1 {
|
|
assert.Fail("expected valid features header, got \"%s\"", features)
|
|
}
|
|
_, found := featuresList["hello-v2"]
|
|
assert.True(found, "expected feature \"hello-v2\", got \"%s\"", features)
|
|
|
|
assert.NoError(conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Time{}))
|
|
}
|
|
|
|
func TestInitialWelcome(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client := NewTestClientContext(ctx, t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
msg, err := client.RunUntilMessage(ctx)
|
|
require.NoError(err)
|
|
|
|
assert.Equal("welcome", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Welcome, "%+v", msg) {
|
|
assert.NotEmpty(msg.Welcome.Version, "%+v", msg)
|
|
assert.NotEmpty(msg.Welcome.Features, "%+v", msg)
|
|
}
|
|
}
|
|
|
|
func TestExpectClientHello(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
// The server will send an error and close the connection if no "Hello"
|
|
// is sent.
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
// Perform housekeeping in the future, this will cause the connection to
|
|
// be terminated due to the missing "Hello" request.
|
|
performHousekeeping(hub, time.Now().Add(initialHelloTimeout+time.Second))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
message, err := client.RunUntilMessage(ctx)
|
|
require.NoError(checkUnexpectedClose(err))
|
|
|
|
message2, err := client.RunUntilMessage(ctx)
|
|
if message2 != nil {
|
|
require.Fail("Received multiple messages, already have %+v, also got %+v", message, message2)
|
|
}
|
|
require.NoError(checkUnexpectedClose(err))
|
|
|
|
if err := checkMessageType(message, "bye"); assert.NoError(err) {
|
|
assert.Equal("hello_timeout", message.Bye.Reason, "%+v", message.Bye)
|
|
}
|
|
}
|
|
|
|
func TestExpectClientHelloUnsupportedVersion(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
params := TestBackendClientAuthParams{
|
|
UserId: testDefaultUserId,
|
|
}
|
|
require.NoError(client.SendHelloParams(server.URL, "0.0", "", nil, params))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
message, err := client.RunUntilMessage(ctx)
|
|
require.NoError(checkUnexpectedClose(err))
|
|
|
|
if err := checkMessageType(message, "error"); assert.NoError(err) {
|
|
assert.Equal("invalid_hello_version", message.Error.Code)
|
|
}
|
|
}
|
|
|
|
func TestClientHelloV1(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
|
|
}
|
|
}
|
|
|
|
func TestClientHelloV2(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, algo := range testHelloV2Algorithms {
|
|
t.Run(algo, func(t *testing.T) {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHelloV2(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
|
|
data := hub.decodePublicSessionId(hello.Hello.SessionId)
|
|
require.NotNil(data, "Could not decode session id: %s", hello.Hello.SessionId)
|
|
|
|
hub.mu.RLock()
|
|
session := hub.sessions[data.Sid]
|
|
hub.mu.RUnlock()
|
|
require.NotNil(session, "Could not get session for id %+v", data)
|
|
|
|
var userdata map[string]string
|
|
require.NoError(json.Unmarshal(session.UserData(), &userdata))
|
|
|
|
assert.Equal("Displayname "+testDefaultUserId, userdata["displayname"])
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientHelloV2_IssuedInFuture(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, algo := range testHelloV2Algorithms {
|
|
t.Run(algo, func(t *testing.T) {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
issuedAt := time.Now().Add(tokenLeeway / 2)
|
|
expiresAt := issuedAt.Add(time.Second)
|
|
require.NoError(client.SendHelloV2WithTimes(testDefaultUserId, issuedAt, expiresAt))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientHelloV2_IssuedFarInFuture(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, algo := range testHelloV2Algorithms {
|
|
t.Run(algo, func(t *testing.T) {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
issuedAt := time.Now().Add(tokenLeeway * 2)
|
|
expiresAt := issuedAt.Add(time.Second)
|
|
require.NoError(client.SendHelloV2WithTimes(testDefaultUserId, issuedAt, expiresAt))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
message, err := client.RunUntilMessage(ctx)
|
|
require.NoError(checkUnexpectedClose(err))
|
|
|
|
if err := checkMessageType(message, "error"); assert.NoError(err) {
|
|
assert.Equal("token_not_valid_yet", message.Error.Code, "%+v", message)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientHelloV2_Expired(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, algo := range testHelloV2Algorithms {
|
|
t.Run(algo, func(t *testing.T) {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
issuedAt := time.Now().Add(-tokenLeeway * 3)
|
|
expiresAt := time.Now().Add(-tokenLeeway * 2)
|
|
require.NoError(client.SendHelloV2WithTimes(testDefaultUserId, issuedAt, expiresAt))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
message, err := client.RunUntilMessage(ctx)
|
|
require.NoError(checkUnexpectedClose(err))
|
|
|
|
if err := checkMessageType(message, "error"); assert.NoError(err) {
|
|
assert.Equal("token_expired", message.Error.Code, "%+v", message)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientHelloV2_IssuedAtMissing(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, algo := range testHelloV2Algorithms {
|
|
t.Run(algo, func(t *testing.T) {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
var issuedAt time.Time
|
|
expiresAt := time.Now().Add(time.Minute)
|
|
require.NoError(client.SendHelloV2WithTimes(testDefaultUserId, issuedAt, expiresAt))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
message, err := client.RunUntilMessage(ctx)
|
|
require.NoError(checkUnexpectedClose(err))
|
|
|
|
if err := checkMessageType(message, "error"); assert.NoError(err) {
|
|
assert.Equal("token_not_valid_yet", message.Error.Code, "%+v", message)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientHelloV2_ExpiresAtMissing(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, algo := range testHelloV2Algorithms {
|
|
t.Run(algo, func(t *testing.T) {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
issuedAt := time.Now()
|
|
var expiresAt time.Time
|
|
require.NoError(client.SendHelloV2WithTimes(testDefaultUserId, issuedAt, expiresAt))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
message, err := client.RunUntilMessage(ctx)
|
|
require.NoError(checkUnexpectedClose(err))
|
|
|
|
if err := checkMessageType(message, "error"); assert.NoError(err) {
|
|
assert.Equal("token_expired", message.Error.Code, "%+v", message)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientHelloV2_CachedCapabilities(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, algo := range testHelloV2Algorithms {
|
|
t.Run(algo, func(t *testing.T) {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
// Simulate old-style Nextcloud without capabilities for Hello V2.
|
|
t.Setenv("SKIP_V2_CAPABILITIES", "1")
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHelloV1(testDefaultUserId + "1"))
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId+"1", hello1.Hello.UserId, "%+v", hello1.Hello)
|
|
assert.NotEmpty(hello1.Hello.SessionId, "%+v", hello1.Hello)
|
|
|
|
// Simulate updated Nextcloud with capabilities for Hello V2.
|
|
t.Setenv("SKIP_V2_CAPABILITIES", "")
|
|
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
|
|
require.NoError(client2.SendHelloV2(testDefaultUserId + "2"))
|
|
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId+"2", hello2.Hello.UserId, "%+v", hello2.Hello)
|
|
assert.NotEmpty(hello2.Hello.SessionId, "%+v", hello2.Hello)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientHelloWithSpaces(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
userId := "test user with spaces"
|
|
require.NoError(client.SendHello(userId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(userId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
}
|
|
}
|
|
|
|
func TestClientHelloAllowAll(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTestWithConfig(t, func(server *httptest.Server) (*goconf.ConfigFile, error) {
|
|
config, err := getTestConfig(server)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
config.RemoveOption("backend", "allowed")
|
|
config.AddOption("backend", "allowall", "true")
|
|
return config, nil
|
|
})
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
}
|
|
}
|
|
|
|
func TestClientHelloSessionLimit(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
|
|
if isLocalTest(t) {
|
|
var router1 *mux.Router
|
|
hub1, _, router1, server1 = CreateHubForTestWithConfig(t, func(server *httptest.Server) (*goconf.ConfigFile, error) {
|
|
config, err := getTestConfig(server)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
config.RemoveOption("backend", "allowed")
|
|
config.RemoveOption("backend", "secret")
|
|
config.AddOption("backend", "backends", "backend1, backend2")
|
|
|
|
config.AddOption("backend1", "url", server.URL+"/one")
|
|
config.AddOption("backend1", "secret", string(testBackendSecret))
|
|
config.AddOption("backend1", "sessionlimit", "1")
|
|
|
|
config.AddOption("backend2", "url", server.URL+"/two")
|
|
config.AddOption("backend2", "secret", string(testBackendSecret))
|
|
return config, nil
|
|
})
|
|
|
|
registerBackendHandlerUrl(t, router1, "/one")
|
|
registerBackendHandlerUrl(t, router1, "/two")
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
var router1 *mux.Router
|
|
var router2 *mux.Router
|
|
hub1, hub2, router1, router2, server1, server2 = CreateClusteredHubsForTestWithConfig(t, func(server *httptest.Server) (*goconf.ConfigFile, error) {
|
|
// Make sure all backends use the same server
|
|
if server1 == nil {
|
|
server1 = server
|
|
} else {
|
|
server = server1
|
|
}
|
|
|
|
config, err := getTestConfig(server)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
config.RemoveOption("backend", "allowed")
|
|
config.RemoveOption("backend", "secret")
|
|
config.AddOption("backend", "backends", "backend1, backend2")
|
|
|
|
config.AddOption("backend1", "url", server.URL+"/one")
|
|
config.AddOption("backend1", "secret", string(testBackendSecret))
|
|
config.AddOption("backend1", "sessionlimit", "1")
|
|
|
|
config.AddOption("backend2", "url", server.URL+"/two")
|
|
config.AddOption("backend2", "secret", string(testBackendSecret))
|
|
return config, nil
|
|
})
|
|
|
|
registerBackendHandlerUrl(t, router1, "/one")
|
|
registerBackendHandlerUrl(t, router1, "/two")
|
|
|
|
registerBackendHandlerUrl(t, router2, "/one")
|
|
registerBackendHandlerUrl(t, router2, "/two")
|
|
}
|
|
|
|
client := NewTestClient(t, server1, hub1)
|
|
defer client.CloseWithBye()
|
|
|
|
params1 := TestBackendClientAuthParams{
|
|
UserId: testDefaultUserId,
|
|
}
|
|
require.NoError(client.SendHelloParams(server1.URL+"/one", HelloVersionV1, "client", nil, params1))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
}
|
|
|
|
// The second client can't connect as it would exceed the session limit.
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
|
|
params2 := TestBackendClientAuthParams{
|
|
UserId: testDefaultUserId + "2",
|
|
}
|
|
require.NoError(client2.SendHelloParams(server1.URL+"/one", HelloVersionV1, "client", nil, params2))
|
|
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("error", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Error, "%+v", msg) {
|
|
assert.Equal("session_limit_exceeded", msg.Error.Code, "%+v", msg)
|
|
}
|
|
}
|
|
|
|
// The client can connect to a different backend.
|
|
require.NoError(client2.SendHelloParams(server1.URL+"/two", HelloVersionV1, "client", nil, params2))
|
|
|
|
if hello, err := client2.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId+"2", hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
}
|
|
|
|
// If the first client disconnects (and releases the session), a new one can connect.
|
|
client.CloseWithBye()
|
|
assert.NoError(client.WaitForClientRemoved(ctx))
|
|
|
|
client3 := NewTestClient(t, server2, hub2)
|
|
defer client3.CloseWithBye()
|
|
|
|
params3 := TestBackendClientAuthParams{
|
|
UserId: testDefaultUserId + "3",
|
|
}
|
|
require.NoError(client3.SendHelloParams(server1.URL+"/one", HelloVersionV1, "client", nil, params3))
|
|
|
|
if hello, err := client3.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId+"3", hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSessionIdsUnordered(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
publicSessionIds := make([]string, 0)
|
|
for i := 0; i < 20; i++ {
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
|
|
data := hub.decodePublicSessionId(hello.Hello.SessionId)
|
|
if !assert.NotNil(data, "Could not decode session id: %s", hello.Hello.SessionId) {
|
|
break
|
|
}
|
|
|
|
hub.mu.RLock()
|
|
session := hub.sessions[data.Sid]
|
|
hub.mu.RUnlock()
|
|
if !assert.NotNil(session, "Could not get session for id %+v", data) {
|
|
break
|
|
}
|
|
|
|
publicSessionIds = append(publicSessionIds, session.PublicId())
|
|
}
|
|
}
|
|
|
|
require.NotEmpty(publicSessionIds, "no session ids decoded")
|
|
|
|
larger := 0
|
|
smaller := 0
|
|
prevSid := ""
|
|
for i, sid := range publicSessionIds {
|
|
if i > 0 {
|
|
if sid > prevSid {
|
|
larger++
|
|
} else if sid < prevSid {
|
|
smaller--
|
|
} else {
|
|
assert.Fail("should not have received the same session id twice")
|
|
}
|
|
}
|
|
prevSid = sid
|
|
}
|
|
|
|
// Public session ids should not be ordered.
|
|
assert.NotEqual(larger, len(publicSessionIds), "the session ids are all larger than the previous ones")
|
|
assert.NotEqual(smaller, len(publicSessionIds), "the session ids are all smaller than the previous ones")
|
|
}
|
|
|
|
func TestClientHelloResume(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
require.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
|
|
client.Close()
|
|
assert.NoError(client.WaitForClientRemoved(ctx))
|
|
|
|
client = NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHelloResume(hello.Hello.ResumeId))
|
|
if hello2, err := client.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId, hello2.Hello.UserId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.SessionId, hello2.Hello.SessionId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.ResumeId, hello2.Hello.ResumeId, "%+v", hello2.Hello)
|
|
}
|
|
}
|
|
|
|
func TestClientHelloResumeThrottle(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
timing := &throttlerTiming{
|
|
t: t,
|
|
now: time.Now(),
|
|
}
|
|
th := newMemoryThrottlerForTest(t)
|
|
th.getNow = timing.getNow
|
|
th.doDelay = timing.doDelay
|
|
hub.throttler = th
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
timing.expectedSleep = 100 * time.Millisecond
|
|
require.NoError(client.SendHelloResume("this-is-invalid"))
|
|
|
|
if msg, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("error", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Error, "%+v", msg) {
|
|
assert.Equal("no_such_session", msg.Error.Code, "%+v", msg)
|
|
}
|
|
}
|
|
|
|
client = NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId)
|
|
assert.NotEmpty(hello.Hello.SessionId)
|
|
assert.NotEmpty(hello.Hello.ResumeId)
|
|
|
|
client.Close()
|
|
assert.NoError(client.WaitForClientRemoved(ctx))
|
|
|
|
// Perform housekeeping in the future, this will cause the session to be
|
|
// cleaned up after it is expired.
|
|
performHousekeeping(hub, time.Now().Add(sessionExpireDuration+time.Second)).Wait()
|
|
|
|
client = NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
// Valid but expired resume ids will not be throttled.
|
|
timing.expectedSleep = 0 * time.Millisecond
|
|
require.NoError(client.SendHelloResume(hello.Hello.ResumeId))
|
|
if msg, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("error", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Error, "%+v", msg) {
|
|
assert.Equal("no_such_session", msg.Error.Code, "%+v", msg)
|
|
}
|
|
}
|
|
|
|
client = NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
timing.expectedSleep = 200 * time.Millisecond
|
|
require.NoError(client.SendHelloResume("this-is-invalid"))
|
|
|
|
if msg, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("error", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Error, "%+v", msg) {
|
|
assert.Equal("no_such_session", msg.Error.Code, "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestClientHelloResumeExpired(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
|
|
client.Close()
|
|
assert.NoError(client.WaitForClientRemoved(ctx))
|
|
|
|
// Perform housekeeping in the future, this will cause the session to be
|
|
// cleaned up after it is expired.
|
|
performHousekeeping(hub, time.Now().Add(sessionExpireDuration+time.Second)).Wait()
|
|
|
|
client = NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHelloResume(hello.Hello.ResumeId))
|
|
if msg, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("error", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Error, "%+v", msg) {
|
|
assert.Equal("no_such_session", msg.Error.Code, "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestClientHelloResumeTakeover(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
require.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
|
|
require.NoError(client2.SendHelloResume(hello.Hello.ResumeId))
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello2.Hello.UserId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.SessionId, hello2.Hello.SessionId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.ResumeId, hello2.Hello.ResumeId, "%+v", hello2.Hello)
|
|
|
|
// The first client got disconnected with a reason in a "Bye" message.
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("bye", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Bye, "%+v", msg) {
|
|
assert.Equal("session_resumed", msg.Bye.Reason, "%+v", msg)
|
|
}
|
|
}
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); err == nil {
|
|
assert.Fail("Expected error but received %+v", msg)
|
|
} else if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
|
|
assert.Fail("Expected close error but received %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestClientHelloResumeOtherHub(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
|
|
client.Close()
|
|
assert.NoError(client.WaitForClientRemoved(ctx))
|
|
|
|
// Simulate a restart of the hub.
|
|
hub.sid.Store(0)
|
|
sessions := make([]Session, 0)
|
|
hub.mu.Lock()
|
|
for _, session := range hub.sessions {
|
|
sessions = append(sessions, session)
|
|
}
|
|
hub.mu.Unlock()
|
|
for _, session := range sessions {
|
|
session.Close()
|
|
}
|
|
hub.mu.Lock()
|
|
count := len(hub.sessions)
|
|
hub.mu.Unlock()
|
|
assert.Equal(0, count, "Should have removed all sessions")
|
|
|
|
// The new client will get the same (internal) sid for his session.
|
|
newClient := NewTestClient(t, server, hub)
|
|
defer newClient.CloseWithBye()
|
|
|
|
require.NoError(newClient.SendHello(testDefaultUserId))
|
|
|
|
hello2, err := newClient.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello2.Hello.UserId, "%+v", hello2.Hello)
|
|
assert.NotEmpty(hello2.Hello.SessionId, "%+v", hello2.Hello)
|
|
assert.NotEmpty(hello2.Hello.ResumeId, "%+v", hello2.Hello)
|
|
|
|
// The previous session (which had the same internal sid) can't be resumed.
|
|
client = NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
require.NoError(client.SendHelloResume(hello.Hello.ResumeId))
|
|
if msg, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("error", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Error, "%+v", msg) {
|
|
assert.Equal("no_such_session", msg.Error.Code, "%+v", msg)
|
|
}
|
|
}
|
|
|
|
// Expire old sessions
|
|
hub.performHousekeeping(time.Now().Add(2 * sessionExpireDuration))
|
|
}
|
|
|
|
func TestClientHelloResumePublicId(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
// Test that a client can't resume a "public" session of another user.
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
|
|
recipient2 := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello2.Hello.SessionId,
|
|
}
|
|
|
|
data := "from-1-to-2"
|
|
client1.SendMessage(recipient2, data) // nolint
|
|
|
|
var payload string
|
|
var sender *MessageServerMessageSender
|
|
if err := checkReceiveClientMessageWithSender(ctx, client2, "session", hello1.Hello, &payload, &sender); assert.NoError(err) {
|
|
assert.Equal(data, payload)
|
|
}
|
|
|
|
client1.Close()
|
|
assert.NoError(client1.WaitForClientRemoved(ctx))
|
|
|
|
client1 = NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
|
|
// Can't resume a session with the id received from messages of a client.
|
|
require.NoError(client1.SendHelloResume(sender.SessionId))
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("error", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Error, "%+v", msg) {
|
|
assert.Equal("no_such_session", msg.Error.Code, "%+v", msg)
|
|
}
|
|
}
|
|
|
|
// Expire old sessions
|
|
hub.performHousekeeping(time.Now().Add(2 * sessionExpireDuration))
|
|
}
|
|
|
|
func TestClientHelloByeResume(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
|
|
require.NoError(client.SendBye())
|
|
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(checkMessageType(message, "bye"))
|
|
}
|
|
|
|
client.Close()
|
|
assert.NoError(client.WaitForSessionRemoved(ctx, hello.Hello.SessionId))
|
|
assert.NoError(client.WaitForClientRemoved(ctx))
|
|
|
|
client = NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHelloResume(hello.Hello.ResumeId))
|
|
if msg, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("error", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Error, "%+v", msg) {
|
|
assert.Equal("no_such_session", msg.Error.Code, "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestClientHelloResumeAndJoin(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
|
|
client.Close()
|
|
assert.NoError(client.WaitForClientRemoved(ctx))
|
|
|
|
client = NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHelloResume(hello.Hello.ResumeId))
|
|
hello2, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello2.Hello.UserId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.SessionId, hello2.Hello.SessionId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.ResumeId, hello2.Hello.ResumeId, "%+v", hello2.Hello)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
}
|
|
|
|
func runGrpcProxyTest(t *testing.T, f func(hub1, hub2 *Hub, server1, server2 *httptest.Server)) {
|
|
t.Helper()
|
|
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
var router1 *mux.Router
|
|
var router2 *mux.Router
|
|
hub1, hub2, router1, router2, server1, server2 = CreateClusteredHubsForTestWithConfig(t, func(server *httptest.Server) (*goconf.ConfigFile, error) {
|
|
// Make sure all backends use the same server
|
|
if server1 == nil {
|
|
server1 = server
|
|
} else {
|
|
server = server1
|
|
}
|
|
|
|
config, err := getTestConfig(server)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
config.RemoveOption("backend", "allowed")
|
|
config.RemoveOption("backend", "secret")
|
|
config.AddOption("backend", "backends", "backend1")
|
|
|
|
config.AddOption("backend1", "url", server.URL)
|
|
config.AddOption("backend1", "secret", string(testBackendSecret))
|
|
config.AddOption("backend1", "sessionlimit", "1")
|
|
return config, nil
|
|
})
|
|
|
|
registerBackendHandlerUrl(t, router1, "/")
|
|
registerBackendHandlerUrl(t, router2, "/")
|
|
|
|
f(hub1, hub2, server1, server2)
|
|
}
|
|
|
|
func TestClientHelloResumeProxy(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
ensureNoGoroutinesLeak(t, func(t *testing.T) {
|
|
runGrpcProxyTest(t, func(hub1, hub2 *Hub, server1, server2 *httptest.Server) {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
|
|
client1.Close()
|
|
assert.NoError(client1.WaitForClientRemoved(ctx))
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
|
|
require.NoError(client2.SendHelloResume(hello.Hello.ResumeId))
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello2.Hello.UserId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.SessionId, hello2.Hello.SessionId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.ResumeId, hello2.Hello.ResumeId, "%+v", hello2.Hello)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event.
|
|
assert.NoError(client2.RunUntilJoined(ctx, hello.Hello))
|
|
|
|
room := hub1.getRoom(roomId)
|
|
require.NotNil(room, "Could not find room %s", roomId)
|
|
room2 := hub2.getRoom(roomId)
|
|
require.Nil(room2, "Should not have gotten room %s", roomId)
|
|
|
|
users := []map[string]interface{}{
|
|
{
|
|
"sessionId": "the-session-id",
|
|
"inCall": 1,
|
|
},
|
|
}
|
|
room.PublishUsersInCallChanged(users, users)
|
|
assert.NoError(checkReceiveClientEvent(ctx, client2, "update", nil))
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestClientHelloResumeProxy_Takeover(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
ensureNoGoroutinesLeak(t, func(t *testing.T) {
|
|
runGrpcProxyTest(t, func(hub1, hub2 *Hub, server1, server2 *httptest.Server) {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
require.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
|
|
require.NoError(client2.SendHelloResume(hello.Hello.ResumeId))
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello2.Hello.UserId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.SessionId, hello2.Hello.SessionId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.ResumeId, hello2.Hello.ResumeId, "%+v", hello2.Hello)
|
|
|
|
// The first client got disconnected with a reason in a "Bye" message.
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("bye", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Bye, "%+v", msg) {
|
|
assert.Equal("session_resumed", msg.Bye.Reason, "%+v", msg)
|
|
}
|
|
}
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); err == nil {
|
|
assert.Fail("Expected error but received %+v", msg)
|
|
} else if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
|
|
assert.Fail("Expected close error but received %+v", err)
|
|
}
|
|
|
|
client3 := NewTestClient(t, server1, hub1)
|
|
defer client3.CloseWithBye()
|
|
|
|
require.NoError(client3.SendHelloResume(hello.Hello.ResumeId))
|
|
hello3, err := client3.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello3.Hello.UserId, "%+v", hello3.Hello)
|
|
assert.Equal(hello.Hello.SessionId, hello3.Hello.SessionId, "%+v", hello3.Hello)
|
|
assert.Equal(hello.Hello.ResumeId, hello3.Hello.ResumeId, "%+v", hello3.Hello)
|
|
|
|
// The second client got disconnected with a reason in a "Bye" message.
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("bye", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Bye, "%+v", msg) {
|
|
assert.Equal("session_resumed", msg.Bye.Reason, "%+v", msg)
|
|
}
|
|
}
|
|
|
|
if msg, err := client2.RunUntilMessage(ctx); err == nil {
|
|
assert.Fail("Expected error but received %+v", msg)
|
|
} else if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
|
|
assert.Fail("Expected close error but received %+v", err)
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestClientHelloResumeProxy_Disconnect(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
ensureNoGoroutinesLeak(t, func(t *testing.T) {
|
|
runGrpcProxyTest(t, func(hub1, hub2 *Hub, server1, server2 *httptest.Server) {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
|
|
client1.Close()
|
|
assert.NoError(client1.WaitForClientRemoved(ctx))
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
|
|
require.NoError(client2.SendHelloResume(hello.Hello.ResumeId))
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
assert.Equal(testDefaultUserId, hello2.Hello.UserId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.SessionId, hello2.Hello.SessionId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.ResumeId, hello2.Hello.ResumeId, "%+v", hello2.Hello)
|
|
|
|
// Simulate unclean shutdown of second instance.
|
|
hub2.rpcServer.conn.Stop()
|
|
|
|
assert.NoError(client2.WaitForClientRemoved(ctx))
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestClientHelloClient(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHelloClient(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
}
|
|
}
|
|
|
|
func TestClientHelloClient_V3Api(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
params := TestBackendClientAuthParams{
|
|
UserId: testDefaultUserId,
|
|
}
|
|
// The "/api/v1/signaling/" URL will be changed to use "v3" as the "signaling-v3"
|
|
// feature is returned by the capabilities endpoint.
|
|
require.NoError(client.SendHelloParams(server.URL+"/ocs/v2.php/apps/spreed/api/v1/signaling/backend", HelloVersionV1, "client", nil, params))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId, hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
}
|
|
}
|
|
|
|
func TestClientHelloInternal(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHelloInternal())
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Empty(hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
}
|
|
}
|
|
|
|
func TestClientMessageToSessionId(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
mcu1, err := NewTestMCU()
|
|
require.NoError(err)
|
|
hub1.SetMcu(mcu1)
|
|
|
|
if hub1 != hub2 {
|
|
mcu2, err := NewTestMCU()
|
|
require.NoError(err)
|
|
hub2.SetMcu(mcu2)
|
|
}
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
|
|
recipient1 := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}
|
|
recipient2 := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello2.Hello.SessionId,
|
|
}
|
|
|
|
data1 := map[string]interface{}{
|
|
"type": "test",
|
|
"message": "from-1-to-2",
|
|
}
|
|
client1.SendMessage(recipient2, data1) // nolint
|
|
data2 := "from-2-to-1"
|
|
client2.SendMessage(recipient1, data2) // nolint
|
|
|
|
var payload1 string
|
|
if err := checkReceiveClientMessage(ctx, client1, "session", hello2.Hello, &payload1); assert.NoError(err) {
|
|
assert.Equal(data2, payload1)
|
|
}
|
|
var payload2 map[string]interface{}
|
|
if err := checkReceiveClientMessage(ctx, client2, "session", hello1.Hello, &payload2); assert.NoError(err) {
|
|
assert.Equal(data1, payload2)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientControlToSessionId(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
|
|
recipient1 := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}
|
|
recipient2 := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello2.Hello.SessionId,
|
|
}
|
|
|
|
data1 := "from-1-to-2"
|
|
client1.SendControl(recipient2, data1) // nolint
|
|
data2 := "from-2-to-1"
|
|
client2.SendControl(recipient1, data2) // nolint
|
|
|
|
var payload string
|
|
if err := checkReceiveClientControl(ctx, client1, "session", hello2.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data2, payload)
|
|
}
|
|
if err := checkReceiveClientControl(ctx, client2, "session", hello1.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data1, payload)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientControlMissingPermissions(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
|
|
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session1, "Session %s does not exist", hello1.Hello.SessionId)
|
|
session2 := hub.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session2, "Session %s does not exist", hello2.Hello.SessionId)
|
|
|
|
// Client 1 may not send control messages (will be ignored).
|
|
session1.SetPermissions([]Permission{
|
|
PERMISSION_MAY_PUBLISH_AUDIO,
|
|
PERMISSION_MAY_PUBLISH_VIDEO,
|
|
})
|
|
// Client 2 may send control messages.
|
|
session2.SetPermissions([]Permission{
|
|
PERMISSION_MAY_PUBLISH_AUDIO,
|
|
PERMISSION_MAY_PUBLISH_VIDEO,
|
|
PERMISSION_MAY_CONTROL,
|
|
})
|
|
|
|
recipient1 := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}
|
|
recipient2 := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello2.Hello.SessionId,
|
|
}
|
|
|
|
data1 := "from-1-to-2"
|
|
client1.SendControl(recipient2, data1) // nolint
|
|
data2 := "from-2-to-1"
|
|
client2.SendControl(recipient1, data2) // nolint
|
|
|
|
var payload string
|
|
if err := checkReceiveClientControl(ctx, client1, "session", hello2.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data2, payload)
|
|
}
|
|
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
if err := checkReceiveClientMessage(ctx2, client2, "session", hello1.Hello, &payload); err == nil {
|
|
assert.Fail("Expected no payload, got %+v", payload)
|
|
} else {
|
|
assert.ErrorIs(err, ErrNoMessageReceived)
|
|
}
|
|
}
|
|
|
|
func TestClientMessageToUserId(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
require.NotEqual(hello1.Hello.UserId, hello2.Hello.UserId)
|
|
|
|
recipient1 := MessageClientMessageRecipient{
|
|
Type: "user",
|
|
UserId: hello1.Hello.UserId,
|
|
}
|
|
recipient2 := MessageClientMessageRecipient{
|
|
Type: "user",
|
|
UserId: hello2.Hello.UserId,
|
|
}
|
|
|
|
data1 := "from-1-to-2"
|
|
client1.SendMessage(recipient2, data1) // nolint
|
|
data2 := "from-2-to-1"
|
|
client2.SendMessage(recipient1, data2) // nolint
|
|
|
|
var payload string
|
|
if err := checkReceiveClientMessage(ctx, client1, "user", hello2.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data2, payload)
|
|
}
|
|
|
|
if err := checkReceiveClientMessage(ctx, client2, "user", hello1.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data1, payload)
|
|
}
|
|
}
|
|
|
|
func TestClientControlToUserId(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
require.NotEqual(hello1.Hello.UserId, hello2.Hello.UserId)
|
|
|
|
recipient1 := MessageClientMessageRecipient{
|
|
Type: "user",
|
|
UserId: hello1.Hello.UserId,
|
|
}
|
|
recipient2 := MessageClientMessageRecipient{
|
|
Type: "user",
|
|
UserId: hello2.Hello.UserId,
|
|
}
|
|
|
|
data1 := "from-1-to-2"
|
|
client1.SendControl(recipient2, data1) // nolint
|
|
data2 := "from-2-to-1"
|
|
client2.SendControl(recipient1, data2) // nolint
|
|
|
|
var payload string
|
|
if err := checkReceiveClientControl(ctx, client1, "user", hello2.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data2, payload)
|
|
}
|
|
|
|
if err := checkReceiveClientControl(ctx, client2, "user", hello1.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data1, payload)
|
|
}
|
|
}
|
|
|
|
func TestClientMessageToUserIdMultipleSessions(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2a := NewTestClient(t, server, hub)
|
|
defer client2a.CloseWithBye()
|
|
require.NoError(client2a.SendHello(testDefaultUserId + "2"))
|
|
client2b := NewTestClient(t, server, hub)
|
|
defer client2b.CloseWithBye()
|
|
require.NoError(client2b.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2a, err := client2a.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2b, err := client2b.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2a.Hello.SessionId)
|
|
require.NotEqual(hello1.Hello.SessionId, hello2b.Hello.SessionId)
|
|
require.NotEqual(hello2a.Hello.SessionId, hello2b.Hello.SessionId)
|
|
|
|
require.NotEqual(hello1.Hello.UserId, hello2a.Hello.UserId)
|
|
require.NotEqual(hello1.Hello.UserId, hello2b.Hello.UserId)
|
|
require.Equal(hello2a.Hello.UserId, hello2b.Hello.UserId)
|
|
|
|
recipient := MessageClientMessageRecipient{
|
|
Type: "user",
|
|
UserId: hello2a.Hello.UserId,
|
|
}
|
|
|
|
data1 := "from-1-to-2"
|
|
client1.SendMessage(recipient, data1) // nolint
|
|
|
|
// Both clients will receive the message as it was sent to the user.
|
|
var payload string
|
|
if err := checkReceiveClientMessage(ctx, client2a, "user", hello1.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data1, payload)
|
|
}
|
|
if err := checkReceiveClientMessage(ctx, client2b, "user", hello1.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data1, payload)
|
|
}
|
|
}
|
|
|
|
func WaitForUsersJoined(ctx context.Context, t *testing.T, client1 *TestClient, hello1 *ServerMessage, client2 *TestClient, hello2 *ServerMessage) {
|
|
// We will receive "joined" events for all clients. The ordering is not
|
|
// defined as messages are processed and sent by asynchronous event handlers.
|
|
assert.NoError(t, client1.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))
|
|
assert.NoError(t, client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))
|
|
}
|
|
|
|
func TestClientMessageToRoom(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
require.NotEqual(hello1.Hello.UserId, hello2.Hello.UserId)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// Give message processing some time.
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
roomMsg, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
|
|
|
|
recipient := MessageClientMessageRecipient{
|
|
Type: "room",
|
|
}
|
|
|
|
data1 := "from-1-to-2"
|
|
client1.SendMessage(recipient, data1) // nolint
|
|
data2 := "from-2-to-1"
|
|
client2.SendMessage(recipient, data2) // nolint
|
|
|
|
var payload string
|
|
if err := checkReceiveClientMessage(ctx, client1, "room", hello2.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data2, payload)
|
|
}
|
|
|
|
if err := checkReceiveClientMessage(ctx, client2, "room", hello1.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data1, payload)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientControlToRoom(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
require.NotEqual(hello1.Hello.UserId, hello2.Hello.UserId)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// Give message processing some time.
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
roomMsg, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
|
|
|
|
recipient := MessageClientMessageRecipient{
|
|
Type: "room",
|
|
}
|
|
|
|
data1 := "from-1-to-2"
|
|
client1.SendControl(recipient, data1) // nolint
|
|
data2 := "from-2-to-1"
|
|
client2.SendControl(recipient, data2) // nolint
|
|
|
|
var payload string
|
|
if err := checkReceiveClientControl(ctx, client1, "room", hello2.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data2, payload)
|
|
}
|
|
|
|
if err := checkReceiveClientControl(ctx, client2, "room", hello1.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data1, payload)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientMessageToCall(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
require.NotEqual(hello1.Hello.UserId, hello2.Hello.UserId)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// Give message processing some time.
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
roomMsg, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
|
|
|
|
// Simulate request from the backend that somebody joined the call.
|
|
users := []map[string]interface{}{
|
|
{
|
|
"sessionId": hello1.Hello.SessionId,
|
|
"inCall": 1,
|
|
},
|
|
}
|
|
room1 := hub1.getRoom(roomId)
|
|
require.NotNil(room1, "Could not find room %s", roomId)
|
|
room1.PublishUsersInCallChanged(users, users)
|
|
assert.NoError(checkReceiveClientEvent(ctx, client1, "update", nil))
|
|
assert.NoError(checkReceiveClientEvent(ctx, client2, "update", nil))
|
|
|
|
recipient := MessageClientMessageRecipient{
|
|
Type: "call",
|
|
}
|
|
|
|
data1 := "from-1-to-2"
|
|
client1.SendMessage(recipient, data1) // nolint
|
|
data2 := "from-2-to-1"
|
|
client2.SendMessage(recipient, data2) // nolint
|
|
|
|
var payload string
|
|
if err := checkReceiveClientMessage(ctx, client1, "call", hello2.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data2, payload)
|
|
}
|
|
|
|
// The second client is not in the call yet, so will not receive the message.
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
if message, err := client2.RunUntilMessage(ctx2); err == nil {
|
|
assert.Fail("Expected no message", "got %+v", message)
|
|
} else if err != ErrNoMessageReceived && err != context.DeadlineExceeded {
|
|
assert.NoError(err)
|
|
}
|
|
|
|
// Simulate request from the backend that somebody joined the call.
|
|
users = []map[string]interface{}{
|
|
{
|
|
"sessionId": hello1.Hello.SessionId,
|
|
"inCall": 1,
|
|
},
|
|
{
|
|
"sessionId": hello2.Hello.SessionId,
|
|
"inCall": 1,
|
|
},
|
|
}
|
|
room2 := hub2.getRoom(roomId)
|
|
require.NotNil(room2, "Could not find room %s", roomId)
|
|
room2.PublishUsersInCallChanged(users, users)
|
|
assert.NoError(checkReceiveClientEvent(ctx, client1, "update", nil))
|
|
assert.NoError(checkReceiveClientEvent(ctx, client2, "update", nil))
|
|
|
|
client1.SendMessage(recipient, data1) // nolint
|
|
client2.SendMessage(recipient, data2) // nolint
|
|
|
|
if err := checkReceiveClientMessage(ctx, client1, "call", hello2.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data2, payload)
|
|
}
|
|
if err := checkReceiveClientMessage(ctx, client2, "call", hello1.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data1, payload)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientControlToCall(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
require.NotEqual(hello1.Hello.UserId, hello2.Hello.UserId)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// Give message processing some time.
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
roomMsg, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
|
|
|
|
// Simulate request from the backend that somebody joined the call.
|
|
users := []map[string]interface{}{
|
|
{
|
|
"sessionId": hello1.Hello.SessionId,
|
|
"inCall": 1,
|
|
},
|
|
}
|
|
room1 := hub1.getRoom(roomId)
|
|
require.NotNil(room1, "Could not find room %s", roomId)
|
|
room1.PublishUsersInCallChanged(users, users)
|
|
assert.NoError(checkReceiveClientEvent(ctx, client1, "update", nil))
|
|
assert.NoError(checkReceiveClientEvent(ctx, client2, "update", nil))
|
|
|
|
recipient := MessageClientMessageRecipient{
|
|
Type: "call",
|
|
}
|
|
|
|
data1 := "from-1-to-2"
|
|
client1.SendControl(recipient, data1) // nolint
|
|
data2 := "from-2-to-1"
|
|
client2.SendControl(recipient, data2) // nolint
|
|
|
|
var payload string
|
|
if err := checkReceiveClientControl(ctx, client1, "call", hello2.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data2, payload)
|
|
}
|
|
|
|
// The second client is not in the call yet, so will not receive the message.
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
if message, err := client2.RunUntilMessage(ctx2); err == nil {
|
|
assert.Fail("Expected no message", "got %+v", message)
|
|
} else if err != ErrNoMessageReceived && err != context.DeadlineExceeded {
|
|
assert.NoError(err)
|
|
}
|
|
|
|
// Simulate request from the backend that somebody joined the call.
|
|
users = []map[string]interface{}{
|
|
{
|
|
"sessionId": hello1.Hello.SessionId,
|
|
"inCall": 1,
|
|
},
|
|
{
|
|
"sessionId": hello2.Hello.SessionId,
|
|
"inCall": 1,
|
|
},
|
|
}
|
|
room2 := hub2.getRoom(roomId)
|
|
require.NotNil(room2, "Could not find room %s", roomId)
|
|
room2.PublishUsersInCallChanged(users, users)
|
|
assert.NoError(checkReceiveClientEvent(ctx, client1, "update", nil))
|
|
assert.NoError(checkReceiveClientEvent(ctx, client2, "update", nil))
|
|
|
|
client1.SendControl(recipient, data1) // nolint
|
|
client2.SendControl(recipient, data2) // nolint
|
|
|
|
if err := checkReceiveClientControl(ctx, client1, "call", hello2.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data2, payload)
|
|
}
|
|
if err := checkReceiveClientControl(ctx, client2, "call", hello1.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data1, payload)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestJoinRoom(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event.
|
|
assert.NoError(client.RunUntilJoined(ctx, hello.Hello))
|
|
|
|
// Leave room.
|
|
roomMsg, err = client.JoinRoom(ctx, "")
|
|
require.NoError(err)
|
|
require.Equal("", roomMsg.Room.RoomId)
|
|
}
|
|
|
|
func TestJoinInvalidRoom(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-invalid-room"
|
|
msg := &ClientMessage{
|
|
Id: "ABCD",
|
|
Type: "room",
|
|
Room: &RoomClientMessage{
|
|
RoomId: roomId,
|
|
SessionId: roomId + "-" + hello.Hello.SessionId,
|
|
},
|
|
}
|
|
require.NoError(client.WriteJSON(msg))
|
|
|
|
message, err := client.RunUntilMessage(ctx)
|
|
require.NoError(err)
|
|
require.NoError(checkUnexpectedClose(err))
|
|
|
|
assert.Equal(msg.Id, message.Id)
|
|
if assert.NoError(checkMessageType(message, "error")) {
|
|
assert.Equal("no_such_room", message.Error.Code)
|
|
}
|
|
}
|
|
|
|
func TestJoinRoomTwice(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
require.Equal(string(testRoomProperties), string(roomMsg.Room.Properties))
|
|
|
|
// We will receive a "joined" event.
|
|
assert.NoError(client.RunUntilJoined(ctx, hello.Hello))
|
|
|
|
msg := &ClientMessage{
|
|
Id: "ABCD",
|
|
Type: "room",
|
|
Room: &RoomClientMessage{
|
|
RoomId: roomId,
|
|
SessionId: roomId + "-" + client.publicId + "-2",
|
|
},
|
|
}
|
|
require.NoError(client.WriteJSON(msg))
|
|
|
|
message, err := client.RunUntilMessage(ctx)
|
|
require.NoError(err)
|
|
require.NoError(checkUnexpectedClose(err))
|
|
|
|
assert.Equal(msg.Id, message.Id)
|
|
if assert.NoError(checkMessageType(message, "error")) {
|
|
assert.Equal("already_joined", message.Error.Code)
|
|
assert.NotNil(message.Error.Details)
|
|
}
|
|
|
|
var roomDetails RoomErrorDetails
|
|
if err := json.Unmarshal(message.Error.Details, &roomDetails); assert.NoError(err) {
|
|
if assert.NotNil(roomDetails.Room, "%+v", message) {
|
|
assert.Equal(roomId, roomDetails.Room.RoomId)
|
|
assert.Equal(string(testRoomProperties), string(roomDetails.Room.Properties))
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestExpectAnonymousJoinRoom(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(authAnonymousUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
if assert.NoError(err) {
|
|
assert.Empty(hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
}
|
|
|
|
// Perform housekeeping in the future, this will cause the connection to
|
|
// be terminated because the anonymous client didn't join a room.
|
|
performHousekeeping(hub, time.Now().Add(anonmyousJoinRoomTimeout+time.Second))
|
|
|
|
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if assert.NoError(checkMessageType(message, "bye")) {
|
|
assert.Equal("room_join_timeout", message.Bye.Reason, "%+v", message.Bye)
|
|
}
|
|
}
|
|
|
|
// Both the client and the session get removed from the hub.
|
|
assert.NoError(client.WaitForClientRemoved(ctx))
|
|
assert.NoError(client.WaitForSessionRemoved(ctx, hello.Hello.SessionId))
|
|
}
|
|
|
|
func TestExpectAnonymousJoinRoomAfterLeave(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(authAnonymousUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
if assert.NoError(err) {
|
|
assert.Empty(hello.Hello.UserId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello.Hello)
|
|
assert.NotEmpty(hello.Hello.ResumeId, "%+v", hello.Hello)
|
|
}
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event.
|
|
assert.NoError(client.RunUntilJoined(ctx, hello.Hello))
|
|
|
|
// Perform housekeeping in the future, this will keep the connection as the
|
|
// session joined a room.
|
|
performHousekeeping(hub, time.Now().Add(anonmyousJoinRoomTimeout+time.Second))
|
|
|
|
// No message about the closing is sent to the new connection.
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
if message, err := client.RunUntilMessage(ctx2); err == nil {
|
|
assert.Fail("Expected no message, got %+v", message)
|
|
} else if err != ErrNoMessageReceived && err != context.DeadlineExceeded {
|
|
assert.NoError(err)
|
|
}
|
|
|
|
// Leave room
|
|
roomMsg, err = client.JoinRoom(ctx, "")
|
|
require.NoError(err)
|
|
require.Equal("", roomMsg.Room.RoomId)
|
|
|
|
// Perform housekeeping in the future, this will cause the connection to
|
|
// be terminated because the anonymous client didn't join a room.
|
|
performHousekeeping(hub, time.Now().Add(anonmyousJoinRoomTimeout+time.Second))
|
|
|
|
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if assert.NoError(checkMessageType(message, "bye")) {
|
|
assert.Equal("room_join_timeout", message.Bye.Reason, "%+v", message.Bye)
|
|
}
|
|
}
|
|
|
|
// Both the client and the session get removed from the hub.
|
|
assert.NoError(client.WaitForClientRemoved(ctx))
|
|
assert.NoError(client.WaitForSessionRemoved(ctx, hello.Hello.SessionId))
|
|
}
|
|
|
|
func TestJoinRoomChange(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event.
|
|
assert.NoError(client.RunUntilJoined(ctx, hello.Hello))
|
|
|
|
// Change room.
|
|
roomId = "other-test-room"
|
|
roomMsg, err = client.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event.
|
|
assert.NoError(client.RunUntilJoined(ctx, hello.Hello))
|
|
|
|
// Leave room.
|
|
roomMsg, err = client.JoinRoom(ctx, "")
|
|
require.NoError(err)
|
|
require.Equal("", roomMsg.Room.RoomId)
|
|
}
|
|
|
|
func TestJoinMultiple(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
|
|
// Join room by id (first client).
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event.
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
|
|
|
|
// Join room by id (second client).
|
|
roomMsg, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event for the first and the second client.
|
|
assert.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))
|
|
// The first client will also receive a "joined" event from the second client.
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello2.Hello))
|
|
|
|
// Leave room.
|
|
roomMsg, err = client1.JoinRoom(ctx, "")
|
|
require.NoError(err)
|
|
require.Equal("", roomMsg.Room.RoomId)
|
|
|
|
// The second client will now receive a "left" event
|
|
assert.NoError(client2.RunUntilLeft(ctx, hello1.Hello))
|
|
|
|
roomMsg, err = client2.JoinRoom(ctx, "")
|
|
require.NoError(err)
|
|
require.Equal("", roomMsg.Room.RoomId)
|
|
}
|
|
|
|
func TestJoinDisplaynamesPermission(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
session2 := hub.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session2, "Session %s does not exist", hello2.Hello.SessionId)
|
|
|
|
// Client 2 may not receive display names.
|
|
session2.SetPermissions([]Permission{PERMISSION_HIDE_DISPLAYNAMES})
|
|
|
|
// Join room by id (first client).
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event.
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
|
|
|
|
// Join room by id (second client).
|
|
roomMsg, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event for the first and the second client.
|
|
if events, unexpected, err := client2.RunUntilJoinedAndReturn(ctx, hello1.Hello, hello2.Hello); assert.NoError(err) {
|
|
assert.Empty(unexpected)
|
|
if assert.Len(events, 2) {
|
|
assert.Nil(events[0].User)
|
|
assert.Nil(events[1].User)
|
|
}
|
|
}
|
|
// The first client will also receive a "joined" event from the second client.
|
|
if events, unexpected, err := client1.RunUntilJoinedAndReturn(ctx, hello2.Hello); assert.NoError(err) {
|
|
assert.Empty(unexpected)
|
|
if assert.Len(events, 1) {
|
|
assert.NotNil(events[0].User)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestInitialRoomPermissions(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId + "1"))
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room-initial-permissions"
|
|
roomMsg, err := client.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
assert.NoError(client.RunUntilJoined(ctx, hello.Hello))
|
|
|
|
session := hub.GetSessionByPublicId(hello.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session, "Session %s does not exist", hello.Hello.SessionId)
|
|
|
|
assert.True(session.HasPermission(PERMISSION_MAY_PUBLISH_AUDIO), "Session %s should have %s, got %+v", session.PublicId(), PERMISSION_MAY_PUBLISH_AUDIO, session.permissions)
|
|
assert.False(session.HasPermission(PERMISSION_MAY_PUBLISH_VIDEO), "Session %s should not have %s, got %+v", session.PublicId(), PERMISSION_MAY_PUBLISH_VIDEO, session.permissions)
|
|
}
|
|
|
|
func TestJoinRoomSwitchClient(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room-slow"
|
|
msg := &ClientMessage{
|
|
Id: "ABCD",
|
|
Type: "room",
|
|
Room: &RoomClientMessage{
|
|
RoomId: roomId,
|
|
SessionId: roomId + "-" + hello.Hello.SessionId,
|
|
},
|
|
}
|
|
require.NoError(client.WriteJSON(msg))
|
|
// Wait a bit to make sure request is sent before closing client.
|
|
time.Sleep(1 * time.Millisecond)
|
|
client.Close()
|
|
require.NoError(client.WaitForClientRemoved(ctx))
|
|
|
|
// The client needs some time to reconnect.
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHelloResume(hello.Hello.ResumeId))
|
|
if hello2, err := client2.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId, hello2.Hello.UserId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.SessionId, hello2.Hello.SessionId, "%+v", hello2.Hello)
|
|
assert.Equal(hello.Hello.ResumeId, hello2.Hello.ResumeId, "%+v", hello2.Hello)
|
|
}
|
|
|
|
roomMsg, err := client2.RunUntilMessage(ctx)
|
|
require.NoError(err)
|
|
require.NoError(checkUnexpectedClose(err))
|
|
require.NoError(checkMessageType(roomMsg, "room"))
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event.
|
|
assert.NoError(client2.RunUntilJoined(ctx, hello.Hello))
|
|
|
|
// Leave room.
|
|
roomMsg, err = client2.JoinRoom(ctx, "")
|
|
require.NoError(err)
|
|
require.Equal("", roomMsg.Room.RoomId)
|
|
}
|
|
|
|
func TestGetRealUserIP(t *testing.T) {
|
|
testcases := []struct {
|
|
expected string
|
|
headers http.Header
|
|
trusted string
|
|
addr string
|
|
}{
|
|
{
|
|
"192.168.1.2",
|
|
nil,
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"10.11.12.13",
|
|
nil,
|
|
"192.168.0.0/16",
|
|
"10.11.12.13:23456",
|
|
},
|
|
{
|
|
"10.11.12.13",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-real-ip"): []string{"10.11.12.13"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"2002:db8::1",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-real-ip"): []string{"2002:db8::1"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"11.12.13.14",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"11.12.13.14, 192.168.30.32"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"10.11.12.13",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-real-ip"): []string{"10.11.12.13"},
|
|
},
|
|
"2001:db8::/48",
|
|
"[2001:db8::1]:23456",
|
|
},
|
|
{
|
|
"2002:db8::1",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-real-ip"): []string{"2002:db8::1"},
|
|
},
|
|
"2001:db8::/48",
|
|
"[2001:db8::1]:23456",
|
|
},
|
|
{
|
|
"2002:db8::1",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"2002:db8::1, 192.168.30.32"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"2002:db8::1",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"2002:db8::1, 2001:db8::1"},
|
|
},
|
|
"192.168.0.0/16, 2001:db8::/48",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"2002:db8::1",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"2002:db8::1, 192.168.30.32"},
|
|
},
|
|
"192.168.0.0/16, 2001:db8::/48",
|
|
"[2001:db8::1]:23456",
|
|
},
|
|
{
|
|
"2002:db8::1",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"2002:db8::1, 2001:db8::2"},
|
|
},
|
|
"2001:db8::/48",
|
|
"[2001:db8::1]:23456",
|
|
},
|
|
// "X-Real-IP" has preference before "X-Forwarded-For"
|
|
{
|
|
"10.11.12.13",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-real-ip"): []string{"10.11.12.13"},
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"11.12.13.14, 192.168.30.32"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
// Multiple "X-Forwarded-For" headers are merged.
|
|
{
|
|
"11.12.13.14",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"11.12.13.14", "192.168.30.32"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"11.12.13.14",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"1.2.3.4", "11.12.13.14", "192.168.30.32"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"11.12.13.14",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"1.2.3.4", "2.3.4.5", "11.12.13.14", "192.168.31.32", "192.168.30.32"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
// Headers are ignored if coming from untrusted clients.
|
|
{
|
|
"10.11.12.13",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-real-ip"): []string{"11.12.13.14"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"10.11.12.13:23456",
|
|
},
|
|
{
|
|
"10.11.12.13",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"11.12.13.14, 192.168.30.32"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"10.11.12.13:23456",
|
|
},
|
|
// X-Forwarded-For is filtered for trusted proxies.
|
|
{
|
|
"1.2.3.4",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"11.12.13.14, 1.2.3.4"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"1.2.3.4",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"11.12.13.14, 1.2.3.4, 192.168.2.3"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"10.11.12.13",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"11.12.13.14, 1.2.3.4"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"10.11.12.13:23456",
|
|
},
|
|
// Invalid IPs are ignored.
|
|
{
|
|
"192.168.1.2",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-real-ip"): []string{"this-is-not-an-ip"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"11.12.13.14",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-real-ip"): []string{"this-is-not-an-ip"},
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"11.12.13.14, 192.168.30.32"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"11.12.13.14",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-real-ip"): []string{"this-is-not-an-ip"},
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"11.12.13.14, 192.168.30.32, proxy1"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"192.168.1.2",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"this-is-not-an-ip"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
{
|
|
"192.168.2.3",
|
|
http.Header{
|
|
http.CanonicalHeaderKey("x-forwarded-for"): []string{"this-is-not-an-ip, 192.168.2.3"},
|
|
},
|
|
"192.168.0.0/16",
|
|
"192.168.1.2:23456",
|
|
},
|
|
}
|
|
|
|
for _, tc := range testcases {
|
|
trustedProxies, err := ParseAllowedIps(tc.trusted)
|
|
if !assert.NoError(t, err, "invalid trusted proxies in %+v", tc) {
|
|
continue
|
|
}
|
|
request := &http.Request{
|
|
RemoteAddr: tc.addr,
|
|
Header: tc.headers,
|
|
}
|
|
assert.Equal(t, tc.expected, GetRealUserIP(request, trustedProxies), "failed for %+v", tc)
|
|
}
|
|
}
|
|
|
|
func TestClientMessageToSessionIdWhileDisconnected(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
|
|
client2.Close()
|
|
assert.NoError(client2.WaitForClientRemoved(ctx))
|
|
|
|
recipient2 := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello2.Hello.SessionId,
|
|
}
|
|
|
|
// The two chat messages should get combined into one when receiving pending messages.
|
|
chat_refresh := "{\"type\":\"chat\",\"chat\":{\"refresh\":true}}"
|
|
var data1 map[string]interface{}
|
|
require.NoError(json.Unmarshal([]byte(chat_refresh), &data1))
|
|
client1.SendMessage(recipient2, data1) // nolint
|
|
client1.SendMessage(recipient2, data1) // nolint
|
|
|
|
// Simulate some time until client resumes the session.
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
client2 = NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHelloResume(hello2.Hello.ResumeId))
|
|
if hello3, err := client2.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId+"2", hello3.Hello.UserId, "%+v", hello3.Hello)
|
|
assert.Equal(hello2.Hello.SessionId, hello3.Hello.SessionId, "%+v", hello3.Hello)
|
|
assert.Equal(hello2.Hello.ResumeId, hello3.Hello.ResumeId, "%+v", hello3.Hello)
|
|
}
|
|
|
|
var payload map[string]interface{}
|
|
if err := checkReceiveClientMessage(ctx, client2, "session", hello1.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data1, payload)
|
|
}
|
|
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
if err := checkReceiveClientMessage(ctx2, client2, "session", hello1.Hello, &payload); err == nil {
|
|
assert.Fail("Expected no payload, got %+v", payload)
|
|
} else {
|
|
assert.ErrorIs(err, ErrNoMessageReceived)
|
|
}
|
|
}
|
|
|
|
func TestRoomParticipantsListUpdateWhileDisconnected(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// Give message processing some time.
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
roomMsg, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
|
|
|
|
// Simulate request from the backend that somebody joined the call.
|
|
users := []map[string]interface{}{
|
|
{
|
|
"sessionId": "the-session-id",
|
|
"inCall": 1,
|
|
},
|
|
}
|
|
room := hub.getRoom(roomId)
|
|
require.NotNil(room, "Could not find room %s", roomId)
|
|
room.PublishUsersInCallChanged(users, users)
|
|
assert.NoError(checkReceiveClientEvent(ctx, client2, "update", nil))
|
|
|
|
client2.Close()
|
|
assert.NoError(client2.WaitForClientRemoved(ctx))
|
|
|
|
room.PublishUsersInCallChanged(users, users)
|
|
|
|
// Give asynchronous events some time to be processed.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
recipient2 := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello2.Hello.SessionId,
|
|
}
|
|
|
|
chat_refresh := "{\"type\":\"chat\",\"chat\":{\"refresh\":true}}"
|
|
var data1 map[string]interface{}
|
|
require.NoError(json.Unmarshal([]byte(chat_refresh), &data1))
|
|
client1.SendMessage(recipient2, data1) // nolint
|
|
|
|
client2 = NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHelloResume(hello2.Hello.ResumeId))
|
|
if hello3, err := client2.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId+"2", hello3.Hello.UserId, "%+v", hello3.Hello)
|
|
assert.Equal(hello2.Hello.SessionId, hello3.Hello.SessionId, "%+v", hello3.Hello)
|
|
assert.Equal(hello2.Hello.ResumeId, hello3.Hello.ResumeId, "%+v", hello3.Hello)
|
|
}
|
|
|
|
// The participants list update event is triggered again after the session resume.
|
|
// TODO(jojo): Check contents of message and try with multiple users.
|
|
assert.NoError(checkReceiveClientEvent(ctx, client2, "update", nil))
|
|
|
|
var payload map[string]interface{}
|
|
if err := checkReceiveClientMessage(ctx, client2, "session", hello1.Hello, &payload); assert.NoError(err) {
|
|
assert.Equal(data1, payload)
|
|
}
|
|
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
if err := checkReceiveClientMessage(ctx2, client2, "session", hello1.Hello, &payload); err == nil {
|
|
assert.Fail("Expected no payload, got %+v", payload)
|
|
} else {
|
|
assert.ErrorIs(err, ErrNoMessageReceived)
|
|
}
|
|
}
|
|
|
|
func TestClientTakeoverRoomSession(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
RunTestClientTakeoverRoomSession(t)
|
|
})
|
|
}
|
|
}
|
|
|
|
func RunTestClientTakeoverRoomSession(t *testing.T) {
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room-takeover-room-session"
|
|
roomSessionid := "room-session-id"
|
|
roomMsg, err := client1.JoinRoomWithRoomSession(ctx, roomId, roomSessionid)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
hubRoom := hub1.getRoom(roomId)
|
|
require.NotNil(hubRoom, "Room %s does not exist", roomId)
|
|
|
|
session1 := hub1.GetSessionByPublicId(hello1.Hello.SessionId)
|
|
require.NotNil(session1, "There should be a session %s", hello1.Hello.SessionId)
|
|
|
|
client3 := NewTestClient(t, server2, hub2)
|
|
defer client3.CloseWithBye()
|
|
|
|
require.NoError(client3.SendHello(testDefaultUserId + "3"))
|
|
|
|
hello3, err := client3.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
roomMsg, err = client3.JoinRoomWithRoomSession(ctx, roomId, roomSessionid+"other")
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// Wait until both users have joined.
|
|
WaitForUsersJoined(ctx, t, client1, hello1, client3, hello3)
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
roomMsg, err = client2.JoinRoomWithRoomSession(ctx, roomId, roomSessionid)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// The first client got disconnected with a reason in a "Bye" message.
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.Equal("bye", msg.Type, "%+v", msg)
|
|
if assert.NotNil(msg.Bye, "%+v", msg) {
|
|
assert.Equal("room_session_reconnected", msg.Bye.Reason, "%+v", msg)
|
|
}
|
|
}
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); err == nil {
|
|
assert.Fail("Expected error but received %+v", msg)
|
|
} else if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
|
|
assert.Fail("Expected close error but received %+v", err)
|
|
}
|
|
|
|
// The first session has been closed
|
|
session1 = hub1.GetSessionByPublicId(hello1.Hello.SessionId)
|
|
assert.Nil(session1, "The session %s should have been removed", hello1.Hello.SessionId)
|
|
|
|
// The new client will receive "joined" events for the existing client3 and
|
|
// himself.
|
|
assert.NoError(client2.RunUntilJoined(ctx, hello3.Hello, hello2.Hello))
|
|
|
|
// No message about the closing is sent to the new connection.
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
if message, err := client2.RunUntilMessage(ctx2); err == nil {
|
|
assert.Fail("Expected no message, got %+v", message)
|
|
} else if err != ErrNoMessageReceived && err != context.DeadlineExceeded {
|
|
assert.NoError(err)
|
|
}
|
|
|
|
// The permanently connected client will receive a "left" event from the
|
|
// overridden session and a "joined" for the new session. In that order as
|
|
// both were on the same server.
|
|
assert.NoError(client3.RunUntilLeft(ctx, hello1.Hello))
|
|
assert.NoError(client3.RunUntilJoined(ctx, hello2.Hello))
|
|
}
|
|
|
|
func TestClientSendOfferPermissions(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
mcu, err := NewTestMCU()
|
|
require.NoError(err)
|
|
require.NoError(mcu.Start(ctx))
|
|
defer mcu.Stop()
|
|
|
|
hub.SetMcu(mcu)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// Give message processing some time.
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
roomMsg, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
|
|
|
|
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session1, "Session %s does not exist", hello1.Hello.SessionId)
|
|
session2 := hub.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session2, "Session %s does not exist", hello2.Hello.SessionId)
|
|
|
|
// Client 1 is the moderator
|
|
session1.SetPermissions([]Permission{PERMISSION_MAY_PUBLISH_MEDIA, PERMISSION_MAY_PUBLISH_SCREEN})
|
|
// Client 2 is a guest participant.
|
|
session2.SetPermissions([]Permission{})
|
|
|
|
// Client 2 may not send an offer (he doesn't have the necessary permissions).
|
|
require.NoError(client2.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "sendoffer",
|
|
Sid: "12345",
|
|
RoomType: "screen",
|
|
}))
|
|
|
|
msg, err := client2.RunUntilMessage(ctx)
|
|
require.NoError(err)
|
|
require.NoError(checkMessageError(msg, "not_allowed"))
|
|
|
|
require.NoError(client1.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "offer",
|
|
Sid: "12345",
|
|
RoomType: "screen",
|
|
Payload: map[string]interface{}{
|
|
"sdp": MockSdpOfferAudioAndVideo,
|
|
},
|
|
}))
|
|
|
|
require.NoError(client1.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo))
|
|
|
|
// Client 1 may send an offer.
|
|
require.NoError(client1.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello2.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "sendoffer",
|
|
Sid: "54321",
|
|
RoomType: "screen",
|
|
}))
|
|
|
|
// The sender won't get a reply...
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
if message, err := client1.RunUntilMessage(ctx2); err == nil {
|
|
assert.Fail("Expected no message, got %+v", message)
|
|
} else if err != ErrNoMessageReceived && err != context.DeadlineExceeded {
|
|
assert.NoError(err)
|
|
}
|
|
|
|
// ...but the other peer will get an offer.
|
|
require.NoError(client2.RunUntilOffer(ctx, MockSdpOfferAudioAndVideo))
|
|
}
|
|
|
|
func TestClientSendOfferPermissionsAudioOnly(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
mcu, err := NewTestMCU()
|
|
require.NoError(err)
|
|
require.NoError(mcu.Start(ctx))
|
|
defer mcu.Stop()
|
|
|
|
hub.SetMcu(mcu)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
|
|
|
|
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session1, "Session %s does not exist", hello1.Hello.SessionId)
|
|
|
|
// Client is allowed to send audio only.
|
|
session1.SetPermissions([]Permission{PERMISSION_MAY_PUBLISH_AUDIO})
|
|
|
|
// Client may not send an offer with audio and video.
|
|
require.NoError(client1.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "offer",
|
|
Sid: "54321",
|
|
RoomType: "video",
|
|
Payload: map[string]interface{}{
|
|
"sdp": MockSdpOfferAudioAndVideo,
|
|
},
|
|
}))
|
|
|
|
msg, err := client1.RunUntilMessage(ctx)
|
|
require.NoError(err)
|
|
require.NoError(checkMessageError(msg, "not_allowed"))
|
|
|
|
// Client may send an offer (audio only).
|
|
require.NoError(client1.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "offer",
|
|
Sid: "54321",
|
|
RoomType: "video",
|
|
Payload: map[string]interface{}{
|
|
"sdp": MockSdpOfferAudioOnly,
|
|
},
|
|
}))
|
|
|
|
require.NoError(client1.RunUntilAnswer(ctx, MockSdpAnswerAudioOnly))
|
|
}
|
|
|
|
func TestClientSendOfferPermissionsAudioVideo(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
mcu, err := NewTestMCU()
|
|
require.NoError(err)
|
|
require.NoError(mcu.Start(ctx))
|
|
defer mcu.Stop()
|
|
|
|
hub.SetMcu(mcu)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
|
|
|
|
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session1, "Session %s does not exist", hello1.Hello.SessionId)
|
|
|
|
// Client is allowed to send audio and video.
|
|
session1.SetPermissions([]Permission{PERMISSION_MAY_PUBLISH_AUDIO, PERMISSION_MAY_PUBLISH_VIDEO})
|
|
|
|
require.NoError(client1.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "offer",
|
|
Sid: "54321",
|
|
RoomType: "video",
|
|
Payload: map[string]interface{}{
|
|
"sdp": MockSdpOfferAudioAndVideo,
|
|
},
|
|
}))
|
|
|
|
require.NoError(client1.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo))
|
|
|
|
// Client is no longer allowed to send video, this will stop the publisher.
|
|
msg := &BackendServerRoomRequest{
|
|
Type: "participants",
|
|
Participants: &BackendRoomParticipantsRequest{
|
|
Changed: []map[string]interface{}{
|
|
{
|
|
"sessionId": roomId + "-" + hello1.Hello.SessionId,
|
|
"permissions": []Permission{PERMISSION_MAY_PUBLISH_AUDIO},
|
|
},
|
|
},
|
|
Users: []map[string]interface{}{
|
|
{
|
|
"sessionId": roomId + "-" + hello1.Hello.SessionId,
|
|
"permissions": []Permission{PERMISSION_MAY_PUBLISH_AUDIO},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
require.NoError(err)
|
|
res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data)
|
|
require.NoError(err)
|
|
defer res.Body.Close()
|
|
body, err := io.ReadAll(res.Body)
|
|
assert.NoError(err)
|
|
assert.Equal(http.StatusOK, res.StatusCode, "Expected successful request, got %s", string(body))
|
|
|
|
ctx2, cancel2 := context.WithTimeout(ctx, time.Second)
|
|
defer cancel2()
|
|
|
|
pubs := mcu.GetPublishers()
|
|
require.Len(pubs, 1)
|
|
|
|
loop:
|
|
for {
|
|
if err := ctx2.Err(); err != nil {
|
|
assert.NoError(err, "publisher was not closed")
|
|
break
|
|
}
|
|
|
|
for _, pub := range pubs {
|
|
if pub.isClosed() {
|
|
break loop
|
|
}
|
|
}
|
|
|
|
// Give some time to async processing.
|
|
time.Sleep(time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func TestClientSendOfferPermissionsAudioVideoMedia(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
mcu, err := NewTestMCU()
|
|
require.NoError(err)
|
|
require.NoError(mcu.Start(ctx))
|
|
defer mcu.Stop()
|
|
|
|
hub.SetMcu(mcu)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
|
|
|
|
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session1, "Session %s does not exist", hello1.Hello.SessionId)
|
|
|
|
// Client is allowed to send audio and video.
|
|
session1.SetPermissions([]Permission{PERMISSION_MAY_PUBLISH_MEDIA})
|
|
|
|
// Client may send an offer (audio and video).
|
|
require.NoError(client1.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "offer",
|
|
Sid: "54321",
|
|
RoomType: "video",
|
|
Payload: map[string]interface{}{
|
|
"sdp": MockSdpOfferAudioAndVideo,
|
|
},
|
|
}))
|
|
|
|
require.NoError(client1.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo))
|
|
|
|
// Client is no longer allowed to send video, this will stop the publisher.
|
|
msg := &BackendServerRoomRequest{
|
|
Type: "participants",
|
|
Participants: &BackendRoomParticipantsRequest{
|
|
Changed: []map[string]interface{}{
|
|
{
|
|
"sessionId": roomId + "-" + hello1.Hello.SessionId,
|
|
"permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA, PERMISSION_MAY_CONTROL},
|
|
},
|
|
},
|
|
Users: []map[string]interface{}{
|
|
{
|
|
"sessionId": roomId + "-" + hello1.Hello.SessionId,
|
|
"permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA, PERMISSION_MAY_CONTROL},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
require.NoError(err)
|
|
res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data)
|
|
require.NoError(err)
|
|
defer res.Body.Close()
|
|
body, err := io.ReadAll(res.Body)
|
|
assert.NoError(err)
|
|
assert.Equal(http.StatusOK, res.StatusCode, "Expected successful request, got %s", string(body))
|
|
|
|
ctx2, cancel2 := context.WithTimeout(ctx, 100*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
pubs := mcu.GetPublishers()
|
|
require.Len(pubs, 1)
|
|
|
|
loop:
|
|
for {
|
|
if err := ctx2.Err(); err != nil {
|
|
if err != context.DeadlineExceeded {
|
|
assert.Fail("error while waiting for publisher")
|
|
}
|
|
break
|
|
}
|
|
|
|
for _, pub := range pubs {
|
|
if !assert.False(pub.isClosed(), "publisher was closed") {
|
|
break loop
|
|
}
|
|
}
|
|
|
|
// Give some time to async processing.
|
|
time.Sleep(time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func TestClientRequestOfferNotInRoom(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
mcu, err := NewTestMCU()
|
|
require.NoError(err)
|
|
require.NoError(mcu.Start(ctx))
|
|
defer mcu.Stop()
|
|
|
|
hub1.SetMcu(mcu)
|
|
hub2.SetMcu(mcu)
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoomWithRoomSession(ctx, roomId, "roomsession1")
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event.
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
|
|
|
|
require.NoError(client1.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "offer",
|
|
Sid: "54321",
|
|
RoomType: "screen",
|
|
Payload: map[string]interface{}{
|
|
"sdp": MockSdpOfferAudioAndVideo,
|
|
},
|
|
}))
|
|
|
|
require.NoError(client1.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo))
|
|
|
|
// Client 2 may not request an offer (he is not in the room yet).
|
|
require.NoError(client2.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "requestoffer",
|
|
Sid: "12345",
|
|
RoomType: "screen",
|
|
}))
|
|
|
|
msg, err := client2.RunUntilMessage(ctx)
|
|
require.NoError(err)
|
|
require.NoError(checkMessageError(msg, "not_allowed"))
|
|
|
|
roomMsg, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// We will receive a "joined" event.
|
|
require.NoError(client1.RunUntilJoined(ctx, hello2.Hello))
|
|
require.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))
|
|
|
|
// Client 2 may not request an offer (he is not in the call yet).
|
|
require.NoError(client2.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "requestoffer",
|
|
Sid: "12345",
|
|
RoomType: "screen",
|
|
}))
|
|
|
|
msg, err = client2.RunUntilMessage(ctx)
|
|
require.NoError(err)
|
|
require.NoError(checkMessageError(msg, "not_allowed"))
|
|
|
|
// Simulate request from the backend that somebody joined the call.
|
|
users1 := []map[string]interface{}{
|
|
{
|
|
"sessionId": hello2.Hello.SessionId,
|
|
"inCall": 1,
|
|
},
|
|
}
|
|
room2 := hub2.getRoom(roomId)
|
|
require.NotNil(room2, "Could not find room %s", roomId)
|
|
room2.PublishUsersInCallChanged(users1, users1)
|
|
assert.NoError(checkReceiveClientEvent(ctx, client1, "update", nil))
|
|
assert.NoError(checkReceiveClientEvent(ctx, client2, "update", nil))
|
|
|
|
// Client 2 may not request an offer (recipient is not in the call yet).
|
|
require.NoError(client2.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "requestoffer",
|
|
Sid: "12345",
|
|
RoomType: "screen",
|
|
}))
|
|
|
|
msg, err = client2.RunUntilMessage(ctx)
|
|
require.NoError(err)
|
|
require.NoError(checkMessageError(msg, "not_allowed"))
|
|
|
|
// Simulate request from the backend that somebody joined the call.
|
|
users2 := []map[string]interface{}{
|
|
{
|
|
"sessionId": hello1.Hello.SessionId,
|
|
"inCall": 1,
|
|
},
|
|
}
|
|
room1 := hub1.getRoom(roomId)
|
|
require.NotNil(room1, "Could not find room %s", roomId)
|
|
room1.PublishUsersInCallChanged(users2, users2)
|
|
assert.NoError(checkReceiveClientEvent(ctx, client1, "update", nil))
|
|
assert.NoError(checkReceiveClientEvent(ctx, client2, "update", nil))
|
|
|
|
// Client 2 may request an offer now (both are in the same room and call).
|
|
require.NoError(client2.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "requestoffer",
|
|
Sid: "12345",
|
|
RoomType: "screen",
|
|
}))
|
|
|
|
require.NoError(client2.RunUntilOffer(ctx, MockSdpOfferAudioAndVideo))
|
|
|
|
require.NoError(client2.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "answer",
|
|
Sid: "12345",
|
|
RoomType: "screen",
|
|
Payload: map[string]interface{}{
|
|
"sdp": MockSdpAnswerAudioAndVideo,
|
|
},
|
|
}))
|
|
|
|
// The sender won't get a reply...
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
if message, err := client2.RunUntilMessage(ctx2); err == nil {
|
|
assert.Fail("Expected no message, got %+v", message)
|
|
} else if err != ErrNoMessageReceived && err != context.DeadlineExceeded {
|
|
assert.NoError(err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestNoSendBetweenSessionsOnDifferentBackends(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
// Clients can't send messages to sessions connected from other backends.
|
|
hub, _, _, server := CreateHubWithMultipleBackendsForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
|
|
params1 := TestBackendClientAuthParams{
|
|
UserId: "user1",
|
|
}
|
|
require.NoError(client1.SendHelloParams(server.URL+"/one", HelloVersionV1, "client", nil, params1))
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
|
|
params2 := TestBackendClientAuthParams{
|
|
UserId: "user2",
|
|
}
|
|
require.NoError(client2.SendHelloParams(server.URL+"/two", HelloVersionV1, "client", nil, params2))
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
recipient1 := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}
|
|
recipient2 := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello2.Hello.SessionId,
|
|
}
|
|
|
|
data1 := "from-1-to-2"
|
|
client1.SendMessage(recipient2, data1) // nolint
|
|
data2 := "from-2-to-1"
|
|
client2.SendMessage(recipient1, data2) // nolint
|
|
|
|
var payload string
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancel2()
|
|
if err := checkReceiveClientMessage(ctx2, client1, "session", hello2.Hello, &payload); err == nil {
|
|
assert.Fail("Expected no payload, got %+v", payload)
|
|
} else {
|
|
assert.ErrorIs(err, ErrNoMessageReceived)
|
|
}
|
|
|
|
ctx3, cancel3 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancel3()
|
|
if err := checkReceiveClientMessage(ctx3, client2, "session", hello1.Hello, &payload); err == nil {
|
|
assert.Fail("Expected no payload, got %+v", payload)
|
|
} else {
|
|
assert.ErrorIs(err, ErrNoMessageReceived)
|
|
}
|
|
}
|
|
|
|
func TestNoSameRoomOnDifferentBackends(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubWithMultipleBackendsForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
|
|
params1 := TestBackendClientAuthParams{
|
|
UserId: "user1",
|
|
}
|
|
require.NoError(client1.SendHelloParams(server.URL+"/one", HelloVersionV1, "client", nil, params1))
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
client2 := NewTestClient(t, server, hub)
|
|
defer client2.CloseWithBye()
|
|
|
|
params2 := TestBackendClientAuthParams{
|
|
UserId: "user2",
|
|
}
|
|
require.NoError(client2.SendHelloParams(server.URL+"/two", HelloVersionV1, "client", nil, params2))
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
if msg1, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(client1.checkMessageJoined(msg1, hello1.Hello))
|
|
}
|
|
|
|
roomMsg, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
if msg2, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(client2.checkMessageJoined(msg2, hello2.Hello))
|
|
}
|
|
|
|
hub.ru.RLock()
|
|
var rooms []*Room
|
|
for _, room := range hub.rooms {
|
|
defer room.Close()
|
|
rooms = append(rooms, room)
|
|
}
|
|
hub.ru.RUnlock()
|
|
|
|
if assert.Len(rooms, 2) {
|
|
if rooms[0].IsEqual(rooms[1]) {
|
|
assert.Fail("Rooms should be different: %+v", rooms)
|
|
}
|
|
}
|
|
|
|
recipient := MessageClientMessageRecipient{
|
|
Type: "room",
|
|
}
|
|
|
|
data1 := "from-1-to-2"
|
|
client1.SendMessage(recipient, data1) // nolint
|
|
data2 := "from-2-to-1"
|
|
client2.SendMessage(recipient, data2) // nolint
|
|
|
|
var payload string
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancel2()
|
|
if err := checkReceiveClientMessage(ctx2, client1, "session", hello2.Hello, &payload); err == nil {
|
|
assert.Fail("Expected no payload, got %+v", payload)
|
|
} else {
|
|
assert.ErrorIs(err, ErrNoMessageReceived)
|
|
}
|
|
|
|
ctx3, cancel3 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancel3()
|
|
if err := checkReceiveClientMessage(ctx3, client2, "session", hello1.Hello, &payload); err == nil {
|
|
assert.Fail("Expected no payload, got %+v", payload)
|
|
} else {
|
|
assert.ErrorIs(err, ErrNoMessageReceived)
|
|
}
|
|
}
|
|
|
|
func TestClientSendOffer(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
mcu, err := NewTestMCU()
|
|
require.NoError(err)
|
|
require.NoError(mcu.Start(ctx))
|
|
defer mcu.Stop()
|
|
|
|
hub1.SetMcu(mcu)
|
|
hub2.SetMcu(mcu)
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoomWithRoomSession(ctx, roomId, "roomsession1")
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
// Give message processing some time.
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
roomMsg, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
|
|
|
|
require.NoError(client1.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "offer",
|
|
Sid: "12345",
|
|
RoomType: "video",
|
|
Payload: map[string]interface{}{
|
|
"sdp": MockSdpOfferAudioAndVideo,
|
|
},
|
|
}))
|
|
|
|
require.NoError(client1.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo))
|
|
|
|
require.NoError(client1.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello2.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "sendoffer",
|
|
RoomType: "video",
|
|
}))
|
|
|
|
// The sender won't get a reply...
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
if message, err := client1.RunUntilMessage(ctx2); err == nil {
|
|
assert.Fail("Expected no message, got %+v", message)
|
|
} else if err != ErrNoMessageReceived && err != context.DeadlineExceeded {
|
|
assert.NoError(err)
|
|
}
|
|
|
|
// ...but the other peer will get an offer.
|
|
require.NoError(client2.RunUntilOffer(ctx, MockSdpOfferAudioAndVideo))
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientUnshareScreen(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
mcu, err := NewTestMCU()
|
|
require.NoError(err)
|
|
require.NoError(mcu.Start(ctx))
|
|
defer mcu.Stop()
|
|
|
|
hub.SetMcu(mcu)
|
|
|
|
client1 := NewTestClient(t, server, hub)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
// Join room by id.
|
|
roomId := "test-room"
|
|
roomMsg, err := client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
require.Equal(roomId, roomMsg.Room.RoomId)
|
|
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
|
|
|
|
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session1, "Session %s does not exist", hello1.Hello.SessionId)
|
|
|
|
require.NoError(client1.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "offer",
|
|
Sid: "54321",
|
|
RoomType: "screen",
|
|
Payload: map[string]interface{}{
|
|
"sdp": MockSdpOfferAudioOnly,
|
|
},
|
|
}))
|
|
|
|
require.NoError(client1.RunUntilAnswer(ctx, MockSdpAnswerAudioOnly))
|
|
|
|
publisher := mcu.GetPublisher(hello1.Hello.SessionId)
|
|
require.NotNil(publisher, "No publisher for %s found", hello1.Hello.SessionId)
|
|
require.False(publisher.isClosed(), "Publisher %s should not be closed", hello1.Hello.SessionId)
|
|
|
|
old := cleanupScreenPublisherDelay
|
|
cleanupScreenPublisherDelay = time.Millisecond
|
|
defer func() {
|
|
cleanupScreenPublisherDelay = old
|
|
}()
|
|
|
|
require.NoError(client1.SendMessage(MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: hello1.Hello.SessionId,
|
|
}, MessageClientMessageData{
|
|
Type: "unshareScreen",
|
|
Sid: "54321",
|
|
RoomType: "screen",
|
|
}))
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
require.True(publisher.isClosed(), "Publisher %s should be closed", hello1.Hello.SessionId)
|
|
}
|
|
|
|
func TestVirtualClientSessions(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId))
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
roomId := "test-room"
|
|
_, err = client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
|
|
require.NoError(client2.SendHelloInternal())
|
|
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session2, "Session %s does not exist", hello2.Hello.SessionId)
|
|
|
|
_, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello2.Hello))
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
|
|
if assert.Len(msg.Users, 1) {
|
|
assert.Equal(true, msg.Users[0]["internal"], "%+v", msg)
|
|
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(3, msg.Users[0]["inCall"], "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
_, unexpected, err := client2.RunUntilJoinedAndReturn(ctx, hello1.Hello, hello2.Hello)
|
|
assert.NoError(err)
|
|
|
|
if len(unexpected) == 0 {
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
unexpected = append(unexpected, msg)
|
|
}
|
|
}
|
|
|
|
require.Len(unexpected, 1)
|
|
if msg, err := checkMessageParticipantsInCall(unexpected[0]); assert.NoError(err) {
|
|
if assert.Len(msg.Users, 1) {
|
|
assert.Equal(true, msg.Users[0]["internal"])
|
|
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"])
|
|
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"])
|
|
}
|
|
}
|
|
|
|
calledCtx, calledCancel := context.WithTimeout(ctx, time.Second)
|
|
|
|
virtualSessionId := "virtual-session-id"
|
|
virtualUserId := "virtual-user-id"
|
|
generatedSessionId := GetVirtualSessionId(session2, virtualSessionId)
|
|
|
|
setSessionRequestHandler(t, func(request *BackendClientSessionRequest) {
|
|
defer calledCancel()
|
|
assert.Equal("add", request.Action, "%+v", request)
|
|
assert.Equal(roomId, request.RoomId, "%+v", request)
|
|
assert.NotEqual(generatedSessionId, request.SessionId, "%+v", request)
|
|
assert.Equal(virtualUserId, request.UserId, "%+v", request)
|
|
})
|
|
|
|
require.NoError(client2.SendInternalAddSession(&AddSessionInternalClientMessage{
|
|
CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{
|
|
SessionId: virtualSessionId,
|
|
RoomId: roomId,
|
|
},
|
|
UserId: virtualUserId,
|
|
Flags: FLAG_MUTED_SPEAKING,
|
|
}))
|
|
<-calledCtx.Done()
|
|
if err := calledCtx.Err(); err != nil {
|
|
require.ErrorIs(err, context.Canceled)
|
|
}
|
|
|
|
virtualSessions := session2.GetVirtualSessions()
|
|
for len(virtualSessions) == 0 {
|
|
time.Sleep(time.Millisecond)
|
|
virtualSessions = session2.GetVirtualSessions()
|
|
}
|
|
|
|
virtualSession := virtualSessions[0]
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(client1.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId))
|
|
}
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
|
|
if assert.Len(msg.Users, 2) {
|
|
assert.Equal(true, msg.Users[0]["internal"], "%+v", msg)
|
|
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"], "%+v", msg)
|
|
|
|
assert.Equal(true, msg.Users[1]["virtual"], "%+v", msg)
|
|
assert.Equal(virtualSession.PublicId(), msg.Users[1]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[1]["inCall"], "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) {
|
|
assert.Equal(roomId, flags.RoomId)
|
|
assert.Equal(virtualSession.PublicId(), flags.SessionId)
|
|
assert.EqualValues(FLAG_MUTED_SPEAKING, flags.Flags)
|
|
}
|
|
}
|
|
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(client2.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId))
|
|
}
|
|
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
|
|
if assert.Len(msg.Users, 2) {
|
|
assert.Equal(true, msg.Users[0]["internal"], "%+v", msg)
|
|
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"], "%+v", msg)
|
|
|
|
assert.Equal(true, msg.Users[1]["virtual"], "%+v", msg)
|
|
assert.Equal(virtualSession.PublicId(), msg.Users[1]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[1]["inCall"], "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) {
|
|
assert.Equal(roomId, flags.RoomId)
|
|
assert.Equal(virtualSession.PublicId(), flags.SessionId)
|
|
assert.EqualValues(FLAG_MUTED_SPEAKING, flags.Flags)
|
|
}
|
|
}
|
|
|
|
updatedFlags := uint32(0)
|
|
require.NoError(client2.SendInternalUpdateSession(&UpdateSessionInternalClientMessage{
|
|
CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{
|
|
SessionId: virtualSessionId,
|
|
RoomId: roomId,
|
|
},
|
|
|
|
Flags: &updatedFlags,
|
|
}))
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) {
|
|
assert.Equal(roomId, flags.RoomId)
|
|
assert.Equal(virtualSession.PublicId(), flags.SessionId)
|
|
assert.EqualValues(0, flags.Flags)
|
|
}
|
|
}
|
|
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) {
|
|
assert.Equal(roomId, flags.RoomId)
|
|
assert.Equal(virtualSession.PublicId(), flags.SessionId)
|
|
assert.EqualValues(0, flags.Flags)
|
|
}
|
|
}
|
|
|
|
calledCtx, calledCancel = context.WithTimeout(ctx, time.Second)
|
|
|
|
setSessionRequestHandler(t, func(request *BackendClientSessionRequest) {
|
|
defer calledCancel()
|
|
assert.Equal("remove", request.Action, "%+v", request)
|
|
assert.Equal(roomId, request.RoomId, "%+v", request)
|
|
assert.NotEqual(generatedSessionId, request.SessionId, "%+v", request)
|
|
assert.Equal(virtualUserId, request.UserId, "%+v", request)
|
|
})
|
|
|
|
// Messages to virtual sessions are sent to the associated client session.
|
|
virtualRecipient := MessageClientMessageRecipient{
|
|
Type: "session",
|
|
SessionId: virtualSession.PublicId(),
|
|
}
|
|
|
|
data := "message-to-virtual"
|
|
client1.SendMessage(virtualRecipient, data) // nolint
|
|
|
|
var payload string
|
|
var sender *MessageServerMessageSender
|
|
var recipient *MessageClientMessageRecipient
|
|
if err := checkReceiveClientMessageWithSenderAndRecipient(ctx, client2, "session", hello1.Hello, &payload, &sender, &recipient); assert.NoError(err) {
|
|
assert.Equal(virtualSessionId, recipient.SessionId, "%+v", recipient)
|
|
assert.Equal(data, payload)
|
|
}
|
|
|
|
data = "control-to-virtual"
|
|
client1.SendControl(virtualRecipient, data) // nolint
|
|
|
|
if err := checkReceiveClientControlWithSenderAndRecipient(ctx, client2, "session", hello1.Hello, &payload, &sender, &recipient); assert.NoError(err) {
|
|
assert.Equal(virtualSessionId, recipient.SessionId, "%+v", recipient)
|
|
assert.Equal(data, payload)
|
|
}
|
|
|
|
require.NoError(client2.SendInternalRemoveSession(&RemoveSessionInternalClientMessage{
|
|
CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{
|
|
SessionId: virtualSessionId,
|
|
RoomId: roomId,
|
|
},
|
|
|
|
UserId: virtualUserId,
|
|
}))
|
|
<-calledCtx.Done()
|
|
if err := calledCtx.Err(); err != nil && !errors.Is(err, context.Canceled) {
|
|
require.NoError(err)
|
|
}
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(client1.checkMessageRoomLeaveSession(msg, virtualSession.PublicId()))
|
|
}
|
|
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(client1.checkMessageRoomLeaveSession(msg, virtualSession.PublicId()))
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestDuplicateVirtualSessions(t *testing.T) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
|
|
require.NoError(client1.SendHello(testDefaultUserId))
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
roomId := "test-room"
|
|
_, err = client1.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
|
|
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
|
|
require.NoError(client2.SendHelloInternal())
|
|
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
|
|
require.NotNil(session2, "Session %s does not exist", hello2.Hello.SessionId)
|
|
|
|
_, err = client2.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello2.Hello))
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
|
|
if assert.Len(msg.Users, 1) {
|
|
assert.Equal(true, msg.Users[0]["internal"], "%+v", msg)
|
|
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(3, msg.Users[0]["inCall"], "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
_, unexpected, err := client2.RunUntilJoinedAndReturn(ctx, hello1.Hello, hello2.Hello)
|
|
assert.NoError(err)
|
|
|
|
if len(unexpected) == 0 {
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
unexpected = append(unexpected, msg)
|
|
}
|
|
}
|
|
|
|
require.Len(unexpected, 1)
|
|
if msg, err := checkMessageParticipantsInCall(unexpected[0]); assert.NoError(err) {
|
|
if assert.Len(msg.Users, 1) {
|
|
assert.Equal(true, msg.Users[0]["internal"])
|
|
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"])
|
|
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"])
|
|
}
|
|
}
|
|
|
|
calledCtx, calledCancel := context.WithTimeout(ctx, time.Second)
|
|
|
|
virtualSessionId := "virtual-session-id"
|
|
virtualUserId := "virtual-user-id"
|
|
generatedSessionId := GetVirtualSessionId(session2, virtualSessionId)
|
|
|
|
setSessionRequestHandler(t, func(request *BackendClientSessionRequest) {
|
|
defer calledCancel()
|
|
assert.Equal("add", request.Action, "%+v", request)
|
|
assert.Equal(roomId, request.RoomId, "%+v", request)
|
|
assert.NotEqual(generatedSessionId, request.SessionId, "%+v", request)
|
|
assert.Equal(virtualUserId, request.UserId, "%+v", request)
|
|
})
|
|
|
|
require.NoError(client2.SendInternalAddSession(&AddSessionInternalClientMessage{
|
|
CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{
|
|
SessionId: virtualSessionId,
|
|
RoomId: roomId,
|
|
},
|
|
UserId: virtualUserId,
|
|
Flags: FLAG_MUTED_SPEAKING,
|
|
}))
|
|
<-calledCtx.Done()
|
|
if err := calledCtx.Err(); err != nil {
|
|
require.ErrorIs(err, context.Canceled)
|
|
}
|
|
|
|
virtualSessions := session2.GetVirtualSessions()
|
|
for len(virtualSessions) == 0 {
|
|
time.Sleep(time.Millisecond)
|
|
virtualSessions = session2.GetVirtualSessions()
|
|
}
|
|
|
|
virtualSession := virtualSessions[0]
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(client1.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId))
|
|
}
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
|
|
if assert.Len(msg.Users, 2) {
|
|
assert.Equal(true, msg.Users[0]["internal"], "%+v", msg)
|
|
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"], "%+v", msg)
|
|
|
|
assert.Equal(true, msg.Users[1]["virtual"], "%+v", msg)
|
|
assert.Equal(virtualSession.PublicId(), msg.Users[1]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[1]["inCall"], "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) {
|
|
assert.Equal(roomId, flags.RoomId)
|
|
assert.Equal(virtualSession.PublicId(), flags.SessionId)
|
|
assert.EqualValues(FLAG_MUTED_SPEAKING, flags.Flags)
|
|
}
|
|
}
|
|
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(client2.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId))
|
|
}
|
|
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
|
|
if assert.Len(msg.Users, 2) {
|
|
assert.Equal(true, msg.Users[0]["internal"], "%+v", msg)
|
|
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"], "%+v", msg)
|
|
|
|
assert.Equal(true, msg.Users[1]["virtual"], "%+v", msg)
|
|
assert.Equal(virtualSession.PublicId(), msg.Users[1]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[1]["inCall"], "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) {
|
|
assert.Equal(roomId, flags.RoomId)
|
|
assert.Equal(virtualSession.PublicId(), flags.SessionId)
|
|
assert.EqualValues(FLAG_MUTED_SPEAKING, flags.Flags)
|
|
}
|
|
}
|
|
|
|
msg := &BackendServerRoomRequest{
|
|
Type: "incall",
|
|
InCall: &BackendRoomInCallRequest{
|
|
InCall: []byte("0"),
|
|
Users: []map[string]interface{}{
|
|
{
|
|
"sessionId": virtualSession.PublicId(),
|
|
"participantPermissions": 246,
|
|
"participantType": 4,
|
|
"lastPing": 123456789,
|
|
},
|
|
{
|
|
// Request is coming from Nextcloud, so use its session id (which is our "room session id").
|
|
"sessionId": roomId + "-" + hello1.Hello.SessionId,
|
|
"participantPermissions": 254,
|
|
"participantType": 1,
|
|
"lastPing": 234567890,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
require.NoError(err)
|
|
res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId, data)
|
|
require.NoError(err)
|
|
defer res.Body.Close()
|
|
body, err := io.ReadAll(res.Body)
|
|
assert.NoError(err)
|
|
assert.Equal(http.StatusOK, res.StatusCode, "Expected successful request, got %s", string(body))
|
|
|
|
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
|
|
if assert.Len(msg.Users, 3) {
|
|
assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg)
|
|
assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg)
|
|
assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg)
|
|
assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg)
|
|
|
|
assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg)
|
|
assert.Nil(msg.Users[1]["inCall"], "%+v", msg)
|
|
assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg)
|
|
assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg)
|
|
|
|
assert.Equal(true, msg.Users[2]["internal"], "%+v", msg)
|
|
assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
|
|
if assert.Len(msg.Users, 3) {
|
|
assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg)
|
|
assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg)
|
|
assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg)
|
|
assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg)
|
|
|
|
assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg)
|
|
assert.Nil(msg.Users[1]["inCall"], "%+v", msg)
|
|
assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg)
|
|
assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg)
|
|
|
|
assert.Equal(true, msg.Users[2]["internal"], "%+v", msg)
|
|
assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
client1.Close()
|
|
assert.NoError(client1.WaitForClientRemoved(ctx))
|
|
|
|
client3 := NewTestClient(t, server1, hub1)
|
|
defer client3.CloseWithBye()
|
|
|
|
require.NoError(client3.SendHelloResume(hello1.Hello.ResumeId))
|
|
if hello3, err := client3.RunUntilHello(ctx); assert.NoError(err) {
|
|
assert.Equal(testDefaultUserId, hello3.Hello.UserId, "%+v", hello3.Hello)
|
|
assert.Equal(hello1.Hello.SessionId, hello3.Hello.SessionId, "%+v", hello3.Hello)
|
|
assert.Equal(hello1.Hello.ResumeId, hello3.Hello.ResumeId, "%+v", hello3.Hello)
|
|
}
|
|
|
|
if msg, err := client3.RunUntilMessage(ctx); assert.NoError(err) {
|
|
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
|
|
if assert.Len(msg.Users, 3) {
|
|
assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg)
|
|
assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg)
|
|
assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg)
|
|
assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg)
|
|
|
|
assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg)
|
|
assert.Nil(msg.Users[1]["inCall"], "%+v", msg)
|
|
assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg)
|
|
assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg)
|
|
|
|
assert.Equal(true, msg.Users[2]["internal"], "%+v", msg)
|
|
assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg)
|
|
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
setSessionRequestHandler(t, func(request *BackendClientSessionRequest) {
|
|
defer calledCancel()
|
|
assert.Equal("remove", request.Action, "%+v", request)
|
|
assert.Equal(roomId, request.RoomId, "%+v", request)
|
|
assert.NotEqual(generatedSessionId, request.SessionId, "%+v", request)
|
|
assert.Equal(virtualUserId, request.UserId, "%+v", request)
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
func DoTestSwitchToOne(t *testing.T, details map[string]interface{}) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
roomSessionId1 := "roomsession1"
|
|
roomId1 := "test-room"
|
|
roomMsg, err := client1.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId1)
|
|
require.NoError(err)
|
|
require.Equal(roomId1, roomMsg.Room.RoomId)
|
|
|
|
roomSessionId2 := "roomsession2"
|
|
roomMsg, err = client2.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId2)
|
|
require.NoError(err)
|
|
require.Equal(roomId1, roomMsg.Room.RoomId)
|
|
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))
|
|
assert.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))
|
|
|
|
roomId2 := "test-room-2"
|
|
var sessions json.RawMessage
|
|
if details != nil {
|
|
sessions, err = json.Marshal(map[string]interface{}{
|
|
roomSessionId1: details,
|
|
})
|
|
require.NoError(err)
|
|
} else {
|
|
sessions, err = json.Marshal([]string{
|
|
roomSessionId1,
|
|
})
|
|
require.NoError(err)
|
|
}
|
|
|
|
// Notify first client to switch to different room.
|
|
msg := &BackendServerRoomRequest{
|
|
Type: "switchto",
|
|
SwitchTo: &BackendRoomSwitchToMessageRequest{
|
|
RoomId: roomId2,
|
|
Sessions: sessions,
|
|
},
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
require.NoError(err)
|
|
res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId1, data)
|
|
require.NoError(err)
|
|
defer res.Body.Close()
|
|
body, err := io.ReadAll(res.Body)
|
|
assert.NoError(err)
|
|
assert.Equal(http.StatusOK, res.StatusCode, "Expected successful request, got %s", string(body))
|
|
|
|
var detailsData json.RawMessage
|
|
if details != nil {
|
|
detailsData, err = json.Marshal(details)
|
|
require.NoError(err)
|
|
}
|
|
_, err = client1.RunUntilSwitchTo(ctx, roomId2, detailsData)
|
|
assert.NoError(err)
|
|
|
|
// The other client will not receive a message.
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
|
defer cancel2()
|
|
|
|
if message, err := client2.RunUntilMessage(ctx2); err == nil {
|
|
assert.Fail("Expected no message, got %+v", message)
|
|
} else if err != ErrNoMessageReceived && err != context.DeadlineExceeded {
|
|
assert.NoError(err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSwitchToOneMap(t *testing.T) {
|
|
DoTestSwitchToOne(t, map[string]interface{}{
|
|
"foo": "bar",
|
|
})
|
|
}
|
|
|
|
func TestSwitchToOneList(t *testing.T) {
|
|
DoTestSwitchToOne(t, nil)
|
|
}
|
|
|
|
func DoTestSwitchToMultiple(t *testing.T, details1 map[string]interface{}, details2 map[string]interface{}) {
|
|
CatchLogForTest(t)
|
|
for _, subtest := range clusteredTests {
|
|
t.Run(subtest, func(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
var hub1 *Hub
|
|
var hub2 *Hub
|
|
var server1 *httptest.Server
|
|
var server2 *httptest.Server
|
|
|
|
if isLocalTest(t) {
|
|
hub1, _, _, server1 = CreateHubForTest(t)
|
|
|
|
hub2 = hub1
|
|
server2 = server1
|
|
} else {
|
|
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
|
}
|
|
|
|
client1 := NewTestClient(t, server1, hub1)
|
|
defer client1.CloseWithBye()
|
|
require.NoError(client1.SendHello(testDefaultUserId + "1"))
|
|
client2 := NewTestClient(t, server2, hub2)
|
|
defer client2.CloseWithBye()
|
|
require.NoError(client2.SendHello(testDefaultUserId + "2"))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
hello1, err := client1.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
hello2, err := client2.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
roomSessionId1 := "roomsession1"
|
|
roomId1 := "test-room"
|
|
roomMsg, err := client1.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId1)
|
|
require.NoError(err)
|
|
require.Equal(roomId1, roomMsg.Room.RoomId)
|
|
|
|
roomSessionId2 := "roomsession2"
|
|
roomMsg, err = client2.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId2)
|
|
require.NoError(err)
|
|
require.Equal(roomId1, roomMsg.Room.RoomId)
|
|
|
|
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))
|
|
assert.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))
|
|
|
|
roomId2 := "test-room-2"
|
|
var sessions json.RawMessage
|
|
if details1 != nil || details2 != nil {
|
|
sessions, err = json.Marshal(map[string]interface{}{
|
|
roomSessionId1: details1,
|
|
roomSessionId2: details2,
|
|
})
|
|
require.NoError(err)
|
|
} else {
|
|
sessions, err = json.Marshal([]string{
|
|
roomSessionId1,
|
|
roomSessionId2,
|
|
})
|
|
require.NoError(err)
|
|
}
|
|
|
|
msg := &BackendServerRoomRequest{
|
|
Type: "switchto",
|
|
SwitchTo: &BackendRoomSwitchToMessageRequest{
|
|
RoomId: roomId2,
|
|
Sessions: sessions,
|
|
},
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
require.NoError(err)
|
|
res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId1, data)
|
|
require.NoError(err)
|
|
defer res.Body.Close()
|
|
body, err := io.ReadAll(res.Body)
|
|
assert.NoError(err)
|
|
assert.Equal(http.StatusOK, res.StatusCode, "Expected successful request, got %s", string(body))
|
|
|
|
var detailsData1 json.RawMessage
|
|
if details1 != nil {
|
|
detailsData1, err = json.Marshal(details1)
|
|
require.NoError(err)
|
|
}
|
|
_, err = client1.RunUntilSwitchTo(ctx, roomId2, detailsData1)
|
|
assert.NoError(err)
|
|
|
|
var detailsData2 json.RawMessage
|
|
if details2 != nil {
|
|
detailsData2, err = json.Marshal(details2)
|
|
require.NoError(err)
|
|
}
|
|
_, err = client2.RunUntilSwitchTo(ctx, roomId2, detailsData2)
|
|
assert.NoError(err)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSwitchToMultipleMap(t *testing.T) {
|
|
DoTestSwitchToMultiple(t, map[string]interface{}{
|
|
"foo": "bar",
|
|
}, map[string]interface{}{
|
|
"bar": "baz",
|
|
})
|
|
}
|
|
|
|
func TestSwitchToMultipleList(t *testing.T) {
|
|
DoTestSwitchToMultiple(t, nil, nil)
|
|
}
|
|
|
|
func TestSwitchToMultipleMixed(t *testing.T) {
|
|
DoTestSwitchToMultiple(t, map[string]interface{}{
|
|
"foo": "bar",
|
|
}, nil)
|
|
}
|
|
|
|
func TestGeoipOverrides(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
assert := assert.New(t)
|
|
country1 := "DE"
|
|
country2 := "IT"
|
|
country3 := "site1"
|
|
hub, _, _, _ := CreateHubForTestWithConfig(t, func(server *httptest.Server) (*goconf.ConfigFile, error) {
|
|
conf, err := getTestConfig(server)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
conf.AddOption("geoip-overrides", "10.1.0.0/16", country1)
|
|
conf.AddOption("geoip-overrides", "10.2.0.0/16", country2)
|
|
conf.AddOption("geoip-overrides", "192.168.10.20", country3)
|
|
return conf, err
|
|
})
|
|
|
|
assert.Equal(loopback, hub.OnLookupCountry(&Client{addr: "127.0.0.1"}))
|
|
assert.Equal(unknownCountry, hub.OnLookupCountry(&Client{addr: "8.8.8.8"}))
|
|
assert.Equal(country1, hub.OnLookupCountry(&Client{addr: "10.1.1.2"}))
|
|
assert.Equal(country2, hub.OnLookupCountry(&Client{addr: "10.2.1.2"}))
|
|
assert.Equal(strings.ToUpper(country3), hub.OnLookupCountry(&Client{addr: "192.168.10.20"}))
|
|
}
|
|
|
|
func TestDialoutStatus(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
_, _, _, hub, _, server := CreateBackendServerForTest(t)
|
|
|
|
internalClient := NewTestClient(t, server, hub)
|
|
defer internalClient.CloseWithBye()
|
|
require.NoError(internalClient.SendHelloInternalWithFeatures([]string{"start-dialout"}))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
_, err := internalClient.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
roomId := "12345"
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
hello, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
_, err = client.JoinRoom(ctx, roomId)
|
|
require.NoError(err)
|
|
|
|
assert.NoError(client.RunUntilJoined(ctx, hello.Hello))
|
|
|
|
callId := "call-123"
|
|
|
|
stopped := make(chan struct{})
|
|
go func(client *TestClient) {
|
|
defer close(stopped)
|
|
|
|
msg, err := client.RunUntilMessage(ctx)
|
|
if !assert.NoError(err) {
|
|
return
|
|
}
|
|
|
|
if !assert.Equal("internal", msg.Type, "%+v", msg) ||
|
|
!assert.NotNil(msg.Internal, "%+v", msg) ||
|
|
!assert.Equal("dialout", msg.Internal.Type, "%+v", msg) ||
|
|
!assert.NotNil(msg.Internal.Dialout, "%+v", msg) {
|
|
return
|
|
}
|
|
|
|
assert.Equal(roomId, msg.Internal.Dialout.RoomId)
|
|
|
|
response := &ClientMessage{
|
|
Id: msg.Id,
|
|
Type: "internal",
|
|
Internal: &InternalClientMessage{
|
|
Type: "dialout",
|
|
Dialout: &DialoutInternalClientMessage{
|
|
Type: "status",
|
|
RoomId: msg.Internal.Dialout.RoomId,
|
|
Status: &DialoutStatusInternalClientMessage{
|
|
Status: "accepted",
|
|
CallId: callId,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
assert.NoError(client.WriteJSON(response))
|
|
}(internalClient)
|
|
|
|
defer func() {
|
|
<-stopped
|
|
}()
|
|
|
|
msg := &BackendServerRoomRequest{
|
|
Type: "dialout",
|
|
Dialout: &BackendRoomDialoutRequest{
|
|
Number: "+1234567890",
|
|
},
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
require.NoError(err)
|
|
res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data)
|
|
require.NoError(err)
|
|
defer res.Body.Close()
|
|
body, err := io.ReadAll(res.Body)
|
|
assert.NoError(err)
|
|
require.Equal(http.StatusOK, res.StatusCode, "Expected success, got %s", string(body))
|
|
|
|
var response BackendServerRoomResponse
|
|
if assert.NoError(json.Unmarshal(body, &response)) {
|
|
assert.Equal("dialout", response.Type)
|
|
if assert.NotNil(response.Dialout) {
|
|
assert.Nil(response.Dialout.Error, "expected dialout success, got %s", string(body))
|
|
assert.Equal(callId, response.Dialout.CallId)
|
|
}
|
|
}
|
|
|
|
key := "callstatus_" + callId
|
|
if msg, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(checkMessageTransientSet(msg, key, map[string]interface{}{
|
|
"callid": callId,
|
|
"status": "accepted",
|
|
}, nil))
|
|
}
|
|
|
|
require.NoError(internalClient.SendInternalDialout(&DialoutInternalClientMessage{
|
|
RoomId: roomId,
|
|
Type: "status",
|
|
Status: &DialoutStatusInternalClientMessage{
|
|
CallId: callId,
|
|
Status: "ringing",
|
|
},
|
|
}))
|
|
|
|
if msg, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(checkMessageTransientSet(msg, key, map[string]interface{}{
|
|
"callid": callId,
|
|
"status": "ringing",
|
|
}, map[string]interface{}{
|
|
"callid": callId,
|
|
"status": "accepted",
|
|
}))
|
|
}
|
|
|
|
old := removeCallStatusTTL
|
|
defer func() {
|
|
removeCallStatusTTL = old
|
|
}()
|
|
removeCallStatusTTL = 500 * time.Millisecond
|
|
|
|
clearedCause := "cleared-call"
|
|
require.NoError(internalClient.SendInternalDialout(&DialoutInternalClientMessage{
|
|
RoomId: roomId,
|
|
Type: "status",
|
|
Status: &DialoutStatusInternalClientMessage{
|
|
CallId: callId,
|
|
Status: "cleared",
|
|
Cause: clearedCause,
|
|
},
|
|
}))
|
|
|
|
if msg, err := client.RunUntilMessage(ctx); assert.NoError(err) {
|
|
assert.NoError(checkMessageTransientSet(msg, key, map[string]interface{}{
|
|
"callid": callId,
|
|
"status": "cleared",
|
|
"cause": clearedCause,
|
|
}, map[string]interface{}{
|
|
"callid": callId,
|
|
"status": "ringing",
|
|
}))
|
|
}
|
|
|
|
ctx2, cancel := context.WithTimeout(ctx, removeCallStatusTTL*2)
|
|
defer cancel()
|
|
|
|
if msg, err := client.RunUntilMessage(ctx2); assert.NoError(err) {
|
|
assert.NoError(checkMessageTransientRemove(msg, key, map[string]interface{}{
|
|
"callid": callId,
|
|
"status": "cleared",
|
|
"cause": clearedCause,
|
|
}))
|
|
}
|
|
}
|
|
|
|
func TestGracefulShutdownInitial(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
hub, _, _, _ := CreateHubForTest(t)
|
|
|
|
hub.ScheduleShutdown()
|
|
<-hub.ShutdownChannel()
|
|
}
|
|
|
|
func TestGracefulShutdownOnBye(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
_, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
hub.ScheduleShutdown()
|
|
select {
|
|
case <-hub.ShutdownChannel():
|
|
assert.Fail("should not have shutdown")
|
|
case <-time.After(100 * time.Millisecond):
|
|
}
|
|
|
|
client.CloseWithBye()
|
|
|
|
select {
|
|
case <-hub.ShutdownChannel():
|
|
case <-time.After(100 * time.Millisecond):
|
|
assert.Fail("should have shutdown")
|
|
}
|
|
}
|
|
|
|
func TestGracefulShutdownOnExpiration(t *testing.T) {
|
|
t.Parallel()
|
|
CatchLogForTest(t)
|
|
require := require.New(t)
|
|
assert := assert.New(t)
|
|
hub, _, _, server := CreateHubForTest(t)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
|
defer cancel()
|
|
|
|
client := NewTestClient(t, server, hub)
|
|
defer client.CloseWithBye()
|
|
|
|
require.NoError(client.SendHello(testDefaultUserId))
|
|
|
|
_, err := client.RunUntilHello(ctx)
|
|
require.NoError(err)
|
|
|
|
hub.ScheduleShutdown()
|
|
select {
|
|
case <-hub.ShutdownChannel():
|
|
assert.Fail("should not have shutdown")
|
|
case <-time.After(100 * time.Millisecond):
|
|
}
|
|
|
|
client.Close()
|
|
select {
|
|
case <-hub.ShutdownChannel():
|
|
assert.Fail("should not have shutdown")
|
|
case <-time.After(100 * time.Millisecond):
|
|
}
|
|
|
|
performHousekeeping(hub, time.Now().Add(sessionExpireDuration+time.Second))
|
|
|
|
select {
|
|
case <-hub.ShutdownChannel():
|
|
case <-time.After(100 * time.Millisecond):
|
|
assert.Fail("should have shutdown")
|
|
}
|
|
}
|