```mermaid
graph TD
A[硬件配置] --> B[GPU配置]
A --> C[内存要求]
A --> D[存储空间]
B --> B1[高端: RTX 4090/A100]
B --> B2[中端: RTX 3080/3090]
B --> B3[入门: RTX 3060 12GB]
C --> C1[32GB+ 推荐]
C --> C2[16GB 最低要求]
D --> D1[SSD 1TB+]
D --> D2[模型存储 100GB+]
```

```python
# 环境检查脚本
import torch
import psutil
import GPUtil
import platform
def check_environment():
print("=" * 50)
print("环境检查报告")
print("=" * 50)
# 系统信息
print(f"操作系统: {platform.system()} {platform.release()}")
print(f"Python版本: {platform.python_version()}")
# CPU信息
print(f"CPU核心数: {psutil.cpu_count()}")
print(f"内存总量: {psutil.virtual_memory().total / (1024**3):.2f} GB")
# GPU信息
if torch.cuda.is_available():
print(f"CUDA可用: 是")
print(f"CUDA版本: {torch.version.cuda}")
print(f"GPU数量: {torch.cuda.device_count()}")
for i in range(torch.cuda.device_count()):
gpu = GPUtil.getGPUs()[i]
print(f"GPU {i}: {gpu.name}, 显存: {gpu.memoryTotal}MB")
else:
print("CUDA可用: 否")
# PyTorch信息
print(f"PyTorch版本: {torch.__version__}")
if __name__ == "__main__":
check_environment()
```

| 模型名称 | 参数量 | 最小显存 | 推荐显存 | 特点 |
|---|---|---|---|---|
| Llama-2-7B | 70亿 | 16GB | 24GB | 质量好,许可证友好 |
| ChatGLM3-6B | 60亿 | 12GB | 16GB | 中英双语,推理能力强 |
| Qwen-7B | 70亿 | 16GB | 24GB | 中文优化,多轮对话 |
| Baichuan2-7B | 70亿 | 16GB | 24GB | 中文能力强,商业化友好 |
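
上表中的显存需求可以用一个粗略公式估算:权重占用 ≈ 参数量 × 每参数字节数(FP32 为 4 字节、FP16 为 2 字节、INT8 为 1 字节、INT4 约 0.5 字节),再加上激活值和 KV 缓存等额外开销。下面是一个简化的估算脚本,仅作粗略示意,实际占用还与序列长度、批大小等因素有关(其中 overhead_ratio 是假设的经验系数):

```python
def estimate_vram_gb(num_params_billion: float, dtype: str = "fp16", overhead_ratio: float = 0.2) -> float:
    """粗略估算推理所需显存(GB),overhead_ratio 为激活值/KV缓存等额外开销的经验系数(假设值)"""
    bytes_per_param = {"fp32": 4, "fp16": 2, "int8": 1, "int4": 0.5}[dtype]
    weight_gb = num_params_billion * 1e9 * bytes_per_param / (1024 ** 3)
    return weight_gb * (1 + overhead_ratio)

if __name__ == "__main__":
    for dtype in ["fp16", "int8", "int4"]:
        print(f"7B 模型 ({dtype}): 约 {estimate_vram_gb(7, dtype):.1f} GB")
```
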
```python
import os
import requests
import huggingface_hub
from huggingface_hub import snapshot_download
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
class ModelDownloader:
def __init__(self, cache_dir="./models"):
self.cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)
def download_model(self, model_name, local_dir=None):
"""下载HuggingFace模型"""
if local_dir is None:
local_dir = os.path.join(self.cache_dir, model_name.replace('/', '_'))
print(f"开始下载模型: {model_name}")
print(f"保存路径: {local_dir}")
try:
# 使用snapshot_download下载整个模型仓库
snapshot_download(
repo_id=model_name,
local_dir=local_dir,
local_dir_use_symlinks=False,
resume_download=True
)
print(f"模型下载完成: {local_dir}")
return local_dir
except Exception as e:
print(f"下载失败: {e}")
return None
def load_model(self, model_path, model_name):
"""加载模型和tokenizer"""
print(f"正在加载模型: {model_name}")
# 加载tokenizer
tokenizer = AutoTokenizer.from_pretrained(
model_path,
trust_remote_code=True
)
# 加载模型
model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True,
low_cpu_mem_usage=True
)
print("模型加载完成!")
return model, tokenizer
# 使用示例
if __name__ == "__main__":
downloader = ModelDownloader()
# 下载ChatGLM3-6B模型
model_path = downloader.download_model("THUDM/chatglm3-6b")
if model_path:
model, tokenizer = downloader.load_model(model_path, "ChatGLM3-6B")
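```

如果直接访问 Hugging Face 网速较慢,可以在下载前通过 HF_ENDPOINT 环境变量切换到镜像站点。下面是一个简单示意(镜像地址仅为示例,请按自己的网络环境选择;注意要在导入 huggingface_hub 之前设置):

```python
import os

# 先设置镜像地址,再导入 huggingface_hub(地址仅为示例)
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"

from huggingface_hub import snapshot_download

snapshot_download(
    repo_id="THUDM/chatglm3-6b",
    local_dir="./models/THUDM_chatglm3-6b",
    local_dir_use_symlinks=False
)
```
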
```python
import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer
from accelerate import infer_auto_device_map, init_empty_weights
class QuantizedModelLoader:
def __init__(self):
self.supported_quant_types = ['int8', 'int4', 'fp16']
def load_quantized_model(self, model_name, quant_type='int8', device_map='auto'):
"""
加载量化模型
"""
if quant_type == 'int8':
return self._load_int8_model(model_name, device_map)
elif quant_type == 'int4':
return self._load_int4_model(model_name, device_map)
else:
return self._load_fp16_model(model_name, device_map)
def _load_int8_model(self, model_name, device_map):
"""加载8位量化模型"""
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_8bit=True,
llm_int8_threshold=6.0,
llm_int8_has_fp16_weight=False,
)
model = AutoModel.from_pretrained(
model_name,
quantization_config=quantization_config,
device_map=device_map,
trust_remote_code=True
)
return model
def _load_int4_model(self, model_name, device_map):
"""加载4位量化模型"""
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4",
)
model = AutoModel.from_pretrained(
model_name,
quantization_config=quantization_config,
device_map=device_map,
trust_remote_code=True,
torch_dtype=torch.float16,
)
return model
def _load_fp16_model(self, model_name, device_map):
"""加载FP16模型"""
model = AutoModel.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map=device_map,
trust_remote_code=True,
low_cpu_mem_usage=True
)
return model
# 量化部署示例
def demo_quantized_models():
loader = QuantizedModelLoader()
tokenizer = AutoTokenizer.from_pretrained("THUDM/chatglm3-6b", trust_remote_code=True)
print("正在加载4位量化模型...")
model_int4 = loader.load_quantized_model("THUDM/chatglm3-6b", 'int4')
# 测试推理
prompt = "你好,请介绍一下人工智能的历史"
inputs = tokenizer(prompt, return_tensors="pt").to(model_int4.device)  # 将输入移到模型所在设备,避免设备不匹配
with torch.no_grad():
outputs = model_int4.generate(
**inputs,
max_length=500,
temperature=0.7,
do_sample=True
)
response = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(f"模型回复: {response}")
if __name__ == "__main__":
demo_quantized_models()
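```

量化带来的显存节省可以用 Transformers 模型自带的 get_memory_footprint() 方法直观对比。下面的脚本沿用上面的 QuantizedModelLoader,依次加载 FP16 与 INT4 两种配置并打印各自的权重占用(结果依模型与环境而定,仅作示意):

```python
import torch

def compare_memory_footprint(model_name="THUDM/chatglm3-6b"):
    """对比不同精度下的权重显存占用(示意脚本,需要能依次装下两种配置的显存)"""
    loader = QuantizedModelLoader()
    for quant_type in ["fp16", "int4"]:
        model = loader.load_quantized_model(model_name, quant_type)
        footprint_gb = model.get_memory_footprint() / (1024 ** 3)
        print(f"{quant_type}: 约 {footprint_gb:.2f} GB")
        del model
        torch.cuda.empty_cache()

if __name__ == "__main__":
    compare_memory_footprint()
```
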
```python
from flask import Flask, request, jsonify
import torch
from transformers import AutoModel, AutoTokenizer
import threading
import queue
import time
class ModelServer:
def __init__(self, model_path, model_name):
self.app = Flask(__name__)
self.model_path = model_path
self.model_name = model_name
self.request_queue = queue.Queue()
self.result_dict = {}
self.request_id = 0
self.lock = threading.Lock()
# 初始化模型
self._setup_routes()
self._initialize_model()
self._start_worker()
def _initialize_model(self):
"""初始化模型"""
print("正在加载模型...")
self.tokenizer = AutoTokenizer.from_pretrained(
self.model_path,
trust_remote_code=True
)
self.model = AutoModel.from_pretrained(
self.model_path,
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True
).eval()
print("模型加载完成!")
def _setup_routes(self):
"""设置路由"""
@self.app.route('/chat', methods=['POST'])
def chat():
data = request.json
message = data.get('message', '')
history = data.get('history', [])
max_length = data.get('max_length', 1000)
temperature = data.get('temperature', 0.7)
# 生成请求ID
with self.lock:
request_id = self.request_id
self.request_id += 1
# 放入处理队列
self.request_queue.put({
'request_id': request_id,
'message': message,
'history': history,
'max_length': max_length,
'temperature': temperature
})
# 等待结果
start_time = time.time()
while request_id not in self.result_dict:
if time.time() - start_time > 60: # 超时60秒
return jsonify({'error': '请求超时'}), 500
time.sleep(0.1)
result = self.result_dict.pop(request_id)
return jsonify(result)
@self.app.route('/health', methods=['GET'])
def health():
return jsonify({'status': 'healthy', 'model': self.model_name})
def _process_requests(self):
"""处理请求的工作线程"""
while True:
try:
request_data = self.request_queue.get(timeout=1)
self._handle_request(request_data)
except queue.Empty:
continue
def _handle_request(self, request_data):
"""处理单个请求"""
try:
request_id = request_data['request_id']
message = request_data['message']
history = request_data['history']
# 构建输入
inputs = self.tokenizer.encode(message, return_tensors="pt").to(self.model.device)  # 输入与模型放在同一设备
# 生成回复
with torch.no_grad():
outputs = self.model.generate(
inputs,
max_length=request_data['max_length'],
temperature=request_data['temperature'],
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id
)
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# 存储结果
self.result_dict[request_id] = {
'response': response,
'status': 'success'
}
except Exception as e:
self.result_dict[request_data['request_id']] = {
'error': str(e),
'status': 'error'
}
def _start_worker(self):
"""启动工作线程"""
worker_thread = threading.Thread(target=self._process_requests)
worker_thread.daemon = True
worker_thread.start()
def run(self, host='0.0.0.0', port=5000):
"""启动服务器"""
print(f"启动模型服务在 {host}:{port}")
self.app.run(host=host, port=port, threaded=True)
# 使用示例
if __name__ == "__main__":
# 启动服务
server = ModelServer("./models/chatglm3-6b", "ChatGLM3-6B")
server.run()
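```

服务启动后,可以用下面的客户端脚本快速验证上文定义的 /health 与 /chat 接口,请求字段与服务端路由中读取的参数保持一致(地址、端口按实际部署修改):

```python
import requests

BASE_URL = "http://127.0.0.1:5000"

# 健康检查
print(requests.get(f"{BASE_URL}/health", timeout=5).json())

# 对话请求
payload = {
    "message": "你好,请简单介绍一下你自己",
    "history": [],
    "max_length": 512,
    "temperature": 0.7
}
resp = requests.post(f"{BASE_URL}/chat", json=payload, timeout=120)
print(resp.json())
```
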
```python
import psutil
import GPUtil
import time
import threading
from prometheus_client import start_http_server, Gauge, Counter
import torch
from transformers import AutoModel, AutoTokenizer
class PerformanceMonitor:
def __init__(self, port=8000):
self.port = port
# 定义监控指标
self.gpu_usage = Gauge('gpu_usage_percent', 'GPU使用率')
self.gpu_memory = Gauge('gpu_memory_usage', 'GPU显存使用量')
self.cpu_usage = Gauge('cpu_usage_percent', 'CPU使用率')
self.memory_usage = Gauge('memory_usage_percent', '内存使用率')
self.inference_count = Counter('inference_requests_total', '推理请求总数')
self.inference_duration = Gauge('inference_duration_seconds', '推理耗时')
self._start_monitoring()
def _start_monitoring(self):
"""启动监控"""
# 启动Prometheus metrics服务器
start_http_server(self.port)
print(f"监控服务启动在端口 {self.port}")
# 启动系统监控线程
monitor_thread = threading.Thread(target=self._system_monitor)
monitor_thread.daemon = True
monitor_thread.start()
def _system_monitor(self):
"""系统监控循环"""
while True:
try:
# GPU监控
gpus = GPUtil.getGPUs()
if gpus:
gpu = gpus[0] # 假设使用第一个GPU
self.gpu_usage.set(gpu.load * 100)
self.gpu_memory.set(gpu.memoryUsed)
# CPU和内存监控
self.cpu_usage.set(psutil.cpu_percent())
self.memory_usage.set(psutil.virtual_memory().percent)
except Exception as e:
print(f"监控错误: {e}")
time.sleep(5)
def record_inference(self, duration):
"""记录推理性能"""
self.inference_count.inc()
self.inference_duration.set(duration)
# 优化后的推理类
class OptimizedInference:
def __init__(self, model, tokenizer):
self.model = model
self.tokenizer = tokenizer
self.monitor = PerformanceMonitor()
def generate_response(self, prompt, **kwargs):
"""优化后的生成方法"""
start_time = time.time()
try:
# 预处理输入
inputs = self.tokenizer(
prompt,
return_tensors="pt",
truncation=True,
max_length=2048
).to(self.model.device)  # 输入移动到模型所在设备
# 生成参数
generate_kwargs = {
'max_length': kwargs.get('max_length', 1000),
'temperature': kwargs.get('temperature', 0.7),
'do_sample': kwargs.get('do_sample', True),
'top_p': kwargs.get('top_p', 0.9),
'pad_token_id': self.tokenizer.eos_token_id
}
# 使用torch.no_grad()减少内存使用
with torch.no_grad():
outputs = self.model.generate(
**inputs,
**generate_kwargs
)
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# 记录性能
duration = time.time() - start_time
self.monitor.record_inference(duration)
return response
except Exception as e:
print(f"推理错误: {e}")
return None
# 使用示例
def demo_optimized_inference():
# 加载模型
tokenizer = AutoTokenizer.from_pretrained("THUDM/chatglm3-6b", trust_remote_code=True)
model = AutoModel.from_pretrained(
"THUDM/chatglm3-6b",
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True
).eval()
# 创建优化推理实例
optimizer = OptimizedInference(model, tokenizer)
# 测试推理
prompt = "请写一个关于人工智能的短故事"
response = optimizer.generate_response(prompt, max_length=500)
print(f"优化推理结果: {response}")
if __name__ == "__main__":
demo_optimized_inference()
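```

PerformanceMonitor 启动后,Prometheus 指标会通过 8000 端口的 /metrics 接口暴露,可以用下面的小脚本确认上文定义的各项指标是否正常输出:

```python
import requests

# 端口与 PerformanceMonitor(port=8000) 保持一致
metrics_text = requests.get("http://127.0.0.1:8000/metrics", timeout=5).text
for line in metrics_text.splitlines():
    if line.startswith(("gpu_", "cpu_", "memory_", "inference_")):
        print(line)
```
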
```python
import json
import sqlite3
import datetime
from typing import List, Dict, Any
import hashlib
import torch
from transformers import AutoModel, AutoTokenizer
class CustomerServiceSystem:
def __init__(self, model, tokenizer):
self.model = model
self.tokenizer = tokenizer
self.setup_database()
self.load_knowledge_base()
# 定义客服场景的prompt模板
self.prompt_templates = {
"greeting": "你是专业的客服助手,请友好地问候用户并询问需要什么帮助。",
"product_query": "用户询问产品信息,请根据知识库提供准确的产品介绍。",
"complaint": "用户投诉,请耐心倾听并表示理解,然后提供解决方案。",
"technical_support": "用户需要技术支持,请提供详细的解决步骤。",
"farewell": "礼貌地结束对话,并邀请用户再次咨询。"
}
def setup_database(self):
"""初始化数据库"""
self.conn = sqlite3.connect('customer_service.db', check_same_thread=False)
cursor = self.conn.cursor()
# 创建对话记录表
cursor.execute('''
CREATE TABLE IF NOT EXISTS conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
user_message TEXT NOT NULL,
bot_response TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
sentiment REAL,
satisfaction INTEGER
)
''')
# 创建用户反馈表
cursor.execute('''
CREATE TABLE IF NOT EXISTS feedback (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
rating INTEGER,
comment TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
self.conn.commit()
def load_knowledge_base(self):
"""加载产品知识库"""
self.knowledge_base = {
"products": {
"premium_plan": {
"name": "高级套餐",
"price": "299元/月",
"features": ["无限次咨询", "专属客服", "优先技术支持", "定制化服务"],
"description": "适合企业用户的高级服务套餐"
},
"basic_plan": {
"name": "基础套餐",
"price": "99元/月",
"features": ["每天5次咨询", "标准客服", "基础技术支持"],
"description": "适合个人用户的基础服务套餐"
}
},
"policies": {
"refund": "7天内无条件退款",
"support": "工作时间:9:00-18:00,紧急问题24小时响应",
"privacy": "我们严格保护用户隐私,不会泄露任何个人信息"
}
}
def classify_intent(self, message: str) -> str:
"""意图分类"""
intent_prompt = f"""
分析以下用户消息的意图,从以下选项中选择:
- greeting: 问候或开始对话
- product_query: 产品咨询
- complaint: 投诉或不满
- technical_support: 技术支持
- farewell: 结束对话
用户消息: {message}
意图:
"""
# 使用模型进行意图分类
inputs = self.tokenizer(intent_prompt, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=100,
temperature=0.1,
do_sample=False
)
intent = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
intent = intent.strip().lower()
# 提取意图关键词
if "product" in intent or "查询" in intent or "了解" in intent:
return "product_query"
elif "投诉" in intent or "不满" in intent or "问题" in intent:
return "complaint"
elif "技术" in intent or "帮助" in intent or "解决" in intent:
return "technical_support"
elif "再见" in intent or "结束" in intent or "拜拜" in intent:
return "farewell"
else:
return "greeting"
def generate_response(self, message: str, session_id: str, history: List[Dict] = None) -> Dict[str, Any]:
"""生成客服回复"""
if history is None:
history = []
# 分析用户意图
intent = self.classify_intent(message)
# 构建上下文
context = self._build_context(intent, message, history)
# 生成回复
inputs = self.tokenizer(context, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=500,
temperature=0.7,
do_sample=True,
top_p=0.9,
pad_token_id=self.tokenizer.eos_token_id
)
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
response = self._post_process_response(response, context)
# 保存对话记录
self._save_conversation(session_id, message, response)
return {
"response": response,
"intent": intent,
"session_id": session_id,
"timestamp": datetime.datetime.now().isoformat()
}
def _build_context(self, intent: str, message: str, history: List[Dict]) -> str:
"""构建对话上下文"""
# 基础系统提示
context = """你是一个专业的客服助手,负责回答用户问题、解决用户问题。
请保持友好、专业、耐心的态度,根据知识库信息提供准确回答。
知识库信息:
"""
# 添加相关知识
if intent == "product_query":
context += f"产品信息: {json.dumps(self.knowledge_base['products'], ensure_ascii=False)}
"
elif intent == "complaint":
context += f"政策信息: {json.dumps(self.knowledge_base['policies'], ensure_ascii=False)}
"
# 添加上下文
context += "
对话历史:
"
for turn in history[-5:]: # 最近5轮对话
context += f"用户: {turn['user']}
"
context += f"助手: {turn['assistant']}
"
# 添加当前查询和指令
context += f"
当前用户消息: {message}
"
context += f"请根据以上信息生成专业、友好的客服回复:"
return context
def _post_process_response(self, response: str, context: str) -> str:
"""后处理回复"""
# 移除可能重复的上下文
if "当前用户消息" in response:
response = response.split("当前用户消息")[0]
# 确保回复简洁专业
response = response.strip()
if len(response) > 300:
response = response[:300] + "..."
return response
def _save_conversation(self, session_id: str, user_message: str, bot_response: str):
"""保存对话记录"""
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO conversations (session_id, user_message, bot_response)
VALUES (?, ?, ?)
''', (session_id, user_message, bot_response))
self.conn.commit()
def add_feedback(self, session_id: str, rating: int, comment: str = ""):
"""添加用户反馈"""
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO feedback (session_id, rating, comment)
VALUES (?, ?, ?)
''', (session_id, rating, comment))
self.conn.commit()
def get_conversation_history(self, session_id: str) -> List[Dict]:
"""获取对话历史"""
cursor = self.conn.cursor()
cursor.execute('''
SELECT user_message, bot_response, timestamp
FROM conversations
WHERE session_id = ?
ORDER BY timestamp
''', (session_id,))
history = []
for row in cursor.fetchall():
history.append({
'user': row[0],
'assistant': row[1],
'timestamp': row[2]
})
return history
# 使用示例
def demo_customer_service():
# 加载模型
tokenizer = AutoTokenizer.from_pretrained("THUDM/chatglm3-6b", trust_remote_code=True)
model = AutoModel.from_pretrained(
"THUDM/chatglm3-6b",
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True
).eval()
# 创建客服系统
css = CustomerServiceSystem(model, tokenizer)
# 模拟对话
session_id = "test_session_001"
test_messages = [
"你好,我想了解你们的产品",
"高级套餐有什么功能?",
"价格是多少?",
"如果我不满意可以退款吗?",
"好的,谢谢你的帮助"
]
history = []
for message in test_messages:
print(f"用户: {message}")
result = css.generate_response(message, session_id, history)
print(f"客服: {result['response']}")
print(f"检测意图: {result['intent']}")
print("-" * 50)
# 更新历史
history.append({'user': message, 'assistant': result['response']})
# 添加反馈
css.add_feedback(session_id, 5, "服务很好,回答专业")
if __name__ == "__main__":
demo_customer_service()
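```

对话与反馈都已写入 SQLite,可以直接用 SQL 做简单的服务质量统计。下面的示例按上文 setup_database 中的表结构,统计会话数、对话轮次和平均评分:

```python
import sqlite3

def feedback_report(db_path="customer_service.db"):
    """基于 conversations / feedback 两张表输出简单报表(示例脚本)"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(DISTINCT session_id), COUNT(*) FROM conversations")
    session_count, turn_count = cursor.fetchone()
    cursor.execute("SELECT COUNT(*), AVG(rating) FROM feedback")
    feedback_count, avg_rating = cursor.fetchone()
    conn.close()
    print(f"会话数: {session_count}, 对话轮次: {turn_count}")
    print(f"反馈数: {feedback_count}, 平均评分: {avg_rating}")

if __name__ == "__main__":
    feedback_report()
```
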
```python
import re
import jieba
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud
from typing import Dict, List, Any
import torch
from transformers import AutoModel, AutoTokenizer
class ContentCreationAssistant:
def __init__(self, model, tokenizer):
self.model = model
self.tokenizer = tokenizer
self.content_templates = {
"article": {
"structure": ["标题", "引言", "正文", "结论"],
"prompt": "请写一篇关于{topic}的{style}风格文章,字数约{word_count}字"
},
"social_media": {
"structure": ["吸引注意", "核心信息", "行动号召"],
"prompt": "为{platform}平台创作一条关于{product}的社交媒体文案,要求{style}"
},
"email": {
"structure": ["主题", "问候", "正文", "结束语"],
"prompt": "写一封{type}邮件,收件人是{audience},主要内容是{content}"
}
}
def generate_content(self, content_type: str, **kwargs) -> Dict[str, Any]:
"""生成内容"""
if content_type not in self.content_templates:
raise ValueError(f"不支持的内容类型: {content_type}")
template = self.content_templates[content_type]
prompt = template["prompt"].format(**kwargs)
# 添加具体指令
enhanced_prompt = f"""
你是一个专业的内容创作助手。请根据以下要求创作内容:
要求: {prompt}
内容结构: {' -> '.join(template['structure'])}
请生成高质量、专业的内容:
"""
inputs = self.tokenizer(enhanced_prompt, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=1500,
temperature=0.8,
do_sample=True,
top_p=0.9,
repetition_penalty=1.1,
pad_token_id=self.tokenizer.eos_token_id
)
content = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
content = self._extract_generated_content(content, enhanced_prompt)
# 分析内容质量
analysis = self.analyze_content(content)
return {
"content": content,
"type": content_type,
"word_count": len(content),
"analysis": analysis
}
def _extract_generated_content(self, content: str, prompt: str) -> str:
"""提取生成的内容"""
# 移除提示部分
if prompt in content:
content = content.replace(prompt, "")
# 清理格式
content = re.sub(r'\n+', '\n', content)
content = content.strip()
return content
def analyze_content(self, content: str) -> Dict[str, Any]:
"""分析内容质量"""
# 基础统计
words = jieba.lcut(content)
word_count = len(words)
char_count = len(content)
sentence_count = len(re.split(r'[。!?.!?]', content))
# 词频分析
word_freq = Counter(words)
common_words = word_freq.most_common(10)
# 可读性评分(简单版)
avg_sentence_length = word_count / max(sentence_count, 1)
readability = max(0, 100 - avg_sentence_length * 2)
return {
"word_count": word_count,
"char_count": char_count,
"sentence_count": sentence_count,
"avg_sentence_length": round(avg_sentence_length, 2),
"readability_score": round(min(100, readability), 2),
"common_words": common_words
}
def generate_content_ideas(self, topic: str, count: int = 5) -> List[str]:
"""生成内容创意"""
prompt = f"""
为主题"{topic}"生成{count}个有创意的内容点子。
每个点子应该包括:
1. 标题
2. 核心观点
3. 目标受众
请按以下格式返回:
1. 标题: [标题]
核心观点: [观点]
目标受众: [受众]
"""
inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=1000,
temperature=0.9,
do_sample=True,
top_p=0.95
)
ideas_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
ideas = self._parse_ideas(ideas_text)
return ideas[:count]
def _parse_ideas(self, ideas_text: str) -> List[Dict]:
"""解析创意文本"""
ideas = []
current_idea = {}
lines = ideas_text.split('\n')
for line in lines:
line = line.strip()
if line.startswith('标题:'):
if current_idea:
ideas.append(current_idea)
current_idea = {'title': line.replace('标题:', '').strip()}
elif line.startswith('核心观点:'):
current_idea['concept'] = line.replace('核心观点:', '').strip()
elif line.startswith('目标受众:'):
current_idea['audience'] = line.replace('目标受众:', '').strip()
if current_idea:
ideas.append(current_idea)
return ideas
def create_word_cloud(self, content: str, save_path: str = "wordcloud.png"):
"""生成词云图"""
# 中文文本处理
text = ' '.join(jieba.cut(content))
# 创建词云
wordcloud = WordCloud(
font_path='simhei.ttf', # 中文字体
width=800,
height=400,
background_color='white',
max_words=100
).generate(text)
# 保存图片
plt.figure(figsize=(10, 5))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.title('内容词云分析')
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.close()
return save_path
# 使用示例
def demo_content_creation():
# 加载模型
tokenizer = AutoTokenizer.from_pretrained("THUDM/chatglm3-6b", trust_remote_code=True)
model = AutoModel.from_pretrained(
"THUDM/chatglm3-6b",
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True
).eval()
# 创建内容创作助手
assistant = ContentCreationAssistant(model, tokenizer)
# 生成文章
print("生成技术文章...")
article = assistant.generate_content(
"article",
topic="人工智能在教育领域的应用",
style="专业",  # 模板要求提供 style 参数,此处取值仅为示例
word_count="800"
)
print(f"文章内容:
{article['content']}")
print(f"
内容分析: {article['analysis']}")
# 生成创意点子
print("
生成内容创意...")
ideas = assistant.generate_content_ideas("可持续发展", 3)
for i, idea in enumerate(ideas, 1):
print(f"{i}. {idea}")
# 生成词云
wordcloud_path = assistant.create_word_cloud(article['content'])
print(f"
词云图已保存: {wordcloud_path}")
if __name__ == "__main__":
demo_content_creation()
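```

generate_content 同样适用于上文定义的 social_media 和 email 模板,只要按模板中的占位符传入对应参数即可,例如(参数取值仅为示例):

```python
# 延续上面 demo 中创建的 assistant 实例
post = assistant.generate_content(
    "social_media",
    platform="微博",
    product="智能学习助手",
    style="轻松活泼、带话题标签"
)
print(post["content"])
print(post["analysis"])
```
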
````python
import ast
import inspect
import time
from typing import List, Dict, Any
import subprocess
import sys
import re
import torch
from transformers import AutoModel, AutoTokenizer
class CodeAssistant:
def __init__(self, model, tokenizer):
self.model = model
self.tokenizer = tokenizer
self.supported_languages = ['python', 'javascript', 'java', 'cpp']
def generate_code(self, requirement: str, language: str = 'python',
style: str = 'clean') -> Dict[str, Any]:
"""根据需求生成代码"""
if language not in self.supported_languages:
raise ValueError(f"不支持的语言: {language}")
prompt = self._build_code_prompt(requirement, language, style)
inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=1000,
temperature=0.3, # 较低温度以获得更确定的代码
do_sample=True,
top_p=0.9,
pad_token_id=self.tokenizer.eos_token_id
)
code_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
code = self._extract_code(code_text, prompt)
# 代码分析
analysis = self.analyze_code(code, language)
return {
"code": code,
"language": language,
"analysis": analysis,
"requirement": requirement
}
def _build_code_prompt(self, requirement: str, language: str, style: str) -> str:
"""构建代码生成提示"""
style_guidelines = {
'clean': '代码应该简洁、可读性强,有适当的注释',
'efficient': '代码应该注重性能和效率',
'secure': '代码应该注重安全性,避免常见漏洞',
'production': '代码应该适合生产环境,包含错误处理'
}
prompt = f"""
你是一个专业的{language}开发工程师。请根据以下需求生成代码:
需求: {requirement}
要求:
- 语言: {language}
- 风格: {style_guidelines.get(style, style)}
- 包含必要的注释
- 代码要完整可运行
请只返回代码部分,不要包含其他解释:
"""
return prompt
def _extract_code(self, code_text: str, prompt: str) -> str:
"""提取生成的代码"""
# 移除提示部分
if prompt in code_text:
code_text = code_text.replace(prompt, "")
# 尝试提取代码块
code_blocks = re.findall(r'```(?:\w+)?\n(.*?)\n```', code_text, re.DOTALL)
if code_blocks:
return code_blocks[0]
return code_text.strip()
def analyze_code(self, code: str, language: str) -> Dict[str, Any]:
"""分析代码质量"""
analysis = {
"syntax_valid": False,
"complexity": "unknown",
"lines": len(code.split('
')),
"issues": []
}
if language == 'python':
return self._analyze_python_code(code)
return analysis
def _analyze_python_code(self, code: str) -> Dict[str, Any]:
"""分析Python代码"""
analysis = {
"syntax_valid": False,
"complexity": "low",
"lines": len(code.split('
')),
"functions": 0,
"issues": [],
"suggestions": []
}
try:
# 语法检查
ast.parse(code)
analysis["syntax_valid"] = True
# 简单复杂度分析
tree = ast.parse(code)
analysis["functions"] = len([node for node in ast.walk(tree) if isinstance(node, ast.FunctionDef)])
# 检查常见问题
if "input(" in code and "eval(" in code:
analysis["issues"].append("使用eval处理用户输入可能存在安全风险")
if "except:" in code or "except Exception:" in code:
analysis["issues"].append("过于宽泛的异常捕获")
# 复杂度评估
if analysis["lines"] > 50:
analysis["complexity"] = "high"
elif analysis["lines"] > 20:
analysis["complexity"] = "medium"
else:
analysis["complexity"] = "low"
except SyntaxError as e:
analysis["issues"].append(f"语法错误: {e}")
return analysis
def optimize_code(self, code: str, language: str,
optimization_goal: str = 'performance') -> Dict[str, Any]:
"""代码优化"""
prompt = f"""
请优化以下{language}代码,优化目标: {optimization_goal}
原始代码:
```{language}
{code}
```
请提供:
1. 优化后的代码
2. 优化说明
3. 性能改进估计
优化后的代码:
"""
inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=1500,
temperature=0.3,
do_sample=True
)
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# 解析响应
optimized_code = self._extract_code(response, prompt)
explanation = self._extract_explanation(response)
return {
"original_code": code,
"optimized_code": optimized_code,
"explanation": explanation,
"optimization_goal": optimization_goal
}
def _extract_explanation(self, response: str) -> str:
"""提取优化说明"""
# 简单提取说明部分
lines = response.split('\n')
explanation_lines = []
in_explanation = False
for line in lines:
if any(keyword in line.lower() for keyword in ['优化说明', '说明', 'explanation']):
in_explanation = True
continue
if in_explanation and line.strip() and not line.startswith('```'):
explanation_lines.append(line)
elif line.startswith('```'):
break
return '\n'.join(explanation_lines)
def test_code_generation(self):
"""测试代码生成功能"""
test_requirements = [
"写一个Python函数,计算斐波那契数列的第n项",
"创建一个JavaScript函数,验证电子邮件格式",
"写一个Python类,表示二叉树并实现遍历方法"
]
for req in test_requirements:
print(f"需求: {req}")
result = self.generate_code(req, 'python', 'clean')
print(f"生成代码:
{result['code']}")
print(f"代码分析: {result['analysis']}")
print("-" * 50)
# 使用示例
def demo_code_assistant():
# 加载模型
tokenizer = AutoTokenizer.from_pretrained("THUDM/chatglm3-6b", trust_remote_code=True)
model = AutoModel.from_pretrained(
"THUDM/chatglm3-6b",
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True
).eval()
# 创建代码助手
assistant = CodeAssistant(model, tokenizer)
# 测试代码生成
print("测试代码生成...")
assistant.test_code_generation()
# 优化示例代码
sample_code = """
def calculate_sum(numbers):
result = 0
for i in range(len(numbers)):
result = result + numbers[i]
return result
"""
print("代码优化示例...")
optimization = assistant.optimize_code(sample_code, 'python', 'performance')
print(f"原始代码:
{optimization['original_code']}")
print(f"优化后代码:
{optimization['optimized_code']}")
print(f"优化说明:
{optimization['explanation']}")
if __name__ == "__main__":
demo_code_assistant()
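````

analyze_code 也可以脱离生成流程单独使用,用来快速检查一段代码的语法和常见问题,例如下面这段带有宽泛异常捕获的代码:

```python
# 延续上面 demo 中创建的 assistant 实例
snippet = '''
def read_config(path):
    try:
        with open(path) as f:
            return f.read()
    except Exception:
        return None
'''
report = assistant.analyze_code(snippet, "python")
print(report["syntax_valid"], report["complexity"])
print(report["issues"])  # 预期会提示"过于宽泛的异常捕获"
```
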
```python
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import faiss
import pickle
import os
from typing import Dict, List, Any
import torch
from transformers import AutoModel, AutoTokenizer
class RAGSystem:
def __init__(self, model, tokenizer, knowledge_base_path=None):
self.model = model
self.tokenizer = tokenizer
self.vectorizer = TfidfVectorizer(max_features=1000)
self.index = None
self.documents = []
self._initialize_rag()
if knowledge_base_path and os.path.exists(knowledge_base_path):
self.load_knowledge_base(knowledge_base_path)
def _initialize_rag(self):
"""初始化RAG系统"""
self.embeddings_cache = {}
def add_documents(self, documents: List[str]):
"""添加文档到知识库"""
self.documents.extend(documents)
self._build_index()
def _build_index(self):
"""构建文档索引"""
if not self.documents:
return
# 使用TF-IDF向量化
tfidf_matrix = self.vectorizer.fit_transform(self.documents)
# 转换为密集矩阵
dense_matrix = tfidf_matrix.toarray().astype('float32')
# 创建FAISS索引
dimension = dense_matrix.shape[1]
self.index = faiss.IndexFlatIP(dimension) # 内积相似度
# 归一化向量
faiss.normalize_L2(dense_matrix)
self.index.add(dense_matrix)
def search(self, query: str, top_k: int = 3) -> List[Dict]:
"""搜索相关文档"""
if self.index is None or len(self.documents) == 0:
return []
# 向量化查询
query_vec = self.vectorizer.transform([query]).toarray().astype('float32')
faiss.normalize_L2(query_vec)
# 搜索
scores, indices = self.index.search(query_vec, min(top_k, len(self.documents)))
results = []
for score, idx in zip(scores[0], indices[0]):
if idx < len(self.documents):
results.append({
'document': self.documents[idx],
'score': float(score),
'index': idx
})
return results
def generate_with_rag(self, query: str, top_k: int = 3) -> Dict[str, Any]:
"""使用RAG生成回答"""
# 检索相关文档
relevant_docs = self.search(query, top_k)
# 构建增强的prompt
context = "相关背景信息:
"
for i, doc in enumerate(relevant_docs):
context += f"{i+1}. {doc['document']}
"
prompt = f"""
基于以下背景信息回答用户问题:
{context}
用户问题: {query}
请根据提供的背景信息给出准确、专业的回答。如果信息不足,请明确说明。
回答:
"""
# 生成回答
inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=800,
temperature=0.7,
do_sample=True,
top_p=0.9,
pad_token_id=self.tokenizer.eos_token_id
)
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
response = self._clean_response(response, prompt)
return {
"response": response,
"relevant_documents": relevant_docs,
"query": query
}
def _clean_response(self, response: str, prompt: str) -> str:
"""清理生成的响应"""
if prompt in response:
response = response.replace(prompt, "")
return response.strip()
def save_knowledge_base(self, filepath: str):
"""保存知识库"""
data = {
'documents': self.documents,
'vectorizer': self.vectorizer,
# FAISS 索引对象无法直接 pickle,先序列化为字节数组
'index': faiss.serialize_index(self.index) if self.index is not None else None
}
with open(filepath, 'wb') as f:
pickle.dump(data, f)
def load_knowledge_base(self, filepath: str):
"""加载知识库"""
with open(filepath, 'rb') as f:
data = pickle.load(f)
self.documents = data['documents']
self.vectorizer = data['vectorizer']
index_bytes = data['index']
self.index = faiss.deserialize_index(index_bytes) if index_bytes is not None else None
# 使用示例
def demo_rag_system():
# 加载模型
tokenizer = AutoTokenizer.from_pretrained("THUDM/chatglm3-6b", trust_remote_code=True)
model = AutoModel.from_pretrained(
"THUDM/chatglm3-6b",
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True
).eval()
# 创建RAG系统
rag = RAGSystem(model, tokenizer)
# 添加知识文档
documents = [
"机器学习是人工智能的一个分支,专注于让计算机通过经验自动改进。",
"深度学习使用多层神经网络来学习数据的层次化表示。",
"Transformer架构是当前自然语言处理中最成功的模型架构。",
"BERT模型通过双向编码器表示实现了在多个NLP任务上的突破。",
"GPT系列模型使用自回归生成方式,在文本生成任务上表现出色。"
]
rag.add_documents(documents)
# 测试RAG查询
queries = [
"什么是机器学习?",
"Transformer架构有什么特点?",
"深度学习和机器学习有什么关系?"
]
for query in queries:
print(f"查询: {query}")
result = rag.generate_with_rag(query)
print(f"回答: {result['response']}")
print(f"相关文档: {len(result['relevant_documents'])}个")
for doc in result['relevant_documents']:
print(f" - 相似度 {doc['score']:.3f}: {doc['document'][:50]}...")
print("-" * 50)
if __name__ == "__main__":
demo_rag_system()
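```

知识库构建完成后,可以用 save_knowledge_base / load_knowledge_base 做持久化,下次启动时直接加载、无需重新建索引(示例延续上面 demo 中的 rag、model、tokenizer 对象):

```python
# 保存当前知识库
rag.save_knowledge_base("knowledge_base.pkl")

# 重新启动时直接从文件加载
rag_reloaded = RAGSystem(model, tokenizer, knowledge_base_path="knowledge_base.pkl")
print(rag_reloaded.generate_with_rag("什么是深度学习?")["response"])
```
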
```python
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import Trainer, TrainingArguments, DataCollatorForLanguageModeling, AutoModel, AutoTokenizer
from peft import LoraConfig, get_peft_model, TaskType
import datasets
from typing import Dict, List
class FineTuningDataset(Dataset):
def __init__(self, tokenizer, texts, max_length=512):
self.tokenizer = tokenizer
self.texts = texts
self.max_length = max_length
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
# 令牌化
encoding = self.tokenizer(
text,
truncation=True,
padding='max_length',
max_length=self.max_length,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten(),
'labels': encoding['input_ids'].flatten()
}
class ModelFineTuner:
def __init__(self, model, tokenizer):
self.model = model
self.tokenizer = tokenizer
self.setup_lora()
def setup_lora(self):
"""设置LoRA配置"""
self.lora_config = LoraConfig(
task_type=TaskType.CAUSAL_LM,
inference_mode=False,
r=8,
lora_alpha=32,
lora_dropout=0.1,
target_modules=["query_key_value"] # ChatGLM的目标模块
)
def prepare_model_for_training(self):
"""准备训练模型"""
# 应用LoRA
self.model = get_peft_model(self.model, self.lora_config)
self.model.print_trainable_parameters()
return self.model
def fine_tune(self, train_texts: List[str], val_texts: List[str] = None,
output_dir: str = "./finetuned_model", **training_kwargs):
"""微调模型"""
# 准备数据集
train_dataset = FineTuningDataset(self.tokenizer, train_texts)
val_dataset = FineTuningDataset(self.tokenizer, val_texts) if val_texts else None
# 训练参数
training_args = TrainingArguments(
output_dir=output_dir,
overwrite_output_dir=True,
num_train_epochs=training_kwargs.get('num_train_epochs', 3),
per_device_train_batch_size=training_kwargs.get('batch_size', 2),
per_device_eval_batch_size=2,
warmup_steps=500,
logging_steps=100,
save_steps=1000,
evaluation_strategy="steps" if val_texts else "no",
eval_steps=500,
learning_rate=training_kwargs.get('learning_rate', 5e-5),
fp16=torch.cuda.is_available(),
dataloader_pin_memory=False,
)
# 数据收集器
data_collator = DataCollatorForLanguageModeling(
tokenizer=self.tokenizer,
mlm=False,
)
# 训练器
trainer = Trainer(
model=self.model,
args=training_args,
data_collator=data_collator,
train_dataset=train_dataset,
eval_dataset=val_dataset,
)
# 开始训练
print("开始微调...")
trainer.train()
# 保存模型
trainer.save_model()
self.tokenizer.save_pretrained(output_dir)
print(f"微调完成,模型保存在: {output_dir}")
return trainer
def generate_finetuned_text(self, prompt: str, **generate_kwargs) -> str:
"""使用微调后的模型生成文本"""
inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=generate_kwargs.get('max_length', 200),
temperature=generate_kwargs.get('temperature', 0.7),
do_sample=True,
top_p=generate_kwargs.get('top_p', 0.9),
pad_token_id=self.tokenizer.eos_token_id
)
return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# 使用示例
def demo_fine_tuning():
# 加载基础模型
tokenizer = AutoTokenizer.from_pretrained("THUDM/chatglm3-6b", trust_remote_code=True)
model = AutoModel.from_pretrained(
"THUDM/chatglm3-6b",
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True
)
# 创建微调器
fine_tuner = ModelFineTuner(model, tokenizer)
# 准备训练数据(示例)
train_texts = [
"问题: 什么是人工智能?回答: 人工智能是计算机科学的一个分支,致力于创建能够执行通常需要人类智能的任务的机器。",
"问题: 机器学习有哪些类型?回答: 机器学习主要分为监督学习、无监督学习和强化学习三种类型。",
"问题: 深度学习是什么?回答: 深度学习是机器学习的一个子领域,使用多层神经网络来学习和表示数据。",
] * 100 # 重复数据以模拟训练集
# 微调模型
print("开始模型微调...")
fine_tuner.prepare_model_for_training()
fine_tuner.fine_tune(
train_texts,
output_dir="./chatglm3-6b-finetuned",
num_train_epochs=2,
batch_size=2,
learning_rate=1e-4
)
# 测试微调后的模型
test_prompts = [
"问题: 什么是神经网络?回答:",
"问题: 解释一下监督学习?回答:"
]
for prompt in test_prompts:
response = fine_tuner.generate_finetuned_text(prompt)
print(f"提示: {prompt}")
print(f"回答: {response}")
print("-" * 50)
if __name__ == "__main__":
# 注意:实际运行需要较长时间和大量资源
# demo_fine_tuning()
print("微调示例代码已准备就绪")
本指南详细介绍了大模型的本地部署流程和多个实战应用案例,涵盖了从环境准备到高级优化的完整流程。主要内容包括:

- 环境配置:硬件要求、依赖安装、模型下载
- 量化部署:4bit/8bit 量化技术,大幅降低显存需求
- Web 服务:Flask API 服务,支持并发处理
- 性能监控:实时监控系统资源使用情况
- 智能客服:意图识别、知识库集成、对话管理
- 内容创作:多种内容类型生成、质量分析、创意激发
- 代码助手:代码生成、优化、分析和测试
- RAG 系统:知识检索、增强生成、文档管理
- 模型微调:LoRA 高效微调、自定义训练

完成以上实践后,你将能够:

- 使用 4bit 量化在消费级 GPU 上运行 70 亿参数模型
- 实现基于 FAISS 的高效向量检索
- 应用 LoRA 进行参数高效微调
- 构建完整的 Web 服务和监控体系