Crawling the Web with Python
Browsing pages with the mechanize library
#!/usr/bin/python
#coding=utf-8
import mechanize

def viewPage(url):
    browser = mechanize.Browser()
    page = browser.open(url)
    source_code = page.read()
    print source_code

viewPage('http://www.imooc.com/')
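For comparison, the same fetch with only the Python 2 standard library looks like the sketch below; what mechanize adds on top of a plain GET is the stateful browser object (headers, cookies, proxies) used throughout the rest of this section.

import urllib2

def viewPageStdlib(url):
    # Plain HTTP GET with urllib2 -- no browser state, headers or cookies
    response = urllib2.urlopen(url)
    print response.read()

viewPageStdlib('http://www.imooc.com/')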
Using a proxy server, User-Agent, and cookies:
#!/usr/bin/python
#coding=utf-8
import mechanize

def testProxy(url, proxy):
    browser = mechanize.Browser()
    browser.set_proxies(proxy)
    page = browser.open(url)
    source_code = page.read()
    print source_code

url = 'http://2017.ip138.com/ic.asp'
hideMeProxy = {'http': '139.196.202.164:9001'}
testProxy(url, hideMeProxy)
#!/usr/bin/python
#coding=utf-8
import mechanize

def testUserAgent(url, userAgent):
    browser = mechanize.Browser()
    browser.addheaders = userAgent
    page = browser.open(url)
    source_code = page.read()
    print source_code

url = 'http://whatismyuseragent.dotdoh.com/'
userAgent = [('User-agent', 'Mozilla/5.0 (X11; U; Linux 2.4.2-2 i586; en-US; m18) Gecko/20010131 Netscape6/6.01')]
testUserAgent(url, userAgent)
Integrating the code into a Python class, anonBrowser
#!/usr/bin/python
#coding=utf-8
import mechanize
import cookielib
import random
import time

class anonBrowser(mechanize.Browser):

    def __init__(self, proxies=[], user_agents=[]):
        mechanize.Browser.__init__(self)
        self.set_handle_robots(False)
        # List of proxy servers available to the user
        self.proxies = proxies
        # List of user agents
        self.user_agents = user_agents + ['Mozilla/4.0 ', 'FireFox/6.01', 'ExactSearch', 'Nokia7110/1.0']
        self.cookie_jar = cookielib.LWPCookieJar()
        self.set_cookiejar(self.cookie_jar)
        self.anonymize()

    # Clear the cookies
    def clear_cookies(self):
        self.cookie_jar = cookielib.LWPCookieJar()
        self.set_cookiejar(self.cookie_jar)

    # Set a random user agent from the list
    def change_user_agent(self):
        index = random.randrange(0, len(self.user_agents))
        self.addheaders = [('User-agent', (self.user_agents[index]))]

    # Set a random proxy from the proxy list
    def change_proxy(self):
        if self.proxies:
            index = random.randrange(0, len(self.proxies))
            self.set_proxies({'http': self.proxies[index]})

    # Call the three methods above to change the user agent and proxy and to clear
    # the cookies; the sleep parameter pauses the process to further improve anonymity
    def anonymize(self, sleep=False):
        self.clear_cookies()
        self.change_user_agent()
        self.change_proxy()
        if sleep:
            time.sleep(60)
Test that each visit uses a different cookie:
#!/usr/bin/python
#coding=utf-8
from anonBrowser import *

ab = anonBrowser(proxies=[], user_agents=['superSecretBroswer'])

for attempt in range(1, 5):
    # Re-anonymize before every request
    ab.anonymize()
    print '[*] Fetching page'
    response = ab.open('http://www.kittenwar.com/')
    for cookie in ab.cookie_jar:
        print cookie
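To also confirm that the User-Agent rotates between requests, the header chosen by change_user_agent() can be printed next to the cookie count; a small check assuming the anonBrowser class above is importable:

from anonBrowser import *

ab = anonBrowser()
for attempt in range(1, 5):
    ab.anonymize()
    # addheaders holds the ('User-agent', ...) pair picked by change_user_agent()
    print '[*] Headers: ' + str(ab.addheaders)
    print '[*] Cookies held: ' + str(len(ab.cookie_jar))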
Parsing href links with BeautifulSoup:
#!/usr/bin/python
#coding=utf-8
from anonBrowser import *
from BeautifulSoup import BeautifulSoup
import os
import optparse
import re

def printLinks(url):
    ab = anonBrowser()
    ab.anonymize()
    page = ab.open(url)
    html = page.read()
    # Parse href links with the re module
    try:
        print '[+] Printing Links From Regex.'
        link_finder = re.compile('href="(.*?)"')
        links = link_finder.findall(html)
        for link in links:
            print link
    except:
        pass
    # Parse href links with BeautifulSoup
    try:
        print '\n[+] Printing Links From BeautifulSoup.'
        soup = BeautifulSoup(html)
        links = soup.findAll(name='a')
        for link in links:
            if link.has_key('href'):
                print link['href']
    except:
        pass

def main():
    parser = optparse.OptionParser('[*]Usage: python linkParser.py -u <target url>')
    parser.add_option('-u', dest='tgtURL', type='string', help='specify target url')
    (options, args) = parser.parse_args()
    url = options.tgtURL
    if url == None:
        print parser.usage
        exit(0)
    else:
        printLinks(url)

if __name__ == '__main__':
    main()
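A quick look at what the href regex captures, applied to a hand-made snippet of HTML:

import re

html = '<a href="/about">About</a> <a class="ext" href="http://example.com/">Home</a>'
print re.compile('href="(.*?)"').findall(html)
# ['/about', 'http://example.com/']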
Mirroring images with BeautifulSoup
#!/usr/bin/python
#coding=utf-8
from anonBrowser import *
from BeautifulSoup import BeautifulSoup
import os
import optparse

def mirrorImages(url, dir):
    ab = anonBrowser()
    ab.anonymize()
    html = ab.open(url)
    soup = BeautifulSoup(html)
    image_tags = soup.findAll('img')
    for image in image_tags:
        # Drop the URL scheme to build a local filename; note that lstrip('http://')
        # would strip a character set rather than the prefix, so split on '://' instead
        filename = image['src'].split('://')[-1]
        filename = os.path.join(dir, filename.replace('/', '_'))
        print '[+] Saving ' + str(filename)
        data = ab.open(image['src']).read()
        # Go back to the original page
        ab.back()
        save = open(filename, 'wb')
        save.write(data)
        save.close()

def main():
    parser = optparse.OptionParser('[*]Usage: python imageMirror.py -u <target url> -d <destination directory>')
    parser.add_option('-u', dest='tgtURL', type='string', help='specify target url')
    parser.add_option('-d', dest='dir', type='string', help='specify destination directory')
    (options, args) = parser.parse_args()
    url = options.tgtURL
    dir = options.dir
    if url == None or dir == None:
        print parser.usage
        exit(0)
    else:
        try:
            mirrorImages(url, dir)
        except Exception, e:
            print '[-] Error Mirroring Images.'
            print '[-] ' + str(e)

if __name__ == '__main__':
    main()
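A note on the filename line: lstrip('http://') looks like it strips the scheme, but lstrip() removes any leading characters from the given set, which is why the mirror above splits on '://' instead:

print 'http://test.com/logo.png'.lstrip('http://')   # 'est.com/logo.png' -- the 't' of 'test' is eaten
print 'http://test.com/logo.png'.split('://')[-1]    # 'test.com/logo.png'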
Interacting with the Google API in Python
#!/usr/bin/python
#coding=utf-8
import urllib
from anonBrowser import *

def google(search_term):
    ab = anonBrowser()
    # URL-encode the search term
    search_term = urllib.quote_plus(search_term)
    response = ab.open('https://www.googleapis.com/customsearch/v1?key=<your key>&cx=<your id>&num=1&alt=json&q=' + search_term)
    print response.read()

google('Boondock Saint')
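urllib.quote_plus() percent-encodes the search term so it can be appended to the query string safely:

import urllib

print urllib.quote_plus('Boondock Saint')            # Boondock+Saint
print urllib.quote_plus('from:th3j35t3r #infosec')   # from%3Ath3j35t3r+%23infosec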
Next, process the JSON data: loading it with the json library's load() function is all that is needed.
#!/usr/bin/python
#coding=utf-8
import urllib
from anonBrowser import *
import json

def google(search_term):
    ab = anonBrowser()
    # URL-encode the search term
    search_term = urllib.quote_plus(search_term)
    response = ab.open('https://www.googleapis.com/customsearch/v1?key=<your key>&cx=<your id>&num=1&alt=json&q=' + search_term)
    objects = json.load(response)
    print objects

google('Boondock Saint')
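For reference, the parts of the Custom Search response that the next script relies on live under the items key; a hand-made Python sketch of that shape (values are made up for illustration, and the real response carries many more fields):

objects = {
    'items': [
        {
            'title': 'Boondock Saint - example result',
            'link': 'http://example.com/boondock-saint',
            'snippet': 'A short text excerpt from the matched page ...',
        },
    ],
}

for result in objects['items']:
    print result['link']
    print result['title']
    print result['snippet']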
Write a Google_Result class to hold the titles parsed from the JSON data
#!/usr/bin/python
#coding=utf-8
import urllib
from anonBrowser import *
import json
import optparse

class Google_Result:

    def __init__(self, title, text, url):
        self.title = title
        self.text = text
        self.url = url

    def __repr__(self):
        return self.title

def google(search_term):
    ab = anonBrowser()
    # URL-encode the search term
    search_term = urllib.quote_plus(search_term)
    response = ab.open('https://www.googleapis.com/customsearch/v1?key=<your key>&cx=<your id>&num=1&alt=json&q=' + search_term)
    objects = json.load(response)
    results = []
    for result in objects['items']:
        url = result['link']
        title = result['title']
        text = result['snippet']
        print url
        print title
        print text
        new_gr = Google_Result(title, text, url)
        results.append(new_gr)
    return results

def main():
    parser = optparse.OptionParser('[*]Usage: python anonGoogle.py -k <keyword>')
    parser.add_option('-k', dest='keyword', type='string', help='specify google keyword')
    (options, args) = parser.parse_args()
    keyword = options.keyword
    if options.keyword == None:
        print parser.usage
        exit(0)
    else:
        results = google(keyword)
        print results

if __name__ == '__main__':
    main()
Parsing a Twitter user's page with Python
#!/usr/bin/python
#coding=utf-8
import json
import urllib
from anonBrowser import *

class reconPerson:

    def __init__(self, first_name, last_name, job='', social_media={}):
        self.first_name = first_name
        self.last_name = last_name
        self.job = job
        self.social_media = social_media

    def __repr__(self):
        return self.first_name + ' ' + self.last_name + ' has job ' + self.job

    def get_social(self, media_name):
        if self.social_media.has_key(media_name):
            return self.social_media[media_name]
        return None

    def query_twitter(self, query):
        query = urllib.quote_plus(query)
        results = []
        browser = anonBrowser()
        response = browser.open('http://search.twitter.com/search.json?q=' + query)
        json_objects = json.load(response)
        for result in json_objects['results']:
            new_result = {}
            new_result['from_user'] = result['from_user_name']
            new_result['geo'] = result['geo']
            new_result['tweet'] = result['text']
            results.append(new_result)
        return results

ap = reconPerson('Boondock', 'Saint')
print ap.query_twitter('from:th3j35t3r since:2010-01-01 include:retweets')
Extracting geolocation information from tweets
#!/usr/bin/python
#coding=utf-8
import json
import urllib
import optparse
from anonBrowser import *

def get_tweets(handle):
    query = urllib.quote_plus('from:' + handle + ' since:2009-01-01 include:retweets')
    tweets = []
    browser = anonBrowser()
    browser.anonymize()
    response = browser.open('http://search.twitter.com/search.json?q=' + query)
    json_objects = json.load(response)
    for result in json_objects['results']:
        new_result = {}
        new_result['from_user'] = result['from_user_name']
        new_result['geo'] = result['geo']
        new_result['tweet'] = result['text']
        tweets.append(new_result)
    return tweets

def load_cities(cityFile):
    cities = []
    for line in open(cityFile).readlines():
        city = line.strip('\n').strip('\r').lower()
        cities.append(city)
    return cities

def twitter_locate(tweets, cities):
    locations = []
    locCnt = 0
    cityCnt = 0
    tweetsText = ""
    for tweet in tweets:
        if tweet['geo'] != None:
            locations.append(tweet['geo'])
            locCnt += 1
        tweetsText += tweet['tweet'].lower()
    for city in cities:
        if city in tweetsText:
            locations.append(city)
            cityCnt += 1
    print "[+] Found " + str(locCnt) + " locations via Twitter API and " + str(cityCnt) + " locations from text search."
    return locations

def main():
    parser = optparse.OptionParser('[*]Usage: python twitterGeo.py -u <twitter handle> [-c <city file>]')
    parser.add_option('-u', dest='handle', type='string', help='specify twitter handle')
    parser.add_option('-c', dest='cityFile', type='string', help='specify file containing cities to search')
    (options, args) = parser.parse_args()
    handle = options.handle
    cityFile = options.cityFile
    if (handle == None):
        print parser.usage
        exit(0)
    cities = []
    if (cityFile != None):
        cities = load_cities(cityFile)
    tweets = get_tweets(handle)
    locations = twitter_locate(tweets, cities)
    print "[+] Locations: " + str(locations)

if __name__ == '__main__':
    main()
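load_cities() expects one lowercase city name per line. The matching logic in twitter_locate() can be checked offline with hand-made data (everything below is made up, and it assumes the functions above are defined in the same file or session):

sample_tweets = [
    {'from_user': 'someone', 'geo': None, 'tweet': 'Great game in Boston tonight!'},
    {'from_user': 'someone', 'geo': {'coordinates': [42.36, -71.06]}, 'tweet': 'At the park.'},
]
sample_cities = ['boston', 'chicago']
# Prints the summary line, then returns the geo tag plus the matched city name:
# [{'coordinates': [42.36, -71.06]}, 'boston']
print twitter_locate(sample_tweets, sample_cities)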
Parsing a Twitter user's interests with regular expressions
#!/usr/bin/python
#coding=utf-8
import json
import re
import urllib
import urllib2
import optparse
from anonBrowser import *

def get_tweets(handle):
    query = urllib.quote_plus('from:' + handle + ' since:2009-01-01 include:retweets')
    tweets = []
    browser = anonBrowser()
    browser.anonymize()
    response = browser.open('http://search.twitter.com/search.json?q=' + query)
    json_objects = json.load(response)
    for result in json_objects['results']:
        new_result = {}
        new_result['from_user'] = result['from_user_name']
        new_result['geo'] = result['geo']
        new_result['tweet'] = result['text']
        tweets.append(new_result)
    return tweets

def find_interests(tweets):
    interests = {}
    interests['links'] = []
    interests['users'] = []
    interests['hashtags'] = []
    for tweet in tweets:
        text = tweet['tweet']
        links = re.compile('(http.*?)\Z|(http.*?) ').findall(text)
        for link in links:
            if link[0]:
                link = link[0]
            elif link[1]:
                link = link[1]
            else:
                continue
            try:
                response = urllib2.urlopen(link)
                full_link = response.url
                interests['links'].append(full_link)
            except:
                pass
        interests['users'] += re.compile('(@\w+)').findall(text)
        interests['hashtags'] += re.compile('(#\w+)').findall(text)
    interests['users'].sort()
    interests['hashtags'].sort()
    interests['links'].sort()
    return interests

def main():
    parser = optparse.OptionParser('[*]Usage: python twitterInterests.py -u <twitter handle>')
    parser.add_option('-u', dest='handle', type='string', help='specify twitter handle')
    (options, args) = parser.parse_args()
    handle = options.handle
    if handle == None:
        print parser.usage
        exit(0)
    tweets = get_tweets(handle)
    interests = find_interests(tweets)
    print '\n[+] Links.'
    for link in set(interests['links']):
        print ' [+] ' + str(link)
    print '\n[+] Users.'
    for user in set(interests['users']):
        print ' [+] ' + str(user)
    print '\n[+] HashTags.'
    for hashtag in set(interests['hashtags']):
        print ' [+] ' + str(hashtag)

if __name__ == '__main__':
    main()
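The mention and hashtag regexes in find_interests() can be tried directly on a made-up tweet:

import re

text = 'Loved the write-up by @jms_dot_py on #python and #webscraping'
print re.compile('(@\w+)').findall(text)   # ['@jms_dot_py']
print re.compile('(#\w+)').findall(text)   # ['#python', '#webscraping']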
Write a reconPerson class that encapsulates all of the code for scraping location data, interests, and the Twitter page:
#!/usr/bin/python
#coding=utf-8
import urllib
from anonBrowser import *
import json
import re
import urllib2

class reconPerson:

    def __init__(self, handle):
        self.handle = handle
        self.tweets = self.get_tweets()

    def get_tweets(self):
        query = urllib.quote_plus('from:' + self.handle + ' since:2009-01-01 include:retweets')
        tweets = []
        browser = anonBrowser()
        browser.anonymize()
        response = browser.open('http://search.twitter.com/search.json?q=' + query)
        json_objects = json.load(response)
        for result in json_objects['results']:
            new_result = {}
            new_result['from_user'] = result['from_user_name']
            new_result['geo'] = result['geo']
            new_result['tweet'] = result['text']
            tweets.append(new_result)
        return tweets

    def find_interests(self):
        interests = {}
        interests['links'] = []
        interests['users'] = []
        interests['hashtags'] = []
        for tweet in self.tweets:
            text = tweet['tweet']
            links = re.compile('(http.*?)\Z|(http.*?) ').findall(text)
            for link in links:
                if link[0]:
                    link = link[0]
                elif link[1]:
                    link = link[1]
                else:
                    continue
                try:
                    response = urllib2.urlopen(link)
                    full_link = response.url
                    interests['links'].append(full_link)
                except:
                    pass
            interests['users'] += re.compile('(@\w+)').findall(text)
            interests['hashtags'] += re.compile('(#\w+)').findall(text)
        interests['users'].sort()
        interests['hashtags'].sort()
        interests['links'].sort()
        return interests

    def twitter_locate(self, cityFile):
        cities = []
        if cityFile != None:
            for line in open(cityFile).readlines():
                city = line.strip('\n').strip('\r').lower()
                cities.append(city)
        locations = []
        locCnt = 0
        cityCnt = 0
        tweetsText = ''
        for tweet in self.tweets:
            if tweet['geo'] != None:
                locations.append(tweet['geo'])
                locCnt += 1
            tweetsText += tweet['tweet'].lower()
        for city in cities:
            if city in tweetsText:
                locations.append(city)
                cityCnt += 1
        return locations
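Assuming the class above is saved as twitterClass.py (the name the phishing script below imports it under), it can be exercised on its own; mlb-cities.txt here is simply whatever city list gets passed to twitter_locate():

from twitterClass import *

person = reconPerson('th3j35t3r')              # __init__ fetches the tweets
print person.find_interests()                  # links, @users and #hashtags
print person.twitter_locate('mlb-cities.txt')  # geo tags plus city-name matches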
Sending email to a target with smtplib
#!/usr/bin/python
#coding=utf-8
import smtplib
from email.mime.text import MIMEText

def sendMail(user, pwd, to, subject, text):
    msg = MIMEText(text)
    msg['From'] = user
    msg['To'] = to
    msg['Subject'] = subject
    try:
        smtpServer = smtplib.SMTP('smtp.gmail.com', 587)
        print "[+] Connecting To Mail Server."
        smtpServer.ehlo()
        print "[+] Starting Encrypted Session."
        smtpServer.starttls()
        smtpServer.ehlo()
        print "[+] Logging Into Mail Server."
        smtpServer.login(user, pwd)
        print "[+] Sending Mail."
        smtpServer.sendmail(user, to, msg.as_string())
        smtpServer.close()
        print "[+] Mail Sent Successfully."
    except:
        print "[-] Sending Mail Failed."

user = 'username'
pwd = 'password'
sendMail(user, pwd, 'target@tgt.tgt', 'Re: Important', 'Test Message')
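Hard-coding the account name and password works for a quick test, but they can just as easily come from the environment; a small variation on the call above (SMTP_USER and SMTP_PASS are example variable names):

# Read Gmail credentials from environment variables instead of the script, e.g.
#   export SMTP_USER=you@gmail.com
#   export SMTP_PASS=yourpassword
import os

user = os.environ.get('SMTP_USER')
pwd = os.environ.get('SMTP_PASS')
if user and pwd:
    sendMail(user, pwd, 'target@tgt.tgt', 'Re: Important', 'Test Message')
else:
    print '[-] Set SMTP_USER and SMTP_PASS first.'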
Phishing with smtplib
#!/usr/bin/python
#coding=utf-8
import smtplib
import optparse
from email.mime.text import MIMEText
from twitterClass import *
from random import choice

def sendMail(user, pwd, to, subject, text):
    msg = MIMEText(text)
    msg['From'] = user
    msg['To'] = to
    msg['Subject'] = subject
    try:
        smtpServer = smtplib.SMTP('smtp.gmail.com', 587)
        print "[+] Connecting To Mail Server."
        smtpServer.ehlo()
        print "[+] Starting Encrypted Session."
        smtpServer.starttls()
        smtpServer.ehlo()
        print "[+] Logging Into Mail Server."
        smtpServer.login(user, pwd)
        print "[+] Sending Mail."
        smtpServer.sendmail(user, to, msg.as_string())
        smtpServer.close()
        print "[+] Mail Sent Successfully."
    except:
        print "[-] Sending Mail Failed."

def main():
    parser = optparse.OptionParser('[*]Usage: python sendSam.py -u <twitter handle> -t <target email> ' +
                                   '-l <gmail login> -p <gmail password>')
    parser.add_option('-u', dest='handle', type='string', help='specify twitter handle')
    parser.add_option('-t', dest='tgt', type='string', help='specify target email')
    parser.add_option('-l', dest='user', type='string', help='specify gmail login')
    parser.add_option('-p', dest='pwd', type='string', help='specify gmail password')
    (options, args) = parser.parse_args()
    handle = options.handle
    tgt = options.tgt
    user = options.user
    pwd = options.pwd
    if handle == None or tgt == None or user == None or pwd == None:
        print parser.usage
        exit(0)
    print "[+] Fetching tweets from: " + str(handle)
    spamTgt = reconPerson(handle)
    spamTgt.get_tweets()
    print "[+] Fetching interests from: " + str(handle)
    interests = spamTgt.find_interests()
    print "[+] Fetching location information from: " + str(handle)
    location = spamTgt.twitter_locate('mlb-cities.txt')
    spamMsg = "Dear " + tgt + ","
    if (location != None):
        randLoc = choice(location)
        spamMsg += " Its me from " + randLoc + "."
    if (interests['users'] != None):
        randUser = choice(interests['users'])
        spamMsg += " " + randUser + " said to say hello."
    if (interests['hashtags'] != None):
        randHash = choice(interests['hashtags'])
        spamMsg += " Did you see all the fuss about " + randHash + "?"
    if (interests['links'] != None):
        randLink = choice(interests['links'])
        spamMsg += " I really liked your link to: " + randLink + "."
    spamMsg += " Check out my link to http://evil.tgt/malware"
    print "[+] Sending Msg: " + spamMsg
    sendMail(user, pwd, tgt, 'Re: Important', spamMsg)

if __name__ == '__main__':
    main()