// ws.js (3.0 KB)
  1. var Gun = require('../gun');
  2. var WebSocket = require('ws');
  3. var url = require('url');
  4. Gun.on('opt', function mount(ctx){
  5. this.to.next(ctx);
  6. var opt = ctx.opt;
  7. if( !opt.peers )
  8. if( typeof( opt == "string" ) )
  9. opt.peers = [opt];
  10. if(ctx.once){ return }
  11. if(false === opt.ws){ return }
  12. var ws = opt.ws || (opt.ws = {}), batch;
  // When `opt.web` is supplied (presumably an HTTP server - it is handed to
  // WebSocket.Server as its `server` option), accept inbound peers on it.
  if(opt.web){
    ws.server = ws.server || opt.web;
    ws.path = ws.path || '/gun';
    if(!ws.web){ ws.web = new WebSocket.Server(ws); }
    ws.web.on('connection', function(wire, req){
      // Older `ws` releases expose the handshake request as `wire.upgradeReq`;
      // newer ones pass it as the second listener argument instead. Accept
      // either so the request URL is not silently lost.
      wire.upgradeReq = wire.upgradeReq || req || {};
      wire.url = url.parse(wire.upgradeReq.url||'', true);
      wire.id = wire.id || Gun.text.random(6);
      // Register the connection in the peer table under its id.
      var peer = opt.peers[wire.id] = {wire: wire};
      wire.peer = function(){ return peer };
      ctx.on('hi', peer);
      wire.on('message', function(msg){
        // receive() is given the wire (not the peer record).
        receive(msg, wire, ctx);
      });
      wire.on('close', function(){
        ctx.on('bye', peer);
        Gun.obj.del(opt.peers, wire.id);
      });
      // Deliberately a no-op: having a listener keeps an 'error' event on the
      // socket from being thrown as an unhandled error.
      wire.on('error', function(e){});
    });
  }
  // Outgoing messages: the first message of a window is sent to every peer
  // right away and opens a drain window; messages arriving while the window is
  // open are collected and sent together when the timer fires.
  ctx.on('out', function(at){
    this.to.next(at);
    batch = JSON.stringify(at);
    if(ws.drain){
      // A window is already open: queue the serialized message for the flush.
      ws.drain.push(batch);
      return;
    }
    ws.drain = [];
    setTimeout(flush, opt.gap || opt.wait || 1);
    Gun.obj.map(opt.peers, send, ctx);

    // Close the drain window and transmit whatever was queued as one JSON
    // array whose elements are the already-serialized messages.
    function flush(){
      var queued = ws.drain;
      if(!queued){ return }
      ws.drain = null;
      if(!queued.length){ return }
      batch = JSON.stringify(queued);
      Gun.obj.map(opt.peers, send, ctx);
    }
  });
  // Every message is taken care of. The "extra" ones come from the in-memory store not having "asked" for the data yet - which we don't want it to do for foreign requests. Likewise, there is a lot of chattiness because the put/ack replies happen before the `get` syncs, so every peer already has the data in memory to reply with.
  // Transmit the current `batch` payload to one peer.
  // Called with `this` set to the Gun context, which is forwarded to open().
  // If the peer has no socket yet, one is opened; if the socket is not in the
  // OPEN state, the payload is parked on `peer.queue` instead of being sent.
  function send(peer){
    var payload = batch;
    var socket = peer.wire;
    if(!socket){ socket = open(peer, this); }
    if(!socket){ return }
    if(socket.readyState !== socket.OPEN){
      if(!peer.queue){ peer.queue = []; }
      peer.queue.push(payload);
      return;
    }
    socket.send(payload);
  }
  // Decode one incoming payload and hand it to Gun as an 'in' event.
  //   msg  - raw payload: JSON text, an event-like object whose `data` holds
  //          the JSON text, or an already-decoded message object.
  //   wire - the socket it arrived on; its `peer` accessor is attached to the
  //          message as `msg.peer`.
  //   ctx  - Gun context used to emit 'in'; nothing happens without it.
  // A payload that decodes to an array is treated as a batch and each element
  // is processed in turn (elements may themselves be JSON text).
  function receive(msg, wire, ctx){
    if(!ctx){ return }
    try{
      msg = JSON.parse(msg.data || msg);
    }catch(e){
      // Not JSON text (or already a decoded object): keep `msg` as it is and
      // let the type check below decide whether it is usable.
    }
    // Remote input may decode to null or a primitive, or not decode at all.
    // Only objects can carry a `peer` and be meaningful to Gun, so drop the
    // rest instead of throwing on the property assignment below.
    if(!msg || 'object' !== typeof msg){ return }
    if(msg instanceof Array){
      // Walk the whole batch; an empty or invalid entry must not cut it short.
      for(var i = 0; i < msg.length; i++){
        receive(msg[i], wire, ctx); // pass the wire, not the peer
      }
      return;
    }
    msg.peer = wire.peer;
    ctx.on('in', msg);
  }
  // Open an outbound WebSocket to `peer.url` and wire up its event handlers.
  //   peer - peer record; must have a `url`. The new socket is stored on
  //          `peer.wire`, and payloads parked on `peer.queue` are flushed once
  //          the socket opens.
  //   as   - Gun context, forwarded to send(), receive() and reconnect().
  // Returns the new socket, or undefined when the peer has no URL.
  function open(peer, as){
    if(!peer || !peer.url){ return }
    // Swap only a leading http/https scheme for ws/wss. An unanchored replace
    // would also rewrite an "http" appearing later in the address (for example
    // inside the host name of a URL that already uses ws://).
    var address = peer.url.replace(/^http/, 'ws');
    var wire = peer.wire = new WebSocket(address);
    wire.on('close', function(){
      reconnect(peer, as);
    });
    wire.on('error', function(error){
      if(!error){ return }
      // A refused connection is retried; other errors are left to 'close'.
      if(error.code === 'ECONNREFUSED'){
        reconnect(peer, as);
      }
    });
    wire.on('open', function(){
      // Take the backlog and reset the queue before sending, so anything
      // send() has to park again lands on a fresh queue.
      var queue = peer.queue;
      peer.queue = [];
      Gun.obj.map(queue, function(msg){
        // send() transmits whatever `batch` currently holds.
        batch = msg;
        send.call(as, peer);
      });
    });
    wire.on('message', function(msg){
      receive(msg, wire, as); // pass the wire, not the peer
    });
    return wire;
  }
  // Schedule a fresh connection attempt to `peer` two seconds from now.
  // Any attempt already pending on `peer.defer` is cancelled first, so repeated
  // calls leave at most one timer outstanding per peer.
  function reconnect(peer, as){
    var delayMs = 2 * 1000;
    clearTimeout(peer.defer);
    peer.defer = setTimeout(function retry(){
      open(peer, as);
    }, delayMs);
  }
  110. });