MENU

自然语言处理作业(实现 bigram)

November 4, 2020 • Read: 9043 • 数据挖掘与机器学习阅读设置

题目描述

思路

先观察下给定的语料库是什么样的

从上图可以看出,每一行语料有三列,第一列表示这个句子的 ID,第二列是句子,第三列是第二列句子的规范形式。具体来说,第二列句子中有可能会出现阿拉伯数字等一些特殊字符,那么第三列就会将这些字符转换成英文读音(例如将 1455 改写为 fourteen fifty-five)

了解完语料之后整理一下思路。我们需要得到的 bigram 是一张大表,即一个 $n\times n$ 的矩阵,其中 $n$ 表示不重复的单词个数。这个矩阵第 $i$ 行第 $j$ 列的值表示:前一个词是 $w_i$,当前词是 $w_j$ 的概率。例如下面这个矩阵,第一行第二列表示,前一个词是 $i$,当前词是 $want$ 的概率为 0.33

直接计算这个概率似乎是非常难的,我们应该先计算频次,即同样是这个 $n\times n$ 的矩阵,但这个矩阵里的值不再是频率,而是频次。例如下面这个矩阵,第二行第三列表示,前一个词是 $want$,当前词是 $to$ 总共出现了 608 次

有了这个频次表之后,只需再统计一下每个词出现的次数,用这个频次表的每一行除以每个词出现的次数,就得到频率了。例如下面是所有词出现的次数

代码

具体的代码实现中有很多细节,例如单词大小写,标点符号处理,以及平滑方法等等

首先获取第三列的句子,将其去除标点符号,并且将所有单词转为小写(因为大小写不同的单词会被认为是两个不同的单词,这样在统计的时候似乎不太合理),并且在句子的开头和结尾分别添加上 <s></s>

  • import re
  • import numpy as np
  • def removePunctuation(sentence_str):
  • '''给定字符串,进行特殊符号过滤,以及小写转换
  • Args:
  • sentence_str (str): 句子
  • Returns:
  • sentence_str.lower() (str): 预处理过的句子
  • '''
  • punctuation = '.!,;:?\-"\''
  • sentence_str = re.sub(r'[{}]+'.format(punctuation),'',sentence_str)
  • return sentence_str.lower()
  • def get_sentence(filename):
  • '''从给定文件中获取句子
  • Args:
  • filename (str): 语料库的文件名
  • Returns:
  • sentences_list (list): 1-D,存放了所有经过预处理后的句子
  • '''
  • sentences_list = []
  • with open(filename, encoding='utf-8') as f:
  • for line in f.readlines():
  • sentence_str = removePunctuation(line.strip().split('|')[-1])
  • sentences_list.append('<s> ' + sentence_str + ' </s>')
  • return sentences_list

接着统计每个单词出现的次数

  • def count_word(sentences_list):
  • '''给定大量句子,统计出所有单词出现的频次
  • Args:
  • sentences_list (list): 所有经过预处理后的句子
  • Returns:
  • wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数,例如{'the': 1234}
  • '''
  • wordcount_dict = {}
  • for sentence_str in sentences_list:
  • for word in sentence_str.split():
  • if word in wordcount_dict:
  • wordcount_dict[word] += 1
  • else:
  • wordcount_dict[word] = 1
  • return wordcount_dict

构建一个二维矩阵,不论是用 list 还是 numpy,他们进行索引的方式都是下标,而无法直接使用单词。所以我们应该先构建一个单词到索引的映射,以及索引到单词的映射

  • def word2idx(wordcount_dict):
  • '''构建单词到索引的映射与逆映射
  • Args:
  • wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数
  • Returns:
  • word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引,例如{'the': 0}
  • idx2word_dict (dict): 键是int类型,表示索引;值是str类型,表示单词,例如{0: 'the'}
  • '''
  • word2idx_dict = {}
  • idx2word_dict = {}
  • for idx, word in enumerate(list(wordcount_dict.keys())):
  • word2idx_dict[word] = idx
  • idx2word_dict[idx] = word
  • return word2idx_dict, idx2word_dict

接下来要做的就是统计两个单词同时出现的次数。基本做法就是遍历每个句子,同时遍历句子中的每个单词。记前一个词为 $w_i$,当前词为 $w_j$,通过 word2idx_dict 查得 $w_i$ 对应的索引为 $i$,$w_j$ 对应的索引为 $j$,则矩阵中 $(i,j)$ 位置的值就加 1。最后我还设置了一个可选项,如果用户想要使用加一平滑,那一开始就生成一个全 1 的矩阵;如果不用平滑,一开始生成的是全 0 的矩阵

  • def c_table(word2idx_dict, sentences_list, smooth=False):
  • '''构建两个单词连续出现的频次矩阵
  • Args:
  • word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引
  • sentences_list (list): 所有经过预处理后的句子
  • smooth (bool): 是否进行加一平滑
  • Returns:
  • c_table_np (numpy): 2-D,c_table_np[i][j] = a表示 前一个索引为i的词和当前索引为j的词 同时出现的次数为a
  • '''
  • n = len(word2idx_dict) # 单词个数
  • if smooth: # 加一平滑
  • c_table_np = np.ones((n, n)) # n*n 全1矩阵
  • else:
  • c_table_np = np.zeros((n, n)) # n*n 全0矩阵
  • for sentence_str in sentences_list:
  • words_list = sentence_str.split() # ['i', 'like', 'apple']
  • for i in range(1, len(words_list)):
  • w_i = word2idx_dict[words_list[i]] # w_i
  • w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
  • c_table_np[w_j][w_i] += 1
  • return c_table_np

最后用上述生成的 c_table_np 的每一行同除以 wordcount_dict 中的每个值即可,下面代码利用了 numpy 的广播机制,加快了运算速度

  • def compute_bigram_table(c_table_np, wordcount_dict):
  • '''构建bigram概率矩阵
  • Args:
  • c_table_np (numpy): bigram频次矩阵
  • wordcount_dict (dict): 所有单词出现的次数
  • Returns:
  • c_table_np / count_np[:, None] (numpy): 2-D,bigram概率矩阵
  • '''
  • count_np = np.array(list(wordcount_dict.values())) # [800, 900, 788, ...]
  • return c_table_np / count_np[:, None]

如果想要获取一个句子的 bigram 概率,连乘即可

  • def compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list):
  • '''计算每个句子的bigram概率
  • Args:
  • bigram_table_np (numpy): bigram概率矩阵
  • word2idx_dict (dict): 单词到索引的映射
  • sentences_list (list): 预处理后的句子
  • Returns:
  • scores_list (list): 所有句子的bigram概率
  • '''
  • scores_list = []
  • for sentence_str in sentences_list:
  • words_list = sentence_str.split()
  • score = 1
  • for i in range(1, len(words_list)):
  • w_i = word2idx_dict[words_list[i]] # w_i
  • w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
  • score *= bigram_table_np[w_j][w_i]
  • scores_list.append(score)
  • return scores_list

完整代码如下:

  • import re
  • import numpy as np
  • def removePunctuation(sentence_str):
  • '''给定字符串,进行特殊符号过滤,以及小写转换
  • Args:
  • sentence_str (str): 句子
  • Returns:
  • sentence_str.lower() (str): 预处理过的句子
  • '''
  • punctuation = '.!,;:?\-"\''
  • sentence_str = re.sub(r'[{}]+'.format(punctuation),'',sentence_str)
  • return sentence_str.lower()
  • def get_sentence(filename):
  • '''从给定文件中获取句子
  • Args:
  • filename (str): 语料库的文件名
  • Returns:
  • sentences_list (list): 1-D,存放了所有经过预处理后的句子
  • '''
  • sentences_list = []
  • with open(filename, encoding='utf-8') as f:
  • for line in f.readlines():
  • sentence_str = removePunctuation(line.strip().split('|')[-1])
  • sentences_list.append('<s> ' + sentence_str + ' </s>')
  • return sentences_list
  • def count_word(sentences_list):
  • '''给定大量句子,统计出所有单词出现的频次
  • Args:
  • sentences_list (list): 所有经过预处理后的句子
  • Returns:
  • wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数,例如{'the': 1234}
  • '''
  • wordcount_dict = {}
  • for sentence_str in sentences_list:
  • for word in sentence_str.split():
  • if word in wordcount_dict:
  • wordcount_dict[word] += 1
  • else:
  • wordcount_dict[word] = 1
  • return wordcount_dict
  • def word2idx(wordcount_dict):
  • '''构建单词到索引的映射与逆映射
  • Args:
  • wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数
  • Returns:
  • word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引,例如{'the': 0}
  • idx2word_dict (dict): 键是int类型,表示索引;值是str类型,表示单词,例如{0: 'the'}
  • '''
  • word2idx_dict = {}
  • idx2word_dict = {}
  • for idx, word in enumerate(list(wordcount_dict.keys())):
  • word2idx_dict[word] = idx
  • idx2word_dict[idx] = word
  • return word2idx_dict, idx2word_dict
  • def c_table(word2idx_dict, sentences_list, smooth=False):
  • '''构建两个单词连续出现的频次矩阵
  • Args:
  • word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引
  • sentences_list (list): 所有经过预处理后的句子
  • smooth (bool): 是否进行加一平滑
  • Returns:
  • c_table_np (numpy): 2-D,c_table_np[i][j] = a表示 前一个索引为i的词和当前索引为j的词 同时出现的次数为a
  • '''
  • n = len(word2idx_dict) # 单词个数
  • c_table_np = np.zeros((n, n)) # n*n 全0矩阵
  • for sentence_str in sentences_list:
  • words_list = sentence_str.split() # ['i', 'like', 'apple']
  • for i in range(1, len(words_list)):
  • w_i = word2idx_dict[words_list[i]] # w_i
  • w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
  • c_table_np[w_j][w_i] += 1
  • if smooth: # 加一平滑
  • c_table_np[c_table_np == 0] = 1
  • return c_table_np
  • def compute_bigram_table(c_table_np, wordcount_dict):
  • '''构建bigram概率矩阵
  • Args:
  • c_table_np (numpy): bigram频次矩阵
  • wordcount_dict (dict): 所有单词出现的次数
  • Returns:
  • c_table_np / count_np[:, None] (numpy): 2-D,bigram概率矩阵
  • '''
  • count_np = np.array(list(wordcount_dict.values())) # [800, 900, 788, ...]
  • return c_table_np / count_np[:, None]
  • def compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list):
  • '''计算每个句子的bigram概率
  • Args:
  • bigram_table_np (numpy): bigram概率矩阵
  • word2idx_dict (dict): 单词到索引的映射
  • sentences_list (list): 预处理后的句子
  • Returns:
  • scores_list (list): 所有句子的bigram概率
  • '''
  • scores_list = []
  • for sentence_str in sentences_list:
  • words_list = sentence_str.split()
  • score = 1
  • for i in range(1, len(words_list)):
  • w_i = word2idx_dict[words_list[i]] # w_i
  • w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
  • score *= bigram_table_np[w_j][w_i]
  • scores_list.append(score)
  • return scores_list
  • if __name__ == '__main__':
  • sentences_list = get_sentence('metadata.txt')
  • wordcount_dict = count_word(sentences_list)
  • word2idx_dict, idx2word_dict = word2idx(wordcount_dict)
  • c_table_np = c_table(word2idx_dict, sentences_list, True)
  • bigram_table_np = compute_bigram_table(c_table_np, wordcount_dict)
  • scores_list = compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list)
  • print(scores_list[:10])

以上是第一题的代码,第二题实际上也比较简单,在第一题的基础上稍作修改即可。例如用户输入 $the$ 这个单词,首先通过 word2idx_dict 查出 $the$ 对应的下标,记作 $k$。然后在 bigran_table_np 中寻找第 $k$ 行最大的前 5 个数对应的下标即可 (argmax)。得到下标,再通过 idx2word 返回单词。下面我只贴出不一样部分的代码,相同的代码不再重复

  • import heapq
  • def get_recommend_words(idx, bigram_table_np, idx2word_dict):
  • m_list = list(bigram_table_np[idx])
  • max_number = heapq.nlargest(7, m_list)
  • max_index_list = []
  • for t in max_number:
  • index = m_list.index(t)
  • max_index_list.append(index)
  • m_list[index] = 0
  • return [idx2word_dict[i] for i in max_index_list if idx2word_dict[i] not in ['<s>', '</s>']]
  • if __name__ == '__main__':
  • sentences_list = get_sentence('metadata.txt')
  • wordcount_dict = count_word(sentences_list)
  • word2idx_dict, idx2word_dict = word2idx(wordcount_dict)
  • c_table_np = c_table(word2idx_dict, sentences_list, False)
  • bigram_table_np = compute_bigram_table(c_table_np, wordcount_dict)
  • while True:
  • print('Please input a word: ', end="")
  • word = input().lower()
  • if word not in word2idx_dict:
  • print('None')
  • break
  • else:
  • recommend_word = get_recommend_words(word2idx_dict[word], bigram_table_np, idx2word_dict)
  • print(recommend_word[:5])
Last Modified: February 11, 2023
Archives Tip
QR Code for this page
Tipping QR Code
Leave a Comment

已有 1 条评论
  1. lix lix

    up 的题目来自哪个课程呢?