|
- import { Helpers } from '/core/helpers.js';
- import { Utility } from '/core/vwf/utility/utility.js';
- class VirtualTime {
- constructor() {
- console.log("Virtual Time constructor");
- this.helpers = new Helpers;
- this.utility = new Utility;
-
-
-
-
-
-
- this.now = 0;
-
-
-
-
-
-
-
-
-
-
-
- this.sequence_ = undefined
-
-
-
-
-
-
- this.client_ = undefined
-
-
-
-
-
- this.time = 0
-
-
-
- this.suspension = 0
-
-
-
-
- this.sequence = 0
-
-
-
- this.queue = new Heap({
- compar: this.queueSort
- })
-
- this.initReflectorStream();
- }
- initReflectorStream() {
- const self = this;
- this.streamDelay = 0;
- this.streamDefaultScheduler = M.scheduler.newDefaultScheduler();
-
-
- const [induce, events] = M.createAdapter();
- this.streamAdapter = {
- induce: induce,
- events: events,
- };
- const tapFunction = function (res) {
- let mostTime =
- M.scheduler.currentTime(self.streamDefaultScheduler) / 1000;
- if (_app.config.streamMsg) {
- console.log('STREAM: ', res, ' TIME: ', mostTime);
- self.insert(res, !res.action);
- }
- };
- this.reflectorStream = M.multicast(events);
- const resultStream = M.concatMap(el => {
- return M.delay(this.streamDelay, M.now(el))
- }, this.reflectorStream);
-
- this.eventsStream = M.tap((res) => {
- tapFunction(res);
- }, resultStream);
- M.runEffects(this.eventsStream, this.streamDefaultScheduler);
- }
-
-
-
-
-
-
-
-
-
-
-
-
- insert(fields, chronic) {
- var messages = fields instanceof Array ? fields : [fields];
- messages.forEach(function (fields) {
-
- fields.sequence = ++this.sequence;
- this.queue.insert(fields);
-
- if (chronic) {
- this.time = Math.max(this.time, fields.time);
- }
- }, this);
-
- if (chronic) {
- this.dispatch();
- }
- }
-
-
-
-
-
- pull() {
- if (this.suspension == 0 && this.queue.length > 0 && this.queue.peek().time <= this.time) {
- return this.queue.shift();
- }
- }
-
-
-
-
-
-
-
-
- filter(callback ) {
-
- let filtered = this.queue._list.slice().filter(callback);
-
- this.queue = new Heap({
- compar: this.queueSort
- });
- filtered.map(el => {
- this.queue.insert(el);
- });
- }
- filterQueue() {
- this.filter(function (fields) {
- if ((fields.origin === "reflector") && fields.sequence > vwf.virtualTime.sequence_) {
- return true;
- } else {
- vwf.logger.debugx("setState", function () {
- return ["removing", JSON.stringify(loggableFields(fields)), "from queue"];
- });
- }
- })
- }
-
-
-
-
-
- suspend(why) {
- if (this.suspension++ == 0) {
- vwf.logger.infox("-queue#suspend", "suspending queue at time", this.now, why ? why : "");
- return true;
- } else {
- vwf.logger.debugx("-queue#suspend", "further suspending queue at time", this.now, why ? why : "");
- return false;
- }
- }
-
-
-
-
-
-
-
-
-
- resume(why) {
- if (--this.suspension == 0) {
- vwf.logger.infox("-queue#resume", "resuming queue at time", this.now, why ? why : "");
- this.dispatch();
- return true;
- } else {
- vwf.logger.debugx("-queue#resume", "partially resuming queue at time", this.now, why ? why : "");
- return false;
- }
- }
-
-
-
-
-
- ready() {
- return this.suspension == 0;
- }
- queueSort(a, b) {
-
-
-
-
-
-
-
-
-
-
-
- if (a.time != b.time) {
- return a.time - b.time;
- } else if (a.origin != "reflector" && b.origin == "reflector") {
- return -1;
- } else if (a.origin == "reflector" && b.origin != "reflector") {
- return 1;
- } else {
- return a.sequence - b.sequence;
- }
- }
-
-
-
-
-
-
-
- queueTransitTransformation(object, names, depth) {
- let self = this
- if (depth == 0) {
-
-
-
- return object.filter(el => el !== 0).filter(function (fields) {
- return !(fields.origin === "reflector" && fields.sequence > vwf.virtualTime.sequence_) && fields.action;
- }).sort(function (fieldsA, fieldsB) {
- return fieldsA.sequence - fieldsB.sequence;
- });
- } else if (depth == 1) {
-
-
-
- var filtered = {};
- Object.keys(object).filter(function (key) {
- return key != "sequence";
- }).forEach(function (key) {
- filtered[key] = object[key];
- });
- return filtered;
- }
- return object;
- }
- get stateQueue() {
- return {
- time: this.time,
- queue: this.utility.transform(this.queue._list, this.queueTransitTransformation),
- }
- }
-
-
-
-
-
-
- dispatch() {
- var fields;
-
-
- while (fields = this.pull()) {
-
- if (this.now != fields.time) {
- this.sequence_ = undefined;
- this.client_ = undefined;
- this.now = fields.time;
- this.tock();
- }
-
- if (fields.action) {
- this.sequence_ = fields.sequence;
- this.client_ = fields.client;
- this.receive(fields.node, fields.action, fields.member, fields.parameters, fields.respond, fields.origin);
- } else {
- this.tick();
- }
- }
-
-
- if (this.ready() && this.now != this.time) {
- this.sequence_ = undefined;
- this.client_ = undefined;
- this.now = this.time;
- this.tock();
- }
- }
-
-
- plan(nodeID, actionName, memberName, parameters, when, callback_async ) {
- vwf.logger.debuggx("plan", nodeID, actionName, memberName,
- parameters && parameters.length, when, callback_async && "callback");
- var time = when > 0 ?
- Math.max(this.now, when) :
- this.now + (-when);
- var fields = {
- time: time,
- node: nodeID,
- action: actionName,
- member: memberName,
- parameters: parameters,
- client: this.client_,
- origin: "future",
-
- };
- this.insert(fields);
- vwf.logger.debugu();
- }
-
-
-
-
-
- send(nodeID, actionName, memberName, parameters, when, callback_async ) {
- vwf.logger.debuggx("send", nodeID, actionName, memberName,
- parameters && parameters.length, when, callback_async && "callback");
- var time = when > 0 ?
- Math.max(this.now, when) :
- this.now + (-when);
-
- var fields = {
- time: time,
- node: nodeID,
- action: actionName,
- member: memberName,
- parameters: this.utility.transform(parameters, this.utility.transforms.transit),
-
- };
- if (vwf.isLuminary) {
- vwf.luminary.stampExternalMessage(fields);
- } else if (vwf.reflectorClient.socket) {
-
- var message = JSON.stringify(fields);
- vwf.reflectorClient.socket.send(message);
- }
-
-
-
-
-
-
- vwf.logger.debugu();
- }
-
-
-
-
-
-
- respond(nodeID, actionName, memberName, parameters, result) {
- vwf.logger.debuggx("respond", nodeID, actionName, memberName,
- parameters && parameters.length, "...");
-
- var fields = {
-
- time: this.now,
- node: nodeID,
- action: actionName,
- member: memberName,
- parameters: this.utility.transform(parameters, this.utility.transforms.transit),
- result: this.utility.transform(result, this.utility.transforms.transit)
- };
- if (vwf.isLuminary) {
- vwf.luminary.stampExternalMessage(fields);
- } else if (vwf.reflectorClient.socket) {
-
- var message = JSON.stringify(fields);
- vwf.reflectorClient.socket.send(message);
- } else {
-
- }
- vwf.logger.debugu();
- }
-
-
-
-
- receive(nodeID, actionName, memberName, parameters, respond, origin) {
-
-
-
-
-
-
-
-
-
- var args = [],
- result;
- if (nodeID || nodeID === 0) args.push(nodeID);
- if (memberName) args.push(memberName);
- if (parameters) args = args.concat(parameters);
- if (actionName == 'createChild') {
- console.log("create child!");
-
-
-
-
-
-
-
-
-
-
- }
-
-
-
-
- if (origin !== "reflector" || !nodeID || vwf.private.nodes.existing[nodeID]) {
- result = vwf[actionName] && vwf[actionName].apply(vwf, args);
- } else {
- vwf.logger.debugx("receive", "ignoring reflector action on non-existent node", nodeID);
- result = undefined;
- }
-
- respond && this.respond(nodeID, actionName, memberName, parameters, result);
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- }
-
-
-
-
-
-
- tick() {
-
- vwf.models.forEach(function (model) {
- model.ticking && model.ticking(this.now);
- }, vwf);
-
- vwf.views.forEach(function (view) {
- view.ticked && view.ticked(this.now);
- }, vwf);
-
- vwf.tickable.nodeIDs.forEach(function (nodeID) {
- vwf.callMethod(nodeID, "tick", [this.now]);
- }, vwf);
- };
-
-
-
-
-
-
- tock() {
-
- vwf.views.forEach(function (view) {
- view.tocked && view.tocked(this.now);
- }, vwf);
- }
- get getNow() {
- return this.now
- }
- }
- export {
- VirtualTime
- }
|