前面的一篇文章分析了socket.io的客户端的实现,那么这一篇文章来看看它的服务端是怎么搞的吧。。。
首先我们来看一段用于建立socket.io的服务器的代码吧:
var sio = require('socket.io'); var wsocket = sio.listen(80); wsocket.sockets.on('connection', function (socket) { //当有新的连接建立的时候 socket.on("message", function(data){ console.log("get : " + data); }); socket.on("disconnect", function(){ //console.log("client has closed"); }); });
这里可以看到,首先其实是调用listen函数来创建,那么我们来看看这个listen函数究竟是干了什么事情吧:
//其实这里是建立一个http的server,然后让其监听端口,这里的server可以是一个httpserver,也可以是一个端口号 //因为其实websocket本身也是建立在http协议基础上的 exports.listen = function (server, options, fn) { if ('function' == typeof server) { console.warn('Socket.IO\'s `listen()` method expects an `http.Server` instance\n' + 'as its first parameter. Are you migrating from Express 2.x to 3.x?\n' + 'If so, check out the "Socket.IO compatibility" section at:\n' + 'https://github.com/visionmedia/express/wiki/Migrating-from-2.x-to-3.x'); } if ('function' == typeof options) { fn = options; options = {}; } if ('undefined' == typeof server) { server = 80; //默认端口是80 } if ('number' == typeof server) { var port = server; //如果传进来的参数是一个数字,那么就在这个端口上创建一个httpserver if (options && options.key) server = require('https').createServer(options); else server = require('http').createServer(); //默认的request处理 server.on('request', function (req, res) { res.writeHead(200); res.end('Welcome to socket.io.'); }); server.listen(port, fn); //监听端口 } //创建并返回一个manager,其实这个才是最重要的 return new exports.Manager(server, options); };
其实这里就可以看到,首先是建立一个http的服务器,这里可能刚开始会觉得比较的奇怪,这是因为websocket的协议本身就用到了http协议的内容,有兴趣的可以去看看websocket额协议的内容就知道了,那么最后再创建一个manager,然后将这个manager,这里也就是说我们其实最终用到的是这个manager来管理socket.io的服务器。。。
那么接下来来看看这个manager是怎么定义的吧:
function Manager (server, options) { this.server = server; this.namespaces = {}; this.sockets = this.of(''); //创建一个namespace,而且这个namespace会保存到namespaces中,sockets是默认的namespace this.settings = { //一些默认的参数 origins: '*:*' , log: true , store: new MemoryStore , logger: new Logger , static: new Static(this) , heartbeats: true , resource: '/socket.io' , transports: defaultTransports , authorization: false , blacklist: ['disconnect'] , 'log level': 3 , 'log colors': tty.isatty(process.stdout.fd) , 'close timeout': 60 , 'heartbeat interval': 25 , 'heartbeat timeout': 60 , 'polling duration': 20 , 'flash policy server': true , 'flash policy port': 10843 , 'destroy upgrade': true , 'destroy buffer size': 10E7 , 'browser client': true , 'browser client cache': true , 'browser client minification': false , 'browser client etag': false , 'browser client expires': 315360000 , 'browser client gzip': false , 'browser client handler': false , 'client store expiration': 15 , 'match origin protocol': false }; for (var i in options) { if (options.hasOwnProperty(i)) { this.settings[i] = options[i]; } } var self = this; //错误的梳理handler server.on('error', function(err) { self.log.warn('error raised: ' + err); }); this.initStore(); this.on('set:store', function() { self.initStore(); }); //记录最开始的request事件的listener this.oldListeners = server.listeners('request').splice(0); //获取原来的request的事件的listener函数 server.removeAllListeners('request'); //将最早的listener去掉 server.on('request', function (req, res) { //这里是用于处理客户端的request数据,刚开始的握手就是走的这里 self.handleRequest(req, res); }); //这个用于建立ungrade连接,说白了就是实际的websocket的连接 //这个是websocket协议部分的内容了,如果需要建立websocket的连接,那么其http的请求将会如下: /* GET / HTTP/1.1 Upgrade: websocket Connection: Upgrade Host: 127.0.0.1 Origin: null Pragma: no-cache Cache-Control: no-cache Sec-WebSocket-Key: gKlfUoFJucbrV2+2KT6N8w== Sec-WebSocket-Version: 13 Sec-WebSocket-Extensions: x-webkit-deflate-frame User-Agent: Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.69 Safari/537.36 */ server.on('upgrade', function (req, socket, head) { self.handleUpgrade(req, socket, head); }); server.on('close', function () { clearInterval(self.gc); }); //自己定义了一个垃圾处理 server.once('listening', function () { self.gc = setInterval(self.garbageCollection.bind(self), 10000); }); for (var i in transports) { //初始化所有的transport,这里有一些websocket,flashsocket等等,transport可以看成是顶层类 if (transports.hasOwnProperty(i)) { if (transports[i].init) { //如果有init,那么初始化 transports[i].init(this); } } } var self = this; this.sockets.on('connection', function (conn) { //namespace上面的监听,当有新的连接进来之后会激活 self.emit('connection', conn); //这里在manager上监听connection也可以 }); this.sequenceNumber = Date.now() | 0; this.log.info('socket.io started'); }; Manager.prototype.__proto__ = EventEmitter.prototype
这里在服务端其实也有namespace,而且会有一个默认的namespace为"",sockets将会指向它,然后就是设置一些参数的值,接着就是最为重要的内容了,为前面建立的httpserver设置一些事件的处理函数,这里主要是:
(1)request,对应的是普通的http请求事件,这里之所以会有普通的http请求处理,是因为socket.io在建立连接的时候需要先用普通的http请求进行握手,服务端将会向客户端返回sessionid。
(2)upgrade,这是比较特殊的一种http连接方式,对应这种请求的报文在前面已经给出了一个例子,这里就可以简单的将其理解为专门用于建立websocket的连接的。。。
这里就先略微的说说握手的内容吧,它主要是客户端向服务端建立连接之前,首先要向服务端发送一些http请求,用于握手,然后服务端将会为其分配一个id,接着客户端再建立于服务端的websocket的连接,而且会将这个id带过来,说白了这个id就是用于区分每一个连接的。。。
好了,那么来看看是如何建立websocket连接的吧:
//用于处理Upgrade类型的http请求,也就是websocket连接 Manager.prototype.handleUpgrade = function (req, socket, head) { console.log("handler handleUpgrade"); var data = this.checkRequest(req) //获取请求的数据 , self = this; if (!data) { if (this.enabled('destroy upgrade')) { socket.end(); this.log.debug('destroying non-socket.io upgrade'); } return; } req.head = head; this.handleClient(data, req); //这里可以理解为建立一个client,用于与客户端的websocket进行通信 req.head = null; };
这个就是用于处理upgrade连接的handler,首先就是获取建立连接的时候带过来的参数,说白了就是url后面的一些参数,然后再调用handleClient方法来建立一个与客户端的连接,这里有一个比较重要的参数,那就是socket,这个就是实际上底层用的socket,毕竟底层还是TCP/IP的嘛。。。
那么接下来来看看这个建立连接的方法吧(说白了就是对已经有建立好的socket连接进行了一层的封装):
//用于创建一个新的客户端连接 Manager.prototype.handleClient = function (data, req) { var socket = req.socket //获取实际的socket , store = this.store , self = this; // handle sync disconnect xhrs if (undefined != data.query.disconnect) { if (this.transports[data.id] && this.transports[data.id].open) { this.transports[data.id].onForcedDisconnect(); } else { this.store.publish('disconnect-force:' + data.id); } req.res.writeHead(200); req.res.end(); return; } //不支持的transport类型 if (!~this.get('transports').indexOf(data.transport)) { this.log.warn('unknown transport: "' + data.transport + '"'); req.connection.end(); return; } //新建一个transport用于维护与客户端的连接,这里一般情况下都是websocket var transport = new transports[data.transport](this, data, req) , handshaken = this.handshaken[data.id]; //获取以前握手的时候的数据 if (transport.disconnected) { req.connection.end(); return; } if (handshaken) { //表示已经握手过了 if (transport.open) { if (this.closed[data.id] && this.closed[data.id].length) { transport.payload(this.closed[data.id]); this.closed[data.id] = []; } this.onOpen(data.id); this.store.publish('open', data.id); this.transports[data.id] = transport; //保存当前这个transport } if (!this.connected[data.id]) { this.onConnect(data.id); //用于记录当前的这个id已经连接了 this.store.publish('connect', data.id); //这个与上面做的是一样的事情,不知道要干什么。。汗 // flag as used delete handshaken.issued; this.onHandshake(data.id, handshaken); this.store.publish('handshake', data.id, handshaken); // initialize the socket for all namespaces for (var i in this.namespaces) { if (this.namespaces.hasOwnProperty(i)) { var socket = this.namespaces[i].socket(data.id, true); //这里可以理解为在namespace中创建与这个transport关联的socket // echo back connect packet and fire connection event //这里如果namespace如果是'',那么表示是第一次连接,那么需要处理一下,例如激活connection事件 if (i === '') { this.namespaces[i].handlePacket(data.id, { type: 'connect' }); } } } this.store.subscribe('message:' + data.id, function (packet) { self.onClientMessage(data.id, packet); }); this.store.subscribe('disconnect:' + data.id, function (reason) { self.onClientDisconnect(data.id, reason); }); } } else { if (transport.open) { transport.error('client not handshaken', 'reconnect'); } transport.discard(); } };
上面的代码主要要做一下的几件事情:
(1)根据已经有的tcp的socket来对其进行封装,建立相应的transport,一般情况下都是建立websocket,这里还要将其保存到当前manager当中,与前面提到过的id对应起来,
(2)为namespace创建当前这个连接的socket(这个socket是当前socket.io的更上层的封装),激活相应的事件,通知用户已经有新的连接建立,并且将这个socket传给用户,那么用户就可以通过这个socket来进行通信了。。
那么我们接下来来看看这个transport的创建过程吧,一般情况下都是建立websocket,而且用的是hybi-16.js这个版本的。。。
来看看构造函数:
//mng就是外层的manager,data是建立连接的时候带过的参数,req就是httprequest function WebSocket (mng, data, req) { // parser var self = this; this.manager = mng; this.parser = new Parser(); this.parser.on('data', function (packet) { self.onMessage(parser.decodePacket(packet)); //这里需要将传过来的数据先转化为socket.io的数据格式 }); //对于ping数据包的处理 this.parser.on('ping', function () { // version 8 ping => pong try { self.socket.write('\u008a\u0000'); } catch (e) { self.end(); return; } }); this.parser.on('close', function () { self.end(); }); this.parser.on('error', function (reason) { self.log.warn(self.name + ' parser error: ' + reason); self.end(); }); //调用transport的构造函数,因为这里的websocket继承自transport //并且会调用handleRequest函数,设置心跳,然后会调用onSocketConnect函数用于向客户端发送连接头信息 Transport.call(this, mng, data, req); };
这里有一个parser,它是用来解码和编码websocket的frame,这个就是websocket的协议的事情了,,,就不管他,因为当前websocket的其实继承自transport,所以又调用了一遍transport的构造函数,来看看:
function Transport (mng, data, req) { this.manager = mng; this.id = data.id; this.disconnected = false; this.drained = true; this.handleRequest(req); //在新建transport的时候,就需要调用这个方法,表示链接已经建立,用于向客户端发送刚开始的建立头数据 };
这里说白了就是记录了两个比较重要的参数,首先是外面的manager,然后是前面已经提到过的id。。。。
接下来用handleRequest方法进行进一步的处理:
//刚刚开始建立连接的时候会调用,保存当前的socket,然后还要调用onSocketConnect向客户端返回建立websocket连接的头 Transport.prototype.handleRequest = function (req) { this.log.debug('setting request', req.method, req.url); this.req = req; if (req.method == 'GET') { this.socket = req.socket; //保存当前的真正的底层socket this.open = true; this.drained = true; this.setHeartbeatInterval(); //设置心跳 this.setHandlers(); //设置处理函数,主要是socket的一些close,err等 this.onSocketConnect(); //调用这个函数,将会向客户端返回用于建立websocket连接的头,具体的transport有不同的实现 } };
这里主要是保存了真正底层用到的socket,然后为这个socket设置了一些基本的handler,接着在继续调用onSocketConnect来继续处理:
//这个在transport构造函数中调用,也就是刚刚建立连接的时候,用于向客户端发送连接头信息 WebSocket.prototype.onSocketConnect = function () { var self = this; if (typeof this.req.headers.upgrade === 'undefined' || this.req.headers.upgrade.toLowerCase() !== 'websocket') { this.log.warn(this.name + ' connection invalid'); this.end(); return; } var origin = this.req.headers['origin'] || '' , location = ((this.manager.settings['match origin protocol'] ? origin.match(/^https/) : this.socket.encrypted) ? 'wss' : 'ws') + '://' + this.req.headers.host + this.req.url; if (!this.verifyOrigin(origin)) { this.log.warn(this.name + ' connection invalid: origin mismatch'); this.end(); return; } if (!this.req.headers['sec-websocket-key']) { this.log.warn(this.name + ' connection invalid: received no key'); this.end(); return; } // calc key var key = this.req.headers['sec-websocket-key']; var shasum = crypto.createHash('sha1'); shasum.update(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); key = shasum.digest('base64'); //返回给客户端的报文信息,用于握手 var headers = [ 'HTTP/1.1 101 Switching Protocols' , 'Upgrade: websocket' , 'Connection: Upgrade' , 'Sec-WebSocket-Accept: ' + key ]; try { //向客户端返回建立websocket的头部信息 this.socket.write(headers.concat('', '').join('\r\n')); this.socket.setTimeout(0); this.socket.setNoDelay(true); } catch (e) { this.end(); return; } //当从socket收到数据的时候需要用parser先进行梳理 this.socket.on('data', function (data) { self.parser.add(data); }); };
这部分的代码主要是就是在向客户端返回用于握手的数据,当客户端接收到这个数据之后就表明整个websocket的连接已经建立了,那么再设置底层真正的socket的data事件,用于处理从客户端接收到的数据。。。。
那么到这里为止整个服务端的原理就基本说的比较清楚了,比较简单的总结就是,对socket进行了一层封装,当读取到数据之后,先用praser将数据从websocket的frame中提取出来,其实在外面还有一个parser,因为socket.io自己也有一层格式的定义,这里就不细讲了。。。。
至于那些socket的定义,其实都很简单,有兴趣的自己看看吧。。。(最重要的还是websocket协议的一些定义,以及socket.io自己的协议定义)