《Python下载实战技巧:从文件到多线程的完整指南》大纲

  • 时间:2025-11-05 16:44 作者: 来源: 阅读:0
  • 扫一扫,手机访问
摘要:一、前言 — 为什么要认真学“下载”? 下载看似简单,但在真实场景中会遇到:不稳定网络、大文件、并发限制、被反爬、断点续传需求、文件命名/分类、代理/证书问题等。掌握稳健、高效的下载技巧能让爬虫、数据采集与自动化任务更可靠并便于工程化。 本文围绕实践,分为:基础下载、进度与断点续传、并发加速(线程/异步)、完整批量工具案例、稳定性与安全、部署与扩展。 二、基础篇:从最简单的下载开始 2.

一、前言 — 为什么要认真学“下载”?

下载看似简单,但在真实场景中会遇到:不稳定网络、大文件、并发限制、被反爬、断点续传需求、文件命名/分类、代理/证书问题等。掌握稳健、高效的下载技巧能让爬虫、数据采集与自动化任务更可靠并便于工程化。

本文围绕实践,分为:基础下载、进度与断点续传、并发加速(线程/异步)、完整批量工具案例、稳定性与安全、部署与扩展。


二、基础篇:从最简单的下载开始

2.1 urllib.request 的简单用法

urllib 是内置库,适合轻量任务。

示例:下载图片并保存:



# save_with_urllib.py
from urllib.request import urlretrieve
 
url = "https://example.com/sample.jpg"
out = "sample.jpg"
urlretrieve(url, out)
print("Downloaded to", out)

优点:简单;缺点:功能有限(难以控制超时、重试、header、流式写入、进度)。

2.2 requests:更灵活的选择(推荐)

requests 支持流式下载、header、超时、会话、认证等。

示例:使用 stream=True,逐块写入,并实现基础进度输出:



# download_requests.py
import requests
 
def download(url, out_path, chunk_size=8192, timeout=10):
    with requests.get(url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        total = r.headers.get('Content-Length')
        if total is not None:
            total = int(total)
        downloaded = 0
        with open(out_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total:
                        print(f"
{downloaded}/{total} bytes ({downloaded/total:.2%})", end="")
    print("
Done")
 
if __name__ == "__main__":
    download("https://example.com/sample.zip", "sample.zip")

说明:

stream=True 防止一次性把整个响应读入内存。

使用 r.raise_for_status() 直接抛出 HTTP 错误。


三、进阶篇:进度条与断点续传

3.1 用 tqdm 显示漂亮的进度条

tqdm 非常方便,能够显示 ETA、速度等。

示例:



# download_with_tqdm.py
import requests
from tqdm import tqdm
 
def download(url, out_path, chunk_size=8192, timeout=10):
    r = requests.get(url, stream=True, timeout=timeout)
    r.raise_for_status()
    total = int(r.headers.get('Content-Length', 0))
    with open(out_path, 'wb') as f, tqdm(total=total, unit='B', unit_scale=True, desc=out_path) as pbar:
        for chunk in r.iter_content(chunk_size=chunk_size):
            if chunk:
                f.write(chunk)
                pbar.update(len(chunk))
 
if __name__ == "__main__":
    download("https://example.com/bigfile.iso", "bigfile.iso")

注意: Content-Length 可能不存在(分块编码、动态生成),这时 tqdm 将无法显示总进度,但仍可显示已下载大小。

3.2 断点续传(Resume)原理与实现

原理:使用 HTTP Range 请求头请求文件的一部分(例如 Range: bytes=1000-),服务器若支持,会返回 206 Partial Content。实现步骤:

检查本地文件是否存在以及大小 local_size

发起带 Range 头的请求 Range: bytes={local_size}-

追加写入本地文件(打开方式 'ab');

如果服务器不支持 Range(返回 200),则需要重新下载或提示用户。

实现示例(requests):



# resume_download.py
import os
import requests
from tqdm import tqdm
 
def supports_range(url, timeout=10):
    # 尝试 HEAD 请求判断是否支持 Accept-Ranges
    try:
        r = requests.head(url, timeout=timeout)
        if r.status_code == 200:
            return r.headers.get('Accept-Ranges', '') == 'bytes'
    except requests.RequestException:
        return False
    return False
 
def resume_download(url, out_path, chunk_size=8192, timeout=10):
    temp_size = os.path.getsize(out_path) if os.path.exists(out_path) else 0
    headers = {}
    if temp_size > 0:
        headers['Range'] = f'bytes={temp_size}-'
    r = requests.get(url, headers=headers, stream=True, timeout=timeout)
    if r.status_code == 416:  # Range not satisfiable -> already fully downloaded
        print("Already downloaded.")
        return
    # If server doesn't support range, r.status_code may be 200 and we must overwrite
    mode = 'ab' if r.status_code == 206 else 'wb'
    total = r.headers.get('Content-Length')
    if total is not None:
        total = int(total) + (temp_size if r.status_code == 206 else 0)
    with open(out_path, mode) as f, tqdm(total=total, unit='B', unit_scale=True, initial=temp_size, desc=out_path) as pbar:
        for chunk in r.iter_content(chunk_size=chunk_size):
            if chunk:
                f.write(chunk)
                pbar.update(len(chunk))

if __name__ == "__main__":
    url = "https://example.com/largefile.zip"
    resume_download(url, "largefile.zip")

注意事项:

并非所有服务器支持 Range;有些使用 CDN 时也可能受限。

若已下载文件损坏(校验失败),需要重新下载或实现分块校验(见优化章节)。


四、性能篇:并发下载(线程 / 异步)

并发可以加速下载(尤其是带宽足够、服务器支持 Range 分块请求时)。

4.1 多线程分块并行下载(基于 Range)

思路:

先发 HEAD 请求获取 Content-Length(文件总大小)。

将文件按若干区间(chunks)划分。

每个线程请求一个区间( Range),写入临时分块文件。

所有分块完成后拼接为最终文件,并删除临时分块。

示例代码(简化版):



# multithreaded_download.py
import os
import math
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
 
def get_size(url):
    r = requests.head(url)
    r.raise_for_status()
    return int(r.headers.get('Content-Length', 0))
 
def download_range(url, start, end, idx, temp_dir="tmp", timeout=10):
    headers = {'Range': f'bytes={start}-{end}'}
    r = requests.get(url, headers=headers, stream=True, timeout=timeout)
    r.raise_for_status()
    os.makedirs(temp_dir, exist_ok=True)
    part_path = os.path.join(temp_dir, f"part_{idx}")
    with open(part_path, 'wb') as f:
        for chunk in r.iter_content(8192):
            if chunk:
                f.write(chunk)
    return part_path
 
def multi_thread_download(url, out_path, workers=4):
    total = get_size(url)
    if total == 0:
        raise RuntimeError("Couldn't determine file size.")
    part_size = math.ceil(total / workers)
    ranges = [(i*part_size, min((i+1)*part_size-1, total-1), i) for i in range(workers)]
    temp_dir = out_path + "_parts"
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = [ex.submit(download_range, url, s, e, idx, temp_dir) for s,e,idx in ranges]
        for f in tqdm(as_completed(futures), total=len(futures), desc="Downloading parts"):
            f.result()  # raise if any exception
    # merge
    with open(out_path, 'wb') as out_f:
        for i in range(workers):
            part_path = os.path.join(temp_dir, f"part_{i}")
            with open(part_path, 'rb') as pf:
                out_f.write(pf.read())
            os.remove(part_path)
    os.rmdir(temp_dir)
    print("Merged to", out_path)

优点:在高延迟或带宽分配较差的情况下显著加速。缺点:可能增加服务器压力,部分托管/限速服务器禁止多连接。

4.2 异步下载: asyncio + aiohttp

适用于大量小文件(如图片)并发下载,节省线程开销。

示例:异步批量下载图片,并发限制:



# aio_download.py
import asyncio
import aiohttp
from aiohttp import ClientTimeout
from tqdm import tqdm
import os
from asyncio import Semaphore
 
async def fetch(session, url, out_path, sem):
    async with sem:
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                with open(out_path, 'wb') as f:
                    while True:
                        chunk = await resp.content.read(1024)
                        if not chunk:
                            break
                        f.write(chunk)
        except Exception as e:
            return url, False, str(e)
    return url, True, None
 
async def download_many(urls, out_dir="downloads", concurrency=10, timeout=30):
    os.makedirs(out_dir, exist_ok=True)
    sem = Semaphore(concurrency)
    timeout = ClientTimeout(total=timeout)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = []
        for url in urls:
            name = os.path.basename(url.split("?")[0]) or "file"
            out_path = os.path.join(out_dir, name)
            tasks.append(fetch(session, url, out_path, sem))
        results = []
        for f in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
            res = await f
            results.append(res)
    return results
 
if __name__ == "__main__":
    urls = ["https://example.com/img1.jpg", "https://example.com/img2.jpg"]  # ...
    asyncio.run(download_many(urls, concurrency=20))

说明:

asyncio 更擅长大量 I/O-bound(网络)任务;

对于单个大文件, aiohttp 也可用于分块并发,但比起线程/进程,实施稍复杂(需要小心文件写入顺序)。

4.3 同步 vs 异步 vs 多线程:何时用哪种?

单个大文件且服务器支持 Range:多线程分块通常效果最好。

大量小文件(图片、JSON):异步 ( aiohttp) 更节约资源且速度更快。

简单、兼容性高的脚本: requests(同步)最易用。


五、实战篇:构建一个可复用的批量下载工具(示例项目)

目标:实现一个 CLI 程序,支持:

从 CSV/文本读取 URL 列表;

并发下载(线程或异步可选);

断点续传、重试、日志、限速;

自动分类保存(基于 Content-Type 或 URL 后缀);

简单的 config(JSON/CLI 参数)。

5.1 项目结构建议

py-downloader/
├─ downloader/
│  ├─ __init__.py
│  ├─ core.py          # 下载核心逻辑(requests / aiohttp)
│  ├─ utils.py         # 工具函数(文件名清洗、http helpers)
│  ├─ cli.py           # argparse 命令行接口
│  └─ config.py
├─ tests/
├─ requirements.txt
└─ setup.py

5.2 核心要点示例(简化)

core.py 中一个结合重试、超时、进度、断点的同步下载函数:



# core.py (部分)
import requests, os
from tqdm import tqdm
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
 
def requests_session_with_retries(retries=3, backoff=0.5, status_forcelist=(500,502,503,504)):
    s = requests.Session()
    retry = Retry(total=retries, backoff_factor=backoff, status_forcelist=status_forcelist)
    s.mount('http://', HTTPAdapter(max_retries=retry))
    s.mount('https://', HTTPAdapter(max_retries=retry))
    return s
 
def smart_download(url, out_path, session=None, chunk_size=8192):
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    if session is None:
        session = requests_session_with_retries()
    temp_size = os.path.getsize(out_path) if os.path.exists(out_path) else 0
    headers = {}
    if temp_size:
        headers['Range'] = f'bytes={temp_size}-'
    with session.get(url, stream=True, headers=headers, timeout=30) as r:
        if r.status_code == 416:
            return True
        r.raise_for_status()
        total = r.headers.get('Content-Length')
        if total is not None:
            total = int(total) + (temp_size if r.status_code == 206 else 0)
        mode = 'ab' if r.status_code == 206 else 'wb'
        with open(out_path, mode) as f, tqdm(total=total, initial=temp_size, unit='B', unit_scale=True) as pbar:
            for chunk in r.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
                    pbar.update(len(chunk))
    return True

5.3 CLI(示例)

使用 argparse,支持 --concurrency --mode=thread|async --input urls.txt



# cli.py (概念)
import argparse
from .core import smart_download
 
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True, help="URLs file, one per line")
    parser.add_argument('--outdir', default='downloads')
    parser.add_argument('--concurrency', type=int, default=4)
    args = parser.parse_args()
 
    with open(args.input) as f:
        urls = [line.strip() for line in f if line.strip()]
    # 简化:串行下载示例
    for url in urls:
        name = url.split('/')[-1].split('?')[0] or 'file'
        out_path = os.path.join(args.outdir, name)
        smart_download(url, out_path)

六、稳定性、安全性与反爬对策(实用技巧)

6.1 超时、异常与重试

必设 timeout(连接/读取),例如 requests.get(..., timeout=(5, 30))(5 秒连接超时,30 秒读取超时)。

使用 urllib3.Retry 或手动重试逻辑来处理瞬时网络错误。

对可能的 HTTP 429(Too Many Requests)实现退避(exponential backoff)。

6.2 请求头与伪装(User-Agent、Referer)

经常需要设置 User-Agent Referer,模拟真实浏览器,否则被拒绝或返回不同内容。



headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
    "Referer": "https://example.com"
}
requests.get(url, headers=headers)

6.3 代理与证书

使用代理 proxies={'http': 'http://ip:port', 'https': 'http://ip:port'}

若遇 SSL 证书错误,可暂时 verify=False(不建议生产环境),更好方式是给 verify 指定 CA bundle。

6.4 限速与礼貌抓取

给每个请求加 time.sleep() 或使用令牌桶(token bucket)限速,避免对方封 IP。

尊重 robots.txt(虽然 robots.txt 不是强制,但遵守是礼貌与合规做法)。

6.5 数据完整性校验

对重要文件(如数据集、模型文件)做校验:MD5 / SHA256。如果服务器提供校验值,下载完比对一遍。


七、常见问题与调试技巧(Q&A)

文件大小为 0 或 Content-Length 为 None:说明服务器没有返回长度(动态生成或 chunked encoding),进度条无法显示总量。仍可按块保存。

断点续传失败(返回 200 而不是 206):服务器不支持 Range,必须重新下载或采用分片重试策略(若服务器允许通过不同 URL 获取分片)。

多线程拼接后文件损坏:检查各分片的顺序和大小边界(off-by-one);确保每个分片写入完整并且拼接顺序正确。

403 Forbidden / 404 Not Found:检查是否需要 cookie / 登录、Referer、User-Agent 或请求频率过高导致被封。

下载速度慢:可能是服务器限速、网络带宽、单连接被限。尝试并发或使用代理。


八、工程化建议与部署

将下载逻辑封装成库(如 downloader.core),CLI、Web 接口(Flask/FastAPI)或调度器(Airflow/Cron)调用。

对长期运行任务,加入日志(rotating logs)、监控(Prometheus + alert rules)、作业重试策略。

若要分布式下载或上传到云存储,考虑:

把下载后文件上传到对象存储(S3、GCS),用 multipart upload。

在云端直接使用服务端转发或利用云函数并行化下载。

打包: setup.py pyproject.toml + entry point,便于 pip install . 后直接调用 CLI。


九、完整示例(把上面整合为可直接使用的小工具)

我会给出一个最实用且常用的“单文件/续传 + 多线程分片”的脚本(带注释),便于直接在终端运行。下面是整合版(为节省篇幅为精简但可直接运行的版本):



# downloader_tool.py (整合示例)
import os, math
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
 
def get_total_size(url):
    r = requests.head(url, allow_redirects=True)
    r.raise_for_status()
    return int(r.headers.get('Content-Length', 0)), r.headers
 
def download_part(url, start, end, idx, temp_dir, headers=None):
    h = headers.copy() if headers else {}
    h['Range'] = f'bytes={start}-{end}'
    r = requests.get(url, headers=h, stream=True)
    r.raise_for_status()
    part_path = os.path.join(temp_dir, f"part_{idx}")
    with open(part_path, 'wb') as f:
        for chunk in r.iter_content(8192):
            if chunk:
                f.write(chunk)
    return part_path
 
def multithread_download(url, out_path, workers=4):
    total_size, headers = get_total_size(url)
    if total_size == 0:
        # fallback to simple download
        r = requests.get(url, stream=True)
        r.raise_for_status()
        with open(out_path, 'wb') as f:
            for chunk in r.iter_content(8192):
                if chunk: f.write(chunk)
        return
    temp_dir = out_path + "_parts"
    os.makedirs(temp_dir, exist_ok=True)
    part_size = math.ceil(total_size / workers)
    ranges = [(i*part_size, min((i+1)*part_size - 1, total_size - 1), i) for i in range(workers)]
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = [ex.submit(download_part, url, s, e, idx, temp_dir, {}) for s,e,idx in ranges]
        for f in tqdm(as_completed(futures), total=len(futures), desc="Downloading"):
            f.result()
    # merge
    with open(out_path, 'wb') as out_f:
        for i in range(workers):
            path = os.path.join(temp_dir, f"part_{i}")
            with open(path, 'rb') as pf:
                out_f.write(pf.read())
            os.remove(path)
    os.rmdir(temp_dir)
    print("Saved to", out_path)
 
if __name__ == "__main__":
    url = input("URL: ").strip()
    out = input("Output filename: ").strip() or "out.bin"
    workers = int(input("Workers (default 4): ") or 4)
    multithread_download(url, out, workers=workers)

十、总结与学习路线

入门: urllib requests(熟练掌握流式与 header);

进阶: tqdm(进度),断点续传(Range),重试策略;

并发: threading/ concurrent.futures(大文件分片), asyncio+ aiohttp(大量小文件);

工程化:日志、监控、限速、针对目标站点的礼貌策略、上传到云存储的自动化流程。

推荐练习:实现一个带 --mode(sync/async/thread)参数的下载器;为大文件实现断点续传 + 校验(SHA256)。


附录:常用代码片段速查

requests 简单超时: requests.get(url, timeout=(5,30))

Range 示例头: headers={'Range': 'bytes=1024-'}

tqdm requests tqdm(total=total, unit='B', unit_scale=True)

  • 全部评论(0)
最新发布的资讯信息
【系统环境|】Python 爬虫:从基础到实战的完整指南(2025-11-05 16:54)
【系统环境|】零成本!DeepSeek+KIMI 5分钟生成专业PPT(附详细操作教程)(2025-11-05 16:53)
【系统环境|】1分钟用 DeepSeek 搞定 PPT?实用教程来了(2025-11-05 16:53)
【系统环境|】口子空间使用教程(2025-11-05 16:52)
【系统环境|】CentOS7安装并配置nginx等问题(2025-11-05 16:52)
【系统环境|】Centos7安装nginx最全教程(2025-11-05 16:51)
【系统环境|】nvm安装、管理node多版本以及配置环境变量(2025-11-05 16:45)
【系统环境|】爬虫进阶避坑指南:10个实战反爬技巧,从封IP到破签名全解析(2025-11-05 16:44)
【系统环境|】《Python下载实战技巧:从文件到多线程的完整指南》大纲(2025-11-05 16:44)
【系统环境|】避免爬虫无限循环:分页链接识别与处理实战指南(2025-11-05 16:43)
手机二维码手机访问领取大礼包
返回顶部