uws.js 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. var Gun = require('../gun');
  2. var WebSocket = require('uws');
  3. var url = require('url');
  // Printed once when this module is loaded, to flag that the uWS-based adapter is the one in use.
  4. console.log("Experimental high performance uWS server is being used.");
  // Listener for Gun's 'opt' event. For a context that carries `opt.web` it
  // creates a WebSocket server (`WebSocket.Server` from the 'uws' package) and
  // stores it on the options object as `ws.web`. The function body continues
  // past this excerpt; the handlers and helpers that follow share `opt`, `ws`
  // and `batch` through this closure.
  // NOTE(review): the leading "N." on each line looks like a line-number gutter
  // carried over from a copy/paste, not JavaScript - confirm and strip before running.
  5. Gun.on('opt', function mount(ctx){
  // Hand the event on through `this.to.next` before doing any work here.
  6. this.to.next(ctx);
  7. var opt = ctx.opt;
  // Do nothing when `ctx.once` is set, or when no `opt.web` was supplied.
  8. if(ctx.once){ return }
  9. if(!opt.web){ return }
  // Use `opt.uws` if given, else `opt.ws`, else create an empty object and keep
  // it on `opt.uws`. `batch` is declared here and holds the serialized payload
  // that send() reads.
  10. var ws = opt.uws || opt.ws || (opt.uws = {}), batch;
  // Fill in defaults only where the caller left them unset.
  11. ws.server = ws.server || opt.web;
  12. ws.path = ws.path || '/gun';
  // The same object is passed as the server options and then used to hold the server.
  13. ws.web = new WebSocket.Server(ws);
  // Inbound connections: register each client socket as a peer, feed its
  // messages into receive(), and unregister it when the socket closes.
  ws.web.on('connection', function(wire){
    wire.upgradeReq = wire.upgradeReq || {};
    wire.url = url.parse(wire.upgradeReq.url || '', true);
    wire.id = wire.id || Gun.text.random(6);
    // The peer record keeps the socket in `peer.wire`; send() reads it from there.
    var peer = opt.peers[wire.id] = {wire: wire};
    // The back-reference lives on the socket, because receive() copies
    // `wire.peer` onto every incoming message. Assigning it to `peer.wire`
    // instead would overwrite the socket stored just above, leaving send()
    // with a function that has no `.send` method.
    wire.peer = function(){ return peer };
    ctx.on('hi', peer);
    wire.on('message', function(msg){
      receive(msg, wire, ctx); // pass the socket itself, not the peer record
    });
    wire.on('close', function(){
      ctx.on('bye', peer);
      Gun.obj.del(opt.peers, wire.id);
    });
    // Deliberately empty: errors are ignored here and cleanup is left to the
    // 'close' handler. Presumably the listener exists so socket errors never
    // go unhandled - confirm against the uws socket's event semantics.
    wire.on('error', function(){});
  });
  // Outbound messages. The first message of a window is sent to every peer
  // right away and opens a collection window; messages arriving while the
  // window is open are only collected, then sent together when the timer fires.
  ctx.on('out', function(at){
    this.to.next(at);
    batch = JSON.stringify(at);
    var pending = ws.drain;
    if(pending){
      // A window is already open: just collect the serialized message.
      pending.push(batch);
      return;
    }
    ws.drain = [];
    var delay = opt.gap || opt.wait || 1;
    setTimeout(function flush(){
      var queued = ws.drain;
      if(!queued){ return }
      ws.drain = null; // close the window before sending
      if(queued.length === 0){ return }
      // The collected entries are already JSON strings, so this is an array of
      // strings; receive() parses the array and then each element in turn.
      batch = JSON.stringify(queued);
      Gun.obj.map(opt.peers, send, ctx);
    }, delay);
    // The message that opened the window goes out immediately.
    Gun.obj.map(opt.peers, send, ctx);
  });
  // EVERY message is taken care of. The "extra" ones come from the in-memory store not having "asked" for the data yet - which we do not want it to do for foreign requests. Likewise, there is a lot of chattiness because the put/ack replies happen before the `get` syncs, so everybody already has the data in memory to reply with.
  /**
   * Deliver the current `batch` payload to one peer.
   * Called with `this` set to the Gun context (it is forwarded to open()).
   * Uses the peer's existing socket, or asks open() for one. The payload is
   * sent at once when the socket reports OPEN; otherwise it is appended to
   * `peer.queue`.
   * @param {Object} peer - peer record; reads `peer.wire`, may create `peer.queue`.
   */
  function send(peer){
    var payload = batch; // snapshot the shared payload before anything else runs
    var socket = peer.wire;
    if(!socket){
      socket = open(peer, this);
    }
    if(!socket){ return } // no socket and nothing could be opened
    if(socket.readyState !== socket.OPEN){
      if(!peer.queue){ peer.queue = []; }
      peer.queue.push(payload);
      return;
    }
    socket.send(payload);
  }
  /**
   * Decode one incoming frame and emit it on the Gun context as an 'in' event.
   * A frame may be a JSON string, an event-like object carrying the text in
   * `.data`, an already-decoded object, or an array (batch) of any of these.
   * Each decoded message is tagged with `wire.peer` before it is emitted.
   * Frames that do not decode to an object are dropped: the data comes from a
   * remote socket, so junk must never throw or reach the 'in' listeners.
   * @param {*} msg - raw frame from the socket, or one element of a batch.
   * @param {Object} wire - the socket the frame arrived on (not the peer record).
   * @param {Object} ctx - Gun context; nothing happens when it is missing.
   */
  function receive(msg, wire, ctx){
    if(!ctx){ return }
    try{
      msg = JSON.parse(msg.data || msg);
    }catch(e){
      // Not JSON text. An already-decoded object is kept as it is; anything
      // else is filtered out by the type check below.
    }
    if(Array.isArray(msg)){
      // A batch: handle every element. Empty slots are skipped rather than
      // cutting the batch short at the first one.
      for(var i = 0; i < msg.length; i++){
        if(msg[i]){ receive(msg[i], wire, ctx); } // pass the socket, not the peer
      }
      return;
    }
    // `null`, bare numbers/booleans and unparseable text are not messages.
    if(!msg || typeof msg !== 'object'){ return }
    msg.peer = wire.peer;
    ctx.on('in', msg);
  }
  /**
   * Open an outbound socket to a peer that has a `url`, and wire up its events.
   * The new socket is stored on `peer.wire`. Messages queued on `peer.queue`
   * while the socket was not open are flushed once it opens.
   * @param {Object} peer - peer record; needs `peer.url`, gets `peer.wire`.
   * @param {Object} as - Gun context, passed through to send/receive/reconnect.
   * @returns {Object|undefined} the new socket, or undefined without a URL.
   */
  function open(peer, as){
    if(!peer || !peer.url){ return }
    // Swap only the leading scheme (http -> ws, https -> wss). An unanchored
    // replace would rewrite the first "http" found anywhere in the URL, such
    // as in the host or path of a URL that is already ws://.
    var address = peer.url.replace(/^http/, 'ws');
    var wire = peer.wire = new WebSocket(address);
    wire.on('close', function(){
      reconnect(peer, as);
    });
    wire.on('error', function(error){
      if(!error){ return }
      // Only a refused connection schedules a retry from here; a closed
      // socket is retried by the 'close' handler above.
      if(error.code === 'ECONNREFUSED'){
        reconnect(peer, as);
      }
    });
    wire.on('open', function(){
      // Take the backlog and reset the queue first, so anything send() has to
      // re-queue lands in the fresh array. The queue may not exist yet.
      var queue = peer.queue || [];
      peer.queue = [];
      for(var i = 0; i < queue.length; i++){
        batch = queue[i]; // send() reads its payload from the shared `batch`
        send.call(as, peer);
      }
    });
    wire.on('message', function(msg){
      receive(msg, wire, as); // pass the socket, not the peer record
    });
    return wire;
  }
  /**
   * Schedule a fresh open() for a peer two seconds from now.
   * Any retry already pending on `peer.defer` is cancelled first, so repeated
   * calls leave a single timer outstanding.
   * @param {Object} peer - peer record; the timer handle is kept on `peer.defer`.
   * @param {Object} as - Gun context handed to open().
   */
  function reconnect(peer, as){
    var retryDelay = 2000; // milliseconds
    function retry(){
      open(peer, as);
    }
    clearTimeout(peer.defer);
    peer.defer = setTimeout(retry, retryDelay);
  }
  106. });