from unittest.mock import AsyncMock, Mock import pytest from channels.testing import WebsocketCommunicator from baserow.config.asgi import application from baserow.ws.auth import ANONYMOUS_USER_TOKEN from baserow.ws.consumers import CoreConsumer, PageContext, PageScope, SubscribedPages from baserow.ws.registries import PageType, page_registry class AcceptingTestPageType(PageType): type = "test_page_type" parameters = ["test_param"] def can_add(self, user, web_socket_id, test_param, **kwargs): return True def get_group_name(self, test_param, **kwargs): return f"test-page-{test_param}" def get_permission_channel_group_name(self, test_param, **kwargs): return f"permissions-test-page-{test_param}" class NotAcceptingTestPageType(AcceptingTestPageType): type = "test_page_type_not_accepting" def can_add(self, user, web_socket_id, test_param, **kwargs): return False class DifferentPermissionsGroupTestPageType(PageType): type = "diff_perm_page_type" parameters = ["test_param"] def get_group_name(self, test_param, **kwargs): return f"different-perm-group-{test_param}" def get_permission_channel_group_name(self, test_param, **kwargs): return f"permissions-different-perm-group-{test_param}" @pytest.fixture def test_page_types(): page_types = ( AcceptingTestPageType(), NotAcceptingTestPageType(), DifferentPermissionsGroupTestPageType(), ) page_registry.register(page_types[0]) page_registry.register(page_types[1]) page_registry.register(page_types[2]) yield page_types page_registry.unregister(AcceptingTestPageType.type) page_registry.unregister(NotAcceptingTestPageType.type) page_registry.unregister(DifferentPermissionsGroupTestPageType.type) # Core consumer @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) @pytest.mark.websockets async def test_core_consumer_connect_not_authenticated(data_fixture): communicator = WebsocketCommunicator( application, f"ws/core/?jwt_token=", headers=[(b"origin", b"http://localhost")], ) connected, subprotocol = await communicator.connect() assert connected is True response = await communicator.receive_json_from() assert response["type"] == "authentication" assert response["success"] is False assert response["web_socket_id"] is None await communicator.disconnect() @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) @pytest.mark.websockets async def test_core_consumer_connect_authenticated(data_fixture): user_1, token_1 = data_fixture.create_user_and_token() communicator = WebsocketCommunicator( application, f"ws/core/?jwt_token={token_1}", headers=[(b"origin", b"http://localhost")], ) connected, subprotocol = await communicator.connect() assert connected is True response = await communicator.receive_json_from() assert response["type"] == "authentication" assert response["success"] is True assert response["web_socket_id"] is not None await communicator.disconnect() @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) @pytest.mark.websockets async def test_core_consumer_connect_authenticated_anonymous(data_fixture): user_1, token_1 = data_fixture.create_user_and_token() communicator = WebsocketCommunicator( application, f"ws/core/?jwt_token={ANONYMOUS_USER_TOKEN}", headers=[(b"origin", b"http://localhost")], ) connected, subprotocol = await communicator.connect() assert connected is True response = await communicator.receive_json_from() assert response["type"] == "authentication" assert response["success"] is True assert response["web_socket_id"] is not None await communicator.disconnect() @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) @pytest.mark.websockets async def test_core_consumer_add_to_page_success(data_fixture, test_page_types): user_1, token_1 = data_fixture.create_user_and_token() communicator = WebsocketCommunicator( application, f"ws/core/?jwt_token={token_1}", headers=[(b"origin", b"http://localhost")], ) await communicator.connect() await communicator.receive_json_from() await communicator.send_json_to({"page": "test_page_type", "test_param": 1}) response = await communicator.receive_json_from(timeout=0.1) assert response["type"] == "page_add" assert response["page"] == "test_page_type" assert response["parameters"]["test_param"] == 1 # Adding again will have the same behavior # but the page will still be subscribed only once await communicator.send_json_to({"page": "test_page_type", "test_param": 1}) response = await communicator.receive_json_from(timeout=0.1) assert response["type"] == "page_add" assert response["page"] == "test_page_type" assert response["parameters"]["test_param"] == 1 await communicator.disconnect() @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) @pytest.mark.websockets async def test_core_consumer_add_page_doesnt_exist(data_fixture): # When trying to subscribe to not existing page # we do not expect the confirmation communicator = WebsocketCommunicator( application, f"ws/core/?jwt_token={ANONYMOUS_USER_TOKEN}", headers=[(b"origin", b"http://localhost")], ) await communicator.connect() await communicator.receive_json_from() await communicator.send_json_to({"page": "doesnt_exist", "test_param": 1}) assert communicator.output_queue.qsize() == 0 await communicator.disconnect() @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) @pytest.mark.websockets async def test_core_consumer_add_to_page_failure(data_fixture, test_page_types): user_1, token_1 = data_fixture.create_user_and_token() communicator = WebsocketCommunicator( application, f"ws/core/?jwt_token={token_1}", headers=[(b"origin", b"http://localhost")], ) await communicator.connect() await communicator.receive_json_from() # Page that will return False from can_add() # won't send the confirmation await communicator.send_json_to( {"page": "test_page_type_not_accepting", "test_param": 1} ) assert communicator.output_queue.qsize() == 0 await communicator.disconnect() @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) @pytest.mark.websockets async def test_core_consumer_remove_page_success(data_fixture, test_page_types): user_1, token_1 = data_fixture.create_user_and_token() communicator = WebsocketCommunicator( application, f"ws/core/?jwt_token={token_1}", headers=[(b"origin", b"http://localhost")], ) await communicator.connect() await communicator.receive_json_from() await communicator.send_json_to({"page": "test_page_type", "test_param": 1}) response = await communicator.receive_json_from(timeout=0.1) await communicator.send_json_to({"remove_page": "test_page_type", "test_param": 1}) response = await communicator.receive_json_from(timeout=0.1) assert response["type"] == "page_discard" assert response["page"] == "test_page_type" assert response["parameters"]["test_param"] == 1 # Removing a page will send a confirmation again # even if it is unsubscribed already await communicator.send_json_to({"remove_page": "test_page_type", "test_param": 1}) response = await communicator.receive_json_from(timeout=0.1) assert response["type"] == "page_discard" assert response["page"] == "test_page_type" assert response["parameters"]["test_param"] == 1 await communicator.disconnect() @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) @pytest.mark.websockets async def test_core_consumer_remove_page_doesnt_exist(data_fixture): # When trying to unsubscribe from not existing page # we do not expect the confirmation communicator = WebsocketCommunicator( application, f"ws/core/?jwt_token={ANONYMOUS_USER_TOKEN}", headers=[(b"origin", b"http://localhost")], ) await communicator.connect() await communicator.receive_json_from() await communicator.send_json_to({"page_remove": "doesnt_exist", "test_param": 1}) assert communicator.output_queue.qsize() == 0 await communicator.disconnect() @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) @pytest.mark.websockets async def test_get_page_context(data_fixture, test_page_types): page_type = test_page_types[0] user_1, token_1 = data_fixture.create_user_and_token() consumer = CoreConsumer() consumer.scope = {} consumer.scope["user"] = user_1 consumer.scope["web_socket_id"] = 234 content = { "page": page_type.type, "test_param": 2, } result = await consumer._get_page_context(content, "page") assert result == PageContext( resolved_page_type=page_type, page_scope=PageScope( page_type=page_type.type, page_parameters={"test_param": 2} ), user=user_1, web_socket_id=234, ) # Not existing page type content = { "page": "doesnt_exist", "test_param": 2, } result = await consumer._get_page_context(content, "page") assert result is None # Missing user consumer.scope["user"] = None content = { "page": page_type.type, "test_param": 2, } result = await consumer._get_page_context(content, "page") assert result is None @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) @pytest.mark.websockets async def test_core_consumer_remove_all_page_scopes(data_fixture, test_page_types): user_1, token_1 = data_fixture.create_user_and_token() scope_1 = PageScope("test_page_type", {"test_param": 1}) scope_2 = PageScope("test_page_type", {"test_param": 2}) pages = SubscribedPages() pages.add(scope_1) pages.add(scope_2) consumer = CoreConsumer() consumer.scope = {"pages": pages, "user": user_1, "web_socket_id": 123} consumer.channel_name = "test_channel_name" consumer.channel_layer = AsyncMock() async def base_send(message): pass consumer.base_send = base_send assert len(consumer.scope["pages"]) == 2 await consumer._remove_all_page_scopes() assert len(consumer.scope["pages"]) == 0 # SubscribedPages @pytest.mark.websockets def test_subscribed_pages_adds_page_without_duplicates(): scope_1 = PageScope("test_page_type", {"test_param": 1}) scope_2 = PageScope("test_page_type", {"test_param": 2}) scope_3 = PageScope("test_page_type_not_accepting", {"test_param": 1}) scope_4 = PageScope("test_page_type", {"test_param": 1}) pages = SubscribedPages() pages.add(scope_1) pages.add(scope_2) pages.add(scope_3) pages.add(scope_4) assert len(pages) == 3 @pytest.mark.websockets def test_subscribed_pages_removes_pages_by_parameters(): scope_1 = PageScope("test_page_type", {"test_param": 1}) scope_2 = PageScope("test_page_type", {"test_param": 2}) scope_3 = PageScope("test_page_type", {"test_param": 3}) pages = SubscribedPages() pages.add(scope_1) pages.add(scope_2) pages.add(scope_3) assert len(pages) == 3 pages.remove(scope_2) assert scope_1 in pages.pages assert scope_2 not in pages.pages assert scope_3 in pages.pages @pytest.mark.websockets def test_subscribed_pages_removes_pages_without_error(): scope_1 = PageScope("test_page_type", {"test_param": 1}) scope_2 = PageScope("test_page_type", {"test_param": 2}) pages = SubscribedPages() pages.add(scope_1) pages.add(scope_2) assert len(pages) == 2 # should not throw error pages.remove(scope_1) pages.remove(scope_1) pages.remove(scope_2) pages.remove(scope_2) assert len(pages) == 0 @pytest.mark.websockets def test_subscribed_pages_is_page_in_permission_group(test_page_types): scope_1 = PageScope("test_page_type", {"test_param": 1}) pages = SubscribedPages() assert pages.is_page_in_permission_group(scope_1, "permissions-test-page-1") is True assert ( pages.is_page_in_permission_group(scope_1, "permissions-different-perm-group-1") is False ) @pytest.mark.websockets def test_subscribed_pages_has_pages_with_permission_group(test_page_types): scope_1 = PageScope("test_page_type", {"test_param": 1}) pages = SubscribedPages() pages.add(scope_1) assert pages.has_pages_with_permission_group("permissions-test-page-1") is True assert ( pages.has_pages_with_permission_group("permissions-different-perm-group-1") is False ) @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio @pytest.mark.websockets @pytest.mark.parametrize( "ignore_web_socket_id, web_socket_id, user, exclude_user_ids, expect_call", [ # Sending the message is expected: # ...because `ignore_web_socket_id` is empty: (None, "some_web_socket_id", Mock(id=1), [], True), # ...or when socket_ids are not matching and user ID is not exluded: ("some_web_socket_id", "another_web_socket_id", Mock(id=1), [], True), # ...or when `ignore_web_socket_id` is empty and the user is not excluded: (None, "some_web_socket_id", Mock(id=1), [2], True), # Sending the message is NOT expected: # ...because web socket IDs are matching and should be ignored: ("some_web_socket_id", "some_web_socket_id", Mock(id=1), [], False), # ...or when user_id is exluded: (None, "some_web_socket_id", Mock(id=1), [1], False), # ...or when user ID is excluded (within multiple IDs): (None, "some_web_socket_id", Mock(id=1), [1, 2], False), # ...or when user ID is excluded (within multiple IDs) and web socket # IDs are not matching: ("some_web_socket_id", "another_web_socket_id", Mock(id=1), [1, 2], False), ], ) async def test_core_consumer_broadcast_to_group( data_fixture, test_page_types, ignore_web_socket_id, web_socket_id, user, exclude_user_ids, expect_call, ): consumer = CoreConsumer() # Mock the required attributes and methods consumer.scope = { "web_socket_id": web_socket_id, "user": user, } event = { "payload": { "message": "test message", }, "ignore_web_socket_id": ignore_web_socket_id, "exclude_user_ids": exclude_user_ids, } mock_send_json = AsyncMock() consumer.send_json = mock_send_json await consumer.broadcast_to_group(event) if expect_call: mock_send_json.assert_called_once_with(event["payload"]) else: mock_send_json.assert_not_called()