AI笔记09:Function Calling与协作
来源:     阅读:3
易浩激活码
发布于 2025-11-13 22:12
查看主页

一、Function calling在大模型的作用

二、天气调用(Function Calling)

1. 构造对话消息

2. 调用Qwen3大模型

3. 处理模型返回结果

Python实现代码:

import requests
from http import HTTPStatus
import dashscope
import os

# 设置 DashScope API Key
dashscope.api_key = os.getenv('DASHSCOPE_API_KEY', '')  # 从环境变量获取 API Key

# 高德天气 API 的 天气工具定义(JSON 格式)
weather_tool = {
    "type": "function",
    "function": {
        "name": "get_current_weather",
        "description": "Get the current weather in a given location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "The city name, e.g. 北京",
                },
                "adcode": {
                    "type": "string",
                    "description": "The city code, e.g. 110000 (北京)",
                }
            },
            "required": ["location"],
        },
    },
}

def get_weather_from_gaode(location: str, adcode: str = None):
    """调用高德地图API查询天气"""
    gaode_api_key = "08744255ea79dae883b***c03b416abf"  # 替换成你的高德API Key
    base_url = "https://restapi.amap.com/v3/weather/weatherInfo"
    
    params = {
        "key": gaode_api_key,
        "city": adcode if adcode else location,
        "extensions": "base",  # 可改为 "all" 获取预报
    }
    
    response = requests.get(base_url, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        return {"error": f"Failed to fetch weather: {response.status_code}"}

def run_weather_query():
    """使用 Qwen3 + 查询天气,并让大模型输出最终结果"""
    messages = [
        {"role": "system", "content": "你是一个智能助手,可以查询天气信息。"},
        {"role": "user", "content": "北京目前天气怎么样?"}
    ]
    
    print("第一次调用大模型...")
    response = dashscope.Generation.call(
        model="qwen-turbo",  # 可使用 Qwen3 最新版本
        messages=messages,
        tools=[weather_tool],  # 传入工具定义
        tool_choice="auto",  # 让模型决定是否调用工具
    )
    
    if response.status_code == HTTPStatus.OK:
        tool_map = {
            "get_current_weather": get_weather_from_gaode,
            # 如有更多工具,在此添加
        }
        
        # 从响应中获撤销息
        assistant_message = response.output.choices[0].message
        
        # 检查是否需要调用工具
        if hasattr(assistant_message, "tool_calls") and assistant_message.tool_calls:
            print("检测到工具调用...")
            
            # 转换 assistant 消息为标准字典格式
            assistant_dict = {
                "role": "assistant",
                "content": assistant_message.content if hasattr(assistant_message, "content") else None
            }
            
            # 添加 tool_calls 到 assistant 消息
            if hasattr(assistant_message, "tool_calls"):
                assistant_dict["tool_calls"] = assistant_message.tool_calls
                
                # 生成工具调用回复消息
                tool_response_messages = []
                import json
                for tool_call in assistant_message.tool_calls:
                    print(f"处理工具调用: {tool_call['function']['name']}, ID: {tool_call['id']}")
                    
                    func_name = tool_call["function"]["name"]
                    func_args = json.loads(tool_call["function"]["arguments"])
                    
                    if func_name in tool_map:
                        # 调用工具函数
                        from inspect import signature
                        sig = signature(tool_map[func_name])
                        valid_args = {k: v for k, v in func_args.items() if k in sig.parameters}
                        result = tool_map[func_name](**valid_args)
                        
                        # 创建工具回复消息
                        tool_response = {
                            "role": "tool",
                            "tool_call_id": tool_call["id"],
                            "name": func_name,
                            "content": json.dumps(result, ensure_ascii=False)
                        }
                        tool_response_messages.append(tool_response)
                
                # 组装完整消息列表
                updated_messages = messages + [assistant_dict] + tool_response_messages
                
                print(f"完整消息列表: {updated_messages}")
                
                # 第二次调用大模型
                print("第二次调用大模型...")
                response2 = dashscope.Generation.call(
                    model="qwen-turbo",
                    messages=updated_messages,
                    tools=[weather_tool],
                    tool_choice="auto",
                )
                
                if response2.status_code == HTTPStatus.OK:
                    final_response = response2.output.choices[0].message.content
                    print("最终回复:", final_response)
                else:
                    print(f"请求失败: {response2.code} - {response2.message}")
            else:
                print("assistant 消息中没有 tool_calls 字段")
                print(assistant_message)
        else:
            # 如果没有调用工具,直接输出模型回复
            print("无工具调用,直接输出回复:", assistant_message.content)
    else:
        print(f"请求失败: {response.code} - {response.message}")

if __name__ == "__main__":
    run_weather_query()

三、门票助手(Function Calling)

Step1. 系统初始化

Step2. 助手实例化

Step3. 设置交互模式

Step4. Function Call 机制

Python实现代码:

import os
import asyncio
from typing import Optional
import dashscope
from qwen_agent.agents import Assistant
from qwen_agent.gui import WebUI
import pandas as pd
from sqlalchemy import create_engine
from qwen_agent.tools.base import BaseTool, register_tool
import matplotlib.pyplot as plt
import io
import base64
import time

# 定义资源文件根目录
ROOT_RESOURCE = os.path.join(os.path.dirname(__file__), 'resource')

# 配置 DashScope
dashscope.api_key = os.getenv('DASHSCOPE_API_KEY', '')  # 从环境变量获取 API Key
dashscope.timeout = 30  # 设置超时时间为 30 秒

# ====== 门票助手 system prompt 和函数描述 ======
system_prompt = """我是门票助手,以下是关于门票订单表相关的字段,我可能会编写对应的SQL,对数据进行查询
-- 门票订单表
CREATE TABLE tkt_orders (
    order_time DATETIME,             -- 订单日期
    account_id INT,                  -- 预定用户ID
    gov_id VARCHAR(18),              -- 商品使用人ID(身份证号)
    gender VARCHAR(10),              -- 使用人性别
    age INT,                         -- 年龄
    province VARCHAR(30),           -- 使用人省份
    SKU VARCHAR(100),                -- 商品SKU名
    product_serial_no VARCHAR(30),  -- 商品ID
    eco_main_order_id VARCHAR(20),  -- 订单ID
    sales_channel VARCHAR(20),      -- 销售渠道
    status VARCHAR(30),             -- 商品状态
    order_value DECIMAL(10,2),       -- 订单金额
    quantity INT                     -- 商品数量
);
一日门票,对应多种SKU:
Universal Studios Beijing One-Day Dated Ticket-Standard
Universal Studios Beijing One-Day Dated Ticket-Child
Universal Studios Beijing One-Day Dated Ticket-Senior
二日门票,对应多种SKU:
USB 1.5-Day Dated Ticket Standard
USB 1.5-Day Dated Ticket Discounted
一日门票、二日门票查询
SUM(CASE WHEN SKU LIKE 'Universal Studios Beijing One-Day%' THEN quantity ELSE 0 END) AS one_day_ticket_sales,
SUM(CASE WHEN SKU LIKE 'USB%' THEN quantity ELSE 0 END) AS two_day_ticket_sales
我将回答用户关于门票相关的问题

每当 exc_sql 工具返回 markdown 表格和图片时,你必须原样输出工具返回的全部内容(包括图片 markdown),不要只总结表格,也不要省略图片。这样用户才能直接看到表格和图片。
"""

functions_desc = [
    {
        "name": "exc_sql",
        "description": "对于生成的SQL,进行SQL查询",
        "parameters": {
            "type": "object",
            "properties": {
                "sql_input": {
                    "type": "string",
                    "description": "生成的SQL语句",
                }
            },
            "required": ["sql_input"],
        },
    },
]

# ====== 会话隔离 DataFrame 存储 ======
# 用于存储每个会话的 DataFrame,避免多用户数据串扰
_last_df_dict = {}

def get_session_id(kwargs):
    """根据 kwargs 获取当前会话的唯一 session_id,这里用 messages 的 id"""
    messages = kwargs.get('messages')
    if messages is not None:
        return id(messages)
    return None

# ====== exc_sql 工具类实现 ======
@register_tool('exc_sql')
class ExcSQLTool(BaseTool):
    """
    SQL查询工具,执行传入的SQL语句并返回结果,并自动进行可视化。
    """
    description = '对于生成的SQL,进行SQL查询,并自动可视化'
    parameters = [{
        'name': 'sql_input',
        'type': 'string',
        'description': '生成的SQL语句',
        'required': True
    }]

    def call(self, params: str, **kwargs) -> str:
        import json
        import matplotlib.pyplot as plt
        import io, os, time
        args = json.loads(params)
        sql_input = args['sql_input']
        database = args.get('database', 'ubr')
        engine = create_engine(
            f'mysql+mysqlconnector://st*********:********21@rm-uf6z891lon6dxuqblqo.mysql.rds.aliyuncs.com:3306/{database}?charset=utf8mb4',# 用自己的阿里云数据库账号
            connect_args={'connect_timeout': 10}, pool_size=10, max_overflow=20
        )
        try:
            df = pd.read_sql(sql_input, engine)
            md = df.to_markdown(index=False)
            # 自动推断x/y字段
            x_candidates = df.select_dtypes(include=['object']).columns.tolist()
            if not x_candidates:
                x_candidates = df.columns.tolist()
            x = x_candidates[0]
            y_candidates = df.select_dtypes(include=['number']).columns.tolist()
            y_fields = y_candidates
            # 绘制柱状图
            plt.figure(figsize=(8, 5))
            bar_width = 0.35 if len(y_fields) > 1 else 0.6
            x_labels = df[x].astype(str)
            x_pos = range(len(df))
            for idx, y_col in enumerate(y_fields):
                plt.bar([p + idx*bar_width for p in x_pos], df[y_col], width=bar_width, label=y_col)
            plt.xlabel(x)
            plt.ylabel(','.join(y_fields))
            plt.title(f"{' & '.join(y_fields)} by {x}")
            plt.xticks([p + bar_width*(len(y_fields)-1)/2 for p in x_pos], x_labels, rotation=45, ha='right')
            plt.legend()
            plt.tight_layout()
            # 自动创建目录
            save_dir = os.path.join(os.path.dirname(__file__), 'image_show')
            os.makedirs(save_dir, exist_ok=True)
            # 生成唯一文件名
            filename = f'bar_{int(time.time()*1000)}.png'
            save_path = os.path.join(save_dir, filename)
            plt.savefig(save_path)
            plt.close()
            img_path = os.path.join('image_show', filename)
            img_md = f'![柱状图]({img_path})'
            return f"{md}

{img_md}"
        except Exception as e:
            return f"SQL执行或可视化出错: {str(e)}"

# ====== 初始化门票助手服务 ======
def init_agent_service():
    """初始化门票助手服务"""
    llm_cfg = {
        'model': 'qwen-turbo-2025-04-28',
        'timeout': 30,
        'retry_count': 3,
    }
    try:
        bot = Assistant(
            llm=llm_cfg,
            name='门票助手',
            description='门票查询与订单分析',
            system_message=system_prompt,
            function_list=['exc_sql'],  # 移除绘图工具
        )
        print("助手初始化成功!")
        return bot
    except Exception as e:
        print(f"助手初始化失败: {str(e)}")
        raise

def app_tui():
    """终端交互模式
    
    提供命令行交互界面,支持:
    - 连续对话
    - 文件输入
    - 实时响应
    """
    try:
        # 初始化助手
        bot = init_agent_service()

        # 对话历史
        messages = []
        while True:
            try:
                # 获取用户输入
                query = input('user question: ')
                # 获取可选的文件输入
                file = input('file url (press enter if no file): ').strip()
                
                # 输入验证
                if not query:
                    print('user question cannot be empty!')
                    continue
                    
                # 构建消息
                if not file:
                    messages.append({'role': 'user', 'content': query})
                else:
                    messages.append({'role': 'user', 'content': [{'text': query}, {'file': file}]})

                print("正在处理您的请求...")
                # 运行助手并处理响应
                response = []
                for response in bot.run(messages):
                    print('bot response:', response)
                messages.extend(response)
            except Exception as e:
                print(f"处理请求时出错: {str(e)}")
                print("请重试或输入新的问题")
    except Exception as e:
        print(f"启动终端模式失败: {str(e)}")


def app_gui():
    """图形界面模式,提供 Web 图形界面"""
    try:
        print("正在启动 Web 界面...")
        # 初始化助手
        bot = init_agent_service()
        # 配置聊天界面,列举3个典型门票查询问题
        chatbot_config = {
            'prompt.suggestions': [
                '2023年4、5、6月一日门票,二日门票的销量多少?帮我按照周进行统计',
                '2023年7月的不同省份的入园人数统计',
                '帮我查看2023年10月1-7日销售渠道订单金额排名',
            ]
        }
        print("Web 界面准备就绪,正在启动服务...")
        # 启动 Web 界面
        WebUI(
            bot,
            chatbot_config=chatbot_config
        ).run()
    except Exception as e:
        print(f"启动 Web 界面失败: {str(e)}")
        print("请检查网络连接和 API Key 配置")


if __name__ == '__main__':
    # 运行模式选择
    app_gui()          # 图形界面模式(默认)
免责声明:本文为用户发表,不代表网站立场,仅供参考,不构成引导等用途。 系统环境
相关推荐
首页
搜索
订单
购物车
我的