如何实现Qwen流式输出?Flask异步接口代码实例
1. 背景与目标
随着大模型轻量化趋势的发展,将小型语言模型部署在资源受限环境(如CPU服务器、边缘设备)成为可能。Qwen1.5-0.5B-Chat作为通义千问系列中参数量最小的对话模型之一,具备响应快、内存占用低、推理门槛低等优势,非常适合用于构建轻量级智能对话服务。
然而,传统Web接口通常采用“请求-响应”模式,用户需等待模型完整生成结果后才能看到输出,体验较差。为提升交互流畅性,本文聚焦于如何在Flask框架下实现Qwen模型的流式输出,让用户像使用ChatGPT一样逐字接收回复,打造类实时对话体验。
本项目基于ModelScope生态完成模型加载与推理,并结合Flask的流式响应能力,提供一个可直接运行的本地化轻量对话系统解决方案。
2. 技术架构与核心原理
2.1 整体架构设计
系统由三部分组成:
- 前端界面:HTML + JavaScript 构建简易聊天页面,支持消息输入与流式文本渲染。
- 后端服务:Flask应用提供
/chat接口,处理用户输入并返回流式响应。 - 模型推理层:通过Transformers加载Qwen1.5-0.5B-Chat模型,在CPU上进行推理,利用
generate函数配合回调机制实现token级输出。
数据流如下:
用户输入 → Flask接收 → 模型编码 → 逐token生成 → 流式返回 → 前端实时显示2.2 流式输出的核心机制
要实现“打字机”效果,关键在于服务端持续推送未完成文本,而标准HTTP响应是单次闭合的。为此,我们使用Flask的Response对象配合生成器(generator),将模型每生成一个token的结果即时发送给客户端。
Python中可通过以下方式创建流式响应:
from flask import Response return Response(generate(), mimetype='text/plain')其中generate()是一个生成器函数,它在模型生成过程中不断yield部分内容。
2.3 ModelScope模型加载策略
Qwen1.5-0.5B-Chat托管于魔塔社区(ModelScope),我们使用其官方SDK自动下载并加载模型:
from modelscope.pipelines import pipeline from modelsoke.utils.constant import Tasks nlp_pipeline = pipeline(task=Tasks.chat, model='qwen/Qwen1.5-0.5B-Chat')该方式确保模型版本一致性,避免手动管理权重文件带来的兼容性问题。
3. 实现步骤详解
3.1 环境准备与依赖安装
首先创建独立Conda环境并安装必要库:
conda create -n qwen_env python=3.9 conda activate qwen_env pip install torch==2.1.0 transformers==4.36.0 flask jinja2 sentencepiece modelscope注意:建议使用较新版本的
modelscope(≥1.14.0)以支持Qwen1.5系列模型。
3.2 模型初始化与推理封装
定义全局模型加载逻辑,避免每次请求重复加载:
# app.py from modelscope.pipelines import pipeline from modelscope.utils.constant import Tasks # 全局变量存储模型管道 chat_pipeline = None def load_model(): global chat_pipeline if chat_pipeline is None: chat_pipeline = pipeline( task=Tasks.chat, model='qwen/Qwen1.5-0.5B-Chat', device='cpu' # 显式指定CPU推理 ) return chat_pipeline3.3 构建流式生成器函数
这是实现流式输出的核心模块。我们需要自定义生成器,在模型解码过程中逐个获取token并返回:
def generate_stream_response(user_input): global chat_pipeline if chat_pipeline is None: load_model() # 使用generate_with_callback模拟流式生成 def token_callback(text): yield f"data: {text}\n\n" # SSE格式 # 实际上Transformers不直接支持callback,需改用迭代方式 from transformers import AutoTokenizer, AutoModelForCausalLM tokenizer = AutoTokenizer.from_pretrained('qwen/Qwen1.5-0.5B-Chat', trust_remote_code=True) model = AutoModelForCausalLM.from_pretrained('qwen/Qwen1.5-0.5B-Chat', trust_remote_code=True, device_map='cpu') inputs = tokenizer(user_input, return_tensors="pt").to('cpu') streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=512) thread = Thread(target=model.generate, kwargs=generation_kwargs) thread.start() for text in streamer: yield f"data: {text}\n\n" yield "data: [DONE]\n\n"注:上述代码使用了Hugging Face的
TextIteratorStreamer来实现token流捕获,需额外导入相关类。
完整导入如下:
from threading import Thread from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer3.4 Flask路由与流式接口实现
配置Flask应用,注册/chat流式接口和/主页路由:
from flask import Flask, request, render_template, Response app = Flask(__name__) @app.route('/') def index(): return render_template('index.html') # 简单HTML页面 @app.route('/chat', methods=['POST']) def chat(): user_message = request.json.get('message', '') return Response( generate_stream_response(user_message), mimetype='text/event-stream' # 使用SSE协议 )3.5 前端页面实现(支持流式渲染)
创建templates/index.html,使用JavaScript监听SSE流并动态更新DOM:
<!DOCTYPE html> <html> <head> <title>Qwen1.5-0.5B-Chat 轻量对话</title> <style> #chat-box { width: 80%; height: 400px; border: 1px solid #ccc; margin: 20px auto; padding: 10px; overflow-y: auto; font-family: Arial, sans-serif; } #input-area { display: flex; width: 80%; margin: 0 auto; } #message-input { flex: 1; padding: 10px; font-size: 16px; } button { padding: 10px 20px; font-size: 16px; } </style> </head> <body> <h1 style="text-align:center;">🧠 Qwen1.5-0.5B-Chat 对话系统</h1> <div id="chat-box"></div> <div id="input-area"> <input type="text" id="message-input" placeholder="请输入您的问题..." /> <button onclick="sendMessage()">发送</button> </div> <script> const chatBox = document.getElementById('chat-box'); let eventSource = null; function sendMessage() { const input = document.getElementById('message-input'); const message = input.value.trim(); if (!message) return; // 显示用户消息 chatBox.innerHTML += `<p><strong>你:</strong>${message}</p>`; // 发起POST请求并开启SSE流 fetch('/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ message }) }).then(() => { // 不等待响应体,直接建立SSE连接 if (eventSource) eventSource.close(); eventSource = new EventSource('/chat'); let reply = ''; eventSource.onmessage = function(event) { if (event.data === '[DONE]') { eventSource.close(); chatBox.scrollTop = chatBox.scrollHeight; return; } reply += event.data; chatBox.innerHTML = chatBox.innerHTML.replace(/<p><strong>AI:<\/strong>(.*?)<\/p>/, `<p><strong>AI:</strong>${reply}</p>`) || `<p><strong>AI:</strong>${reply}</p>`; chatBox.scrollTop = chatBox.scrollHeight; }; }); input.value = ''; } </script> </body> </html>3.6 完整启动脚本
整合所有组件,编写主程序入口:
# app.py from flask import Flask, request, render_template, Response from threading import Thread from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer import torch app = Flask(__name__) tokenizer = None model = None chat_pipeline = None def load_model(): global tokenizer, model if model is not None: return model_dir = 'qwen/Qwen1.5-0.5B-Chat' tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True) model = AutoModelForCausalLM.from_pretrained( model_dir, trust_remote_code=True, torch_dtype=torch.float32, device_map='cpu' ) def generate_stream_response(prompt): load_model() inputs = tokenizer(prompt, return_tensors="pt").to('cpu') streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) generation_kwargs = { "input_ids": inputs["input_ids"], "max_new_tokens": 512, "temperature": 0.7, "do_sample": True, "streamer": streamer, } thread = Thread(target=model.generate, kwargs=generation_kwargs) thread.start() for text in streamer: yield f"data: {text}\n\n" yield "data: [DONE]\n\n" @app.route('/') def index(): return render_template('index.html') @app.route('/chat', methods=['POST']) def chat(): user_message = request.json.get('message', '') return Response(generate_stream_response(user_message), mimetype='text/event-stream') if __name__ == '__main__': app.run(host='0.0.0.0', port=8080, threaded=True)4. 性能优化与实践建议
4.1 CPU推理加速技巧
尽管Qwen1.5-0.5B-Chat可在CPU运行,但仍可通过以下方式提升响应速度:
- 降低精度:使用
torch.float16或bfloat16(若支持) - 启用ONNX Runtime:将模型导出为ONNX格式,利用ORT优化推理
- 减少
max_new_tokens:限制生成长度防止过长输出阻塞流
示例(半精度加载):
model = AutoModelForCausalLM.from_pretrained( 'qwen/Qwen1.5-0.5B-Chat', torch_dtype=torch.float16, device_map='cpu' )注意:CPU对FP16支持有限,某些情况下反而变慢,需实测验证。
4.2 内存控制策略
对于低内存环境(如2GB以内),建议:
- 设置
low_cpu_mem_usage=True - 启用
offload_folder临时卸载参数 - 避免并发请求
model = AutoModelForCausalLM.from_pretrained( 'qwen/Qwen1.5-0.5B-Chat', low_cpu_mem_usage=True, offload_folder="./offload", device_map='cpu' )4.3 并发与线程安全注意事项
当前实现中,模型共享于全局变量,多用户同时访问可能导致冲突。生产环境中应考虑:
- 使用队列机制串行处理请求
- 或为每个请求分配独立上下文(成本较高)
简单防并发方案:
import threading lock = threading.Lock() @app.route('/chat', methods=['POST']) def chat(): with lock: user_message = request.json.get('message', '') return Response(generate_stream_response(user_message), mimetype='text/event-stream')5. 总结
5. 总结
本文详细介绍了如何基于Qwen1.5-0.5B-Chat模型和Flask框架构建一个支持流式输出的轻量级对话服务。主要内容包括:
- 利用ModelScope生态快速加载官方开源模型,保障模型来源可靠性;
- 通过
TextIteratorStreamer与多线程技术实现token级流式生成; - 使用SSE(Server-Sent Events)协议在前端实现“打字机”式逐字输出;
- 提供完整的前后端代码结构,支持本地一键部署;
- 给出了CPU环境下的性能优化与内存控制建议。
该项目充分体现了小模型在边缘侧部署的价值:无需GPU、内存占用低、响应及时,适合嵌入式设备、私有化部署、教育演示等多种场景。
未来可扩展方向包括:
- 添加对话历史记忆功能
- 支持语音输入/输出
- 集成RAG实现知识增强问答
- 封装为Docker镜像便于分发
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。