The question-and-answer process

1. The learn function

Given the structure of the Kernel class, the code of the learn function is as follows:

def learn(self, filename):
    """Load and learn the contents of the specified AIML file.

    If filename includes wildcard characters, all matching files
    will be loaded and learned.

    """
    for f in glob.glob(filename):
        if self._verboseMode: print "Loading %s..." % f,
        start = time.clock()
        # Load and parse the AIML file.
        parser = AimlParser.create_parser()
        handler = parser.getContentHandler()
        handler.setEncoding(self._textEncoding)
        try: parser.parse(f)
        except xml.sax.SAXParseException, msg:
            err = "\nFATAL PARSE ERROR in file %s:\n%s\n" % (f,msg)
            sys.stderr.write(err)
            continue
        # store the pattern/template pairs in the PatternMgr.
        for key,tem in handler.categories.items():
            self._brain.add(key,tem)
        # Parsing was successful.
        if self._verboseMode:
            print "done (%.2f seconds)" % (time.clock() - start)

This function iterates over the files matched by the filename argument. For each file, it performs a syntax parse using the AIML parser created in AimlParser, converting the contents into the structure described above and storing them in the categories attribute of the parser's handler. Each entry in categories is one matching rule; finally, all of these rules are added to the AIML "brain" (PatternMgr, the rule-management class).
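As a sketch of the data flow (the keys and template shapes below follow the description above; the concrete values are illustrative, not the parser's exact output), each entry in handler.categories maps a (pattern, that, topic) tuple to a structured template list:

```python
# Illustrative stand-in for handler.categories as described above:
#   key   -> (pattern, that, topic) tuple
#   value -> structured template list (tag name, attributes, children)
categories = {
    ("HELLO", "", ""):
        ["template", {}, ["text", {"xml:space": "default"}, "Hi there!"]],
    ("WHAT IS YOUR NAME", "", ""):
        ["template", {}, ["text", {"xml:space": "default"}, "I am a bot."]],
}

# The learn loop then hands each pair to the brain, one rule at a time:
for key, tem in categories.items():
    print(key, "->", tem[0])
```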

The PatternMgr class uses its add function to insert each parsed matching rule into a unified store: the individual rules are merged together into a single tree. Let us start with the add function, whose code is as follows:

def add(self, (pattern,that,topic), template):
    """Add a [pattern/that/topic] tuple and its corresponding template
    to the node tree.

    """
    # TODO: make sure words contains only legal characters
    # (alphanumerics,*,_)

    # Navigate through the node tree to the template's location, adding
    # nodes if necessary.
    node = self._root
    for word in string.split(pattern):
        key = word
        if key == u"_":
            key = self._UNDERSCORE
        elif key == u"*":
            key = self._STAR
        elif key == u"BOT_NAME":
            key = self._BOT_NAME
        if not node.has_key(key):
            node[key] = {}
        node = node[key]

    # navigate further down, if a non-empty "that" pattern was included
    if len(that) > 0:
        if not node.has_key(self._THAT):
            node[self._THAT] = {}
        node = node[self._THAT]
        for word in string.split(that):
            key = word
            if key == u"_":
                key = self._UNDERSCORE
            elif key == u"*":
                key = self._STAR
            if not node.has_key(key):
                node[key] = {}
            node = node[key]

    # navigate yet further down, if a non-empty "topic" string was included
    if len(topic) > 0:
        if not node.has_key(self._TOPIC):
            node[self._TOPIC] = {}
        node = node[self._TOPIC]
        for word in string.split(topic):
            key = word
            if key == u"_":
                key = self._UNDERSCORE
            elif key == u"*":
                key = self._STAR
            if not node.has_key(key):
                node[key] = {}
            node = node[key]


    # add the template.
    if not node.has_key(self._TEMPLATE):
        self._templateCount += 1    
    node[self._TEMPLATE] = template

As the function shows, for each newly added rule, node is initialized to _root, the root of the rule tree. Then child nodes are created word by word along the pattern, so that one sentence forms a single rule path, with the template content attached to the tail of the path as a leaf node. that and topic are handled the same way: all of their words are likewise converted into tree paths and appended in turn to the tail node of the original path, and the template content is finally attached at the very end as the leaf.

Once all matching rules have been added to the PatternMgr class, we obtain the following rule tree:
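As an illustration (a simplified sketch that ignores that/topic and uses made-up sentinel keys in place of PatternMgr's), the nested-dict tree that add builds can be reproduced like this:

```python
# Sentinel keys standing in for PatternMgr's _UNDERSCORE/_STAR/_TEMPLATE.
UNDERSCORE, STAR, TEMPLATE = 0, 1, 2

def add(root, pattern, template):
    """Walk/create one node per word, then hang the template off the tail."""
    node = root
    for word in pattern.split():
        key = {"_": UNDERSCORE, "*": STAR}.get(word, word)
        node = node.setdefault(key, {})
    node[TEMPLATE] = template

tree = {}
add(tree, "HELLO *", "greeting-template")
add(tree, "WHAT IS YOUR NAME", "name-template")

# The two rules now share one tree rooted at `tree`:
# {'HELLO': {STAR: {TEMPLATE: 'greeting-template'}},
#  'WHAT': {'IS': {'YOUR': {'NAME': {TEMPLATE: 'name-template'}}}}}
```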

2. The respond process

Given the structure of the Kernel class, the code of the respond function is as follows:

def respond(self, input, sessionID = _globalSessionID):
    """Return the Kernel's response to the input string."""
    if len(input) == 0:
        return ""

    #ensure that input is a unicode string
    try: input = input.decode(self._textEncoding, 'replace')
    except UnicodeError: pass
    except AttributeError: pass

    # prevent other threads from stomping all over us.
    self._respondLock.acquire()

    # Add the session, if it doesn't already exist
    self._addSession(sessionID)

    # split the input into discrete sentences
    sentences = Utils.sentences(input)
    finalResponse = ""
    for s in sentences:
        # Add the input to the history list before fetching the
        # response, so that <input/> tags work properly.
        inputHistory = self.getPredicate(self._inputHistory, sessionID)
        inputHistory.append(s)
        while len(inputHistory) > self._maxHistorySize:
            inputHistory.pop(0)
        self.setPredicate(self._inputHistory, inputHistory, sessionID)

        # Fetch the response
        response = self._respond(s, sessionID)

        # add the data from this exchange to the history lists
        outputHistory = self.getPredicate(self._outputHistory, sessionID)
        outputHistory.append(response)
        while len(outputHistory) > self._maxHistorySize:
            outputHistory.pop(0)
        self.setPredicate(self._outputHistory, outputHistory, sessionID)

        # append this response to the final response.
        finalResponse += (response + "  ")
    finalResponse = finalResponse.strip()

    assert(len(self.getPredicate(self._inputStack, sessionID)) == 0)

    # release the lock and return
    self._respondLock.release()
    try: return finalResponse.encode(self._textEncoding)
    except UnicodeError: return finalResponse
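The per-sentence history maintenance in the loop above can be sketched in isolation (the sentence splitter here is a simplified stand-in for Utils.sentences, and MAX_HISTORY plays the role of self._maxHistorySize):

```python
import re

MAX_HISTORY = 10  # stand-in for self._maxHistorySize

def sentences(text):
    # Simplified stand-in for Utils.sentences(): split on terminal punctuation.
    return [p.strip() for p in re.split(r"[.!?]+", text) if p.strip()]

input_history = []
for s in sentences("Hello there. How are you?"):
    input_history.append(s)
    # Cap the history, dropping the oldest entries first.
    while len(input_history) > MAX_HISTORY:
        input_history.pop(0)

print(input_history)  # ['Hello there', 'How are you']
```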

After validating the input, this function splits it into multiple sentences according to punctuation rules. For each sentence it calls the _respond function to obtain an answer, concatenates all the answers, and returns the result to the caller. The function also tracks every exchange through its own session mechanism, maintaining history lists, though that is not our main concern here. Let us now focus on the implementation of _respond:

def _respond(self, input, sessionID):
    """Private version of respond(), does the real work."""
    if len(input) == 0:
        return ""

    # guard against infinite recursion
    inputStack = self.getPredicate(self._inputStack, sessionID)
    if len(inputStack) > self._maxRecursionDepth:
        if self._verboseMode:
            err = "WARNING: maximum recursion depth exceeded (input='%s')" % input.encode(self._textEncoding, 'replace')
            sys.stderr.write(err)
        return ""

    # push the input onto the input stack
    inputStack = self.getPredicate(self._inputStack, sessionID)
    inputStack.append(input)
    self.setPredicate(self._inputStack, inputStack, sessionID)

    # run the input through the 'normal' subber
    subbedInput = self._subbers['normal'].sub(input)

    # fetch the bot's previous response, to pass to the match()
    # function as 'that'.
    outputHistory = self.getPredicate(self._outputHistory, sessionID)
    try: that = outputHistory[-1]
    except IndexError: that = ""
    subbedThat = self._subbers['normal'].sub(that)

    # fetch the current topic
    topic = self.getPredicate("topic", sessionID)
    subbedTopic = self._subbers['normal'].sub(topic)

    # Determine the final response.
    response = ""
    elem = self._brain.match(subbedInput, subbedThat, subbedTopic)
    if elem is None:
        if self._verboseMode:
            err = "WARNING: No match found for input: %s\n" % input.encode(self._textEncoding)
            sys.stderr.write(err)
    else:
        # Process the element into a response string.
        response += self._processElement(elem, sessionID).strip()
        response += " "
    response = response.strip()

    # pop the top entry off the input stack.
    inputStack = self.getPredicate(self._inputStack, sessionID)
    inputStack.pop()
    self.setPredicate(self._inputStack, inputStack, sessionID)

    return response
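The `self._subbers['normal'].sub(...)` calls above apply a word-substitution table via regular expressions. A minimal sketch of the idea behind WordSub (the table and the compilation strategy here are illustrative, not the library's exact implementation):

```python
import re

# Hypothetical substitution table; WordSub compiles something similar
# into a single alternation regex, longest keys first.
subs = {"i'm": "I am", "don't": "do not"}
_pattern = re.compile(
    r"\b("
    + "|".join(re.escape(k) for k in sorted(subs, key=len, reverse=True))
    + r")\b",
    re.IGNORECASE,
)

def sub(text):
    """Replace every known word in one pass over the text."""
    return _pattern.sub(lambda m: subs[m.group(1).lower()], text)

print(sub("I'm sure you don't mind"))  # I am sure you do not mind
```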

In _respond, the function first retrieves, through the session mechanism, the bot's previous response and the topic of the current exchange. It then uses the WordSub class to apply word substitutions to input (the current input), that (the previous output), and topic; this step is performed with regular-expression replacement. Next, the match function of the PatternMgr class searches the previously built rule tree for the answer template matching the question. The code of match is as follows:

def match(self, pattern, that, topic):
    """Return the template which is the closest match to pattern. The
    'that' parameter contains the bot's previous response. The 'topic'
    parameter contains the current topic of conversation.

    Returns None if no template is found.

    """
    if len(pattern) == 0:
        return None
    # Mutilate the input.  Remove all punctuation and convert the
    # text to all caps.
    input = string.upper(pattern)
    input = re.sub(self._puncStripRE, " ", input)
    if that.strip() == u"": that = u"ULTRABOGUSDUMMYTHAT" # 'that' must never be empty
    thatInput = string.upper(that)
    thatInput = re.sub(self._puncStripRE, " ", thatInput)
    thatInput = re.sub(self._whitespaceRE, " ", thatInput)
    if topic.strip() == u"": topic = u"ULTRABOGUSDUMMYTOPIC" # 'topic' must never be empty
    topicInput = string.upper(topic)
    topicInput = re.sub(self._puncStripRE, " ", topicInput)

    # Pass the input off to the recursive call
    patMatch, template = self._match(input.split(), thatInput.split(), topicInput.split(), self._root)
    return template
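The normalization at the top of match (uppercasing, punctuation stripping, whitespace tokenization) can be reproduced in isolation. The punctuation regex below is an assumption; the library builds its own _puncStripRE:

```python
import re
import string

# Assumed equivalent of the library's _puncStripRE.
punc_strip = re.compile("[" + re.escape(string.punctuation) + "]")

def normalize(text):
    """Uppercase, replace punctuation with spaces, split into words."""
    return punc_strip.sub(" ", text.upper()).split()

print(normalize("Hello, world!"))  # ['HELLO', 'WORLD']
```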

This function first uppercases the input parameters and strips punctuation and whitespace, tokenizes them on whitespace, and finally passes them to the _match function to obtain the matching path and template. Let us follow the implementation of _match:

def _match(self, words, thatWords, topicWords, root):
    """Return a tuple (pat, tem) where pat is a list of nodes, starting
    at the root and leading to the matching pattern, and tem is the
    matched template.

    """ 
    # base-case: if the word list is empty, return the current node's
    # template.
    if len(words) == 0:
        # we're out of words.
        pattern = []
        template = None
        if len(thatWords) > 0:
            # If thatWords isn't empty, recursively
            # pattern-match on the _THAT node with thatWords as words.
            try:
                pattern, template = self._match(thatWords, [], topicWords, root[self._THAT])
                if pattern != None:
                    pattern = [self._THAT] + pattern
            except KeyError:
                pattern = []
                template = None
        elif len(topicWords) > 0:
            # If thatWords is empty and topicWords isn't, recursively pattern
            # on the _TOPIC node with topicWords as words.
            try:
                pattern, template = self._match(topicWords, [], [], root[self._TOPIC])
                if pattern != None:
                    pattern = [self._TOPIC] + pattern
            except KeyError:
                pattern = []
                template = None
        if template == None:
            # we're totally out of input.  Grab the template at this node.
            pattern = []
            try: template = root[self._TEMPLATE]
            except KeyError: template = None
        return (pattern, template)

    first = words[0]
    suffix = words[1:]

    # Check underscore.
    # Note: this is causing problems in the standard AIML set, and is
    # currently disabled.
    if root.has_key(self._UNDERSCORE):
        # Must include the case where suf is [] in order to handle the case
        # where a * or _ is at the end of the pattern.
        for j in range(len(suffix)+1):
            suf = suffix[j:]
            pattern, template = self._match(suf, thatWords, topicWords, root[self._UNDERSCORE])
            if template is not None:
                newPattern = [self._UNDERSCORE] + pattern
                return (newPattern, template)

    # Check first
    if root.has_key(first):
        pattern, template = self._match(suffix, thatWords, topicWords, root[first])
        if template is not None:
            newPattern = [first] + pattern
            return (newPattern, template)

    # check bot name
    if root.has_key(self._BOT_NAME) and first == self._botName:
        pattern, template = self._match(suffix, thatWords, topicWords, root[self._BOT_NAME])
        if template is not None:
            newPattern = [first] + pattern
            return (newPattern, template)

    # check star
    if root.has_key(self._STAR):
        # Must include the case where suf is [] in order to handle the case
        # where a * or _ is at the end of the pattern.
        for j in range(len(suffix)+1):
            suf = suffix[j:]
            pattern, template = self._match(suf, thatWords, topicWords, root[self._STAR])
            if template is not None:
                newPattern = [self._STAR] + pattern
                return (newPattern, template)

    # No matches were found.
    return (None, None)

This function runs recursively, starting from the root node and searching for a matching rule. While the words list is non-empty, it takes the first word and looks for a corresponding child of the current node; if a match is found, the new pattern is recorded and _match recurses on the remaining words and the matched child to complete the path; if not, it returns None. AIML's fuzzy matching with * and _ is also implemented here. The recursion continues until the words list is empty, at which point the value stored under the _TEMPLATE key of the final node is taken as the match result. When thatWords and topicWords are present, once the words list is exhausted the recursion continues in turn through the words of thatWords and then topicWords, and only when all lists are empty is the _TEMPLATE value of the final node returned as the result.
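Ignoring that/topic and the returned pattern path, the core of this recursion can be condensed as follows (the sentinel keys are made up for the sketch):

```python
UNDERSCORE, STAR, TEMPLATE = 0, 1, 2  # stand-ins for the PatternMgr sentinels

def match(words, node):
    # Base case: out of words -- return the template at this node, if any.
    if not words:
        return node.get(TEMPLATE)
    first, suffix = words[0], words[1:]
    # Priority order: _ wildcard, exact word, * wildcard.
    for key in (UNDERSCORE, first, STAR):
        if key not in node:
            continue
        if key in (UNDERSCORE, STAR):
            # A wildcard swallows `first` plus any further prefix of suffix,
            # possibly all of it (wildcard at the end of the pattern).
            for j in range(len(suffix) + 1):
                tem = match(suffix[j:], node[key])
                if tem is not None:
                    return tem
        else:
            tem = match(suffix, node[key])
            if tem is not None:
                return tem
    return None

tree = {"HELLO": {STAR: {TEMPLATE: "greeting"}}}
print(match(["HELLO", "THERE", "FRIEND"], tree))  # greeting
print(match(["GOODBYE"], tree))                   # None
```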

Note that the template here is not the answer text itself but a structured template list; see the end of the "syntax-parsing class" section for its exact form. We therefore still need to extract the final answer content from this template structure. In _respond, the _processElement function analyzes the template list to produce the answer we ultimately need.

def _processElement(self,elem, sessionID):
    """Process an AIML element.

    The first item of the elem list is the name of the element's
    XML tag.  The second item is a dictionary containing any
    attributes passed to that tag, and their values.  Any further
    items in the list are the elements enclosed by the current
    element's begin and end tags; they are handled by each
    element's handler function.

    """
    try:
        handlerFunc = self._elementProcessors[elem[0]]
    except:
        # Oops -- there's no handler function for this element
        # type!
        if self._verboseMode:
            err = "WARNING: No handler found for <%s> element\n" % elem[0].encode(self._textEncoding, 'replace')
            sys.stderr.write(err)
        return ""
    return handlerFunc(elem, sessionID)
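The dispatch used here is a simple lookup table from tag name to handler function. In miniature (the handlers below are toy versions, not the library's):

```python
def process_text(elem, session_id):
    # A text element carries its string directly in slot 2.
    return elem[2]

def process_template(elem, session_id):
    # Concatenate the results of all child elements.
    return "".join(process_element(e, session_id) for e in elem[2:])

# Analogue of self._elementProcessors.
processors = {"template": process_template, "text": process_text}

def process_element(elem, session_id):
    handler = processors.get(elem[0])
    if handler is None:
        return ""  # no handler registered for this tag
    return handler(elem, session_id)

elem = ["template", {}, ["text", {"xml:space": "default"}, "hello"]]
print(process_element(elem, "session0"))  # hello
```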

This function serves as a dispatcher: it takes the element's name from the first item of the list, looks up the corresponding handler predefined in _elementProcessors, and calls that handler to obtain the final answer. As an example, let us analyze a template containing <random>; its AIML form and structured form are given below:

<template>
    <random>
        <li>answer1</li>
        <li>answer2</li>
    </random>
</template>

[
    'template', {},
    [
        'random', {}, 
        [
            'li', {}, ['text', {'xml:space': 'default'}, u'answer1']
        ], 
        [
            'li', {}, ['text', {'xml:space': 'default'}, u'answer2']
        ]
    ]
]

Given the structured form above, the outer template tag is dispatched by _processElement to the corresponding _processTemplate handler, whose code is as follows:

def _processTemplate(self,elem, sessionID):
    """Process a <template> AIML element.

    <template> elements recursively process their contents, and
    return the results.  <template> is the root node of any AIML
    response tree.

    """
    response = ""
    for e in elem[2:]:
        response += self._processElement(e, sessionID)
    return response

Processing a template element is straightforward: _processElement is called on each child element, and the results are concatenated as the return value. Since our template has a single child, random, _processElement dispatches next to _processRandom:

def _processRandom(self, elem, sessionID):
    """Process a <random> AIML element.

    <random> elements contain zero or more <li> elements.  If
    none, the empty string is returned.  If one or more <li>
    elements are present, one of them is selected randomly to be
    processed recursively and have its results returned.  Only the
    chosen <li> element's contents are processed.  Any non-<li> contents are
    ignored.

    """
    listitems = []
    for e in elem[2:]:
        if e[0] == 'li':
            listitems.append(e)
    if len(listitems) == 0:
        return ""

    # select and process a random listitem.
    random.shuffle(listitems)
    return self._processElement(listitems[0], sessionID)
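Shuffling the whole list and then taking index 0 amounts to one uniform random pick; `random.choice` from the standard library expresses the same selection directly:

```python
import random

listitems = [
    ["li", {}, ["text", {"xml:space": "default"}, "answer1"]],
    ["li", {}, ["text", {"xml:space": "default"}, "answer2"]],
]

# Equivalent to: random.shuffle(listitems); listitems[0]
chosen = random.choice(listitems)
print(chosen[2][2])  # either 'answer1' or 'answer2'
```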

The handling of random is similar to template: it first collects all of its <li> children, then calls shuffle to randomize their order, and finally returns the processed result of the first <li> in the list. Following the dispatch in _processElement, we arrive at _processLi:

def _processLi(self,elem, sessionID):
    """Process an <li> AIML element.

    Optional attribute elements:
        name: the name of a predicate to query.
        value: the value to check that predicate for.

    <li> elements process their contents recursively and return
    the results. They can only appear inside <condition> and
    <random> elements.  See _processCondition() and
    _processRandom() for details of their usage.

    """
    response = ""
    for e in elem[2:]:
        response += self._processElement(e, sessionID)
    return response

li elements are processed exactly like template, which in turn leads us to _processText:

def _processText(self,elem, sessionID):
    """Process a raw text element.

    Raw text elements aren't really AIML tags. Text elements cannot contain
    other elements; instead, the third item of the 'elem' list is a text
    string, which is immediately returned. They have a single attribute,
    automatically inserted by the parser, which indicates whether whitespace
    in the text should be preserved or not.

    """
    try: elem[2] + ""
    except TypeError: raise TypeError, "Text element contents are not text"

    # If the the whitespace behavior for this element is "default",
    # we reduce all stretches of >1 whitespace characters to a single
    # space.  To improve performance, we do this only once for each
    # text element encountered, and save the results for the future.
    if elem[1]["xml:space"] == "default":
        elem[2] = re.sub("\s+", " ", elem[2])
        elem[1]["xml:space"] = "preserve"
    return elem[2]

text elements are almost always terminal, that is, they have no child elements, so we simply return the string stored in the element. Note, however, that per the XML standard's whitespace rules, the string may first need its whitespace normalized before being returned. With our example data, _processText ultimately returns 'answer1' or 'answer2'.
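The xml:space handling above is also a small caching trick: the collapse is performed once, then the attribute is flipped to "preserve" so later visits return the already-normalized string unchanged. In isolation:

```python
import re

elem = ["text", {"xml:space": "default"}, "answer1   with\nextra   spaces"]

if elem[1]["xml:space"] == "default":
    # Collapse every run of whitespace to a single space, once.
    elem[2] = re.sub(r"\s+", " ", elem[2])
    # Mark as processed so future visits skip the substitution.
    elem[1]["xml:space"] = "preserve"

print(elem[2])  # answer1 with extra spaces
```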

This concludes our brief walkthrough of one complete question-answering process.
