This repository was archived by the owner on Mar 1, 2022. It is now read-only.

Add Python multiprocessing computation of ngram_freq_total and ngram_keys #55

Open · wants to merge 2 commits into master
63 changes: 37 additions & 26 deletions smoothnlp/algorithm/phrase/ngram_utils.py
@@ -1,6 +1,7 @@
import re
import types
from multiprocessing import cpu_count,Pool
from multiprocessing import Process
import multiprocessing
import math
from collections.abc import Iterable
from collections import Counter
@@ -23,8 +24,8 @@ def union_word_freq(dic1,dic2):
    keys = (dic1.keys()) | (dic2.keys())
    total = {}
    for key in keys:
-        total[key] = dic1.get(key, 0) + dic2.get(key, 0)
-    return total
+        dic2.update({key: dic1.get(key, 0) + dic2.get(key, 0)})
+    return dic2
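
A quick sanity check of the merged behavior (toy values; note that union_word_freq now mutates and returns dic2, which is what lets it write through to a shared Manager dict later on):

    a = {"ab": 2, "bc": 1}
    b = {"ab": 3}
    assert union_word_freq(a, b) == {"ab": 5, "bc": 1}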

def sentence_split_by_punc(corpus:str):
    return re.split(r'[;;.。,,!\n!??]',corpus)
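
For orientation, the punctuation split above behaves as follows (toy input; note the trailing empty string when the text ends with punctuation):

    sentence_split_by_punc("今天天气不错,适合出门。")
    # -> ['今天天气不错', '适合出门', '']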
@@ -50,25 +51,49 @@ def generate_ngram_str(text:str,n):
    for text in corpus:
        for ngram in generate_ngram_str(text,n):
            yield ngram
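
For orientation, generate_ngram yields character n-grams per sentence, assuming generate_ngram_str (elided above) yields the sliding character windows text[i:i+n]:

    list(generate_ngram(["天气不错"], 2))
    # -> ['天气', '气不', '不错']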

+## worker process function; ngram_freq_total and ngram_keys are variables shared across processes
+def compute_freq(corpus_chunk,lock,ngram_freq_total,min_n,max_n,ngram_keys,min_freq):
+    ngram_freq = {}
+    for ni in [1]+list(range(min_n,max_n+2)):
+        ngram_generator = generate_ngram(corpus_chunk, ni)
+        nigram_freq = dict(Counter(ngram_generator))
+        lock.acquire()
+        ## reassign the whole set: mutating a set inside a Manager dict in place would not propagate
+        ngram_keys.update({ni:(ngram_keys.get(ni) | nigram_freq.keys())})
+        lock.release()
+        ngram_freq = {**nigram_freq, **ngram_freq}
+    ngram_freq = {word: count for word, count in ngram_freq.items() if count >= min_freq} ## per-chunk ngram frequency counts
+    lock.acquire()
+    union_word_freq(ngram_freq, ngram_freq_total)  ## union_word_freq writes into the shared dict (dic2) in place
+    lock.release()
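
The locking and reassignment pattern above follows from how Manager proxies behave: values read out of a managed dict are local copies, so in-place mutation is silently lost and each key must be written back. A standalone sketch (illustrative names only, not part of the patch):

    from multiprocessing import Manager

    manager = Manager()
    d = manager.dict({1: set()})
    s = d[1]
    s.add("ab")          # mutates a local copy; d[1] is still empty
    d[1] = s | {"ab"}    # writing the key back is what propagates the change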
def get_ngram_freq_info(corpus,  ## list or generator
                        min_n:int=2,
                        max_n:int=4,
                        chunk_size:int=5000,
                        min_freq:int=2,
                        ):
"""

:param corpus: 接受list或者generator
如果corpus是generator, 默认该generator每次yield一段长度为chunk_size的corpus_chunk
:param max_n:
:param chunk_size:
:param min_freq:
:return:
"""
-    ngram_freq_total = {} ## records ngram frequencies
-    ngram_keys = {i: set() for i in range(1, max_n + 2)} ## records, for each N, which ngrams occur

+    ## queue of child processes, so the main process can join them and wait until ngram_freq_total and ngram_keys are fully computed
+    process_queue = []
+    manager = multiprocessing.Manager()
+    ## variables shared across processes
+    ngram_freq_total = manager.dict()
+    ngram_keys = manager.dict({i: set() for i in range(1, max_n + 2)})
+    lock = multiprocessing.Lock()
+    ## compute ngram_freq_total and ngram_keys concurrently, one process per corpus chunk
+    def join_process(corpus, lock, ngram_freq_total, min_n, max_n, ngram_keys):
+        for corpus_chunk in corpus:
+            p = Process(target=compute_freq, args=(corpus_chunk, lock, ngram_freq_total, min_n, max_n, ngram_keys, min_freq))
+            p.start()
+            process_queue.append(p)
    def _process_corpus_chunk(corpus_chunk):
        ngram_freq = {}
        for ni in [1]+list(range(min_n,max_n+2)):
@@ -78,22 +103,19 @@ def _process_corpus_chunk(corpus_chunk):
            ngram_freq = {**nigram_freq, **ngram_freq}
        ngram_freq = {word: count for word, count in ngram_freq.items() if count >= min_freq} ## per-chunk ngram frequency counts
        return ngram_freq

    if isinstance(corpus,types.GeneratorType):
        ## note: when corpus is a generator, this function is unaware of chunk_size
-        for corpus_chunk in corpus:
-            ngram_freq = _process_corpus_chunk(corpus_chunk)
-            ngram_freq_total = union_word_freq(ngram_freq, ngram_freq_total)
+        join_process(corpus, lock, ngram_freq_total, min_n, max_n, ngram_keys)
+        for per_pro in process_queue:
+            per_pro.join()
    elif isinstance(corpus,list):
        len_corpus = len(corpus)
        for i in range(0,len_corpus,chunk_size):
            corpus_chunk = corpus[i:min(len_corpus,i+chunk_size)]
            ngram_freq = _process_corpus_chunk(corpus_chunk)
            ngram_freq_total = union_word_freq(ngram_freq,ngram_freq_total)
    for k in ngram_keys:
-        ngram_keys[k] = ngram_keys[k] & ngram_freq_total.keys()
+        ## a Manager dict's .keys() returns a plain list, so it must be wrapped in set() before intersecting
+        ngram_keys[k] = ngram_keys[k] & set(ngram_freq_total.keys())
    return ngram_freq_total,ngram_keys
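
A usage sketch for the new multiprocessing path (hypothetical toy chunks; one process is spawned per chunk, and process spawning requires the __main__ guard on some platforms):

    if __name__ == "__main__":
        chunks = (c for c in [["天气不错", "天气不错"] * 5, ["适合出门"] * 10])
        freq, keys = get_ngram_freq_info(chunks, min_n=2, max_n=4, min_freq=2)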

def _ngram_entropy_scorer(parent_ngrams_freq):
"""
根据一个candidate的neighbor的出现频率, 计算Entropy具体值
@@ -120,20 +142,16 @@ def _calc_ngram_entropy(ngram_freq,
        for ni in n:
            entropy = {**entropy,**_calc_ngram_entropy(ngram_freq,ngram_keys,ni)}
        return entropy

    ngram_entropy = {}
    target_ngrams = ngram_keys[n]
    parent_candidates = ngram_keys[n+1]

    if CPU_COUNT == 1:
        ## build left/right neighbor tries over the (n+1)-grams
        left_neighbors = Trie()
        right_neighbors = Trie()

        for parent_candidate in parent_candidates:
            right_neighbors[parent_candidate] = ngram_freq[parent_candidate]
            left_neighbors[parent_candidate[1:]+parent_candidate[0]] = ngram_freq[parent_candidate]

        ## compute the entropy
        for target_ngram in target_ngrams:
            try: ## in some cases a candidate ngram has no left or right neighbors
@@ -177,8 +195,6 @@ def _calc_ngram_pmi(ngram_freq,ngram_keys,n):
        ami = pmi/len(target_ngram) ## average mutual information
        mi[target_ngram] = (pmi,ami)
    return mi
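
For reference, the scorers above follow the standard definitions of neighbor entropy and (average) pointwise mutual information. A standalone sketch with illustrative names, not the module's API:

    import math

    def entropy(neighbor_freqs):
        ## H = -sum_i p_i * log(p_i), where p_i is the relative frequency of neighbor i
        total = sum(neighbor_freqs)
        return -sum(f / total * math.log(f / total) for f in neighbor_freqs)

    def pmi(freq_xy, freq_x, freq_y, total):
        ## log( p(xy) / (p(x) * p(y)) ); the "ami" above divides this by the ngram length
        return math.log((freq_xy / total) / ((freq_x / total) * (freq_y / total)))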


def get_scores(corpus,
               min_n:int = 2,
               max_n: int = 4,
@@ -196,7 +212,6 @@ def get_scores(corpus,
                                                 chunk_size=chunk_size,
                                                 min_freq=min_freq)


    left_right_entropy = _calc_ngram_entropy(ngram_freq,ngram_keys,range(min_n,max_n+1))
    mi = _calc_ngram_pmi(ngram_freq,ngram_keys,range(min_n,max_n+1))
    joint_phrase = mi.keys() & left_right_entropy.keys()
@@ -209,7 +224,6 @@ def get_scores(corpus,
                               word_liberalization(left_right_entropy[word][0],left_right_entropy[word][1])+mi[word][1] ## our score
                               )
                           for word in joint_phrase}

    ## DONE: filter candidate ngrams whose first or last character appears unusually often, e.g. remove "XX的", "美丽的", "漂亮的" from the dictionary
    target_ngrams = word_info_scores.keys()
    start_chars = Counter([n[0] for n in target_ngrams])
@@ -219,10 +233,7 @@ def get_scores(corpus,
    config.logger.info("~~~ Threshold used for removing start end char: {} ~~~~".format(threshold))
    invalid_start_chars = set([char for char, count in start_chars.items() if count > threshold])
    invalid_end_chars = set([char for char, count in end_chars.items() if count > threshold])

    invalid_target_ngrams = set([n for n in target_ngrams if (n[0] in invalid_start_chars or n[-1] in invalid_end_chars)])

    for n in invalid_target_ngrams: ## drop candidates flagged by the invalid start/end character info
        word_info_scores.pop(n)

    return word_info_scores
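
An end-to-end sketch (toy corpus for illustration; a corpus this small will yield few or no candidates once min_freq filtering applies):

    corpus = ["今天天气不错,适合出门", "今天天气不错,适合逛街"] * 10
    word_info_scores = get_scores(corpus, min_n=2, max_n=4, chunk_size=5, min_freq=2)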