最近看了July的一些关于Java处理海量数据的问题研究,深有感触,链接:http://blog.csdn.net/v_july_v/article/details/6685962
感谢July ^_^
他用的是Java的Hash Map等方法做了处理,讲解的非常深刻入骨
我也一时兴起,想拿Python试试刀,看看Python对于海量数据的处理能力如何。无奈在百度和Google输入“Python 海量数据”都无果。可能是国内使用python的不多,用python处理海量数据的就更少了。不过这浇灭不了我的欲望,哈哈
打算拿July的其中一个问题来试验一下:
海量日志数据,提取出某日访问百度次数最多的那个IP。
July给出的解决方案:
方案1:首先是这一天,并且是访问百度的日志中的IP取出来,逐个写入到一个大文件中。注意到IP是32位的,最多有2^32个IP。同样可以采用映射的方法,比如模1000,把整个大文件映射为1000个小文件,再找出每个小文中出现频率最大的IP(可以采用hash_map进行频率统计,然后再找出频率最大的几个)及相应的频率。然后再在这1000个最大的IP中,找出那个频率最大的IP,即为所求。
下手吧!
(一)生成数据
我首先构造1亿个IP,这些IP前两段都是“10.197”,后两段为0-255的随机数
把这1亿个IP存入一个文本文件中
Python代码如下:
__author__ = "Wally Yu (dayu.ebay@gmail.com)" __date__ = "$Date: 2012/04/09 $" def generateRandom(rangeFrom, rangeTo): import random return random.randint(rangeFrom,rangeTo) def generageMassiveIPAddr(fileLocation,numberOfLines): IP = [] file_handler = open(fileLocation, 'a+') for i in range(numberOfLines): IP.append('10.197.' + str(generateRandom(0,255))+'.'+ str(generateRandom(0,255)) + '\n') file_handler.writelines(IP) file_handler.close() if __name__ == '__main__': from time import ctime print ctime() for i in range(10): print ' ' + str(i) + ": " + ctime() generageMassiveIPAddr('d:\\massiveIP.txt', 10000000) print ctime()
这里插一下,我的软件硬件环境:
硬件:
- ThinkPad T420(CPU: i7, 内存16G)
软件:
-OS: WinXP32位 (只认出3.6G内存)
- Python:2.7
从Python的print日志中基本可以看出,生成一千万条IP地址大概需要1分钟,生成1亿条记录需要10分钟
占据硬盘空间:1.4G
日志:
Mon Apr 09 16:52:28 2012 0: Mon Apr 09 16:52:28 2012 1: Mon Apr 09 16:53:28 2012 2: Mon Apr 09 16:54:29 2012 3: Mon Apr 09 16:55:30 2012 4: Mon Apr 09 16:56:32 2012 5: Mon Apr 09 16:57:33 2012 6: Mon Apr 09 16:58:36 2012 7: Mon Apr 09 16:59:36 2012 8: Mon Apr 09 17:00:36 2012 9: Mon Apr 09 17:01:35 2012 Mon Apr 09 17:02:36 2012
(二)处理思路
假设现在可用内存仅为128M,而每行IP经计算需要14Byte
因为数据太大,把1亿条数据载入内存,再做排序会导致内存溢出。July的思想:“以大化小,分而治之”非常合适,我转化后的操作思路:
1. 每行IP需要14B空间,那么128M内存最多可以处理 128M / 14B = 9142857个IP
把每36571429个IP拆成一个小文件保存起来,每个小文件的大小小于等于128M,共将生成11个文件
2. 对每个小文件用Hash Table处理,Python有自己非常高效的Hash Table:Dictionary!
具体处理如下:
1). 构建名为“Result”的Dictionary,“key”为IP地址,“value”为该IP地址出现的次数,用来记录11个文件每一个的最多出现的IP
2). 构建名为“IP”的Dictionary,“key”为IP地址,“value”为该IP地址出现的次数,用来记录每一个小文件的所有IP地址
3). 读入每一条IP地址到“IP” Dictionary,如果该IP地址出现过,把相应的value的值加1;如果该IP地址没有出现过,则key=IP地址,value=1
4). 对“IP” Dictionary进行内排序,返回最大的IP地址(如果有若干个最大值是一样的,就都返回)
5). 将返回值存入“Result” Dictionary
6). 对“Result”进行内排序,返回最大的IP地址(如果有若干个最大值是一样的,就都返回)
(三)实现
1)拆分成小文件
代码如下:
__author__ = "Wally Yu (dayu.ebay@gmail.com)" __date__ = "$Date: 2012/04/10 $" from time import ctime def splitFile(fileLocation, targetFoler): file_handler = open(fileLocation, 'r') block_size = 9142857 line = file_handler.readline() temp = [] countFile = 1 while line: for i in range(block_size): if i == (block_size-1): # write block to small files file_writer = open(targetFoler + "\\file_"+str(countFile)+".txt", 'a+') file_writer.writelines(temp) file_writer.close() temp = [] print " file " + str(countFile) + " generated at: " + str(ctime()) countFile = countFile + 1 else: temp.append(file_handler.readline()) file_handler.close() if __name__ == '__main__': print "Start At: " + str(ctime()) splitFile("d:\\massiveIP.txt", "d:\\massiveData")
运行后的log如下:
Start At: Tue Apr 10 10:56:25 2012 file 1 generated at: Tue Apr 10 10:56:37 2012 file 2 generated at: Tue Apr 10 10:56:47 2012 file 3 generated at: Tue Apr 10 10:57:00 2012 file 4 generated at: Tue Apr 10 10:57:14 2012 file 5 generated at: Tue Apr 10 10:57:26 2012 file 6 generated at: Tue Apr 10 10:57:42 2012 file 7 generated at: Tue Apr 10 10:57:53 2012 file 8 generated at: Tue Apr 10 10:58:04 2012 file 9 generated at: Tue Apr 10 10:58:16 2012 file 10 generated at: Tue Apr 10 10:58:27 2012 file 11 generated at: Tue Apr 10 10:58:38 2012
可见拆分一个文件需要费时10-15秒,拆分文件总共耗时2分14秒
2). 找出出现次数最大的IP:
代码如下:
__author__ = "Wally Yu (dayu.ebay@gmail.com)" __date__ = "$Date: 2012/04/10 $" import os from time import ctime def findIP(targetFolder): Result = {} IP = {} for root, dirs, files in os.walk(targetFolder): for f in files: # read small files file_handler = open(os.path.join(targetFolder, f), 'r') lines = file_handler.readlines() file_handler.close() # get IP in file, store to IP Dictionary for line in lines: if line in IP: IP[line] = IP[line] + 1 else: IP[line] = 1 # sort Dictionary IP = sorted(IP.items(), key=lambda d: d[1]) # get max item(s), store to Result Dictionary maxItem = IP[-1][1] print ' File ' + str(f) + ":" print " maxItem: " + str(IP[-1]) tempTuple = IP.pop() while tempTuple[1] == maxItem: if tempTuple[0] in Result: Result[tempTuple[0]] = Result[tempTuple[0]] + 1 else: Result[tempTuple[0]] = tempTuple[1] tempTuple = IP.pop() IP = {} print ' Finished: ' + ctime() print sorted(Result.items(), key=lambda d: d[1]) if __name__ == '__main__': print 'Start: ' + ctime() findIP("d:\\massiveData") print 'End: ' + ctime()
运行后的log如下:
Start: Thu Apr 12 10:20:01 2012 File file_1.txt: maxItem: ('10.197.223.85\n', 190) Finished: Thu Apr 12 10:20:23 2012 File file_10.txt: maxItem: ('10.197.44.215\n', 194) Finished: Thu Apr 12 10:20:37 2012 File file_11.txt: maxItem: ('10.197.251.171\n', 181) Finished: Thu Apr 12 10:20:48 2012 File file_2.txt: maxItem: ('10.197.181.190\n', 191) Finished: Thu Apr 12 10:21:00 2012 File file_3.txt: maxItem: ('10.197.135.27\n', 193) Finished: Thu Apr 12 10:21:14 2012 File file_4.txt: maxItem: ('10.197.208.113\n', 192) Finished: Thu Apr 12 10:21:24 2012 File file_5.txt: maxItem: ('10.197.120.160\n', 190) Finished: Thu Apr 12 10:21:34 2012 File file_6.txt: maxItem: ('10.197.69.155\n', 193) Finished: Thu Apr 12 10:21:44 2012 File file_7.txt: maxItem: ('10.197.88.144\n', 192) Finished: Thu Apr 12 10:21:55 2012 File file_8.txt: maxItem: ('10.197.103.234\n', 193) Finished: Thu Apr 12 10:22:08 2012 File file_9.txt: maxItem: ('10.197.117.46\n', 192) Finished: Thu Apr 12 10:22:20 2012 [('10.197.251.171\n', 181), ('10.197.120.160\n', 190), ('10.197.223.85\n', 190), ('10.197.181.190\n', 191), ('10.197.117.46\n', 192), ('10.197.208.113\n', 192), ('10.197.88.144\n', 192), ('10.197.147.29\n', 193), ('10.197.68.183\n', 193), ('10.197.69.155\n', 193), ('10.197.103.234\n', 193), ('10.197.135.27\n', 193), ('10.197.44.215\n', 194)] End: Thu Apr 12 10:22:21 2012
由此可见,最大的IP地址为:“10.197.44.215”,共出现194次!
而Python的计算时间为2分20秒,非常快
(四)引申测试
以上是在假设内存仅为128M下的计算时间,为了测试Python真正的执行效率,打算再写一算法,将所有1.4G的数据一次性导入内存,并作内排序,看看它的执行效率
代码如下:
__author__ = "Wally Yu (dayu.ebay@gmail.com)" __date__ = "$Date: 2012/04/10 $" import os from time import ctime def findIPAtOnce(targetFile): print "Started At: " + ctime() Result = {} file_handler = open(targetFile, 'r') lines = file_handler.readlines() file_handler.close() print "File Read Finished At: " + ctime() for line in lines: if line in Result: Result[line] = Result[line] + 1 else: Result[line] = 1 print "Write to Dic Finished At: " + ctime() Result = sorted(Result.items(), key=lambda d: d[1]) print "Sorting Finished At: " + ctime() print 'Result:' for i in range(10): print ' ' + str(Result.pop()) if __name__ == '__main__': findIPAtOnce("d:\\massiveIP.txt")
最后得到了Memory Error:
Traceback (most recent call last): File "C:/Documents and Settings/wally-yu/Desktop/findIPAtOnce.py", line 30, in <module> findIPAtOnce("d:\\massiveIP.txt") File "C:/Documents and Settings/wally-yu/Desktop/findIPAtOnce.py", line 11, in findIPAtOnce lines = file_handler.readlines() MemoryError
哈哈哈!
为了测试Python的处理速度,重新生成一个小一点的Txt文件,重新运行generageMassiveIPAddr函数,生成一千万个IP地址
if __name__ == '__main__': from time import ctime print ctime() for i in range(1): print ' ' + str(i) + ": " + ctime() generageMassiveIPAddr('d:\\massiveIP_small.txt', 10000000) print ctime()
生成后的Txt占据144M的空间
再次运行
if __name__ == '__main__': findIPAtOnce("d:\\massiveIP_small.txt")
得到Log如下:
Started At: Thu Apr 12 11:03:35 2012 File Read Finished At: Thu Apr 12 11:03:41 2012 Write to Dic Finished At: Thu Apr 12 11:03:44 2012 Sorting Finished At: Thu Apr 12 11:03:45 2012 Result: ('10.197.222.105\n', 215) ('10.197.68.118\n', 210) ('10.197.229.152\n', 206) ('10.197.22.46\n', 202) ('10.197.98.83\n', 201) ('10.197.53.43\n', 201) ('10.197.169.65\n', 200) ('10.197.225.22\n', 200) ('10.197.146.78\n', 200) ('10.197.57.101\n', 200)
可见时间消耗如下:
文件数据读取:6秒
写入Dictionary:3秒
排序:1秒
总共耗时不过10秒,而且大多时间都是I/O的开销!!!
(五)小节
由以上种种可见Python对于海量数据处理的高效率,也为Python的同行处理海量数据提供了一些思路
有兴趣的朋友可以拿其他语言做同样的测试,共同进步
(六)修改
注:
1. 以上完成于2012年4月10日,本节及以下完成于2012年4月18日
2. 六、七节的增加是由于lidguan兄发现的一个大漏洞而做的修改,非常感谢!具体评论见下:
-
4楼 lidguan昨天 16:23发表[回复]
[引用][举报][删除] - 我有个问题,请不吝赐教:
将一个大文件分割成许多的小文件,但楼主分割方式好像是根据文件大小来分割,然后分别排序,得出一系列的最大值,最后在最大值中比较,得出一个最后结果....但每个ip可能在不同文件中都有记录,也许这个倒霉的ip是第一个文件的第二名,在第二个文件也是第二名,你用最大值进行比较,就会把这个倒霉的ip忽略掉,但其实这个ip才是真正的最大值..
我不懂python,当伪代码看的....如有不对的地方,请多多原谅
我确实也是考虑不周,才导致了以上算法的巨大漏洞,今天做如下修改:
思路:
1. 不对大文件进行拆分,否则会产生lidguan兄提到的问题
2. 假设这个一亿个IP地址的重复率比较高,重复后的IP可以一次性记录入Python Dictionary (Java Hash Map),那么就直接从大文件中一条一条读取IP地址,记录入Dictionary
3. 对Dictionary进行排序并输出
代码:
__author__ = "Wally Yu (dayu.ebay@gmail.com)" __date__ = "$Date: 2012/04/18 $" import os from time import ctime def findIPAtOnce(targetFile): print "Started At: " + ctime() Result = {} file_handler = open(targetFile, 'r') for line in file_handler: if line in Result: Result[line] = Result[line] + 1 else: Result[line] = 1 print "Write to Dic Finished At: " + ctime() file_handler.close() Result = sorted(Result.items(), key=lambda d: d[1]) print "Sorting Finished At: " + ctime() print 'Result:' for i in range(10): print ' ' + str(Result.pop()) if __name__ == '__main__': findIPAtOnce("d:\\massiveIP.txt")
Log:
>>> Started At: Wed Apr 18 13:20:34 2012 Write to Dic Finished At: Wed Apr 18 13:21:34 2012 Sorting Finished At: Wed Apr 18 13:21:34 2012 Result: ('10.197.200.159\n', 1713) ('10.197.143.163\n', 1707) ('10.197.68.193\n', 1693) ('10.197.136.119\n', 1692) ('10.197.71.24\n', 1692) ('10.197.132.242\n', 1690) ('10.197.4.219\n', 1688) ('10.197.113.84\n', 1684) ('10.197.204.142\n', 1681) ('10.197.78.110\n', 1675)
由此可见,出现最多的IP为“10.197.200.159”,出现次数1713次!
执行时间:
- 读取文件并写入Dictionary:60秒
- 内排序:小于1秒!!!
经过这次的修改,运算结果相信是可靠的
(七)总结
1. 修改后的代码是在假设IP地址的重复性比较高,可以一次性导入内存中操作的前提下展开的;如果重复性低,或者更大量的数据,导致无法一次导入内存的话,就需要使用外排序来找出IP地址了
2. 希望大家多多探讨,也幸亏lidguan兄的指出,否则险些酿成大错
3. 最近在讨论的纯粹的QA是否有必要存在,我也来多嘴几句,我相信这些代码如果是经过了QA的测试后多多少少会降低风险,尤其是此类明显的逻辑性的错误是肯定可以避免的,所以QA人员的重要性不言而喻。说要消灭QA,鄙人觉得是软件工程思维的一种倒退
版权所有。转载本BLOG内任何文章,请以超链接形式注明出处。