Skip to content

FastAPI 表单处理

概述

Web应用中经常需要处理HTML表单数据和文件上传。FastAPI提供了强大的表单处理能力,支持传统的表单数据、文件上传、多部分表单等。本章将详细介绍如何在FastAPI中处理各种类型的表单数据。

📝 基础表单处理

安装依赖

首先需要安装python-multipart来支持表单数据处理(后文的文件上传与安全示例还会用到aiofiles、python-magic和bleach,需要时请一并安装):

bash
pip install python-multipart

简单表单数据

python
from fastapi import FastAPI, Form, HTTPException, status
from typing import Optional
import re

# Application instance; the @app.post route decorators in this file register
# their handlers on it.
app = FastAPI()

@app.post("/login/")
async def login(
    username: str = Form(..., min_length=3, max_length=50),
    password: str = Form(..., min_length=6),
    remember_me: bool = Form(False)
):
    """Check submitted form credentials against the hard-coded demo account.

    Returns a payload containing a placeholder token when the pair matches.

    Raises:
        HTTPException: 401 when the username/password pair does not match.
    """
    # Simulated user verification: a single fixed account is accepted.
    credentials_ok = (username, password) == ("admin", "secret123")
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误"
        )

    return dict(
        message="登录成功",
        username=username,
        remember_me=remember_me,
        token="fake-jwt-token",
    )

@app.post("/contact/")
async def submit_contact_form(
    name: str = Form(..., min_length=2, max_length=100),
    email: str = Form(..., regex=r"^[^@]+@[^@]+\.[^@]+$"),
    subject: str = Form(..., min_length=5, max_length=200),
    message: str = Form(..., min_length=10, max_length=2000),
    newsletter: bool = Form(False)
):
    """Accept a contact form and return a summary plus a reference id.

    Length limits and the e-mail pattern are enforced by the Form
    declarations above. The reference id is derived from the e-mail address
    and is the same for the same address across server restarts.
    """
    import zlib

    # The built-in hash() of a str is salted per interpreter process, so it
    # would produce a different id for the same address after every restart.
    # CRC32 of the UTF-8 bytes is stable across runs.
    email_digest = zlib.crc32(email.encode("utf-8"))

    return {
        "message": "联系表单提交成功",
        "contact": {
            "name": name,
            "email": email,
            "subject": subject,
            "message_length": len(message),
            "newsletter_subscription": newsletter
        },
        "reference_id": f"CONTACT-{email_digest % 100000}"
    }

表单数据验证

python
from pydantic import BaseModel, validator
from datetime import datetime, date

class UserRegistration(BaseModel):
    """Registration payload with per-field validation rules."""

    username: str
    email: str
    full_name: str
    birth_date: date
    phone: Optional[str] = None
    terms_accepted: bool

    @validator('username')
    def validate_username(cls, v):
        """Accept only ASCII letters, digits and underscores."""
        if re.match(r'^[a-zA-Z0-9_]+$', v) is None:
            raise ValueError('用户名只能包含字母、数字和下划线')
        return v

    @validator('birth_date')
    def validate_age(cls, v):
        """Require an age from 13 to 120 years, measured against today."""
        now = date.today()
        # One year is subtracted while this year's birthday is still ahead.
        birthday_pending = (now.month, now.day) < (v.month, v.day)
        years = now.year - v.year - (1 if birthday_pending else 0)
        if years < 13:
            raise ValueError('用户年龄必须满13岁')
        if years > 120:
            raise ValueError('请输入有效的出生日期')
        return v

    @validator('phone')
    def validate_phone(cls, v):
        """Check the optional phone number; empty values pass through."""
        if not v:
            return v
        if re.match(r'^\+?1?\d{9,15}$', v) is None:
            raise ValueError('请输入有效的电话号码')
        return v

    @validator('terms_accepted')
    def terms_must_be_accepted(cls, v):
        """Reject the registration unless the terms flag is truthy."""
        if v:
            return v
        raise ValueError('必须接受服务条款')

@app.post("/register/")
async def register_user(
    username: str = Form(...),
    email: str = Form(...),
    password: str = Form(..., min_length=8),
    confirm_password: str = Form(...),
    full_name: str = Form(...),
    birth_date: date = Form(...),
    phone: Optional[str] = Form(None),
    terms_accepted: bool = Form(...)
):
    """Register a user from form fields, validating them via UserRegistration.

    Returns a summary of the accepted user, including the age in completed
    years.

    Raises:
        HTTPException: 400 when the two password fields differ; 422 when the
            UserRegistration model rejects any field.
    """
    # Password confirmation check
    if password != confirm_password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="密码和确认密码不匹配"
        )

    # Validate the remaining fields with the Pydantic model
    try:
        user_data = UserRegistration(
            username=username,
            email=email,
            full_name=full_name,
            birth_date=birth_date,
            phone=phone,
            terms_accepted=terms_accepted
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(e)
        ) from e

    # Age in completed calendar years. Dividing elapsed days by 365 ignores
    # leap days and reports a birthday a few days early, so use the same
    # calendar rule as the model's age validator.
    today = date.today()
    born = user_data.birth_date
    birthday_pending = (today.month, today.day) < (born.month, born.day)
    age = today.year - born.year - (1 if birthday_pending else 0)

    return {
        "message": "用户注册成功",
        "user": {
            "username": user_data.username,
            "email": user_data.email,
            "full_name": user_data.full_name,
            "age": age
        },
        "next_steps": ["验证邮箱", "完善个人资料", "设置偏好"]
    }

📁 文件上传处理

单文件上传

python
from fastapi import File, UploadFile
import aiofiles
import os
from pathlib import Path

# Directory that the upload handlers write into. It is created when this
# module is loaded; exist_ok=True makes that a no-op if it already exists.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

@app.post("/upload-file/")
async def upload_file(file: UploadFile = File(...)):
    """Validate a single uploaded file and store it under UPLOAD_DIR.

    The declared content type must be in the allow-list and the payload must
    not exceed 5MB. The stored name is a timestamp plus a sanitized form of
    the client-supplied filename.

    Raises:
        HTTPException: 400 for a disallowed content type; 413 when the file
            is larger than 5MB.
    """
    # File type validation (based on the content type declared by the client)
    allowed_types = ["image/jpeg", "image/png", "image/gif", "text/plain", "application/pdf"]
    if file.content_type not in allowed_types:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"不支持的文件类型: {file.content_type}"
        )

    # File size validation (5MB). Read at most one byte past the limit so an
    # oversized upload is detected without loading all of it into memory.
    max_size = 5 * 1024 * 1024
    content = await file.read(max_size + 1)
    if len(content) > max_size:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="文件大小不能超过5MB"
        )

    # Build a safe file name. The client controls file.filename, so keep only
    # its last path component (treating "\" as a separator too) and replace
    # anything that is not a word character, dot or hyphen.
    base_name = Path((file.filename or "").replace("\\", "/")).name
    cleaned_name = re.sub(r'[^\w.\-]', '_', base_name) or "upload"
    safe_filename = f"{int(datetime.now().timestamp())}_{cleaned_name}"
    file_path = UPLOAD_DIR / safe_filename

    # Save the file
    async with aiofiles.open(file_path, 'wb') as f:
        await f.write(content)

    return {
        "message": "文件上传成功",
        "filename": file.filename,
        "saved_as": safe_filename,
        "content_type": file.content_type,
        "size_bytes": len(content),
        "size_mb": round(len(content) / (1024 * 1024), 2)
    }

@app.post("/upload-image/")
async def upload_image(
    title: str = Form(..., min_length=1, max_length=200),
    description: Optional[str] = Form(None, max_length=1000),
    tags: str = Form("", description="逗号分隔的标签"),
    is_public: bool = Form(True),
    image: UploadFile = File(...)
):
    """Store an uploaded image together with its descriptive form fields.

    The declared content type must start with "image/" and the payload must
    not exceed 10MB. `tags` is a comma-separated string; empty entries are
    dropped.

    Raises:
        HTTPException: 400 when the upload is not declared as an image; 413
            when it is larger than 10MB.
    """
    # Verify that the upload is declared as an image. The content type may be
    # missing, so treat None as "not an image" instead of failing on it.
    declared_type = image.content_type or ""
    if not declared_type.startswith("image/"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="只能上传图片文件"
        )

    # Read and size-check the file (10MB); one extra byte is enough to detect
    # an oversized upload without buffering the rest.
    max_size = 10 * 1024 * 1024
    content = await image.read(max_size + 1)
    if len(content) > max_size:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="图片大小不能超过10MB"
        )

    # Split the tag string
    tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()]

    # Save the file. The client controls image.filename, so keep only its last
    # path component and replace characters that are unsafe in a file name.
    base_name = Path((image.filename or "").replace("\\", "/")).name
    cleaned_name = re.sub(r'[^\w.\-]', '_', base_name) or "upload"
    safe_filename = f"img_{int(datetime.now().timestamp())}_{cleaned_name}"
    file_path = UPLOAD_DIR / safe_filename

    async with aiofiles.open(file_path, 'wb') as f:
        await f.write(content)

    return {
        "message": "图片上传成功",
        "image_info": {
            "title": title,
            "description": description,
            "tags": tag_list,
            "is_public": is_public,
            "filename": image.filename,
            "saved_as": safe_filename,
            "size_mb": round(len(content) / (1024 * 1024), 2)
        }
    }

多文件上传

python
from typing import List

@app.post("/upload-multiple/")
async def upload_multiple_files(
    files: List[UploadFile] = File(...),
    folder_name: str = Form(..., min_length=1, max_length=50)
):
    """Store up to 10 uploaded files in a sub-folder of UPLOAD_DIR.

    All files are read and checked against the 50MB total limit before
    anything is written, so a rejected request leaves no partial files
    behind.

    Raises:
        HTTPException: 400 for more than 10 files or an invalid folder name;
            413 when the combined size exceeds 50MB.
    """
    if len(files) > 10:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="一次最多只能上传10个文件"
        )

    # folder_name comes from the client; refuse separators and dot-names so
    # the target folder cannot point outside UPLOAD_DIR.
    if "/" in folder_name or "\\" in folder_name or folder_name in (".", ".."):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="文件夹名称无效"
        )

    # Phase 1: read everything and enforce the total size limit (50MB).
    max_total = 50 * 1024 * 1024
    pending = []
    total_size = 0
    for file in files:
        # Never read more than one byte past the remaining budget.
        content = await file.read(max_total - total_size + 1)
        total_size += len(content)
        if total_size > max_total:
            raise HTTPException(
                status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                detail="所有文件总大小不能超过50MB"
            )
        pending.append((file, content))

    # Phase 2: create the folder and write the files.
    folder_path = UPLOAD_DIR / folder_name
    folder_path.mkdir(exist_ok=True)

    timestamp = int(datetime.now().timestamp())
    uploaded_files = []
    for index, (file, content) in enumerate(pending):
        # Keep only the last path component of the client-supplied name and
        # replace unsafe characters. The index keeps same-named files in one
        # request from overwriting each other.
        base_name = Path((file.filename or "").replace("\\", "/")).name
        cleaned_name = re.sub(r'[^\w.\-]', '_', base_name) or "upload"
        safe_filename = f"{timestamp}_{index}_{cleaned_name}"
        file_path = folder_path / safe_filename

        async with aiofiles.open(file_path, 'wb') as f:
            await f.write(content)

        uploaded_files.append({
            "original_name": file.filename,
            "saved_as": safe_filename,
            "content_type": file.content_type,
            "size_bytes": len(content)
        })

    return {
        "message": f"成功上传{len(files)}个文件",
        "folder": folder_name,
        "files": uploaded_files,
        "total_size_mb": round(total_size / (1024 * 1024), 2)
    }

@app.post("/upload-with-metadata/")
async def upload_with_metadata(
    title: str = Form(...),
    category: str = Form(...),
    tags: str = Form(""),
    main_file: UploadFile = File(...),
    thumbnails: List[UploadFile] = File([]),
    documents: List[UploadFile] = File([])
):
    """Store a main file plus optional thumbnails and documents.

    Returns the form metadata and, under "uploaded_files", one entry per
    group ("main", "thumbnails", "documents") that actually contained files.
    """

    async def _store(upload, prefix):
        """Write one upload under UPLOAD_DIR; return (saved_name, content)."""
        content = await upload.read()
        # The client controls upload.filename: keep only its last path
        # component and replace characters that are unsafe in a file name.
        base_name = Path((upload.filename or "").replace("\\", "/")).name
        cleaned_name = re.sub(r'[^\w.\-]', '_', base_name) or "upload"
        saved_name = f"{prefix}_{int(datetime.now().timestamp())}_{cleaned_name}"
        async with aiofiles.open(UPLOAD_DIR / saved_name, 'wb') as f:
            await f.write(content)
        return saved_name, content

    result = {
        "title": title,
        "category": category,
        "tags": [tag.strip() for tag in tags.split(",") if tag.strip()],
        "uploaded_files": {}
    }

    # Main file
    if main_file.filename:
        main_filename, main_content = await _store(main_file, "main")
        result["uploaded_files"]["main"] = {
            "filename": main_file.filename,
            "saved_as": main_filename,
            "size_mb": round(len(main_content) / (1024 * 1024), 2)
        }

    # Thumbnails (the group is skipped when its first entry has no filename)
    if thumbnails and thumbnails[0].filename:
        thumb_files = []
        for i, thumb in enumerate(thumbnails):
            thumb_filename, thumb_content = await _store(thumb, f"thumb_{i}")
            thumb_files.append({
                "filename": thumb.filename,
                "saved_as": thumb_filename,
                "size_kb": round(len(thumb_content) / 1024, 2)
            })

        result["uploaded_files"]["thumbnails"] = thumb_files

    # Documents (same skip rule as thumbnails)
    if documents and documents[0].filename:
        doc_files = []
        for doc in documents:
            doc_filename, doc_content = await _store(doc, "doc")
            doc_files.append({
                "filename": doc.filename,
                "saved_as": doc_filename,
                "content_type": doc.content_type,
                "size_kb": round(len(doc_content) / 1024, 2)
            })

        result["uploaded_files"]["documents"] = doc_files

    return result

📋 复杂表单处理

动态表单字段

python
@app.post("/dynamic-form/")
async def handle_dynamic_form(
    form_type: str = Form(...),
    base_info: str = Form(...),  # JSON string
    additional_fields: Optional[str] = Form(None),  # JSON string
    attachments: List[UploadFile] = File([])
):
    """Handle a form whose fields arrive as JSON strings plus attachments.

    `base_info` and the optional `additional_fields` are parsed as JSON. For
    the known form types ("contact", "application", "survey") `base_info`
    must be a JSON object containing that type's required keys. Attachments
    with a filename are saved under UPLOAD_DIR.

    Raises:
        HTTPException: 400 for invalid JSON, for a non-object `base_info` on
            a known form type, or for missing required fields.
    """
    import json

    try:
        base_data = json.loads(base_info)
    except json.JSONDecodeError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="base_info必须是有效的JSON字符串"
        )

    # Parse the additional fields
    additional_data = {}
    if additional_fields:
        try:
            additional_data = json.loads(additional_fields)
        except json.JSONDecodeError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="additional_fields必须是有效的JSON字符串"
            )

    # Required keys per form type
    required_fields = {
        "contact": ["name", "email", "message"],
        "application": ["full_name", "position", "experience"],
        "survey": ["participant_id", "responses"]
    }

    if form_type in required_fields:
        # Valid JSON is not necessarily an object: a number would make the
        # key lookup below raise TypeError, and a list or string would be
        # searched by value. Reject those with a 400 instead.
        if not isinstance(base_data, dict):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="base_info必须是JSON对象"
            )
        missing_fields = [field for field in required_fields[form_type]
                          if field not in base_data]
        if missing_fields:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"缺少必需字段: {', '.join(missing_fields)}"
            )

    # Save the attachments
    attachment_info = []
    for attachment in attachments:
        if attachment.filename:
            content = await attachment.read()
            # The client controls attachment.filename: keep only its last
            # path component and replace characters unsafe in a file name.
            base_name = Path(attachment.filename.replace("\\", "/")).name
            cleaned_name = re.sub(r'[^\w.\-]', '_', base_name) or "upload"
            filename = f"attach_{int(datetime.now().timestamp())}_{cleaned_name}"
            file_path = UPLOAD_DIR / filename

            async with aiofiles.open(file_path, 'wb') as f:
                await f.write(content)

            attachment_info.append({
                "original_name": attachment.filename,
                "saved_as": filename,
                "size_kb": round(len(content) / 1024, 2)
            })

    return {
        "form_type": form_type,
        "base_info": base_data,
        "additional_fields": additional_data,
        "attachments": attachment_info,
        "processed_at": datetime.now().isoformat()
    }

表单数据和JSON混合

python
from fastapi import Body

@app.post("/hybrid-form/")
async def handle_hybrid_form(
    # Form fields
    title: str = Form(...),
    category: str = Form(...),
    is_featured: bool = Form(False),

    # JSON data, sent as a JSON-encoded string in a form field. A request
    # that carries form fields and files is multipart-encoded, so it cannot
    # also carry a separate application/json body.
    metadata: str = Form(...),

    # Files
    cover_image: Optional[UploadFile] = File(None),
    gallery_images: List[UploadFile] = File([])
):
    """Handle a form that combines plain fields, JSON metadata and files.

    `metadata` must be a JSON object encoded as a string. The cover image and
    gallery images are optional and are saved under UPLOAD_DIR when present.

    Raises:
        HTTPException: 400 when `metadata` is not valid JSON or is not a
            JSON object.
    """
    import json

    try:
        metadata_obj = json.loads(metadata)
    except json.JSONDecodeError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="metadata必须是有效的JSON字符串"
        )
    if not isinstance(metadata_obj, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="metadata必须是JSON对象"
        )

    async def _store(upload, prefix):
        """Write one upload under UPLOAD_DIR; return (saved_name, content)."""
        content = await upload.read()
        # The client controls upload.filename: keep only its last path
        # component and replace characters that are unsafe in a file name.
        base_name = Path((upload.filename or "").replace("\\", "/")).name
        cleaned_name = re.sub(r'[^\w.\-]', '_', base_name) or "upload"
        saved_name = f"{prefix}_{int(datetime.now().timestamp())}_{cleaned_name}"
        async with aiofiles.open(UPLOAD_DIR / saved_name, 'wb') as f:
            await f.write(content)
        return saved_name, content

    result = {
        "form_data": {
            "title": title,
            "category": category,
            "is_featured": is_featured
        },
        "metadata": metadata_obj,
        "files": {}
    }

    # Cover image
    if cover_image and cover_image.filename:
        cover_filename, cover_content = await _store(cover_image, "cover")
        result["files"]["cover"] = {
            "filename": cover_image.filename,
            "saved_as": cover_filename,
            "size_kb": round(len(cover_content) / 1024, 2)
        }

    # Gallery images (skipped when the first entry has no filename)
    if gallery_images and gallery_images[0].filename:
        gallery_files = []
        for i, img in enumerate(gallery_images):
            img_filename, img_content = await _store(img, f"gallery_{i}")
            gallery_files.append({
                "filename": img.filename,
                "saved_as": img_filename,
                "size_kb": round(len(img_content) / 1024, 2)
            })

        result["files"]["gallery"] = gallery_files

    return result

🔒 安全考虑

文件类型和大小限制

python
import magic  # python-magic库,用于检测文件类型

class FileValidator:
    """Validate uploads by extension, size, declared MIME type and content.

    Rules are grouped by a category name ("image", "document", "archive");
    each category has its own allowed extensions, allowed MIME types and
    maximum size.
    """

    def __init__(self):
        # Allowed file-name suffixes per category (compared in lower case).
        self.allowed_extensions = {
            'image': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'],
            'document': ['.pdf', '.doc', '.docx', '.txt', '.rtf'],
            'archive': ['.zip', '.rar', '.7z', '.tar', '.gz']
        }

        # Allowed MIME types per category.
        # NOTE(review): some allowed extensions (e.g. .docx, .7z) have no
        # matching MIME type listed here — confirm whether that is intended.
        self.allowed_mime_types = {
            'image': ['image/jpeg', 'image/png', 'image/gif', 'image/bmp', 'image/webp'],
            'document': ['application/pdf', 'application/msword', 'text/plain'],
            'archive': ['application/zip', 'application/x-rar-compressed']
        }

        # Maximum size in bytes per category.
        self.max_sizes = {
            'image': 10 * 1024 * 1024,      # 10MB
            'document': 50 * 1024 * 1024,   # 50MB
            'archive': 100 * 1024 * 1024    # 100MB
        }

    async def validate_file(self, file: UploadFile, file_type: str) -> dict:
        """Check `file` against the rules for category `file_type`.

        Returns a dict with the file's "content" (bytes), "size", declared
        "mime_type" and lower-cased "extension".

        Raises:
            HTTPException: 400 for a missing filename, a disallowed
                extension or MIME type, or content whose detected type is
                not allowed; 413 when the file exceeds the category limit.
        """
        # Filename must be present
        if not file.filename:
            raise HTTPException(status_code=400, detail="文件名不能为空")

        # Extension check
        file_ext = Path(file.filename).suffix.lower()
        if file_ext not in self.allowed_extensions.get(file_type, []):
            raise HTTPException(
                status_code=400,
                detail=f"不支持的文件扩展名: {file_ext}"
            )

        # Read the content, at most one byte past the limit: that is enough
        # to detect an oversized file without buffering all of it.
        max_size = self.max_sizes.get(file_type, 1024 * 1024)
        content = await file.read(max_size + 1)
        file_size = len(content)

        # Size check
        if file_size > max_size:
            raise HTTPException(
                status_code=413,
                detail=f"文件大小超过限制: {max_size // (1024*1024)}MB"
            )

        # Declared MIME type check
        allowed_mimes = self.allowed_mime_types.get(file_type, [])
        if file.content_type not in allowed_mimes:
            raise HTTPException(
                status_code=400,
                detail=f"不支持的文件类型: {file.content_type}"
            )

        # Detect the real type from the content (guards against a forged
        # extension). Only the detection call is best-effort; the rejection
        # below must stay outside the try block, otherwise a broad except
        # would swallow the HTTPException and the check would never fire.
        try:
            detected_mime = magic.from_buffer(content, mime=True)
        except Exception:
            # Detection failed; fall back to the declared type checked above.
            detected_mime = None

        if detected_mime is not None and detected_mime not in allowed_mimes:
            raise HTTPException(
                status_code=400,
                detail="文件内容与扩展名不匹配"
            )

        return {
            "content": content,
            "size": file_size,
            "mime_type": file.content_type,
            "extension": file_ext
        }

# Module-level validator instance used by the /secure-upload/ handler.
file_validator = FileValidator()

@app.post("/secure-upload/")
async def secure_file_upload(
    file_type: str = Form(..., regex="^(image|document|archive)$"),
    description: str = Form(..., min_length=1, max_length=500),
    file: UploadFile = File(...)
):
    """Validate an upload with file_validator and store it under UPLOAD_DIR.

    The stored name is a timestamp followed by the original filename with
    every character outside [a-zA-Z0-9._-] replaced by an underscore.
    """
    # Run the validation; the result carries the bytes and their size.
    checked = await file_validator.validate_file(file, file_type)
    payload = checked["content"]
    size_in_bytes = checked["size"]

    # Build the stored name: timestamp prefix + sanitized original name.
    stamp = int(datetime.now().timestamp())
    sanitized = re.sub(r'[^a-zA-Z0-9._-]', '_', file.filename)
    final_filename = f"{stamp}_{sanitized}"

    # Write the validated bytes to disk.
    async with aiofiles.open(UPLOAD_DIR / final_filename, 'wb') as out:
        await out.write(payload)

    file_info = {
        "original_name": file.filename,
        "saved_as": final_filename,
        "type": file_type,
        "size_mb": round(size_in_bytes / (1024 * 1024), 2),
        "mime_type": checked["mime_type"],
        "description": description
    }
    return {"message": "文件上传成功", "file_info": file_info}

表单数据清理

python
import html
import bleach

def sanitize_input(text: str, allow_html: bool = False) -> str:
    """Normalize a user-supplied string.

    Falsy input yields "". Otherwise surrounding whitespace is removed and
    the text is either HTML-escaped (the default) or, when allow_html is
    true, cleaned by bleach so that only a small set of formatting tags
    remains.
    """
    if not text:
        return ""

    trimmed = text.strip()

    if not allow_html:
        # Escape HTML special characters.
        return html.escape(trimmed)

    # Keep only these tags; bleach strips the rest.
    permitted_tags = ['b', 'i', 'u', 'em', 'strong', 'p', 'br']
    return bleach.clean(trimmed, tags=permitted_tags, strip=True)

@app.post("/secure-form/")
async def handle_secure_form(
    name: str = Form(...),
    email: str = Form(...),
    message: str = Form(...),
    allow_html_message: bool = Form(False)
):
    """Sanitize and validate a contact-style form.

    All text fields are passed through sanitize_input; the message keeps a
    limited set of HTML tags only when `allow_html_message` is true.

    Raises:
        HTTPException: 400 when the e-mail address does not match the
            expected pattern or the message is shorter than 10 characters.
    """
    # Sanitize the input data
    clean_name = sanitize_input(name)
    clean_email = sanitize_input(email)
    clean_message = sanitize_input(message, allow_html=allow_html_message)

    # Validate the e-mail format
    if not re.match(r'^[^@]+@[^@]+\.[^@]+$', clean_email):
        raise HTTPException(
            status_code=400,
            detail="邮箱格式无效"
        )

    # Check the message length. For plain-text messages, measure what the
    # user typed: escaping turns characters such as "<" into multi-character
    # entities, which would let a too-short message pass the minimum. For
    # HTML messages, measure the cleaned text.
    if allow_html_message:
        effective_length = len(clean_message)
    else:
        effective_length = len(message.strip())

    if effective_length < 10:
        raise HTTPException(
            status_code=400,
            detail="消息内容至少需要10个字符"
        )

    return {
        "message": "表单提交成功",
        "data": {
            "name": clean_name,
            "email": clean_email,
            "message": clean_message,
            "message_length": len(clean_message),
            "contains_html": allow_html_message
        }
    }

总结

本章详细介绍了FastAPI中的表单处理功能:

  • 基础表单:Form字段、数据验证、错误处理
  • 文件上传:单文件、多文件、文件验证
  • 复杂表单:动态字段、混合数据类型
  • 安全考虑:文件类型检查、大小限制、数据清理

FastAPI的表单处理功能强大而灵活,结合Pydantic的数据验证能力,可以构建安全可靠的表单处理系统。

表单处理最佳实践

  • 始终验证文件类型和大小
  • 清理和转义用户输入数据
  • 使用安全的文件名生成策略
  • 限制上传文件的数量和总大小
  • 提供详细的错误信息
  • 考虑异步处理大文件上传

在下一章中,我们将学习FastAPI的中间件系统,了解如何处理横切关注点(cross-cutting concerns)。

本站内容仅供学习和研究使用。