python实现批量操作主机(自己可以选择交互式和非交互式两种)


代码开始------------------------------------------------------------

#!/usr/bin/env python

# _*_ coding:utf-8 _*_

from paramiko import SSHClient, AutoAddPolicy

import select

import re

host_list = [

    {'hostname': '192.168.174.130', 'port': 22, 'username': 'root', 'password': 'mana.2021'},

]  # 这是一个原始的主机信息列表;你也可以自己在代码中添加,也可以遍历excel进行添加;

conn_deon = []  # 第一次验证主机成功的主机列表

conn_fail = []  # 第一次验证主机报错的主机列表

conn_amend = []  # 修改端口后的主机列表;说明:修改端口的列表修改的是“第一次验证失败的列表【conn_fail】”

host_version_filtration = []  # 过滤符合版本的主机;扩展功能

no_host_version_filtration = []  # 过滤不符合版本的主机;扩展功能

def ssh_ex(hostname, port, username, password):

    '''

    这是一个测试连接是否成功的函数;

    会将成功连接的主机传入“conn_deon“列表;

    将连接失败的主机登录信息追加到“conn_fail”列表

    :param hostname: 测试主机IP

    :param port: 测试主机端口

    :param username: 测试主机用户名

    :param password: 测试主机密码

    :return:

    '''

    ssh_client = SSHClient()  # 创建一个连接实例

    ssh_client.set_missing_host_key_policy(AutoAddPolicy)  # 自动处理SSH弹出的yes/no

    try:

        ssh_client.connect(hostname, port, username, password)  # 进行SSH连接

        # 将成功连接的主机追加“conn_deon“列表

        conn_deon.append({'hostname': hostname, 'port': port,

                          'username': username, 'password': password})

        print('\033[0;32mhost:\033[0m%s' % hostname)

        ssh_client.close()  # 关闭连接

    except Exception as e:

        print('\033[0;31mhost:\033[0m%s' % hostname)  # 打印连接失败的主机

        # 将连接失败的主机登录信息追加到“conn_fail”列表

        conn_fail.append({'hostname': hostname, 'port': port,

                          'username': username, 'password': password})

        print(e)  # 打印错误

        print('', end='\n\n')

def ssh_host_filtration(hostname, port, username, password,filtration_parameter):

    '''

    这是一个过滤函数;是一个扩展功能;

    :param hostname:主机IP

    :param port:登录端口

    :param username:用户名

    :param password:用户密码

    :param filtration_parameter:过滤参数

    :return:如果输入的命令正确,return就会进行返回;反之return就会返回空数据;

    '''

    ssh_client = SSHClient()

    ssh_client.set_missing_host_key_policy(AutoAddPolicy)

    ssh_client.connect(hostname, port, username, password)

    print('\033[0;32mhost:\033[0m%s开始过滤...' % hostname)

    stdin, stdout, stderr = ssh_client.exec_command(filtration_parameter)

    result_out = stdout.read().decode('utf-8')

    result_err = stderr.read().decode('utf-8')

    ssh_client.close()

    return result_out

def ssh_ex_official(hostname, port, username, password):

    '''

    这是一个进行批量操作主机的函数;

    :param hostname: 测试主机IP

    :param port: 测试主机端口

    :param username: 测试主机用户名

    :param password: 测试主机密码

    :return:

    '''

    ssh_client = SSHClient()

    ssh_client.set_missing_host_key_policy(AutoAddPolicy)

    ssh_client.connect(hostname, port, username, password)

    print('\033[0;32mhost:\033[0m%s' % hostname)

    while True:

        cmd = yield

        if cmd != 'quit_shell':

            stdin, stdout, stderr = ssh_client.exec_command(cmd)

            result = stdout.read().decode('utf-8')

            err = stderr.read().decode('utf-8')

            if len(err) != 0:

                print(err)  # 打印错误

                print('\033[0;32m%s\033[0m' % hostname, '^' * 100)

            else:

                print(result)  # 打印服务端主机返回信息

                print('\033[0;32m%s\033[0m' % hostname, '^' * 100)

        else:

            print('主机“%s”已退出连接...' % hostname)

            ssh_client.close()

def ssh_interaction_official(hostname, port, username, password):

    '''

    这是一个交互式的SSH函数;

    :param hostname:

    :param port:

    :param username:

    :param password:

    :return:

    '''

    ssh_shell = SSHClient()

    ssh_shell.set_missing_host_key_policy(AutoAddPolicy)

    ssh_shell.connect(hostname, port, username, password)

    chan = ssh_shell.invoke_shell()

    inputs = [chan, ]

    outputs = []

    while True:

        while True:

            readable, writeable, exceptional = select.select(inputs, outputs, inputs, 1)  # 解决打印完整的问题;

            # 但是select会进行阻塞;关于select.select()函数超时,只要超时就会向下执行;

            if readable:

                info = chan.recv(1024).decode('utf-8')

                print(info)

                continue

            else:

                print('\033[0;32m%s\033[0m' % hostname, '^' * 100)

                break

        cmd = yield

        if cmd != 'quit_shell':

            chan.send(cmd.encode('utf-8') + b'\n')

        else:

            print('“%s”主机退出连接...' % hostname)

            # 最后这段代码“print('“%s”主机退出连接...' % hostname)”非常关键,如果下面继续增加“break”就会出现报错;

            # 添加break报错内容是“File "", line 1, in StopIteration";

            # 添加“ssh_shell.close()”代码会进入一个死循环,一直打印空白行;

            # 所以仅仅只有“print('“%s”主机退出连接...' % hostname)”这段代码就够了;

def bath_handle_host_ex(input_list):

    '''

    这是一个“非交互式”批量操作主机发送操作命令的函数;

    :param input_list: 传入验证用户通过的字典信息

    :return:

    '''

    ssh_conn_list = []  # 将生成的多个批量变量加入这个列表

    count_1 = 1

    for i in input_list:

        print('开始连接第%s个主机...' % count_1)

        hostname = i['hostname']

        port = i['port']

        username = i['username']

        password = i['password']

        # 将批量生成的遍历加入“ssh_conn_list”列表

        ssh_conn_list.append("conn%s = ssh_ex_official('%s', %s, '%s', '%s')"

                             % (input_list.index(i), hostname, port, username, password))

        count_1 += 1

    print('所有主机准备就绪...回车继续')

    input()

    for i in ssh_conn_list:

        exec(i)  # 执行遍历出的【conn%s = ssh_ex_official('%s', %s, '%s', '%s')】变量

    for_number = len(input_list)  # 将遍历的次数进行赋值

    for i in range(for_number):

        exec('next(conn%s)' % i)  # exec函数是一个将字符串类型的代码进行执行

    while True:

        print('', end='\n\n\n')

        cmds = input('请输入需要批量处理的命令>>:').strip()  # strip去掉开头和结尾的回车和空格

        if len(cmds) == 0:

            continue

        else:

            for i in range(for_number):

                exec('conn%s.send(cmds)' % i)

            if cmds == 'quit_shell':

                input('返回上级菜单')

                break

def bath_handle_host_interaction(input_list):

    '''

    这是一个“交互式”批量操作主机发送操作命令的函数;

    :param input_list: 传入验证用户通过的字典信息

    :return:

    '''

    ssh_conn_list = []  # 将生成的多个批量变量加入这个列表

    count_1 = 1

    for i in input_list:

        print('开始连接第%s个主机...' % count_1)

        hostname = i['hostname']

        port = i['port']

        username = i['username']

        password = i['password']

        # 将批量生成的遍历加入“ssh_conn_list”列表

        ssh_conn_list.append("conn%s = ssh_interaction_official('%s', %s, '%s', '%s')"

                             % (input_list.index(i), hostname, port, username, password))

        count_1 += 1

    print('所有主机准备就绪...回车继续')

    input()

    for i in ssh_conn_list:

        exec(i)  # 执行遍历出的【conn%s = ssh_ex_official('%s', %s, '%s', '%s')】变量

    for_number = len(ssh_conn_list)  # 将遍历的次数进行赋值

    for i in range(for_number):

        exec('next(conn%s)' % i)  # exec函数是一个将字符串类型的代码进行执行

    while True:

        print('', end='\n\n\n')

        cmds = input('请输入需要批量处理的命令>>:').strip()  # strip去掉开头和结尾的回车和空格

        if len(cmds) == 0:

            continue

        else:

            for i in range(for_number):

                exec('conn%s.send(cmds)' % i)

            if cmds == 'quit_shell':

                input('返回上级菜单')

                break

def test_login(print_info, host_list_info):

    print(print_info, '-' * 30)

    count = 1

    for i in host_list_info:

        print('开始进行第%s个主机用户验证...' % count)

        ssh_ex(hostname=i['hostname'], port=i['port'],

               username=i['username'], password=i['password'])

        count += 1

    print('连接成功主机列表', '-' * 20)

    conn_count = 0

    for i in conn_deon:

        print(i['hostname'])

        conn_count += 1

    return conn_count

if __name__ == '__main__':

    while True:

        print('''

---------------------------------------------------------------------------

功能说明:

    1.登录过滤主机

    2.批量操作主机

    3.修改过滤后主机端口

    4.查看脚本使用说明

    提示:选择输入对应的功能编号!!!

---------------------------------------------------------------------------

        ''')

        func_in = input('输入对应的功能编号:')

        if func_in == '1':

            while True:

                test_port = input('''

1.登录过滤主机-----------------------------------

请输入需要验证登录的主机列表

提示只可以输入:

1.验证原始主机是否可以登录

2.验证修改端口后的主机是否可以登录

"quit"返回上一级

>>:

                ''')

                if test_port == '1':

                    del conn_deon[:]

                    number = test_login(print_info='验证所有原始主机信息列表', host_list_info=host_list)

                    input('''

验证完成...

\033[0;32m成功连接%s个主机\033[0m

回车继续

                        ''' % number)

                    break

                elif test_port == '2':

                    count = 0

                    for i in conn_deon:

                        count += 1

                    number = test_login(print_info='验证修改端口后的主机列表', host_list_info=conn_amend)

                    new_number = count - number

                    input('''

验证完成...

\033[0;32m修改SSH连接端口后成功连接%s个主机\033[0m

回车继续

                        ''' % new_number)

                    break

                elif test_port == 'quit':

                    break

                else:

                    continue

        elif func_in == '2':

            while True:

                handle_list = input('''

2.批量操作主机-----------------------------------

请输入需要批量操作的主机列表

提示只可以输入:

1.开始非交互批量操作主机

2.开始交互式批量操作主机

3.过滤符合条件的主机加入新列表进行批量操作;注意这是一个扩展功能

"quit"返回上一级

>>:

                ''')

                if handle_list == '1':

                    input('''

\033[0;31m提示:输入“quit_shell”就会退出批量操作主机\033[0m

回车继续

                    ''')

                    bath_handle_host_ex(conn_deon)

                elif handle_list == '2':

                    input('''

\033[0;31m提示:输入“quit_shell”就会退出批量操作主机\033[0m

回车继续

                    ''')

                    bath_handle_host_interaction(conn_deon)

                elif handle_list == '3':

                    for i in conn_deon:

                        hostname = i['hostname']

                        port = i['port']

                        username = i['username']

                        password = i['password']

                        re_parameter = ssh_host_filtration(hostname=hostname, port=port, username=username,

                                                 password=password,

                                                 filtration_parameter='cat /etc/.kyinfo |grep milestone')

                        if re_parameter:

                            re_version = '\d\.\d'

                            res = re.search(re_version, re_parameter, re.I).group()

                            if res == '3.3':

                                host_version_filtration.append(i)

                            else:

                                no_host_version_filtration.append({'hostname':hostname, '系统版本':res})

                        else:

                            no_host_version_filtration.append({'hostname':hostname, '系统版本':'非麒麟系统'})

                    if len(host_version_filtration) == 0:

                        print('', end='\n\n\n')

                        print('没有符合过滤条件的主机...')

                        print('\033[0;31m不符合主机过滤规则的主机和版本如下', '-' * 30)

                        for i in no_host_version_filtration:

                            for k in i:

                                print('\033[0;31m%s\033[0m' % i[k], end=' ')

                            print('')

                        input('回车继续')

                        del no_host_version_filtration[:]

                    else:

                        bath_handle_host_interaction(host_version_filtration)

                        print('', end='\n\n\n')

                        print('\033[0;31m不符合主机过滤规则的主机和版本\033[0m', '-' * 30)

                        for i in no_host_version_filtration:

                            for k in i:

                                print('\033[0;31m%s\033[0m' % i[k], end=' ')

                            print('')

                        input('回车继续')

                        del no_host_version_filtration[:]

                        del host_version_filtration[:]

                elif handle_list == 'quit':

                    break

                else:

                    print('输入的列表名存在错误!请重新输入!')

                    continue

        elif func_in == '3':

            print('3.修改过滤主机端口-----------------------------------')

            func_in_3 = input('''

是否开始修改连接失败的端口号?

yes/no

>>:

            ''')

            if func_in_3 == 'yes':

                port_new = int(input('请输入新的端口号:'))

                for i in conn_fail:

                    i['port'] = port_new

                    conn_amend.append(i)

                print('\033[0;32m-\033[0m' * 90)

                count = 0

                for i in conn_amend:

                    print(i)

                    count += 1

                input('''

已完成主机列表的端口\033[0;32m%s\033[0m修改;

共计:完成\033[0;32m%s\033[0m个

回车继续

                ''' % (port_new, count))

            else:

                continue

        elif func_in == '4':

            print('4.查看脚本使用说明-----------------------------------')

            input('''

本脚本使用步骤:

        1.运行‘1.1’过滤主机

        2.运行‘3’修改过滤后的主机端口

        3.运行‘1.2’继续过滤主机

        4.运行‘2.1’开始批量操作主机

        回车返回上级菜单...

            ''')

        else:

            exit(0)

代码结束------------------------------------------------------------

使用方法:

1.每使用一次脚本就需要执行1.1过滤一次主机

2.一般批量操作主机使用2.2交互式操作就可以了

3.代码缺点:它不是多线程的,for循环比较慢-需要一个一个为主机下发命令;

      另外一个缺点就是,主机账号信息需要手动输入host_list列表中;

      host_list中的信息不是使用hash加密的,存在安全隐患;

      你可以自己写一个登录加密的列表进行给它加密;