0
0
Fork 0
mirror of https://github.com/strukturag/nextcloud-spreed-signaling.git synced 2025-03-14 19:42:48 +00:00

Add some Janus tests.

This commit is contained in:
Joachim Bauch 2024-11-07 16:57:31 +01:00
parent 469e97f483
commit 6038d66730
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02

584
mcu_janus_test.go Normal file
View file

@ -0,0 +1,584 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2024 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/dlintw/goconf"
"github.com/notedit/janus-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type TestJanusHandle struct {
id uint64
}
type TestJanusRoom struct {
id uint64
}
type TestJanusHandler func(room *TestJanusRoom, body map[string]interface{}) (interface{}, *janus.ErrorMsg)
type TestJanusGateway struct {
t *testing.T
sid atomic.Uint64
tid atomic.Uint64
hid atomic.Uint64
rid atomic.Uint64
mu sync.Mutex
sessions map[uint64]*JanusSession
transactions map[uint64]*transaction
handles map[uint64]*TestJanusHandle
rooms map[uint64]*TestJanusRoom
handlers map[string]TestJanusHandler
}
func NewTestJanusGateway(t *testing.T) *TestJanusGateway {
gateway := &TestJanusGateway{
t: t,
sessions: make(map[uint64]*JanusSession),
transactions: make(map[uint64]*transaction),
handles: make(map[uint64]*TestJanusHandle),
rooms: make(map[uint64]*TestJanusRoom),
handlers: make(map[string]TestJanusHandler),
}
t.Cleanup(func() {
assert := assert.New(t)
gateway.mu.Lock()
defer gateway.mu.Unlock()
assert.Len(gateway.sessions, 0)
assert.Len(gateway.transactions, 0)
assert.Len(gateway.handles, 0)
assert.Len(gateway.rooms, 0)
})
return gateway
}
func (g *TestJanusGateway) registerHandlers(handlers map[string]TestJanusHandler) {
g.mu.Lock()
defer g.mu.Unlock()
for name, handler := range handlers {
g.handlers[name] = handler
}
}
func (g *TestJanusGateway) Info(ctx context.Context) (*InfoMsg, error) {
return &InfoMsg{
Name: "TestJanus",
Version: 1400,
VersionString: "1.4.0",
Author: "struktur AG",
DataChannels: true,
FullTrickle: true,
Plugins: map[string]janus.PluginInfo{
pluginVideoRoom: {
Name: "Test VideoRoom plugin",
VersionString: "0.0.0",
Author: "struktur AG",
},
},
}, nil
}
func (g *TestJanusGateway) Create(ctx context.Context) (*JanusSession, error) {
sid := g.sid.Add(1)
session := &JanusSession{
Id: sid,
Handles: make(map[uint64]*JanusHandle),
gateway: g,
}
g.mu.Lock()
defer g.mu.Unlock()
g.sessions[sid] = session
return session, nil
}
func (g *TestJanusGateway) Close() error {
return nil
}
func (g *TestJanusGateway) processMessage(session *JanusSession, handle *TestJanusHandle, body map[string]interface{}) interface{} {
request := body["request"].(string)
switch request {
case "create":
room := &TestJanusRoom{
id: g.rid.Add(1),
}
g.rooms[room.id] = room
return &janus.SuccessMsg{
PluginData: janus.PluginData{
Plugin: pluginVideoRoom,
Data: map[string]interface{}{
"room": room.id,
},
},
}
case "join":
rid := body["room"].(float64)
room := g.rooms[uint64(rid)]
if room == nil {
return &janus.ErrorMsg{
Err: janus.ErrorData{
Code: JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM,
Reason: "Room not found",
},
}
}
assert.Equal(g.t, "publisher", body["ptype"])
return &janus.EventMsg{
Session: session.Id,
Handle: handle.id,
Plugindata: janus.PluginData{
Plugin: pluginVideoRoom,
Data: map[string]interface{}{
"room": room.id,
},
},
}
case "destroy":
rid := body["room"].(float64)
room := g.rooms[uint64(rid)]
if room == nil {
return &janus.ErrorMsg{
Err: janus.ErrorData{
Code: JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM,
Reason: "Room not found",
},
}
}
delete(g.rooms, uint64(rid))
return &janus.SuccessMsg{
PluginData: janus.PluginData{
Plugin: pluginVideoRoom,
Data: map[string]interface{}{},
},
}
default:
rid := body["room"].(float64)
room := g.rooms[uint64(rid)]
if room == nil {
return &janus.ErrorMsg{
Err: janus.ErrorData{
Code: JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM,
Reason: "Room not found",
},
}
}
handler, found := g.handlers[request]
if found {
var err *janus.ErrorMsg
result, err := handler(room, body)
if err != nil {
result = err
}
return result
}
}
return nil
}
func (g *TestJanusGateway) processRequest(msg map[string]interface{}) interface{} {
method, found := msg["janus"]
if !found {
return nil
}
sid := msg["session_id"].(float64)
g.mu.Lock()
defer g.mu.Unlock()
session := g.sessions[uint64(sid)]
if session == nil {
return &janus.ErrorMsg{
Err: janus.ErrorData{
Code: JANUS_ERROR_SESSION_NOT_FOUND,
Reason: "Session not found",
},
}
}
switch method {
case "attach":
handle := &TestJanusHandle{
id: g.hid.Add(1),
}
g.handles[handle.id] = handle
return &janus.SuccessMsg{
Data: janus.SuccessData{
ID: handle.id,
},
}
case "detach":
hid := msg["handle_id"].(float64)
handle, found := g.handles[uint64(hid)]
if found {
delete(g.handles, handle.id)
}
if handle == nil {
return &janus.ErrorMsg{
Err: janus.ErrorData{
Code: JANUS_ERROR_HANDLE_NOT_FOUND,
Reason: "Handle not found",
},
}
}
return &janus.AckMsg{}
case "destroy":
delete(g.sessions, session.Id)
return &janus.AckMsg{}
case "message":
hid := msg["handle_id"].(float64)
handle, found := g.handles[uint64(hid)]
if !found {
return &janus.ErrorMsg{
Err: janus.ErrorData{
Code: JANUS_ERROR_HANDLE_NOT_FOUND,
Reason: "Handle not found",
},
}
}
body := msg["body"].(map[string]interface{})
return g.processMessage(session, handle, body)
}
return nil
}
func (g *TestJanusGateway) send(msg map[string]interface{}, t *transaction) (uint64, error) {
tid := g.tid.Add(1)
data, err := json.Marshal(msg)
require.NoError(g.t, err)
err = json.Unmarshal(data, &msg)
require.NoError(g.t, err)
go t.run()
g.mu.Lock()
defer g.mu.Unlock()
g.transactions[tid] = t
go func() {
result := g.processRequest(msg)
if !assert.NotNil(g.t, result, "Unsupported request %+v", msg) {
result = &janus.ErrorMsg{
Err: janus.ErrorData{
Code: JANUS_ERROR_UNKNOWN,
Reason: "Not implemented",
},
}
}
t.add(result)
}()
return tid, nil
}
func (g *TestJanusGateway) removeTransaction(id uint64) {
g.mu.Lock()
defer g.mu.Unlock()
delete(g.transactions, id)
}
func (g *TestJanusGateway) removeSession(session *JanusSession) {
g.mu.Lock()
defer g.mu.Unlock()
delete(g.sessions, session.Id)
}
func newMcuJanusForTesting(t *testing.T) (*mcuJanus, *TestJanusGateway) {
gateway := NewTestJanusGateway(t)
config := goconf.NewConfigFile()
mcu, err := NewMcuJanus(context.Background(), "", config)
require.NoError(t, err)
t.Cleanup(func() {
mcu.Stop()
})
mcuJanus := mcu.(*mcuJanus)
mcuJanus.createJanusGateway = func(ctx context.Context, wsURL string, listener GatewayListener) (JanusGatewayInterface, error) {
return gateway, nil
}
require.NoError(t, mcu.Start(context.Background()))
return mcuJanus, gateway
}
type TestMcuListener struct {
id string
}
func (t *TestMcuListener) PublicId() string {
return t.id
}
func (t *TestMcuListener) OnUpdateOffer(client McuClient, offer map[string]interface{}) {
}
func (t *TestMcuListener) OnIceCandidate(client McuClient, candidate interface{}) {
}
func (t *TestMcuListener) OnIceCompleted(client McuClient) {
}
func (t *TestMcuListener) SubscriberSidUpdated(subscriber McuSubscriber) {
}
func (t *TestMcuListener) PublisherClosed(publisher McuPublisher) {
}
func (t *TestMcuListener) SubscriberClosed(subscriber McuSubscriber) {
}
type TestMcuController struct {
id string
}
func (c *TestMcuController) PublisherId() string {
return c.id
}
func (c *TestMcuController) StartPublishing(ctx context.Context, publisher McuRemotePublisherProperties) error {
// TODO: Check parameters?
return nil
}
func (c *TestMcuController) StopPublishing(ctx context.Context, publisher McuRemotePublisherProperties) error {
// TODO: Check parameters?
return nil
}
func (c *TestMcuController) GetStreams(ctx context.Context) ([]PublisherStream, error) {
streams := []PublisherStream{
{
Mid: "0",
Mindex: 0,
Type: "audio",
Codec: "opus",
},
}
return streams, nil
}
type TestMcuInitiator struct {
country string
}
func (i *TestMcuInitiator) Country() string {
return i.country
}
func Test_JanusPublisherSubscriber(t *testing.T) {
CatchLogForTest(t)
t.Parallel()
require := require.New(t)
mcu, gateway := newMcuJanusForTesting(t)
gateway.registerHandlers(map[string]TestJanusHandler{})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
pubId := "publisher-id"
listener1 := &TestMcuListener{
id: pubId,
}
settings1 := NewPublisherSettings{}
initiator1 := &TestMcuInitiator{
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, listener1, pubId, "sid", StreamTypeVideo, settings1, initiator1)
require.NoError(err)
defer pub.Close(context.Background())
listener2 := &TestMcuListener{
id: pubId,
}
initiator2 := &TestMcuInitiator{
country: "DE",
}
sub, err := mcu.NewSubscriber(ctx, listener2, pubId, StreamTypeVideo, initiator2)
require.NoError(err)
defer sub.Close(context.Background())
}
func Test_JanusSubscriberPublisher(t *testing.T) {
CatchLogForTest(t)
t.Parallel()
require := require.New(t)
mcu, gateway := newMcuJanusForTesting(t)
gateway.registerHandlers(map[string]TestJanusHandler{})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
pubId := "publisher-id"
listener1 := &TestMcuListener{
id: pubId,
}
settings1 := NewPublisherSettings{}
initiator1 := &TestMcuInitiator{
country: "DE",
}
ready := make(chan struct{})
done := make(chan struct{})
go func() {
defer close(done)
time.Sleep(100 * time.Millisecond)
pub, err := mcu.NewPublisher(ctx, listener1, pubId, "sid", StreamTypeVideo, settings1, initiator1)
require.NoError(err)
defer func() {
<-ready
pub.Close(context.Background())
}()
}()
listener2 := &TestMcuListener{
id: pubId,
}
initiator2 := &TestMcuInitiator{
country: "DE",
}
sub, err := mcu.NewSubscriber(ctx, listener2, pubId, StreamTypeVideo, initiator2)
require.NoError(err)
defer sub.Close(context.Background())
close(ready)
<-done
}
func Test_JanusRemotePublisher(t *testing.T) {
CatchLogForTest(t)
t.Parallel()
assert := assert.New(t)
require := require.New(t)
var added atomic.Int32
var removed atomic.Int32
mcu, gateway := newMcuJanusForTesting(t)
gateway.registerHandlers(map[string]TestJanusHandler{
"add_remote_publisher": func(room *TestJanusRoom, body map[string]interface{}) (interface{}, *janus.ErrorMsg) {
assert.EqualValues(1, room.id)
if streams := body["streams"].([]interface{}); assert.Len(streams, 1) {
stream := streams[0].(map[string]interface{})
assert.Equal("0", stream["mid"])
assert.EqualValues(0, stream["mindex"])
assert.Equal("audio", stream["type"])
assert.Equal("opus", stream["codec"])
}
added.Add(1)
return &janus.SuccessMsg{
PluginData: janus.PluginData{
Plugin: pluginVideoRoom,
Data: map[string]interface{}{
"id": 12345,
"port": 10000,
"rtcp_port": 10001,
},
},
}, nil
},
"remove_remote_publisher": func(room *TestJanusRoom, body map[string]interface{}) (interface{}, *janus.ErrorMsg) {
assert.EqualValues(1, room.id)
removed.Add(1)
return &janus.SuccessMsg{
PluginData: janus.PluginData{
Plugin: pluginVideoRoom,
Data: map[string]interface{}{},
},
}, nil
},
})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
listener1 := &TestMcuListener{
id: "publisher-id",
}
controller := &TestMcuController{
id: listener1.id,
}
pub, err := mcu.NewRemotePublisher(ctx, listener1, controller, StreamTypeVideo)
require.NoError(err)
defer pub.Close(context.Background())
assert.EqualValues(1, added.Load())
assert.EqualValues(0, removed.Load())
listener2 := &TestMcuListener{
id: "subscriber-id",
}
sub, err := mcu.NewRemoteSubscriber(ctx, listener2, pub)
require.NoError(err)
defer sub.Close(context.Background())
pub.Close(context.Background())
assert.EqualValues(1, added.Load())
// The publisher is ref-counted, and still referenced by the subscriber.
assert.EqualValues(0, removed.Load())
sub.Close(context.Background())
assert.EqualValues(1, added.Load())
assert.EqualValues(1, removed.Load())
}