点击查看代码
import collections
import re
from d2l import torch as d2l
# 文本预处理
# 将文本当作时序序列,将文本中的字、词作为一个样本,样本之间有时序信息
# 读取数据集
print('读取数据集')
#@save
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')
def read_time_machine(): #@save
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
lines = f.readlines()
# 把非字母的内容全部替换为空格
# 有损操作
# 转小写
# strip() 方法用于移除字符串头尾指定的字符(默认为空格或换行符)或字符序列。
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
lines = read_time_machine()
print(f'# 文本总行数: {len(lines)}')
print(lines[0])
print(lines[10])
# 词元化
print('词元化')
def tokenize(lines, token='word'): #@save
"""将文本行拆分为单词或字符词元"""
if token == 'word':
return [line.split() for line in lines]
elif token == 'char':
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)
tokens = tokenize(lines)
tokens1 = tokenize(lines, token='char')
for i in range(11):
print(lines[i])
print(tokens[i])
print(tokens1[i])
# 词汇表
# 将word/char 映射到从0开始的index
# 字符串 cpu很慢,GPU做不了
class Vocab: #@save
"""文本词表"""
# min_freq
# token少于某个次数,将其丢掉,unk
# reserved_tokens
# 句子开始token、句子结束token
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率排序
counter = count_corpus(tokens)
# 出现频率由大到小排序
"""
list = sorted(iterable, key=None, reverse=False)
其中,iterable 表示指定的序列,key 参数可以自定义排序规则;
reverse 参数指定以升序(False,默认)还是降序(True)进行排序。
sorted() 函数会返回一个排好序的列表。
#字典默认按照key进行排序
a = {4:1, 5:2, 3:3, 2:6, 1:8}
print(sorted(a.items()))
## [(1, 8), (2, 6), (3, 3), (4, 1), (5, 2)]
x:x[]字母可以随意修改,
x[1]表示以元组的第二个元素排序
[0]按照第一维,[1]按照第二维。
"""
self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
reverse=True)
# 未知词元的索引为0
# 列表
# [''] 未知token
self.idx_to_token = [''] + reserved_tokens
# 字典
# enumerate
# 遍历一个集合对象,它在遍历的同时还可以得到当前元素的索引位置。
"""
names = ["Alice","Bob","Carl"]
for index,value in enumerate(names):
print(f'{index}: {value}')
0: Alice
1: Bob
2: Carl
"""
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
if freq < min_freq:
break
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
def __len__(self):
return len(self.idx_to_token)
def __getitem__(self, tokens):
if not isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
def to_tokens(self, indices):
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]
@property
def unk(self): # 未知词元的索引为0
return 0
@property
def token_freqs(self):
return self._token_freqs
def count_corpus(tokens): #@save
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
# 计算token出现次数 返回值字典
"""
nums = [1, 2, 3, 1, 2, 1]
counts = collections.Counter(nums)
print(counts)
## Counter({1: 3, 2: 2, 3: 1})
"""
return collections.Counter(tokens)
# 构建词汇表
print('构建词汇表')
# 高频小下标,低频大下标
vocab = Vocab(tokens)
print(list(vocab.token_to_idx.items())[:10])
vocab1 = Vocab(tokens1)
print(list(vocab1.token_to_idx.items())[:10])
print(1)
# 将每一条文本行转换成数字索引表
print('将每一条文本行转换成数字索引表')
for i in range(10):
print('words:', tokens[i])
print('indices:', vocab[tokens[i]])
for i in range(10):
print('words:', tokens1[i])
print('indices:', vocab[tokens1[i]])
# 将所有功能打包到load_corpus_time_machine函数中
print('将所有功能打包到load_corpus_time_machine函数中')
def load_corpus_time_machine(max_tokens=-1): #@save
"""返回时光机器数据集的词元索引列表和词表"""
lines = read_time_machine()
tokens = tokenize(lines, 'char')
vocab = Vocab(tokens)
# 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
# 所以将所有文本行展平到一个列表中
corpus = [vocab[token] for line in tokens for token in line]
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab
corpus, vocab = load_corpus_time_machine()
print(len(corpus), len(vocab))