目 录CONTENT

文章目录

智能多路径日志分割与归档工具

平凡的运维之路
2025-07-25 / 0 评论 / 0 点赞 / 6 阅读 / 24164 字

智能多路径日志分割与归档工具

  • 详细代码
#!/usr/bin/env python3
"""
智能多路径日志分割与归档系统
功能:
1. 支持多个文件或目录路径同时处理
2. 按日期分割日志文件
3. 自动压缩归档(跳过>5GB大文件)
4. 保留策略管理
5. 不中断正在运行的应用程序
"""

import os
import sys
import gzip
import shutil
import argparse
import logging
import fnmatch
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler

# 配置日志记录器
def setup_logger(log_file):
    logger = logging.getLogger("log_rotator")
    logger.setLevel(logging.INFO)
    
    # 创建日志目录
    os.makedirs(os.path.dirname(log_file), exist_ok=True)
    
    # 文件处理器 - 按大小滚动
    file_handler = RotatingFileHandler(
        log_file, maxBytes=10 * 1024 * 1024, backupCount=5
    )
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s [%(levelname)s] %(message)s'
    ))
    
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '[%(levelname)s] %(message)s'
    ))
    
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger

class LogRotator:
    def __init__(self, log_path, retention_days=30, compress=True, logger=None, max_compress_size=5):
        """
        初始化日志旋转器
        :param log_path: 要处理的日志文件路径
        :param retention_days: 保留天数
        :param compress: 是否压缩归档
        :param logger: 日志记录器
        :param max_compress_size: 最大压缩文件大小(GB)
        """
        self.log_path = log_path
        self.retention_days = retention_days
        self.compress = compress
        self.logger = logger or logging.getLogger("log_rotator")
        self.max_compress_size = max_compress_size * 1024 * 1024 * 1024  # 转换为字节
        
        # 验证日志文件存在
        if not os.path.isfile(self.log_path):
            self.logger.error(f"日志文件不存在: {self.log_path}")
            raise FileNotFoundError(f"日志文件不存在: {self.log_path}")
    
    def rotate_log(self):
        """执行日志分割操作"""
        try:
            # 1. 创建日期标记的副本
            rotated_file = self._create_dated_copy()
            
            # 2. 检查文件大小决定是否压缩
            file_size = os.path.getsize(rotated_file)
            
            if self.compress and file_size < self.max_compress_size:
                # 压缩小文件
                compressed_file = self._compress_file(rotated_file)
                # 删除未压缩的副本
                os.remove(rotated_file)
                rotated_file = compressed_file
                self.logger.info(f"成功压缩日志: {rotated_file}")
            elif self.compress:
                # 大文件跳过压缩
                self.logger.warning(
                    f"跳过压缩: 文件过大 ({file_size/(1024 * 1024 * 1024):.2f}GB > {self.max_compress_size/(1024 * 1024 * 1024)}GB)"
                )
            
            self.logger.info(f"成功分割日志: {rotated_file}")
            
            # 3. 清理旧日志
            self._cleanup_old_logs()
            
            return True
        except Exception as e:
            self.logger.error(f"日志分割失败: {str(e)}", exc_info=True)
            return False
    
    def _create_dated_copy(self):
        """创建带日期标记的日志副本"""
        # 获取当前日期
        today = datetime.now().strftime("%Y%m%d")
        
        # 构建新文件名
        dir_name = os.path.dirname(self.log_path)
        base_name = os.path.basename(self.log_path)
        name, ext = os.path.splitext(base_name)
        new_name = f"{name}_{today}{ext}"
        rotated_path = os.path.join(dir_name, new_name)
        
        # 复制文件(使用重命名确保原子性)
        self.logger.info(f"创建日志副本: {rotated_path}")
        shutil.copy2(self.log_path, rotated_path)
        
        # 清空原始日志文件(不删除,只截断)
        with open(self.log_path, 'w') as f:
            f.truncate()
        
        return rotated_path
    
    def _compress_file(self, file_path):
        """压缩日志文件(分块处理)"""
        self.logger.info(f"压缩文件: {file_path}")
        compressed_path = f"{file_path}.gz"
        
        # 分块处理大文件
        CHUNK_SIZE = 256 * 1024 * 1024  # 256MB分块
        
        try:
            with open(file_path, 'rb') as f_in:
                with gzip.open(compressed_path, 'wb') as f_out:
                    while True:
                        chunk = f_in.read(CHUNK_SIZE)
                        if not chunk:
                            break
                        f_out.write(chunk)
            
            # 验证压缩文件
            if os.path.exists(compressed_path) and os.path.getsize(compressed_path) > 0:
                return compressed_path
            else:
                raise IOError(f"压缩文件创建失败: {compressed_path}")
        except Exception as e:
            # 清理不完整的压缩文件
            if os.path.exists(compressed_path):
                os.remove(compressed_path)
            raise
    
    def _cleanup_old_logs(self):
        """清理超过保留期限的日志"""
        self.logger.info("清理旧日志...")
        dir_name = os.path.dirname(self.log_path)
        base_name = os.path.basename(self.log_path)
        name, ext = os.path.splitext(base_name)
        
        # 计算截止日期
        cutoff_date = datetime.now() - timedelta(days=self.retention_days)
        
        # 遍历日志目录
        for filename in os.listdir(dir_name):
            if filename.startswith(name) and filename != base_name:
                try:
                    # 从文件名提取日期
                    date_str = filename[len(name)+1:].split('.')[0]
                    file_date = datetime.strptime(date_str, "%Y%m%d")
                    
                    # 检查是否过期
                    if file_date < cutoff_date:
                        file_path = os.path.join(dir_name, filename)
                        self.logger.info(f"删除过期日志: {file_path}")
                        os.remove(file_path)
                except (ValueError, IndexError):
                    # 文件名格式不匹配,跳过
                    continue

def find_log_files(paths, patterns, recursive=False):
    """
    查找匹配的日志文件(支持多个路径)
    :param paths: 文件或目录路径列表
    :param patterns: 文件模式列表 (如 ['*.log', '*.txt'])
    :param recursive: 是否递归搜索子目录
    :return: 匹配的文件路径列表
    """
    log_files = []
    
    for path in paths:
        # 如果是单个文件
        if os.path.isfile(path):
            if any(fnmatch.fnmatch(path, pat) for pat in patterns):
                log_files.append(path)
            continue
        
        # 如果是目录
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path):
                if not recursive and root != path:
                    continue
                    
                for file in files:
                    file_path = os.path.join(root, file)
                    if any(fnmatch.fnmatch(file, pat) for pat in patterns):
                        log_files.append(file_path)
    
    return log_files

def main():
    # 解析命令行参数
    parser = argparse.ArgumentParser(description="智能多路径日志分割与归档工具")
    parser.add_argument("paths", nargs='+', help="日志文件路径或目录路径(可多个)")
    parser.add_argument("-p", "--pattern", action="append", default=["*.log"],
                        help="文件匹配模式 (可多次使用,默认: *.log)")
    parser.add_argument("-r", "--retention", type=int, default=30,
                        help="日志保留天数 (默认: 30)")
    parser.add_argument("-n", "--no-compress", action="store_true",
                        help="禁用压缩")
    parser.add_argument("-l", "--log", default="/var/log/log_rotator.log",
                        help="本脚本的日志文件路径")
    parser.add_argument("-R", "--recursive", action="store_true",
                        help="递归处理子目录")
    parser.add_argument("-d", "--dry-run", action="store_true",
                        help="模拟运行,不实际修改文件")
    parser.add_argument("-m", "--max-compress", type=float, default=5.0,
                        help="最大压缩文件大小(GB) (默认: 5GB)")
    args = parser.parse_args()
    
    # 设置日志记录器
    logger = setup_logger(args.log)
    
    try:
        # 查找匹配的日志文件
        log_files = find_log_files(args.paths, args.pattern, args.recursive)
        
        if not log_files:
            logger.warning(f"未找到匹配的日志文件: {args.paths} (模式: {args.pattern})")
            sys.exit(0)
            
        logger.info(f"找到 {len(log_files)} 个日志文件需要处理")
        
        # 处理每个日志文件
        success_count = 0
        for log_file in log_files:
            try:
                logger.info(f"开始处理: {log_file}")
                
                if args.dry_run:
                    # 模拟运行:检查文件大小但不实际处理
                    file_size = os.path.getsize(log_file) if os.path.exists(log_file) else 0
                    compress_status = "跳过压缩" if file_size > args.max_compress * 1024**3 else "将压缩"
                    logger.info(f"[DRY RUN] {compress_status}: {log_file} ({file_size/(1024 * 1024):.2f}MB)")
                    success_count += 1
                    continue
                
                rotator = LogRotator(
                    log_path=log_file,
                    retention_days=args.retention,
                    compress=not args.no_compress,
                    logger=logger,
                    max_compress_size=args.max_compress
                )
                
                if rotator.rotate_log():
                    success_count += 1
                else:
                    logger.error(f"处理失败: {log_file}")
            except Exception as e:
                logger.error(f"处理文件时出错: {log_file} - {str(e)}", exc_info=True)
        
        # 输出结果摘要
        logger.info(f"处理完成: {success_count}/{len(log_files)} 个文件成功")
        
        if success_count == len(log_files):
            sys.exit(0)
        else:
            sys.exit(1)
            
    except Exception as e:
        logger.critical(f"程序异常终止: {str(e)}", exc_info=True)
        sys.exit(2)

if __name__ == "__main__":
    main()
  • 脚本相关参数说明
[root@testlogs]# log_rotator  -h
usage: log_rotator [-h] [-p PATTERN] [-r RETENTION] [-n] [-l LOG] [-R] [-d]
                   [-m MAX_COMPRESS]
                   paths [paths ...]

智能多路径日志分割与归档工具

positional arguments:
  paths                 日志文件路径或目录路径(可多个)

optional arguments:
  -h, --help            show this help message and exit
  -p PATTERN, --pattern PATTERN
                        文件匹配模式 (可多次使用,默认: *.log)
  -r RETENTION, --retention RETENTION
                        日志保留天数 (默认: 30)
  -n, --no-compress     禁用压缩
  -l LOG, --log LOG     本脚本的日志文件路径
  -R, --recursive       递归处理子目录
  -d, --dry-run         模拟运行,不实际修改文件
  -m MAX_COMPRESS, --max-compress MAX_COMPRESS
                        最大压缩文件大小(GB) (默认: 5GB)
  • 执行过程,脚本已经封转成二进制
[root@test logs]# log_rotator    /data/fastdfs4/storage/logs   /data/fastdfs/tracker/logs/    -r 90
[INFO] 找到 3 个日志文件需要处理
[INFO] 开始处理: /data/fastdfs4/storage/logs/storaged.log
[INFO] 创建日志副本: /data/fastdfs4/storage/logs/storaged_20250725.log
[INFO] 压缩文件: /data/fastdfs4/storage/logs/storaged_20250725.log
[INFO] 成功压缩日志: /data/fastdfs4/storage/logs/storaged_20250725.log.gz
[INFO] 成功分割日志: /data/fastdfs4/storage/logs/storaged_20250725.log.gz
[INFO] 清理旧日志...
[INFO] 开始处理: /data/fastdfs4/storage/logs/trackerd.log
[INFO] 创建日志副本: /data/fastdfs4/storage/logs/trackerd_20250725.log
[INFO] 压缩文件: /data/fastdfs4/storage/logs/trackerd_20250725.log
[INFO] 成功压缩日志: /data/fastdfs4/storage/logs/trackerd_20250725.log.gz
[INFO] 成功分割日志: /data/fastdfs4/storage/logs/trackerd_20250725.log.gz
[INFO] 清理旧日志...
[INFO] 开始处理: /data/fastdfs/tracker/logs/trackerd.log
[INFO] 创建日志副本: /data/fastdfs/tracker/logs/trackerd_20250725.log
[INFO] 压缩文件: /data/fastdfs/tracker/logs/trackerd_20250725.log
[INFO] 成功压缩日志: /data/fastdfs/tracker/logs/trackerd_20250725.log.gz
[INFO] 成功分割日志: /data/fastdfs/tracker/logs/trackerd_20250725.log.gz
[INFO] 清理旧日志...
[INFO] 处理完成: 3/3 个文件成功
  • 脚本默认日志文件查看
[root@test logs]# tail -100f /var/log/log_rotator.log 
2025-07-25 11:33:12,277 [INFO] 找到 3 个日志文件需要处理
2025-07-25 11:33:12,278 [INFO] 开始处理: /data/fastdfs4/storage/logs/storaged.log
2025-07-25 11:33:12,278 [INFO] 创建日志副本: /data/fastdfs4/storage/logs/storaged_20250725.log
2025-07-25 11:33:13,818 [INFO] 压缩文件: /data/fastdfs4/storage/logs/storaged_20250725.log
2025-07-25 11:33:15,566 [INFO] 成功压缩日志: /data/fastdfs4/storage/logs/storaged_20250725.log.gz
2025-07-25 11:33:15,566 [INFO] 成功分割日志: /data/fastdfs4/storage/logs/storaged_20250725.log.gz
2025-07-25 11:33:15,566 [INFO] 清理旧日志...
2025-07-25 11:33:15,569 [INFO] 开始处理: /data/fastdfs4/storage/logs/trackerd.log
2025-07-25 11:33:15,569 [INFO] 创建日志副本: /data/fastdfs4/storage/logs/trackerd_20250725.log
2025-07-25 11:33:15,569 [INFO] 压缩文件: /data/fastdfs4/storage/logs/trackerd_20250725.log
2025-07-25 11:33:15,570 [INFO] 成功压缩日志: /data/fastdfs4/storage/logs/trackerd_20250725.log.gz
2025-07-25 11:33:15,570 [INFO] 成功分割日志: /data/fastdfs4/storage/logs/trackerd_20250725.log.gz
2025-07-25 11:33:15,570 [INFO] 清理旧日志...
2025-07-25 11:33:15,570 [INFO] 开始处理: /data/fastdfs/tracker/logs/trackerd.log
2025-07-25 11:33:15,570 [INFO] 创建日志副本: /data/fastdfs/tracker/logs/trackerd_20250725.log
2025-07-25 11:33:15,992 [INFO] 压缩文件: /data/fastdfs/tracker/logs/trackerd_20250725.log
2025-07-25 11:33:15,992 [INFO] 成功压缩日志: /data/fastdfs/tracker/logs/trackerd_20250725.log.gz
2025-07-25 11:33:15,993 [INFO] 成功分割日志: /data/fastdfs/tracker/logs/trackerd_20250725.log.gz
2025-07-25 11:33:15,993 [INFO] 清理旧日志...
2025-07-25 11:33:15,993 [INFO] 处理完成: 3/3 个文件成功
  • 基本使用方法
# 处理单个文件
log_rotator /var/log/app.log

# 处理多个文件
log_rotator /var/log/app1.log /var/log/app2.log

# 处理多个目录
log_rotator /var/log/apps /opt/logs /tmp/logs

# 混合处理文件和目录
log_rotator /var/log/system.log /var/log/apps /tmp/special.log

# 递归处理多个目录
log_rotator /var/log /opt/logs -R

# 使用多个匹配模式
log_rotator /var/log -p "*.log" -p "*.txt" -p "app_*"

# 不同路径使用不同保留策略(需要多次调用)
log_rotator /var/log/apps -r 30
log_rotator /var/log/system -r 90
0

评论区