The goal of this post is to show how to create a system in which the data collected by some sensors is sent to the cloud using the MQTT protocol and then presented to the end user.
As an example we will use the project that can be found in the following GitHub repository, developed as the individual assignment for the Internet of Things 2021/2022 course of the Master in Engineering in Computer Science, La Sapienza Università di Roma.
As already mentioned, the focus of this post is the network part of the project, so all the aspects regarding how sensors and actuators are connected and the logic behind them will be skipped (the analysis of these aspects can be found in the GitHub repository).
To achieve this goal, the following components are needed:
- nucleo-f401re board: all the details about the board can be found on the manufacturer's official page (on this Link). The board will be connected to a machine running Linux
- RIOT OS: an embedded OS for IoT that provides several useful functions and tools to create and manage network connections and to work with MQTT and message brokers (e.g. ethos, emcute). (here is a link to the official page)
- MOSQUITTO: a message broker that can use the MQTT-SN protocol, one of the most used protocols in IoT, which allows us to use publish/subscribe mechanisms even with very few resources; a sketch of how the broker can be set up for our situation is given just below this list. (link to the official repository)
- TRANSPARENT BRIDGE: a Python script whose role is to act as a bridge between Mosquitto and the cloud services
- AWS: cloud service provider whose role will be to store and then present the collected data to the user.
(NOTE: for the sake of simplicity, Mosquitto and the transparent bridge will both run on the same Linux machine)
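Regarding the broker setup: in the RIOT emcute examples the MQTT-SN side is usually provided by the Mosquitto RSMB (Really Small Message Broker) variant. Assuming that is the broker used here (the port numbers below match the ones used later in this post), a minimal configuration file could look like the following, to be passed to the broker_mqtts binary at startup:
# minimal RSMB configuration (assumed setup, adjust to your environment)
trace_output protocol

# MQTT-SN over UDP on port 1885, used by the board (emcute)
listener 1885 INADDR_ANY mqtts
  ipv6 true

# plain MQTT over TCP on port 1886, used by the transparent bridge
listener 1886 INADDR_ANY
  ipv6 true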
A clearer view of the overall system can be seen in the following image, which also specifies the protocols and the topics used to exchange messages between the various components. In what follows the analysis will start from the nucleo-f401re board and proceed from left to right.
First of all, a small digression about the IP protocol and the other elements that will be used.
Due to the intrinsic nature of IoT, which can be summed up, perhaps a little improperly, as "a lot of devices all connected to the web", IPv4 is not the best choice because of the limited number of devices that can have a distinct public address. For this reason it is common practice to use IPv6, which makes it possible to assign a unique IP address to every device. In the situation described here this observation is relevant not because of the number of devices connected to the system (only one is considered in the project), but because the device is configured for IPv6: the board has an IPv6 address that needs to be configured, and consequently the machine to which the board is connected will also need an IPv6 address.
Moreover, to simplify the development process, the ethos (Ethernet over serial) mechanism provided by RIOT OS will be used, but this requires some additional configuration on the device, i.e. the nucleo-f401re.
We will first analyze the general configuration of the board done in the Makefile, and then the code needed on the nucleo to correctly set up its interfaces.
Makefile: in this file you can find the options passed to the build system at compilation time; in particular, the focus here is on the instructions needed for network connectivity via USB:
USEMODULE += netdev_default #to include the default network device drivers
USEMODULE += auto_init_gnrc_netif #allows the automatic configuration of the network interface, using some default options
USEMODULE += gnrc_ipv6_default #to build the board as an IPv6 node and not as an IPv6 router
USEMODULE += gnrc_netif_single #to optimize the use of a single network interface
USEMODULE += stdio_ethos gnrc_uhcpc #to use the ethos mechanism (stdio and network over the same serial line)
USEMODULE += sock_udp #UDP sockets, needed to run emcute
USEMODULE += emcute #MQTT-SN client, needed to work with message brokers
Then the flags and instructions needed to correctly set up the board:
CFLAGS += -DCONFIG_GNRC_NETIF_IPV6_ADDRS_NUMOF=3 #allows us to specify the number of IP addresses of the interface (2 are generated automatically and the third one will be added by us)
IPV6_PREFIX ?= 2000:2::/64 #declaring the IPv6 prefix that we'll need later
STATIC_ROUTES ?= 1 #use static route configuration
UPLINK ?= ethos #specifying that the uplink will be reached using ethos
ETHOS_BAUDRATE ?= 115200 #parameter needed by ethos
CFLAGS += -DETHOS_BAUDRATE=$(ETHOS_BAUDRATE) #telling the compiler to set ETHOS_BAUDRATE to the value just defined
Then, for everything to work, we need to add the following to the Makefile:
TAP ?= tap0
host-tools:
	$(Q)env -u CC -u CFLAGS $(MAKE) -C $(RIOTTOOLS)
TERMDEPS += host-tools
TERMPROG ?= sudo sh $(RIOTTOOLS)/ethos/start_network.sh
TERMFLAGS ?= $(FLAGS_EXTRAS) $(PORT) $(TAP) $(IPV6_PREFIX) $(ETHOS_BAUDRATE)
ifneq (,$(EMCUTE_ID))
CFLAGS += -DEMCUTE_ID=\"$(EMCUTE_ID)\"
endif
This will generate a virtual bridge with two virtual interfaces connected to it, then launch the script that runs ethos, connecting it to the virtual bridge, and finally tell the compiler which EMCUTE_ID is assigned to the board.
main.c:
Now that the basic configuration is done, the specific configuration performed on the board after startup can be analyzed. In particular, we'll talk about the libraries, parameters and functions that make the connection possible.
Starting from the libraries, we need to import:
#include "msg.h"
#include "net/emcute.h" #imports the functions needed to use mqtt-sn
#include "net/ipv6/addr.h" #imports functions needed to work with ipv6
addresses
Here a second digression is needed: the functions implemented by emcute are for the MQTT-SN protocol, not for plain MQTT. MQTT-SN is a variation of the standard MQTT protocol designed specifically for (wireless) sensor networks. Its main differences with respect to MQTT are a limited payload size, support for clients that are not always running, and the use of UDP instead of TCP (indeed, in the Makefile we imported the modules needed to use UDP).
Then the following parameters are needed:
#ifndef EMCUTE_ID
#define EMCUTE_ID ("nucleo-board")
#endif
#define EMCUTE_PRIO (THREAD_PRIORITY_MAIN - 1)
#define NUMOFSUBS (16U)
#define TOPIC_MAXLEN (64U)
#define MQTT_TOPIC_OUT "topic_out"
#define MQTT_TOPIC_IN "topic_in"
#define MQTT_QoS (EMCUTE_QOS_0)
#define SERVER_ADDR ("2000:2::1")
#define SERVER_PORT 1885
#define IPV6_PREFIX_LEN (64U)
#define DEFAULT_INTERFACE ("4")
#define DEVICE_IP_ADDR ("2000:2::2")
static char stack[THREAD_STACKSIZE_DEFAULT];
...
static emcute_sub_t subscriptions[NUMOFSUBS];
We can see three "blocks". In the first one we define the parameters needed by emcute; in particular we have:
- EMCUTE_ID: the ID that the device will use when communicating with the broker.
- EMCUTE_PRIO: the priority given to the thread that handles the messages published on the topic the board is subscribed to.
- NUMOFSUBS and TOPIC_MAXLEN: respectively, the maximum number of subscriptions that the board can make and the maximum length that a topic's name can have. These two parameters may seem of little use, but it is quite the opposite: in an environment where resources are very limited, the goal is to spend them on computation rather than on keeping subscriptions "active". For the same reason the topics must have names of an appropriate length: even if only a few subscriptions are allowed, there is still the risk of allocating too much memory simply to store the topic names.
- MQTT_TOPIC_OUT and MQTT_TOPIC_IN: respectively, the topic where the board publishes messages and the topic the board is subscribed to.
The second block instead contains more general information about the board and network configuration:
- SERVER_ADDR: the IPv6 address of the message broker
- SERVER_PORT: the port on which to contact the message broker
- IPV6_PREFIX_LEN: the length of the IPv6 prefix; it is set to 64 bits so that the system can use standard mechanisms like EUI-64 to create link-local addresses, even if we do not use them (here is a link explaining how EUI-64 works). Obviously both the nucleo-f401re board and the Linux machine must have the same prefix length so as to be in the same subnet.
- DEFAULT_INTERFACE: the network interface to which we will add a new IPv6 address
- DEVICE_IP_ADDR: the IPv6 address that will be assigned to the board
In the last block we define some data structures that will be used later:
- static char stack[THREAD_STACKSIZE_DEFAULT]: the array that will be used as the stack by the thread that handles emcute
- static emcute_sub_t subscriptions[NUMOFSUBS]: an array of objects, each representing a subscription to a topic on the broker (every element of this array is set to zero in the main function)
WHY DOES THE BOARD NEED A NEW IPv6 ADDRESS? The configuration of the new IPv6 address is needed due to a bug in the Mosquitto message broker implementation: the broker has issues with link-local addresses (see this link for a few more details). So, for the system to work correctly, the board needs either a site-local or a global unicast address, which means that if the board communicates using one of these two types of addresses, the machine connected to the board also needs the same type of address. For this reason the IP address "2000:2::1" was chosen for the machine (where both the broker and the transparent bridge are running) and "2000:2::2" for the nucleo board.
So, for everything to work, and taking into account what was already said about ethos and the tap interfaces, the main machine must have the address "2000:2::1" set on the interface tap0 (to set the IP you can use the following command in the terminal: sudo ip a a 2000:2::1 dev tap0).
Functions and Code Implementation: - SETUP AND CONNECTION TO THE BROKER
We can go on analyzing the functions used to configure the interface of the board and connect to the broker:
static int address_setup(char *name, char *ip_address){
    netif_t *iface = netif_get_by_name(name); //getting the interface
    if(iface == NULL){
        puts("No such Interface");
        return 1;
    }
    ipv6_addr_t ip_addr;
    uint16_t flag = GNRC_NETIF_IPV6_ADDRS_FLAGS_STATE_VALID | (IPV6_PREFIX_LEN << 8U); //setting the flags: address valid + prefix length
    //Parsing IPv6 Address from String
    if(ipv6_addr_from_str(&ip_addr, ip_address) == NULL){
        puts("Error in parsing ipv6 address");
        return 1;
    }
    //Set Interface Options
    if(netif_set_opt(iface, NETOPT_IPV6_ADDR, flag, &ip_addr, sizeof(ip_addr)) < 0){
        puts("Error in Adding ipv6 address");
        return 1;
    }
    return 0;
}
In this function the following things are done:
- first we obtain an object representing the interface using the function "netif_t *netif_get_by_name(const char *name)": given the name of the interface as a string (in this case DEFAULT_INTERFACE, passed as the parameter name), it returns the interface object
- then we set the flags that mark the address as valid and encode its prefix length
- then we call the function "ipv6_addr_t *ipv6_addr_from_str(ipv6_addr_t *result, const char *addr)" to turn the string received as a parameter into an IPv6 address object (this object represents the address inside the code)
- in the end, if all the previous functions terminated successfully, we add the address to the interface using the function "int netif_set_opt(netif_t *netif, netopt_t opt, uint16_t context, void *value, size_t value_len)".
Then we have a function that connects the board to the message broker:
static int connect_broker(void){
    sock_udp_ep_t gw = { .family = AF_INET6, .port = SERVER_PORT }; //UDP endpoint of the broker
    char *topic = NULL;
    char *message = NULL;
    size_t len = 0;
    //Parsing IPv6 Address from String
    if (ipv6_addr_from_str((ipv6_addr_t *)&gw.addr.ipv6, SERVER_ADDR) == NULL){
        printf("error parsing IPv6 address\n");
        return 1;
    }
    //Connecting to broker
    if (emcute_con(&gw, true, topic, message, len, 0) != EMCUTE_OK) {
        printf("error: unable to connect\n");
        return 1;
    }
    printf("Successfully connected to gateway\n");
    //Setting up the subscription
    subscriptions[0].cb = on_pub;
    subscriptions[0].topic.name = MQTT_TOPIC_IN;
    //Subscribing to topic
    if (emcute_sub(&subscriptions[0], MQTT_QoS) != EMCUTE_OK) {
        printf("error: unable to subscribe\n");
        return 1;
    }
    printf("Now subscribed to %s\n", subscriptions[0].topic.name);
    return 0;
}
In this function the following things happen:
- first of all we create the UDP endpoint pointing at the broker
- then we parse the broker's IPv6 address (SERVER_ADDR) into the endpoint, using the same function as before
- then, using the function "int emcute_con(sock_udp_ep_t *remote, bool clean, const char *will_topic, const void *will_msg, size_t will_msg_len, unsigned flags)", we connect to the broker
- once we have successfully connected to the broker we set up the subscription handling: with subscriptions[0].cb we assign to the first subscription the callback function on_pub, executed whenever a new message is published on its topic, while subscriptions[0].topic.name sets the topic's name for subscription 0
- once the subscription setup is done we subscribe to the topic using the function "int emcute_sub(emcute_sub_t *sub, unsigned flags)"
If every operation succeeds, the board is now connected to the broker.
- PUBLISHING AND HANDLING SUBSCRIPTION MESSAGES
Now we can analyze the other two main aspects of the system: publishing messages on the broker and handling the messages published on the broker. These operations are handled in two different functions:
static int publish(char *t, char *message){
    emcute_topic_t topic;
    topic.name = t;
    if(emcute_reg(&topic) != EMCUTE_OK){ //getting ID from the topic
        printf("cannot find topic:%s", topic.name);
        return 1;
    }
    //publishing on the broker
    if(emcute_pub(&topic, message, strlen(message), MQTT_QoS) != EMCUTE_OK){
        printf("cannot publish data\n");
        return 1;
    }
    //printf("published message");
    return 0;
}
This function publishes messages and performs the following steps:
- create a topic object, assigning it the name of the topic on which we want to publish
- given the name, we use the function "int emcute_reg(emcute_topic_t *topic)" to get the topic ID
- finally, we use the function "int emcute_pub(emcute_topic_t *topic, const void *buf, size_t len, unsigned flags)" to publish on the broker the message passed as a function parameter
Then we have the following function, which handles the case in which a message is published on a topic the board is subscribed to (this function acts directly on the logic inside the board, a topic that will not be covered in this post). In this case the board simply receives the message as a string. It is important that this function (on_pub) is the same one we specified as the callback when setting up the subscription.
static void on_pub(const emcute_topic_t *topic, void *data, size_t len){
    (void)topic;
    (void)len;
    char *in = (char *)data;
    /**************************
    INTERNAL HANDLING OF THE MESSAGE RECEIVED
    **************************/
    return;
}
Now we can simply call all these functions (except on_pub), together with a few standard ones, inside the main function, as follows:
int main(void){
    //Setting memory for the subscription list
    memset(subscriptions, 0, (NUMOFSUBS * sizeof(emcute_sub_t)));
    //Starting Emcute Thread
    thread_create(stack, sizeof(stack), EMCUTE_PRIO, 0, emcute_thread, NULL, "emcute");
    //Adding GUA to the interface
    if(address_setup(DEFAULT_INTERFACE, DEVICE_IP_ADDR)){
        printf("Impossible to set up the interface\n");
        return 1;
    }
    //Connecting to broker and subscribing to topics
    if(connect_broker()){
        printf("Impossible to Connect Correctly with the Broker\n");
        return 1;
    }
    /********
    SYSTEM LOGIC
    ********/
    return 0;
}
As we can see, we are simply calling the functions defined above; the only peculiar things are:
- setting the memory of the subscriptions array to zero (as mentioned before)
- creating a thread for emcute, which is needed to correctly handle the connection to the broker. The function executed by the thread calls "void emcute_run(uint16_t port, const char *id)", which effectively runs emcute; the thread function is the following:
static void *emcute_thread(void *arg)
{
    (void)arg;
    emcute_run(CONFIG_EMCUTE_DEFAULT_PORT, EMCUTE_ID);
    return NULL; /* should never be reached */
}
With this the nucleo-f401re board's configuration is done and we can go on talking about the transparent bridge.
TRANSPARENT BRIDGE CONFIGURATION: The role of this component is to "take" the messages published on the Mosquitto broker and relay them to the broker on the cloud, in this case the AWS IoT Core one, and vice versa. This Python script uses two main libraries:
import paho.mqtt.client as mqtt
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
- paho.mqtt.client, which provides the functions to interact with the Mosquitto broker
- AWSIoTPythonSDK.MQTTLib, which provides the functions to interact with the AWS broker
More in detail, to make the system work the following parameters are defined:
#AWS Parameters Init
AWS_HOST = "PUT HERE YOUR AMAZON ENDPOINT" #can be found as parameter '-e' in the last line of the file start.sh
AWS_ROOT_CAP_PATH = "PUT HERE THE ABSOLUTE PATH TO root-CA.crt "
AWS_CERTIFICATE_PATH = "PUT HERE THE ABSOLUTE PATH TO thing.cert.pem"
AWS_PRIVATE_KEY_PATH = "PUT HERE THE ABSOLUTE PATH TO thing.private.key"
AWS_PORT = 8883
AWS_CLIENT_ID = "client ID"
#MQTT Broker Parameters
MQTT_SERVER = "localhost"
MQTT_PORT = 1886
MQTT_KEEP_ALIVE = 60
#Topic Definition
TOPIC_FROM_BOARD = "topic_out"
TOPIC_TO_BOARD = "topic_in"
TOPIC_FROM_AWS = "topic_in"
TOPIC_TEMP_TO_AWS = "topic_out_temp"
TOPIC_SOIL_TO_AWS = "topic_out_soil"
In the first block we define everything needed to connect to AWS IoT Core (most of this information is obtained when creating a new thing in AWS IoT Core), in the second one the parameters needed to connect to the MQTT server, and in the third one the topics used in both directions, Mosquitto -> AWS and AWS -> Mosquitto.
Moreover, the script executes the following code, in which, using the parameters above, it connects to both the AWS broker and the MQTT one:
#AWS and MQTT Objects
AWS_CLIENT = None
MQTT_CLIENT = None
#AWS Start Init
AWS_CLIENT = AWSIoTMQTTClient(AWS_CLIENT_ID)
AWS_CLIENT.configureEndpoint(AWS_HOST, AWS_PORT)
AWS_CLIENT.configureCredentials(AWS_ROOT_CAP_PATH, AWS_PRIVATE_KEY_PATH, AWS_CERTIFICATE_PATH)
AWS_CLIENT.connect() #connection to aws
AWS_CLIENT.subscribe(TOPIC_FROM_AWS, 1, customAWSCallback) #subscribe to the aws topic
#AWS Finish Init
#MQTT Start Init
MQTT_CLIENT = mqtt.Client()
MQTT_CLIENT.on_connect = on_connect
MQTT_CLIENT.on_message = on_message
MQTT_CLIENT.connect(MQTT_SERVER, MQTT_PORT, MQTT_KEEP_ALIVE) #connection to broker (subscription is done in the on_connect function)
MQTT_CLIENT.loop_forever()
#MQTT Finish Init
[NOTE: in the "on_connect" function we subscribe to the topic in mosquitto]
Obviously we also need to define the functions that publish messages and those that handle published messages, both for AWS and for Mosquitto. The former are simply functions that call:
- the class method "publish(topic, payload, QoS)" on the AWS object
- the class method "publish(topic, payload=None)" on the MQTT object
Instead, for the latter we need to define the following functions:
- def customAWSCallback(client, userdata, message): that handles the messages published by someone else on the AWS broker
- def on_message(client, userdata, msg): that handles the messages published by someone else on the Mosquitto Broker
Given the role of the transparent bridge, the code implementing these callbacks should look something like the following (a more complete sketch is given right after):
def customAWSCallback(client, userdata, message):
    .....
    MQTT_CLIENT.publish(topic, message)

def on_message(client, userdata, msg):
    .....
    AWS_CLIENT.publish(topic, message, 1)
[NOTE: my suggestion is to send the messages to the AWS broker in a JSON structure]
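For reference, here is a minimal sketch of how the on_connect function and the two callbacks could be filled in. It relies on the globals defined above (MQTT_CLIENT, AWS_CLIENT and the topic names); the JSON wrapping and the choice of always forwarding to TOPIC_TEMP_TO_AWS are illustrative assumptions, not the project's actual logic, which also handles the soil-moisture topic:
import json

def on_connect(client, userdata, flags, rc):
    # called once the connection to Mosquitto is established:
    # subscribe to the topic where the board publishes its data
    print("Connected to Mosquitto with result code " + str(rc))
    client.subscribe(TOPIC_FROM_BOARD)

def on_message(client, userdata, msg):
    # message coming from the board through Mosquitto -> forward it to AWS
    payload = msg.payload.decode("utf-8")
    aws_message = json.dumps({"value": payload})  # hypothetical JSON wrapping
    AWS_CLIENT.publish(TOPIC_TEMP_TO_AWS, aws_message, 1)

def customAWSCallback(client, userdata, message):
    # message coming from AWS IoT Core -> forward it to the board via Mosquitto
    MQTT_CLIENT.publish(TOPIC_TO_BOARD, message.payload)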
With this the configuration of the transparent bridge is done.
AMAZON WEB SERVICES CONFIGURATION: Now we can configure the last part of the project, the Amazon Web Services side (obviously an AWS account is needed, but everything we are going to do can be done using the free tier). The services we are going to use are: AWS IoT Core, DynamoDB, Lambda (we'll use Node.js as programming language), API Gateway and AWS Amplify.
DynamoDB and AWS IoT Core:
The first thing to do is to create the DynamoDB tables where the values will be stored. Once this is done we need to set up AWS IoT Core so that there is a thing that can send messages to the broker and so that the messages arriving on the message broker are automatically stored inside the tables. The former can be achieved simply by following the steps to connect a new device from the AWS IoT Core homepage (after this procedure it is possible to fill in the respective AWS parameters in the transparent bridge).
The latter can be achieved using the Rules Engine: to set up a rule go to Act -> your_thing, then add a selection query that looks something like "SELECT field1, field2, ... FROM topic", where field1, field2, ... are the JSON fields of the message and topic is the topic where the message is published. Then add an action by clicking on "Add Action", selecting "Insert into a DynamoDB Table" and following the steps. Finally, save the rule.
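For example, assuming the bridge publishes JSON messages with hypothetical fields id and temperature on topic_out_temp, the rule query could be: SELECT id, temperature FROM 'topic_out_temp'.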
Once this is done the messages arriving on the broker will be automatically stored inside the DynamoDB Table that was selected.
Lambda Functions and API Gateway:
Now, to make the data collected by the sensors available to the user, we need to create some APIs; this can be done using API Gateway. In particular, we'll need an API called with a GET to retrieve the data from the tables and an API called with a POST to publish messages on the AWS IoT broker.
The Lambda function associated with the first API should have a structure like this:
//Import Needed Modules
const AWS = require("aws-sdk");
const ddb = new AWS.DynamoDB.DocumentClient({region: "INSERT HERE YOUR REGION"});

exports.handler = async (event, context, callback) => {
    var scan_result = await readMessage().then(data => {
        return data.Items
    }).catch((err) => {
        console.error(err);
    });
    return {
        statusCode: 201,
        body: scan_result
    };
};

//Function to read data from the soil_humidity_table
function readMessage(){
    var tempo = parseInt(Date.now()-3600000); //extracting tuple with timestamp higher than 1 hour ago
    //console.log(tempo);
    const params = {
        TableName: "soil_humidity_table",
        FilterExpression: "id >= :lb",
        ExpressionAttributeValues:{
            ":lb": tempo
        }
    };
    return ddb.scan(params).promise();
}
In this case the function scans the table "soil_humidity_table" and returns the values generated no more than one hour ago (see the GitHub repository for more details about why this is done).
The function that publishes a message on the AWS broker, instead, should be something like:
//Adding Needed Modules
var AWS = require('aws-sdk');
var iotdata = new AWS.IotData({ endpoint: 'INSERT HERE YOUR IOTDATA ENDPOINT' });

//Invocation Event Handler
exports.handler = async (event, context, callback) => {
    var result = await publish2Broker(JSON.stringify(event)).then(data => {
        console.log(data);
        return data;
    }).catch((err) => {
        console.error(err);
        return {statusCode: 500};
    });
    console.log(result);
    return {
        statusCode: 201
    };
};

//Function that publishes on the MQTT Broker at the endpoint
function publish2Broker(str){
    const params = {
        topic: "topic_in",
        payload: str
    };
    return iotdata.publish(params).promise();
}
Here the body of the POST request is turned into a string and then sent as a message to the broker.
WHERE ARE WE NOW? At this point I believe a recap is in order. At startup the board configures its network interface and connects to the Mosquitto broker, so that it can publish and receive messages. When the board publishes a message on the broker, the transparent bridge relays it to the cloud broker, which stores the message in a table and makes it available to the user through an API. When instead the user wants to act on the nucleo-f401re, they can call the API that publishes on the AWS broker. Once the Lambda function publishes the message, the transparent bridge receives it and passes it to the Mosquitto broker on the topic the board is subscribed to. In this way the board can receive the message and act accordingly.
FINAL STEPS (AWS Amplify and Web Interface): Now that most of the cloud services are set up, we can think about building the web interface available to the user. In this case there isn't any particular rule: my suggestion is to use the Fetch API (here is a simple tutorial: link) to call both the API that reads the values from the DB and the API that publishes to the message broker. Every other aspect (for example how the data is shown, or how the user can interact with the system) is up to you and your particular use case. Before writing any front-end code, it can be handy to check that the two APIs behave as expected, as sketched below.
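As a quick, hedged example, the two APIs can be exercised from a small Python script using the requests library; the endpoint URLs and the JSON body are placeholders to be replaced with the invoke URLs of your deployed APIs and the message format your board expects:
import requests

# hypothetical API Gateway invoke URLs, replace with your own
GET_URL = "https://<api-id>.execute-api.<region>.amazonaws.com/<stage>/values"
POST_URL = "https://<api-id>.execute-api.<region>.amazonaws.com/<stage>/publish"

# read the values stored in DynamoDB through the GET API
resp = requests.get(GET_URL)
print(resp.status_code, resp.text)

# publish a message to the board through the POST API
# (the JSON fields are purely illustrative)
resp = requests.post(POST_URL, json={"command": "pump_on"})
print(resp.status_code)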
Another step, not strictly needed, is to configure AWS Amplify. Indeed, any service that can host a web page will work, but for the sake of completeness: to configure Amplify we need to create a new project and drag and drop the code of the web interface onto the Amplify project main page. Once this is done, by going to the Amplify project URL it is possible to access the web interface, see the values and interact with the system.
CONCLUSIONS: In conclusion, in this post we saw how to connect an IoT device to the cloud, in such a way that a user can not only see the values detected by the sensors and stored in the cloud, but also directly interact with the system.