測驗:asyncpg CRUD 操作與參數化查詢
共 5 題,點選答案後會立即顯示結果
1. 在 asyncpg 中,當你需要查詢多筆資料並遍歷處理時,應該使用哪個方法?
2. 關於 fetchrow() 方法,以下敘述何者正確?
3. asyncpg 使用什麼格式的參數佔位符來進行參數化查詢?
4. 如果需要在 INSERT 後取得新插入資料的 ID,應該如何做?
5. 關於 executemany() 方法,以下敘述何者正確?
前言
在前兩篇文章中,我們學會了如何建立 asyncpg 連線以及使用連線池管理資料庫連線。本篇將深入探討如何使用 asyncpg 執行實際的資料庫操作:新增、查詢、更新、刪除(CRUD),以及如何正確使用參數化查詢來防止 SQL Injection 攻擊。
學習目標
讀完本篇後,你將能夠:
- 使用 asyncpg 執行 INSERT、UPDATE、DELETE 操作
- 正確使用參數化查詢防止 SQL Injection
- 理解 fetch、fetchrow、fetchval、execute 的差異
- 處理查詢結果並轉換為 Python 物件
查詢方法總覽
asyncpg 提供了四種主要的查詢方法,每種都有不同的用途:
| 方法 | 回傳值 | 使用時機 |
|---|---|---|
fetch() |
list[Record] |
查詢多筆資料 |
fetchrow() |
Record 或 None |
查詢單筆資料 |
fetchval() |
單一值或 None |
取得單一欄位值 |
execute() |
狀態字串 | INSERT/UPDATE/DELETE |
讓我們逐一了解這些方法的使用方式。
fetch() – 查詢多筆資料
當你需要取得多筆查詢結果時,使用 fetch():
import asyncpg
async def get_all_users(conn: asyncpg.Connection):
rows = await conn.fetch("SELECT id, name, email FROM users")
for row in rows:
print(f"ID: {row['id']}, Name: {row['name']}")
return rows
Code language: JavaScript (javascript)fetch() 回傳的是 list[asyncpg.Record]。即使查詢結果為空,也會回傳空 list [],不會回傳 None。
Record 物件的存取方式
asyncpg 的 Record 物件支援多種存取方式:
row = await conn.fetchrow("SELECT id, name FROM users WHERE id = $1", 1)
# 方式 1:像 dict 一樣用 key 存取
user_id = row['id']
name = row['name']
# 方式 2:像 tuple 一樣用 index 存取
user_id = row[0]
name = row[1]
# 方式 3:取得所有 keys
columns = row.keys() # ['id', 'name']
# 方式 4:取得所有 values
values = row.values() # [1, 'Alice']
Code language: PHP (php)fetchrow() – 查詢單筆資料
當你確定只需要一筆資料時,使用 fetchrow():
async def get_user_by_id(conn: asyncpg.Connection, user_id: int):
row = await conn.fetchrow(
"SELECT id, name, email FROM users WHERE id = $1",
user_id
)
if row is None:
return None
return dict(row) # 轉換為 dict
Code language: PHP (php)重要提醒:fetchrow() 在找不到資料時會回傳 None,務必檢查回傳值。
fetchval() – 取得單一值
當你只需要查詢結果中的單一欄位值時,fetchval() 是最簡潔的選擇:
async def count_users(conn: asyncpg.Connection) -> int:
count = await conn.fetchval("SELECT COUNT(*) FROM users")
return count # 直接回傳數字,例如 42
async def get_user_name(conn: asyncpg.Connection, user_id: int) -> str | None:
name = await conn.fetchval(
"SELECT name FROM users WHERE id = $1",
user_id
)
return name # 回傳 'Alice' 或 None
Code language: PHP (php)fetchval() 預設回傳第一個欄位的值。如果需要取得其他欄位,可以指定 column 參數:
# 取得第二個欄位(index 從 0 開始)
email = await conn.fetchval(
"SELECT id, name, email FROM users WHERE id = $1",
user_id,
column=2 # 取得 email 欄位
)
Code language: PHP (php)execute() – 執行非查詢語句
對於 INSERT、UPDATE、DELETE 等不需要回傳資料的操作,使用 execute():
async def create_user(conn: asyncpg.Connection, name: str, email: str):
result = await conn.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
name, email
)
print(result) # 'INSERT 0 1' - 表示成功插入 1 筆
async def update_user(conn: asyncpg.Connection, user_id: int, name: str):
result = await conn.execute(
"UPDATE users SET name = $1 WHERE id = $2",
name, user_id
)
print(result) # 'UPDATE 1' - 表示更新了 1 筆
async def delete_user(conn: asyncpg.Connection, user_id: int):
result = await conn.execute(
"DELETE FROM users WHERE id = $1",
user_id
)
print(result) # 'DELETE 1' - 表示刪除了 1 筆
Code language: PHP (php)execute() 回傳的是一個狀態字串,格式為 "命令 OID 影響筆數"。
取得 INSERT 後的新 ID
如果需要取得新插入資料的 ID,使用 RETURNING 子句配合 fetchval():
async def create_user_return_id(conn: asyncpg.Connection, name: str, email: str) -> int:
new_id = await conn.fetchval(
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
name, email
)
return new_id # 回傳新建立的 user id
Code language: PHP (php)參數化查詢:$1, $2 佔位符
asyncpg 使用 PostgreSQL 原生的參數佔位符格式 $1, $2, $3…,而非 Python DB-API 常見的 ? 或 %s。
基本用法
# 單一參數
row = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1",
user_id
)
# 多個參數
rows = await conn.fetch(
"SELECT * FROM users WHERE age > $1 AND city = $2",
18, "Taipei"
)
# 同一參數使用多次
rows = await conn.fetch(
"SELECT * FROM users WHERE first_name = $1 OR last_name = $1",
"Chen"
)
Code language: PHP (php)為什麼要用參數化查詢?
絕對不要這樣做(SQL Injection 漏洞):
# 危險!千萬不要這樣寫!
user_input = "'; DROP TABLE users; --"
query = f"SELECT * FROM users WHERE name = '{user_input}'"
await conn.fetch(query) # 會執行惡意 SQL
Code language: PHP (php)正確做法:
# 安全:使用參數化查詢
user_input = "'; DROP TABLE users; --"
await conn.fetch(
"SELECT * FROM users WHERE name = $1",
user_input # 會被當作純文字處理,不會執行
)
Code language: PHP (php)參數化查詢會自動處理跳脫字元,確保使用者輸入不會被解讀為 SQL 指令。
executemany() – 批次插入資料
當需要一次插入大量資料時,executemany() 比迴圈執行多次 execute() 更有效率:
async def bulk_insert_users(conn: asyncpg.Connection, users: list[tuple]):
await conn.executemany(
"INSERT INTO users (name, email) VALUES ($1, $2)",
users
)
# 使用範例
users_data = [
("Alice", "[email protected]"),
("Bob", "[email protected]"),
("Charlie", "[email protected]"),
]
await bulk_insert_users(conn, users_data)
Code language: PHP (php)executemany() 會將所有資料在一次資料庫往返中完成,大幅減少網路延遲的影響。
效能比較
import time
# 方式 1:迴圈執行(慢)
start = time.time()
for name, email in users_data:
await conn.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
name, email
)
print(f"迴圈執行:{time.time() - start:.3f} 秒")
# 方式 2:executemany(快)
start = time.time()
await conn.executemany(
"INSERT INTO users (name, email) VALUES ($1, $2)",
users_data
)
print(f"executemany:{time.time() - start:.3f} 秒")
Code language: PHP (php)對於 1000 筆資料,executemany() 通常快 10 倍以上。
將 Record 轉換為 Python 物件
轉換為 dict
row = await conn.fetchrow("SELECT id, name, email FROM users WHERE id = $1", 1)
# 方式 1:使用 dict() 建構子
user_dict = dict(row)
# 方式 2:使用解包
user_dict = {**row}
Code language: PHP (php)轉換為 Pydantic Model
在實際專案中,我們通常會使用 Pydantic 來定義資料模型:
from pydantic import BaseModel
class User(BaseModel):
id: int
name: str
email: str
async def get_user(conn: asyncpg.Connection, user_id: int) -> User | None:
row = await conn.fetchrow(
"SELECT id, name, email FROM users WHERE id = $1",
user_id
)
if row is None:
return None
return User(**dict(row))
async def get_all_users(conn: asyncpg.Connection) -> list[User]:
rows = await conn.fetch("SELECT id, name, email FROM users")
return [User(**dict(row)) for row in rows]
處理欄位名稱不一致
當資料庫欄位名稱與 Python 屬性名稱不同時:
class User(BaseModel):
id: int
name: str
email_address: str # Python 習慣用底線
async def get_user(conn: asyncpg.Connection, user_id: int) -> User | None:
row = await conn.fetchrow(
"SELECT id, name, email AS email_address FROM users WHERE id = $1",
user_id
)
if row is None:
return None
return User(**dict(row))
使用 SQL 的 AS 關鍵字來重新命名欄位,讓查詢結果直接符合 Pydantic Model 的欄位名稱。
完整 CRUD 範例
以下是一個完整的使用者管理模組範例:
import asyncpg
from pydantic import BaseModel
class User(BaseModel):
id: int
name: str
email: str
class UserCreate(BaseModel):
name: str
email: str
class UserRepository:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def create(self, user: UserCreate) -> int:
"""新增使用者,回傳新 ID"""
async with self.pool.acquire() as conn:
new_id = await conn.fetchval(
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
user.name, user.email
)
return new_id
async def get_by_id(self, user_id: int) -> User | None:
"""根據 ID 查詢使用者"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT id, name, email FROM users WHERE id = $1",
user_id
)
if row is None:
return None
return User(**dict(row))
async def get_all(self) -> list[User]:
"""查詢所有使用者"""
async with self.pool.acquire() as conn:
rows = await conn.fetch("SELECT id, name, email FROM users")
return [User(**dict(row)) for row in rows]
async def update(self, user_id: int, name: str, email: str) -> bool:
"""更新使用者資料,回傳是否成功"""
async with self.pool.acquire() as conn:
result = await conn.execute(
"UPDATE users SET name = $1, email = $2 WHERE id = $3",
name, email, user_id
)
# result 格式為 'UPDATE N',N 是影響的筆數
return result == "UPDATE 1"
async def delete(self, user_id: int) -> bool:
"""刪除使用者,回傳是否成功"""
async with self.pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM users WHERE id = $1",
user_id
)
return result == "DELETE 1"
async def bulk_create(self, users: list[UserCreate]) -> None:
"""批次新增多位使用者"""
async with self.pool.acquire() as conn:
await conn.executemany(
"INSERT INTO users (name, email) VALUES ($1, $2)",
[(u.name, u.email) for u in users]
)
方法選擇速查表
| 情境 | 推薦方法 | 原因 |
|---|---|---|
| 查詢多筆資料 | fetch() |
回傳 list,方便迭代 |
| 查詢單筆資料 | fetchrow() |
直接回傳 Record 或 None |
| 只要單一欄位值 | fetchval() |
最簡潔,直接回傳值 |
| COUNT、SUM 等聚合 | fetchval() |
直接取得計算結果 |
| INSERT 並取回 ID | fetchval() + RETURNING |
一次完成插入和取值 |
| 一般 INSERT/UPDATE/DELETE | execute() |
不需要回傳資料 |
| 批次 INSERT | executemany() |
效能最佳 |
小結
本篇介紹了 asyncpg 的四種查詢方法和參數化查詢的使用方式:
- fetch() 用於查詢多筆資料
- fetchrow() 用於查詢單筆資料
- fetchval() 用於取得單一值
- execute() 用於執行 INSERT/UPDATE/DELETE
- executemany() 用於批次插入
- 使用
$1, $2...參數佔位符防止 SQL Injection - Record 可以轉換為 dict 或 Pydantic Model
下一篇我們將介紹交易(Transaction)管理,學習如何確保多個資料庫操作的原子性。
延伸閱讀
進階測驗:asyncpg CRUD 操作與參數化查詢
共 5 題,包含情境題與錯誤診斷題。