背景介绍
在日常使用电脑时,我们经常会遇到重复文件占用大量存储空间的问题——比如下载多次的文档、备份的照片副本等。手动查找和清理这些重复文件不仅耗时,还容易遗漏。为此,我开发了一个本地文件重复项检测与清理工具,它能自动扫描目录中的重复文件,并提供灵活的清理方案。
这个工具的核心价值在于:
– 精准检测:通过文件内容哈希值识别重复项,避免文件名不同但内容相同的遗漏;
– 灵活清理:支持自动保留最早创建/最新修改的文件,或手动选择删除项;
– 安全可靠:删除前强制确认,防止误操作;
– 轻量无依赖:仅使用Python标准库,无需安装额外框架。
对于开发者而言,这个项目是练习文件系统操作、哈希算法应用、命令行交互的绝佳案例。
实现思路
工具的实现分为五个核心步骤:
1. 目录遍历
使用os.walk递归(或非递归)遍历目标目录,收集所有文件的路径和元数据(大小、创建时间、修改时间)。
2. 哈希计算
对每个文件的内容分块读取(避免大文件占用内存),用MD5算法生成唯一哈希值——相同内容的文件会生成相同的哈希值。
3. 重复文件分组
用字典将哈希值映射到文件列表,过滤出包含多个文件的分组(即重复项)。
4. 命令行交互
通过菜单让用户选择清理方式:自动保留指定文件、手动选择删除项,并在执行前确认操作。
5. 清理与结果输出
根据用户选择删除重复文件,统计删除数量和释放空间,并输出结果。
完整代码实现
以下是工具的完整Python代码,包含详细注释:
import os
import hashlib
from datetime import datetime
import sys
def format_size(size_in_bytes: int) -> str:
"""格式化文件大小(B/KB/MB/GB)"""
if size_in_bytes < 1024:
return f"{size_in_bytes}B"
elif size_in_bytes < 1024 * 1024:
return f"{size_in_bytes / 1024:.1f}KB"
elif size_in_bytes < 1024 * 1024 * 1024:
return f"{size_in_bytes / (1024*1024):.1f}MB"
else:
return f"{size_in_bytes / (1024*1024*1024):.1f}GB"
def compute_file_hash(file_path: str, chunk_size: int = 4096) -> str | None:
"""计算文件内容的MD5哈希值(分块读取大文件)"""
try:
with open(file_path, 'rb') as f:
md5_hash = hashlib.md5()
while chunk := f.read(chunk_size):
md5_hash.update(chunk)
return md5_hash.hexdigest()
except PermissionError:
print(f"⚠️ 无权限读取文件:{file_path}")
return None
except FileNotFoundError:
print(f"⚠️ 文件不存在:{file_path}")
return None
except Exception as e:
print(f"❌ 计算哈希失败:{file_path} → {str(e)}")
return None
def scan_directory(directory: str, recursive: bool) -> list[dict]:
"""扫描目录,收集文件元数据(路径、大小、创建/修改时间)"""
files_info = []
for root, dirs, files in os.walk(directory):
for file_name in files:
file_path = os.path.join(root, file_name)
try:
stat = os.stat(file_path)
files_info.append({
"path": file_path,
"size": stat.st_size,
"create_time": datetime.fromtimestamp(stat.st_ctime),
"modify_time": datetime.fromtimestamp(stat.st_mtime)
})
except PermissionError:
print(f"⚠️ 无权限访问文件:{file_path}")
except Exception as e:
print(f"❌ 处理文件失败:{file_path} → {str(e)}")
if not recursive:
break # 非递归模式下仅扫描当前目录
return files_info
def group_duplicates(files_info: list[dict]) -> dict[str, list[dict]]:
"""按哈希值分组重复文件"""
hash_map = {}
for file in files_info:
file_hash = compute_file_hash(file["path"])
if not file_hash:
continue
if file_hash not in hash_map:
hash_map[file_hash] = []
hash_map[file_hash].append(file)
# 过滤出重复组(文件数>1)
return {h: files for h, files in hash_map.items() if len(files) > 1}
def show_duplicate_groups(duplicate_groups: dict[str, list[dict]]) -> None:
"""格式化显示重复文件组"""
if not duplicate_groups:
print("✅ 未发现重复文件")
return
print("\n--- 重复文件列表 ---")
for group_idx, (file_hash, files) in enumerate(duplicate_groups.items(), 1):
print(f"\n【重复组{group_idx}】哈希值:{file_hash}")
for file_idx, file in enumerate(files, 1):
size_str = format_size(file["size"])
create_time = file["create_time"].strftime("%Y-%m-%d %H:%M:%S")
modify_time = file["modify_time"].strftime("%Y-%m-%d %H:%M:%S")
print(f"{file_idx}. {file['path']} → 大小:{size_str} → 创建时间:{create_time} → 修改时间:{modify_time}")
def clean_duplicates(duplicate_groups: dict[str, list[dict]], option: int) -> tuple[list[str], int]:
"""执行重复文件清理"""
deleted_files = []
total_freed = 0
for file_hash, files in duplicate_groups.items():
if option == 1:
# 保留最早创建的文件
sorted_files = sorted(files, key=lambda x: x["create_time"])
to_delete = sorted_files[1:]
elif option == 2:
# 保留最新修改的文件
sorted_files = sorted(files, key=lambda x: x["modify_time"], reverse=True)
to_delete = sorted_files[1:]
elif option == 3:
# 手动选择删除项
print(f"\n处理重复组(哈希:{file_hash}):")
for idx, file in enumerate(files, 1):
print(f"{idx}. {file['path']} → 大小:{format_size(file['size'])}")
selected = input("请输入要删除的文件编号(逗号分隔,如1,3):").strip()
selected_indices = [int(i)-1 for i in selected.split(",") if i.strip().isdigit()]
to_delete = [files[i] for i in selected_indices if 0 <= i < len(files)]
else:
print("❌ 无效选项")
return [], 0
# 执行删除操作
for file in to_delete:
try:
os.remove(file["path"])
deleted_files.append(file["path"])
total_freed += file["size"]
except PermissionError:
print(f"⚠️ 无法删除:{file['path']}(权限不足)")
except FileNotFoundError:
print(f"⚠️ 文件已不存在:{file['path']}")
except Exception as e:
print(f"❌ 删除失败:{file['path']} → {str(e)}")
return deleted_files, total_freed
def main():
"""工具主入口"""
print("=== 重复文件清理工具 ===")
# 1. 获取用户输入
directory = input("请输入要扫描的目录路径:").strip()
if not os.path.isdir(directory):
print("❌ 错误:输入的路径不是有效目录")
sys.exit(1)
recursive = input("是否递归扫描子目录?(y/n):").strip().lower() == "y"
# 2. 扫描目录
print("\n开始扫描...")
files_info = scan_directory(directory, recursive)
print(f"扫描完成:共检测到 {len(files_info)} 个文件")
# 3. 分组重复文件
duplicate_groups = group_duplicates(files_info)
num_groups = len(duplicate_groups)
print(f"发现 {num_groups} 组重复项\n")
if num_groups == 0:
sys.exit(0)
# 4. 显示重复文件
show_duplicate_groups(duplicate_groups)
# 5. 选择清理方式
while True:
option_input = input("\n请选择清理方式:\n1. 保留最早创建的文件\n2. 保留最新修改的文件\n3. 手动选择删除项\n输入选项编号:").strip()
if option_input in ["1", "2", "3"]:
option = int(option_input)
break
print("❌ 无效选项,请重新输入")
# 6. 确认清理
confirm = input("确认执行清理操作?(y/n):").strip().lower()
if confirm != "y":
print("✅ 取消清理")
sys.exit(0)
# 7. 执行清理并输出结果
deleted_files, total_freed = clean_duplicates(duplicate_groups, option)
print("\n--- 清理结果 ---")
if deleted_files:
print(f"成功删除 {len(deleted_files)} 个文件:")
for file in deleted_files:
print(f"- {file}")
print(f"\n释放总空间:{format_size(total_freed)}")
else:
print("未删除任何文件")
print("✅ 清理完成!")
if __name__ == "__main__":
main()
核心技术点解析
- 文件哈希计算:
对文件内容分块读取(chunk_size=4096),避免大文件占用过多内存。使用MD5算法生成哈希值,确保相同内容的文件得到相同的哈希。 -
元数据获取:
通过os.stat获取文件的st_size(大小)、st_ctime(创建时间)、st_mtime(修改时间),并转换为人类可读的格式。 -
重复分组:
用字典hash_map将哈希值映射到文件列表,快速筛选出重复组(文件数>1)。 -
命令行交互:
通过input函数实现菜单选择和确认流程,确保用户操作的安全性。 -
异常处理:
捕获PermissionError(权限不足)、FileNotFoundError(文件不存在)等常见异常,保证工具稳定运行。
总结
这个工具完全基于Python标准库实现,覆盖了所有需求功能。通过开发它,我巩固了以下技能:
– 文件系统操作(os.walk/os.stat);
– 哈希算法应用(hashlib);
– 命令行交互逻辑;
– 异常处理与错误提示。
未来可以扩展的功能包括:
– 支持SHA-256等更安全的哈希算法;
– 添加图形界面(如Tkinter);
– 允许排除特定文件类型或目录;
– 生成清理报告(如CSV文件)。
如果你有任何改进建议,欢迎在评论区留言!
“`