Python一次性将指定版本所有包上传PyPI镜像解决方案
目录
- 为什么需要这个方案
- 完整解决方案
- 1. 项目目录结构
- 2. 创建智能下载脚本
- 3. 创建包清单生成脚本
- 4. 创建 docker Compose 配置docker-compose.yml
- 5. 创建 Web 服务目录结构
- 6. 创建准备脚本
- 完整操作流程
- 1. 在可联网的环境中准备镜像
- 2. 验证下载结果
- 3. 将整个目录复制到内网环境
- 4. 在内网环境中部署
- 5. 内网客户端使用
- 方案优势
- 注意事项
根据您的需求和知识库中的警告信息(清华镜像会阻断大量下载行为),我设计了一个安全、完整、可离线部署的解决方案,用于一次性准备指定python版本的所有包,然后导出到内网环境。
为什么需要这个方案
知识库明确显示:清华镜像会检测到大量下载行为并阻断请求。直接尝试下载所有包会触发此限制,导致下载失败。我们需要一个谨慎、分阶段、有间隔的下载策略。
完整解决方案
1. 项目目录结构
mkdir -p pypi-offline/{config,scripts,packages}
cd pypi-offline
2. 创建智能下载脚本
scripts/safe_download.py
#!/usr/bin/env python3
import os
import sys
import time
import random
import requests
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
# 配置参数
MAX_RETRIES = 3
MIN_DELAY = 5 # 最小延迟(秒)
MAX_DELAY = 15 # 最大延迟(秒)
MAX_WORKERS = 2 # 最大并发数(避免触发限制)
# PyPI API端点
PYPI_SIMPLE = "https://pypi.org/simple"
# 使用官方PyPI而非清华镜像,避免触发限制
# 如果网络条件好,可考虑使用其他镜像
def get_package_info(package_name):
"""获取包的元信息"""
url = f"https://pypi.org/pypi/{package_name}/json"
for i in range(MAX_RETRIES):
try:
response = requests.get(url, timeout=30)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
return None
time.sleep((i + 1) * 2)
except Exception as e:
print(f"获取包 {package_name} 信息失败: {str(e)}")
time.sleep((i + 1) * 5)
return None
def filter_packages_for_python(package_data, python_version):
"""过滤出兼容指定Python版本的包"""
compatible_files = []
py_ver = python_version.replace('.', '')
for file_info in package_data['urls']:
# 检查Python版本兼容性
py_tag = file_info.get('python_version', '')
if py_tag == 'source' or py_tag.startswith('py') or py_tag.startswith('cp' + py_ver):
compatible_files.append(file_info)
return compatible_files
def download_package(package_name, file_info, target_dir):
"""安全下载单个包文件"""
file_url = file_info['url']
file_name = file_info['filename']
target_path = os.path.join(target_dir, file_name)
# 如果文件已存在,跳过
if os.path.exists(target_path):
print(f"跳过已存在的文件: {file_name}")
return True
print(f"下载: {file_name} ({packahttp://www.devze.comge_name})")
for i in range(MAX_RETRIES):
try:
response = requests.get(file_url, stream=True, timeout=60)
if 编程客栈response.status_code == 200:
with open(target_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"成功下载: {file_name}")
return True
print(f"下载失败 ({response.status_code}): {file_name}")
time.sleep((i + 1) * 5)
except Exception as e:
print(f"下载 {file_name} 失败: {str(e)}")
time.sleep((i + 1) * 5)
# 下载失败,删除可能的部分文件
if os.path.exists(target_path):
os.remove(target_path)
return False
def get_all_packages():
"""获取所有包的列表"""
print("获取所有包的列表...")
response = requests.get(f"{PYPI_SIMPLE}/", timeout=30)
if response.status_code != 200:
raise Exception(f"无法获取包列表: HTTP {response.status_code}")
# 解析html获取包名
import re
package_names = re.findall(r'<a href="/simple/([^" rel="external nofollow" /]+)">\1</a>', response.text)
return package_names
def main():
parser = argparse.ArgumentParser(description='安全下载指定Python版本的所有包')
parser.add_argument('--python-version', required=True, help='目标Python版本 (如: 3.8)')
parser.add_argument('--output-dir', default='../packages', help='输出目录')
args = parser.parse_args()
# 创建输出目录
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
print(f"输出目录: {output_dir}")
# 获取所有包列表
try:
all_packages = get_all_packages()
print(f"找到 {len(all_packages)} 个包")
except Exception as e:
print(f"获取包列表失败: {str(e)}")
print("请尝试使用清华镜像的简单页面 API (需要处理 HTML)")
# 作为备选方案,可以使用清华镜像的简单页面
# all_packages = get_packages_from_tuna()
return 1
# 处理包
success_count = 0
failed_packages = []
# 分批次处理,避免一次性太多请求
BATch_size = 50
for i in range(0, len(all_packages), batch_size):
batch = all_packages[i:i+batch_size]
print(f"\n处理包批次 {i//batch_size + 1} ({len(batch)} 个包)")
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_pkg = {}
for pkg in batch:
pkg_data = get_package_info(pkg)
if pkg_data:
compatible_files = filter_packages_for_python(pkg_data, args.python_version)
if compatible_files:
for file_info in compatible_files:
future = executor.submit(
download_package,
pkg,
file_info,
output_dir
)
future_to_pkg[future] = pkg
# 添加随机延迟,避免触发限制
time.sleep(random.uniform(MIN_DELAY, MAX_DELAY))
# 等待并处理结果
for future in as_completed(future_to_pkg):
pkg = future_to_pkg[future]
try:
if future.result():
success_count += 1
else:
failed_packages.append(pkg)
except Exception as e:
print(f"处理包 {pkg} 时出错: {str(e)}")
failed_packages.append(pkg)
# 批次间额外延迟
print(f"\n批次处理完成,等待 {MAX_DELAY*2} 秒...")
time.sleep(MAX_DELAY * 2)
# 生成报告
print("\n===== 下载完成 =====")
print(f"成功: {success_count} 个包")
print(f"失败: {len(failed_packages)} 个包")
if failed_packages:
print("\n失败的包列表 (可后续重试):")
for pkg in failed_www.devze.compackages[:20]: # 只显示前20个
print(f"- {pkg}")
if len(failed_packages) > 20:
print(f"... 及其他 {len(failed_packages) - 20} 个包")
# 保存失败列表以便重试
with open(output_dir / 'failed_packages.txt', 'w') as f:
for pkg in failed_packages:
编程客栈 f.write(pkg + '\n')
return 0
if __name__ == "__main__":
sys.exit(main())
3. 创建包清单生成脚本
scripts/generate_simple_index.py
#!/usr/bin/env python3
import os
import sys
from pathlib import Path
import re
import html
def generate_simple_index(packages_dir, output_dir):
"""生成符合PEP 503的simple index"""
packages_dir = Path(packages_dir)
output_dir = Path(output_dir)
# 创建输出目录
output_dir.mkdir(parents=True, exist_ok=True)
# 收集所有包名
package_names = set()
for filename in os.listdir(packages_dir):
# 从文件名提取包名 (PEP 427 格式)
match = re.match(r'^([a-zA-Z0-9_-]+)(?:-[a-zA-Z0-9_.+-]+)?\.(?:whl|tar\.gz|zip)$', filename)
if match:
package_name = match.group(1).lower()
# 规范化包名 (PEP 503)
package_name = package_name.replace('_', '-')
package_names.add(package_name)
# 为每个包生成索引页面
for package in package_names:
index_content = f'<html><head><title>Links for {package}</title></head>\n'
index_content += f'<body>\n<h1>Links for {package}</h1>\n'
# 找出该包的所有文件
for filename in os.listdir(packages_dir):
http://www.devze.com if re.match(f'^{re.escape(package)}(?:-[a-zA-Z0-9_.+-]+)?\.(?:whl|tar\.gz|zip)$', filename, re.IGNORECASE):
file_url = f'../{filename}'
index_content += f'<a href="{file_url}" rel="external nofollow" >{html.escape(filename)}</a>
\n'
index_content += '</body></html>'
# 保存索引页面
package_dir = output_dir / package
package_dir.mkdir(exist_ok=True)
with open(package_dir / 'index.html', 'w', encoding='utf-8') as f:
f.write(index_content)
# 生成根索引页面
root_index = '<html><head><title>Simple Index</title></head>\n'
root_index += '<body>\n<h1>Simple Index</h1>\n'
for package in sorted(package_names):
root_index += f'<a href="{package}/" rel="external nofollow" >{html.escape(package)}</a>
\n'
root_index += '</body></html>'
with open(output_dir / 'index.html', 'w', encoding='utf-8') as f:
f.write(root_index)
print(f"成功生成 {len(package_names)} 个包的索引")
if __name__ == "__main__":
if len(sys.argv) != 3:
print("用法: python generate_simple_index.py <packages_dir> <output_dir>")
sys.exit(1)
generate_simple_index(sys.argv[1], sys.argv[2])
4. 创建 Docker Compose 配置docker-compose.yml
version: '3.8'
services:
pypi-offline:
image: python:3.9-slim
container_name: pypi-offline
ports:
- "8080:8000"
volumes:
- ./web:/usr/src/app
- ./packages:/usr/src/app/packages
working_dir: /usr/src/app
command: >
sh -c "python -m http.server 8000 --directory /usr/src/app"
restart: unless-stopped
5. 创建 Web 服务目录结构
mkdir -p web/{simple,packages}
6. 创建准备脚本
prepare_offline_mirror.sh
#!/bin/bash
# 准备离线PyPI镜像
PYTHON_VERSION=$1
if [ -z "$PYTHON_VERSION" ]; then
echo "用法: $0 <python版本>"
echo "示例: $0 3.8"
exit 1
fi
# 检查依赖
if ! command -v python3 &> /dev/null; then
echo "错误: 需要安装Python 3"
exit 1
fi
# 1. 安全下载所有兼容指定Python版本的包
echo "步骤1: 安全下载所有兼容Python $PYTHON_VERSION 的包"
python3 scripts/safe_download.py --python-version "$PYTHON_VERSION" --output-dir packages
# 2. 生成simple index
echo "步骤2: 生成PEP 503兼容的simple index"
python3 scripts/generate_simple_index.py packages web/simple
# 3. 创建README
cat > web/README.md << EOF
# 离线PyPI镜像
此目录包含Python $PYTHON_VERSION 的完整PyPI镜像。
## 使用方法
1. 启动服务:
docker-compose up -d
2. 客户端使用:
pip install 包名 -i http://your-server:8080/simple
EOF
echo "离线PyPI镜像准备完成!"
echo "现在可以将整个pypi-offline目录复制到内网环境"
echo "在内网环境中执行: docker-compose up -d 启动服务"
完整操作流程
1. 在可联网的环境中准备镜像
# 克隆项目 git clone https://github.com/your-repo/pypi-offline.git cd pypi-offline # 赋予脚本执行权限 chmod +x scripts/*.py prepare_offline_mirror.sh # 准备Python 3.8的完整镜像 (这将需要较长时间) ./prepare_offline_mirror.sh 3.8 # 准备Python 3.9的完整镜像 (可选) # ./prepare_offline_mirror.sh 3.9
2. 验证下载结果
# 检查包数量 ls packages | wc -l # 检查simple index ls web/simple | wc -l
3. 将整个目录复制到内网环境
# 压缩整个目录 tar -czvf pypi-offline-3.8.tar.gz pypi-offline # 将压缩文件传输到内网环境
4. 在内网环境中部署
# 解压 tar -xzvf pypi-offline-3.8.tar.gz cd pypi-offline # 启动服务 docker-compose up -d
5. 内网客户端使用
# 临时使用 pip install numpy -i http://pypi-offline-server:8080/simple # 永久配置 pip config set global.index-url http://pypi-offline-server:8080/simple
方案优势
1.避免触发下载限制:
- 使用低并发(2个线程)
- 随机延迟(5-15秒)
- 批次处理(每50个包)
- 失败重试机制
- 使用官方PyPI而非清华镜像(更宽松)
2.完整镜像:
- 下载指定Python版本的所有兼容包
- 生成符合PEP 503标准的simple index
- 支持所有pip安装方式
3.轻量级部署:
- 仅需Python内置HTTP服务器
- 无需额外依赖
- 完全静态内容,易于复制
4.可验证性:
- 生成下载报告
- 记录失败包以便重试
- 简单的README说明
注意事项
- 下载时间: 完整镜像需要较长时间(可能数天),因为要遵守严格的下载限制
- 存储空间: Python 3.8的完整镜像约需50-100GB空间
- 分阶段下载: 如果中途失败,可从
failed_packages.txt继续 - 版本选择: 建议选择企业常用的稳定版本(如3.8或3.9)
此方案确保您可以安全地一次性准备完整镜像,然后完全离线使用,完美符合您的内网部署需求。
以上就是Python一次性将指定版本所有包上传PyPI镜像解决方案的详细内容,更多关于Python打包到PyPI的资料请关注编程客栈(www.devze.com)其它相关文章!
加载中,请稍侯......
精彩评论