MENU

自然语言处理作业(实现bigram)

November 4, 2020 • Read: 8475 • 数据挖掘与机器学习阅读设置

题目描述

思路

先观察下给定的语料库是什么样的

从上图可以看出,每一行语料有三列,第一列表示这个句子的ID,第二列是句子,第三列是第二列句子的规范形式。具体来说,第二列句子中有可能会出现阿拉伯数字等一些特殊字符,那么第三列就会将这些字符转换成英文读音(例如将1455改写为fourteen fifty-five)

了解完语料之后整理一下思路。我们需要得到的bigram是一张大表,即一个$n\times n$的矩阵,其中$n$表示不重复的单词个数。这个矩阵第$i$行第$j$列的值表示:前一个词是$w_i$,当前词是$w_j$的概率。例如下面这个矩阵,第一行第二列表示,前一个词是$i$,当前词是$want$的概率为0.33

直接计算这个概率似乎是非常难的,我们应该先计算频次,即同样是这个$n\times n$的矩阵,但这个矩阵里的值不再是频率,而是频次。例如下面这个矩阵,第二行第三列表示,前一个词是$want$,当前词是$to$总共出现了608次

有了这个频次表之后,只需再统计一下每个词出现的次数,用这个频次表的每一行除以每个词出现的次数,就得到频率了。例如下面是所有词出现的次数

代码

具体的代码实现中有很多细节,例如单词大小写,标点符号处理,以及平滑方法等等

首先获取第三列的句子,将其去除标点符号,并且将所有单词转为小写(因为大小写不同的单词会被认为是两个不同的单词,这样在统计的时候似乎不太合理),并且在句子的开头和结尾分别添加上<s></s>

import re
import numpy as np

def removePunctuation(sentence_str):
    '''给定字符串,进行特殊符号过滤,以及小写转换
    Args:
        sentence_str (str): 句子
    
    Returns:
        sentence_str.lower() (str): 预处理过的句子
    '''
    punctuation = '.!,;:?\-"\''
    sentence_str = re.sub(r'[{}]+'.format(punctuation),'',sentence_str)
    return sentence_str.lower()

def get_sentence(filename):
    '''从给定文件中获取句子
    Args:
        filename (str): 语料库的文件名
    
    Returns:
        sentences_list (list): 1-D,存放了所有经过预处理后的句子
    '''
    sentences_list = []
    with open(filename, encoding='utf-8') as f:
        for line in f.readlines():
            sentence_str = removePunctuation(line.strip().split('|')[-1])
            sentences_list.append('<s> ' + sentence_str + ' </s>')
    return sentences_list

接着统计每个单词出现的次数

def count_word(sentences_list):
    '''给定大量句子,统计出所有单词出现的频次
    Args:
        sentences_list (list): 所有经过预处理后的句子
    
    Returns:
        wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数,例如{'the': 1234}
    '''
    wordcount_dict = {} 
    for sentence_str in sentences_list:
        for word in sentence_str.split():
            if word in wordcount_dict:
                wordcount_dict[word] += 1
            else:
                wordcount_dict[word] = 1
    return wordcount_dict

构建一个二维矩阵,不论是用list还是numpy,他们进行索引的方式都是下标,而无法直接使用单词。所以我们应该先构建一个单词到索引的映射,以及索引到单词的映射

def word2idx(wordcount_dict):
    '''构建单词到索引的映射与逆映射
    Args:
        wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数
    
    Returns:
        word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引,例如{'the': 0}
        idx2word_dict (dict): 键是int类型,表示索引;值是str类型,表示单词,例如{0: 'the'}
    '''
    word2idx_dict = {}
    idx2word_dict = {}
    for idx, word in enumerate(list(wordcount_dict.keys())):
        word2idx_dict[word] = idx
        idx2word_dict[idx] = word
    return word2idx_dict, idx2word_dict

接下来要做的就是统计两个单词同时出现的次数。基本做法就是遍历每个句子,同时遍历句子中的每个单词。记前一个词为$w_i$,当前词为$w_j$,通过word2idx_dict查得$w_i$对应的索引为$i$,$w_j$对应的索引为$j$,则矩阵中$(i,j)$位置的值就加1。最后我还设置了一个可选项,如果用户想要使用加一平滑,那一开始就生成一个全1的矩阵;如果不用平滑,一开始生成的是全0的矩阵

def c_table(word2idx_dict, sentences_list, smooth=False):
    '''构建两个单词连续出现的频次矩阵
    Args:
        word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引
        sentences_list (list): 所有经过预处理后的句子
        smooth (bool): 是否进行加一平滑
    
    Returns:
        c_table_np (numpy): 2-D,c_table_np[i][j] = a表示 前一个索引为i的词和当前索引为j的词 同时出现的次数为a
    '''
    n = len(word2idx_dict) # 单词个数
    if smooth: # 加一平滑
        c_table_np = np.ones((n, n)) # n*n 全1矩阵
    else:
        c_table_np = np.zeros((n, n)) # n*n 全0矩阵
    for sentence_str in sentences_list:
        words_list = sentence_str.split() # ['i', 'like', 'apple']
        for i in range(1, len(words_list)):
            w_i = word2idx_dict[words_list[i]] # w_i
            w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
            c_table_np[w_j][w_i] += 1
    
    return c_table_np

最后用上述生成的c_table_np的每一行同除以wordcount_dict中的每个值即可,下面代码利用了numpy的广播机制,加快了运算速度

def compute_bigram_table(c_table_np, wordcount_dict):
    '''构建bigram概率矩阵
    Args:
        c_table_np (numpy): bigram频次矩阵
        wordcount_dict (dict): 所有单词出现的次数
    
    Returns:
        c_table_np / count_np[:, None] (numpy): 2-D,bigram概率矩阵
    '''
    count_np = np.array(list(wordcount_dict.values())) # [800, 900, 788, ...]
    return c_table_np / count_np[:, None]

如果想要获取一个句子的bigram概率,连乘即可

def compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list):
    '''计算每个句子的bigram概率
    Args:
        bigram_table_np (numpy): bigram概率矩阵
        word2idx_dict (dict): 单词到索引的映射
        sentences_list (list): 预处理后的句子
    
    Returns:
        scores_list (list): 所有句子的bigram概率
    '''
    scores_list = []
    for sentence_str in sentences_list:
        words_list = sentence_str.split()
        score = 1
        for i in range(1, len(words_list)):
            w_i = word2idx_dict[words_list[i]] # w_i
            w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
            score *= bigram_table_np[w_j][w_i]
        scores_list.append(score)
    return scores_list

完整代码如下:

import re
import numpy as np

def removePunctuation(sentence_str):
    '''给定字符串,进行特殊符号过滤,以及小写转换
    Args:
        sentence_str (str): 句子
    
    Returns:
        sentence_str.lower() (str): 预处理过的句子
    '''
    punctuation = '.!,;:?\-"\''
    sentence_str = re.sub(r'[{}]+'.format(punctuation),'',sentence_str)
    return sentence_str.lower()

def get_sentence(filename):
    '''从给定文件中获取句子
    Args:
        filename (str): 语料库的文件名
    
    Returns:
        sentences_list (list): 1-D,存放了所有经过预处理后的句子
    '''
    sentences_list = []
    with open(filename, encoding='utf-8') as f:
        for line in f.readlines():
            sentence_str = removePunctuation(line.strip().split('|')[-1])
            sentences_list.append('<s> ' + sentence_str + ' </s>')
    return sentences_list

def count_word(sentences_list):
    '''给定大量句子,统计出所有单词出现的频次
    Args:
        sentences_list (list): 所有经过预处理后的句子
    
    Returns:
        wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数,例如{'the': 1234}
    '''
    wordcount_dict = {} 
    for sentence_str in sentences_list:
        for word in sentence_str.split():
            if word in wordcount_dict:
                wordcount_dict[word] += 1
            else:
                wordcount_dict[word] = 1
    return wordcount_dict

def word2idx(wordcount_dict):
    '''构建单词到索引的映射与逆映射
    Args:
        wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数
    
    Returns:
        word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引,例如{'the': 0}
        idx2word_dict (dict): 键是int类型,表示索引;值是str类型,表示单词,例如{0: 'the'}
    '''
    word2idx_dict = {}
    idx2word_dict = {}
    for idx, word in enumerate(list(wordcount_dict.keys())):
        word2idx_dict[word] = idx
        idx2word_dict[idx] = word
    return word2idx_dict, idx2word_dict

def c_table(word2idx_dict, sentences_list, smooth=False):
    '''构建两个单词连续出现的频次矩阵
    Args:
        word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引
        sentences_list (list): 所有经过预处理后的句子
        smooth (bool): 是否进行加一平滑
    
    Returns:
        c_table_np (numpy): 2-D,c_table_np[i][j] = a表示 前一个索引为i的词和当前索引为j的词 同时出现的次数为a
    '''
    n = len(word2idx_dict) # 单词个数
    c_table_np = np.zeros((n, n)) # n*n 全0矩阵
    for sentence_str in sentences_list:
        words_list = sentence_str.split() # ['i', 'like', 'apple']
        for i in range(1, len(words_list)):
            w_i = word2idx_dict[words_list[i]] # w_i
            w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
            c_table_np[w_j][w_i] += 1
    
    if smooth: # 加一平滑
        c_table_np[c_table_np == 0] = 1
    
    return c_table_np

def compute_bigram_table(c_table_np, wordcount_dict):
    '''构建bigram概率矩阵
    Args:
        c_table_np (numpy): bigram频次矩阵
        wordcount_dict (dict): 所有单词出现的次数
    
    Returns:
        c_table_np / count_np[:, None] (numpy): 2-D,bigram概率矩阵
    '''
    count_np = np.array(list(wordcount_dict.values())) # [800, 900, 788, ...]
    return c_table_np / count_np[:, None]

def compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list):
    '''计算每个句子的bigram概率
    Args:
        bigram_table_np (numpy): bigram概率矩阵
        word2idx_dict (dict): 单词到索引的映射
        sentences_list (list): 预处理后的句子
    
    Returns:
        scores_list (list): 所有句子的bigram概率
    '''
    scores_list = []
    for sentence_str in sentences_list:
        words_list = sentence_str.split()
        score = 1
        for i in range(1, len(words_list)):
            w_i = word2idx_dict[words_list[i]] # w_i
            w_j = word2idx_dict[words_list[i-1]] # w_{i-1}
            score *= bigram_table_np[w_j][w_i]
        scores_list.append(score)
    return scores_list

if __name__ == '__main__':
    sentences_list = get_sentence('metadata.txt')
    wordcount_dict = count_word(sentences_list)
    word2idx_dict, idx2word_dict = word2idx(wordcount_dict)
    c_table_np = c_table(word2idx_dict, sentences_list, True)
    bigram_table_np = compute_bigram_table(c_table_np, wordcount_dict)
    scores_list = compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list)
    print(scores_list[:10])

以上是第一题的代码,第二题实际上也比较简单,在第一题的基础上稍作修改即可。例如用户输入$the$这个单词,首先通过word2idx_dict查出$the$对应的下标,记作$k$。然后在bigran_table_np中寻找第$k$行最大的前5个数对应的下标即可 (argmax)。得到下标,再通过idx2word返回单词。下面我只贴出不一样部分的代码,相同的代码不再重复

import heapq

def get_recommend_words(idx, bigram_table_np, idx2word_dict):
    m_list = list(bigram_table_np[idx])
    max_number = heapq.nlargest(7, m_list)
    max_index_list = []
    for t in max_number:
        index = m_list.index(t)
        max_index_list.append(index)
        m_list[index] = 0
    
    return [idx2word_dict[i] for i in max_index_list if idx2word_dict[i] not in ['<s>', '</s>']]


if __name__ == '__main__':
    sentences_list = get_sentence('metadata.txt')
    wordcount_dict = count_word(sentences_list)
    word2idx_dict, idx2word_dict = word2idx(wordcount_dict)
    c_table_np = c_table(word2idx_dict, sentences_list, False)
    bigram_table_np = compute_bigram_table(c_table_np, wordcount_dict)
    while True:
        print('Please input a word: ', end="")
        word = input().lower()
        if word not in word2idx_dict:
            print('None')
            break
        else:
            recommend_word = get_recommend_words(word2idx_dict[word], bigram_table_np, idx2word_dict)
            print(recommend_word[:5])
Last Modified: February 11, 2023
Archives Tip
QR Code for this page
Tipping QR Code
Leave a Comment

已有 1 条评论
  1. lix lix

    up的题目来自哪个课程呢?