본문 바로가기

Engineering

파이썬 비동기 프로그래밍에서 작업 큐 사용하기

이전 글에 이어서, 비동기 프로그래밍에서 작업 큐를 사용하는 방법에 대해 알아보려고 한다. 먼저, 작업 큐를 사용하는 이유에 대해서 생각해보기 위해 아래 예시를 살펴보자.

import asyncio

async def perform_task(num: int):
  await asyncio.sleep(1)
  return num + 1
  
async def main():
  tasks = [perform_task(num) for num in range(1000)]
  futures = asyncio.gather(tasks)
  return futures
  
asyncio.run(main())

위 코드는 1000개의 테스크를 동시에 처리하는 코드이다. 문제는, 1000개를 동시에 처리할 수 없는 제한 조건이 있는 경우가 많다는 것이다. 예를 들어, 특정 API가 최대 10개의 요청만 동시에 처리할 수 있어서 동시에 1000개의 요청을 날릴 경우, 대부분은 타임아웃 예외가 발생하게 되는 경우가 있다. 이런 경우, 작업 큐를 사용해 동시에 처리하는 테스크 수를 제한하면 쉽게 처리할 수 있다.

작업 큐 기본 문법

감사하게도, 작업 큐는 파이썬 공식 모듈에 비동기 프로그래밍을 고려하여 잘 만들어져 있다. 이 모듈을 활용해 작업 큐를 만드는 간단한 방법을 시도해보자.

공식 문서: https://docs.python.org/3/library/asyncio-queue.html#asyncio-queues

 

Queues

Source code: Lib/asyncio/queues.py asyncio queues are designed to be similar to classes of the queue module. Although asyncio queues are not thread-safe, they are designed to be used specifically i...

docs.python.org

import asyncio

async def worker(name, queue):
  while True:
    sleep_for = await queue.get()
    await asyncio.sleep(sleep_for)
    queue.task_done()
    print(f"{name} has slept for {sleep_for:.2f}")
    
async def main():
  queue = asyncio.Queue()
  
  for sleep_for in range(20):
    queue.put_nowait(sleep_for)
  
  workers = [
    asyncio.create_task(worker(f"worker-{i}", queue))
    for i in range(3)
  ]
  
  await queue.join()
  
  for worker in workers:
    worker.cancel()
  await asyncio.gather(*workers, return_exceptions=True)
  print("END")
  
asyncio.run(main())

위 코드는 총 20개의 테스크가 있고, 이를 3개의 워커가 처리하는 구조로 구성되어 있다. 각 워커들은 하나의 큐를 공유해서 사용한다. 아래에서 코드를 하나하나씩 분석해본다.

queue = asyncio.Queue()

큐를 생성하는 방법이다. 이 큐를 통해서 워커들이 처리해야 할 데이터들을 전달받는다. 구체적으로, 메인 함수에서 큐에다가 데이터를 제공하면, 여러 워커 중 한 워커가 제공된 데이터를 받아서 처리한다.

for sleep_for in range(20):
  queue.put_nowait(sleep_for)

메인 함수에서 큐에다가 20개의 데이터를 집어넣는 연산이다. put_nowait는 큐에 포함된 하나의 함수로 큐의 크기가 지정되어있고, 그 큐가 꽉차있을 때에는 큐가 비는 것을 기다리지 않고 QueueFull이라는 예외를 발생시킨다. 현재는 큐의 크기를 지정하지 않았으므로, 어떠한 경우에도 예외가 발생하지 않고 큐에 데이터를 바로 채운다.

workers = [
  asyncio.create_task(worker(f"worker-{i}", queue))
  for i in range(3)
]

큐에 제공되는 데이터를 처리할 워커를 만든다. 워커 함수는 비동기 함수여야 하며, 인자로 큐를 받아 다중 워커가 큐를 공유하는 상태를 구성한다. 이 코드에서는 총 3개의 워커를 만들어 큐를 공유한다.

await queue.join()

이는 큐에 포함된 데이터들이 모두 처리될 때까지 기다리겠다는 의미이다. 데이터들이 처리되었다는 사실은 큐 내부에서 관리되고 있는 카운터로 추적한다. 카운터는 큐에 put이나 put_nowait 함수 호출을 통해 데이터가 추가될 때, 1씩 올라가고 task_done 함수 호출을 통해 데이터가 처리되었음을 인지하고 1씩 내려간다. 이 카운터가 0이 될 때까지 기다리는 함수가 join이다.

async def worker(name, queue):
  while True:
    sleep_for = await queue.get()
    await asyncio.sleep(sleep_for)
    queue.task_done()
    print(f"{name} has slept for {sleep_for:.2f}")

워커의 동작 흐름이다. 파이썬 공식 문서에서는 주로 데이터를 처리하는 컨슈머 (우리 입장에서는 워커)는 큐로부터 get이나 get_nowait를 통해 데이터를 얻은 뒤, 그 데이터를 모두 처리하고 task_done 함수를 호출하는 형태로 코드를 작성하라고 추천한다. 만약, task_done이 put이나 put_nowait 함수보다 더 자주 호출될 경우, ValueError를 발생시킬 것이므로 정말 특별한 경우가 아닌 이상 공식 문서에서 추천하는 바대로 코드를 작성하는 것이 좋을 것 같다. 위 워커는 매번 데이터를 큐로부터 전달받아 처리한 다음, task_done으로 해당 데이터의 처리가 끝났음을 큐에 알린다.

for worker in workers:
  worker.cancel()

이는 큐에 적재된 데이터의 처리가 모두 끝난 뒤, 워커에 cancel을 호출해 워커를 종료시킨다. 구체적으로, cancel을 호출하면 워커 함수에 CancelledError 예외를 발생시킨다. 따라서, 워커가 해당 예외처리를 하지 않을 경우, 워커는 종료되고 발생된 예외를 워커를 기다리는 함수에 비동기적으로 전달하게 된다.

await asyncio.gather(*workers, return_exceptions=True)

마지막으로, 워커를 asyncio.gather를 통해서 모두 await를 진행한다. 좀전에 모든 워커에 CancelledError 예외를 발생시켰으므로, 워커의 반환값 대신 CancelledError 예외가 넘어오게 되는데 이 또한 처리할 수 있게 return_exceptions=True를 설정한다.

결론

간단하게 작업 큐를 사용하는 방법에 대해서 알아보았다. 큐를 사용하면 워커를 통해 동시에 실행되는 갯수를 통제할 수 있다는 장점이 있다. 이 때, 작업 큐를 통해 처리된 데이터를 또 다른 큐를 활용해 다시 수집할 수도 있다. 이런 점들을 잘 활용해 다양한 비동기 프로그래밍을 해보자.