Python常用功能函数系列总结(二)
本节目录
-
常用函数一:sel文件转换
-
常用函数二:refwork文件转换
-
常用函数三:xml文档解析
-
常用函数四:文本分词
常用函数一:sel文件转换
sel是种特殊的文件格式,具体应用场景的话可以在搜狗细胞词库中看到,经常在做文本处理,分词的时候需要一些词典,那么搜狗细胞词库中的一些相关词库就会被使用,而这种sel文件格式不能直接使用,需要进行转换,转换成txt文件之后就可以去做进一步使用了,转换的代码是从网上找到,我自己也是用过多次,使用的时候可以直接拿来用。
# -*- coding:utf-8 -*- """ @author:Zhang Yafei @time: 2019/12/26 Description: scel 文件格式转换 """ import struct import os # 搜狗的scel词库就是保存的文本的unicode编码,每两个字节一个字符(中文汉字或者英文字母) # 找出其每部分的偏移位置即可 # 主要两部分 # 1.全局拼音表,貌似是所有的拼音组合,字典序 # 格式为(index,len,pinyin)的列表 # index: 两个字节的整数 代表这个拼音的索引 # len: 两个字节的整数 拼音的字节长度 # pinyin: 当前的拼音,每个字符两个字节,总长len # # 2.汉语词组表 # 格式为(same,py_table_len,py_table,{word_len,word,ext_len,ext})的一个列表 # same: 两个字节 整数 同音词数量 # py_table_len: 两个字节 整数 # py_table: 整数列表,每个整数两个字节,每个整数代表一个拼音的索引 # # word_len:两个字节 整数 代表中文词组字节数长度 # word: 中文词组,每个中文汉字两个字节,总长度word_len # ext_len: 两个字节 整数 代表扩展信息的长度,好像都是10 # ext: 扩展信息 前两个字节是一个整数(不知道是不是词频) 后八个字节全是0 # # {word_len,word,ext_len,ext} 一共重复same次 同音词 相同拼音表 # 拼音表偏移, startPy = 0x1540; # 汉语词组表偏移 startChinese = 0x2628; # 全局拼音表 GPy_Table = {} # 解析结果 # 元组(词频,拼音,中文词组)的列表 GTable = [] # 原始字节码转为字符串 def byte2str(data): pos = 0 str = '' while pos < len(data): c = chr(struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0]) if c != chr(0): str += c pos += 2 return str # 获取拼音表 def getPyTable(data): data = data[4:] pos = 0 while pos < len(data): index = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0] pos += 2 lenPy = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0] pos += 2 py = byte2str(data[pos:pos + lenPy]) GPy_Table[index] = py pos += lenPy # 获取一个词组的拼音 def getWordPy(data): pos = 0 ret = '' while pos < len(data): index = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0] ret += GPy_Table[index] pos += 2 return ret # 读取中文表 def getChinese(data): pos = 0 while pos < len(data): # 同音词数量 same = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0] # 拼音索引表长度 pos += 2 py_table_len = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0] # 拼音索引表 pos += 2 py = getWordPy(data[pos: pos + py_table_len]) # 中文词组 pos += py_table_len for i in range(same): # 中文词组长度 c_len = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0] # 中文词组 pos += 2 word = byte2str(data[pos: pos + c_len]) # 扩展数据长度 pos += c_len ext_len = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0] # 词频 pos += 2 count = struct.unpack('H', bytes([data[pos], data[pos + 1]]))[0] # 保存 GTable.append((count, py, word)) # 到下个词的偏移位置 pos += ext_len def scel2txt(file_name): print('-' * 60) with open(file_name, 'rb') as f: data = f.read() print("词库名:", byte2str(data[0x130:0x338])) # .encode('GB18030') print("词库类型:", byte2str(data[0x338:0x540])) print("描述信息:", byte2str(data[0x540:0xd40])) print("词库示例:", byte2str(data[0xd40:startPy])) getPyTable(data[startPy:startChinese]) getChinese(data[startChinese:]) def run(to_file, file=None, dir_path=None): """ sel 多个文件转换 :param file: sel文件路径 转换单个sel文件 :param dir_path: sel文件夹路径 若设置 则转换该文件加内所有sel文件 :param to_file: 转换完成文件路径 :return: """ if dir_path: fin = [fname for fname in os.listdir(in_path) if fname[-5:] == ".scel"] for f in fin: f = os.path.join(in_path, f) scel2txt(f) elif file: scel2txt(file) else: raise Exception('参数必须包含file或者dir_path') # 保存结果 with open(to_file, 'w', encoding='utf8') as f: f.writelines([word + '\n' for count, py, word in GTable]) def dict_merge(): """ 词典合并 :return: """ with open('data/medical_dict.txt', encoding='utf8') as f: word_set1 = {word.strip() for word in f} with open('data/medical_dict2.txt', encoding='utf8') as f: word_set2 = {word.strip() for word in f} with open('data/medical_dict3.txt', encoding='utf8') as f: word_set3 = {word.strip() for word in f} word_set = word_set1 | word_set2 | word_set3 with open('data/words_dict.txt', encoding='utf-8', mode='w') as f: for word in word_set: f.write(word + '\n') if __name__ == '__main__': # run(file='data/细胞词库/医学词汇大全【官方推荐】.scel', to_file='医学词库.txt',) run(dir_path="data/细胞词库", to_file="data/cell_dict.txt")
经验分享:直接拿来用。
常用函数二:refwork文件转换
refowrk是一种文献格式,可以用一些科研软件做分析使用,有些场景下我们需要将excel格式的文件转成refwork文件,一下代码可以实现这个功能。
# -*- coding: utf-8 -*- """ Datetime: 2020/03/04 author: Zhang Yafei description: refwork格式转换 数据格式 列 RT,A1,T1,JF,YR,K1,AB,AD ... """ import pandas as pd def main(ref_file, to_file): """ :param ref_file: 转换的csv或者excel文件路径 :param to_file: 转换之后保存的refwork文件路径 """ if ref_file.endswith('csv'): rawdata = pd.read_csv(ref_file) elif ref_file.endswith('xls') or ref_file.endswith('xlsx'): rawdata = pd.read_excel(ref_file) with open(to_file, 'a') as f: for index, item in rawdata.iterrows(): f.write('RT ' + item.RT) A1 = item.A1 f.write('\n' + 'A1 ' + A1) T1 = item.T1 f.write('\n' + 'T1 ' + T1) YR = item.YR f.write('\n' + 'YR ' + YR) JF = item.JF f.write('\n' + 'JF ' + JF) K1 = item.K1 f.write('\n' + 'K1 ' + K1) AB = item.AB if pd.notna(AB): f.write('\n' + 'AB ' + AB) AD = item.AD if pd.notna(AD): f.write('\n' + 'AD ' + AD) f.write('\nDS CNKI') if index < rawdata.shape[0] - 1: f.write('\n\n\n') if __name__ == '__main__': main(ref_file='data.xlsx', to_file='result.txt')
经验分享:直接拿来用
常用函数三:xml文档解析
xml文档经常作为数据传输格式在web领域使用,它有很多优势,但我们平时梳理的数据大多是csv或者exel这种,那么解析xml文档就是一个必备的技能吗,下面以pubmed下载的xml文档解析为例,展示了xml文档解析的整个流程。
# -*- coding: utf-8 -*- """ @Datetime: 2019/4/26 @Author: Zhang Yafei @Description: 07_xml文档解析 """ import os import re import threading from concurrent.futures import ThreadPoolExecutor from lxml import etree import pandas as pd def pubmed_xpath_parse(path): tree = etree.parse(path) # 如果xml数据中出现了关于dtd的声明(如下面的例子),那样的话,必须在使用lxml解析xml的时候,进行相应的声明。 # parser = etree.XMLParser(load_dtd=True) # 首先根据dtd得到一个parser(注意dtd文件要放在和xml文件相同的目录) # tree = etree.parse('1.xml', parser=parser) # 用上面得到的parser将xml解析为树结构 data_list = [] pmid_set = [] for articles in tree.xpath('//PubmedArticle'): pmid = articles.xpath('MedlineCitation/PMID/text()')[0] if pmid in pmid_set: continue pmid_set.append(pmid) Article = articles.xpath('MedlineCitation/Article')[0] journal = Article.xpath('Journal/ISOAbbreviation/text()')[0] try: authors = Article.xpath('AuthorList/Author') affiliations_info = set() for author in authors: # author_name = author.find('LastName').text + ' ' + author.find('ForeName').text affiliations = [x.xpath('Affiliation/text()')[0] for x in author.xpath('AffiliationInfo')] # author = author_name + ':' + ';'.join(affiliations) for affiliation in affiliations: affiliations_info.add(affiliation) affiliations_info = ';'.join(affiliations_info) except AttributeError: affiliations_info = '' try: date = Article.xpath('Journal/JournalIssue/PubDate/Year/text()')[0] except IndexError: date = Article.xpath('Journal/JournalIssue/PubDate/MedlineDate/text()')[0] date = re.search('\d+', date).group(0) try: mesh_words = [] for mesh_heading in articles.xpath('MedlineCitation/MeshHeadingList/MeshHeading'): if len(mesh_heading.xpath('child::*')) == 1: mesh_words.append((mesh_heading.xpath('child::*'))[0].text) continue mesh_name = '' for mesh in mesh_heading.xpath('child::*'): if mesh.tag == 'DescriptorName': mesh_name = mesh.xpath('string()') continue if mesh_name and mesh.tag == 'QualifierName': mesh_word = mesh_name + '/' + mesh.xpath('string()') mesh_words.append(mesh_word) mesh_words = ';'.join(mesh_words) except AttributeError: mesh_words = '' article_type = '/'.join([x.xpath('./text()')[0] for x in Article.xpath('PublicationTypeList/PublicationType')]) country = articles.xpath('MedlineCitation/MedlineJournalInfo/Country/text()')[0] data_list.append( {'PMID': pmid, 'journal': journal, 'affiliations_info': affiliations_info, 'pub_year': date, 'mesh_words': mesh_words, 'country': country, 'article_type': article_type, 'file_path': path}) print(pmid + '\t解析完成') df = pd.DataFrame(data_list) with threading.Lock(): df.to_csv('pubmed.csv', encoding='utf_8_sig', mode='a', index=False, header=False) def to_excel(data, path): writer = pd.ExcelWriter(path) data.to_excel(writer, sheet_name='table', index=False) writer.save() def get_files_path(dir_name): xml_files = [] for base_path, folders, files in os.walk(dir_name): xml_files = xml_files + [os.path.join(base_path, file) for file in files if file.endswith('.xml')] return xml_files if __name__ == '__main__': files = get_files_path(dir_name='data') if not files: print('全部解析完成') else: with ThreadPoolExecutor() as pool: pool.map(pubmed_xpath_parse, files)
常用函数四:文本分词
方式一:jieba分词+停用词+自定义词典+同义词替换
# -*- coding: utf-8 -*- """ Datetime: 2020/06/25 Author: Zhang Yafei Description: 文本分词 输入 停用词文件路径 词典文件路径 同义词文件路径 分词文件路径 表名(可选) 列名 分词结果列名 保存文件名 输出 分词结果-文件 """ import os import re import time from collections import defaultdict from functools import wraps import jieba import pandas as pd if not os.path.exists('res'): os.mkdir('res') def timeit(func): """ 时间装饰器 """ @wraps(func) def inner(*args, **kwargs): start_time = time.time() ret = func(*args, **kwargs) end_time = time.time() - start_time if end_time < 60: print(f'共花费时间:', round(end_time, 2), '秒') else: minute, sec = divmod(end_time, 60) print(f'花费时间\t{round(minute)}分\t{round(sec, 2)}秒') return ret return inner class TextCut(object): def __init__(self, dictionary=None, stopwords=None, synword=None): self.dictionary = dictionary self.word_list = None if self.dictionary: jieba.load_userdict(self.dictionary) if stopwords: with open(stopwords, 'r', encoding='utf-8') as swf: self.stopwords = [line.strip() for line in swf] else: self.stopwords = None if synword: self.syn_word_dict = self.build_sync_dict(synword) else: self.syn_word_dict = None @staticmethod def clean_txt(raw): file = re.compile(r"[^0-9a-zA-Z\u4e00-\u9fa5]+") return file.sub(' ', raw) def cut(self, text): sentence = self.clean_txt(text.strip().replace('\n', '')) return ' '.join([i for i in jieba.cut(sentence) if i.strip() and i not in self.stopwords and len(i) > 1]) def cut2(self, text): sentence = self.clean_txt(text.strip().replace('\n', '')) return ' '.join([i for i in jieba.cut(sentence) if i.strip() and i not in self.stopwords and len(i) > 1 and i in self.word_list]) def syn_word_replace(self, row): word_list = [] for word in row.split(' '): if word in self.syn_word_dict: word = self.syn_word_dict[word] word_list.append(word) return ' '.join(word_list) def build_sync_dict(self, synword): syn_map = {} with open(synword, mode='r', encoding='utf-8') as f: for row in f: stand_word = row.split(',')[0].strip() for word in row.split(',')[1:]: if word.strip(): syn_map[word.strip()] = stand_word return syn_map @timeit def run(self, file_path, col_name, new_col_name, to_file, sheet_name=None, word_in_dict=False): print('######### 开始读取数据文件 ############') if sheet_name: df = pd.read_excel(file_path, sheet_name=sheet_name) else: df = pd.read_excel(file_path) print('######### 开始进行数据处理 ############') if word_in_dict: with open(self.dictionary, encoding='utf-8') as f: self.word_list = [word.strip() for word in f] df[new_col_name] = df[col_name].apply(self.cut2) else: df[new_col_name] = df[col_name].apply(self.cut) if self.syn_word_dict: print('######### 正在进行同义词合并 ############') df[f'{new_col_name}_同义词替换'] = df[new_col_name].apply(self.syn_word_replace) print('######### 同义词合并完成 ############') df.to_excel(to_file, index=False) print('######### 处理完成 ############') if __name__ == "__main__": text_cut = TextCut(stopwords='data/stopwords.txt', dictionary='data/word_dict.txt', synword='data/同义词.txt') text_cut.run(file_path='data/山西政策.xlsx', sheet_name='1.21-2.20', col_name='全文', new_col_name='全文分词', to_file='res/山西政策_分词.xlsx') # text_cut.run(file_path='data/微博数据_处理.xlsx', col_name='微博正文_处理', new_col_name='全文分词', # to_file='data/微博分词.xlsx')
方式二:jieba分词+信息熵合并
# -*- coding: utf-8 -*- """ Datetime: 2020/03/01 Author: Zhang Yafei Description: 基于信息熵对分词结果进行合并 """ from collections import Counter from functools import reduce from pandas import read_excel, DataFrame class InfoEntropyMerge(object): def __init__(self, data, stopwords='data/stopwords.txt'): self.data = data self.words_freq_one = {} self.words_freq_two = {} self.entropy_words_dict = {} if stopwords: with open(stopwords, 'r', encoding='utf-8') as f: self.stopwords = {line.strip() for line in f} else: self.stopwords = None def count_word_freq_one(self, save_to_file=False, word_freq_file=None): keywords = (word for word_list in self.data for word in word_list if word) self.words_freq_one = Counter(keywords) if save_to_file: words = [word for word in self.words_freq_one] freqs = [self.words_freq_one[word] for word in words] words_df = DataFrame(data={'word': words, 'freq': freqs}) words_df.sort_values('freq', ascending=False, inplace=True) words_df.to_excel(word_freq_file, index=False) def count_freq(self, word1, word2): """ 统计相邻两个词出现的频率 :param word1: :param word2: :return: """ if (word1, word2) not in self.words_freq_two: self.words_freq_two[(word1, word2)] = 1 else: self.words_freq_two[(word1, word2)] += 1 return word2 def count_word_freq_two(self, save_to_file=False, word_freq_file=None): """ 计算相邻两个词出现的频率 :param save_to_file: :param word_freq_file: :return: """ for word_list in self.data: reduce(self.count_freq, word_list) if save_to_file and word_freq_file: words_list = [(word1, word2) for word1, word2 in self.words_freq_two] freqs = [self.words_freq_two[w1_w2] for w1_w2 in words_list] words_df = DataFrame(data={'word': words_list, 'freq': freqs}) words_df.sort_values('freq', ascending=False, inplace=True) words_df.to_excel(word_freq_file, index=False) @staticmethod def is_chinese(word): for ch in word: if '\u4e00' <= ch <= '\u9fff': return True return False def clac_entropy(self, save_to_file=False, dict_path='data/entropy_dict.txt'): """ 计算信息熵: E(w1, w2) = P(w1,w2)/min(P(w1),P(w2)) :param save_to_file: 是否将熵值大于0.5的新词保存到文件中 :param dict_path: 保存字典路径 :return: """ for word1, word2 in self.words_freq_two: freq_two = self.words_freq_two[(word1, word2)] freq_one_min = min(self.words_freq_one[word1], self.words_freq_one[word2]) freq_one_max = max(self.words_freq_one[word1], self.words_freq_one[word2]) w1_w2_entropy = freq_two / freq_one_max if self.stopwords: if w1_w2_entropy > 0.5 and word1 not in self.stopwords and word2 not in self.stopwords and self.is_chinese(word1) and self.is_chinese(word2): # print(word1, word2, freq_two, freq_one_min, freq_one_max) self.entropy_words_dict[word1+word2] = w1_w2_entropy else: if w1_w2_entropy > 0.5: self.entropy_words_dict[word1+word2] = w1_w2_entropy print('信息熵大于0.5的词语组合:\n', self.entropy_words_dict) if save_to_file and dict_path: with open(dict_path, mode='r+', encoding='utf-8') as f: content = f.read() f.seek(0, 0) for word in self.entropy_words_dict: f.write(word+'\n') f.write(content) print(f'成功将信息熵大于0.5的词语保存到了{dict_path}中') def data_read(path, col_name): df = read_excel(path) texts = df.loc[df[col_name].notna(), col_name].str.split() return texts if __name__ == '__main__': text_list = data_read(path='res/国家政策_分词.xlsx', col_name='全文分词') info_entro = InfoEntropyMerge(data=text_list) info_entro.count_word_freq_one() info_entro.count_word_freq_two() info_entro.clac_entropy(save_to_file=False, dict_path='data/entropy_dict.txt')
经验分享:若有好的词典和停用词,优先选用方式一,否则选择方式二。