Python 並發程式設計 (二):AsyncIO 與 async/await (第 23 章)

站主自己的課程,請大家支持
揭秘站長的架站心法:如何利用 Hugo × AI 打造高質感個人品牌網站? 揭秘站長的架站心法:如何利用 Hugo × AI 打造高質感個人品牌網站?
  • Post by
  • Jan 06, 2024
post-thumb

在上一章我們提到,Threading 適合 I/O 密集型任務。但在超高並發場景 (例如同時處理 10,000 個連線) 下,開啟 10,000 個執行緒的記憶體開銷與 Context Switch 成本是非常驚人的。

於是,AsyncIO (非同步 I/O) 誕生了。它的核心思想是:我們只需要一個執行緒 (Single Thread),透過 Event Loop (事件迴圈) 高速切換任務,就能達到並發效果。

這就像一個超級厲害的快餐店店員,他在等待薯條炸好的空檔,轉身去幫另一個客人點餐,完全不浪費任何等待時間。

1. 核心概念

  1. Coroutine (協程):可以被暫停和恢復執行的函數 (使用 async def 定義)。
  2. Await:等待一個 Coroutine 完成,同時釋放控制權讓 Event Loop 去做別的事。
  3. Event Loop:大總管,負責排程所有的 Coroutine。

2. 第一個 AsyncIO 程式

import asyncio
import time

# 定義一個 Coroutine
async def say_after(delay, what):
    print(f"等待 {delay} 秒...")
    # 這裡不能用 time.sleep() (它是同步的,會卡死整個執行緒)
    # 必須用 await asyncio.sleep() (非同步等待)
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"開始於 {time.strftime('%X')}")

    # 建立兩個任務同時跑
    # asyncio.create_task 會將協程排入 Event Loop 立即排程
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))

    # 等待兩個任務完成
    await task1
    await task2

    print(f"結束於 {time.strftime('%X')}")

# 執行 main 協程 (Python 3.7+)
asyncio.run(main())

輸出結果:

開始於 10:00:00
等待 1 秒...
等待 2 秒...
hello  (在 10:00:01 出現)
world  (在 10:00:02 出現)
結束於 10:00:02

注意:總執行時間約為 2 秒,而不是 1+2=3 秒!

3. 同步 vs 非同步的陷阱

新手最常犯的錯,就是在 async 函數裡面呼叫了同步的阻塞函數 (Blocking Function)。

# 錯誤示範!
async def bad_coroutine():
    # 這會卡死整個 Event Loop,其他幾千個連線都會被暫停!
    time.sleep(5) 
    # 或者是 requests.get(...) 也是同步的,不該在 async 中使用

在 AsyncIO 世界中,一切等待都必須是 awaitable

  • time.sleep() ❌ -> await asyncio.sleep()
  • requests.get() ❌ -> aiohttphttpx (非同步庫) ✅
  • open(file) ❌ -> aiofiles

4. 實戰:並發下載網頁

我們使用 aiohttp 套件 (需另外安裝 pip install aiohttp) 來示範真正的非同步爬蟲。

import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["http://example.com" for _ in range(10)]
    
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.create_task(fetch_url(session, url))
            tasks.append(task)
        
        # gather 會同時等待所有任務完成,並收集結果
        responses = await asyncio.gather(*tasks)
        print(f"成功下載 {len(responses)} 個頁面")

start = time.time()
asyncio.run(main())
print(f"花費時間: {time.time() - start:.2f} 秒")

這段程式碼即便下載 100 個網頁,花費的時間也只取決於最慢的那一個網頁響應時間,幾乎是瞬間發出所有請求。

5. 總結

比較ThreadingAsyncIO
並發模型Pre-emptive Multitasking (作業系統決定何時切換)Cooperative Multitasking (程式自己決定何時切換 await)
Race Condition容易發生,需用 Lock較少發生,除非在 await 斷點間共享狀態
適用場景傳統 I/O、整合既有同步庫高並發 Web Server (如 FastAPI, Node.js 風格應用)

AsyncIO 是現代 Python 開發者 (特別是 Web 後端) 的必備技能。有了它,Python 也能寫出效能驚人的網頁伺服器!

下一章,我們將學習如何與外部世界溝通——使用 HTTP Requests 呼叫 API


延伸閱讀

LATEST POST
TAG