123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- var Gun = require('../gun');
- var WebSocket = require('uws');
- var url = require('url');
- console.log("Experimental high performance uWS server is being used.");
- Gun.on('opt', function mount(ctx){
- this.to.next(ctx);
- var opt = ctx.opt;
- if(ctx.once){ return }
- if(!opt.web){ return }
- var ws = opt.uws || opt.ws || (opt.uws = {}), batch;
- ws.server = ws.server || opt.web;
- ws.path = ws.path || '/gun';
- ws.web = new WebSocket.Server(ws);
- ws.web.on('connection', function(wire){
- wire.upgradeReq = wire.upgradeReq || {};
- wire.url = url.parse(wire.upgradeReq.url||'', true);
- wire.id = wire.id || Gun.text.random(6);
- var peer = opt.peers[wire.id] = {wire: wire};
- peer.wire = function(){ return peer };
- ctx.on('hi', peer);
- wire.on('message', function(msg){
- //console.log("MESSAGE", msg);
- receive(msg, wire, ctx); // diff: wire is wire.
- });
- wire.on('close', function(){
- ctx.on('bye', peer);
- Gun.obj.del(opt.peers, wire.id);
- });
- wire.on('error', function(e){});
- });
- ctx.on('out', function(at){
- this.to.next(at);
- batch = JSON.stringify(at);
- if(ws.drain){
- ws.drain.push(batch);
- return;
- }
- ws.drain = [];
- setTimeout(function(){
- if(!ws.drain){ return }
- var tmp = ws.drain;
- ws.drain = null;
- if(!tmp.length){ return }
- batch = JSON.stringify(tmp);
- Gun.obj.map(opt.peers, send, ctx);
- }, opt.gap || opt.wait || 1);
- Gun.obj.map(opt.peers, send, ctx);
- });
- // EVERY message taken care of. The "extra" ones are from in-memory not having "asked" for it yet - which we won't want it to do for foreign requests. Likewise, lots of chattyness because the put/ack replies happen before the `get` syncs so everybody now has it in-memory already to reply with.
- function send(peer){
- var ctx = this, msg = batch;
- var wire = peer.wire || open(peer, ctx);
- if(!wire){ return }
- if(wire.readyState === wire.OPEN){
- wire.send(msg);
- return;
- }
- (peer.queue = peer.queue || []).push(msg);
- }
- function receive(msg, wire, ctx){
- if(!ctx){ return }
- try{msg = JSON.parse(msg.data || msg);
- }catch(e){}
- if(msg instanceof Array){
- var i = 0, m;
- while(m = msg[i++]){
- receive(m, wire, ctx); // wire not peer!
- }
- return;
- }
- msg.peer = wire.peer;
- ctx.on('in', msg);
- }
- function open(peer, as){
- if(!peer || !peer.url){ return }
- var url = peer.url.replace('http', 'ws');
- var wire = peer.wire = new WebSocket(url);
- wire.on('close', function(){
- reconnect(peer, as);
- });
- wire.on('error', function(error){
- if(!error){ return }
- if(error.code === 'ECONNREFUSED'){
- reconnect(peer, as); // placement?
- }
- });
- wire.on('open', function(){
- var queue = peer.queue;
- peer.queue = [];
- Gun.obj.map(queue, function(msg){
- batch = msg;
- send.call(as, peer);
- });
- });
- wire.on('message', function(msg){
- receive(msg, wire, as); // diff: wire not peer!
- });
- return wire;
- }
- function reconnect(peer, as){
- clearTimeout(peer.defer);
- peer.defer = setTimeout(function(){
- open(peer, as);
- }, 2 * 1000);
- }
- });
|