天天看点

ClickHouse 同步-异步 Executor

#pragma once

#include <future>
#include <functional>
#include <Common/ThreadPool.h>

namespace DB
{

/// Interface to run task asynchronously with possibility to wait for execution.
class Executor
{
public:
    virtual ~Executor() = default;
    virtual std::future<void> execute(std::function<void()> task) = 0;
};

/// Executes task synchronously in case when disk doesn't support async operations.
class SyncExecutor : public Executor
{
public:
    SyncExecutor() = default;
    std::future<void> execute(std::function<void()> task) override
    {
        auto promise = std::make_shared<std::promise<void>>();
        try
        {
            task();
            promise->set_value();
        }
        catch (...)
        {
            try
            {
                promise->set_exception(std::current_exception());
            }
            catch (...) { }
        }
        return promise->get_future();
    }
};


/// Runs tasks asynchronously using thread pool.
class AsyncExecutor : public Executor
{
public:
    explicit AsyncExecutor(const String & name_, int thread_pool_size) : name(name_), pool(ThreadPool(thread_pool_size)) { }

    std::future<void> execute(std::function<void()> task) override
    {
        auto promise = std::make_shared<std::promise<void>>();
        pool.scheduleOrThrowOnError([promise, task]() {
            try
            {
                task();
                promise->set_value();
            }
            catch (...)
            {
                tryLogCurrentException("Failed to run async task");

                try
                {
                    promise->set_exception(std::current_exception());
                }
                catch (...)
                {
                }
            }
        });

        return promise->get_future();
    }

    void setMaxThreads(size_t threads) { pool.setMaxThreads(threads); }

private:
    String name;
    ThreadPool pool;
};


}
           

复制