现在的位置: 首页 > 综合 > 正文

Python编写WEB服务器压力测试工具

2014年01月05日 ⁄ 综合 ⁄ 共 1929字 ⁄ 字号 评论关闭

最近在编写一个简单的WEB服务器,一个日常的工作就是测试服务器 的性能,试用了MS的Web Application Stress,发现它居然不支持除80以外端口的测试,其他的如Loadrunner 太贵而且太大,试用版只支持10个并发用户,我Google到了100个并发用户的许可想试用一下,不过没有安装成功。想想这种压力测试实际上没啥技术含 量,就自己用Python来编写了小段测试代码。

使用Python的原因
毫无疑问,编写这样的代码使用Python最合适,使用C/C++编写有点小题大做,使用C#编写编译又很麻烦,我是使用Editplus来写代码的,因 为要考虑做不同的测试,只能边写边调整,使用Python,下载一个Python的加亮文件,设置User Tool 1 到 Python,运行的时候只需要按Ctrl+1,着实方便的很。

压力测试的实现
压力测试是通过模拟对WEB服务器的访问,进行记录响应时间,计算每秒处理数,记录上传下载的字节数。一般来说,一台测试机器上视机器的性能,发起 50~200的连接,基本就差不多了。考虑到测试机器的负载,一般使用多线程来完成多个WEB请求,幸运的是,Python对所有的这些支持的相当完善。 以下是测试的代码

# code by 李嘉
# 禁止任何商业目的的转载
# 不对因使用代码产生任何后果负任何责任
# 转载请保留所有声明
import threading, time, httplib, random
# 需要测试的 url 列表,每一次的访问,我们随机取一个
urls = [
"/test?page=",
"/test2?orderby=a&page=",
"/test2?orderby=d&page=",
]
MAX_PAGE = 10000
SERVER_NAME = "192.168.0.64:80"
TEST_COUNT = 10000
# 创建一个 threading.Thread 的派生类
class RequestThread(threading.Thread):
# 构造函数
def __init__(self, thread_name):
   threading.Thread.__init__(self)
   self.test_count = 0

# 线程运行的入口函数
def run(self):
   # 不直接把代码写在run里面是因为也许我们还要做其他形式的测试
   i = 0
   while i < TEST_COUNT:
    self.test_performace()
    i += 1
   #self.test_other_things()

def test_performace(self):
   conn = httplib.HTTPConnection(SERVER_NAME)
   # 模拟 Keep-Alive 的访问, HTTP 1.1
   for i in range(0, random.randint(0, 100)):
    # 构造一个 url,提供随机参数的能力
    url = urls[random.randint(0, len(urls) - 1)];
    url += str(random.randint(0, MAX_PAGE))
    # 这就连接到服务器上去
    #print url
    try:
     conn.request("GET", url)
     rsps = conn.getresponse()
     if rsps.status == 200:
      # 读取返回的数据
      data = rsps.read()
     self.test_count += 1
    except:
     continue
   
   conn.close()
  
# main 代码开始

# 开始的时间
start_time = time.time()
threads = []
# 并发的线程数
thread_count = 100

i = 0
while i < thread_count:
t = RequestThread("thread" + str(i))
threads.append(t)
t.start()
i += 1
# 接受统计的命令
word = ""
while True:
word = raw_input("cmd:")
if word == "s":
   time_span = time.time() - start_time
   all_count = 0
   for t in threads:
    all_count += t.test_count
   print "%s Request/Second" % str(all_count / time_span)
elif word == "e":
   # 准备退出 其实 X 掉 窗口更加容易,没什么浪费的资源
   TEST_COUNT = 0
   for t in threads:
    t.join(0)
   break

抱歉!评论已关闭.