Is using std::async many times for small tasks performance friendly?
the C++ runtime is supposed to monitor the number of threads created and schedule std::async appropriately
No. If the asynchronous tasks are in fact run asynchronously (rather than deferred) then all that's required is that they are run as if on a new thread. It is perfectly valid for a new thread to be created and started for every task, without any regard for the hardware's limited capacity for parallelism.
There's a note:
[ Note: If this policy is specified together with other policies, such as when using a policy value of launch::async | launch::deferred, implementations should defer invocation or the selection of the policy when no more concurrency can be effectively exploited. —end note ]
However, this is non-normative and in any case it indicates that once no more concurrency can be exploited the tasks may become deferred, and therefore get executed when someone waits on the result, rather than still being asynchronous and running immediately after one of the previous asynchronous tasks is finished, as would be desirable for maximum parallelism.
That is, if we have 10 long running tasks and the implementation can only execute 4 in parallel, then the first 4 will be asynchronous and then the last 6 may be deferred. Waiting on the futures in sequence would execute the deferred tasks on a single thread in sequence, eliminating parallel execution for those tasks.
The note does also say that instead of deferring invocation, the selection of the policy may be deferred. That is, the function may still run asynchronously but that decision may be delayed, say, until one of the earlier tasks completes, freeing up a core for a new task. But again, this is not required, the note is non-normative, and as far as I know Microsoft's implementation is the only one that behaves this way. When I looked at another implementation, libc++, it simply ignores this note altogether so that using either std::launch::async
or std::launch::any
policies result in asynchronous execution on a new thread.
(I believe in Microsoft's case their ConcRT library is responsible for this?)
Microsoft's implementation does indeed behave as you describe, however this is not required and a portable program cannot rely on that behavior.
One way to portably limit how many threads are actually running is to use something like a semaphore:
#include <future>
#include <mutex>
#include <cstdio>
// a semaphore class
//
// All threads can wait on this object. When a waiting thread
// is woken up, it does its work and then notifies another waiting thread.
// In this way only n threads will be be doing work at any time.
//
class Semaphore {
private:
std::mutex m;
std::condition_variable cv;
unsigned int count;
public:
Semaphore(int n) : count(n) {}
void notify() {
std::unique_lock<std::mutex> l(m);
++count;
cv.notify_one();
}
void wait() {
std::unique_lock<std::mutex> l(m);
cv.wait(l, [this]{ return count!=0; });
--count;
}
};
// an RAII class to handle waiting and notifying the next thread
// Work is done between when the object is created and destroyed
class Semaphore_waiter_notifier {
Semaphore &s;
public:
Semaphore_waiter_notifier(Semaphore &s) : s{s} { s.wait(); }
~Semaphore_waiter_notifier() { s.notify(); }
};
// some inefficient work for our threads to do
int fib(int n) {
if (n<2) return n;
return fib(n-1) + fib(n-2);
}
// for_each algorithm for iterating over a container but also
// making an integer index available.
//
// f is called like f(index, element)
template<typename Container, typename F>
F for_each(Container &c, F f) {
Container::size_type i = 0;
for (auto &e : c)
f(i++, e);
return f;
}
// global semaphore so that lambdas don't have to capture it
Semaphore thread_limiter(4);
int main() {
std::vector<int> input(100);
for_each(input, [](int i, int &e) { e = (i%10) + 35; });
std::vector<std::future<int>> output;
for_each(input, [&output](int i, int e) {
output.push_back(std::async(std::launch::async, [] (int task, int n) -> int {
Semaphore_waiter_notifier w(thread_limiter);
std::printf("Starting task %d\n", task);
int res = fib(n);
std::printf("\t\t\t\t\t\tTask %d finished\n", task);
return res;
}, i, e));
});
for_each(output, [](int i, std::future<int> &e) {
std::printf("\t\t\tWaiting on task %d\n", i);
int res = e.get();
std::printf("\t\t\t\t\t\t\t\t\tTask %d result: %d\n", i, res);
});
}