programing

비동기 코루틴으로 동기 함수를 래핑하려면 어떻게 해야 합니까?

abcjava 2023. 5. 1. 19:49
반응형

비동기 코루틴으로 동기 함수를 래핑하려면 어떻게 해야 합니까?

저는 iohttp를 사용하여 별도의 서버로 TCP 요청을 보내는 API 서버를 구축하고 있습니다.TCP 요청을 보내는 모듈은 동기식이며 내 목적을 위한 블랙박스입니다.그래서 저의 문제는 이러한 요청들이 전체 API를 차단하고 있다는 것입니다.나머지 API를 차단하지 않는 비동기 코루틴으로 모듈 요청을 포장하는 방법이 필요합니다.

그래서 그냥.sleep간단한 예로, 시간이 많이 소요되는 동기 코드를 다음과 같은 비연속 코루틴으로 어떻게든 래핑할 수 있는 방법이 있습니까?

async def sleep_async(delay):
    # After calling sleep, loop should be released until sleep is done
    yield sleep(delay)
    return 'I slept asynchronously'

결국 저는 이 스레드에서 답을 찾았습니다.제가 찾던 메소드는 run_in_executor입니다.이를 통해 이벤트 루프를 차단하지 않고 동기화 기능을 비동기식으로 실행할 수 있습니다.

에서sleep위에 올린 예는 다음과 같습니다.

import asyncio
from time import sleep

async def sleep_async(loop, delay):
    # None uses the default executor (ThreadPoolExecutor)
    await loop.run_in_executor(None, sleep, delay)
    return 'I slept asynchronously'

또한 다음 답변을 참조하십시오. -> 코루틴이 예상되는 정상 함수를 어떻게 부르나요?

장식자를 사용하여 동기화 버전을 비동기 버전으로 래핑할 수 있습니다.

import time
from functools import wraps, partial


def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run

@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'

오래됨, 유지 보수 모드임

또는 aioify 라이브러리를 사용합니다.

% pip install aioify

그리고나서

@aioify
def sleep_async(delay):
    pass

python 3.9부터 가장 깨끗한 방법은 asyncio를 사용하는 것입니다.to_module 메서드는 기본적으로 다음에 대한 바로 가기입니다.run_in_executor모든 컨텍스트 변수를 유지합니다.

또한 To_thread이므로 GIL을 고려해주시기 바랍니다.다음과 같은 작업에 대해 CPU 바인딩 작업을 실행할 수 있습니다.numpy문서에서:

Note Due to the GIL, asyncio.to_thread() can typically only be used to make IO-bound functions non-blocking. However, for extension modules that release the GIL or alternative Python implementations that don’t have one, asyncio.to_thread() can also be used for CPU-bound functions.

아마도 누군가가 이 문제에 대한 나의 해결책을 필요로 할 것입니다.저는 이것을 해결하기 위해 저만의 라이브러리를 썼습니다. 장식기를 사용하여 어떤 기능이든 비동기식으로 만들 수 있게 해줍니다.

라이브러리를 설치하려면 다음 명령을 실행합니다.

$ pip install awaits

함수를 비동기식으로 만들려면 다음과 같이 @waitable 데코레이터를 추가하면 됩니다.

import time
import asyncio
from awaits.awaitable import awaitable

@awaitable
def sum(a, b):
  # heavy load simulation
  time.sleep(10)
  return a + b

이제 기능이 실제로 비동기 코루틴인지 확인할 수 있습니다.

print(asyncio.run(sum(2, 2)))

후드 아래에서 당신의 기능은 스레드 풀에서 실행될 것입니다.이 스레드 풀은 함수를 호출할 때마다 다시 생성되지 않습니다.스레드 풀은 한 번 생성되고 대기열을 통해 새 작업을 수락합니다.스레드를 추가로 만드는 것은 추가적인 오버헤드이기 때문에 다른 솔루션을 사용하는 것보다 프로그램을 더 빨리 실행할 수 있습니다.

데코레이터는 이 경우에 유용하고 차단 기능을 다른 스레드에서 실행합니다.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Union

class to_async:

    def __init__(self, *, executor: Optional[ThreadPoolExecutor]=None):
       
        self.executor =  executor
    
    def __call__(self, blocking):
        @wraps(blocking)
        async def wrapper(*args, **kwargs):

            loop = asyncio.get_event_loop()
            if not self.executor:
                self.executor = ThreadPoolExecutor()

            func = partial(blocking, *args, **kwargs)
        
            return await loop.run_in_executor(self.executor,func)

        return wrapper

@to_async(executor=None)
def sync(*args, **kwargs):
    print(args, kwargs)
   
asyncio.run(sync("hello", "world", result=True))

너무 늦었는지는 모르겠지만 장식기를 사용하여 실에서 기능을 수행할 수도 있습니다.그러나 Co-op 차단인 비동기식과 달리 여전히 Non-Coop 차단입니다.

def wrap(func):
    from concurrent.futures import ThreadPoolExecutor
    pool=ThreadPoolExecutor()
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        future=pool.submit(func, *args, **kwargs)
        return asyncio.wrap_future(future)
    return run

언급URL : https://stackoverflow.com/questions/43241221/how-can-i-wrap-a-synchronous-function-in-an-async-coroutine

반응형