RabbitMQ and Google Pub/Sub are both powerful and reliable message queue implementations, and if you need to pick one of them for your Google Cloud Platform (GCP) project, the choice may not be simple. We recently started on a new project where we narrowed down our choices to these two products. This blog post is the second part of the summary of our analysis and our choice.

In the first part of this blog post, we compared the building blocks of RabbitMQ and Pub/Sub. Now it’s time to see some practical examples.

Implementations of Frequently Used Queue Topologies

In this section, we’ll take a look at a few frequently used queue topologies and implement them in both RabbitMQ and Pub/Sub.

The examples require the following:

Simple Messaging

This is the simplest possible scenario, where two components communicate asynchronously via a message queue. One component publishes messages to the queue, and the other component consumes them.

Simple Messaging with RabbitMQ

RabbitMQ’s default direct exchange comes in handy here: We can define just one queue, the publisher has to specify this queue as the routing key, and the default exchange will route the messages to the queue.

publisher to queue to consumer
Simple messaging RabbitMQ

Let’s declare the queue and send a message. Note that the queue declaration also checks if the queue exists on the broker and creates it if it doesn’t exist:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class Send {

  private static final String QUEUE_NAME = "queue-1";

  public static void main(String... args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel()) {
      boolean durable = false;
      boolean exclusive = false;
      boolean autoDelete = false;
      channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);
      String message = "This is a message for you 😀";

      channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
      System.out.println(" [x] Sent '" + message + "'");
    }
  }
}

Now set up the consumer. We’re declaring the queue here the same way we do in the publisher. That means the queue will exist regardless of which application — the publisher or the subscriber — connects to the broker first:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

public class Receive {
  private static final String QUEUE_NAME = "queue-1";

  public static void main(String... args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    boolean durable = false;
    boolean exclusive = false;
    boolean autoDelete = false;
    channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback =
        (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
          System.out.println(" [x] Received '" + message + "'");
        };
    boolean autoAck = true;
    channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
  }
}

Simple Messaging with Pub/Sub

In Pub/Sub, a subscription can’t exist without a topic, messages published to a topic can only be consumed through a subscription, and there are no default topics. That means the simplest topology we can define must consist of one topic with one subscription as its output.

publisher to topic to subscription to consumer
Simple messaging Pub/Sub

Let’s set up the topology using the gcloud pubsub commands.

First, we have to create the topic and the subscription. Later, we’ll see how to do this using the Pub/Sub API:

gcloud pubsub topics create topic-1
gcloud pubsub subscriptions create subscription-1 --topic topic-1

Now that the topic exists, we can send a message:

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.concurrent.TimeUnit;

public class SimplePublisher {
  public static void main(String... args) throws Exception {
    String projectId = "my-gcp-project";
    String topicId = "topic-1";

    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;
    try {
      publisher = Publisher.newBuilder(topicName).build();

      String message = "Hello World!";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

And since the subscription is set up, we can also consume the message:

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SimpleConsumer {
  public static void main(String... args) throws Exception {
    String projectId = "my-gcp-project";
    String subscriptionId = "subscription-1";

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());

      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {

      subscriber.stopAsync();
    }
  }
}

Worker Thread-Style Processing

In this scenario, we need to process all the messages arriving from a single producer as fast as possible. The messages don’t have to be processed in a strict sequence, which means we can speed up the processing by employing multiple worker threads (consumers).

Workers with RabbitMQ

We can set this topology up the same way as in the previous example: The publisher puts the messages on a single queue, and the consumers take them off that queue.

publisher to queue to multiple consumers
Worker style processing RabbitMQ

Workers with Pub/Sub

Here, the topology setup is the same as in the previous example: The publisher sends messages to the topic, and the workers subscribe to the subscription to receive the messages.

publisher to topic to subscription to multiple consumers
Worker style processing Pub/Sub
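
The property both setups rely on is that each message is handed to exactly one of the competing workers. Here’s a minimal in-memory sketch of that behavior. It isn’t RabbitMQ or Pub/Sub client code: a BlockingQueue stands in for the queue (or subscription), and the class and method names are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory model of worker-style processing; no broker involved.
class WorkerPoolModel {

  // Marker telling a worker that no more messages will arrive.
  private static final String STOP = "__stop__";

  /** Processes the (unique) messages with competing workers; returns message -> times processed. */
  static Map<String, Integer> process(List<String> messages, int workerCount) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
    for (int i = 0; i < workerCount; i++) {
      queue.add(STOP); // one stop marker per worker
    }

    Map<String, Integer> timesProcessed = new ConcurrentHashMap<>();
    ExecutorService workers = Executors.newFixedThreadPool(workerCount);
    for (int i = 0; i < workerCount; i++) {
      workers.submit(
          () -> {
            try {
              // take() removes the message, so no other worker can receive it.
              for (String m = queue.take(); !m.equals(STOP); m = queue.take()) {
                timesProcessed.merge(m, 1, Integer::sum);
              }
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
    }

    workers.shutdown();
    try {
      workers.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return timesProcessed;
  }

  public static void main(String... args) {
    Map<String, Integer> result = process(List.of("m1", "m2", "m3", "m4", "m5"), 3);
    result.forEach((message, count) -> System.out.println(message + " processed " + count + "x"));
  }
}
```

With a real broker, the same effect comes from starting several instances of the consumer shown earlier against the same queue or subscription.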

Broadcasting to Multiple Consumers

In this scenario, we want all our consumers to get all the messages from the publisher.

Broadcasting with RabbitMQ

Fanout exchanges exist for this purpose: They send a copy of the message they received on their input to all the queues connected to them.

publisher to fanout exchange to multiple queues
Broadcasting with a fanout exchange in RabbitMQ

Let’s create the publisher. The declaration of the exchange works the same way as the queue declaration: It creates the exchange in the broker if it doesn’t already exist. However, this will throw an exception if we’re trying to declare an exchange that already exists on the broker with a different type:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class BroadcastSender {

  private static final String exchangeName = "exchange-fanout";

  public static void main(String... args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel()) {

      channel.exchangeDeclare(exchangeName, "fanout");

      String message = "This is a message for all of you 😀";

      channel.basicPublish(exchangeName, "", null, message.getBytes(StandardCharsets.UTF_8));
      System.out.println(" [x] Sent '" + message + "'");
    }
  }
}


The consumer declares both the exchange and the queue and sets up the binding between them:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

public class BroadcastReceiver {

  // For a real broadcast, every consumer needs its own queue bound to the exchange.
  private static final String queueName01 = "queue-1";
  // Must match the exchange name declared by the publisher.
  private static final String exchangeName = "exchange-fanout";

  public static void main(String... args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    // No try-with-resources here: closing the connection would stop the consumer
    // right after it's registered.
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    boolean durable = false;
    boolean exclusive = false;
    boolean autoDelete = false;

    channel.exchangeDeclare(exchangeName, "fanout");
    channel.queueDeclare(queueName01, durable, exclusive, autoDelete, null);
    channel.queueBind(queueName01, exchangeName, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback =
        (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
          System.out.println(" [x] Received '" + message + "'");
        };
    boolean autoAck = true;
    channel.basicConsume(queueName01, autoAck, deliverCallback, consumerTag -> {});
  }
}

Broadcasting with Pub/Sub

Topics broadcast the messages they receive to all the subscriptions connected to them, which means we can simply hang more subscriptions onto our topic to broadcast the same messages to all the consumers.

publisher to topic to multiple subscriptions
Broadcasting with multiple subscriptions in Pub/Sub

Message Classification

In this scenario, we want to selectively process messages with two consumers: One processes the “success” messages, and one processes the “failure” messages.

Classification with RabbitMQ

In RabbitMQ, we can use a direct exchange, and to implement the classification, we have to configure bindings with the appropriate routing keys.

publisher to direct exchange to multiple queues
Classification based on routing key with RabbitMQ
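
To make the routing rule concrete, here’s a minimal in-memory sketch of what a direct exchange does with its bindings. It’s a model for illustration only, not RabbitMQ client code, and the class, queue, and message names are assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of a direct exchange; not the RabbitMQ client API.
class DirectExchangeModel {

  // binding key -> names of the queues bound with that key
  private final Map<String, Set<String>> bindings = new HashMap<>();
  // queue name -> messages routed to that queue
  private final Map<String, List<String>> queues = new HashMap<>();

  void bind(String queueName, String bindingKey) {
    queues.putIfAbsent(queueName, new ArrayList<>());
    bindings.computeIfAbsent(bindingKey, key -> new LinkedHashSet<>()).add(queueName);
  }

  /** Routes the message to every queue whose binding key equals the routing key. */
  void publish(String routingKey, String message) {
    for (String queueName : bindings.getOrDefault(routingKey, Set.of())) {
      queues.get(queueName).add(message);
    }
  }

  List<String> messagesIn(String queueName) {
    return queues.getOrDefault(queueName, List.of());
  }

  public static void main(String... args) {
    DirectExchangeModel exchange = new DirectExchangeModel();
    exchange.bind("success-queue", "success");
    exchange.bind("failure-queue", "failure");

    exchange.publish("success", "order 1 shipped");
    exchange.publish("failure", "order 2 rejected");
    // No binding matches this routing key, so the message reaches no queue.
    exchange.publish("unknown", "order 3 pending");

    System.out.println("success-queue: " + exchange.messagesIn("success-queue"));
    System.out.println("failure-queue: " + exchange.messagesIn("failure-queue"));
  }
}
```

With the real client, the same steps map to the calls used in the other examples: exchangeDeclare with the "direct" type, queueBind with "success" or "failure" as the binding key, and basicPublish with the matching routing key.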

Classification with Pub/Sub

In Pub/Sub, topics broadcast messages indiscriminately to all the subscriptions attached to them. But subscriptions can be configured to filter messages and only retain the ones matching the filtering criteria.

publisher to topic to multiple subscriptions with filtering
Classification using filtering with Pub/Sub

These subscriptions need a message filter, and the filter has to be specified when the subscription is created. Here’s the one for the “success” messages:

gcloud pubsub subscriptions create success-subscription \
  --topic=topic-1 \
  --message-filter='attributes.result="success"'
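
Note that the filter is evaluated against the message attributes, not the payload, so the publisher has to set a result attribute on each message (for example with putAttributes on the message builder). The following is a minimal in-memory sketch of how filtered subscriptions behave. It isn’t the Pub/Sub API: the class is hypothetical, the filters are plain Java predicates rather than the Pub/Sub filter syntax, and the failure-subscription is an assumed counterpart to the subscription created above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory model of a topic with filtered subscriptions; not the Pub/Sub API.
class FilteredTopicModel {

  // subscription name -> filter over the message attributes
  private final Map<String, Predicate<Map<String, String>>> filters = new LinkedHashMap<>();
  // subscription name -> payloads retained by that subscription
  private final Map<String, List<String>> delivered = new LinkedHashMap<>();

  void subscribe(String subscriptionName, Predicate<Map<String, String>> attributeFilter) {
    filters.put(subscriptionName, attributeFilter);
    delivered.put(subscriptionName, new ArrayList<>());
  }

  /** Offers the message to every subscription; each keeps it only if its filter matches. */
  void publish(String data, Map<String, String> attributes) {
    filters.forEach(
        (subscriptionName, filter) -> {
          if (filter.test(attributes)) {
            delivered.get(subscriptionName).add(data);
          }
        });
  }

  List<String> messagesFor(String subscriptionName) {
    return delivered.getOrDefault(subscriptionName, List.of());
  }

  public static void main(String... args) {
    FilteredTopicModel topic = new FilteredTopicModel();
    // Mirrors attributes.result="success" and an assumed "failure" counterpart.
    topic.subscribe("success-subscription", attrs -> "success".equals(attrs.get("result")));
    topic.subscribe("failure-subscription", attrs -> "failure".equals(attrs.get("result")));

    topic.publish("payment accepted", Map.of("result", "success"));
    topic.publish("payment declined", Map.of("result", "failure"));

    System.out.println("success-subscription: " + topic.messagesFor("success-subscription"));
    System.out.println("failure-subscription: " + topic.messagesFor("failure-subscription"));
  }
}
```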

We can also set up the topic and the subscription using the Pub/Sub API. However, for this, our service account must be configured in IAM as Pub/Sub Admin, which is a level of elevation that’s not necessarily a good idea for a client application (you probably don’t want your message publisher application to also be able to delete all the topics or subscriptions):

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;

public class CreateTopicWithSubscription {
  public static void main(String... args) throws Exception {

    String projectId = "my-gcp-project";
    String topicId = "topic-1";
    String subscriptionId = "success-subscription";
    TopicName topicName = TopicName.of(projectId, topicId);
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      Topic topic = topicAdminClient.createTopic(topicName);
      System.out.println("Created topic: " + topic.getName());
    } catch (AlreadyExistsException aee) {
      System.out.println(topicName + " already exists");
    }

    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
      Subscription subscriptionRequest =
          Subscription.newBuilder()
              .setName(subscriptionName.toString())
              .setTopic(topicName.toString())
              .setFilter("attributes.result=\"success\"")
              .setAckDeadlineSeconds(10)
              .setPushConfig(PushConfig.getDefaultInstance())
              .build();

      Subscription subscription = subscriptionAdminClient.createSubscription(subscriptionRequest);
      System.out.println("Created pull subscription: " + subscription.getName());
    } catch (AlreadyExistsException aee) {
      System.out.println(subscriptionName + " already exists");
    }
  }
}

Event Bus Implementation

In this example, we create a simplistic microservice architecture where services communicate with each other via message broadcasting. All services are publishers and subscribers at the same time, so any message one service publishes has to be received by all the other services, but it shouldn’t get sent to the publisher (the publisher isn’t interested in its own message).

Event Bus with RabbitMQ

All publishers send their messages to the same exchange. The exchange is then bound to each subscriber’s queue with multiple direct bindings: one binding, keyed by publisher ID, for each publisher that the subscriber is interested in.

multiple publishers to direct exchange to multiple queues
Event bus with RabbitMQ

To simplify the example, we take the service ID as a command line parameter. The sender only has to declare the exchange:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class RabbitEventBusSender {

  private static final String exchangeName = "broadcast-exchange";

  public static void main(String... args) throws Exception {

    String senderId = args[0];

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel()) {

      channel.exchangeDeclare(exchangeName, "direct");

      String message = "A message from " + senderId;

      // Use the sender ID as the routing key so that it matches the consumers' bindings.
      channel.basicPublish(
          exchangeName, senderId, null, message.getBytes(StandardCharsets.UTF_8));
      System.out.println(" [x] Sent '" + message + "'");
    }
  }
}

On the consumer side, we declare both the exchange and the queue, and we define the bindings:


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

public class RabbitEventBusConsumer {

  private static final String exchangeName = "broadcast-exchange";

  public static void main(String... args) throws Exception {

    Set<String> otherAppIds = new HashSet<>(Set.of("yellow", "green", "red"));

    String appId = args[0];

    otherAppIds.remove(appId);

    String queueName = appId + "-output-queue";

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    // No try-with-resources here: closing the connection would stop the consumer
    // right after it's registered.
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    boolean durable = false;
    boolean exclusive = false;
    boolean autoDelete = false;

    channel.exchangeDeclare(exchangeName, "direct");
    channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);

    // One binding per other service: messages sent by this service itself are never routed here.
    for (String otherAppId : otherAppIds) {
      channel.queueBind(queueName, exchangeName, otherAppId);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback =
        (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
          System.out.println(" [x] Received '" + message + "'");
        };
    boolean autoAck = true;
    channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
  }
}

Event Bus with Pub/Sub

In this setup, all publishers send messages to the same topic. The subscriptions can then selectively drop their own messages.

multiple publishers to topic to subscriptions with filters
Event bus with Pub/Sub

Let’s set up the new subscriptions:

gcloud pubsub subscriptions create red-subscription \
  --topic=topic-1 \
  --message-filter='attributes.sender!="red"'

gcloud pubsub subscriptions create yellow-subscription \
  --topic=topic-1 \
  --message-filter='attributes.sender!="yellow"'

gcloud pubsub subscriptions create green-subscription \
  --topic=topic-1 \
  --message-filter='attributes.sender!="green"'

For simplicity’s sake, we take the service ID as a command line parameter here as well:


import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.concurrent.TimeUnit;

public class PubSubEventBusPublisher {
  public static void main(String... args) throws Exception {
    String senderId = args[0];

    String projectId = "my-gcp-project";
    String topicId = "topic-1";

    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;
    try {
      publisher = Publisher.newBuilder(topicName).build();

      String message = "This is a message from " + senderId;
      ByteString data = ByteString.copyFromUtf8(message);
      // The attribute name must match the one used in the subscription filters ("sender").
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder().setData(data).putAttributes("sender", senderId).build();

      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

And here’s our consumer that will also take the service ID as a command line parameter:

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PubSubEventBusConsumer {
  public static void main(String... args) throws Exception {
    String appId = args[0];

    String projectId = "my-gcp-project";
    String subscriptionId = appId + "-subscription";
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());

      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      subscriber.stopAsync();
    }
  }
}

Integrating with the Messaging Solution

Let’s see how we can authorize our own applications and third-party applications to use our message broker. And once we have access, we’ll have to see if we have any client libraries that we can use to integrate with the queues quickly.

Integrating with RabbitMQ

RabbitMQ Provisioning

RabbitMQ Cluster (multiple RabbitMQ server instances working together as one service with redundancy) is available from the Google Cloud Marketplace. With a few clicks or via command line tools, you can easily set up a RabbitMQ cluster with Kubernetes. Once it’s set up, you have to manage it the same way you would any other Kubernetes cluster.

RabbitMQ Administration Tools

The RabbitMQ broker can be administered extensively via its web management interface.

If you’d rather use command line tools, there are many shipped with the broker.

Access Control in RabbitMQ

Publishers and subscribers have to use either username/password or X.509 certificates to connect.

Username/password-based authentication has to be set up using the management console or the CLI.

RabbitMQ is a multi-tenant system, so resources (queues, exchanges, bindings, etc.) and the access to them are grouped under virtual hosts.

Let’s create a new user with the CLI:

rabbitmqctl add_user "gary" "ban@na$"

And we’ll give them full access to the virtual host:

# First ".*" for configure permission on every entity
# Second ".*" for write permission on every entity
# Third ".*" for read permission on every entity
rabbitmqctl set_permissions -p "minions-vhost" "gary" ".*" ".*" ".*"
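
Each of the three permission arguments is a regular expression that’s matched against the names of the entities in the virtual host, so narrower patterns can restrict a user to a subset of them. The following minimal sketch shows the idea with Java’s regex engine. It’s only an approximation of the broker’s behavior (RabbitMQ evaluates the patterns itself, with its own regex implementation), and the helper and the gary- naming convention are assumptions made up for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical helper mimicking how a permission pattern is checked against an entity name.
class PermissionPatternModel {

  static boolean allows(String permissionRegex, String entityName) {
    return Pattern.compile(permissionRegex).matcher(entityName).find();
  }

  public static void main(String... args) {
    // ".*" grants the permission on every entity, as in the command above.
    System.out.println(allows(".*", "queue-1"));
    // A narrower, anchored pattern limits the user to entities with a given prefix.
    System.out.println(allows("^gary-.*$", "gary-orders"));
    System.out.println(allows("^gary-.*$", "kevin-orders"));
    // "^$" matches only an empty name, which is the usual way to withhold a permission.
    System.out.println(allows("^$", "queue-1"));
  }
}
```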

RabbitMQ Libraries

RabbitMQ has official client libraries for several languages. Due to its popularity, the developer community created unofficial client libraries for additional languages. An up-to-date collection of the developer-contributed libraries and tools is available on the official RabbitMQ website.

Integrating with Pub/Sub

Pub/Sub Provisioning

Pub/Sub is available for any Google Cloud Project, and separate provisioning isn’t needed. If you want to integrate your applications with Pub/Sub, you have to enable the Cloud Pub/Sub API for your project.

Pub/Sub Administration Tools

Pub/Sub can be administered the same way as any other Google Cloud module: You can use either the web console or the Google Cloud SDK command line tools.

Access Control in Pub/Sub

Publishers and subscribers need to have valid Google Cloud Credentials to use the resources. It’s highly recommended to use service accounts when accessing Pub/Sub via the API. Service accounts can be created in the web console or programmatically with the Cloud SDK.

Pub/Sub topics and subscriptions belong to Google Cloud projects. General access to all queues can be administered on the project level, but access control can also be individually managed per topic or subscription.

Here’s how we create a new service account with Cloud SDK:

gcloud config set project my-gcp-project-name
gcloud iam service-accounts create minion-pubsub

And here’s how we make our new service account a publisher:

gcloud projects add-iam-policy-binding my-gcp-project-name --member="serviceAccount:minion-pubsub@my-gcp-project-name.iam.gserviceaccount.com" --role="roles/pubsub.publisher"

Pub/Sub Libraries

Google maintains a lot of libraries that you can use to integrate with Google Cloud APIs — including the Pub/Sub API. These libraries save you a lot of time that you’d otherwise spend implementing the HTTP protocol, and they’re available for quite a few programming languages.

Some Unique Features

There are features in both message broker implementations that are unique to their respective platforms. These are a few of them that we found interesting and that played an important role in our decision making.

Unique RabbitMQ Features

Publisher Confirms

This feature lets the publisher know what happened to each message: The broker confirms every message once it has taken responsibility for it. Combined with the mandatory flag, an exchange that’s unable to route a message to any queue returns the unroutable message to the publisher.

Message Priorities

Messages can be given a priority level, and if the queue is configured to support it, the messages get delivered according to their priorities.
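
In RabbitMQ, this means declaring the queue with the x-max-priority argument and setting the priority property on each published message, where larger numbers mean higher priority. The delivery order this produces can be sketched in memory as follows. This is a model only, not RabbitMQ client code; the class name is hypothetical, and it assumes all messages are already waiting in the queue when delivery starts.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical in-memory model of a priority queue; not the RabbitMQ client API.
class PriorityQueueModel {

  private static final class Entry {
    final int priority;
    final long sequence;
    final String body;

    Entry(int priority, long sequence, String body) {
      this.priority = priority;
      this.sequence = sequence;
      this.body = body;
    }
  }

  // Higher priority first; messages with equal priority keep their publish order.
  private final PriorityQueue<Entry> entries =
      new PriorityQueue<>(
          Comparator.comparingInt((Entry e) -> e.priority)
              .reversed()
              .thenComparingLong(e -> e.sequence));
  private long nextSequence = 0;

  void publish(int priority, String body) {
    entries.add(new Entry(priority, nextSequence++, body));
  }

  /** Drains the queue and returns the message bodies in delivery order. */
  List<String> deliverAll() {
    List<String> delivered = new ArrayList<>();
    while (!entries.isEmpty()) {
      delivered.add(entries.poll().body);
    }
    return delivered;
  }

  public static void main(String... args) {
    PriorityQueueModel queue = new PriorityQueueModel();
    queue.publish(1, "routine report");
    queue.publish(5, "urgent alert");
    queue.publish(1, "another routine report");
    System.out.println(queue.deliverAll());
  }
}
```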

Implementing a Multitude of Protocols

Advanced Message Queuing Protocol (AMQP) is just one of the message queueing protocols RabbitMQ supports. Other protocols, such as MQTT and STOMP, are available as RabbitMQ plugins, which gives us a lot more flexibility when our application has to be integrated with third-party applications using specific message queue protocols.

Unique Pub/Sub Features

Push Delivery with Secure Callbacks

Pub/Sub push subscriptions offer multiple layers of security for push-style message delivery. First, the callback endpoint provided by the consumer must be an HTTPS endpoint with a valid SSL certificate issued by a CA. Additionally, push subscriptions can be configured to provide an additional authorization header to allow the callback endpoint to authenticate the request.
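
On the receiving side, a push endpoint typically has to do two small things before it can process a message: pull the bearer token out of the Authorization header and decode the base64-encoded data field of the pushed message. Here’s a minimal sketch of those two steps using only the JDK. The class and method names are hypothetical, and verifying the token itself (its signature, audience, and issuer) is deliberately left out, since that needs a JWT library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

// Hypothetical helpers for a push endpoint; token verification is out of scope here.
class PushRequestHelpers {

  private static final String BEARER_PREFIX = "Bearer ";

  /** Extracts the token from an "Authorization: Bearer <token>" header value, if present. */
  static Optional<String> bearerToken(String authorizationHeader) {
    if (authorizationHeader == null || !authorizationHeader.startsWith(BEARER_PREFIX)) {
      return Optional.empty();
    }
    String token = authorizationHeader.substring(BEARER_PREFIX.length()).trim();
    return token.isEmpty() ? Optional.empty() : Optional.of(token);
  }

  /** Decodes the base64-encoded "data" field of the pushed message into text. */
  static String decodeData(String base64Data) {
    return new String(Base64.getDecoder().decode(base64Data), StandardCharsets.UTF_8);
  }

  public static void main(String... args) {
    System.out.println(bearerToken("Bearer example-token").isPresent());
    System.out.println(bearerToken(null).isPresent());
    System.out.println(decodeData("SGVsbG8gV29ybGQh"));
  }
}
```

A request without a usable token should be rejected before the message is decoded at all.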

Regional Storage

GCP services are available in different service regions, and Pub/Sub isn’t an exception. By default, message storage is enabled without regional limitations; however, topics can be configured to limit storage to specific regions. This limitation is inherited by the subscriptions of these topics. In this way, sensitive data can be kept in certain data centers.

Message Ordering

If messages have the same ordering key and are in the same region, they’ll be delivered in the exact order they arrived in. However, redeliveries may still happen in this case.

Batching

Client libraries automatically batch messages to increase message throughput. However, when minimizing message latency is more important than throughput, then batching can be switched off.
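
The trade-off can be illustrated with a minimal in-memory sketch of count-based batching. This isn’t the Pub/Sub client API: the class is hypothetical, and it only models an element count threshold, whereas the real client libraries also send a batch when it reaches a size limit or has waited long enough.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of count-based batching; not the Pub/Sub client API.
class BatcherModel {

  private final int elementCountThreshold;
  private final List<String> pending = new ArrayList<>();
  private final List<List<String>> sentRequests = new ArrayList<>();

  BatcherModel(int elementCountThreshold) {
    if (elementCountThreshold < 1) {
      throw new IllegalArgumentException("threshold must be at least 1");
    }
    this.elementCountThreshold = elementCountThreshold;
  }

  /** Buffers the message and sends a request once the threshold is reached. */
  void publish(String message) {
    pending.add(message);
    if (pending.size() >= elementCountThreshold) {
      flush();
    }
  }

  /** Sends whatever is still buffered as one request. */
  void flush() {
    if (!pending.isEmpty()) {
      sentRequests.add(new ArrayList<>(pending));
      pending.clear();
    }
  }

  List<List<String>> sentRequests() {
    return sentRequests;
  }

  public static void main(String... args) {
    BatcherModel batched = new BatcherModel(3);
    BatcherModel unbatched = new BatcherModel(1); // threshold 1: every message is sent at once
    for (String message : List.of("m1", "m2", "m3", "m4", "m5")) {
      batched.publish(message);
      unbatched.publish(message);
    }
    batched.flush();
    System.out.println("batched requests: " + batched.sentRequests());
    System.out.println("unbatched requests: " + unbatched.sentRequests().size());
  }
}
```

Fewer, larger requests improve throughput, but a message may sit in the buffer until its batch is sent, which is exactly the latency cost mentioned above.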

Conclusion

If you’re looking for a product with comprehensive development documentation, both of these products feature lots of resources and practical code snippets in the most popular programming languages, so integrating your code with either of them won’t be a big headache. Still, the community knowledge base for Pub/Sub seems to be a lot smaller, and our experience was that if something isn’t answered by the official documentation, it may also be difficult to find solutions from fellow developers on Stack Overflow.

RabbitMQ is definitely a winner if you’re looking for a message broker that implements standard message queueing protocols and if you need to integrate with third-party applications.

If the service-level agreement (SLA) is an important factor, Pub/Sub is a clear winner, because it’s the only one of the two that comes with support and an SLA.

In the end, we decided on Pub/Sub because we needed a message broker with an SLA, and we didn’t need to integrate with third-party applications.