Всё о многопоточности в Swift: Часть 1: Настоящее

Данная статья является вольным переводом с некоторыми дополнениями отличного материала от Umberto Raimondi по многопоточности с www.uraimo.com

Playground c материалами есть на github.


В настоящее время Swift не включает какой-либо нативный функционал для работы с многопоточностью.

К примеру, в языке Go можно отправить функцию выполняться в отдельном потоке (на самом деле в «зеленом потоке») указав перед ней ключевое слово «go». В Swift пока такого нет, поэтому если вы планируете распараллелить какие-либо задачи в вашем приложении и при этом не нарваться на “race condition” и других демонов многопоточного мира, у вас есть два варианта. Использовать внешние библиотеки такие как libDispatch (он же GCD) или примитивы синхронизации [synchronization primitives] (мьютексы, семафоры, блокировки и д.р) предоставляемые Foundation или ОС.

В первой части этой серии, мы посмотрим что мы имеем в своем арсенале работая с Swift 3 , от блокировок, потоков, таймеров и до “гарантий языка”, улучшений GCD и OperationQueues

Как уже было много раз анонсирована, одной из важных доработок после Swift 4.0 будет расширение языка обеспечивающее лучшее определение модели памяти. А также новые нативные фичи с помощью которых можно управлять многопоточностью, без использования внешних решений, тем сам определяя в языке идиоматический подход к решению этого вопроса ( другими словами с помощью конструкций самого языка).


Многопоточность. Основы

Для начала давайте коротко вспомним некоторые базовые концепции, которые следует знать если вы хотите использовать DispatchQueues или OpearaionQueues

Прежде всего, можно задать справедливый вопрос — даже если платформа Apple и фреймворки используют потоки, зачем мне добавлять их в свое приложение?

Есть несколько общих обстоятельств, которые делают использование нескольких потоков очевидным:

Разграничение групп задач: потоки могут использоваться для обеспечения модульности приложения. Различные потоки могут использоваться для выполнения группы задач одного и того же типа, изолируя их от потоков выполнения других частей вашей программы, что упрощает представление о текущем состоянии приложения.

Распараллеливание вычислений: несколько потоков могут быть использованы для параллельного выполнения копий одной и той же задачи на подмножестве исходных данных.

Прозрачный способ ожидания события или ввода/вывода: При использовании блокировок ввода/вывода или других операций, фоновые потоки могут быть использованы для ожидания выполнения этих операций. Что выглядит достаточно прозрачно и улучшает дизайн приложения.

Однако, когда несколько потоков выполняет ваш код, некоторые допущения, сделанные при рассмотрении кода в контексте отсутствия многопоточности, прекращают быть верными.

В идеальном мире где каждый поток выполняется независимо и нет разделяемых ресурсов, писать многопоточный код не труднее обычного. Однако, как это часто бывает, вы собираетесь использовать потоки на одних данных, значит вам нужно управлять доступом к этим данным и гарантировать, что каждая операция завершится как ожидается, без сюрпризов со стороны других потоков.


Модель пямяти

Конкурентное программирование требует дополнительных гарантий от языка и операционной системы, которые определяют как переменные (или ресурсы) будут вести себя при попытки изменить их одновременно из разных потоков. Для этого язык должен определить Модель Памяти [Memory Model], правила определяющие поведение в случае многопоточности, определить как разделяется память и какой способ доступа к памяти является правильным.

Спасибо за это, у пользователей будет язык который ведет себя предсказуемо и мы будем знать, что компилятор будет только применять оптимизацию которая полагается на то, что объявляет модель памяти.

Определение модели памяти очень деликатный шаг в эволюции языка, слишком строгая модель может ограничить возможности дальнейшего развития компилятора. Новые, умные оптимизации могут быть недоступны из-за прошлых решений в отношении модели памяти.

Модель памяти определяет к примеру:

  • Какие выражение языка можно считаться атомарным, а какие нет. Т.е. операции которые могут быть выполнены только целиком. К примеру, важно знать является ли инициализация переменной атомарной или нет.
  • Как распределяемые переменные обрабатываются в потоках. Кешируются ли они по умолчанию или нет и можно ли повлиять на этот кеш.
  • Конкурентные операторы которые используются для управления доступом к критическим секциям — таким секциям кода, которые работают с распределяемым ресурсом и, например, разрешают только одному потоку выполняться в данной секции.

Теперь давайте поговорим об использовании многопоточности в вашем приложении.

Чтобы управлять многопоточностью корректно, вы должны будете выделить критичные участки кода и объявить правила доступ к ним.

Введение данных правил к этим разделам кода открывает путь к целому набору проблем, которые пораждаются при некоторых обстоятельствах в результате какие-то операции не могут быть выполнены, а данные могут вести себя не предсказуемо.

Собственно сами проблемы:

  • Условия гонки [Race condition]: С несколькими потоками, при работе с одними данными, в результате чего сами данные становятся непредсказуемыми и зависят от порядка выполнения потоков.
  • Конкуренция за ресурс [Resource contention]: Несколько потоков, выполняющих разные задачи, пытаются получить доступ к одному ресурсу, тем самым увеличивая время необходимое для безопасного получения ресурса. Эта задержка может привести к непредвиденному поведению.
  • Вечная блокировка [Deadlock]: Несколько потоков блокируют друг друга.
  • Голодание [Starvation] : Поток не может получить доступ к ресурсу и безуспешно пытается сделать это снова и снова.
  • Инверсия приоритетов [Priority Inversion]: Поток с низким приоритетом удерживает ресурс, которые требуется другому потоку с более высоким приоритетом.
  • Неопределенность и справедливость [Non-deterministic and Fairness]: Мы не можем делать предположений, когда и в каком порядке поток сможет получить ресурс, эта задержка не может быть определена априори и в значительной степени зависит от количества конфликтов. Однако, примитивы синхронизации могут обеспечивать справедливость, гарантируя доступ всем потокам которые ожидают, также учитываю порядок.

Гарантии языка

И хотя Swift пока не реализуется функции связанные с многопоточностью самостоятельно, он все же предоставляет некоторые гарантии:

  • Глобальные переменные к примеру инициируются атомарно.
  • Ленивые свойства, напротив, не обеспечены атомарностью при инициализации. И язык не предоставляет никаких аннотаций и модификаторов чтобы изменить данное поведение.
  • Доступ к свойствам класс также не атомарен.

Потоки

Foundation предоставляет Thread класс, реализованный на ptherad, который может буть использован для создания потоков и выполнения замыканий. Поток можно создать используя метод detachNewThreadSelector:toTarget:withObject: либо определив свой класс потока и переопределив метод main()

Начиная с IOS 10 и macOS Sierra стало возможным создать поток используя инициализатор который принимает замыкание.

Поток может быть остановлен немедленно вызовом exit(), но не рекомендуется никогда использовать этот метод, т.к он не даст вам возможность аккуратно завершить текущую задачу. В большинстве случаев вам нужна собственная логика остановки потока, либо вы можете вызвать cancel() и проверить isCanceled свойство внутри вашего основного замыкания, чтобы узнать должен ли поток остановить выполнение текущей работы перед тем как окончательно завершиться.


Примитивы синхронизации

Основными средствами, обычно используемыми для синхронизации потоков, являются блокировки, семафоры и мониторы.

Foundation предоставляет все их них.

NSLock

Базовый тип для блокировки. Когда поток пытается заблокировать объект возможно два варианта, поток либо сможет это сделать и продолжит работу, либо будет ожидать пока другой поток, успевший раньше заблокировать объект, не вызовет unlock() тем самым освободив блокировку.

NSLock и другие блокировки Foundation несправедливы, в том смысле, что когда несколько потоков ожидает освобождения объекта блокировки, они не получат его в том порядке в каком они пытались наложить блокировку. При большом количестве потоков это может служить проблемой голодания, когда один из потоков никогда не получит доступ в положенное время.

Время, необходимое для получения блокировки без конкуренции, порядка 100 наносекунд, но это время быстро растет, когда несколько потоков пытаются получить заблокированный ресурс. Таким образом, с точки зрения производительности блокировки обычно не лучшее решение для управления распределением ресурсов.

Ниже пример с двумя потокам, помните что порядок в котором произойдут блокировки не детерминирован.

Позвольте мне добавить предупреждение. Поскольку вполне вероятно, что рано или поздно вам придется отлаживать проблемы параллелизма, всегда старайтесь ограничивать использование блокировок в рамках какой-либо структуры данных и старайтесь не ссылаться непосредственно на один объект блокировки во многих местах вашего кода.

Проверка состояния синхронизированной структуры данных с несколькими точками входа при отладке проблемы параллелизма является более приятной, чем необходимость отслеживать, какая часть кода удерживает блокировку и к тому же нужно помнить о локальном статусе нескольких функций. Пройдите лишнюю милю и хорошо структурируйте свой параллельный код.

NSRecursiveLock

Рекурсивные блокировки могут быть получены несколько раз из потока, который уже держит эту блокировку. Это полезно в рекурсивной функции или при вызове нескольких функций, которые проверяют одну и ту же блокировку в последовательности. Если заменить NSRecursiveLock на NSLock, выполнение, хотя и внутри одного потока, встанет на вызове rlock.lock() метода callMe()

NSConditionLock

Словные блокировки обеспечивают дополнительные блокировки, которые могут быть заблокированы / разблокированы независимо друг от друга, чтобы поддерживать более сложные настройки блокировки (например, сценарии потребителей-производителей).

Глобальная блокировка (которая блокируется независимо от конкретного условия) также доступна и ведет себя как классический NSLock.

Давайте посмотрим на простой пример с блокировкой, которая защищает общее целое число, которое потребитель печатает, и продюсер обновляет каждый раз, когда он был показан на экране.

NSCondition

NSCondition обеспечивают простой способ ждать возникновения условия.

Когда поток, который приобрел блокировку, понимает, что дополнительное условие которое ему требуется пока не выполнено, ему нужен способ удержаться и продолжиться свою работу когда условие будет выполнено.

Это может быть реализовано путем непрерывной или периодической проверки этого условия (ожидание занятости), но при этом, что произойдет с блокировками потока? Должны ли мы держать их, пока мы ждем или освободим их в надежде, что мы сможем их снова приобрести, когда условие будет выполнено?

NSCondition обеспечивают чистое решение этой проблемы, как только поток поставил блокировку (вызвал lock()) он может быть помещен в лист ожидания для этого условия и проснуться, как только другой поток сигнализирует о том, что условие выполнено.

Давайте посмотрим пример:

NSDistributedLock

Распределенные блокировки сильно отличаются от того, что мы видели до сих пор, и вряд ли, они вам понадобятся.

Они создаются для совместного использования несколькими приложениями и подкрепляются записью в файловой системе (например, простым файлом). Файловая система, очевидно, должна быть доступна всем приложениям, которые должны приобрести блокировку.

Такой тип блокировки приобретается с помощью метода try() — не блокирующего метода, который сразу же возвращается с логическим значением, указывающим, была ли получена блокировка. Приобретение блокировки обычно требует более одной попытки, которая должна выполняться вручную и с надлежащей задержкой между последовательными попытками.

Распределенные блокировки освобождаются, как обычно, с помощью метода unlock().

Рассмотрим основной пример:


OSAtomic Где же ты?

Атомные операции, подобные тем, которые были предоставлены OSAtomic, — это простые операции, которые позволяют устанавливать, получать или сравнивать-и-устанавливать переменные без использования классической логики блокировки, потому что они используют определенные функции процессора (иногда встроенные атомарные команды) и обеспечивают лучшую производительность, чем описанные ранее блокировки.

Само собой разумеется, что они чрезвычайно полезны для создания параллельных структур данных, поскольку накладные расходы, необходимые для обработки параллелизма, сводятся к минимуму.

OSAtomic устарел с момента выхода macOS 10.12 и никогда не был доступен в Linux, но есть несколько проектов с открытым исходным кодом, подобных этому, с его полезными расширениями, или этот, обеспечивают аналогичные функции.


GCD: Grand Central Dispatch

Для тех, кто еще не знаком с этим API, Grand Central Dispatch (GCD) — API на основе очередей, который позволяет выполнять замыкания в рабочих пулах.

Другими словами, замыкания, содержащие задание, которое необходимо выполнить, могут быть добавлены в очередь, которая будет выполнять их с использованием последовательности потоков либо последовательно, либо параллельно в зависимости от параметров конфигурации очереди. Но независимо от типа очереди, задания всегда будут запускаться в порядке «первым пришел — первым ушел», то есть задания всегда будут запускаться с соблюдением порядка добавления. Порядок завершения будет зависеть от продолжительности каждого задания.

Это распространенный шаблон, который можно найти почти во всех относительно современных средах выполнения, которые обрабатывают параллелизм. Пулом потоков проще управлять, проверять и контролировать, чем отдельными и несвязанными потоками.

API GCD претерпел несколько изменений в Swift 3, SE-0088 модернизировал его дизайн и сделал его более объектно-ориентированным.


Dispatch Queues

GCD позволяет создавать пользовательские очереди, а также предоставлять доступ к некоторым предопределенным системным очередям.

Чтобы создать базовую последовательную очередь, которая будет выполнять ваши замыкания последовательно, вам просто нужно предоставить метку строки, которая будет идентифицировать ее, обычно рекомендуется использовать префикс имени домена обратного порядка, чтобы упростить отслеживание владельца очереди в трассировки стека.

let serialQueue = DispatchQueue(label: "com.uraimo.Serial1")  
let concurrentQueue = DispatchQueue(label: "com.uraimo.Concurrent1", attributes: .concurrent)

Вторая очередь, которую мы создали, является параллельной, что означает, что очередь будет использовать все доступные потоки в своем базовом пуле потоков при выполнении заданий, которые она содержит. Порядок исполнения непредсказуем в этом случае, не думайте, что порядок завершения ваших замыканий будет каким-либо образом связан с порядком на размещение.

Очереди по умолчанию можно получить из объекта DispatchQueue:

let mainQueue = DispatchQueue.main  
let globalDefault = DispatchQueue.global()

Главная очередь — это последовательная основная очередь, которая обрабатывает основной цикл событий для графических приложений на iOS или macOS, реагируя на события и обновляя пользовательский интерфейс. Как мы знаем, каждое изменение пользовательского интерфейса должно выполняться в этой очереди, и каждая длительная операция, выполняемая в этом потоке, сделает интерфейс пользователя менее отзывчивым.

Среда выполнения также обеспечивает доступ к другим глобальным очередям с разными приоритетами, которые могут быть идентифицированы с помощью параметра качества обслуживания (Qos).

Различные уровни приоритета объявляются в классе DispatchQoS от более высокого к более низкому:

  • .userInteractive
  • .userInitiated
  • .default
  • .utility
  • .background
  • .unspecified

Важно отметить, что на мобильных устройствах, которые обеспечивают режим с низким энергопотреблением, фоновые очереди будут приостановлены, когда аккумулятор разряжен.

Чтобы получить определенную глобальную очередь по умолчанию, используйте global(qos :), определяющий нужный приоритет:

let backgroundQueue = DispatchQueue.global(qos: .background)
let serialQueueHighPriority = DispatchQueue(label: "com.uraimo.SerialH", qos: .userInteractive)

Использование очередей

Задания в виде замыканий могут быть отправлены в очередь двумя способами: синхронно с использованием метода sync или асинхронно async.

При использовании первого синхронизированный вызов будет блокироваться, другими словами, вызов метода sync завершится, когда его замыкание завершится (полезно, когда вам нужно дождаться завершения замыкания, но есть более эффективные подходы), async добавит замыкание в очередь и завершится, чтобы продолжить выполнение.

globalDefault.async {     print("Async on MainQ, first?") }  globalDefault.sync {     print("Sync in MainQ, second?") }

Несколько вызовов могут быть вложенными, например, когда после некоторой фоновой работы, низкого приоритета, нам нужно обновить пользовательский интерфейс.

DispatchQueue.global(qos: .background).async {     
// Some background work here
DispatchQueue.main.async {
// It's time to update the UI
print("UI updated on main queue")
}
}

Замыкания могут быть выполненены после определенной задержки:

globalDefault.asyncAfter(deadline: .now() + .seconds(5)) {                     print("After 5 seconds") 
}

Если вам нужно выполнить несколько повторений одного и того же замыкания одновременно, вы можете использовать метод concurrentPerform(iterations: execute :), но будьте осторожны, это замыкание будет выполняться параллельно, если это возможно, в контексте текущей очереди, поэтому не забудьте всегда включать вызов этого метода в синхронном или асинхронном вызове, выполняющемся в очереди, поддерживающей параллелизм.

globalDefault.sync {   
DispatchQueue.concurrentPerform(iterations: 5) {
print("\($0) times")
}
}

Хотя обычно очередь готова к обработке своих замыканий при создании, ее можно настроить так, чтобы она была включена по требованию.

let inactiveQueue = DispatchQueue(
label: "com.uraimo.inactiveQueue",
attributes: [.concurrent, .initiallyInactive])
inactiveQueue.async { print("Done!") }  
print("Not yet...")
inactiveQueue.activate()
print("Gone!")

Это первый раз, когда нам нужно указать несколько атрибутов, но, как вы можете видеть, вы можете просто добавить несколько атрибутов массивом.

Выполнение заданий также может быть приостановлено или возобновлено временно с помощью методов, унаследованных от DispatchObject:

inactiveQueue.suspend()  
inactiveQueue.resume()

Также доступен метод setTarget(queue :), который должен использоваться только для настройки приоритета неактивных очередей (использование его в активных очередях приведет к сбою). Результатом вызова этого метода является то, что приоритет очереди устанавливается на тот же приоритет очереди, что и параметр.


Барьеры

Допустим, вы добавили ряд замыканий в определенную очередь (с разными длительностями), но теперь вы хотите выполнить задание только после завершения предыдущей асинхронной задачи. Вы можете использовать барьеры для этого.

Давайте добавим 5 задач (которые будут спать в течение тайм-аута от 1 до 5 секунд) к параллельной очереди, которую мы создали ранее, и используем барьер для печати чего-то после завершения других заданий, мы сделаем это с указанием флага DispatchWorkItemFlags.barrier в наш последний асинхронный вызов:

globalDefault.sync {     
DispatchQueue.concurrentPerform(iterations: 5) { (id:Int) in
sleep(UInt32(id)+1)
print("Async on globalDefault, 5 times: "+String(id)) } }
globalDefault.async (flags: .barrier) {     
print("All 5 concurrent tasks completed")
}

Синглтон и Dispatch_once

Как вы уже могли заметить, в Swift 3 нет эквивалента dispatch_once — функции, используемой в большинстве случаев для создания потокобезопасных синглтонов.

К счастью, Swift гарантирует, что глобальные переменные инициализируются атомарно, и если вы объявите их константами, то эти два обстоятельства делают такие переменные отличным кандидатом для реализации синглтонов:

final class Singleton {      
public static let sharedInstance: Singleton = Singleton()
private init() { } ...
}

Мы объявим класс как final для отказа в возможности создания подкласса, и мы сделаем назначенный инициализатор private, так что не удастся вручную создать дополнительные экземпляры этого объекта. Открытая статическая константа будет единственной точкой входа в singleton и будет использоваться для извлечения одиночного совместно используемого экземпляра.

Такое же поведение можно использовать для определения блоков кода, которые будут выполняться только один раз:

func runMe() {     
struct Inner {
static let i: () = {
print("Once!")
}()
}
Inner.i
}
runMe()
runMe() // Constant already initialized
runMe() // Constant already initialized

На самом деле не очень красиво смотрется, но это работает, и это может быть приемлемая реализация.

Но если нам нужно точно воспроизвести функциональность и API dispatch_once, нам нужно реализовать его с нуля:

Как и ожидалось, будет выполнено только первое из трех замыканий.

Кроме того, с немного лучшей производительностью это можно сделать с использованием objc_sync_enter и objc_sync_exit, если они доступны на вашей платформе:


Dispatch Groups

Если у вас есть несколько задач, даже если они добавлены в разные очереди и вы хотите дождаться их завершения, вы можете сгруппировать их в группу.

Давайте посмотрим на пример, задача может быть добавлена в определенную группу напрямую с помощью sync или async:

let mygroup = DispatchGroup()  
for i in 0..<5 {
globalDefault.async(group: mygroup){
sleep(UInt32(i))
print("Group async on globalDefault:"+String(i))
}
}

Задачи выполняются на globalDefault, но мы можем зарегистрировать обработчик для mygroup, который выполнит закрытие очереди, когда все они будут завершены. Метод wait() может использоваться для выполнения ожидания блокировки.

print("Waiting for completion...") 
mygroup.notify(queue: globalDefault) {
print("Notify received, done waiting.")
}
mygroup.wait()
print("Done waiting.")

Другой способ отслеживания задачи с группами состоит в том, чтобы вручную вводить и покидать группу вместо того, чтобы указывать ее при выполнении вызова в очереди:

for i in 0..<5 {     
mygroup.enter()
sleep(UInt32(i))
print("Group sync on MAINQ:"+String(i))
mygroup.leave()
}

Dispatch Work Items

Замыкания — это не единственный способ указать задание, которое должно выполняться очередью, иногда вам может потребоваться тип контейнера, способность отслеживать состояние его выполнения, для этого у нас есть DispatchWorkItem. Каждый метод, который принимает замыкание, имеет вариант и для DispatchWorkItem.

DispatchWorkItem инкапсулируют замыкание, которое выполняется пулом потоков очереди, вызывающей метод perform():

let workItem = DispatchWorkItem {     
print("Done!")
}
workItem.perform()

И WorkItems также предоставляют другие полезные методы, например, notify, который позволяет выполнять замыкание в определенной очереди при завершении:

workItem.notify(queue: DispatchQueue.main) {     
print("Notify on Main Queue!")
}
defaultQueue.async(execute: workItem)

Мы также можем дождаться завершения замыкания или пометить его для удаления до того, как очередь попытается выполнить его с помощью метода cancel() (который не отменяет замыкания которое уже выполняется).

print("Waiting for work item...") 
workItem.wait()
print("Done waiting.")
workItem.cancel()

Но важно знать, что wait() не только блокирует текущий поток, ожидающий завершения, но также повышает приоритет всех предыдущих work items в своей очереди, чтобы попытаться как можно скорее завершить этот конкретный элемент.


Dispatch Semaphores

Семафоры — это блокировки, которые могут быть приобретены более чем одним потоком в зависимости от текущего значения счетчика.

Потоки ждут на семафоре, когда счетчик, уменьшаемый каждый раз, когда семафор приобретается, достигает нуля.

Доступ к семафору освобождается для ожидающих потоков вызовом signal который увеличивает счетчик.

Давайте посмотрим на простой пример:

let sem = DispatchSemaphore(value: 2)  
// The semaphore will be held by groups of two pool threads globalDefault.sync {     
DispatchQueue.concurrentPerform(iterations: 10) { (id:Int) in
sem.wait(timeout: DispatchTime.distantFuture)
sleep(1)
print(String(id)+" acquired semaphore.")
sem.signal()
}
}

Dispatch Assertions

Swift 3 вводит новую функцию для выполнения утверждений о текущем контексте выполнения, что позволяет проверить, выполняется ли замыкание в ожидаемой очереди. Мы можем построить предикаты с использованием трех перечисляемых случаев DispatchPredicate: .onQueue, чтобы убедиться, что мы находимся в определенной очереди .notOnQueue, чтобы проверить противоположность и .onQueueAsBarrier, чтобы проверить, действует ли текущее замыкание или рабочий элемент как барьер.

dispatchPrecondition(condition: .notOnQueue(mainQueue)) dispatchPrecondition(condition: .onQueue(queue))

Dispatch Sources

Источники — это удобный способ обработки асинхронных событий системного уровня, таких как сигналы ядра или системные события, связанные с файлами и сокетами, с помощью обработчиков событий.

Существует несколько доступных источников, которые можно сгруппировать следующим образом:

  • Источники таймера: используются для генерации событий в определенный момент времени или периодических событий (DispatchSourceTimer).
  • Источники сигналов : используются для обработки сигналов UNIX (DispatchSourceSignal).
  • Источники памяти. Используется для регистрации уведомлений, связанных с использованием памяти (DispatchSourceMemoryPressure).
  • Источники сообщений дескриптора: используется для регистрации различных событий, связанных с файлами и сокетами (DispatchSourceFileSystemObject, DispatchSourceRead, DispatchSourceWrite).
  • Источники процесса: используются для контроля внешнего процесса для некоторых событий, связанных с их состоянием выполнения (DispatchSourceProcess).
  • Источники Маха: Используются для обработки событий, связанных с функциями IPC ядра Mach (DispatchSourceMachReceive, DispatchSourceMachSend).

Кроме того, при необходимости вы можете создавать собственные источники рассылки. Все источники отправки соответствуют протоколу DispatchSourceProtocol, который определяет основные операции, необходимые для регистрации обработчиков и изменения состояния активации источника отправки.

Давайте посмотрим пример с DispatchSourceTimer, чтобы понять, как использовать эти объекты.

Источники создаются с помощью служебных методов, предоставляемых DispatchSource, в этом фрагменте мы будем использовать makeTimerSource, указав очередь отправки, которую мы хотим использовать для выполнения обработчика.

Источники таймера не имеют других параметров, поэтому нам просто нужно указать очередь для создания источника, как мы увидим, источник отправки, способный обрабатывать несколько событий, обычно требует, чтобы вы указывали идентификатор события, которое вы хотите обрабатывать.

let t = DispatchSource.makeTimerSource(queue: DispatchQueue.global()) 
t.setEventHandler{ print("!") }
t.scheduleOneshot(deadline: .now() + .seconds(5), leeway: .nanoseconds(0))
t.activate()

После создания источника мы регистрируем обработчик событий с помощью setEventHandler, и если никакие другие конфигурации не требуются, включите источник отправки с помощью activate() (предыдущие выпуски libDispatch использовали для этой цели метод resume()).

Источники рассылки первоначально неактивны, что означает, что они не начнут сразу доставлять события, что позволяет производить их дальнейшую настройку. Когда мы будем готовы, источник можно активировать с помощью activate(), и при необходимости доставка события может быть временно приостановлена с помощью suspend() и возобновлена с помощью функции resume().

Источники таймеров требуют дополнительного шага для настройки того, какие события будут синхронизироваться с объектом. В приведенном выше примере мы определяем одно событие, которое будет доставлено через 5 секунд после регистрации со строгим предельным сроком.

Мы могли бы также настроить объект для доставки периодических событий, как мы могли бы сделать с объектом Timer:

t.scheduleRepeating(deadline: .now(), interval: .seconds(5), leeway: .seconds(1))

Когда мы закончили с источником отправки, и мы хотим просто полностью прекратить доставку событий, мы вызываем cancel(), который остановит источник события, вызовет обработчик отмены, если мы его установили, и выполнит некоторые заключительные операции очистки такие как отмена регистрации обработчиков.

t.cancel()

API остается тем же для других типов источников отправки, давайте посмотрим, например, как Kitura инициализирует источник чтения, который он использует для обработки асинхронных чтений на установленном сокете:

readerSource = DispatchSource.makeReadSource(
fileDescriptor: socket.socketfd,
queue: socketReaderQueue(fd: socket.socketfd))
readerSource.setEventHandler() {
_ = self.handleRead()
}
readerSource.setCancelHandler(handler: self.handleCancel) readerSource.resume()

Функция handleRead() будет вызываться в выделенной очереди, когда новые новые байты будут доступны в буфере входящих данных сокета. Kitura также использует WriteSource для буферизованных записей, используя события источника отправки для эффективного темпа записи, записи новых байтов, как только сокет-канал готов их отправить. При выполнении операций ввода-вывода источники отправки / чтения / чтения / записи могут быть хорошей высокоуровневой альтернативой другим низкоуровневым API, которые вы обычно используете на платформах * nix.

Другой источник, который может быть полезен в некоторых случаях, это DispatchSourceFileSystemObject, который позволяет прослушивать изменения в конкретном файле, от его имени до изменений его атрибутов. С помощью этого источника отправки вы также сможете получать уведомления, если файл был изменен или удален, по сути это подмножество событий, которые в Linux управляются подсистемой inotify kernel.

Остальные типы источников работают аналогично, вы можете проверить полный список того, что доступно в документации libDispatch, но помните, что некоторые из них, такие как источники Mach и источник давления памяти, будут работать только на платформах Darwin.


Операции и OperationQueues

Давайте кратко поговорим об операционных очередях и дополнительном API, построенном поверх GCD, который использует параллельные очереди и модели задачи как операции, которые легко отменить и которые могут иметь свое выполнение, зависящее от завершения других операций.

Операции могут иметь приоритет, который определяет порядок выполнения, и добавляются к OperationQueues, которые выполняются асинхронно.

Рассмотрим основной пример:

var queue = OperationQueue() 
queue.name = "My Custom Queue"
queue.maxConcurrentOperationCount = 2
//Refers to the queue of the main thread 
var mainqueue = OperationQueue.main
queue.addOperation{     print("Op1") } 
queue.addOperation{ print("Op2") }

Мы также можем создать объект «Операция блока» и сконфигурировать его перед добавлением его в очередь, и при необходимости мы также можем добавить более одного замыкания для этого типа операций.

var op3 = BlockOperation(block: {     
print("Op3")
})
op3.queuePriority = .veryHigh
op3.completionBlock = {
if op3.isCancelled {
print("Someone cancelled me.")
}
print("Completed Op3")
}

var op4 = BlockOperation {
print("Op4 always after Op3")
OperationQueue.main.addOperation{
print("I'm on main queue!")
}
}

Операции могут иметь приоритет и вторичное замыкание завершения, которое будет выполняться после завершения основного замыкания.

Мы можем добавить зависимость от op4 к op3, так что op4 будет ждать завершения операции op3.

op4.addDependency(op3)  
queue.addOperation(op4)
// op3 will complete before op4, always
queue.addOperation(op3)

Зависимости также могут быть удалены с помощью removeDependency (operation :) и сохранены в общедоступном массиве зависимостей.

Текущее состояние операции может быть проверено с использованием определенных свойств:

op3.isReady       //Ready for execution? 
op3.isExecuting //Executing now?
op3.isFinished //Finished naturally or cancelled?
op3.isCancelled //Manually cancelled?

Вы можете отменить все операции, присутствующие в очереди, вызывая метод cancelAllOperations, который устанавливает флаг isCancelled для оставшихся в очереди операций. Одну операцию можно отменить, вызывая метод cancel:

queue.cancelAllOperations()   
op3.cancel()

Рекомендуется проверять свойство isCancelled внутри вашей операции, чтобы пропустить выполнение, если операция была отменена после того, как она была запланирована для выполнения очередью.

И, наконец, вы также можете остановить выполнение новых операций в очереди операций (текущая работа не будет затронута):

queue.isSuspended = true

Заключение

Эта статья должна была дать вам краткий обзор того, что возможно сегодня с точки зрения параллелизма, используя внешние фреймворки, доступные от Swift.

Часть 2 будет посвящена тому, что может произойти дальше, с точки зрения языковых средств, которые могли бы обрабатывать параллелизм «изначально», не прибегая к внешним библиотекам. Несколько интересных парадигм будет описано с помощью нескольких реализаций с открытым исходным кодом, уже доступных сегодня.

Надеюсь, что эти две статьи станут хорошим введением в мир параллелизма и помогут вам понять и принять участие в дискуссиях, которые состоятся в списке рассылки быстрой эволюции, когда сообщество начнет обсуждать, что вводить, Будем надеяться, Свифт 5.

Для более интересного контента по параллелизму и Swift, посмотрите блог Cocoa With Love.

Like what you read? Give Alexej a round of applause.

From a quick cheer to a standing ovation, clap to show how much you enjoyed this story.