123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696 |
- /*
- The MIT License (MIT)
- Copyright (c) 2014-2020 Nikolai Suslov and the Krestianstvo.org project contributors. (https://github.com/NikolaySuslov/livecodingspace/blob/master/LICENSE.md)
- Virtual World Framework Apache 2.0 license (https://github.com/NikolaySuslov/livecodingspace/blob/master/licenses/LICENSE_VWF.md)
- */
- import { Helpers } from '/core/helpers.js';
- import { Utility } from '/core/vwf/utility/utility.js';
/// Simulation clock and message queue for the kernel.
///
/// Owns the current simulation time (`now`), the latest time allowed by the reflector
/// (`time`), and a heap of pending messages ordered by `queueSort`. Messages are inserted by
/// `insert`/`plan`, executed in order by `dispatch`, and sent outward by `send`/`respond`.
///
/// This class uses `Heap`, `M`, `vwf` and `_app`, none of which are imported in the visible part
/// of this file; they are expected to be in scope at runtime (presumably globals -- confirm).
class VirtualTime {
  constructor() {
    console.log("Virtual Time constructor");

    this.helpers = new Helpers();
    this.utility = new Utility();

    /// The simulation clock, which contains the current time in seconds. Time is controlled by
    /// the reflector and updates here as we receive control messages.
    ///
    /// @name module:vwf.now
    ///
    /// @private
    this.now = 0;

    /// The queue's sequence number for the currently executing action.
    ///
    /// The queue enumerates actions in order of arrival, which is distinct from execution order
    /// since actions may be scheduled to run in the future. `sequence_` can be used to
    /// distinguish between actions that were previously placed on the queue for execution at a
    /// later time, and those that arrived after the current action, regardless of their
    /// scheduled time.
    ///
    /// @name module:vwf.sequence_
    ///
    /// @private
    this.sequence_ = undefined;

    /// The moniker of the client responsible for the currently executing action. `client_` will
    /// be falsy for actions originating in the server, such as time ticks.
    ///
    /// @name module:vwf.client_
    ///
    /// @private
    this.client_ = undefined;

    ///// From queue props

    /// Current time as provided by the reflector. Messages to be executed at this time or
    /// earlier are available from #pull.
    ///
    /// @name module:vwf~queue.time
    this.time = 0;

    /// Suspension count. Queue processing is suspended when suspension is greater than 0.
    ///
    /// @name module:vwf~queue.suspension
    this.suspension = 0;

    /// Sequence counter for tagging messages by order of arrival. Messages are sorted by
    /// time, origin, then by arrival order.
    ///
    /// @name module:vwf~queue.sequence
    this.sequence = 0;

    /// Heap containing the messages in the queue, ordered by `queueSort`.
    ///
    /// @name module:vwf~queue.queue
    this.queue = new Heap({
      compar: this.queueSort,
    });

    // Add stream support.
    this.initReflectorStream();
  }

  /// Build the event stream through which reflector messages can be fed into the queue.
  ///
  /// Creates an adapter (`streamAdapter.induce` pushes a message, `streamAdapter.events` is the
  /// resulting stream), delays each message by `streamDelay`, and runs a tap on every message.
  /// `M` is presumably the @most/core namespace -- confirm against the page's script setup.
  initReflectorStream() {
    this.streamDelay = 0;
    this.streamDefaultScheduler = M.scheduler.newDefaultScheduler();

    const [induce, events] = M.createAdapter();
    this.streamAdapter = {
      induce,
      events,
    };

    const tapFunction = (res) => {
      const mostTime = M.scheduler.currentTime(this.streamDefaultScheduler) / 1000;

      // NOTE(review): insertion into the queue is gated on the same `streamMsg` flag as the
      // log line, so streamed messages are dropped when the flag is off. Kept as-is because
      // the caller that decides between `induce` and `insert` is not visible here -- confirm
      // this is intended.
      if (_app.config.streamMsg) {
        console.log('STREAM: ', res, ' TIME: ', mostTime);
        this.insert(res, !res.action);
      }
    };

    this.reflectorStream = M.multicast(events);

    // `streamDelay` is read when each message arrives, so it may be changed after setup.
    const resultStream = M.concatMap(
      (el) => M.delay(this.streamDelay, M.now(el)),
      this.reflectorStream
    );

    this.eventsStream = M.tap(tapFunction, resultStream);
    M.runEffects(this.eventsStream, this.streamDefaultScheduler);
  }

  /// Insert a message or messages into the queue. Optionally execute the simulation
  /// through the time marked on the message.
  ///
  /// When chronic (chron-ic) is set, #dispatch is called to execute the simulation up
  /// through the indicated time. To prevent actions from executing out of order, insert
  /// should be the caller's last operation before returning to the host when invoked with
  /// chronic.
  ///
  /// @name module:vwf~queue.insert
  ///
  /// @param {Object|Object[]} fields
  /// @param {Boolean} [chronic]
  insert(fields, chronic) {
    const messages = Array.isArray(fields) ? fields : [fields];

    messages.forEach((message) => {
      // TODO: don't put ticks on the queue but just use them to fast-forward to the current
      // time (requires removing support for passing ticks to the drivers and nodes)
      message.sequence = ++this.sequence; // track the insertion order for use as a sort key
      this.queue.insert(message);

      if (chronic) {
        this.time = Math.max(this.time, message.time); // save the latest allowed time for suspend/resume
      }
    });

    // Ordering is maintained by the heap; no explicit sort is needed here.
    if (chronic) {
      this.dispatch();
    }
  }

  /// Pull the next message from the queue.
  ///
  /// @name module:vwf~queue.pull
  ///
  /// @returns {Object|undefined} The next message if the queue is not suspended and the head
  ///   message is due at or before `time`, otherwise undefined.
  pull() {
    if (this.suspension === 0 && this.queue.length > 0 && this.queue.peek().time <= this.time) {
      return this.queue.shift();
    }
    return undefined;
  }

  /// Update the queue to include only the messages selected by a filtering function.
  ///
  /// The queue is rebuilt as a fresh heap containing the retained messages; their `sequence`
  /// values are left untouched.
  ///
  /// @name module:vwf~queue.filter
  ///
  /// @param {Function} callback
  ///   `filter` calls `callback( fields )` once for each message in the queue. If
  ///   `callback` returns a truthy value, the message will be retained. Otherwise it will
  ///   be removed from the queue.
  filter(callback /* fields */ ) {
    // The heap's backing list can hold a `0` entry that is not a message (the serializer in
    // #queueTransitTransformation strips it too); never hand it to the callback or re-insert it.
    const retained = this.queue._list.filter((entry) => entry !== 0 && callback(entry));

    this.queue = new Heap({
      compar: this.queueSort,
    });

    retained.forEach((entry) => {
      this.queue.insert(entry);
    });
  }

  /// Drop every queued message except reflector messages that arrived after the currently
  /// executing action (sequence greater than `vwf.virtualTime.sequence_`).
  filterQueue() {
    this.filter((fields) => {
      if (fields.origin === "reflector" && fields.sequence > vwf.virtualTime.sequence_) {
        return true;
      }

      vwf.logger.debugx("setState", () => {
        // `loggableFields` is not defined in this file; use it only when it exists in scope
        // so that evaluating the log message cannot throw a ReferenceError.
        const loggable = typeof loggableFields === "function" ? loggableFields(fields) : fields;
        return ["removing", JSON.stringify(loggable), "from queue"];
      });

      return false;
    });
  }

  /// Suspend message execution.
  ///
  /// @name module:vwf~_app.virtualTime.suspend
  ///
  /// @param {*} [why] Optional reason, included in the log line.
  ///
  /// @returns {Boolean} true if the queue was suspended by this call.
  suspend(why) {
    if (this.suspension++ === 0) {
      vwf.logger.infox("-queue#suspend", "suspending queue at time", this.now, why ? why : "");
      return true;
    }

    vwf.logger.debugx("-queue#suspend", "further suspending queue at time", this.now, why ? why : "");
    return false;
  }

  /// Resume message execution.
  ///
  /// #dispatch is called to continue the simulation when the suspension count returns to 0.
  /// To prevent actions from executing out of order, resume should be the caller's last
  /// operation before returning to the host.
  ///
  /// @name module:vwf~_app.virtualTime.resume
  ///
  /// @param {*} [why] Optional reason, included in the log line.
  ///
  /// @returns {Boolean} true if the queue was resumed by this call.
  resume(why) {
    if (--this.suspension === 0) {
      vwf.logger.infox("-queue#resume", "resuming queue at time", this.now, why ? why : "");
      this.dispatch();
      return true;
    }

    vwf.logger.debugx("-queue#resume", "partially resuming queue at time", this.now, why ? why : "");
    return false;
  }

  /// Return the ready state of the queue.
  ///
  /// @name module:vwf~queue.ready
  ///
  /// @returns {Boolean} true when the queue is not suspended.
  ready() {
    return this.suspension === 0;
  }

  /// Heap comparator: sort by time, then future messages ahead of reflector messages, then by
  /// sequence.
  ///
  /// The sort by origin ensures that the queue is processed in a well-defined order when
  /// future messages and reflector messages share the same time, even if the reflector
  /// message has not arrived at the client yet.
  ///
  /// The sort by sequence number ensures that the messages remain in their arrival order
  /// when the earlier sort keys don't provide the order.
  ///
  /// Does not use `this`, so it can be handed to the heap unbound.
  ///
  /// @returns {Number} negative if `a` goes first, positive if `b` goes first.
  queueSort(a, b) {
    // Loose comparison kept deliberately so the time check behaves exactly as before.
    if (a.time != b.time) {
      return a.time - b.time;
    }

    const aFromReflector = a.origin == "reflector";
    const bFromReflector = b.origin == "reflector";

    if (!aFromReflector && bFromReflector) {
      return -1;
    }
    if (aFromReflector && !bFromReflector) {
      return 1;
    }

    return a.sequence - b.sequence;
  }

  // -- queueTransitTransformation -----------------------------------------------------------

  /// vwf/utility/transform() transformation function to convert the message queue for proper
  /// JSON serialization.
  ///
  /// queue: [ { ..., parameters: [ [ arguments ] ], ... }, { ... }, ... ]
  ///
  /// Does not use `this` (it reads `vwf.virtualTime.sequence_`), so it can be passed unbound.
  ///
  /// @name module:vwf~queueTransitTransformation
  queueTransitTransformation(object, names, depth) {
    if (depth == 0) {
      // Omit any private direct messages for this client, then sort by arrival order
      // (rather than by time) so that messages will retain the same arrival order when
      // reinserted. The `0` entry of the heap's backing list is not a message and is dropped.
      return object
        .filter((el) => el !== 0)
        .filter((fields) => {
          // TODO: fields.action is here to filter out tick messages
          // TODO: don't put ticks on the queue but just use them to fast-forward to the
          // current time (requires removing support for passing ticks to the drivers and nodes)
          const isLaterReflectorMessage =
            fields.origin === "reflector" && fields.sequence > vwf.virtualTime.sequence_;
          return !isLaterReflectorMessage && fields.action;
        })
        .sort((fieldsA, fieldsB) => fieldsA.sequence - fieldsB.sequence);
    }

    if (depth == 1) {
      // Remove the sequence fields since they're just local annotations used to keep
      // messages ordered by insertion order and aren't directly meaningful outside of this
      // client.
      const filtered = {};
      for (const key of Object.keys(object)) {
        if (key != "sequence") {
          filtered[key] = object[key];
        }
      }
      return filtered;
    }

    return object;
  }

  /// Serializable snapshot of the queue: the reflector time plus the transformed message list.
  get stateQueue() {
    return {
      time: this.time,
      queue: this.utility.transform(this.queue._list, this.queueTransitTransformation),
    };
  }

  // -- dispatch -----------------------------------------------------------------------------

  /// Dispatch incoming messages waiting in the queue, advancing `now` to each message's time
  /// and finally to `time`, the time stamp of the last message received from the reflector.
  ///
  /// @name module:vwf.dispatch
  dispatch() {
    let fields;

    // Actions may use receive's ready function to suspend the queue for asynchronous
    // operations, and to resume it when the operation is complete.
    while ((fields = this.pull())) {
      // Advance time to the message time.
      if (this.now != fields.time) {
        this.sequence_ = undefined; // clear after the previous action
        this.client_ = undefined; // clear after the previous action
        this.now = fields.time;
        this.tock();
      }

      // Perform the action.
      // TODO: don't put ticks on the queue but just use them to fast-forward to the current
      // time (requires removing support for passing ticks to the drivers and nodes)
      if (fields.action) {
        this.sequence_ = fields.sequence; // note the message's queue sequence number for the duration of the action
        this.client_ = fields.client; // ... and note the originating client
        this.receive(fields.node, fields.action, fields.member, fields.parameters, fields.respond, fields.origin);
      } else {
        this.tick();
      }
    }

    // Advance time to the most recent time received from the server. Tick if the time
    // changed.
    if (this.ready() && this.now != this.time) {
      this.sequence_ = undefined; // clear after the previous action
      this.client_ = undefined; // clear after the previous action
      this.now = this.time;
      this.tock();
    }
  }

  /// Resolve a `when` argument to an absolute simulation time.
  ///
  /// Positive values are absolute times (never earlier than `now`); zero and negative values
  /// are offsets from `now`. A missing or non-numeric `when` means "now" -- without this
  /// guard `-undefined` would produce a NaN time.
  ///
  /// @param {Number} [when]
  ///
  /// @returns {Number}
  resolveTime(when) {
    if (when > 0) {
      return Math.max(this.now, when);
    }

    const offset = -when;
    return this.now + (Number.isNaN(offset) ? 0 : offset);
  }

  /// Hand an outgoing message to the luminary when `vwf.isLuminary` is set, otherwise send it
  /// as JSON over the reflector socket if one exists. Does nothing when neither applies.
  ///
  /// @param {Object} fields
  transmit(fields) {
    if (vwf.isLuminary) {
      vwf.luminary.stampExternalMessage(fields);
    } else if (vwf.reflectorClient.socket) {
      vwf.reflectorClient.socket.send(JSON.stringify(fields));
    }
  }

  // -- plan ---------------------------------------------------------------------------------

  /// Queue an action locally (origin "future") for execution at a later simulation time.
  ///
  /// @name module:vwf.plan
  ///
  /// @param {Number} [when] absolute (+) or relative (-) time; see #resolveTime.
  /// @param {Function} [callback_async] Currently unused (TODO in the original code).
  plan(nodeID, actionName, memberName, parameters, when, callback_async /* ( result ) */ ) {
    vwf.logger.debuggx("plan", nodeID, actionName, memberName,
      parameters && parameters.length, when, callback_async && "callback");

    const fields = {
      time: this.resolveTime(when),
      node: nodeID,
      action: actionName,
      member: memberName,
      parameters: parameters,
      client: this.client_, // propagate originating client
      origin: "future",
      // callback: callback_async, // TODO
    };

    this.insert(fields);

    vwf.logger.debugu();
  }

  // -- send ---------------------------------------------------------------------------------

  /// Send a message to the reflector. The message will be reflected back to all participants
  /// in the instance.
  ///
  /// @name module:vwf.send
  ///
  /// @param {Number} [when] absolute (+) or relative (-) time; see #resolveTime.
  /// @param {Function} [callback_async] Currently unused (TODO in the original code).
  send(nodeID, actionName, memberName, parameters, when, callback_async /* ( result ) */ ) {
    vwf.logger.debuggx("send", nodeID, actionName, memberName,
      parameters && parameters.length, when, callback_async && "callback"); // TODO: loggableParameters()

    // Attach the resolved simulation time and pack the message fields.
    const fields = {
      time: this.resolveTime(when),
      node: nodeID,
      action: actionName,
      member: memberName,
      parameters: this.utility.transform(parameters, this.utility.transforms.transit),
      // callback: callback_async, // TODO: provisionally add fields to queue (or a holding queue) then execute callback when received back from reflector
    };

    // With neither a luminary nor a socket the message is dropped; there is no single-user
    // loopback.
    this.transmit(fields);

    vwf.logger.debugu();
  }

  // -- respond ------------------------------------------------------------------------------

  /// Return a result for a function invoked by the server.
  ///
  /// @name module:vwf.respond
  respond(nodeID, actionName, memberName, parameters, result) {
    vwf.logger.debuggx("respond", nodeID, actionName, memberName,
      parameters && parameters.length, "..."); // TODO: loggableParameters(), loggableResult()

    // Attach the current simulation time and pack the message fields.
    const fields = {
      // sequence: undefined, // TODO: use to identify on return from reflector?
      time: this.now,
      node: nodeID,
      action: actionName,
      member: memberName,
      parameters: this.utility.transform(parameters, this.utility.transforms.transit),
      result: this.utility.transform(result, this.utility.transforms.transit),
    };

    // Nothing is sent in single-user mode (no luminary and no socket).
    this.transmit(fields);

    vwf.logger.debugu();
  }

  // -- receive ------------------------------------------------------------------------------

  /// Handle receipt of a message. Unpack the arguments and call the handler named
  /// `actionName` on `vwf`, then send back the result when `respond` is truthy.
  ///
  /// Reflector actions that target a node id missing from `vwf.private.nodes.existing` are
  /// ignored.
  ///
  /// @name module:vwf.receive
  receive(nodeID, actionName, memberName, parameters, respond, origin) {
    // TODO: delegate parsing and validation to each action.

    // Look up the action handler and invoke it with the remaining parameters.
    // Note that the message should be validated before looking up and invoking an arbitrary
    // handler.
    let args = [];

    if (nodeID || nodeID === 0) args.push(nodeID);
    if (memberName) args.push(memberName);
    if (parameters) args = args.concat(parameters); // flatten

    if (actionName === 'createChild') {
      console.log("create child!");
    }

    // Invoke the action.
    let result;

    if (origin !== "reflector" || !nodeID || vwf.private.nodes.existing[nodeID]) {
      result = vwf[actionName] && vwf[actionName].apply(vwf, args);
    } else {
      vwf.logger.debugx("receive", "ignoring reflector action on non-existent node", nodeID);
      result = undefined;
    }

    // Return the result.
    respond && this.respond(nodeID, actionName, memberName, parameters, result);
  }

  // -- tick ---------------------------------------------------------------------------------

  /// Tick each tickable model, view, and node with the current simulation time. Called by
  /// #dispatch for queued messages that carry no action.
  ///
  /// @name module:vwf.tick
  // TODO: remove, in favor of drivers and nodes exclusively using future scheduling;
  // TODO: otherwise, all clients must receive exactly the same ticks at the same times.
  tick() {
    // Arrow callbacks keep `this` bound to this VirtualTime instance, so `this.now` is the
    // clock maintained by #dispatch rather than a `now` property looked up on `vwf`.

    // Call ticking() on each model.
    vwf.models.forEach((model) => {
      model.ticking && model.ticking(this.now); // TODO: maintain a list of tickable models and only call those
    });

    // Call ticked() on each view.
    vwf.views.forEach((view) => {
      view.ticked && view.ticked(this.now); // TODO: maintain a list of tickable views and only call those
    });

    // Call tick() on each tickable node.
    vwf.tickable.nodeIDs.forEach((nodeID) => {
      vwf.callMethod(nodeID, "tick", [this.now]);
    });
  }

  // -- tock ---------------------------------------------------------------------------------

  /// Notify views of a kernel time change. Unlike `tick`, `tock` messages are sent each time
  /// that time moves forward. Only view drivers are notified since the model state should be
  /// independent of any particular sequence of idle messages.
  ///
  /// @name module:vwf.tock
  tock() {
    // Call tocked() on each view with this instance's clock (see the note in #tick).
    vwf.views.forEach((view) => {
      view.tocked && view.tocked(this.now);
    });
  }

  /// The current simulation time (same value as `now`).
  get getNow() {
    return this.now;
  }
}
- export {
- VirtualTime
- }
|