MongoDB:10. MapReduce
在 MongoDB 上使用 Map/Reduce 进行并行 "统计" 很容易。
参数说明:
|
官方文档有几句话很重要:
map/reduce is invoked via a database. The database creates a temporary collection to hold output of the operation. The collection is command cleaned up when the client connection closes, or when explicitly dropped. Alternatively, one can specify a permanent output collection name. map and reduce functions are written in JavaScript and execute on the server. In sharded environments, data processing of map/reduce operations runs in parallel on all shards.
MapReduce jobs on a single mongod process are single threaded. This is due to a design limitation in current JavaScript engines. We are looking into alternatives to solve this issue, but for now if you want to parallelize your MapReduce jobs, you will need |
先准备点简单的数据练练手。
1. Map
Map 函数必须调用 emit(key, value) 返回键值对,使用 this 访问当前待处理的 Document。
value 可以使用 JSON Object 传递 (支持多个属性值)。
例如:
emit(this.age, {count:1}) |
2. Reduce
Reduce 函数接收的参数类似 Group 效果,将 Map 返回的键值序列组合成 { key, [value1, value2, value3, value...] } 传递给 reduce。
Reduce 函数对这些 values 进行 "统计" 操作,返回结果可以使用 JSON Object。
3. Result
我们不必使用 runCommand,改用 db.<collection>.mapReduce() 更方便一些。
mapReduce() 将结果存储在 "tmp.mr.mapreduce_1284097299_10" 临时集合中。
4. Finalize
利用 finalize() 我们可以对 reduce() 的结果做进一步处理。
5. Options
我们还可以添加更多的控制细节。
6. Example
MapReduce 的作用不仅仅是 "统计",我们可以直接用这种在服务器端高速并发执行机制批量修改数据。
7. PyMongo
最后当然得在 Python 调用一下。
In [1]: from pymongo import *
In [2]: conn = Connection() In [3]: db = conn.test In [4]: m = "function() { emit(this.age, 1); }" In [5]: r = "function(key, values) { var x = 0; values.forEach(function(v){ x += v }); return x; }" In [6]: res = db.users.map_reduce(m, r, True)
In [7]: for k in db[res["result"]].find(): print k ....: {u'_id': 1.0, u'value': 24.0} {u'_id': 2.0, u'value': 25.0} {u'_id': 3.0, u'value': 25.0} {u'_id': 4.0, u'value': 25.0} {u'_id': 5.0, u'value': 25.0} {u'_id': 6.0, u'value': 25.0} {u'_id': 7.0, u'value': 25.0} {u'_id': 8.0, u'value': 25.0} {u'_id': 9.0, u'value': 25.0} {u'_id': 10.0, u'value': 25.0} {u'_id': 11.0, u'value': 26.0} {u'_id': 12.0, u'value': 25.0} {u'_id': 13.0, u'value': 25.0} {u'_id': 14.0, u'value': 25.0} {u'_id': 15.0, u'value': 25.0} {u'_id': 16.0, u'value': 25.0} {u'_id': 17.0, u'value': 25.0} {u'_id': 18.0, u'value': 25.0} {u'_id': 19.0, u'value': 25.0} {u'_id': 20.0, u'value': 25.0} {u'_id': 21.0, u'value': 25.0} {u'_id': 22.0, u'value': 25.0} {u'_id': 23.0, u'value': 25.0} {u'_id': 24.0, u'value': 25.0} {u'_id': 25.0, u'value': 25.0} {u'_id': 26.0, u'value': 25.0} {u'_id': 27.0, u'value': 25.0} {u'_id': 28.0, u'value': 25.0} {u'_id': 29.0, u'value': 25.0} {u'_id': 30.0, u'value': 25.0} {u'_id': 31.0, u'value': 25.0} {u'_id': 32.0, u'value': 25.0} {u'_id': 33.0, u'value': 25.0} {u'_id': 34.0, u'value': 25.0} {u'_id': 35.0, u'value': 25.0} {u'_id': 36.0, u'value': 25.0} {u'_id': 37.0, u'value': 25.0} {u'_id': 38.0, u'value': 25.0} {u'_id': 39.0, u'value': 25.0} {u'_id': 40.0, u'value': 25.0} |
附加参数也很容易。
In [10]: res = db.users.map_reduce(m, r, True, limit=10)
In [11]: res Out[11]: {u'counts': {u'emit': 10, u'input': 10, u'output': 10}, u'ok': 1.0, u'result': u'tmp.mr.mapreduce_1284099468_31', u'timeMillis': 20} In [12]: for k in db[res["result"]].find(): print k ....: {u'_id': 2.0, u'value': 1.0} {u'_id': 3.0, u'value': 1.0} {u'_id': 4.0, u'value': 1.0} {u'_id': 5.0, u'value': 1.0} {u'_id': 6.0, u'value': 1.0} {u'_id': 7.0, u'value': 1.0} {u'_id': 8.0, u'value': 1.0} {u'_id': 9.0, u'value': 1.0} {u'_id': 10.0, u'value': 1.0} {u'_id': 11.0, u'value': 1.0} In [13]: res = db.users.map_reduce(m, r, True, query={"age":{"$lt":20}}) In [14]: res Out[14]: {u'counts': {u'emit': 475, u'input': 475, u'output': 19}, u'ok': 1.0, u'result': u'tmp.mr.mapreduce_1284099533_33', u'timeMillis': 77}
In [15]: for k in db[res["result"]].find(): print k ....: {u'_id': 1.0, u'value': 24.0} {u'_id': 2.0, u'value': 25.0} {u'_id': 3.0, u'value': 25.0} {u'_id': 4.0, u'value': 25.0} {u'_id': 5.0, u'value': 25.0} {u'_id': 6.0, u'value': 25.0} {u'_id': 7.0, u'value': 25.0} {u'_id': 8.0, u'value': 25.0} {u'_id': 9.0, u'value': 25.0} {u'_id': 10.0, u'value': 25.0} {u'_id': 11.0, u'value': 26.0} {u'_id': 12.0, u'value': 25.0} {u'_id': 13.0, u'value': 25.0} {u'_id': 14.0, u'value': 25.0} {u'_id': 15.0, u'value': 25.0} {u'_id': 16.0, u'value': 25.0} {u'_id': 17.0, u'value': 25.0} {u'_id': 18.0, u'value': 25.0} {u'_id': 19.0, u'value': 25.0} |
更多细节请参考官方文档。