Урок 5. Синхронизация в параллельных циклах C#


На пятом уроке учебника по параллельному программированию в .NET мы продолжим описание параллельных циклов: рассмотрим проблемы синхронизации агрегации в параллельных циклах и способы их преодоления с помощью блокировки и локальных значений состояний.

Синхронизация и агрегирование

Одно из распространенных применений циклов - агрегация. Именно здесь одно значение вычисляется всем циклом, причем каждая итерация потенциально изменяет это значение. Простая операция агрегирования - это суммирование ряда значений. Это просто, когда вы используете последовательные циклы for или foreach. При использовании параллельных циклов, как мы видели ранее в учебнике, синхронизация агрегированного значения между потоками становится проблемой. Если два или более потоков одновременно обращаются к общему изменяемому значению, существует вероятность того, что они будут использовать несогласованные значения, приводящие к неверному результату.

В этой статье мы рассмотрим два способа применения синхронизации, которые полезны при агрегировании значений. Чтобы следовать приведенным примерам, создайте проект консольного приложения и добавьте следующую директиву using, чтобы упростить использование параллельного класса.

using System.Threading.Tasks;
Агрегация в последовательных циклах

Чтобы продемонстрировать агрегацию в параллельных циклах, мы воссоздадим одну и ту же функциональность несколько раз. Каждый пример суммирует все целые числа от одного до ста миллионов. Это базовая функциональность, но демонстрирует проблемы, возникающие при неправильной синхронизации параллельных итераций. Давайте начнем с последовательного цикла foreach, который действует на результаты, генерируемые методом LINQ, Enumerable.Range. Поскольку цикл выполняется в одном потоке, нет никакого риска возникновения проблем с синхронизацией, поэтому результат всегда будет правильным:

long total = 0;
foreach (int value in Enumerable.Range(1, 100000000))
{
    total += value;
}
 
// total = 5000000050000000
Агрегация в параллельных циклах

Если мы попытаемся распараллелить вышеприведенный цикл, преобразовав синтаксис foreach в вызов Parallel.ForEach, то получим ошибки синхронизации. Попробуйте выполнить приведенный ниже код.

long total = 0;
Parallel.ForEach(Enumerable.Range(1, 100000000), value =>
{
    total += value;
});
 
// total = 2769693850306679
Результат, показанный в комментарии, был достигнут с помощью двухъядерного процессора. Поскольку несколько потоков выполнения считывали и обновляли общую переменную, она несколько раз оставалась в несогласованных состояниях. Вышеприведенный результат составляет чуть больше половины правильного итога. На машинах с более чем двумя процессорами результат, скорее всего, будет намного меньше и дальше от правильного значения.

Синхронизация с помощью блокировки

Самый простой способ добавить синхронизацию в цикл - это создать блокировку, которая предотвращает одновременный доступ к значению агрегации. Блокировка в C# достигается с помощью оператора lock и объекта, который управляет блокировкой в качестве его единственного параметра. Код для выполнения при сохранении блокировки добавляется в блок кода оператора блокировки. Когда другой поток достигает оператора блокировки, основанного на том же базовом объекте, этот поток блокируется до тех пор, пока не будет завершен кодовый блок первой блокировки.

Несогласованность, которую мы видим в агрегации в параллельном цикле, вызвана тем, что два или более потока считывают значение, обновляют свои локальные копии и снова сохраняют новое значение. В некоторых случаях изменение значения агрегации одним потоком перезаписывается другим, что приводит к низкому результату. Мы можем предотвратить это, добавив блокировку вокруг команды, которая считывает и изменяет общую переменную. Примечание: применяйте блокировку к минимальному количеству кода.

Обновленная версия цикла с добавленной блокировкой показана ниже:

object sync = новый объект();
object sync = new object();
long total = 0;
 
Parallel.ForEach(Enumerable.Range(1, 100000000), value =>
{
    lock (sync)
    {
        total += value;
    }
});
 
// total = 5000000050000000
При запуске кода с блокировкой результат агрегации будет правильным. Однако производительность цикла существенно ниже. Это происходит потому, что все тело цикла находится внутри блокировки, поэтому процессорные ядра блокируются на протяжении большей части процесса.

Локальное состояние циклов

Некоторые алгоритмы требуют, чтобы вы использовали блокировку, как в предыдущем примере, что значительно снижает преимущество параллельной обработки. Однако во многих случаях вы можете скорректировать свой алгоритм, чтобы уменьшить блокировку и увеличить преимущества параллелизма. Суммирование ряда значений в одном из этих случаев, так как вы можете разложить данные на разделы, которые могут быть индивидуально суммированы, а затем объединить эти результаты в конце процесса для получения окончательной цифры. Если каждая секция обрабатывается последовательно, то во время итераций цикла блокировка не требуется.

Параллельные циклы For и ForEach обеспечивают механизм для этого типа декомпозиции, известный как потоковые локальные данные. Вы можете создать переменную, локальную для одного потока и доступную для каждой итерации цикла для одного разлагаемого раздела данных. Это значение инициализируется при первом обращении к разделу данных. Каждая итерация цикла изменяет локальное значение потока, и когда все подмножество данных было обработано, вы можете выполнить окончательное действие, используя результат.

Чтобы включить потоковые локальные данные в метод ForEach, мы можем использовать перегрузку, имеющую четыре параметра. Первая - это коллекция, которая должна быть перечислена. Второй параметр принимает делегат Func, который возвращает начальное значение локальной переменной. В нашем примере суммирования мы вернем 64-битное целочисленное значение нуля, используя следующее лямбда-выражение:

() => 0L;
Третий параметр - это делегат Func, представляющий тело цикла. Делегат имеет параметры, которые предоставляют следующий элемент в коллекции, объект ParallelLoopState и текущее значение локальной переменной потока. Во время каждой итерации локальная переменная обновляется и новое значение возвращается делегатом, как показано ниже. Обратите внимание, что блокировка не требуется, так как элементы в каждом разделе данных обрабатываются последовательно.

(value, pls, localTotal) => { return localTotal += value; }
Четвертый параметр - это делегат действия. Это называется, когда разложенный раздел коллекции был полностью обработан. Делегат получает конечное значение локальных данных потока. В нашем примере суммирования это значение добавляется к общей сумме. Вы увидите, что это заключительное действие выполняется в операторе блокировки, поскольку возможно, что несколько потоков могут выполнить заключительный этап одновременно и вызвать ошибки синхронизации. Блокировка предотвращает эти ошибки и с меньшей вероятностью вызовет проблемы блокировки, поскольку последнее действие выполняется реже, чем в предыдущем примере.

object sync = new object();
long total = 0;
 
Parallel.ForEach(Enumerable.Range(1, 100000000),
    () => 0L,
    (value, pls, localTotal) =>
    {
        return localTotal += value;
    },
    localTotal =>
    {
        lock (sync)
        {
            total += localTotal;
        }
    });
 
// total = 5000000050000000
Локальное состояние цикла в цикле

Локальное состояние потока может использоваться Parallel.For цикла аналогичным образом, как в Parallel.ForEach использует перегрузку с пятью параметрами. Первые два параметра определяют нижнюю и верхнюю границы цикла. Остальные три параметра определяют делегат инициализации, тело цикла и конечный делегат действия.

Приведенный ниже код показывает параллельный цикл For, который дает те же результаты, что и предыдущий пример ForEach.

object sync = new object();
long total = 0;
 
Parallel.For(1, 100000001,
    () => 0L,
    (value, pls, localTotal) =>
    {
        return localTotal += value;
    },
    localTotal =>
    {
        lock (sync)
        {
            total += localTotal;
        }
    });
 
// total = 5000000050000000
Автор этого материала - я - Пахолков Юрий. Я оказываю услуги по написанию программ на языках Java, C++, C# (а также консультирую по ним) и созданию сайтов. Работаю с сайтами на CMS OpenCart, WordPress, ModX и самописными. Кроме этого, работаю напрямую с JavaScript, PHP, CSS, HTML - то есть могу доработать ваш сайт или помочь с веб-программированием. Пишите сюда.

тегистатьи IT, параллельное программирование, си шарп, циклы




Отправляя сообщение я подтверждаю, что ознакомлен и согласен с политикой конфиденциальности данного сайта.



Несколько полезных сочетаний клавиш для IntelliJ IDEA
Оператор Sizeof в C#
Что такое WPD