ohm patel

Publisher Subscriber Architecture




In modern architectures, applications are decoupled into smaller, independent building blocks that are easier to develop, deploy and maintain. Publish/Subscribe (Pub/Sub) messaging delivers asynchronous event notifications between these distributed components.


There are three main components in the Publish-Subscribe model: publishers, an event bus (broker) and subscribers. Publish-Subscribe is a messaging pattern in which publishers broadcast messages with no knowledge of the subscribers. Similarly, subscribers listen for messages on the topics or categories they are interested in, without any knowledge of who the publishers are. The broker transfers the messages from the publishers to the subscribers.



 

Let's Code


First, let's create a structure for every component of the system, i.e. Publisher, Broker and Subscriber.


package main

// fmt and time are used by the methods and the main function further down
import (
    "fmt"
    "time"
)

// Topic is a message tagged with the id of the topic it belongs to
type Topic struct {
    message string
    id      int // uniquely identifies each topic
}

// Publisher publishes topics to the broker
type Publisher struct {
    name string
}

// Broker is the message broker
type Broker struct {
    TopicBuffer chan Topic            // incoming buffer
    Subscribers map[int][]*Subscriber // maps a topic id to the subscribers of that topic
}

// Subscriber subscribes to a single topic
type Subscriber struct {
    name   string     // name of the subscriber
    topic  Topic      // topic to which it subscribes
    buffer chan Topic // subscriber buffer
}

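There is not much to explain here: the broker holds an incoming buffer plus a map from topic id to the subscribers of that topic, and each subscriber has its own buffer.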


Now it's time to define the functionality of every component we defined above.


Publisher


// Publish sends the topic to the broker's incoming buffer.
// The send blocks while that buffer is full.
func (pub *Publisher) Publish(topic Topic, queue *Broker) bool {
    fmt.Println("Publishing Topic ", topic.message, ".....")
    queue.TopicBuffer <- topic
    fmt.Println("Published Topic ", topic.message, "To message queue")
    return true
}

// SignalStop asks the broker to stop the subscribers
func (pub *Publisher) SignalStop(queue *Broker) bool {
    return queue.SignalStop()
}

Subscriber


// Subscribe registers the subscriber with the broker for its topic
func (sub *Subscriber) Subscribe(queue *Broker) bool {
    fmt.Println("Subscriber ", sub.name, "subscribing to Topic ", sub.topic.message, ".....")
    // append the subscriber to the list kept for its topic id
    queue.Subscribers[sub.topic.id] = append(queue.Subscribers[sub.topic.id], sub)
    fmt.Println("Subscriber ", sub.name, "subscribed to Topic ", sub.topic.message)
    return true
}

// ConsumeBuffer consumes the topics in the subscriber's buffer
// until the buffer is closed
func (sub *Subscriber) ConsumeBuffer() bool {
    for topic := range sub.buffer {
        fmt.Println("Consumed ", topic.message, "from Subscriber", sub.name)
    }
    fmt.Println("Subscriber ", sub.name, " Closed")
    return true
}

Broker


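// NotifySubscriber forwards every incoming topic to the subscribers of that topic.
// Each send blocks while the receiving subscriber's buffer is full.
func (b *Broker) NotifySubscriber() bool {
    for topic := range b.TopicBuffer {
        subscribers := b.Subscribers[topic.id]
        for _, s := range subscribers {
            s.buffer <- topic
        }
    }
    return true
}

// SignalStop signals the subscribers to stop by closing their buffers.
// Call it only after all published topics have been delivered,
// because sending on a closed channel panics.
func (b *Broker) SignalStop() bool {
    for _, subscribers := range b.Subscribers {
        for _, s := range subscribers {
            close(s.buffer)
        }
    }
    return true
}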
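
One consequence of the blocking send in NotifySubscriber is that a single slow subscriber holds up delivery to everyone else. A possible alternative, sketched below under the assumption that skipping a full subscriber is acceptable, is a non-blocking send using `select` with a `default` case. The `deliver` helper and the trimmed-down `Subscriber` type are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// Topic mirrors the struct used in this article.
type Topic struct {
	message string
	id      int
}

// Subscriber is a trimmed-down version of the article's struct.
type Subscriber struct {
	name   string
	buffer chan Topic
}

// deliver is a hypothetical helper: it offers the topic to every subscriber
// without blocking and returns the names of the subscribers it had to skip.
func deliver(t Topic, subs []*Subscriber) []string {
	var skipped []string
	for _, s := range subs {
		select {
		case s.buffer <- t:
			// delivered: the buffer had room or a reader was waiting
		default:
			// the send would block, so skip this subscriber
			skipped = append(skipped, s.name)
		}
	}
	return skipped
}

func main() {
	fast := &Subscriber{name: "fast", buffer: make(chan Topic, 2)}
	slow := &Subscriber{name: "slow", buffer: make(chan Topic)} // unbuffered, no reader
	skipped := deliver(Topic{message: "first", id: 1}, []*Subscriber{fast, slow})
	fmt.Println("skipped:", skipped)
	fmt.Println("queued for fast:", len(fast.buffer))
}
```

Whether to drop, retry or log a skipped delivery depends on how much message loss the system can tolerate.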

Now it's time to put these pieces together, i.e. to write the main function.


Main


func main() {
        //list of predefined topics
        topics := []Topic{
              Topic{
                 message: "first",
                 id:      1,
              },
              Topic{
                 message: "second",
                 id:      2,
              },
              Topic{
                 message: "third",
                 id:      2,
              },
              Topic{
                 message: "fourth",
                 id:      2,
              },
              Topic{
                 message: "fifth",
                 id:      1,
              },
       }
       //message broker
       broker := Broker{
         TopicBuffer: make(chan Topic, 3), //buffer for messages
         Subscribers: make(map[int][]*Subscriber),
       }
       //publisher
       publisher := Publisher{
           name: "publisher 1",
       }
       //subscriber s_1 subscribes to the topic with id 1
       subscriber_1 := Subscriber{
           name:   "s_1",
           buffer: make(chan Topic, 2), //buffered subscriber channel
           topic:  topics[0],
       }
       //subscriber s_2 subscribes to the topic with id 2
       subscriber_2 := Subscriber{
           name:   "s_2",
           buffer: make(chan Topic), //unbuffered subscriber channel
           topic:  topics[1],
       }
       //start the subscribers consuming their buffers asynchronously
       go subscriber_1.ConsumeBuffer()
       go subscriber_2.ConsumeBuffer()
       //start the broker forwarding topics to the subscribers asynchronously
       go broker.NotifySubscriber()
       
       //Subscribe with the broker for respective topics
       subscriber_1.Subscribe(&broker)
       subscriber_2.Subscribe(&broker)
       //start publishing messages to broker
       for i := range topics {
           publisher.Publish(topics[i], &broker)
       }
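        //give the broker and subscribers time to drain their buffers, then signal the system to stop
        time.Sleep(1 * time.Second)
        publisher.SignalStop(&broker)
        //give the subscribers time to print their "Closed" message before main exits
        time.Sleep(1 * time.Second)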
}
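
The main function above relies on fixed one-second waits, and TopicBuffer is never closed, so the broker goroutine never returns. A more deterministic shutdown can be sketched by closing the channels in order and waiting on a `sync.WaitGroup`. This is a condensed illustration, not the article's code: the `run` function is hypothetical, it uses plain strings instead of Topic, and it has a single subscriber to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// run is a hypothetical, condensed variant of the article's main function.
// One broker goroutine forwards messages to one subscriber, and a WaitGroup
// replaces the fixed sleeps.
func run(messages []string) []string {
	incoming := make(chan string, 3) // plays the role of TopicBuffer
	subBuf := make(chan string, 2)   // plays the role of a subscriber buffer
	var consumed []string
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // subscriber: consumes until its buffer is closed
		defer wg.Done()
		for m := range subBuf {
			consumed = append(consumed, m)
		}
	}()

	wg.Add(1)
	go func() { // broker: forwards until the incoming buffer is closed
		defer wg.Done()
		for m := range incoming {
			subBuf <- m
		}
		close(subBuf) // nothing more will be forwarded
	}()

	for _, m := range messages {
		incoming <- m
	}
	close(incoming) // the publisher is done
	wg.Wait()       // returns only after every message was consumed
	return consumed
}

func main() {
	fmt.Println(run([]string{"first", "second", "third"}))
}
```

Because the broker closes the subscriber buffer only after the incoming buffer is drained, no message can be sent on a closed channel and no sleep is needed.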

Output


Subscriber  s_1 subscribing to Topic  first .....
Subscriber  s_1 subscribed to Topic  first
Subscriber  s_2 subscribing to Topic  second .....
Subscriber  s_2 subscribed to Topic  second
Publishing Topic  first .....
Published Topic  first To message queue
Publishing Topic  second .....
Published Topic  second To message queue
Publishing Topic  third .....
Published Topic  third To message queue
Publishing Topic  fourth .....
Consumed  second from Subscriber s_2
Consumed  third from Subscriber s_2
Published Topic  fourth To message queue
Publishing Topic  fifth .....
Published Topic  fifth To message queue
Consumed  fourth from Subscriber s_2
Consumed  first from Subscriber s_1
Consumed  fifth from Subscriber s_1
Subscriber  s_2  Closed
Subscriber  s_1  Closed

Note


  • The system above is implemented with 1 publisher and 2 subscribers, but it can have multiple publishers and subscribers.

  • The broker's incoming buffer holds 3 topics, s_1's buffer holds 2 and s_2's channel is unbuffered; all of these sizes can be changed.

  • NotifySubscriber() and ConsumeBuffer() run as goroutines to demonstrate concurrency within a single process. In production systems, the publishers, subscribers and broker typically run on different servers, so they execute truly in parallel.
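
To illustrate the first note, several publishers can share the same incoming buffer, since a Go channel is safe for concurrent senders. The sketch below is hypothetical (`publishAll` is not part of the code in this article) and uses plain strings instead of Topic; it starts one goroutine per publisher and closes the buffer once all of them have finished.

```go
package main

import (
	"fmt"
	"sync"
)

// publishAll is a hypothetical helper: it starts one goroutine per publisher,
// each sending perPublisher messages into a shared buffer, and returns how
// many messages the single receiver saw.
func publishAll(publishers, perPublisher int) int {
	incoming := make(chan string, 3) // shared incoming buffer
	var wg sync.WaitGroup

	for p := 1; p <= publishers; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < perPublisher; i++ {
				incoming <- fmt.Sprintf("publisher %d, message %d", id, i)
			}
		}(p)
	}

	// Close the shared buffer once every publisher has finished.
	go func() {
		wg.Wait()
		close(incoming)
	}()

	received := 0
	for range incoming {
		received++
	}
	return received
}

func main() {
	fmt.Println("received:", publishAll(3, 4))
}
```

The order in which messages from different publishers arrive is not fixed; only the total count is.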


The entire working code is given below.



