Asynchronous parallelFor in Eigen ThreadPoolDevice
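
This change adds a non-blocking parallelForAsync to ThreadPoolDevice. It reuses
the recursive binary splitting of the synchronous parallelFor, but instead of
blocking on a Barrier it keeps the user-supplied closures alive in a
heap-allocated ParallelForAsyncContext and invokes the 'done' callback from
whichever thread finishes the last block. The context cannot be deleted from
inside the last block (that would destroy the closure currently executing), so
its deletion is scheduled as a separate thread-pool task. The block-size
heuristic shared by both entry points is factored out into
CalculateParallelForBlock. Usage sketches follow the diff below.
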
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index ef22a26..ca2794c 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -75,9 +75,9 @@
   EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
     deallocate(buffer);
   }
-  
+
   template<typename Type>
-  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const { 
+  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
     return data;
   }
 
@@ -181,44 +181,173 @@
     return pool_->CurrentThreadId();
   }
 
-  // parallelFor executes f with [0, n) arguments in parallel and waits for
-  // completion. F accepts a half-open interval [first, last).
-  // Block size is chosen based on the iteration cost and resulting parallel
+  // WARNING: This function is synchronous and will block the calling thread.
+  //
+  // Synchronous parallelFor executes f with [0, n) arguments in parallel and
+  // waits for completion. F accepts a half-open interval [first, last). Block
+  // size is chosen based on the iteration cost and resulting parallel
   // efficiency. If block_align is not nullptr, it is called to round up the
   // block size.
   void parallelFor(Index n, const TensorOpCost& cost,
                    std::function<Index(Index)> block_align,
                    std::function<void(Index, Index)> f) const {
-    typedef TensorCostModel<ThreadPoolDevice> CostModel;
+    // Compute small problems directly in the caller thread.
     if (n <= 1 || numThreads() == 1 ||
         CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
       f(0, n);
       return;
     }
 
-    // Calculate block size based on (1) the iteration cost and (2) parallel
-    // efficiency. We want blocks to be not too small to mitigate
-    // parallelization overheads; not too large to mitigate tail
-    // effect and potential load imbalance and we also want number
-    // of blocks to be evenly dividable across threads.
+    // Compute block size and total count of blocks.
+    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
 
-    double block_size_f = 1.0 / CostModel::taskSize(1, cost);
+    // Recursively divide the size into halves until we reach block_size.
+    // The division code rounds midIdx up to a multiple of block_size, so we
+    // are guaranteed to get block_count leaves that do the actual computation.
+    Barrier barrier(static_cast<unsigned int>(block.count));
+    std::function<void(Index, Index)> handleRange;
+    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
+                                                  Index lastIdx) {
+      while (lastIdx - firstIdx > block.size) {
+        // Split into halves and schedule the second half on a different thread.
+        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
+        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
+        lastIdx = midIdx;
+      }
+      // Single block or less, execute directly.
+      f(firstIdx, lastIdx);
+      barrier.Notify();
+    };
+
+    if (block.count <= numThreads()) {
+      // Avoid a thread hop by running the root of the tree and one block on the
+      // main thread.
+      handleRange(0, n);
+    } else {
+      // Execute the root in the thread pool to avoid running work on more than
+      // numThreads() threads.
+      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
+    }
+
+    barrier.Wait();
+  }
+
+  // Convenience wrapper for parallelFor that does not align blocks.
+  void parallelFor(Index n, const TensorOpCost& cost,
+                   std::function<void(Index, Index)> f) const {
+    parallelFor(n, cost, NULL, std::move(f));
+  }
+
+  // WARNING: This function is asynchronous and will not block the calling thread.
+  //
+  // Asynchronous parallelFor executes f with [0, n) arguments in parallel
+  // without waiting for completion. When the last block has finished, it calls
+  // the 'done' callback. F accepts a half-open interval [first, last). Block size
+  // is chosen based on the iteration cost and resulting parallel efficiency. If
+  // block_align is not nullptr, it is called to round up the block size.
+  void parallelForAsync(Index n, const TensorOpCost& cost,
+                        std::function<Index(Index)> block_align,
+                        std::function<void(Index, Index)> f,
+                        std::function<void()> done) const {
+    // Compute block size and total count of blocks.
+    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
+
+    ParallelForAsyncContext* const ctx =
+        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));
+
+    // Recursively divide the size into halves until we reach block_size.
+    // The division code rounds midIdx up to a multiple of block_size, so we
+    // are guaranteed to get block_count leaves that do the actual computation.
+    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
+      while (lastIdx - firstIdx > block.size) {
+        // Split into halves and schedule the second half on a different thread.
+        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
+        pool_->Schedule(
+            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
+        lastIdx = midIdx;
+      }
+
+      // Single block or less, execute directly.
+      ctx->f(firstIdx, lastIdx);
+
+      // Call the 'done' callback if this was the last block.
+      if (ctx->count.fetch_sub(1) == 1) {
+        (ctx->done)();
+        // We can't delete ctx right now, because doing so would deallocate
+        // the closure we are currently executing.
+        pool_->Schedule([ctx]() { delete ctx; });
+      }
+    };
+
+    // Execute the root in the thread pool.
+    pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
+  }
+
+  // Convenience wrapper for parallelForAsync that does not align blocks.
+  void parallelForAsync(Index n, const TensorOpCost& cost,
+                        std::function<void(Index, Index)> f,
+                        std::function<void()> done) const {
+    parallelForAsync(n, cost, NULL, std::move(f), std::move(done));
+  }
+
+  // Thread pool accessor.
+  ThreadPoolInterface* getPool() const { return pool_; }
+
+  // Allocator accessor.
+  Allocator* allocator() const { return allocator_; }
+
+ private:
+  typedef TensorCostModel<ThreadPoolDevice> CostModel;
+
+  // For parallelForAsync we must keep the passed-in closures on the heap, and
+  // delete them only after the `done` callback has finished.
+  struct ParallelForAsyncContext {
+    ParallelForAsyncContext(Index count, std::function<void(Index, Index)> f,
+                            std::function<void()> done)
+        : count(count), f(std::move(f)), done(std::move(done)) {}
+
+    std::atomic<Index> count;
+    std::function<void(Index, Index)> f;
+    std::function<void()> done;
+
+    std::function<void(Index, Index)> handle_range;
+  };
+
+  struct ParallelForBlock {
+    Index size;   // block size
+    Index count;  // number of blocks
+  };
+
+  // Calculates block size based on (1) the iteration cost and (2) parallel
+  // efficiency. We want blocks to be not too small, to mitigate
+  // parallelization overheads, and not too large, to mitigate the tail effect
+  // and potential load imbalance; we also want the number of blocks to be
+  // evenly divisible across threads.
+  ParallelForBlock CalculateParallelForBlock(
+      const Index n, const TensorOpCost& cost,
+      std::function<Index(Index)> block_align) const {
+    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
     const Index max_oversharding_factor = 4;
     Index block_size = numext::mini(
-        n, numext::maxi<Index>(divup<Index>(n, max_oversharding_factor * numThreads()),
-                               block_size_f));
+        n, numext::maxi<Index>(
+               divup<Index>(n, max_oversharding_factor * numThreads()),
+               block_size_f));
     const Index max_block_size = numext::mini(n, 2 * block_size);
+
     if (block_align) {
       Index new_block_size = block_align(block_size);
       eigen_assert(new_block_size >= block_size);
       block_size = numext::mini(n, new_block_size);
     }
+
     Index block_count = divup(n, block_size);
+
     // Calculate parallel efficiency as fraction of total CPU time used for
     // computations:
     double max_efficiency =
         static_cast<double>(block_count) /
         (divup<int>(block_count, numThreads()) * numThreads());
+
     // Now try to increase block size up to max_block_size as long as it
     // doesn't decrease parallel efficiency.
     for (Index prev_block_count = block_count;
@@ -251,47 +380,9 @@
       }
     }
 
-    // Recursively divide size into halves until we reach block_size.
-    // Division code rounds mid to block_size, so we are guaranteed to get
-    // block_count leaves that do actual computations.
-    Barrier barrier(static_cast<unsigned int>(block_count));
-    std::function<void(Index, Index)> handleRange;
-    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
-      while (lastIdx - firstIdx > block_size) {
-        // Split into halves and schedule the second half on a different thread.
-        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block_size) * block_size;
-        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
-        lastIdx = midIdx;
-      }
-      // Single block or less, execute directly.
-      f(firstIdx, lastIdx);
-      barrier.Notify();
-    };
-    if (block_count <= numThreads()) {
-      // Avoid a thread hop by running the root of the tree and one block on the
-      // main thread.
-      handleRange(0, n);
-    } else {
-      // Execute the root in the thread pool to avoid running work on more than
-      // numThreads() threads.
-      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
-    }
-    barrier.Wait();
+    return {block_size, block_count};
   }
 
-  // Convenience wrapper for parallelFor that does not align blocks.
-  void parallelFor(Index n, const TensorOpCost& cost,
-                   std::function<void(Index, Index)> f) const {
-    parallelFor(n, cost, NULL, std::move(f));
-  }
-
-  // Thread pool accessor.
-  ThreadPoolInterface* getPool() const { return pool_; }
-
-  // Allocator accessor.
-  Allocator* allocator() const { return allocator_; }
-
- private:
   ThreadPoolInterface* pool_;
   int num_threads_;
   Allocator* allocator_;
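
Below is a minimal usage sketch of the new asynchronous API (not part of the
change itself). It assumes the unsupported Tensor module with EIGEN_USE_THREADS;
the Eigen::Barrier is used only so that main() can outlive the asynchronous
work, and the TensorOpCost constants are illustrative rather than tuned:

    #define EIGEN_USE_THREADS
    #include <unsupported/Eigen/CXX11/Tensor>

    #include <iostream>
    #include <vector>

    int main() {
      // Four worker threads; the device is told how many cores it may use.
      Eigen::ThreadPool pool(4);
      Eigen::ThreadPoolDevice device(&pool, 4);

      const Eigen::Index n = 1 << 20;
      std::vector<float> data(n, 1.0f);

      // Per-iteration cost estimate: bytes loaded, bytes stored, compute cycles.
      const Eigen::TensorOpCost cost(sizeof(float), sizeof(float), 1);

      // Signaled by the 'done' callback once the last block has run.
      Eigen::Barrier done_barrier(1);

      device.parallelForAsync(
          n, cost,
          [&data](Eigen::Index first, Eigen::Index last) {
            // f receives a half-open interval [first, last).
            for (Eigen::Index i = first; i < last; ++i) data[i] *= 2.0f;
          },
          [&done_barrier]() { done_barrier.Notify(); });

      // parallelForAsync returned immediately; block until 'done' has fired.
      done_barrier.Wait();
      std::cout << data[0] << std::endl;  // prints 2
      return 0;
    }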
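
The efficiency heuristic in CalculateParallelForBlock is easiest to see with
concrete numbers. The sketch below is a hypothetical standalone re-statement of
the formula from the diff, block_count / (divup(block_count, numThreads()) *
numThreads()), not the library's internal function:

    #include <cstdio>

    // Hypothetical re-statement of the efficiency formula: the fraction of
    // total CPU time doing useful work when block_count equal-sized blocks
    // are spread across num_threads threads.
    static double parallel_efficiency(long block_count, long num_threads) {
      const long rounds = (block_count + num_threads - 1) / num_threads;  // divup
      return static_cast<double>(block_count) /
             static_cast<double>(rounds * num_threads);
    }

    int main() {
      // 10 blocks on 4 threads need 3 rounds, and the last round leaves two
      // threads idle: 10 / (3 * 4) ~= 0.83.
      std::printf("%.2f\n", parallel_efficiency(10, 4));
      // Halving the count to 5 blocks is worse (5 / (2 * 4) ~= 0.62), but 8
      // blocks fill both rounds exactly: 8 / (2 * 4) = 1.00. This is why the
      // loop grows block_size only while efficiency does not decrease.
      std::printf("%.2f\n", parallel_efficiency(8, 4));
      return 0;
    }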