wsp.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. ;(function(wsp){
  2. var Gun = require('../gun')
  3. , ws = require('ws').Server
  4. , http = require('./http')
  5. , url = require('url');
  6. Gun.on('opt').event(function(gun, opt){
  // Websocket options: whatever is already stored on the gun instance wins over
  // the options passed to this call; both references end up sharing one object.
  gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {};

  /**
   * Wire the websocket transport onto a server.
   *
   * @param {Object} server - server handed to the websocket layer unless one is
   *   already configured in the ws options.
   * @param {number} [port] - fallback port recorded in the ws options.
   * @param {Object} [app] - if it exposes `use`, `gun.wsp.server` is mounted on it.
   */
  function start(server, port, app){
    if(app && app.use){
      app.use(gun.wsp.server);
    }
    // A previously configured server takes precedence over the argument.
    server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server;

    // Called by ./ws with `this` bound to the socket the message arrived on.
    function onSocketMessage(req, res){
      var socket = this;
      // The socket keeps the first session id it sees and stamps it on every request.
      socket.sid = socket.sid || req.headers['gun-sid'];
      req.headers['gun-sid'] = socket.sid;
      if(!socket.sub){
        // Subscribe this socket (once) to messages broadcast on the network event.
        socket.sub = gun.wsp.on('network').event(function(msg){
          var writable = socket && socket.send && socket._socket && socket._socket.writable;
          if(!writable){ return this.off(); } // socket is gone: unsubscribe.
          if(!msg){ return; }
          var headers = msg.headers;
          // Never echo a message back to the session that produced it.
          if(socket.sid && headers && headers['gun-sid'] === socket.sid){ return; }
          if(headers){ delete headers['ws-rid']; }
          // TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id?
          try{
            socket.send(Gun.text.ify(msg));
          }catch(e){} // juuuust in case.
        });
      }
      gun.wsp.wire(req, res);
    }

    var attach = require('./ws');
    gun.wsp.ws = gun.wsp.ws || new ws(gun.__.opt.ws);
    attach(gun.wsp.ws, onSocketMessage);

    gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80;
  }
  /**
   * Attach gun's wire protocol to a server (or server-like app).
   *
   * - If `server.address` is a function and returns a value, start immediately
   *   on the reported port.
   * - Else if `server.get('port')` yields a value, start on that port.
   * - Otherwise wrap `server.listen` so the transport starts when it is called.
   *
   * @param {Object} [server] - server or app to attach to; nothing happens without it.
   * @param {Function} [auth] - stored on `gun.wsp.auth` (consulted by the wire handler).
   * @returns {Object} the gun instance, for chaining.
   */
  var wsp = gun.wsp = gun.wsp || function(server, auth){
    gun.wsp.auth = auth;
    if(!server){ return gun; }

    var hasAddress = Gun.fns.is(server.address) && server.address();
    if(hasAddress){
      start(server, server.address().port);
      return gun;
    }

    var hasPortSetting = Gun.fns.is(server.get) && server.get('port');
    if(hasPortSetting){
      start(server, server.get('port'));
      return gun;
    }

    // Defer: intercept listen() and start once the caller invokes it.
    var originalListen = server.listen;
    server.listen = function(port){
      var listening = originalListen.apply(server, arguments);
      start(listening, port, server);
      return listening;
    };
    return gun;
  };
  // Defaults for the transport; values already present on gun.wsp are kept.
  // Event hub used for the 'network' broadcast channel.
  if(!gun.wsp.on){ gun.wsp.on = Gun.on.create(); }
  // Which request paths belong to gun (opt.route, then opt.path, then /gun).
  if(!gun.wsp.regex){ gun.wsp.regex = opt.route || opt.path || /^\/gun/i; }
  // Poll value advertised to long-polling clients in the `poll` header.
  if(!gun.wsp.poll){ gun.wsp.poll = opt.poll || 1; }
  // Milliseconds an idle long-poll session is kept before it is dropped.
  if(!gun.wsp.pull){ gun.wsp.pull = opt.pull || gun.wsp.poll * 1000; }
  /**
   * HTTP entry point. Works as a plain request listener or as
   * connect/express-style middleware (`next` is optional).
   *
   * Requests whose path does not match `gun.wsp.regex` are passed to `next`.
   * A matching path followed by `.js` serves the (cached) gun.js client file.
   * Everything else is parsed by ./http and handed to `gun.wsp.wire`; requests
   * carrying a `gun-sid` header additionally get a long-poll session that
   * queues messages broadcast on the 'network' event.
   *
   * @param {Object} req - incoming request.
   * @param {Object} res - outgoing response.
   * @param {Function} [next] - called when the request is not for gun.
   * @returns {boolean} true when gun took the request, false otherwise.
   */
  gun.wsp.server = gun.wsp.server || function(req, res, next){ // http
    next = next || function(){};
    if(!req || !res){ return next(), false }
    if(!req.url){ return next(), false }
    if(!req.method){ return next(), false }
    var msg = {};
    msg.url = url.parse(req.url, true);
    var pathname = msg.url.pathname;
    // The route option may be a RegExp or a plain string; a string (or any
    // value without a `test` method) is treated as a literal path prefix
    // instead of crashing on a missing `.test`.
    var route = gun.wsp.regex;
    var routed = (route && typeof route.test === 'function')
      ? route.test(pathname)
      : (typeof pathname === 'string' && pathname.indexOf(String(route)) === 0);
    if(!routed){ return next(), false }
    if(pathname.replace(route, '').slice(0,3).toLowerCase() === '.js'){
      res.writeHead(200, {'Content-Type': 'text/javascript'});
      res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../gun.js')); // gun server is caching the gun library for the client
      return true;
    }
    return http(req, res, function(req, res){
      if(!req){ return next() }
      var stream, cb = res = require('./jsonp')(req, res);
      var sid = req.headers && req.headers['gun-sid'];
      if(sid){
        // One session object per sid, shared by every request of that session.
        var peers = gun.wsp.peers = gun.wsp.peers || {};
        stream = peers[sid] = peers[sid] || {sid: sid};
        // `sub` and `drain` are created by the first request of a session and
        // close over that request's `stream` variable.
        stream.sub = stream.sub || gun.wsp.on('network').event(function(req){
          if(!stream){ return this.off() } // self cleans up after itself!
          if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return }
          (stream.queue = stream.queue || []).push(req);
          stream.drain(stream.reply);
        });
        cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) }
        stream.drain = stream.drain || function(res){
          if(!res || !stream || !stream.queue || !stream.queue.length){ return }
          res({headers: {'gun-sid': stream.sid}, body: stream.queue });
          // If the client does not poll again within gun.wsp.pull ms, expire
          // the session. It must also leave the registry: otherwise a later
          // request with the same sid would reuse this object, whose sub and
          // drain are permanently dead once `stream` is null, and never get
          // an answer.
          stream.off = setTimeout(function(){
            if(!stream){ return }
            if(gun.wsp.peers && gun.wsp.peers[stream.sid] === stream){
              delete gun.wsp.peers[stream.sid];
            }
            stream = null;
          }, gun.wsp.pull);
          stream.reply = stream.queue = null;
          return true;
        }
        clearTimeout(stream.off); // the client is back, keep the session alive.
        if(req.headers.pull){
          // Long poll: answer right away if something is queued, otherwise
          // park the reply until the next network message arrives.
          if(stream.drain(cb)){ return }
          return stream.reply = cb;
        }
      }
      gun.wsp.wire(req, cb);
    }), true;
  }
  // Raise the global http/https agent socket limit unless explicitly disabled.
  // `maxSockets: false` must be checked before the `||` fallback: `false || x`
  // would otherwise discard the opt-out and the limit would still be changed.
  if(opt.maxSockets === false){
    gun.__.opt.maxSockets = false;
  } else {
    gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets;
  }
  if(gun.__.opt.maxSockets !== false){
    require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity;
  }
  /**
   * Message-id bookkeeping used to recognise messages already seen.
   *
   * - Called without an id: creates a new id via Gun.text.random(9), records
   *   it in `gun.wsp.msg.debounce` with the current Gun.time.is() value and
   *   returns it.
   * - Called with an id: (re)schedules a sweep that drops entries older than
   *   five minutes, then, if the id is recorded, refreshes its timestamp and
   *   returns the id (truthy). Unknown ids return undefined and are not
   *   recorded.
   *
   * @param {string} [id] - message id to look up.
   * @returns {string|undefined} the new or known id, or undefined if unknown.
   */
  gun.wsp.msg = gun.wsp.msg || function(id){
    if(!id){
      id = Gun.text.random(9);
      gun.wsp.msg.debounce[id] = Gun.time.is();
      return id;
    }
    // Debounced sweep: runs 500ms after the most recent lookup.
    clearTimeout(gun.wsp.msg.clear);
    gun.wsp.msg.clear = setTimeout(function(){
      var now = Gun.time.is();
      Gun.obj.map(gun.wsp.msg.debounce, function(t, seen){
        if((now - t) < (1000 * 60 * 5)){ return }
        Gun.obj.del(gun.wsp.msg.debounce, seen);
      });
    }, 500);
    // Keep `id` intact while checking: overwriting it with the stored
    // timestamp made the refresh write to debounce[<timestamp>] instead of
    // debounce[id], leaving the real entry stale and adding a junk key.
    if(gun.wsp.msg.debounce[id]){
      gun.wsp.msg.debounce[id] = Gun.time.is();
      return id;
    }
  };
  gun.wsp.msg.debounce = gun.wsp.msg.debounce || {};
  112. gun.wsp.wire = gun.wsp.wire || (function(){
  113. // all streams, technically PATCH but implemented as PUT or POST, are forwarded to other trusted peers
  114. // except for the ones that are listed in the message as having already been sending to.
  115. // all states, implemented with GET, are replied to the source that asked for it.
  /**
   * Route an accepted request: optionally rebroadcast it on the 'network'
   * event, then dispatch it to tran.get (lex body) or tran.put (anything else).
   *
   * @param {Object} req - request with `headers` and `body`.
   * @param {Function} res - reply callback.
   */
  function flow(req, res){
    // Requests without an auth marker, or explicitly flagged for broadcast,
    // are copied to every network subscriber.
    var shouldBroadcast = !req.auth || req.headers.broadcast;
    if(shouldBroadcast){
      gun.wsp.on('network').emit(Gun.obj.copy(req));
    }
    if(req.headers.rid){ return; } // no need to process.
    return Gun.is.lex(req.body) ? tran.get(req, res) : tran.put(req, res);
  }
  /**
   * Wire entry point (exposed as gun.wsp.wire). Drops requests that are
   * incomplete or whose id gun.wsp.msg already knows, tags the request with
   * its method, then either runs the auth hook or goes straight to flow().
   *
   * @param {Object} req - request; needs `body`, `headers` and `headers.id`.
   * @param {Function} res - reply callback.
   */
  function tran(req, res){
    var usable = req && res && req.body && req.headers && req.headers.id;
    if(!usable){ return; }
    if(gun.wsp.msg(req.headers.id)){ return; }
    req.method = Gun.is.lex(req.body) ? 'get' : 'put';
    if(!gun.wsp.auth){
      return flow(req, res);
    }
    // The auth hook receives the request, a reply function that fills in the
    // standard headers before answering, and flow to continue processing.
    return gun.wsp.auth(req, function(reply){
      reply.headers = reply.headers || {};
      var headers = reply.headers;
      if(!headers['Content-Type']){ headers['Content-Type'] = tran.json; }
      // NOTE(review): these two checks read reply.rid / reply.id, not the
      // header fields, so the headers are overwritten unless the reply
      // carries top-level rid/id - confirm this is intended.
      if(!reply.rid){ headers.rid = req.headers.id; }
      if(!reply.id){ headers.id = gun.wsp.msg(); }
      res(reply);
    }, flow);
  }
  /**
   * Answer a read request.
   *
   * Special cases first: a path containing 'gun.nts' gets the server time, and
   * a '*' query is forwarded to gun.all. Otherwise the request body is passed
   * to the configured wire.get and each result is sent back through `cb`.
   *
   * @param {Object} req - request; `body` is the lookup, `headers.id` the request id.
   * @param {Function} cb - reply callback, called with {headers, body|chunk}.
   */
  tran.get = function(req, cb){
    // The lookup key is taken from the body further down. The former eager
    // read of req.url.key was never used and threw for requests without a
    // url, even though the checks below allow req.url to be missing.
    var reply = {headers: {'Content-Type': tran.json, rid: req.headers.id, id: gun.wsp.msg()}};
    //Gun.log(req);
    // NTS HACK! SHOULD BE ITS OWN ISOLATED MODULE! //
    if(req && req.url && req.url.pathname && req.url.pathname.indexOf('gun.nts') >= 0){
      return cb({headers: reply.headers, body: {time: Gun.time.is() }});
    }
    // NTS END! SHOULD HAVE BEEN ITS OWN MODULE //
    // ALL HACK! SHOULD BE ITS OWN MODULE OR CORE? //
    if(req && req.url && Gun.obj.has(req.url.query, '*')){
      return gun.all(req.url.key + req.url.search, function(err, list){
        cb({headers: reply.headers, body: (err? (err.err? err : {err: err || "Unknown error."}) : list || null ) })
      });
    }
    //Gun.log("GET!", req);
    var key = req.body;
    //Gun.log("tran.get", key);
    var opt = {key: false, local: true}; // local: do not fan the read out to peers again.
    //gun.get(key, function(err, node){
    (gun.__.opt.wire.get||function(key, cb){cb(null,null)})(key, function(err, node){
      //Gun.log("tran.get", key, "<---", err, node);
      reply.headers.id = gun.wsp.msg(); // every reply gets its own message id.
      if(err || !node){
        if(opt.on && opt.on.off){ opt.on.off() }
        return cb({headers: reply.headers, body: (err? (err.err? err : {err: err || "Unknown error."}) : null)});
      }
      if(Gun.obj.empty(node)){
        if(opt.on && opt.on.off){ opt.on.off() }
        return cb({headers: reply.headers, body: node});
      } // we're out of stuff!
      /*
      (function(chunks){ // FEATURE! Stream chunks if the nodes are large!
        var max = 10, count = 0, soul = Gun.is.node.soul(node);
        if(Object.keys(node).length > max){
          var n = Gun.is.node.soul.ify({}, soul);
          Gun.obj.map(node, function(val, field){
            if(!(++count % max)){
              cb({headers: reply.headers, chunk: n}); // send node chunks
              n = Gun.is.node.soul.ify({}, soul);
            }
            Gun.is.node.state.ify([n, node], field, val);
          });
          if(count % max){ // finish off the last chunk
            cb({headers: reply.headers, chunk: n});
          }
        } else {
          cb({headers: reply.headers, chunk: node}); // send full node
        }
      }([]));
      */
      cb({headers: reply.headers, chunk: node }); // Use this if you don't want streaming chunks feature.
    }, opt);
  };
  /**
   * Handle a write request: merge the posted graph into gun with Gun.union,
   * then hand the affected nodes to the configured wire.put and report the
   * outcome through `cb`.
   *
   * NOTE: It is highly recommended you do your own PUT/POSTs through your own
   * API that then saves to gun manually. This will give you much more
   * fine-grain control over security, transactions, and what not.
   *
   * @param {Object} req - request; `body` must be a graph.
   * @param {Function} cb - reply callback, called with {headers, body}.
   */
  tran.put = function(req, cb){
    var reply = {headers: {'Content-Type': tran.json, rid: req.headers.id, id: gun.wsp.msg()}};
    // Every answer shares the same reply headers.
    function respond(body){
      return cb({headers: reply.headers, body: body});
    }
    if(!req.body){ return respond({err: "No body"}); }
    //Gun.log("\n\ntran.put ----------------->", req.body);
    if(!Gun.is.graph(req.body)){
      respond({err: "Not a valid graph!"});
      return;
    }
    req.err = Gun.union(gun, req.body, function(err, context){ // TODO: BUG? Probably should give me ctx.graph
      if(err){ return respond({err: err || "Union failed."}); }
      context = context || {};
      context.graph = {};
      // Collect the merged state of every soul the request touched.
      Gun.is.graph(req.body, function(node, soul){
        context.graph[soul] = gun.__.graph[soul];
      });
      var save = gun.__.opt.wire.put || function(graph, done){ done("No save."); };
      save(context.graph, function(err, ok){
        if(err){ return respond({err: err || "Failed."}); } // TODO: err should already be an error object?
        respond({ok: ok || "Persisted."});
        //Gun.log("tran.put <------------------------", ok);
      }, {local: true});
    }).err;
    if(req.err){
      respond({err: req.err || "Union failed."});
    }
  };
  214. gun.wsp.on('network').event(function(req){
  215. // TODO: MARK! You should move the networking events to here, not in WSS only.
  216. });
  217. tran.json = 'application/json';
  218. return tran;
  219. }());
  220. if(opt.server){
  221. wsp(opt.server);
  222. }
  223. if(gun.wsp.driver){ return }
  224. var driver = gun.wsp.driver = {};
  225. var noop = function(){};
  226. var get = gun.__.opt.wire.get || noop;
  227. var put = gun.__.opt.wire.put || noop;
  /**
   * Wire driver that wraps the previously configured get/put: each call first
   * goes to the wrapped function, then (unless `opt.local` is set) is sent to
   * every peer in `opt.peers` or `gun.__.opt.peers` through Gun.request.
   * The driver is also published on gun.wsp.driver (it used to be left as an
   * empty placeholder object).
   */
  var driver = gun.wsp.driver = {
    /**
     * @param {Object} graph - graph to persist and broadcast.
     * @param {Function} cb - called with (err) or (null, body) per peer reply.
     * @param {Object} [opt] - `local` skips the network, `peers` overrides the peer list.
     */
    put: function(graph, cb, opt){
      put(graph, cb, opt);
      opt = opt || {};
      if(opt.local){ return }
      var id = gun.wsp.msg();
      gun.wsp.on('network').emit({ // sent to dynamic peers!
        headers: {'Content-Type': 'application/json', id: id},
        body: graph
      });
      var ropt = {headers:{}, WebSocket: WebSocket};
      ropt.headers.id = id;
      Gun.obj.map(opt.peers || gun.__.opt.peers, function(peer, url){
        Gun.request(url, graph, function(err, reply){
          // reply can be missing when the request failed: only normalise the
          // body once we know it exists, so the error path below is reached
          // instead of a TypeError.
          if(reply){ reply.body = reply.body || reply.chunk || reply.end || reply.write; }
          if(err || !reply || (err = reply.body && reply.body.err)){
            return cb({err: Gun.log(err || "Put failed.") });
          }
          cb(null, reply.body);
        }, ropt);
      });
    },
    /**
     * @param {Object} lex - lookup to resolve.
     * @param {Function} cb - called with (err) or (null, body) per peer reply.
     * @param {Object} [opt] - `local` skips the network, `peers` overrides the peer list.
     */
    get: function(lex, cb, opt){
      get(lex, cb, opt);
      opt = opt || {};
      if(opt.local){ return }
      if(!Gun.request){ return console.log("Server could not find default network abstraction.") }
      var ropt = {headers:{}};
      ropt.headers.id = gun.wsp.msg();
      Gun.obj.map(opt.peers || gun.__.opt.peers, function(peer, url){
        Gun.request(url, lex, function(err, reply){
          // Same guard as in put: never touch reply before checking it.
          if(reply){ reply.body = reply.body || reply.chunk || reply.end || reply.write; }
          if(err || !reply || (err = reply.body && reply.body.err)){
            return cb({err: Gun.log(err || "Get failed.") });
          }
          cb(null, reply.body);
        }, ropt);
      });
    }
  };
  // Give Gun.request the Node websocket implementation.
  var WebSocket = require('ws');
  Gun.request.WebSocket = WebSocket;

  // Register the wire handler with Gun.request's server hook.
  Gun.request.createServer(gun.wsp.wire);

  // Install the driver as gun's wire; the option is set directly and then
  // passed through gun.opt as well.
  gun.__.opt.wire = driver;
  gun.opt({wire: driver}, true);
  273. });
  274. }({}));