Python 嵌入式主机 IP 搜索:Windows 客户端,Linux 服务器端
这种发送受限广播和直接广播的模式有很大局限性:要么要求两台机器在同一个局域网内(即不经过路由器),要么要求发送主机与接收主机的子网掩码一致,这样才能通过广播地址通信。在网络情况复杂且不知道目标主机子网掩码的情况下效果不好(如果目标主机需要经过路由器才能到达,且其子网掩码与发送主机不一致,广播消息是到不了那里的)。所以后面考虑使用多播技术跨网段实现目标主机 IP 查找。
客户端
# coding=utf-8
"""UDP broadcast client that searches the network for hosts answering a probe.

A probe ("who is 007") is sent to one or more broadcast addresses on
``BroadCastPort`` while a second thread collects the replies arriving on
``ListenPort``.  Every reply is printed and the full list is written to a
text file.  The script uses Windows console commands (``ping -n``, ``pause``).
"""
import ctypes  # only referenced by the disabled console workaround below
import io
import os
import socket
import subprocess
import threading
import time

# Disabled workaround: switching off the cmd window's quick-edit mode avoids
# the console freezing after running for a while, but keyboard input stops
# working once it is switched off, so it stays commented out for now.
# kernel32 = ctypes.windll.kernel32
# kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), 128)

BroadCastPort = 18666  # destination port of the probe
ListenPort = 18667  # local port the replies are received on

# Written by search_ip() and read by wait_007_anwser(): the receiver keeps
# listening for a grace period after the sender reports that it is done.
g_search_over_flg = False
g_search_over_time = time.time()


def save_test_as_utf8(path, context):
    """Write the text ``context`` to ``path`` as UTF-8, replacing the file."""
    with io.open(path, "wb") as f:
        f.write(context.encode('utf-8'))


def check_ip_valide(strip):
    """Return True if one ping (1000 ms timeout, Windows flags) reaches ``strip``.

    The command is passed as an argument list so the address is never
    interpreted by a shell.
    """
    try:
        ret = subprocess.call(["ping", "-w", "1000", "-n", "1", str(strip)])
    except OSError:
        # ping could not be started at all; treat the host as unreachable.
        return False
    return ret == 0


def broadcast_ip(dstIp):
    """Send the probe datagram to ``dstIp`` on ``BroadCastPort``.

    Raises:
        OSError: if the address cannot be resolved or the send fails.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
        sender.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sender.sendto("who is 007".encode("utf8"), (dstIp, BroadCastPort))


def save_all(ltIpInfo):
    """Save the collected replies, one per CRLF-terminated line.

    Nothing is written when ``ltIpInfo`` is empty.
    """
    if not ltIpInfo:
        return
    strContext = "".join(ipInfo + "\r\n" for ipInfo in ltIpInfo)
    save_test_as_utf8("已获取的IP.txt", strContext)


def wait_007_anwser():
    """Collect replies until the grace period after the search has elapsed.

    Listens on ``ListenPort`` with a 2 second receive timeout.  Once
    ``g_search_over_flg`` is set, the loop ends 6 seconds after
    ``g_search_over_time``; the deadline is checked after every iteration so
    a steady stream of replies cannot keep the loop alive forever.  The
    replies are then printed and saved via ``save_all``.
    """
    wait_time_while_search_over = 6
    recv_ip_and_data = []
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as receiver:
        receiver.bind(('0.0.0.0', ListenPort))
        receiver.settimeout(2)
        print('wating...')
        while True:
            try:
                data, addr = receiver.recvfrom(1024)
            except OSError:
                # Timeouts and other socket errors both mean "nothing received".
                got_reply = False
            else:
                got_reply = True
                # "replace" keeps a malformed datagram from killing the thread.
                rc = addr[0] + " :" + data.decode("utf8", "replace")
                print("anwser " + rc)
                recv_ip_and_data.append(rc)

            if not g_search_over_flg:
                continue
            remaining = wait_time_while_search_over - (time.time() - g_search_over_time)
            if remaining <= 0:
                break
            if not got_reply:
                print("wait for %d second..." % int(remaining))
    print("over...")
    for info in recv_ip_and_data:
        print(info)
    save_all(recv_ip_and_data)


def genarate_broadcast_ip(input_ip):
    """Build the list of broadcast addresses to probe from a network prefix.

    Args:
        input_ip: ``None`` or an empty/blank string for the limited broadcast
            address, three octets (``"192.168.6"``) for a single ``x.y.z.255``
            address, or two octets (``"192.168"``) for ``x.y.0.255`` through
            ``x.y.254.255``.

    Returns:
        A list of address strings; empty (after printing ``input error!``)
        when the input has any other number of octets.
    """
    prefix = "" if input_ip is None else str(input_ip).strip()
    if prefix == "":
        return ["255.255.255.255"]

    ip_metas = [v.strip() for v in prefix.split('.')]
    if len(ip_metas) == 3:
        # e.g. 192.168.6 -> 192.168.6.255
        return [".".join(ip_metas) + ".255"]
    if len(ip_metas) == 2:
        # e.g. 192.168 -> 192.168.0.255 to 192.168.254.255
        str_ip_pre = ip_metas[0] + "." + ip_metas[1] + "."
        return [str_ip_pre + str(i) + ".255" for i in range(255)]

    print("input error!")
    return []


def search_ip(input_ip):
    """Probe every broadcast address derived from ``input_ip``.

    A failed send is reported and skipped.  The "search over" timestamp and
    flag are always set on exit so the receiver thread can terminate.
    """
    global g_search_over_time
    global g_search_over_flg
    try:
        for dstIp in genarate_broadcast_ip(input_ip):
            print("broadcast ip %s" % dstIp)
            try:
                broadcast_ip(dstIp)
            except OSError as err_msg:
                print("broadcast to %s failed: %s" % (dstIp, err_msg))
            time.sleep(0.1)
    finally:
        # Timestamp first, flag second: the receiver reads the time only
        # after it has seen the flag.
        g_search_over_time = time.time()
        g_search_over_flg = True


if __name__ == '__main__':
    print("input example (192.168 or 192.168.6 or nothing ,then push enter):")
    input_ip = input()
    # Start listening before the first probe goes out.
    rec_th = threading.Thread(target=wait_007_anwser)
    rec_th.start()
    search_th = threading.Thread(target=search_ip, args=(input_ip,))
    search_th.start()
    search_th.join()
    rec_th.join()
    os.system("pause")
Linux服务器端
# coding=utf-8
"""UDP responder: answers every probe received on ``ListenPort``.

For each datagram received, a fixed reply is sent back to the sender's
address on ``AnwserBroadCastPort``.
"""
import socket
import time

AnwserBroadCastPort = 18667  # port on the asking host that the reply is sent to
ListenPort = 18666  # local port probes are received on
# sendto() needs bytes on Python 3, so the reply is encoded once up front.
AnwserMessage = "I am 007".encode("utf8")


def anwser(askerIp):
    """Send the reply datagram to ``askerIp`` on ``AnwserBroadCastPort``.

    Raises:
        socket.error: if the datagram cannot be sent.
    """
    print("anwser ", askerIp)
    # The reply goes to a single address, so SO_BROADCAST is not set.
    reply = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        reply.sendto(AnwserMessage, (askerIp, AnwserBroadCastPort))
    finally:
        reply.close()


def wait_client_ask():
    """Block until one probe arrives and return the sender's IP address.

    Returns:
        The sender's IP as a string, or ``None`` if receiving failed.
    """
    bret = None
    listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        listener.bind(('0.0.0.0', ListenPort))
        print('wating...')
        try:
            data, addr = listener.recvfrom(1024)
        except socket.error as err_msg:
            print("wait rec error:", err_msg)
        else:
            # "replace" keeps a malformed probe from crashing the server.
            data = data.decode("utf8", "replace")
            anwerIp = addr[0]
            print('recv: %s %s' % (anwerIp, data))
            bret = anwerIp
    finally:
        listener.close()
    return bret


if __name__ == '__main__':
    while True:
        ret = wait_client_ask()
        if ret:
            try:
                anwser(ret)
            except socket.error as err_msg:
                # One failed reply must not stop the server.
                print("anwser error:", err_msg)