Smart Multi-Path Log Rotation and Archiving Tool
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
智能多路径日志分割与归档系统 - 增强版
功能:
1. 支持多个文件或目录路径同时处理
2. 先通过-p模式匹配文件,再通过-a天数过滤
3. 可选择保留原文件(截断)或不保留(删除)
4. 按日期分割日志文件
5. 自动压缩归档(跳过>5GB大文件)
6. 保留策略管理
7. 不中断正在运行的应用程序
8. 多线程并行处理
9. 保留源文件属性(类似cp -p)
10. 不指定-b时备份到原路径
"""
import os
import sys
import gzip
import shutil
import argparse
import logging
import fnmatch
import concurrent.futures
import locale
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler
from typing import List, Dict, Optional
# Configure system encoding
def setup_encoding():
    """Configure the runtime encoding so that non-ASCII output is not garbled."""
    # Set the environment variable
    os.environ['PYTHONIOENCODING'] = 'utf-8'
    # Try to set the locale
try:
locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
except locale.Error:
try:
locale.setlocale(locale.LC_ALL, 'C.UTF-8')
except locale.Error:
try:
locale.setlocale(locale.LC_ALL, '')
except locale.Error:
pass
    # Reconfigure the encoding of the standard streams
    try:
        if hasattr(sys.stdout, 'reconfigure'):
            sys.stdout.reconfigure(encoding='utf-8')
        if hasattr(sys.stderr, 'reconfigure'):
            sys.stderr.reconfigure(encoding='utf-8')
    except Exception:
        pass
# Apply the encoding settings
setup_encoding()
# Configure the logger
def setup_logger(log_file: str, verbose: bool = False) -> logging.Logger:
    """Set up the logger used by this script."""
logger = logging.getLogger("log_rotator")
if verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
    # Clear existing handlers to avoid duplicate output
if logger.hasHandlers():
logger.handlers.clear()
    # Create the log directory
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
try:
os.makedirs(log_dir, exist_ok=True)
except Exception as e:
print(f"Create log directory failed: {e}")
try:
        # File handler - rotates by size, UTF-8 encoded
file_handler = RotatingFileHandler(
log_file,
maxBytes=10 * 1024 * 1024,
backupCount=5,
encoding='utf-8'
)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
logger.addHandler(file_handler)
except Exception as e:
print(f"Create file log handler failed: {e}")
    # Console handler
try:
console_handler = logging.StreamHandler()
if verbose:
console_handler.setLevel(logging.DEBUG)
else:
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter('[%(levelname)s] %(message)s')
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
except Exception as e:
print(f"Create console log handler failed: {e}")
return logger
class LogRotator:
def __init__(
self,
log_path: str,
backup_dir: Optional[str] = None,
target_date: Optional[datetime] = None,
retention_days: int = 30,
compress: bool = True,
keep_original: bool = True,
logger: Optional[logging.Logger] = None,
max_compress_size: float = 5.0,
preserve_attributes: bool = True
):
"""
初始化日志旋转器
Args:
log_path: 要处理的日志文件路径
backup_dir: 备份目录(None则在原目录)
target_date: 目标处理日期(用于文件名)
retention_days: 保留天数
compress: 是否压缩归档
keep_original: 是否保留原文件(True=截断,False=删除)
logger: 日志记录器
max_compress_size: 最大压缩文件大小(GB)
preserve_attributes: 是否保留源文件属性
"""
self.log_path = os.path.abspath(log_path)
self.target_date = target_date or datetime.now()
self.retention_days = retention_days
self.compress = compress
self.keep_original = keep_original
self.logger = logger or logging.getLogger("log_rotator")
self.max_compress_size = max_compress_size * 1024 * 1024 * 1024
self.preserve_attributes = preserve_attributes
        # Important fix: resolve the backup directory correctly
        # 1. If a backup directory was specified, use it
        # 2. Otherwise, use the directory of the original file
        if backup_dir is not None:
            # The user specified a backup directory
self.backup_dir = os.path.abspath(backup_dir)
if not os.path.exists(self.backup_dir):
try:
os.makedirs(self.backup_dir, exist_ok=True)
self.logger.info(f"Create backup directory: {self.backup_dir}")
except Exception as e:
self.logger.error(f"Create backup directory failed: {e}")
                    # If creation fails, fall back to the directory of the original file
self.backup_dir = os.path.dirname(self.log_path)
self.logger.info(f"Fallback to original directory: {self.backup_dir}")
        else:
            # No backup directory was specified; use the directory of the original file
            self.backup_dir = os.path.dirname(self.log_path)
        # Record the configuration
self.logger.debug(f"LogRotator initialized: path={self.log_path}, backup_dir={self.backup_dir}, keep_original={self.keep_original}")
    def rotate_log(self) -> bool:
        """Perform the log rotation."""
        try:
            # Check that the log file exists
if not os.path.isfile(self.log_path):
self.logger.error(f"Log file not exists: {self.log_path}")
return False
            # Check the file size
file_size = os.path.getsize(self.log_path)
if file_size == 0:
self.logger.info(f"File is empty, skip: {self.log_path}")
return True
backup_info = f"backup to: {self.backup_dir}" if hasattr(self, 'backup_dir') else "backup to original directory"
self.logger.info(f"Start processing: {self.log_path} (size: {self._format_size(file_size)}, keep_original: {self.keep_original}, {backup_info})")
            # 1. Create a date-stamped copy
rotated_file = self._create_dated_copy()
if not rotated_file:
return False
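            # Note: with keep_original=True (copy then truncate) there is a small window
            # in which lines written between the copy and the truncate are lost. With
            # keep_original=False the writing process keeps its descriptor on the deleted
            # inode and must reopen the log before new output becomes visible again.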
            # 2. Handle the original file according to the settings
            if not self.keep_original:
                # Delete the original file
self.logger.info(f"Delete original file: {self.log_path}")
try:
os.remove(self.log_path)
self.logger.info(f"Original file deleted: {self.log_path}")
except Exception as e:
self.logger.error(f"Delete original file failed: {e}")
return False
            else:
                # Truncate the original file
                self.logger.info(f"Truncate original file: {self.log_path}")
                try:
                    # Save the original file attributes
                    original_stat = None
                    if self.preserve_attributes:
                        original_stat = os.stat(self.log_path)
                    # Open in binary mode to avoid encoding issues
with open(self.log_path, 'wb') as f:
f.truncate()
                    # Restore the attributes
                    if self.preserve_attributes and original_stat:
                        try:
                            os.utime(self.log_path, (original_stat.st_atime, original_stat.st_mtime))
                        except Exception:
                            pass
                    # Verify that the file really was truncated
new_size = os.path.getsize(self.log_path)
if new_size == 0:
self.logger.info(f"Original file truncated successfully: {self.log_path}")
else:
self.logger.warning(f"Original file may not be fully truncated: {self.log_path} (new size: {new_size})")
except PermissionError as e:
self.logger.error(f"Permission denied to truncate file: {self.log_path} - {e}")
return False
except Exception as e:
self.logger.error(f"Truncate file failed: {self.log_path} - {e}")
return False
            # 3. Check the file size to decide whether to compress
            rotated_size = os.path.getsize(rotated_file)
            if self.compress and rotated_size < self.max_compress_size:
                # Compress small files
compressed_file = self._compress_file(rotated_file)
if compressed_file:
                    # Remove the uncompressed copy
if os.path.exists(rotated_file):
os.remove(rotated_file)
rotated_file = compressed_file
self.logger.info(f"Log compressed successfully: {rotated_file}")
else:
self.logger.warning(f"Compression failed, keep uncompressed file: {rotated_file}")
            elif self.compress:
                # Skip compression for large files
size_gb = rotated_size / (1024 * 1024 * 1024)
max_gb = self.max_compress_size / (1024 * 1024 * 1024)
self.logger.warning(
f"Skip compression: file too large ({size_gb:.2f}GB > {max_gb:.0f}GB)"
)
self.logger.info(f"Log rotated successfully: {rotated_file}")
            # 4. Clean up old logs
self._cleanup_old_logs()
return True
except Exception as e:
self.logger.error(f"Log rotation failed: {str(e)}", exc_info=True)
return False
    def _create_dated_copy(self) -> Optional[str]:
        """Create a date-stamped copy of the log file (preserving source attributes)."""
        try:
            # Use the target date
            date_str = self.target_date.strftime("%Y%m%d")
            # Build the new file name
base_name = os.path.basename(self.log_path)
name, ext = os.path.splitext(base_name)
            # Handle the extension correctly (for compressed files)
if ext.lower() == '.gz':
name_without_gz = name
name, ext2 = os.path.splitext(name_without_gz)
ext = ext2 + ext
new_name = f"{name}_{date_str}{ext}"
rotated_path = os.path.join(self.backup_dir, new_name)
            # Make sure the target file does not already exist
counter = 1
while os.path.exists(rotated_path):
new_name = f"{name}_{date_str}_{counter}{ext}"
rotated_path = os.path.join(self.backup_dir, new_name)
counter += 1
            # Copy the file, preserving its attributes
self.logger.info(f"Create log copy: {self.log_path} -> {rotated_path}")
if self.preserve_attributes:
self._copy_with_attributes(self.log_path, rotated_path)
else:
shutil.copy2(self.log_path, rotated_path)
            # Verify that the copy is complete
src_size = os.path.getsize(self.log_path)
dst_size = os.path.getsize(rotated_path)
if src_size != dst_size:
self.logger.error(f"File size mismatch after copy: src={src_size}, dst={dst_size}")
if os.path.exists(rotated_path):
os.remove(rotated_path)
return None
return rotated_path
except Exception as e:
self.logger.error(f"Create copy failed: {str(e)}")
return None
    def _copy_with_attributes(self, src: str, dst: str) -> None:
        """Copy a file and preserve all of its attributes (like Linux cp -p)."""
        try:
            # 1. Copy the file contents first
CHUNK_SIZE = 64 * 1024 * 1024
with open(src, 'rb') as f_src:
with open(dst, 'wb') as f_dst:
while True:
chunk = f_src.read(CHUNK_SIZE)
if not chunk:
break
f_dst.write(chunk)
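            # Note: shutil.copy2/copystat would cover the mode and timestamps set below,
            # but neither copies ownership; the explicit os.chown in step 5 needs
            # sufficient privileges (typically root) to succeed.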
            # 2. Read all attributes of the source file
src_stat = os.stat(src)
            # 3. Set the permissions
try:
os.chmod(dst, src_stat.st_mode & 0o7777)
except Exception as e:
self.logger.warning(f"Set file permission failed: {e}")
            # 4. Set the timestamps
try:
os.utime(dst, (src_stat.st_atime, src_stat.st_mtime))
except Exception as e:
self.logger.warning(f"Set file timestamp failed: {e}")
            # 5. Set the owner and group
try:
os.chown(dst, src_stat.st_uid, src_stat.st_gid)
except (PermissionError, AttributeError):
pass
except Exception as e:
self.logger.debug(f"Set file owner failed: {e}")
self.logger.debug(f"File attributes preserved: {src} -> {dst}")
except Exception as e:
self.logger.error(f"Copy file attributes failed: {e}")
raise
    def _compress_file(self, file_path: str) -> Optional[str]:
        """Compress a log file (processed in chunks)."""
        compressed_path = f"{file_path}.gz"
        # If the compressed file already exists, append a numeric suffix
counter = 1
while os.path.exists(compressed_path):
compressed_path = f"{file_path}.{counter}.gz"
counter += 1
self.logger.info(f"Compress file: {file_path} -> {compressed_path}")
        # Process large files in chunks
        CHUNK_SIZE = 256 * 1024 * 1024
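        # Streaming through a 256 MB buffer keeps memory use bounded even for
        # multi-gigabyte logs; compresslevel=6 below trades a little compression
        # ratio for speed compared with the gzip module's default level of 9.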
try:
            # Save the source file attributes
original_stat = None
if self.preserve_attributes:
original_stat = os.stat(file_path)
with open(file_path, 'rb') as f_in:
with gzip.open(compressed_path, 'wb', compresslevel=6) as f_out:
while True:
chunk = f_in.read(CHUNK_SIZE)
if not chunk:
break
f_out.write(chunk)
            # Verify the compressed file
if os.path.exists(compressed_path) and os.path.getsize(compressed_path) > 0:
                # Restore the file attributes
if self.preserve_attributes and original_stat:
self._restore_compressed_file_attributes(compressed_path, original_stat)
return compressed_path
else:
self.logger.error(f"Compressed file creation failed: {compressed_path}")
if os.path.exists(compressed_path):
os.remove(compressed_path)
return None
except Exception as e:
self.logger.error(f"Compress file failed: {str(e)}")
if os.path.exists(compressed_path):
os.remove(compressed_path)
return None
    def _restore_compressed_file_attributes(self, file_path: str, original_stat: os.stat_result) -> None:
        """Restore the attributes of a compressed file."""
try:
mode = original_stat.st_mode & 0o666
os.chmod(file_path, mode)
os.utime(file_path, (original_stat.st_atime, original_stat.st_mtime))
try:
os.chown(file_path, original_stat.st_uid, original_stat.st_gid)
except (PermissionError, AttributeError):
pass
self.logger.debug(f"Compressed file attributes restored: {file_path}")
except Exception as e:
self.logger.warning(f"Restore compressed file attributes failed: {e}")
    def _cleanup_old_logs(self) -> None:
        """Remove logs that have passed the retention period."""
        try:
            self.logger.info(f"Cleanup old logs in {self.backup_dir}...")
            # Compute the cutoff date
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
cutoff_str = cutoff_date.strftime("%Y%m%d")
            # Determine the base-name pattern of the log file
base_name = os.path.basename(self.log_path)
name, ext = os.path.splitext(base_name)
if ext.lower() == '.gz':
name, _ = os.path.splitext(name)
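            # Backup names look like "<name>_YYYYMMDD<ext>[.gz]" or "<name>_YYYYMMDD_N<ext>[.gz]";
            # comparing the YYYYMMDD part against cutoff_str as a string works because
            # that format sorts chronologically.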
files_deleted = 0
for filename in os.listdir(self.backup_dir):
if filename.startswith(name + '_'):
try:
date_part = filename[len(name)+1:].split('.')[0]
if '_' in date_part:
date_str = date_part.split('_')[0]
else:
date_str = date_part
if len(date_str) == 8 and date_str.isdigit():
if date_str < cutoff_str:
file_path = os.path.join(self.backup_dir, filename)
self.logger.info(f"Delete expired log: {file_path}")
os.remove(file_path)
files_deleted += 1
except Exception as e:
self.logger.debug(f"Skip file {filename}: {str(e)}")
if files_deleted > 0:
self.logger.info(f"Cleanup completed: deleted {files_deleted} expired files")
else:
self.logger.debug("No expired files to cleanup")
except Exception as e:
self.logger.error(f"Cleanup old logs failed: {str(e)}")
@staticmethod
def _format_size(size_bytes: int) -> str:
"""格式化文件大小"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.2f} PB"
class LogProcessor:
"""日志处理器,支持多线程"""
def __init__(self, max_workers: int = 4, preserve_attributes: bool = True):
self.max_workers = max_workers
self.preserve_attributes = preserve_attributes
self.logger = logging.getLogger("log_rotator")
self.results = {
'total': 0,
'success': 0,
'failed': 0,
'skipped': 0
}
def process_files(
self,
log_files: List[str],
backup_dir: Optional[str] = None,
target_date: Optional[datetime] = None,
retention_days: int = 30,
compress: bool = True,
keep_original: bool = True,
max_compress_size: float = 5.0,
dry_run: bool = False,
preserve_attributes: bool = True
) -> Dict[str, int]:
"""处理多个日志文件(支持多线程)"""
self.results = {
'total': len(log_files),
'success': 0,
'failed': 0,
'skipped': 0
}
if dry_run:
self._dry_run_process(log_files, backup_dir, target_date, retention_days,
compress, keep_original, max_compress_size, preserve_attributes)
return self.results
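        # Threads are a reasonable fit here: the work is dominated by file I/O and
        # zlib compression, both of which release the GIL, so rotation of independent
        # files can overlap.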
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = []
for log_file in log_files:
future = executor.submit(
self._process_single_file,
log_file,
backup_dir,
target_date,
retention_days,
compress,
keep_original,
max_compress_size,
preserve_attributes
)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
if result == 'success':
self.results['success'] += 1
elif result == 'failed':
self.results['failed'] += 1
else:
self.results['skipped'] += 1
except Exception as e:
self.logger.error(f"Task execution exception: {e}")
self.results['failed'] += 1
return self.results
def _process_single_file(
self,
log_file: str,
backup_dir: Optional[str],
target_date: Optional[datetime],
retention_days: int,
compress: bool,
keep_original: bool,
max_compress_size: float,
preserve_attributes: bool
) -> str:
"""处理单个日志文件"""
try:
if not os.path.exists(log_file):
self.logger.error(f"File not exists: {log_file}")
return 'failed'
if not os.path.isfile(log_file):
self.logger.warning(f"Skip non-file: {log_file}")
return 'skipped'
try:
file_size = os.path.getsize(log_file)
if file_size == 0:
self.logger.info(f"Empty file skip: {log_file}")
return 'skipped'
except OSError as e:
self.logger.error(f"Get file size failed {log_file}: {e}")
return 'skipped'
rotator = LogRotator(
log_path=log_file,
backup_dir=backup_dir,
target_date=target_date,
retention_days=retention_days,
compress=compress,
keep_original=keep_original,
logger=self.logger,
max_compress_size=max_compress_size,
preserve_attributes=preserve_attributes
)
if rotator.rotate_log():
return 'success'
else:
return 'failed'
except Exception as e:
self.logger.error(f"Process file {log_file} error: {e}")
return 'failed'
def _dry_run_process(
self,
log_files: List[str],
backup_dir: Optional[str],
target_date: Optional[datetime],
retention_days: int,
compress: bool,
keep_original: bool,
max_compress_size: float,
preserve_attributes: bool
) -> None:
"""模拟运行"""
date_str = target_date.strftime("%Y%m%d") if target_date else datetime.now().strftime("%Y%m%d")
for log_file in log_files:
try:
if not os.path.exists(log_file):
self.logger.info(f"[DRY RUN] File not exists: {log_file}")
self.results['skipped'] += 1
continue
if not os.path.isfile(log_file):
self.logger.info(f"[DRY RUN] Skip non-file: {log_file}")
self.results['skipped'] += 1
continue
file_stat = os.stat(log_file)
file_mtime_str = datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
file_permissions = oct(file_stat.st_mode & 0o777)
file_size = file_stat.st_size
                # Important: resolve the backup directory correctly
if backup_dir:
actual_backup_dir = backup_dir
else:
actual_backup_dir = os.path.dirname(log_file)
base_name = os.path.basename(log_file)
name, ext = os.path.splitext(base_name)
new_name = f"{name}_{date_str}{ext}"
if compress and file_size < max_compress_size * 1024**3:
new_name += ".gz"
compress_action = "Will compress and backup"
elif compress:
compress_action = "Will backup (skip compression, file too large)"
else:
compress_action = "Will backup"
                # How the original file will be handled
if keep_original:
original_action = "truncate original file"
else:
original_action = "delete original file"
attr_info = " (preserve attributes)" if preserve_attributes else ""
self.logger.info(
f"[DRY RUN] {compress_action}, {original_action}{attr_info}: {log_file} -> "
f"{os.path.join(actual_backup_dir, new_name)} "
f"({file_size/(1024 * 1024):.2f}MB, modify time: {file_mtime_str}, permission: {file_permissions})"
)
self.results['success'] += 1
except Exception as e:
self.logger.error(f"[DRY RUN] Process {log_file} error: {e}")
self.results['failed'] += 1
def find_log_files_by_pattern(
paths: List[str],
patterns: List[str],
recursive: bool = False,
exclude_patterns: Optional[List[str]] = None
) -> List[str]:
"""
通过模式匹配查找日志文件
Args:
paths: 文件或目录路径列表
patterns: 包含的文件模式列表
recursive: 是否递归搜索子目录
exclude_patterns: 排除的文件模式列表
Returns:
匹配的文件路径列表
"""
log_files = []
exclude_patterns = exclude_patterns or []
for path in paths:
        # Handle wildcards in the path itself
if '*' in path or '?' in path or '[' in path:
import glob
expanded_paths = glob.glob(path, recursive=recursive)
for expanded_path in expanded_paths:
if os.path.isfile(expanded_path):
file_name = os.path.basename(expanded_path)
                    # Check the include patterns
                    if any(fnmatch.fnmatch(file_name, pat) for pat in patterns):
                        # Check the exclude patterns
                        if not any(fnmatch.fnmatch(file_name, pat) for pat in exclude_patterns):
log_files.append(os.path.abspath(expanded_path))
continue
        # A single file
if os.path.isfile(path):
file_name = os.path.basename(path)
if any(fnmatch.fnmatch(file_name, pat) for pat in patterns):
if not any(fnmatch.fnmatch(file_name, pat) for pat in exclude_patterns):
log_files.append(os.path.abspath(path))
continue
        # A directory
if os.path.isdir(path):
for root, dirs, files in os.walk(path):
if not recursive and root != path:
continue
for file in files:
file_path = os.path.join(root, file)
if any(fnmatch.fnmatch(file, pat) for pat in patterns):
if not any(fnmatch.fnmatch(file, pat) for pat in exclude_patterns):
log_files.append(os.path.abspath(file_path))
return sorted(list(set(log_files)))
def filter_files_by_days(
file_list: List[str],
days_ago: int,
logger: logging.Logger
) -> List[str]:
"""
按文件修改时间过滤文件列表
Args:
file_list: 文件路径列表
days_ago: 处理多少天之前的文件
logger: 日志记录器
Returns:
过滤后的文件路径列表
"""
if days_ago <= 0:
return file_list
    # Compute the cutoff date
    cutoff_date = datetime.now() - timedelta(days=days_ago)
    # Use 23:59:59 as the time component so the whole cutoff day is included
    cutoff_datetime = datetime(cutoff_date.year, cutoff_date.month, cutoff_date.day, 23, 59, 59)
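    # For example, with days_ago=7 on 2025-07-25 the cutoff is 2025-07-18 23:59:59,
    # so anything last modified on or before 2025-07-18 is kept for processing.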
logger.info(f"Filter files {days_ago} days ago (modify time <= {cutoff_date.strftime('%Y-%m-%d')})")
filtered_files = []
for file_path in file_list:
try:
file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_mtime <= cutoff_datetime:
filtered_files.append(file_path)
logger.debug(f"Match: {file_path} (modify time: {file_mtime.strftime('%Y-%m-%d %H:%M:%S')})")
else:
logger.debug(f"Skip (too new): {file_path} "
f"(modify time: {file_mtime.strftime('%Y-%m-%d %H:%M:%S')})")
except Exception as e:
logger.warning(f"Get file modify time failed {file_path}: {e}")
            # If the modification time cannot be read, include the file by default
filtered_files.append(file_path)
return filtered_files
def main():
    """Main entry point."""
parser = argparse.ArgumentParser(
description="Smart multi-path log rotation and archiving tool - Two-stage filtering (pattern match first, then days filter)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# 1. Match *.log files first, then filter files 7 days ago, keep original (truncate)
python log_rotator.py /var/log/app.log -p "*.log" -a 7 --keep-original
# 2. Match *.log and *.txt files, filter 30 days ago, delete original
python log_rotator.py /var/log/ -p "*.log" -p "*.txt" -a 30 --no-keep-original
# 3. Specify backup directory
python log_rotator.py /var/log/app.log -p "*.log" -a 7 -b /backup/logs
# 4. Multi-file processing (support wildcards)
python log_rotator.py /var/log/*.log /var/log/nginx/*.log -p "*.log" -a 7
# 5. Recursive processing
python log_rotator.py /var/log/ -R -p "*.log" -p "*.txt" -a 30
# 6. Exclude specific files
python log_rotator.py /var/log/ -p "*.log" -e "access.log" -e "error.log" -a 7
# 7. Multi-threading processing
python log_rotator.py /var/log/*.log -p "*.log" -a 7 -t 8
# 8. Dry run
python log_rotator.py /var/log/app.log -p "*.log" -a 7 -d -v
# 9. No compression
python log_rotator.py /var/log/app.log -p "*.log" -a 7 -n
# 10. Do not preserve file attributes
python log_rotator.py /var/log/app.log -p "*.log" -a 7 --no-preserve
Filter process:
1. First use -p parameter for pattern matching
2. Then use -a parameter to filter by modify time
3. Finally process files that meet the conditions
"""
)
    # Required positional arguments
parser.add_argument("paths", nargs='+',
help="Log file path or directory path (multiple, support wildcards)")
    # Include-pattern arguments
    # Note: default=None is used because with action="append" a list default would
    # have user-supplied patterns appended to it instead of replacing it.
    parser.add_argument("-p", "--pattern", action="append", default=None,
                        help="File match pattern (can be used multiple times, default: *.log)")
    # Exclude-pattern arguments
parser.add_argument("-e", "--exclude", action="append", default=[],
help="Exclude file pattern (can be used multiple times)")
    # Time-filter argument
parser.add_argument("-a", "--days-ago", type=int, default=0,
help="Process files before specified days (0=no filter)")
    # Keep-original-file arguments
parser.add_argument("--keep-original", action="store_true", default=True,
help="Keep original file (truncate) [default]")
parser.add_argument("--no-keep-original", action="store_false", dest="keep_original",
help="Do not keep original file (delete)")
    # Other arguments
parser.add_argument("-r", "--retention", type=int, default=30,
help="Log retention days (default: 30)")
parser.add_argument("-b", "--backup-dir",
help="Backup directory (default: original file directory)")
parser.add_argument("-n", "--no-compress", action="store_true",
help="Disable compression")
parser.add_argument("-l", "--log", default="/var/log/log_rotator.log",
help="This script log file path (default: /var/log/log_rotator.log)")
parser.add_argument("-R", "--recursive", action="store_true",
help="Recursively process subdirectories")
parser.add_argument("-d", "--dry-run", action="store_true",
help="Dry run, do not actually modify files")
parser.add_argument("-m", "--max-compress", type=float, default=5.0,
help="Maximum compressed file size (GB) (default: 5.0)")
parser.add_argument("-t", "--threads", type=int, default=4,
help="Processing thread count (default: 4)")
parser.add_argument("-v", "--verbose", action="store_true",
help="Enable verbose log output")
parser.add_argument("--no-preserve", action="store_true",
help="Do not preserve source file attributes (default preserve)")
    args = parser.parse_args()
    # Apply the default include pattern only when the user did not pass -p
    if args.pattern is None:
        args.pattern = ["*.log"]
    # Set up the logger
logger = setup_logger(args.log, args.verbose)
try:
        # Show the configuration
logger.info("=" * 60)
logger.info("Smart log rotation and archiving tool start")
logger.info("=" * 60)
logger.info("Configuration:")
logger.info(f" Search paths: {args.paths}")
logger.info(f" Include patterns: {args.pattern}")
logger.info(f" Exclude patterns: {args.exclude}")
logger.info(f" Time filter: {args.days_ago} days ago")
logger.info(f" Keep original: {args.keep_original}")
        # Explicitly report the backup directory setting
if args.backup_dir:
logger.info(f" Backup directory: {args.backup_dir}")
else:
logger.info(f" Backup directory: original file directory")
logger.info(f" Retention days: {args.retention}")
logger.info(f" Compress files: {not args.no_compress}")
logger.info(f" Threads: {args.threads}")
logger.info(f" Preserve attributes: {not args.no_preserve}")
logger.info(f" Dry run: {args.dry_run}")
logger.info("=" * 60)
        # Stage 1: find files by pattern matching
logger.info("Stage 1: Find files by pattern matching...")
matched_files = find_log_files_by_pattern(
paths=args.paths,
patterns=args.pattern,
recursive=args.recursive,
exclude_patterns=args.exclude
)
if not matched_files:
logger.warning(f"No matching files found: {args.paths} (pattern: {args.pattern})")
sys.exit(0)
logger.info(f"Pattern matching found {len(matched_files)} files")
        # Stage 2: filter files by age
filtered_files = matched_files
if args.days_ago > 0:
logger.info(f"Stage 2: Filter files by modify time {args.days_ago} days ago...")
filtered_files = filter_files_by_days(matched_files, args.days_ago, logger)
if not filtered_files:
logger.warning("No qualified files to process")
sys.exit(0)
logger.info(f"After two-stage filtering, found {len(filtered_files)} qualified files")
        # Show the list of files
if args.verbose and len(filtered_files) <= 20:
logger.debug("Qualified files:")
for i, file_path in enumerate(filtered_files, 1):
try:
file_size = os.path.getsize(file_path)
file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
file_mtime_str = file_mtime.strftime('%Y-%m-%d %H:%M:%S')
days_diff = (datetime.now() - file_mtime).days
logger.debug(f" {i:3d}. {file_path} "
f"({file_size/(1024 * 1024):.2f}MB, "
f"{file_mtime_str}, {days_diff} days ago)")
                except Exception:
logger.debug(f" {i:3d}. {file_path}")
        # Create the log processor
processor = LogProcessor(
max_workers=args.threads,
preserve_attributes=not args.no_preserve
)
        # Compute the target date (used in backup file names)
if args.days_ago > 0:
cutoff_date = datetime.now() - timedelta(days=args.days_ago)
process_target_date = cutoff_date
else:
process_target_date = datetime.now()
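        # With -a N the dated suffix of each backup therefore reflects the cutoff
        # date rather than today, matching what the dry-run output reports.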
        # Process the files
results = processor.process_files(
log_files=filtered_files,
backup_dir=args.backup_dir,
target_date=process_target_date,
retention_days=args.retention,
compress=not args.no_compress,
keep_original=args.keep_original,
max_compress_size=args.max_compress,
dry_run=args.dry_run,
preserve_attributes=not args.no_preserve
)
        # Print a summary of the results
logger.info("=" * 60)
logger.info("Process completed summary:")
logger.info(f"Pattern match: {args.pattern}")
logger.info(f"Time filter: {args.days_ago} days ago")
logger.info(f"Keep original: {args.keep_original}")
logger.info(f"Total files: {results['total']}")
logger.info(f"Success: {results['success']}")
logger.info(f"Failed: {results['failed']}")
logger.info(f"Skipped: {results['skipped']}")
if args.dry_run:
logger.info("This is a dry run, no files were actually modified")
logger.info("=" * 60)
if results['failed'] > 0:
logger.error(f"There are {results['failed']} files processing failed")
sys.exit(1)
else:
logger.info("All files processed successfully")
sys.exit(0)
except KeyboardInterrupt:
logger.info("User interrupted operation")
sys.exit(130)
except Exception as e:
logger.critical(f"Program terminated abnormally: {str(e)}", exc_info=True)
sys.exit(2)
if __name__ == "__main__":
main()
[root@test logs]# log_rotator -h
optional arguments:
-h, --help show this help message and exit
-p PATTERN, --pattern PATTERN
File match pattern (can be used multiple times,
default: *.log)
-e EXCLUDE, --exclude EXCLUDE
Exclude file pattern (can be used multiple times)
-a DAYS_AGO, --days-ago DAYS_AGO
Process files before specified days (0=no filter)
--keep-original Keep original file (truncate) [default]
--no-keep-original Do not keep original file (delete)
-r RETENTION, --retention RETENTION
Log retention days (default: 30)
-b BACKUP_DIR, --backup-dir BACKUP_DIR
Backup directory (default: original file directory)
-n, --no-compress Disable compression
-l LOG, --log LOG This script log file path (default:
/var/log/log_rotator.log)
-R, --recursive Recursively process subdirectories
-d, --dry-run Dry run, do not actually modify files
-m MAX_COMPRESS, --max-compress MAX_COMPRESS
Maximum compressed file size (GB) (default: 5.0)
-t THREADS, --threads THREADS
Processing thread count (default: 4)
-v, --verbose Enable verbose log output
--no-preserve Do not preserve source file attributes (default
preserve)
Examples:
# 1. Match *.log files first, then filter files 7 days ago, keep original (truncate)
python log_rotator.py /var/log/app.log -p "*.log" -a 7 --keep-original
# 2. Match *.log and *.txt files, filter 30 days ago, delete original
python log_rotator.py /var/log/ -p "*.log" -p "*.txt" -a 30 --no-keep-original
# 3. Specify backup directory
python log_rotator.py /var/log/app.log -p "*.log" -a 7 -b /backup/logs
# 4. Multi-file processing (support wildcards)
python log_rotator.py /var/log/*.log /var/log/nginx/*.log -p "*.log" -a 7
# 5. Recursive processing
python log_rotator.py /var/log/ -R -p "*.log" -p "*.txt" -a 30
# 6. Exclude specific files
python log_rotator.py /var/log/ -p "*.log" -e "access.log" -e "error.log" -a 7
# 7. Multi-threading processing
python log_rotator.py /var/log/*.log -p "*.log" -a 7 -t 8
# 8. Dry run
python log_rotator.py /var/log/app.log -p "*.log" -a 7 -d -v
# 9. No compression
python log_rotator.py /var/log/app.log -p "*.log" -a 7 -n
# 10. Do not preserve file attributes
python log_rotator.py /var/log/app.log -p "*.log" -a 7 --no-preserve
Filter process:
1. First use -p parameter for pattern matching
2. Then use -a parameter to filter by modify time
3. Finally process files that meet the conditions
[root@test logs]# log_rotator /data/fastdfs4/storage/logs /data/fastdfs/tracker/logs/ -r 90
[INFO] Found 3 log files to process
[INFO] Start processing: /data/fastdfs4/storage/logs/storaged.log
[INFO] Create log copy: /data/fastdfs4/storage/logs/storaged_20250725.log
[INFO] Compress file: /data/fastdfs4/storage/logs/storaged_20250725.log
[INFO] Log compressed successfully: /data/fastdfs4/storage/logs/storaged_20250725.log.gz
[INFO] Log rotated successfully: /data/fastdfs4/storage/logs/storaged_20250725.log.gz
[INFO] Cleanup old logs...
[INFO] Start processing: /data/fastdfs4/storage/logs/trackerd.log
[INFO] Create log copy: /data/fastdfs4/storage/logs/trackerd_20250725.log
[INFO] Compress file: /data/fastdfs4/storage/logs/trackerd_20250725.log
[INFO] Log compressed successfully: /data/fastdfs4/storage/logs/trackerd_20250725.log.gz
[INFO] Log rotated successfully: /data/fastdfs4/storage/logs/trackerd_20250725.log.gz
[INFO] Cleanup old logs...
[INFO] Start processing: /data/fastdfs/tracker/logs/trackerd.log
[INFO] Create log copy: /data/fastdfs/tracker/logs/trackerd_20250725.log
[INFO] Compress file: /data/fastdfs/tracker/logs/trackerd_20250725.log
[INFO] Log compressed successfully: /data/fastdfs/tracker/logs/trackerd_20250725.log.gz
[INFO] Log rotated successfully: /data/fastdfs/tracker/logs/trackerd_20250725.log.gz
[INFO] Cleanup old logs...
[INFO] Processing completed: 3/3 files succeeded
[root@test logs]# tail -100f /var/log/log_rotator.log
2025-07-25 11:33:12,277 [INFO] Found 3 log files to process
2025-07-25 11:33:12,278 [INFO] Start processing: /data/fastdfs4/storage/logs/storaged.log
2025-07-25 11:33:12,278 [INFO] Create log copy: /data/fastdfs4/storage/logs/storaged_20250725.log
2025-07-25 11:33:13,818 [INFO] Compress file: /data/fastdfs4/storage/logs/storaged_20250725.log
2025-07-25 11:33:15,566 [INFO] Log compressed successfully: /data/fastdfs4/storage/logs/storaged_20250725.log.gz
2025-07-25 11:33:15,566 [INFO] Log rotated successfully: /data/fastdfs4/storage/logs/storaged_20250725.log.gz
2025-07-25 11:33:15,566 [INFO] Cleanup old logs...
2025-07-25 11:33:15,569 [INFO] Start processing: /data/fastdfs4/storage/logs/trackerd.log
2025-07-25 11:33:15,569 [INFO] Create log copy: /data/fastdfs4/storage/logs/trackerd_20250725.log
2025-07-25 11:33:15,569 [INFO] Compress file: /data/fastdfs4/storage/logs/trackerd_20250725.log
2025-07-25 11:33:15,570 [INFO] Log compressed successfully: /data/fastdfs4/storage/logs/trackerd_20250725.log.gz
2025-07-25 11:33:15,570 [INFO] Log rotated successfully: /data/fastdfs4/storage/logs/trackerd_20250725.log.gz
2025-07-25 11:33:15,570 [INFO] Cleanup old logs...
2025-07-25 11:33:15,570 [INFO] Start processing: /data/fastdfs/tracker/logs/trackerd.log
2025-07-25 11:33:15,570 [INFO] Create log copy: /data/fastdfs/tracker/logs/trackerd_20250725.log
2025-07-25 11:33:15,992 [INFO] Compress file: /data/fastdfs/tracker/logs/trackerd_20250725.log
2025-07-25 11:33:15,992 [INFO] Log compressed successfully: /data/fastdfs/tracker/logs/trackerd_20250725.log.gz
2025-07-25 11:33:15,993 [INFO] Log rotated successfully: /data/fastdfs/tracker/logs/trackerd_20250725.log.gz
2025-07-25 11:33:15,993 [INFO] Cleanup old logs...
2025-07-25 11:33:15,993 [INFO] Processing completed: 3/3 files succeeded
# Process a single file
log_rotator /var/log/app.log
# Process multiple files
log_rotator /var/log/app1.log /var/log/app2.log
# Process multiple directories
log_rotator /var/log/apps /opt/logs /tmp/logs
# Mix files and directories
log_rotator /var/log/system.log /var/log/apps /tmp/special.log
# Recursively process multiple directories
log_rotator /var/log /opt/logs -R
# Use multiple match patterns
log_rotator /var/log -p "*.log" -p "*.txt" -p "app_*"
# Different retention policies for different paths (requires separate runs)
log_rotator /var/log/apps -r 30
log_rotator /var/log/system -r 90
# Find files older than one day, compress them, delete the originals, and store the backups in the backup directory
log_rotator_v4 -R ./test/ -p messages* -p freeswitch.log.* -p *.gz -a 1 -l ./test/log_rotator.log --no-keep-original -b /home/devops/Python/log_rotator/bak_log
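# A scheduling sketch: run the tool nightly from cron. The install path
# /usr/local/bin/log_rotator and the target directory below are assumptions; adjust
# them for your environment (e.g. place this entry in /etc/cron.d/log_rotator).
0 1 * * * root /usr/local/bin/log_rotator /var/log/apps -p "*.log" -a 1 -r 30 -l /var/log/log_rotator.log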