现在的位置: 首页 > 综合 > 正文

socket.io客户端分析

2014年07月04日 ⁄ 综合 ⁄ 共 20011字 ⁄ 字号 评论关闭

最近好像看代码也是零零散散的,因为遇到了比较郁闷的事情,让自己损失挺大的,整个人最近都没啥状态。。

组里面最近要做一点东西,推荐学弟用socket.io来实现服务器向web端推送数据,然后学弟遇到了点问题,其实自己对socket.io的了解也不多,是在读pomelo的源代码的时候才了解它的,知道它是对websocket的更高层的封装,会简单的用一下。。但是个人觉得它用起来还是非常的方便的,于是觉得了解一下它的实现原理。。。

那么先从它的客户端的源代码开始分析,一般情况下我们会用如下的代码来建立一个连接,

//进行websocket的链接
    socket = io.connect(url, {'force new connection': true, reconnect: false});

也就是我们用的是io对象下面的函数,那么我们先来看看io对象的定义吧:

(function (exports, global) {

//用于暴露的作用域,其实是全局,windows.io
  var io = exports;

  io.version = '0.9.6';

  io.protocol = 1;

  io.transports = [];  //所有的transport类型,有什么websocket,flashsocket啥的

  io.j = [];


  io.sockets = {};

//这个函数的作用的是建立与远程服务器的连接,并返回socket,不过这里的socket用namespace又代理了一下,
//这里的namespace其实是与http里面的path相似的概念
  io.connect = function (host, details) {
    var uri = io.util.parseUri(host)   //获取远程服务器的地址
      , uuri
      , socket;

    if (global && global.location) {
      uri.protocol = uri.protocol || global.location.protocol.slice(0, -1);
      uri.host = uri.host || (global.document
        ? global.document.domain : global.location.hostname);
      uri.port = uri.port || global.location.port;
    }

    uuri = io.util.uniqueUri(uri);  //"ws://121.199.40.246:3014"

    var options = {
        host: uri.host
      , secure: 'https' == uri.protocol
      , port: uri.port || ('https' == uri.protocol ? 443 : 80)
      , query: uri.query || ''
    };

    io.util.merge(options, details);
//看看当前这个远程地址是否已经有socket的连接,如果有的话就直接封装一个namespace就可以了
    if (options['force new connection'] || !io.sockets[uuri]) {
      socket = new io.Socket(options);
    }
//这里用于将刚刚建立的连接保存起来,以后可以复用
    if (!options['force new connection'] && socket) {
      io.sockets[uuri] = socket;
    }

    socket = socket || io.sockets[uuri];

    // if path is different from '' or /
    //其实真正返回的是一个namespace,里面封装了发送等方法
    //这里的namespace其实是与建立连接的url的path对应的
    return socket.of(uri.path.length > 1 ? uri.path : '');
  };

})('object' === typeof module ? module.exports : (this.io = {}), this);  //这里可以看出,如果是在浏览器里,那么是在全局定义了io对象

到这里其就能够看到,代码在全局定义了一个io对象,然后它就有一个重要的方法,用于建立一个与远程服务器的连接。。。这里建立的连接其实是通过建立一个Socket对象,这里面可能还有很多其余的东西,在注释里面也已经说的比较清楚,无非就是一个远程连接可能会被多个path复用,也就是socket.io所谓的namespace了,到此,在继续看代码之前,先给出一张图形来综合的介绍一下整个客户端的设计吧:


上图就是整个socket.io的设计,它用自己定义的socket对最终的websocket或者其余的什么flashsocket进行了一层封装,类似于进行了一层的代理,这种设计在很多地方都能够看到,例如pomelo的connector组件以及netty都是这样子,这里还能看到一个东西就是transport,它其实是设计的一个顶层对象,用相当于抽象出了一些进行通用的方法,要知道其实javascript也是可以实现面向对象编程的。。。。。

那么接下来我们来看看transport的定义吧,这里其实可已经transport看做是一个接口,然后websocket是其的一种具体实现。。

(function (exports, io) {

  //在io下面申明transport
  exports.Transport = Transport;


//这里的socket是外层封装的socket,不是websocket,这里的sessid是与服务器握手之后获取的id
  function Transport (socket, sessid) {
    this.socket = socket;
    this.sessid = sessid;
  };

  io.util.mixin(Transport, io.EventEmitter);


//相当于是接收到了数据,data是接收到的最原始数据
  Transport.prototype.onData = function (data) {
    this.clearCloseTimeout();
    
    if (this.socket.connected || this.socket.connecting || this.socket.reconnecting) {
      this.setCloseTimeout();  //设置超时
    }

    if (data !== '') {
      // todo: we should only do decodePayload for xhr transports
      //用parser对接收到的数据进行解码,将其转换成规定的格式
      var msgs = io.parser.decodePayload(data);

      if (msgs && msgs.length) {
        for (var i = 0, l = msgs.length; i < l; i++) {
          //处理已经解码过后的数据
          this.onPacket(msgs[i]);
        }
      }
    }

    return this;
  };

//处理已经解码的数据,判断接收到的数据的类型,进行相应的处理
  Transport.prototype.onPacket = function (packet) {
    this.socket.setHeartbeatTimeout();
    //心跳类型的数据
    if (packet.type == 'heartbeat') {
      return this.onHeartbeat();
    }
    //连接建立
    if (packet.type == 'connect' && packet.endpoint == '') {
      this.onConnect();
    }
    //错误
    if (packet.type == 'error' && packet.advice == 'reconnect') {
      this.open = false;
    }
    //处理数据
    this.socket.onPacket(packet);

    return this;
  };

  //设置超时,如果规定的时间没有接收到数据,那么表示已经断开了
  //因为socket.io有心跳数据的
  Transport.prototype.setCloseTimeout = function () {
    if (!this.closeTimeout) {
      var self = this;

      this.closeTimeout = setTimeout(function () {
        self.onDisconnect();
      }, this.socket.closeTimeout);
    }
  };

  /**
   * Called when transport disconnects.
   *
   * @api private
   */
   //表示已经断开了,那么进行相应的处理
  Transport.prototype.onDisconnect = function () {
    if (this.close && this.open) this.close();
    this.clearTimeouts();  //清理超时
    this.socket.onDisconnect();   //通知外层的socket,连接已经断开了
    return this;
  };

  /**
   * Called when transport connects
   *
   * @api private
   */
//表示连接建立,通知外层的socket 
  Transport.prototype.onConnect = function () {
    this.socket.onConnect();
    return this;
  }

//关闭超时
  Transport.prototype.clearCloseTimeout = function () {
    if (this.closeTimeout) {
      clearTimeout(this.closeTimeout);
      this.closeTimeout = null;
    }
  };


//清理重开的超时
  Transport.prototype.clearTimeouts = function () {
    this.clearCloseTimeout();

    if (this.reopenTimeout) {
      clearTimeout(this.reopenTimeout);
    }
  };


//这里是用于发送数据,首先要用parser进行编码,这个方法将会在外面的socket的namespace里面调用
  Transport.prototype.packet = function (packet) {
    var data = io.parser.encodePacket(packet);
    this.send(data);
  };


//接收到心跳数据,那么还要给服务器回一个心跳数据
  Transport.prototype.onHeartbeat = function (heartbeat) {
    this.packet({ type: 'heartbeat' });
  };
 
//当open之后会调用的方法,还要通知外面的socket
  Transport.prototype.onOpen = function () {
    this.open = true;
    this.clearCloseTimeout();
    this.socket.onOpen();
  };


//当关闭的时候会调用,而且要通知外层的socket
  Transport.prototype.onClose = function () {
    var self = this;
    this.open = false;
    this.socket.onClose();
    this.onDisconnect();
  };

//用于将url转化为特定的格式
  Transport.prototype.prepareUrl = function () {
    var options = this.socket.options;

    return this.scheme() + '://'
      + options.host + ':' + options.port + '/'
      + options.resource + '/' + io.protocol
      + '/' + this.name + '/' + this.sessid;
  };


//表示当前已经准备好进行连接了,这个方法可能会被具体的实现覆盖
  Transport.prototype.ready = function (socket, fn) {
    fn.call(this);
  };
})(
    'undefined' != typeof io ? io : module.exports
  , 'undefined' != typeof io ? io : module.parent.exports
);

其实transport本身的定义还是很简单的,主要是由三个部分:

(1)接受到数据进行处理,这里还需要用parser进行decode

(2)发送数据,这里需要用parser进行encode

(3)处理心跳数据,socket.io自己定义了心跳数据,如果超时间内没有数据收到或者收不到心跳数据,那么就断开。

好了,到这里transport的定义就这样了,接下来就来看看怎么定义的Socket的吧:

(function (exports, io, global) {

//在io上面申明socket
  exports.Socket = Socket;
//构造函数
  function Socket (options) {
    this.options = {
        port: 80
      , secure: false
      , document: 'document' in global ? document : false
      , resource: 'socket.io'
      , transports: io.transports  //所有的transport类型,有websocket等
      , 'connect timeout': 10000
      , 'try multiple transports': true
      , 'reconnect': true
      , 'reconnection delay': 500
      , 'reconnection limit': Infinity
      , 'reopen delay': 3000
      , 'max reconnection attempts': 10
      , 'sync disconnect on unload': true
      , 'auto connect': true
      , 'flash policy port': 10843
    };

    io.util.merge(this.options, options);  //合并默认配置和用户传进来的配置

    this.connected = false;  //表示当前还没有建立连接
    this.open = false;   //没有打开
    this.connecting = false;
    this.reconnecting = false;
    this.namespaces = {};   //这个socket的所有namespace,就像同一个url可以对应好多path,fjs.com/aa与fjs.com/bb,那么aa与bb就是两个namespace
    this.buffer = [];
    this.doBuffer = false;

    if (this.options['sync disconnect on unload'] &&
        (!this.isXDomain() || io.util.ua.hasCORS)) {
      var self = this;

      io.util.on(global, 'unload', function () {
        self.disconnectSync();
      }, false);
    }

    if (this.options['auto connect']) {
      this.connect();  //建立连接
    }
};

  io.util.mixin(Socket, io.EventEmitter);

//这里是设置名字空间,所谓的namespace其实就是url后面的path,同一个连接可以对应多个path
  Socket.prototype.of = function (name) {
    if (!this.namespaces[name]) {  //如果没有这个namespace,那么创建一个
      //新建一个namespace
      this.namespaces[name] = new io.SocketNamespace(this, name);

      if (name !== '') {//默认的namespace就是空的字符串,
        this.namespaces[name].packet({ type: 'connect' });
      }
    }

    return this.namespaces[name];
  };

//向所有的namespace发送事件,传进来的参数是事件类型
  Socket.prototype.publish = function () {
    this.emit.apply(this, arguments);

    var nsp;
//遍历所有的namespace
    for (var i in this.namespaces) {
      if (this.namespaces.hasOwnProperty(i)) {
        nsp = this.of(i);
        //在当前这个namespace上面触发事件
        nsp.$emit.apply(nsp, arguments);
      }
    }
  };

  function empty () { };

//用于与server进行握手,获取sessionid,心跳间隔,传输方式等
//这里比较神奇的是居然用的是http的get来获取
  Socket.prototype.handshake = function (fn) {
    var self = this
      , options = this.options;

    function complete (data) {
      if (data instanceof Error) {
        self.onError(data.message);
      } else {
        fn.apply(null, data.split(':'));  //sid:heartbeat:close:transports
      }
    };

    var url = [
          'http' + (options.secure ? 's' : '') + ':/'
        , options.host + ':' + options.port
        , options.resource
        , io.protocol
        , io.util.query(this.options.query, 't=' + +new Date)
      ].join('/');

    if (this.isXDomain() && !io.util.ua.hasCORS) {
      var insertAt = document.getElementsByTagName('script')[0]
        , script = document.createElement('script');

      script.src = url + '&jsonp=' + io.j.length;
      insertAt.parentNode.insertBefore(script, insertAt);

      io.j.push(function (data) {
        complete(data);
        script.parentNode.removeChild(script);
      });
    } else {
      var xhr = io.util.request();

      xhr.open('GET', url, true);
      xhr.withCredentials = true;
      xhr.onreadystatechange = function () {
        if (xhr.readyState == 4) {
          xhr.onreadystatechange = empty;

          if (xhr.status == 200) {
            complete(xhr.responseText);
          } else {
            !self.reconnecting && self.onError(xhr.responseText);
          }
        }
      };
      xhr.send(null);
    }
  };

//overide =[ flashsocket,websocket,htmlfile,xhr-polling,jsonp-polling]
  Socket.prototype.getTransport = function (override) {
    var transports = override || this.transports, match;
//遍历当前io定义的所有transport,选取一个
    for (var i = 0, transport; transport = transports[i]; i++) {
      if (io.Transport[transport]
        && io.Transport[transport].check(this)
        && (!this.isXDomain() || io.Transport[transport].xdomainCheck())) {
        //这里用于筛选出transport,传进去的参数是当前这个socket对象,以及sessionid
        return new io.Transport[transport](this, this.sessionid);
      }
    }

    return null;
  };

  Socket.prototype.connect = function (fn) {
    if (this.connecting) {
      return this;
    }

    var self = this;

//transports = flashsocket,websocket,htmlfile,xhr-polling,jsonp-polling
    this.handshake(function (sid, heartbeat, close, transports) {
      self.sessionid = sid;  //sessionid
      self.closeTimeout = close * 1000;  //超时时间
      self.heartbeatTimeout = heartbeat * 1000;
      self.transports = transports ? io.util.intersect(
          transports.split(',')  //将服务器支持的transport转换为数组
        , self.options.transports
      ) : self.options.transports;

      self.setHeartbeatTimeout();   //设置心跳超时

      function connect (transports){
        if (self.transport) self.transport.clearTimeouts();

//这里获取的transport一般情况下都是websocket
        self.transport = self.getTransport(transports);
        if (!self.transport) return self.publish('connect_failed');

        // once the transport is ready
        //如果当前的transport准备好了,那么将会调用回调函数
        self.transport.ready(self, function () {
          self.connecting = true;
          self.publish('connecting', self.transport.name);
          //这里才是真正的建立与服务器的连接
          self.transport.open();

          if (self.options['connect timeout']) {
            //设置连接超时,还可以尝试一下别的transport
            self.connectTimeoutTimer = setTimeout(function () {
              if (!self.connected) {
                self.connecting = false;

                if (self.options['try multiple transports']) {
                  if (!self.remainingTransports) {
                    self.remainingTransports = self.transports.slice(0);
                  }

                  var remaining = self.remainingTransports;

                  while (remaining.length > 0 && remaining.splice(0,1)[0] !=
                         self.transport.name) {}

                    if (remaining.length){
                      connect(remaining);
                    } else {
                      self.publish('connect_failed');
                    }
                }
              }
            }, self.options['connect timeout']);
          }
        });
      }

      connect(self.transports);
      //如果连接建立了,那么需要清理连接超时
      self.once('connect', function (){
        clearTimeout(self.connectTimeoutTimer);

        fn && typeof fn == 'function' && fn();
      });
    });

    return this;
  };

//设置心跳的超时
  Socket.prototype.setHeartbeatTimeout = function () {
    clearTimeout(this.heartbeatTimeoutTimer);

    var self = this;
    this.heartbeatTimeoutTimer = setTimeout(function () {
      self.transport.onClose();
    }, this.heartbeatTimeout);
  };

//用于发送数据,其实这里是调transport进行数据的发送
  Socket.prototype.packet = function (data) {
    if (this.connected && !this.doBuffer) {
      this.transport.packet(data);
    } else {
      this.buffer.push(data);
    }

    return this;
  };

  Socket.prototype.setBuffer = function (v) {
    this.doBuffer = v;

    if (!v && this.connected && this.buffer.length) {
      this.transport.payload(this.buffer);
      this.buffer = [];
    }
  };

//关闭当前的连接,用默认的namespace来发送disconnect数据
  Socket.prototype.disconnect = function () {
    if (this.connected || this.connecting) {
      if (this.open) {
        this.of('').packet({ type: 'disconnect' });
      }

      // handle disconnection immediately
      this.onDisconnect('booted');
    }

    return this;
  };

  Socket.prototype.disconnectSync = function () {
    // ensure disconnection
    var xhr = io.util.request()
      , uri = this.resource + '/' + io.protocol + '/' + this.sessionid;

    xhr.open('GET', uri, true);

    // handle disconnection immediately
    this.onDisconnect('booted');
  };

//是否需要跨域的访问
  Socket.prototype.isXDomain = function () {

    var port = global.location.port ||
      ('https:' == global.location.protocol ? 443 : 80);

    return this.options.host !== global.location.hostname 
      || this.options.port != port;
  };


//表示已经建立了连接,下层的websocket会通知这个方法
  Socket.prototype.onConnect = function () {
    if (!this.connected) {
      this.connected = true;  //表示连接已经建立了
      this.connecting = false;
      if (!this.doBuffer) {
        // make sure to flush the buffer
        this.setBuffer(false);
      }
      this.emit('connect'); //发起connect事件
    }
  };

  Socket.prototype.onOpen = function () {
    this.open = true;
  };


  Socket.prototype.onClose = function () {
    this.open = false;
    clearTimeout(this.heartbeatTimeoutTimer);
  };

//获取相应的namespace来处理数据
  Socket.prototype.onPacket = function (packet) {
    this.of(packet.endpoint).onPacket(packet);
  };

  Socket.prototype.onError = function (err) {
    if (err && err.advice) {
      if (err.advice === 'reconnect' && (this.connected || this.connecting)) {
        this.disconnect();
        if (this.options.reconnect) {
          this.reconnect();
        }
      }
    }

    this.publish('error', err && err.reason ? err.reason : err);
  };

//当底层的连接断开之后会发生的事情
  Socket.prototype.onDisconnect = function (reason) {
    var wasConnected = this.connected
      , wasConnecting = this.connecting;

    this.connected = false;
    this.connecting = false;
    this.open = false;

    if (wasConnected || wasConnecting) {
      this.transport.close();
      this.transport.clearTimeouts();
      if (wasConnected) {
        //出发所有namespace的disconnect事件
        this.publish('disconnect', reason);

        if ('booted' != reason && this.options.reconnect && !this.reconnecting) {
          this.reconnect();
        }
      }
    }
  };

//重新连接
  Socket.prototype.reconnect = function () {
    this.reconnecting = true;
    this.reconnectionAttempts = 0;
    this.reconnectionDelay = this.options['reconnection delay'];

    var self = this
      , maxAttempts = this.options['max reconnection attempts']
      , tryMultiple = this.options['try multiple transports']
      , limit = this.options['reconnection limit'];

    function reset () {
      if (self.connected) {
        for (var i in self.namespaces) {
          if (self.namespaces.hasOwnProperty(i) && '' !== i) {
              self.namespaces[i].packet({ type: 'connect' });
          }
        }
        self.publish('reconnect', self.transport.name, self.reconnectionAttempts);
      }

      clearTimeout(self.reconnectionTimer);

      self.removeListener('connect_failed', maybeReconnect);
      self.removeListener('connect', maybeReconnect);

      self.reconnecting = false;

      delete self.reconnectionAttempts;
      delete self.reconnectionDelay;
      delete self.reconnectionTimer;
      delete self.redoTransports;

      self.options['try multiple transports'] = tryMultiple;
    };

    function maybeReconnect () {
      if (!self.reconnecting) {
        return;
      }

      if (self.connected) {
        return reset();
      };

      if (self.connecting && self.reconnecting) {
        return self.reconnectionTimer = setTimeout(maybeReconnect, 1000);
      }

      if (self.reconnectionAttempts++ >= maxAttempts) {
        if (!self.redoTransports) {
          self.on('connect_failed', maybeReconnect);
          self.options['try multiple transports'] = true;
          self.transport = self.getTransport();
          self.redoTransports = true;
          self.connect();
        } else {
          self.publish('reconnect_failed');
          reset();
        }
      } else {
        if (self.reconnectionDelay < limit) {
          self.reconnectionDelay *= 2; // exponential back off
        }

        self.connect();
        self.publish('reconnecting', self.reconnectionDelay, self.reconnectionAttempts);
        self.reconnectionTimer = setTimeout(maybeReconnect, self.reconnectionDelay);
      }
    };

    this.options['try multiple transports'] = false;
    this.reconnectionTimer = setTimeout(maybeReconnect, this.reconnectionDelay);

    this.on('connect', maybeReconnect);
  };

})(
    'undefined' != typeof io ? io : module.exports
  , 'undefined' != typeof io ? io : module.parent.exports
  , this
);

这里socket的定义够多的吧,一些主要的方法有一些什么用上面的注释都已经说的较为清楚了吧,其实说白了它就是底层的transport与上层的namespace之间的一层代理,具体的发送数据等操作需要调用下层的transport来实现,然后对于transport接收到的数据则将其进行一些处理,然后通知上面的namespace进行处理。。。

好了,就不细说代码了,应该也还算挺简单额,那么接下来来看看前面一直提到过的namespace的定义:

(function (exports, io) {

  exports.SocketNamespace = SocketNamespace;

//socket是上面定义的socket,其实它是对websocket的封装,name是这个namespace的名字,也就是url里面的path
//因为同一个连接其实可以对应多个namespace,也就是好多个path
  function SocketNamespace (socket, name) {
    this.socket = socket;
    this.name = name || '';
    this.flags = {};
    this.json = new Flag(this, 'json');
    this.ackPackets = 0;
    this.acks = {};  //用于记录需要确认的数据的回调函数,当确认消息接收到以后会调用
  };

  io.util.mixin(SocketNamespace, io.EventEmitter);

  SocketNamespace.prototype.$emit = io.EventEmitter.prototype.emit;

  SocketNamespace.prototype.of = function () {
    return this.socket.of.apply(this.socket, arguments);
  };

//这里相当于是调用socket来发送数据,而socket则是调用下面的transport来发送数据
  SocketNamespace.prototype.packet = function (packet) {
    packet.endpoint = this.name;  //也就是当前连接的path
    this.socket.packet(packet);
    this.flags = {};
    return this;
  };

//这里是真正的发送数据
//这里如果提供了回调函数,那么表示这个数据需要确认,当确认消息接收到了以后,回调用这个回调函数
  SocketNamespace.prototype.send = function (data, fn) {
    var packet = {
        type: this.flags.json ? 'json' : 'message'
      , data: data
    };

    if ('function' == typeof fn) {
      packet.id = ++this.ackPackets;   //为这个packet分配id
      packet.ack = true;   //表示这个packet需要确认
      this.acks[packet.id] = fn;     //记录回调函数
    }

    return this.packet(packet);
  };

  //发送数据,采取事件的方式进行发送
  //其实说白了就是另外一种发送数据的方法,用过socket.io的就知道了
  SocketNamespace.prototype.emit = function (name) {
    var args = Array.prototype.slice.call(arguments, 1)
      , lastArg = args[args.length - 1]
      , packet = {
            type: 'event'
          , name: name
        };

    if ('function' == typeof lastArg) {
      packet.id = ++this.ackPackets;
      packet.ack = 'data';
      this.acks[packet.id] = lastArg;
      args = args.slice(0, args.length - 1);
    }

    packet.args = args;

    return this.packet(packet);
  };


//这里用于断开当前namespace的连接,如果是默认namespace或者仅有的namespace的话,那么会断开实际的底层websocket连接
  SocketNamespace.prototype.disconnect = function () {
    if (this.name === '') {
      this.socket.disconnect();
    } else {
      this.packet({ type: 'disconnect' });
      this.$emit('disconnect');
    }

    return this;
  };

//用于处理接收到 的数据,packet
  SocketNamespace.prototype.onPacket = function (packet) {
    var self = this;
//这里用于向服务器发送确认消息
    function ack () {
      self.packet({
          type: 'ack'
        , args: io.util.toArray(arguments)
        , ackId: packet.id
      });
    };
//判断发送过来的数据的类型,然后进行相应的处理
    switch (packet.type) {
      case 'connect':   
        this.$emit('connect');
        break;

      case 'disconnect':
        if (this.name === '') {
          this.socket.onDisconnect(packet.reason || 'booted');
        } else {
          this.$emit('disconnect', packet.reason);
        }
        break;

      case 'message':
      case 'json':
        var params = ['message', packet.data];

        if (packet.ack == 'data') {
          params.push(ack);
        } else if (packet.ack) {
          this.packet({ type: 'ack', ackId: packet.id });
        }

        this.$emit.apply(this, params);
        break;

      case 'event':
        var params = [packet.name].concat(packet.args);

        if (packet.ack == 'data')
          params.push(ack);

        this.$emit.apply(this, params);
        break;

      case 'ack':  //表示是确认消息,
        if (this.acks[packet.ackId]) {
          this.acks[packet.ackId].apply(this, packet.args); //调用消息的回调函数
          delete this.acks[packet.ackId];
        }
        break;

      case 'error':
        if (packet.advice){
          this.socket.onError(packet);
        } else {
          if (packet.reason == 'unauthorized') {
            this.$emit('connect_failed', packet.reason);
          } else {
            this.$emit('error', packet.reason);
          }
        }
        break;
    }
  };

  function Flag (nsp, name) {
    this.namespace = nsp;
    this.name = name;
  };


  Flag.prototype.send = function () {
    this.namespace.flags[this.name] = true;
    this.namespace.send.apply(this.namespace, arguments);
  };

  Flag.prototype.emit = function () {
    this.namespace.flags[this.name] = true;
    this.namespace.emit.apply(this.namespace, arguments);
  };

})(
    'undefined' != typeof io ? io : module.exports
  , 'undefined' != typeof io ? io : module.parent.exports
);

上述namespace的定义也就是socket.io为用户暴露的api(其实个人感觉socket.io自己又定义了一层namespace有那么一点点繁琐了,其实完全不用这个,因为通过用户的代码很容易就能实现相似的功能,而框架自己实现一个namespace机制,让框架不那么轻便),还是很简单,最为主要的功能两个:

(1)下面socket接受的数据会让相应的namespace来处理,那么namespace需要对数据的类型进行分别,然后进行相应的处理,然后触发相应的事件,让用户的代码来处理这些数据

(2)对于用户的一些操作,例如发送数据,断开连接等,调用socket的相应方法来处理。。。

好了,接下来来看看socket.io对websocket的封装吧。。。。这里要强调一下:它继承自transport

(function (exports, io, global) {

  exports.websocket = WS;

//这里直接调用transport的构造函数,这里要记住,websocket直接继承了transport
  function WS (socket) {
    io.Transport.apply(this, arguments);
  };

//这里的websocket需要继承上面的transport
  io.util.inherit(WS, io.Transport);

  WS.prototype.name = 'websocket';

//打开,相当于是建立于服务器的连接
  WS.prototype.open = function () {
    var query = io.util.query(this.socket.options.query)
      , self = this
      , Socket


    if (!Socket) {  //这里将socket指向定义的websocket,因为不同的浏览器可能有不同的实现,不同的名字
      Socket = global.MozWebSocket || global.WebSocket;
    }
//这里就相当于是建立真正的websocket连接了
    this.websocket = new Socket(this.prepareUrl() + query);
    //当下层的websocket打开了
    this.websocket.onopen = function () {
      self.onOpen();
      self.socket.setBuffer(false);
    };
    //当下层的websocket收到了数据
    this.websocket.onmessage = function (ev) {
      self.onData(ev.data);
    };
    this.websocket.onclose = function () {
      self.onClose();
      self.socket.setBuffer(true);
    };
    this.websocket.onerror = function (e) {
      self.onError(e);
    };

    return this;
  };

//其实调用的是内部的websocket来发送
  WS.prototype.send = function (data) {
    this.websocket.send(data);
    return this;
  };

//可以理解为批量处理接收到的数据
  WS.prototype.payload = function (arr) {
    for (var i = 0, l = arr.length; i < l; i++) {
      this.packet(arr[i]);
    }
    return this;
  };
//这几调用底层的websocket来关闭
  WS.prototype.close = function () {
    this.websocket.close();
    return this;
  };
//当有错误发生的时候
  WS.prototype.onError = function (e) {
    this.socket.onError(e);
  };

//wss或者ws
  WS.prototype.scheme = function () {
    return this.socket.options.secure ? 'wss' : 'ws';
  };

//判断浏览器是否支持原生的websocket
  WS.check = function () {
    return ('WebSocket' in global && !('__addTask' in WebSocket))
          || 'MozWebSocket' in global;
  };

  WS.xdomainCheck = function () {
    return true;
  };

//表示当前支持websocket
  io.transports.push('websocket');  //相当于这里的多了一种传输方式,将其保存在io的transports数组中

})(
    'undefined' != typeof io ? io.Transport : module.exports
  , 'undefined' != typeof io ? io : module.parent.exports
  , this
);

这部分代码很简单吧,最重要的也就是open方法了,这里将会真正建立于server端的连接,然后还设置了一些处理函数,例如当数据进来的时候该做什么处理等等。。。

到这里位置,socket.io的client部分的整个设计也都差不太多了,当然我只看了websocket,什么flashsocket什么的我都没有看。。

最上面是namespace,接下来下面有socket,那么在socket里面又包含了transport,对应的也就是websocket。、、、


最后说一下发送的数据的编码格式吧:type:id:path:data  

这里type就是数据的类型,例如event,message,ack等,id就是这个数据的id,如果数据需要确认的话,那么这个id将会有用,path就是那个namespace,data就是data了。。

好了,socket.io的客户端就这样了。。。接下来看看他的服务端吧。。。

抱歉!评论已关闭.