现在的位置: 首页 > 综合 > 正文

socket.io服务端分析

2014年07月04日 ⁄ 综合 ⁄ 共 10568字 ⁄ 字号 评论关闭

前面的一篇文章分析了socket.io的客户端的实现,那么这一篇文章来看看它的服务端是怎么搞的吧。。。

首先我们来看一段用于建立socket.io的服务器的代码吧:

var sio = require('socket.io');  
  
var  wsocket = sio.listen(80);  
  
wsocket.sockets.on('connection', function (socket) {  //当有新的连接建立的时候  
    socket.on("message", function(data){
    	console.log("get : " + data);
    });
    socket.on("disconnect", function(){
    	//console.log("client has closed");
    });
});  

这里可以看到,首先其实是调用listen函数来创建,那么我们来看看这个listen函数究竟是干了什么事情吧:

//其实这里是建立一个http的server,然后让其监听端口,这里的server可以是一个httpserver,也可以是一个端口号
//因为其实websocket本身也是建立在http协议基础上的
exports.listen = function (server, options, fn) {
  if ('function' == typeof server) {
    console.warn('Socket.IO\'s `listen()` method expects an `http.Server` instance\n'
    + 'as its first parameter. Are you migrating from Express 2.x to 3.x?\n'
    + 'If so, check out the "Socket.IO compatibility" section at:\n'
    + 'https://github.com/visionmedia/express/wiki/Migrating-from-2.x-to-3.x');
  }

  if ('function' == typeof options) {
    fn = options;
    options = {};
  }

  if ('undefined' == typeof server) {
    server = 80;  //默认端口是80
  }

  if ('number' == typeof server) {
    var port = server;
//如果传进来的参数是一个数字,那么就在这个端口上创建一个httpserver
    if (options && options.key)
      server = require('https').createServer(options);
    else
      server = require('http').createServer();

    //默认的request处理
    server.on('request', function (req, res) {
      res.writeHead(200);
      res.end('Welcome to socket.io.');
    });

    server.listen(port, fn);  //监听端口
  }

  //创建并返回一个manager,其实这个才是最重要的
  return new exports.Manager(server, options);
};

其实这里就可以看到,首先是建立一个http的服务器,这里可能刚开始会觉得比较的奇怪,这是因为websocket的协议本身就用到了http协议的内容,有兴趣的可以去看看websocket额协议的内容就知道了,那么最后再创建一个manager,然后将这个manager,这里也就是说我们其实最终用到的是这个manager来管理socket.io的服务器。。。

那么接下来来看看这个manager是怎么定义的吧:

function Manager (server, options) {
  this.server = server;
  this.namespaces = {};
  this.sockets = this.of('');  //创建一个namespace,而且这个namespace会保存到namespaces中,sockets是默认的namespace
  this.settings = {   //一些默认的参数
      origins: '*:*'
    , log: true
    , store: new MemoryStore
    , logger: new Logger
    , static: new Static(this)
    , heartbeats: true
    , resource: '/socket.io'
    , transports: defaultTransports
    , authorization: false
    , blacklist: ['disconnect']
    , 'log level': 3
    , 'log colors': tty.isatty(process.stdout.fd)
    , 'close timeout': 60
    , 'heartbeat interval': 25
    , 'heartbeat timeout': 60
    , 'polling duration': 20
    , 'flash policy server': true
    , 'flash policy port': 10843
    , 'destroy upgrade': true
    , 'destroy buffer size': 10E7
    , 'browser client': true
    , 'browser client cache': true
    , 'browser client minification': false
    , 'browser client etag': false
    , 'browser client expires': 315360000
    , 'browser client gzip': false
    , 'browser client handler': false
    , 'client store expiration': 15
    , 'match origin protocol': false
  };

  for (var i in options) {
    if (options.hasOwnProperty(i)) {
      this.settings[i] = options[i];
    }
  }

  var self = this;
//错误的梳理handler
  server.on('error', function(err) {
    self.log.warn('error raised: ' + err);
  });

  this.initStore();

  this.on('set:store', function() {
    self.initStore();
  });

//记录最开始的request事件的listener
  this.oldListeners = server.listeners('request').splice(0);  //获取原来的request的事件的listener函数
  server.removeAllListeners('request');  //将最早的listener去掉

  server.on('request', function (req, res) {
    //这里是用于处理客户端的request数据,刚开始的握手就是走的这里
    self.handleRequest(req, res);
  });
//这个用于建立ungrade连接,说白了就是实际的websocket的连接
//这个是websocket协议部分的内容了,如果需要建立websocket的连接,那么其http的请求将会如下:
/*
GET / HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: 127.0.0.1
Origin: null
Pragma: no-cache
Cache-Control: no-cache
Sec-WebSocket-Key: gKlfUoFJucbrV2+2KT6N8w==
Sec-WebSocket-Version: 13
Sec-WebSocket-Extensions: x-webkit-deflate-frame
User-Agent: Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like
Gecko) Chrome/30.0.1599.69 Safari/537.36
*/
  server.on('upgrade', function (req, socket, head) {
    self.handleUpgrade(req, socket, head);
  });

  server.on('close', function () {
    clearInterval(self.gc);
  });
//自己定义了一个垃圾处理
  server.once('listening', function () {
    self.gc = setInterval(self.garbageCollection.bind(self), 10000);
  });

  for (var i in transports) {   //初始化所有的transport,这里有一些websocket,flashsocket等等,transport可以看成是顶层类
    if (transports.hasOwnProperty(i)) {
      if (transports[i].init) {  //如果有init,那么初始化
        transports[i].init(this);
      }
    }
  }

  var self = this;
  this.sockets.on('connection', function (conn) {  //namespace上面的监听,当有新的连接进来之后会激活
    self.emit('connection', conn);  //这里在manager上监听connection也可以
  });

  this.sequenceNumber = Date.now() | 0;
 
  this.log.info('socket.io started');
};

Manager.prototype.__proto__ = EventEmitter.prototype

这里在服务端其实也有namespace,而且会有一个默认的namespace为"",sockets将会指向它,然后就是设置一些参数的值,接着就是最为重要的内容了,为前面建立的httpserver设置一些事件的处理函数,这里主要是:

(1)request,对应的是普通的http请求事件,这里之所以会有普通的http请求处理,是因为socket.io在建立连接的时候需要先用普通的http请求进行握手,服务端将会向客户端返回sessionid。

(2)upgrade,这是比较特殊的一种http连接方式,对应这种请求的报文在前面已经给出了一个例子,这里就可以简单的将其理解为专门用于建立websocket的连接的。。。


这里就先略微的说说握手的内容吧,它主要是客户端向服务端建立连接之前,首先要向服务端发送一些http请求,用于握手,然后服务端将会为其分配一个id,接着客户端再建立于服务端的websocket的连接,而且会将这个id带过来,说白了这个id就是用于区分每一个连接的。。。


好了,那么来看看是如何建立websocket连接的吧:

//用于处理Upgrade类型的http请求,也就是websocket连接
Manager.prototype.handleUpgrade = function (req, socket, head) {
  console.log("handler handleUpgrade");
  var data = this.checkRequest(req)    //获取请求的数据
    , self = this;
  
  
  if (!data) {
    if (this.enabled('destroy upgrade')) {
      socket.end();
      this.log.debug('destroying non-socket.io upgrade');
    }

    return;
  }

  req.head = head;
  this.handleClient(data, req);  //这里可以理解为建立一个client,用于与客户端的websocket进行通信
  req.head = null;
};

这个就是用于处理upgrade连接的handler,首先就是获取建立连接的时候带过来的参数,说白了就是url后面的一些参数,然后再调用handleClient方法来建立一个与客户端的连接,这里有一个比较重要的参数,那就是socket,这个就是实际上底层用的socket,毕竟底层还是TCP/IP的嘛。。。

那么接下来来看看这个建立连接的方法吧(说白了就是对已经有建立好的socket连接进行了一层的封装):

//用于创建一个新的客户端连接
Manager.prototype.handleClient = function (data, req) {
  var socket = req.socket  //获取实际的socket
    , store = this.store
    , self = this;

  // handle sync disconnect xhrs
  if (undefined != data.query.disconnect) {
    if (this.transports[data.id] && this.transports[data.id].open) {
      this.transports[data.id].onForcedDisconnect();
    } else {
      this.store.publish('disconnect-force:' + data.id);
    }
    req.res.writeHead(200);
    req.res.end();
    return;
  }
//不支持的transport类型
  if (!~this.get('transports').indexOf(data.transport)) {
    this.log.warn('unknown transport: "' + data.transport + '"');
    req.connection.end();
    return;
  }
//新建一个transport用于维护与客户端的连接,这里一般情况下都是websocket
  var transport = new transports[data.transport](this, data, req)
    , handshaken = this.handshaken[data.id];   //获取以前握手的时候的数据

  if (transport.disconnected) {
    req.connection.end();
    return;
  }
  if (handshaken) {   //表示已经握手过了
    if (transport.open) {
      if (this.closed[data.id] && this.closed[data.id].length) {
        transport.payload(this.closed[data.id]);
        this.closed[data.id] = [];
      }

      this.onOpen(data.id);
      this.store.publish('open', data.id);
      this.transports[data.id] = transport;  //保存当前这个transport
    }

    if (!this.connected[data.id]) {
      this.onConnect(data.id);  //用于记录当前的这个id已经连接了
      this.store.publish('connect', data.id);  //这个与上面做的是一样的事情,不知道要干什么。。汗

      // flag as used
      delete handshaken.issued;
      this.onHandshake(data.id, handshaken);
      this.store.publish('handshake', data.id, handshaken);

      // initialize the socket for all namespaces
      for (var i in this.namespaces) {
        if (this.namespaces.hasOwnProperty(i)) {
          var socket = this.namespaces[i].socket(data.id, true);  //这里可以理解为在namespace中创建与这个transport关联的socket

          // echo back connect packet and fire connection event
          //这里如果namespace如果是'',那么表示是第一次连接,那么需要处理一下,例如激活connection事件
          if (i === '') {
            this.namespaces[i].handlePacket(data.id, { type: 'connect' });
          }
        }
      }

      this.store.subscribe('message:' + data.id, function (packet) {
        self.onClientMessage(data.id, packet);
      });

      this.store.subscribe('disconnect:' + data.id, function (reason) {
        self.onClientDisconnect(data.id, reason);
      });
    }
  } else {
    if (transport.open) {
      transport.error('client not handshaken', 'reconnect');
    }

    transport.discard();
  }
};

上面的代码主要要做一下的几件事情:

(1)根据已经有的tcp的socket来对其进行封装,建立相应的transport,一般情况下都是建立websocket,这里还要将其保存到当前manager当中,与前面提到过的id对应起来,

(2)为namespace创建当前这个连接的socket(这个socket是当前socket.io的更上层的封装),激活相应的事件,通知用户已经有新的连接建立,并且将这个socket传给用户,那么用户就可以通过这个socket来进行通信了。。

那么我们接下来来看看这个transport的创建过程吧,一般情况下都是建立websocket,而且用的是hybi-16.js这个版本的。。。

来看看构造函数:

//mng就是外层的manager,data是建立连接的时候带过的参数,req就是httprequest
function WebSocket (mng, data, req) {
  // parser
  var self = this;

  this.manager = mng;
  this.parser = new Parser();
  this.parser.on('data', function (packet) {
    self.onMessage(parser.decodePacket(packet));  //这里需要将传过来的数据先转化为socket.io的数据格式
  });
  //对于ping数据包的处理
  this.parser.on('ping', function () {
    // version 8 ping => pong
    try {
      self.socket.write('\u008a\u0000');
    }
    catch (e) {
      self.end();
      return;
    }
  });
  this.parser.on('close', function () {
    self.end();
  });
  this.parser.on('error', function (reason) {
    self.log.warn(self.name + ' parser error: ' + reason);
    self.end();
  });
//调用transport的构造函数,因为这里的websocket继承自transport
//并且会调用handleRequest函数,设置心跳,然后会调用onSocketConnect函数用于向客户端发送连接头信息
  Transport.call(this, mng, data, req);
};

这里有一个parser,它是用来解码和编码websocket的frame,这个就是websocket的协议的事情了,,,就不管他,因为当前websocket的其实继承自transport,所以又调用了一遍transport的构造函数,来看看:

function Transport (mng, data, req) {
  this.manager = mng;
  this.id = data.id;
  this.disconnected = false;
  this.drained = true;
  this.handleRequest(req);  //在新建transport的时候,就需要调用这个方法,表示链接已经建立,用于向客户端发送刚开始的建立头数据
};

这里说白了就是记录了两个比较重要的参数,首先是外面的manager,然后是前面已经提到过的id。。。。

接下来用handleRequest方法进行进一步的处理:

//刚刚开始建立连接的时候会调用,保存当前的socket,然后还要调用onSocketConnect向客户端返回建立websocket连接的头
Transport.prototype.handleRequest = function (req) {
  this.log.debug('setting request', req.method, req.url);
  this.req = req;

  if (req.method == 'GET') {
    this.socket = req.socket;   //保存当前的真正的底层socket
    this.open = true;
    this.drained = true;
    this.setHeartbeatInterval();  //设置心跳

    this.setHandlers();  //设置处理函数,主要是socket的一些close,err等
    this.onSocketConnect();  //调用这个函数,将会向客户端返回用于建立websocket连接的头,具体的transport有不同的实现
  }
};

这里主要是保存了真正底层用到的socket,然后为这个socket设置了一些基本的handler,接着在继续调用onSocketConnect来继续处理:

//这个在transport构造函数中调用,也就是刚刚建立连接的时候,用于向客户端发送连接头信息
WebSocket.prototype.onSocketConnect = function () {
  var self = this;
  if (typeof this.req.headers.upgrade === 'undefined' || 
      this.req.headers.upgrade.toLowerCase() !== 'websocket') {
    this.log.warn(this.name + ' connection invalid');
    this.end();
    return;
  }

  var origin = this.req.headers['origin'] || ''
    , location = ((this.manager.settings['match origin protocol'] ?
                      origin.match(/^https/) : this.socket.encrypted) ?
                        'wss' : 'ws')
               + '://' + this.req.headers.host + this.req.url;
  
  if (!this.verifyOrigin(origin)) {
    this.log.warn(this.name + ' connection invalid: origin mismatch');
    this.end();
    return;    
  }
  
  if (!this.req.headers['sec-websocket-key']) {
    this.log.warn(this.name + ' connection invalid: received no key');
    this.end();
    return;
  }
    
  // calc key
  var key = this.req.headers['sec-websocket-key'];  
  var shasum = crypto.createHash('sha1');  
  shasum.update(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11");  
  key = shasum.digest('base64');
//返回给客户端的报文信息,用于握手
  var headers = [
      'HTTP/1.1 101 Switching Protocols'
    , 'Upgrade: websocket'
    , 'Connection: Upgrade'
    , 'Sec-WebSocket-Accept: ' + key
  ];

  try {
    //向客户端返回建立websocket的头部信息
    this.socket.write(headers.concat('', '').join('\r\n'));
    this.socket.setTimeout(0);
    this.socket.setNoDelay(true);
  } catch (e) {
    this.end();
    return;
  }
//当从socket收到数据的时候需要用parser先进行梳理
  this.socket.on('data', function (data) {
    self.parser.add(data);
  });
};

这部分的代码主要是就是在向客户端返回用于握手的数据,当客户端接收到这个数据之后就表明整个websocket的连接已经建立了,那么再设置底层真正的socket的data事件,用于处理从客户端接收到的数据。。。。


那么到这里为止整个服务端的原理就基本说的比较清楚了,比较简单的总结就是,对socket进行了一层封装,当读取到数据之后,先用praser将数据从websocket的frame中提取出来,其实在外面还有一个parser,因为socket.io自己也有一层格式的定义,这里就不细讲了。。。。

至于那些socket的定义,其实都很简单,有兴趣的自己看看吧。。。(最重要的还是websocket协议的一些定义,以及socket.io自己的协议定义)

抱歉!评论已关闭.