题目描述
思路
先观察下给定的语料库是什么样的
从上图可以看出,每一行语料有三列,第一列表示这个句子的 ID,第二列是句子,第三列是第二列句子的规范形式。具体来说,第二列句子中有可能会出现阿拉伯数字等一些特殊字符,那么第三列就会将这些字符转换成英文读音(例如将 1455 改写为 fourteen fifty-five)
了解完语料之后整理一下思路。我们需要得到的 bigram 是一张大表,即一个 $n\times n$ 的矩阵,其中 $n$ 表示不重复的单词个数。这个矩阵第 $i$ 行第 $j$ 列的值表示:前一个词是 $w_i$,当前词是 $w_j$ 的概率。例如下面这个矩阵,第一行第二列表示,前一个词是 $i$,当前词是 $want$ 的概率为 0.33
直接计算这个概率似乎是非常难的,我们应该先计算频次,即同样是这个 $n\times n$ 的矩阵,但这个矩阵里的值不再是频率,而是频次。例如下面这个矩阵,第二行第三列表示,前一个词是 $want$,当前词是 $to$ 总共出现了 608 次
有了这个频次表之后,只需再统计一下每个词出现的次数,用这个频次表的每一行除以每个词出现的次数,就得到频率了。例如下面是所有词出现的次数
代码
具体的代码实现中有很多细节,例如单词大小写,标点符号处理,以及平滑方法等等
首先获取第三列的句子,将其去除标点符号,并且将所有单词转为小写(因为大小写不同的单词会被认为是两个不同的单词,这样在统计的时候似乎不太合理),并且在句子的开头和结尾分别添加上 <s>
和 </s>
- import re
- import numpy as np
-
- def removePunctuation(sentence_str):
- '''给定字符串,进行特殊符号过滤,以及小写转换
- Args:
- sentence_str (str): 句子
-
- Returns:
- sentence_str.lower() (str): 预处理过的句子
- '''
- punctuation = '.!,;:?\-"\''
- sentence_str = re.sub(r'[{}]+'.format(punctuation),'',sentence_str)
- return sentence_str.lower()
-
- def get_sentence(filename):
- '''从给定文件中获取句子
- Args:
- filename (str): 语料库的文件名
-
- Returns:
- sentences_list (list): 1-D,存放了所有经过预处理后的句子
- '''
- sentences_list = []
- with open(filename, encoding='utf-8') as f:
- for line in f.readlines():
- sentence_str = removePunctuation(line.strip().split('|')[-1])
- sentences_list.append('<s> ' + sentence_str + ' </s>')
- return sentences_list
接着统计每个单词出现的次数
- def count_word(sentences_list):
- '''给定大量句子,统计出所有单词出现的频次
- Args:
- sentences_list (list): 所有经过预处理后的句子
-
- Returns:
- wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数,例如{'the': 1234}
- '''
- wordcount_dict = {}
- for sentence_str in sentences_list:
- for word in sentence_str.split():
- if word in wordcount_dict:
- wordcount_dict[word] += 1
- else:
- wordcount_dict[word] = 1
- return wordcount_dict
构建一个二维矩阵,不论是用 list 还是 numpy,他们进行索引的方式都是下标,而无法直接使用单词。所以我们应该先构建一个单词到索引的映射,以及索引到单词的映射
- def word2idx(wordcount_dict):
- '''构建单词到索引的映射与逆映射
- Args:
- wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数
-
- Returns:
- word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引,例如{'the': 0}
- idx2word_dict (dict): 键是int类型,表示索引;值是str类型,表示单词,例如{0: 'the'}
- '''
- word2idx_dict = {}
- idx2word_dict = {}
- for idx, word in enumerate(list(wordcount_dict.keys())):
- word2idx_dict[word] = idx
- idx2word_dict[idx] = word
- return word2idx_dict, idx2word_dict
接下来要做的就是统计两个单词同时出现的次数。基本做法就是遍历每个句子,同时遍历句子中的每个单词。记前一个词为 $w_i$,当前词为 $w_j$,通过 word2idx_dict
查得 $w_i$ 对应的索引为 $i$,$w_j$ 对应的索引为 $j$,则矩阵中 $(i,j)$ 位置的值就加 1。最后我还设置了一个可选项,如果用户想要使用加一平滑,那一开始就生成一个全 1 的矩阵;如果不用平滑,一开始生成的是全 0 的矩阵
- def c_table(word2idx_dict, sentences_list, smooth=False):
- '''构建两个单词连续出现的频次矩阵
- Args:
- word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引
- sentences_list (list): 所有经过预处理后的句子
- smooth (bool): 是否进行加一平滑
-
- Returns:
- c_table_np (numpy): 2-D,c_table_np[i][j] = a表示 前一个索引为i的词和当前索引为j的词 同时出现的次数为a
- '''
- n = len(word2idx_dict) # 单词个数
- if smooth: # 加一平滑
- c_table_np = np.ones((n, n)) # n*n 全1矩阵
- else:
- c_table_np = np.zeros((n, n)) # n*n 全0矩阵
- for sentence_str in sentences_list:
- words_list = sentence_str.split() # ['i', 'like', 'apple']
- for i in range(1, len(words_list)):
- w_i = word2idx_dict[words_list[i]] # w_i
- w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
- c_table_np[w_j][w_i] += 1
-
- return c_table_np
最后用上述生成的 c_table_np
的每一行同除以 wordcount_dict
中的每个值即可,下面代码利用了 numpy 的广播机制,加快了运算速度
- def compute_bigram_table(c_table_np, wordcount_dict):
- '''构建bigram概率矩阵
- Args:
- c_table_np (numpy): bigram频次矩阵
- wordcount_dict (dict): 所有单词出现的次数
-
- Returns:
- c_table_np / count_np[:, None] (numpy): 2-D,bigram概率矩阵
- '''
- count_np = np.array(list(wordcount_dict.values())) # [800, 900, 788, ...]
- return c_table_np / count_np[:, None]
如果想要获取一个句子的 bigram 概率,连乘即可
- def compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list):
- '''计算每个句子的bigram概率
- Args:
- bigram_table_np (numpy): bigram概率矩阵
- word2idx_dict (dict): 单词到索引的映射
- sentences_list (list): 预处理后的句子
-
- Returns:
- scores_list (list): 所有句子的bigram概率
- '''
- scores_list = []
- for sentence_str in sentences_list:
- words_list = sentence_str.split()
- score = 1
- for i in range(1, len(words_list)):
- w_i = word2idx_dict[words_list[i]] # w_i
- w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
- score *= bigram_table_np[w_j][w_i]
- scores_list.append(score)
- return scores_list
完整代码如下:
- import re
- import numpy as np
-
- def removePunctuation(sentence_str):
- '''给定字符串,进行特殊符号过滤,以及小写转换
- Args:
- sentence_str (str): 句子
-
- Returns:
- sentence_str.lower() (str): 预处理过的句子
- '''
- punctuation = '.!,;:?\-"\''
- sentence_str = re.sub(r'[{}]+'.format(punctuation),'',sentence_str)
- return sentence_str.lower()
-
- def get_sentence(filename):
- '''从给定文件中获取句子
- Args:
- filename (str): 语料库的文件名
-
- Returns:
- sentences_list (list): 1-D,存放了所有经过预处理后的句子
- '''
- sentences_list = []
- with open(filename, encoding='utf-8') as f:
- for line in f.readlines():
- sentence_str = removePunctuation(line.strip().split('|')[-1])
- sentences_list.append('<s> ' + sentence_str + ' </s>')
- return sentences_list
-
- def count_word(sentences_list):
- '''给定大量句子,统计出所有单词出现的频次
- Args:
- sentences_list (list): 所有经过预处理后的句子
-
- Returns:
- wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数,例如{'the': 1234}
- '''
- wordcount_dict = {}
- for sentence_str in sentences_list:
- for word in sentence_str.split():
- if word in wordcount_dict:
- wordcount_dict[word] += 1
- else:
- wordcount_dict[word] = 1
- return wordcount_dict
-
- def word2idx(wordcount_dict):
- '''构建单词到索引的映射与逆映射
- Args:
- wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数
-
- Returns:
- word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引,例如{'the': 0}
- idx2word_dict (dict): 键是int类型,表示索引;值是str类型,表示单词,例如{0: 'the'}
- '''
- word2idx_dict = {}
- idx2word_dict = {}
- for idx, word in enumerate(list(wordcount_dict.keys())):
- word2idx_dict[word] = idx
- idx2word_dict[idx] = word
- return word2idx_dict, idx2word_dict
-
- def c_table(word2idx_dict, sentences_list, smooth=False):
- '''构建两个单词连续出现的频次矩阵
- Args:
- word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引
- sentences_list (list): 所有经过预处理后的句子
- smooth (bool): 是否进行加一平滑
-
- Returns:
- c_table_np (numpy): 2-D,c_table_np[i][j] = a表示 前一个索引为i的词和当前索引为j的词 同时出现的次数为a
- '''
- n = len(word2idx_dict) # 单词个数
- c_table_np = np.zeros((n, n)) # n*n 全0矩阵
- for sentence_str in sentences_list:
- words_list = sentence_str.split() # ['i', 'like', 'apple']
- for i in range(1, len(words_list)):
- w_i = word2idx_dict[words_list[i]] # w_i
- w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
- c_table_np[w_j][w_i] += 1
-
- if smooth: # 加一平滑
- c_table_np[c_table_np == 0] = 1
-
- return c_table_np
-
- def compute_bigram_table(c_table_np, wordcount_dict):
- '''构建bigram概率矩阵
- Args:
- c_table_np (numpy): bigram频次矩阵
- wordcount_dict (dict): 所有单词出现的次数
-
- Returns:
- c_table_np / count_np[:, None] (numpy): 2-D,bigram概率矩阵
- '''
- count_np = np.array(list(wordcount_dict.values())) # [800, 900, 788, ...]
- return c_table_np / count_np[:, None]
-
- def compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list):
- '''计算每个句子的bigram概率
- Args:
- bigram_table_np (numpy): bigram概率矩阵
- word2idx_dict (dict): 单词到索引的映射
- sentences_list (list): 预处理后的句子
-
- Returns:
- scores_list (list): 所有句子的bigram概率
- '''
- scores_list = []
- for sentence_str in sentences_list:
- words_list = sentence_str.split()
- score = 1
- for i in range(1, len(words_list)):
- w_i = word2idx_dict[words_list[i]] # w_i
- w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
- score *= bigram_table_np[w_j][w_i]
- scores_list.append(score)
- return scores_list
-
- if __name__ == '__main__':
- sentences_list = get_sentence('metadata.txt')
- wordcount_dict = count_word(sentences_list)
- word2idx_dict, idx2word_dict = word2idx(wordcount_dict)
- c_table_np = c_table(word2idx_dict, sentences_list, True)
- bigram_table_np = compute_bigram_table(c_table_np, wordcount_dict)
- scores_list = compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list)
- print(scores_list[:10])
以上是第一题的代码,第二题实际上也比较简单,在第一题的基础上稍作修改即可。例如用户输入 $the$ 这个单词,首先通过 word2idx_dict
查出 $the$ 对应的下标,记作 $k$。然后在 bigran_table_np
中寻找第 $k$ 行最大的前 5 个数对应的下标即可 (argmax)。得到下标,再通过 idx2word
返回单词。下面我只贴出不一样部分的代码,相同的代码不再重复
- import heapq
-
- def get_recommend_words(idx, bigram_table_np, idx2word_dict):
- m_list = list(bigram_table_np[idx])
- max_number = heapq.nlargest(7, m_list)
- max_index_list = []
- for t in max_number:
- index = m_list.index(t)
- max_index_list.append(index)
- m_list[index] = 0
-
- return [idx2word_dict[i] for i in max_index_list if idx2word_dict[i] not in ['<s>', '</s>']]
-
-
- if __name__ == '__main__':
- sentences_list = get_sentence('metadata.txt')
- wordcount_dict = count_word(sentences_list)
- word2idx_dict, idx2word_dict = word2idx(wordcount_dict)
- c_table_np = c_table(word2idx_dict, sentences_list, False)
- bigram_table_np = compute_bigram_table(c_table_np, wordcount_dict)
- while True:
- print('Please input a word: ', end="")
- word = input().lower()
- if word not in word2idx_dict:
- print('None')
- break
- else:
- recommend_word = get_recommend_words(word2idx_dict[word], bigram_table_np, idx2word_dict)
- print(recommend_word[:5])
up 的题目来自哪个课程呢?