Реализация асинхронности в Python с модулем asyncio

Асинхронное программирование — это особенность современных языков программирования, которая позволяет выполнять операции, не дожидаясь их завершения. Асинхронность — одна из важных причин популярности Node.js.

Представьте приложение для поиска по сети, которое открывает тысячу соединений. Можно открывать соединение, получать результат и переходить к следующему, двигаясь по очереди. Однако это значительно увеличивает задержку в работе программы. Ведь открытие соединение — операция, которая занимает время. И все это время последующие операции находятся в процессе ожидания.

А вот асинхронность предоставляет способ открытия тысячи соединений одновременно и переключения между ними. По сути, появляется возможность открыть соединение и переходить к следующему, ожидая ответа от первого. Так продолжается до тех пор, пока все не вернут результат.

использовании асинхронности

На графике видно, что синхронный подход займет 45 секунд, в то время как при использовании асинхронности время выполнения можно сократить до 20 секунд.

Где асинхронность применяется в реальном мире?

Асинхронность больше всего подходит для таких сценариев:

  1. Программа выполняется слишком долго.
  2. Причина задержки — не вычисления, а ожидания ввода или вывода.
  3. Задачи, которые включают несколько одновременных операций ввода и вывода.

Это могут быть:

  • Парсеры,
  • Сетевые сервисы.

Разница в понятиях параллелизма, concurrency, поточности и асинхронности

Параллелизм — это выполнение нескольких операций за раз. Многопроцессорность — один из примеров. Отлично подходит для задач, нагружающих CPU.

Concurrency — более широкое понятие, которое описывает несколько задач, выполняющихся с перекрытием друг друга.

Поточность — поток — это отдельный поток выполнения. Один процесс может содержать несколько потоков, где каждый будет работать независимо. Отлично подходит для IO-операций.

Асинхронность — однопоточный, однопроцессорный дизайн, использующий многозадачность. Другими словами, асинхронность создает впечатление параллелизма, используя один поток в одном процессе.

Составляющие асинхронного программирования

Разберем различные составляющие асинхронного программирования подробно. Также используем код для наглядности.

Сопрограммы

Сопрограммы (coroutine) — это обобщенные формы подпрограмм. Они используются для кооперативных задач и ведут себя как генераторы Python.

Для определения сопрограммы асинхронная функция использует ключевое слово await. При его использовании сопрограмма передает поток управления обратно в цикл событий (также известный как event loop).

Для запуска сопрограммы нужно запланировать его в цикле событий. После этого такие сопрограммы оборачиваются в задачи (Tasks) как объекты Future.

Пример сопрограммы

В коде ниже функция async_func вызывается из основной функции. Нужно добавить ключевое слово await при вызове синхронной функции. Функция async_func не будет делать ничего без await.

import asyncio


async def async_func():
    print('Запуск ...')
    await asyncio.sleep(1)
    print('... Готово!')


async def main():
    async_func()  # этот код ничего не вернет 
    await async_func()


asyncio.run(main())

Вывод:

Warning (from warnings module):
  File "\AppData\Local\Programs\Python\Python38\main.py", line 8
    async_func() # этот код ничего не вернет
RuntimeWarning: coroutine 'async_func' was never awaited
Запуск ...
... Готово!

Задачи (tasks)

Задачи используются для планирования параллельного выполнения сопрограмм.

При передаче сопрограммы в цикл событий для обработки можно получить объект Task, который предоставляет способ управления поведением сопрограммы извне цикла событий.

Пример задачи

В коде ниже создается create_task (встроенная функция библиотеки asyncio), после чего она запускается.

import asyncio


async def async_func():
    print('Запуск ...')
    await asyncio.sleep(1)
    print('... Готово!')


async def main():
    task = asyncio.create_task (async_func())
    await task

asyncio.run(main())

Вывод:

Запуск ...
... Готово!

Циклы событий

Этот механизм выполняет сопрограммы до тех пор, пока те не завершатся. Это можно представить как цикл while True, который отслеживает сопрограммы, узнавая, когда те находятся в режиме ожидания, чтобы в этот момент выполнить что-нибудь другое.

Он может разбудить спящую сопрограмму в тот момент, когда она ожидает своего времени, чтобы выполниться. В одно время может выполняться лишь один цикл событий в Python.

Пример цикла событий

Дальше создаются три задачи, которые добавляются в список. Они выполняются асинхронно с помощью get_event_loop, create_task и await библиотеки asyncio.

import asyncio


async def async_func(task_no):
    print(f'{task_no}: Запуск ...')
    await asyncio.sleep(1)
    print(f'{task_no}: ... Готово!')


async def main():
    taskA = loop.create_task (async_func('taskA'))
    taskB = loop.create_task(async_func('taskB'))
    taskC = loop.create_task(async_func('taskC'))
    await asyncio.wait([taskA,taskB,taskC])


if __name__ == "__main__":
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    except :
        pass

Вывод:

taskA: Запуск ...
taskB: Запуск ...
taskC: Запуск ...
taskA: ... Готово!
taskB: ... Готово!
taskC: ... Готово!

Future

Future — это специальный низкоуровневый объект, который представляет окончательный результат выполнения асинхронной операции.

Если этот объект подождать (await), то сопрограмма дождется, пока Future не будет выполнен в другом месте.

В следующих разделах посмотрим, на то, как Future используется.


Сравнение многопоточности и асинхронности

Прежде чем переходить к асинхронности попробуем проверить многопоточность на производительность и сравним результаты. Для этого теста будем получать данные по URL с разной частотой: 1, 10, 50, 100 и 500 раз соответственно. После этого сравним производительность обоих подходов.

Реализация

Многопоточность:

import requests
import time
from concurrent.futures import ProcessPoolExecutor


def fetch_url_data(pg_url):
    try:
        resp = requests.get(pg_url)
    except Exception as e:
        print(f"Возникла ошибка при получении данных из url: {pg_url}")
    else:
        return resp.content
        

def get_all_url_data(url_list):
    with ProcessPoolExecutor() as executor:
        resp = executor.map(fetch_url_data, url_list)
    return resp
    

if __name__=='__main__':
    url = "https://www.uefa.com/uefaeuro-2020/"
    for ntimes in [1,10,50,100,500]:
        start_time = time.time()
        responses = get_all_url_data([url] * ntimes)
        print(f'Получено {ntimes} результатов запроса за {time.time() - start_time} секунд')

Вывод:

Получено 1 результатов запроса за 0.9133939743041992 секунд
Получено 10 результатов запроса за 1.7160518169403076 секунд
Получено 50 результатов запроса за 3.842841625213623 секунд
Получено 100 результатов запроса за 7.662721633911133 секунд
Получено 500 результатов запроса за 32.575703620910645 секунд

ProcessPoolExecutor — это пакет Python, который реализовывает интерфейс Executor. fetch_url_data — функция для получения данных по URL с помощью библиотеки request. После получения get_all_url_data используется, чтобы замапить function_url_data на список URL.

Асинхронность:

import asyncio
import time
from aiohttp import ClientSession, ClientResponseError


async def fetch_url_data(session, url):
    try:
        async with session.get(url, timeout=60) as response:
            resp = await response.read()
    except Exception as e:
        print(e)
    else:
        return resp
    return


async def fetch_async(loop, r):
    url = "https://www.uefa.com/uefaeuro-2020/"
    tasks = []
    async with ClientSession() as session:
        for i in range(r):
            task = asyncio.ensure_future(fetch_url_data(session, url))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
    return responses


if __name__ == '__main__':
    for ntimes in [1, 10, 50, 100, 500]:
        start_time = time.time()
        loop = asyncio.get_event_loop()
        future = asyncio.ensure_future(fetch_async(loop, ntimes))
        # будет выполняться до тех пор, пока не завершится или не возникнет ошибка
        loop.run_until_complete(future)
        responses = future.result()
        print(f'Получено {ntimes} результатов запроса за {time.time() - start_time} секунд')

Вывод:

Получено 1 результатов запроса за 0.41477298736572266 секунд
Получено 10 результатов запроса за 0.46897053718566895 секунд
Получено 50 результатов запроса за 2.3057644367218018 секунд
Получено 100 результатов запроса за 4.6860511302948 секунд
Получено 500 результатов запроса за 18.013994455337524 секунд

Нужно использовать функцию get_event_loop для создания и добавления задач. Чтобы использовать более одного URL, нужно применить функцию ensure_future.

Функция fetch_async используется для добавления задачи в объект цикла событий, а fetch_url_data — для чтения данных URL с помощью пакета session. Метод future_result возвращает ответ всех задач.

Результаты

Как можно увидеть, асинхронное программирование на порядок эффективнее многопоточности для этой программы.

Выводы

Асинхронное программирование демонстрирует более высокие результаты в плане производительности, задействуя параллелизм, а не многопоточность. Его стоит использовать в тех программах, где этот параллелизм можно применить.

Максим
Я создал этот блог в 2018 году, чтобы распространять полезные учебные материалы, документации и уроки на русском. На сайте опубликовано множество статей по основам python и библиотекам, уроков для начинающих и примеров написания программ.
Мои контакты: Почта
admin@pythonru.comAlex Zabrodin2018-10-26OnlinePython, Programming, HTML, CSS, JavaScript