Асинхронное приложение ч.2 / tkinter 16

Скачайте код уроков с GitLab: https://gitlab.com/PythonRu/tkinter-uroki

Выполнение HTTP-запросов

Общение приложения с удаленным сервером с помощью HTTP — это распространенный случай в асинхронном программирования. Клиент делает запрос, который передается по сети по протоколу TCP/IP. После этого сервер обрабатывает информацию и отправляет клиенту ответ.

Время выполнения операции может варьироваться от нескольких миллисекунд до секунд, но в большинстве случаев стоит предполагать, что пользователи способны заметить задержку.

Есть много сторонних веб-сервисов, которые можно использовать на этапе разработки в целях прототипирования. Однако лучше этого не делать, ведь их API может поменяться или же они вообще станут недоступны.

В этом примере реализуем HTTP-сервер, который генерирует случайный ответ в формате JSON и выведем его в приложении с графическим интерфейсом.


import time
import json
import random
from http.server import HTTPServer, BaseHTTPRequestHandler

class RandomRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# Имитация задержки
time.sleep(3)

# Добавляем заголовки ответа
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()

# Добавляем тело ответа
body = json.dumps({'random': random.random()})
self.wfile.write(bytes(body, "utf8"))

def main():
"""Запускает HTTP-сервер на порту 8090"""
server_address = ('', 8090)
httpd = HTTPServer(server_address, RandomRequestHandler)
httpd.serve_forever()

if __name__ == "__main__":
main()

Для запуска сервера нужно выполнить скрипт server.py и оставить процесс запущенным для получения запросов на локальном порте 8090.

Клиентское приложение состоит из метки для показа информации пользователям и кнопки для выполнения нового HTTP-запроса на локальный сервер:


import json
import threading
import urllib.request
import tkinter as tk

class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("Выполнение HTTP-запросов")
self.label = tk.Label(self, text="Нажмите 'Старт', чтобы получить случайное значение.")
self.button = tk.Button(self, text="Старт",
command=self.start_action)

self.label.pack(padx=60, pady=10)
self.button.pack(pady=10)

def start_action(self):
self.button.config(state=tk.DISABLED)
thread = AsyncAction()
thread.start()
self.check_thread(thread)

def check_thread(self, thread):
if thread.is_alive():
self.after(100, lambda: self.check_thread(thread))
else:
text = "Случайное значение: {}".format(thread.result)
self.label.config(text=text)
self.button.config(state=tk.NORMAL)

class AsyncAction(threading.Thread):
def run(self):
self.result = None
url = "http://localhost:8090"
with urllib.request.urlopen(url) as f:
obj = json.loads(f.read().decode("utf-8"))
self.result = obj["random"]

if __name__ == "__main__":
app = App()
app.mainloop()

После завершения запроса метка покажем случайное значение, которое было сгенерировано на сервере:

Выполнение HTTP-запросов

При выполнении асинхронной операции кнопка традиционно будет становиться неактивной, что не позволит сделать новый запрос до выполнения текущего.

Как работают HTTP-запросы

В этом примере класс Thread был расширен для реализации логики, которая должна работать на отдельном потоке с применением более объектно-ориентированного подхода. Это делается за счет переопределения метода run(), который будет отвечать за выполнение HTTP-запроса на локальный сервер:


class AsyncAction(threading.Thread):
def run(self):
# ...

Существует множество клиентских HTTP-библиотек, но в этом примере используем модуль urllib.request из стандартной библиотеки. Он включает функцию urlopen(), которая принимает URL в виде строки и возвращает HTTP-ответ, который может работать как контекстный менеджер. Это позволит безопасно прочитать информацию и закрыть его с помощью with.

Сервер возвращает приблизительной такой JSON-документ (увидеть его можно, открыв http://localhost:8080 в бразуере):

{"random": 0.0915826359180778}

Чтобы декодировать строку в объект, нужно передать содержимое ответа функции loads() из модуля json. Благодаря этому можно получить доступ к случайному значению с помощью словаря и сохранить его в атрибуте result, экземпляр которого создается со значением None. Благодаря этому основной поток не будет считывать этот атрибут в случае ошибки:


def run(self):
self.result = None
url = "http://localhost:8090"
with urllib.request.urlopen(url) as f:
obj = json.loads(f.read().decode("utf-8"))
self.result = obj["random"]

После этого приложение с графическим интерфейсом периодически опрашивает статус потока, как было видно в прошлом примере:


def check_thread(self, thread):
if thread.is_alive():
self.after(100, lambda: self.check_thread(thread))
else:
text = "Random value: {}".format(thread.result)
self.label.config(text=text)
self.button.config(state=tk.NORMAL)

Основное отличие в том, что когда поток не активен, можно получить значение result, потому что оно было задано до завершения выполнения.

Соединение потоков прогрессбаром

Шкалы прогресса — это удобные индикаторы статуса фоновых задач, которые показывают пропорционально заполняемую часть шкалы относительно общего прогресса. Обычно они используются в тех операциях, выполнение которых занимает много времени. Распространенная практика — объединять их с потоками, которые выполняют эти задачи для предоставления визуальной обратной связи для конечных пользователей.

Приложение будет состоять из горизонтальной шкалы прогресса, которая будет постепенно увеличивать количество прогресса после нажатия на кнопку Старт:

Соединение потоков прогрессбаром

Для симуляции выполнения фоновой задачи инкремент шкалы будет генерироваться в другом потоке, который должен приостанавливать выполнение на 1 секунду для каждого шага.

Коммуникация будет настроена с помощью синхронизированной очереди, которая позволяет обмениваться информации, сохраняя потокобезопасность:


import time
import queue
import threading
import tkinter as tk
import tkinter.ttk as ttk
import tkinter.messagebox as mb

class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("Пример Progressbar")
self.queue = queue.Queue()
self.progressbar = ttk.Progressbar(self, length=300,
orient=tk.HORIZONTAL)
self.button = tk.Button(self, text="Старт",
command=self.start_action)

self.progressbar.pack(padx=10, pady=10)
self.button.pack(padx=10, pady=10)

def start_action(self):
self.button.config(state=tk.DISABLED)
thread = AsyncAction(self.queue, 20)
thread.start()
self.poll_thread(thread)

def poll_thread(self, thread):
self.check_queue()
if thread.is_alive():
self.after(100, lambda: self.poll_thread(thread))
else:
self.button.config(state=tk.NORMAL)
mb.showinfo("Готово!", "Асинхронное действие завершено")

def check_queue(self):
while self.queue.qsize():
try:
step = self.queue.get(0)
self.progressbar.step(step * 100)
except queue.Empty:
pass

class AsyncAction(threading.Thread):
def __init__(self, queue, steps):
super().__init__()
self.queue = queue
self.steps = steps

def run(self):
for _ in range(self.steps):
time.sleep(1)
self.queue.put(1 / self.steps)

if __name__ == "__main__":
app = App()
app.mainloop()

Как работает прогрессбар с потоками

Progressbar — это тематический виджет из модуля tkinter.ttk.

Также нужно импортировать модуль queue, который определяет синхронизированные коллекции, такие как Queue. Синхронность — важная тема в средах с несколькими потоками, потому что если к ресурсам с общим доступом попытаться получить доступ одновременно, результат будет непредсказуемым. Такие маловероятные, но все равно возможные сценарии называются состоянием гонки.

Теперь класс App включает такие новые инструкции:


# ...
import queue
import tkinter.ttk as ttk

class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("Пример Progressbar")
self.queue = queue.Queue()
self.progressbar = ttk.Progressbar(self, length=300,
orient=tk.HORIZONTAL)

Как и в предыдущих примерах метод start_action() запускает поток, передавая queue и количество шагов, которые будут симулировать долгоиграющую задачу:


def start_action(self):
self.button.config(state=tk.DISABLED)
thread = AsyncAction(self.queue, 20)
thread.start()
self.poll_thread(thread)

Подкласс AsyncAction определяет конструктор для получения этих параметров, которые позже будут использованы в методе run():


class AsyncAction(threading.Thread):
def __init__(self, queue, steps):
super().__init__()
self.queue = queue
self.steps = steps

def run(self):
for _ in range(self.steps):
time.sleep(1)
self.queue.put(1 / self.steps)

Цикл приостанавливает выполнение потока на 1 секунду и добавляет инкремент в очередь в зависимости от значения атрибута steps.

Элемент, добавленный в очередь, считывается из экземпляра приложения с помощью чтения очереди из check_queue():


def check_queue(self):
while self.queue.qsize():
try:
step = self.queue.get(0)
self.progressbar.step(step * 100)
except queue.Empty:
pass

Следующий метод периодически вызывается из poll_thread(), который опрашивает статус потока и планирует сам себя с помощью after(), пока поток не завершит выполнение:


def poll_thread(self, thread):
self.check_queue()
if thread.is_alive():
self.after(100, lambda: self.poll_thread(thread))
else:
self.button.config(state=tk.NORMAL)
mb.showinfo("Готово!", "Асинхронное действие завершено")

Отмена запланированных действий

Механизм планирования Tkinter не только предоставляет методы для откладывания выполнения функций обратного вызова, но дает возможность отменять те, которые еще не были выполнены. Это может быть операция, которая занимает много времени. И можно разрешить пользователям остановить ее, нажав на кнопку или закрыв приложение.

Возьмем пример из первого приложения и добавим кнопку Stop, которая позволяет остановить запланированное действие.

Она будет активной только при наличии запланированного действия. Это значит, что после нажатия кнопки слева пользователь может подождать 5 секунд или нажать на Stop, чтобы снова сделать ее доступной:

Отмена запланированных действий

Метод after_cancel() отменяет выполнение запланированного действия, используя идентификатор, который вернулся после вызова after(). В этом примере это значение хранится в атрибуте scheduled_id:


import tkinter as tk

class App(tk.Tk):
def __init__(self):
super().__init__()
self.button = tk.Button(self, command=self.start_action,
text="Подождите 5 секунд")
self.cancel = tk.Button(self, command=self.cancel_action,
text="Стоп", state=tk.DISABLED)
self.button.pack(padx=30, pady=20, side=tk.LEFT)
self.cancel.pack(padx=30, pady=20, side=tk.LEFT)

def start_action(self):
self.button.config(state=tk.DISABLED)
self.cancel.config(state=tk.NORMAL)
self.scheduled_id = self.after(5000, self.init_buttons)

def init_buttons(self):
self.button.config(state=tk.NORMAL)
self.cancel.config(state=tk.DISABLED)

def cancel_action(self):
print("Отмена событий", self.scheduled_id)
self.after_cancel(self.scheduled_id)
self.init_buttons()

if __name__ == "__main__":
app = App()
app.mainloop()

Как происходит отмена событий

Чтобы отменить запланированное действие для функции обратного вызова сперва нужен идентификатор, который возвращает after(). Сохраним его в атрибуте scheduled_id, поскольку он понадобится в отдельном методе:


def start_action(self):
self.button.config(state=tk.DISABLED)
self.cancel.config(state=tk.NORMAL)
self.scheduled_id = self.after(5000, self.init_buttons)

Затем это поле передается в after_callback() обратного вызова кнопки Стоп:


def cancel_action(self):
print("Отмена событий", self.scheduled_id)
self.after_cancel(self.scheduled_id)
self.init_buttons()

Важно отключить кнопку Старт после клика, ведь если нажать ее дважды, переменная scheduled_id будет перезаписана, и кнопка Стоп сможет отменить только последнее запланированное действие.

Также стоит отметить, что after_cancel() не сработает, если вызвать ее без идентификатора действия, которое уже было выполнено.

В этом разделе было рассмотрено, как отменить запланированное действие, но если эта функция опрашивала статус фонового потока, важно знать, как остановить и его тоже.

К сожалению, не существует официального API для остановки экземпляра Thread. При определении кастомного подкласса может потребоваться добавить флаг, который периодически проверяется в методе run():


class MyAsyncAction(threading.Thread):
def __init__(self):
super().__init__()
self.do_stop = False

def run(self):
# Начать выполнение...
if not self.do_stop:
# Продолжить выполнение...

Затем этот флаг может быть изменен внешне с помощью thread.do_stop = True при вызове after_cancel() для остановки и потока.

Этот подход зависит от операций в методе runнапример, его проще будет реализовать при наличии цикла, ведь будет возможность выполнять проверку между итерациями.

А с Python 3.4 можно использовать модуль asyncio, который включает классы и функции для управления асинхронными операциями, включая отмены. Хотя он и не касается этого материала, на него обязательно стоит обратить внимание.

Появились вопросы? Задайте на Яндекс Кью

У блога есть сообщество на Кью, подписывайтесь >> Python Q << и задавайте вопросы. Спрашивайте по контенту, про python и программирование в целом. Обещаю отвечать.

Вам помогла эта статья? Поделитесь в соцсетях или блоге. Репосты помогают сайту развиться.

Тест на знание python

Какой будет результат выполнения кода — print(type(1J)) ?
Что нужно вcтавить после "if", для вывода "x четное число"
Что выведет этот код?
Какой будет результат выполнения кода — print(type(lambda: None)) ?
Какой будет результат выполнения этого кода?
Александр
Я создал этот блог в 2018 году, чтобы распространять полезные учебные материалы, документации и уроки на русском. На сайте опубликовано множество статей по основам python и библиотекам, уроков для начинающих и примеров написания программ.