天气服务最后跑通了:
我把高德的地理编码和天气接口串在一起,挂到一个叫 FastMCP 的小服务上,用 httpx 的异步客户端去并发请求。实际效果就是:你丢一堆城市名过来,服务几乎同时把请求发出去,慢的等慢的,快的先回。举个直观的场景:三次查询同时发起,网络分别是 5s、2s、1s,整个操作大致耗时就是那最慢的 5s,而不是把 5+2+1 全都加起来。关键就在于事件循环把协程挂起和恢复的机制——每个 query_weather 执行到 await client.get(...) 的时候会暂停,事件循环马上去跑别的协程,哪个请求先回哪个就先继续往下走。
先说点容易被误解的:别把 await 当成“马上返回”的东西。await 是让当前协程停下来,等被等待的异步操作完成再继续。如果你把 await 删掉,client.get(...) 会直接返回一个协程对象,不是响应,后面去访问 resp.text 那类字段就会出错。在普通函数里写 await 更是语法错误,Python 要求 await 必须在 async def 里。那些坑,我也踩过,踩得印象深刻。
代码结构上按套路来会比较稳。外层用 async def query_weather(address),里头用 async with httpx.AsyncClient() 打开异步客户端,再 await client.get(...) 发请求。别尝试在同步函数里用 async with,会报错;也别在异步函数里换成同步的 httpx.Client,那样会把整个协程堵住,失去并发优势。实际运行时,这两层异步配合事件循环,才能把多个请求并行化处理。
在框架层面,FastMCP 支持把函数用 @app.tool 装饰,这玩意儿会在框架内部把异步函数 await 掉。换句话说,外部调用工具的时候不用你再去显式 await。框架底层就是把工具名分发到对应协程,await 那个协程拿回结果。如果没有装饰器,直接调用这个 async 函数就必须在 async 环境里用 await,否则只会得到一个 coroutine 对象,需要 asyncio.run 或在别的 async 函数里 await。
下面贴出我实际跑通的核心代码,和我本地测试一致。注意把高德的 KEY 放在环境变量里,别写到代码里去:
from mcp.server import FastMCP
from mcp.types import TextContent
import httpx, json, os
app = FastMCP("weather-server")
HOST = "https://restapi.amap.com"
KEY = os.getenv("AMAP_KEY")
@app.tool(name="query_weather", description="查询指定地区的天气情况")
async def query_weather(address) -> list[TextContent]:
async with httpx.AsyncClient() as client:
try:
# 地理编码,拿到行政区划编码(adcode)
resp = await client.get(f"{HOST}/v3/geocode/geo", params={"key": KEY, "address": address})
if resp.status_code != 200:
return [TextContent(type="text", text=f"request error: {resp.status_code} {resp.text}")]
data = json.loads(resp.text)
if data.get("status") != "1":
return [TextContent(type="text", text=f"request error: {data.get('info', '')}")]
city_code = data["geocodes"][0]["adcode"]
# 用城市编码去查天气
resp = await client.get(f"{HOST}/v3/weather/weatherInfo", params={"key": KEY, "city": city_code})
if resp.status_code != 200:
return [TextContent(type="text", text=f"request error: {resp.status_code} {resp.text}")]
weather = json.loads(resp.text)
if weather.get("status") != "1":
return [TextContent(type="text", text=f"request error: {weather.get('info', '')}")]
return [TextContent(type="text", text=json.dumps(weather))]
except Exception as e:
raise e
if __name__ == "__main__":
app.run(transport="sse")
并发调用的示例我也写了,思路是把多个 query_weather 放到任务列表里,用 asyncio.gather 一起跑。框架外调用就得手动 await,像下面这样:
import asyncio
async def query_multiple(cities):
tasks = [query_weather(c) for c in cities]
results = await asyncio.gather(*tasks)
return results
async def main():
cities = ["北京", "上海", "广州", "深圳"]
res = await query_multiple(cities)
for r in res:
print(r)
if __name__ == "__main__":
asyncio.run(main())
用这种方式跑几次并发测试,能直观看到时间节省。把请求同时发出去,整体耗时就是最慢的那个请求完成的时间。这个效果对高并发小请求场景特别明显,列如给移动端的天气查询接口做聚合,或者批量把多个城市的天气拉下来做展示,用同步 blocking 客户端会把响应时间拉得很长。
说到常见错误,下面这些坑要避免:
- 在普通函数里直接写 await,会报 'await' outside async function。
- 把异步客户端换成同步的 httpx.Client,会让协程阻塞,从并发变成串行。
- 忘了加 await,最后传来传去打印的只是 ,不是实际结果。
这些问题看起来代码能跑,但并不是你想要的并发效果。能不能把 await 和 async 的边界搞清楚,决定了代码是真异步还是“假异步”。
网络请求部分也别偷懒,出错要有处理逻辑。每次请求后都要检查 HTTP 状态码,拿到 JSON 后看 status 字段是不是表明成功,要不就把错误信息返给调用方。这样做有两层好处:一是调用方能拿到原始错误信息方便定位,二是在多接口联调时不会由于格式不对把后续逻辑带歪。环境变量里放 KEY 是基本操作,别把密钥写死在代码库里,这样泄露风险太大。
我本地把服务挂起来后做了几轮并发测试,场景是同时查询多城市,网络质量参差不齐。结果很直观:并发请求只花了最慢那个请求的时间,CPU 和线程都没被挤爆,服务处理其他并发也很稳。当你理解了事件循环是怎么让出和恢复任务,以及 await 在什么时候暂停协程、什么时候恢复的,调试异步代码就不会那么迷糊了。