In our latest blog post we touched on parallelism in the Meteor framework using node-fibers, and showed how to make multiple HTTP requests at the same time.
As we've seen, unbounded parallelism in Meteor is relatively easy. There are, however, cases when you want to limit the degree of concurrency (akin to async's eachLimit). Issuing a lot of HTTP requests or accessing the file system are good examples: there are only so many files and sockets one can have open at the same time. How would one do that with blocking fibers?
It turns out that's not hard either! Let's look at a simple implementation. What we want is something like this:
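// Meteor server code; outside Meteor, use require('fibers/future') instead.
const Future = Npm.require('fibers/future');

Future.task(() => {
  // Call `getRandomGif` 50 times but use max 5 connections at the same time.
  return eachWithLimit(_.times(50, () => getRandomGif), 5);
}).resolve(() => console.log('I have a lot of GIFs now.'));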
where getRandomGif returns a future for the result of an HTTP GET request to a service that returns random GIFs. Here's a simple way to implement eachWithLimit:
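const eachWithLimit = (futureFuns, limit = Number.MAX_VALUE) => {
  const info = {
    blockedTasks: [], // tasks waiting for a free slot
    futures: [],      // futures of started tasks, in start order
    running: 0,       // number of tasks currently in flight
    limit
  };
  // Queue up our tasks; the first `limit` of them start right away.
  futureFuns.forEach((fun) => {
    addTask(fun, info);
  });
  // Wait for the results. `info.futures` keeps growing while we wait,
  // because every finished task starts the next blocked one.
  const results = [];
  let fut;
  while ((fut = info.futures.shift())) {
    results.push(fut.wait());
  }
  return results;
};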
And here's how we could implement the addTask function, which takes care of scheduling and running the actual parallel jobs:
const addTask = (fun, info) => {
  if (info.running >= info.limit) {
    // We've hit the concurrency limit. Wait up.
    info.blockedTasks.push(fun);
    return;
  }
  ++info.running;
  // Start the task; `fun` returns a future for its result.
  const fut = fun();
  info.futures.push(fut);
  // When the task finishes, free up its slot.
  fut.resolve(() => {
    --info.running;
    // Schedule the next waiting task in the queue.
    if (info.blockedTasks.length > 0) {
      addTask(info.blockedTasks.shift(), info);
    }
  });
};
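The scheduling in addTask can be exercised without fibers at all. The sketch below pairs the same addTask logic with ManualFuture, a hypothetical stand-in for the fibers Future that offers only resolve(cb) and return(value). As a simplifying assumption, callbacks fire in the order they were registered, and resolve(cb) on an already finished future calls cb right away. Tasks are finished by hand, newest first, and each task records info.running at the moment it starts, so the peak shows whether the limit is ever exceeded. Blocking via wait() is not modeled.

```javascript
// Hypothetical stand-in for the fibers Future: only resolve(cb) and return(value).
class ManualFuture {
  constructor() {
    this.callbacks = [];
    this.done = false;
    this.value = undefined;
  }
  resolve(cb) {
    if (this.done) cb(undefined, this.value); // already finished: call right away
    else this.callbacks.push(cb);
  }
  return(value) {
    this.done = true;
    this.value = value;
    // Assumption: callbacks fire in registration order.
    this.callbacks.forEach((cb) => cb(undefined, value));
    this.callbacks = [];
  }
}

// Same scheduling logic as addTask in the post.
const addTask = (fun, info) => {
  if (info.running >= info.limit) {
    info.blockedTasks.push(fun);
    return;
  }
  ++info.running;
  const fut = fun();
  info.futures.push(fut);
  fut.resolve(() => {
    --info.running;
    if (info.blockedTasks.length > 0) {
      addTask(info.blockedTasks.shift(), info);
    }
  });
};

// Hypothetical driver: queue taskCount tasks, then finish them by hand.
const simulate = (taskCount, limit) => {
  const info = { blockedTasks: [], futures: [], running: 0, limit };
  const pending = []; // futures started but not finished yet
  let peak = 0;
  let started = 0;
  // Each task records the concurrency level at the moment it starts.
  const makeTask = (id) => () => {
    started++;
    peak = Math.max(peak, info.running);
    const fut = new ManualFuture();
    pending.push({ id, fut });
    return fut;
  };
  for (let i = 0; i < taskCount; i++) {
    addTask(makeTask(i), info);
  }
  // Finish tasks newest first, so completion order differs from start order.
  while (pending.length > 0) {
    const { id, fut } = pending.pop();
    fut.return(id);
  }
  return { peak, started, running: info.running, blocked: info.blockedTasks.length };
};

console.log(simulate(10, 3)); // { peak: 3, started: 10, running: 0, blocked: 0 }
```

Each completion frees exactly one slot and the next blocked task takes it immediately, so the peak stays at the limit while every task still gets started.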
Everything is pretty straightforward here. We queue up jobs once we reach the concurrency limit, and every time a job finishes we start the next one from the blockedTasks queue. Each started job's future is appended to info.futures, which eachWithLimit drains in order. Once all jobs are done, info.futures becomes empty, the while loop condition in eachWithLimit becomes falsy, and we return an array with the results in the order the tasks were passed in. This way we can easily run N jobs in parallel while keeping concurrency at an acceptable level. Easy enough, right?
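Where fibers aren't available, the same bounded-concurrency idea can be sketched with native Promises and async/await. This is a swapped-in technique rather than the fibers code above: mapWithLimit and fakeRequest are hypothetical names, and instead of a blocked-task queue the sketch starts limit workers that each keep pulling the next unstarted task. Results are stored by index, so, like eachWithLimit, they come back in input order even when tasks finish out of order. fakeRequest only simulates a slow request with a timer.

```javascript
// Hypothetical Promise-based counterpart of eachWithLimit (not fibers).
// taskFns: array of functions that each return a Promise.
const mapWithLimit = async (taskFns, limit = Infinity) => {
  const results = new Array(taskFns.length);
  let next = 0; // index of the next task to start

  // A worker starts one task, waits for it, then takes the next unstarted one.
  const worker = async () => {
    while (next < taskFns.length) {
      const i = next++;
      results[i] = await taskFns[i]();
    }
  };

  // At most `limit` workers (but at least one), hence at most `limit` tasks in flight.
  const workerCount = Math.max(1, Math.min(limit, taskFns.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results; // indexed by input position, not by completion time
};

// Hypothetical stand-in for getRandomGif: a timer instead of an HTTP request.
let inFlight = 0;
let peak = 0;
const fakeRequest = (id, ms) => () => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  return new Promise((resolve) =>
    setTimeout(() => {
      inFlight--;
      resolve(`gif-${id}`);
    }, ms)
  );
};

// Later tasks get shorter delays, so they finish out of order.
const tasks = Array.from({ length: 10 }, (_, id) => fakeRequest(id, 50 - id * 3));
mapWithLimit(tasks, 3).then((gifs) => {
  console.log(gifs[0], gifs[9], peak); // gif-0 gif-9 3
});
```

Capping the number of workers replaces the explicit running counter: each worker awaits its current task before taking another, so no more than limit tasks can be in flight at once.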
For a mature implementation of concurrency throttling with fibers, I recommend you check out the very fine asyncblock and asyncflow libraries.
Have fun with Meteor and fibers! I'm @tomas_brambora.