目 录CONTENT

文章目录

socket方式快速部署服务

平凡的运维之路
2024-08-01 / 0 评论 / 0 点赞 / 77 阅读 / 22633 字

socket方式运行linux系统命令

需求

  • 1、行内、相关保险等行业,在部署服务时,监管不让使用ssh分发批量执行相关命令。
  • 2、快速批量执行命令,提高部署效率。
  • 3、通过每台服务器上面部署一个socket服务端,用于接收socket 客户端发送的命令,执行命令,并将结果返回给socket客户端。
  • 4、增加特殊标识判断,避免任意命令都去执行

实现

  • 1、编写socket 客户端和服务端脚本。
  • 2、通过每台服务器上面部署一个socket服务端,用于接收socket 客户端发送的命令,执行命令,并将执行结果返回给socket客户端。

代码实现

  • socket服务端脚本
#!/usr/local/python3/bin/python3
# -*- coding:utf-8 -*-
import socket
import subprocess
import argparse
import logging
from concurrent.futures import ThreadPoolExecutor
import time
import os

# 配置日志
def setup_logging():
    # 创建日志目录
    if not os.path.exists("logs"):
        os.makedirs("logs")
    
    # 设置日志格式
    log_format = '%(asctime)s %(thread)d %(filename)s:%(lineno)d %(levelname)s %(message)s'
    date_format = '%Y-%m-%d %H:%M:%S'
    
    # 创建日志器
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # 文件处理器 - 按日期滚动
    file_handler = logging.FileHandler(f"logs/server_{time.strftime('%Y%m%d')}.log")
    file_handler.setFormatter(logging.Formatter(log_format, date_format))
    
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(log_format, date_format))
    
    # 添加处理器
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

def execute_command_safely(command):
    """安全执行命令"""
    try:
        if not command:
            raise ValueError("Empty command")
        
        # 执行命令(禁用shell模式)
        process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        
        # 设置超时(30秒)
        try:
            stdout, stderr = process.communicate(timeout=30)
            returncode = process.returncode
        except subprocess.TimeoutExpired:
            process.kill()
            stdout, stderr = process.communicate()
            return returncode, b"", b"Command timed out after 30 seconds"
        
        return returncode, stdout, stderr
        
    except Exception as e:
        return -1, b"", str(e).encode()

def handle_client(client_socket, client_address, ip_whitelist):
    """处理客户端请求"""
    client_ip = client_address[0]
    try:
        logging.info(f"New connection from {client_address}")
        
        # 检查IP白名单
        if client_ip not in ip_whitelist:
            msg = f"IP {client_ip} not in allowed IP list. Connection closed."
            client_socket.send(msg.encode('utf-8'))
            logging.warning(msg)
            return
        
        logging.info(f"Client {client_address} connected")
        
        # 接收命令(设置10秒超时)
        client_socket.settimeout(10.0)
        command_data = client_socket.recv(1024).decode('utf-8')
        
        # 验证命令格式
        if not command_data.startswith("CCOD==>"):
            msg = "Invalid command format. Command must start with 'CCOD==>'."
            client_socket.send(msg.encode('utf-8'))
            logging.warning(f"Invalid command from {client_address}: {command_data}")
            return
        
        # 解析命令
        ccod_command = command_data.split("==>", 1)[1].strip()
        if not ccod_command:
            msg = "Empty command after 'CCOD==>'."
            client_socket.send(msg.encode('utf-8'))
            return
            
        logging.info(f"Executing command from {client_address}: {ccod_command}")
        
        # 安全执行命令
        returncode, stdout, stderr = execute_command_safely(ccod_command)
        
        # 处理执行结果
        if returncode == 0:
            if stdout:
                client_socket.sendall(stdout)
            else:
                msg = f"Command '{ccod_command}' executed successfully with no output"
                client_socket.send(msg.encode('utf-8'))
        else:
            error_msg = f"Command failed (return code {returncode}): {stderr.decode('utf-8', errors='ignore')}"
            client_socket.send(error_msg.encode('utf-8'))
            logging.error(f"Command failed: {error_msg}")
    
    except socket.timeout:
        logging.warning(f"Client {client_address} timed out waiting for command")
        client_socket.send(b"Connection timed out. No command received within 10 seconds.")
    
    except Exception as e:
        logging.error(f"Error handling client {client_address}: {str(e)}")
        client_socket.send(f"Server error: {str(e)}".encode('utf-8'))
    
    finally:
        client_socket.close()
        logging.info(f"Closed connection from {client_address}")

def start_server(port, ip_whitelist, max_workers=10):
    """启动TCP服务器"""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(('0.0.0.0', port))
        server_socket.listen(5)  # 支持5个等待连接
        
        logging.info(f"Server started on port {port}, waiting for connections...")
        logging.info(f"Allowed IPs: {ip_whitelist}")
        
        # 创建线程池
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            while True:
                try:
                    client_socket, client_address = server_socket.accept()
                    logging.info(f"Accepted connection from {client_address}")
                    
                    # 提交任务到线程池
                    executor.submit(
                        handle_client, 
                        client_socket, 
                        client_address, 
                        ip_whitelist
                    )
                
                except KeyboardInterrupt:
                    logging.info("Server interrupted by user. Shutting down...")
                    break
                
                except Exception as e:
                    logging.error(f"Server error: {str(e)}")

if __name__ == "__main__":
    # 配置日志系统
    setup_logging()
    
    # 创建命令行参数解析器
    parser = argparse.ArgumentParser(description="Secure TCP Command Server")
    parser.add_argument(
        "--port", 
        type=int, 
        default=10100,
        help="Port to listen on (default: 10100)"
    )
    parser.add_argument(
        "--ips", 
        type=str, 
        default="10.0.16.16,127.0.0.1",
        help="Comma-separated list of allowed IP addresses"
    )
    parser.add_argument(
        "--workers", 
        type=int, 
        default=20,
        help="Number of worker threads (default: 20)"
    )
    
    # 解析参数
    args = parser.parse_args()
    
    # 转换IP列表
    allowed_ips = [ip.strip() for ip in args.ips.split(",")]
    
    try:
        # 启动服务器
        start_server(args.port, allowed_ips, args.workers)
    except Exception as e:
        logging.critical(f"Critical server failure: {str(e)}")

  • socket客户端脚本
#!/usr/local/python3/bin/python3
# -*- coding:utf-8 -*-
import socket
import argparse
import concurrent.futures
import time
import logging

def setup_logging():
    """配置日志系统"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(f"client_{time.strftime('%Y%m%d')}.log")
        ]
    )

def execute_command(ip, port, command, timeout=10):
    """在远程服务器上执行命令"""
    # 创建socket对象
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.settimeout(timeout)
    
    try:
        # 连接到服务器
        start_time = time.time()
        client_socket.connect((ip, port))
        conn_time = time.time() - start_time
        logging.info(f"Connected to {ip}:{port} in {conn_time:.2f}s")
    except socket.timeout:
        logging.error(f"Connection to {ip}:{port} timed out after {timeout}s")
        return f"{ip}: Connection timeout"
    except Exception as e:
        logging.error(f"Connection to {ip}:{port} failed: {str(e)}")
        return f"{ip}: Connection failed"
    
    try:
        # 发送命令
        client_socket.sendall(command.encode('utf-8'))
        logging.debug(f"Sent command to {ip}: {command}")
        
        # 接收响应
        response = b""
        start_time = time.time()
        while True:
            try:
                chunk = client_socket.recv(4096)
                if not chunk:
                    break
                response += chunk
                # 检查是否超时
                if time.time() - start_time > timeout:
                    raise socket.timeout("Response timed out")
            except socket.timeout:
                logging.warning(f"Response from {ip} timed out")
                return f"{ip}: Response timed out"
        
        # 处理响应
        result = response.decode('utf-8', errors='ignore')
        logging.info(f"Response from {ip}:\n{result}")
        return f"{ip}:\n{result}"
        
    except Exception as e:
        logging.error(f"Error communicating with {ip}: {str(e)}")
        return f"{ip}: Communication error"
    finally:
        # 确保关闭连接
        client_socket.close()
        logging.info(f"Closed connection to {ip}")

def run_commands(ips, port, commands, max_workers=10):
    """执行批量命令操作"""
    logging.info(f"Starting command execution for {len(ips)} servers")
    logging.info(f"Port: {port}, Commands: {commands}")
    logging.info("=" * 80)
    
    # 处理多个命令
    if not isinstance(commands, list):
        commands = [commands]
    
    # 处理多个IP
    if not isinstance(ips, list):
        ips = [ip.strip() for ip in ips.split(',')]
    
    # 准备任务列表
    tasks = []
    for command in commands:
        for ip in ips:
            tasks.append((ip, port, command))
    
    # 使用线程池执行
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(execute_command, *task) for task in tasks]
        for future in concurrent.futures.as_completed(futures):
            try:
                results.append(future.result(timeout=15))
            except concurrent.futures.TimeoutError:
                logging.error("Task timed out")
    
    # 输出结果汇总
    logging.info("\n" + "=" * 80)
    logging.info("Command execution summary:")
    logging.info("-" * 80)
    for result in results:
        logging.info(result)
    logging.info("=" * 80)
    
    return results

if __name__ == "__main__":
    # 配置日志
    setup_logging()
    
    # 创建命令行参数解析器
    parser = argparse.ArgumentParser(description="Distributed Command Executor")
    parser.add_argument(
        "--ips", 
        type=str,
        required=True,
        help="Comma-separated list of IP addresses or hostnames"
    )
    parser.add_argument(
        "--port", 
        type=int, 
        default=10100,
        help="TCP port for server connection (default: 10100)"
    )
    parser.add_argument(
        "--commands", 
        type=str,
        nargs='+',
        required=True,
        help="One or more commands to execute (separate multiple commands with spaces)"
    )
    parser.add_argument(
        "--timeout", 
        type=int, 
        default=10,
        help="Connection and response timeout in seconds (default: 10)"
    )
    parser.add_argument(
        "--workers", 
        type=int, 
        default=5,
        help="Number of concurrent connections (default: 5)"
    )
    
    # 解析参数
    args = parser.parse_args()
    
    # 执行命令
    run_commands(
        ips=args.ips,
        port=args.port,
        commands=args.commands,
        max_workers=args.workers
    )

脚本运行详情

  • 相关参数说明
[devops@db1 code]$ ./socket_command_cli.py  -h
usage: socket_command_cli.py [-h] --ips IPS [--port PORT] --commands COMMANDS
                             [COMMANDS ...] [--timeout TIMEOUT]
                             [--workers WORKERS]

Distributed Command Executor

optional arguments:
  -h, --help            show this help message and exit
  --ips IPS             Comma-separated list of IP addresses or hostnames
  --port PORT           TCP port for server connection (default: 10100)
  --commands COMMANDS [COMMANDS ...]
                        One or more commands to execute (separate multiple
                        commands with spaces)
  --timeout TIMEOUT     Connection and response timeout in seconds (default:
                        10)
  --workers WORKERS     Number of concurrent connections (default: 5)
[devops@db1 code]$ ./socket_command_server.py  -h
usage: socket_command_server.py [-h] [--port PORT] [--ips IPS]
                                [--workers WORKERS]

Secure TCP Command Server

optional arguments:
  -h, --help         show this help message and exit
  --port PORT        Port to listen on (default: 10100)
  --ips IPS          Comma-separated list of allowed IP addresses
  --workers WORKERS  Number of worker threads (default: 20)
  • 执行sed替换操作
[devops@db1 x86]$  ./socket_command_cli    --ips "10.130.47.197,127.0.0.1" --commands "CCOD==>  id=\$( ip r|grep    src|awk '{print \$NF}'|awk -F"." '{print \$NF}') ;   sed -i s@001@\$id@g /home/devops/1.txt"
2025-08-22 15:53:47,419 - INFO - Starting command execution for 23 servers
2025-08-22 15:53:47,420 - INFO - Port: 10100, Commands: ["CCOD==>  id=$( ip r|grep    src|awk '{print $NF}'|awk -F. '{print $NF}') ;   sed -i s@001@$id@g /home/devops/1.txt"]
2025-08-22 15:53:47,420 - INFO - ================================================================================
2025-08-22 15:53:47,422 - INFO - Connected to 127.0.0.1:10100 in 0.00s
2025-08-22 15:53:47,422 - INFO - Connected to 10.130.47.197:10100 in 0.00s
2025-08-22 15:53:47,435 - INFO - Response from 127.0.0.1:
Command failed (return code 2): sed: can't read /home/devops/1.txt: No such file or directory

2025-08-22 15:53:47,436 - INFO - Closed connection to 127.0.0.1
2025-08-22 15:53:47,437 - INFO - Response from 10.130.47.197:
Command 'id=$( ip r|grep    src|awk '{print $NF}'|awk -F. '{print $NF}') ;   sed -i s@001@$id@g /home/devops/1.txt' executed successfully with no output
2025-08-22 15:53:47,438 - INFO - Closed connection to 10.130.47.197
2025-08-22 15:53:47,438 - INFO - 
================================================================================
2025-08-22 15:53:47,438 - INFO - Command execution summary:
2025-08-22 15:53:47,438 - INFO - --------------------------------------------------------------------------------
2025-08-22 15:53:47,438 - INFO - 127.0.0.1:
Command failed (return code 2): sed: can't read /home/devops/1.txt: No such file or directory

2025-08-22 15:53:47,439 - INFO - 10.130.47.197:
Command 'id=$( ip r|grep    src|awk '{print $NF}'|awk -F. '{print $NF}') ;   sed -i s@001@$id@g /home/devops/1.txt' executed successfully with no output
  • 执行相关命令
[devops@db1 x86]$  ./socket_command_cli    --ips "10.130.47.197,127.0.0.1" --commands "CCOD==>  ip r"
2025-08-22 15:54:42,703 - INFO - Starting command execution for 23 servers
2025-08-22 15:54:42,704 - INFO - Port: 10100, Commands: ['CCOD==>  ip r']
2025-08-22 15:54:42,704 - INFO - ================================================================================
2025-08-22 15:54:42,706 - INFO - Connected to 127.0.0.1:10100 in 0.00s
2025-08-22 15:54:42,706 - INFO - Connected to 10.130.47.197:10100 in 0.00s
2025-08-22 15:54:42,713 - INFO - Response from 10.130.47.197:
default via 10.130.47.1 dev eth0 
10.130.47.0/24 dev eth0 proto kernel scope link src 10.130.47.197 
169.254.0.0/16 dev eth0 scope link metric 1002 
169.254.169.254 via 10.130.47.81 dev eth0 proto static 

2025-08-22 15:54:42,714 - INFO - Closed connection to 10.130.47.197
2025-08-22 15:54:42,714 - INFO - Response from 127.0.0.1:
default via 10.130.47.1 dev eth0 
10.130.47.0/24 dev eth0 proto kernel scope link src 10.130.47.202 
169.254.0.0/16 dev eth0 scope link metric 1002 
169.254.169.254 via 10.130.47.81 dev eth0 proto static 

2025-08-22 15:54:42,715 - INFO - Closed connection to 127.0.0.1
2025-08-22 15:54:42,715 - INFO - 
================================================================================
0

评论区