在上一篇Paramiko实战文章中(Python 自动化测试神器:Paramiko实现SSH远程操作),我们聚焦了SSHClient和SFTPClient等高频组件,但Paramiko中还有一个更底层、更灵活的核心组件 ——Transport。它直接封装了 SSH 传输层协议,支持长连接复用、多通道创建、密钥协商等高级功能,是实现复杂自动化场景(如批量命令执行、并行文件传输)的关键。本文将补齐这一知识点,详解 Transport 的核心功能、重点方法及测试自动化实战案例。

在Paramiko的架构中,Transport是SSH连接的“基石”——SSHClient本质是对Transport的上层封装,简化了常见操作,但牺牲了部分灵活性。当测试自动化需要更精细的控制时(如保持长连接、同时执行多个命令、自定义加密算法),就必须直接使用 Transport。
Transport 的使用流程遵循 “创建连接→认证→创建通道→执行操作→关闭资源” 的逻辑,以下 6 个核心方法是实现自动化的关键。
功能:初始化一个SSH传输层对象,可直接传入一个已建立的网络套接字(Socket),作为Transport的底层通信载体;也可以直接传入主机地址和端口的元组,列如传入‘("127.0.0.1",22)’,会自动建立Socket连接。
参数名 | 类型 | 是否必填 | 说明 | 示例 |
sock | socket.socket | 是 | 已连接到服务器 SSH 端口的 Socket 对象 | socket.create_connection(("192.168.1.100", 22)) 或者("127.0.0.1",22) |
import paramiko
import socket
def create_transport():
# 1. 创建Socket连接(指定服务器IP和SSH端口)
sock = socket.create_connection(
addr=("192.168.1.100", 22), #服务器地址+端口
timeout=10 #连接超时时间
)
# 2. 基于Socket创建Transport 实例
transport = paramiko.Transport(sock)
try:
# 3. 启动Transport会话(必须调用,否则无法后续操作)
transport.start_client()
print(f"✅ Transport 连接成功,服务器版本:{transport.remote_version}")
# 后续认证、创建通道等操作...
except Exception as e:
print(f"❌ Transport 操作失败:{str(e)}")
finally:
# 4. 关闭资源(关闭Transport,会关闭Socket)
if transport.is_active():
transport.close()
print(" Transport 已关闭")
# 执行创建
create_transport()功能:通过 “用户名 + 密码” 方式完成 Transport 连接的认证,是最简单的认证方式(适合快速测试场景)。
参数名 | 类型 | 是否必填 | 说明 | 示例 |
username | str | 是 | 服务器登录用户名 | "test_user" |
password | str | 是 | 登录密码 | "test_123456" |
通过auth_password()函数进行密码认证登录,通过open_session()建立ssh通道,通过exec_command()执行命令,通过recv()接收响应,recv_stderr()接收错误。
PS:start_client()和auth_password(),这两步可以替换为connect()函数,一个函数搞定。
import paramiko
import socket
def transport_password_auth():
sock = socket.create_connection(("192.168.1.100", 22), timeout=10)
transport = paramiko.Transport(sock)
try:
transport.start_client()
# 1. 密码认证
transport.auth_password(
username="test_user",
password="test_123456"
)
print("✅ 密码认证成功")
# 2. 创建命令通道(Session 通道,用于执行Shell 命令)
channel = transport.open_session()
# 3. 执行命令(查看服务器 CPU 使用率)
command = "top -bn1 | grep 'Cpu(s)' | awk '{print $2}'" # 非交互命令
channel.exec_command(command)
# 4. 读取命令结果(stdout是字节流,需解码)
stdout = channel.recv(1024).decode("utf-8").strip() # recv接收缓冲区数据
stderr = channel.recv_stderr(1024).decode("utf-8").strip()
if stdout:
print(f"✅ 命令执行成功,CPU 使用率:{stdout}%")
else:
print(f"❌ 命令执行失败:{stderr}")
# 5. 关闭通道
channel.close()
except Exception as e:
print(f"❌ 操作失败:{str(e)}")
finally:
if transport.is_active():
transport.close()
# 执行
transport_password_auth()功能:通过 RSA/DSA 公钥方式完成认证,比密码认证更安全,是正式测试环境的首选方式(需提前将公钥上传到服务器)。
参数名 | 类型 | 是否必填 | 说明 | 示例 |
username | str | 是 | 服务器登录用户名 | "test_user" |
pkey | paramiko.PKey | 是 | 私钥对象(支持 RSAKey、DSAKey 等) | paramiko.RSAKey.from_private_key_file("~/.ssh/id_rsa") |
import paramiko
import socket
from paramiko.rsakey import RSAKey
def transport_key_auth_sftp():
sock = socket.create_connection(("192.168.1.100", 22), timeout=10)
transport = paramiko.Transport(sock)
try:
transport.start_client()
# 1. 加载本地私钥
private_key = RSAKey.from_private_key_file(
filename="~/.ssh/id_rsa",
password="key_password" # 私钥密码(无则省略)
)
# 2. 公钥认证
transport.auth_publickey(
username="test_user",
pkey=private_key
)
print("✅ 公钥认证成功")
# 3. 基于 Transport 创建 SFTP 通道(复用同一连接)
sftp_client = paramiko.SFTPClient.from_transport(transport)
print("✅ SFTP 通道创建成功")
# 4. 执行 SFTP 操作(如列出目录)
remote_dir = "/opt/test_data"
file_list = sftp_client.listdir(remote_dir)
print(f"服务器 {remote_dir} 目录文件:{file_list}")
# 5. 关闭 SFTP 客户端(Transport 保持连接,可后续复用)
sftp_client.close()
except Exception as e:
print(f"❌ 操作失败:{str(e)}")
finally:
if transport.is_active():
transport.close()
sock.close()
# 执行
transport_key_auth_sftp()
功能:在已认证的Transport连接上创建一个SSH Session通道,用于执行Shell命令(类似SSHClient.exec_command,但更底层)。一个Transport可同时创建多个Session通道,实现 “并行命令执行”。
返回 paramiko.Channel对象,通过该对象的exec_command、recv、send等方法实现命令交互。
在测试自动化中,若需批量执行多个独立命令(如同时查看 CPU、内存、磁盘使用率),无需创建多个 SSH 连接,只需通过 Transport 开启多个通道并行执行,大幅节省时间。
python
运行
import paramiko
import socket
import threading
def execute_command(transport, command, desc):
"""
线程函数:在指定 Transport 上创建通道执行命令
:param transport: 已认证的 Transport 对象
:param command: 要执行的命令
:param desc: 命令描述(用于区分结果)
"""
try:
# 创建独立通道
channel = transport.open_session()
# 执行命令
channel.exec_command(command)
# 读取结果
stdout = channel.recv(2048).decode("utf-8").strip()
stderr = channel.recv_stderr(2048).decode("utf-8").strip()
if stdout:
print(f"✅ {desc}:{stdout}")
else:
print(f"❌ {desc}:{stderr}")
channel.close()
except Exception as e:
print(f"❌ {desc} 执行失败:{str(e)}")
def transport_multi_channel():
sock = socket.create_connection(("192.168.1.100", 22), timeout=10)
transport = paramiko.Transport(sock)
try:
transport.start_client()
# 密码认证(也可用公钥)
transport.auth_password("test_user", "test_123456")
print("✅ Transport 认证成功,开启多通道并行命令...")
# 定义需要批量执行的测试命令(查看服务器状态,用于测试环境健康检查)
commands = [
{
"cmd": "top -bn1 | grep 'Cpu(s)' | awk '{print $2}'",
"desc": "CPU 使用率"
},
{
"cmd": "free -m | grep 'Mem' | awk '{print $3/$2*100}' | cut -d. -f1",
"desc": "内存使用率"
},
{
"cmd": "df -h / | grep '/' | awk '{print $5}'",
"desc": "根目录磁盘使用率"
}
]
# 开启多线程,每个命令一个通道(并行执行)
threads = []
for cmd_info in commands:
t = threading.Thread(
target=execute_command,
args=(transport, cmd_info["cmd"], cmd_info["desc"])
)
threads.append(t)
t.start()
# 等待所有线程执行完成
for t in threads:
t.join()
except Exception as e:
print(f"❌ 操作失败:{str(e)}")
finally:
if transport.is_active():
transport.close()
# 执行多通道并行命令
transport_multi_channel()
✅ Transport 认证成功,开启多通道并行命令...
✅ CPU 使用率:3.2
✅ 内存使用率:45
✅ 根目录磁盘使用率:28%
核心优势:3 个命令并行执行,总耗时≈单个命令耗时(约 0.5 秒),若串行执行需 1.5 秒以上,测试效率提升显著。
功能:开启 Transport 连接的心跳检测,定期向服务器发送空数据包,避免长时间无操作导致连接被服务器防火墙断开(适合长时间运行的测试脚本,如持续集成中的夜间测试)。
参数名 | 类型 | 是否必填 | 说明 | 示例 |
interval | int | 是 | 心跳间隔时间(秒),提议设置 30-60 秒 | 30(每 30 秒发送一次心跳) |
import paramiko
import socket
import time
def transport_keepalive():
sock = socket.create_connection(("192.168.1.100", 22), timeout=10)
transport = paramiko.Transport(sock)
try:
transport.start_client()
transport.auth_password("test_user", "test_123456")
print("✅ Transport 认证成功")
# 开启心跳:每 30 秒发送一次心跳包
transport.set_keepalive(30)
print("✅ 心跳机制开启,间隔 30 秒")
# 模拟长时间测试流程(如每隔 1 分钟检查一次应用状态,持续 10 分钟)
for i in range(10):
print(f"
=== 第 {i+1} 次检查应用状态 ===")
channel = transport.open_session()
channel.exec_command("ps -ef | grep test_app | grep -v grep")
result = channel.recv(1024).decode("utf-8").strip()
if result:
print(f"✅ 应用运行正常({time.strftime('%H:%M:%S')})")
else:
print(f"❌ 应用已停止({time.strftime('%H:%M:%S')})")
channel.close()
# 等待 1 分钟
time.sleep(60)
except Exception as e:
print(f"❌ 长连接操作失败:{str(e)}")
finally:
if transport.is_active():
transport.close()
sock.close()
# 执行长连接测试(实际使用时可结合日志记录)
transport_keepalive()
功能:关闭 Transport 传输层连接,释放底层 Socket 资源,必须在操作结束后调用(尤其是长连接场景),避免服务器连接数耗尽。
功能:调用SFTPClient,启动sftp的transport通道,功能跟直接调用SFTPClient是一样的。

#按照上面的步骤创建并密码认证transport通道后,可直接附值给SSHClient的属性_transport,然后调用exec_command等函数
ssh_client = paramiko.SSHClient()
ssh_client._transport = transport许多测试同学会疑惑:既然 SSHClient 更简单,为什么还要用 Transport?两者的核心差异及适用场景如下表,可根据测试需求选择:
对比维度 | SSHClient | Transport |
封装程度 | 高层封装,API 简洁(如 exec_command 一步到位) | 底层实现,需手动管理 Socket、通道、认证 |
灵活性 | 低:无法并行执行命令,长连接需依赖默认机制 | 高:支持多通道并行、自定义心跳、动态认证 |
易用性 | 高:适合新手,几行代码实现基本操作 | 中:需理解 SSH 传输层原理,代码量稍多 |
性能 | 一般:每次命令执行可能隐含连接开销 | 高:长连接复用,多通道并行,减少握手耗时 |
测试场景推荐 | 1. 简单命令执行(如单条命令查看日志); 2. 单次文件上传 / 下载; 3. 快速验证服务器连通性。 | 1. 批量并行命令执行(如测试环境健康检查); 2. 长时间长连接(如夜间持续测试); 3. 复杂多通道操作(如同时传文件 + 执行命令); 4. 自定义 SSH 协议参数(如特殊加密算法)。 |
一句话总结:简单场景用 SSHClient 提效,复杂高级场景用 Transport 兜底。
结合 Transport 的多通道并行优势,实现一个 “测试环境批量健康检查” 脚本 —— 同时检查 3 台服务器的 CPU、内存、磁盘使用率及被测应用进程状态,结果汇总输出(这是测试自动化中 “环境巡检” 的典型需求)。
import paramiko
import socket
import threading
from paramiko.rsakey import RSAKey
def check_server_health(server_ip, username, private_key_path):
"""
检查单台服务器健康状态
:param server_ip: 服务器 IP
:param username: 登录用户名
:param private_key_path: 私钥路径
"""
print(f"
=== 开始检查服务器:{server_ip} ===")
sock = None
transport = None
try:
# 1. 创建 Socket + Transport 连接
sock = socket.create_connection((server_ip, 22), timeout=15)
transport = paramiko.Transport(sock)
transport.start_client()
# 2. 公钥认证
private_key = RSAKey.from_private_key_file(private_key_path)
transport.auth_publickey(username=username, pkey=private_key)
print(f"✅ {server_ip}:认证成功")
# 3. 定义需检查的命令(应用状态+资源使用率)
check_commands = [
{
"cmd": "ps -ef | grep test_app | grep -v grep | wc -l",
"desc": "被测应用进程数",
"check": lambda x: int(x) > 0 # 检查逻辑:进程数>0 表明正常
},
{
"cmd": "top -bn1 | grep 'Cpu(s)' | awk '{print $2}'",
"desc": "CPU 使用率(%)",
"check": lambda x: float(x) < 80 # 阈值:<80% 正常
},
{
"cmd": "free -m | grep 'Mem' | awk '{print $3/$2*100}' | cut -d. -f1",
"desc": "内存使用率(%)",
"check": lambda x: int(x) < 85 # 阈值:<85% 正常
},
{
"cmd": "df -h / | grep '/' | awk '{print $5}' | sed 's/%//g'",
"desc": "磁盘使用率(%)",
"check": lambda x: int(x) < 90 # 阈值:<90% 正常
}
]
# 4. 多线程并行执行检查命令
threads = []
lock = threading.Lock() # 线程锁,避免输出混乱
def run_check(cmd_info):
channel = transport.open_session()
channel.exec_command(cmd_info["cmd"])
stdout = channel.recv(1024).decode("utf-8").strip()
stderr = channel.recv_stderr(1024).decode("utf-8").strip()
with lock:
if stdout:
is_normal = cmd_info["check"](stdout)
status = "✅" if is_normal else "❌"
print(f"{status} {server_ip} - {cmd_info['desc']}:{stdout}")
else:
print(f"❌ {server_ip} - {cmd_info['desc']}:获取失败,{stderr}")
channel.close()
for cmd_info in check_commands:
t = threading.Thread(target=run_check, args=(cmd_info,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"=== {server_ip} 检查完成 ===
")
except Exception as e:
print(f"❌ {server_ip} 检查失败:{str(e)}
")
finally:
if transport and transport.is_active():
transport.close()
if sock:
sock.close()
if __name__ == "__main__":
# 批量检查配置(多台测试服务器信息)
servers_config = [
{
"ip": "192.168.1.100",
"username": "test_user",
"private_key": "~/.ssh/id_rsa"
},
{
"ip": "192.168.1.101",
"username": "test_user",
"private_key": "~/.ssh/id_rsa"
},
{
"ip": "192.168.1.102",
"username": "test_user",
"private_key": "~/.ssh/id_rsa"
}
]
# 开启多线程,同时检查多台服务器
server_threads = []
for server in servers_config:
t = threading.Thread(
target=check_server_health,
args=(server["ip"], server["username"], server["private_key"])
)
server_threads.append(t)
t.start()
for t in server_threads:
t.join()
print(" 所有服务器健康检查完成!")
多通道并发限制:
虽然 Transport 支持多通道,但服务器端一般会限制单个 SSH 连接的最大通道数(默认一般为 10-20),批量执行命令时需控制通道数量,避免超出限制被服务器拒绝。
数据接收不完整:
用 channel.recv(size) 读取命令结果时,若结果超过 size(如大日志),会导致数据截断。解决方法:循环读取直到通道关闭:
def read_full_channel(channel):
"""循环读取通道所有数据"""
stdout = b""
while not channel.exit_status_ready():
if channel.recv_ready():
stdout += channel.recv(4096) # 读取剩余数据
return stdout.decode("utf-8").strip()Socket 连接超时:
创建 Socket 时务必设置 timeout,避免服务器不可达时脚本无限阻塞;提议超时时间设置为 10-15 秒(兼顾网络延迟和快速失败)。
私钥权限问题:
公钥认证时,本地私钥文件权限需严格限制(Linux下为600,Windows 下为 “仅当前用户可读”),否则 Paramiko 会拒绝使用该私钥(安全机制)。
Transport作为Paramiko 的底层核心组件,虽然使用门槛比SSHClient稍高,但带来的灵活性和性能提升,使其成为解决复杂测试自动化场景的 “利器”—— 无论是批量环境巡检、长时间持续测试,还是多任务并行执行,Transport 都能完美适配。
结合本文的方法解析和实战案例,你可以根据测试需求灵活选择 SSHClient 或 Transport:简单场景用前者快速落地,复杂场景用后者突破限制。掌握这两个组件后,Paramiko 在测试自动化中的应用将无死角,真正实现 “服务器操作全自动化”。