EnMasse


Documentation for EnMasse master


Getting Started


1. EnMasse on OpenShift

This guide will walk through the process of setting up EnMasse on OpenShift with clients for sending and receiving messages.

1.1. Prerequisites

To install EnMasse, you need to have the OpenShift client tools. You can download the OpenShift Origin client from OpenShift Origin. EnMasse has been tested to work with the latest stable release of the OpenShift Origin Client.

If you do not have an OpenShift cluster available, see minishift for an example of how to run a local instance of OpenShift on your machine.

1.2. Installing EnMasse

Procedure
  1. Download one of the releases from https://github.com/EnMasseProject/enmasse/releases and unpack it.

  2. Deploy EnMasse using the deployment script provided in the release.

You can invoke the deployment script with -h to view a list of options.

Deploying EnMasse

Invoke the deployment script to deploy EnMasse

./deploy-openshift.sh -m "https://localhost:8443" -n enmasse 

This will create the deployments required for running EnMasse. Starting up EnMasse will take a while, usually depending on how fast it is able to download the docker images for the various components. In the meantime, you can start to create your address configuration.

1.3. Configuring addresses

Address types

EnMasse is configured with a set of addresses that you can use for messages. Currently, EnMasse supports 4 different address types:

  • Brokered queues

  • Brokered topics (pub/sub)

  • Direct anycast addresses

  • Direct broadcast addresses

See the address model for details. EnMasse also comes with a console that you can use for managing addresses. You can get the console URL by running the following command:

echo "https://$(oc get route -o jsonpath='{.spec.host}' console)"

You can also deploy the addressing config using the address controller API. See the restapi documentation for details on the resources consumed by the API. Here is an example config with all 4 variants that you can save to addresses.json:

{
  "apiVersion": "enmasse.io/v1",
  "kind": "AddressList",
  "items": [
    {
      "metadata": {
        "name": "myqueue"
      },
      "spec": {
        "type": "queue"
      }
    },
    {
      "metadata": {
        "name": "mytopic"
      },
      "spec": {
        "type": "topic"
      }
    },
    {
      "metadata": {
        "name": "myanycast"
      },
      "spec": {
        "type": "anycast"
      }
    },
    {
      "metadata": {
        "name": "mymulticast"
      },
      "spec": {
        "type": "multicast"
      }
    }
  ]
}

To deploy this configuration, you must currently use a http client like curl:

curl -X POST -H "content-type: application/json" --data-binary @addresses.json -k https://$(oc get route -o jsonpath='{.spec.host}' restapi)/apis/enmasse.io/v1/addresses/default

This will connect to the address controller REST API and deploy the address config.

1.4. Sending and receiving messages

Connecting with AMQP

For sending and receiving messages, have a look at an example python sender and receiver.

To send and receive messages, you should connect to the exposed route. To start a receiver, run:

./simple_recv.py -a "amqps://$(oc get route -o jsonpath='{.spec.host}' messaging):443/anycast" -m 10

This will block until it has received 10 messages. To start the sender, run:

./simple_send.py -a "amqps://$(oc get route -o jsonpath='{.spec.host}' messaging):443/anycast" -m 10

The server certificates is not verified in the above examples. To fetch the certificate, run:

mkdir -p certs
oc get secret external-certs-messaging -o jsonpath='{.data.tls\.crt}' | base64 -d > certs/tls.crt

You can modify the client code to use this cert to verify the server connection.

Have a look at connecting to EnMasse for more client examples.

Connecting using MQTT

For sending and receiving messages route, you can use the paho-mqtt client library. To connect, fetch the server certificate:

mkdir -p certs
oc get secret external-certs-mqtt  -o jsonpath='{.data.tls\.crt}' | base64 -d > certs/tls.crt
Subscriber client

Save the following to tls_mqtt_recv.py or download:

#!/usr/bin/env python

import paho.mqtt.client as mqtt
import ssl
import optparse

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(opts.topic, int(opts.qos))

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

def on_log(client, userdata, level, string):
    print(string)

parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Receive messages from the supplied address.")

parser.add_option("-c", "--connectHost", default="localhost",
                  help="host to connect to (default %default)")

parser.add_option("-p", "--portHost", default="8883",
                  help="port to connect to (default %default)")

parser.add_option("-t", "--topic", default="mytopic",
                  help="topic to subscribe to (default %default)")

parser.add_option("-q", "--qos", default="0",
                  help="quality of service (default %default)")

parser.add_option("-s", "--serverCert", default=None,
                  help="server certificate file path (default %default)")

opts, args = parser.parse_args()

client = mqtt.Client("recv")
client.on_connect = on_connect
client.on_message = on_message
client.on_log = on_log

context = ssl.create_default_context()
if opts.serverCert == None:
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
else:
    context.load_verify_locations(cafile=opts.serverCert)

# just useful to activate for decrypting local TLS traffic with Wireshark
#context.set_ciphers("RSA")

client.tls_set_context(context)
client.tls_insecure_set(True)
client.connect(opts.connectHost, opts.portHost, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

In order to subscribe to a topic (i.e. mytopic from the previous addresses configuration), the subscriber client can be used in the following way:

./tls_mqtt_recv.py -c "$(oc get route -o jsonpath='{.spec.host}' mqtt)" -p 443 -t mytopic -q 1 -s ./certs/tls.crt
Publisher client

Save the following to tls_mqtt_send.py or download:

#!/usr/bin/env python

import paho.mqtt.client as mqtt
import ssl
import optparse

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.publish(opts.topic, opts.message, int(opts.qos))

def on_publish(client, userdata, mid):
    print("mid: " + str(mid))
    client.disconnect()

def on_log(client, userdata, level, string):
    print(string)

parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Sends messages to the supplied address.")

parser.add_option("-c", "--connectHost", default="localhost",
                  help="host to connect to (default %default)")

parser.add_option("-p", "--portHost", default="8883",
                  help="port to connect to (default %default)")

parser.add_option("-t", "--topic", default="mytopic",
                  help="topic to subscribe to (default %default)")

parser.add_option("-q", "--qos", default="0",
                  help="quality of service (default %default)")

parser.add_option("-s", "--serverCert", default=None,
                  help="server certificate file path (default %default)")

parser.add_option("-m", "--message", default="Hello",
                  help="message to publish (default %default)")

opts, args = parser.parse_args()

client = mqtt.Client("send")
client.on_connect = on_connect
client.on_publish = on_publish
client.on_log = on_log

context = ssl.create_default_context()
if opts.serverCert == None:
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
else:
    context.load_verify_locations(cafile=opts.serverCert)

# just useful to activate for decrypting local TLS traffic with Wireshark
#context.set_ciphers("RSA")

client.tls_set_context(context)
client.tls_insecure_set(True)
client.connect(opts.connectHost, opts.portHost, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

To start the publisher, the client can be used in the following way:

./tls_mqtt_send.py -c "$(oc get route -o jsonpath='{.spec.host}' mqtt)" -p 443 -t mytopic -q 1 -s ./certs/tls.crt -m "Hello EnMasse"

The the publisher publishes the message and disconnects from EnMasse. The message is received by the previous connected subscriber.

1.5. Conclusion

We have seen how to setup EnMasse on OpenShift, and how to communicate with it using AMQP and MQTT clients.

2. EnMasse on Kubernetes

This guide will walk through the process of setting up EnMasse on a Kubernetes cluster together with clients for sending and receiving messages.

2.1. Prerequisites

To install EnMasse, you need to have Kubernetes installed. You can use minikube if you want to install EnMasse on your laptop.

2.2. Installing

Procedure
  1. Download one of the releases from https://github.com/EnMasseProject/enmasse/releases and unpack it.

  2. Deploy EnMasse using the deployment script provided in the release.

You can invoke the deployment script with -h to view a list of options.

Deploying EnMasse

Invoke the deployment script to deploy EnMasse

./deploy-kubernetes.sh -m "https://localhost:8443" -n enmasse 

This will create the deployments required for running EnMasse. Starting up EnMasse will take a while, usually depending on how fast it is able to download the docker images for the various components. In the meantime, you can start to create your address configuration.

Role Based Access Control (RBAC)

The Kubernetes deployment script and YAML files currently do not support Role Based Access Control (RBAC). In Kubernetes clusters which have RBAC enabled, it is required to additionally create a role binding for the default service account to the view role and for the enmasse-service-account to the cluster-admin role:

kubectl create clusterrolebinding enmasse-service-account-binding --clusterrole=cluster-admin --serviceaccount=enmasse:enmasse-service-account
kubectl create rolebinding default-view-binding --clusterrole=view --serviceaccount=enmasse:default -n enmasse

Note: The cluster-admin role gives the enmasse-service-account service account unlimited access to the Kubernetes cluster.

2.3. Deploying external load balancers

If you’re running EnMasse in your own Kubernetes instance on any of the cloud providers, you can deploy the external load balancer services to expose EnMasse ports:

kubectl apply -f kubernetes/addons/external-lb.yaml -n enmasse

If you are running in multitenant mode, exposing the restapi is sufficient:

kubectl apply -f kubernetes/addons/external-lb-restapi.yaml -n enmasse

2.4. Configuring addresses

Address types

EnMasse is configured with a set of addresses that you can use for messages. Currently, EnMasse supports 4 different address types:

  • Brokered queues

  • Brokered topics (pub/sub)

  • Direct anycast addresses

  • Direct broadcast addresses

See the address model for details. EnMasse also comes with a console that you can use for managing addresses. You can get the console URL by running the following command:

echo "https://$(kubectl get ingress -o jsonpath='{.spec.host}' console)"

You can also deploy the addressing config using the address controller API. See the restapi documentation for details on the resources consumed by the API. Here is an example config with all 4 variants that you can save to addresses.json:

{
  "apiVersion": "enmasse.io/v1",
  "kind": "AddressList",
  "items": [
    {
      "metadata": {
        "name": "myqueue"
      },
      "spec": {
        "type": "queue"
      }
    },
    {
      "metadata": {
        "name": "mytopic"
      },
      "spec": {
        "type": "topic"
      }
    },
    {
      "metadata": {
        "name": "myanycast"
      },
      "spec": {
        "type": "anycast"
      }
    },
    {
      "metadata": {
        "name": "mymulticast"
      },
      "spec": {
        "type": "multicast"
      }
    }
  ]
}

To deploy this configuration, you must currently use a http client like curl:

curl -X POST -H "content-type: application/json" --data-binary @addresses.json -k https://$(kubectl get ingress -o jsonpath='{.spec.host}' restapi)/apis/enmasse.io/v1/addresses/default

This will connect to the address controller REST API and deploy the address config.

2.5. Sending and receiving messages

Connecting with AMQP

For sending and receiving messages, have a look at an example python sender and receiver.

To send and receive messages, you should connect to the exposed route. To start a receiver, run:

./simple_recv.py -a "amqps://$(kubectl get ingress -o jsonpath='{.spec.host}' messaging):443/anycast" -m 10

This will block until it has received 10 messages. To start the sender, run:

./simple_send.py -a "amqps://$(kubectl get ingress -o jsonpath='{.spec.host}' messaging):443/anycast" -m 10

The server certificates is not verified in the above examples. To fetch the certificate, run:

mkdir -p certs
kubectl get secret external-certs-messaging -o jsonpath='{.data.tls\.crt}' | base64 -d > certs/tls.crt

You can modify the client code to use this cert to verify the server connection.

Have a look at connecting to EnMasse for more client examples.

Connecting using MQTT

For sending and receiving messages route, you can use the paho-mqtt client library. To connect, fetch the server certificate:

mkdir -p certs
kubectl get secret external-certs-mqtt  -o jsonpath='{.data.tls\.crt}' | base64 -d > certs/tls.crt
Subscriber client

Save the following to tls_mqtt_recv.py or download:

#!/usr/bin/env python

import paho.mqtt.client as mqtt
import ssl
import optparse

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(opts.topic, int(opts.qos))

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

def on_log(client, userdata, level, string):
    print(string)

parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Receive messages from the supplied address.")

parser.add_option("-c", "--connectHost", default="localhost",
                  help="host to connect to (default %default)")

parser.add_option("-p", "--portHost", default="8883",
                  help="port to connect to (default %default)")

parser.add_option("-t", "--topic", default="mytopic",
                  help="topic to subscribe to (default %default)")

parser.add_option("-q", "--qos", default="0",
                  help="quality of service (default %default)")

parser.add_option("-s", "--serverCert", default=None,
                  help="server certificate file path (default %default)")

opts, args = parser.parse_args()

client = mqtt.Client("recv")
client.on_connect = on_connect
client.on_message = on_message
client.on_log = on_log

context = ssl.create_default_context()
if opts.serverCert == None:
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
else:
    context.load_verify_locations(cafile=opts.serverCert)

# just useful to activate for decrypting local TLS traffic with Wireshark
#context.set_ciphers("RSA")

client.tls_set_context(context)
client.tls_insecure_set(True)
client.connect(opts.connectHost, opts.portHost, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

In order to subscribe to a topic (i.e. mytopic from the previous addresses configuration), the subscriber client can be used in the following way:

./tls_mqtt_recv.py -c "$(oc get route -o jsonpath='{.spec.host}' mqtt)" -p 443 -t mytopic -q 1 -s ./certs/tls.crt
Publisher client

Save the following to tls_mqtt_send.py or download:

#!/usr/bin/env python

import paho.mqtt.client as mqtt
import ssl
import optparse

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.publish(opts.topic, opts.message, int(opts.qos))

def on_publish(client, userdata, mid):
    print("mid: " + str(mid))
    client.disconnect()

def on_log(client, userdata, level, string):
    print(string)

parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Sends messages to the supplied address.")

parser.add_option("-c", "--connectHost", default="localhost",
                  help="host to connect to (default %default)")

parser.add_option("-p", "--portHost", default="8883",
                  help="port to connect to (default %default)")

parser.add_option("-t", "--topic", default="mytopic",
                  help="topic to subscribe to (default %default)")

parser.add_option("-q", "--qos", default="0",
                  help="quality of service (default %default)")

parser.add_option("-s", "--serverCert", default=None,
                  help="server certificate file path (default %default)")

parser.add_option("-m", "--message", default="Hello",
                  help="message to publish (default %default)")

opts, args = parser.parse_args()

client = mqtt.Client("send")
client.on_connect = on_connect
client.on_publish = on_publish
client.on_log = on_log

context = ssl.create_default_context()
if opts.serverCert == None:
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
else:
    context.load_verify_locations(cafile=opts.serverCert)

# just useful to activate for decrypting local TLS traffic with Wireshark
#context.set_ciphers("RSA")

client.tls_set_context(context)
client.tls_insecure_set(True)
client.connect(opts.connectHost, opts.portHost, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

To start the publisher, the client can be used in the following way:

./tls_mqtt_send.py -c "$(kubectl get ingress -o jsonpath='{.spec.host}' mqtt)" -p 443 -t mytopic -q 1 -s ./certs/tls.crt -m "Hello EnMasse"

The the publisher publishes the message and disconnects from EnMasse. The message is received by the previous connected subscriber.

2.6. Conclusion

We have seen how to setup a messaging service in Kubernetes, and how to communicate with it using python example AMQP clients.

3. Setting up EnMasse on AWS

This guide walks you through setting up EnMasse on an AWS EC2 instance. This is not even very specific to AWS, so you can probably modify the configuration to fit Microsoft Azure or even Google GCE.

The end result from this guide is an instance of EnMasse suitable for development and/or experimentation, and should not be considered a production ready setup. For instance, no persistence is configured, so neither messages in brokers nor state in other components like hawkular are persisted.

3.1. Prerequisites

First, you must have created an EC2 instance. EnMasse runs on OpenShift and Kubernetes, but this post uses OpenShift purely for convenience. Have a look at the OpenShift prerequisites for the required hardware configuration. The installation will be done using Ansible, so make sure Ansible is installed on laptop or workstation.

3.1.1. Configure Ansible to handle passwordless sudo

For EC2 instance, the default is a passwordless sudo, and Ansible (2.3.0.0 at the time of writing) requires a minor configuration modification to deal with that. On the host you will be running ansible from, edit /etc/ansible/ansible.cfg, and make sure that the sudo_flags parameter is set to -H -S (remove the -n).

3.2. Setting up OpenShift

Once Ansible is setup, installing OpenShift is easy. First, an inventory file with the configuration and the hosts must be created. Save the following configuration to a file, i.e. ansible-inventory.txt:

[OSEv3:children]
masters
nodes

[OSEv3:vars]
deployment_type=origin
openshift_master_identity_providers=[{'name': 'htpasswd_auth', 'login': 'true', 'challenge': 'true', 'kind': 'HTPasswdPasswordIdentityProvider', 'filename': '/etc/origin/master/htpasswd'}]
openshift_master_default_subdomain=<yourdomain>
openshift_public_hostname=openshift.<yourdomain>
openshift_hostname=<ec2 instance hostname>
openshift_metrics_hawkular_hostname=hawkular-metrics.<yourdomain>

openshift_install_examples=false
openshift_hosted_metrics_deploy=true

[masters]
<ec2 host> openshift_scheduleable=true openshift_node_labels="{'region': 'infra'}"

[nodes]
<ec2 host> openshift_scheduleable=true openshift_node_labels="{'region': 'infra'}"

This will configure OpenShift so that it can only be accessed by users defined in /etc/origin/master/htpasswd.

If you don’t have a domain with wildcard support, you can replace with .nip.io, and you will have a working setup without having a specialized domain.

You can now download the ansible playbooks. The simplest way to do this is to just clone the git repository:

git clone https://github.com/openshift/openshift-ansible.git

To install OpenShift, run the playbook like this

ansible-playbook -u ec2-user -b --private-key=<keyfile>.pem -i ansible-inventory.txt openshift-ansible/playbooks/byo/openshift-cluster/config.yml

This command will take a while to finish.

3.2.1. Creating a user

To be able to deploy EnMasse in OpenShift, a user must be created. Log on to your EC2 instance, and create the user:

htpasswd -c /etc/origin/master/htpasswd <myuser>

Where <myuser> is the username you want to use. The command will prompt you for a password that you will later use when deploying EnMasse.

3.3. Installing EnMasse

Procedure
  1. Download one of the releases from https://github.com/EnMasseProject/enmasse/releases and unpack it.

  2. Deploy EnMasse using the deployment script provided in the release.

You can invoke the deployment script with -h to view a list of options.

Deploying EnMasse

Invoke the deployment script to deploy EnMasse

./deploy-openshift.sh -m "https://openshift.yourdomain:8443" -n enmasse -u myuser

This will create the deployments required for running EnMasse. Starting up EnMasse will take a while, usually depending on how fast it is able to download the docker images for the various components. In the meantime, you can start to create your address configuration. followed the above guide, you should have EnMasse deployed. The endpoints will be:

* AMQP: `messaging-enmasse.<yourdomain>`
* MQTT: `mqtt-enmasse.<yourdomain>`
* Console: `console-enmasse.<yourdomain>`

The console can be used for creating and deleting addresses.

3.3.2. Sending and receiving messages

Connecting with AMQP

For sending and receiving messages, have a look at an example python sender and receiver.

To send and receive messages, you should connect to the exposed route. To start a receiver, run:

./simple_recv.py -a "amqps://$(oc get route -o jsonpath='{.spec.host}' messaging):443/anycast" -m 10

This will block until it has received 10 messages. To start the sender, run:

./simple_send.py -a "amqps://$(oc get route -o jsonpath='{.spec.host}' messaging):443/anycast" -m 10

The server certificates is not verified in the above examples. To fetch the certificate, run:

mkdir -p certs
oc get secret external-certs-messaging -o jsonpath='{.data.tls\.crt}' | base64 -d > certs/tls.crt

You can modify the client code to use this cert to verify the server connection.

Have a look at connecting to EnMasse for more client examples.

Connecting using MQTT

For sending and receiving messages route, you can use the paho-mqtt client library. To connect, fetch the server certificate:

mkdir -p certs
oc get secret external-certs-mqtt  -o jsonpath='{.data.tls\.crt}' | base64 -d > certs/tls.crt
Subscriber client

Save the following to tls_mqtt_recv.py or download:

#!/usr/bin/env python

import paho.mqtt.client as mqtt
import ssl
import optparse

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(opts.topic, int(opts.qos))

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

def on_log(client, userdata, level, string):
    print(string)

parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Receive messages from the supplied address.")

parser.add_option("-c", "--connectHost", default="localhost",
                  help="host to connect to (default %default)")

parser.add_option("-p", "--portHost", default="8883",
                  help="port to connect to (default %default)")

parser.add_option("-t", "--topic", default="mytopic",
                  help="topic to subscribe to (default %default)")

parser.add_option("-q", "--qos", default="0",
                  help="quality of service (default %default)")

parser.add_option("-s", "--serverCert", default=None,
                  help="server certificate file path (default %default)")

opts, args = parser.parse_args()

client = mqtt.Client("recv")
client.on_connect = on_connect
client.on_message = on_message
client.on_log = on_log

context = ssl.create_default_context()
if opts.serverCert == None:
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
else:
    context.load_verify_locations(cafile=opts.serverCert)

# just useful to activate for decrypting local TLS traffic with Wireshark
#context.set_ciphers("RSA")

client.tls_set_context(context)
client.tls_insecure_set(True)
client.connect(opts.connectHost, opts.portHost, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

In order to subscribe to a topic (i.e. mytopic from the previous addresses configuration), the subscriber client can be used in the following way:

./tls_mqtt_recv.py -c "$(oc get route -o jsonpath='{.spec.host}' mqtt)" -p 443 -t mytopic -q 1 -s ./certs/tls.crt
Publisher client

Save the following to tls_mqtt_send.py or download:

#!/usr/bin/env python

import paho.mqtt.client as mqtt
import ssl
import optparse

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.publish(opts.topic, opts.message, int(opts.qos))

def on_publish(client, userdata, mid):
    print("mid: " + str(mid))
    client.disconnect()

def on_log(client, userdata, level, string):
    print(string)

parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Sends messages to the supplied address.")

parser.add_option("-c", "--connectHost", default="localhost",
                  help="host to connect to (default %default)")

parser.add_option("-p", "--portHost", default="8883",
                  help="port to connect to (default %default)")

parser.add_option("-t", "--topic", default="mytopic",
                  help="topic to subscribe to (default %default)")

parser.add_option("-q", "--qos", default="0",
                  help="quality of service (default %default)")

parser.add_option("-s", "--serverCert", default=None,
                  help="server certificate file path (default %default)")

parser.add_option("-m", "--message", default="Hello",
                  help="message to publish (default %default)")

opts, args = parser.parse_args()

client = mqtt.Client("send")
client.on_connect = on_connect
client.on_publish = on_publish
client.on_log = on_log

context = ssl.create_default_context()
if opts.serverCert == None:
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
else:
    context.load_verify_locations(cafile=opts.serverCert)

# just useful to activate for decrypting local TLS traffic with Wireshark
#context.set_ciphers("RSA")

client.tls_set_context(context)
client.tls_insecure_set(True)
client.connect(opts.connectHost, opts.portHost, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

To start the publisher, the client can be used in the following way:

./tls_mqtt_send.py -c "$(oc get route -o jsonpath='{.spec.host}' mqtt)" -p 443 -t mytopic -q 1 -s ./certs/tls.crt -m "Hello EnMasse"

The the publisher publishes the message and disconnects from EnMasse. The message is received by the previous connected subscriber.

3.4. (Optional) Setting up metrics

The process for setting up grafana is a bit more involved, but it gives you a nice overview of whats going on over time. First of all, I like to setup everything metric-related in the openshift-infra project. To do that, you must first give your user permission sufficient privileges. In this setup, since it’s not a production setup, I grant cluster-admin privileges for simplicity (requires logging into the ec2 instance):

oc adm --config /etc/origin/master/admin.kubeconfig policy add-cluster-role-to-user cluster-admin developer

With this in place, you can setup the hawkular-openshift-agent which pulls metrics from routers and brokers:

oc create -f https://raw.githubusercontent.com/openshift/origin-metrics/master/hawkular-agent/hawkular-openshift-agent-configmap.yaml -n openshift-infra
oc process -f https://raw.githubusercontent.com/openshift/origin-metrics/master/hawkular-agent/hawkular-openshift-agent.yaml IMAGE_VERSION=1.4.0.Final | oc create -n openshift-infra -f -
oc adm policy add-cluster-role-to-user hawkular-openshift-agent system:serviceaccount:openshift-infra:hawkular-openshift-agent

If everything is setup correctly, you can then deploy Grafana:

oc process -f https://raw.githubusercontent.com/hawkular/hawkular-grafana-datasource/master/docker/openshift/openshift-template-ephemeral.yaml -n openshift-infra | oc create -n openshift-infra -f -

After some time, Grafana should become available at oc get route -n openshift-infra -o jsonpath='{.spec.host}' hawkular-grafana. The default username and password is admin/admin. E

3.5. Summary

In this post, you’ve seen how to:

  • Deploy OpenShift on an AWS EC2 instance

  • Deploy EnMasse cloud messaging

  • Deploy Grafana for monitoring