Существует как минимум два способа общения между двумя программными компонентами.

Компонент 1 может обратится к Компонент 2 для получения каких-то данных или выполнения каких-то операций. В этом случае Компонент 2 выполняет работу, когда его об этом просят.

Компонент 2 является активным, содержит собственный поток исполнения или каким-то другим способом следит за своим состоянием. Компонент 2 может уведомлять Компонент 1 о событиях.

Первая модель взаимодействия называется pull-моделью, а вторая — push-моделью.

Pull-модель

Pull-модель в рамках одного процесса — это вызов функции или метода. Для клиент-серверной архитектуры — это запрос клиента к серверу. Или запрос одного сервиса к другому.

Push-модель

Push-модель в мире структурного программирования реализуется с помощью обратного вызова(callbacks). В мире функционального программирования push-модель представлена в виде реактивной модели. А в ООП реализуется с помощью паттерна Observer.

Паттерн PubSub(Publisher-Subscriber) является вариацией Observer. Взаимодействие между компонентами происходит через канал связи Event channel. Publisher отправляет события в event channel. Subscriber подписывается на нужное события и ждет его поступления в event channel.

Ключевым различием между классическим Observer и PubSub является слабая связность(loose coupling). Publisher и Subscriber в PubSub не знают о существование друг друга, в отличии от Observer и Subject.

Паттерн PubSub подходит для асинхронного взаимодействия различных приложений в системе. В качестве event channal используют вариации брокером, шин событий и пр(broker, message broker,event bus, …)

Classic Observer

Определяет зависимость типа “один ко многим” между объектами таким образом, что при изменении состояния одного объекта все зависящие от него оповещаются об этом и автоматически обновляются.

Subject имеет внутреннее состояние. На изменения этого состояния должны реагировать другие компоненты системы. Subject владеет списком компонентов подписанных на его изменения. А так же механизмом добавления/удаления подписчиков. Когда происходит события, Subject проходит по своему списку подписчиков и оповещает их об этом.

Observer должен иметь механизм получения уведомления о событиях. В классической реализации — это метод Update. Subject вызывает Update у всех своих подписчиков когда меняется его состояние.

Мы уже говорили, что есть push и pull модели взаимодействия. Паттерн Observer представляет собой push-модель взаимодействия между объектами, поскольку Subject самостоятельно “проталкивает” информацию о событии. Но у push-модели остается вопрос о кол-ве передаваемой информации.

В контексте передаваемой информации так же существует push и pull модели.

Subject может только информировать наблюдателей о событии, не передавая никаких подробностей, вызвав метод Update. Если у наблюдателя есть ссылка на издателя событий, то он может запросить дополнительную информацию. Или обратиться к другому компоненту системы. В хранилище данных или к стороннему сервису. Подписчик может даже проигнорировать событие. Такой подход можно назвать pull-модель получения данных.

При push-модель передачи данных, издатель сразу передаёт всю информацию. Например, через метод Update. Плюс такого подхода в том, что наблюдатель не связан с подписчиком. Он может получать различные события и обрабатывать их по своей логике.

Теперь Go

Полный код

Посмотрим на классический паттерн Observer.

type Observer interface {
Update()
}

type Subject interface {
Attach(Observer)
Detach(Observer)
//Notify()
}

У ConcreteSubject есть список подписчиков(observers) и его состояние(state). Реализованы методы подписки(Attach) и отписки(Detach). Состояние объекта можно изменить(SetState) или получить актуальное значение(GetState). При смене состояния вызывается метод оповещения о событии(notify). notify обходит всех подписчиков и вызывает их метод Update.

type ConcreteSubject struct {
observers []Observer // список подписчиков
state interface{} // состояние
}

func (s *ConcreteSubject) Attach(o Observer) {
s.observers = append(s.observers, o)
}
func (s *ConcreteSubject) Detach(o Observer) {
for i, v := range s.observers {
if o == v {
s.observers = append(s.observers[:i], s.observers[i+1:]...)
break
}
}
}
func (s *ConcreteSubject) notify() {
for _, v := range s.observers {
v.Update()
}
}
func (s *ConcreteSubject) SetState(st interface{}) {
s.state = st
s.notify()
}
func (s *ConcreteSubject) GetState() interface{} {
return s.state
}

У наблюдателя будет ссылка на наблюдаемый объект(subject) и свои данные(state). Метод Update(тот самый который вызывает Subject) просто дублируют состояния издателя.

type ConcreteObserver struct {
subject *ConcreteSubject
state interface{}
}

func (o *ConcreteObserver) Update() {
o.state = o.subject.GetState()
log.Printf("new state:%s", o.state)
}

Вот и вся реализация паттерна Observer.

Напишем тесты и убедимся, что все работает, как планировали.

Создаем издателя и двух подписчиков:

subject := ConcreteSubject{}

obs1 := ConcreteObserver{subject: &subject, state: "obs1"}
obs2 := ConcreteObserver{subject: &subject, state: "obs2"}

Две подписки и проверка:

t.Run("Attach", func(t *testing.T) {
subject.Attach(&obs1)
subject.Attach(&obs2)

if len(subject.observers) != 2 {
t.Errorf("expected len: 2, got:%d", len(subject.observers))
}
})

Меняем состояния издателя и проверяем, что оба подписчика были об этом проинформированы:

t.Run("Notify 2 obs", func(t *testing.T) {
state := "new test"

subject.SetState(state)

for _, o := range subject.observers {
obs := o.(*ConcreteObserver)
if obs.state != state {
t.Errorf("expected state:%v, got:%v", state, obs.state)
}
}
})

Тестируем отписку одного из наблюдателей:

t.Run("Detach", func(t *testing.T) {
subject.Detach(&obs1)

if len(subject.observers) != 1 {
t.Errorf("expected len: 1, got:%d", len(subject.observers))
}
})

Проверяем еще раз оповещения:

t.Run("Notify after detach", func(t *testing.T) {
state := "new test2"

subject.SetState(state)

for _, o := range subject.observers {
obs := o.(*ConcreteObserver)
if obs.state != state {
t.Errorf("expected state:%v, got:%v", state, obs.state)
}
}
})

Разобравшись с базовой идеей, можно посмотреть на различные вариации.

Event Listeners

Для начало попробуем сделать компоненты менее зависимыми друг от друга. Например, наблюдателю необязательно что-то знать про издателя. Наблюдателю достаточно получать сообщения о событии.

Полный код

Предположим есть компонент который отслеживает изменения в файловой системе и сообщает об этом. Упрощенный вариант структуры файла и типов событий:

type FileInfo struct {
Name string
}

type EventType string
const (
EventTypeCreate EventType = "create"
EventTypeRemove EventType = "remove"
EventTypeModify EventType = "modify"
)

Наблюдатель больше не будет иметь ссылок на издателя, но при уведомлении будет получать всю необходимую информацию. Push-модель передачи данных. Сразу передается тип события(EventType) и объект события(FileInfo):

type Observer interface {
Update(EventType, FileInfo)
}

Предположим, что необходимо вести логи на все изменения в файловой системе. Добавим реализацию наблюдателя который сохраняет логи в свое хранилище:

type LoggingListener struct {
store io.Writer
}

func (l *LoggingListener) Update(et EventType, fi FileInfo) {
log := fmt.Sprintf("%s|%s", et, fi.Name)
l.store.Write([]byte(log))
}

Еще может понадобиться отправлять уведомления:

type AlertListener struct {
service io.Writer
}

func (l *AlertListener) Update(et EventType, fi FileInfo) {
log := fmt.Sprintf("%s|%s", et, fi.Name)
l.service.Write([]byte(log))
}

Subject очень похож на классическую реализацию, но появляется возможность подписывать наблюдателей на разные типы событий:

type Subject interface {
Attach(EventType, Observer)
Detach(EventType, Observer)
}

type EventManager struct {
observers map[EventType][]Observer
}

func (em *EventManager) Attach(et EventType, o Observer) {
em.observers[et] = append(em.observers[et], o)
}
func (em *EventManager) Detach(et EventType, o Observer) {
for i, v := range em.observers[et] {
if v == o {
em.observers[et] = append(em.observers[et][:i], em.observers[et][i+1:]...)
break
}
}
}
func (em *EventManager) Notify(et EventType, fi FileInfo) {
for _, o := range em.observers[et] {
o.Update(et, fi)
}
}

В данном случае Subject не имеет состояния и не генерирует события о которых нужно уведомлять. Subject управляет подписчиками и их уведомлением.

Компонент который следит за состоянием файловой системы имеет ссылку на EventManager. Когда что-то происходит с файлами он информирует об этом EventManager:

type Watcher struct {
eventManager *EventManager
}

func (w *Watcher) OnCreate(fi FileInfo) {
w.eventManager.Notify(EventTypeCreate, fi)
}
func (w *Watcher) OnRemove(fi FileInfo) {
w.eventManager.Notify(EventTypeRemove, fi)
}
func (w *Watcher) OnModify(fi FileInfo) {
w.eventManager.Notify(EventTypeModify, fi)
}

Получилась одна из вариаций паттерна Mediator. Обработчики событий и генераторы этих событий ничего не знают друг о друге. Они взаимодействуют через единый компонент, в данном случае EventManager.

Все готово, осталось протестировать.

Для тестов понадобится mock-объект в который наблюдатели будут сохранять полученные события. Mock-объекту нужно реализовать интерфейс Writer. Сделаем слайс строк и метод Write который пишет в них данные:

type MockWriter struct {
data []string
}

func (mw *MockWriter) Write(b []byte) (n int, err error) {
mw.data = append(mw.data, string(b))
return len(b), nil
}

Создаем EventManager и связанный с ним Watcher:

evManager := EventManager{observers: map[EventType][]Observer{}}
fileWatcher := Watcher{eventManager: &evManager}

LoggingListener и MockWriter:

store := MockWriter{}
logger := LoggingListener{store: &store}

AlertListener и MockWriter:

service := MockWriter{}
alerts := AlertListener{service: &service}

Добавим обработчик LoggingListener на все три события и обработчик AlertListener на два. Проверим, что у create и modify по два обработчика, а у delete только один:

evManager.Attach(EventTypeCreate, &logger)
evManager.Attach(EventTypeRemove, &logger)
evManager.Attach(EventTypeModify, &logger)
evManager.Attach(EventTypeCreate, &alerts)
evManager.Attach(EventTypeModify, &alerts)

if len(evManager.observers[EventTypeCreate]) != 2 {
t.Errorf("expected 2 listeners for EventTypeCreate, got:%d", len(evManager.observers[EventTypeCreate]))
}
if len(evManager.observers[EventTypeModify]) != 2 {
t.Errorf("expected 2 listeners for EventTypeModify, got:%d", len(evManager.observers[EventTypeModify]))
}
if len(evManager.observers[EventTypeRemove]) != 1 {
t.Errorf("expected 1 listeners for EventTypeRemove, got:%d", len(evManager.observers[EventTypeRemove]))
}

Для тестирования обработки событий потребуется две переменные, которые должны быть равны ожидаемому состоянию LoggingListener и AlertListener:

expectedDataInStore := make([]string, 0)
expectedDataInService := make([]string, 0)

Вспомогательная функция которая сравнивает эти слайсы со слайсами обработчиков:

eqDataOfWriter := func(t *testing.T) {
t.Helper()

if !reflect.DeepEqual(store.data, expectedDataInStore) {
t.Errorf("store expected:%v, got:%v", expectedDataInStore, store.data)
}
if !reflect.DeepEqual(service.data, expectedDataInService) {
t.Errorf("service expected:%v, got:%v", expectedDataInService, store.data)
}
}

Тестируем создание файла. Событие должно обработаться в двух наблюдателях:

fi := FileInfo{Name: "testName"}
fileWatcher.OnCreate(fi)

expectedDataInStore = append(expectedDataInStore, "create|testName")
expectedDataInService = append(expectedDataInService, "create|testName")

eqDataOfWriter(t)

Таким же образом тестируем удаление и изменения:

t.Run("OnRemove", func(t *testing.T) {
fi := FileInfo{Name: "testName"}
fileWatcher.OnRemove(fi)

expectedDataInStore = append(expectedDataInStore, "remove|testName")

eqDataOfWriter(t)
})

t.Run("OnModify", func(t *testing.T) {
fi := FileInfo{Name: "testName"}
fileWatcher.OnModify(fi)

expectedDataInStore = append(expectedDataInStore, "modify|testName")
expectedDataInService = append(expectedDataInService, "modify|testName")

eqDataOfWriter(t)
})

Publish/Subscribe

https://docs.microsoft.com/en-us/previous-versions/msp-n-p/ff649664(v=pandp.10)

Паттер PubSub — это вариация Observer. У pubsub есть event channal. Благодаря event channal у pubsub появляются такие свойства, как слабая связность между компонентами и асинхронность.

В Go на роль event channal отлично подходят встроенные каналы.

Интерфейсы publisher и subscriber могут выглядит следующим образом:

type Subscriber interface {
Notify(interface{})
Close()
}

type Publisher interface {
start()
AddSubscriber() chan<- Subscriber
RemoveSubscriber() chan<- Subscriber
PublishMessage() chan<- interface{}
Stop()
}

Значения в каналы можно передавать на месте, при вызове методов. Так же появляется возможность получить канал и передать его в другие компоненты системы.

Полный код

Publisher

https://github.com/egonelbre/gophers

У publisher есть список подписчиков(subscribers), канал добавления подписчиков(addSubCh), канал удаления подписчиков(removeSubCh), канал получения сообщений(inMsg) и сигнал остановки(stop):

type publisher struct {
subscribers []Subscriber
addSubCh chan Subscriber
removeSubCh chan Subscriber
inMsg chan interface{}
stop chan struct{}

addSubHandler func(Subscriber)
removeSubHandler func(Subscriber)
}

Методы дают доступ к каналам:

func (p *publisher) AddSubscriber() chan<- Subscriber {
return p.addSubCh
}
func (p *publisher) RemoveSubscribe() chan<- Subscriber {
return p.removeSubCh
}
func (p *publisher) PublishMessage() chan<- interface{} {
return p.inMsg
}
func (p *publisher) Stop() {
close(p.stop)
}

Добавляем нового подписчика в список через канал addSubCh. onAddSubscriber позволяет клиенту publisher добавить дополнительную логику в процесс подписки. Например, добавить логирование. Так же можно реализовать переопределения всей логики подписки, в плоть до обработки получения данных из канала addSubCh. Через removeSubCh получаем подписчика на удаление. inMsg получает сообщения и отправляет всем подписчикам. stop вызывает Close у всех подписчиков и закрывает каналы издателя:

func (p *publisher) start() {
for {
select {
case sub := <-p.addSubCh:
{
p.subscribers = append(p.subscribers, sub)
p.onAddSubscriber(sub)
}
case sub := <-p.removeSubCh:
{
for i, s := range p.subscribers {
if sub == s {
p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
s.Close()
p.onRemoveSubscriber(sub)
break
}
}
}
case msg := <-p.inMsg:
{
for _, sub := range p.subscribers {
sub.Notify(msg)
}
}
case <-p.stop:
{
for _, sub := range p.subscribers {
sub.Close()
}

close(p.addSubCh)
close(p.removeSubCh)
close(p.inMsg)

return
}
}
}
}

Создания и инициализация:

func NewPublisher() *publisher {
em := publisher{
addSubCh: make(chan Subscriber),
removeSubCh: make(chan Subscriber),
inMsg: make(chan interface{}),
stop: make(chan struct{}),
}
go em.start()
return &em
}

Publisher и subscriber могут быть совсем разными компонентами, поэтому есть смысл их тестировать по отдельности.

mock для subscriber:

type mockSubscriber struct {
isClose bool
testNotify *func(string)
}

func (s *mockSubscriber) Notify(msg interface{}) {
(*s.testNotify)(msg.(string))
}
func (s *mockSubscriber) Close() {
s.isClose = true
}

Тестируем добавления подписчиков. Добавляем их в разных горутинах. А addSubHandler поможет определить момент завершения добавления в список. Запускаем 50 горутин с добавлением и в конце сверяем с кол-вом в pub.subscribers:

t.Run("AddSubscriber", func(t *testing.T) {
cntSub := 50
wg := sync.WaitGroup{}
pub.addSubHandler = func(s Subscriber) {
wg.Done()
}

for i := 0; i < cntSub; i++ {
wg.Add(1)
go func() {
sub := mockSubscriber{
isClose: false,
testNotify: &testFunNotify,
}

pub.addSubCh <- &sub
}()
}

wg.Wait()

if cntSub != len(pub.subscribers) {
t.Errorf("expected cnt sub:%d, got:%d", cntSub, len(pub.subscribers))
}
})

Указатель на функцию testFunNotify мы присвоили всем подписчикам. Эта функция будет вызвана при получении сообщения. Отправим сообщение и проверим его через эту функцию.

t.Run("PublishMessage", func(t *testing.T) {
msg := "Test Msg"

testFunNotify = func(s string) {
if msg != s {
t.Errorf("expected:%s got:%s", msg, s)
}
}

pub.PublishMessage() <- msg
})

Удаление подписчика из списка издателя:

t.Run("RemoveSubscribe", func(t *testing.T) {
cntSub := 40
wg := sync.WaitGroup{}
pub.removeSubHandler = func(s Subscriber) {
wg.Done()
}

for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
pub.removeSubCh <- pub.subscribers[0]
}()
}

wg.Wait()

if cntSub != len(pub.subscribers) {
t.Errorf("expected cnt sub:%d, got:%d", cntSub, len(pub.subscribers))
}
})

Subscriber

https://github.com/egonelbre/gophers

У subscriber будет канал получения сообщений(in), канал для сигнала остановки(stop) и хранилище с интерфейсом Writer(store):

type subscriber struct {
in chan interface{}
stop chan struct{}
store io.Writer
}

Полученное сообщение отправляем в канал in и дальше из него в хранилище:

func (s subscriber) Notify(msg interface{}) {
s.in <- msg
}
func (s subscriber) Close() {
close(s.stop)
}
func (s subscriber) start() {
for {
select {
case msg := <-s.in:
{
s.store.Write([]byte(msg.(string)))
}
case <-s.stop:
{
close(s.in)
return
}
}
}
}

Получился простой пример subscriber.

Осталось добавить тестов. Mock для store:

type mockWriter struct {
data []string
}

func (mw *mockWriter) Write(b []byte) (n int, err error) {
mw.data = append(mw.data, string(b))
return len(b), nil
}

Создаем 50 подписчиков и отправляем им 50 сообщений:

func TestSubscriber_Notify(t *testing.T) {
excepted := make([]string, 50)
subs := make([]subscriber, 50)

for i := 0; i < 50; i++ {
store := mockWriter{}
subs[i] = NewSubscriber(&store)
}

msg := "test msg"
for i := 0; i < 50; i++ {
excepted[i] = msg
for _, sub := range subs {
sub.Notify(msg)
}
}
time.Sleep(1 * time.Second)

for _, sub := range subs {
if !reflect.DeepEqual(sub.store.(*mockWriter).data, excepted) {
t.Errorf("excepted:%v got:%v", sub.store.(*mockWriter).data, excepted)
}
}
}

У каждого subscriber должно быть в store слайс из 50 сообщений.

PubSub в упрощенном виде готов. Дальше его можно изменять и подстраивать под конкретную задачу.

Рассмотрели концепции взаимодействия компонентов в системе. Подходы push и pull моделей. Реализацию push модели через паттерн Observer и его вариацию PubSub.

https://github.com/GermanGorelkin/go-patterns/tree/master/behavioral/observer

На сегодня все. Спасибо!

--

--