Downloading looks simple, but real-world scenarios bring unstable networks, large files, concurrency limits, anti-scraping measures, the need to resume interrupted transfers, file naming and classification, and proxy/certificate problems. Mastering robust, efficient download techniques makes crawlers, data collection, and automation tasks more reliable and easier to productionize.
This article is practice-oriented and covers: basic downloads, progress and resumable downloads, concurrent acceleration (threads/async), a complete batch-download tool case study, stability and safety, and deployment and extension.
Basic usage of urllib.request
urllib is part of the standard library and suits lightweight tasks.
Example: download an image and save it:
# save_with_urllib.py
from urllib.request import urlretrieve
url = "https://example.com/sample.jpg"
out = "sample.jpg"
urlretrieve(url, out)
print("Downloaded to", out)
Pros: simple. Cons: limited functionality (little control over timeouts, retries, headers, streamed writes, or progress).
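If you need a bit more control while staying in the standard library, urllib.request.urlopen can stream the response in chunks and takes a timeout. A minimal sketch (the chunk size, timeout, and file names here are illustrative choices, not from the original):
# urlopen_stream.py (sketch) — standard-library streaming download
from urllib.request import urlopen

def download(url, out_path, chunk_size=8192, timeout=10):
    # urlopen accepts a timeout in seconds and returns a file-like response
    with urlopen(url, timeout=timeout) as resp, open(out_path, 'wb') as f:
        while True:
            chunk = resp.read(chunk_size)
            if not chunk:
                break
            f.write(chunk)

if __name__ == "__main__":
    download("https://example.com/sample.jpg", "sample.jpg")
Even so, retries, custom headers, and progress reporting remain easier with requests, covered next.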
requests: a more flexible choice (recommended)
requests supports streaming downloads, headers, timeouts, sessions, authentication, and more.
Example: use stream=True to write the file chunk by chunk and print basic progress:
# download_requests.py
import requests

def download(url, out_path, chunk_size=8192, timeout=10):
    with requests.get(url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        total = r.headers.get('Content-Length')
        if total is not None:
            total = int(total)
        downloaded = 0
        with open(out_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total:
                        print(f"\r{downloaded}/{total} bytes ({downloaded/total:.2%})", end="")
        print("\nDone")

if __name__ == "__main__":
    download("https://example.com/sample.zip", "sample.zip")
Notes:
stream=True keeps the whole response from being read into memory at once.
r.raise_for_status() raises an exception on HTTP error status codes.
Showing a nice progress bar with tqdm
tqdm is very convenient and can show ETA, transfer speed, and more.
Example:
# download_with_tqdm.py
import requests
from tqdm import tqdm

def download(url, out_path, chunk_size=8192, timeout=10):
    r = requests.get(url, stream=True, timeout=timeout)
    r.raise_for_status()
    total = int(r.headers.get('Content-Length', 0))
    with open(out_path, 'wb') as f, tqdm(total=total, unit='B', unit_scale=True, desc=out_path) as pbar:
        for chunk in r.iter_content(chunk_size=chunk_size):
            if chunk:
                f.write(chunk)
                pbar.update(len(chunk))

if __name__ == "__main__":
    download("https://example.com/bigfile.iso", "bigfile.iso")
Note: Content-Length may be missing (chunked encoding, dynamically generated content); in that case tqdm cannot show total progress, but it can still show how much has been downloaded.
Resumable downloads (HTTP Range)
How it works: send an HTTP Range request header asking for part of the file (for example Range: bytes=1000-); if the server supports it, it replies with 206 Partial Content. Implementation steps:
Check whether the local file exists and get its size local_size;
Send a request with the header Range: bytes={local_size}-;
Append to the local file (open mode 'ab');
If the server does not support Range (it returns 200), re-download from scratch or warn the user.
Implementation example (requests):
# resume_download.py
import os
import requests
from tqdm import tqdm

def supports_range(url, timeout=10):
    # Use a HEAD request to check whether the server advertises Accept-Ranges
    try:
        r = requests.head(url, timeout=timeout)
        if r.status_code == 200:
            return r.headers.get('Accept-Ranges', '') == 'bytes'
    except requests.RequestException:
        return False
    return False

def resume_download(url, out_path, chunk_size=8192, timeout=10):
    temp_size = os.path.getsize(out_path) if os.path.exists(out_path) else 0
    headers = {}
    if temp_size > 0:
        headers['Range'] = f'bytes={temp_size}-'
    r = requests.get(url, headers=headers, stream=True, timeout=timeout)
    if r.status_code == 416:  # Range not satisfiable -> already fully downloaded
        print("Already downloaded.")
        return
    # If the server doesn't support Range, r.status_code may be 200 and we must overwrite
    mode = 'ab' if r.status_code == 206 else 'wb'
    initial = temp_size if r.status_code == 206 else 0
    total = r.headers.get('Content-Length')
    if total is not None:
        total = int(total) + initial
    with open(out_path, mode) as f, tqdm(total=total, unit='B', unit_scale=True, initial=initial, desc=out_path) as pbar:
        for chunk in r.iter_content(chunk_size=chunk_size):
            if chunk:
                f.write(chunk)
                pbar.update(len(chunk))

if __name__ == "__main__":
    url = "https://example.com/largefile.zip"
    resume_download(url, "largefile.zip")
Caveats:
Not all servers support Range; downloads behind some CDNs can also be restricted.
If the downloaded file turns out to be corrupted (checksum mismatch), re-download it or implement per-chunk verification (see the optimization section).
Multi-threaded chunked downloads
Concurrency can speed up downloads (especially when bandwidth allows and the server supports Range requests for chunks).
Approach:
Send a HEAD request first to get Content-Length (the total file size).
Split the file into a number of byte ranges (chunks).
Each thread requests one range (via Range) and writes it to a temporary part file.
Once all parts are done, concatenate them into the final file and delete the temporary parts.
Example code (simplified):
# multithreaded_download.py
import os
import math
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

def get_size(url):
    r = requests.head(url)
    r.raise_for_status()
    return int(r.headers.get('Content-Length', 0))

def download_range(url, start, end, idx, temp_dir="tmp", timeout=10):
    headers = {'Range': f'bytes={start}-{end}'}
    r = requests.get(url, headers=headers, stream=True, timeout=timeout)
    r.raise_for_status()
    os.makedirs(temp_dir, exist_ok=True)
    part_path = os.path.join(temp_dir, f"part_{idx}")
    with open(part_path, 'wb') as f:
        for chunk in r.iter_content(8192):
            if chunk:
                f.write(chunk)
    return part_path

def multi_thread_download(url, out_path, workers=4):
    total = get_size(url)
    if total == 0:
        raise RuntimeError("Couldn't determine file size.")
    part_size = math.ceil(total / workers)
    ranges = [(i*part_size, min((i+1)*part_size-1, total-1), i) for i in range(workers)]
    temp_dir = out_path + "_parts"
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = [ex.submit(download_range, url, s, e, idx, temp_dir) for s, e, idx in ranges]
        for f in tqdm(as_completed(futures), total=len(futures), desc="Downloading parts"):
            f.result()  # raise if any exception
    # merge the parts in index order
    with open(out_path, 'wb') as out_f:
        for i in range(workers):
            part_path = os.path.join(temp_dir, f"part_{i}")
            with open(part_path, 'rb') as pf:
                out_f.write(pf.read())
            os.remove(part_path)
    os.rmdir(temp_dir)
    print("Merged to", out_path)
Pros: noticeably faster under high latency or when per-connection bandwidth is throttled. Cons: it puts more load on the server, and some hosts/rate-limited servers forbid multiple connections.
asyncio + aiohttp
asyncio with aiohttp suits concurrent downloads of many small files (such as images) and saves thread overhead.
Example: asynchronously download a batch of images with a concurrency limit:
# aio_download.py
import asyncio
import aiohttp
from aiohttp import ClientTimeout
from tqdm import tqdm
import os
from asyncio import Semaphore

async def fetch(session, url, out_path, sem):
    async with sem:
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                with open(out_path, 'wb') as f:
                    while True:
                        chunk = await resp.content.read(1024)
                        if not chunk:
                            break
                        f.write(chunk)
        except Exception as e:
            return url, False, str(e)
        return url, True, None

async def download_many(urls, out_dir="downloads", concurrency=10, timeout=30):
    os.makedirs(out_dir, exist_ok=True)
    sem = Semaphore(concurrency)
    timeout = ClientTimeout(total=timeout)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = []
        for url in urls:
            name = os.path.basename(url.split("?")[0]) or "file"
            out_path = os.path.join(out_dir, name)
            tasks.append(fetch(session, url, out_path, sem))
        results = []
        for f in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
            res = await f
            results.append(res)
        return results

if __name__ == "__main__":
    urls = ["https://example.com/img1.jpg", "https://example.com/img2.jpg"]  # ...
    asyncio.run(download_many(urls, concurrency=20))
Notes:
asyncio shines with large numbers of I/O-bound (network) tasks.
For a single large file, aiohttp can also download chunks concurrently, but it is a bit more involved than threads/processes (you have to be careful about where each chunk is written); see the sketch below.
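A rough sketch of that chunked aiohttp approach, assuming the server supports Range; the part count, the pre-allocation strategy, and the URLs are illustrative choices, not from the original:
# aio_range_download.py (sketch) — one large file, downloaded in ranges with aiohttp
import asyncio
import math
import aiohttp

async def fetch_part(session, url, start, end, out_path):
    # Request one byte range and write it at the matching offset.
    headers = {'Range': f'bytes={start}-{end}'}
    async with session.get(url, headers=headers) as resp:
        resp.raise_for_status()
        data = await resp.read()  # whole part kept in memory; acceptable for a sketch
    with open(out_path, 'r+b') as f:
        f.seek(start)
        f.write(data)

async def download_chunked(url, out_path, parts=4):
    async with aiohttp.ClientSession() as session:
        async with session.head(url) as resp:
            total = int(resp.headers.get('Content-Length', 0))
        if total == 0:
            raise RuntimeError("Couldn't determine file size.")
        with open(out_path, 'wb') as f:
            f.truncate(total)  # pre-allocate so every task can seek into the file
        part_size = math.ceil(total / parts)
        tasks = [fetch_part(session, url, i * part_size,
                            min((i + 1) * part_size - 1, total - 1), out_path)
                 for i in range(parts)]
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(download_chunked("https://example.com/largefile.zip", "largefile.zip"))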
Choosing an approach:
Single large file and the server supports Range: multi-threaded chunked downloading usually works best.
Many small files (images, JSON): async (aiohttp) is more resource-efficient and faster.
Simple, highly compatible scripts: synchronous requests is the easiest to use.
A complete batch-download tool (case study)
Goal: build a CLI program that supports:
reading the URL list from a CSV/text file;
concurrent downloads (threads or async, selectable);
resumable downloads, retries, logging, rate limiting;
automatic classified saving (by Content-Type or URL suffix; a helper sketch follows the project layout below);
a simple config (JSON/CLI arguments).
Project layout:
py-downloader/
├─ downloader/
│  ├─ __init__.py
│  ├─ core.py      # core download logic (requests / aiohttp)
│  ├─ utils.py     # helpers (filename sanitizing, HTTP helpers)
│  ├─ cli.py       # argparse command-line interface
│  └─ config.py
├─ tests/
├─ requirements.txt
└─ setup.py
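For the auto-classification goal above, a small helper in utils.py could map the response Content-Type (or the URL suffix as a fallback) to a subdirectory; the function name and category mapping below are illustrative, not part of the original project:
# utils.py (sketch) — pick an output subdirectory by Content-Type or URL suffix
import mimetypes
import os
from urllib.parse import urlparse

def classify(url, content_type=None):
    # Prefer the server-reported Content-Type, fall back to the URL extension.
    ext = None
    if content_type:
        ext = mimetypes.guess_extension(content_type.split(';')[0].strip())
    if not ext:
        ext = os.path.splitext(urlparse(url).path)[1]
    ext = (ext or '').lstrip('.').lower()
    if ext in ('jpg', 'jpeg', 'png', 'gif', 'webp'):
        return 'images'
    if ext in ('zip', 'tar', 'gz', 'iso'):
        return 'archives'
    return 'other'

# Usage with a requests response r: subdir = classify(url, r.headers.get('Content-Type'))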
A synchronous download function for core.py that combines retries, timeouts, progress, and resume support:
# core.py (excerpt)
import requests, os
from tqdm import tqdm
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def requests_session_with_retries(retries=3, backoff=0.5, status_forcelist=(500, 502, 503, 504)):
    s = requests.Session()
    retry = Retry(total=retries, backoff_factor=backoff, status_forcelist=status_forcelist)
    s.mount('http://', HTTPAdapter(max_retries=retry))
    s.mount('https://', HTTPAdapter(max_retries=retry))
    return s

def smart_download(url, out_path, session=None, chunk_size=8192):
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    if session is None:
        session = requests_session_with_retries()
    temp_size = os.path.getsize(out_path) if os.path.exists(out_path) else 0
    headers = {}
    if temp_size:
        headers['Range'] = f'bytes={temp_size}-'
    with session.get(url, stream=True, headers=headers, timeout=30) as r:
        if r.status_code == 416:
            return True
        r.raise_for_status()
        total = r.headers.get('Content-Length')
        if total is not None:
            total = int(total) + (temp_size if r.status_code == 206 else 0)
        mode = 'ab' if r.status_code == 206 else 'wb'
        with open(out_path, mode) as f, tqdm(total=total, initial=temp_size, unit='B', unit_scale=True) as pbar:
            for chunk in r.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
                    pbar.update(len(chunk))
    return True
The command-line interface uses argparse and supports --concurrency, --mode=thread|async, and --input urls.txt:
# cli.py (concept)
import argparse
import os
from .core import smart_download

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True, help="URLs file, one per line")
    parser.add_argument('--outdir', default='downloads')
    parser.add_argument('--concurrency', type=int, default=4)
    args = parser.parse_args()
    with open(args.input) as f:
        urls = [line.strip() for line in f if line.strip()]
    os.makedirs(args.outdir, exist_ok=True)
    # Simplified: sequential download example
    for url in urls:
        name = url.split('/')[-1].split('?')[0] or 'file'
        out_path = os.path.join(args.outdir, name)
        smart_download(url, out_path)
Stability and safety
Always set a timeout (connect/read), e.g. requests.get(..., timeout=(5, 30)) (5-second connect timeout, 30-second read timeout).
Use urllib3.Retry or hand-written retry logic to handle transient network errors.
Implement exponential backoff for possible HTTP 429 (Too Many Requests) responses; a sketch follows below.
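A hand-rolled backoff sketch for 429/5xx responses; the attempt count and delays are arbitrary, and it honors a numeric Retry-After header when the server sends one:
# retry_backoff.py (sketch) — exponential backoff for 429 and 5xx
import time
import requests

def get_with_backoff(url, max_attempts=5, base_delay=1.0, **kwargs):
    for attempt in range(max_attempts):
        try:
            r = requests.get(url, timeout=(5, 30), **kwargs)
        except (requests.ConnectionError, requests.Timeout):
            # transient network failure: back off and retry
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
            continue
        if r.status_code == 429 or 500 <= r.status_code < 600:
            retry_after = r.headers.get('Retry-After')
            # honor a numeric Retry-After if present, otherwise exponential backoff
            delay = float(retry_after) if retry_after and retry_after.isdigit() else base_delay * (2 ** attempt)
            time.sleep(delay)
            continue
        r.raise_for_status()  # other 4xx errors are usually not worth retrying
        return r
    raise RuntimeError(f"Gave up after {max_attempts} attempts: {url}")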
You frequently need to set User-Agent and Referer to mimic a real browser; otherwise requests may be rejected or receive different content.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
    "Referer": "https://example.com"
}
requests.get(url, headers=headers)
Proxies: pass proxies={'http': 'http://ip:port', 'https': 'http://ip:port'}. If you hit SSL certificate errors you can temporarily set verify=False (not recommended in production); the better fix is to point verify at a CA bundle. A short sketch follows.
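A sketch combining a proxy with a CA bundle; the proxy address, URL, and bundle path are placeholders:
# proxy_and_verify.py (sketch)
import requests

proxies = {
    'http': 'http://ip:port',
    'https': 'http://ip:port',
}

# Preferred: point verify at a CA bundle file instead of disabling verification.
r = requests.get("https://example.com/file.zip",
                 proxies=proxies,
                 verify="/path/to/ca-bundle.pem",
                 timeout=(5, 30))
r.raise_for_status()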
Rate limiting: add a time.sleep() between requests or use a token bucket so you do not get your IP banned (a minimal sketch follows this list).
Respect robots.txt (it is not technically enforced, but following it is polite and helps with compliance).
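A minimal token-bucket limiter sketch; the rate and capacity values are arbitrary:
# token_bucket.py (sketch) — simple blocking token-bucket rate limiter
import time

class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self.tokens = capacity
        self.last = time.monotonic()

    def acquire(self, tokens=1):
        # Refill based on elapsed time, then block until enough tokens are available.
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return
            time.sleep((tokens - self.tokens) / self.rate)

# Usage: bucket = TokenBucket(rate=2, capacity=5)  # roughly 2 requests per second
#        bucket.acquire(); requests.get(url)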
Verify important files (datasets, model weights) with MD5/SHA256. If the server publishes a checksum, compare it after the download finishes; a sketch follows.
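A chunked SHA256 verification sketch; the expected digest would come from whatever the server publishes:
# verify_checksum.py (sketch) — hash a file in chunks and compare to an expected value
import hashlib

def sha256_of(path, chunk_size=8192):
    h = hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            h.update(chunk)
    return h.hexdigest()

# Usage: assert sha256_of("largefile.zip") == expected_sha256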
Common problems
File size is 0 or Content-Length is None: the server did not return a length (dynamically generated or chunked encoding), so the progress bar cannot show a total; you can still save chunk by chunk.
Resume fails (200 instead of 206): the server does not support Range; re-download, or fall back to a split-and-retry strategy if the server exposes the pieces under different URLs.
Corrupted file after a multi-threaded merge: check part ordering and byte boundaries (off-by-one errors); make sure every part was written completely and the parts are concatenated in order.
403 Forbidden / 404 Not Found: check whether cookies/login, Referer, or User-Agent are required, or whether you were blocked for requesting too fast.
Slow downloads: possibly server throttling, limited bandwidth, or a capped single connection; try concurrency or a proxy.
Deployment and extension
Package the download logic as a library (e.g. downloader.core) and call it from a CLI, a web interface (Flask/FastAPI), or a scheduler (Airflow/Cron).
For long-running jobs, add logging (rotating logs), monitoring (Prometheus + alert rules), and job retry policies; a logging sketch is shown below.
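A rotating-log setup sketch using the standard library; the file name and size limits are arbitrary:
# logging_setup.py (sketch) — rotating file logs for a long-running downloader
import logging
from logging.handlers import RotatingFileHandler

def setup_logging(path="downloader.log"):
    handler = RotatingFileHandler(path, maxBytes=10 * 1024 * 1024, backupCount=5)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    logger = logging.getLogger("downloader")
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return logger

# Usage: log = setup_logging(); log.info("downloaded %s", url)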
For distributed downloads or uploads to cloud storage, consider:
uploading finished files to object storage (S3, GCS) with multipart upload (see the sketch after this list);
relaying on the server side in the cloud, or parallelizing downloads with cloud functions.
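A sketch of uploading a finished file to S3 with boto3, which switches to multipart upload above a configurable threshold; the bucket, key, and threshold here are placeholders:
# upload_s3.py (sketch) — upload a downloaded file to object storage
import boto3
from boto3.s3.transfer import TransferConfig

def upload(path, bucket, key):
    s3 = boto3.client("s3")
    config = TransferConfig(multipart_threshold=64 * 1024 * 1024)  # multipart above 64 MB
    s3.upload_file(path, bucket, key, Config=config)

# Usage: upload("largefile.zip", "my-bucket", "downloads/largefile.zip")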
Packaging: use setup.py or pyproject.toml plus an entry point so the CLI can be invoked directly after pip install . (a minimal example follows).
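A minimal setup.py sketch with a console_scripts entry point; the package and command names follow the project layout above but are otherwise illustrative:
# setup.py (sketch)
from setuptools import setup, find_packages

setup(
    name="py-downloader",
    version="0.1.0",
    packages=find_packages(),
    install_requires=["requests", "tqdm", "aiohttp"],
    entry_points={
        "console_scripts": [
            "py-downloader=downloader.cli:main",  # exposes a `py-downloader` command
        ],
    },
)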
Below is a practical, commonly used "single file + multi-threaded chunking" script (with comments) that you can run straight from the terminal. This integrated version is trimmed for brevity but directly runnable:
# downloader_tool.py (integrated example)
import os, math
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

def get_total_size(url):
    r = requests.head(url, allow_redirects=True)
    r.raise_for_status()
    return int(r.headers.get('Content-Length', 0)), r.headers

def download_part(url, start, end, idx, temp_dir, headers=None):
    h = headers.copy() if headers else {}
    h['Range'] = f'bytes={start}-{end}'
    r = requests.get(url, headers=h, stream=True)
    r.raise_for_status()
    part_path = os.path.join(temp_dir, f"part_{idx}")
    with open(part_path, 'wb') as f:
        for chunk in r.iter_content(8192):
            if chunk:
                f.write(chunk)
    return part_path

def multithread_download(url, out_path, workers=4):
    total_size, headers = get_total_size(url)
    if total_size == 0:
        # fallback to simple download
        r = requests.get(url, stream=True)
        r.raise_for_status()
        with open(out_path, 'wb') as f:
            for chunk in r.iter_content(8192):
                if chunk:
                    f.write(chunk)
        return
    temp_dir = out_path + "_parts"
    os.makedirs(temp_dir, exist_ok=True)
    part_size = math.ceil(total_size / workers)
    ranges = [(i*part_size, min((i+1)*part_size - 1, total_size - 1), i) for i in range(workers)]
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = [ex.submit(download_part, url, s, e, idx, temp_dir, {}) for s, e, idx in ranges]
        for f in tqdm(as_completed(futures), total=len(futures), desc="Downloading"):
            f.result()
    # merge
    with open(out_path, 'wb') as out_f:
        for i in range(workers):
            path = os.path.join(temp_dir, f"part_{i}")
            with open(path, 'rb') as pf:
                out_f.write(pf.read())
            os.remove(path)
    os.rmdir(temp_dir)
    print("Saved to", out_path)

if __name__ == "__main__":
    url = input("URL: ").strip()
    out = input("Output filename: ").strip() or "out.bin"
    workers = int(input("Workers (default 4): ") or 4)
    multithread_download(url, out, workers=workers)
Suggested learning path:
Getting started: urllib → requests (become comfortable with streaming and headers);
Next steps: tqdm (progress), resumable downloads (Range), retry strategies;
Concurrency: threading/concurrent.futures (chunking large files), asyncio + aiohttp (many small files);
Engineering: logging, monitoring, rate limiting, polite behavior toward the target site, and automated upload to cloud storage.
Recommended exercise: build a downloader with a --mode (sync/async/thread) flag, and add resumable downloads plus SHA256 verification for large files.
Quick reference:
requests simple timeout: requests.get(url, timeout=(5,30))
Range example header: headers={'Range': 'bytes=1024-'}
tqdm with requests: tqdm(total=total, unit='B', unit_scale=True)