Postgres: Cannot perform operation: another operation is in progress #22

Open
kellen opened this issue Sep 18, 2020 · 6 comments · May be fixed by #64

Comments

kellen commented Sep 18, 2020

I'm using broadcaster with FastAPI and see an exception when publishing through broadcaster from a websocket handler.

Relevant parts:

    await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
    # ...
    'cannot perform operation: another operation is in progress')

Full trace:

INFO:     ('127.0.0.1', 52654) - "WebSocket /viewers" [accepted]
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File ".../site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 154, in run_asgi
    result = await self.app(self.scope, self.asgi_receive, self.asgi_send)
  File ".../site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File ".../site-packages/fastapi/applications.py", line 179, in __call__
    await super().__call__(scope, receive, send)
  File ".../site-packages/starlette/applications.py", line 111, in __call__
    await self.middleware_stack(scope, receive, send)
  File ".../site-packages/starlette/middleware/errors.py", line 146, in __call__
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/exceptions.py", line 58, in __call__
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 566, in __call__
    await route.handle(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 283, in handle
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 57, in app
    await func(session)
  File ".../site-packages/fastapi/routing.py", line 228, in app
    await dependant.call(**values)
  File "./main.py", line 198, in events_ws
    (viewers_ws_sender, {"websocket": websocket}),
  File ".../site-packages/starlette/concurrency.py", line 18, in run_until_first_complete
    [task.result() for task in done]
  File ".../site-packages/starlette/concurrency.py", line 18, in <listcomp>
    [task.result() for task in done]
  File "./main.py", line 204, in viewers_ws_receiver
    await broadcast.publish(channel="viewers", message=message)
  File ".../site-packages/broadcaster/_base.py", line 72, in publish
    await self._backend.publish(channel, message)
  File ".../site-packages/broadcaster/_backends/postgres.py", line 25, in publish
    await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
  File ".../site-packages/asyncpg/connection.py", line 297, in execute
    _, status, _ = await self._execute(query, args, 0, timeout, True)
  File ".../site-packages/asyncpg/connection.py", line 1444, in _execute
    with self._stmt_exclusive_section:
  File ".../site-packages/asyncpg/connection.py", line 1891, in __enter__
    'cannot perform operation: another operation is in progress')
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
INFO:     ('127.0.0.1', 52655) - "WebSocket /events" [accepted]

My code looks something like this. I think the lock is just slowing things down and exposing the contention.

@app.websocket("/viewers", name="viewers_ws")
async def events_ws(websocket: WebSocket):
    await websocket.accept()
    await run_until_first_complete(
        (viewers_ws_receiver, {"websocket": websocket}),
        (viewers_ws_sender, {"websocket": websocket}),
    )

my_lock = threading.Lock()

async def viewers_ws_receiver(websocket: WebSocket):
    async for message in websocket.iter_text():
        await broadcast.publish(channel="viewers", message=message)

async def viewers_ws_sender(websocket: WebSocket):
    async with broadcast.subscribe(channel="viewers") as subscriber:
        async for event in subscriber:
            counter = 0
            with my_lock:
                # do something with event.message
                counter = ...
            await websocket.send_json({"viewers": counter})

Update: I refactored this down to a single websocket with no lock at all and saw this exception again. 🤷
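
For context, the trace shows the error being raised inside asyncpg's `_stmt_exclusive_section`, which refuses to start a second operation on a connection while another is still running. The sketch below reproduces that failure mode without a database. `FakeConnection` is a hypothetical stand-in, not asyncpg or broadcaster code, and the assumption that the subscriber and publisher share one connection is inferred from the trace rather than confirmed.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for one database connection (not asyncpg itself)."""

    def __init__(self) -> None:
        self._busy = False

    async def execute(self, query: str) -> str:
        # Refuse to start a second operation while one is still running.
        if self._busy:
            raise RuntimeError("another operation is in progress")
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # simulated network round trip
            return query
        finally:
            self._busy = False


async def main() -> list:
    conn = FakeConnection()
    # Two coroutines share one connection, as the subscriber and publisher
    # appear to do in this issue (an assumption based on the trace).
    return await asyncio.gather(
        conn.execute("LISTEN viewers"),
        conn.execute("SELECT pg_notify('viewers', 'hello')"),
        return_exceptions=True,
    )


results = asyncio.run(main())
print(results)
```

The first call succeeds and the second fails, which would also explain why removing `threading.Lock` made no difference: the contention is on the connection, not on the lock.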

@AntonOfTheWoods

@kellen did you find a solution for this?

kellen commented Oct 22, 2021

@AntonOfTheWoods No, I switched to Redis as the backend instead.

pwoolvett added a commit to pwoolvett/broadcaster that referenced this issue May 6, 2022
This fix adds a lock (asyncio.Event based) to avoid
`asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress`

fixes encode#22
pwoolvett linked a pull request May 6, 2022 that will close this issue
pwoolvett added a commit to pwoolvett/broadcaster that referenced this issue May 13, 2022
This fix adds an asyncio.Lock to avoid
`asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress`

fixes encode#22
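
The idea described in the commit message above can be sketched roughly as follows. This is not the code from the linked pull request: `FakeConnection` and `SerializedBackend` are hypothetical names, and the connection is simulated so the example runs without Postgres. The only real API used is `asyncio.Lock`.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for one database connection (not asyncpg itself)."""

    def __init__(self) -> None:
        self._busy = False

    async def execute(self, query: str) -> str:
        if self._busy:
            raise RuntimeError("another operation is in progress")
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # simulated network round trip
            return query
        finally:
            self._busy = False


class SerializedBackend:
    """Hypothetical wrapper: every call on the shared connection takes the lock."""

    def __init__(self, conn: FakeConnection) -> None:
        self._conn = conn
        self._lock = asyncio.Lock()

    async def subscribe(self, channel: str) -> str:
        async with self._lock:
            return await self._conn.execute(f"LISTEN {channel}")

    async def publish(self, channel: str, message: str) -> str:
        async with self._lock:
            return await self._conn.execute(f"NOTIFY {channel}, '{message}'")


async def main() -> list:
    # Build the backend inside the running loop so the lock binds to it.
    backend = SerializedBackend(FakeConnection())
    return await asyncio.gather(
        backend.subscribe("viewers"),
        backend.publish("viewers", "hello"),
        backend.publish("viewers", "world"),
        return_exceptions=True,
    )


results = asyncio.run(main())
print(results)
```

With the lock in place the three calls run one after another and none of them raises. The trade-off is that publishes queue up behind each other on the single connection.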

uralbash commented Nov 21, 2023

@AntonOfTheWoods
A little delay solves the problem

@app.websocket("/viewers", name="viewers_ws")
async def events_ws(websocket: WebSocket):
    await websocket.accept()
    await run_until_first_complete(
        (viewers_ws_receiver, {"websocket": websocket}),
        (viewers_ws_sender, {"websocket": websocket}),
    )

my_lock = threading.Lock()

async def viewers_ws_receiver(websocket: WebSocket):
    await asyncio.sleep(0.01)  # <-- this short delay avoids the error
    async for message in websocket.iter_text():
        await broadcast.publish(channel="viewers", message=message)

async def viewers_ws_sender(websocket: WebSocket):
    async with broadcast.subscribe(channel="viewers") as subscriber:
        async for event in subscriber:
            counter = 0
            with my_lock:
                # do something with event.message
                counter = ...
            await websocket.send_json({"viewers": counter})

Trinkes commented Aug 22, 2024

@uralbash The problem with the delay workaround is that it's unreliable: if the connection is slow for some reason, you'll still end up with this error.
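
To make that concrete, here is a small simulation of why a fixed delay only helps when the first operation finishes before the delay elapses. `FakeConnection` and `collides` are hypothetical helpers written for this illustration, and the latencies are made-up numbers, not measurements.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for one connection with a configurable latency."""

    def __init__(self, latency: float) -> None:
        self._latency = latency
        self._busy = False

    async def execute(self, query: str) -> str:
        if self._busy:
            raise RuntimeError("another operation is in progress")
        self._busy = True
        try:
            await asyncio.sleep(self._latency)  # simulated round trip
            return query
        finally:
            self._busy = False


async def collides(latency: float, delay: float) -> bool:
    """Return True if the delayed publish still overlaps the subscribe."""
    conn = FakeConnection(latency)

    async def subscriber() -> None:
        await conn.execute("LISTEN viewers")

    async def publisher() -> None:
        await asyncio.sleep(delay)  # the workaround: wait before publishing
        await conn.execute("SELECT pg_notify('viewers', 'hello')")

    results = await asyncio.gather(subscriber(), publisher(), return_exceptions=True)
    return any(isinstance(r, RuntimeError) for r in results)


async def main() -> tuple:
    slow = await collides(latency=0.2, delay=0.01)   # slow connection
    fast = await collides(latency=0.001, delay=0.1)  # fast connection
    return slow, fast


slow_collides, fast_collides = asyncio.run(main())
print(slow_collides, fast_collides)
```

On the simulated fast connection the delay hides the problem, while on the slow one the publish still lands while the subscribe is in flight.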

@alex-oleshkevich
Member

I can't reproduce this using the code above.
Please provide a minimal example that triggers the issue (including app initialization).

cc https://github.com/Trinkes

Trinkes commented Sep 6, 2024

I just realised that I'm now getting the error while publishing an event. I'll try to come up with a minimal example.

Here's an example:

import asyncio
import threading

import pytest
from broadcaster import Broadcast

from app.core.config import settings


@pytest.mark.asyncio
async def test_broadcaster():
    # broadcast = Broadcast(str(settings.REDIS_URL))  # -> this works
    broadcast = Broadcast(str(settings.SQLALCHEMY_DATABASE_URI).replace("+psycopg", "")) # -> this doesn't work
    await broadcast.connect()

    # Run multiple receivers and senders in parallel
    await asyncio.gather(
        *[viewers_ws_receiver(broadcast) for _ in range(2)],  # 2 parallel receivers
        *[viewers_ws_sender(broadcast) for _ in range(2)],  # 2 parallel senders
    )


my_lock = threading.Lock()


async def viewers_ws_sender(broadcast: Broadcast):
    await asyncio.sleep(0.01)
    while True:
        await broadcast.publish(channel="viewers", message="message")
        await asyncio.sleep(1)


async def viewers_ws_receiver(broadcast: Broadcast):
    async with broadcast.subscribe(channel="viewers") as subscriber:
        async for event in subscriber:
            print(event)
