
gRPC based Message Broker

· 19 min read
Kinjal Raykarmakar


I was learning about gRPC and was quite fascinated by its capabilities. Looking at its server-side streaming RPC, I immediately thought about its use as a Message Broker. So I built a toy message broker! This article covers what I learned and how I built Kaho-aur-Suno in Go.

Talk is cheap?

So, here's the code

Theory

Before we jump into our IDE and get our hands dirty with code, let's take a moment to explore some key concepts about Message Brokers and gRPC. Now, you might be thinking, "I already know this stuff," and that's totally okay! If you're eager to dive right in, feel free to skip ahead to our section on Go Essentials to freshen up on some essential Go techniques we'll be using. Or, if you're itching for the main event, head straight to Building Kaho-aur-Suno for the real adventure!

Message Broker

So, what exactly is a message broker? Well, think of it as a friendly intermediary that allows various applications to chat with each other. These apps could be speaking different programming languages or chilling on different platforms, but thanks to a trusty message broker, they can still have a smooth conversation.

Let's say we have Application A and Application B. Now, Application A can send messages to the message broker, acting like a virtual post office. Then, like magic, Application B can pick up those messages whenever it's ready, all thanks to the power of asynchronous communication.

There are primarily two types of message broker patterns:

  • Point-to-point messaging
  • Publish/subscribe messaging

First up, Point-to-Point messaging is all about direct communication. Just like the name suggests, it connects one sender to one receiver. Imagine it as a direct line between two parties. When a sender sends a message through the broker, only the designated receiver can grab it. It's like passing a note in class directly to your friend!

On the other hand, Publish/Subscribe messaging takes a more inclusive approach. Here, when a sender shares a message via the broker, it can reach multiple receivers waiting eagerly for that message. It's like broadcasting your exciting news to a room full of friends, and everyone who's interested gets to hear it.
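The difference between the two patterns can be sketched with plain Go channels. This is only an in-memory illustration, not how a real broker is built; the function names pointToPoint and publish are made up for this example, and the channels are buffered so the sends do not block.

```go
package main

import "fmt"

// pointToPoint delivers msg to exactly one receiver: whoever reads the queue.
func pointToPoint(queue chan<- string, msg string) {
	queue <- msg
}

// publish delivers a copy of msg to every subscriber channel.
func publish(subscribers []chan string, msg string) {
	for _, sub := range subscribers {
		sub <- msg
	}
}

func main() {
	// one sender, one receiver
	queue := make(chan string, 1)
	pointToPoint(queue, "a note for one friend")
	fmt.Println("point-to-point:", <-queue)

	// one sender, many receivers
	subs := []chan string{make(chan string, 1), make(chan string, 1)}
	publish(subs, "news for everyone")
	for i, sub := range subs {
		fmt.Printf("subscriber %d: %s\n", i, <-sub)
	}
}
```

In the point-to-point case the message is consumed once; in the publish/subscribe case every subscriber gets its own copy.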

tip

Then, what are Message Queues?

Well, the terms Message Queue and Message Broker are sometimes used interchangeably, as they describe closely related things. However, a Message Broker is the system that enables the exchange of messages, and a Message Queue is just one of its sub-components. Message Queues hold messages within the broker so that they persist and transmissions are reliable.

info

For building Kaho-aur-Suno, we'll use the Publish/Subscribe pattern, without any persistence.

gRPC

First, what is RPC? RPC (Remote Procedure Call) is a protocol that allows a client to directly call a procedure or function on a server. What's impressive is that this can happen even if the procedure is located remotely. RPC makes it possible for clients to invoke these procedures just as easily as if they were local function calls. gRPC is an open-source and cross-platform RPC framework developed by Google.

There are four kinds of gRPC service methods:

  • Unary RPCs: The client sends a request and the server sends back a response, just like a simple procedure call. This is analogous to traditional RESTful APIs.

  • Server streaming RPCs: The client sends a request to the server and the server replies with a stream of data. The server can keep streaming as long as it wants. The client reads until there's no more data to read.

  • Client streaming RPCs: The client sends out a stream of data and the server replies with a singular response.

  • Bidirectional streaming RPCs: The client initiates the call, after which both sides can stream data. The two streams are independent of each other, so the client and the server can read and write in any order.
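As a rough mental model, the four kinds differ only in whether each side sends one value or many. The sketch below mimics those shapes with ordinary Go functions and channels. None of this is the gRPC API; the function names and the send callback are assumptions made purely for illustration.

```go
package main

import "fmt"

// unary: one request in, one response out.
func unary(req string) string {
	return "echo: " + req
}

// serverStreaming: one request in, many responses out through send.
func serverStreaming(req string, send func(string)) {
	for i := 1; i <= 3; i++ {
		send(fmt.Sprintf("%s #%d", req, i))
	}
}

// clientStreaming: many requests in, one response out once the stream closes.
func clientStreaming(reqs <-chan string) string {
	count := 0
	for range reqs {
		count++
	}
	return fmt.Sprintf("received %d messages", count)
}

// bidirectional: many requests in, many responses out.
// Here each request is acknowledged as soon as it arrives.
func bidirectional(reqs <-chan string, send func(string)) {
	for r := range reqs {
		send("ack: " + r)
	}
}

func main() {
	print := func(s string) { fmt.Println(s) }

	fmt.Println(unary("hi"))
	serverStreaming("tick", print)

	in := make(chan string, 2)
	in <- "a"
	in <- "b"
	close(in)
	fmt.Println(clientStreaming(in))

	both := make(chan string, 1)
	both <- "ping"
	close(both)
	bidirectional(both, print)
}
```

Kaho-aur-Suno only needs the first two shapes: publishing is unary and subscribing is server streaming.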

tip

There's a lot more that gRPC can do, and I'm still learning it. However, I'll stop here for this article! If you want to learn more, head to the official docs: Documentation | gRPC

Protocol Buffers

Protocol Buffers are used by gRPC to describe various request/response payloads and gRPC services. What makes Protocol Buffers truly versatile is their agnostic nature when it comes to programming languages. They can be defined independently, devoid of any specific programming language bias. However, once defined, they can be compiled into code that can be utilized across a wide array of supported programming languages. This compiled code serves a vital purpose, as it helps structure data in the respective programming languages, ensuring seamless integration and compatibility within the gRPC ecosystem.

Writing protocol buffers comes with practice. It's not like learning a new programming language. The official guide is a thorough starting point: Language Guide (proto 3) | Protocol Buffers Documentation

tip

Sit tight! We'll see examples of Protocol Buffers later in this article.

Go Essentials

Let's learn a little about some Go techniques we'll be using here. If you are familiar with these, feel free to skip to Building Kaho-aur-Suno.

Local Go Packages

In the world of Go programming, each program you create is like a neatly wrapped package, and within that package are other smaller packages waiting to be unwrapped. When working with Go, we often find ourselves reaching out to the vast repository of packages available online, or sometimes nested within our own codebase directories. These packages act as building blocks, allowing us to piece together complex functionalities.

However, here I'm trying to import a locally available package from outside my codebase. The package is ready to be used and lives in another directory on the same system; it is not published online. To do this, we'll need to use the replace directive in our go.mod file. This can be done through the command line like this:

go mod edit -replace=github.com/kinjalrk2k/MySideProject/library/go=../library/go

Now you can import github.com/kinjalrk2k/MySideProject/library/go in your project and it'll point to the package present in ../library/go

Remember to run go mod tidy for Go to pick up the package from the right path and you'll see your go.mod file gets updated to something like this

go.mod
module github.com/kinjalrk2k/MyBrandNewProject

go 1.21.4

replace github.com/kinjalrk2k/MySideProject/library/go => ../library/go

require (
	github.com/kinjalrk2k/MySideProject/library/go v0.0.0-00010101000000-000000000000
)

...

Go Channels

Channels are Go's built-in mechanism for communicating between concurrent code. Using channels, you can send data from one goroutine (a lightweight thread managed by the Go runtime) and receive it in another goroutine. Channels are typed and bidirectional by default. Take a look at the following example:

package main

import "fmt"

func main() {
	// creating a channel of type string
	channel := make(chan string)

	go func() {
		// sending a message to the channel
		channel <- "boo"
	}()

	// receiving the message here
	fmt.Println(<-channel)
}

By default, a send blocks until another goroutine receives from the channel. Similarly, a receive blocks until another goroutine sends on the channel. This is the behavior of unbuffered channels like the one above. It's interesting to note that in the above example, the receive in main actually waits for the anonymous goroutine to push something to the channel before the program exits. This ensures synchronization!
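A channel can also be restricted to one direction when it is passed to a function, which documents who sends and who receives. The sketch below is a small variation on the example above; the function names produce and collect are made up for illustration.

```go
package main

import "fmt"

// produce can only send on out; the compiler rejects a receive here.
func produce(out chan<- string, words []string) {
	for _, w := range words {
		out <- w
	}
	close(out) // tells the receiver that no more values will arrive
}

// collect can only receive from in; ranging stops once in is closed.
func collect(in <-chan string) []string {
	var got []string
	for w := range in {
		got = append(got, w)
	}
	return got
}

func main() {
	ch := make(chan string) // unbuffered: every send waits for a receive
	go produce(ch, []string{"kaho", "aur", "suno"})
	fmt.Println(collect(ch))
}
```

The same bidirectional channel is handed to both functions; the parameter types narrow it to send-only and receive-only views.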

Building Kaho-aur-Suno

"Kaho aur Suno" (Hindi: कहो और सुनो, English: "Say and Listen") is a delightful showcase of a gRPC-based message broker. It provides examples of both the broker server and clients, all written in Go. It's important to note that this application is intended as a Proof of Concept and is not suitable for production use. In the upcoming sections, we'll take a step-by-step journey to build Kaho-aur-Suno together! So, get ready to explore the fascinating world of gRPC and message brokering in Go. Let's dive in!

info

This article will give you an idea of how things work under the hood. For actual code, please refer to the GitHub Repository here. I suggest opening the repository while reading this article to follow along more easily. In many places in this article, you'll find just function signatures or code snippets. For the full code, please refer to the repository!

Writing protobufs

tip

Following along with the codebase? Check out the file: proto/broker.proto

Before getting started with the actual code, we need to plan out the protobufs and write them. Here are the high-level requirements: for a message broker, we first need to define what a message looks like, followed by the requests and responses exchanged between the server and clients.

We first need to add some boilerplate code for our protobuf

broker.proto
syntax = "proto3"; // using version 3

// this will define the go package that'll be created once this protobuf is compiled
option go_package = "github.com/kinjalrk2k/Kaho-aur-Suno/lib/go/proto/broker";

Next up, let's take a closer look at how messages are structured. Within a message broker, there can be multiple topics, each serving to categorize and direct messages to their intended recipients. Additionally, every message must have a unique identifier within the system for proper identification and tracking. "Baat" (Hindi: बात, English: talk/word/matter/thing etc) is the structure that'll wrap our message with some metadata.

broker.proto
message Baat {
  string topic = 1;
  string id = 2; // UUID
  string message = 3; // the actual message
  string created_at = 4; // timestamp
}

To publish a message, we simply send a request to the gRPC service. This request includes the message we want to publish, along with specifying the topic where we want the message to be delivered.

broker.proto
message KahoRequest {
  string topic = 1;
  string message = 2;
}

When it comes to listening or subscribing to messages, selecting the right topic is crucial. This ensures that the request aligns perfectly with what we're seeking.

broker.proto
message SunoRequest {
  string topic = 1;
}

Finally, let's define the service with two RPC methods for publishing and subscribing. Note that the Kaho procedure is a Unary RPC, while the Suno procedure is a Server Streaming RPC!

broker.proto
service Broker {
  rpc Kaho (KahoRequest) returns (Baat);
  rpc Suno (SunoRequest) returns (stream Baat);
}

Building Proto Package

We're all set with the protobufs, and now it's time to generate the gRPC code! When we generate the code, we get two types of essential components:

  • Stubs: These are like your trusty sidekicks on the client's end. They simplify the RPC calls by handling all the serialization and deserialization behind the scenes. So, the client can focus on using straightforward methods to make those calls.

  • Skeletons: On the server's side, we have the skeletons. These skeletons handle the incoming RPC calls and set up the processing flow. However, they're not filled with your business logic just yet. It's up to you, the developer, to extend these skeletons and weave in your unique business logic.

To build the Proto packages for Go, run this command:

protoc \
  --go_out=lib/go \
  --go-grpc_out=lib/go \
  --grpc-gateway_out=lib/go \
  --go_opt=module=github.com/kinjalrk2k/Kaho-aur-Suno/lib/go \
  --go-grpc_opt=module=github.com/kinjalrk2k/Kaho-aur-Suno/lib/go \
  --grpc-gateway_opt=module=github.com/kinjalrk2k/Kaho-aur-Suno/lib/go \
  --proto_path=proto \
  **/*.proto

This creates a lib/go directory which looks like this:

.
└── proto
    └── broker
        ├── broker.pb.go
        └── broker_grpc.pb.go

tip

Take a peek at the file: scripts/protogen.sh in the codebase for more information. Depending on your package names and directory structure, you may need to adjust the command above to better fit your requirements!

info

I designed the protogen as a convenient package for seamless integration into both my server and client-side code. To ensure easy accessibility, I've organized the package code within lib/go. This structure sets the stage for future expansions, allowing the project to broaden its horizons and support additional programming languages through dedicated directories like lib/js, lib/py, and more.

Follow along for details about the go package

I compiled the protobufs and placed them in the lib/go/proto directory. This step sets the stage for transforming the lib/go directory into a fully-fledged package! Run these commands from the lib/go directory to craft your Go package:

lib/go
go mod init github.com/kinjalrk2k/Kaho-aur-Suno/lib/go
go mod tidy
touch lib.go; echo "package lib" >> lib.go

Server

Now that the protobufs and the proto package are ready to go, let's shift our focus to building the server. In this step, we'll implement the gRPC skeleton we discussed earlier and configure the networking for the server. Before getting started, let us quickly lay down the boilerplate for the server codebase and install the local proto package we just built. In the server directory, run this:

server
go mod init github.com/kinjalrk2k/Kaho-aur-Suno/server
go mod edit -replace=github.com/kinjalrk2k/Kaho-aur-Suno/lib/go=../lib/go
go mod tidy
caution

go mod tidy might not install/point the package correctly when you are starting off with an empty project because this package is not yet used in your codebase. Do run the same command later once you've used this in any piece of code!

Service and topics

To start creating our message broker service, we need to embed the unimplemented struct from our proto package. The generated code names it UnimplementedBrokerServer.

type BrokerService struct {
	broker.UnimplementedBrokerServer
	Topics map[string][]chan *broker.Baat
}

Embedding the struct now will simplify our implementation of the skeleton's methods later on. It's important to note that Topics is a map from a topic name to a slice of channels. A slice, because there might be multiple subscribers listening to a topic. We've chosen channels because they will carry data from the publish method to the subscribe method. Moreover, each channel carries messages of the broker.Baat type, as defined in our protobuf. By using a pointer to broker.Baat, we avoid copying the message and also get a convenient way to signal with nil values down the line.
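Since subscribers come and go, the map needs both an add and a remove operation. Here is a minimal sketch of that bookkeeping, using string channels instead of *broker.Baat so it runs without the generated package. The type topics and the helpers addChannel and removeChannel are hypothetical names for this example and are not taken from the repository.

```go
package main

import "fmt"

// topics maps a topic name to the channels of its current subscribers.
type topics map[string][]chan string

// addChannel registers a new subscriber channel under topic and returns it.
func (t topics) addChannel(topic string) chan string {
	ch := make(chan string, 1)
	t[topic] = append(t[topic], ch)
	return ch
}

// removeChannel drops ch from topic and deletes the topic key
// once its last subscriber is gone.
func (t topics) removeChannel(topic string, ch chan string) {
	subs := t[topic]
	for i, c := range subs {
		if c == ch {
			t[topic] = append(subs[:i], subs[i+1:]...)
			break
		}
	}
	if len(t[topic]) == 0 {
		delete(t, topic)
	}
}

func main() {
	registry := topics{}
	first := registry.addChannel("one")
	registry.addChannel("one")
	registry.addChannel("two")
	fmt.Println(len(registry["one"]), len(registry["two"]))

	registry.removeChannel("one", first)
	fmt.Println(len(registry["one"]))
}
```

Channels are comparable in Go, so a subscriber's channel can be found in the slice with a plain == check.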

Kaho (Publish) Procedure

As we delve deeper into our proto package, we'll stumble upon a method eagerly waiting for us to implement.

lib/go/proto/broker/broker_grpc.pb.go
func (UnimplementedBrokerServer) Kaho(context.Context, *KahoRequest) (*Baat, error) {
	return nil, status.Errorf(codes.Unimplemented, "method Kaho not implemented")
}

That's our Kaho procedure and we need to attach our business logic there by overriding it

// function signature comes from the proto package
func (b *BrokerService) Kaho(ctx context.Context, in *broker.KahoRequest) (*broker.Baat, error) {
	id := uuid.New().String()

	baatBolo := broker.Baat{
		Topic:     in.Topic,
		Id:        id,
		Message:   in.Message,
		CreatedAt: time.Now().Format(time.RFC3339),
	}

	// fire a goroutine to send the message to all listening channels
	go func() {
		for _, ch := range b.Topics[baatBolo.Topic] {
			ch <- &baatBolo
		}
	}()

	return &baatBolo, nil
}

The critical component here is the anonymous goroutine. It cycles through all the channels registered for the topic and sends the message to each one. Why? Because we want to ensure that our message reaches every single one of our subscribers. Remember, Topics is a map, with each key representing a different topic. To paint a clearer picture, let's take two topics, 'one' and 'two'. In the Topics map, they would be represented like so:

{
"one": [chan 1, chan 2, ...],
"two": [chan 1, ...]
}
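One thing to keep in mind is that gRPC handles each call in its own goroutine, so publishers and subscribers may touch the Topics map at the same time, and an unbuffered send waits for a slow subscriber. The sketch below shows one possible way to guard against both, using a sync.RWMutex and a non-blocking send. It is not what Kaho-aur-Suno does: the topicRegistry type and its methods are hypothetical, it uses string payloads to stay dependency-free, and dropping messages for slow subscribers is a design choice you may not want.

```go
package main

import (
	"fmt"
	"sync"
)

// topicRegistry is a hypothetical, lock-protected stand-in for Topics.
type topicRegistry struct {
	mu     sync.RWMutex
	topics map[string][]chan string
}

func newTopicRegistry() *topicRegistry {
	return &topicRegistry{topics: make(map[string][]chan string)}
}

// subscribe registers a new buffered channel for topic and returns it.
func (r *topicRegistry) subscribe(topic string, buffer int) chan string {
	ch := make(chan string, buffer)
	r.mu.Lock()
	r.topics[topic] = append(r.topics[topic], ch)
	r.mu.Unlock()
	return ch
}

// publish offers msg to every subscriber of topic and returns how many
// accepted it. A subscriber whose buffer is full is skipped, not waited on.
func (r *topicRegistry) publish(topic, msg string) int {
	r.mu.RLock()
	defer r.mu.RUnlock()

	delivered := 0
	for _, ch := range r.topics[topic] {
		select {
		case ch <- msg:
			delivered++
		default: // buffer full: drop the message for this subscriber
		}
	}
	return delivered
}

func main() {
	registry := newTopicRegistry()
	a := registry.subscribe("one", 1)
	b := registry.subscribe("one", 1)

	fmt.Println("delivered to", registry.publish("one", "hello"), "subscribers")
	fmt.Println(<-a, <-b)
}
```

With a buffer size of 1, a second publish before the subscribers read would be dropped for both of them, which is the trade-off of never blocking the publisher.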

Suno (Subscribe) Procedure

Alright, we've tackled the task of publishing messages. Now, the next step is ensuring they reach the right subscribers. In a similar vein, we'll dive into another procedure here that needs implementation.

lib/go/proto/broker/broker_grpc.pb.go
func (UnimplementedBrokerServer) Suno(*SunoRequest, Broker_SunoServer) error {
	return status.Errorf(codes.Unimplemented, "method Suno not implemented")
}

That's our Suno procedure and we need to attach our business logic there by overriding it

// function signature comes from the proto package
func (b *BrokerService) Suno(req *broker.SunoRequest, stream broker.Broker_SunoServer) error {
	// started listening. so, create a channel and append it to the list
	var ch = make(chan *broker.Baat)
	b.Topics[req.Topic] = append(b.Topics[req.Topic], ch)

	// this goroutine waits for the stream's context to end (e.g. the client disconnected)
	go func() {
		<-stream.Context().Done()
		ch <- nil
	}()

	// infinite loop
	for {
		baatSuno := <-ch
		if baatSuno != nil {
			stream.Send(baatSuno)
		} else { // oh, the client disconnected
			b.RemoveChannel(req.Topic, ch) // remove the channel (see codebase for the method)
			break
		}
	}

	return nil
}

In this process, we start by extracting the topic that the subscriber wishes to hear from their request. Next, we establish a fresh channel for the client and associate it to this topic. As the client connection ends, we intentionally send a nil value down this channel. Meanwhile, within the infinite loop, we continuously stream whatever message we receive from the channel, back to the client. If we detect that the client has disconnected, we promptly remove their channel from our list and halt the loop. This approach ensures smooth and efficient communication while handling client disconnections seamlessly.

Setting up the server

Our BrokerService is all set and primed for action! Now, let's get the networking configured to unveil the gRPC service, making it accessible to the world. It all begins with establishing a straightforward TCP connection, bound to a specific port. For our setup, that port happens to be 8000.

var address = "0.0.0.0" + ":" + "8000"

log.Println("Starting TCP Listener")
listener, err := net.Listen("tcp", address)
if err != nil {
	log.Fatal("failed to listen: ", err)
}

This is followed by setting up the gRPC server. While registering our BrokerService, we make sure our Topics map is also initialized correctly.

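server := grpc.NewServer()
reflection.Register(server) // server reflection

broker.RegisterBrokerServer(server, &BrokerService{
	Topics: make(map[string][]chan *broker.Baat),
})

// start serving on the listener created above; this call blocks until the server stops
if err := server.Serve(listener); err != nil {
	log.Fatal("failed to serve: ", err)
}
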
tip

gRPC Server Reflection

Server Reflection is a powerful tool that aids gRPC clients in visualizing how procedure calls to the server are structured. It essentially acts as an automatic API documentation resource, offering valuable insights into the server's capabilities. As we move forward to test our project using Postman in the upcoming section, Server Reflection will prove to be a valuable tool

Client

All done on the server side; let's jump to the client side for some testing!

Postman

Let's kick things off with a simple step using Postman. What's great about Postman is that it's not limited to just Web APIs; it also supports gRPC, GraphQL, WebSockets, and more. Remember that Server Reflection setup we did earlier? Well, it's like having a handy assistant in Postman that automatically loads the procedures we need, making our testing process smoother than ever!

Server Reflection in Postman

Server Reflection and viewing Procedures

Exciting news! The procedures are now loaded in Postman, ready for action. In the video below, you'll witness the seamless process of subscribing to the Suno procedure and utilizing the Kaho procedure to publish messages.

Video demonstrating testing through Postman

While Postman can be handy for swiftly testing your implementation, it's not typically what you'll rely on for real-world use cases. In practical scenarios, you'll likely need to integrate and use this service programmatically. To guide you through setting up the client programmatically, let's walk through a demo of Subscriber and Publisher in Go. This hands-on approach will give you a solid foundation for working with these services in a more automated and systematic manner.

Subscriber

tip

When the gRPC Proto package is built, it automatically generates client-side stubs for us. These stubs are incredibly useful for making requests. If your client-side implementation resides in a separate standalone project, ensure that you install the local proto package just like we did before.

// getting the topic from the first command line argument
if len(os.Args) < 2 {
	log.Fatal("missing topic argument")
}
topic := os.Args[1]

// establish the connection to the server
log.Printf("Subscribing to Topic: %v\n", topic)
conn, err := grpc.Dial("localhost:8000", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
	log.Fatal("failed to connect: ", err)
}
defer conn.Close()

// setup gRPC client
client := broker.NewBrokerClient(conn)

// Suno procedure call
stream, err := client.Suno(context.Background(), &broker.SunoRequest{Topic: topic})
if err != nil {
	log.Fatal("failed to stream: ", err)
}

// keep receiving the stream
for {
	baat, err := stream.Recv()
	if err == io.EOF { // the end!
		break
	}
	if err != nil {
		log.Fatal("failed to read stream: ", err)
	}
	log.Println(baat)
}

To keep things simple, we start by fetching the topic name from the command line parameters. Next, we establish a TCP connection with the server. With this connection in place, we create the gRPC client, which we'll use to initiate gRPC procedure calls. Once these initial steps are completed, we launch into an infinite loop to continuously read from the incoming stream sent by the server. When we detect the end of the stream, we gracefully stop the application.

Publisher

It's time to get those messages out there! Luckily, publishing them is a breeze.

// establish the connection to the server
conn, err := grpc.Dial("localhost:8000", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
	log.Fatal("failed to connect: ", err)
}
defer conn.Close()

// setup gRPC client
client := broker.NewBrokerClient(conn)

// Kaho procedure call
baat, err := client.Kaho(context.Background(), &broker.KahoRequest{
	Topic:   "one",
	Message: "Hello?",
})
if err != nil {
	log.Fatal("failed to publish: ", err)
}
log.Println("Published: ", baat)

Just like setting up a subscriber, the initial steps involve establishing the TCP connection and configuring the gRPC client. Once that's done, invoking the gRPC procedure becomes as effortless as making any function call with client.Kaho.

Check out this video that demonstrates a publisher sending messages to two different topics, with corresponding subscribers tuning in to receive those messages.

Video demonstrating testing through clients

What have we done?

In this article, we started off by understanding what a Message Broker is and how gRPC works as a communication protocol between server and clients. We then touched upon a few Go language essentials. Finally, we walked through the development of a basic Message Broker, Kaho-aur-Suno. We looked closely at the strategies for handling multiple topics and clients, and at how messages travel from publishers to subscribers.

This Message Broker is just a hobby project and not a production-ready application at all. There are still multiple areas in which we can improve it, like persisting messages, better error handling, message delivery acknowledgements, delayed delivery, etc. There are various industry-grade Message Brokers like RabbitMQ, ActiveMQ, Google Pub/Sub, etc. which implement such features!

Notes and Further Readings