What is a Message Queue?

A message queue is a form of asynchronous service-to-service communication used in serverless and microservices architectures. Messages are stored on the queue until they are processed and deleted. Each message is processed only once, by a single consumer. Message queues can be used to decouple heavyweight processing, to buffer or batch work, and to smooth spiky workloads.

Benefits of Messaging Queues:

  • You can design applications using small programs that you can share between many applications.
  • You can quickly build new applications by reusing these building blocks.
  • Applications written to use message queuing techniques are not affected by changes in the way that queue managers work.
  • You do not need to use any communication protocols. The queue manager deals with all aspects of communication for you.
  • Programs that receive messages need not be running at the time that messages are sent to them. The messages are retained on queues.

What Is Anypoint MQ?

Anypoint MQ is an enterprise multi-tenant, cloud messaging queuing service that is fully integrated with Anypoint Platform.

With Anypoint MQ, users can perform advanced asynchronous messaging scenarios such as queuing and pub/sub with fully hosted and managed cloud message queues and exchanges. A service of Anypoint Platform, Anypoint MQ supports environments, business groups, role-based access control (RBAC) to help you deliver seamless customer experiences across channels and integrate devices reliably for Internet of Things (IoT) applications with enterprise-class functionality.

Features of Anypoint MQ:

  • It is a cloud messaging service that performs asynchronous messaging scenarios between applications
  • It supports large payloads (upto ~10MB)
  • It enables easy connection to non-mule applications using REST APIs
  • It provides a special feature for Dead Letter Queue (DLQ)

Limitations of Anypoint MQ:

  • Anypoint MQ is not available in trial account, we need Enterprise subscription to use it.
  • It is available only for the Cloudhub hosted applications and not for the APIs hosted on-prem server.
  • Regions and environment specific.
  • Two or more instances cannot retrieve the same message from a particular queue at the same time.
  • Sometimes we cannot expect quick reflection in Anypoint MQ user interface, delay occurs.

Types of Queues in Anypoint MQ:

  1. Queue – Standard queue
  2. Exchange – Broadcaster
  3. FIFO Queue – First In First Out
  4. Dead Letter Queue

Queue:

  • It is a standard kind of queue which has storage of messages; an application/service can send and receive messages in unordered manner.
  • Standard queues are the best fit for applications in which messages must be delivered quickly.
  • There is no limitation on the number of messages in a queue.
  • From the API, we can pick a maximum of 1,20,000 messages randomly at a time.
  • From platform UI, we can view a maximum of 50 messages at a time.

Exchange:

  • Used to broadcast a message to one or more standard queues that are bound to the message exchange.
  • We can bind a maximum of 450 queues to a message exchange.
  • A maximum of 10 queues could be bind to an exchange to use the intelligent messaging routing.

 FIFO:

  • It is like a standard queue, but messages arrive in order.
  • FIFO queues are the best fit for applications requiring strict message ordering and exactly once delivery, but in which message delivery speed is of less importance.
  • Anypoint MQ supports 10 in-flight messages per message group in a FIFO queue.
  • FIFO queues provide “exactly once” message-delivery reliability. Anypoint MQ delivers each message to only one consumer.

Dead Letter Queue (DLQ)

  • The dead-letter queue or (undelivered-message queue) is the queue to which messages are sent if they cannot be routed to their correct destination.
  • Few things to remember before creating a DLQ.

The DLQ and parent queue both need to be:

  1.   Same type of queue (standard or FIFO)
  2.  Created in the same geographical region
  3.  Created in the same environment and owned by the same Anypoint Platform account

Stages of Queues:

  1. In-Queue: It holds all the messages got published into a queue. It is ready to process these messages whenever needed.
  2. In-Flight: Once subscriber start consuming the queue, the messages will move from ‘In-Queue’ state to ‘In-Flight’ state. Messages under ‘In-Flight’ are locked and cannot be fetched by other subscribers.

Connectors and Operations on Queues:

The Anypoint MQ connector provides five operations:

  1. Publish: To publish the message to the Queue. It is a process connector.
  2. Subscriber: To subscribe to the queue and listen for the receiving message. It is a source connector.
  3. Consumer: To consume a message from the queue. It is used to consume the message whenever a flow is trigger and is a process connector.
  4. Ack: To send an acknowledgement to the queue about the successful consumption of the message and delete the message from in-flight status.
  5. Nack: To send negative acknowledgement to the queue stating the consumed message is not processed successfully; changes the status of the message from in-flight to in-queue to be consumed again by an available consumer.

Message Acknowledgment

Automatic: By default, Subscriber source uses the AUTO acknowledgment mode. With this mode, the messages that the Subscriber source retrieves are acknowledge automatically after message flow processing succeeds.

Immediate: When you use the IMMEDIATE acknowledgment mode, the consumed message is acknowledged right before being dispatched to the MuleSoft flow.

Manual: When you use the MANUAL acknowledgment mode, the application logic decides when to perform the acknowledgment of the message, using the ACK or NACK sources. To perform the manual acknowledgment, you need the value of ack token provided as part of the resulting message attributes.

Circuit Breaker

The Subscriber source provides circuit breaking capability, which enables you to control how the connector handles errors that occur while processing a consumed message.

For example, when connecting to an external service, you can use the circuit breaker to handle any downtime of that service. The circuit breaker allows the system to stop making requests and allows the external service to recover under a reduced load.

The circuit breaker has three states:

  1. Closed: Normal retrieval and processing of message based on the subscriber strategy.
  2. Half Open: Retrieve a single message and attempt to process it.
  3. Open: Scheduled message fetch are skipped and no more messages are processed.

Configuring the Circuit Breaker

You can configure a Circuit Breaker as either

  • A Global Circuit Breaker or
  • A Private Circuit Breaker.

Either way, the configuration parameters are the same:

onErrorTypes

The error types that count as a failure during the flow execution. An error occurrence counts only when the flow finishes with an error propagation. By default, all errors count as a circuit failure.

errorsThreshold

The number of onErrorTypes errors that must occur for the circuit breaker to open.

tripTimeout

How long the circuit remains open once errorsThreshold is reached.

Example for Private Circuit Breaker (inline):

Creating new queue:

Let’s create a sample queue for demonstration.

Login to Anypoint Platform → Open MQ (Below Runtime Manager)

Note: Anypoint MQ is not available on trial accounts.

After clicking on MQ, we will get the below screen:

To create a queue, we need to select a region. It is good to select a region that is geographically close, or we can select any region. 

Note: We can’t move the queue from one region to another.

In the above image, we can see the region is selected as US East (N. Virginia) and the URL for that is written next to it. This URL is important, and it is different for each region, and we require it for Anypoint MQ Connector configuration in Anypoint Studio.

If we click the icon visible in the above image, we can see the option to create queue/exchanges.

Select Queue option, a create queue window will pop-up, and we need to fill the details.

Here, TTL means time to live. It defines how long the message is in the queue before Anypoint MQ expires the message and removes it from the queue.

Default Acknowledgement Timeout is how long a message remains unacknowledged before being returned to the queue.

Once we fill the details and click Create Queue, the queue gets created.

We can see the new queue with the name test-queue. If you click on the queue, you can see a dashboard on the RHS that shows a graphical view of the messages of the queue and queue details.

Queue is successfully created. To access the queue, we need to create client apps, which will provide us with a unique client ID and password to use this newly created queue.

Click on Client Apps then click on the icon on the right:

A pop-up will open where we need to give the name of the app and then click Save Changes.

After providing the client app name when we click Save Changes, we will get a client app ID and client secret for this client app as shown in below image.

Now, we will use these details in Anypoint Studio and see how this MQ can be used in the flow.

Publish and Subscribe Operation:

We need to open Anypoint Studio and add Anypoint MQ Connector from the exchange, as it is not present in the Mule palette by default.

Once the connector is added in the palette, we start designing the sample API. We will create a flow for publishing the message to the queue.

We will create the API as shown below:

Here, we are getting a message from the client through the HTTP listener. Then, we log that message in the logger and publish it to Anypoint MQ. Finally, in the logger, we are displaying a success message stating that the message is published successfully.

We drag and drop the publish Anypoint MQ component from the palette to the canvas and do the config as below.

We can see that the URL is the URL of the region in which the Queue is created and copy the client app ID and client secret from before. Here I am using it from the properties file and which are encrypted in nature.

After filling the details, we can try to for test connection. If everything is okay, we will get a success message.

This means we are set to go ahead. In the publish Anypoint MQ Connector, give the name of the queue in which we want to publish the message.

Note: The queue name we provide in the Destination field must be present.

Error Types of Connector:

ANYPOINT- MQ:ACKING

ANYPOINT- MQ:TIMEOUT

ANYPOINT- MQ:NACKING

ANYPOINT- MQ:PUBLISHING

ANYPOINT- MQ:CONSUMING

ANYPOINT- MQ:CONNECTIVITY

ANYPOINT- MQ:ILLEGAL_BODY

ANYPOINT- MQ:RETRY_EXHAUSTED

ANYPOINT- MQ:DESTINATION_NOT_FOUND

Now, we will try to run the API and publish the message to the queue.

Once the publish to queue is successful, you could check the queue to view the message.

Go to the Anypoint Platform MQ and select the queue (test-queue):

 

Select Message Browser and click the Get Message button shown below to see the message available in the queue for processing:

We can see the same message in the queue.

Subscriber:

To read the published message from the queue, we could use the subscriber connector.

This enables the app to listen for new messages and consume them as they arrive at the destination.

By default, the Subscriber source works in continuous listening mode.

In studio, we create a flow for getting the message from the queue. Here, we are using the same Anypoint MQ connector Configuration we did for the publish operation, and in the queue tab of the subscriber, we give the name of the queue from which we want to get the messages (in this case, test-queue).

Here, we have received the message from the queue and logger has printed it.

Logs:

INFO  2022-10-28 15:52:32,453 [[MuleRuntime].uber.04: [anypoint-mq-demo].demo-anypoint-mq-subscriber-main-flow.CPU_LITE @5cc85414] [processor: demo-anypoint-mq-subscriber-main-flow/processors/0; event: 97d18ad1-4df6-24eb-b744-834d5a35631b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: “Message received from Queue is ” {

“Input”: “This is a sample Message”

}

 Working of Dead Letter Queue Flow:

Loading

Subscribe To Our Newsletter

Subscribe To Our Newsletter

Join our mailing list to receive the latest news and updates from our team.

You have Successfully Subscribed!