asyncioの非同期処理を試す

こんにちは、物件連動チームのコヤマです。

本記事では、Pythonが提供する非同期処理ライブラリのasyncioを学習します。非同期処理を理解して使用できるようになることが目標です!

同期処理と非同期処理

同期処理

同期処理は、プログラムがある処理を実行している間、その処理が終わるまで次の処理を開始ません。

処理が完了するまで待ってから次の処理を実行するので、処理の完了までに時間がかかる場合は待ち時間があります。

非同期処理

非同期処理は、処理の完了を待たずに次の処理を開始する方式です。

非同期処理は待機時間を他の処理の実行に割り当てることができるので、複数の処理を同時に実行しているようになります。 それにより、無駄な待ち時間を減らすことができ、速度向上が期待できます。

asyncioを使ってみる

asyncioとは

asyncioは、Pythonが提供する非同期I/O(入出力)を扱うためのライブラリです。asyncioを使用することで、非同期処理を簡単に実現することができます。

用語理解

asyncioを使う際には、以下の用語を知ることが大事です。

  • イベントループ (event loop):イベントループは、コルーチンの実行を管理する仕組みです。イベントループは、コルーチンを登録・実行し、他のコルーチンに制御を渡すことができます。これにより、複数のタスクが効率的に実行されます
  • コルーチン (coroutine):コルーチンは、一時停止・再開が可能な関数のことを指します。コルーチンは、非同期処理の途中で他のタスクに制御を渡すことができます。これにより、複数のタスクが効率的に実行されます。
  • async def:async defは、コルーチン関数を定義するためのキーワードです。関数定義の前にasyncを付けることで、その関数はコルーチン関数となります。
  • await:awaitは、コルーチンを一時停止・再開するためのキーワードです。awaitの後に続く待ち時間のある処理が完了するまで、現在のコルーチンは一時停止されます。一時停止している間に他のタスクに制御を渡すことで、非同期処理を実現します。処理が完了すると再開されます。

以下のような物件データファイル(property_data.json)を読み込んで各物件の賃料を変更する処理を、通常の同期処理とasyncioを用いた非同期処理で書いてみます。

# property_data.json
[
    {"id": 1, "name": "宇宙ハイツ", "chinryo": 120000},
    {"id": 2, "name": "コーポUSA", "chinryo": 90000},
    {"id": 3, "name": "サムライアパート", "chinryo": 75000},
    {"id": 4, "name": "キリンハイツ", "chinryo": 110000},
    {"id": 5, "name": "コーポコーポ", "chinryo": 95000},
    {"id": 6, "name": "ゴリラレジデンス", "chinryo": 145000},
    {"id": 7, "name": "ハイツ青空", "chinryo": 80000},
    {"id": 8, "name": "令和コーポ", "chinryo": 55000},
    {"id": 9, "name": "アパート横綱", "chinryo": 125000},
    {"id": 10, "name": "ホワイトレジデンス", "chinryo": 150000}
]

同期処理の例

import time
import json

class SampleClient:
    def update_property_data(self, property, new_chinryo):
        # 物件情報更新apiを叩く想定。
        print(f"updating...{property['id']}:{property['name']}")
        time.sleep(1)
        print(f"done!...{property['id']}:{property['name']}")

def load_properties(file_path):
    with open(file_path, "r") as f:
        return json.load(f)

def main():
    client = SampleClient()

    properties = load_properties("property_data.json")

    for property in properties:
        new_chinryo = property["chinryo"] * 1.1
        client.update_property_data(property, new_chinryo)

# 同期処理を測定
start_sync = time.time()
main()
end_sync = time.time()
print("========================================")
print(f"同期処理の実行時間: {end_sync - start_sync} 秒")
print("========================================")

以下実行結果です。各物件の賃料を更新するために1秒スリープされ、処理が順次実行されるので約10秒かかりました。

updating...1:宇宙ハイツ
done!...1:宇宙ハイツ
updating...2:コーポUSA
done!...2:コーポUSA
..省略..
updating...10:ホワイトレジデンス
done!...10:ホワイトレジデンス
========================================
同期処理の実行時間: 10.035651922225952 秒
========================================

非同期処理の例

import asyncio
import time
import json

class SampleClient:
    async def update_property_data_async(self, property, new_chinryo):
        print(f"updating...{property['id']}:{property['name']}")
        await asyncio.sleep(1)  # awaitで処理を待っている間に他のタスクを実行する
        print(f"done!...{property['id']}:{property['name']}")
    
        
def load_properties(filename):
    with open(filename, "r") as f:
        properties = json.load(f)
    return properties

async def update_properties_async(properties):
    client = SampleClient()

    tasks = []
    for property in properties:
        new_chinryo = property["chinryo"] * 1.1
        task = asyncio.create_task(client.update_property_data_async(property, new_chinryo))
        tasks.append(task)

    await asyncio.gather(*tasks)

async def main_async():
    properties = load_properties("property_data.json")
    await update_properties_async(properties)

    
# 非同期処理を測定
start_async = time.time()
asyncio.run(main_async())
end_async = time.time()
print("========================================")
print(f"非同期処理の実行時間: {end_async - start_async} 秒")
print("========================================")

以下実行結果です

updating...1:宇宙ハイツ
updating...2:コーポUSA
updating...3:サムライアパート
updating...4:キリンハイツ
updating...5:コーポコーポ
updating...6:ゴリラレジデンス
updating...7:ハイツ青空
updating...8:令和コーポ
updating...9:アパート横綱
updating...10:ホワイトレジデンス
done!...1:宇宙ハイツ
done!...2:コーポUSA
done!...3:サムライアパート
done!...4:キリンハイツ
done!...5:コーポコーポ
done!...6:ゴリラレジデンス
done!...7:ハイツ青空
done!...8:令和コーポ
done!...9:アパート横綱
done!...10:ホワイトレジデンス
========================================
非同期処理の実行時間: 1.004033088684082 秒
========================================

sleep待ちをしてる間に、別のタスクを実行することで高速で完了しました。すごい。

ちなみに

以下のように、タスクをまとめて実行する際にawaitを使用しない場合どうなるか試してみました。

async def update_properties_async(properties):
    client = SampleClient()

    tasks = []
    for property in properties:
        new_chinryo = int(property["chinryo"]) * 1.1
        task = asyncio.create_task(client.update_property_data_async(property, new_chinryo))
        tasks.append(task)

    # await asyncio.gather(*tasks)
    asyncio.gather(*tasks) # awaitを使用しない
========================================
非同期処理の実行時間: 0.0021240711212158203 秒
========================================
updating...1:宇宙ハイツ
updating...2:コーポUSA
updating...3:サムライアパート
updating...4:キリンハイツ
updating...5:コーポコーポ
updating...6:ゴリラレジデンス
updating...7:ハイツ青空
updating...8:令和コーポ
updating...9:アパート横綱
updating...10:ホワイトレジデンス

一見、さらに速くなったように見えますが、実際には実行時間結果が先に出力されており、done!…がありません。

実行時にawaitを使用しないと、asyncio.gather()がコルーチンオブジェクトを返すだけで、実際にタスクが完了するのを待たずに処理が続行されるようです。

それにより、タスクが途中で終了してしまいました。なるほど..!

終わりに

簡単なサンプルですが、非同期処理の力を実感できました。これから大量のデータ処理をする機会が増えると思うので活用していきたいです。非同期、怖くない!