Monday, March 29, 2010

128-bit

Понадобилось на днях умножать и считать остаток от деления для 128 битных чисел на плюсах: (a * b) % c, где все числа 64 битные, а это значит что надо оперировать именно с 128 битами.
Нашел класс uint128_t, который использует два 64 битных числа. и в нем куча функционала. так как boost использовать нельзя было.
Быстро отпилил его, написав вручную недостоющие операторы. Отлично! Даже работает быстро. Но когда запустил на машине для которой создавался код все оказалось медленне раз в 10.
Немного покопав узнал, что дело в 32 битной системе (у меня на ноуте 64 бита). Очень огорчился и уж было думал что всё пропало и ничего сделать нельзя, но потом решил переписать вручную.
Сложить два 128 битных числа можно так:


inline void a_add_b_inplace(uint64_t& a0, uint64_t& a1, const uint64_t b0, const uint64_t b1)
{
 if(a1 >= 0xffffffffffffffffULL - b1)
 {
  ++a0;
 }
 a1 += b1;
 a0 += b0;
}


Умножение двух 64 битных чисел можно эфективно сделать используя три 32 битных числа и сохранить это все в 2 64 битных числа (получится 128 бит информации). Что то типа:


inline void a_mul_b(const uint64_t& a, const uint64_t& b, uint64_t& d0, uint64_t& d1)
{
 uint32_t a0 = (a >> 32) & 0x00ffffffffU;
 uint32_t a1 = a & 0x00ffffffffU;
 uint32_t b0 = (b >> 32);
 uint32_t b1 = b & 0x00ffffffffU;
 uint64_t c0 = ((uint64_t)a0) * b0; // << 64
 uint64_t c1 = ((uint64_t)a0) * b1 + ((uint64_t)a1) * b0; // << 32
 uint64_t c2 = ((uint64_t)a1) * b1;
 d0 = c0; // << 64
 d1 = c2;
 uint64_t c10 = (c1 >> 32) & 0x00ffffffffU;
 uint64_t c11 = c1 & 0x00ffffffffU;
 a_add_b_inplace(d0, d1, c10, c11<<32);
}
Остаток от деления можно получить уже не так просто. Для этого необходимо реализовать ещё несколько операций. Но удивил сам результат - на 32 битной платформе этот код работал в 5 раз бытрее чем класс uint128_t. А ещё на 64 битке эта реализация по скорости почти не отличается от 32-ной. Ура. В итоге я смог реализовать алгоритм Miller-Rabin-а для 64 битных чисел, который работал бы всего в несколько раз меленне на 32-х битной системе, чем на 64-х битной.

Saturday, March 20, 2010

Планирование задач в сервере при помощи boost.task

Недавно на профильном ресурсе один программист задал вопрос: "Что использовать в сервере ММО для работы с потоками?". Программист склонялся к Intel TBB, но даже не к базовым примитивам, а к кастомному планированию задач (task scheduling). Ну нравится TBB - ну и ладно. А немного позже я увидел исходники сервера ММО другого программиста, который недавно начал переписываться его с нуля для улучшения архитектуры. И там было очень много велосипедов, которые писались самим программистом вместо того что бы использовать сторонние компоненты такие как boost (к примеру класы обертки над pthread-ом, и это в 2010 году, когда boost.thread уже почти в стандарте). Была там реализована и поддержка пула потоков с планировщиком задач. Тема эта мне очень интересна и я начал копать информацию о готовых решениях планировки задач (как в TBB) и нашел boost.task, про что и решил написать.

Определение

Задача (task) - это логически объедененный набор действий. Планировщик задач (task scheduler) асинхронино выполняет задачи руководствуясь определенными стратегиями по выбору кто должен выполняться в данный момент в каком потоке.

Задачи позволяют абстрагировться от обычных потоков и оперировать на более высоком уровне.

Зачем нужен планировщик задач?

Как работает сферический сервер в вакууме? Очень просто:
  1. Приходит запрос от клиента
  2. Он обрабатывается!
  3. Отсылается ответ

Ну кроме того в сервере могут происходить какие то процессы, которые выполняются и без запроса клиента. Например рассылка уведомлений по всей базе пользователей, очистка базы от устаревших данных (крончик), обработка дневной статистики и тд.

Сейчас загвоздка именно в том, как обрабатывается запрос. Надо разобраться как его обрабатывать.

Возьмем к примеру memcached-подобный сервер: у нас есть hash_map с данными, есть запросы чтения, есть запросы записи, которые делают простой лукап по хеш-мапе и возвращают данные либо записывают их в хеш-мап. Пока всё происходит в одном потоке, но что делать, если нам надо задействовать все процессоры системы?

Создаем столько потоков, сколько ядер. В каждом потоке обрабатываем пользователей, которых при создании соеденения раскидываем по принципу round-robin. При обращении к контейнеру используем rwlock-и (boost::shared_mutex). Отлично. А как нам быть с удалением элементов из контейнера? Создаем поток, который раз в N секунд просыпается и чистит контейнер.

Это был простой пример, а теперь более сложный пример: сервис, который может в зависимости от запроса пользователя сделать запрос в базу данных, сделать http запрос на какой то сайт. Что будет если сделать серрвер по предидущей модели (все запросы к другим компонентам будут выполняться синхронно)? Ну база данных находится на той же площадке, что и сервер, ответ будет в приделах пары миллисекунд. Отослать email - тоже не проблема - ставим sendmail на ту же машину, отдаём ему данные, а он сам разберется как отослать письмо.

Отлично. Хотя не совсем. А что делать с http-запросом? Он же может занять очень долго - всё зависит от сайта который находится где то далеко и не известно сколько будет обрабатывать запрос. В таком случае поток будет бездействовать, хотя в очереди есть много запросов, которые могут выполниться, но они ждут пока освободится этот поток.

Такой запрос необходимо выполнять асинхронно. Реализовать можно так:

class LongRequestHandler
{
public:
        void Handle()
        {
                // read client request parameters
                // mysql request 1
                // mysql request 2
                HttpRequestExecutor::GetInstance()->Execute(
                        "example.com?x=1",
                        boost::bind(this, &LongRequestHandler::HandleStage2)
                );
        }
        void HandleStage2(const std::string & http_request_result)
        {
                // mysql request 3
                // write response to client
        }
};

HttpRequestExecutor принемает url запроса и колбек, который надо вызвать по завершению запроса (тип колбека - boost::function).

И такой подход в работает, правда не слишком красиво.

В блоге Thinking Asynchronously in C++ показана интерестная реализация выполнения асинхронных задач. Выглядит результат следующим образом:

template void async_echo(
  tcp::socket& socket,
  mutable_buffer working_buffer,
  Handler handler,
  // coroutine state:
  coroutine coro = coroutine(),
  error_code ec = error_code(),
  size_t length = 0)
{
  reenter (coro)
  {
  entry:
    while (!ec)
    {
      yield socket.async_read_some(
          buffer(working_buffer),
          bind(&async_echo,
            ref(socket), working_buffer,
            box(handler), coro, _1, _2));
      if (ec) break;
      yield async_write(socket,
          buffer(working_buffer, length),
          bind(&async_echo,
            ref(socket), working_buffer,
            box(handler), coro, _1, _2));
    }
    handler(ec);
  }
}

Coroutine и yield в С++ смотрятся необычно;) Реализовано это на дефайнах, в блоге можно почитать как это удалось автору.

Постепенно логика усложняется, добавляются новые элементы, которые надо обрабатывать асинхронно, реализация тоже усложняется. В дальнейшем задачу

mysql request 1
mysql request 2
http request 1
mysql request 3
http request 2
mysql request 4
mysql request 5

И выполняя её последовательно с остановками в http запросах мы видим,что запросы

mysql request 2
http request 1

и

mysql request 3
http request 2
mysql request 4

можно выполнять паралельно и если мы захотим это сделать, то прийдется ещё сильнее усложнять логику. А хотелось бы написать простой код, например:

mysql request 1
x = run(func1)
y = run(func2)
wait(x, y)
mysql request 5

func1:
  mysql request 2
  http request 1

func2:
  mysql request 3
  http request 2
  mysql request 4

Вот тут и пригодиться планировщик задач.

Реализации

Про поддержку планировщика задач в новом стандарте 0x можно почитать тут.
  • just::thread - реализация библиотеки потоков стандарта C++0x от отца boost::thread
  • Parallel Patterns Library (PPL) - реализцаия от Microsoft
  • Asynchronous Agents Library - и ещё одна от Microsoft
  • Intel Threading Building Blocks - очень мощная библиотека для паралельного программирования от Intel. Включает в себя и планировщик задач.
  • boost::task - - реализация от Oliver Kowalke, не принятая ещё в boost

Мне наиболее понравился boost.task. Дальше его детальное рассмотрение.

Описание boost.task

boost.task - реализация предложения в стандарт C++0x. Она поддерживает задание стратегий выполнения задач, создание под-задач, прерывание задач.

Библиотека зависит от:

boost.task и boost.fiber компилируемые библиотеки (boost.atomic и boost.move - header-only) - так что прийдется их собирать. Что бы было удобнее эксперементировать собрал все зависимости в одном месте, приправил cmake-ом и залил поект на github. Работает на linux-е, для сборки под windows - потребуется 2-3 строчки добавить в cmake файлы.

Пример использования

API библиотеки достаточно простой, реализовать обработчик запроса, который описывался выше не совтавит труда. Приведу его ещё раз:

mysql request 1

  mysql request 2
  http request 1

  mysql request 3
  http request 2
  mysql request 4

mysql request 5

В качестве эмуляции запроса к mysql будет использован обычный sleep на случайное время:

boost::this_thread::sleep(boost::posix_time::milliseconds(rand()%100 + 10));

В качестве внешнего http-запроса будет использован асинхронный таймер из boost::asio.

Итак:

Request - класс запроса.

class Request
{
public:
    Request(const std::string & data);
    const std::string & Read() const;
    void Write(const std::string & answer);
};

А RequestHandler - класс обработчика запроса.

class RequestHandler
{
public:
    RequestHandler(boost::asio::io_service & io_service, const RequestPtr & request);
    void Process() const;
};

io_service - передается для того, что бы можно было выполнить внешний вызов (использовать таймер boost::asio::deadline_timer).

Итак начнем. Определяем пул потоков, для обработки наших задач:

boost::tasks::static_pool< boost::tasks::unbounded_fifo > pool( boost::tasks::poolsize( 5) );

boost.task поддерживает два основных вида стратегий планировки задач:

  • ограниченные (bounded) - имеют порог количества выполняемых задач, при достижении которого добавление новой задачи блокирует поток, который выполняет это действие. Основная задача - избежать исчерпания ресурсов (resource exhaustion) когда скорость добавления задач превышает скорость их выполнения
  • неограниченные (unbounded) - позволяют добавлять бесконечное число задач в очередь

Также есть возможность задания стратегии обработки задач внутри очереди:

  • fifo - первая добавленная задача выполняется первой
  • priority - у задачи есть приоритет, для выполнения выбираются задачи с высшим приоритетом
  • smart - очередь такого типа возможно сильно кастомизировать передавая параметры в шаблон. по умолчанию есть возможность индексировать задачи по любому ключу и заменять старую задачу на новую, если она сущесвует

Соответственно описанная строчка кода создает пул из 5 потоков с неограниченной очередью типа fifo.

Теперь нам понадобится создать io_service и пул из 3-х потоков для обработки внешних запросов.

boost::asio::io_service io_service;

Если вызвать io_service::run в момент когда в нем нету задач, метод сразу завершится, а для нормальной работы нам необходимы работающие потоки. Обычно это достигается тем, что в io_service добавлен accept-ор порта, на который подключаются клиенты, а в данном случае можно занять io_service ожиданием исполнения таймера:

boost::asio::deadline_timer dummy_timer(io_service);
dummy_timer.expires_from_now(boost::posix_time::seconds(10));
// void dummy_handler(const boost::system::error_code&) {}
dummy_timer.async_wait(&dummy_handler);

После этого можно создать пул потоков:

boost::thread_group io_service_thread_pool;
for(int i = 0; i < 3; ++i)
    io_service_thread_pool.create_thread(
        boost::bind(&boost::asio::io_service::run, &io_service)
    );

Далее создаём запрос:

RequestPtr request(new Request("some data"));
RequestHandlerPtr handler(new RequestHandler(io_service, request));

Все готово, можно выполнять задачу:

boost::tasks::handle< void > request_processing(
    boost::tasks::async(
        boost::tasks::make_task( &RequestHandler::Process, handler ),
        pool));

boost::tasks::make_task( &RequestHandler::Process, handler ) - создает задачу вызова Process у объекта handler, которую можно будет выполнить. boost::tasks::async инициирует асинхронное выполнение задачи. boost::tasks::handle объект, по которому можно отслеживать статус завершения задачи, получить результат если он есть.

boost::tasks::async поддерживает 4 алгоритма выполнения задачи:

  • own_thread - синхронное выполнение в том же потоке
  • new_thread - для задачи создается поток, в котором она будет выполнена, после чего поток будет завершен
  • as_sub_task - если текущая задача выполняется в пуле - добавляет новую задачу в него, иначе создает новый поток, как new_thread. Это поведение по умолчанию
  • static_pool - выполнить задачу в пуле потоков

Далее подождем пока задача выполнится:

request_processing.wait();

И остановим io_service:

io_service.stop();
io_service_thread_pool.join_all();

Функция Process получилась на удивление очень простой

void Subtask1() const
{
    Request("query2");
    ExternalRequest("extquery1");
}

void Subtask2() const
{
    Request("query3");
    ExternalRequest("extquery2");
    Request("query4");
}

void Process() const
{
    std::string data = request_->Read();

    Request("query1");

    boost::tasks::handle< void > subtask1(
        boost::tasks::async(
            boost::tasks::make_task( &RequestHandler::Subtask1, this )));
    boost::tasks::handle< void > subtask2(
        boost::tasks::async(
            boost::tasks::make_task( &RequestHandler::Subtask2, this )));

    boost::tasks::waitfor_all( subtask1, subtask2);

    Request("query5");

    request_->Write("some answer");
}

Подзадачи выполняются при помощи boost::tasks::async без указания policy на запуск и автоматически выбирается as_sub_task алгоритм, который выполнит задачи в том же пуле потоков, что и родительская задача. Реализация функций подзадач тоже тривиальная.

RequestHandler::Request - вызывает boost::this_thread::sleep, а с ExternalRequest все немного сложнее:

void ExternalRequest(const std::string & what) const
{
    ExternalRequestHandler external_handler(io_service_);
    boost::tasks::spin::auto_reset_event ev;
    external_handler.PerformExternalReqeust(what, &ev);
    ev.wait();
}

Создается хендлер, а так же событие с автоматическим сбросом - boost::tasks::spin::auto_reset_event. Это событие передается обработчику внешнего запроса и по его завершению будет вызвано ev.set(), а до тех пор ev.wait() блокирует задачу.

В противовес обычным потокам и примитивам синхронизации (boost::condition) ev.wait() не блокирует поток, а блокирует задачу (вызывает в цикле this_task::yield()). А это значит, что ресурсы процессора будут использованы другими задачами.

Файл целиком может быть найден тут.

Выводы

boost.task вполне удобная библиотека для планирования задач. Она позволяет посмотреть как будет выглядить поддержка асинхронного выполнения кода в новом стандарте C++0x, и её можно использовать уже прямо сейчас не дожидаясь пока будет выпущен стандарт.

Код с использованием boost.task становится меньше и намного понятнее, чем при обычном использовании потоков.

Есть канечно и недостатки: код ещё не оптимизирован, что может вызвать проблемы в редких случаях; библиотека ещё не принята в boost (вместе с её зависимостями).

Что почитать по теме?

Monday, March 8, 2010

Охота на утечки

Давно не сталкивался с утечками памяти, но на днях наш плюсовый демон начал течь. При чем valgrind.memcheck не показывает ничего внятного - сервер долго стартует, большую нагрузку дать не получается - watchdog, который проверяет на зависание сервера при такой нагрузке под валгриндом - резво прибивает сервер.. При выходе из под маленькой нагрузке - ликов нет.
Течет сильно - 7 гигов за 3 дня. Хотя раньше в рабочем режиме больше 1 гига не требовал (был запущен и по 2 недели без перезапусков).

Подумалось, что либо циклические ссылки, либо данные с какого то контейнера не удаляются.
Ну что же решил заюзать старый добрый google perf tools, что бы сделать снимок памяти в момент работы. Хм. Под ubuntu amd64 почему то только старая версия. И она виснет когда приложение разпаралеливается (хотя google perf tools - специально сделано для многопоточных приложений).
В дебагере видно, что все потоки висят на спинлоках.

Решил обновиться до последней версии - но компились версию с сайта не хотелось. Нашел пакеты на packages.debian.org - и на удивление всё установилось без проблем.
Но при входе в многопоточный режим работы демона - опять зависание.
Да и сам демон загружается под 15-20 минут - так как взял базу с продакшена, а там данных очень много.

Поискал какие то хорошие аллокаторы для отладки - того что мне надо было (google perf tools - heap peofiler, который бы не вис) не было.
Ну а может в valgrind.memcheck есть какие то ключи, что бы дампить состояние памяти не в конце работы программы, а в середине? Нет. Но... Есть valgrind.massif - который этим и занимается. Ура!

Безжалостно очистив базу от данных (так что бы хоть нагрузочные тесты могли работать, а они требуют настоящую игру и хоть немного валидных данных), стал ковырять valgrind.massif.
Запустил - прогнал нагрузочные тесты - всё работает, ничего не течет. Очень интерестно.
В общем тулзы мне никакого ответа не дали - наверно синтетическая нагрузка не провоцировала ошибку, влекущую утечку памяти. Надо включить моск;)
К сожалению с последней стабильной выливки очнь много кода поменялось и я тому виной: делал рефакторинг затронув почти половину всего кода;)

Думал задампить память и посмотреть что там, но разбираться в 7 гиговом файлике не хотелось. И тут - пришла безумная идея - анализировать статистику по демонам (благо текущих демона у нас 3 в кластере) которую мог собрать как с операционки, так и с внутреннего мониторинга демонов.

Совершенно случайно увидел пропорцию между количеством утекшей памяти и количеством выполненных команд одного типа. Команда была старой, но я вспомнил, что в эту выливку мы её стали намного активнее использовать. Проверил пропорцию на калькуляторе (питон в шеле;) - всё совпадает. Эта твать жрет память.

Вся проблема в том, что схожая команда использовалась только в биллинге, на неё нагрузочного теста не было. А недавно сделали схожую команду, которая использует схожие фичи что и первая команда, но выполняется в разы больше раз. Используется она сугубо в соц сетях и в нагрузочный тест тоже добавлена не была. Вот такая нехорошая ситуация.

Добавив в нагрузочный тест эту команду - за 10 секунд отожрал 50% памяти своего ноута. А дальше дело техники - даже не стал использовать massif - почитал код - нашел место где может не создаваться циклическая ссылка - сменил policy с Strong на Weak - сбилдил - запустил тест - ура, жучек умер.

Ну и какие я сделал выводы из убитого дня на дебаг? Собственно вот:

  1. Надо чаще прогонять тесты на утечки памяти
  2. Даже если утечка маленькая - в будущем код может начать использоваться активнее - тогда маленькая утечка превратится в сильную головную боль
  3. Надо иметь репрезентативную базу по функционалу аналогичную продакшену, но с меньшим объемом данных. Без этого отладка под дебагерами памяти будет практически невозможна
  4. Ещё раз убедился - отсутствие голых указателей не гарантирует отсутствие утечек (это касается как java и c#, так и если использовать умные указатели в С++)
  5. Когда ничего не помогает - случайность может решить всё
  6. Случайности не случайны (c) =) - не ища какие то закономерности в статистиках я бы не нашел их, хотя я и не надеялся что то найти