python实现批量操作主机(自己可以选择交互式和非交互式两种)
代码开始------------------------------------------------------------
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
from paramiko import SSHClient, AutoAddPolicy
import select
import re
host_list = [
{'hostname': '192.168.174.130', 'port': 22, 'username': 'root', 'password': 'mana.2021'},
] # 这是一个原始的主机信息列表;你也可以自己在代码中添加,也可以遍历excel进行添加;
conn_deon = [] # 第一次验证主机成功的主机列表
conn_fail = [] # 第一次验证主机报错的主机列表
conn_amend = [] # 修改端口后的主机列表;说明:修改端口的列表修改的是“第一次验证失败的列表【conn_fail】”
host_version_filtration = [] # 过滤符合版本的主机;扩展功能
no_host_version_filtration = [] # 过滤不符合版本的主机;扩展功能
def ssh_ex(hostname, port, username, password):
'''
这是一个测试连接是否成功的函数;
会将成功连接的主机传入“conn_deon“列表;
将连接失败的主机登录信息追加到“conn_fail”列表
:param hostname: 测试主机IP
:param port: 测试主机端口
:param username: 测试主机用户名
:param password: 测试主机密码
:return:
'''
ssh_client = SSHClient() # 创建一个连接实例
ssh_client.set_missing_host_key_policy(AutoAddPolicy) # 自动处理SSH弹出的yes/no
try:
ssh_client.connect(hostname, port, username, password) # 进行SSH连接
# 将成功连接的主机追加“conn_deon“列表
conn_deon.append({'hostname': hostname, 'port': port,
'username': username, 'password': password})
print('\033[0;32mhost:\033[0m%s' % hostname)
ssh_client.close() # 关闭连接
except Exception as e:
print('\033[0;31mhost:\033[0m%s' % hostname) # 打印连接失败的主机
# 将连接失败的主机登录信息追加到“conn_fail”列表
conn_fail.append({'hostname': hostname, 'port': port,
'username': username, 'password': password})
print(e) # 打印错误
print('', end='\n\n')
def ssh_host_filtration(hostname, port, username, password,filtration_parameter):
'''
这是一个过滤函数;是一个扩展功能;
:param hostname:主机IP
:param port:登录端口
:param username:用户名
:param password:用户密码
:param filtration_parameter:过滤参数
:return:如果输入的命令正确,return就会进行返回;反之return就会返回空数据;
'''
ssh_client = SSHClient()
ssh_client.set_missing_host_key_policy(AutoAddPolicy)
ssh_client.connect(hostname, port, username, password)
print('\033[0;32mhost:\033[0m%s开始过滤...' % hostname)
stdin, stdout, stderr = ssh_client.exec_command(filtration_parameter)
result_out = stdout.read().decode('utf-8')
result_err = stderr.read().decode('utf-8')
ssh_client.close()
return result_out
def ssh_ex_official(hostname, port, username, password):
'''
这是一个进行批量操作主机的函数;
:param hostname: 测试主机IP
:param port: 测试主机端口
:param username: 测试主机用户名
:param password: 测试主机密码
:return:
'''
ssh_client = SSHClient()
ssh_client.set_missing_host_key_policy(AutoAddPolicy)
ssh_client.connect(hostname, port, username, password)
print('\033[0;32mhost:\033[0m%s' % hostname)
while True:
cmd = yield
if cmd != 'quit_shell':
stdin, stdout, stderr = ssh_client.exec_command(cmd)
result = stdout.read().decode('utf-8')
err = stderr.read().decode('utf-8')
if len(err) != 0:
print(err) # 打印错误
print('\033[0;32m%s\033[0m' % hostname, '^' * 100)
else:
print(result) # 打印服务端主机返回信息
print('\033[0;32m%s\033[0m' % hostname, '^' * 100)
else:
print('主机“%s”已退出连接...' % hostname)
ssh_client.close()
def ssh_interaction_official(hostname, port, username, password):
'''
这是一个交互式的SSH函数;
:param hostname:
:param port:
:param username:
:param password:
:return:
'''
ssh_shell = SSHClient()
ssh_shell.set_missing_host_key_policy(AutoAddPolicy)
ssh_shell.connect(hostname, port, username, password)
chan = ssh_shell.invoke_shell()
inputs = [chan, ]
outputs = []
while True:
while True:
readable, writeable, exceptional = select.select(inputs, outputs, inputs, 1) # 解决打印完整的问题;
# 但是select会进行阻塞;关于select.select()函数超时,只要超时就会向下执行;
if readable:
info = chan.recv(1024).decode('utf-8')
print(info)
continue
else:
print('\033[0;32m%s\033[0m' % hostname, '^' * 100)
break
cmd = yield
if cmd != 'quit_shell':
chan.send(cmd.encode('utf-8') + b'\n')
else:
print('“%s”主机退出连接...' % hostname)
# 最后这段代码“print('“%s”主机退出连接...' % hostname)”非常关键,如果下面继续增加“break”就会出现报错;
# 添加break报错内容是“File "
# 添加“ssh_shell.close()”代码会进入一个死循环,一直打印空白行;
# 所以仅仅只有“print('“%s”主机退出连接...' % hostname)”这段代码就够了;
def bath_handle_host_ex(input_list):
'''
这是一个“非交互式”批量操作主机发送操作命令的函数;
:param input_list: 传入验证用户通过的字典信息
:return:
'''
ssh_conn_list = [] # 将生成的多个批量变量加入这个列表
count_1 = 1
for i in input_list:
print('开始连接第%s个主机...' % count_1)
hostname = i['hostname']
port = i['port']
username = i['username']
password = i['password']
# 将批量生成的遍历加入“ssh_conn_list”列表
ssh_conn_list.append("conn%s = ssh_ex_official('%s', %s, '%s', '%s')"
% (input_list.index(i), hostname, port, username, password))
count_1 += 1
print('所有主机准备就绪...回车继续')
input()
for i in ssh_conn_list:
exec(i) # 执行遍历出的【conn%s = ssh_ex_official('%s', %s, '%s', '%s')】变量
for_number = len(input_list) # 将遍历的次数进行赋值
for i in range(for_number):
exec('next(conn%s)' % i) # exec函数是一个将字符串类型的代码进行执行
while True:
print('', end='\n\n\n')
cmds = input('请输入需要批量处理的命令>>:').strip() # strip去掉开头和结尾的回车和空格
if len(cmds) == 0:
continue
else:
for i in range(for_number):
exec('conn%s.send(cmds)' % i)
if cmds == 'quit_shell':
input('返回上级菜单')
break
def bath_handle_host_interaction(input_list):
'''
这是一个“交互式”批量操作主机发送操作命令的函数;
:param input_list: 传入验证用户通过的字典信息
:return:
'''
ssh_conn_list = [] # 将生成的多个批量变量加入这个列表
count_1 = 1
for i in input_list:
print('开始连接第%s个主机...' % count_1)
hostname = i['hostname']
port = i['port']
username = i['username']
password = i['password']
# 将批量生成的遍历加入“ssh_conn_list”列表
ssh_conn_list.append("conn%s = ssh_interaction_official('%s', %s, '%s', '%s')"
% (input_list.index(i), hostname, port, username, password))
count_1 += 1
print('所有主机准备就绪...回车继续')
input()
for i in ssh_conn_list:
exec(i) # 执行遍历出的【conn%s = ssh_ex_official('%s', %s, '%s', '%s')】变量
for_number = len(ssh_conn_list) # 将遍历的次数进行赋值
for i in range(for_number):
exec('next(conn%s)' % i) # exec函数是一个将字符串类型的代码进行执行
while True:
print('', end='\n\n\n')
cmds = input('请输入需要批量处理的命令>>:').strip() # strip去掉开头和结尾的回车和空格
if len(cmds) == 0:
continue
else:
for i in range(for_number):
exec('conn%s.send(cmds)' % i)
if cmds == 'quit_shell':
input('返回上级菜单')
break
def test_login(print_info, host_list_info):
print(print_info, '-' * 30)
count = 1
for i in host_list_info:
print('开始进行第%s个主机用户验证...' % count)
ssh_ex(hostname=i['hostname'], port=i['port'],
username=i['username'], password=i['password'])
count += 1
print('连接成功主机列表', '-' * 20)
conn_count = 0
for i in conn_deon:
print(i['hostname'])
conn_count += 1
return conn_count
if __name__ == '__main__':
while True:
print('''
---------------------------------------------------------------------------
功能说明:
1.登录过滤主机
2.批量操作主机
3.修改过滤后主机端口
4.查看脚本使用说明
提示:选择输入对应的功能编号!!!
---------------------------------------------------------------------------
''')
func_in = input('输入对应的功能编号:')
if func_in == '1':
while True:
test_port = input('''
1.登录过滤主机-----------------------------------
请输入需要验证登录的主机列表
提示只可以输入:
1.验证原始主机是否可以登录
2.验证修改端口后的主机是否可以登录
"quit"返回上一级
>>:
''')
if test_port == '1':
del conn_deon[:]
number = test_login(print_info='验证所有原始主机信息列表', host_list_info=host_list)
input('''
验证完成...
\033[0;32m成功连接%s个主机\033[0m
回车继续
''' % number)
break
elif test_port == '2':
count = 0
for i in conn_deon:
count += 1
number = test_login(print_info='验证修改端口后的主机列表', host_list_info=conn_amend)
new_number = count - number
input('''
验证完成...
\033[0;32m修改SSH连接端口后成功连接%s个主机\033[0m
回车继续
''' % new_number)
break
elif test_port == 'quit':
break
else:
continue
elif func_in == '2':
while True:
handle_list = input('''
2.批量操作主机-----------------------------------
请输入需要批量操作的主机列表
提示只可以输入:
1.开始非交互批量操作主机
2.开始交互式批量操作主机
3.过滤符合条件的主机加入新列表进行批量操作;注意这是一个扩展功能
"quit"返回上一级
>>:
''')
if handle_list == '1':
input('''
\033[0;31m提示:输入“quit_shell”就会退出批量操作主机\033[0m
回车继续
''')
bath_handle_host_ex(conn_deon)
elif handle_list == '2':
input('''
\033[0;31m提示:输入“quit_shell”就会退出批量操作主机\033[0m
回车继续
''')
bath_handle_host_interaction(conn_deon)
elif handle_list == '3':
for i in conn_deon:
hostname = i['hostname']
port = i['port']
username = i['username']
password = i['password']
re_parameter = ssh_host_filtration(hostname=hostname, port=port, username=username,
password=password,
filtration_parameter='cat /etc/.kyinfo |grep milestone')
if re_parameter:
re_version = '\d\.\d'
res = re.search(re_version, re_parameter, re.I).group()
if res == '3.3':
host_version_filtration.append(i)
else:
no_host_version_filtration.append({'hostname':hostname, '系统版本':res})
else:
no_host_version_filtration.append({'hostname':hostname, '系统版本':'非麒麟系统'})
if len(host_version_filtration) == 0:
print('', end='\n\n\n')
print('没有符合过滤条件的主机...')
print('\033[0;31m不符合主机过滤规则的主机和版本如下', '-' * 30)
for i in no_host_version_filtration:
for k in i:
print('\033[0;31m%s\033[0m' % i[k], end=' ')
print('')
input('回车继续')
del no_host_version_filtration[:]
else:
bath_handle_host_interaction(host_version_filtration)
print('', end='\n\n\n')
print('\033[0;31m不符合主机过滤规则的主机和版本\033[0m', '-' * 30)
for i in no_host_version_filtration:
for k in i:
print('\033[0;31m%s\033[0m' % i[k], end=' ')
print('')
input('回车继续')
del no_host_version_filtration[:]
del host_version_filtration[:]
elif handle_list == 'quit':
break
else:
print('输入的列表名存在错误!请重新输入!')
continue
elif func_in == '3':
print('3.修改过滤主机端口-----------------------------------')
func_in_3 = input('''
是否开始修改连接失败的端口号?
yes/no
>>:
''')
if func_in_3 == 'yes':
port_new = int(input('请输入新的端口号:'))
for i in conn_fail:
i['port'] = port_new
conn_amend.append(i)
print('\033[0;32m-\033[0m' * 90)
count = 0
for i in conn_amend:
print(i)
count += 1
input('''
已完成主机列表的端口\033[0;32m%s\033[0m修改;
共计:完成\033[0;32m%s\033[0m个
回车继续
''' % (port_new, count))
else:
continue
elif func_in == '4':
print('4.查看脚本使用说明-----------------------------------')
input('''
本脚本使用步骤:
1.运行‘1.1’过滤主机
2.运行‘3’修改过滤后的主机端口
3.运行‘1.2’继续过滤主机
4.运行‘2.1’开始批量操作主机
回车返回上级菜单...
''')
else:
exit(0)
代码结束------------------------------------------------------------
使用方法:
1.每使用一次脚本就需要执行1.1过滤一次主机
2.一般批量操作主机使用2.2交互式操作就可以了
3.代码缺点:它不是多线程的,for循环比较慢-需要一个一个为主机下发命令;
另外一个缺点就是,主机账号信息需要手动输入host_list列表中;
host_list中的信息不是使用hash加密的,存在安全隐患;
你可以自己写一个登录加密的列表进行给它加密;