现在的位置: 首页 > 综合 > 正文

分布式统计的思考以及实现(二)

2012年04月25日 ⁄ 综合 ⁄ 共 2917字 ⁄ 字号 评论关闭

在第一部分(http://www.cnblogs.com/lovesanni/archive/2013/04/20/3032959.html)中我描述了分布式统计算法的基本思路,这篇从头到尾具体讲如何实现以及其中可能要避免的一些陷阱

同样的,本文中还是以python来做具体实现

 

具体阐述之前,得先考虑要解决哪些问题:

* 如何表示在统计中需要计算的字段

* 如何表示对应的统计算法,字段值如何生成

 

对于上述问题,我的解决方案是:

* 定义一套DSL,实现一系列的聚集算法,然后映射相应的语法

* 制定相应配置文件的BNF规范,用于指定调用哪些聚集算法生成相应字段

 

还是拿第一部分的表格来举例(如下表格为原始数据,D字段为时间值):

A B C D
a1 b1 0.1 datetime(2013, 4, 22, 12, 18, 0)
a2 b2 0.2 datetime(2013, 4, 22, 13, 14, 15)
a1 b1 0.3 datetime(2013, 4, 23, 14, 15, 16)

需求为:

以A,B作为分组字段,计算C值的累加和

 

则对应的DSL表示为:

[stat]
method = group
out_db_url = mongodb://localhost:27017/out_db?out_collection
sync_redis_url = redis://localhost:6379/7

[group:by]
A = copy($src, A)
B = copy($src, B)

[fields]
C = group_incre($src, C)

 

其中;

* stat节点表示输出地址

* group:by节点表示需要分组的字段

* fields节点表示需要做聚集运算的字段

 

那么,现在有了DSL,如何从DSL转化为python调用呢?需要解决几个问题:

* 如何识别字段定义,以及转换为python内置对象

* 如何与python中的函数定义映射

* 如何实现函数的链式调用

识别字段定义

从上文的DSL可以看出,即是常见的ini格式定义,python已内置对应的解析库,例如ConfigParser

转换为python内置对象

我们知道,从ini文件读取的节点值还只是字符串文本,要参与到python中计算,还需要有一次转化过程,因此就涉及到语法解析(说到语法解析,python有开源的yacc库可用,异常强大);为方便做语法解析,我做了几项简化:

* 只识别字符串、整数、浮点型、dict类型字符文本

* 函数调用语法和大多数语言一致,此处即采用和python一致的函数语法

* 函数参数用逗号分隔

* 所有以$为前缀的字符token当作变量,从专有的变量节点中查询引用;且$src默认识别为原始记录

* 函数只支持链式调用,不支持嵌套调用

 

因此在这个简化的前提下,完全无需用到yacc库即可搞定语法解析,例如:

A = copy($src, A)

在语法解析完毕之后,即可映射到相应的copy函数调用上,参数即为$src 和 A

字段值的生成

上文说到字段值通过函数调用生成,就涉及到函数参数的识别以及函数名的识别,通过代码给出:

def compile_expr(self, name, expr):
    '''编译表达式
    name: 字段名
    expr: 链式表达式
    '''
    assert name and isinstance(name, basestring)
    assert expr and isinstance(expr, basestring)
    
    _ = self.FUNC_ARG_PAT.findall(expr)
    if not _:
        raise FunctionNotFoundError(name)
        
    flist = []
    for _ in _:
        args = self.__eval_args(_[1].strip())
        _ = _[0]
        f = self.__func_defs.get(_)
        if not f:
            raise UnDefinedFunctionError('{0} in {1}'.format(_, name))
        flist.append({'f': f, 'args': args})
        
    self.__compiled_funclist.update({name: flist})
    return name

 

通过compile_expr方法调用识别出函数名以及对应的参数列表,并储存起来在调用时使用:

def unwind(self, field, src):
    '''展开调用栈
    '''
    
    if not field or not isinstance(field, basestring):
        return False, None
    if not src or not isinstance(src, dict):
        return False, None
    
    ok, rt = False, None
    for idx, _ in enumerate(self.__compiled_funclist.get(field)):
        f = _.get('f')
        args = list(_.get('args'))
        
        if not self.__verify_args(idx, f, args):
            return False, None
        
        if 0 == idx:
            if not args:
                ok, rt = f()
            else:
                if '$src' == args[0]:
                    ok, rt = f(src, *args[1:])
                else:
                    ok, rt = f(*args)
        else:
            if not ok:
                return False, None
            args.insert(0, rt)
            ok, rt = f(*args)
        
    return ok, rt
    
    
def __verify_args(self, idx, f, args):
    spec = getargspec(f)
    
    f_arg_len = len(spec.args) - 1 if isinstance(f, MethodType) else len(spec.args)
    len_args = len(args) if 0 == idx else len(args) + 1
    
    if not spec.varargs:
        if f_arg_len != len_args:
            return False
    else:
        if not len_args >= f_arg_len:
            return False

    return True

 

得益于python的动态语言以及可变参数特性,函数的动态调用实现得异常简单

分布式的考虑

有了上述基于函数调用的统计算法,结合第一部分的内容,就可以很方便的将统计计算映射到redis集群之上,并且由于每个统计之间没有耦合与依赖关系,可以很方便的横向扩展;

另外,在对统计任务的分发上,可以参考类似memcached的哈希环设计,平均分派到redis集群上;还有为保证统计结果的正确,需要在redis调用中添加事务保护

输出结果的考虑

通过上述函数调用,对于每条原始记录即可产生对应的中间统计结果,那么,最终如何输出到数据库结果表中呢?有一个很自然的考虑:利用mongodb的update调用写入;但这样做其实有非常大的性能瓶颈风险,因为mongodb索引的限制,非常不推荐直接利用update写入,因为随着数据量的增大和数据表的增加,该操作会非常慢;

因此为了快速写入,并考虑统计结果表输出的实际业务需求,需要有另外的模块对中间结果作缓存以及合并操作,并在相应时间做一次批处理插入,在这里就不做详细阐述了

抱歉!评论已关闭.