|
@@ -197,17 +197,39 @@
|
|
|
return isAsync(asyncFn) ? asyncify(asyncFn) : asyncFn;
|
|
|
}
|
|
|
|
|
|
- function applyEach(eachfn) {
|
|
|
- return function(fns, ...callArgs) {
|
|
|
- var go = initialParams(function(args, callback) {
|
|
|
+ // conditionally promisify a function.
|
|
|
+ // only return a promise if a callback is omitted
|
|
|
+ function awaitify (asyncFn, arity = asyncFn.length) {
|
|
|
+ if (!arity) throw new Error('arity is undefined')
|
|
|
+ function awaitable (...args) {
|
|
|
+ if (typeof args[arity - 1] === 'function') {
|
|
|
+ return asyncFn.apply(this, args)
|
|
|
+ }
|
|
|
+
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ args[arity - 1] = (err, ...cbArgs) => {
|
|
|
+ if (err) return reject(err)
|
|
|
+ resolve(cbArgs.length > 1 ? cbArgs : cbArgs[0]);
|
|
|
+ };
|
|
|
+ asyncFn.apply(this, args);
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ Object.defineProperty(awaitable, 'name', {
|
|
|
+ value: `awaitable(${asyncFn.name})`
|
|
|
+ });
|
|
|
+
|
|
|
+ return awaitable
|
|
|
+ }
|
|
|
+
|
|
|
+ function applyEach (eachfn) {
|
|
|
+ return function applyEach(fns, ...callArgs) {
|
|
|
+ const go = awaitify(function (callback) {
|
|
|
var that = this;
|
|
|
return eachfn(fns, (fn, cb) => {
|
|
|
- wrapAsync(fn).apply(that, args.concat(cb));
|
|
|
+ wrapAsync(fn).apply(that, callArgs.concat(cb));
|
|
|
}, callback);
|
|
|
});
|
|
|
- if (callArgs.length) {
|
|
|
- return go.apply(this, callArgs);
|
|
|
- }
|
|
|
return go;
|
|
|
};
|
|
|
}
|
|
@@ -426,31 +448,6 @@
|
|
|
};
|
|
|
};
|
|
|
|
|
|
- // conditionally promisify a function.
|
|
|
- // only return a promise if a callback is omitted
|
|
|
- function awaitify (asyncFn, arity = asyncFn.length) {
|
|
|
- if (!arity) throw new Error('arity is undefined')
|
|
|
- function awaitable (...args) {
|
|
|
- if (typeof args[arity - 1] === 'function') {
|
|
|
- return asyncFn.apply(this, args)
|
|
|
- }
|
|
|
-
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- args[arity - 1] = (err, ...cbArgs) => {
|
|
|
- if (err) return reject(err)
|
|
|
- resolve(cbArgs.length > 1 ? cbArgs : cbArgs[0]);
|
|
|
- };
|
|
|
- asyncFn.apply(this, args);
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- Object.defineProperty(awaitable, 'name', {
|
|
|
- value: `awaitable(${asyncFn.name})`
|
|
|
- });
|
|
|
-
|
|
|
- return awaitable
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* The same as [`eachOf`]{@link module:Collections.eachOf} but runs a maximum of `limit` async operations at a
|
|
|
* time.
|
|
@@ -977,7 +974,7 @@
|
|
|
}
|
|
|
|
|
|
var FN_ARGS = /^(?:async\s+)?(?:function)?\s*[^(]*\(\s*([^)]+)\s*\)(?:\s*{)/m;
|
|
|
- var ARROW_FN_ARGS = /^(?:async\s+)?\(?\s*([^)^=]+)\s*\)?(?:\s*=>)/m;
|
|
|
+ var ARROW_FN_ARGS = /^(?:async\s+)?(?:function\s+)?\(?\s*([^)^=]+)\s*\)?(?:\s*=>)/m;
|
|
|
var FN_ARG_SPLIT = /,/;
|
|
|
var FN_ARG = /(=.+)?(\s*)$/;
|
|
|
var STRIP_COMMENTS = /((\/\/.*$)|(\/\*[\s\S]*?\*\/))/mg;
|
|
@@ -1212,8 +1209,6 @@
|
|
|
dll.head = dll.tail = node;
|
|
|
}
|
|
|
|
|
|
- const noop = () => {};
|
|
|
-
|
|
|
function queue(worker, concurrency, payload) {
|
|
|
if (concurrency == null) {
|
|
|
concurrency = 1;
|
|
@@ -1225,6 +1220,35 @@
|
|
|
var _worker = wrapAsync(worker);
|
|
|
var numRunning = 0;
|
|
|
var workersList = [];
|
|
|
+ const events = {
|
|
|
+ error: [],
|
|
|
+ drain: [],
|
|
|
+ saturated: [],
|
|
|
+ unsaturated: [],
|
|
|
+ empty: []
|
|
|
+ };
|
|
|
+
|
|
|
+ function on (event, handler) {
|
|
|
+ events[event].push(handler);
|
|
|
+ }
|
|
|
+
|
|
|
+ function once (event, handler) {
|
|
|
+ const handleAndRemove = (...args) => {
|
|
|
+ off(event, handleAndRemove);
|
|
|
+ handler(...args);
|
|
|
+ };
|
|
|
+ events[event].push(handleAndRemove);
|
|
|
+ }
|
|
|
+
|
|
|
+ function off (event, handler) {
|
|
|
+ if (!event) return Object.keys(events).forEach(ev => events[ev] = [])
|
|
|
+ if (!handler) return events[event] = []
|
|
|
+ events[event] = events[event].filter(ev => ev !== handler);
|
|
|
+ }
|
|
|
+
|
|
|
+ function trigger (event, ...args) {
|
|
|
+ events[event].forEach(handler => handler(...args));
|
|
|
+ }
|
|
|
|
|
|
var processingScheduled = false;
|
|
|
function _insert(data, insertAtFront, callback) {
|
|
@@ -1232,25 +1256,28 @@
|
|
|
throw new Error('task callback must be a function');
|
|
|
}
|
|
|
q.started = true;
|
|
|
- if (!Array.isArray(data)) {
|
|
|
- data = [data];
|
|
|
- }
|
|
|
- if (data.length === 0 && q.idle()) {
|
|
|
- // call drain immediately if there are no tasks
|
|
|
- return setImmediate$1(() => q.drain());
|
|
|
- }
|
|
|
+ /*if (Array.isArray(data)) {
|
|
|
|
|
|
- for (var i = 0, l = data.length; i < l; i++) {
|
|
|
- var item = {
|
|
|
- data: data[i],
|
|
|
- callback: callback || noop
|
|
|
- };
|
|
|
+ return data.map(datum => _insert(datum, insertAtFront, callback));
|
|
|
+ }*/
|
|
|
|
|
|
- if (insertAtFront) {
|
|
|
- q._tasks.unshift(item);
|
|
|
- } else {
|
|
|
- q._tasks.push(item);
|
|
|
+ var res;
|
|
|
+
|
|
|
+ var item = {
|
|
|
+ data,
|
|
|
+ callback: callback || function (err, ...args) {
|
|
|
+ // we don't care about the error, let the global error handler
|
|
|
+ // deal with it
|
|
|
+ if (err) return
|
|
|
+ if (args.length <= 1) return res(args[0])
|
|
|
+ res(args);
|
|
|
}
|
|
|
+ };
|
|
|
+
|
|
|
+ if (insertAtFront) {
|
|
|
+ q._tasks.unshift(item);
|
|
|
+ } else {
|
|
|
+ q._tasks.push(item);
|
|
|
}
|
|
|
|
|
|
if (!processingScheduled) {
|
|
@@ -1260,9 +1287,15 @@
|
|
|
q.process();
|
|
|
});
|
|
|
}
|
|
|
+
|
|
|
+ if (!callback) {
|
|
|
+ return new Promise((resolve) => {
|
|
|
+ res = resolve;
|
|
|
+ })
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- function _next(tasks) {
|
|
|
+ function _createCB(tasks) {
|
|
|
return function (err, ...args) {
|
|
|
numRunning -= 1;
|
|
|
|
|
@@ -1279,21 +1312,35 @@
|
|
|
task.callback(err, ...args);
|
|
|
|
|
|
if (err != null) {
|
|
|
- q.error(err, task.data);
|
|
|
+ trigger('error', err, task.data);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if (numRunning <= (q.concurrency - q.buffer) ) {
|
|
|
- q.unsaturated();
|
|
|
+ trigger('unsaturated');
|
|
|
}
|
|
|
|
|
|
if (q.idle()) {
|
|
|
- q.drain();
|
|
|
+ trigger('drain');
|
|
|
}
|
|
|
q.process();
|
|
|
};
|
|
|
}
|
|
|
|
|
|
+ const eventMethod = (name) => (handler) => {
|
|
|
+ if (!handler) {
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ once(name, (err, data) => {
|
|
|
+ if (err) return reject(err)
|
|
|
+ resolve(data);
|
|
|
+ });
|
|
|
+ })
|
|
|
+ }
|
|
|
+ off(name);
|
|
|
+ on(name, handler);
|
|
|
+
|
|
|
+ };
|
|
|
+
|
|
|
var isProcessing = false;
|
|
|
var q = {
|
|
|
_tasks: new DLL(),
|
|
@@ -1302,23 +1349,32 @@
|
|
|
},
|
|
|
concurrency,
|
|
|
payload,
|
|
|
- saturated: noop,
|
|
|
- unsaturated:noop,
|
|
|
buffer: concurrency / 4,
|
|
|
- empty: noop,
|
|
|
- drain: noop,
|
|
|
- error: noop,
|
|
|
started: false,
|
|
|
paused: false,
|
|
|
push (data, callback) {
|
|
|
- _insert(data, false, callback);
|
|
|
+ if (Array.isArray(data)) {
|
|
|
+ if (data.length === 0 && q.idle()) {
|
|
|
+ // call drain immediately if there are no tasks
|
|
|
+ return setImmediate$1(() => trigger('drain'));
|
|
|
+ }
|
|
|
+ return data.map(datum => _insert(datum, false, callback))
|
|
|
+ }
|
|
|
+ return _insert(data, false, callback);
|
|
|
},
|
|
|
kill () {
|
|
|
- q.drain = noop;
|
|
|
+ off();
|
|
|
q._tasks.empty();
|
|
|
},
|
|
|
unshift (data, callback) {
|
|
|
- _insert(data, true, callback);
|
|
|
+ if (Array.isArray(data)) {
|
|
|
+ if (data.length === 0 && q.idle()) {
|
|
|
+ // call drain immediately if there are no tasks
|
|
|
+ return setImmediate$1(() => trigger('drain'));
|
|
|
+ }
|
|
|
+ return data.map(datum => _insert(datum, true, callback))
|
|
|
+ }
|
|
|
+ return _insert(data, true, callback);
|
|
|
},
|
|
|
remove (testFn) {
|
|
|
q._tasks.remove(testFn);
|
|
@@ -1344,14 +1400,14 @@
|
|
|
numRunning += 1;
|
|
|
|
|
|
if (q._tasks.length === 0) {
|
|
|
- q.empty();
|
|
|
+ trigger('empty');
|
|
|
}
|
|
|
|
|
|
if (numRunning === q.concurrency) {
|
|
|
- q.saturated();
|
|
|
+ trigger('saturated');
|
|
|
}
|
|
|
|
|
|
- var cb = onlyOnce(_next(tasks));
|
|
|
+ var cb = onlyOnce(_createCB(tasks));
|
|
|
_worker(data, cb);
|
|
|
}
|
|
|
isProcessing = false;
|
|
@@ -1377,39 +1433,32 @@
|
|
|
setImmediate$1(q.process);
|
|
|
}
|
|
|
};
|
|
|
+ // define these as fixed properties, so people get useful errors when updating
|
|
|
+ Object.defineProperties(q, {
|
|
|
+ saturated: {
|
|
|
+ writable: false,
|
|
|
+ value: eventMethod('saturated')
|
|
|
+ },
|
|
|
+ unsaturated: {
|
|
|
+ writable: false,
|
|
|
+ value: eventMethod('unsaturated')
|
|
|
+ },
|
|
|
+ empty: {
|
|
|
+ writable: false,
|
|
|
+ value: eventMethod('empty')
|
|
|
+ },
|
|
|
+ drain: {
|
|
|
+ writable: false,
|
|
|
+ value: eventMethod('drain')
|
|
|
+ },
|
|
|
+ error: {
|
|
|
+ writable: false,
|
|
|
+ value: eventMethod('error')
|
|
|
+ },
|
|
|
+ });
|
|
|
return q;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * A cargo of tasks for the worker function to complete. Cargo inherits all of
|
|
|
- * the same methods and event callbacks as [`queue`]{@link module:ControlFlow.queue}.
|
|
|
- * @typedef {Object} CargoObject
|
|
|
- * @memberOf module:ControlFlow
|
|
|
- * @property {Function} length - A function returning the number of items
|
|
|
- * waiting to be processed. Invoke like `cargo.length()`.
|
|
|
- * @property {number} payload - An `integer` for determining how many tasks
|
|
|
- * should be process per round. This property can be changed after a `cargo` is
|
|
|
- * created to alter the payload on-the-fly.
|
|
|
- * @property {Function} push - Adds `task` to the `queue`. The callback is
|
|
|
- * called once the `worker` has finished processing the task. Instead of a
|
|
|
- * single task, an array of `tasks` can be submitted. The respective callback is
|
|
|
- * used for every task in the list. Invoke like `cargo.push(task, [callback])`.
|
|
|
- * @property {Function} saturated - A callback that is called when the
|
|
|
- * `queue.length()` hits the concurrency and further tasks will be queued.
|
|
|
- * @property {Function} empty - A callback that is called when the last item
|
|
|
- * from the `queue` is given to a `worker`.
|
|
|
- * @property {Function} drain - A callback that is called when the last item
|
|
|
- * from the `queue` has returned from the `worker`.
|
|
|
- * @property {Function} idle - a function returning false if there are items
|
|
|
- * waiting or being processed, or true if not. Invoke like `cargo.idle()`.
|
|
|
- * @property {Function} pause - a function that pauses the processing of tasks
|
|
|
- * until `resume()` is called. Invoke like `cargo.pause()`.
|
|
|
- * @property {Function} resume - a function that resumes the processing of
|
|
|
- * queued tasks when the queue is paused. Invoke like `cargo.resume()`.
|
|
|
- * @property {Function} kill - a function that removes the `drain` callback and
|
|
|
- * empties remaining tasks from the queue forcing it to go idle. Invoke like `cargo.kill()`.
|
|
|
- */
|
|
|
-
|
|
|
/**
|
|
|
* Creates a `cargo` object with the specified payload. Tasks added to the
|
|
|
* cargo will be processed altogether (up to the `payload` limit). If the
|
|
@@ -1433,7 +1482,7 @@
|
|
|
* @param {number} [payload=Infinity] - An optional `integer` for determining
|
|
|
* how many tasks should be processed per round; if omitted, the default is
|
|
|
* unlimited.
|
|
|
- * @returns {module:ControlFlow.CargoObject} A cargo object to manage the tasks. Callbacks can
|
|
|
+ * @returns {module:ControlFlow.QueueObject} A cargo object to manage the tasks. Callbacks can
|
|
|
* attached as certain properties to listen for specific events during the
|
|
|
* lifecycle of the cargo and inner queue.
|
|
|
* @example
|
|
@@ -1453,9 +1502,8 @@
|
|
|
* cargo.push({name: 'bar'}, function(err) {
|
|
|
* console.log('finished processing bar');
|
|
|
* });
|
|
|
- * cargo.push({name: 'baz'}, function(err) {
|
|
|
- * console.log('finished processing baz');
|
|
|
- * });
|
|
|
+ * await cargo.push({name: 'baz'});
|
|
|
+ * console.log('finished processing baz');
|
|
|
*/
|
|
|
function cargo(worker, payload) {
|
|
|
return queue(worker, 1, payload);
|
|
@@ -1709,6 +1757,7 @@
|
|
|
* @method
|
|
|
* @see [async.concat]{@link module:Collections.concat}
|
|
|
* @category Collection
|
|
|
+ * @alias flatMapLimit
|
|
|
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
|
|
|
* @param {number} limit - The maximum number of async operations at a time.
|
|
|
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`,
|
|
@@ -1724,7 +1773,7 @@
|
|
|
return mapLimit$1(coll, limit, (val, iterCb) => {
|
|
|
_iteratee(val, (err, ...args) => {
|
|
|
if (err) return iterCb(err);
|
|
|
- return iterCb(null, args);
|
|
|
+ return iterCb(err, args);
|
|
|
});
|
|
|
}, (err, mapResults) => {
|
|
|
var result = [];
|
|
@@ -1750,6 +1799,7 @@
|
|
|
* @memberOf module:Collections
|
|
|
* @method
|
|
|
* @category Collection
|
|
|
+ * @alias flatMap
|
|
|
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
|
|
|
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`,
|
|
|
* which should use an array as its result. Invoked with (item, callback).
|
|
@@ -1778,6 +1828,7 @@
|
|
|
* @method
|
|
|
* @see [async.concat]{@link module:Collections.concat}
|
|
|
* @category Collection
|
|
|
+ * @alias flatMapSeries
|
|
|
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
|
|
|
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`.
|
|
|
* The iteratee should complete with an array an array of results.
|
|
@@ -1849,7 +1900,7 @@
|
|
|
const iteratee = wrapAsync(_iteratee);
|
|
|
eachfn(arr, (value, _, callback) => {
|
|
|
iteratee(value, (err, result) => {
|
|
|
- if (err) return callback(err)
|
|
|
+ if (err || err === false) return callback(err);
|
|
|
|
|
|
if (check(result) && !testResult) {
|
|
|
testPassed = true;
|
|
@@ -2180,6 +2231,9 @@
|
|
|
/**
|
|
|
* The same as [`each`]{@link module:Collections.each} but runs only a single async operation at a time.
|
|
|
*
|
|
|
+ * Note, that unlike [`each`]{@link module:Collections.each}, this function applies iteratee to each item
|
|
|
+ * in series and therefore the iteratee functions will complete in order.
|
|
|
+
|
|
|
* @name eachSeries
|
|
|
* @static
|
|
|
* @memberOf module:Collections
|
|
@@ -2364,7 +2418,7 @@
|
|
|
if (v) {
|
|
|
results.push({index, value: x});
|
|
|
}
|
|
|
- iterCb();
|
|
|
+ iterCb(err);
|
|
|
});
|
|
|
}, err => {
|
|
|
if (err) return callback(err);
|
|
@@ -2529,7 +2583,7 @@
|
|
|
return mapLimit$1(coll, limit, (val, iterCb) => {
|
|
|
_iteratee(val, (err, key) => {
|
|
|
if (err) return iterCb(err);
|
|
|
- return iterCb(null, {key, val});
|
|
|
+ return iterCb(err, {key, val});
|
|
|
});
|
|
|
}, (err, mapResults) => {
|
|
|
var result = {};
|
|
@@ -2678,7 +2732,7 @@
|
|
|
_iteratee(val, key, (err, result) => {
|
|
|
if (err) return next(err);
|
|
|
newObj[key] = result;
|
|
|
- next();
|
|
|
+ next(err);
|
|
|
});
|
|
|
}, err => callback(err, newObj));
|
|
|
}
|
|
@@ -3000,6 +3054,9 @@
|
|
|
* @property {number} concurrency - an integer for determining how many `worker`
|
|
|
* functions should be run in parallel. This property can be changed after a
|
|
|
* `queue` is created to alter the concurrency on-the-fly.
|
|
|
+ * @property {number} payload - an integer that specifies how many items are
|
|
|
+ * passed to the worker function at a time. only applies if this is a
|
|
|
+ * [cargo]{@link module:ControlFlow.cargo} object
|
|
|
* @property {Function} push - add a new task to the `queue`. Calls `callback`
|
|
|
* once the `worker` has finished processing the task. Instead of a single task,
|
|
|
* a `tasks` array can be submitted. The respective callback is used for every
|
|
@@ -3012,20 +3069,26 @@
|
|
|
* [priorityQueue]{@link module:ControlFlow.priorityQueue} object.
|
|
|
* Invoked with `queue.remove(testFn)`, where `testFn` is of the form
|
|
|
* `function ({data, priority}) {}` and returns a Boolean.
|
|
|
- * @property {Function} saturated - a callback that is called when the number of
|
|
|
- * running workers hits the `concurrency` limit, and further tasks will be
|
|
|
- * queued.
|
|
|
- * @property {Function} unsaturated - a callback that is called when the number
|
|
|
- * of running workers is less than the `concurrency` & `buffer` limits, and
|
|
|
- * further tasks will not be queued.
|
|
|
+ * @property {Function} saturated - a function that sets a callback that is
|
|
|
+ * called when the number of running workers hits the `concurrency` limit, and
|
|
|
+ * further tasks will be queued. If the callback is omitted, `q.saturated()`
|
|
|
+ * returns a promise for the next occurrence.
|
|
|
+ * @property {Function} unsaturated - a function that sets a callback that is
|
|
|
+ * called when the number of running workers is less than the `concurrency` &
|
|
|
+ * `buffer` limits, and further tasks will not be queued. If the callback is
|
|
|
+ * omitted, `q.unsaturated()` returns a promise for the next occurrence.
|
|
|
* @property {number} buffer - A minimum threshold buffer in order to say that
|
|
|
* the `queue` is `unsaturated`.
|
|
|
- * @property {Function} empty - a callback that is called when the last item
|
|
|
- * from the `queue` is given to a `worker`.
|
|
|
- * @property {Function} drain - a callback that is called when the last item
|
|
|
- * from the `queue` has returned from the `worker`.
|
|
|
- * @property {Function} error - a callback that is called when a task errors.
|
|
|
- * Has the signature `function(error, task)`.
|
|
|
+ * @property {Function} empty - a function that sets a callback that is called
|
|
|
+ * when the last item from the `queue` is given to a `worker`. If the callback
|
|
|
+ * is omitted, `q.empty()` returns a promise for the next occurrence.
|
|
|
+ * @property {Function} drain - a function that sets a callback that is called
|
|
|
+ * when the last item from the `queue` has returned from the `worker`. If the
|
|
|
+ * callback is omitted, `q.drain()` returns a promise for the next occurrence.
|
|
|
+ * @property {Function} error - a function that sets a callback that is called
|
|
|
+ * when a task errors. Has the signature `function(error, task)`. If the
|
|
|
+ * callback is omitted, `error()` returns a promise that rejects on the next
|
|
|
+ * error.
|
|
|
* @property {boolean} paused - a boolean for determining whether the queue is
|
|
|
* in a paused state.
|
|
|
* @property {Function} pause - a function that pauses the processing of tasks
|
|
@@ -3047,6 +3110,12 @@
|
|
|
* for (let item of q) {
|
|
|
* console.log(item)
|
|
|
* }
|
|
|
+ *
|
|
|
+ * q.drain(() => {
|
|
|
+ * console.log('all done')
|
|
|
+ * })
|
|
|
+ * // or
|
|
|
+ * await q.drain()
|
|
|
*/
|
|
|
|
|
|
/**
|
|
@@ -3078,22 +3147,23 @@
|
|
|
* }, 2);
|
|
|
*
|
|
|
* // assign a callback
|
|
|
- * q.drain = function() {
|
|
|
+ * q.drain(function() {
|
|
|
* console.log('all items have been processed');
|
|
|
- * };
|
|
|
+ * });
|
|
|
+ * // or await the end
|
|
|
+ * await q.drain()
|
|
|
*
|
|
|
* // assign an error callback
|
|
|
- * q.error = function(err, task) {
|
|
|
+ * q.error(function(err, task) {
|
|
|
* console.error('task experienced an error');
|
|
|
- * };
|
|
|
+ * });
|
|
|
*
|
|
|
* // add some items to the queue
|
|
|
* q.push({name: 'foo'}, function(err) {
|
|
|
* console.log('finished processing foo');
|
|
|
* });
|
|
|
- * q.push({name: 'bar'}, function (err) {
|
|
|
- * console.log('finished processing bar');
|
|
|
- * });
|
|
|
+ * // callback is optional
|
|
|
+ * q.push({name: 'bar'});
|
|
|
*
|
|
|
* // add some items to the queue (batch-wise)
|
|
|
* q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) {
|
|
@@ -3112,6 +3182,121 @@
|
|
|
}, concurrency, 1);
|
|
|
}
|
|
|
|
|
|
+ // Binary min-heap implementation used for priority queue.
|
|
|
+ // Implementation is stable, i.e. push time is considered for equal priorities
|
|
|
+ class Heap {
|
|
|
+ constructor() {
|
|
|
+ this.heap = [];
|
|
|
+ this.pushCount = Number.MIN_SAFE_INTEGER;
|
|
|
+ }
|
|
|
+
|
|
|
+ get length() {
|
|
|
+ return this.heap.length;
|
|
|
+ }
|
|
|
+
|
|
|
+ empty () {
|
|
|
+ this.heap = [];
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ percUp(index) {
|
|
|
+ let p;
|
|
|
+
|
|
|
+ while (index > 0 && smaller(this.heap[index], this.heap[p=parent(index)])) {
|
|
|
+ let t = this.heap[index];
|
|
|
+ this.heap[index] = this.heap[p];
|
|
|
+ this.heap[p] = t;
|
|
|
+
|
|
|
+ index = p;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ percDown(index) {
|
|
|
+ let l;
|
|
|
+
|
|
|
+ while ((l=leftChi(index)) < this.heap.length) {
|
|
|
+ if (l+1 < this.heap.length && smaller(this.heap[l+1], this.heap[l])) {
|
|
|
+ l = l+1;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (smaller(this.heap[index], this.heap[l])) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ let t = this.heap[index];
|
|
|
+ this.heap[index] = this.heap[l];
|
|
|
+ this.heap[l] = t;
|
|
|
+
|
|
|
+ index = l;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ push(node) {
|
|
|
+ node.pushCount = ++this.pushCount;
|
|
|
+ this.heap.push(node);
|
|
|
+ this.percUp(this.heap.length-1);
|
|
|
+ }
|
|
|
+
|
|
|
+ unshift(node) {
|
|
|
+ return this.heap.push(node);
|
|
|
+ }
|
|
|
+
|
|
|
+ shift() {
|
|
|
+ let [top] = this.heap;
|
|
|
+
|
|
|
+ this.heap[0] = this.heap[this.heap.length-1];
|
|
|
+ this.heap.pop();
|
|
|
+ this.percDown(0);
|
|
|
+
|
|
|
+ return top;
|
|
|
+ }
|
|
|
+
|
|
|
+ toArray() {
|
|
|
+ return [...this];
|
|
|
+ }
|
|
|
+
|
|
|
+ *[Symbol.iterator] () {
|
|
|
+ for (let i = 0; i < this.heap.length; i++) {
|
|
|
+ yield this.heap[i].data;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ remove (testFn) {
|
|
|
+ let j = 0;
|
|
|
+ for (let i = 0; i < this.heap.length; i++) {
|
|
|
+ if (!testFn(this.heap[i])) {
|
|
|
+ this.heap[j] = this.heap[i];
|
|
|
+ j++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ this.heap.splice(j);
|
|
|
+
|
|
|
+ for (let i = parent(this.heap.length-1); i >= 0; i--) {
|
|
|
+ this.percDown(i);
|
|
|
+ }
|
|
|
+
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ function leftChi(i) {
|
|
|
+ return (i<<1)+1;
|
|
|
+ }
|
|
|
+
|
|
|
+ function parent(i) {
|
|
|
+ return ((i+1)>>1)-1;
|
|
|
+ }
|
|
|
+
|
|
|
+ function smaller(x, y) {
|
|
|
+ if (x.priority !== y.priority) {
|
|
|
+ return x.priority < y.priority;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ return x.pushCount < y.pushCount;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* The same as [async.queue]{@link module:ControlFlow.queue} only tasks are assigned a priority and
|
|
|
* completed in ascending priority order.
|
|
@@ -3139,6 +3324,8 @@
|
|
|
// Start with a normal queue
|
|
|
var q = queue$1(worker, concurrency);
|
|
|
|
|
|
+ q._tasks = new Heap();
|
|
|
+
|
|
|
// Override push to accept second parameter representing priority
|
|
|
q.push = function(data, priority = 0, callback = () => {}) {
|
|
|
if (typeof callback !== 'function') {
|
|
@@ -3148,16 +3335,11 @@
|
|
|
if (!Array.isArray(data)) {
|
|
|
data = [data];
|
|
|
}
|
|
|
- if (data.length === 0) {
|
|
|
+ if (data.length === 0 && q.idle()) {
|
|
|
// call drain immediately if there are no tasks
|
|
|
return setImmediate$1(() => q.drain());
|
|
|
}
|
|
|
|
|
|
- var nextNode = q._tasks.head;
|
|
|
- while (nextNode && priority >= nextNode.priority) {
|
|
|
- nextNode = nextNode.next;
|
|
|
- }
|
|
|
-
|
|
|
for (var i = 0, l = data.length; i < l; i++) {
|
|
|
var item = {
|
|
|
data: data[i],
|
|
@@ -3165,12 +3347,9 @@
|
|
|
callback
|
|
|
};
|
|
|
|
|
|
- if (nextNode) {
|
|
|
- q._tasks.insertBefore(nextNode, item);
|
|
|
- } else {
|
|
|
- q._tasks.push(item);
|
|
|
- }
|
|
|
+ q._tasks.push(item);
|
|
|
}
|
|
|
+
|
|
|
setImmediate$1(q.process);
|
|
|
};
|
|
|
|
|
@@ -3298,14 +3477,18 @@
|
|
|
var _fn = wrapAsync(fn);
|
|
|
return initialParams(function reflectOn(args, reflectCallback) {
|
|
|
args.push((error, ...cbArgs) => {
|
|
|
+ let retVal = {};
|
|
|
if (error) {
|
|
|
- return reflectCallback(null, { error });
|
|
|
+ retVal.error = error;
|
|
|
}
|
|
|
- var value = cbArgs;
|
|
|
- if (cbArgs.length <= 1) {
|
|
|
- [value] = cbArgs;
|
|
|
+ if (cbArgs.length > 0){
|
|
|
+ var value = cbArgs;
|
|
|
+ if (cbArgs.length <= 1) {
|
|
|
+ [value] = cbArgs;
|
|
|
+ }
|
|
|
+ retVal.value = value;
|
|
|
}
|
|
|
- reflectCallback(null, { value });
|
|
|
+ reflectCallback(null, retVal);
|
|
|
});
|
|
|
|
|
|
return _fn.apply(this, args);
|
|
@@ -3603,7 +3786,7 @@
|
|
|
if (err && attempt++ < options.times &&
|
|
|
(typeof options.errorFilter != 'function' ||
|
|
|
options.errorFilter(err))) {
|
|
|
- setTimeout(retryAttempt, options.intervalFunc(attempt));
|
|
|
+ setTimeout(retryAttempt, options.intervalFunc(attempt - 1));
|
|
|
} else {
|
|
|
callback(err, ...args);
|
|
|
}
|
|
@@ -3896,7 +4079,7 @@
|
|
|
return map$1(coll, (x, iterCb) => {
|
|
|
_iteratee(x, (err, criteria) => {
|
|
|
if (err) return iterCb(err);
|
|
|
- iterCb(null, {value: x, criteria});
|
|
|
+ iterCb(err, {value: x, criteria});
|
|
|
});
|
|
|
}, (err, results) => {
|
|
|
if (err) return callback(err);
|
|
@@ -4169,6 +4352,8 @@
|
|
|
var result;
|
|
|
return eachSeries$1(tasks, (task, taskCb) => {
|
|
|
wrapAsync(task)((err, ...args) => {
|
|
|
+ if (err === false) return taskCb(err);
|
|
|
+
|
|
|
if (args.length < 2) {
|
|
|
[result] = args;
|
|
|
} else {
|
|
@@ -4239,7 +4424,7 @@
|
|
|
callback = onlyOnce(callback);
|
|
|
var _fn = wrapAsync(iteratee);
|
|
|
var _test = wrapAsync(test);
|
|
|
- var results;
|
|
|
+ var results = [];
|
|
|
|
|
|
function next(err, ...rest) {
|
|
|
if (err) return callback(err);
|
|
@@ -4509,6 +4694,9 @@
|
|
|
find: detect$1,
|
|
|
findLimit: detectLimit$1,
|
|
|
findSeries: detectSeries$1,
|
|
|
+ flatMap: concat$1,
|
|
|
+ flatMapLimit: concatLimit$1,
|
|
|
+ flatMapSeries: concatSeries$1,
|
|
|
forEach: each,
|
|
|
forEachSeries: eachSeries$1,
|
|
|
forEachLimit: eachLimit$2,
|
|
@@ -4612,6 +4800,9 @@
|
|
|
exports.find = detect$1;
|
|
|
exports.findLimit = detectLimit$1;
|
|
|
exports.findSeries = detectSeries$1;
|
|
|
+ exports.flatMap = concat$1;
|
|
|
+ exports.flatMapLimit = concatLimit$1;
|
|
|
+ exports.flatMapSeries = concatSeries$1;
|
|
|
exports.forEach = each;
|
|
|
exports.forEachSeries = eachSeries$1;
|
|
|
exports.forEachLimit = eachLimit$2;
|