1. 项目概述:从“能跑就行”到“坚如磐石”
干了这么多年开发,我见过太多项目在初期为了赶进度,把安全这事儿往后放,结果上线没多久就出幺蛾子。最常见的场景就是,一个简单的登录框,后台直接拼接SQL字符串,用户名输入个admin'--,密码随便输,直接就进去了。这可不是危言耸听,而是每天都在发生的真实案例。今天这个项目,我们就来聊聊Python Web开发中两个最核心、也最容易被忽视的安全议题:SQL注入防御和JWT认证的正确姿势。这不仅仅是两个孤立的技术点,而是构建一个“可信”后端服务的基石。无论你是刚入门的Python开发者,还是已经写过几个CRUD应用的老手,如果你对“安全”的理解还停留在“用个复杂密码”的层面,那这篇文章就是为你准备的。我们将从最危险的漏洞(SQL注入)入手,手把手教你如何堵上这个“潘多拉魔盒”,再深入到现代API设计中不可或缺的JWT认证,告诉你如何正确签发、验证和销毁令牌,避免让你的应用成为攻击者的“提款机”。目标是让你写出的代码,不仅能实现功能,更能经得起考验。
2. 安全编程核心思路:从被动防御到主动设计
很多开发者把安全编程等同于“加几个验证”,这是一种典型的被动防御思维。真正的安全,应该像建筑物的承重结构一样,从设计之初就融入其中。我们这次实战的核心思路,就是完成一次从“亡羊补牢”到“未雨绸缪”的转变。
2.1 为什么SQL注入依然是头号威胁?
尽管SQL注入是一个老生常谈的问题,但在各种漏洞报告中它常年位居榜首。根本原因在于,很多初级教程和快速开发框架在演示时,为了直观,依然在使用字符串拼接的方式构建SQL语句。这种写法深入人心,危害也最大。攻击者无需任何特殊工具,仅凭一个浏览器地址栏或一个表单,就能尝试进行攻击。其危害不仅仅是数据泄露,更可能导致整个数据库被篡改、删除,甚至通过数据库提权获取服务器控制权。我们的防御策略必须建立在“绝不信任用户输入”这一黄金法则之上,而参数化查询(或预处理语句)是实现这一法则最直接、最有效的手段。它让SQL指令和数据分离,从根源上切断了注入的可能性。
2.2 JWT认证:便捷背后的安全陷阱
JWT(JSON Web Token)因其无状态、易于跨域等优点,已成为RESTful API认证的主流方案。但“易于使用”往往伴随着“易于误用”。很多人拿到一个JWT库,照着文档三行代码生成一个token就以为万事大吉,却忽略了令牌的泄露、伪造和过期管理等问题。一个设计不当的JWT系统,其危害可能比单纯的密码泄露更大,因为令牌往往拥有更长的生命周期和更广泛的权限。我们的目标不仅仅是“实现JWT认证”,而是要构建一个“安全的JWT认证体系”,这包括密钥的安全管理、令牌的有效期与刷新机制、以及关键操作的二次验证等。
2.3 整体技术栈选型与考量
为了聚焦安全本身,我们选择最经典、最透明的技术组合进行演示,避免框架的魔法带来理解上的隔阂。
- Web框架:Flask。它轻量、直接,没有过多的抽象,能让我们清晰地看到HTTP请求、响应以及安全处理的每一个环节。相比于Django“全家桶”式的封装,Flask更适合教学和原理剖析。
- 数据库操作:SQLAlchemy Core + 原生参数化查询。虽然SQLAlchemy ORM本身能有效防止注入,但我们仍会展示原生SQL的场景,因为很多遗留项目或复杂查询中仍会用到。我们将对比危险拼接与安全参数化两种写法。
- JWT库:PyJWT。这是Python社区最广泛使用的JWT实现,API清晰,支持标准的所有算法。我们将使用它来演示令牌的生成、验证以及常见安全配置。
- 辅助工具:Postman。用于模拟前端,对API进行各种安全测试,例如发送畸形token、尝试注入payload等。
这个组合确保了我们的每一行代码都在解决明确的安全问题,你可以轻松地将这些理念移植到FastAPI、Django或其他任何Python Web框架中。
3. 实战环境搭建与危险代码重现
在学会防御之前,我们必须亲眼看看攻击是如何发生的。让我们先搭建一个最简单的、存在漏洞的Web应用。
3.1 初始化项目与依赖安装
首先,创建一个干净的目录并安装必要的依赖。这里我们使用pip进行管理。
# 创建项目目录并进入 mkdir python-security-lab && cd python-security-lab # 创建虚拟环境(强烈推荐,避免包冲突) python -m venv venv # 激活虚拟环境 # Windows: venv\Scripts\activate # Linux/Mac: source venv/bin/activate # 安装核心依赖 pip install flask pyjwt cryptographycryptography是PyJWT用于支持非对称加密算法(如RS256)的底层库,一并安装。
接下来,我们创建一个存在严重SQL注入漏洞的Flask应用。假设我们有一个用户登录接口。
# app_vulnerable.py from flask import Flask, request, jsonify import sqlite3 app = Flask(__name__) # 初始化一个简单的内存数据库,并插入一条测试数据 def init_db(): conn = sqlite3.connect(':memory:') cursor = conn.cursor() cursor.execute('CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, password TEXT)') cursor.execute("INSERT INTO users (username, password) VALUES ('admin', 'secret123')") conn.commit() return conn conn = init_db() @app.route('/login_unsafe', methods=['POST']) def login_unsafe(): """危险!存在SQL注入漏洞的登录接口""" data = request.get_json() username = data.get('username') password = data.get('password') # !!! 致命的字符串拼接 !!! query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'" cursor = conn.cursor() cursor.execute(query) # 直接执行拼接后的SQL user = cursor.fetchone() if user: return jsonify({'message': 'Login successful (unsafe)!', 'user_id': user[0]}) else: return jsonify({'message': 'Invalid credentials'}), 401 if __name__ == '__main__': app.run(debug=True)3.2 发起一次真实的SQL注入攻击
运行这个应用 (python app_vulnerable.py),然后用Postman或curl进行测试。
正常登录请求:
POST http://127.0.0.1:5000/login_unsafe Content-Type: application/json { "username": "admin", "password": "secret123" }返回:
{"message": "Login successful (unsafe)!"}发起SQL注入攻击(绕过密码):
{ "username": "admin'--", "password": "anything" }此时,后台拼接的SQL语句变为:
SELECT * FROM users WHERE username = 'admin'--' AND password = 'anything'--在SQL中是注释符,它使得后面的AND password = ...条件完全失效。这条语句等价于SELECT * FROM users WHERE username = 'admin',从而成功绕过密码验证,返回登录成功!更危险的攻击(删除数据): 如果接口设计得更糟糕,攻击者甚至可以通过注入执行删除操作。虽然这个登录接口本身可能不行,但想象一个根据ID查询用户的接口
GET /user/<id>,如果后端拼接SQL,攻击者访问/user/1; DROP TABLE users--,后果不堪设想。
注意:以上演示在
debug=True模式下进行。在实际生产环境中,Flask的调试模式必须关闭,因为它会暴露堆栈信息,为攻击者提供更多线索。但关闭调试模式并不能修复SQL注入漏洞,它只是减少了信息泄露。
4. 构建防线:彻底杜绝SQL注入
知道了漏洞如何产生,修复就有了明确的方向。我们的核心武器就是参数化查询(Parameterized Query),也叫预处理语句(Prepared Statement)。
4.1 使用参数化查询修复漏洞
几乎所有主流数据库驱动(sqlite3,psycopg2,PyMySQL,cx_Oracle)都支持参数化查询。其原理是将SQL语句的骨架(包含占位符)与数据分开发送给数据库。数据库引擎会先编译语句结构,再将数据作为纯参数代入,从根本上杜绝了数据被解释为代码的可能性。
让我们修复之前的登录接口:
# app_safe.py (部分代码) import sqlite3 from flask import Flask, request, jsonify app = Flask(__name__) conn = sqlite3.connect(':memory:', check_same_thread=False) cursor = conn.cursor() cursor.execute('CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, password TEXT)') cursor.execute("INSERT INTO users (username, password) VALUES ('admin', 'secret123')") conn.commit() @app.route('/login_safe', methods=['POST']) def login_safe(): """安全!使用参数化查询的登录接口""" data = request.get_json() username = data.get('username') password = data.get('password') # 使用 ? 作为占位符(sqlite3风格)。其他数据库可能使用 %s 或 :name。 query = "SELECT * FROM users WHERE username = ? AND password = ?" cursor = conn.cursor() # 将参数作为一个元组传递给 execute 方法的第二个参数 cursor.execute(query, (username, password)) # 关键在这里! user = cursor.fetchone() if user: return jsonify({'message': 'Login successful (safe)!', 'user_id': user[0]}) else: return jsonify({'message': 'Invalid credentials'}), 401现在,无论攻击者输入admin'--还是其他任何恶意字符串,这些内容都只会被当作查询username字段的字符串值来处理。数据库会去寻找一个用户名** literally** 等于admin'--的记录,显然找不到,因此返回认证失败。
4.2 使用ORM框架作为更优解
对于现代应用,直接使用原生SQL的情况在减少,更多时候我们会使用ORM(对象关系映射)框架,如SQLAlchemy ORM、Django ORM等。这些ORM框架在内部普遍使用参数化查询,只要正确使用其查询API,就能天然免疫SQL注入。
以SQLAlchemy ORM为例:
from flask_sqlalchemy import SQLAlchemy from app import db class User(db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) password = db.Column(db.String(120), nullable=False) # 安全的查询方式 user = User.query.filter_by(username=username, password=password).first() # 或者使用 filter user = User.query.filter(User.username == username, User.password == password).first()filter_by或filter方法接收的参数都会被SQLAlchemy自动转换为参数化查询,安全无忧。
实操心得:即使使用ORM,也并非绝对安全。如果你因为性能或特殊需求必须使用原始SQL字符串(例如调用复杂的存储过程或窗口函数),务必使用ORM提供的文本SQL(text())与参数绑定功能,而不是手动拼接。
from sqlalchemy import text # 安全的方式 safe_sql = text("SELECT * FROM users WHERE username = :username AND created_at > :date") result = db.session.execute(safe_sql, {'username': username, 'date': some_date}) # 危险!永远不要这样做! dangerous_sql = f"SELECT * FROM users WHERE username = '{username}'"
4.3 输入验证与净化:双保险策略
参数化查询解决了“数据变代码”的问题,但良好的安全实践还需要输入验证。例如,对于用户名字段,我们可以规定其只能包含字母、数字和下划线,且长度在3-20字符之间。这不仅能防止一些边缘情况(虽然参数化查询已能防御),更能保证业务数据的规范性,并抵御潜在的逻辑漏洞或资源消耗攻击(如超长字符串)。
import re def validate_username(username): pattern = r'^[a-zA-Z0-9_]{3,20}$' return bool(re.match(pattern, username)) def validate_password(password): # 至少8位,包含字母和数字 if len(password) < 8: return False if not re.search(r'[a-zA-Z]', password): return False if not re.search(r'\d', password): return False return True @app.route('/login_robust', methods=['POST']) def login_robust(): data = request.get_json() username = data.get('username', '').strip() password = data.get('password', '') # 1. 输入验证 if not validate_username(username): return jsonify({'message': 'Invalid username format'}), 400 if not validate_password(password): return jsonify({'message': 'Password does not meet requirements'}), 400 # 2. 参数化查询(核心防御) query = "SELECT id, username FROM users WHERE username = ? AND password = ?" cursor.execute(query, (username, password)) user = cursor.fetchone() # 3. 统一的认证失败响应(避免信息泄露) if not user: # 不要提示是“用户名错误”还是“密码错误”,统一为“认证失败” return jsonify({'message': 'Authentication failed'}), 401 # 4. 登录成功,准备进入JWT认证环节... user_id, username = user # ... 后续生成JWT令牌这里我们构建了一个更健壮的接口:验证先行,参数化查询作为核心防御,辅以统一的错误信息。这是构建安全API的标准化模式。
5. 进阶守卫:实现安全的JWT认证
用户通过第一道关卡(密码验证)后,我们不应该在每次请求中都让其再次输入密码。这时就需要一个可信的凭证——JWT令牌。
5.1 JWT令牌的生成与安全要点
JWT由三部分组成:Header(头部)、Payload(负载)、Signature(签名)。签名部分确保了令牌的完整性和来源可信,是安全的关键。
# auth.py import jwt import datetime from functools import wraps from flask import request, jsonify, current_app # 一个强密钥,必须保密!生产环境应从环境变量或密钥管理服务获取。 # 这是一个对称密钥(HS256算法使用)。对于更高安全要求,应使用非对称密钥(RS256)。 SECRET_KEY = 'your-very-long-and-very-secret-key-change-this-in-production' def generate_jwt(user_id, username, role='user'): """ 生成JWT令牌 Args: user_id: 用户唯一标识 username: 用户名 role: 用户角色,用于权限控制 Returns: str: 编码后的JWT令牌 """ # 定义令牌的过期时间,例如15分钟后过期 expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=15) # 构建Payload(负载) payload = { 'user_id': user_id, 'username': username, 'role': role, 'exp': expiration, # 标准字段:过期时间 'iat': datetime.datetime.utcnow(), # 标准字段:签发时间 'iss': 'your-app-name', # 标准字段:签发者(可选) } # 使用HS256算法和密钥生成令牌 token = jwt.encode(payload, SECRET_KEY, algorithm='HS256') # 注意:PyJWT >= 2.0.0 返回的是字符串,之前版本返回字节。 return token # 在登录成功的接口中调用 @app.route('/api/login', methods=['POST']) def api_login(): # ... (之前的输入验证和数据库查询逻辑) if user: user_id, username = user token = generate_jwt(user_id, username) return jsonify({'access_token': token, 'token_type': 'bearer'}) else: return jsonify({'message': 'Authentication failed'}), 401安全要点解析:
- 密钥管理:
SECRET_KEY是生命线。绝对不要硬编码在代码中并提交到版本库。必须使用环境变量(如os.getenv('JWT_SECRET_KEY'))或从专业的密钥管理服务(如AWS KMS, HashiCorp Vault)获取。 - 算法选择:
HS256(对称加密)简单高效,但要求服务端绝对保密密钥。对于分布式系统或需要第三方验证的场景(如单点登录),应使用RS256(非对称加密),服务端持有私钥签名,资源服务器用公钥验证,公钥可以安全分发。 - Payload内容:不要在Payload中存放敏感信息(如密码、密钥)。因为JWT的Payload只是Base64编码,并非加密,任何人都可以解码查看。只存放必要的、非敏感的身份标识信息。
- 过期时间(exp):这是必须的。短时间的令牌(如15-30分钟)可以限制令牌泄露后的危害窗口。这引出了下一个核心机制——刷新令牌。
5.2 令牌验证与保护API端点
生成了令牌,接下来就要在需要保护的API端点前验证它。我们创建一个Flask装饰器来实现这个功能。
# auth.py (续) def token_required(f): """验证JWT令牌的装饰器""" @wraps(f) def decorated_function(*args, **kwargs): token = None # 从请求头中获取令牌,标准格式:Authorization: Bearer <token> auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): token = auth_header.split(' ')[1] if not token: return jsonify({'message': 'Token is missing!'}), 401 try: # 解码并验证令牌 # `options` 参数可以设置验证项,这里验证签名和过期时间 data = jwt.decode(token, SECRET_KEY, algorithms=['HS256'], options={'verify_exp': True}) # 将解码后的用户信息存入请求上下文,方便视图函数使用 request.current_user = data except jwt.ExpiredSignatureError: return jsonify({'message': 'Token has expired!'}), 401 except jwt.InvalidTokenError as e: # 捕获所有其他无效令牌错误(如签名错误、格式错误等) current_app.logger.warning(f'Invalid token attempt: {e}') return jsonify({'message': 'Token is invalid!'}), 401 return f(*args, **kwargs) return decorated_function # 使用装饰器保护API @app.route('/api/protected', methods=['GET']) @token_required def protected_resource(): """一个受保护的资源端点,需要有效的JWT令牌才能访问""" current_user = request.current_user return jsonify({ 'message': f'Hello, {current_user["username"]}!', 'user_id': current_user['user_id'], 'role': current_user.get('role') })现在,客户端在调用/api/protected时,必须在请求头中携带Authorization: Bearer <your-jwt-token>。我们的装饰器会自动完成验证。
5.3 实现令牌刷新机制
由于访问令牌(Access Token)有效期很短,用户不可能每15分钟就重新登录一次。这就需要引入刷新令牌(Refresh Token)机制。
- 访问令牌:生命周期短(如15分钟),用于访问API资源。即使泄露,危害期也有限。
- 刷新令牌:生命周期长(如7天),仅用于获取新的访问令牌,不能直接访问资源。它被安全地存储在服务端(如数据库或Redis)并与用户关联。
# auth.py (续,简化版刷新逻辑) def generate_refresh_token(user_id): """生成一个唯一的刷新令牌,并存储到数据库""" import uuid refresh_token = str(uuid.uuid4()) expiration = datetime.datetime.utcnow() + datetime.timedelta(days=7) # 这里应该将 refresh_token, user_id, expiration 存入数据库的 refresh_tokens 表 # 例如:db.session.add(RefreshToken(token=refresh_token, user_id=user_id, expires_at=expiration)) # db.session.commit() store_refresh_token_in_db(user_id, refresh_token, expiration) # 假设的函数 return refresh_token @app.route('/api/refresh', methods=['POST']) def refresh_access_token(): """使用刷新令牌获取新的访问令牌""" data = request.get_json() refresh_token = data.get('refresh_token') if not refresh_token: return jsonify({'message': 'Refresh token is required'}), 400 # 1. 验证刷新令牌是否有效且在数据库中且未过期 is_valid, user_id = validate_refresh_token_from_db(refresh_token) # 假设的函数 if not is_valid: return jsonify({'message': 'Invalid or expired refresh token'}), 401 # 2. 可选:使旧的刷新令牌失效(单次使用),增强安全性 # revoke_refresh_token(refresh_token) # 3. 生成新的访问令牌和(可选的)新的刷新令牌 user = get_user_by_id(user_id) # 假设的函数 new_access_token = generate_jwt(user.id, user.username, user.role) # new_refresh_token = generate_refresh_token(user.id) # 如果需要轮换刷新令牌 return jsonify({ 'access_token': new_access_token, 'token_type': 'bearer', # 'refresh_token': new_refresh_token })客户端在访问令牌过期后,使用刷新令牌调用/api/refresh获取新的访问令牌。如果刷新令牌也过期或无效,用户才需要重新登录。这种机制在安全性和用户体验之间取得了良好平衡。
注意事项:刷新令牌必须安全存储。对于Web应用,应使用
HttpOnly、Secure、SameSite标记的Cookie来存储,防止XSS攻击窃取。对于原生App,应使用操作系统的安全存储机制(如Keychain/Keystore)。
6. 常见安全陷阱与深度防御策略
即使我们正确使用了参数化查询和JWT,仍然可能掉入其他安全陷阱。下面是一些高级威胁和应对策略。
6.1 JWT安全进阶:抵御令牌伪造与泄露
密钥强度与泄露:
- 问题:使用弱密钥(如
secret、password)或密钥泄露,攻击者可以伪造任意令牌。 - 对策:使用强随机密钥(如
os.urandom(32)生成),并通过环境变量管理。定期轮换密钥(需做好新旧令牌同时有效的过渡期)。对于RS256算法,保护好私钥。
- 问题:使用弱密钥(如
算法混淆攻击:
- 问题:JWT头部中的
alg字段指定签名算法。如果服务器配置为支持多种算法(如HS256和RS256),攻击者可能将alg改为none(如果服务器错误地允许)或从非对称算法改为对称算法,从而绕过验证。 - 对策:在解码时,显式指定允许的算法列表,如
algorithms=['HS256']。永远不要使用None算法。
# 错误!容易受到算法混淆攻击 # decoded = jwt.decode(token, SECRET_KEY) # 正确!显式指定算法 decoded = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])- 问题:JWT头部中的
令牌泄露与吊销:
- 问题:JWT一旦签发,在过期前一直有效。如果令牌在有效期内泄露(如通过XSS攻击、日志记录),无法立即使其失效。
- 对策:
- 短期令牌:设置较短的过期时间(如15分钟),配合刷新令牌。
- 令牌黑名单:对于关键操作(如登出、修改密码),将相关令牌的
jti(JWT ID)加入黑名单(存储在Redis或数据库),并在验证时检查。这会引入状态,但提升了安全性。 - 使用会话:对于安全性要求极高的应用,可以考虑回归有状态的会话管理,将令牌仅作为会话ID。
6.2 超越SQL注入:其他常见Web安全漏洞
跨站脚本攻击(XSS):
- 问题:攻击者将恶意脚本(如JavaScript)注入到网页中,当其他用户浏览时执行,窃取Cookie、会话令牌或进行其他操作。
- 对策(后端角度):
- 输出编码:将所有用户可控的数据在输出到HTML、JavaScript、CSS或URL时进行正确的编码。使用模板引擎(如Jinja2)的自动转义功能(默认开启),并避免使用
|safe过滤器除非绝对必要。 - 设置安全Header:使用
Flask-Talisman等扩展自动设置Content-Security-Policy (CSP),限制页面可以加载资源的来源,能有效缓解XSS。
- 输出编码:将所有用户可控的数据在输出到HTML、JavaScript、CSS或URL时进行正确的编码。使用模板引擎(如Jinja2)的自动转义功能(默认开启),并避免使用
跨站请求伪造(CSRF):
- 问题:诱骗已登录的用户在不知情的情况下提交恶意请求(如转账、改密码)。
- 对策:对于有状态的Web应用(使用Cookie-Session),应使用CSRF令牌。对于纯API(使用JWT等Token认证),由于通常不依赖浏览器自动发送的Cookie(而是手动在Header中添加Token),CSRF风险较低,但仍需注意。
敏感信息泄露:
- 问题:在错误信息、日志、响应体中暴露数据库结构、服务器路径、API密钥等。
- 对策:
- 生产环境关闭
DEBUG模式。 - 自定义统一的、信息模糊的错误处理页面(如“服务器内部错误”)。
- 审查日志内容,避免记录敏感数据(如完整请求体、密码、令牌)。
- 确保
.git目录、配置文件(如.env)、备份文件等不被部署到Web根目录。
- 生产环境关闭
6.3 安全配置检查清单
在应用上线前,建议对照此清单进行检查:
| 检查项 | 安全实践 | 检查方法/工具 |
|---|---|---|
| 依赖安全 | 定期更新依赖,修复已知漏洞。 | pip list --outdated,safety check, GitHub Dependabot |
| SQL注入 | 全部使用参数化查询或ORM。 | 代码审查,搜索execute(和字符串拼接(如+,%,format)。 |
| JWT配置 | 使用强密钥,设置合理过期时间,指定算法列表。 | 检查jwt.encode/decode调用。密钥是否从环境变量读取? |
| XSS防护 | 模板自动转义开启,设置CSP Header。 | 浏览器开发者工具查看响应头,检查是否有unsafe-inline。 |
| 敏感数据 | 密码哈希存储(如bcrypt),日志不记录敏感信息。 | 检查密码存储字段,是否为长随机字符串?查看日志文件内容。 |
| HTTPS强制 | 生产环境全部使用HTTPS,HTTP重定向到HTTPS。 | 访问http://版本,看是否跳转到https://。 |
| 错误处理 | 生产环境关闭调试模式,返回通用错误页面。 | 访问一个不存在的路由,看是否暴露堆栈信息。 |
| 文件上传 | 限制文件类型、大小,重命名文件,存储在Web根目录外。 | 检查上传处理代码,是否有白名单验证? |
7. 实战演练:构建一个完整的受保护用户信息API
让我们综合运用以上所有知识,构建一个简单的用户信息管理系统。它包含注册、登录、查看/更新个人信息等端点,并全程实施安全防护。
# app_final.py (核心部分展示) from flask import Flask, request, jsonify, g import sqlite3 import bcrypt import jwt import datetime from functools import wraps app = Flask(__name__) DATABASE = 'app.db' SECRET_KEY = 'your-secret-key-from-env' # 应从环境变量读取 # 数据库工具函数(略) def get_db(): # ... 返回数据库连接 def init_db(): # ... 创建users表,密码字段存储bcrypt哈希值 # 安全工具函数 def hash_password(password): salt = bcrypt.gensalt() return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8') def check_password(hashed, password): return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8')) def create_token(user_id): payload = { 'user_id': user_id, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=15), 'iat': datetime.datetime.utcnow() } return jwt.encode(payload, SECRET_KEY, algorithm='HS256') def token_required(f): @wraps(f) def decorated(*args, **kwargs): # ... 同之前的token_required装饰器实现 # 将解码后的user_id存入g对象 g.user_id = data['user_id'] return f(*args, **kwargs) return decorated # API端点 @app.route('/api/register', methods=['POST']) def register(): # 1. 输入验证(用户名、邮箱格式,密码强度) # 2. 检查用户名是否已存在(使用参数化查询) # 3. 密码哈希处理 hash_password() # 4. 将用户信息存入数据库 # 5. 返回成功,不直接返回令牌(需要登录) pass @app.route('/api/login', methods=['POST']) def login(): username = request.json.get('username') password = request.json.get('password') # 1. 输入验证 # 2. 参数化查询获取用户信息(包括密码哈希) db = get_db() cursor = db.cursor() cursor.execute("SELECT id, password_hash FROM users WHERE username = ?", (username,)) user = cursor.fetchone() # 3. 验证密码 if user and check_password(user[1], password): token = create_token(user[0]) return jsonify({'access_token': token, 'token_type': 'bearer'}) else: return jsonify({'message': 'Authentication failed'}), 401 @app.route('/api/profile', methods=['GET']) @token_required def get_profile(): user_id = g.user_id # 使用参数化查询,并且只返回当前token对应的用户信息 db = get_db() cursor = db.cursor() cursor.execute("SELECT id, username, email, created_at FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if user: return jsonify(dict(zip(['id','username','email','created_at'], user))) else: return jsonify({'message': 'User not found'}), 404 @app.route('/api/profile', methods=['PUT']) @token_required def update_profile(): user_id = g.user_id new_email = request.json.get('email') # 1. 验证新邮箱格式 # 2. 使用参数化查询更新当前用户的邮箱 db = get_db() cursor = db.cursor() cursor.execute("UPDATE users SET email = ? WHERE id = ?", (new_email, user_id)) db.commit() return jsonify({'message': 'Profile updated'}) if __name__ == '__main__': init_db() app.run(debug=False) # 生产环境务必设为False这个示例展示了一个安全后端API的骨架:输入验证、参数化查询、密码哈希、JWT认证、基于令牌的授权。每一个环节都扣上了安全锁。
安全编程不是一项功能,而是一种贯穿始终的思维方式。它要求我们在写下每一行与用户输入、外部交互、数据存储相关的代码时,都本能地问一句:“如果这里是恶意的,会怎么样?” 从今天起,忘掉字符串拼接的SQL,仔细检查你的JWT配置,像设计功能一样设计你的安全边界。真正的安全,就藏在这些看似枯燥的细节与持续的良好习惯之中。