【asyncpg 教學】#03 CRUD 操作與參數化查詢

測驗:asyncpg CRUD 操作與參數化查詢

共 5 題,點選答案後會立即顯示結果

1. 在 asyncpg 中,當你需要查詢多筆資料並遍歷處理時,應該使用哪個方法?

  • A. fetch()
  • B. fetchrow()
  • C. fetchval()
  • D. execute()

2. 關於 fetchrow() 方法,以下敘述何者正確?

  • A. 找不到資料時會回傳空的 list []
  • B. 回傳型別是 list[Record]
  • C. 找不到資料時會回傳 None
  • D. 只能用於 SELECT COUNT(*) 這類聚合查詢

3. asyncpg 使用什麼格式的參數佔位符來進行參數化查詢?

  • A. 使用 %s 和 %d 格式化符號
  • B. 使用 $1, $2, $3 佔位符
  • C. 使用 ? 作為佔位符
  • D. 使用 :name 命名參數

4. 如果需要在 INSERT 後取得新插入資料的 ID,應該如何做?

  • A. 使用 execute() 並解析回傳的狀態字串
  • B. 先 INSERT 再用 SELECT MAX(id) 查詢
  • C. 使用 fetchrow() 配合 INSERT 語句
  • D. 使用 fetchval() 配合 RETURNING 子句

5. 關於 executemany() 方法,以下敘述何者正確?

  • A. 執行速度與迴圈執行多次 execute() 相同
  • B. 將所有資料在一次資料庫往返中完成,效能更佳
  • C. 只能用於 SELECT 查詢
  • D. 需要為每筆資料建立個別的連線

前言

在前兩篇文章中,我們學會了如何建立 asyncpg 連線以及使用連線池管理資料庫連線。本篇將深入探討如何使用 asyncpg 執行實際的資料庫操作:新增、查詢、更新、刪除(CRUD),以及如何正確使用參數化查詢來防止 SQL Injection 攻擊。

學習目標

讀完本篇後,你將能夠:

  • 使用 asyncpg 執行 INSERT、UPDATE、DELETE 操作
  • 正確使用參數化查詢防止 SQL Injection
  • 理解 fetch、fetchrow、fetchval、execute 的差異
  • 處理查詢結果並轉換為 Python 物件

查詢方法總覽

asyncpg 提供了四種主要的查詢方法,每種都有不同的用途:

方法 回傳值 使用時機
fetch() list[Record] 查詢多筆資料
fetchrow() RecordNone 查詢單筆資料
fetchval() 單一值或 None 取得單一欄位值
execute() 狀態字串 INSERT/UPDATE/DELETE

讓我們逐一了解這些方法的使用方式。

fetch() – 查詢多筆資料

當你需要取得多筆查詢結果時,使用 fetch()

import asyncpg

async def get_all_users(conn: asyncpg.Connection):
    rows = await conn.fetch("SELECT id, name, email FROM users")

    for row in rows:
        print(f"ID: {row['id']}, Name: {row['name']}")

    return rows
Code language: JavaScript (javascript)

fetch() 回傳的是 list[asyncpg.Record]。即使查詢結果為空,也會回傳空 list [],不會回傳 None

Record 物件的存取方式

asyncpg 的 Record 物件支援多種存取方式:

row = await conn.fetchrow("SELECT id, name FROM users WHERE id = $1", 1)

# 方式 1:像 dict 一樣用 key 存取
user_id = row['id']
name = row['name']

# 方式 2:像 tuple 一樣用 index 存取
user_id = row[0]
name = row[1]

# 方式 3:取得所有 keys
columns = row.keys()  # ['id', 'name']

# 方式 4:取得所有 values
values = row.values()  # [1, 'Alice']
Code language: PHP (php)

fetchrow() – 查詢單筆資料

當你確定只需要一筆資料時,使用 fetchrow()

async def get_user_by_id(conn: asyncpg.Connection, user_id: int):
    row = await conn.fetchrow(
        "SELECT id, name, email FROM users WHERE id = $1",
        user_id
    )

    if row is None:
        return None

    return dict(row)  # 轉換為 dict
Code language: PHP (php)

重要提醒fetchrow() 在找不到資料時會回傳 None,務必檢查回傳值。

fetchval() – 取得單一值

當你只需要查詢結果中的單一欄位值時,fetchval() 是最簡潔的選擇:

async def count_users(conn: asyncpg.Connection) -> int:
    count = await conn.fetchval("SELECT COUNT(*) FROM users")
    return count  # 直接回傳數字,例如 42

async def get_user_name(conn: asyncpg.Connection, user_id: int) -> str | None:
    name = await conn.fetchval(
        "SELECT name FROM users WHERE id = $1",
        user_id
    )
    return name  # 回傳 'Alice' 或 None
Code language: PHP (php)

fetchval() 預設回傳第一個欄位的值。如果需要取得其他欄位,可以指定 column 參數:

# 取得第二個欄位(index 從 0 開始)
email = await conn.fetchval(
    "SELECT id, name, email FROM users WHERE id = $1",
    user_id,
    column=2  # 取得 email 欄位
)
Code language: PHP (php)

execute() – 執行非查詢語句

對於 INSERT、UPDATE、DELETE 等不需要回傳資料的操作,使用 execute()

async def create_user(conn: asyncpg.Connection, name: str, email: str):
    result = await conn.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        name, email
    )
    print(result)  # 'INSERT 0 1' - 表示成功插入 1 筆

async def update_user(conn: asyncpg.Connection, user_id: int, name: str):
    result = await conn.execute(
        "UPDATE users SET name = $1 WHERE id = $2",
        name, user_id
    )
    print(result)  # 'UPDATE 1' - 表示更新了 1 筆

async def delete_user(conn: asyncpg.Connection, user_id: int):
    result = await conn.execute(
        "DELETE FROM users WHERE id = $1",
        user_id
    )
    print(result)  # 'DELETE 1' - 表示刪除了 1 筆
Code language: PHP (php)

execute() 回傳的是一個狀態字串,格式為 "命令 OID 影響筆數"

取得 INSERT 後的新 ID

如果需要取得新插入資料的 ID,使用 RETURNING 子句配合 fetchval()

async def create_user_return_id(conn: asyncpg.Connection, name: str, email: str) -> int:
    new_id = await conn.fetchval(
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
        name, email
    )
    return new_id  # 回傳新建立的 user id
Code language: PHP (php)

參數化查詢:$1, $2 佔位符

asyncpg 使用 PostgreSQL 原生的參數佔位符格式 $1, $2, $3…,而非 Python DB-API 常見的 ?%s

基本用法

# 單一參數
row = await conn.fetchrow(
    "SELECT * FROM users WHERE id = $1",
    user_id
)

# 多個參數
rows = await conn.fetch(
    "SELECT * FROM users WHERE age > $1 AND city = $2",
    18, "Taipei"
)

# 同一參數使用多次
rows = await conn.fetch(
    "SELECT * FROM users WHERE first_name = $1 OR last_name = $1",
    "Chen"
)
Code language: PHP (php)

為什麼要用參數化查詢?

絕對不要這樣做(SQL Injection 漏洞):

# 危險!千萬不要這樣寫!
user_input = "'; DROP TABLE users; --"
query = f"SELECT * FROM users WHERE name = '{user_input}'"
await conn.fetch(query)  # 會執行惡意 SQL
Code language: PHP (php)

正確做法

# 安全:使用參數化查詢
user_input = "'; DROP TABLE users; --"
await conn.fetch(
    "SELECT * FROM users WHERE name = $1",
    user_input  # 會被當作純文字處理,不會執行
)
Code language: PHP (php)

參數化查詢會自動處理跳脫字元,確保使用者輸入不會被解讀為 SQL 指令。

executemany() – 批次插入資料

當需要一次插入大量資料時,executemany() 比迴圈執行多次 execute() 更有效率:

async def bulk_insert_users(conn: asyncpg.Connection, users: list[tuple]):
    await conn.executemany(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        users
    )

# 使用範例
users_data = [
    ("Alice", "[email protected]"),
    ("Bob", "[email protected]"),
    ("Charlie", "[email protected]"),
]

await bulk_insert_users(conn, users_data)
Code language: PHP (php)

executemany() 會將所有資料在一次資料庫往返中完成,大幅減少網路延遲的影響。

效能比較

import time

# 方式 1:迴圈執行(慢)
start = time.time()
for name, email in users_data:
    await conn.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        name, email
    )
print(f"迴圈執行:{time.time() - start:.3f} 秒")

# 方式 2:executemany(快)
start = time.time()
await conn.executemany(
    "INSERT INTO users (name, email) VALUES ($1, $2)",
    users_data
)
print(f"executemany:{time.time() - start:.3f} 秒")
Code language: PHP (php)

對於 1000 筆資料,executemany() 通常快 10 倍以上。

將 Record 轉換為 Python 物件

轉換為 dict

row = await conn.fetchrow("SELECT id, name, email FROM users WHERE id = $1", 1)

# 方式 1:使用 dict() 建構子
user_dict = dict(row)

# 方式 2:使用解包
user_dict = {**row}
Code language: PHP (php)

轉換為 Pydantic Model

在實際專案中,我們通常會使用 Pydantic 來定義資料模型:

from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str
    email: str

async def get_user(conn: asyncpg.Connection, user_id: int) -> User | None:
    row = await conn.fetchrow(
        "SELECT id, name, email FROM users WHERE id = $1",
        user_id
    )

    if row is None:
        return None

    return User(**dict(row))

async def get_all_users(conn: asyncpg.Connection) -> list[User]:
    rows = await conn.fetch("SELECT id, name, email FROM users")
    return [User(**dict(row)) for row in rows]

處理欄位名稱不一致

當資料庫欄位名稱與 Python 屬性名稱不同時:

class User(BaseModel):
    id: int
    name: str
    email_address: str  # Python 習慣用底線

async def get_user(conn: asyncpg.Connection, user_id: int) -> User | None:
    row = await conn.fetchrow(
        "SELECT id, name, email AS email_address FROM users WHERE id = $1",
        user_id
    )

    if row is None:
        return None

    return User(**dict(row))

使用 SQL 的 AS 關鍵字來重新命名欄位,讓查詢結果直接符合 Pydantic Model 的欄位名稱。

完整 CRUD 範例

以下是一個完整的使用者管理模組範例:

import asyncpg
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str
    email: str

class UserCreate(BaseModel):
    name: str
    email: str

class UserRepository:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def create(self, user: UserCreate) -> int:
        """新增使用者,回傳新 ID"""
        async with self.pool.acquire() as conn:
            new_id = await conn.fetchval(
                "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
                user.name, user.email
            )
            return new_id

    async def get_by_id(self, user_id: int) -> User | None:
        """根據 ID 查詢使用者"""
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT id, name, email FROM users WHERE id = $1",
                user_id
            )
            if row is None:
                return None
            return User(**dict(row))

    async def get_all(self) -> list[User]:
        """查詢所有使用者"""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("SELECT id, name, email FROM users")
            return [User(**dict(row)) for row in rows]

    async def update(self, user_id: int, name: str, email: str) -> bool:
        """更新使用者資料,回傳是否成功"""
        async with self.pool.acquire() as conn:
            result = await conn.execute(
                "UPDATE users SET name = $1, email = $2 WHERE id = $3",
                name, email, user_id
            )
            # result 格式為 'UPDATE N',N 是影響的筆數
            return result == "UPDATE 1"

    async def delete(self, user_id: int) -> bool:
        """刪除使用者,回傳是否成功"""
        async with self.pool.acquire() as conn:
            result = await conn.execute(
                "DELETE FROM users WHERE id = $1",
                user_id
            )
            return result == "DELETE 1"

    async def bulk_create(self, users: list[UserCreate]) -> None:
        """批次新增多位使用者"""
        async with self.pool.acquire() as conn:
            await conn.executemany(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                [(u.name, u.email) for u in users]
            )

方法選擇速查表

情境 推薦方法 原因
查詢多筆資料 fetch() 回傳 list,方便迭代
查詢單筆資料 fetchrow() 直接回傳 Record 或 None
只要單一欄位值 fetchval() 最簡潔,直接回傳值
COUNT、SUM 等聚合 fetchval() 直接取得計算結果
INSERT 並取回 ID fetchval() + RETURNING 一次完成插入和取值
一般 INSERT/UPDATE/DELETE execute() 不需要回傳資料
批次 INSERT executemany() 效能最佳

小結

本篇介紹了 asyncpg 的四種查詢方法和參數化查詢的使用方式:

  1. fetch() 用於查詢多筆資料
  2. fetchrow() 用於查詢單筆資料
  3. fetchval() 用於取得單一值
  4. execute() 用於執行 INSERT/UPDATE/DELETE
  5. executemany() 用於批次插入
  6. 使用 $1, $2... 參數佔位符防止 SQL Injection
  7. Record 可以轉換為 dict 或 Pydantic Model

下一篇我們將介紹交易(Transaction)管理,學習如何確保多個資料庫操作的原子性。

延伸閱讀

進階測驗:asyncpg CRUD 操作與參數化查詢

測驗目標:驗證你是否能在實際情境中應用所學。
共 5 題,包含情境題與錯誤診斷題。

1. 你正在開發一個使用者管理 API,需要實作「根據 ID 查詢單一使用者」的功能。查詢結果需要轉換為 Pydantic Model。以下哪種實作方式最正確? 情境題

  • A. 使用 fetch() 查詢後取第一筆,若結果為空則回傳 None
  • B. 使用 fetchrow() 查詢,檢查是否為 None,若有資料則用 User(**dict(row)) 轉換
  • C. 使用 fetchval() 直接取得使用者物件
  • D. 使用 execute() 執行 SELECT 語句並解析回傳值

2. 小明寫了以下程式碼來搜尋使用者,但被資安團隊標記為有安全漏洞。請問問題在哪裡? 錯誤診斷

async def search_user(conn, name: str): query = f”SELECT * FROM users WHERE name = ‘{name}'” return await conn.fetch(query)
  • A. fetch() 應該改成 fetchrow() 因為只搜尋一個名字
  • B. 應該使用 async with 來管理連線
  • C. 使用 f-string 組合 SQL 會造成 SQL Injection 漏洞,應改用參數化查詢 $1
  • D. SELECT * 效能不佳,應該明確指定欄位名稱

3. 你需要從資料庫取得使用者總數來顯示在儀表板上。以下哪個實作方式最簡潔有效? 情境題

  • A. rows = await conn.fetch("SELECT COUNT(*) FROM users") 然後取 rows[0][0]
  • B. row = await conn.fetchrow("SELECT COUNT(*) FROM users") 然後取 row[0]
  • C. await conn.execute("SELECT COUNT(*) FROM users") 並解析回傳的狀態字串
  • D. count = await conn.fetchval("SELECT COUNT(*) FROM users") 直接取得數字

4. 以下程式碼執行後,result 變數的值會是什麼? 錯誤診斷

result = await conn.execute( “UPDATE users SET name = $1 WHERE id = $2”, “Alice”, 999 # 假設 id=999 的使用者不存在 ) print(result)
  • A. 會拋出 Exception,因為找不到 id=999 的使用者
  • B. 會印出 "UPDATE 0",表示沒有任何資料被更新
  • C. 會回傳 None
  • D. 會回傳 False 表示更新失敗

5. 你需要一次性匯入 5000 位使用者的資料到資料庫。以下哪種做法效能最佳? 情境題

users_data = [(“User1”, “[email protected]”), (“User2”, “[email protected]”), …] # 5000 筆
  • A. 使用 for 迴圈逐筆呼叫 conn.execute() 插入
  • B. 將所有資料組成一條超長的 INSERT 語句執行
  • C. 使用 conn.executemany() 批次插入
  • D. 使用 asyncio.gather() 同時執行 5000 個 execute() 呼叫

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *