engine.io分析2--socket.io的基石

转载请注明: TheViper http://www.cnblogs.com/TheViper

源码分析

var engine = require(‘./node_modules/engine.io/lib/engine.io.js‘);
var server = engine.listen(8000,{
    transports:[‘polling‘]
});

server.on(‘connection‘, function(socket){
  socket.send(‘utf 8 string‘);
});

之前提到过engine.io只有websocket,polling两种传输方式。对于websocket,engine.io用了第三方库ws,这个从源码目录看的出来,调用很简单。所以本文只说polling这种方式.

另外精力有限,所以只分析服务端,就当是抛砖引玉,源码比较简单。

首先是加载engine.io.js.

exports = module.exports = function() {
  // backwards compatible use as `.attach`
  // if first argument is an http server
  if (arguments.length && arguments[0] instanceof http.Server) {
    return attach.apply(this, arguments);
  }

  // if first argument is not an http server, then just make a regular eio server
  return exports.Server.apply(null, arguments);
};

注释上说的很清楚,if是为了兼容以前的版本,判断传入的是不是http.然后初始化Server.

Server初始化只是将参数写入,没有指定参数的,使用默认参数,还有就是定义后面用到的变量。最后,如果参数方式中有websocket,就初始化WebSocketServer。

  if (~this.transports.indexOf(‘websocket‘)) {
    this.ws = new WebSocketServer({ noServer: true, clientTracking: false });
  }

接着是engine.listen().

function listen(port, options, fn) {
  if (‘function‘ == typeof options) {
    fn = options;
    options = {};
  }

  var server = http.createServer(function (req, res) {
    res.writeHead(501);
    res.end(‘Not Implemented‘);
  });

  server.listen(port, fn);

  // create engine server
  var engine = exports.attach(server, options);
  engine.httpServer = server;

  return engine;
};

先判断是不是重载的方法,如果是,则没有指定端口,使用默认端口,并且参数移位。后面是将我这种定义方式,变成

var engine = require(‘engine.io‘);
var http = require(‘http‘).createServer().listen(3000);
var server = engine.attach(http);

这是文档上另一种定义方式。

接着是attach()

function attach(server, options) {
  var engine = new exports.Server(options);
  engine.attach(server, options);
  return engine;
};

转到server里面的attach(),里面做的事就比较多了。简单说就是

  // cache and clean up listeners
  var listeners = server.listeners(‘request‘).slice(0);
  server.removeAllListeners(‘request‘);
  server.on(‘close‘, self.close.bind(self));

  // add request handler
  server.on(‘request‘, function(req, res){
    if (check(req)) {
      debug(‘intercepting request for path "%s"‘, path);
      self.handleRequest(req, res);
    } else {
      for (var i = 0, l = listeners.length; i < l; i++) {
        listeners[i].call(server, req, res);
      }
    }
  });

清空监听器,有请求传入时用server.on(‘request‘)监听。

  function check (req) {
    return path == req.url.substr(0, path.length);
  }

一般情况下,check()返回的是true,搞不懂什么时候返回false.看后面好像是负载均衡,需要配置多个监听的时候用。不明白!

接着是self.handleRequest(req, res);

Server.prototype.handleRequest = function(req, res){
  debug(‘handling "%s" http request "%s"‘, req.method, req.url);
  this.prepare(req);
  req.res = res;

  var self = this;
  this.verify(req, false, function(err, success) {
    if (!success) {
      sendErrorMessage(req, res, err);
      return;
    }

    if (req._query.sid) {
      debug(‘setting new request for existing client‘);
      self.clients[req._query.sid].transport.onRequest(req);
    } else {
      self.handshake(req._query.transport, req);
    }
  });
};

this.prepare(req)是将请求里面的查询参数封装到req._query上。然后是verify()

Server.prototype.verify = function(req, upgrade, fn){
  // transport check
  var transport = req._query.transport;
  if (!~this.transports.indexOf(transport)) {
    debug(‘unknown transport "%s"‘, transport);
    return fn(Server.errors.UNKNOWN_TRANSPORT, false);
  }

  // sid check
  var sid = req._query.sid;
  if (sid) {
    if (!this.clients.hasOwnProperty(sid))
      return fn(Server.errors.UNKNOWN_SID, false);
    if (!upgrade && this.clients[sid].transport.name !== transport) {
      debug(‘bad request: unexpected transport without upgrade‘);
      return fn(Server.errors.BAD_REQUEST, false);
    }
  } else {
    // handshake is GET only
    if (‘GET‘ != req.method) return fn(Server.errors.BAD_HANDSHAKE_METHOD, false);
    if (!this.allowRequest) return fn(null, true);
    return this.allowRequest(req, fn);
  }

  fn(null, true);
};

记下请求中要求的传输方式,例如http://localhost:3000/socket.io/?EIO=3&transport=polling&t=1418545776090-1&sid=M5e5hB6NAOmh7YW1AAAB。可以看到由transport参数指定。然后记下sid,这个请求中也有。注意,第一次请求的时候,由于没有实例socket,所以没有socket的id。这时判断有没有sid,确定是不是第一次请求。

下面我们假定是第一次请求,然后走else.

if (‘GET‘ != req.method),这里要判断下,因为engine.io会通过请求的方法的不同确定后面的执行流程,默认get方法维持长连接,当然发出握手请求的也是get方法,这个后面会说到。

this.allowRequest像是拦截器,拦截握手请求,和传输升级的请求(websocket).

例子没有设置allowRequest方法,就直接fn(null,true),进入回调函数.

  this.verify(req, false, function(err, success) {
    if (!success) {
      sendErrorMessage(req, res, err);
      return;
    }

    if (req._query.sid) {
      debug(‘setting new request for existing client‘);
      self.clients[req._query.sid].transport.onRequest(req);
    } else {
      self.handshake(req._query.transport, req);
    }
  });

如果是第一次请求,后面执行握手;如果不是,则选定已经存在的socket,调用socket上面的transport(传输方式,已经绑定到socket)的onRequest方法,这个后面会说到。现在我们还是第一次请求,所以开始握手。

Server.prototype.handshake = function(transport, req){
  var id = base64id.generateId();

  debug(‘handshaking client "%s"‘, id);

  var transportName = transport;
  try {
    var transport = new transports[transport](req);
    if (‘polling‘ == transportName) {
      transport.maxHttpBufferSize = this.maxHttpBufferSize;
    }

    if (req._query && req._query.b64) {
      transport.supportsBinary = false;
    } else {
      transport.supportsBinary = true;
    }
  }
  catch (e) {
    sendErrorMessage(req, req.res, Server.errors.BAD_REQUEST);
    return;
  }
  var socket = new Socket(id, this, transport, req);
  var self = this;

  if (false !== this.cookie) {
    transport.on(‘headers‘, function(headers){
      headers[‘Set-Cookie‘] = self.cookie + ‘=‘ + id;
    });
  }

  transport.onRequest(req);

  this.clients[id] = socket;
  this.clientsCount++;

  socket.once(‘close‘, function(){
    delete self.clients[id];
    self.clientsCount--;
  });

  this.emit(‘connection‘, socket);
};

var transport = new transports[transport](req);这里的transports是transports = require(‘./transports‘)。加载的是transprots文件夹下面的index.js,不是transprots.js.

var XHR = require(‘./polling-xhr‘);
var JSONP = require(‘./polling-jsonp‘);

module.exports = exports = {
  polling: polling,
  websocket: require(‘./websocket‘)
};

exports.polling.upgradesTo = [‘websocket‘];

function polling (req) {
  if (‘string‘ == typeof req._query.j) {
    return new JSONP(req);
  } else {
    return new XHR(req);
  }
}

由于transport由请求中的transprot参数决定,这里是polling。所以会执行polling(req),再看请求参数里有没有j,最终确定具体的传输方式。这里是xhr.

function XHR(req){
  Polling.call(this, req);
}

XHR.prototype.__proto__ = Polling.prototype;

这里XHR继承了Polling.然后实例化Polling.

function Polling (req) {
  Transport.call(this, req);
}

Polling.prototype.__proto__ = Transport.prototype;

又去实例化Transport.Transport继承EventEmitter。

回到主线,var socket = new Socket(id, this, transport, req);这里开始实例化socket了。socket里面做了什么,后面会说到。然后在header上设置cookie,值是上面产生的随机id.方便开发者做权限控制。

接着是transport.onRequest(req); 进入polling-xhr.js

XHR.prototype.onRequest = function (req) {
  if (‘OPTIONS‘ == req.method) {
    var res = req.res;
    var headers = this.headers(req);
    headers[‘Access-Control-Allow-Headers‘] = ‘Content-Type‘;
    res.writeHead(200, headers);
    res.end();
  } else {
    Polling.prototype.onRequest.call(this, req);
  }
};

这里会先判断请求方法是不是options,这个好像只有高级浏览器中才会出现。

第一次请求是get方法,走后面。

Polling.prototype.onRequest = function (req) {
  var res = req.res;

  if (‘GET‘ == req.method) {
    this.onPollRequest(req, res);
  } else if (‘POST‘ == req.method) {
    this.onDataRequest(req, res);
  } else {
    res.writeHead(500);
    res.end();
  }
};

又对请求方法进行判断。从方法名字就看的出,get方法用来维持长连接的,当然广播返回的数据也是通过这个get方法。post方法用来传输客户端的数据,比如客户端聊天的文字,如果没有数据,就会发出get请求。

我们还在握手请求,get方法。

Polling.prototype.onPollRequest = function (req, res) {
  if (this.req) {
    debug(‘request overlap‘);
    // assert: this.res, ‘.req and .res should be (un)set together‘
    this.onError(‘overlap from client‘);
    res.writeHead(500);
    return;
  }

  debug(‘setting request‘);

  this.req = req;
  this.res = res;

  var self = this;

  function onClose () {
    self.onError(‘poll connection closed prematurely‘);
  }

  function cleanup () {
    req.removeListener(‘close‘, onClose);
    self.req = self.res = null;
  }
  req.cleanup = cleanup;
  req.on(‘close‘, onClose);

  this.writable = true;
  this.emit(‘drain‘);

  // if we‘re still writable but had a pending close, trigger an empty send
  if (this.writable && this.shouldClose) {
    debug(‘triggering empty send to append close packet‘);
    this.send([{ type: ‘noop‘ }]);
  }
};

onPollRequest()主要是对相应对象,如request,绑定事件,回调函数,后面传输数据时会用到。

回到前面看实例化socket,然后做了什么。

function Socket (id, server, transport, req) {
  this.id = id;
  this.server = server;
  this.upgraded = false;
  this.readyState = ‘opening‘;
  this.writeBuffer = [];
  this.packetsFn = [];
  this.sentCallbackFn = [];
  this.request = req;

  // Cache IP since it might not be in the req later
  this.remoteAddress = req.connection.remoteAddress;

  this.checkIntervalTimer = null;
  this.upgradeTimeoutTimer = null;
  this.pingTimeoutTimer = null;

  this.setTransport(transport);
  this.onOpen();
}

变量初始化后,this.setTransport(transport);

Socket.prototype.setTransport = function (transport) {
  this.transport = transport;
  this.transport.once(‘error‘, this.onError.bind(this));
  this.transport.on(‘packet‘, this.onPacket.bind(this));
  this.transport.on(‘drain‘, this.flush.bind(this));
  this.transport.once(‘close‘, this.onClose.bind(this, ‘transport close‘));
  //this function will manage packet events (also message callbacks)
  this.setupSendCallback();
};

对前面实例化的transport绑定事件,回调函数。然后是onOpen()

Socket.prototype.onOpen = function () {
  this.readyState = ‘open‘;

  // sends an `open` packet
  this.transport.sid = this.id;
  this.sendPacket(‘open‘, JSON.stringify({
      sid: this.id
    , upgrades: this.getAvailableUpgrades()
    , pingInterval: this.server.pingInterval
    , pingTimeout: this.server.pingTimeout
  }));

  this.emit(‘open‘);
  this.setPingTimeout();
};

sendPacket()开始传输针对握手请求的响应,然后是设置超时定时器。

Socket.prototype.setPingTimeout = function () {
  var self = this;
  clearTimeout(self.pingTimeoutTimer);
  self.pingTimeoutTimer = setTimeout(function () {
    self.onClose(‘ping timeout‘);
  }, self.server.pingInterval + self.server.pingTimeout);
};

回到sendPacket()

Socket.prototype.sendPacket = function (type, data, callback) {
  if (‘closing‘ != this.readyState) {
    debug(‘sending packet "%s" (%s)‘, type, data);

    var packet = { type: type };
    if (data) packet.data = data;

    // exports packetCreate event
    this.emit(‘packetCreate‘, packet);

    this.writeBuffer.push(packet);

    //add send callback to object
    this.packetsFn.push(callback);

    this.flush();
  }
};

this.writeBuffer是暂时保存响应数据的缓冲。packet是用来包装响应数据的。然后flush输出。

Socket.prototype.flush = function () {
  if (‘closed‘ != this.readyState && this.transport.writable
    && this.writeBuffer.length) {
    debug(‘flushing buffer to transport‘);
    this.emit(‘flush‘, this.writeBuffer);
    this.server.emit(‘flush‘, this, this.writeBuffer);
    var wbuf = this.writeBuffer;
    this.writeBuffer = [];
    if (!this.transport.supportsFraming) {
      this.sentCallbackFn.push(this.packetsFn);
    } else {
      this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn);
    }
    this.packetsFn = [];
    this.transport.send(wbuf);
    this.emit(‘drain‘);
    this.server.emit(‘drain‘, this);
  }
};

触发各种绑定的事件,最后调用this.transport.send(wbuf);

Polling.prototype.send = function (packets) {
  if (this.shouldClose) {
    debug(‘appending close packet to payload‘);
    packets.push({ type: ‘close‘ });
    this.shouldClose();
    this.shouldClose = null;
  }

  var self = this;
  parser.encodePayload(packets, this.supportsBinary, function(data) {
    self.write(data);
  });
};

对buffer解析后,write()

Polling.prototype.write = function (data) {
  debug(‘writing "%s"‘, data);
  this.doWrite(data);
  this.req.cleanup();
  this.writable = false;
};

doWrite()由子类实现。

XHR.prototype.doWrite = function(data){
  // explicit UTF-8 is required for pages not served under utf
  var isString = typeof data == ‘string‘;
  var contentType = isString
    ? ‘text/plain; charset=UTF-8‘
    : ‘application/octet-stream‘;
  var contentLength = ‘‘ + (isString ? Buffer.byteLength(data) : data.length);

  var headers = {
    ‘Content-Type‘: contentType,
    ‘Content-Length‘: contentLength
  };

  // prevent XSS warnings on IE
  // https://github.com/LearnBoost/socket.io/pull/1333
  var ua = this.req.headers[‘user-agent‘];
  if (ua && (~ua.indexOf(‘;MSIE‘) || ~ua.indexOf(‘Trident/‘))) {
    headers[‘X-XSS-Protection‘] = ‘0‘;
  }

  this.res.writeHead(200, this.headers(this.req, headers));
  this.res.end(data);
};
时间: 2024-08-09 12:53:30

engine.io分析2--socket.io的基石的相关文章

socket.io 中文手册 socket.io 中文文档

socket.io 中文手册,socket.io 中文文档转载于:http://www.cnblogs.com/xiezhengcai/p/3956401.html 服务端 io.on('connection',function(socket));//监听客户端连接,回调函数会传递本次连接的socket io.sockets.emit('String',data);//给所有客户端广播消息 io.sockets.socket(socketid).emit('String', data);//给指

Socket.IO 概述

为了防止无良网站的爬虫抓取文章,特此标识,转载请注明文章出处.LaplaceDemon/SJQ. http://www.cnblogs.com/shijiaqi1066/p/3826251.html Socket.IO简述 Socket.IO用于浏览器与node.js之间实现实时通信.Socket.IO设计的目标是支持任何的浏览器,任何Mobile设备.支持主流的PC浏览器 (IE,Safari,Chrome,Firefox,Opera等),Mobile浏览器(iphone Safari/ipa

nodejs 基于socket.io实现聊天室

由于之后要做的网页视频直播项目要用到socket.io模块,所以特地花时间研究了下,参照网上的代码做了些改进,自己写了个聊天室代码.不得不承认后端事实推送能力有点厉害,这是以前我用php一直苦恼的事情.下面简单介绍下我的项目,顺带讲解下nodejs. 事实上,在看别人写的代码之前,我一直不知道nodejs是干嘛的,直到真正接触到才明白这也可以算作是服务端代码,丰富的第三方库使其功能极其强大.它可以像golang的beego一样直接通过命令行开启服务器,不过要用到express模块.加载模块的方式

socket.io 1.x迁移指南

转载请注明: TheViper http://www.cnblogs.com/TheViper socket.io 1.x是从今年5月底开始发布更新的,从版本号看的出,这是次大更新.具体参见https://github.com/Automattic/socket.io/wiki/Migrating-to-1.0.我就说几点最重要的. 日志输出 0.x版本的日志输出都是直接在终端或命令行输出,使用者只能控制是否输出日志.在1.x里面,使用者还可以指定输出什么,比如,DEBUG=socket.io:

websocket 与Socket.IO介绍

一  websocket WebSocket是html5新增加的一种通信协议,目前流行的浏览器都支持这个协议,例如 Chrome,Safrie,Firefox,Opera,IE等等,对该协议支持最早的应该是chrome,从chrome12就已经开始支持,随着协 议草案的不断变化,各个浏览器对协议的实现也在不停的更新.该协议还是草案,没有成为标准,不过成为标准应该只是时间问题了. 1. WebSocket API 首先看一段简单的javascript代码,该代码调用了WebSockets的API.

反向Ajax,第3部分:Web服务器和Socket.IO

英文原文:Reverse Ajax, Part 3: Web servers and Socket.IO 前言 时至今日,用户期待的是可通过web访问快速.动态的应用.这一文章系列展示了如何使用反向Ajax(Reverse Ajax)技术来开发事件驱动的web应用.系列的第1部分介绍了反向Ajax.轮询(polling).流(streaming).Comet和长轮询(long polling).你已经了解了Comet是如何使用HTTP长轮询的,这是可靠地实现反向Ajax的最好方式,因为现有的所有

关于Socket.IO的知识点记录

最近因为项目的需要,开始学习nodejs,本着js的那点儿功底,nodejs学习起来还是挺快能上手的.随着深入学习,知道了express框架并那它写了一个小功能,作为一个php程序员哈,在express框架路由.模板渲染那里看到了Yii2的影子,所以便更加的亲切了.再接着便接触到了websocket,而今天谈论的socket.io 便是websocket的一个类库,说道这里了,我们先去了解下websocket和socket.io: 一  websocket WebSocket是html5新增加的

socket.io对IE8的支持

默认下载了最新版的socket.io,版本号是1.7.2,对IE8的支持不好,反正在IE8下收发消息都不行.在网上查了很多资料,都解决不了IE8的问题,崩溃. 最后用了一个大家比较认可的版本1.0.6,可以支持IE8: 卸载socket.io npm uninstall socket.io 安装1.0.6版本的socket.io npm install [email protected] 后面的一些版本没有一一的去试,下了一个1.3.7版本的,还是可以支持收发消息的,只是发现IE8在关闭窗口时,

【socket.io研究】0.前提准备

WebSocket出现之前,web实时推送,一般采用轮询和Comet技术(可细分为长轮询机制和流技术两种),需要大量http请求,服务器受不了.HTML5定义了WebSocket协议,基于TCP协议,由通讯协议和编程API组成,在浏览器和服务器之间建立双向连接,以基于事件的方式,赋予浏览器实时通讯的能力. 建立WebSocket连接的过程是浏览器首先发起一个http请求,在请求头中附带着“Upgrade: WebSocket”头信息,表名申请协议升级,服务器解析后产生应答信息,服务器与客户端的W

TCG开发日志(3)socket.io

为了让网页端和服务端能随时互相通讯,以前在HTTP下有多种写法,但是现在我们可以使用websocket,至少在Chrome上可以. 先安装socket.io npm install socket.io --save 在服务端,可以将socket.io和KOA结合起来: import Koa from 'koa';import IO from 'socket.io' let app = new Koa();let server = require('http').Server(app.callba