strukturag_nextcloud-spreed.../proxy/proxy_server_test.go
2024-11-11 10:47:05 +01:00

1348 lines
36 KiB
Go

/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"net"
"net/http/httptest"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/dlintw/goconf"
"github.com/golang-jwt/jwt/v5"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
signaling "github.com/strukturag/nextcloud-spreed-signaling"
)
const (
KeypairSizeForTest = 2048
TokenIdForTest = "foo"
testTimeout = 10 * time.Second
)
func getWebsocketUrl(url string) string {
if strings.HasPrefix(url, "http://") {
return "ws://" + url[7:] + "/proxy"
} else if strings.HasPrefix(url, "https://") {
return "wss://" + url[8:] + "/proxy"
} else {
panic("Unsupported URL: " + url)
}
}
func WaitForProxyServer(ctx context.Context, t *testing.T, proxy *ProxyServer) {
// Wait for any channel messages to be processed.
time.Sleep(10 * time.Millisecond)
proxy.Stop()
for {
proxy.clientsLock.RLock()
clients := len(proxy.clients)
proxy.clientsLock.RUnlock()
proxy.sessionsLock.RLock()
sessions := len(proxy.sessions)
proxy.sessionsLock.RUnlock()
proxy.remoteConnectionsLock.Lock()
remoteConnections := len(proxy.remoteConnections)
proxy.remoteConnectionsLock.Unlock()
if clients == 0 &&
sessions == 0 &&
remoteConnections == 0 {
break
}
select {
case <-ctx.Done():
proxy.clientsLock.Lock()
proxy.remoteConnectionsLock.Lock()
assert.Fail(t, fmt.Sprintf("Error waiting for clients %+v / sessions %+v / remoteConnections %+v to terminate: %+v", proxy.clients, proxy.sessions, proxy.remoteConnections, ctx.Err()))
proxy.remoteConnectionsLock.Unlock()
proxy.clientsLock.Unlock()
return
default:
time.Sleep(time.Millisecond)
}
}
}
func newProxyServerForTest(t *testing.T) (*ProxyServer, *rsa.PrivateKey, *httptest.Server) {
require := require.New(t)
tempdir := t.TempDir()
var proxy *ProxyServer
t.Cleanup(func() {
if proxy != nil {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
WaitForProxyServer(ctx, t, proxy)
}
})
r := mux.NewRouter()
key, err := rsa.GenerateKey(rand.Reader, KeypairSizeForTest)
require.NoError(err)
priv := &pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(key),
}
privkey, err := os.CreateTemp(tempdir, "privkey*.pem")
require.NoError(err)
require.NoError(pem.Encode(privkey, priv))
require.NoError(privkey.Close())
pubData, err := x509.MarshalPKIXPublicKey(&key.PublicKey)
require.NoError(err)
pub := &pem.Block{
Type: "RSA PUBLIC KEY",
Bytes: pubData,
}
pubkey, err := os.CreateTemp(tempdir, "pubkey*.pem")
require.NoError(err)
require.NoError(pem.Encode(pubkey, pub))
require.NoError(pubkey.Close())
config := goconf.NewConfigFile()
config.AddOption("tokens", TokenIdForTest, pubkey.Name())
proxy, err = NewProxyServer(r, "0.0", config)
require.NoError(err)
server := httptest.NewServer(r)
t.Cleanup(func() {
server.Close()
})
return proxy, key, server
}
func TestTokenValid(t *testing.T) {
signaling.CatchLogForTest(t)
proxy, key, _ := newProxyServerForTest(t)
claims := &signaling.TokenClaims{
RegisteredClaims: jwt.RegisteredClaims{
IssuedAt: jwt.NewNumericDate(time.Now().Add(-maxTokenAge / 2)),
Issuer: TokenIdForTest,
},
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
tokenString, err := token.SignedString(key)
require.NoError(t, err)
hello := &signaling.HelloProxyClientMessage{
Version: "1.0",
Token: tokenString,
}
if session, err := proxy.NewSession(hello); assert.NoError(t, err) {
defer proxy.DeleteSession(session.Sid())
}
}
func TestTokenNotSigned(t *testing.T) {
signaling.CatchLogForTest(t)
proxy, _, _ := newProxyServerForTest(t)
claims := &signaling.TokenClaims{
RegisteredClaims: jwt.RegisteredClaims{
IssuedAt: jwt.NewNumericDate(time.Now().Add(-maxTokenAge / 2)),
Issuer: TokenIdForTest,
},
}
token := jwt.NewWithClaims(jwt.SigningMethodNone, claims)
tokenString, err := token.SignedString(jwt.UnsafeAllowNoneSignatureType)
require.NoError(t, err)
hello := &signaling.HelloProxyClientMessage{
Version: "1.0",
Token: tokenString,
}
if session, err := proxy.NewSession(hello); !assert.ErrorIs(t, err, TokenAuthFailed) {
if session != nil {
defer proxy.DeleteSession(session.Sid())
}
}
}
func TestTokenUnknown(t *testing.T) {
signaling.CatchLogForTest(t)
proxy, key, _ := newProxyServerForTest(t)
claims := &signaling.TokenClaims{
RegisteredClaims: jwt.RegisteredClaims{
IssuedAt: jwt.NewNumericDate(time.Now().Add(-maxTokenAge / 2)),
Issuer: TokenIdForTest + "2",
},
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
tokenString, err := token.SignedString(key)
require.NoError(t, err)
hello := &signaling.HelloProxyClientMessage{
Version: "1.0",
Token: tokenString,
}
if session, err := proxy.NewSession(hello); !assert.ErrorIs(t, err, TokenAuthFailed) {
if session != nil {
defer proxy.DeleteSession(session.Sid())
}
}
}
func TestTokenInFuture(t *testing.T) {
signaling.CatchLogForTest(t)
proxy, key, _ := newProxyServerForTest(t)
claims := &signaling.TokenClaims{
RegisteredClaims: jwt.RegisteredClaims{
IssuedAt: jwt.NewNumericDate(time.Now().Add(time.Hour)),
Issuer: TokenIdForTest,
},
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
tokenString, err := token.SignedString(key)
require.NoError(t, err)
hello := &signaling.HelloProxyClientMessage{
Version: "1.0",
Token: tokenString,
}
if session, err := proxy.NewSession(hello); !assert.ErrorIs(t, err, TokenNotValidYet) {
if session != nil {
defer proxy.DeleteSession(session.Sid())
}
}
}
func TestTokenExpired(t *testing.T) {
signaling.CatchLogForTest(t)
proxy, key, _ := newProxyServerForTest(t)
claims := &signaling.TokenClaims{
RegisteredClaims: jwt.RegisteredClaims{
IssuedAt: jwt.NewNumericDate(time.Now().Add(-maxTokenAge * 2)),
Issuer: TokenIdForTest,
},
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
tokenString, err := token.SignedString(key)
require.NoError(t, err)
hello := &signaling.HelloProxyClientMessage{
Version: "1.0",
Token: tokenString,
}
if session, err := proxy.NewSession(hello); !assert.ErrorIs(t, err, TokenExpired) {
if session != nil {
defer proxy.DeleteSession(session.Sid())
}
}
}
func TestPublicIPs(t *testing.T) {
assert := assert.New(t)
public := []string{
"8.8.8.8",
"172.15.1.2",
"172.32.1.2",
"192.167.0.1",
"192.169.0.1",
}
private := []string{
"127.0.0.1",
"10.1.2.3",
"172.16.1.2",
"172.31.1.2",
"192.168.0.1",
"192.168.254.254",
}
for _, s := range public {
ip := net.ParseIP(s)
if assert.NotEmpty(ip, "invalid IP: %s", s) {
assert.True(IsPublicIP(ip), "should be public IP: %s", s)
}
}
for _, s := range private {
ip := net.ParseIP(s)
if assert.NotEmpty(ip, "invalid IP: %s", s) {
assert.False(IsPublicIP(ip), "should be private IP: %s", s)
}
}
}
func TestWebsocketFeatures(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
_, _, server := newProxyServerForTest(t)
conn, response, err := websocket.DefaultDialer.DialContext(context.Background(), getWebsocketUrl(server.URL), nil)
require.NoError(t, err)
defer conn.Close() // nolint
if server := response.Header.Get("Server"); !strings.HasPrefix(server, "nextcloud-spreed-signaling-proxy/") {
assert.Fail("expected valid server header, got \"%s\"", server)
}
features := response.Header.Get("X-Spreed-Signaling-Features")
featuresList := make(map[string]bool)
for _, f := range strings.Split(features, ",") {
f = strings.TrimSpace(f)
if f != "" {
if _, found := featuresList[f]; found {
assert.Fail("duplicate feature id \"%s\" in \"%s\"", f, features)
}
featuresList[f] = true
}
}
assert.NotEmpty(featuresList, "expected valid features header, got \"%s\"", features)
if _, found := featuresList["remote-streams"]; !found {
assert.Fail("expected feature \"remote-streams\", got \"%s\"", features)
}
assert.NoError(conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Time{}))
}
func TestProxyCreateSession(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
_, key, server := newProxyServerForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client := NewProxyTestClient(ctx, t, server.URL)
defer client.CloseWithBye()
require.NoError(client.SendHello(key))
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}
_, err := client.RunUntilLoad(ctx, 0)
assert.NoError(err)
}
type TestMCU struct {
t *testing.T
}
func (m *TestMCU) Start(ctx context.Context) error {
return nil
}
func (m *TestMCU) Stop() {
}
func (m *TestMCU) Reload(config *goconf.ConfigFile) {
}
func (m *TestMCU) SetOnConnected(f func()) {
}
func (m *TestMCU) SetOnDisconnected(f func()) {
}
func (m *TestMCU) GetStats() interface{} {
return nil
}
func (m *TestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
return nil, errors.New("not implemented")
}
func (m *TestMCU) NewSubscriber(ctx context.Context, listener signaling.McuListener, publisher string, streamType signaling.StreamType, initiator signaling.McuInitiator) (signaling.McuSubscriber, error) {
return nil, errors.New("not implemented")
}
type TestMCUPublisher struct {
id string
sid string
streamType signaling.StreamType
}
func (p *TestMCUPublisher) Id() string {
return p.id
}
func (p *TestMCUPublisher) Sid() string {
return p.sid
}
func (p *TestMCUPublisher) StreamType() signaling.StreamType {
return p.streamType
}
func (p *TestMCUPublisher) MaxBitrate() int {
return 0
}
func (p *TestMCUPublisher) Close(ctx context.Context) {
}
func (p *TestMCUPublisher) SendMessage(ctx context.Context, message *signaling.MessageClientMessage, data *signaling.MessageClientMessageData, callback func(error, map[string]interface{})) {
callback(errors.New("not implemented"), nil)
}
func (p *TestMCUPublisher) HasMedia(signaling.MediaType) bool {
return false
}
func (p *TestMCUPublisher) SetMedia(mediaTypes signaling.MediaType) {
}
func (p *TestMCUPublisher) GetStreams(ctx context.Context) ([]signaling.PublisherStream, error) {
return nil, errors.New("not implemented")
}
func (p *TestMCUPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
return errors.New("not implemented")
}
func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
return errors.New("not implemented")
}
type HangingTestMCU struct {
TestMCU
ctx context.Context
creating chan struct{}
created chan struct{}
cancelled atomic.Bool
}
func NewHangingTestMCU(t *testing.T) *HangingTestMCU {
ctx, closeFunc := context.WithCancel(context.Background())
t.Cleanup(func() {
closeFunc()
})
return &HangingTestMCU{
TestMCU: TestMCU{
t: t,
},
ctx: ctx,
creating: make(chan struct{}),
created: make(chan struct{}),
}
}
func (m *HangingTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
ctx2, cancel := context.WithTimeout(m.ctx, testTimeout*2)
defer cancel()
m.creating <- struct{}{}
defer func() {
m.created <- struct{}{}
}()
select {
case <-ctx.Done():
m.cancelled.Store(true)
return nil, ctx.Err()
case <-ctx2.Done():
return nil, errors.New("Should have been cancelled before")
}
}
func (m *HangingTestMCU) NewSubscriber(ctx context.Context, listener signaling.McuListener, publisher string, streamType signaling.StreamType, initiator signaling.McuInitiator) (signaling.McuSubscriber, error) {
ctx2, cancel := context.WithTimeout(m.ctx, testTimeout*2)
defer cancel()
m.creating <- struct{}{}
defer func() {
m.created <- struct{}{}
}()
select {
case <-ctx.Done():
m.cancelled.Store(true)
return nil, ctx.Err()
case <-ctx2.Done():
return nil, errors.New("Should have been cancelled before")
}
}
func TestProxyCancelOnClose(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)
mcu := NewHangingTestMCU(t)
proxy.mcu = mcu
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client := NewProxyTestClient(ctx, t, server.URL)
defer client.CloseWithBye()
require.NoError(client.SendHello(key))
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}
_, err := client.RunUntilLoad(ctx, 0)
assert.NoError(err)
require.NoError(client.WriteJSON(&signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "create-publisher",
StreamType: signaling.StreamTypeVideo,
},
}))
// Simulate expired session while request is still being processed.
go func() {
<-mcu.creating
if session := proxy.GetSession(1); assert.NotNil(session) {
session.Close()
}
}()
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
if err := checkMessageType(message, "bye"); assert.NoError(err) {
assert.Equal("session_closed", message.Bye.Reason)
}
}
if message, err := client.RunUntilMessage(ctx); assert.Error(err) {
assert.True(websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived), "expected close error, got %+v", err)
} else {
t.Errorf("expected error, got %+v", message)
}
<-mcu.created
assert.True(mcu.cancelled.Load())
}
type CodecsTestMCU struct {
TestMCU
}
func NewCodecsTestMCU(t *testing.T) *CodecsTestMCU {
return &CodecsTestMCU{
TestMCU: TestMCU{
t: t,
},
}
}
func (m *CodecsTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
assert.Equal(m.t, "opus,g722", settings.AudioCodec)
assert.Equal(m.t, "vp9,vp8,av1", settings.VideoCodec)
return &TestMCUPublisher{
id: id,
sid: sid,
streamType: streamType,
}, nil
}
func TestProxyCodecs(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)
mcu := NewCodecsTestMCU(t)
proxy.mcu = mcu
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client := NewProxyTestClient(ctx, t, server.URL)
defer client.CloseWithBye()
require.NoError(client.SendHello(key))
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}
_, err := client.RunUntilLoad(ctx, 0)
assert.NoError(err)
require.NoError(client.WriteJSON(&signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "create-publisher",
StreamType: signaling.StreamTypeVideo,
PublisherSettings: &signaling.NewPublisherSettings{
AudioCodec: "opus,g722",
VideoCodec: "vp9,vp8,av1",
},
},
}))
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("2345", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
assert.NotEmpty(message.Command.Id)
}
}
}
type RemoteSubscriberTestMCU struct {
TestMCU
publisher *TestRemotePublisher
subscriber *TestRemoteSubscriber
}
func NewRemoteSubscriberTestMCU(t *testing.T) *RemoteSubscriberTestMCU {
return &RemoteSubscriberTestMCU{
TestMCU: TestMCU{
t: t,
},
}
}
type TestRemotePublisher struct {
t *testing.T
streamType signaling.StreamType
refcnt atomic.Int32
closed context.Context
closeFunc context.CancelFunc
}
func (p *TestRemotePublisher) Id() string {
return "id"
}
func (p *TestRemotePublisher) Sid() string {
return "sid"
}
func (p *TestRemotePublisher) StreamType() signaling.StreamType {
return p.streamType
}
func (p *TestRemotePublisher) MaxBitrate() int {
return 0
}
func (p *TestRemotePublisher) Close(ctx context.Context) {
if count := p.refcnt.Add(-1); assert.True(p.t, count >= 0) && count == 0 {
p.closeFunc()
}
}
func (p *TestRemotePublisher) SendMessage(ctx context.Context, message *signaling.MessageClientMessage, data *signaling.MessageClientMessageData, callback func(error, map[string]interface{})) {
callback(errors.New("not implemented"), nil)
}
func (p *TestRemotePublisher) Port() int {
return 1
}
func (p *TestRemotePublisher) RtcpPort() int {
return 2
}
func (m *RemoteSubscriberTestMCU) NewRemotePublisher(ctx context.Context, listener signaling.McuListener, controller signaling.RemotePublisherController, streamType signaling.StreamType) (signaling.McuRemotePublisher, error) {
require.Nil(m.t, m.publisher)
assert.EqualValues(m.t, "video", streamType)
closeCtx, closeFunc := context.WithCancel(context.Background())
m.publisher = &TestRemotePublisher{
t: m.t,
streamType: streamType,
closed: closeCtx,
closeFunc: closeFunc,
}
m.publisher.refcnt.Add(1)
return m.publisher, nil
}
type TestRemoteSubscriber struct {
t *testing.T
publisher *TestRemotePublisher
closed context.Context
closeFunc context.CancelFunc
}
func (s *TestRemoteSubscriber) Id() string {
return "id"
}
func (s *TestRemoteSubscriber) Sid() string {
return "sid"
}
func (s *TestRemoteSubscriber) StreamType() signaling.StreamType {
return s.publisher.StreamType()
}
func (s *TestRemoteSubscriber) MaxBitrate() int {
return 0
}
func (s *TestRemoteSubscriber) Close(ctx context.Context) {
s.publisher.Close(ctx)
s.closeFunc()
}
func (s *TestRemoteSubscriber) SendMessage(ctx context.Context, message *signaling.MessageClientMessage, data *signaling.MessageClientMessageData, callback func(error, map[string]interface{})) {
callback(errors.New("not implemented"), nil)
}
func (s *TestRemoteSubscriber) Publisher() string {
return s.publisher.Id()
}
func (m *RemoteSubscriberTestMCU) NewRemoteSubscriber(ctx context.Context, listener signaling.McuListener, publisher signaling.McuRemotePublisher) (signaling.McuRemoteSubscriber, error) {
require.Nil(m.t, m.subscriber)
pub, ok := publisher.(*TestRemotePublisher)
require.True(m.t, ok)
closeCtx, closeFunc := context.WithCancel(context.Background())
m.subscriber = &TestRemoteSubscriber{
t: m.t,
publisher: pub,
closed: closeCtx,
closeFunc: closeFunc,
}
pub.refcnt.Add(1)
return m.subscriber, nil
}
func TestProxyRemoteSubscriber(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)
mcu := NewRemoteSubscriberTestMCU(t)
proxy.mcu = mcu
// Unused but must be set so remote subscribing works
proxy.tokenId = "token"
proxy.tokenKey = key
proxy.remoteHostname = "test-hostname"
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client := NewProxyTestClient(ctx, t, server.URL)
defer client.CloseWithBye()
require.NoError(client.SendHello(key))
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}
_, err := client.RunUntilLoad(ctx, 0)
assert.NoError(err)
publisherId := "the-publisher-id"
claims := &signaling.TokenClaims{
RegisteredClaims: jwt.RegisteredClaims{
IssuedAt: jwt.NewNumericDate(time.Now().Add(-maxTokenAge / 2)),
Issuer: TokenIdForTest,
Subject: publisherId,
},
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
tokenString, err := token.SignedString(key)
require.NoError(err)
require.NoError(client.WriteJSON(&signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "create-subscriber",
StreamType: signaling.StreamTypeVideo,
PublisherId: publisherId,
RemoteUrl: "https://remote-hostname",
RemoteToken: tokenString,
},
}))
var clientId string
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("2345", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
require.NotEmpty(message.Command.Id)
clientId = message.Command.Id
}
}
require.NoError(client.WriteJSON(&signaling.ProxyClientMessage{
Id: "3456",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "delete-subscriber",
ClientId: clientId,
},
}))
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("3456", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
assert.Equal(clientId, message.Command.Id)
}
}
if assert.NotNil(mcu.publisher) && assert.NotNil(mcu.subscriber) {
select {
case <-mcu.subscriber.closed.Done():
case <-ctx.Done():
assert.Fail("subscriber was not closed")
}
select {
case <-mcu.publisher.closed.Done():
case <-ctx.Done():
assert.Fail("publisher was not closed")
}
}
}
func TestProxyCloseRemoteOnSessionClose(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)
mcu := NewRemoteSubscriberTestMCU(t)
proxy.mcu = mcu
// Unused but must be set so remote subscribing works
proxy.tokenId = "token"
proxy.tokenKey = key
proxy.remoteHostname = "test-hostname"
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client := NewProxyTestClient(ctx, t, server.URL)
defer client.CloseWithBye()
require.NoError(client.SendHello(key))
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}
_, err := client.RunUntilLoad(ctx, 0)
assert.NoError(err)
publisherId := "the-publisher-id"
claims := &signaling.TokenClaims{
RegisteredClaims: jwt.RegisteredClaims{
IssuedAt: jwt.NewNumericDate(time.Now().Add(-maxTokenAge / 2)),
Issuer: TokenIdForTest,
Subject: publisherId,
},
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
tokenString, err := token.SignedString(key)
require.NoError(err)
require.NoError(client.WriteJSON(&signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "create-subscriber",
StreamType: signaling.StreamTypeVideo,
PublisherId: publisherId,
RemoteUrl: "https://remote-hostname",
RemoteToken: tokenString,
},
}))
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("2345", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
require.NotEmpty(message.Command.Id)
}
}
// Closing the session will cause any active remote publishers stop be stopped.
client.CloseWithBye()
if assert.NotNil(mcu.publisher) && assert.NotNil(mcu.subscriber) {
select {
case <-mcu.subscriber.closed.Done():
case <-ctx.Done():
assert.Fail("subscriber was not closed")
}
select {
case <-mcu.publisher.closed.Done():
case <-ctx.Done():
assert.Fail("publisher was not closed")
}
}
}
type UnpublishRemoteTestMCU struct {
TestMCU
publisher atomic.Pointer[UnpublishRemoteTestPublisher]
}
func NewUnpublishRemoteTestMCU(t *testing.T) *UnpublishRemoteTestMCU {
return &UnpublishRemoteTestMCU{
TestMCU: TestMCU{
t: t,
},
}
}
type UnpublishRemoteTestPublisher struct {
TestMCUPublisher
t *testing.T
mu sync.RWMutex
remoteId string
remoteData *remotePublisherData
}
func (m *UnpublishRemoteTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
publisher := &UnpublishRemoteTestPublisher{
TestMCUPublisher: TestMCUPublisher{
id: id,
sid: sid,
streamType: streamType,
},
t: m.t,
}
m.publisher.Store(publisher)
return publisher, nil
}
func (p *UnpublishRemoteTestPublisher) getRemoteId() string {
p.mu.RLock()
defer p.mu.RUnlock()
return p.remoteId
}
func (p *UnpublishRemoteTestPublisher) getRemoteData() *remotePublisherData {
p.mu.RLock()
defer p.mu.RUnlock()
return p.remoteData
}
func (p *UnpublishRemoteTestPublisher) clearRemote() {
p.mu.Lock()
defer p.mu.Unlock()
p.remoteId = ""
p.remoteData = nil
}
func (p *UnpublishRemoteTestPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
p.mu.Lock()
defer p.mu.Unlock()
if assert.Empty(p.t, p.remoteId) {
p.remoteId = remoteId
p.remoteData = &remotePublisherData{
hostname: hostname,
port: port,
rtcpPort: rtcpPort,
}
}
return nil
}
func (p *UnpublishRemoteTestPublisher) UnpublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
p.mu.Lock()
defer p.mu.Unlock()
assert.Equal(p.t, remoteId, p.remoteId)
if remoteData := p.remoteData; assert.NotNil(p.t, remoteData) &&
assert.Equal(p.t, remoteData.hostname, hostname) &&
assert.EqualValues(p.t, remoteData.port, port) &&
assert.EqualValues(p.t, remoteData.rtcpPort, rtcpPort) {
p.remoteId = ""
p.remoteData = nil
}
return nil
}
func TestProxyUnpublishRemote(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)
mcu := NewUnpublishRemoteTestMCU(t)
proxy.mcu = mcu
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewProxyTestClient(ctx, t, server.URL)
defer client1.CloseWithBye()
require.NoError(client1.SendHello(key))
if hello, err := client1.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}
_, err := client1.RunUntilLoad(ctx, 0)
assert.NoError(err)
publisherId := "the-publisher-id"
require.NoError(client1.WriteJSON(&signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "create-publisher",
PublisherId: publisherId,
Sid: "1234-abcd",
StreamType: signaling.StreamTypeVideo,
PublisherSettings: &signaling.NewPublisherSettings{
Bitrate: 1234567,
MediaTypes: signaling.MediaTypeAudio | signaling.MediaTypeVideo,
},
},
}))
var clientId string
if message, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("2345", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
require.NotEmpty(message.Command.Id)
clientId = message.Command.Id
}
}
client2 := NewProxyTestClient(ctx, t, server.URL)
defer client2.CloseWithBye()
require.NoError(client2.SendHello(key))
hello2, err := client2.RunUntilHello(ctx)
if assert.NoError(err) {
assert.NotEmpty(hello2.Hello.SessionId, "%+v", hello2)
}
_, err = client2.RunUntilLoad(ctx, 0)
assert.NoError(err)
require.NoError(client2.WriteJSON(&signaling.ProxyClientMessage{
Id: "3456",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "publish-remote",
StreamType: signaling.StreamTypeVideo,
ClientId: clientId,
Hostname: "remote-host",
Port: 10001,
RtcpPort: 10002,
},
}))
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("3456", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
require.NotEmpty(message.Command.Id)
}
}
if publisher := mcu.publisher.Load(); assert.NotNil(publisher) {
assert.Equal(hello2.Hello.SessionId, publisher.getRemoteId())
if remoteData := publisher.getRemoteData(); assert.NotNil(remoteData) {
assert.Equal("remote-host", remoteData.hostname)
assert.EqualValues(10001, remoteData.port)
assert.EqualValues(10002, remoteData.rtcpPort)
}
}
require.NoError(client2.WriteJSON(&signaling.ProxyClientMessage{
Id: "4567",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "unpublish-remote",
StreamType: signaling.StreamTypeVideo,
ClientId: clientId,
Hostname: "remote-host",
Port: 10001,
RtcpPort: 10002,
},
}))
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("4567", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
require.NotEmpty(message.Command.Id)
}
}
if publisher := mcu.publisher.Load(); assert.NotNil(publisher) {
assert.Empty(publisher.getRemoteId())
assert.Nil(publisher.getRemoteData())
}
}
func TestProxyUnpublishRemotePublisherClosed(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)
mcu := NewUnpublishRemoteTestMCU(t)
proxy.mcu = mcu
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewProxyTestClient(ctx, t, server.URL)
defer client1.CloseWithBye()
require.NoError(client1.SendHello(key))
if hello, err := client1.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}
_, err := client1.RunUntilLoad(ctx, 0)
assert.NoError(err)
publisherId := "the-publisher-id"
require.NoError(client1.WriteJSON(&signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "create-publisher",
PublisherId: publisherId,
Sid: "1234-abcd",
StreamType: signaling.StreamTypeVideo,
PublisherSettings: &signaling.NewPublisherSettings{
Bitrate: 1234567,
MediaTypes: signaling.MediaTypeAudio | signaling.MediaTypeVideo,
},
},
}))
var clientId string
if message, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("2345", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
require.NotEmpty(message.Command.Id)
clientId = message.Command.Id
}
}
client2 := NewProxyTestClient(ctx, t, server.URL)
defer client2.CloseWithBye()
require.NoError(client2.SendHello(key))
hello2, err := client2.RunUntilHello(ctx)
if assert.NoError(err) {
assert.NotEmpty(hello2.Hello.SessionId, "%+v", hello2)
}
_, err = client2.RunUntilLoad(ctx, 0)
assert.NoError(err)
require.NoError(client2.WriteJSON(&signaling.ProxyClientMessage{
Id: "3456",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "publish-remote",
StreamType: signaling.StreamTypeVideo,
ClientId: clientId,
Hostname: "remote-host",
Port: 10001,
RtcpPort: 10002,
},
}))
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("3456", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
require.NotEmpty(message.Command.Id)
}
}
if publisher := mcu.publisher.Load(); assert.NotNil(publisher) {
assert.Equal(hello2.Hello.SessionId, publisher.getRemoteId())
if remoteData := publisher.getRemoteData(); assert.NotNil(remoteData) {
assert.Equal("remote-host", remoteData.hostname)
assert.EqualValues(10001, remoteData.port)
assert.EqualValues(10002, remoteData.rtcpPort)
}
}
require.NoError(client1.WriteJSON(&signaling.ProxyClientMessage{
Id: "4567",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "delete-publisher",
ClientId: clientId,
},
}))
if message, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("4567", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
require.NotEmpty(message.Command.Id)
}
}
// Remote publishing was not stopped explicitly...
if publisher := mcu.publisher.Load(); assert.NotNil(publisher) {
assert.Equal(hello2.Hello.SessionId, publisher.getRemoteId())
if remoteData := publisher.getRemoteData(); assert.NotNil(remoteData) {
assert.Equal("remote-host", remoteData.hostname)
assert.EqualValues(10001, remoteData.port)
assert.EqualValues(10002, remoteData.rtcpPort)
}
}
// ...but the session no longer contains information on the remote publisher.
if data, err := proxy.cookie.DecodePublic(hello2.Hello.SessionId); assert.NoError(err) {
session := proxy.GetSession(data.Sid)
if assert.NotNil(session) {
session.remotePublishersLock.Lock()
defer session.remotePublishersLock.Unlock()
assert.Empty(session.remotePublishers)
}
}
if publisher := mcu.publisher.Load(); assert.NotNil(publisher) {
publisher.clearRemote()
}
}
func TestProxyUnpublishRemoteOnSessionClose(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)
mcu := NewUnpublishRemoteTestMCU(t)
proxy.mcu = mcu
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewProxyTestClient(ctx, t, server.URL)
defer client1.CloseWithBye()
require.NoError(client1.SendHello(key))
if hello, err := client1.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}
_, err := client1.RunUntilLoad(ctx, 0)
assert.NoError(err)
publisherId := "the-publisher-id"
require.NoError(client1.WriteJSON(&signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "create-publisher",
PublisherId: publisherId,
Sid: "1234-abcd",
StreamType: signaling.StreamTypeVideo,
PublisherSettings: &signaling.NewPublisherSettings{
Bitrate: 1234567,
MediaTypes: signaling.MediaTypeAudio | signaling.MediaTypeVideo,
},
},
}))
var clientId string
if message, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("2345", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
require.NotEmpty(message.Command.Id)
clientId = message.Command.Id
}
}
client2 := NewProxyTestClient(ctx, t, server.URL)
defer client2.CloseWithBye()
require.NoError(client2.SendHello(key))
hello2, err := client2.RunUntilHello(ctx)
if assert.NoError(err) {
assert.NotEmpty(hello2.Hello.SessionId, "%+v", hello2)
}
_, err = client2.RunUntilLoad(ctx, 0)
assert.NoError(err)
require.NoError(client2.WriteJSON(&signaling.ProxyClientMessage{
Id: "3456",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "publish-remote",
StreamType: signaling.StreamTypeVideo,
ClientId: clientId,
Hostname: "remote-host",
Port: 10001,
RtcpPort: 10002,
},
}))
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("3456", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
require.NotEmpty(message.Command.Id)
}
}
if publisher := mcu.publisher.Load(); assert.NotNil(publisher) {
assert.Equal(hello2.Hello.SessionId, publisher.getRemoteId())
if remoteData := publisher.getRemoteData(); assert.NotNil(remoteData) {
assert.Equal("remote-host", remoteData.hostname)
assert.EqualValues(10001, remoteData.port)
assert.EqualValues(10002, remoteData.rtcpPort)
}
}
// Closing the session will cause any active remote publishers stop be stopped.
client2.CloseWithBye()
if publisher := mcu.publisher.Load(); assert.NotNil(publisher) {
assert.Empty(publisher.getRemoteId())
assert.Nil(publisher.getRemoteData())
}
}