virtualTime.js 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696
  1. /*
  2. The MIT License (MIT)
  3. Copyright (c) 2014-2020 Nikolai Suslov and the Krestianstvo.org project contributors. (https://github.com/NikolaySuslov/livecodingspace/blob/master/LICENSE.md)
  4. Virtual World Framework Apache 2.0 license (https://github.com/NikolaySuslov/livecodingspace/blob/master/licenses/LICENSE_VWF.md)
  5. */
  6. import { Helpers } from '/core/helpers.js';
  7. import { Utility } from '/core/vwf/utility/utility.js';
  8. class VirtualTime {
  9. constructor() {
  10. console.log("Virtual Time constructor");
  11. this.helpers = new Helpers;
  12. this.utility = new Utility;
  13. /// The simulation clock, which contains the current time in seconds. Time is controlled by
  14. /// the reflector and updates here as we receive control messages.
  15. ///
  16. /// @name module:vwf.now
  17. ///
  18. /// @private
  19. this.now = 0;
  20. /// The queue's sequence number for the currently executing action.
  21. ///
  22. /// The queue enumerates actions in order of arrival, which is distinct from execution order
  23. /// since actions may be scheduled to run in the future. `sequence_` can be used to
  24. /// distinguish between actions that were previously placed on the queue for execution at a
  25. /// later time, and those that arrived after the current action, regardless of their
  26. /// scheduled time.
  27. ///
  28. /// @name module:vwf.sequence_
  29. ///
  30. /// @private
  31. this.sequence_ = undefined
  32. /// The moniker of the client responsible for the currently executing action. `client_` will
  33. /// be falsy for actions originating in the server, such as time ticks.
  34. ///
  35. /// @name module:vwf.client_
  36. ///
  37. /// @private
  38. this.client_ = undefined
  39. ///// From queue props
  40. /// Current time as provided by the reflector. Messages to be executed at this time or
  41. /// earlier are available from #pull.
  42. ///
  43. /// @name module:vwf~queue.time
  44. this.time = 0
  45. /// Suspension count. Queue processing is suspended when suspension is greater than 0.
  46. ///
  47. /// @name module:vwf~queue.suspension
  48. this.suspension = 0
  49. /// Sequence counter for tagging messages by order of arrival. Messages are sorted by
  50. /// time, origin, then by arrival order.
  51. ///
  52. /// @name module:vwf~queue.sequence
  53. this.sequence = 0
  54. /// Array containing the messages in the queue.
  55. ///
  56. /// @name module:vwf~queue.queue
  57. this.queue = new Heap({
  58. compar: this.queueSort
  59. }) //[]
  60. //Add Stream support
  61. this.initReflectorStream();
  62. }
  63. initReflectorStream() {
  64. const self = this;
  65. this.streamDelay = 0;
  66. this.streamDefaultScheduler = M.scheduler.newDefaultScheduler();
  67. //this.streamDefaultScheduler = M.scheduler.schedulerRelativeTo(self.startTime, this.streamDefaultS);
  68. //this.streamScheduler = M.scheduler.newDefaultScheduler();
  69. const [induce, events] = M.createAdapter();
  70. this.streamAdapter = {
  71. induce: induce,
  72. events: events,
  73. };
  74. const tapFunction = function (res) {
  75. let mostTime =
  76. M.scheduler.currentTime(self.streamDefaultScheduler) / 1000;
  77. if (_app.config.streamMsg) {
  78. console.log('STREAM: ', res, ' TIME: ', mostTime);
  79. self.insert(res, !res.action);
  80. }
  81. };
  82. this.reflectorStream = M.multicast(events); //M.concatMap((x) => M.fromPromise(x()), events);
  83. const resultStream = M.concatMap(el => {
  84. return M.delay(this.streamDelay, M.now(el))
  85. }, this.reflectorStream);
  86. // M.delay(5000, this.reflectorStream)
  87. this.eventsStream = M.tap((res) => {
  88. tapFunction(res);
  89. }, resultStream);
  90. M.runEffects(this.eventsStream, this.streamDefaultScheduler);
  91. }
  92. /// Insert a message or messages into the queue. Optionally execute the simulation
  93. /// through the time marked on the message.
  94. ///
  95. /// When chronic (chron-ic) is set, vwf#dispatch is called to execute the simulation up
  96. /// through the indicated time. To prevent actions from executing out of order, insert
  97. /// should be the caller's last operation before returning to the host when invoked with
  98. /// chronic.
  99. ///
  100. /// @name module:vwf~queue.insert
  101. ///
  102. /// @param {Object|Object[]} fields
  103. /// @param {Boolean} [chronic]
  104. insert(fields, chronic) {
  105. var messages = fields instanceof Array ? fields : [fields];
  106. messages.forEach(function (fields) {
  107. // if ( fields.action ) { // TODO: don't put ticks on the queue but just use them to fast-forward to the current time (requires removing support for passing ticks to the drivers and nodes)
  108. fields.sequence = ++this.sequence; // track the insertion order for use as a sort key
  109. this.queue.insert(fields);
  110. // }
  111. if (chronic) {
  112. this.time = Math.max(this.time, fields.time); // save the latest allowed time for suspend/resume
  113. }
  114. }, this);
  115. //Sort here (now in Heap)
  116. if (chronic) {
  117. this.dispatch();
  118. }
  119. }
  120. /// Pull the next message from the queue.
  121. ///
  122. /// @name module:vwf~queue.pull
  123. ///
  124. /// @returns {Object|undefined} The next message if available, otherwise undefined.
  125. pull() {
  126. if (this.suspension == 0 && this.queue.length > 0 && this.queue.peek().time <= this.time) {
  127. return this.queue.shift();
  128. }
  129. }
  130. /// Update the queue to include only the messages selected by a filtering function.
  131. ///
  132. /// @name module:vwf~queue.filter
  133. ///
  134. /// @param {Function} callback
  135. /// `filter` calls `callback( fields )` once for each message in the queue. If
  136. /// `callback` returns a truthy value, the message will be retained. Otherwise it will
  137. /// be removed from the queue.
  138. filter(callback /* fields */ ) {
  139. // this.queue = this.queue.filter( callback );
  140. let filtered = this.queue._list.slice().filter(callback);
  141. //this.queue._list = this.queue._list.filter(callback);
  142. this.queue = new Heap({
  143. compar: this.queueSort
  144. });
  145. filtered.map(el => {
  146. this.queue.insert(el);
  147. });
  148. }
  149. filterQueue() {
  150. this.filter(function (fields) {
  151. if ((fields.origin === "reflector") && fields.sequence > vwf.virtualTime.sequence_) {
  152. return true;
  153. } else {
  154. vwf.logger.debugx("setState", function () {
  155. return ["removing", JSON.stringify(loggableFields(fields)), "from queue"];
  156. });
  157. }
  158. })
  159. }
  160. /// Suspend message execution.
  161. ///
  162. /// @name module:vwf~_app.virtualTime.suspend
  163. ///
  164. /// @returns {Boolean} true if the queue was suspended by this call.
  165. suspend(why) {
  166. if (this.suspension++ == 0) {
  167. vwf.logger.infox("-queue#suspend", "suspending queue at time", this.now, why ? why : "");
  168. return true;
  169. } else {
  170. vwf.logger.debugx("-queue#suspend", "further suspending queue at time", this.now, why ? why : "");
  171. return false;
  172. }
  173. }
  174. /// Resume message execution.
  175. ///
  176. /// vwf#dispatch may be called to continue the simulation. To prevent actions from
  177. /// executing out of order, resume should be the caller's last operation before
  178. /// returning to the host.
  179. ///
  180. /// @name module:vwf~_app.virtualTime.resume
  181. ///
  182. /// @returns {Boolean} true if the queue was resumed by this call.
  183. resume(why) {
  184. if (--this.suspension == 0) {
  185. vwf.logger.infox("-queue#resume", "resuming queue at time", this.now, why ? why : "");
  186. this.dispatch();
  187. return true;
  188. } else {
  189. vwf.logger.debugx("-queue#resume", "partially resuming queue at time", this.now, why ? why : "");
  190. return false;
  191. }
  192. }
  193. // /// Return the ready state of the queue.
  194. // ///
  195. // /// @name module:vwf~queue.ready
  196. // ///
  197. // /// @returns {Boolean}
  198. ready() {
  199. return this.suspension == 0;
  200. }
  201. queueSort(a, b) {
  202. // Sort by time, then future messages ahead of reflector messages, then by sequence. // TODO: we probably want a priority queue here for better performance
  203. //
  204. // The sort by origin ensures that the queue is processed in a well-defined order
  205. // when future messages and reflector messages share the same time, even if the
  206. // reflector message has not arrived at the client yet.
  207. //
  208. // The sort by sequence number ensures that the messages remain in their arrival
  209. // order when the earlier sort keys don't provide the order.
  210. // Execute the simulation through the new time.
  211. // To prevent actions from executing out of order, callers should immediately return
  212. // to the host after invoking insert with chronic set.
  213. if (a.time != b.time) {
  214. return a.time - b.time;
  215. } else if (a.origin != "reflector" && b.origin == "reflector") {
  216. return -1;
  217. } else if (a.origin == "reflector" && b.origin != "reflector") {
  218. return 1;
  219. } else {
  220. return a.sequence - b.sequence;
  221. }
  222. }
  223. // -- queueTransitTransformation -----------------------------------------------------------
  224. /// vwf/utility/transform() transformation function to convert the message queue for proper
  225. /// JSON serialization.
  226. ///
  227. /// queue: [ { ..., parameters: [ [ arguments ] ], ... }, { ... }, ... ]
  228. ///
  229. /// @name module:vwf~queueTransitTransformation
  230. queueTransitTransformation(object, names, depth) {
  231. let self = this
  232. if (depth == 0) {
  233. // Omit any private direct messages for this client, then sort by arrival order
  234. // (rather than by time) so that messages will retain the same arrival order when
  235. // reinserted.
  236. return object.filter(el => el !== 0).filter(function (fields) {
  237. return !(fields.origin === "reflector" && fields.sequence > vwf.virtualTime.sequence_) && fields.action; // TODO: fields.action is here to filter out tick messages // TODO: don't put ticks on the queue but just use them to fast-forward to the current time (requires removing support for passing ticks to the drivers and nodes)
  238. }).sort(function (fieldsA, fieldsB) {
  239. return fieldsA.sequence - fieldsB.sequence;
  240. });
  241. } else if (depth == 1) {
  242. // Remove the sequence fields since they're just local annotations used to keep
  243. // messages ordered by insertion order and aren't directly meaniful outside of this
  244. // client.
  245. var filtered = {};
  246. Object.keys(object).filter(function (key) {
  247. return key != "sequence";
  248. }).forEach(function (key) {
  249. filtered[key] = object[key];
  250. });
  251. return filtered;
  252. }
  253. return object;
  254. }
  255. get stateQueue() {
  256. return {
  257. time: this.time,
  258. queue: this.utility.transform(this.queue._list, this.queueTransitTransformation),
  259. }
  260. }
  261. // -- dispatch -----------------------------------------------------------------------------
  262. /// Dispatch incoming messages waiting in the queue. "currentTime" specifies the current
  263. /// simulation time that we should advance to and was taken from the time stamp of the last
  264. /// message received from the reflector.
  265. ///
  266. /// @name module:vwf.dispatch
  267. dispatch() {
  268. var fields;
  269. // Actions may use receive's ready function to suspend the queue for asynchronous
  270. // operations, and to resume it when the operation is complete.
  271. while (fields = /* assignment! */ this.pull()) {
  272. // Advance time to the message time.
  273. if (this.now != fields.time) {
  274. this.sequence_ = undefined; // clear after the previous action
  275. this.client_ = undefined; // clear after the previous action
  276. this.now = fields.time;
  277. this.tock();
  278. }
  279. // Perform the action.
  280. if (fields.action) { // TODO: don't put ticks on the queue but just use them to fast-forward to the current time (requires removing support for passing ticks to the drivers and nodes)
  281. this.sequence_ = fields.sequence; // note the message's queue sequence number for the duration of the action
  282. this.client_ = fields.client; // ... and note the originating client
  283. this.receive(fields.node, fields.action, fields.member, fields.parameters, fields.respond, fields.origin);
  284. } else {
  285. this.tick();
  286. }
  287. }
  288. // Advance time to the most recent time received from the server. Tick if the time
  289. // changed.
  290. if (this.ready() && this.now != this.time) {
  291. this.sequence_ = undefined; // clear after the previous action
  292. this.client_ = undefined; // clear after the previous action
  293. this.now = this.time;
  294. this.tock();
  295. }
  296. }
  297. // -- plan ---------------------------------------------------------------------------------
  298. /// @name module:vwf.plan
  299. plan(nodeID, actionName, memberName, parameters, when, callback_async /* ( result ) */ ) {
  300. vwf.logger.debuggx("plan", nodeID, actionName, memberName,
  301. parameters && parameters.length, when, callback_async && "callback");
  302. var time = when > 0 ? // absolute (+) or relative (-)
  303. Math.max(this.now, when) :
  304. this.now + (-when);
  305. var fields = {
  306. time: time,
  307. node: nodeID,
  308. action: actionName,
  309. member: memberName,
  310. parameters: parameters,
  311. client: this.client_, // propagate originating client
  312. origin: "future",
  313. // callback: callback_async, // TODO
  314. };
  315. this.insert(fields);
  316. vwf.logger.debugu();
  317. }
  318. // -- send ---------------------------------------------------------------------------------
  319. /// Send a message to the reflector. The message will be reflected back to all participants
  320. /// in the instance.
  321. ///
  322. /// @name module:vwf.send
  323. send(nodeID, actionName, memberName, parameters, when, callback_async /* ( result ) */ ) {
  324. vwf.logger.debuggx("send", nodeID, actionName, memberName,
  325. parameters && parameters.length, when, callback_async && "callback"); // TODO: loggableParameters()
  326. var time = when > 0 ? // absolute (+) or relative (-)
  327. Math.max(this.now, when) :
  328. this.now + (-when);
  329. // Attach the current simulation time and pack the message as an array of the arguments.
  330. var fields = {
  331. time: time,
  332. node: nodeID,
  333. action: actionName,
  334. member: memberName,
  335. parameters: this.utility.transform(parameters, this.utility.transforms.transit),
  336. // callback: callback_async, // TODO: provisionally add fields to queue (or a holding queue) then execute callback when received back from reflector
  337. };
  338. if (vwf.isLuminary) {
  339. vwf.luminary.stampExternalMessage(fields);
  340. } else if (vwf.reflectorClient.socket) {
  341. // Send the message.
  342. var message = JSON.stringify(fields);
  343. vwf.reflectorClient.socket.send(message);
  344. }
  345. // else {
  346. // // In single-user mode, loop the message back to the incoming queue.
  347. // fields.client = vwf.moniker_; // stamp with the originating client like the reflector does
  348. // fields.origin = "reflector";
  349. // _app.virtualTime.insert( fields );
  350. // }
  351. vwf.logger.debugu();
  352. }
  353. // get queue () { // vwf.private.queue
  354. // }
  355. // -- respond ------------------------------------------------------------------------------
  356. /// Return a result for a function invoked by the server.
  357. ///
  358. /// @name module:vwf.respond
  359. respond(nodeID, actionName, memberName, parameters, result) {
  360. vwf.logger.debuggx("respond", nodeID, actionName, memberName,
  361. parameters && parameters.length, "..."); // TODO: loggableParameters(), loggableResult()
  362. // Attach the current simulation time and pack the message as an array of the arguments.
  363. var fields = {
  364. // sequence: undefined, // TODO: use to identify on return from reflector?
  365. time: this.now,
  366. node: nodeID,
  367. action: actionName,
  368. member: memberName,
  369. parameters: this.utility.transform(parameters, this.utility.transforms.transit),
  370. result: this.utility.transform(result, this.utility.transforms.transit)
  371. };
  372. if (vwf.isLuminary) {
  373. vwf.luminary.stampExternalMessage(fields);
  374. } else if (vwf.reflectorClient.socket) {
  375. // Send the message.
  376. var message = JSON.stringify(fields);
  377. vwf.reflectorClient.socket.send(message);
  378. } else {
  379. // Nothing to do in single-user mode.
  380. }
  381. vwf.logger.debugu();
  382. }
  383. // -- receive ------------------------------------------------------------------------------
  384. /// Handle receipt of a message. Unpack the arguments and call the appropriate handler.
  385. ///
  386. /// @name module:vwf.receive
  387. receive(nodeID, actionName, memberName, parameters, respond, origin) {
  388. // origin == "reflector" ?
  389. // this.logger.infogx( "receive", nodeID, actionName, memberName,
  390. // parameters && parameters.length, respond, origin ) :
  391. // this.logger.debuggx( "receive", nodeID, actionName, memberName,
  392. // parameters && parameters.length, respond, origin );
  393. // TODO: delegate parsing and validation to each action.
  394. // Look up the action handler and invoke it with the remaining parameters.
  395. // Note that the message should be validated before looking up and invoking an arbitrary
  396. // handler.
  397. var args = [],
  398. result;
  399. if (nodeID || nodeID === 0) args.push(nodeID);
  400. if (memberName) args.push(memberName);
  401. if (parameters) args = args.concat(parameters); // flatten
  402. if (actionName == 'createChild') {
  403. console.log("create child!");
  404. // args.push(function(childID)
  405. // {
  406. // //when creating over the reflector, call ready on heirarchy after create.
  407. // //nodes from setState are readied in createNode
  408. // // vwf.decendants(childID).forEach(function(i){
  409. // // vwf.callMethod(i,'ready',[]);
  410. // // });
  411. // // vwf.callMethod(childID,'ready',[]);
  412. // console.log("create child!");
  413. // });
  414. }
  415. // Invoke the action.
  416. // if (environment(actionName, parameters)) {
  417. // require("vwf/configuration").environment = environment(actionName, parameters);
  418. // } else
  419. if (origin !== "reflector" || !nodeID || vwf.private.nodes.existing[nodeID]) {
  420. result = vwf[actionName] && vwf[actionName].apply(vwf, args);
  421. } else {
  422. vwf.logger.debugx("receive", "ignoring reflector action on non-existent node", nodeID);
  423. result = undefined;
  424. }
  425. // Return the result.
  426. respond && this.respond(nodeID, actionName, memberName, parameters, result);
  427. // origin == "reflector" ?
  428. // this.logger.infou() : this.logger.debugu();
  429. /// The reflector sends a `setState` action as part of the application launch to pass
  430. /// the server's execution environment to the client. A `setState` action isn't really
  431. /// appropriate though since `setState` should be the last part of the launch, whereas
  432. /// the environment ought to be set much earlier--ideally before the kernel loads.
  433. ///
  434. /// Executing the `setState` as received would overwrite any configuration settings
  435. /// already applied by the application. So instead, we detect this particular message
  436. /// and only use it to update the environment in the configuration object.
  437. ///
  438. /// `environment` determines if a message is the reflector's special pre-launch
  439. /// `setState` action, and if so, and if the application hasn't been created yet,
  440. /// returns the execution environment property.
  441. // function environment(actionName, param) {
  442. // if (actionName === "setState" && !vwf.application()) {
  443. // var parameters = param;
  444. // if (parameters[0].init) {
  445. // parameters = [JSON.parse(localStorage.getItem('lcs_app')).saveObject]
  446. // }
  447. // var applicationState = parameters && parameters[0];
  448. // if (applicationState && Object.keys(applicationState).length === 1 &&
  449. // applicationState.configuration && Object.keys(applicationState.configuration).length === 1) {
  450. // return applicationState.configuration.environment;
  451. // }
  452. // }
  453. // return undefined;
  454. // }
  455. }
  456. // -- tick ---------------------------------------------------------------------------------
  457. /// Tick each tickable model, view, and node. Ticks are sent on each reflector idle message.
  458. ///
  459. /// @name module:vwf.tick
  460. // TODO: remove, in favor of drivers and nodes exclusively using future scheduling;
  461. // TODO: otherwise, all clients must receive exactly the same ticks at the same times.
  462. tick() {
  463. // Call ticking() on each model.
  464. vwf.models.forEach(function (model) {
  465. model.ticking && model.ticking(this.now); // TODO: maintain a list of tickable models and only call those
  466. }, vwf);
  467. // Call ticked() on each view.
  468. vwf.views.forEach(function (view) {
  469. view.ticked && view.ticked(this.now); // TODO: maintain a list of tickable views and only call those
  470. }, vwf);
  471. // Call tick() on each tickable node.
  472. vwf.tickable.nodeIDs.forEach(function (nodeID) {
  473. vwf.callMethod(nodeID, "tick", [this.now]);
  474. }, vwf);
  475. };
  476. // -- tock ---------------------------------------------------------------------------------
  477. /// Notify views of a kernel time change. Unlike `tick`, `tock` messages are sent each time
  478. /// that time moves forward. Only view drivers are notified since the model state should be
  479. /// independent of any particular sequence of idle messages.
  480. ///
  481. /// @name module:vwf.tock
  482. tock() {
  483. // Call tocked() on each view.
  484. vwf.views.forEach(function (view) {
  485. view.tocked && view.tocked(this.now);
  486. }, vwf);
  487. }
  488. get getNow() {
  489. return this.now
  490. }
  491. }
  492. export {
  493. VirtualTime
  494. }