Saturday, March 20, 2010

Планирование задач в сервере при помощи boost.task

Недавно на профильном ресурсе один программист задал вопрос: "Что использовать в сервере ММО для работы с потоками?". Программист склонялся к Intel TBB, но даже не к базовым примитивам, а к кастомному планированию задач (task scheduling). Ну нравится TBB - ну и ладно. А немного позже я увидел исходники сервера ММО другого программиста, который недавно начал переписываться его с нуля для улучшения архитектуры. И там было очень много велосипедов, которые писались самим программистом вместо того что бы использовать сторонние компоненты такие как boost (к примеру класы обертки над pthread-ом, и это в 2010 году, когда boost.thread уже почти в стандарте). Была там реализована и поддержка пула потоков с планировщиком задач. Тема эта мне очень интересна и я начал копать информацию о готовых решениях планировки задач (как в TBB) и нашел boost.task, про что и решил написать.

Определение

Задача (task) - это логически объедененный набор действий. Планировщик задач (task scheduler) асинхронино выполняет задачи руководствуясь определенными стратегиями по выбору кто должен выполняться в данный момент в каком потоке.

Задачи позволяют абстрагировться от обычных потоков и оперировать на более высоком уровне.

Зачем нужен планировщик задач?

Как работает сферический сервер в вакууме? Очень просто:
  1. Приходит запрос от клиента
  2. Он обрабатывается!
  3. Отсылается ответ

Ну кроме того в сервере могут происходить какие то процессы, которые выполняются и без запроса клиента. Например рассылка уведомлений по всей базе пользователей, очистка базы от устаревших данных (крончик), обработка дневной статистики и тд.

Сейчас загвоздка именно в том, как обрабатывается запрос. Надо разобраться как его обрабатывать.

Возьмем к примеру memcached-подобный сервер: у нас есть hash_map с данными, есть запросы чтения, есть запросы записи, которые делают простой лукап по хеш-мапе и возвращают данные либо записывают их в хеш-мап. Пока всё происходит в одном потоке, но что делать, если нам надо задействовать все процессоры системы?

Создаем столько потоков, сколько ядер. В каждом потоке обрабатываем пользователей, которых при создании соеденения раскидываем по принципу round-robin. При обращении к контейнеру используем rwlock-и (boost::shared_mutex). Отлично. А как нам быть с удалением элементов из контейнера? Создаем поток, который раз в N секунд просыпается и чистит контейнер.

Это был простой пример, а теперь более сложный пример: сервис, который может в зависимости от запроса пользователя сделать запрос в базу данных, сделать http запрос на какой то сайт. Что будет если сделать серрвер по предидущей модели (все запросы к другим компонентам будут выполняться синхронно)? Ну база данных находится на той же площадке, что и сервер, ответ будет в приделах пары миллисекунд. Отослать email - тоже не проблема - ставим sendmail на ту же машину, отдаём ему данные, а он сам разберется как отослать письмо.

Отлично. Хотя не совсем. А что делать с http-запросом? Он же может занять очень долго - всё зависит от сайта который находится где то далеко и не известно сколько будет обрабатывать запрос. В таком случае поток будет бездействовать, хотя в очереди есть много запросов, которые могут выполниться, но они ждут пока освободится этот поток.

Такой запрос необходимо выполнять асинхронно. Реализовать можно так:

class LongRequestHandler
{
public:
        void Handle()
        {
                // read client request parameters
                // mysql request 1
                // mysql request 2
                HttpRequestExecutor::GetInstance()->Execute(
                        "example.com?x=1",
                        boost::bind(this, &LongRequestHandler::HandleStage2)
                );
        }
        void HandleStage2(const std::string & http_request_result)
        {
                // mysql request 3
                // write response to client
        }
};

HttpRequestExecutor принемает url запроса и колбек, который надо вызвать по завершению запроса (тип колбека - boost::function).

И такой подход в работает, правда не слишком красиво.

В блоге Thinking Asynchronously in C++ показана интерестная реализация выполнения асинхронных задач. Выглядит результат следующим образом:

template void async_echo(
  tcp::socket& socket,
  mutable_buffer working_buffer,
  Handler handler,
  // coroutine state:
  coroutine coro = coroutine(),
  error_code ec = error_code(),
  size_t length = 0)
{
  reenter (coro)
  {
  entry:
    while (!ec)
    {
      yield socket.async_read_some(
          buffer(working_buffer),
          bind(&async_echo,
            ref(socket), working_buffer,
            box(handler), coro, _1, _2));
      if (ec) break;
      yield async_write(socket,
          buffer(working_buffer, length),
          bind(&async_echo,
            ref(socket), working_buffer,
            box(handler), coro, _1, _2));
    }
    handler(ec);
  }
}

Coroutine и yield в С++ смотрятся необычно;) Реализовано это на дефайнах, в блоге можно почитать как это удалось автору.

Постепенно логика усложняется, добавляются новые элементы, которые надо обрабатывать асинхронно, реализация тоже усложняется. В дальнейшем задачу

mysql request 1
mysql request 2
http request 1
mysql request 3
http request 2
mysql request 4
mysql request 5

И выполняя её последовательно с остановками в http запросах мы видим,что запросы

mysql request 2
http request 1

и

mysql request 3
http request 2
mysql request 4

можно выполнять паралельно и если мы захотим это сделать, то прийдется ещё сильнее усложнять логику. А хотелось бы написать простой код, например:

mysql request 1
x = run(func1)
y = run(func2)
wait(x, y)
mysql request 5

func1:
  mysql request 2
  http request 1

func2:
  mysql request 3
  http request 2
  mysql request 4

Вот тут и пригодиться планировщик задач.

Реализации

Про поддержку планировщика задач в новом стандарте 0x можно почитать тут.
  • just::thread - реализация библиотеки потоков стандарта C++0x от отца boost::thread
  • Parallel Patterns Library (PPL) - реализцаия от Microsoft
  • Asynchronous Agents Library - и ещё одна от Microsoft
  • Intel Threading Building Blocks - очень мощная библиотека для паралельного программирования от Intel. Включает в себя и планировщик задач.
  • boost::task - - реализация от Oliver Kowalke, не принятая ещё в boost

Мне наиболее понравился boost.task. Дальше его детальное рассмотрение.

Описание boost.task

boost.task - реализация предложения в стандарт C++0x. Она поддерживает задание стратегий выполнения задач, создание под-задач, прерывание задач.

Библиотека зависит от:

boost.task и boost.fiber компилируемые библиотеки (boost.atomic и boost.move - header-only) - так что прийдется их собирать. Что бы было удобнее эксперементировать собрал все зависимости в одном месте, приправил cmake-ом и залил поект на github. Работает на linux-е, для сборки под windows - потребуется 2-3 строчки добавить в cmake файлы.

Пример использования

API библиотеки достаточно простой, реализовать обработчик запроса, который описывался выше не совтавит труда. Приведу его ещё раз:

mysql request 1

  mysql request 2
  http request 1

  mysql request 3
  http request 2
  mysql request 4

mysql request 5

В качестве эмуляции запроса к mysql будет использован обычный sleep на случайное время:

boost::this_thread::sleep(boost::posix_time::milliseconds(rand()%100 + 10));

В качестве внешнего http-запроса будет использован асинхронный таймер из boost::asio.

Итак:

Request - класс запроса.

class Request
{
public:
    Request(const std::string & data);
    const std::string & Read() const;
    void Write(const std::string & answer);
};

А RequestHandler - класс обработчика запроса.

class RequestHandler
{
public:
    RequestHandler(boost::asio::io_service & io_service, const RequestPtr & request);
    void Process() const;
};

io_service - передается для того, что бы можно было выполнить внешний вызов (использовать таймер boost::asio::deadline_timer).

Итак начнем. Определяем пул потоков, для обработки наших задач:

boost::tasks::static_pool< boost::tasks::unbounded_fifo > pool( boost::tasks::poolsize( 5) );

boost.task поддерживает два основных вида стратегий планировки задач:

  • ограниченные (bounded) - имеют порог количества выполняемых задач, при достижении которого добавление новой задачи блокирует поток, который выполняет это действие. Основная задача - избежать исчерпания ресурсов (resource exhaustion) когда скорость добавления задач превышает скорость их выполнения
  • неограниченные (unbounded) - позволяют добавлять бесконечное число задач в очередь

Также есть возможность задания стратегии обработки задач внутри очереди:

  • fifo - первая добавленная задача выполняется первой
  • priority - у задачи есть приоритет, для выполнения выбираются задачи с высшим приоритетом
  • smart - очередь такого типа возможно сильно кастомизировать передавая параметры в шаблон. по умолчанию есть возможность индексировать задачи по любому ключу и заменять старую задачу на новую, если она сущесвует

Соответственно описанная строчка кода создает пул из 5 потоков с неограниченной очередью типа fifo.

Теперь нам понадобится создать io_service и пул из 3-х потоков для обработки внешних запросов.

boost::asio::io_service io_service;

Если вызвать io_service::run в момент когда в нем нету задач, метод сразу завершится, а для нормальной работы нам необходимы работающие потоки. Обычно это достигается тем, что в io_service добавлен accept-ор порта, на который подключаются клиенты, а в данном случае можно занять io_service ожиданием исполнения таймера:

boost::asio::deadline_timer dummy_timer(io_service);
dummy_timer.expires_from_now(boost::posix_time::seconds(10));
// void dummy_handler(const boost::system::error_code&) {}
dummy_timer.async_wait(&dummy_handler);

После этого можно создать пул потоков:

boost::thread_group io_service_thread_pool;
for(int i = 0; i < 3; ++i)
    io_service_thread_pool.create_thread(
        boost::bind(&boost::asio::io_service::run, &io_service)
    );

Далее создаём запрос:

RequestPtr request(new Request("some data"));
RequestHandlerPtr handler(new RequestHandler(io_service, request));

Все готово, можно выполнять задачу:

boost::tasks::handle< void > request_processing(
    boost::tasks::async(
        boost::tasks::make_task( &RequestHandler::Process, handler ),
        pool));

boost::tasks::make_task( &RequestHandler::Process, handler ) - создает задачу вызова Process у объекта handler, которую можно будет выполнить. boost::tasks::async инициирует асинхронное выполнение задачи. boost::tasks::handle объект, по которому можно отслеживать статус завершения задачи, получить результат если он есть.

boost::tasks::async поддерживает 4 алгоритма выполнения задачи:

  • own_thread - синхронное выполнение в том же потоке
  • new_thread - для задачи создается поток, в котором она будет выполнена, после чего поток будет завершен
  • as_sub_task - если текущая задача выполняется в пуле - добавляет новую задачу в него, иначе создает новый поток, как new_thread. Это поведение по умолчанию
  • static_pool - выполнить задачу в пуле потоков

Далее подождем пока задача выполнится:

request_processing.wait();

И остановим io_service:

io_service.stop();
io_service_thread_pool.join_all();

Функция Process получилась на удивление очень простой

void Subtask1() const
{
    Request("query2");
    ExternalRequest("extquery1");
}

void Subtask2() const
{
    Request("query3");
    ExternalRequest("extquery2");
    Request("query4");
}

void Process() const
{
    std::string data = request_->Read();

    Request("query1");

    boost::tasks::handle< void > subtask1(
        boost::tasks::async(
            boost::tasks::make_task( &RequestHandler::Subtask1, this )));
    boost::tasks::handle< void > subtask2(
        boost::tasks::async(
            boost::tasks::make_task( &RequestHandler::Subtask2, this )));

    boost::tasks::waitfor_all( subtask1, subtask2);

    Request("query5");

    request_->Write("some answer");
}

Подзадачи выполняются при помощи boost::tasks::async без указания policy на запуск и автоматически выбирается as_sub_task алгоритм, который выполнит задачи в том же пуле потоков, что и родительская задача. Реализация функций подзадач тоже тривиальная.

RequestHandler::Request - вызывает boost::this_thread::sleep, а с ExternalRequest все немного сложнее:

void ExternalRequest(const std::string & what) const
{
    ExternalRequestHandler external_handler(io_service_);
    boost::tasks::spin::auto_reset_event ev;
    external_handler.PerformExternalReqeust(what, &ev);
    ev.wait();
}

Создается хендлер, а так же событие с автоматическим сбросом - boost::tasks::spin::auto_reset_event. Это событие передается обработчику внешнего запроса и по его завершению будет вызвано ev.set(), а до тех пор ev.wait() блокирует задачу.

В противовес обычным потокам и примитивам синхронизации (boost::condition) ev.wait() не блокирует поток, а блокирует задачу (вызывает в цикле this_task::yield()). А это значит, что ресурсы процессора будут использованы другими задачами.

Файл целиком может быть найден тут.

Выводы

boost.task вполне удобная библиотека для планирования задач. Она позволяет посмотреть как будет выглядить поддержка асинхронного выполнения кода в новом стандарте C++0x, и её можно использовать уже прямо сейчас не дожидаясь пока будет выпущен стандарт.

Код с использованием boost.task становится меньше и намного понятнее, чем при обычном использовании потоков.

Есть канечно и недостатки: код ещё не оптимизирован, что может вызвать проблемы в редких случаях; библиотека ещё не принята в boost (вместе с её зависимостями).

Что почитать по теме?

No comments:

Post a Comment