一、系统概述
1.1 项目背景
微爱帮作为特殊群体通信服务平台,为确保信件邮寄的真实性和安全性,需要对用户进行严格的实名认证。通过对接阿里云实名认证服务,实现身份证+人脸的双重验证,保障通信双方身份真实性。
1.2 认证流程
┌─────────────────────────────────────────────────────────────┐
│                      实名认证完整流程                        │
├─────────────────────────────────────────────────────────────┤
│ 用户发起认证 → 输入身份证信息 → 活体检测 → 人脸比对 → 结果返回 │
└─────────────────────────────────────────────────────────────┘

二、技术架构设计
2.1 系统架构图
┌─────────────┐    ┌──────────────┐    ┌─────────────────┐
│   客户端    │───▶│   API网关    │───▶│  实名认证服务   │
│ (Web/H5/App)│    │              │    │                 │
└──────┬──────┘    └──────────────┘    └────────┬────────┘
       │                                        │
       │                                        ▼
       │                               ┌─────────────────┐
       │                               │ 阿里云实名认证  │
       │                               │    API服务      │
       └──────────────────────────────▶│   (实人认证)    │
                                       └─────────────────┘

2.2 组件说明
前端SDK:集成阿里云实人认证SDK,实现活体检测
认证服务:处理认证逻辑,管理认证状态
结果回调:异步接收阿里云认证结果
数据存储:安全存储认证信息,满足合规要求
三、阿里云服务开通与配置
3.1 开通服务
# 1. 登录阿里云控制台
# 2. 搜索"实人认证"服务
# 3. 开通服务并创建应用
# 4. 获取以下关键信息:
#    - AccessKey ID & Secret
#    - SceneId(场景ID)
#    - AppId(应用ID)
#    - CertifyId(认证流程ID)

3.2 SDK下载与配置
// 阿里云实人认证前端SDK配置 const config = { appId: 'your_app_id', sceneId: 'your_scene_id', bizCode: 'weiaibang_auth', language: 'zh_CN', successCallback: function(result) { console.log('认证成功:', result); }, failCallback: function(error) { console.log('认证失败:', error); } };四、核心代码实现
4.1 实名认证服务主类
import base64
import hashlib
import hmac
import json
import logging
import os
import re
import time
import urllib.parse
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, Any, Optional, Tuple

import requests
from cryptography.fernet import Fernet

logger = logging.getLogger(__name__)


class AuthStatus(Enum):
    """Lifecycle states of a real-name verification attempt."""

    PENDING = "pending"        # waiting for the user to start verification
    PROCESSING = "processing"  # verification in progress
    SUCCESS = "success"        # verification passed
    FAILED = "failed"          # verification failed
    EXPIRED = "expired"        # a previously successful verification expired


@dataclass
class UserAuthInfo:
    """Verification data held for one user."""

    user_id: str
    real_name: str
    id_card_number: str
    auth_status: AuthStatus
    certify_id: Optional[str] = None
    face_compare_score: Optional[float] = None
    auth_time: Optional[datetime] = None
    expire_time: Optional[datetime] = None
    fail_reason: Optional[str] = None
    request_id: Optional[str] = None
    front_image_url: Optional[str] = None  # ID card, front side
    back_image_url: Optional[str] = None   # ID card, back side
    face_image_url: Optional[str] = None   # face photo


class AliyunRealPersonAuthService:
    """Client for the Aliyun real-person verification HTTP API.

    Every public API method returns a dict with a boolean ``success`` key and
    never raises: failures are reported through ``success=False`` plus an
    ``error`` message.
    """

    # Structural check for 18-character ID numbers (ASCII digits only).
    _ID_CARD_PATTERN = re.compile(r"[0-9]{17}[0-9Xx]")
    # Per-position weights applied to the first 17 digits.
    _ID_CARD_WEIGHTS = (7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2)
    # Check character indexed by (weighted sum mod 11).
    _ID_CARD_CHECK_CODES = "10X98765432"

    def __init__(self,
                 access_key_id: str,
                 access_key_secret: str,
                 scene_id: str,
                 app_id: str,
                 region_id: str = "cn-hangzhou",
                 cipher_key: Optional[bytes] = None,
                 request_timeout: float = 10.0):
        """Initialise the service.

        Args:
            access_key_id: Aliyun AccessKey ID.
            access_key_secret: Aliyun AccessKey Secret.
            scene_id: Scene ID.
            app_id: Application ID.
            region_id: Region ID.
            cipher_key: Fernet key used by ``encrypt_sensitive_data`` /
                ``decrypt_sensitive_data``. When omitted, the
                ``AUTH_CIPHER_KEY`` environment variable is used; only if
                that is unset too is a throw-away key generated.
            request_timeout: Timeout in seconds for each HTTP call.
        """
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.scene_id = scene_id
        self.app_id = app_id
        self.region_id = region_id
        self.request_timeout = request_timeout

        # API endpoint
        self.base_url = "https://cloudauth.aliyuncs.com"

        # Encryption key for sensitive data at rest. A key generated here is
        # lost when the process exits, so anything encrypted with it can never
        # be decrypted again -- a persistent key must be supplied in production.
        key = cipher_key or os.getenv("AUTH_CIPHER_KEY")
        if not key:
            logger.warning(
                "No cipher key configured; generating an ephemeral Fernet key. "
                "Data encrypted with it cannot be decrypted after a restart."
            )
            key = Fernet.generate_key()
        self.cipher_key = key.encode("utf-8") if isinstance(key, str) else key
        self.cipher_suite = Fernet(self.cipher_key)

        # Validity period of a successful verification, in days
        self.auth_validity_days = 365

        # Minimum similarity score for a face comparison to pass
        self.face_compare_threshold = 0.8

    def _generate_signature(self, params: Dict[str, Any]) -> str:
        """Return the base64 HMAC-SHA1 signature for ``params``.

        Args:
            params: Request parameters (without ``Signature``).

        Returns:
            The signature string.
        """
        # Canonical query string: percent-encoded pairs sorted by name.
        canonicalized_query_string = '&'.join(
            f"{self._percent_encode(k)}={self._percent_encode(str(v))}"
            for k, v in sorted(params.items())
        )

        string_to_sign = (
            "POST" + "&" +
            self._percent_encode("/") + "&" +
            self._percent_encode(canonicalized_query_string)
        )

        # The signing key is the secret followed by a literal "&".
        key = self.access_key_secret + "&"
        digest = hmac.new(
            key.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            hashlib.sha1
        ).digest()
        return base64.b64encode(digest).decode('utf-8')

    def _percent_encode(self, s: str) -> str:
        """Percent-encode ``s`` for use in the string to sign."""
        return (
            urllib.parse.quote(s, safe='')
            .replace('+', '%20')
            .replace('*', '%2A')
            .replace('%7E', '~')
        )

    def _build_common_params(self, action: str) -> Dict[str, Any]:
        """Return the parameters shared by every API call for ``action``."""
        return {
            "Format": "JSON",
            "Version": "2020-06-18",
            "AccessKeyId": self.access_key_id,
            "SignatureMethod": "HMAC-SHA1",
            "Timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "SignatureVersion": "1.0",
            "SignatureNonce": str(uuid.uuid4()),
            "RegionId": self.region_id,
            "Action": action
        }

    def _send_request(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Sign ``params`` in place, POST them and return the decoded JSON body.

        Raises:
            requests.RequestException: On network failure or timeout.
            ValueError: If the response body is not valid JSON.
        """
        params["Signature"] = self._generate_signature(params)
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.post(
            self.base_url, data=params, timeout=self.request_timeout
        )
        return response.json()

    @staticmethod
    def _unique_order_no(prefix: str) -> str:
        """Return an order number that stays unique within the same second."""
        return f"{prefix}_{int(time.time())}_{uuid.uuid4().hex[:8]}"

    def init_real_person_auth(self,
                              user_id: str,
                              real_name: str,
                              id_card_number: str,
                              callback_url: Optional[str] = None,
                              return_url: Optional[str] = None) -> Dict[str, Any]:
        """Start a real-person verification for a user.

        Args:
            user_id: User ID.
            real_name: Real name.
            id_card_number: ID card number.
            callback_url: Callback URL sent as ``CallbackUrl`` when given.
            return_url: Return URL sent as ``ReturnUrl`` when given.

        Returns:
            On success ``success``, ``certify_id``, ``verify_url``,
            ``request_id`` and ``message``; on failure ``success=False``,
            ``error`` and ``None`` for ``certify_id`` / ``verify_url``.
        """
        try:
            if not self._validate_id_card(id_card_number):
                raise ValueError("无效的身份证号码")

            params = self._build_common_params("InitSmartVerify")
            params.update({
                "SceneId": self.scene_id,
                "OuterOrderNo": f"weiaibang_{user_id}_{int(time.time())}",
                "Mode": "enhanced",  # enhanced mode: ID card + face
                "CertType": "IDENTITY_CARD",
                "CertName": real_name,
                "CertNo": id_card_number,
                "Mobile": "",  # optional mobile number
                "Ip": "",      # user IP
                "MetaInfo": json.dumps({
                    "appId": self.app_id,
                    "userId": user_id,
                    "source": "weiaibang"
                })
            })

            if callback_url:
                params["CallbackUrl"] = callback_url
            if return_url:
                params["ReturnUrl"] = return_url

            result = self._send_request(params)
            if result.get("Code") != "200":
                raise Exception(f"阿里云认证初始化失败: {result.get('Message')}")

            data = result.get("ResultObject", {})
            return {
                "success": True,
                "certify_id": data.get("CertifyId"),
                "verify_url": data.get("VerifyUrl"),  # verification page URL
                "request_id": result.get("RequestId"),
                "message": "认证初始化成功"
            }

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "certify_id": None,
                "verify_url": None
            }

    def describe_auth_result(self, certify_id: str) -> Dict[str, Any]:
        """Fetch the result of a verification.

        Args:
            certify_id: Verification ID.

        Returns:
            On success the pass flag, scores, ID card material and
            ``verify_status``; on failure ``success=False``, ``error`` and
            ``passed=False``.
        """
        try:
            params = self._build_common_params("DescribeSmartVerify")
            params.update({
                "CertifyId": certify_id,
                "PictureReturnType": "base64"  # ask for images as base64
            })

            result = self._send_request(params)
            if result.get("Code") != "200":
                raise Exception(f"查询认证结果失败: {result.get('Message')}")

            data = result.get("ResultObject", {})
            verify_result = data.get("VerifyResult", {})
            material_info = data.get("MaterialInfo", {})

            return {
                "success": True,
                "passed": verify_result.get("Passed", False),
                "confidence": verify_result.get("Confidence", 0.0),
                "risk_score": verify_result.get("RiskScore", 0.0),
                "face_compare_score": verify_result.get("FaceComparisonScore", 0.0),
                "id_card_info": {
                    "name": material_info.get("Name"),
                    "id_number": material_info.get("IdCardNumber"),
                    "front_image": material_info.get("IdCardFrontPic"),
                    "back_image": material_info.get("IdCardBackPic"),
                    "face_image": material_info.get("FacePic"),
                    "start_date": material_info.get("StartDate"),
                    "end_date": material_info.get("EndDate"),
                    "authority": material_info.get("Authority")
                },
                "verify_status": data.get("VerifyStatus"),
                "request_id": result.get("RequestId")
            }

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "passed": False
            }

    def face_compare(self,
                     source_image: str,  # base64
                     target_image: str,  # base64
                     source_type: str = "CERT_PHOTO",
                     target_type: str = "FACE_PICTURE") -> Dict[str, Any]:
        """Compare two face images.

        Args:
            source_image: Source image (base64).
            target_image: Target image (base64).
            source_type: Source image type.
            target_type: Target image type.

        Returns:
            ``similarity_score`` and ``passed`` (score at or above
            ``face_compare_threshold``); on failure ``success=False`` with a
            zero score and ``passed=False``.
        """
        try:
            params = self._build_common_params("CompareFaceVerify")
            params.update({
                "SceneId": self.scene_id,
                # A bare timestamp collides when two comparisons start within
                # the same second, so a random suffix is appended.
                "OuterOrderNo": self._unique_order_no("face_compare"),
                "Mode": "normal",
                "SourceImageType": source_type,
                "SourceImage": source_image,
                "TargetImageType": target_type,
                "TargetImage": target_image,
                "MetaInfo": json.dumps({
                    "appId": self.app_id,
                    "source": "weiaibang_face_compare"
                })
            })

            result = self._send_request(params)
            if result.get("Code") != "200":
                raise Exception(f"人脸比对失败: {result.get('Message')}")

            data = result.get("ResultObject", {})
            similarity_score = data.get("SimilarityScore", 0.0)
            return {
                "success": True,
                "similarity_score": similarity_score,
                "passed": similarity_score >= self.face_compare_threshold,
                "confidence": data.get("Confidence", 0.0),
                "request_id": result.get("RequestId")
            }

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "similarity_score": 0.0,
                "passed": False
            }

    def verify_id_card(self, id_card_number: str, real_name: str) -> Dict[str, Any]:
        """Check that an ID card number and a name belong together.

        Args:
            id_card_number: ID card number.
            real_name: Real name.

        Returns:
            ``success`` and ``passed``, plus ``message`` / ``request_id`` on
            success or ``error`` on failure.
        """
        try:
            # NOTE(review): the action name and parameters are the article's
            # placeholders -- confirm them against the Aliyun API reference.
            params = self._build_common_params("VerifyMaterial")
            params.update({
                "SceneId": self.scene_id,
                "OuterOrderNo": self._unique_order_no("id_verify"),
                "Name": real_name,
                "IdCardNumber": id_card_number
            })

            result = self._send_request(params)
            if result.get("Code") != "200":
                return {
                    "success": False,
                    "error": result.get("Message", "验证失败"),
                    "passed": False
                }

            data = result.get("ResultObject", {})
            return {
                "success": True,
                "passed": data.get("VerifyResult", False),
                "message": data.get("Message", "验证成功"),
                "request_id": result.get("RequestId")
            }

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "passed": False
            }

    def _validate_id_card(self, id_card: str) -> bool:
        """Return True if ``id_card`` is a well-formed 18-character ID number.

        The value must be 17 ASCII digits followed by a digit or ``X``/``x``,
        and the last character must equal the check character derived from
        the weighted sum of the first 17 digits modulo 11.

        Args:
            id_card: ID card number.

        Returns:
            Whether the number is valid.
        """
        if not isinstance(id_card, str):
            return False
        if not self._ID_CARD_PATTERN.fullmatch(id_card):
            return False

        # zip stops after the 17 weights, so the check character is excluded.
        total = sum(
            int(digit) * weight
            for digit, weight in zip(id_card, self._ID_CARD_WEIGHTS)
        )
        return id_card[17].upper() == self._ID_CARD_CHECK_CODES[total % 11]

    def encrypt_sensitive_data(self, data: str) -> str:
        """Encrypt ``data`` with the service's Fernet key and return the token."""
        encrypted = self.cipher_suite.encrypt(data.encode())
        return encrypted.decode()

    def decrypt_sensitive_data(self, encrypted_data: str) -> str:
        """Decrypt a token produced by ``encrypt_sensitive_data``."""
        decrypted = self.cipher_suite.decrypt(encrypted_data.encode())
        return decrypted.decode()

# Next section heading (kept verbatim from the article): 4.2 认证管理服务
import json
import logging
import os
import pickle
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

import redis
from sqlalchemy import create_engine, Column, String, Integer, DateTime, Float, Boolean, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# AuthStatus and AliyunRealPersonAuthService are defined in section 4.1 and
# must be in scope (same module, or imported from wherever 4.1 is saved).

logger = logging.getLogger(__name__)

Base = declarative_base()


class UserAuthenticationRecord(Base):
    """One row per verification attempt of a user."""
    __tablename__ = 'user_authentication_records'

    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(String(64), nullable=False, index=True)
    certify_id = Column(String(128), nullable=False, unique=True)
    real_name = Column(String(64), nullable=False)
    id_card_number_encrypted = Column(String(512), nullable=False)
    auth_status = Column(String(32), nullable=False)
    face_compare_score = Column(Float)
    auth_time = Column(DateTime)
    expire_time = Column(DateTime)
    fail_reason = Column(String(256))
    request_id = Column(String(128))
    front_image_url = Column(Text)
    back_image_url = Column(Text)
    face_image_url = Column(Text)
    meta_info = Column(Text)  # extra details, stored as a JSON string
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)


class AuthenticationManager:
    """Coordinates the verification service, the database and the Redis cache."""

    def __init__(self, db_url: str, redis_host: str = 'localhost',
                 redis_port: int = 6379, redis_password: Optional[str] = None):
        """Create the engine, the tables, the Redis client and the API client.

        Args:
            db_url: SQLAlchemy database URL.
            redis_host: Redis host.
            redis_port: Redis port.
            redis_password: Redis password, if any.
        """
        # Database
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

        # Redis cache; raw bytes are needed because values are pickled.
        self.redis_client = redis.Redis(
            host=redis_host,
            port=redis_port,
            password=redis_password,
            decode_responses=False
        )

        # Aliyun verification service, configured from the environment
        self.auth_service = AliyunRealPersonAuthService(
            access_key_id=os.getenv("ALIYUN_ACCESS_KEY_ID"),
            access_key_secret=os.getenv("ALIYUN_ACCESS_KEY_SECRET"),
            scene_id=os.getenv("ALIYUN_SCENE_ID"),
            app_id=os.getenv("ALIYUN_APP_ID")
        )

        # Cache lifetime in seconds
        self.cache_ttl = 3600

    @contextmanager
    def get_session(self):
        """Yield a session that commits on success and rolls back on error."""
        session = self.Session()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    def _cache_get(self, key: str) -> Optional[Any]:
        """Return the cached value stored under ``key``, or None on a miss."""
        raw = self.redis_client.get(key)
        if not raw:
            return None
        # NOTE(security): pickle.loads can execute arbitrary code. This is
        # only acceptable while nothing untrusted can write to this Redis
        # instance; the cached values are plain dicts, so JSON would do.
        return pickle.loads(raw)

    def _cache_set(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` for ``cache_ttl`` seconds."""
        self.redis_client.setex(key, self.cache_ttl, pickle.dumps(value))

    def init_user_auth(self,
                       user_id: str,
                       real_name: str,
                       id_card_number: str,
                       callback_url: Optional[str] = None) -> Dict[str, Any]:
        """Start a verification for a user and persist a pending record.

        Args:
            user_id: User ID.
            real_name: Real name.
            id_card_number: ID card number.
            callback_url: Callback URL.

        Returns:
            The service's initialisation result, or ``success=False`` with an
            ``error`` (and ``code="ALREADY_AUTHENTICATED"`` when the user
            already holds a successful verification).
        """
        try:
            existing_auth = self.get_user_auth_status(user_id)
            if existing_auth and existing_auth.get("status") == AuthStatus.SUCCESS.value:
                return {
                    "success": False,
                    "error": "用户已认证",
                    "code": "ALREADY_AUTHENTICATED"
                }

            init_result = self.auth_service.init_real_person_auth(
                user_id=user_id,
                real_name=real_name,
                id_card_number=id_card_number,
                callback_url=callback_url
            )
            if not init_result["success"]:
                return init_result

            certify_id = init_result["certify_id"]

            with self.get_session() as session:
                # The ID number is only ever stored encrypted.
                encrypted_id_card = self.auth_service.encrypt_sensitive_data(id_card_number)
                session.add(UserAuthenticationRecord(
                    user_id=user_id,
                    certify_id=certify_id,
                    real_name=real_name,
                    id_card_number_encrypted=encrypted_id_card,
                    auth_status=AuthStatus.PENDING.value,
                    request_id=init_result.get("request_id")
                ))

            self._cache_set(f"auth_init:{certify_id}", {
                "user_id": user_id,
                "real_name": real_name,
                "certify_id": certify_id,
                "init_time": datetime.now().isoformat()
            })
            # The status cached by the check above describes an older record.
            self.redis_client.delete(f"user_auth:{user_id}")

            return init_result

        except Exception as e:
            return {
                "success": False,
                "error": f"初始化认证失败: {str(e)}"
            }

    def handle_auth_callback(self, certify_id: str, callback_data: Dict[str, Any]) -> Dict[str, Any]:
        """Apply a verification callback to the stored record.

        Args:
            certify_id: Verification ID.
            callback_data: Callback payload. Keys may be present with a
                ``None`` value, so values are tested rather than key presence.

        Returns:
            ``success=True`` with ``user_id``, or ``success=False`` with ``error``.
        """
        try:
            with self.get_session() as session:
                record = session.query(UserAuthenticationRecord).filter(
                    UserAuthenticationRecord.certify_id == certify_id
                ).first()

                if not record:
                    return {"success": False, "error": "认证记录不存在"}

                # Read now: after commit and close the instance is detached
                # and attribute access would raise.
                user_id = record.user_id

                if callback_data.get("passed", False):
                    now = datetime.now()
                    record.auth_status = AuthStatus.SUCCESS.value
                    record.auth_time = now
                    record.expire_time = now + timedelta(
                        days=self.auth_service.auth_validity_days
                    )

                    if callback_data.get("face_compare_score") is not None:
                        record.face_compare_score = callback_data["face_compare_score"]

                    id_card_info = callback_data.get("id_card_info")
                    if id_card_info:
                        record.front_image_url = id_card_info.get("front_image")
                        record.back_image_url = id_card_info.get("back_image")
                        record.face_image_url = id_card_info.get("face_image")

                        record.meta_info = json.dumps({
                            "id_card_start_date": id_card_info.get("start_date"),
                            "id_card_end_date": id_card_info.get("end_date"),
                            "id_card_authority": id_card_info.get("authority"),
                            "confidence": callback_data.get("confidence"),
                            "risk_score": callback_data.get("risk_score")
                        })
                else:
                    record.auth_status = AuthStatus.FAILED.value
                    record.fail_reason = callback_data.get("fail_reason") or "认证失败"

            # The transaction is committed here; drop every cache entry that
            # may still describe the previous state.
            self.redis_client.delete(f"auth_init:{certify_id}")
            self.redis_client.delete(f"auth_result:{certify_id}")
            self.redis_client.delete(f"user_auth:{user_id}")

            return {"success": True, "user_id": user_id}

        except Exception as e:
            return {"success": False, "error": str(e)}

    def query_auth_result(self, certify_id: str) -> Dict[str, Any]:
        """Return the state of a verification, refreshing it from Aliyun if pending.

        Args:
            certify_id: Verification ID.

        Returns:
            ``success=True`` merged with the fields of ``_build_auth_result``,
            or ``success=False`` with ``error``.
        """
        in_progress = (AuthStatus.PENDING.value, AuthStatus.PROCESSING.value)
        try:
            cache_key = f"auth_result:{certify_id}"
            cached_result = self._cache_get(cache_key)
            if cached_result is not None:
                return cached_result

            with self.get_session() as session:
                record = session.query(UserAuthenticationRecord).filter(
                    UserAuthenticationRecord.certify_id == certify_id
                ).first()

                if not record:
                    return {"success": False, "error": "认证记录不存在"}

                status_changed = False
                if record.auth_status in in_progress:
                    result = self.auth_service.describe_auth_result(certify_id)
                    if result["success"]:
                        self._update_auth_from_query(record, result)
                        status_changed = True

                # Build while the session is open, and include the "success"
                # flag that callers of this method test.
                auth_result = {"success": True, **self._build_auth_result(record)}
                user_id = record.user_id

            if status_changed:
                self.redis_client.delete(f"user_auth:{user_id}")

            # Only final states are cached; caching an in-progress state would
            # make every poll return it for a whole TTL.
            if auth_result["status"] not in in_progress:
                self._cache_set(cache_key, auth_result)

            return auth_result

        except Exception as e:
            return {"success": False, "error": str(e)}

    def get_user_auth_status(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the user's most recent verification state.

        A successful verification whose ``expire_time`` has passed is marked
        expired as a side effect.

        Args:
            user_id: User ID.

        Returns:
            The fields of ``_build_auth_result`` for the newest record, or
            None when the user has no record or the lookup fails.
        """
        try:
            cache_key = f"user_auth:{user_id}"
            cached_result = self._cache_get(cache_key)
            if cached_result is not None:
                return cached_result

            with self.get_session() as session:
                # Only the newest record is needed; do not load the history.
                latest_record = session.query(UserAuthenticationRecord).filter(
                    UserAuthenticationRecord.user_id == user_id
                ).order_by(UserAuthenticationRecord.created_at.desc()).first()

                if not latest_record:
                    return None

                if (latest_record.auth_status == AuthStatus.SUCCESS.value and
                        latest_record.expire_time and
                        latest_record.expire_time < datetime.now()):
                    latest_record.auth_status = AuthStatus.EXPIRED.value

                auth_info = self._build_auth_result(latest_record)

            self._cache_set(cache_key, auth_info)
            return auth_info

        except Exception as e:
            logger.error("获取用户认证状态失败: %s", e)
            return None

    def verify_user_identity(self,
                             user_id: str,
                             face_image: Optional[str] = None) -> Dict[str, Any]:
        """Check that a user may act as a verified sender.

        Args:
            user_id: User ID.
            face_image: Face image (base64, optional). When given and the
                user's successful record holds a face image, the two are
                compared.

        Returns:
            ``success`` / ``passed`` plus ``user_info`` on success or
            ``reason`` (and ``similarity_score`` for a failed comparison).
        """
        try:
            auth_info = self.get_user_auth_status(user_id)
            if not auth_info:
                return {
                    "success": False,
                    "passed": False,
                    "reason": "用户未认证"
                }

            if auth_info["status"] != AuthStatus.SUCCESS.value:
                return {
                    "success": False,
                    "passed": False,
                    "reason": f"认证状态: {auth_info['status']}"
                }

            # Compare datetimes, not their ISO strings.
            expire_time = auth_info.get("expire_time")
            if expire_time and datetime.fromisoformat(expire_time) < datetime.now():
                return {
                    "success": False,
                    "passed": False,
                    "reason": "认证已过期"
                }

            if face_image:
                # The cached status never contains the face image, so the
                # record itself has to be consulted.
                with self.get_session() as session:
                    record = session.query(UserAuthenticationRecord).filter(
                        UserAuthenticationRecord.user_id == user_id,
                        UserAuthenticationRecord.auth_status == AuthStatus.SUCCESS.value
                    ).order_by(UserAuthenticationRecord.created_at.desc()).first()
                    stored_reference = record.face_image_url if record else None

                if stored_reference:
                    stored_face_image = self._load_stored_face_image(stored_reference)
                    if stored_face_image is None:
                        # Fail closed: comparing the upload with itself would
                        # always pass and verify nothing.
                        return {
                            "success": False,
                            "passed": False,
                            "reason": "无法获取已存储的人脸图片"
                        }

                    compare_result = self.auth_service.face_compare(
                        source_image=face_image,
                        target_image=stored_face_image
                    )
                    if not compare_result["passed"]:
                        return {
                            "success": False,
                            "passed": False,
                            "reason": "人脸比对失败",
                            "similarity_score": compare_result["similarity_score"]
                        }

            return {
                "success": True,
                "passed": True,
                "user_info": {
                    "user_id": user_id,
                    "real_name": auth_info["real_name"],
                    "auth_time": auth_info["auth_time"]
                }
            }

        except Exception as e:
            return {
                "success": False,
                "passed": False,
                "reason": f"验证失败: {str(e)}"
            }

    def _load_stored_face_image(self, stored_reference: str) -> Optional[str]:
        """Return the stored face image as base64, or None if unavailable.

        A value that is not an http(s) URL is taken to be the image data
        itself. Fetching from a storage service is not implemented.
        """
        # NOTE(review): assumes non-URL values are base64 image data --
        # confirm against what the callback actually stores.
        if stored_reference.startswith(("http://", "https://")):
            return None
        return stored_reference

    def _update_auth_from_query(self, record: UserAuthenticationRecord,
                                query_result: Dict[str, Any]):
        """Copy the outcome of ``describe_auth_result`` onto ``record``."""
        # NOTE(review): anything that is not "passed" is recorded as FAILED,
        # including a verification the user may not have finished yet.
        # Confirm how "verify_status" marks in-progress verifications.
        if query_result.get("passed"):
            now = datetime.now()
            record.auth_status = AuthStatus.SUCCESS.value
            record.auth_time = now
            record.expire_time = now + timedelta(
                days=self.auth_service.auth_validity_days
            )
            record.face_compare_score = query_result.get("face_compare_score")
        else:
            record.auth_status = AuthStatus.FAILED.value
            record.fail_reason = "阿里云认证失败"

    def _build_auth_result(self, record: UserAuthenticationRecord) -> Dict[str, Any]:
        """Return ``record`` as a dict, merged with its ``meta_info`` JSON."""
        result = {
            "user_id": record.user_id,
            "certify_id": record.certify_id,
            "real_name": record.real_name,
            "status": record.auth_status,
            "auth_time": record.auth_time.isoformat() if record.auth_time else None,
            "expire_time": record.expire_time.isoformat() if record.expire_time else None,
            "face_compare_score": record.face_compare_score,
            "fail_reason": record.fail_reason
        }

        if record.meta_info:
            try:
                result.update(json.loads(record.meta_info))
            except (TypeError, ValueError):
                # Best effort: unreadable extra details must not hide the
                # core result.
                logger.warning("Ignoring unreadable meta_info for certify_id %s",
                               record.certify_id)

        return result

# Next section heading (kept verbatim from the article): 4.3 FastAPI认证接口服务
import hmac
import json
import logging
import os
from datetime import datetime
from functools import lru_cache
from typing import Optional, List, Dict

import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Header, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator

# AuthStatus (section 4.1) and AuthenticationManager (section 4.2) must be in
# scope: same module, or imported from wherever those sections are saved.

logger = logging.getLogger(__name__)

app = FastAPI(
    title="微爱帮实名认证API",
    description="对接阿里云实名认证服务,支持身份证+人脸双重验证",
    version="1.0.0"
)

# CORS. Allowed origins come from CORS_ALLOW_ORIGINS (comma separated).
# Credentials are enabled only for an explicit origin list: a wildcard origin
# combined with credentials would let any site make credentialed requests.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Dependency injection
@lru_cache(maxsize=1)
def get_auth_manager():
    """Return the process-wide AuthenticationManager.

    The manager is cached so that every request reuses one instance instead
    of constructing a new manager per request.
    """
    db_url = os.getenv("DATABASE_URL", "sqlite:///./weiaibang_auth.db")
    return AuthenticationManager(db_url)


# Data models.
# Model and endpoint docstrings are published by FastAPI in the generated API
# schema, so their text is kept exactly as it was.
class InitAuthRequest(BaseModel):
    """初始化认证请求"""
    user_id: str = Field(..., description="用户ID", min_length=1, max_length=64)
    real_name: str = Field(..., description="真实姓名", min_length=2, max_length=32)
    id_card_number: str = Field(..., description="身份证号码", min_length=15, max_length=18)
    callback_url: Optional[str] = Field(None, description="回调URL")

    @validator('id_card_number')
    def validate_id_card(cls, v):
        """验证身份证号码"""
        # Length check only; this accepts 15- and 18-character numbers.
        if len(v) not in [15, 18]:
            raise ValueError('身份证号码长度不正确')
        return v


class InitAuthResponse(BaseModel):
    """初始化认证响应"""
    success: bool
    certify_id: Optional[str] = None
    verify_url: Optional[str] = None
    request_id: Optional[str] = None
    error: Optional[str] = None
    message: Optional[str] = None


class AuthCallbackData(BaseModel):
    """认证回调数据"""
    certify_id: str
    passed: bool
    face_compare_score: Optional[float] = None
    confidence: Optional[float] = None
    risk_score: Optional[float] = None
    id_card_info: Optional[Dict] = None
    fail_reason: Optional[str] = None


class QueryAuthRequest(BaseModel):
    """查询认证请求"""
    certify_id: str = Field(..., description="认证ID")


class QueryAuthResponse(BaseModel):
    """查询认证响应"""
    success: bool
    user_id: Optional[str] = None
    status: Optional[str] = None
    passed: Optional[bool] = None
    auth_time: Optional[str] = None
    expire_time: Optional[str] = None
    face_compare_score: Optional[float] = None
    error: Optional[str] = None


class VerifyIdentityRequest(BaseModel):
    """验证身份请求"""
    user_id: str = Field(..., description="用户ID")
    face_image: Optional[str] = Field(None, description="人脸图片(base64)")


class VerifyIdentityResponse(BaseModel):
    """验证身份响应"""
    success: bool
    passed: bool
    reason: Optional[str] = None
    similarity_score: Optional[float] = None
    user_info: Optional[Dict] = None


class UserAuthStatusResponse(BaseModel):
    """用户认证状态响应"""
    success: bool
    user_id: str
    status: str
    real_name: Optional[str] = None
    auth_time: Optional[str] = None
    expire_time: Optional[str] = None
    is_valid: bool = False
    days_remaining: Optional[int] = None


# API endpoints.
# Endpoints that call the manager are plain `def`: the manager does blocking
# database and HTTP work, and FastAPI runs `def` endpoints in a thread pool
# instead of on the event loop.
@app.post("/api/v1/auth/init", response_model=InitAuthResponse)
def init_authentication(
    request: InitAuthRequest,
    background_tasks: BackgroundTasks,
    auth_manager: AuthenticationManager = Depends(get_auth_manager)
):
    """
    初始化实名认证

    该接口会调用阿里云实人认证服务,生成认证链接
    """
    try:
        result = auth_manager.init_user_auth(
            user_id=request.user_id,
            real_name=request.real_name,
            id_card_number=request.id_card_number,
            callback_url=request.callback_url
        )

        if result["success"]:
            # Record the operation once the response has been sent.
            background_tasks.add_task(
                log_auth_operation,
                "init",
                request.user_id,
                result["certify_id"],
                "认证初始化成功"
            )

        return InitAuthResponse(**result)

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"初始化认证失败: {str(e)}")


@app.post("/api/v1/auth/callback")
def handle_auth_callback(
    callback_data: AuthCallbackData,
    background_tasks: BackgroundTasks,
    auth_manager: AuthenticationManager = Depends(get_auth_manager),
    x_callback_token: Optional[str] = Header(None)
):
    """
    认证结果回调接口

    阿里云认证完成后会回调此接口
    """
    # The token is mandatory. Comparing the header with an unset environment
    # variable (None != None is False) would accept unauthenticated callers,
    # so a missing value on either side is rejected. compare_digest keeps the
    # comparison constant-time.
    expected_token = os.getenv("CALLBACK_TOKEN")
    if (not expected_token or not x_callback_token or
            not hmac.compare_digest(x_callback_token.encode("utf-8"),
                                    expected_token.encode("utf-8"))):
        raise HTTPException(status_code=401, detail="无效的回调Token")

    try:
        result = auth_manager.handle_auth_callback(
            certify_id=callback_data.certify_id,
            callback_data=callback_data.dict()
        )

        if result["success"]:
            # Notify the user once the response has been sent.
            background_tasks.add_task(
                send_auth_notification,
                result["user_id"],
                callback_data.passed
            )
            return {"success": True, "message": "回调处理成功"}

        return {"success": False, "error": result["error"]}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"回调处理失败: {str(e)}")


@app.post("/api/v1/auth/query", response_model=QueryAuthResponse)
def query_auth_result(
    request: QueryAuthRequest,
    auth_manager: AuthenticationManager = Depends(get_auth_manager)
):
    """
    查询认证结果

    通过certify_id查询认证状态
    """
    try:
        result = auth_manager.query_auth_result(request.certify_id)

        if not result["success"]:
            return QueryAuthResponse(success=False, error=result.get("error"))

        return QueryAuthResponse(
            success=True,
            user_id=result.get("user_id"),
            status=result.get("status"),
            passed=result.get("status") == AuthStatus.SUCCESS.value,
            auth_time=result.get("auth_time"),
            expire_time=result.get("expire_time"),
            face_compare_score=result.get("face_compare_score")
        )

    except Exception as e:
        return QueryAuthResponse(success=False, error=str(e))


@app.post("/api/v1/auth/verify", response_model=VerifyIdentityResponse)
def verify_user_identity(
    request: VerifyIdentityRequest,
    auth_manager: AuthenticationManager = Depends(get_auth_manager)
):
    """
    验证用户身份

    用于发送信件前的身份验证
    """
    try:
        result = auth_manager.verify_user_identity(
            user_id=request.user_id,
            face_image=request.face_image
        )
        return VerifyIdentityResponse(**result)

    except Exception as e:
        return VerifyIdentityResponse(
            success=False,
            passed=False,
            reason=f"验证失败: {str(e)}"
        )


@app.get("/api/v1/auth/status/{user_id}", response_model=UserAuthStatusResponse)
def get_user_auth_status(
    user_id: str,
    auth_manager: AuthenticationManager = Depends(get_auth_manager)
):
    """
    获取用户认证状态
    """
    try:
        auth_info = auth_manager.get_user_auth_status(user_id)

        if not auth_info:
            return UserAuthStatusResponse(
                success=False,
                user_id=user_id,
                status="未认证",
                is_valid=False
            )

        # Remaining validity; both stay at their defaults when the record has
        # no expiry time or it has already passed.
        days_remaining = None
        is_valid = False

        if auth_info.get("expire_time"):
            expire_time = datetime.fromisoformat(
                auth_info["expire_time"].replace('Z', '+00:00')
            )
            now = datetime.now()
            if expire_time > now:
                days_remaining = (expire_time - now).days
                is_valid = auth_info["status"] == AuthStatus.SUCCESS.value

        return UserAuthStatusResponse(
            success=True,
            user_id=user_id,
            status=auth_info.get("status", "未知"),
            real_name=auth_info.get("real_name"),
            auth_time=auth_info.get("auth_time"),
            expire_time=auth_info.get("expire_time"),
            is_valid=is_valid,
            days_remaining=days_remaining
        )

    except Exception:
        logger.exception("Failed to read the verification status of user %s", user_id)
        return UserAuthStatusResponse(
            success=False,
            user_id=user_id,
            status="查询失败",
            is_valid=False
        )


@app.get("/api/v1/auth/supported_documents")
async def get_supported_documents():
    """
    获取支持的证件类型
    """
    return {
        "success": True,
        "documents": [
            {
                "type": "IDENTITY_CARD",
                "name": "居民身份证",
                "description": "中国大陆居民身份证",
                "supported": True
            },
            {
                "type": "PASSPORT",
                "name": "护照",
                "description": "中国护照",
                "supported": False  # planned
            },
            {
                "type": "HONGKONG_MACAO",
                "name": "港澳居民来往内地通行证",
                "description": "港澳居民证件",
                "supported": False  # planned
            }
        ]
    }


# Helpers
async def log_auth_operation(operation: str, user_id: str, certify_id: str, message: str):
    """Emit one log entry describing a verification operation."""
    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "operation": operation,
        "user_id": user_id,
        "certify_id": certify_id,
        "message": message,
        "ip": "127.0.0.1"  # placeholder; should come from the request
    }
    # TODO: persist to a database or a log system.
    logger.info("Auth log: %s", json.dumps(log_entry, ensure_ascii=False))


async def send_auth_notification(user_id: str, passed: bool):
    """Tell the user the outcome of the verification."""
    # TODO: deliver by SMS, e-mail or in-app message.
    if passed:
        message = "恭喜您,实名认证已通过!现在可以开始使用微爱帮的服务了。"
    else:
        message = "您的实名认证未通过,请重新尝试或联系客服。"

    logger.info("发送通知给用户 %s: %s", user_id, message)


# Health check
@app.get("/health")
async def health_check():
    """健康检查接口"""
    return {
        "status": "healthy",
        "service": "weiaibang-authentication",
        "timestamp": datetime.now().isoformat(),
        "version": "1.0.0"
    }


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8001)

# Next section heading (kept verbatim from the article): 4.4 前端集成示例
javascript // real-name-auth.js class WeiAiBangRealNameAuth { constructor(options = {}) { this.options = { apiEndpoint: 'https://auth.weiaibang.com/api/v1/auth', aliyunScriptUrl: 'https://cloudauth.alicdn.com/cloudauth-component/cloudauth-component.js', ...options }; this.userInfo = null; this.authStatus = null; this.eventListeners = new Map(); // 加载阿里云SDK this.loadAliyunSDK(); } async loadAliyunSDK() { if (window.CloudAuthComponent) { return; } return new Promise((resolve, reject) => { const script = document.createElement('script'); script.src = this.options.aliyunScriptUrl; script.onload = resolve; script.onerror = reject; document.head.appendChild(script); }); } /** * 初始化实名认证 */ async initAuth(userInfo) { try { this.userInfo = userInfo; // 调用后端接口获取认证参数 const initResponse = await this.initAuthRequest(userInfo); if (!initResponse.success) { throw new Error(initResponse.error || '初始化认证失败'); } // 保存certify_id this.certifyId = initResponse.certify_id; // 触发认证初始化成功事件 this.dispatchEvent('authinit', { certifyId: this.certifyId, verifyUrl: initResponse.verify_url }); // 开始认证流程 await this.startAuthFlow(initResponse.verify_url); return initResponse; } catch (error) { this.dispatchEvent('autherror', { error: error.message, stage: 'init' }); throw error; } } async initAuthRequest(userInfo) { const response = await fetch(`${this.options.apiEndpoint}/init`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ user_id: userInfo.userId, real_name: userInfo.realName, id_card_number: userInfo.idCardNumber, callback_url: `${this.options.apiEndpoint}/callback` }) }); return await response.json(); } /** * 开始认证流程 */ async startAuthFlow(verifyUrl) { // 创建认证容器 const authContainer = this.createAuthContainer(); // 初始化阿里云认证组件 const authComponent = new CloudAuthComponent({ container: authContainer, url: verifyUrl, width: '100%', height: '500px', onSuccess: (result) => { this.handleAuthSuccess(result); }, onFail: (error) => { this.handleAuthFail(error); 
}, onCancel: () => { this.handleAuthCancel(); } }); // 启动认证 authComponent.start(); // 监听认证状态 this.startPolling(); } createAuthContainer() { // 移除已存在的容器 const existingContainer = document.getElementById('weiaibang-auth-container'); if (existingContainer) { existingContainer.remove(); } // 创建新容器 const container = document.createElement('div'); container.id = 'weiaibang-auth-container'; container.style.position = 'fixed'; container.style.top = '0'; container.style.left = '0'; container.style.width = '100%'; container.style.height = '100%'; container.style.backgroundColor = 'rgba(0, 0, 0, 0.8)'; container.style.zIndex = '9999'; container.style.display = 'flex'; container.style.alignItems = 'center'; container.style.justifyContent = 'center'; // 创建内容区域 const content = document.createElement('div'); content.style.backgroundColor = 'white'; content.style.borderRadius = '8px'; content.style.padding = '20px'; content.style.width = '90%'; content.style.maxWidth = '500px'; content.style.maxHeight = '90vh'; content.style.overflow = 'auto'; // 创建关闭按钮 const closeButton = document.createElement('button'); closeButton.innerHTML = '×'; closeButton.style.position = 'absolute'; closeButton.style.top = '10px'; closeButton.style.right = '10px'; closeButton.style.background = 'none'; closeButton.style.border = 'none'; closeButton.style.fontSize = '24px'; closeButton.style.cursor = 'pointer'; closeButton.style.color = '#666'; closeButton.onclick = () => { container.remove(); this.dispatchEvent('authcancel', {}); }; // 创建标题 const title = document.createElement('h2'); title.textContent = '实名认证'; title.style.textAlign = 'center'; title.style.marginBottom = '20px'; title.style.color = '#333'; // 创建说明 const description = document.createElement('p'); description.textContent = '请按照提示完成身份证信息验证和人脸识别'; description.style.textAlign = 'center'; description.style.color = '#666'; description.style.marginBottom = '20px'; // 创建认证区域 const authArea = document.createElement('div'); authArea.id = 
'weiaibang-auth-area'; // 组装容器 content.appendChild(closeButton); content.appendChild(title); content.appendChild(description); content.appendChild(authArea); container.appendChild(content); document.body.appendChild(container); return authArea; } /** * 轮询认证结果 */ startPolling() { if (this.pollingInterval) { clearInterval(this.pollingInterval); } this.pollingInterval = setInterval(async () => { const result = await this.queryAuthResult(); if (result.status === 'success' || result.status === 'failed') { clearInterval(this.pollingInterval); if (result.status === 'success') { this.dispatchEvent('authsuccess', result); } else { this.dispatchEvent('authfail', { error: '认证失败', details: result }); } } }, 3000); // 每3秒查询一次 } async queryAuthResult() { if (!this.certifyId) { return { status: 'error', error: 'No certify ID' }; } try { const response = await fetch(`${this.options.apiEndpoint}/query`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ certify_id: this.certifyId }) }); return await response.json(); } catch (error) { return { status: 'error', error: error.message }; } } /** * 处理认证成功 */ handleAuthSuccess(result) { console.log('认证成功:', result); // 移除认证容器 const container = document.getElementById('weiaibang-auth-container'); if (container) { container.remove(); } // 停止轮询 if (this.pollingInterval) { clearInterval(this.pollingInterval); } // 触发认证成功事件 this.dispatchEvent('authsuccess', result); // 显示成功消息 this.showSuccessMessage(); } /** * 处理认证失败 */ handleAuthFail(error) { console.log('认证失败:', error); // 显示错误信息 this.showErrorMessage(error.message || '认证失败'); // 触发认证失败事件 this.dispatchEvent('authfail', { error }); } /** * 处理认证取消 */ handleAuthCancel() { console.log('认证取消'); // 移除认证容器 const container = document.getElementById('weiaibang-auth-container'); if (container) { container.remove(); } // 停止轮询 if (this.pollingInterval) { clearInterval(this.pollingInterval); } // 触发认证取消事件 this.dispatchEvent('authcancel', {}); } /** * 显示成功消息 */ 
showSuccessMessage() { const message = document.createElement('div'); message.style.position = 'fixed'; message.style.top = '20px'; message.style.right = '20px'; message.style.backgroundColor = '#4CAF50'; message.style.color = 'white'; message.style.padding = '15px 20px'; message.style.borderRadius = '4px'; message.style.boxShadow = '0 2px 10px rgba(0,0,0,0.2)'; message.style.zIndex = '10000'; message.textContent = '实名认证成功!'; document.body.appendChild(message); setTimeout(() => { message.remove(); }, 3000); } /** * 显示错误消息 */ showErrorMessage(errorMessage) { const message = document.createElement('div'); message.style.position = 'fixed'; message.style.top = '20px'; message.style.right = '20px'; message.style.backgroundColor = '#f44336'; message.style.color = 'white'; message.style.padding = '15px 20px'; message.style.borderRadius = '4px'; message.style.boxShadow = '0 2px 10px rgba(0,0,0,0.2)'; message.style.zIndex = '10000'; message.textContent = `认证失败: ${errorMessage}`; document.body.appendChild(message); setTimeout(() => { message.remove(); }, 5000); } /** * 检查用户认证状态 */ async checkAuthStatus(userId) { try { const response = await fetch(`${this.options.apiEndpoint}/status/${userId}`); const result = await response.json(); return result; } catch (error) { return { success: false, status: 'unknown', is_valid: false }; } } /** * 快速身份验证(用于发送信件前) */ async quickVerify(userId, faceImage = null) { try { const requestBody = { user_id: userId }; if (faceImage) { requestBody.face_image = faceImage; } const response = await fetch(`${this.options.apiEndpoint}/verify`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(requestBody) }); return await response.json(); } catch (error) { return { success: false, passed: false, reason: error.message }; } } /** * 事件监听 */ on(eventName, callback) { if (!this.eventListeners.has(eventName)) { this.eventListeners.set(eventName, []); } this.eventListeners.get(eventName).push(callback); } 
dispatchEvent(eventName, data) { const listeners = this.eventListeners.get(eventName) || []; listeners.forEach(callback => { try { callback(data); } catch (error) { console.error(`Error in event listener for ${eventName}:`, error); } }); } } // 使用示例 document.addEventListener('DOMContentLoaded', () => { const realNameAuth = new WeiAiBangRealNameAuth(); // 开始认证按钮 document.getElementById('startAuthButton').addEventListener('click', async () => { const userInfo = { userId: localStorage.getItem('weiaibang_user_id') || 'user_' + Math.random().toString(36).substr(2, 9), realName: document.getElementById('realName').value, idCardNumber: document.getElementById('idCardNumber').value }; if (!userInfo.realName || !userInfo.idCardNumber) { alert('请输入姓名和身份证号码'); return; } try { await realNameAuth.initAuth(userInfo); } catch (error) { alert('启动认证失败: ' + error.message); } }); // 监听认证事件 realNameAuth.on('authsuccess', (result) => { console.log('认证成功,结果:', result); alert('实名认证成功!您现在可以发送信件了。'); // 更新UI document.getElementById('authStatus').textContent = '已认证'; document.getElementById('authStatus').style.color = 'green'; document.getElementById('sendLetterButton').disabled = false; }); realNameAuth.on('authfail', (error) => { console.log('认证失败:', error); alert('实名认证失败,请重新尝试。'); }); realNameAuth.on('authcancel', () => { console.log('认证取消'); alert('您已取消实名认证。'); }); // 页面加载时检查认证状态 const userId = localStorage.getItem('weiaibang_user_id'); if (userId) { realNameAuth.checkAuthStatus(userId).then(status => { if (status.is_valid) { document.getElementById('authStatus').textContent = '已认证'; document.getElementById('authStatus').style.color = 'green'; document.getElementById('sendLetterButton').disabled = false; } else { document.getElementById('authStatus').textContent = '未认证'; document.getElementById('authStatus').style.color = 'red'; document.getElementById('sendLetterButton').disabled = true; } }); } // 发送信件前的快速验证 document.getElementById('sendLetterButton').addEventListener('click', async 
(e) => { e.preventDefault(); const userId = localStorage.getItem('weiaibang_user_id'); if (!userId) { alert('请先登录'); return; } // 检查认证状态 const authStatus = await realNameAuth.checkAuthStatus(userId); if (!authStatus.is_valid) { alert('您需要先完成实名认证才能发送信件'); return; } // 如果需要,进行快速人脸验证 const needsFaceVerify = false; // 可以根据业务需求调整 if (needsFaceVerify) { // 这里可以调用摄像头进行人脸验证 const faceImage = await captureFaceImage(); const verifyResult = await realNameAuth.quickVerify(userId, faceImage); if (!verifyResult.passed) { alert('身份验证失败: ' + verifyResult.reason); return; } } // 验证通过,可以发送信件 alert('身份验证通过,正在发送信件...'); // 这里执行发送信件的逻辑 }); // 人脸捕获函数(示例) async function captureFaceImage() { // 这里应该实现调用摄像头拍照的功能 // 简化实现,返回一个假的数据 return 'data:image/jpeg;base64,/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAA...'; } });五、部署与运维
5.1 Docker部署配置
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# System packages needed to build Python dependencies.
# --no-install-recommends keeps optional packages out of the image; the apt
# lists are removed in the same layer so they never reach the final image.
RUN apt-get update && apt-get install -y --no-install-recommends \
        gcc \
        g++ \
        libssl-dev \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies before copying the source so this layer stays
# cached until requirements.txt changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application code
COPY . .

# Run as a non-root user that owns /app
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Port the service listens on
EXPOSE 8001

# Start command
CMD ["uvicorn", "auth_api:app", "--host", "0.0.0.0", "--port", "8001", "--workers", "4"]
# 5.2 Nginx配置
# nginx.conf
upstream auth_service {
    server 127.0.0.1:8001;
    # Idle keepalive connections to the upstream, cached per worker
    keepalive 32;
}

server {
    listen 443 ssl http2;
    server_name auth.weiaibang.com;

    ssl_certificate /etc/ssl/weiaibang.crt;
    ssl_certificate_key /etc/ssl/weiaibang.key;

    # Security headers. "always" makes nginx add them to every response code;
    # without it they are omitted on most 4xx/5xx responses.
    add_header X-Frame-Options DENY always;
    add_header X-Content-Type-Options nosniff always;
    add_header X-XSS-Protection "1; mode=block" always;

    location /api/v1/auth/ {
        proxy_pass http://auth_service;
        # HTTP/1.1 with an empty Connection header is required for the
        # upstream keepalive pool to be used.
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $host;

        # Timeouts
        proxy_connect_timeout 30s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    location /health {
        proxy_pass http://auth_service/health;
        # Same keepalive settings as the API location, so frequent health
        # probes reuse upstream connections.
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        access_log off;
    }
}
# 六、安全与合规
6.1 数据安全措施
# security_manager.py
class AuthSecurityManager:
    """Security helper for the real-name authentication flow.

    Holds the encryption key, rate-limit settings and the audit-log location,
    screens submitted identity data, and appends audit records to a
    JSON-lines file.

    The private helpers called by ``validate_id_card_info``
    (``_validate_id_card_format``, ``_contains_sensitive_pattern`` and
    ``_is_in_blacklist``) are not defined in this snippet and must be
    provided elsewhere.
    """

    def __init__(self):
        # Local import keeps this snippet self-contained: no module-level
        # ``import os`` is visible for it.
        import os

        # Encryption key from the environment; None when ENCRYPTION_KEY is unset.
        self.encryption_key = os.getenv("ENCRYPTION_KEY")

        # Access-control settings
        self.rate_limit_per_minute = 10  # max requests per minute
        self.max_failed_attempts = 5     # max failed attempts

        # Audit-log settings
        self.audit_log_path = "/var/log/weiaibang/auth_audit.log"

    def validate_id_card_info(self, id_card: str, name: str) -> Dict[str, Any]:
        """Screen an ID-card number and name before they are used.

        Runs three checks in order and stops at the first failure: ID format,
        sensitive patterns in the name, and blacklist membership.

        Args:
            id_card: ID-card number to check.
            name: Real name to check.

        Returns:
            ``{"valid": True}`` when every check passes, otherwise
            ``{"valid": False, "reason": <message>}``.
        """
        # 1. Format check
        if not self._validate_id_card_format(id_card):
            return {"valid": False, "reason": "身份证格式不正确"}

        # 2. Sensitive-content filter
        if self._contains_sensitive_pattern(name):
            return {"valid": False, "reason": "姓名包含敏感词汇"}

        # 3. Blacklist check
        if self._is_in_blacklist(id_card, name):
            return {"valid": False, "reason": "信息在黑名单中"}

        return {"valid": True}

    def log_audit_trail(self, user_id: str, action: str, result: str, ip: str,
                        user_agent: Optional[str] = None,
                        trace_id: Optional[str] = None):
        """Append one audit record to the audit-log file as a JSON line.

        Args:
            user_id: User the action was performed for.
            action: Name of the action.
            result: Outcome of the action.
            ip: Client IP address.
            user_agent: User-Agent to record. When None it is read from the
                current ``request`` headers (the original behaviour); pass it
                explicitly when logging outside a request context.
            trace_id: Trace id to record. When None it is read from the
                ``X-Trace-ID`` request header.

        Raises:
            OSError: If the audit-log file cannot be opened for appending.
        """
        if user_agent is None:
            user_agent = request.headers.get("User-Agent", "")
        if trace_id is None:
            trace_id = request.headers.get("X-Trace-ID", "")

        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "action": action,
            "result": result,
            "ip": ip,
            "user_agent": user_agent,
            "trace_id": trace_id
        }

        # Entries are written with ensure_ascii=False and may contain Chinese
        # text, so fix the encoding rather than rely on the platform default.
        with open(self.audit_log_path, 'a', encoding='utf-8') as f:
            f.write(json.dumps(log_entry, ensure_ascii=False) + '\n')


# 6.2 GDPR合规处理
# gdpr_compliance.py
class GDPRComplianceManager:
    """Helpers for data-protection compliance: masking and erasure."""

    @staticmethod
    def anonymize_personal_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a shallow copy of ``data`` with personal fields masked.

        * ``id_card_number``: an 18-character number keeps its first 6 and
          last 4 characters, with six asterisks in between (so the result is
          16 characters, unchanged from the previous output). A value of any
          other length is fully replaced by asterisks so an unexpected format
          never leaks through.
        * ``real_name``: keeps the first character and masks the rest; a
          single-character name is fully masked.

        ``None`` values and missing keys are left as they are. Other
        non-string values are converted with ``str()`` before masking. The
        input dict is not modified.

        Args:
            data: Record that may contain ``id_card_number`` / ``real_name``.

        Returns:
            A new dict with the same keys and the two fields masked.
        """
        anonymized = data.copy()

        id_card = anonymized.get("id_card_number")
        if id_card is not None:
            id_card = str(id_card)
            if len(id_card) == 18:
                # Keep the first 6 and the last 4 characters
                anonymized["id_card_number"] = f"{id_card[:6]}******{id_card[-4:]}"
            else:
                anonymized["id_card_number"] = "*" * len(id_card)

        name = anonymized.get("real_name")
        if name is not None:
            name = str(name)
            if len(name) > 1:
                anonymized["real_name"] = name[0] + "*" * (len(name) - 1)
            else:
                anonymized["real_name"] = "*" * len(name)

        return anonymized

    @staticmethod
    def delete_user_data(user_id: str) -> bool:
        """Delete a user's data (right to be forgotten).

        TODO: the three steps below are placeholders. Nothing is deleted yet,
        so the method currently always returns True.

        Args:
            user_id: Identifier of the user whose data should be erased.

        Returns:
            True on success, False if any step raised.
        """
        import logging

        try:
            # 1. Mark the user record as deleted
            # 2. Remove related files
            # 3. Record the deletion
            return True
        except Exception:
            # Log the failure instead of hiding it; a bare ``except`` would
            # also swallow KeyboardInterrupt and SystemExit.
            logging.getLogger(__name__).exception(
                "Failed to delete data for user %s", user_id
            )
            return False


# 7.1 监控指标
# prometheus.yml
scrape_configs:
  - job_name: 'weiaibang_auth_service'
    static_configs:
      - targets: ['auth-service:8001']
    metrics_path: '/metrics'

  - job_name: 'auth_api'
    static_configs:
      - targets: ['auth-service:8001']
    metrics_path: '/api/v1/auth/metrics'

# Key metrics to monitor.
# Kept as comments: "metrics_to_monitor" is not a Prometheus configuration
# key, and Prometheus refuses to load a config containing unknown fields.
#   - auth_init_total
#   - auth_success_total
#   - auth_failure_total
#   - auth_processing_time_seconds
#   - active_auth_sessions
#   - cache_hit_rate
#   - database_connection_pool
# 八、测试方案
8.1 单元测试
# test_real_name_auth.py
import pytest
from unittest.mock import Mock, patch


class TestRealNameAuthSystem:
    """Unit tests for AliyunRealPersonAuthService."""

    @pytest.fixture
    def auth_service(self):
        """Build a service instance with dummy credentials."""
        from real_name_auth import AliyunRealPersonAuthService
        return AliyunRealPersonAuthService(
            access_key_id='test_key',
            access_key_secret='test_secret',
            scene_id='test_scene',
            app_id='test_app'
        )

    def test_id_card_validation(self, auth_service):
        """ID-card validation accepts a correct check digit and rejects a wrong one."""
        # Sample number whose check digit (8) is correct
        valid_id = '110101199003079438'
        assert auth_service._validate_id_card(valid_id) is True

        # Same number with a wrong check digit
        invalid_id = '110101199003079439'
        assert auth_service._validate_id_card(invalid_id) is False

    def test_init_auth(self, auth_service):
        """Initialising an authentication returns success and a certify id.

        Deliberately a plain synchronous test: the call under test is not
        awaited, and an ``async def`` test is only executed when the
        pytest-asyncio plugin is installed.
        """
        with patch('requests.post') as mock_post:
            mock_response = Mock()
            mock_response.json.return_value = {
                "Code": "200",
                "ResultObject": {
                    "CertifyId": "test_certify_id",
                    "VerifyUrl": "https://verify.aliyun.com/test"
                }
            }
            mock_post.return_value = mock_response

            result = auth_service.init_real_person_auth(
                user_id="test_user",
                real_name="张三",
                id_card_number="110101199003079438"
            )

            assert result["success"] is True
            assert "certify_id" in result


# 九、总结
9.1 系统特点
多重验证:身份证二要素 + 人脸识别双重验证
高安全性:端到端加密,敏感信息脱敏存储
合规性保障:符合GDPR等数据保护法规
用户体验:流畅的认证流程,清晰的反馈
可扩展性:支持多种证件类型,易于扩展
9.2 技术优势
阿里云认证服务:依托阿里云强大的实人认证能力
分布式架构:支持高并发认证请求
智能缓存:提高查询性能,减轻数据库压力
完整审计:全程操作可追溯,满足监管要求
9.3 业务价值
保障真实性:确保通信双方身份真实,防止冒名顶替
增强信任:建立用户对平台的信任感
合规运营:满足监管部门对特殊通信服务的要求
数据基础:为后续个性化服务提供数据支撑
微爱帮通过阿里云实名认证系统的对接,不仅提升了平台的安全性,更为特殊群体通信服务筑起了一道坚实的安全防线。我们相信,只有真实可信的连接,才能真正温暖人心,传递希望。