【asyncpg 教學】#02 連線管理:Connection Pool 實戰

測驗:asyncpg 連線池 Connection Pool 實戰

共 5 題,點選答案後會立即顯示結果

1. 為什麼在高並發的 Web API 中,單一資料庫連線不夠用?

  • A. 因為單一連線無法執行 SQL 查詢
  • B. 因為多個請求必須排隊等待同一條連線,造成效能瓶頸
  • C. 因為 asyncpg 不支援單一連線模式
  • D. 因為單一連線無法使用 async/await 語法

2. 使用 asyncpg 建立連線池時,應該使用哪個函數?

  • A. asyncpg.connect()
  • B. asyncpg.pool()
  • C. asyncpg.create_pool()
  • D. asyncpg.new_pool()

3. 從連線池取得連線的推薦方式是什麼?

  • A. 使用 async with pool.acquire()
  • B. 直接呼叫 pool.get_connection()
  • C. 使用 pool.borrow()
  • D. 每次都呼叫 asyncpg.connect()

4. 關於連線池的 min_sizemax_size 參數,下列敘述何者正確?

  • A. min_size 是連線池最多可建立的連線數
  • B. min_size 是連線池預先建立並保持的最少連線數
  • C. max_size 是連線池啟動時建立的連線數
  • D. 這兩個參數的值必須相同

5. 在 FastAPI 中整合 asyncpg 連線池時,連線池應該在什麼時機建立和關閉?

  • A. 在每個 endpoint 函數內建立和關閉
  • B. 在全域變數宣告時建立,永不關閉
  • C. 在應用程式啟動時建立(lifespan),關閉時釋放
  • D. 由 FastAPI 自動管理,不需要手動處理

前言

在上一篇文章中,我們學會了使用 asyncpg.connect() 建立單一連線。但在真實的應用場景中,單一連線往往不夠用。這篇文章將帶你理解連線池(Connection Pool)的概念,以及如何在 asyncpg 中正確使用它。

學習目標

讀完本篇後,你將能夠:

  • 理解為什麼需要連線池(Connection Pool)
  • 正確建立與配置 asyncpg 連線池
  • 使用 async with 安全地取得與釋放連線
  • 設定連線池的大小與逾時參數

為什麼單一連線不夠用?

假設你有一個 Web API,同時有 100 個使用者發送請求。如果只有一條連線,會發生什麼事?

# 這是有問題的做法
conn = await asyncpg.connect(...)  # 只有一條連線

async def handle_request():
    # 所有請求都搶這一條連線
    result = await conn.fetch("SELECT * FROM users")
    return result
Code language: PHP (php)

問題在於:

  1. 效能瓶頸:100 個請求必須排隊等待同一條連線
  2. 連線斷線風險:如果這條連線斷了,整個服務就掛了
  3. 資源浪費:資料庫明明可以處理多個連線,卻只用一條

Connection Pool 是什麼?

連線池就像是一個「連線的倉庫」:

  • 預先建立多條連線放在倉庫裡
  • 需要時從倉庫借一條出來用
  • 用完歸還,讓其他人可以使用
┌─────────────────────────────────────┐
│           Connection Pool           │
│  ┌────┐ ┌────┐ ┌────┐ ┌────┐       │
│  │conn│ │conn│ │conn│ │conn│ ...   │
│  └────┘ └────┘ └────┘ └────┘       │
└─────────────────────────────────────┘
     ↑         ↑
  acquire   release
     │         │
   使用中    歸還

建立連線池:create_pool()

asyncpg 提供 create_pool() 函數來建立連線池:

import asyncpg
import asyncio

async def main():
    # 建立連線池
    pool = await asyncpg.create_pool(
        host='localhost',
        port=5432,
        user='postgres',
        password='password',
        database='mydb',
        min_size=5,      # 最少保持 5 條連線
        max_size=20,     # 最多建立 20 條連線
    )

    # 使用連線池...

    # 關閉連線池
    await pool.close()

asyncio.run(main())
Code language: PHP (php)

也可以使用連線字串:

pool = await asyncpg.create_pool(
    'postgresql://postgres:password@localhost:5432/mydb',
    min_size=5,
    max_size=20,
)
Code language: JavaScript (javascript)

從連線池取得連線

有兩種方式可以從連線池取得連線:

方式一:使用 async with(推薦)

async with pool.acquire() as conn:
    # 在這個區塊內使用 conn
    result = await conn.fetch("SELECT * FROM users")
    # 離開區塊時,連線自動歸還給連線池
Code language: PHP (php)

這是最安全的方式,因為:

  • 連線一定會被歸還(即使發生例外)
  • 不需要手動管理連線的生命週期

方式二:手動 acquire/release

conn = await pool.acquire()
try:
    result = await conn.fetch("SELECT * FROM users")
finally:
    await pool.release(conn)
Code language: JavaScript (javascript)

這種方式比較容易出錯,建議除非有特殊需求,否則優先使用 async with

連線池的生命週期管理

完整的連線池生命週期如下:

import asyncpg
import asyncio

async def main():
    # 1. 建立連線池
    pool = await asyncpg.create_pool(
        'postgresql://postgres:password@localhost:5432/mydb',
        min_size=2,
        max_size=10,
    )

    try:
        # 2. 使用連線池執行查詢
        async with pool.acquire() as conn:
            rows = await conn.fetch("SELECT * FROM users LIMIT 5")
            for row in rows:
                print(dict(row))
    finally:
        # 3. 關閉連線池(重要!)
        await pool.close()

asyncio.run(main())
Code language: PHP (php)

也可以把連線池本身當作 context manager:

async def main():
    async with asyncpg.create_pool(...) as pool:
        async with pool.acquire() as conn:
            result = await conn.fetch("SELECT 1")
    # 離開區塊時,連線池自動關閉
Code language: PHP (php)

重要參數說明

min_size 與 max_size

pool = await asyncpg.create_pool(
    ...,
    min_size=5,   # 連線池最少保持的連線數
    max_size=20,  # 連線池最多可建立的連線數
)
Code language: PHP (php)
  • min_size:連線池會預先建立這麼多連線,隨時待命
  • max_size:當所有連線都被借走,最多可以再建立到這個數量
  • 當連線閒置過久,會縮減回 min_size

調校建議

  • min_size:設為平時的平均並發數
  • max_size:設為尖峰時期的最大並發數
  • 通常 minsize 設 5-10,maxsize 設 20-50

command_timeout

pool = await asyncpg.create_pool(
    ...,
    command_timeout=60,  # 單位:秒
)
Code language: PHP (php)

單一 SQL 指令的執行時間上限。超過時間會拋出 asyncpg.exceptions.QueryCanceledError

timeout(acquire 逾時)

當所有連線都被借走,新的 acquire() 要等待。可以設定等待的上限:

try:
    # 最多等待 10 秒
    async with pool.acquire(timeout=10) as conn:
        result = await conn.fetch("SELECT 1")
except asyncio.TimeoutError:
    print("無法取得連線,連線池已滿")
Code language: PHP (php)

連線池的狀態監控

asyncpg 連線池提供幾個有用的屬性:

pool = await asyncpg.create_pool(...)

# 查看連線池狀態
print(f"連線池大小: {pool.get_size()}")        # 目前總連線數
print(f"閒置連線數: {pool.get_idle_size()}")   # 可用的連線數
print(f"最小連線數: {pool.get_min_size()}")    # min_size 設定值
print(f"最大連線數: {pool.get_max_size()}")    # max_size 設定值
Code language: PHP (php)

這對於監控和除錯非常有用。

整合 FastAPI 的最佳實踐

在 FastAPI 中使用 asyncpg 連線池的推薦模式:

from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
import asyncpg

# 全域變數存放連線池
pool: asyncpg.Pool | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 啟動時建立連線池
    global pool
    pool = await asyncpg.create_pool(
        'postgresql://postgres:password@localhost:5432/mydb',
        min_size=5,
        max_size=20,
    )
    print("連線池已建立")

    yield  # 應用程式運行中

    # 關閉時釋放連線池
    await pool.close()
    print("連線池已關閉")

app = FastAPI(lifespan=lifespan)

# 依賴注入:取得連線
async def get_conn():
    async with pool.acquire() as conn:
        yield conn

@app.get("/users")
async def get_users(conn: asyncpg.Connection = Depends(get_conn)):
    rows = await conn.fetch("SELECT * FROM users LIMIT 10")
    return [dict(row) for row in rows]

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn: asyncpg.Connection = Depends(get_conn)):
    row = await conn.fetchrow(
        "SELECT * FROM users WHERE id = $1",
        user_id
    )
    if row is None:
        return {"error": "User not found"}
    return dict(row)
Code language: PHP (php)

這個模式的優點:

  1. 生命週期管理:連線池隨 FastAPI 啟動/關閉
  2. 依賴注入:每個 endpoint 自動取得連線
  3. 自動歸還:請求結束時連線自動歸還

完整範例

以下是一個完整的範例,展示連線池的各種操作:

import asyncpg
import asyncio

async def main():
    # 建立連線池
    pool = await asyncpg.create_pool(
        host='localhost',
        port=5432,
        user='postgres',
        password='password',
        database='mydb',
        min_size=2,
        max_size=10,
        command_timeout=60,
    )

    print(f"初始連線數: {pool.get_size()}")
    print(f"閒置連線數: {pool.get_idle_size()}")

    # 模擬多個並發請求
    async def fetch_data(task_id: int):
        async with pool.acquire() as conn:
            print(f"Task {task_id}: 取得連線,閒置數={pool.get_idle_size()}")
            # 模擬查詢耗時
            await asyncio.sleep(1)
            result = await conn.fetchval("SELECT COUNT(*) FROM users")
            print(f"Task {task_id}: 查詢完成,共 {result} 筆")
            return result

    # 同時執行 5 個查詢
    tasks = [fetch_data(i) for i in range(5)]
    results = await asyncio.gather(*tasks)

    print(f"所有查詢完成: {results}")
    print(f"最終閒置連線數: {pool.get_idle_size()}")

    # 關閉連線池
    await pool.close()

asyncio.run(main())
Code language: PHP (php)

常見錯誤與解決方案

錯誤 1:忘記關閉連線池

# 錯誤:連線池沒有關閉
async def main():
    pool = await asyncpg.create_pool(...)
    # ... 使用連線池 ...
    # 忘記 await pool.close()
Code language: PHP (php)

解決方案:使用 async withtry/finally

錯誤 2:在連線池關閉後還想使用

# 錯誤:連線池已關閉
await pool.close()
async with pool.acquire() as conn:  # 會拋出錯誤
    ...
Code language: PHP (php)

解決方案:確保連線池的生命週期涵蓋所有使用它的程式碼。

錯誤 3:連線池耗盡

max_size 條連線都被借走,新的 acquire() 會卡住:

# 可能會永遠等待
async with pool.acquire() as conn:
    ...
Code language: PHP (php)

解決方案:設定 timeout 參數:

try:
    async with pool.acquire(timeout=5) as conn:
        ...
except asyncio.TimeoutError:
    # 處理連線池滿的情況
    pass
Code language: PHP (php)

小結

這篇文章介紹了 asyncpg 連線池的核心概念:

  1. 為什麼需要連線池:解決單一連線的效能瓶頸和可靠性問題
  2. 建立連線池:使用 asyncpg.create_pool() 建立
  3. 取得連線:使用 async with pool.acquire() 安全地借用連線
  4. 參數調校minsizemaxsizetimeout 的設定
  5. FastAPI 整合:使用 lifespan 和依賴注入的最佳實踐

下一篇文章將介紹如何使用連線池執行交易(Transaction),確保多個 SQL 操作的原子性。

進階測驗:asyncpg 連線池 Connection Pool 實戰

測驗目標:驗證你是否能在實際情境中應用所學。
共 5 題,包含情境題與錯誤診斷題。

1. 你正在開發一個 FastAPI 應用程式,預期平時有 10 個並發連線,尖峰時期可能達到 40 個。你應該如何設定連線池? 情境題

  • A. min_size=1, max_size=100
  • B. min_size=10, max_size=40
  • C. min_size=40, max_size=40
  • D. min_size=40, max_size=100

2. 同事寫了以下程式碼,但在壓力測試時發現連線沒有被正確歸還。問題出在哪裡? 錯誤診斷

async def get_users(): conn = await pool.acquire() result = await conn.fetch(“SELECT * FROM users”) return result
  • A. 應該使用 conn.fetchall() 而不是 conn.fetch()
  • B. 連線池沒有正確初始化
  • C. 沒有使用 try/finallyasync with 確保連線歸還
  • D. 應該在函數結束前呼叫 conn.close()

3. 你的應用程式在高流量時收到以下錯誤,應該如何處理? 錯誤診斷

asyncio.TimeoutError: connection pool is exhausted
  • A. 重新啟動應用程式
  • B. 將 command_timeout 設得更大
  • C. 改用單一連線模式
  • D. 增加 max_size 或設定 acquire(timeout=...) 並處理逾時情況

4. 你需要在 FastAPI 中實作一個 endpoint,讓每個請求自動取得連線並在結束時歸還。最佳實踐是什麼? 情境題

  • A. 在每個 endpoint 內手動呼叫 pool.acquire()pool.release()
  • B. 建立一個依賴注入函數,使用 async with pool.acquire() 並 yield 連線
  • C. 將連線存在全域變數中,所有 endpoint 共用
  • D. 在每個 endpoint 建立新的連線池

5. 以下程式碼在應用程式關閉後仍然嘗試使用連線池,導致錯誤。根本原因是什麼? 情境題

@asynccontextmanager async def lifespan(app: FastAPI): global pool pool = await asyncpg.create_pool(…) yield await pool.close() # 在某個背景任務中 async def background_task(): await asyncio.sleep(60) async with pool.acquire() as conn: # 這裡出錯 await conn.execute(“INSERT INTO logs …”)
  • A. pool.acquire() 語法錯誤
  • B. 背景任務不能使用連線池
  • C. 背景任務的執行時間超過了應用程式的生命週期,連線池已在 lifespan 結束時關閉
  • D. 應該使用 pool.release() 而不是 async with

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *