# FastAPI 表单处理

## 概述
Web应用中经常需要处理HTML表单数据和文件上传。FastAPI提供了强大的表单处理能力，支持传统的表单数据、文件上传、多部分表单等。本章将详细介绍如何在FastAPI中处理各种类型的表单数据。
## 📝 基础表单处理

### 安装依赖
首先需要安装python-multipart来支持表单数据处理：
bash
pip install python-multipart

### 简单表单数据
python
from fastapi import FastAPI, Form, HTTPException, status
from typing import Optional
import re
# Application instance; the route decorators in this chapter register on it.
app = FastAPI()
@app.post("/login/")
async def login(
    username: str = Form(..., min_length=3, max_length=50),
    password: str = Form(..., min_length=6),
    remember_me: bool = Form(False)
):
    """Check login credentials submitted as form fields.

    Demo only: the credentials are compared with hard-coded values and the
    returned token is a placeholder string.

    Returns:
        dict with a success message, the username, the ``remember_me`` flag
        and the placeholder token.

    Raises:
        HTTPException: 401 when the username/password pair does not match.
    """
    credentials_ok = (username, password) == ("admin", "secret123")
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误"
        )

    return {
        "message": "登录成功",
        "username": username,
        "remember_me": remember_me,
        "token": "fake-jwt-token"
    }
@app.post("/contact/")
async def submit_contact_form(
    name: str = Form(..., min_length=2, max_length=100),
    email: str = Form(..., regex=r"^[^@]+@[^@]+\.[^@]+$"),
    subject: str = Form(..., min_length=5, max_length=200),
    message: str = Form(..., min_length=10, max_length=2000),
    newsletter: bool = Form(False)
):
    """Accept a contact form and return a summary with a reference id.

    Length limits and the e-mail pattern are declared on the Form
    parameters; the body only builds the response.

    Returns:
        dict with a confirmation message, the submitted contact fields
        (the message length rather than its text) and a ``CONTACT-<n>``
        reference id derived from the e-mail address.
    """
    # Imported locally so this block does not rely on a file-level import.
    import hashlib

    # The built-in hash() of a str is salted per interpreter process, so the
    # same address would get a different id after a restart or in another
    # worker. A SHA-256 digest keeps the id stable.
    email_digest = hashlib.sha256(email.encode("utf-8")).hexdigest()
    reference_number = int(email_digest, 16) % 100000

    return {
        "message": "联系表单提交成功",
        "contact": {
            "name": name,
            "email": email,
            "subject": subject,
            "message_length": len(message),
            "newsletter_subscription": newsletter
        },
        "reference_id": f"CONTACT-{reference_number}"
    }


### 表单数据验证
python
from pydantic import BaseModel, validator
from datetime import datetime, date
class UserRegistration(BaseModel):
    """Pydantic model validating the non-password registration fields."""

    username: str
    email: str
    full_name: str
    birth_date: date
    phone: Optional[str] = None
    terms_accepted: bool

    @validator('username')
    def validate_username(cls, v):
        """Allow only ASCII letters, digits and underscores."""
        # fullmatch rather than match with "$": "$" also matches just before
        # a trailing newline, which would accept "name\n".
        if not re.fullmatch(r'[a-zA-Z0-9_]+', v):
            raise ValueError('用户名只能包含字母、数字和下划线')
        return v

    @validator('birth_date')
    def validate_age(cls, v):
        """Require an age between 13 and 120 whole years, inclusive."""
        today = date.today()
        # Subtract one year when this year's birthday has not happened yet.
        age = today.year - v.year - ((today.month, today.day) < (v.month, v.day))
        if age < 13:
            raise ValueError('用户年龄必须满13岁')
        if age > 120:
            raise ValueError('请输入有效的出生日期')
        return v

    @validator('phone')
    def validate_phone(cls, v):
        """Accept an empty value, or 9-15 digits with optional "+" and "1"."""
        # fullmatch for the same trailing-newline reason as the username.
        if v and not re.fullmatch(r'\+?1?\d{9,15}', v):
            raise ValueError('请输入有效的电话号码')
        return v

    @validator('terms_accepted')
    def terms_must_be_accepted(cls, v):
        """Reject the registration unless the terms were accepted."""
        if not v:
            raise ValueError('必须接受服务条款')
        return v
@app.post("/register/")
async def register_user(
    username: str = Form(...),
    email: str = Form(...),
    password: str = Form(..., min_length=8),
    confirm_password: str = Form(...),
    full_name: str = Form(...),
    birth_date: date = Form(...),
    phone: Optional[str] = Form(None),
    terms_accepted: bool = Form(...)
):
    """Register a user from form fields after validating them.

    The two password fields are compared here; the remaining fields are
    validated by constructing a ``UserRegistration`` model.

    Returns:
        dict with a success message, the accepted user fields including the
        age in whole years, and a list of next steps.

    Raises:
        HTTPException: 400 when the passwords differ; 422 when building the
            model raises ``ValueError``.
    """
    # Password confirmation check.
    if password != confirm_password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="密码和确认密码不匹配"
        )

    # Field validation through the Pydantic model.
    try:
        user_data = UserRegistration(
            username=username,
            email=email,
            full_name=full_name,
            birth_date=birth_date,
            phone=phone,
            terms_accepted=terms_accepted
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(e)
        ) from e

    # Age in whole years by calendar comparison. Dividing elapsed days by
    # 365 ignores leap days, so it would roll the age over several days
    # before the actual birthday.
    today = date.today()
    born = user_data.birth_date
    age = today.year - born.year - ((today.month, today.day) < (born.month, born.day))

    return {
        "message": "用户注册成功",
        "user": {
            "username": user_data.username,
            "email": user_data.email,
            "full_name": user_data.full_name,
            "age": age
        },
        "next_steps": ["验证邮箱", "完善个人资料", "设置偏好"]
    }


## 📁 文件上传处理
### 单文件上传
python
from fastapi import File, UploadFile
import aiofiles
import os
from pathlib import Path
# Directory that receives uploaded files. The path is relative, so it
# resolves against the process's working directory; it is created at import
# time if it does not exist yet.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
@app.post("/upload-file/")
async def upload_file(file: UploadFile = File(...)):
    """Store a single uploaded file after checking its type and size.

    Returns:
        dict with the original filename, the stored filename, the declared
        content type and the size in bytes and megabytes.

    Raises:
        HTTPException: 400 for a content type outside the allow-list; 413
            when the file is larger than 5 MB.
    """
    # Content-type allow-list (this is the type declared by the client).
    allowed_types = ["image/jpeg", "image/png", "image/gif", "text/plain", "application/pdf"]
    if file.content_type not in allowed_types:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"不支持的文件类型: {file.content_type}"
        )

    # Size limit (5 MB). Reading one byte past the limit is enough to detect
    # an oversized file without loading all of it into memory.
    max_size = 5 * 1024 * 1024
    content = await file.read(max_size + 1)
    if len(content) > max_size:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="文件大小不能超过5MB"
        )

    # The client controls the filename: keep only its last path component so
    # it cannot inject directories, and fall back to a fixed name when it is
    # missing or empty.
    client_name = (file.filename or "").replace("\\", "/")
    base_name = Path(client_name).name or "upload"
    safe_filename = f"{int(datetime.now().timestamp())}_{base_name}"
    file_path = UPLOAD_DIR / safe_filename

    # Persist the file.
    async with aiofiles.open(file_path, 'wb') as f:
        await f.write(content)

    return {
        "message": "文件上传成功",
        "filename": file.filename,
        "saved_as": safe_filename,
        "content_type": file.content_type,
        "size_bytes": len(content),
        "size_mb": round(len(content) / (1024 * 1024), 2)
    }
@app.post("/upload-image/")
async def upload_image(
    title: str = Form(..., min_length=1, max_length=200),
    description: Optional[str] = Form(None, max_length=1000),
    tags: str = Form("", description="逗号分隔的标签"),
    is_public: bool = Form(True),
    image: UploadFile = File(...)
):
    """Store an uploaded image together with descriptive form fields.

    Returns:
        dict with a success message and an ``image_info`` summary (title,
        description, parsed tag list, visibility, original and stored
        filenames, size in megabytes).

    Raises:
        HTTPException: 400 when the declared content type is not an image;
            413 when the image is larger than 10 MB.
    """
    # content_type can be missing; treat that the same as "not an image"
    # instead of failing on None.startswith.
    if not (image.content_type or "").startswith("image/"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="只能上传图片文件"
        )

    # Size limit (10 MB); read one byte past it to detect oversized files
    # without buffering the whole upload.
    max_size = 10 * 1024 * 1024
    content = await image.read(max_size + 1)
    if len(content) > max_size:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="图片大小不能超过10MB"
        )

    # Comma-separated tags -> list, dropping empty entries.
    tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()]

    # Keep only the last path component of the client-supplied filename so
    # it cannot inject directories into the storage path.
    client_name = (image.filename or "").replace("\\", "/")
    base_name = Path(client_name).name or "upload"
    safe_filename = f"img_{int(datetime.now().timestamp())}_{base_name}"
    file_path = UPLOAD_DIR / safe_filename

    async with aiofiles.open(file_path, 'wb') as f:
        await f.write(content)

    return {
        "message": "图片上传成功",
        "image_info": {
            "title": title,
            "description": description,
            "tags": tag_list,
            "is_public": is_public,
            "filename": image.filename,
            "saved_as": safe_filename,
            "size_mb": round(len(content) / (1024 * 1024), 2)
        }
    }


### 多文件上传
python
from typing import List
@app.post("/upload-multiple/")
async def upload_multiple_files(
    files: List[UploadFile] = File(...),
    folder_name: str = Form(..., min_length=1, max_length=50)
):
    """Store up to 10 files (50 MB in total) in a sub-folder of UPLOAD_DIR.

    All files are read and checked against the total-size limit before
    anything is written, so a rejected request leaves no partial upload.

    Returns:
        dict with a success message, the folder name, one entry per stored
        file and the total size in megabytes.

    Raises:
        HTTPException: 400 for more than 10 files or an invalid folder
            name; 413 when the combined size exceeds 50 MB.
    """
    if len(files) > 10:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="一次最多只能上传10个文件"
        )

    # folder_name comes from the client. Accept a single plain path
    # component only, otherwise values such as "../x" would create and
    # write to directories outside UPLOAD_DIR.
    if folder_name in (".", "..") or "/" in folder_name or "\\" in folder_name:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="文件夹名称无效"
        )

    # First pass: read everything and enforce the total limit (50 MB).
    max_total = 50 * 1024 * 1024
    pending = []
    total_size = 0
    for file in files:
        # Never read more than the remaining budget plus one byte; that one
        # extra byte is what reveals that the limit was exceeded.
        content = await file.read(max_total - total_size + 1)
        total_size += len(content)
        if total_size > max_total:
            raise HTTPException(
                status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                detail="所有文件总大小不能超过50MB"
            )
        pending.append((file, content))

    # Second pass: create the folder and write the files.
    folder_path = UPLOAD_DIR / folder_name
    folder_path.mkdir(exist_ok=True)

    uploaded_files = []
    for file, content in pending:
        # Keep only the last path component of the client-supplied name.
        client_name = (file.filename or "").replace("\\", "/")
        base_name = Path(client_name).name or "upload"
        safe_filename = f"{int(datetime.now().timestamp())}_{base_name}"
        file_path = folder_path / safe_filename

        async with aiofiles.open(file_path, 'wb') as f:
            await f.write(content)

        uploaded_files.append({
            "original_name": file.filename,
            "saved_as": safe_filename,
            "content_type": file.content_type,
            "size_bytes": len(content)
        })

    return {
        "message": f"成功上传{len(files)}个文件",
        "folder": folder_name,
        "files": uploaded_files,
        "total_size_mb": round(total_size / (1024 * 1024), 2)
    }
@app.post("/upload-with-metadata/")
async def upload_with_metadata(
    title: str = Form(...),
    category: str = Form(...),
    tags: str = Form(""),
    main_file: UploadFile = File(...),
    thumbnails: List[UploadFile] = File([]),
    documents: List[UploadFile] = File([])
):
    """Store a main file plus optional thumbnails and documents.

    Returns:
        dict with the title, category, parsed tag list and an
        ``uploaded_files`` mapping containing ``main``, ``thumbnails`` and
        ``documents`` entries for whatever was actually uploaded.
    """

    async def _store(upload: UploadFile, prefix: str):
        """Write one upload into UPLOAD_DIR; return (stored name, content)."""
        content = await upload.read()
        # Keep only the last path component of the client-supplied filename
        # so it cannot inject directories into the storage path.
        client_name = (upload.filename or "").replace("\\", "/")
        base_name = Path(client_name).name or "upload"
        stored_name = f"{prefix}_{int(datetime.now().timestamp())}_{base_name}"
        async with aiofiles.open(UPLOAD_DIR / stored_name, 'wb') as f:
            await f.write(content)
        return stored_name, content

    result = {
        "title": title,
        "category": category,
        "tags": [tag.strip() for tag in tags.split(",") if tag.strip()],
        "uploaded_files": {}
    }

    # Main file.
    if main_file.filename:
        main_filename, main_content = await _store(main_file, "main")
        result["uploaded_files"]["main"] = {
            "filename": main_file.filename,
            "saved_as": main_filename,
            "size_mb": round(len(main_content) / (1024 * 1024), 2)
        }

    # Thumbnails. Every entry is checked for a filename (not just the first
    # one) so an empty part in the list is skipped instead of stored.
    thumb_files = []
    for i, thumb in enumerate(thumbnails):
        if not thumb.filename:
            continue
        thumb_filename, thumb_content = await _store(thumb, f"thumb_{i}")
        thumb_files.append({
            "filename": thumb.filename,
            "saved_as": thumb_filename,
            "size_kb": round(len(thumb_content) / 1024, 2)
        })
    if thumb_files:
        result["uploaded_files"]["thumbnails"] = thumb_files

    # Documents, with the same per-entry filename check.
    doc_files = []
    for doc in documents:
        if not doc.filename:
            continue
        doc_filename, doc_content = await _store(doc, "doc")
        doc_files.append({
            "filename": doc.filename,
            "saved_as": doc_filename,
            "content_type": doc.content_type,
            "size_kb": round(len(doc_content) / 1024, 2)
        })
    if doc_files:
        result["uploaded_files"]["documents"] = doc_files

    return result


## 📋 复杂表单处理
### 动态表单字段
python
@app.post("/dynamic-form/")
async def handle_dynamic_form(
    form_type: str = Form(...),
    base_info: str = Form(...),  # JSON string
    additional_fields: Optional[str] = Form(None),  # JSON string
    attachments: List[UploadFile] = File([])
):
    """Process a form whose field set depends on ``form_type``.

    ``base_info`` and ``additional_fields`` arrive as JSON strings in form
    fields. For the known form types (contact, application, survey) the
    required keys must be present in ``base_info``.

    Returns:
        dict with the form type, the decoded base and additional data, one
        entry per stored attachment and the processing timestamp.

    Raises:
        HTTPException: 400 when either JSON string is invalid, when
            ``base_info`` is not a JSON object, or when required fields are
            missing.
    """
    import json

    try:
        base_data = json.loads(base_info)
    except json.JSONDecodeError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="base_info必须是有效的JSON字符串"
        )
    # Valid JSON is not necessarily an object. A number would make the
    # membership test below raise TypeError, and a string would turn it
    # into a substring search.
    if not isinstance(base_data, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="base_info必须是JSON对象"
        )

    # Decode the optional additional fields.
    additional_data = {}
    if additional_fields:
        try:
            additional_data = json.loads(additional_fields)
        except json.JSONDecodeError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="additional_fields必须是有效的JSON字符串"
            )

    # Required keys per form type; unknown types are not checked.
    required_fields = {
        "contact": ["name", "email", "message"],
        "application": ["full_name", "position", "experience"],
        "survey": ["participant_id", "responses"]
    }
    if form_type in required_fields:
        missing_fields = [field for field in required_fields[form_type]
                          if field not in base_data]
        if missing_fields:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"缺少必需字段: {', '.join(missing_fields)}"
            )

    # Store the attachments.
    attachment_info = []
    for attachment in attachments:
        if not attachment.filename:
            continue
        content = await attachment.read()
        # Keep only the last path component of the client-supplied filename
        # so it cannot inject directories into the storage path.
        client_name = attachment.filename.replace("\\", "/")
        base_name = Path(client_name).name or "upload"
        filename = f"attach_{int(datetime.now().timestamp())}_{base_name}"
        file_path = UPLOAD_DIR / filename

        async with aiofiles.open(file_path, 'wb') as f:
            await f.write(content)

        attachment_info.append({
            "original_name": attachment.filename,
            "saved_as": filename,
            "size_kb": round(len(content) / 1024, 2)
        })

    return {
        "form_type": form_type,
        "base_info": base_data,
        "additional_fields": additional_data,
        "attachments": attachment_info,
        "processed_at": datetime.now().isoformat()
    }


### 表单数据和JSON混合
python
from fastapi import Body
@app.post("/hybrid-form/")
async def handle_hybrid_form(
    # Form fields
    title: str = Form(...),
    category: str = Form(...),
    is_featured: bool = Form(False),
    # JSON data, sent as a JSON string inside a form field
    metadata: str = Form(...),
    # Files
    cover_image: Optional[UploadFile] = File(None),
    gallery_images: List[UploadFile] = File([])
):
    """Handle form fields, JSON metadata and image uploads in one request.

    A request that carries form fields or files is encoded as form data, so
    it cannot also carry a JSON body. ``metadata`` is therefore declared as
    a form field holding a JSON string and decoded here; declaring it with
    ``Body(...)`` next to ``Form``/``File`` parameters does not work.

    Returns:
        dict with the plain form data, the decoded metadata object and a
        ``files`` mapping with ``cover`` and ``gallery`` entries for the
        images that were uploaded.

    Raises:
        HTTPException: 400 when ``metadata`` is not valid JSON or is not a
            JSON object.
    """
    import json

    try:
        metadata_obj = json.loads(metadata)
    except json.JSONDecodeError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="metadata必须是有效的JSON字符串"
        )
    if not isinstance(metadata_obj, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="metadata必须是JSON对象"
        )

    result = {
        "form_data": {
            "title": title,
            "category": category,
            "is_featured": is_featured
        },
        "metadata": metadata_obj,
        "files": {}
    }

    # Cover image.
    if cover_image and cover_image.filename:
        cover_content = await cover_image.read()
        # Keep only the last path component of the client-supplied filename
        # so it cannot inject directories into the storage path.
        cover_base = Path(cover_image.filename.replace("\\", "/")).name or "upload"
        cover_filename = f"cover_{int(datetime.now().timestamp())}_{cover_base}"
        cover_path = UPLOAD_DIR / cover_filename

        async with aiofiles.open(cover_path, 'wb') as f:
            await f.write(cover_content)

        result["files"]["cover"] = {
            "filename": cover_image.filename,
            "saved_as": cover_filename,
            "size_kb": round(len(cover_content) / 1024, 2)
        }

    # Gallery images; entries without a filename are skipped individually.
    gallery_files = []
    for i, img in enumerate(gallery_images):
        if not img.filename:
            continue
        img_content = await img.read()
        img_base = Path(img.filename.replace("\\", "/")).name or "upload"
        img_filename = f"gallery_{i}_{int(datetime.now().timestamp())}_{img_base}"
        img_path = UPLOAD_DIR / img_filename

        async with aiofiles.open(img_path, 'wb') as f:
            await f.write(img_content)

        gallery_files.append({
            "filename": img.filename,
            "saved_as": img_filename,
            "size_kb": round(len(img_content) / 1024, 2)
        })
    if gallery_files:
        result["files"]["gallery"] = gallery_files

    return result


## 🔒 安全考虑
### 文件类型和大小限制
python
import magic # python-magic库,用于检测文件类型
class FileValidator:
    """Validate uploads by extension, size, declared and detected MIME type.

    Files are grouped into the categories ``image``, ``document`` and
    ``archive``; each category has its own extension list, MIME-type list
    and size limit.
    """

    def __init__(self):
        self.allowed_extensions = {
            'image': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'],
            'document': ['.pdf', '.doc', '.docx', '.txt', '.rtf'],
            'archive': ['.zip', '.rar', '.7z', '.tar', '.gz']
        }
        self.allowed_mime_types = {
            'image': ['image/jpeg', 'image/png', 'image/gif', 'image/bmp', 'image/webp'],
            'document': ['application/pdf', 'application/msword', 'text/plain'],
            'archive': ['application/zip', 'application/x-rar-compressed']
        }
        self.max_sizes = {
            'image': 10 * 1024 * 1024,  # 10MB
            'document': 50 * 1024 * 1024,  # 50MB
            'archive': 100 * 1024 * 1024  # 100MB
        }

    async def validate_file(self, file: UploadFile, file_type: str) -> dict:
        """Run all checks on ``file`` for the category ``file_type``.

        Returns:
            dict with the file ``content`` (bytes), its ``size``, the
            declared ``mime_type`` and the lower-cased ``extension``.

        Raises:
            HTTPException: 400 for an empty filename, a disallowed
                extension, a disallowed declared MIME type, or content
                whose detected type is not allowed; 413 when the file
                exceeds the category's size limit.
        """
        # Filename must be present.
        if not file.filename:
            raise HTTPException(status_code=400, detail="文件名不能为空")

        # Extension check; an unknown category has no allowed extensions.
        file_ext = Path(file.filename).suffix.lower()
        if file_ext not in self.allowed_extensions.get(file_type, []):
            raise HTTPException(
                status_code=400,
                detail=f"不支持的文件扩展名: {file_ext}"
            )

        # Read the content.
        content = await file.read()
        file_size = len(content)

        # Size check (1 MB default for an unknown category).
        max_size = self.max_sizes.get(file_type, 1024 * 1024)
        if file_size > max_size:
            raise HTTPException(
                status_code=413,
                detail=f"文件大小超过限制: {max_size // (1024*1024)}MB"
            )

        # Declared MIME type check.
        allowed_mimes = self.allowed_mime_types.get(file_type, [])
        if file.content_type not in allowed_mimes:
            raise HTTPException(
                status_code=400,
                detail=f"不支持的文件类型: {file.content_type}"
            )

        # Detect the real type from the content to catch spoofed
        # extensions. Only the detection call is best-effort: if it fails,
        # the declared type checked above is relied on.
        try:
            detected_mime = magic.from_buffer(content, mime=True)
        except Exception:
            detected_mime = None

        # The rejection is raised outside the try block. Inside it, the
        # HTTPException was caught by "except Exception" and discarded, so
        # a mismatch was never reported.
        if detected_mime is not None and detected_mime not in allowed_mimes:
            raise HTTPException(
                status_code=400,
                detail="文件内容与扩展名不匹配"
            )

        return {
            "content": content,
            "size": file_size,
            "mime_type": file.content_type,
            "extension": file_ext
        }
# Module-level validator instance used by the /secure-upload/ endpoint.
file_validator = FileValidator()
@app.post("/secure-upload/")
async def secure_file_upload(
    file_type: str = Form(..., regex="^(image|document|archive)$"),
    description: str = Form(..., min_length=1, max_length=500),
    file: UploadFile = File(...)
):
    """Validate an upload with ``file_validator`` and store it.

    The stored name is the current Unix timestamp followed by the original
    filename with every character outside ``[a-zA-Z0-9._-]`` replaced by an
    underscore.

    Returns:
        dict with a success message and a ``file_info`` summary.
    """
    # Validation returns the file content together with its size and type.
    checked = await file_validator.validate_file(file, file_type)

    # Build the storage name from the sanitised original name.
    sanitized_name = re.sub(r'[^a-zA-Z0-9._-]', '_', file.filename)
    final_filename = f"{int(datetime.now().timestamp())}_{sanitized_name}"

    # Write the already-read content to disk.
    async with aiofiles.open(UPLOAD_DIR / final_filename, 'wb') as out:
        await out.write(checked["content"])

    file_info = {
        "original_name": file.filename,
        "saved_as": final_filename,
        "type": file_type,
        "size_mb": round(checked["size"] / (1024 * 1024), 2),
        "mime_type": checked["mime_type"],
        "description": description
    }
    return {
        "message": "文件上传成功",
        "file_info": file_info
    }


### 表单数据清理
python
import html
import bleach
def sanitize_input(text: str, allow_html: bool = False) -> str:
    """Clean a piece of user input.

    Falsy input yields an empty string. Otherwise surrounding whitespace is
    removed and the text is either HTML-escaped (the default) or, when
    ``allow_html`` is true, passed through ``bleach.clean`` with a small
    set of permitted tags; other tags are stripped.
    """
    if not text:
        return ""

    trimmed = text.strip()
    if not allow_html:
        # Plain-text mode: escape HTML special characters.
        return html.escape(trimmed)

    # HTML mode: keep only basic formatting tags.
    permitted_tags = ['b', 'i', 'u', 'em', 'strong', 'p', 'br']
    return bleach.clean(trimmed, tags=permitted_tags, strip=True)
@app.post("/secure-form/")
async def handle_secure_form(
    name: str = Form(...),
    email: str = Form(...),
    message: str = Form(...),
    allow_html_message: bool = Form(False)
):
    """Sanitise and validate a submitted form, then echo the cleaned data.

    Returns:
        dict with a success message and the cleaned name, e-mail and
        message, the cleaned message length and whether HTML was allowed.

    Raises:
        HTTPException: 400 when the e-mail format is invalid or the message
            is shorter than 10 characters.
    """
    # Clean the input values.
    clean_name = sanitize_input(name)
    clean_email = sanitize_input(email)
    clean_message = sanitize_input(message, allow_html=allow_html_message)

    # E-mail format check.
    if not re.match(r'^[^@]+@[^@]+\.[^@]+$', clean_email):
        raise HTTPException(
            status_code=400,
            detail="邮箱格式无效"
        )

    # Minimum message length. In plain-text mode the cleaned message is
    # HTML-escaped, which turns "<" or "&" into 4-5 characters, so the
    # check uses the trimmed text the user actually typed. In HTML mode it
    # uses what remains after cleaning.
    if allow_html_message:
        effective_length = len(clean_message)
    else:
        effective_length = len(message.strip())
    if effective_length < 10:
        raise HTTPException(
            status_code=400,
            detail="消息内容至少需要10个字符"
        )

    return {
        "message": "表单提交成功",
        "data": {
            "name": clean_name,
            "email": clean_email,
            "message": clean_message,
            "message_length": len(clean_message),
            "contains_html": allow_html_message
        }
    }


## 总结
本章详细介绍了FastAPI中的表单处理功能：
- ✅ 基础表单:Form字段、数据验证、错误处理
- ✅ 文件上传:单文件、多文件、文件验证
- ✅ 复杂表单:动态字段、混合数据类型
- ✅ 安全考虑:文件类型检查、大小限制、数据清理
FastAPI的表单处理功能强大而灵活，结合Pydantic的数据验证能力，可以构建安全可靠的表单处理系统。
### 表单处理最佳实践
- 始终验证文件类型和大小
- 清理和转义用户输入数据
- 使用安全的文件名生成策略
- 限制上传文件的数量和总大小
- 提供详细的错误信息
- 考虑异步处理大文件上传
在下一章中，我们将学习FastAPI的中间件系统，了解如何处理横切关注点。