0

I'm trying to test new channels 2.0 with pytest-asyncio (0.8.0). If I place different assertions in the same function like:

import json
import pytest
from concurrent.futures._base import TimeoutError
from channels.testing import WebsocketCommunicator
from someapp.consumers import MyConsumer


@pytest.mark.django_db
@pytest.mark.asyncio
async def setup_database_and_websocket():
    path = 'foo'
    communicator = WebsocketCommunicator(MyConsumer, path)
    connected, subprotocol = await communicator.connect()
    assert connected
    return communicator


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1_and_2():
    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 'abc'})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 1})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

then I'm not getting an error. But if I separate test cases like:

@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1():
    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 'abc'})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_2():
    communicator = await setup_database_and_websocket()
    sent = json.dumps({"message": 1})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

then I'm getting following error upon the second receive_form call:

with pytest.raises(TimeoutError):
>           await communicator.receive_from()

someapp/tests/test_consumers_async.py:106: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
self.future.result()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/generic/websocket.py:19: in websocket_connect
self.connect()
someapp/consumers.py:22: in connect
self.group_name, self.channel_name)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:49: in __call__
return call_result.result()
/usr/lib/python3.6/concurrent/futures/_base.py:432: in result
return self.__get_result()
/usr/lib/python3.6/concurrent/futures/_base.py:384: in __get_result
raise self._exception
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:63: in main_wrap
result = await self.awaitable(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels_redis/core.py:290: in group_add
await connection.expire(group_key, self.group_expiry)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/commands/__init__.py:152: in __exit__
self._release_callback(conn)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/pool.py:361: in release
conn.close()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:352: in close
self._do_close(ConnectionForcedCloseError())
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:359: in _do_close
self._writer.transport.close()
/usr/lib/python3.6/asyncio/selector_events.py:621: in close
self._loop.call_soon(self._call_connection_lost, None)
/usr/lib/python3.6/asyncio/base_events.py:574: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

def _check_closed(self):
if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/usr/lib/python3.6/asyncio/base_events.py:357: RuntimeError

Also if I do (as in https://channels.readthedocs.io/en/latest/topics/testing.html):

await communicator.disconnect()

instead of:

await communicator.send_input({
    "type": "websocket.disconnect",
    "code": 1000,
})

then the following error is raised:

>       await communicator.disconnect()

someapp/tests/test_consumers_async.py:96: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:100: in disconnect
    await self.future
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
    await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
    return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
    return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
    result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
    return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
    handler(message)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <someapp.consumers.ChatConsumer object at 0x7f38fcc55240>
message = {'code': 1000, 'type': 'websocket.disconnect'}

    def websocket_disconnect(self, message):
        """
            Called when a WebSocket connection is closed. Base level so you don't
            need to call super() all the time.
            """
        # TODO: group leaving
>       self.disconnect(message["code"])
E       TypeError: disconnect() takes 1 positional argument but 2 were given

What should I do to separate those test cases in the respective individual test functions?


Edit: disconnect error is a trivial one - I forgot to add that positional argument (code) in subclassed method:

def disconnect(self, code):
    AsyncToSync(self.channel_layer.group_discard)('foo', self.channel_name)

Edit2: It was a bug related to redis channel - bugfix is in the latest channels and asgiref master branches on github.

2
  • I guess maybe it could the global connection or global event loop, you can try pytestmark = pytest.mark.asyncio(forbid_global_loop=True) and I wonder if there is sth similar in djangodb Commented Feb 16, 2018 at 13:11
  • No improvements; I'm afraid that is deprecated ("The forbid_global_loop parameter has been removed."). Commented Feb 19, 2018 at 17:12

1 Answer 1

1

Does this help? (I didn't test it though.)

@pytest.fixture
async def communicator(db):
    path = 'foo'
    communicator = WebsocketCommunicator(MyConsumer, path)
    connected, subprotocol = await communicator.connect()
    assert connected
    return communicator


@pytest.mark.asyncio
async def test_1(communicator):
    sent = json.dumps({"message": 'abc'})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })


@pytest.mark.asyncio
async def test_2(communicator):
    sent = json.dumps({"message": 1})
    await communicator.send_to(text_data=sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })
Sign up to request clarification or add additional context in comments.

1 Comment

I'm kind of allowed to separate test cases with this approach (slightly modified), but my testing code doesn't work as it should. Maybe I should play with event_loop fixture to solve this. I'll comment here if and when I'm done...

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.