In modern architecture, applications are decoupled into smaller, independent building blocks that are easier to develop, deploy and maintain. Publish/Subscribe (Pub/Sub) messaging provides instant event notifications for these distributed applications.
There are three main components to the Publish-Subscribe model: publishers, an event bus/broker, and subscribers. Publish-Subscribe is a messaging pattern in which publishers broadcast messages with no knowledge of the subscribers. Similarly, subscribers listen for messages on the topics/categories they are interested in, without any knowledge of who the publishers are. The broker transfers the messages from the publishers to the subscribers.
Let's Code
First, let's create a structure for every component of the system, i.e., Publisher, Broker and Subscriber.
package main

import (
	"fmt"
	"time"
)

// Topic is a message tagged with the id of the topic it belongs to
type Topic struct {
	message string
	id      int // uniquely identifies each topic
}

// Publisher publishes topics to the broker
type Publisher struct {
	name string
}

// Broker is the message broker
type Broker struct {
	TopicBuffer chan Topic            // incoming buffer
	Subscribers map[int][]*Subscriber // maps a topic id to its subscribers
}

// Subscriber subscribes to a topic
type Subscriber struct {
	name   string     // name of the subscriber
	topic  Topic      // topic to which it subscribes
	buffer chan Topic // subscriber buffer
}
Not much to explain here; it is straightforward and simple.
Now it's time to define the functionality of every component we defined above.
Publisher
// Publish sends the topic to the broker's incoming buffer
func (pub *Publisher) Publish(topic Topic, queue *Broker) bool {
	fmt.Println("Publishing Topic: ", topic.message, ".....")
	queue.TopicBuffer <- topic
	fmt.Println("\n Published Topic ", topic.message, "To message queue")
	return true
}

// SignalStop asks the broker to stop the subscribers
func (pub *Publisher) SignalStop(queue *Broker) bool {
	return queue.SignalStop()
}
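Publish sends on queue.TopicBuffer, so it blocks whenever that channel has no free slot. The following is a minimal sketch of how channel capacity determines when a send would block. The helpers trySend and fill are hypothetical names introduced only for this illustration; they are not part of the article's code.

```go
package main

import "fmt"

// trySend (hypothetical helper) reports whether v could be sent on ch
// without blocking. The default case runs when the send cannot proceed.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// fill (hypothetical helper) keeps sending until a send would block and
// returns how many values the channel accepted.
func fill(ch chan int) int {
	n := 0
	for trySend(ch, n) {
		n++
	}
	return n
}

func main() {
	fmt.Println(fill(make(chan int, 3))) // buffered, capacity 3
	fmt.Println(fill(make(chan int, 2))) // buffered, capacity 2
	fmt.Println(fill(make(chan int)))    // unbuffered, no receiver waiting
}
```

With nobody receiving, a buffered channel accepts exactly as many values as its capacity, and an unbuffered channel accepts none. In the publisher above, that means Publish returns immediately while the broker's buffer has room and waits otherwise.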
Subscriber
// Subscribe registers the subscriber with the broker for its topic
func (sub *Subscriber) Subscribe(queue *Broker) bool {
	fmt.Println("Subscriber: ", sub.name, "subscribing to Topic: ", sub.topic.message, ".....")
	queue.Subscribers[sub.topic.id] = append(queue.Subscribers[sub.topic.id], sub)
	fmt.Println("Subscriber ", sub.name, "subscribed to Topic ", sub.topic.message)
	return true
}

// ConsumeBuffer consumes the topics delivered to the subscriber's buffer
// until the buffer is closed
func (sub *Subscriber) ConsumeBuffer() bool {
	for topic := range sub.buffer {
		fmt.Println("Consumed ", topic.message, "from Subscriber", sub.name)
	}
	fmt.Println("Subscriber ", sub.name, " Closed")
	return true
}
Broker
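// NotifySubscriber forwards every incoming topic to the buffers of the
// subscribers registered for that topic id
func (b *Broker) NotifySubscriber() bool {
	for topic := range b.TopicBuffer {
		subscribers := b.Subscribers[topic.id]
		for _, s := range subscribers {
			s.buffer <- topic
		}
	}
	return true
}

// SignalStop closes every subscriber's buffer so the subscribers stop.
// Call it only after all published topics have been delivered, because
// sending on a closed channel panics.
func (b *Broker) SignalStop() bool {
	for _, subscribers := range b.Subscribers {
		for _, s := range subscribers {
			close(s.buffer)
		}
	}
	return true
}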
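SignalStop above closes the subscriber buffers from outside the forwarding loop, so it relies on the caller waiting long enough for deliveries to finish. One possible alternative, sketched here under assumptions, is to close the incoming channel and let the forwarding loop close the subscriber channels once it has drained everything. Message, SafeBroker and deliver are hypothetical names used only for this illustration; they are simplified stand-ins for the article's Topic and Broker, not a replacement for them.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for the article's Topic type.
type Message struct {
	ID   int
	Text string
}

// SafeBroker is an illustrative variant of Broker that owns the shutdown.
type SafeBroker struct {
	in   chan Message
	subs map[int][]chan Message
}

// Run forwards each message to the channels registered for its ID.
// After in is closed and drained, it closes every subscriber channel,
// so no channel is closed while a send to it may still be pending.
func (b *SafeBroker) Run() {
	for m := range b.in {
		for _, ch := range b.subs[m.ID] {
			ch <- m
		}
	}
	for _, chans := range b.subs {
		for _, ch := range chans {
			close(ch)
		}
	}
}

// deliver publishes msgs through a SafeBroker with a single subscriber on
// topic id and returns the texts that subscriber received.
func deliver(id int, msgs []Message) []string {
	out := make(chan Message, 1)
	b := &SafeBroker{in: make(chan Message, 3), subs: map[int][]chan Message{id: {out}}}

	var got []string
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // subscriber: consume until the broker closes out
		defer wg.Done()
		for m := range out {
			got = append(got, m.Text)
		}
	}()
	go b.Run()

	for _, m := range msgs {
		b.in <- m
	}
	close(b.in) // signal stop: no more messages will be published
	wg.Wait()   // wait for the subscriber instead of sleeping
	return got
}

func main() {
	fmt.Println(deliver(1, []Message{{1, "first"}, {2, "second"}, {1, "fifth"}}))
}
```

In this sketch the sync.WaitGroup replaces the fixed sleeps: the caller resumes as soon as the subscriber has seen the closed channel, and messages for topics without subscribers are simply dropped.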
Now it's time to put the above functionality to work, i.e., to write the main function.
Main
func main() {
	// list of predefined topics
	topics := []Topic{
		{message: "first", id: 1},
		{message: "second", id: 2},
		{message: "third", id: 2},
		{message: "fourth", id: 2},
		{message: "fifth", id: 1},
	}
	// message broker
	broker := Broker{
		TopicBuffer: make(chan Topic, 3), // buffer for incoming messages
		Subscribers: make(map[int][]*Subscriber),
	}
	// publisher
	publisher := Publisher{
		name: "publisher 1",
	}
	// subscriber s_1 subscribes to the topic with id 1
	subscriber_1 := Subscriber{
		name:   "s_1",
		buffer: make(chan Topic, 2), // buffered subscriber channel
		topic:  topics[0],
	}
	// subscriber s_2 subscribes to the topic with id 2
	subscriber_2 := Subscriber{
		name:   "s_2",
		buffer: make(chan Topic), // unbuffered subscriber channel
		topic:  topics[1],
	}
	// start the subscribers consuming their buffers asynchronously
	go subscriber_1.ConsumeBuffer()
	go subscriber_2.ConsumeBuffer()
	// start the broker forwarding topics to subscribers asynchronously
	go broker.NotifySubscriber()
	// subscribe with the broker for the respective topics
	subscriber_1.Subscribe(&broker)
	subscriber_2.Subscribe(&broker)
	// start publishing messages to the broker
	for i := range topics {
		publisher.Publish(topics[i], &broker)
	}
	// give deliveries time to finish, then signal the system to stop
	time.Sleep(1 * time.Second)
	publisher.SignalStop(&broker)
	time.Sleep(1 * time.Second)
}
Output
Subscriber s_1 subscribing to Topic first .....
Subscriber s_1 subscribed to Topic first
Subscriber s_2 subscribing to Topic second .....
Subscriber s_2 subscribed to Topic second
Publishing Topic first .....
Published Topic first To message queue
Publishing Topic second .....
Published Topic second To message queue
Publishing Topic third .....
Published Topic third To message queue
Publishing Topic fourth .....
Consumed second from Subscriber s_2
Consumed third from Subscriber s_2
Published Topic fourth To message queue
Publishing Topic fifth .....
Published Topic fifth To message queue
Consumed fourth from Subscriber s_2
Consumed first from Subscriber s_1
Consumed fifth from Subscriber s_1
Subscriber s_2 Closed
Subscriber s_1 Closed
Note
The above system is implemented with one publisher and two subscribers, but it can have multiple publishers and subscribers.
The broker's incoming buffer holds 3 topics, subscriber s_1's buffer holds 2, and subscriber s_2's channel is unbuffered; all of these sizes can be changed.
NotifySubscriber() and ConsumeBuffer() run as goroutines to demonstrate concurrency within a single process. In production systems, the publishers, subscribers and broker typically run on different servers, so they execute truly in parallel.
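As an illustration of the first note, here is a minimal sketch of several publishers feeding one shared incoming channel. The function publishAll and its parameters are hypothetical and exist only for this example; the article's Publisher and Broker types are left out to keep the sketch short.

```go
package main

import (
	"fmt"
	"sync"
)

// publishAll (hypothetical helper) starts one goroutine per publisher name.
// Each publisher sends perPublisher messages into a shared channel.
// It returns the total number of messages received from that channel.
func publishAll(names []string, perPublisher int) int {
	in := make(chan string, 3) // shared incoming buffer, like TopicBuffer

	var wg sync.WaitGroup
	for _, name := range names {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			for i := 0; i < perPublisher; i++ {
				in <- fmt.Sprintf("%s: message %d", name, i)
			}
		}(name)
	}

	// Close the shared channel only after every publisher has finished,
	// so no publisher ever sends on a closed channel.
	go func() {
		wg.Wait()
		close(in)
	}()

	count := 0
	for range in {
		count++
	}
	return count
}

func main() {
	fmt.Println(publishAll([]string{"publisher 1", "publisher 2"}, 3))
}
```

The order in which messages from different publishers arrive is not fixed, but the total is: every message sent is received before the channel is closed.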
The entire working code is given below.