import argparse
import itertools
import json
import string
import threading
import time
from decimal import Decimal

from pysnmp.entity.rfc3413.oneliner import cmdgen
class SnmpDetect:
    """Guess the SNMP community name accepted by one host.

    Names from a JSON dictionary file are tried first.  If none of them gets
    an answer, every alphanumeric string of 1..``max_key`` characters is tried
    in turn (brute force).
    """

    # Presumably a reference page for OID values; not used by the code.
    OID_url = 'https://blog.csdn.net/qq_28657577/article/details/82834442'

    def __init__(self, host, port, key_file='snmp.json', timeout=0):
        """Store the target and load the community-name dictionary.

        :param host: target address (e.g. 192.168.100.115)
        :param port: target UDP port
        :param key_file: path of a JSON file holding the community names that
            are tried before brute force starts
        :param timeout: seconds to sleep after every probe
        :raises OSError: if ``key_file`` cannot be opened
        :raises ValueError: if ``key_file`` does not contain valid JSON
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        # OID requested by every probe.
        self.OID = '.1.3.6.1.2.1.1.1.0'
        # Community name that got an answer, or None while nothing matched.
        self.new_key = None
        # Values returned by the probe that succeeded.
        self.bind_list = []
        self.key_file = key_file
        # Common community names loaded from ``key_file``.
        self.key_list = self._get_key_list()
        # Longest community name generated by the brute-force search.
        self.max_key = 10
        # Upper bound on probes that run at the same time.
        self.max_threads_number = 1000
        self.max_threads = threading.Semaphore(self.max_threads_number)

    def _get_key_list(self):
        """Return the decoded JSON content of ``self.key_file``."""
        with open(self.key_file, 'r', encoding='utf-8') as key_fh:
            return json.load(key_fh)

    def __get_key(self, charsetString, length, key_list=None):
        """Return every string made by appending ``length`` characters.

        :param charsetString: characters to draw from
        :param length: number of characters to append
        :param key_list: prefixes to extend; when empty or ``None`` the
            result is simply every string of exactly ``length`` characters
        :return: list of strings, ordered prefix first and then by the
            position of each appended character in ``charsetString``
        """
        if length <= 0:
            return [] if key_list is None else key_list
        prefixes = key_list if key_list else ['']
        return [
            prefix + ''.join(suffix)
            for prefix in prefixes
            for suffix in itertools.product(charsetString, repeat=length)
        ]

    def _iter_keys(self):
        """Yield brute-force candidates lazily, shortest names first."""
        charset = string.digits + string.ascii_letters
        for size in range(1, self.max_key + 1):
            for combo in itertools.product(charset, repeat=size):
                yield ''.join(combo)

    def get_key(self):
        """Return all brute-force candidates as a list.

        The list holds every digit/letter string of 1..``max_key``
        characters, so it only fits in memory for small ``max_key`` values;
        ``blow_connect`` consumes ``_iter_keys`` instead.
        """
        return list(self._iter_keys())

    def __connect(self, key):
        """Send one SNMP GET with community ``key``.

        On success ``self.bind_list`` and ``self.new_key`` are updated.

        :return: True if the agent answered with at least one value
        """
        crack = False
        try:
            error_indication, _error_status, _error_index, var_binds = (
                cmdgen.CommandGenerator().getCmd(
                    # Community data: third argument 0 selects SNMP v1,
                    # 1 would select v2c.
                    cmdgen.CommunityData('my-agent', key, 0),
                    # Transport: UDP to the target (161 is the standard port).
                    cmdgen.UdpTransportTarget((self.host, self.port)),
                    self.OID,
                )
            )
            if not error_indication and var_binds:
                values = [str(bind).split(' = ')[-1] for bind in var_binds]
                # Publish the values before the key so that a reader that
                # sees ``new_key`` set also sees the matching values.
                self.bind_list = values
                self.new_key = key
                crack = True
        except Exception:
            # Best effort: any failure of a probe just means "no match".
            # KeyboardInterrupt / SystemExit are deliberately not caught.
            crack = False
        time.sleep(self.timeout)
        return crack

    def connect(self, key, thread):
        """Probe ``key``; with ``thread is True`` honour the probe limit.

        :param key: community name to try
        :param thread: pass True when called from a worker thread so the
            call is throttled by ``self.max_threads``
        :return: True if the probe succeeded
        """
        if thread is True:
            with self.max_threads:
                return self.__connect(key)
        return self.__connect(key)

    def blow_connect(self):
        """Brute-force the community name.

        Candidates are generated lazily and probed in batches of at most
        ``max_threads_number`` threads; every batch is joined before the next
        one starts, so memory use stays bounded and no worker is left running
        when the method returns.

        :return: ``(new_key, bind_list)``; ``new_key`` is None if every
            candidate failed
        """
        candidates = self._iter_keys()
        batch_size = max(1, self.max_threads_number)
        while not self.new_key:
            batch = list(itertools.islice(candidates, batch_size))
            if not batch:
                break
            workers = [
                threading.Thread(target=self.connect, args=(key, True))
                for key in batch
            ]
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()
        return self.new_key, self.bind_list

    def normal_connect(self):
        """Try the dictionary names, then fall back to brute force.

        :return: ``(key, bind_list)`` for the first name that works, or the
            result of ``blow_connect`` when the dictionary is exhausted
        """
        for key in self.key_list:
            print(key)
            if self.connect(key, False):
                return key, self.bind_list
        return self.blow_connect()
def opt():
    """Parse the command line and return the resulting namespace.

    Options: ``-host`` (required), ``-port`` (int, default 161), ``-rule``
    (community-name dictionary file, default ``snmp.json``) and ``-timeout``
    (int pause between probes, default 0).
    """
    cli = argparse.ArgumentParser(description='命令行中传入命令')
    option_table = (
        ('-host', {'required': True, 'help': ''}),
        ('-port', {'type': int, 'help': '', 'default': 161}),
        ('-rule', {'help': '<社区名字典文件>', 'default': 'snmp.json'}),
        ('-timeout', {'type': int, 'help': '<间隔时间>', 'default': 0}),
    )
    for flag_name, settings in option_table:
        cli.add_argument(flag_name, **settings)
    return cli.parse_args()
def get_host(host):
    """Expand a host specification into a list of addresses.

    Three forms are understood, all of them varying only the last octet:

    * ``a.b.c.x-a.b.c.y`` (or ``a.b.c.x-y``): every address from ``x`` to
      ``y`` inclusive, using the first three octets of the left-hand side.
    * ``a.b.c.P*``: every address whose last octet (0..255) starts with the
      digits ``P``; a bare ``*`` yields 1..255.
    * anything else: returned unchanged as the only element.

    A value that contains ``-`` but whose range bounds are not integers
    (e.g. a hyphenated host name) is treated as a single host.

    :param host: host specification string
    :return: list of address strings
    """
    if '-' in host:
        pieces = host.split('-')
        left_octets = pieces[0].split('.')
        try:
            first = int(left_octets[-1])
            last = int(pieces[1].split('.')[-1])
        except ValueError:
            # Not a numeric range, so keep the value as one literal host.
            return [host]
        network = left_octets[:3]
        return ['.'.join(network + [str(octet)]) for octet in range(first, last + 1)]

    if '*' in host:
        octets = host.split('.')
        stem = octets[-1].split('*')[0]
        # A bare '*' skips .0, everything else is matched against 0..255 so
        # no octet above 255 is ever produced.
        lowest = 0 if stem else 1
        return [
            '.'.join(octets[:3] + [str(octet)])
            for octet in range(lowest, 256)
            if str(octet).startswith(stem)
        ]

    return [host]
def get_model():
    """Return the mapping of device category to known model names.

    :return: dict with the keys ``'router'`` and ``'linux'``, each mapping to
        a list of model-name strings
    """
    model = {
        'router': ['RB750Gr3', 'RB941-2nD', 'CCR1009-8G-1S'],
        'linux': ['R6100'],
    }
    # The mapping used to be built and then dropped, so callers got None.
    return model
def main(flag=False, **kwargs):
    """Scan every host of a specification and report the community names.

    :param flag: when true, read the settings from ``kwargs`` instead of the
        command line
    :param kwargs: ``host`` (specification understood by ``get_host``),
        ``port`` (default 161), ``rule`` (dictionary file, default
        ``snmp.json``) and ``timeout`` (pause between probes, default 0);
        only used when ``flag`` is true
    :return: dict with ``'result'`` (one dict per host holding ``host``,
        ``port``, ``key`` and ``bind_list``) and ``'hold_time'`` (elapsed
        seconds with two decimals, suffixed with ``s``)
    """
    # Monotonic clock: the elapsed time is not disturbed by clock changes.
    started = time.monotonic()
    if flag:
        host_list = get_host(kwargs.get('host'))
        # Missing or empty settings fall back to the command-line defaults.
        port = kwargs.get('port') or 161
        rule = kwargs.get('rule') or 'snmp.json'
        timeout = kwargs.get('timeout') or 0
    else:
        args = opt()
        host_list = get_host(args.host)
        port = args.port
        rule = args.rule
        timeout = args.timeout

    text = []
    for host in host_list:
        key, bind_list = SnmpDetect(host, port, key_file=rule, timeout=timeout).normal_connect()
        text.append({
            'host': host,
            'port': port,
            'key': key,
            'bind_list': bind_list,
        })

    elapsed = Decimal(time.monotonic() - started).quantize(Decimal("0.00"))
    print('结果为:\n{},\n用时:{}s'.format(text, elapsed))
    return {'result': text, 'hold_time': '{}s'.format(elapsed)}
# Script entry point: run a scan configured from the command-line arguments.
if __name__ == '__main__':
    main()
# ip = '192.168.79.136'
# ip = '220.132.209.236'
# ip = '102.218.195.226'
# ip = '14.141.209.187'
# host = '67.240.210.178'
# ip = '206.125.43.40'
# key = 'public'
# key = 'private'
# key = 'all'
# key = 'location'
# key = 'contact'
# port = 161
# crack = snmp_connect(ip, key, port)
# print(crack)
# keys, bind_list = SnmpDetect(host).normal_connect()
# print(keys)
# print(bind_list)