Python 原始套接字和流量嗅探
Windows 上的包嗅探
#!/usr/bin/python
"""Capture and print a single raw packet seen on the `host` interface."""
import socket
import os

# Address of the local interface to listen on
host = "10.10.10.160"

# Choose the raw-socket protocol: IPPROTO_IP on Windows, IPPROTO_ICMP elsewhere
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

# Create the raw socket and bind it to the public interface
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
try:
    sniffer.bind((host, 0))

    # Include the IP header in captured packets
    sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

    # On Windows an IOCTL is required to switch promiscuous mode on
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

    try:
        # Read one packet; recvfrom returns a (data, address) tuple
        print(sniffer.recvfrom(65565))
    finally:
        # On Windows switch promiscuous mode back off, even if the read failed
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
finally:
    # Always release the raw socket
    sniffer.close()
使用scapy DNS 嗅探
# -*- coding: UTF-8 -*-
from scapy.all import *

# Enable promiscuous mode through the `conf` object exported by scapy.all
conf.sniff_promisc = True


def packetHandler(pkt):
    """Print a one-line summary of *pkt*, then dump its UDP layer if it has one.

    Used as the ``prn`` callback of ``sniff``; returns None.
    """
    print(pkt.summary())
    udp = pkt.getlayer(UDP)
    if udp is not None:
        # show() writes the layer dump itself; wrapping it in print() would
        # additionally print its None return value.
        udp.show()


if __name__ == '__main__':
    dev = "en0"
    # Capture filter limiting the capture to UDP port 53
    bpf_filter = "udp port 53"
    sniff(filter=bpf_filter, prn=packetHandler, iface=dev)
使用scapy 嗅探周边无线AP信息
#coding=utf-8
"""List nearby wireless access points by sniffing beacon / probe-response frames."""
import os
import sys
import subprocess
from scapy.all import *

RSN = 48    # Dot11Elt (management-frame information element) ID 48 carries RSN info
WPA = 221   # Management-frame information element ID 221 carries WPA info
# Cipher type codes (6th byte of the RSN info)
Dot11i = {0:'GroupCipher', 1:'WEP-40', 2:'TKIP', 4:'CCMP', 5:'WEP-104' }
# Authentication type codes (22nd byte of the RSN info)
WPA_Auth = {1:'802.11x/PMK', 2:'PSK' }
DN = open(os.devnull,'w')


def _command_lines(argv):
    """Run *argv* and return its stdout as a list of text lines (stderr discarded)."""
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=DN,
                            universal_newlines=True)
    return proc.communicate()[0].split('\n')


def _as_text(value):
    """Return *value* as str, decoding it when it is a bytes object."""
    if isinstance(value, str):
        return value
    return value.decode('utf-8', 'replace')


def get_wlan_interfaces():
    '''
    Return the wireless interfaces reported by ``iwconfig``.

    The result is a dict with the keys 'monitor', 'managed' and 'all'.
    An interface whose header line contains 'Mode:Monitor' is listed under
    'monitor'; every interface whose header line contains 'IEEE 802.11' is
    listed under 'managed'.  Exits the program when 'managed' is empty.
    '''
    interfaces = {'monitor':[],'managed':[],'all':[]}
    for line in _command_lines(['iwconfig']):
        # Only non-indented lines start a new interface entry
        if not line or line[0] == ' ':
            continue
        iface = line.split(' ')[0]
        if 'Mode:Monitor' in line:
            interfaces['monitor'].append(iface)
        if 'IEEE 802.11' in line:
            interfaces['managed'].append(iface)
        interfaces['all'].append(iface)
    if not interfaces['managed']:
        sys.exit('[!]没有无线网卡,请插入网卡')
    return interfaces


interfaces = get_wlan_interfaces()  # wireless interfaces present at start-up


def get_strongest_inface():
    '''
    Return the interface that sees the most APs in an ``iwlist <dev> scan``.

    The AP count is used as a stand-in for which card has the strongest radio.
    '''
    iface_APs = []
    for iface in interfaces['managed']:
        if not iface:
            continue
        lines = _command_lines(['iwlist', iface, 'scan'])
        count = sum(1 for line in lines if '- Address:' in line)
        iface_APs.append((count, iface))
    return max(iface_APs)[1]


def start_monitor_mode():
    '''
    Put a wireless card into monitor mode with airmon-ng.

    Returns the name (a string) of the monitor interface.  An interface that
    is already in monitor mode is reused.  Exits the program when no monitor
    interface can be obtained.
    '''
    if interfaces['monitor']:
        print('[*]监听网卡为:%s' % interfaces['monitor'][0])
        return interfaces['monitor'][0]
    interface = get_strongest_inface()
    print('[*]网卡%s开启监听模式...' % interface)
    try:
        # Argument list instead of a shell string: the interface name is never
        # interpreted by a shell.
        subprocess.call(['/usr/sbin/airmon-ng', 'start', interface])
        moni_inface = get_wlan_interfaces()['monitor']
        print('[*]监听网卡为:%s' % moni_inface[0])
        # Return the name itself so both return paths yield a string
        return moni_inface[0]
    except (OSError, IndexError):
        sys.exit('[!]无法开启监听模式')


def get_AP_info(pkt):
    '''
    Extract SSID, BSSID, channel and security details from a Dot11 packet.

    Returns a one-entry dict ``{bssid: [ssid, channel]}``; when RSN details
    are found the list is ``[ssid, channel, auth, cipher]``.
    '''
    # The first information element holds the network name; addr2 is the
    # transmitter MAC address.
    ssid = _as_text(pkt[Dot11][Dot11Elt].info)
    bssid = pkt[Dot11].addr2
    try:
        # The third layer counted from the first information element is
        # expected to hold the channel as a single byte.
        chanle = str(ord(pkt[Dot11][Dot11Elt][:3].info))
    except (TypeError, AttributeError, IndexError):
        # Element missing or not a single byte: channel unknown
        chanle = '?'
    AP_infos = [ssid, chanle]
    wpa_info, cipher_info = get_Dot11_RSN(pkt)
    if wpa_info and cipher_info:
        AP_infos = AP_infos + [wpa_info, cipher_info]
    return {bssid: AP_infos}


APs_info = {}


def get_APs_info(pkt):
    '''Record the AP described by *pkt* (beacon / probe response) and return all known APs.'''
    global APs_info
    if pkt.haslayer(Dot11) and (pkt.haslayer(Dot11Beacon) or pkt.haslayer(Dot11ProbeResp)):
        for bssid, details in get_AP_info(pkt).items():
            # Keep the first record seen for each BSSID
            APs_info.setdefault(bssid, details)
    return APs_info


already_shows = []


def show_APs_info(pkt):
    '''Sniff callback: print every AP that has not been printed before.'''
    global already_shows
    for (key, value) in get_APs_info(pkt).items():
        if key in already_shows:
            continue
        already_shows.append(key)
        print('-' * 40)
        # key is the BSSID (MAC address); value[0] is the SSID (network name)
        print(' [+]AP的BSSID:%s' % key)
        print(' [+]AP的SSID:%s' % value[0])
        print(' [+]AP当前的chanle:%s' % value[1])
        if len(value) == 4:
            print(' [+]AP的认证方式为:%s' % value[2])
            print(' [+]AP的加密算法为:%s' % value[3])
        else:
            print(' [+]开放验证!!')
        print('-' * 40)


def get_Dot11_RSN(pkt):
    '''
    Read cipher and authentication info from a beacon / probe-response frame.

    Returns ``(auth, cipher)`` names, or ``(None, None)`` when no RSN element
    with recognised codes is present.
    '''
    elt = pkt.getlayer(Dot11Elt)
    while elt is not None:
        info = getattr(elt, 'info', None)
        if elt.ID == RSN and info:
            rsn = bytearray(info)
            # The byte after the first 0xAC (first "00 0f ac xx") is the cipher
            # code; the byte after the last 0xAC is the authentication code.
            first = rsn.find(b'\xac')
            last = rsn.rfind(b'\xac')
            if first != -1 and last + 1 < len(rsn):
                cipher_info = Dot11i.get(rsn[first + 1])
                wpa_info = WPA_Auth.get(rsn[last + 1])
                if cipher_info is not None and wpa_info is not None:
                    return wpa_info, cipher_info
        # Advance to the next information element, if any
        elt = elt.payload.getlayer(Dot11Elt)
    return None, None


def sniffering(interface, action):
    '''Sniff 5000 packets on *interface*, passing each one to *action*.'''
    print('[*]附近AP信息如下:')
    sniff(iface=interface, prn=action, count=5000, store=0)


def main():
    moni_inface = start_monitor_mode()
    sniffering(moni_inface, show_APs_info)


if __name__ == '__main__':
    main()
自己实现IP层的解码
#!/usr/bin/python
#coding=utf-8
"""Sniff raw packets on `host` and print protocol, source and destination."""
import socket
import os
import struct
from ctypes import *

# Address of the local interface to listen on
host = "192.168.1.2"


# IP header definition
class IP(Structure):
    """ctypes overlay for the first 20 bytes of an IP header.

    Build it with ``IP(buffer)`` where *buffer* holds at least 20 bytes.
    Besides the raw fields it exposes ``src_address`` / ``dst_address``
    (dotted-quad strings) and ``protocol`` (a name from ``protocol_map`` or
    the protocol number as a string).  Multi-byte fields are read in host
    byte order.
    """
    _fields_ = [
        ("ihl",          c_ubyte, 4),
        ("version",      c_ubyte, 4),
        ("tos",          c_ubyte),
        ("len",          c_ushort),
        ("id",           c_ushort),
        ("offset",       c_ushort),
        ("ttl",          c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum",          c_ushort),
        # Fixed 32-bit fields: c_ulong is 8 bytes on LP64 platforms, which
        # would make the structure larger than the 20-byte header.
        ("src",          c_uint32),
        ("dst",          c_uint32),
    ]

    def __new__(cls, socket_buffer=None):
        # Copy the received bytes straight into the structure
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # Map protocol numbers to protocol names
        self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}

        # Human-readable addresses: repack each field in native order to get
        # back the four bytes as they appeared in the buffer.
        self.src_address = socket.inet_ntoa(struct.pack("=I", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("=I", self.dst))

        # Protocol name, falling back to the number for unknown protocols
        self.protocol = self.protocol_map.get(self.protocol_num,
                                              str(self.protocol_num))


def main():
    """Print one line per received packet until interrupted with CTRL-C."""
    if os.name == "nt":
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
    try:
        sniffer.bind((host, 0))
        sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
        try:
            while True:
                # Read one packet
                raw_buffer = sniffer.recvfrom(65565)[0]
                # Parse the first 20 bytes of the buffer as the IP header
                ip_header = IP(raw_buffer[0:20])
                # Print the protocol and both endpoints
                print("Protocol : %s %s -> %s"%(ip_header.protocol,ip_header.src_address,ip_header.dst_address))
        except KeyboardInterrupt:
            # CTRL-C ends the capture quietly
            pass
        finally:
            # On Windows switch promiscuous mode back off
            if os.name == "nt":
                sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
    finally:
        sniffer.close()


if __name__ == "__main__":
    main()
改进的可在Kali中运行的代码
#!/usr/bin/python
"""Sniff raw packets on `host` and print protocol, source and destination."""
import socket
import os
import struct
from ctypes import *

# Address of the local interface to listen on
host = "192.168.1.2"


# IP header definition
class IP(Structure):
    """ctypes overlay for the first 20 bytes of an IP header.

    ``IP(buffer)`` copies at least 20 bytes from *buffer*.  The instance also
    carries ``src_address`` / ``dst_address`` as dotted-quad strings and
    ``protocol`` as a name from ``protocol_map`` (or the number as a string).
    """
    _fields_ = [
        ("ihl",          c_ubyte, 4),
        ("version",      c_ubyte, 4),
        ("tos",          c_ubyte),
        ("len",          c_ushort),
        ("id",           c_ushort),
        ("offset",       c_ushort),
        ("ttl",          c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum",          c_ushort),
        ("src",          c_uint32),
        ("dst",          c_uint32),
    ]

    def __new__(cls, socket_buffer=None):
        # Copy the received bytes straight into the structure
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # Map protocol numbers to protocol names
        self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}

        # Human-readable addresses
        self.src_address = socket.inet_ntoa(struct.pack("@I", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("@I", self.dst))

        # Protocol name; only a missing table entry falls back to the number
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except KeyError:
            self.protocol = str(self.protocol_num)


def main():
    """Print one line per received packet until interrupted with CTRL-C."""
    if os.name == "nt":
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
    try:
        sniffer.bind((host, 0))
        sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
        try:
            while True:
                # Read one packet
                raw_buffer = sniffer.recvfrom(65565)[0]
                # Parse the first 20 bytes of the buffer as the IP header
                ip_header = IP(raw_buffer[0:20])
                # Print the protocol and both endpoints
                print("Protocol : %s %s -> %s"%(ip_header.protocol,ip_header.src_address,ip_header.dst_address))
        except KeyboardInterrupt:
            # CTRL-C ends the capture quietly
            pass
        finally:
            # On Windows switch promiscuous mode back off
            if os.name == "nt":
                sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
    finally:
        sniffer.close()


if __name__ == "__main__":
    main()
解码ICMP层信息
#!/usr/bin/python
#coding=utf-8
"""Sniff raw packets and decode the IP header plus the ICMP type and code."""
import socket
import os
import struct
from ctypes import *

# Address of the local interface to listen on
host = "192.168.1.2"


class IP(Structure):
    """ctypes overlay for the first 20 bytes of an IP header.

    ``IP(buffer)`` copies at least 20 bytes from *buffer*.  The instance also
    carries ``src_address`` / ``dst_address`` as dotted-quad strings and
    ``protocol`` as a name from ``protocol_map`` (or the number as a string).
    """
    _fields_ = [
        ("ihl",          c_ubyte, 4),
        ("version",      c_ubyte, 4),
        ("tos",          c_ubyte),
        ("len",          c_ushort),
        ("id",           c_ushort),
        ("offset",       c_ushort),
        ("ttl",          c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum",          c_ushort),
        # Fixed 32-bit fields: c_ulong is 8 bytes on LP64 platforms, which
        # would make the structure larger than the 20-byte header.
        ("src",          c_uint32),
        ("dst",          c_uint32),
    ]

    def __new__(cls, socket_buffer=None):
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}
        # Repack each address field in native order to recover its four bytes
        self.src_address = socket.inet_ntoa(struct.pack("=I", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("=I", self.dst))
        self.protocol = self.protocol_map.get(self.protocol_num,
                                              str(self.protocol_num))


class ICMP(Structure):
    """ctypes overlay for an 8-byte ICMP header; ``ICMP(buffer)`` copies it in."""
    _fields_ = [
        ("type",         c_ubyte),
        ("code",         c_ubyte),
        ("checksum",     c_ushort),
        ("unused",       c_ushort),
        ("next_hop_mtu", c_ushort),
    ]

    def __new__(cls, socket_buffer):
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        # Nothing to derive.  This must be __init__, not a second __new__: a
        # __new__ that returns None would make ICMP(buf) evaluate to None.
        pass


def main():
    """Print one line per packet, plus ICMP type/code for ICMP packets."""
    if os.name == "nt":
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
    try:
        sniffer.bind((host, 0))
        sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
        try:
            while True:
                raw_buffer = sniffer.recvfrom(65565)[0]
                ip_header = IP(raw_buffer[0:20])
                print ("Protocol : %s %s -> %s"%(ip_header.protocol,ip_header.src_address,ip_header.dst_address))

                # Only ICMP packets are decoded further
                if ip_header.protocol != "ICMP":
                    continue
                # The ICMP header starts right after the IP header (ihl is
                # counted in 32-bit words)
                offset = ip_header.ihl * 4
                buf = raw_buffer[offset:offset + sizeof(ICMP)]
                if len(buf) < sizeof(ICMP):
                    # Truncated packet: not enough bytes for an ICMP header
                    continue
                # Parse the ICMP data
                icmp_header = ICMP(buf)
                print ("ICMP -> Type : %d Code : %d"%(icmp_header.type,icmp_header.code))
        except KeyboardInterrupt:
            # CTRL-C ends the capture quietly
            pass
        finally:
            # On Windows switch promiscuous mode back off
            if os.name == "nt":
                sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
    finally:
        sniffer.close()


if __name__ == "__main__":
    main()
自己实现PING
# encoding:utf-8
"""Minimal ICMP echo ("ping") client built on a raw socket."""
import time
import struct
import socket
import select


def chesksum(data):
    """Return the 16-bit Internet checksum of *data* in network byte order.

    *data* is a bytes-like object; an odd trailing byte is added as the low
    byte of a final word.
    """
    data = bytearray(data)
    n = len(data)
    m = n % 2
    total = 0
    # Add the data as 16-bit words, first byte low and second byte high
    for i in range(0, n - m, 2):
        total += data[i] + (data[i + 1] << 8)
    if m:
        total += data[-1]
    # Fold carries back in until the sum fits in 16 bits
    while total >> 16:
        total = (total >> 16) + (total & 0xffff)
    answer = ~total & 0xffff
    # Host byte order to network byte order (swap the two bytes)
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer


def request_ping(data_type, data_code, data_checksum, data_ID, data_Sequence, payload_body):
    """Build an ICMP packet (8-byte header + 32-byte payload) with its checksum filled in."""
    # Pack once with the caller-supplied checksum field to compute the checksum
    imcp_packet = struct.pack('>BBHHH32s', data_type, data_code, data_checksum, data_ID, data_Sequence, payload_body)
    icmp_chesksum = chesksum(imcp_packet)
    # Pack again with the real checksum in place
    return struct.pack('>BBHHH32s', data_type, data_code, icmp_chesksum, data_ID, data_Sequence, payload_body)


# Create the socket and send the packet
def raw_socket(dst_addr,imcp_packet):
    """Send *imcp_packet* to *dst_addr* over a new raw ICMP socket.

    Returns ``(send_time, socket)``; the caller must close the socket.
    """
    # IPv4 raw socket (an ordinary socket cannot carry ICMP messages)
    rawsocket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
    try:
        # Record the time of the request
        send_request_ping_time = time.time()
        rawsocket.sendto(imcp_packet, (dst_addr, 80))
    except Exception:
        # Do not leak the socket when the send fails
        rawsocket.close()
        raise
    return send_request_ping_time, rawsocket


def reply_ping(send_request_ping_time, rawsocket, data_Sequence, timeout=2):
    """Wait for the echo reply with sequence number *data_Sequence*.

    Returns the round-trip time in seconds, or -1 when no matching reply
    arrives within *timeout* seconds.
    """
    while True:
        started_select = time.time()
        # Wait until the socket is readable or the remaining timeout expires
        what_ready = select.select([rawsocket], [], [], timeout)
        # Time spent waiting in this round
        wait_for_time = time.time() - started_select
        # Nothing readable: timed out
        if what_ready[0] == []:
            return -1
        # Record the receive time
        time_received = time.time()
        received_packet, addr = rawsocket.recvfrom(1024)
        if received_packet:
            # The ICMP header follows the IP header, whose length in 32-bit
            # words is the low nibble of the first byte
            ip_header_len = (bytearray(received_packet[:1])[0] & 0x0f) * 4
            icmpHeader = received_packet[ip_header_len:ip_header_len + 8]
            if len(icmpHeader) == 8:
                icmp_type, code, r_checksum, packet_id, sequence = struct.unpack(
                    ">BBHHH", icmpHeader
                )
                # Type 0 is an echo reply
                if icmp_type == 0 and sequence == data_Sequence:
                    return time_received - send_request_ping_time
        # Not our reply: keep waiting for whatever time is left
        timeout = timeout - wait_for_time
        if timeout <= 0:
            return -1


def ping(host):
    """Send three echo requests to *host* and print the result of each."""
    # Round-trip statistics in ms; collected but not reported yet
    sumtime, shorttime, longtime, avgtime = 0, 1000, 0, 0
    # TODO build the ICMP packet
    # 8 echo request, 11 time exceeded, 0 echo reply
    data_type = 8
    data_code = 0
    # Checksum placeholder
    data_checksum = 0
    # ID
    data_ID = 0
    # Sequence number
    data_Sequence = 1
    # Optional payload
    payload_body = b'abcdefghijklmnopqrstuvwabcdefghi'
    # Resolve the host name to an IPv4 address string (an IPv4 address is
    # returned unchanged)
    dst_addr = socket.gethostbyname(host)
    print("正在 Ping {0} [{1}] 具有 32 字节的数据:".format(host, dst_addr))
    # Three probes by default
    for i in range(0, 3):
        icmp_packet = request_ping(data_type, data_code, data_checksum, data_ID, data_Sequence + i, payload_body)
        send_request_ping_time, rawsocket = raw_socket(dst_addr, icmp_packet)
        try:
            # Round-trip time of this probe, or -1 on timeout
            times = reply_ping(send_request_ping_time, rawsocket, data_Sequence + i)
        finally:
            # One socket is opened per probe; release it before the next one
            rawsocket.close()
        if times >= 0:
            print("来自 {0} 的回复: 字节=32 时间={1}ms".format(dst_addr, int(times*1000)))
            return_time = int(times * 1000)
            sumtime += return_time
            longtime = max(longtime, return_time)
            shorttime = min(shorttime, return_time)
            time.sleep(0.7)
        else:
            print("请求超时")


if __name__ == "__main__":
    ping("www.baidu.com")