08 Distributed Computing with MapReduce -- Word Frequency Counting
The WordCount program task:
Program: WordCount
Input: a text file containing a large number of words
Output: each word in the file and its occurrence count (frequency), sorted alphabetically by word, with each word and its count on one line, separated by whitespace
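For example, given an input file containing the single line "hello world hello hadoop" (the sample sentence is only an illustration), the expected output would be:
hadoop 1
hello 2
world 1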
1. In the programming environment you are most familiar with, write a non-distributed word frequency counting program.
import cProfile
import pstats

def process_file(dst):
    try:
        f = open(dst, "r")  # open the file
    except IOError as s:
        print(s)
        return None
    try:
        bvffer = f.read()  # read the file into a buffer
    except Exception:
        print('Read File Error!')
        return None
    finally:
        f.close()
    return bvffer

def process_buffer(bvffer):
    if bvffer:
        word_freq = {}
        # Process the buffer bvffer: count the frequency of each word
        # and store the counts in the dict word_freq
        bvffer = bvffer.lower()
        # Remove Chinese and English punctuation from the text
        for ch in '“‘!;,.?”':
            bvffer = bvffer.replace(ch, " ")
        words = bvffer.strip().split()
        for word in words:
            word_freq[word] = word_freq.get(word, 0) + 1  # count the word
        return word_freq

def output_result(word_freq):
    if word_freq:
        # Sort by frequency, highest first
        sorted_word_freq = sorted(word_freq.items(), key=lambda v: v[1], reverse=True)
        for item in sorted_word_freq:
            print(item)

def main():
    dst = "C:\\Users\\acer\\Desktop\\sentence.txt"
    bvffer = process_file(dst)
    word_freq = process_buffer(bvffer)
    output_result(word_freq)

if __name__ == "__main__":
    cProfile.run("main()", "result")  # profile main() and write stats to the file "result"
    p = pstats.Stats("result")
    p.strip_dirs().sort_stats("calls").print_stats(10)  # top 10 functions by call count
    p.strip_dirs().sort_stats("cumulative").print_stats()  # all functions by cumulative time
    p.print_callees("process_buffer")  # functions called from process_buffer
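Note that output_result above sorts by descending frequency, which is convenient when profiling; to produce the alphabetical ordering required by the task specification, a minimal variant could look like the sketch below (the name output_result_sorted is not from the original):

def output_result_sorted(word_freq):
    if word_freq:
        # Sort by word (alphabetical order), as the task specification requires
        for word, count in sorted(word_freq.items()):
            print("%s %s" % (word, count))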
2. Implementing Word Frequency Counting with MapReduce
2.1 Writing the Map Function
Write mapper.py:
#!/usr/bin/env python
import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        # emit each word with an initial count of 1, tab-separated
        print("%s\t%s" % (word, 1))
Grant execute permission:
$ chmod +x mapper.py
Test mapper.py locally:
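For example, feed the mapper a line of text on stdin and check the emitted key/value pairs (the sample sentence is only an illustration):

$ echo "hello world hello hadoop" | ./mapper.py
hello	1
world	1
hello	1
hadoop	1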
2.2 Writing the Reduce Function
Write reducer.py:
#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

# Hadoop streaming sorts the mapper output by key before the reduce
# stage, so identical words arrive on consecutive lines
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue  # skip lines whose count is not a number
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # a new word has started: emit the previous word and its total
            print("%s\t%s" % (current_word, current_count))
        current_count = count
        current_word = word

# emit the last word, if any
if word == current_word:
    print("%s\t%s" % (current_word, current_count))
Grant execute permission:
$ chmod +x reducer.py
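Because Hadoop streaming sorts the map output by key before the reduce stage, a local end-to-end test needs an explicit sort between the two scripts (again, the sample sentence is only an illustration):

$ echo "hello world hello hadoop" | ./mapper.py | sort -k1,1 | ./reducer.py
hadoop	1
hello	2
world	1

Once both scripts behave as expected, the job can be submitted through Hadoop streaming. A sketch follows; the streaming jar location and the HDFS input/output paths are assumptions that depend on your installation:

$ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -file ./mapper.py -mapper ./mapper.py \
    -file ./reducer.py -reducer ./reducer.py \
    -input /user/hadoop/input -output /user/hadoop/output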