I've developed this project as part of the IoT course during my Engineering in Computer Science master's degree at Sapienza - University of Rome. It is an example of how to build a crowd-sensing IoT application that uses the sensors inside our smartphones to do human activity recognition. Two approaches are explored: a cloud-based one, in which the raw sensor data is sent to the cloud and analyzed only there, and an edge-based one, where the analysis is done on the smartphone and only the result is sent to the cloud.
Cloud-based approach
Architecture
The cloud-based architecture is the following:
- A JavaScript web-app running on the smartphones collects the raw accelerometer data and publishes it on an MQTT topic;
- AWS is used to manage the MQTT broker;
- A stream processing application built using Apache Flink and running inside an AWS ECS container retrieves the raw data from the MQTT topic, analyzes it and publishes the results to another topic;
- Both the raw data and the result of the processing are stored in a DynamoDB database;
- A web-based dashboard developed in JavaScript displays the latest raw data with the resulting activity and the values received during the last hour.
Smartphone web-app
In the folder "crowd-sensing/cloud-based/smartphone-webapp" of my GitHub repository you can find the code of the web-app. It is a simple HTML+JavaScript website; you can run it locally or upload it to a hosting service to access it over the internet. It uses the aws-iot-device-sdk to publish the data to the MQTT topic, and the Generic Sensor API on Android or the Device Motion API on iOS to access the accelerometer of the device (note that due to security concerns you need to use HTTPS to access the accelerometer).
Before running the web-app we need to configure AWS to give it permission to publish the sensor data. To do that, log in to your AWS console, go to the "Cognito" section, and click on "Manage Identity Pools".
Click on “Create new identity pool”, give it a name, check “Enable access to unauthenticated identities”, click on “Create Pool” and then “Allow”.
Now AWS will show you a code snippet on how to retrieve credentials; from here you need to copy the “Identity pool ID” string.
Open the file "bundle.js" in the web-app folder with your favorite editor and find this section:
/*
* The awsConfiguration object is used to store the credentials
* to connect to AWS service.
* MAKE SURE to insert the correct name for your endpoint,
* the correct Cognito PoolID and the correct AWS region.
*/
var AWSConfiguration = {
poolId: '<IDENTITY_POOL_ID>',
host: '<ENDPOINT_ARN>',
region: '<AWS_REGION>'
};
Paste the string you copied before into the "poolId" property and put your AWS region in "region" (it should be the same as the part of the poolId before the ':'). In "host" you need to put your "Endpoint ARN", which you can retrieve by going to the "IoT Core" section of the AWS console and clicking on "Settings".
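For reference, a filled-in configuration should look roughly like this (the values below are made up):
var AWSConfiguration = {
    poolId: 'us-east-1:12345678-abcd-1234-abcd-1234567890ab', //Cognito identity pool ID
    host: 'a1b2c3d4e5f6g7-ats.iot.us-east-1.amazonaws.com',   //endpoint from the IoT Core "Settings" page
    region: 'us-east-1'                                       //same region as the identity pool
};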
The last thing we need to do is to associate the correct permissions with this identity pool. From the AWS console go to the "IAM" section, click on "Roles" and select the role named “Cognito_<YOUR_IDENTITY_POOL_NAME>Unauth_Role”, where <YOUR_IDENTITY_POOL_NAME> is the name you gave previously to the identity pool.
Under “Permissions policies” expand the only policy listed and click on “Edit policy”.
Here remove all the current permissions and then click on “Add additional permissions”.
Under “Service” select “IoT”, and under “Actions” select “Connect” and “Publish”. Then under “Resources”, in the “client” section, click “Add ARN”, insert your AWS region, leave “Account” untouched and under “Client id” write “CloudSmartphone-*”. This means that we are allowing access to clients with IDs that start with “CloudSmartphone-”.
Next, in the “topic” section, click again on “Add ARN”, insert your region again and under “Topic name” write “CloudComputing/*”. This means that we are allowing the clients to publish messages to the topics that start with “CloudComputing/”.
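The policy you just built should be roughly equivalent to the following JSON document (you can check it in the policy editor's JSON view; the region and account ID below are placeholders):
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iot:Connect",
            "Resource": "arn:aws:iot:<AWS_REGION>:<ACCOUNT_ID>:client/CloudSmartphone-*"
        },
        {
            "Effect": "Allow",
            "Action": "iot:Publish",
            "Resource": "arn:aws:iot:<AWS_REGION>:<ACCOUNT_ID>:topic/CloudComputing/*"
        }
    ]
}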
Great! Now the code of the web-app should work, but how does it work? Let's take a brief look.
The code starts by generating an id for the smartphone that is then saved in the cookies so that the device will be recognized the next time the app is launched.
//The first time the website is loaded a clientId is generated and saved in
//the cookies. On subsequent runs the id is retrieved from the cookies.
function getCookie(cname) {
var name = cname + "=";
var decodedCookie = decodeURIComponent(document.cookie);
var ca = decodedCookie.split(';');
for(var i = 0; i <ca.length; i++) {
var c = ca[i];
while (c.charAt(0) == ' ') {
c = c.substring(1);
}
if (c.indexOf(name) == 0) {
return c.substring(name.length, c.length);
}
}
return "";
}
var clientId = getCookie("clientId");
if (clientId == "") {
clientId = (Math.floor((Math.random() * 100000) + 1));
document.cookie = "clientId="+clientId;
}
clientId = 'CloudSmartphone-' + clientId;
Then it uses the credentials we inserted previously to connect to the AWS IoT broker.
AWS.config.region = AWSConfiguration.region;
AWS.config.credentials = new AWS.CognitoIdentityCredentials({
IdentityPoolId: AWSConfiguration.poolId
});
//The mqttClient object used for retrieving the messages from the MQTT server.
const mqttClient = AWSIoTData.device({
region: AWS.config.region, //Set the AWS region we will operate in
host: AWSConfiguration.host, //Set the AWS IoT Host Endpoint
clientId: clientId, //The clientId created earlier
protocol: 'wss', //Connect via secure WebSocket
maximumReconnectTimeMs: 8000, //Set the maximum reconnect time to 8 seconds
debug: true, //Enable console debugging information
accessKeyId: '',
secretKey: '',
sessionToken: ''
});
//The cognitoIdentity used for authentication.
var cognitoIdentity = new AWS.CognitoIdentity();
AWS.config.credentials.get(function(err, data) {
if (!err) {
console.log('retrieved identity: ' + AWS.config.credentials.identityId);
var params = {
IdentityId: AWS.config.credentials.identityId
};
cognitoIdentity.getCredentialsForIdentity(params, function(err, data) {
if (!err) {
mqttClient.updateWebSocketCredentials(data.Credentials.AccessKeyId,
data.Credentials.SecretKey,
data.Credentials.SessionToken);
} else {
console.log('error retrieving credentials: ' + err);
alert('error retrieving credentials: ' + err);
}
});
} else {
console.log('error retrieving identity:' + err);
alert('error retrieving identity: ' + err);
}
});
Once the page is loaded it checks whether the device supports either the Generic Sensor API or the Device Motion API. If it supports one of the two it asks for permission to use the sensor; otherwise it displays an error message.
function requestDeviceMotionPermission() {
window.DeviceMotionEvent.requestPermission()
.then(response => {
if (response === 'granted') {
startDeviceMotionAccelerometer();
} else {
accelerometerNotAllowed();
}
})
.catch(e => {
console.error(e);
accelerometerNotAllowed();
})
}
function accelerometerNotAllowed() {
var errorBanner = "<div id='ErrorBanner' class='Banner'>"
+ "<h3>Ops...</h3>"
+ "<p>The app requires access to the accelerometer to work</p>"
+ "<div>"
document.getElementById("content").innerHTML = errorBanner;
}
function noAccelerometer() {
var errorBanner = "<div id='ErrorBanner' class='Banner'>"
+ "<h3>Ops...</h3>"
+ "<p>Your device doesn't have an accelerometer</p>"
+ "<div>"
document.getElementById("content").innerHTML = errorBanner;
}
//On loading the page it checks what API the device supports for accessing
//the accelerometer. If it finds one it asks for permission and if the user
//allows the use of the sensor it starts retrieving the data.
window.onload = function () {
if ('Accelerometer' in window) {
//Android
document.getElementById("enableButton").onclick = startSensorAPIAccelerometer;
document.getElementById("cancelButton").onclick = accelerometerNotAllowed;
document.getElementById("SensorRequestBanner").style.display = "block";
} else if (window.DeviceMotionEvent) {
//iOS
if (typeof window.DeviceMotionEvent.requestPermission === 'function') {
//iOS 13
document.getElementById("enableButton").onclick = requestDeviceMotionPermission;
document.getElementById("cancelButton").onclick = accelerometerNotAllowed;
document.getElementById("SensorRequestBanner").style.display = "block";
} else {
//Older version of iOS, no need for permission
document.getElementById("enableButton").onclick = startSensorAPIAccelerometer;
document.getElementById("cancelButton").onclick = accelerometerNotAllowed;
document.getElementById("SensorRequestBanner").style.display = "block";
}
} else {
noAccelerometer();
}
}
If the user allows the use of the accelerometer it starts sampling the sensor at a frequency defined by the variable "samplingFrequency", in this case 4 Hz, and every 3 seconds it publishes the data collected in that interval.
//
// Retrieving and publishing the sensor data.
//
//The frequency at which the sensors data is retrieved.
var samplingFrequency = 4;
//The sampler samples the accelerometer data every 'interval' milliseconds and
//buffers it in an array of size 'windowSize'. Once the array is full it
//emits a 'dataEvent' event and clears the buffer.
class Sampler extends EventEmitter {
constructor(interval, windowSize) {
super();
this.last = 0;
this.interval = interval;
this.windowSize = windowSize;
this.buffer = [];
}
timeHasPast(){
var now = Date.now();
if (now >= this.last + this.interval){
this.last = now;
return true;
}
else return false;
}
add(d){
this.buffer.push(d);
if (this.buffer.length == this.windowSize) {
this.emit('dataEvent', this.buffer);
this.buffer = [];
}
}
}
var sampler = new Sampler(1000/samplingFrequency, samplingFrequency*3);
//Connect handler: once the MQTT client has successfully connected
//to the MQTT broker it starts publishing the data every time it receives a
//'dataEvent' event.
function mqttClientConnectHandler() {
console.log('connected to MQTT server');
sampler.on("dataEvent", function(buffer){
mqttClient.publish('CloudComputing/'+clientId, JSON.stringify(buffer));
console.log("publishing ");
});
};
mqttClient.on('connect', mqttClientConnectHandler);
//This function retrieves the accelerometer data from devices that support
//the DeviceMotion API.
function startDeviceMotionAccelerometer() {
document.getElementById("SensorRequestBanner").style.display = "none";
document.getElementById("id").innerHTML = clientId;
window.addEventListener('devicemotion', function(e) {
if(sampler.timeHasPast()){
var d = {};
d.x = e.accelerationIncludingGravity.x;
d.y = e.accelerationIncludingGravity.y;
d.z = e.accelerationIncludingGravity.z;
d.id = clientId;
d.timestamp = Date.now();
sampler.add(d);
document.getElementById("status").innerHTML = 'x: ' + d.x
+ '<br> y: ' + d.y
+ '<br> z: ' + d.z;
}
});
}
//This function retrieves the accelerometer data from devices that support
//the Generic Sensor API.
function startSensorAPIAccelerometer() {
navigator.permissions.query({ name: 'accelerometer' })
.then(result => {
if (result.state === 'denied') {
accelerometerNotAllowed();
} else {
document.getElementById("SensorRequestBanner").style.display = "none";
document.getElementById("id").innerHTML = clientId;
let sensor = new Accelerometer();
sensor.addEventListener('reading', function(e) {
if(sampler.timeHasPast()){
var d = {};
d.x = e.target.x;
d.y = e.target.y;
d.z = e.target.z;
d.id = clientId;
d.timestamp = Date.now();
sampler.add(d);
document.getElementById("status").innerHTML = 'x: ' + d.x
+ '<br> y: ' + d.y
+ '<br> z: ' + d.z;
}
});
sensor.start();
}
});
}
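So every 3 seconds the web-app publishes the buffered samples as a single JSON message on the topic CloudComputing/<clientId>. At 4 Hz the buffer holds 12 samples, so a published message looks roughly like this (the values below are made up):
[
    { "x": 0.12, "y": -0.34, "z": 9.78, "id": "CloudSmartphone-12345", "timestamp": 1600000000000 },
    { "x": 0.15, "y": -0.31, "z": 9.81, "id": "CloudSmartphone-12345", "timestamp": 1600000000250 },
    ...ten more samples...
]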
Cloud stream processor
Now the smartphone is publishing the accelerometer data but we aren't doing anything with it! The next step is to set up the cloud stream processor that analyzes the data and recognizes the activity. You can find the code for this in the folder "crowd-sensing/cloud-based/cloud-code"; it is a Java program that uses the Apache Flink library.
As with the web-app, the first thing we need to do is to authenticate it and give it the proper permissions. This time we will use a certificate.
Open the AWS console and go to the "IoT Core" section. From there go to "Secure", then "Certificates", and click on "Create a certificate". Next click on "Create certificate".
On this page download the certificate and the private key, then click on "Activate" and on "Done".
Now go to the “Policies” page and click on “Create a policy”.
Give the policy whatever name you want, then in the “Action” field write “iot:Connect”. AWS will automatically populate the “Resource ARN” field; replace the “replaceWithAClientId” part with “flink”. Now click on “Add statement” and write “iot:Publish” in the “Action” field of the new statement. This time replace “replaceWithATopic” in the Resource ARN with “CloudComputingResult”. Click again on “Add statement”, write “iot:Subscribe” in the “Action” field and replace “replaceWithATopicFilter” with "CloudComputing/*". Finally, click one last time on “Add statement”, write "iot:Receive" in the “Action” field and replace “replaceWithATopic” with "CloudComputing/*". Now make sure to tick the “Allow” box under “Effect” in all the statements.
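In JSON form, the resulting policy should be roughly the following (the region and account ID below are placeholders):
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iot:Connect",
            "Resource": "arn:aws:iot:<AWS_REGION>:<ACCOUNT_ID>:client/flink"
        },
        {
            "Effect": "Allow",
            "Action": "iot:Publish",
            "Resource": "arn:aws:iot:<AWS_REGION>:<ACCOUNT_ID>:topic/CloudComputingResult"
        },
        {
            "Effect": "Allow",
            "Action": "iot:Subscribe",
            "Resource": "arn:aws:iot:<AWS_REGION>:<ACCOUNT_ID>:topicfilter/CloudComputing/*"
        },
        {
            "Effect": "Allow",
            "Action": "iot:Receive",
            "Resource": "arn:aws:iot:<AWS_REGION>:<ACCOUNT_ID>:topic/CloudComputing/*"
        }
    ]
}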
What we are doing here is allowing our stream processor to connect to the MQTT broker, to receive the messages from the topics starting with "CloudComputing", and to publish to the topic "CloudComputingResult".
Now you can click on “Create”.
To associate the policy we have just created to the certificate go back to the “Certificates” page, click on the three dots over your certificate and then click on “Attach policy”. Finally select your policy and click on "Attach".
Now, from the folder with the code of the stream processor, navigate to "src/main/java/simone/bartolini/har" and open the file "AppConfiguration.java". This is where the AWS SDK is configured: you need to replace "brokerHost" with your Endpoint ARN, and put the names of your certificate and private key in "certificateFile" and "privateKeyFile".
/**
* Configuration parameters for AWS iot.
* Make sure to put in 'brokerHost' your endpoint name and in 'certificateFile'
* and 'privateKeyFile' the correct path of your certificate and private key.
*
* @author simbartolini@gmail.com
*/
public interface AppConfiguration {
public final String brokerHost = "<ENDPOINT_ARN>";
public final int brokerPort = 8883;
public final String brokerProtocol = "ssl";
public final String brokerURL = brokerProtocol + "://" + brokerHost + ":" + brokerPort;
public final String topic = "CloudComputing/+";
public final AWSIotQos qos = AWSIotQos.QOS0;
public final String outExchange = "CloudComputingResult";
public final String certificateFile = "./<CERTIFICATE>.pem.crt";
public final String privateKeyFile = "./<PRIVATE_KEY>.pem.key";
}
Before executing it, let's take a brief look at the code.
The file "DataListener.java" contains the main class of the program. It starts by declaring the stream execution environment:
// The StreamExecutionEnvironment is the context in which a program
// is executed.
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Then it subscribes to the MQTT topic and puts the messages received into a Flink data stream:
// The input stream from the MQTT topic.
DataStream<byte[]> awsStream = env
.addSource(new AWSIoTMqttStream(AppConfiguration.brokerHost,
"flink", AppConfiguration.certificateFile,
AppConfiguration.privateKeyFile,
AppConfiguration.topic, AppConfiguration.qos));
After that it converts the messages in the stream into "SensorData" objects (look at the file SensorData.java for more information):
// Converts the messages into SensorData.
final DataStream<SensorData> dataStream = awsStream
.flatMap(new ParseMeasurement());
Next, it applies a series of filters to the data in order to remove the noise and the effects of gravity (note that those filters depend on the sampling frequency so the variable samplingFrequency needs to have the same value as in the web-app code):
// The frequency at which the data is sampled from the sensors. This is
// needed for the filtering. Make sure it is the same as in the code
// running on the smartphones.
int samplingFrequency = 4;
// Applies a median filter to the data.
final DataStream<SensorData> medianFilterStream = dataStream
.keyBy("id")
.countWindow(samplingFrequency, 1)
.apply(new MedianFilter());
// Applies a low pass filter to the data.
final DataStream<SensorData> lowPassFilterStream = medianFilterStream
.keyBy("id")
.process(new LowPassFilter(20, samplingFrequency));
// Applies a high pass filter to the data.
final DataStream<SensorData> highPassFilterStream = lowPassFilterStream
.keyBy("id")
.process(new HighPassFilter(0.3, samplingFrequency));
The filtered data is then analyzed in sliding windows of 3 seconds, each time moving forward by 1.5 seconds. Inside a window, the average of the absolute value of the acceleration is computed for each axis. The three averages are then summed: if the result is greater than 0.8 the window is labeled as "Moving", otherwise it is labeled as "Resting". If a window has the same label as the previous one it is removed from the stream, in order to minimize the number of output messages.
// Defines the window and apply the reduce transformation.
final DataStream<SensorData> averageStream = highPassFilterStream
.keyBy("id")
.countWindow(samplingFrequency*3, samplingFrequency*3/2)
.reduce(new AbsAverage());
// Analyzes the data to recognize the activity.
final DataStream<ResultData> resultStream = averageStream
.keyBy("id")
.map(new HarAnalizer())
.keyBy("id")
.process(new RemoveDuplicates());
Finally, the results of the analysis are converted into JSON messages and published to the output topic.
// Converts the result data into json.
final DataStream<String> jsonStream = resultStream
.map(new ExtractJson());
// Publishes the data into the output topic.
final DataStreamSink<String> finalStream = jsonStream
.addSink(new AWSIoTMqttSink(AppConfiguration.brokerHost,
"flink", AppConfiguration.certificateFile,
AppConfiguration.privateKeyFile,
AppConfiguration.outExchange, AppConfiguration.qos));
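Each result message published to "CloudComputingResult" is a small JSON object; judging from what the dashboard reads later, it carries the device id, the recognized status and a timestamp, along these lines (values are illustrative):
{ "id": "CloudSmartphone-12345", "status": "Moving", "timestamp": 1600000000000 }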
To execute it on the cloud you first need to build it using your favorite Java IDE; then you need to create a Docker image containing the jar file plus the certificate, the key and a Java virtual machine. To do that, first install the Docker engine on your machine (you can get it from the Docker website). Then open the "Dockerfile" in the "cloud-code" folder and replace the certificate and key file names with your own.
FROM openjdk:8
EXPOSE 8883
COPY HAR.jar /home/HAR.jar
COPY <CERTIFICATE>.pem.crt /home/<CERTIFICATE>.pem.crt
COPY <PRIVATE_KEY>.pem.key /home/<PRIVATE_KEY>.pem.key
WORKDIR /home
ENTRYPOINT java -jar HAR.jar
Put the Dockerfile, the certificate, the key and the "HAR.jar" file in the same folder, open a terminal in it and issue the following command to generate the image:
docker build -t har .
The next step is to upload it to AWS. To do that you need the AWS CLI; check the AWS documentation to learn how to install it. If you are using an AWS Educate account the process for getting the credentials is slightly different:
From your Vocareum workbench page click on "Account Details" and then on "Show".
It should show you your credentials; copy and paste them into the file ~/.aws/credentials. Note that these credentials expire at the end of the session; after that you will need to log in again to AWS Educate and copy the new credentials.
With the AWS CLI properly configured, go back to the terminal and issue the command:
aws sts get-caller-identity
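It should output a small JSON document of roughly this shape (all values below are placeholders):
{
    "UserId": "AIDAEXAMPLEUSERID",
    "Account": "123456789012",
    "Arn": "arn:aws:iam::123456789012:user/example-user"
}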
The string next to "Account" is your account ID; we will need it in the next commands.
Now issue the following command, replacing us-east-1 with your region and aws_account_id with your account ID:
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin aws_account_id.dkr.ecr.us-east-1.amazonaws.com
Then execute the following command, again replacing us-east-1 with your region. Here we are creating a repository named "har" for our Docker image:
aws ecr create-repository \
--repository-name har \
--image-scanning-configuration scanOnPush=true \
--region us-east-1
Finally, issue the following commands, replacing us-east-1 with your region and aws_account_id with your account ID. Here we are uploading the image to the repository:
docker tag har:latest aws_account_id.dkr.ecr.us-east-1.amazonaws.com/har:latest
docker push aws_account_id.dkr.ecr.us-east-1.amazonaws.com/har:latest
Great! Now that the image has been uploaded to AWS we can finally execute it. From the AWS console go to the "Elastic Container Service" section.
Here click on "Get started", then click on the "Configure" button.
Give a name to the container, then in the "Image" section write aws_account_id.dkr.ecr.us-east-1.amazonaws.com/har:latest (replacing aws_account_id with your account ID and us-east-1 with your region), in the "Port mappings" section insert the port 8883 and finally click on "Update".
For the rest of the configuration the default parameters are ok for our application so click "Next" until you reach the "Review" page, here click "Create".
Once it finishes creating the service click on "View Service".
Ok now the program should be running, analyzing any incoming message from the smartphones.
DynamoDB database
The next step is to configure the database to save both the raw data and the results of the analysis. From the AWS console go to the IoT Core section, then click on "Act", "Rules" and then on "Create a rule".
Here give a name to the rule, then in the “Rule query statement” section write “SELECT * FROM 'CloudComputing/+'” and finally under the “Set one or more actions” section click on “Add action”.
Now select “Insert a message into a DynamoDB table”, scroll down and click on "Configure action".
Here click on “Create a new resource”, you will be redirected to the DynamoDB page.
Next click on “Create table”, give the table the name “CloudRawHAR", in “Partition key” write “id”, click on “Add sort key”, call it “timestamp” and change its type to “Number” and finally click on “Create”.
Now go back to the AWS IoT page and click on the little refresh button near “Create a new resource”, the table we have just created should appear in the list, select it. In the “Partition key value” field write “${topic(2)}” and in “Sort key value” write “${timestamp()}”. Then click on “Create Role”, give it a name, click on “Create role” and then click on “Add action”. Finally scroll down and click on “Create rule”.
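Assuming the message body is written to a column named "payload" (the attribute the dashboard code later reads), each item stored by this rule should look roughly like this (values are illustrative):
{
    "id": "CloudSmartphone-12345",
    "timestamp": 1600000000000,
    "payload": [
        { "x": 0.12, "y": -0.34, "z": 9.78, "id": "CloudSmartphone-12345", "timestamp": 1600000000000 },
        ...the other samples of the message...
    ]
}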
This rule will save the raw data in the table "CloudRawHAR"; we need a second one to save the results of the analysis. So, like before, click on "Create", give a name to the rule and this time in the “Rule query statement” section write “SELECT * FROM 'CloudComputingResult'”. Then click on "Add action", select "Split message into multiple columns of a DynamoDB table (DynamoDBv2)" and click on "Configure action".
Like before click on "Create a new resource" to be redirected to the DynamoDB page and then click on "Create table". Call this new table “CloudStatusHAR", in “Partition key” write “id”, click on “Add sort key”, call it “timestamp” and change its type to “Number” and finally click on “Create”. Now go back to the AWS IoT page, select the table you just created, create a role and finally add the action.
Web dashboard
We have finally reached the last component of the architecture: the web dashboard. Like the smartphone web-app, it is a simple HTML+JavaScript website; you can find the code for it in the folder "crowd-sensing/cloud-based/web-dashboard". This one too needs the correct permissions in order to work, so go back to the "Cognito" section of the AWS console and create a new identity pool. Then insert the pool id, the endpoint ARN and the region in the "AWSConfiguration" object in the bundle.js file.
/*
* The awsConfiguration object is used to store the credentials
* to connect to AWS service.
* MAKE SURE to insert the correct name for your endpoint,
* the correct Cognito PoolID and the correct AWS region.
*/
var AWSConfiguration = {
poolId: '<IDENTITY_POOL_ID>',
host: '<ENDPOINT_ARN>',
region: '<AWS_REGION>'
};
Like before go to the "IAM" section, click on "Roles", select the role named “Cognito_<YOUR_IDENTITY_POOL_NAME>Unauth_Role”, edit the policy and remove all the stock permissions.
Then click on "Add additional permissions"; under “Service” select “IoT” and under “Actions” select “Connect”, “Receive” and “Subscribe”. Under “Resources”, in the “client” section, add an ARN with “Client id” equal to “CloudHARDashboard-*”. In both the “topic” and “topicfilter” sections add two ARNs: one for “CloudComputing/*” and the other for “CloudComputingResult”.
Add another permission and this time under “Service” select “DynamoDB”, and under “Actions”, select “Scan” and “Query”. In the "table" section add two ARNs, one for the table “CloudRawHAR" and the second for the table “CloudStatusHAR". Remember to put your region in all the ARNs. Finally click on "Review policy".
Ok we are done with the configuration! Before launching the web-dashboard let's look at how it works.
First it configures the AWS SDK:
//The id of the MQTT client.
var clientId = 'CloudHARDashboard-' + (Math.floor((Math.random() * 100000) + 1));
AWS.config.region = AWSConfiguration.region;
AWS.config.credentials = new AWS.CognitoIdentityCredentials({
IdentityPoolId: AWSConfiguration.poolId
});
//The mqttClient object used for retrieving the messages from the MQTT server.
const mqttClient = AWSIoTData.device({
region: AWS.config.region, //Set the AWS region we will operate in
host: AWSConfiguration.host, //Set the AWS IoT Host Endpoint
clientId: clientId, //The clientId created earlier
protocol: 'wss', //Connect via secure WebSocket
maximumReconnectTimeMs: 8000, //Set the maximum reconnect time to 8 seconds
debug: true, //Enable console debugging information
accessKeyId: '',
secretKey: '',
sessionToken: ''
});
//The cognitoIdentity used for authentication.
var cognitoIdentity = new AWS.CognitoIdentity();
AWS.config.credentials.get(function(err, data) {
if (!err) {
console.log('retrieved identity: ' + AWS.config.credentials.identityId);
var params = {
IdentityId: AWS.config.credentials.identityId
};
cognitoIdentity.getCredentialsForIdentity(params, function(err, data) {
if (!err) {
mqttClient.updateWebSocketCredentials(data.Credentials.AccessKeyId,
data.Credentials.SecretKey,
data.Credentials.SessionToken);
} else {
console.log('error retrieving credentials: ' + err);
alert('error retrieving credentials: ' + err);
}
});
} else {
console.log('error retrieving identity:' + err);
alert('error retrieving identity: ' + err);
}
});
//DynamoDB service object.
var ddb = new AWS.DynamoDB({
apiVersion: '2012-08-10'
});
Then it queries the database to get the list of the smartphones and their latest raw data and status:
//
// Building devices list and initializing current values.
//
//List of devices in the database.
var devicesList = new Set();
//Variable storing the current values of the devices.
var devicesValues = {};
//Variable storing the name of the device we are currently displaying.
var currentDevice = "";
//Parameters for the scan of the database.
var scanParams = {
ExpressionAttributeNames: {
"#id": "id",
},
ProjectionExpression: '#id',
TableName: 'CloudStatusHAR'
};
//Scans the tables CloudStatusHAR and CloudRawHAR,
//adds the IDs of the devices in devicesList and in the select menu,
//adds the devices latest status to devicesValues.
ddb.scan(scanParams, function(err, data) {
if (err) {
console.log("Error", err);
} else {
data.Items.forEach(function(element) {
devicesList.add(element.id.S);
});
devicesList = Array.from(devicesList);
devicesList.sort();
console.log("success", devicesList);
devicesList.forEach(function(id) {
//Parameters of the query.
var queryParams = {
ExpressionAttributeValues: {
":deivce": {
S: id
}
},
ExpressionAttributeNames: {
"#id": "id"
},
KeyConditionExpression: "#id = :device",
TableName: 'CloudStatusHAR',
Limit: 1,
"ScanIndexForward": false
};
//Queries the data from the selected device.
ddb.query(queryParams, function(err, data) {
if (err) {
console.log("Error", err);
} else {
//Exploiting the fact that the data is already ordered.
var latest = data.Items[0];
devicesValues[id] = {};
devicesValues[id].status = latest.status.S;
}
});
queryParams.TableName = 'CloudRawHAR';
ddb.query(queryParams, function(err, data) {
if (err) {
console.log("Error", err);
} else {
var latest = data.Items[0];
if(devicesValues[id]==undefined) devicesValues[id]={};
devicesValues[id].raw = [];
latest.payload.L.forEach(function(e) {
var elem = {};
elem.x = (e.M.x.N);
elem.y = (e.M.y.N);
elem.z = (e.M.z.N);
elem.timestamp = (e.M.timestamp.N);
devicesValues[id].raw.push(elem);
});
console.log(devicesValues);
document.getElementById("device-select").innerHTML +=
'<option value="' + id + '">' + id + '</option>';
}
});
});
}
});
It also subscribes to the MQTT topics to update the latest values once new messages arrive:
//
// Subscribing to MQTT topics and updating current values.
//
//The topic where the devices publish the raw data.
var rawTopic = 'CloudComputing/+';
//The topic where the cloud publishes the results of the analysis.
var statusTopic = "CloudComputingResult";
//Connect handler: once the MQTT client has successfully connected
//to the MQTT broker it subscribes to the topics.
function mqttClientConnectHandler() {
console.log('connected to MQTT server');
mqttClient.subscribe(rawTopic);
console.log("subscribed to", rawTopic);
mqttClient.subscribe(statusTopic);
console.log("subscribed to", statusTopic);
};
//Function for updating the div containing the current status of the device.
function updateCurrentValues() {
var status = document.getElementById("current-status");
status.innerHTML = devicesValues[currentDevice].status;
var time = [];
var x = [];
var y = [];
var z = [];
devicesValues[currentDevice].raw.forEach(function(e) {
var d = new Date();
d.setTime(e.timestamp);
time.push(d);
x.push(e.x);
y.push(e.y);
z.push(e.z);
});
var currentChart = new Chart(document.getElementById('current-chart').getContext('2d'), {
type: "line",
data: {
labels: time,
datasets: [{
label: "x",
data: x,
fill: false,
borderColor: "rgb(255, 0, 0)",
}, {
label: "y",
data: y,
fill: false,
borderColor: "rgb(0, 255, 0)",
}, {
label: "z",
data: z,
fill: false,
borderColor: "rgb(0, 0, 255)",
}]
},
"options": {
responsive: true,
scales: {
xAxes: [{
type: 'time',
ticks: {
source: 'data'
}
}]
},
plugins: {
zoom: {
zoom: {
enabled: true,
drag: {
animationDuration: 1000
},
mode: 'x',
speed: 0.05
}
}
}
}
});
}
//Message handler: upon receiving a message if it's relative to a new device
//it adds it to the selection menu then it saves its values in the variable
//devicesValues and finally updates the div.
function mqttClientMessageHandler(topic, payload) {
console.log('message: ' + topic + ':' + payload.toString());
var message = JSON.parse(payload.toString());
var id;
if (topic == statusTopic) {
id = message.id;
if (devicesValues[id] == undefined) {
devicesList.push(id);
devicesValues[id] = {};
document.getElementById("device-select").innerHTML +=
'<option value="' + id + '">' + id + '</option>';
}
devicesValues[id].status = message.status;
} else {
id = topic.slice(15);
if (devicesValues[id] == undefined) {
devicesList.push(id);
devicesValues[id] = {};
document.getElementById("device-select").innerHTML +=
'<option value="' + id + '">' + id + '</option>';
}
devicesValues[id].raw = message;
}
if (currentDevice == id && document.getElementById('current-values-div').style.display == 'block') {
updateCurrentValues();
}
};
//Installing the connect and message handlers.
mqttClient.on('connect', mqttClientConnectHandler);
mqttClient.on('message', mqttClientMessageHandler);
Finally, in the past values section, it queries the database to show the data from the last hour:
//
// Past values.
//
//Function that returns the timestamp of one hour ago.
function lastHour() {
var d = new Date();
d.setHours(d.getHours() - 1);
return d.getTime();
}
//Function for updating the Charts displaying the data from the past hour.
window.refreshCharts = function() {
//Parameters of the query.
var params = {
ExpressionAttributeValues: {
":device": {
S: currentDevice
},
":lastHour": {
N: lastHour().toString()
}
},
ExpressionAttributeNames: {
"#id": "id",
"#time": "timestamp"
},
KeyConditionExpression: "#id = :device and #time >= :lastHour",
TableName: 'CloudStatusHAR'
};
//Queries the table CloudStatusHAR retrieving the data from the
//last hour for the selected device.
ddb.query(params, function(err, data) {
if (err) {
console.log("Error", err);
} else {
var time = [];
var dataset = [];
data.Items.forEach(function(element) {
var d = new Date();
d.setTime(element.timestamp.N);
time.push(d);
if(element.status.S=="Moving")
dataset.push(1);
else dataset.push(0);
});
//Draws the chart.
var chart = new Chart(document.getElementById('status-chart').getContext('2d'), {
type: "line",
data: {
labels: time,
datasets: [{
label: "Status",
data: dataset,
fill: false,
borderColor: "rgb(255, 0, 0)",
steppedLine: true
}]
},
"options": {
responsive: true,
scales: {
xAxes: [{
type: 'time',
ticks: {
source: 'auto'
}
}],
yAxes: [{
ticks: {
stepSize: 1,
callback: function(value, index, values) {
if (value==1) return "Moving";
else return "Resting";
}
}
}]
},
plugins: {
zoom: {
zoom: {
enabled: true,
drag: {
animationDuration: 1000
},
mode: 'x',
speed: 0.05
}
}
}
}
});
}
});
params.TableName = 'CloudRawHAR';
//Queries the table CloudRawHAR retrieving the data from the
//last hour for the selected device.
ddb.query(params, function(err, data) {
if (err) {
console.log("Error", err);
} else {
var time = [];
var x = [];
var y = [];
var z = [];
data.Items.forEach(function(entry) {
var d = new Date();
d.setTime(entry.timestamp.N);
time.push(d);
x.push(entry.payload.L[0].M.x.N);
y.push(entry.payload.L[0].M.y.N);
z.push(entry.payload.L[0].M.z.N);
});
//Draws the chart.
var chart = new Chart(document.getElementById('raw-chart').getContext('2d'), {
type: "line",
data: {
labels: time,
datasets: [{
label: "x",
data: x,
fill: false,
borderColor: "rgb(255, 0, 0)",
}, {
label: "y",
data: y,
fill: false,
borderColor: "rgb(0, 255, 0)",
}, {
label: "z",
data: z,
fill: false,
borderColor: "rgb(0, 0, 255)",
}]
},
"options": {
responsive: true,
scales: {
xAxes: [{
type: 'time',
ticks: {
source: 'auto'
}
}]
},
plugins: {
zoom: {
zoom: {
enabled: true,
drag: {
animationDuration: 1000
},
mode: 'x',
speed: 0.05
}
}
}
}
});
}
});
}
Great! Now if you open the web-app on your smartphone and the dashboard on your PC everything should be working (see the demonstration video linked at the end for the result).
Edge-based approach
Now we will explore an edge-based approach, where the activity recognition is done on the smartphone and only the result is uploaded to the cloud. This time I will be briefer, because the program is simpler and because the code has a lot in common with the previous approach.
Architecture
The edge-based architecture is the following:
- A JavaScript web-app running on the smartphones collects the raw accelerometer data, analyzes it to recognize the activity and publishes the result on an MQTT topic;
- AWS is used to manage the MQTT broker;
- The messages sent are stored in a DynamoDB database;
- A web-based dashboard developed in JavaScript displays the latest activity and the values received during the last hour.
Smartphone web-app
You can find the code for the web-app in the folder "crowd-sensing/edge-based/smartphone-webapp". Like in the cloud-based version, you need to create an identity pool and update the "AWSConfiguration" object in the "bundle.js" file with your credentials. This time, when creating the IAM policy, insert the topic “EdgeComputing/*” instead of “CloudComputing/*” in the ARN.
The majority of the code is the same as in the previous version. It starts by retrieving the client id from the cookies or generating a new one if it doesn't find it, then it configures the AWS SDK, checks if the device supports either the Generic Sensor or the Device Motion API and then asks for permission to use the accelerometer.
If the user allows the use of the accelerometer it starts sampling the data at a frequency of 4 Hz and uses the same model as in the cloud version to recognize the movement. Every time the status changes a message is published to the topic “EdgeComputing/<clientId>”.
//
// Retrieving and analyzing sensor data.
//
var accData = {x:0, y:0, z:0};
//The frequency at which the sensors data is retrieved.
var samplingFrequency = 4;
//This function creates a median filter with a window of size 'length'.
function createCombinedMedianFilter(length){
var medianX = createMedianFilter(length);
var medianY = createMedianFilter(length);
var medianZ = createMedianFilter(length);
function insertData(d){
var x = medianX(d.x);
var y = medianY(d.y);
var z = medianZ(d.z);
return {x:x, y:y, z:z};
}
return insertData;
}
//This function creates a low pass filter where 'cutoff' is the cutoff
//frequency and 'sampleRate' is the sampling rate.
function createLowPassFilter(cutoff, sampleRate) {
var rc = 1.0 / (cutoff * 2 * Math.PI);
var dt = 1.0 / sampleRate;
var alpha = dt / (rc + dt);
var previous;
function filterItem(d){
if (previous == undefined){
previous = d;
return d;
} else {
var next = {
x: previous.x + (alpha * (d.x - previous.x)),
y: previous.y + (alpha * (d.y - previous.y)),
z: previous.z + (alpha * (d.z - previous.z))
}
previous = next;
return next;
}
}
return filterItem;
}
//This function creates a high pass filter where 'cutoff' is the cutoff
//frequency and 'sampleRate' is the sampling rate.
function createHighPassFilter(cutoff, sampleRate) {
var rc = 1.0 / (cutoff * 2 * Math.PI);
var dt = 1.0 / sampleRate;
var alpha = rc / (rc + dt);
var previousFiltered;
var previousSample;
function insertItem(d){
if (previousFiltered == undefined){
previousFiltered = d;
previousSample = d;
return d;
} else {
var next = {
x: alpha * (previousFiltered.x + d.x -previousSample.x),
y: alpha * (previousFiltered.y + d.y -previousSample.y),
z: alpha * (previousFiltered.z + d.z -previousSample.z)
}
previousFiltered = next;
previousSample = d;
return next;
}
}
return insertItem;
}
//The SlidingWindowAnalyzer analyzes the filtered data and checks if the
//person is moving or standing still. If the status changes it fires a
//'statusChanged' event.
class SlidingWindowAnalyzer extends EventEmitter {
constructor(windowSize) {
super();
this.windowSize = windowSize;
this.window = [];
this.status = "Resting";
}
insertItem(d){
this.window.push(d);
if(this.window.length == this.windowSize){
var average = 0;
for(var i=0; i<this.window.length; i++){
average += Math.abs(this.window[i].x)
+ Math.abs(this.window[i].y)
+ Math.abs(this.window[i].z);
}
average = average/(this.windowSize);
console.log(average);
if (average > 0.8 && this.status == "Resting"){
this.status = "Moving";
this.emit('statusChanged');
} else if (average <= 0.8 && this.status == "Moving") {
this.status = "Resting";
this.emit('statusChanged');
}
this.window.splice(0, this.windowSize/2);
}
return this.status;
}
}
var medianFilter = createCombinedMedianFilter(samplingFrequency);
var lowPassFilter = createLowPassFilter(20, samplingFrequency);
var highPassFilter = createHighPassFilter(0.3, samplingFrequency);
var slidingWindowAnalyzer = new SlidingWindowAnalyzer(samplingFrequency*3);
//It filters the raw accelerometer data and passes it to the SlidingWindowAnalyzer.
function analyzeData() {
var filteredData = highPassFilter(lowPassFilter(medianFilter(accData)));
slidingWindowAnalyzer.insertItem(filteredData);
}
//Connect handler: once the MQTT client has successfully connected
//to the MQTT broker it starts publishing the data every time it receives a
//'statusChanged' event.
function mqttClientConnectHandler() {
console.log('connected to MQTT server');
slidingWindowAnalyzer.on("statusChanged", function(){
mqttClient.publish('EdgeComputing/'+clientId, JSON.stringify({status:this.status}));
console.log("publishing " + JSON.stringify({status:this.status}));
});
};
mqttClient.on('connect', mqttClientConnectHandler);
//This function retrieves the accelerometer data from devices that support
//the DeviceMotion API.
function startDeviceMotionAccelerometer() {
document.getElementById("SensorRequestBanner").style.display = "none";
document.getElementById("id").innerHTML = clientId;
window.addEventListener('devicemotion', function(e) {
accData.x = e.accelerationIncludingGravity.x;
accData.y = e.accelerationIncludingGravity.y;
accData.z = e.accelerationIncludingGravity.z;
});
setInterval(analyzeData, 1000/samplingFrequency);
slidingWindowAnalyzer.on("statusChanged", function(){
document.getElementById('status').innerHTML = this.status;
});
}
//This function retrieves the accelerometer data from devices that support
//the Generic Sensor API.
function startSensorAPIAccelerometer() {
navigator.permissions.query({ name: 'accelerometer' })
.then(result => {
if (result.state === 'denied') {
accelerometerNotAllowed();
} else {
document.getElementById("SensorRequestBanner").style.display = "none";
document.getElementById("id").innerHTML = clientId;
let sensor = new Accelerometer();
sensor.addEventListener('reading', function(e) {
accData.x = e.target.x;
accData.y = e.target.y;
accData.z = e.target.z;
});
sensor.start();
setInterval(analyzeData, 1000/samplingFrequency);
slidingWindowAnalyzer.on("statusChanged", function(){
document.getElementById('status').innerHTML = this.status;
});
}
});
}
DynamoDB database
As in the cloud-based version, to save the data in the database we need to create an IoT rule. This time the "Rule query statement" should be SELECT * FROM 'EdgeComputing/+'.
For the action select "Insert a message into a DynamoDB table", then create a table named "EdgeHAR" with partition key "id" of type string and sort key "timestamp" of type number.
Like before, insert “${topic(2)}” in the “Partition key value” field and “${timestamp()}” in the “Sort key value” field.
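Each status change then becomes one item in the "EdgeHAR" table; assuming the message body is written to a "payload" column (the attribute the dashboard code later reads), a stored item looks roughly like this (values are illustrative):
{
    "id": "<clientId>",
    "timestamp": 1600000000000,
    "payload": { "status": "Moving" }
}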
Web dashboard
The code for the web dashboard is in the folder "crowd-sensing/edge-based/web-dashboard". For this one too you need to create an identity pool and update the "AWSConfiguration" object at the beginning of the bundle.js file.
For the permissions you need to add the actions “Connect”, “Receive” and “Subscribe” for the "IoT" service and the actions "Scan" and "Query" for “DynamoDB”. For the ARNs in the "IoT" service you need to insert "EdgeHARDashboard-*" as "Client id" and "EdgeComputing/*" as "topic" and "topicfilter". In the “DynamoDB” service insert "EdgeHAR" in the "table" section.
As for the web-app, the code is very similar to the cloud-based version. After having configured the AWS SDK it queries the database to get the list of the smartphones and their status:
//
// Building devices list and initializing current values.
//
//List of devices in the database.
var devicesList = new Set();
//Variable storing the current values of the devices.
var devicesValues = {};
//Variable storing the name of the device we are currently displaying.
var currentDevice = "";
//Parameters for the scan of the database.
var scanParams = {
ExpressionAttributeNames: {
"#id": "id",
},
ProjectionExpression: '#id',
TableName: 'EdgeHAR'
};
//Scans the table EdgeHAR,
//adds the IDs of the devices in devicesList and in the select menu,
//adds the devices latest status to devicesValues.
ddb.scan(scanParams, function(err, data) {
if (err) {
console.log("Error", err);
} else {
data.Items.forEach(function(element) {
devicesList.add(element.id.S);
});
devicesList = Array.from(devicesList);
devicesList.sort();
console.log("success", devicesList);
devicesList.forEach(function(id) {
//Parameters of the query.
var queryParams = {
ExpressionAttributeValues: {
":deivce": {
S: id
}
},
ExpressionAttributeNames: {
"#id": "id"
},
KeyConditionExpression: "#id = :device",
TableName: 'EdgeHAR',
Limit: 1,
"ScanIndexForward": false
};
//Queries the data from the selected device.
ddb.query(queryParams, function(err, data) {
if (err) {
console.log("Error", err);
} else {
//Exploiting the fact that the data is already ordered.
var latest = data.Items[0];
devicesValues[id] = latest.payload.M.status.S;
document.getElementById("device-select").innerHTML +=
'<option value="' + id + '">' + id + '</option>';
}
});
});
}
});
Upon receiving a new message it updates the status:
//
// Subscribing to MQTT topic and updating current values.
//
//The topic where the devices publish the sensors data.
var deviceTopic = 'EdgeComputing/+';
//Connect handler: once the MQTT client has successfully connected
//to the MQTT broker it subscribes to the deviceTopic.
function mqttClientConnectHandler() {
console.log('connected to MQTT server');
mqttClient.subscribe(deviceTopic);
console.log("subscribed to", deviceTopic);
};
//Function for updating the div containing the current status of the device.
function updateCurrentValues() {
var status = document.getElementById("device-status");
status.innerHTML = devicesValues[currentDevice];
}
//Message handler: upon receiving a message if it's relative to a new device
//it adds it to the selection menu then it saves its values in the variable
//devicesValues and finally updates the div.
function mqttClientMessageHandler(topic, payload) {
console.log('message: ' + topic + ':' + payload.toString());
if (devicesValues[topic.slice(14)] == undefined) {
devicesList.push(topic.slice(14));
document.getElementById("device-select").innerHTML +=
'<option value="' + topic.slice(14) + '">' + topic.slice(14) + '</option>';
}
devicesValues[topic.slice(14)] = JSON.parse(payload.toString()).status;
if (currentDevice != ""){
updateCurrentValues();
}
};
//Installing the connect and message handlers.
mqttClient.on('connect', mqttClientConnectHandler);
mqttClient.on('message', mqttClientMessageHandler);
And in the past values section it queries the database and displays the data from the last hour:
//
// Past values.
//
//Function that returns the timestamp of one hour ago.
function lastHour() {
var d = new Date();
d.setHours(d.getHours() - 1);
return d.getTime();
}
//Function for updating the chart displaying the data from the past hour.
window.refreshChart = function() {
//Parameters of the query.
var params = {
ExpressionAttributeValues: {
":device": {
S: currentDevice
},
":lastHour": {
N: lastHour().toString()
}
},
ExpressionAttributeNames: {
"#id": "id",
"#time": "timestamp"
},
KeyConditionExpression: "#id = :device and #time >= :lastHour",
TableName: 'EdgeHAR'
};
//Queries the table EdgeHAR retrieving data from the last hour
//for the selected device.
ddb.query(params, function(err, data) {
if (err) {
console.log("Error", err);
} else {
console.log("success", data);
var time = [];
var dataset = [];
data.Items.forEach(function(element) {
var d = new Date();
d.setTime(element.timestamp.N);
time.push(d);
if(element.payload.M.status.S=="Moving")
dataset.push(1);
else dataset.push(0);
});
//Draws the chart.
var chart = new Chart(document.getElementById('chart').getContext('2d'), {
type: "line",
data: {
labels: time,
datasets: [{
label: "Status",
data: dataset,
fill: false,
borderColor: "rgb(255, 0, 0)",
steppedLine: true
}]
},
"options": {
responsive: true,
scales: {
xAxes: [{
type: 'time',
ticks: {
source: 'auto'
}
}],
yAxes: [{
ticks: {
stepSize: 1,
callback: function(value, index, values) {
if (value==1) return "Moving";
else return "Resting";
}
}
}]
},
plugins: {
zoom: {
zoom: {
enabled: true,
drag: {
animationDuration: 1000
},
mode: 'x',
speed: 0.05
}
}
}
}
});
}
});
}
If you run the web-app on your smartphone and the dashboard on your PC you should see the current status of the device and the chart with the values from the past hour.
As you have probably noticed, the edge-based approach is much simpler and requires sending far fewer messages, so for this example the cloud-based approach may seem overkill. But this is only true because the model used to process the raw data is very simple. If needed, the cloud allows much more complex processing, for example using machine learning algorithms, something that may not be feasible on the edge devices. So in this specific case the edge-based approach may seem more appropriate, but in general it depends on what you are doing: both have their pros and cons.
Demonstration
Check this video for a demonstration of the project.
Links
LinkedIn profile: https://www.linkedin.com/in/simone-bartolini-9628561a3
GitHub repo: https://github.com/51m0n397/IoT2020Course
YouTube video: https://youtu.be/L2newT4ieqA