Hardware components | ||||||
![]() |
| × | 1 | |||
![]() |
| × | 1 | |||
![]() |
| × | 1 | |||
![]() |
| × | 1 | |||
![]() |
| × | 1 | |||
![]() |
| × | 1 | |||
![]() |
| × | 1 | |||
| × | 1 | ||||
| × | 1 | ||||
Software apps and online services | ||||||
![]() |
| |||||
![]() |
| |||||
![]() |
| |||||
![]() |
| |||||
![]() |
|
For our IoT Assignment 2, my team and I had decided to come up with a security solution for our smart home by implementing different security features such as a Smart Door with two-factor authentication, smart car-plate authentication and live CCTV footage.
At the entrance of the smart garage, the camera will take a snapshot of the car-plate. Next, the garage door will only be opened if the car-plate is authorised. Once the owner has parked his car, he can proceed to enter his smart home. There is also an ongoing live stream that shows real time footage of what happens just outside the garage, to deter any possible intruders.
To enter his smart home, the owner has to key in a 4-pin password into the keypad. The owner has only a limited number of times that he can try or else he will be locked out of the door. When the user hits the number of thresholds, he will receive an email with the new pin. He can then key in the new pin to pass the first authentication process. The owner will also be alerted with a Telegram message informing him that someone had exceeded the number of thresholds.
Next, the facial recognition will begin and the camera will start scanning for faces in the room. The user has only 10 seconds to complete this process. If the facial recognition recognises the owner, it will allow the user to access the smart home. The LCD screen will then welcome and address the owner by his name.
Once the user enters the room, the room will start recording Temperature and these values are recorded to the Amazon Web Service (AWS) database. Later on, the recorded values can be seen via the web-page in the form of a line graph.
Lastly, the LCD screen will flash the current temperature that is detected in the smart home
snapshot.py
Pythonfrom time import gmtime, strftime
from picamera import PiCamera
import json as simplejson
import time
import os
import boto3
def takePhoto(userName):
count = 5
camera = PiCamera()
camera.vflip = True
camera.hflip = True
directory = os.path.dirname(os.path.realpath(__file__)) + '/' + 'face'
if not os.path.exists(directory):
os.makedirs(directory)
print '[+] A photo will be taken in 5 seconds...'
for i in range(count):
print (count - i)
time.sleep(1)
milli = int(round(time.time() * 1000))
image1 = '{0}/image_{1}.jpg'.format(directory, milli)
camera.capture(image1)
print 'Your image was saved to %s' % image1
camera.close()
print('[+] Adding Image to Container')
client = boto3.client('rekognition')
with open(image1, 'rb') as image:
response = client.index_faces(Image={'Bytes': image.read()}, CollectionId='home', ExternalImageId=userName, DetectionAttributes=['ALL'])
print('[+] Finish Uploading Image to Container')
def deletePhoto(userName):
client = boto3.client('rekognition')
print('[+] Deleting Face from Container')
#response = client.delete_faces(CollectionId='home', FaceIds=[args.id])
response = client.delete_faces(CollectionId='home', FaceIds=['d3c239cd-b279-4506-b6f2-509465b8733d'])
print('[+] Finish Deleting Image from Container')
aws_keypad.py
Pythonfrom AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from boto3.dynamodb.conditions import Key, Attr
from rpi_lcd import LCD
from time import sleep
from picamera import PiCamera
import RPi.GPIO as GPIO
import time
import random
import boto3
import telepot
###################################################################################################
# Custom MQTT message callback
def customCallback(client, userdata, message):
print("Received a new message: ")
print(message.payload)
print("from topic: ")
print(message.topic)
print("--------------\n\n")
def telegrambot(): # Telegram Bot to receive image from RaspberryPi
camera = PiCamera()
print("Telegram Bot")
my_bot_token = ''
bot = telepot.Bot(my_bot_token)
camera.capture('/home/pi/Desktop/Assignment2/image1.jpg')
bot.sendPhoto(chat_id=414346130, photo=open('/home/pi/Desktop/Assignment2/image1.jpg', 'rb'))
print('Capture Intruder Image')
camera.close()
print("End of Telegram Bot")
host = ""
rootCAPath = "certificates/rootca.pem"
certificatePath = "certificates/certificate.pem.crt"
privateKeyPath = "certificates/private.pem.key"
print("-----------------------------------------------------------")
print("[+] First Authentication: KeyPad")
my_rpi = AWSIoTMQTTClient("basicPubSub")
my_rpi.configureEndpoint(host, 8883)
my_rpi.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
my_rpi.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
my_rpi.configureDrainingFrequency(2) # Draining: 2 Hz
my_rpi.configureConnectDisconnectTimeout(10) # 10 sec
my_rpi.configureMQTTOperationTimeout(5) # 5 sec
# Connect and subscribe to AWS IoT
my_rpi.connect()
my_rpi.subscribe("ush/keypad/pin", 1, customCallback)
sleep(2)
###############################################################################################
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table('ush_pin')
#opendoor = 0
response = table.query(
KeyConditionExpression=Key('deviceid').eq('deviceid_bryantay'))
for i in response['Items']:
opendoor = i['pin']
dbThreshold = i['threshhold']
print("[+] Pin fom Database is " + str(opendoor))
print("[+] Threshold is " + dbThreshold)
#################################################################################################
GPIO.setmode(GPIO.BOARD)
GPIO.setwarnings(False)
MATRIX = [[1,2,3,'A'],
[4,5,6,'B'],
[7,8,9,'C'],
['*',0,'#','D']]
ROW = [7,11,13,15]
COL = [12,16,18,22]
lcd = LCD()
threshhold = 0
keypress = ""
feature = False
for j in range(4):
GPIO.setup(COL[j], GPIO.OUT)
GPIO.output(COL[j], 1)
for i in range(4):
GPIO.setup(ROW[i], GPIO.IN, pull_up_down = GPIO.PUD_UP)
print("[+] KeyPad Program Starting:")
try:
while(True):
for j in range(4):
GPIO.output(COL[j],0)
lcd.text('4-Pin Password', 1)
for i in range(4):
if GPIO.input(ROW[i]) == 0:
print MATRIX[i][j]
keypress = keypress + str(MATRIX[i][j])
lcd.text(str(keypress), 2)
if len(keypress) == 4:
if keypress == str(opendoor): ## Authenticated
print("<Correct 4-Pin Password>")
lcd.clear()
lcd.text('Access Granted', 1)
time.sleep(3)
lcd.clear()
## Facial Recognition
import aws_faceRecognition # Hop on into next file
else: # Error
print("<Incorrect 4-Pin Password>")
lcd.text('Wrong Pin.', 1)
lcd.text('Please try again.',2)
time.sleep(3)
lcd.clear()
keypress ="" ## reset string
threshhold+=1
print("number of threshhold " + str(threshhold))
print("number of database threshhold " + str(dbThreshold))
if str(threshhold) == str(dbThreshold):
print("[-] Maximum Number of Threshhold hits.")
telegrambot()
lcd.text("WARNING!",1)
lcd.text("DOOR LOCKED",2)
time.sleep(3)
# Generate 4-pin Nnumber
newpin = random.randint(1111,9999)
print("[+] New Pin is " + str(newpin))
lcd.text("PIN RESET", 1)
lcd.text("Key in new Pin", 2)
time.sleep(5)
lcd.clear()
# Update into Database
table.update_item(
Key={
'deviceid': 'deviceid_bryantay'
},
UpdateExpression='SET pin = :val1',
ExpressionAttributeValues={
':val1': newpin
}
)
opendoor = newpin
##stringSent = 'The New Pin is ' + opendoor
## Sent Pin to the person's phone number
my_rpi.publish("ush/keypad/pin", opendoor, 1)
print("[+] Sending OTP to Email")
while(GPIO.input(ROW[i]) == 0):
pass
GPIO.output(COL[j],1)
except:
GPIO.cleanup()
aws_temperature.py
Python# Import SDK packages
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import time
from time import sleep,gmtime, strftime
import Adafruit_DHT
import json
import datetime as datetime
from rpi_lcd import LCD
pin = 26
# Custom MQTT message callback
def customCallback(client, userdata, message):
print("Received a new message: ")
print(message.payload)
print("from topic: ")
print(message.topic)
print("--------------\n\n")
host = ""
rootCAPath = "certificates/rootca.pem"
certificatePath = "certificates/certificate.pem.crt"
privateKeyPath = "certificates/private.pem.key"
my_rpi = AWSIoTMQTTClient("basicPubSub")
my_rpi.configureEndpoint(host, 8883)
my_rpi.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
my_rpi.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
my_rpi.configureDrainingFrequency(2) # Draining: 2 Hz
my_rpi.configureConnectDisconnectTimeout(10) # 10 sec
my_rpi.configureMQTTOperationTimeout(5) # 5 sec
print("-----------------------------------------------------------")
print("[+] Smart Home: Recording Temperature")
# Connect and subscribe to AWS IoT
my_rpi.connect()
my_rpi.subscribe("ush/sensors/temperature", 1, customCallback)
sleep(2)
lcd = LCD()
lcd.text('Smart Home v2 ', 1)
# Publish to the same topic in a loop forever
while True:
message = {}
humidity, temperature = Adafruit_DHT.read_retry(11, pin)
print("[+] Temperature recorded " + str(temperature))
now = datetime.datetime.now()
lcd.text('Temperature :', 1)
lcd.text(str(temperature), 2)
time.sleep(2)
message["deviceid"] = 'deviceID_bryan'
message["datetimeid"] = now.isoformat()
message["temperature"] = temperature
my_rpi.publish("ush/sensors/temperature", json.dumps(message), 1)
sleep(5)
import RPi.GPIO as GPIO
import numpy as np
import cv2
from openalpr import Alpr
import time
import json
from rpi_lcd import LCD
import boto3
from boto3.dynamodb.conditions import Key, Attr
GPIO.setmode(GPIO.BCM)
GPIO.setup(26, GPIO.IN, pull_up_down=GPIO.PUD_UP)
# Import SDK packages
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from time import sleep
#intialise LCD
lcd = LCD()
# Custom MQTT message callback
def customCallback(client, userdata, message):
print("Received a new message: ")
print(message.payload)
print("from topic: ")
print(message.topic)
print("--------------\n\n")
host = "certificates/"
rootCAPath = "certificates/rootca.pem"
certificatePath = "certificates/certificate.pem.crt"
privateKeyPath = "certificates/private.pem.key"
my_rpi = AWSIoTMQTTClient("basicPubSub")
my_rpi.configureEndpoint(host, 8883)
my_rpi.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
my_rpi.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
my_rpi.configureDrainingFrequency(2) # Draining: 2 Hz
my_rpi.configureConnectDisconnectTimeout(10) # 10 sec
my_rpi.configureMQTTOperationTimeout(5) # 5 sec
# Connect and subscribe to AWS IoT
my_rpi.connect()
my_rpi.subscribe("ush/carplate", 1, customCallback)
sleep(1)
alpr = Alpr("sg", "openalpr.conf", "runtime_data")
if not alpr.is_loaded():
print("Error loading OpenALPR")
sys.exit(1)
alpr.set_top_n(1)
alpr.set_default_region("sg")
from picamera import PiCamera
from time import sleep
camera = PiCamera()
camera.resolution = (800, 600)
message = {}
def is_authorise_carplate(carplate):
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table('ush_authorise_carplate')
response = table.scan()
items = response['Items']
for i in response['Items']:
if(i['carplate'] == carplate):
lcd.text('Welcome '+carplate,1)
return "Yes"
lcd.text('Unauthorised',1)
lcd.text('License plate',2)
sleep(2)
return "No"
while True:
#intialise last confidence variable
last_confidence = 0
button = GPIO.input(26)
if button:
print("Alerted")
sleep(3)
camera.capture('img.jpg')
#my_rpi.publish("sensors/test", str("test"), 1)
results = alpr.recognize_file("img.jpg")
i = 0
for plate in results['results']:
i += 1
print("Plate #%d" % i)
print(" %12s %12s" % ("Plate", "Confidence"))
for candidate in plate['candidates']:
prefix = "-"
if candidate['matches_template']:
prefix = "*"
print(" %s %12s%12f" % (prefix, candidate['plate'], candidate['confidence']))
if candidate['confidence'] > last_confidence:
message["carplate"] = candidate['plate']
last_confidence = candidate['confidence']
else:
break;
# check whether there is even a lisence plate being detect
if last_confidence is not 0:
message["timestamp"] = time.strftime("%d-%b-%Y %H:%M:%S")
message["authorisation"] = is_authorise_carplate(message["carplate"])
my_rpi.publish("ush/carplate", json.dumps(message), 1)
else:
lcd.text("No license plate detected",1)
print ("no license plate detected")
sleep(4)
lcd.clear()
alpr.unload()
; Specify the path to the runtime data directory
runtime_dir = runtime_data
ocr_img_size_percent = 1.33333333
state_id_img_size_percent = 2.0
; Calibrating your camera improves detection accuracy in cases where vehicle plates are captured at a steep angle
; Use the openalpr-utils-calibrate utility to calibrate your fixed camera to adjust for an angle
; Once done, update the prewarp config with the values obtained from the tool
prewarp =
; detection will ignore plates that are too large. This is a good efficiency technique to use if the
; plates are going to be a fixed distance away from the camera (e.g., you will never see plates that fill
; up the entire image
max_plate_width_percent = 100
max_plate_height_percent = 100
; detection_iteration_increase is the percentage that the LBP frame increases each iteration.
; It must be greater than 1.0. A value of 1.01 means increase by 1%, 1.10 increases it by 10% each time.
; So a 1% increase would be ~10x slower than 10% to process, but it has a higher chance of landing
; directly on the plate and getting a strong detection
detection_iteration_increase = 1.1
; The minimum detection strength determines how sure the detection algorithm must be before signaling that
; a plate region exists. Technically this corresponds to LBP nearest neighbors (e.g., how many detections
; are clustered around the same area). For example, 2 = very lenient, 9 = very strict.
detection_strictness = 3
; The detection doesn't necessarily need an extremely high resolution image in order to detect plates
; Using a smaller input image should still find the plates and will do it faster
; Tweaking the max_detection_input values will resize the input image if it is larger than these sizes
; max_detection_input_width/height are specified in pixels
max_detection_input_width = 1280
max_detection_input_height = 720
; detector is the technique used to find license plate regions in an image. Value can be set to
; lbpcpu - default LBP-based detector uses the system CPU
; lbpgpu - LBP-based detector that uses Nvidia GPU to increase recognition speed.
; lbpopencl - LBP-based detector that uses OpenCL GPU to increase recognition speed. Requires OpenCV 3.0
; morphcpu - Experimental detector that detects white rectangles in an image. Does not require training.
detector = lbpcpu
; If set to true, all results must match a postprocess text pattern if a pattern is available.
; If not, the result is disqualified.
must_match_pattern = 0
; Bypasses plate detection. If this is set to 1, the library assumes that each region provided is a likely plate area.
skip_detection = 0
; Specifies the full path to an image file that constrains the detection area. Only the plate regions allowed through the mask
; will be analyzed. The mask image must match the resolution of your image to be analyzed. The mask is black and white.
; Black areas will be ignored, white areas will be searched. An empty value means no mask (scan the entire image)
detection_mask_image =
; OpenALPR can scan the same image multiple times with different randomization. Setting this to a value larger than
; 1 may increase accuracy, but will increase processing time linearly (e.g., analysis_count = 3 is 3x slower)
analysis_count = 1
; OpenALPR detects high-contrast plate crops and uses an alternative edge detection technique. Setting this to 0.0
; would classify ALL images as high-contrast, setting it to 1.0 would classify no images as high-contrast.
contrast_detection_threshold = 0.3
max_plate_angle_degrees = 15
ocr_min_font_point = 6
; Minimum OCR confidence percent to consider.
postprocess_min_confidence = 65
; Any OCR character lower than this will also add an equally likely
; chance that the character is incorrect and will be skipped. Value is a confidence percent
postprocess_confidence_skip_level = 80
debug_general = 0
debug_timing = 0
debug_detector = 0
debug_prewarp = 0
debug_state_id = 0
debug_plate_lines = 0
debug_plate_corners = 0
debug_char_segment = 0
debug_char_analysis = 0
debug_color_filter = 0
debug_ocr = 0
debug_postprocess = 0
debug_show_images = 0
debug_pause_on_frame = 0
from flask import Flask,render_template, jsonify, redirect, url_for, request,session,Response
from snapshot import takePhoto, deletePhoto ## Bryan
from functools import wraps
import string
import random
import sys
import logging
import os
import boto3
import time
from boto3.dynamodb.conditions import Key, Attr
import dynamodb
import jsonconverter as jsonc
from camera_pi import Camera
logging.basicConfig(level=logging.DEBUG)
app = Flask(__name__)
# generate random session key
def rng_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
app.secret_key = rng_generator()
#Prevent force browsing
def login_required(f):
@wraps(f)
def wrap(*args, **kwargs):
if 'logged_in' in session:
return f(*args, **kwargs)
else:
return redirect(url_for('login'))
return wrap
#Login
@app.route('/login', methods=['GET', 'POST'])
def login():
error = None
try:
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table('ush_login')
response = table.scan()
items = response['Items']
if request.method == 'POST':
for i in items:
if request.form['password'] == i['password'] and request.form['username'] == i['username']:
session['logged_in'] = True
return redirect(url_for('homepage'))
else:
error = 'wrong username or password'
return redirect(url_for('login'),error=error)
except Exception as e:
print(e)
return render_template('login.html', error=error)
# Logout
@app.route('/logout')
@login_required
def logout():
session.pop('logged_in', None)
return redirect(url_for('login'))
@app.route('/changepassword',methods=['GET','POST'])
@login_required
def changepw():
try:
formpassword = request.form['password']
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table('ush_login')
table.update_item(
Key={
'username': 'admin'
},
UpdateExpression='SET password = :val1',
ExpressionAttributeValues={
':val1': formpassword
}
)
session.pop('logged_in',None)
return redirect(url_for('login'))
except Exception as e:
print(e)
return render_template('changepassword.html')
@app.route('/settings')
@login_required
def settings():
return render_template('settings.html')
@app.route('/garage')
@login_required
def garage():
data = []
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table('ush_carplate')
response = table.scan()
items = response['Items']
for i in response['Items']:
data.append(tuple(i.values()))
return render_template('garage.html',data=data)
#live camera stream for garage
def gen(camera):
"""Video streaming generator function."""
while True:
frame = camera.get_frame()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
@app.route('/video_feed')
@login_required
def video_feed():
"""Video streaming route. Put this in the src attribute of an img tag."""
return Response(gen(Camera()),
mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/lisenceplate')
@login_required
def lisenceplate():
data = []
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table('ush_authorise_carplate')
response = table.scan()
items = response['Items']
for i in response['Items']:
data.append(tuple(i.values()))
return render_template('lisenceplate.html',data=data)
#insert, update and update authorisation lp table
@app.route('/lisenceplate', methods=['GET', 'POST'])
@login_required
def insertupdatelp():
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table('ush_authorise_carplate')
formcurrentlisenceplate = request.form['currentLP']
formlisenceplate = request.form['insertLP']
formdeletelisenceplate = request.form['deleteLP']
if formdeletelisenceplate != "null":
table.delete_item(
Key={
'carplate': formdeletelisenceplate
},
)
if formcurrentlisenceplate != "null":
table.delete_item(
Key={
'carplate': formcurrentlisenceplate
},
)
if formlisenceplate != "null" or formcurrentlisenceplate != "null":
table.update_item(
Key={
'carplate': formlisenceplate
},
UpdateExpression='SET authorisedate = :val1',
ExpressionAttributeValues={
':val1': time.strftime("%d-%b-%Y %H:%M:%S")
}
)
return redirect(url_for('lisenceplate'))
@app.route("/homepage")
@login_required
def homepage():
return render_template("homepage.html")
@app.route('/homepage', methods=['POST','GET'])
@login_required
def index():
if request.method == 'POST' or request.method == 'GET':
try:
data = {'chart_data': jsonc.data_to_json(dynamodb.get_data_from_dynamodb()),
'title': "IOT Data"}
print data
return jsonify(data)
except:
import sys
print 'error'
print(sys.exc_info()[0])
print(sys.exc_info()[1])
@app.route('/changeKeypad')
@login_required
def keypadpassword():
return render_template('changekeypad.html')
@app.route("/changeKeypad",methods=['GET','POST'])
@login_required
def modifyKeypad():
keypadpassword = request.form['password']
try:
## Update Database
import boto3
#dynamodb = boto3.resource('dynamodb')
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table('ush_pin')
table.update_item(
Key={
'deviceid': 'deviceid_bryantay'
},
UpdateExpression='SET pin = :val1',
ExpressionAttributeValues={
':val1': keypadpassword
}
)
return(redirect(url_for('settings')))
except Exception as e:
print(e)
@app.route('/changeThreshold')
@login_required
def Threshold():
return render_template('changethreshold.html')
@app.route("/changeThreshold",methods=['GET','POST'])
@login_required
def modifyThreshold():
threshold = request.form['password']
try:
## Update Database
import boto3
#dynamodb = boto3.resource('dynamodb')
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table('ush_pin')
table.update_item(
Key={
'deviceid': 'deviceid_bryantay'
},
UpdateExpression='SET threshhold = :val1',
ExpressionAttributeValues={
':val1': threshold
}
)
return(redirect(url_for('settings')))
except Exception as e:
print(e)
@app.route('/registerFace')
@login_required
def registerFace():
return render_template('registerFace.html')
@app.route("/registerFace",methods=['GET','POST'])
@login_required
def registeringFace():
userName = request.form['username']
takePhoto(userName)
return(redirect(url_for('settings')))
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0',threaded=True)
import time
import io
import threading
import picamera
class Camera(object):
thread = None # background thread that reads frames from camera
frame = None # current frame is stored here by background thread
last_access = 0 # time of last client access to the camera
def initialize(self):
if Camera.thread is None:
# start background frame thread
Camera.thread = threading.Thread(target=self._thread)
Camera.thread.start()
# wait until frames start to be available
while self.frame is None:
time.sleep(0)
def get_frame(self):
Camera.last_access = time.time()
self.initialize()
return self.frame
@classmethod
def _thread(cls):
with picamera.PiCamera() as camera:
# camera setup
camera.resolution = (800, 600)
camera.hflip = True
camera.vflip = True
# let camera warm up
camera.start_preview()
time.sleep(2)
stream = io.BytesIO()
for foo in camera.capture_continuous(stream, 'jpeg',
use_video_port=True):
# store frame
stream.seek(0)
cls.frame = stream.read()
# reset stream for next frame
stream.seek(0)
stream.truncate()
# if there hasn't been any clients asking for frames in
# the last 10 seconds stop the thread
if time.time() - cls.last_access > 10:
break
cls.thread = None
def get_data_from_dynamodb():
try:
import boto3
from boto3.dynamodb.conditions import Key, Attr
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table('ush_temp')
startdate = '2019-02-92T'
response = table.query(
KeyConditionExpression=Key('deviceid').eq('deviceID_bryan')
)
items = response['Items']
n=5 # limit to last 10 items
data = items[:n]
data_reversed = data[::-1]
return data_reversed
except:
import sys
print(sys.exc_info()[0])
print(sys.exc_info()[1])
if __name__ == "__main__":
query_data_from_dynamodb()
#!/usr/bin/env python
from rpi_lcd import LCD
from picamera import PiCamera
import time
import os
import boto3 as b3
from argparse import ArgumentParser
from time import gmtime, strftime
###################################################################################
print("-----------------------------------------------------------")
print("[+] Second Authentication: Facial Recognition")
lcd = LCD()
## Take a Photo
print("[+] Starting to take a photo..")
count = 5
camera = PiCamera()
camera.vflip = True
camera.hflip = True
directory = '/home/pi/Desktop/Assignment2/pi-detector/faces'
if not os.path.exists(directory):
os.makedirs(directory)
print '[+] A photo will be taken in 5 seconds...'
lcd.text('Scanning for ', 1)
lcd.text('nearby faces...', 2)
time.sleep(5)
##image = '{0}/image_{1}.jpg'.format(directory, milli)
image = '{0}/image.jpg'.format(directory)
camera.capture(image)
print 'Your image was saved to %s' % image
camera.close()
###################################################################################
def get_client():
return b3.client('rekognition')
def check_face(client, file):
face_detected = False
with open(file, 'rb') as image:
response = client.detect_faces(Image={'Bytes': image.read()})
if (not response['FaceDetails']):
face_detected = False
else:
face_detected = True
return face_detected, response
def check_matches(client, file, collection):
face_matches = False
with open(file, 'rb') as image:
response = client.search_faces_by_image(CollectionId=collection, Image={'Bytes': image.read()}, MaxFaces=1, FaceMatchThreshold=85)
if (not response['FaceMatches']):
face_matches = False
else:
face_matches = True
return face_matches, response
client = get_client()
capturedImage = "/home/pi/Desktop/Assignment2/pi-detector/faces/image.jpg"
collection = "home"
print '[+] Running face checks against image...'
result, resp = check_face(client, capturedImage)
## Checks Recognition
if (result):
print '[+] Face(s) detected with %r confidence...' % (round(resp['FaceDetails'][0]['Confidence'], 2))
print '[+] Checking for a face match...'
resu, res = check_matches(client, capturedImage, collection)
if (resu):
print '[+] Identity matched %s with %r similarity and %r confidence...' % (res['FaceMatches'][0]['Face']['ExternalImageId'], round(res['FaceMatches'][0]['Similarity'], 1), round(res['FaceMatches'][0]['Face']['Confidence'], 2))
lcd.text('Door Opened!', 1)
lcd.text('Welcome, '+ res['FaceMatches'][0]['Face']['ExternalImageId'] , 2)
time.sleep(5)
lcd.clear()
import aws_temperature
else:
print '[-] No face matches detected...'
lcd.text('Access Denied!', 1)
lcd.text('No face matches detected...', 2)
time.sleep(3)
lcd.clear()
exit()
else:
print "[-] No faces detected..."
lcd.text('Access Denied!', 1)
lcd.text('No face detected...', 2)
time.sleep(3)
lcd.clear()
exit()
from decimal import Decimal
import json
import datetime
import numpy
class GenericEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, numpy.generic):
return numpy.asscalar(obj)
elif isinstance(obj, Decimal):
return str(obj)
elif isinstance(obj, datetime.datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(obj, Decimal):
return float(obj)
else:
return json.JSONEncoder.default(self, obj)
def data_to_json(data):
json_data = json.dumps(data,cls=GenericEncoder)
print(json_data)
return json_data
Comments