Golang queue broker
They say that the best way to learn something new is to do something new. The biggest problem with starting a new side project is time. Your day job and other commitments consume so much of it that there is not much left. As a full-time C# developer I decided to learn a new language. I picked Golang because it has so many interesting new concepts and ways of doing things that it was a perfect fit to get me out of my comfort zone.
Concept
I wanted to do something new, but at the same time I didn’t want to reinvent the wheel. At my day job I used to work with software that communicates via:
- RabbitMQ
- NATS
- Azure Service Bus
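So I decided to build a message broker of my own and called it Syncbrok (Synchronous Broker). The idea was simple: create a message broker using Golang concurrency features. To make it a little more challenging, there are two main modes: single message, and parented message, where a message that has a parent is processed only after its parent has finished.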
Building blocks
I decided to use as little abstraction as possible. While this approach limits how far the project can be extended in the future, it let me focus on a quick implementation. There are only a few parts:
- Message: an entity with Id, Content and an optional ParentId.
- Queue: a container for messages. Clients send messages to it, and Syncbrok delivers them to the registered subscribers.
- Handler: any function with the signature:
func(msgId uuid.UUID, endpoint string, callbackWg *sync.WaitGroup)
msgId: identifies the message.
endpoint: tells the handler where to send the message, so one handler can serve multiple subscribers.
callbackWg: a WaitGroup the handler uses to inform the queue that the message was delivered.
- Storage: the content of a message is written to disk, and a pointer to its position in the file is kept in memory. Storage allows exclusive writes (Queue only) and concurrent reads (Handler).
- Frontend: the way new messages, queues and subscribers are added to the system.
- Space: the top-level container for all queues.
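The Storage idea from the list above (content on disk, position in memory, exclusive write, concurrent read) can be sketched with a sync.RWMutex. This is only a minimal sketch under my own assumptions, not the code from the project: the names Storage, location, NewStorage and roundTrip are hypothetical, and message IDs are plain strings instead of uuid.UUID to stay within the standard library.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// location records where one message's content lives inside the file.
type location struct {
	offset int64
	length int
}

// Storage is a hypothetical append-only store: content goes to disk,
// only the position of each message is kept in memory.
type Storage struct {
	mu    sync.RWMutex
	file  *os.File
	end   int64 // offset at which the next message will be written
	index map[string]location
}

// NewStorage creates (or truncates) the backing file at path.
func NewStorage(path string) (*Storage, error) {
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600)
	if err != nil {
		return nil, err
	}
	return &Storage{file: f, index: make(map[string]location)}, nil
}

// Write appends content under an exclusive lock, so there is one writer at a time.
func (s *Storage) Write(id string, content []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	n, err := s.file.WriteAt(content, s.end)
	if err != nil {
		return err
	}
	s.index[id] = location{offset: s.end, length: n}
	s.end += int64(n)
	return nil
}

// Read takes a shared lock, so many handlers can read concurrently.
func (s *Storage) Read(id string) ([]byte, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	loc, ok := s.index[id]
	if !ok {
		return nil, fmt.Errorf("message %q not found", id)
	}
	buf := make([]byte, loc.length)
	if _, err := s.file.ReadAt(buf, loc.offset); err != nil {
		return nil, err
	}
	return buf, nil
}

// roundTrip writes one message to a temporary file and reads it back.
func roundTrip(content string) (string, error) {
	dir, err := os.MkdirTemp("", "storage-sketch")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	s, err := NewStorage(filepath.Join(dir, "messages.dat"))
	if err != nil {
		return "", err
	}
	defer s.file.Close()

	if err := s.Write("msg-1", []byte(content)); err != nil {
		return "", err
	}
	got, err := s.Read("msg-1")
	return string(got), err
}

func main() {
	got, err := roundTrip("hello")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got)
}
```

RWMutex fits here because Lock excludes everyone else while RLock can be held by many readers at once, which matches the "one Queue writes, many Handlers read" split.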
Implementation details
Parented mode
To make parented mode possible, every message has a WaitGroup, which is used to synchronize concurrent goroutines.
WaitGroup lives in the sync package and has a few methods:
Decrease the internal counter by one:
wg.Done()
Increase the internal counter by one:
wg.Add(1)
Block until the internal counter reaches 0:
wg.Wait()
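To see the three methods working together, here is a small standalone sketch. It is not taken from Syncbrok; the function name countDeliveries and the counter are made up purely for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countDeliveries starts n goroutines and blocks until every one of them
// has called Done. It returns how many goroutines actually ran.
func countDeliveries(n int) int64 {
	var wg sync.WaitGroup
	var delivered int64

	for i := 0; i < n; i++ {
		wg.Add(1) // one more goroutine to wait for
		go func() {
			defer wg.Done() // decrement the counter when this goroutine returns
			atomic.AddInt64(&delivered, 1)
		}()
	}

	wg.Wait() // blocks until the counter is back at 0
	return atomic.LoadInt64(&delivered)
}

func main() {
	fmt.Println(countDeliveries(5)) // prints 5
}
```

Note that Add is called before the goroutine is started, not inside it. Otherwise Wait could run before the counter has been increased.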
Without going into details of message lifecycle, this code shows how to implement waiting for parent using WaitGroup.
func (m *simpleMsg) Process(wgParent *sync.WaitGroup, callback Callback, endpoints []string) {
    // Release any children waiting on this message once Process returns.
    defer m.Waiter.Done()
    if wgParent != nil {
        log.Printf("I have parent so I need to wait %v", m.Id)
        wgParent.Wait()
        log.Printf("My parent finished let me proceed %v", m.Id)
    }
    log.Printf("Processing %v", m.Id)
    // Deliver to every subscriber concurrently and wait for all of them.
    callbackWg := sync.WaitGroup{}
    for _, endpoint := range endpoints {
        callbackWg.Add(1)
        go callback(m.Id, endpoint, &callbackWg)
    }
    callbackWg.Wait()
    m.Delivered = true
    log.Printf("Finished %v", m.Id)
}
Assuming the Process method is invoked as a goroutine:
go m.Process(parent.GetWaiter(), q.handler, q.subscribers)
Go takes care of the synchronization. If the message has a parent, execution blocks at:
wgParent.Wait()
and the goroutine sleeps until the parent finishes.
Because every message can become a parent of another one, each message must notify its children when it finishes:
defer m.Waiter.Done()
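The effect of this waiting can be shown in isolation. The following is a simplified sketch and not the real message type: msg, newMsg, process and deliveryOrder are hypothetical names, and I assume each message's WaitGroup is set to 1 when the message is created. The goroutines are deliberately started youngest first, yet the recorded order follows the parent chain.

```go
package main

import (
	"fmt"
	"sync"
)

// msg is a hypothetical, stripped-down message: an ID plus the WaitGroup
// that its children wait on.
type msg struct {
	id     string
	waiter sync.WaitGroup
}

// newMsg marks the message as "not yet processed" by setting its counter to 1.
func newMsg(id string) *msg {
	m := &msg{id: id}
	m.waiter.Add(1)
	return m
}

// process waits for the parent (if any), records its own ID and then
// releases its children through the deferred Done.
func (m *msg) process(parent *msg, record func(string), done *sync.WaitGroup) {
	defer done.Done()
	defer m.waiter.Done() // runs after record returns, unblocking children
	if parent != nil {
		parent.waiter.Wait()
	}
	record(m.id)
}

// deliveryOrder processes the chain a <- b <- c and returns the order in
// which the IDs were recorded.
func deliveryOrder() []string {
	var mu sync.Mutex
	var order []string
	record := func(id string) {
		mu.Lock()
		defer mu.Unlock()
		order = append(order, id)
	}

	a, b, c := newMsg("a"), newMsg("b"), newMsg("c")
	var done sync.WaitGroup
	done.Add(3)
	go c.process(b, record, &done) // child of b, started first
	go b.process(a, record, &done) // child of a
	go a.process(nil, record, &done)
	done.Wait()
	return order
}

func main() {
	fmt.Println(deliveryOrder()) // prints [a b c]
}
```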
Handling messages is also implemented concurrently:
for _, endpoint := range endpoints {
    callbackWg.Add(1)
    go callback(m.Id, endpoint, &callbackWg)
}
callbackWg.Wait()
With N subscribers, callbackWg.Wait() returns only after the following has been called N times:
callbackWg.Done()
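Seen from the handler's side, the contract is that every invocation must call Done exactly once. Below is a minimal sketch of a handler that honours it, assuming a string ID in place of uuid.UUID. The recorder type and the fanOut function are hypothetical stand-ins, not the handlers shipped with the project.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Callback mirrors the handler signature described earlier, with a plain
// string ID so the example needs only the standard library.
type Callback func(msgID string, endpoint string, callbackWg *sync.WaitGroup)

// recorder is a stand-in for a real handler: instead of sending the
// message anywhere, it remembers which endpoints it was called for.
type recorder struct {
	mu   sync.Mutex
	seen []string
}

// handle satisfies Callback. Done is deferred first, so it runs last and
// is reached on every return path.
func (r *recorder) handle(msgID string, endpoint string, callbackWg *sync.WaitGroup) {
	defer callbackWg.Done()
	r.mu.Lock()
	defer r.mu.Unlock()
	r.seen = append(r.seen, endpoint)
}

// fanOut invokes the callback once per endpoint, concurrently, and
// returns only after all of them have called Done.
func fanOut(msgID string, endpoints []string, callback Callback) {
	var callbackWg sync.WaitGroup
	for _, endpoint := range endpoints {
		callbackWg.Add(1)
		go callback(msgID, endpoint, &callbackWg)
	}
	callbackWg.Wait()
}

func main() {
	r := &recorder{}
	fanOut("msg-1", []string{"http://b.example", "http://a.example"}, r.handle)
	sort.Strings(r.seen) // goroutine order is not deterministic
	fmt.Println(r.seen)  // prints [http://a.example http://b.example]
}
```

A handler that forgets Done, or returns early before reaching it, would block the message forever, which is why the sketch defers it on the first line.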
Space frontend
Space should be frontend agnostic, meaning Space doesn’t care whether a message, queue or subscriber was added using HTTP, gRPC, WebSocket, etc. In Golang this can be achieved using write-only (send-only) channels.
Space provides every frontend with three send-only channels:
newMessages chan<- Message
newQueues chan<- Queue
newSubscribers chan<- Subscriber
Adding a new frontend to the space only requires writing to one of these channels:
universe, newMsgCh, newQueuesCh, newSubscribersCh := space.New(handlers.HttphandleMessageFactory)
go universe.Start(&wg)
go frontend.HttpStart(&wg, newMsgCh, newSubscribersCh, newQueuesCh)
go config.Bootstrap(&wg, newMsgCh, newSubscribersCh, newQueuesCh)
The HTTP frontend listens for new messages, queues and subscribers over HTTP.
The config frontend bootstraps the initial queues and subscribers from a YAML file.
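The channel direction is what keeps frontends decoupled: a function that receives a chan<- Message can send on it, but the compiler rejects any attempt to receive from it. Here is a reduced sketch of that idea with a single channel. The Message fields, NewSpace, Run, frontend and demo are names I made up for the example; the real space.New has a different signature.

```go
package main

import "fmt"

// Message is a hypothetical, minimal message type.
type Message struct {
	Queue   string
	Content string
}

// Space owns the bidirectional channel and is the only reader.
type Space struct {
	messages chan Message
	received []Message
}

// NewSpace returns the space together with a send-only view of its channel.
// The conversion from chan Message to chan<- Message is implicit.
func NewSpace() (*Space, chan<- Message) {
	s := &Space{messages: make(chan Message)}
	return s, s.messages
}

// Run consumes messages until the channel is closed, then signals done.
func (s *Space) Run(done chan<- struct{}) {
	for m := range s.messages {
		s.received = append(s.received, m)
	}
	close(done)
}

// frontend stands for any producer (HTTP, config file, ...). Writing
// `<-out` here would not compile because out is send-only.
func frontend(out chan<- Message, contents ...string) {
	for _, c := range contents {
		out <- Message{Queue: "orders", Content: c}
	}
}

// demo wires one frontend to a space and returns what the space received.
func demo() []Message {
	s, in := NewSpace()
	done := make(chan struct{})
	go s.Run(done)

	frontend(in, "first", "second")
	close(in) // no more producers, lets Run finish
	<-done
	return s.received
}

func main() {
	for _, m := range demo() {
		fmt.Println(m.Queue, m.Content)
	}
}
```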
Conclusions
Whole code can be found here:
Messaging systems are complicated :)
You need to pay attention to many details. There are topics such as
- undelivered messages
- retention policy
- failure recovery procedure
which in most cases we take for granted.
Golang is great :)
Goroutines plus the whole sync package make it possible to write heavily concurrent code while almost forgetting how hard concurrency is. Don’t get me wrong, I spent a few nights debugging goroutines, but it is still just fun to do.
Learning new stuff is fun :)
And not only fun but also practical. Since finishing this project I write my C# interfaces thinking more about behavior than data.
Let me know
Please let me know what you think about this project, where you see C# habits in my Golang code, and how you would make it simpler, better or more elegant.