在第一部分(http://www.cnblogs.com/lovesanni/archive/2013/04/20/3032959.html)中我描述了分布式统计算法的基本思路,这篇从头到尾具体讲如何实现以及其中可能要避免的一些陷阱
同样的,本文中还是以python来做具体实现
具体阐述之前,得先考虑要解决哪些问题:
* 如何表示在统计中需要计算的字段
* 如何表示对应的统计算法,字段值如何生成
对于上述问题,我的解决方案是:
* 定义一套DSL,实现一系列的聚集算法,然后映射相应的语法
* 制定相应配置文件的BNF规范,用于指定调用哪些聚集算法生成相应字段
还是拿第一部分的表格来举例(如下表格为原始数据,D字段为时间值):
A | B | C | D |
a1 | b1 | 0.1 | datetime(2013, 4, 22, 12, 18, 0) |
a2 | b2 | 0.2 | datetime(2013, 4, 22, 13, 14, 15) |
a1 | b1 | 0.3 | datetime(2013, 4, 23, 14, 15, 16) |
需求为:
以A,B作为分组字段,计算C值的累加和
则对应的DSL表示为:
[stat] method = group out_db_url = mongodb://localhost:27017/out_db?out_collection sync_redis_url = redis://localhost:6379/7 [group:by] A = copy($src, A) B = copy($src, B) [fields] C = group_incre($src, C)
其中;
* stat节点表示输出地址
* group:by节点表示需要分组的字段
* fields节点表示需要做聚集运算的字段
那么,现在有了DSL,如何从DSL转化为python调用呢?需要解决几个问题:
* 如何识别字段定义,以及转换为python内置对象
* 如何与python中的函数定义映射
* 如何实现函数的链式调用
识别字段定义
从上文的DSL可以看出,即是常见的ini格式定义,python已内置对应的解析库,例如ConfigParser
转换为python内置对象
我们知道,从ini文件读取的节点值还只是字符串文本,要参与到python中计算,还需要有一次转化过程,因此就涉及到语法解析(说到语法解析,python有开源的yacc库可用,异常强大);为方便做语法解析,我做了几项简化:
* 只识别字符串、整数、浮点型、dict类型字符文本
* 函数调用语法和大多数语言一致,此处即采用和python一致的函数语法
* 函数参数用逗号分隔
* 所有以$为前缀的字符token当作变量,从专有的变量节点中查询引用;且$src默认识别为原始记录
* 函数只支持链式调用,不支持嵌套调用
因此在这个简化的前提下,完全无需用到yacc库即可搞定语法解析,例如:
A = copy($src, A)
在语法解析完毕之后,即可映射到相应的copy函数调用上,参数即为$src 和 A
字段值的生成
上文说到字段值通过函数调用生成,就涉及到函数参数的识别以及函数名的识别,通过代码给出:
def compile_expr(self, name, expr): '''编译表达式 name: 字段名 expr: 链式表达式 ''' assert name and isinstance(name, basestring) assert expr and isinstance(expr, basestring) _ = self.FUNC_ARG_PAT.findall(expr) if not _: raise FunctionNotFoundError(name) flist = [] for _ in _: args = self.__eval_args(_[1].strip()) _ = _[0] f = self.__func_defs.get(_) if not f: raise UnDefinedFunctionError('{0} in {1}'.format(_, name)) flist.append({'f': f, 'args': args}) self.__compiled_funclist.update({name: flist}) return name
通过compile_expr方法调用识别出函数名以及对应的参数列表,并储存起来在调用时使用:
def unwind(self, field, src): '''展开调用栈 ''' if not field or not isinstance(field, basestring): return False, None if not src or not isinstance(src, dict): return False, None ok, rt = False, None for idx, _ in enumerate(self.__compiled_funclist.get(field)): f = _.get('f') args = list(_.get('args')) if not self.__verify_args(idx, f, args): return False, None if 0 == idx: if not args: ok, rt = f() else: if '$src' == args[0]: ok, rt = f(src, *args[1:]) else: ok, rt = f(*args) else: if not ok: return False, None args.insert(0, rt) ok, rt = f(*args) return ok, rt def __verify_args(self, idx, f, args): spec = getargspec(f) f_arg_len = len(spec.args) - 1 if isinstance(f, MethodType) else len(spec.args) len_args = len(args) if 0 == idx else len(args) + 1 if not spec.varargs: if f_arg_len != len_args: return False else: if not len_args >= f_arg_len: return False return True
得益于python的动态语言以及可变参数特性,函数的动态调用实现得异常简单
分布式的考虑
有了上述基于函数调用的统计算法,结合第一部分的内容,就可以很方便的将统计计算映射到redis集群之上,并且由于每个统计之间没有耦合与依赖关系,可以很方便的横向扩展;
另外,在对统计任务的分发上,可以参考类似memcached的哈希环设计,平均分派到redis集群上;还有为保证统计结果的正确,需要在redis调用中添加事务保护
输出结果的考虑
通过上述函数调用,对于每条原始记录即可产生对应的中间统计结果,那么,最终如何输出到数据库结果表中呢?有一个很自然的考虑:利用mongodb的update调用写入;但这样做其实有非常大的性能瓶颈风险,因为mongodb索引的限制,非常不推荐直接利用update写入,因为随着数据量的增大和数据表的增加,该操作会非常慢;
因此为了快速写入,并考虑统计结果表输出的实际业务需求,需要有另外的模块对中间结果作缓存以及合并操作,并在相应时间做一次批处理插入,在这里就不做详细阐述了