EnMasse


Documentation for EnMasse master


Connecting applications to EnMasse


1. Address Model

The EnMasse address model involves three distinct concepts:

  • types of address spaces

  • types of addresses within each address space

  • available plans

1.1. Address Space

An address space is a group of addresses that can be accessed through a single connection (per protocol). This means that clients connected to the endpoints of an address space can send messages to or receive messages from any address it is authorized to send messages to or receive messages from within that address space. An address space can support multiple protocols, which is defined by the address space type.

1.2. Address

An address is part of an address space and represents a destination used for sending and receiving messages. An address has a type, which defines the semantics of sending messages to and receiving messages from that address.

1.3. Plans

Both address spaces and addresses can be restricted by a plan, which enforces a limit on resource usage across multiple dimensions. <tag for upstream only>Note that the set of plans currently offered might be extended in the future, and the constraints imposed by a plan within an address space might change as operational experience is gained.</tag for upstream only>

Address Space Plans

Each address space has a plan that restricts the aggregated resource usage within an address space. Each address space type can translate the plan into a set of restrictions, for example, the ability to scale up to five routers or to create up to 10 addresses. These restrictions are documented within each address space.

Address Plans

The usage of each address is also constrained by a plan. Each address type translates the plan into a set of restrictions, for example, up to five consumers or up to 100 messages per hour. These restrictions are documented within each address type.

2. Address Space

The only currently supported address space is standard.

2.1. Standard Address Space

The default address space in EnMasse is the standard address space and it consists of an AMQP router network in combination with attachable storage units. The implementation of a storage unit is hidden from the client and the routers with a well-defined API. This address space type is appropriate when you have many connections and addresses. However, it has the following limitations: no transaction support, no message ordering, no selectors on queues, and no message groups.

Clients connect and send and receive messages in this address space using the AMQP or MQTT protocols. Note that MQTT does not support qos2 or retained messages.

Address Types

The standard address space supports four address types:

  • queue

  • topic

  • anycast

  • multicast

Queue

The queue address type is a store-and-forward queue. This address type is appropriate for implementing a distributed work queue, handling traffic bursts, and other use cases where you want to decouple the producer and consumer. A queue can be sharded across multiple storage units, in which case message order is no longer guaranteed.

Queue Plans
  • inmemory

  • persisted

  • pooled-inmemory

  • pooled-persisted

In memory

Creates a standalone broker cluster for queues. Messages are not persisted on stable storage.

Persisted

Creates a standalone broker cluster for queues. Messages are persisted on stable storage.

Pooled in memory

Schedules queues to run on a shared broker cluster, reducing overhead. Messages are not persisted on stable storage.

Pooled persisted

Schedules queues to run on a shared broker cluster, reducing overhead. Messages are persisted on stable storage.

Topic

The topic address type supports the publish-subscribe messaging pattern where you have 1..N producers and 1..M consumers. Each message published to a topic address is forwarded to all subscribers for that address. A subscriber can also be durable, in which case messages are kept until the subscriber has acknowledged them.

Topic Plans
  • inmemory

  • persisted

In memory

Creates a standalone broker cluster for topics. Messages are not persisted on stable storage.

Persisted

Creates a standalone broker cluster for topics. Messages are persisted on stable storage.

Anycast

The anycast address type is a scalable direct address for sending messages to one consumer. Messages sent to an anycast address are not stored, but forwarded directly to the consumer. This method makes this address type ideal for request-reply (RPC) uses or even work distribution. This is the cheapest address type as it does not require any persistence.

Anycast Plans
  • standard

Multicast

The multicast address type is a scalable direct address for sending messages to multiple consumers. Messages sent to a multicast address are forwarded to all consumers receiving messages on that address. It is important to note that only pre-settled messages can be sent to multicast addresses, as message acknowledgements from consumers are not propagated to producers.

Multicast Plans
  • standard

2.2. Brokered Address Space

The brokered address space is designed to support broker-specific features, at the cost of limited scale in terms of the number of connections and addresses. This address space supports JMS transactions, message groups, and so on.

Clients connect and send and receive messages in this address space using the AMQP protocol.

Address types
  • queue

  • topic

Queue

The queue address type is a store-and-forward queue. This address type is appropriate for implementing a distributed work queue, handling traffic bursts, and other use cases where you want to decouple the producer and consumer. A queue in the brokered address spaces supports selectors, message groups, transactions and other JMS features. If the queue is a high volume queue and these semantics are not needed, see the standard address space queue type.

Only a standard plan is available for queues.

Topic

The topic address type supports the publish-subscribe messaging pattern where you have 1..N producers and 1..M consumers. Each message published to a topic address is forwarded to all subscribers for that address. A subscriber can also be durable, in which case messages are kept until the subscriber has acknowledged them.

Only a standard plan is available for topics.

3. Connecting to EnMasse

To connect to the messaging service from outside the openshift or kubernetes cluster, TLS must be used with SNI set to specify the fully qualified hostname for the address-space. The port used is 443.

The messaging protocols supported depends on the type of the address-space. At present AMQP and MQTT are supported.

TODO: Add information about retrieving the CA certificate and using that in the client examples.

TODO: Add information about authentication.

3.1. Client Examples

Simple examples are shown here for the following clients:

  • Apache Qpid Proton Python

  • Apache Qpid JMS

  • Rhea JavaScript Client

  • Apache Qpid Proton C++

  • AMQP.Net Lite

These all assume you have created an address of type 'queue' named 'myqueue'.

TODO: add links for all these clients

Apache Qpid Proton Python
from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

class HelloWorld(MessagingHandler):
    def __init__(self, server, address):
        super(HelloWorld, self).__init__()
        self.server = server
        self.address = address

    def on_start(self, event):
        conn = event.container.connect(self.server)
        event.container.create_receiver(conn, self.address)
        event.container.create_sender(conn, self.address)

    def on_sendable(self, event):
        event.sender.send(Message(body="Hello World!"))
        event.sender.close()

    def on_message(self, event):
        print(event.message.body)
        event.connection.close()

Container(HelloWorld("amqps://<messaging-route-hostname>:443", "myqueue")).run()
Apache Qpid JMS
package org.apache.qpid.jms.example;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        try {
            // The configuration for the Qpid InitialContextFactory has been supplied in
            // a jndi.properties file in the classpath, which results in it being picked
            // up automatically by the InitialContext constructor.
            Context context = new InitialContext();

            ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
            Destination queue = (Destination) context.lookup("myQueueLookup");

            Connection connection = factory.createConnection(System.getProperty("USER"), System.getProperty("PASSWORD"));
            connection.setExceptionListener(new MyExceptionListener());
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            MessageProducer messageProducer = session.createProducer(queue);
            MessageConsumer messageConsumer = session.createConsumer(queue);

            TextMessage message = session.createTextMessage("Hello world!");
            messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
            TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L);

            if (receivedMessage != null) {
                System.out.println(receivedMessage.getText());
            } else {
                System.out.println("No message received within the given timeout!");
            }

            connection.close();
        } catch (Exception exp) {
            System.out.println("Caught exception, exiting.");
            exp.printStackTrace(System.out);
            System.exit(1);
        }
    }

    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}

with jndi.properties:

connectionfactory.myFactoryLookup = amqps://<messaging-route-hostname>:443?transport.trustAll=true&transport.verifyHost=false
queue.myQueueLookup = myqueue
Rhea JavaScript Client
var container = require('rhea');
container.on('connection_open', function (context) {
    context.connection.open_receiver('myqueue');
    context.connection.open_sender('myqueue');
});
container.on('message', function (context) {
    console.log(context.message.body);
    context.connection.close();
});
container.on('sendable', function (context) {
    context.sender.send({body:'Hello World!'});
    context.sender.detach();
});
container.connect({port:443, host:<messaging-route-hostname>, transport:tls, rejectUnauthorized:false});
Apache Qpid Proton C++

The C client has equivalent simple_recv and simple_send examples with the same options as python. However, the C library does not perform the same level of processing on the URL; in particular it won’t take amqps:// to imply using TLS, so the example needs to be modified as follows:

#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/default_container.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/ssl.hpp>
#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/url.hpp>

#include <iostream>

#include "fake_cpp11.hpp"

class hello_world : public proton::messaging_handler {
  private:
    proton::url url;

  public:
    hello_world(const std::string& u) : url(u) {}

    void on_container_start(proton::container& c) OVERRIDE {
        proton::connection_options co;
        co.ssl_client_options(proton::ssl_client_options());
        c.client_connection_options(co);
        c.connect(url);
    }

    void on_connection_open(proton::connection& c) OVERRIDE {
        c.open_receiver(url.path());
        c.open_sender(url.path());
    }

    void on_sendable(proton::sender &s) OVERRIDE {
        proton::message m("Hello World!");
        s.send(m);
        s.close();
    }

    void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
        std::cout << m.body() << std::endl;
        d.connection().close();
    }
};

int main(int argc, char **argv) {
    try {
        std::string url = argc > 1 ? argv[1] : "<messaging-route-hostname>:443/myqueue";

        hello_world hw(url);
        proton::default_container(hw).run();

        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }

    return 1;
}
AMQP.Net Lite
using System;
using Amqp;

namespace Test
{
    public class Program
    {
        public static void Main(string[] args)
        {
            String url = (args.Length > 0) ? args[0] : "amqps://<messaging-route-hostname>:443";
            String address = (args.Length > 1) ? args[0] : "myqueue";

            Connection.DisableServerCertValidation = true;
            Connection connection = new Connection(new Address(url));
            Session session = new Session(connection);
            SenderLink sender = new SenderLink(session, "test-sender", address);

            Message messageSent = new Message("Test Message");
            sender.Send(messageSent);

            ReceiverLink receiver = new ReceiverLink(session, "test-receiver", address);
            Message messageReceived = receiver.Receive(TimeSpan.FromSeconds(2));
            Console.WriteLine(messageReceived.GetBody<String>());
            receiver.Accept(messageReceived);

            sender.Close();
            receiver.Close();
            session.Close();
            connection.Close();
        }
    }
}