socket方式运行linux系统命令
需求
- 1、在银行内部及保险等相关行业部署服务时,监管要求不允许使用 ssh 批量分发并执行命令。
- 2、快速批量执行命令,提高部署效率。
- 3、通过每台服务器上面部署一个socket服务端,用于接收socket 客户端发送的命令,执行命令,并将结果返回给socket客户端。
- 4、增加特殊标识校验:只有以 `CCOD==>` 开头的请求才会被执行,避免任意内容都被当作命令执行。
实现
- 1、编写socket 客户端和服务端脚本。
- 2、通过每台服务器上面部署一个socket服务端,用于接收socket 客户端发送的命令,执行命令,并将执行结果返回给socket客户端。
代码实现(先是服务端 socket_command_server.py,随后是客户端 socket_command_cli.py)
#!/usr/local/python3/bin/python3
# -*- coding:utf-8 -*-
import socket
import subprocess
import argparse
import logging
from concurrent.futures import ThreadPoolExecutor
import time
import os
# 配置日志
def setup_logging():
    """Configure the root logger to log INFO and above to a file and the console.

    The file is ``logs/server_<YYYYMMDD>.log`` relative to the current working
    directory. The date in the file name is evaluated once, when this function
    runs; a plain ``FileHandler`` is used, so a long-running process keeps
    writing to the file it opened at startup (no rollover at midnight).
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs("logs", exist_ok=True)

    log_format = '%(asctime)s %(thread)d %(filename)s:%(lineno)d %(levelname)s %(message)s'
    date_format = '%Y-%m-%d %H:%M:%S'

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Explicit UTF-8 so that non-ASCII command text or output can be written
    # regardless of the locale the server process was started under.
    file_handler = logging.FileHandler(
        f"logs/server_{time.strftime('%Y%m%d')}.log",
        encoding="utf-8",
    )
    file_handler.setFormatter(logging.Formatter(log_format, date_format))

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(log_format, date_format))

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
def execute_command_safely(command, timeout=30):
    """Run *command* through the system shell and capture its result.

    Args:
        command: Shell command line to execute (a string).
        timeout: Seconds to wait for the command before killing it.

    Returns:
        A ``(returncode, stdout, stderr)`` tuple; ``stdout`` and ``stderr``
        are bytes. An empty command, or any failure to start or run the
        process, yields ``(-1, b"", <error text>)``. On timeout the process
        is killed and the result is ``(<exit status of the killed process>,
        b"", b"Command timed out after <timeout> seconds")``.
    """
    try:
        if not command:
            raise ValueError("Empty command")
        # SECURITY NOTE(review): shell=True hands the string to the shell
        # unmodified, so pipes, substitutions and ';' all work -- and so does
        # anything else the sender puts in it. The command arrives over the
        # network; only the caller's checks stand between a peer and
        # arbitrary code execution.
        process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        try:
            stdout, stderr = process.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            process.kill()
            # Reap the killed process so its exit status is available and no
            # zombie is left behind; the partial output is discarded.
            # NOTE: only the shell itself is killed; a child it spawned may
            # keep the pipes open until it exits on its own.
            process.communicate()
            message = f"Command timed out after {timeout} seconds"
            return process.returncode, b"", message.encode()
        return process.returncode, stdout, stderr
    except Exception as e:
        return -1, b"", str(e).encode()
def _send_best_effort(client_socket, payload, client_address):
    """Send *payload* (bytes) to the peer; log instead of raising if it fails."""
    try:
        client_socket.sendall(payload)
    except OSError as e:
        logging.warning(f"Could not send response to {client_address}: {e}")


def handle_client(client_socket, client_address, ip_whitelist):
    """Serve one client connection: validate, run the command, reply, close.

    Protocol as implemented here: the client sends a single message of the
    form ``CCOD==><shell command>``; the server replies with the command's
    stdout on success, or a text error message otherwise, then closes the
    connection.

    Args:
        client_socket: Connected socket for this client. It is always closed
            before this function returns.
        client_address: Peer address; element 0 is the peer IP, which is
            checked against the whitelist.
        ip_whitelist: Container of IP strings allowed to issue commands.
    """
    client_ip = client_address[0]
    try:
        logging.info(f"New connection from {client_address}")
        if client_ip not in ip_whitelist:
            msg = f"IP {client_ip} not in allowed IP list. Connection closed."
            # Log before replying so the rejection is recorded even if the
            # peer has already gone away.
            logging.warning(msg)
            client_socket.sendall(msg.encode('utf-8'))
            return
        logging.info(f"Client {client_address} connected")

        # The client gets 10 seconds to deliver its command.
        client_socket.settimeout(10.0)
        # NOTE: a single recv of at most 1024 bytes -- longer commands are
        # truncated. The client does not mark the end of its message, so
        # there is nothing to read up to.
        command_data = client_socket.recv(1024).decode('utf-8')

        if not command_data.startswith("CCOD==>"):
            msg = "Invalid command format. Command must start with 'CCOD==>'."
            logging.warning(f"Invalid command from {client_address}: {command_data}")
            client_socket.sendall(msg.encode('utf-8'))
            return

        ccod_command = command_data.split("==>", 1)[1].strip()
        if not ccod_command:
            msg = "Empty command after 'CCOD==>'."
            client_socket.sendall(msg.encode('utf-8'))
            return

        logging.info(f"Executing command from {client_address}: {ccod_command}")
        returncode, stdout, stderr = execute_command_safely(ccod_command)

        if returncode == 0:
            if stdout:
                client_socket.sendall(stdout)
            else:
                msg = f"Command '{ccod_command}' executed successfully with no output"
                client_socket.sendall(msg.encode('utf-8'))
        else:
            error_msg = f"Command failed (return code {returncode}): {stderr.decode('utf-8', errors='ignore')}"
            logging.error(f"Command failed: {error_msg}")
            client_socket.sendall(error_msg.encode('utf-8'))
    except socket.timeout:
        logging.warning(f"Client {client_address} timed out waiting for command")
        _send_best_effort(
            client_socket,
            b"Connection timed out. No command received within 10 seconds.",
            client_address,
        )
    except Exception as e:
        logging.error(f"Error handling client {client_address}: {str(e)}")
        # The connection may be the very thing that failed, so this reply
        # must not raise a second exception out of the handler.
        _send_best_effort(
            client_socket,
            f"Server error: {str(e)}".encode('utf-8'),
            client_address,
        )
    finally:
        client_socket.close()
        logging.info(f"Closed connection from {client_address}")
def start_server(port, ip_whitelist, max_workers=10):
    """Listen on *port* and dispatch each accepted connection to a worker thread.

    Binds to all IPv4 interfaces, then loops forever accepting connections
    and submitting ``handle_client`` to a thread pool of *max_workers*
    threads. A KeyboardInterrupt ends the loop; any other exception raised
    while accepting or submitting is logged and the loop continues.

    Args:
        port: TCP port to listen on.
        ip_whitelist: Allowed client IPs, passed through to ``handle_client``.
        max_workers: Size of the handler thread pool.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with listener:
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(('0.0.0.0', port))
        # Up to 5 connections may wait in the accept queue.
        listener.listen(5)
        logging.info(f"Server started on port {port}, waiting for connections...")
        logging.info(f"Allowed IPs: {ip_whitelist}")

        pool = ThreadPoolExecutor(max_workers=max_workers)
        with pool:
            serving = True
            while serving:
                try:
                    connection, peer = listener.accept()
                    logging.info(f"Accepted connection from {peer}")
                    pool.submit(handle_client, connection, peer, ip_whitelist)
                except KeyboardInterrupt:
                    logging.info("Server interrupted by user. Shutting down...")
                    serving = False
                except Exception as err:
                    logging.error(f"Server error: {err}")
if __name__ == "__main__":
    # Logging is configured first so that everything below can report errors.
    setup_logging()

    # Command-line interface: listening port, client IP whitelist, pool size.
    cli = argparse.ArgumentParser(description="Secure TCP Command Server")
    cli.add_argument("--port", type=int, default=10100,
                     help="Port to listen on (default: 10100)")
    cli.add_argument("--ips", type=str, default="10.0.16.16,127.0.0.1",
                     help="Comma-separated list of allowed IP addresses")
    cli.add_argument("--workers", type=int, default=20,
                     help="Number of worker threads (default: 20)")
    options = cli.parse_args()

    # "a, b" -> ["a", "b"]: split on commas and trim surrounding whitespace.
    allowed_ips = list(map(str.strip, options.ips.split(",")))

    try:
        start_server(options.port, allowed_ips, options.workers)
    except Exception as err:
        logging.critical(f"Critical server failure: {err}")
#!/usr/local/python3/bin/python3
# -*- coding:utf-8 -*-
import socket
import argparse
import concurrent.futures
import time
import logging
def setup_logging():
    """Send INFO-and-above log records to the console and to a dated log file.

    The file is ``client_<YYYYMMDD>.log`` in the current working directory;
    the date is taken at the moment this function is called.
    """
    log_path = f"client_{time.strftime('%Y%m%d')}.log"
    console = logging.StreamHandler()
    logfile = logging.FileHandler(log_path)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[console, logfile],
    )
def execute_command(ip, port, command, timeout=10):
    """Send *command* to the command server at *ip*:*port* and return its reply.

    The reply is read until the server closes the connection.

    Args:
        ip: Server IP address or hostname.
        port: Server TCP port.
        command: Text to send; it is sent exactly as given, UTF-8 encoded.
        timeout: Seconds allowed for connecting, for each socket operation,
            and for receiving the response as a whole.

    Returns:
        ``"<ip>:\\n<response text>"`` on success, otherwise one of
        ``"<ip>: Connection timeout"``, ``"<ip>: Connection failed"``,
        ``"<ip>: Response timed out"`` or ``"<ip>: Communication error"``.
    """
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.settimeout(timeout)
    try:
        start_time = time.monotonic()
        client_socket.connect((ip, port))
        conn_time = time.monotonic() - start_time
    except socket.timeout:
        # Close here: this path returns before the try/finally below.
        client_socket.close()
        logging.error(f"Connection to {ip}:{port} timed out after {timeout}s")
        return f"{ip}: Connection timeout"
    except Exception as e:
        client_socket.close()
        logging.error(f"Connection to {ip}:{port} failed: {str(e)}")
        return f"{ip}: Connection failed"
    logging.info(f"Connected to {ip}:{port} in {conn_time:.2f}s")

    try:
        client_socket.sendall(command.encode('utf-8'))
        logging.debug(f"Sent command to {ip}: {command}")

        chunks = []
        start_time = time.monotonic()
        try:
            while True:
                chunk = client_socket.recv(4096)
                if not chunk:
                    # Empty read: the server closed the connection, which is
                    # how the end of the response is signalled.
                    break
                chunks.append(chunk)
                # The socket timeout bounds each recv; this bounds the whole
                # response, so a server trickling data cannot hold us forever.
                if time.monotonic() - start_time > timeout:
                    raise socket.timeout("Response timed out")
        except socket.timeout:
            logging.warning(f"Response from {ip} timed out")
            return f"{ip}: Response timed out"

        result = b"".join(chunks).decode('utf-8', errors='ignore')
        logging.info(f"Response from {ip}:\n{result}")
        return f"{ip}:\n{result}"
    except Exception as e:
        logging.error(f"Error communicating with {ip}: {str(e)}")
        return f"{ip}: Communication error"
    finally:
        client_socket.close()
        logging.info(f"Closed connection to {ip}")
def run_commands(ips, port, commands, max_workers=10, timeout=10):
    """Run every command on every server concurrently and collect the replies.

    Args:
        ips: List of server addresses, or one comma-separated string of them.
            Surrounding whitespace is trimmed and empty entries are dropped.
        port: TCP port the command servers listen on.
        commands: One command string or a list of them; each is sent to
            every server.
        max_workers: Maximum number of concurrent connections.
        timeout: Per-connection timeout in seconds, passed to
            ``execute_command``.

    Returns:
        List of result strings as returned by ``execute_command``, one per
        (command, server) pair, in completion order. A task that raised is
        logged and contributes no entry.
    """
    if not isinstance(commands, list):
        commands = [commands]
    if not isinstance(ips, list):
        # Skip empty entries (e.g. from a trailing comma): an empty host
        # would not name any remote server.
        ips = [ip.strip() for ip in ips.split(',') if ip.strip()]

    # Logged after normalisation so the count is servers, not characters.
    logging.info(f"Starting command execution for {len(ips)} servers")
    logging.info(f"Port: {port}, Commands: {commands}")
    logging.info("=" * 80)

    tasks = [(ip, port, command, timeout) for command in commands for ip in ips]

    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(execute_command, *task): task for task in tasks}
        for future in concurrent.futures.as_completed(futures):
            # Futures yielded by as_completed are already finished, so
            # result() cannot block; it can only re-raise the task's error.
            try:
                results.append(future.result())
            except Exception as e:
                logging.error(f"Task for {futures[future][0]} failed: {e}")

    logging.info("\n" + "=" * 80)
    logging.info("Command execution summary:")
    logging.info("-" * 80)
    for result in results:
        logging.info(result)
    logging.info("=" * 80)
    return results


if __name__ == "__main__":
    setup_logging()

    parser = argparse.ArgumentParser(description="Distributed Command Executor")
    parser.add_argument(
        "--ips",
        type=str,
        required=True,
        help="Comma-separated list of IP addresses or hostnames"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=10100,
        help="TCP port for server connection (default: 10100)"
    )
    parser.add_argument(
        "--commands",
        type=str,
        nargs='+',
        required=True,
        help="One or more commands to execute (separate multiple commands with spaces)"
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=10,
        help="Connection and response timeout in seconds (default: 10)"
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=5,
        help="Number of concurrent connections (default: 5)"
    )
    args = parser.parse_args()

    run_commands(
        ips=args.ips,
        port=args.port,
        commands=args.commands,
        max_workers=args.workers,
        # Forward --timeout; without this the option would have no effect.
        timeout=args.timeout,
    )
脚本运行详情
[devops@db1 code]$ ./socket_command_cli.py -h
usage: socket_command_cli.py [-h] --ips IPS [--port PORT] --commands COMMANDS
[COMMANDS ...] [--timeout TIMEOUT]
[--workers WORKERS]
Distributed Command Executor
optional arguments:
-h, --help show this help message and exit
--ips IPS Comma-separated list of IP addresses or hostnames
--port PORT TCP port for server connection (default: 10100)
--commands COMMANDS [COMMANDS ...]
One or more commands to execute (separate multiple
commands with spaces)
--timeout TIMEOUT Connection and response timeout in seconds (default:
10)
--workers WORKERS Number of concurrent connections (default: 5)
[devops@db1 code]$ ./socket_command_server.py -h
usage: socket_command_server.py [-h] [--port PORT] [--ips IPS]
[--workers WORKERS]
Secure TCP Command Server
optional arguments:
-h, --help show this help message and exit
--port PORT Port to listen on (default: 10100)
--ips IPS Comma-separated list of allowed IP addresses
--workers WORKERS Number of worker threads (default: 20)
[devops@db1 x86]$ ./socket_command_cli --ips "10.130.47.197,127.0.0.1" --commands "CCOD==> id=\$( ip r|grep src|awk '{print \$NF}'|awk -F"." '{print \$NF}') ; sed -i s@001@\$id@g /home/devops/1.txt"
2025-08-22 15:53:47,419 - INFO - Starting command execution for 23 servers
2025-08-22 15:53:47,420 - INFO - Port: 10100, Commands: ["CCOD==> id=$( ip r|grep src|awk '{print $NF}'|awk -F. '{print $NF}') ; sed -i s@001@$id@g /home/devops/1.txt"]
2025-08-22 15:53:47,420 - INFO - ================================================================================
2025-08-22 15:53:47,422 - INFO - Connected to 127.0.0.1:10100 in 0.00s
2025-08-22 15:53:47,422 - INFO - Connected to 10.130.47.197:10100 in 0.00s
2025-08-22 15:53:47,435 - INFO - Response from 127.0.0.1:
Command failed (return code 2): sed: can't read /home/devops/1.txt: No such file or directory
2025-08-22 15:53:47,436 - INFO - Closed connection to 127.0.0.1
2025-08-22 15:53:47,437 - INFO - Response from 10.130.47.197:
Command 'id=$( ip r|grep src|awk '{print $NF}'|awk -F. '{print $NF}') ; sed -i s@001@$id@g /home/devops/1.txt' executed successfully with no output
2025-08-22 15:53:47,438 - INFO - Closed connection to 10.130.47.197
2025-08-22 15:53:47,438 - INFO -
================================================================================
2025-08-22 15:53:47,438 - INFO - Command execution summary:
2025-08-22 15:53:47,438 - INFO - --------------------------------------------------------------------------------
2025-08-22 15:53:47,438 - INFO - 127.0.0.1:
Command failed (return code 2): sed: can't read /home/devops/1.txt: No such file or directory
2025-08-22 15:53:47,439 - INFO - 10.130.47.197:
Command 'id=$( ip r|grep src|awk '{print $NF}'|awk -F. '{print $NF}') ; sed -i s@001@$id@g /home/devops/1.txt' executed successfully with no output
[devops@db1 x86]$ ./socket_command_cli --ips "10.130.47.197,127.0.0.1" --commands "CCOD==> ip r"
2025-08-22 15:54:42,703 - INFO - Starting command execution for 23 servers
2025-08-22 15:54:42,704 - INFO - Port: 10100, Commands: ['CCOD==> ip r']
2025-08-22 15:54:42,704 - INFO - ================================================================================
2025-08-22 15:54:42,706 - INFO - Connected to 127.0.0.1:10100 in 0.00s
2025-08-22 15:54:42,706 - INFO - Connected to 10.130.47.197:10100 in 0.00s
2025-08-22 15:54:42,713 - INFO - Response from 10.130.47.197:
default via 10.130.47.1 dev eth0
10.130.47.0/24 dev eth0 proto kernel scope link src 10.130.47.197
169.254.0.0/16 dev eth0 scope link metric 1002
169.254.169.254 via 10.130.47.81 dev eth0 proto static
2025-08-22 15:54:42,714 - INFO - Closed connection to 10.130.47.197
2025-08-22 15:54:42,714 - INFO - Response from 127.0.0.1:
default via 10.130.47.1 dev eth0
10.130.47.0/24 dev eth0 proto kernel scope link src 10.130.47.202
169.254.0.0/16 dev eth0 scope link metric 1002
169.254.169.254 via 10.130.47.81 dev eth0 proto static
2025-08-22 15:54:42,715 - INFO - Closed connection to 127.0.0.1
2025-08-22 15:54:42,715 - INFO -
================================================================================
评论区