In this post, we will talk about some exciting and powerful use cases of event-driven systems that can be solved using RabbitMQ and SNS - SQS combo.
Say we have several microservices running in production, and each service encloses a business entity.
- We want to build a system where any microservice can subscribe to changes on any business entity.
- The system should be flexible enough that the subscribers can subscribe to specific actions on the entity like
CREATE
, UPDATE
and DELETE
.
- Any new subscription should be created with minimal code change and no infra change.
- Every message should have metadata about the producer, the type of the entity and the action on the entity.
E.g., In an e-commerce domain, a notification service is responsible for sending notifications across various platforms, and it wants to subscribe to all the changes on an Order
such as CREATE
or UPDATE
.
In contrast, another service may only want to subscribe to a change when a User
is DELETED
from the system.
- Working knowledge of RMQ, SNS, SQS
- Docker
- Go
- AWS account
Amazon Simple Notification Service (Amazon SNS) is a fully managed messaging service for service-to-service and service-to-person communication.
The pub/sub functionality provides topics for high-throughput, push-based, many-to-many messaging between microservices and event-driven serverless applications.
Below is an example of user service
, which is publishing a message to SNS
when a user
is updated
.
We will be publishing a message to an SNS topic with message attributes such as
- Name of the service which published the message:
userservice.
- Type of the entity:
user.
- Action:
update
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
package main
import (
"encoding/json"
"log"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
)
// User simulates the User in a system.
// It is the payload that main JSON-encodes and publishes as the SNS message body.
type User struct {
// Name is serialized under the JSON key "name".
Name string `json:"name"`
}
// main publishes a JSON-encoded User to the "events" SNS topic, tagging the
// message with sender/entity/action attributes so that subscribers can filter
// on them, and logs the message ID returned by SNS.
func main() {
	// Build a session from the shared AWS configuration; credentials are read
	// from the shared credentials file (~/.aws/credentials).
	awsSession := session.Must(session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
		Profile:           "dinesh", // my AWS profile
	}))
	client := sns.New(awsSession)

	payload, err := json.Marshal(&User{Name: "Dinesh"})
	if err != nil {
		log.Fatal(err)
	}

	// PublishInput takes pointers, so the values need addressable variables.
	var (
		arn  = "arn:aws:sns:ap-south-1:*:events"
		body = string(payload)
	)
	input := &sns.PublishInput{
		Message:  &body,
		TopicArn: &arn,
		// Hardcoding is bad practice; it is only for demonstration. A real
		// service would likely read the service name from ENV, derive the
		// entity name via reflection and pass the action actually performed.
		MessageAttributes: getMessageAttributes("userservice", "user", "update"),
	}

	out, err := client.Publish(input)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(*out.MessageId)
}
// getMessageAttributes builds the SNS message attributes describing an event:
// who sent it ("sender"), which entity it concerns ("entity") and what
// happened to that entity ("action"). All three are sent with the "String"
// data type.
func getMessageAttributes(sender, entityName, action string) map[string]*sns.MessageAttributeValue {
	// The attribute fields are pointers; one shared variable backs DataType.
	dataType := "String"

	return map[string]*sns.MessageAttributeValue{
		"sender": {DataType: &dataType, StringValue: &sender},
		"entity": {DataType: &dataType, StringValue: &entityName},
		"action": {DataType: &dataType, StringValue: &action},
	}
}
|
SNS offers a subscription mechanism to deliver messages to SQS
.
Filter policies are the cherry on top, allowing an application to filter specific messages based on attributes.
These filter policies are just regular JSON
text. A consumer who wishes to consume events from userservice
when a user
is updated
or created
should add the below filter policy.
1
2
3
4
5
6
7
8
9
10
11
12
|
{
"entity": [
"user"
],
"sender": [
"userservice"
],
"action": [
"update",
"create"
]
}
|
Amazon Simple Queue Service (SQS) is a fully managed message queuing service.
SQS offers two types of message queues.
- Standard queues offer maximum throughput, best-effort ordering, and at-least-once delivery.
- SQS FIFO queues are designed to guarantee that messages are processed exactly once, in the exact order they are sent.
Once a service has set up an SQS queue with a subscription and filter policy, we need the queue URL to consume from it. Below is an example of consuming messages from SQS
.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
package main
import (
"context"
"log"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)
// GetQueueURL looks up the URL of the SQS queue with the given name using a
// client created from sess. It returns the SDK's lookup output on success and
// a nil output together with the error on failure.
func GetQueueURL(sess *session.Session, queue *string) (*sqs.GetQueueUrlOutput, error) {
	input := &sqs.GetQueueUrlInput{QueueName: queue}

	out, err := sqs.New(sess).GetQueueUrl(input)
	if err != nil {
		return nil, err
	}
	return out, nil
}
// GetMessages long-polls the queue at queueURL and returns at most one message.
//
// timeout is the visibility timeout, in seconds, applied to a received message
// (how long it stays hidden from other consumers). waitSeconds is the long-poll
// duration, in seconds; when nil the queue's own setting applies. Either
// pointer may be nil.
//
// The request is bounded by a context deadline of the long-poll wait plus a
// five second margin. The deadline is derived from waitSeconds, not from the
// visibility timeout, because the wait is what determines how long the call
// can block.
func GetMessages(sess *session.Session, queueURL *string, timeout, waitSeconds *int64) (*sqs.ReceiveMessageOutput, error) {
	// Create an SQS service client.
	svc := sqs.New(sess)

	// 20 seconds is the longest wait SQS accepts, so it is a safe upper bound
	// when the caller leaves the wait to the queue's configuration.
	wait := int64(20)
	if waitSeconds != nil {
		wait = *waitSeconds
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(wait+5)*time.Second)
	defer cancel()

	msgResult, err := svc.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
		AttributeNames: []*string{
			aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
		},
		// "All" requests every message attribute, i.e. the sender/entity/action
		// metadata attached by the publisher.
		MessageAttributeNames: []*string{
			aws.String("All"),
		},
		QueueUrl:            queueURL,
		MaxNumberOfMessages: aws.Int64(1),
		VisibilityTimeout:   timeout,
		WaitTimeSeconds:     waitSeconds,
	})
	if err != nil {
		return nil, err
	}
	return msgResult, nil
}
// main resolves the URL of the "events" queue and then polls it forever,
// logging the body of every received message and deleting the message once it
// has been handled.
func main() {
	// Create a session that gets credential values from ~/.aws/credentials
	// and the default region from ~/.aws/config.
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
		Profile:           "dinesh",
	}))

	// Get URL of queue.
	queueName := "events"
	urlResult, err := GetQueueURL(sess, &queueName)
	if err != nil {
		log.Fatal(err)
	}
	queueURL := urlResult.QueueUrl

	// Client used to acknowledge (delete) messages after they are processed.
	svc := sqs.New(sess)

	waitSeconds, timeout := int64(20), int64(60)
	for {
		log.Println("polling for messages")
		msgResult, err := GetMessages(sess, queueURL, &timeout, &waitSeconds)
		if err != nil {
			// NOTE(review): the SDK may wrap context errors in its own error
			// type, in which case this comparison never matches — confirm.
			if err == context.DeadlineExceeded {
				continue
			}
			log.Fatal(err)
		}
		for _, msg := range msgResult.Messages {
			log.Println("Message Body : ", aws.StringValue(msg.Body))

			// A received message is only hidden for the visibility timeout;
			// unless it is deleted, SQS delivers it again afterwards.
			_, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
				QueueUrl:      queueURL,
				ReceiptHandle: msg.ReceiptHandle,
			})
			if err != nil {
				// Best effort: the message will simply be redelivered.
				log.Println("deleting message:", err)
			}
		}
	}
}
|
RabbitMQ (RMQ) is an open-source message broker. It supports various protocols such as AMQP 0-9-1, STOMP, MQTT, and AMQP 1.0. It also comes with built-in web-based monitoring. Its most powerful feature is the dynamic message routing capability using the topic
exchange.
1
|
docker run -d --hostname rmq --name rmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root rabbitmq:3-management
|
We will now replicate with RMQ what we did with SNS.
Below is an example which creates a topic
exchange called events
. It also publishes a message to exchange with a routing key in the format <service>.<entityname>.<action>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
package main
import (
"encoding/json"
"log"
"github.com/streadway/amqp"
)
// User simulates the User in a system.
// It is the payload that main JSON-encodes and publishes to the exchange.
type User struct {
// Name is serialized under the JSON key "name".
Name string `json:"name"`
}
// main connects to the local RabbitMQ broker, declares the durable topic
// exchange "events" and publishes a JSON-encoded User to it with the routing
// key "userservice.user.updated" (<service name>.<entity name>.<action>).
func main() {
	conn, err := amqp.Dial("amqp://root:root@localhost:5672/")
	if err != nil {
		log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("%s: %s", "Failed to open a channel", err)
	}
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"events",           // name
		amqp.ExchangeTopic, // type
		true,               // durable
		false,              // auto-deleted
		false,              // internal
		false,              // no-wait
		nil,                // arguments
	)
	if err != nil {
		log.Fatalf("%s: %s", "Failed to declare an exchange", err)
	}

	user := User{Name: "Dinesh"}
	raw, err := json.Marshal(&user)
	if err != nil {
		log.Fatalf("%s: %s", "Failed to marshal message", err)
	}

	err = ch.Publish(
		"events",                   // exchange
		"userservice.user.updated", // routing key <service name>.<entity name>.<action>
		false,                      // mandatory
		false,                      // immediate
		amqp.Publishing{
			// application/json is the registered media type for JSON.
			ContentType: "application/json",
			// Persistent, so the message matches the durable exchange and
			// is not dropped from a durable queue on broker restart.
			DeliveryMode: amqp.Persistent,
			Body:         raw,
		})
	if err != nil {
		log.Fatalf("%s: %s", "Failed to publish a message", err)
	}
	log.Println("Message published successfully")
}
|
The robust feature of the topic
exchange is that, while binding the queue, we can use wildcards (* for exactly one word, # for zero or more words) in a routing key whose words are delimited by .
.
A consumer wanting to subscribe to an event from userservice
when a user
is created, updated or deleted will use the routing key userservice.user.*
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
package main
import (
"log"
"github.com/streadway/amqp"
)
// main connects to the local RabbitMQ broker, declares the durable topic
// exchange "events" and the durable queue "notificationservice-userevents",
// binds the queue with the routing key "userservice.user.*" and then logs the
// body of every message delivered to the queue until the process is stopped.
func main() {
	conn, err := amqp.Dial("amqp://root:root@localhost:5672/")
	if err != nil {
		log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("%s: %s", "Failed to open a channel", err)
	}
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"events",           // name
		amqp.ExchangeTopic, // type
		true,               // durable
		false,              // auto-deleted
		false,              // internal
		false,              // no-wait
		nil,                // arguments
	)
	if err != nil {
		log.Fatalf("%s: %s", "Failed to declare an exchange", err)
	}

	q, err := ch.QueueDeclare(
		"notificationservice-userevents", // name
		true,                             // durable, survives broker restart
		false,                            // delete when unused
		false,                            // exclusive
		false,                            // no-wait
		nil,                              // arguments
	)
	if err != nil {
		log.Fatalf("%s: %s", "Failed to declare queue", err)
	}

	// "*" stands for exactly one word, so this matches every action on the
	// user entity published by userservice.
	routingKey := "userservice.user.*"
	log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "events", routingKey)
	err = ch.QueueBind(
		q.Name,     // queue name
		routingKey, // routing key
		"events",   // exchange
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {
		log.Fatalf("%s: %s", "Failed to bind queue to exchange", err)
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto ack: no explicit acknowledgement is sent by this consumer
		false,  // exclusive
		false,  // no local
		false,  // no wait
		nil,    // args
	)
	if err != nil {
		log.Fatalf("%s: %s", "Failed to register a consumer", err)
	}

	// Nothing ever sends on or closes this channel; receiving from it keeps
	// main alive while the goroutine below handles deliveries.
	forever := make(chan struct{})

	go func() {
		for d := range msgs {
			log.Printf(" Message Body : %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}
|
Some things in the code are deliberately left out in the interest of time and complexity.