[Python] Spark平台下实现分布式AC自动机(一)

转载请注明出处:http://www.cnblogs.com/kirai/ 作者:Kirai

零.问题的提出

  最近希望在分布式平台上实现一个AC自动机,但是如何在这样的分布式平台上表示这样的非线性数据结构就难住我了。因为一直在使用RDD提供的一些基本的操作,没有需要什么复杂的操作。所以突然想到了在分布式的平台上实现一个AC自动机一定很有趣。网上搜了下,发现没有人实现,因此决定尝试实现。或许就是一个玩具,不过也是能帮助自己更深理解分布式平台上进行编程和普通编程的区别吧。

  这个问题对我来讲还是有一定的难度的,加上课业、复习考研、竞赛三重压力,对于这个问题的研究时间可能会比较长,不过一定会抽出时间来考虑的。

  文章仅仅是在记录自己对问题的思考过程和代码,并不能保证每一步都是合理、有用、正确的。

一.实现可持久化字典树

  AC自动机是个什么东西就不赘述了,我首先用python实现了一个比较朴素的版本,这里查询是返回所有字典中出现的单词的起始位置,每一个单词一个list,最终组成一个dict。哈哈,也许有人看出来了这是去年的一道ICPC网赛题,没错。但是我忘记是哪一道了,只记得当时没做出来,所以用python写一个就当是对自己的一个补偿吧:)

  1 # -*- coding: utf-8 -*-
  2 class Node:
  3   """
  4   A Trie‘s basic data structure.
  5   """
  6   def __init__(self):
  7     self.next_letter = {}
  8     self.is_word = False
  9     self.letter = ‘‘
 10     self.depth = -1
 11     self.pre = None
 12     self.fail = None
 13
 14
 15 class Trie:
 16   def __init__(self):
 17     self.root = Node()
 18
 19
 20   def insert(self, word):
 21     """
 22     insert a word into the trie.
 23     :param word: the word
 24     :type word: str
 25     :return: None
 26     """
 27     cur_node = self.root
 28     pre = self.root
 29     depth = 1
 30     for letter in word:
 31       if not cur_node.next_letter.has_key(letter):
 32         cur_node.next_letter[letter] = Node()
 33       cur_node = cur_node.next_letter[letter]
 34
 35       cur_node.pre = pre
 36       cur_node.letter = letter
 37       cur_node.depth = depth
 38
 39       depth += 1
 40       pre = cur_node
 41
 42     cur_node.is_word = True
 43
 44
 45   def in_trie(self, word):
 46     """
 47     judge if the word is in the trie or not.
 48     :param word: the word
 49     :type word: str
 50     :return: if the word is in the trie or not.
 51     :rtype: bool
 52     """
 53     cur_node = self.root
 54     for letter in word:
 55       if not cur_node.next_letter.has_key(letter):
 56         return False
 57       cur_node = cur_node.next_letter[letter]
 58     return cur_node.is_word
 59
 60
 61 class AcAutomation:
 62   def __init__(self):
 63     self._trie = Trie()
 64     self._dict = set([])
 65
 66
 67   def in_trie(self, word):
 68     return self._trie.in_trie(word)
 69
 70
 71   def insert(self, word):
 72     map(self._dict.add, word)
 73     self._trie.insert(word)
 74
 75
 76   def build_ac_automation(self):
 77     """
 78     build the fail pointers to make the ac automation work.
 79     :return:
 80     """
 81     queue = []
 82     queue.append(self._trie.root)
 83     cur_node, tmp_node = None, None
 84
 85     while len(queue) != 0:
 86       cur_node = queue.pop(0)
 87       for letter in cur_node.next_letter:
 88         if cur_node == self._trie.root:
 89           cur_node.next_letter[letter].fail = self._trie.root
 90         else:
 91           tmp_node = cur_node.fail
 92           while tmp_node != None:
 93             if tmp_node.next_letter.has_key(letter):
 94               cur_node.next_letter[letter].fail =  95                 tmp_node.next_letter[letter]
 96               break
 97             tmp_node = tmp_node.fail
 98           if tmp_node == None:
 99             cur_node.next_letter[letter].fail = self._trie.root
100
101         queue.append(cur_node.next_letter[letter])
102
103
104   def get_word_position(self, sentence):
105     """
106     find the word‘s positions in the sentence according to
107     the dictionary.
108     :param sentence:
109     :rtype: Dict[List[]]
110     :return: an dictionary include the word that appears in the sentence
111              and the start and end positions.
112     """
113     cur_node = self._trie.root
114     tmp_node = None
115     result = {}
116     length = len(sentence)
117     for idx in range(0, length):
118       letter = sentence[idx]
119       if letter not in self._dict:
120         cur_node = self._trie.root
121         continue
122
123       if cur_node.next_letter.has_key(letter):
124         cur_node = cur_node.next_letter[letter]
125       else:
126         while cur_node != None and cur_node.next_letter.has_key(letter) == False:
127           cur_node = cur_node.fail
128         if cur_node == None:
129           cur_node = self._trie.root
130
131         if cur_node.next_letter.has_key(letter):
132           cur_node = cur_node.next_letter[letter]
133
134       tmp_node = cur_node
135       while tmp_node != self._trie.root:
136         if tmp_node.is_word == True:
137           word = ‘‘
138           word_node = tmp_node
139           while word_node != self._trie.root:
140             word += word_node.letter
141             word_node = word_node.pre
142           word = word[::-1]
143           if not result.has_key(word):
144             result[word] = []
145           result[word].append((idx - tmp_node.depth + 1, idx))
146         tmp_node = tmp_node.fail
147
148     return result
149
150
151 s = AcAutomation()
152 s.insert(‘AA‘)
153 s.insert(‘BB‘)
154 s.insert(‘CC‘)
155 s.insert(‘ACM‘)
156 s.build_ac_automation()
157 q = s.get_word_position(‘ooxxCC%dAAAAAAAAaaAAaoenGGACM‘)
158
159 print q

返回的结果是这样的:

1 {‘CC‘: [(4, 5)], ‘AA‘: [(8, 9), (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (18, 19)], ‘ACM‘: [(26, 28)]}

  如何在分布式的平台上实现这样的运算?显然这种实现是有内存共享的,如何做到消除这些共享?

  确实想不到,看来还要多看资料学习。但是可以肯定的一点是,AC自动机的结构一定不允许是这样的。

  由于Spark使用RDD对分布式内存抽象,支持的操作均是熟悉的函数式操作。所以考虑是不是可以把Trie“扁平化”成这样的结构?

  于是定义:

{ letter: [{next level..}, is_word depth] }

  代表一个节点,letter代表当前节点的字符,后接一个list,list第一个元素表示此节点的后继,后面跟着的是一些属性。这些属性是可以扩充的。稍微转换一下思路,就可以把这个字典树写出来:

 1 def build_trie():
 2   """
 3   build a empty trie
 4   :return:
 5   """
 6   return {}
 7
 8
 9 def insert(root, word, depth=1):
10   """
11   insert a word into the trie recursively.
12   :param root:
13   :param word:
14   :param depth:
15   :return:
16   """
17   letter = word[0]
18   if len(word) == 1:
19     if(root.has_key(letter)):
20       root[letter][1] = True
21       return root
22     else:
23       return {letter: [{}, True, depth]}
24   if root.has_key(letter) == False:
25     root[letter] = [{}, False, depth]
26     root[letter][0] = insert(root[letter][0], word[1:], depth+1)
27   else:
28     root[letter][0] = dict(root[letter][0], **insert(root[letter][0], word[1:], depth+1))
29   return root
30
31
32 def find(root, word):
33   """
34   find a word if it‘s in the trie or not.
35   :param root:
36   :param word:
37   :return:
38   """
39   letter = word[0]
40   if root.has_key(letter) == False:
41     return False
42   if len(word) == 1:
43     if root[letter][1] == True:
44       return True
45     else:
46       return False
47   return find(root[letter][0], word[1:])

  我考虑过字典树如何从一个分布式平台上构建,可以这样:

  通过平时对单词的搜集,可以在单机上生成一个又一个小的字典树,保存在分布式文件系统上,在使用的时候只需要把这一个个小字典树合并成一个就行了。

  所以这棵字典树仅仅这样是不够的,因为我希望这字典树支持这个合并的函数式操作,于是加上了归并的操作,这样这棵字典树变成了一棵可持久字典树:

 1 def merge(a_root, b_root):
 2   """
 3   merge two tries.
 4   :param a_root:
 5   :param b_root:
 6   :return:
 7   """
 8   def _merge(a_root, b_root):
 9     for letter in b_root.keys():
10       if a_root.has_key(letter):
11         if b_root[letter][1] == True:
12           a_root[letter][1] = True
13         a_root[letter][0] = dict(a_root[letter][0], **_merge(a_root[letter][0], b_root[letter][0]))
14       else:
15         a_root[letter] = copy.deepcopy(b_root[letter])
16     return a_root
17   a_root = _merge(a_root, b_root)
18   return a_root

  合并的操作思路很简单,就是同时深入,假如有一棵树上没有另一棵树上的儿子,整棵子树都拷贝过来即可(这也是用python的dict方便的地方)。

  测试了一下:

 1 if __name__ == ‘__main__‘:
 2   trie_a = build_trie()
 3   trie_b = build_trie()
 4
 5   trie_a = insert(trie_a, ‘acm‘)
 6   trie_a = insert(trie_a, ‘acr‘)
 7   trie_a = insert(trie_a, ‘ak‘)
 8   trie_a = insert(trie_a, ‘bc‘)
 9
10   trie_b = insert(trie_b, ‘ac‘)
11   trie_b = insert(trie_b, ‘cm‘)
12   trie_b = insert(trie_b, ‘br‘)
13
14   s = [trie_a, trie_b]
15   s = reduce(merge, s)
16   print json.dumps(s)

  输出的结构dumps成json后格式化一下是这样的:

{
    "a": [
        {
            "c": [
                {
                    "r": [
                        {},
                        true,
                        3
                    ],
                    "m": [
                        {},
                        true,
                        3
                    ]
                },
                true,
                2
            ],
            "k": [
                {},
                true,
                2
            ]
        },
        false,
        1
    ],
    "c": [
        {
            "m": [
                {},
                true,
                2
            ]
        },
        false,
        1
    ],
    "b": [
        {
            "c": [
                {},
                true,
                2
            ],
            "r": [
                {},
                true,
                2
            ]
        },
        false,
        1
    ]
}

  这样一棵可持久化的字典树就实现了,接下来考虑将这段程序翻译成Spark的程序,并且生成一些字典数据,看看是否符合自己的预期。

时间: 2024-08-08 06:57:49

[Python] Spark平台下实现分布式AC自动机(一)的相关文章

基于.net平台下大型分布式HIS系统之住院管理系统视频教程

基于.net平台下大型分布式HIS系统之住院管理系统视频教程资源下载地址:https://pan.baidu.com/s/18Mwh_HBEys8v709QWNkDpw 提取码:fhwo 住院管理系统是整个HIS的核心部分,其他大部分业务就从住院开始.这个子系统包含了70多个业务功能点,其中我们主要精讲了入院登记.预交款管理.划价记账.住院退费.转科转床.账目结算.出院等大的业务.每个功能点我们都是从需求出发,分析业务,同时在分析过程中经常性会引入现场客户案例,并目前HIS市场的现状等.然后由于

Python windows 平台下安装BeautifulSoup

Windows平台安装Beautiful Soup  :http://kevinkelly.blog.163.com/blog/static/21390809320133185748442/

在Window平台下安装xgboost的Python版本

原文:http://blog.csdn.net/pengyulong/article/details/50515916 原文修改了两个地方才安装成功,第3步可以不用,第2步重新生成所有的就行了. 第4步,有“xgboost_wrapper.dll”以后,将该文件复制到/python-package/xgboost/中,继续后面步骤就可以了. 特别注意如果你的python是32位的,第二步就不要选择x64,而是选择win32.对应的文件也不是在x64下了.一定可以运行. xgboost的全称是eX

Windows平台下如何在C#中调用Python

最近迷上了Python,发现它能够做很多C#无法完成的事情,比如,调用CMD或者在CMD中执行一个exe文件命令行并获得输出的结果.过程简单,处理起来也非常方便,但如果要用C#调用Python文件呢,没关系,你想到的肯定早就有也人想到过.网上Google一下,超级多.索性拿来实践吧. 首先要用到的就是这个软件:IronPython,官方下载地址:http://ironpython.codeplex.com 安装在Windows下之后去它的安装地址查找下面这两个文件: IronPython.dll

探析大数据需求下的分布式数据库

一.前言 大数据技术从诞生到现在,已经经历了十几个年头.市场上早已不断有公司或机构,给广大金融从业者"洗脑"大数据未来的美好前景与趋势.随着用户对大数据理念与技术的不断深入了解,人们已经开始从理论探索转向对场景落地的寻找,让大数据在企业中落地并开花结果. 从大数据的管理和应用方向集中在两个领域.第一,大数据分析相关,针对海量数据的挖掘.复杂的分析计算:第二,在线数据操作,包括传统交易型操作以及海量数据的实时访问.大数据高并发查询操作.用户根据业务场景以及对数据处理结果的期望选择不同的大

Tachyon:Spark生态系统中的分布式内存文件系统

转自: http://www.csdn.net/article/2015-06-25/2825056  摘要:Tachyon把内存存储的功能从Spark中分离出来, 使Spark可以更专注计算的本身, 以求通过更细的分工达到更高的执行效率. Tachyon是Spark生态系统内快速崛起的一个新项目. 本质上, Tachyon是个分布式的内存文件系统, 它在减轻Spark内存压力的同时,也赋予了Spark内存快速大量数据读写的能力.Tachyon把内存存储的功能从Spark中分离出来, 使Spar

.NET平台下使用MongoDB入门教程

适合人群:完全没有接触MongoDB或对MongoDB有一点了解的C#开发人员.因为本文是一篇入门级的文章. 一.了解MongoDB  MongoDB是一个基于分布式文件存储的数据库.由C++语言编写.旨在为WEB应用提供可扩展的高性能数据存储解决方案.  MongoDB是一个高性能,开源,无模式的文档型数据库,是当前NoSql数据库中比较热门的一种. MongoDB是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的.他支持的数据结构非常松散,是类似js

大数据平台Hadoop的分布式集群环境搭建

1 概述 本文章介绍大数据平台Hadoop的分布式环境搭建.以下为Hadoop节点的部署图,将NameNode部署在master1,SecondaryNameNode部署在master2,slave1.slave2.slave3中分别部署一个DataNode节点 NN=NameNode(名称节点) SND=SecondaryNameNode(NameNode的辅助节点) DN=DataNode(数据节点)2 前期准备 (1)准备五台服务器 如:master1.master2.slave1.sla

hdu2222 Keywords Search & AC自动机学习小结

传送门:http://http://acm.hdu.edu.cn/showproblem.php?pid=2222 思路:AC自动机入门题,直接上AC自动机即可. 对于构建AC自动机,我们要做的只有三件事: 1)构建字典树 2)构建失败指针 3)构建trie图(这道题好像不做这一步也能A...但是这一步不做是会被卡成O(n^2)的...) 1)第一步还是比较好理解的 根是虚根,边代表字母,那么根到终止节点的路径就是一个字符串,这样对于前缀相同的字符串我们就可以省下存公共前缀的空间. 加入一个模式