【2024 DigiKey Innovation Contest】Pet monitoring alarm device based on AI image recognition
This post was last edited by 土豆12 on 2024-10-20 21:01
1. Introduction
Raising a dog comes with a problem: the dog needs exercise and cannot be kept in a crate all day, but there are many areas of the house I don't want it to enter, and it often slips in when nobody is watching. I therefore plan to build an alarm device that can be deployed in areas that are off limits to pets, such as the kitchen or balcony. When the pet enters such an area and is captured by the camera, an ESP32-S3 module placed in another area plays a sound and an ESP32-C6 module flashes a light, drawing the pet out of the restricted area.
2. System Block Diagram
The technical implementation of the project can be divided into the following parts.
1. IoT hub: an MQTT server deployed on a Raspberry Pi Zero 2 W.
2. Nodes: an ESP32-S3 and an ESP32-C6, plus an ESP32-CAMERA used as the camera module. The code can be written with Arduino and CircuitPython.
3. AI image recognition: implemented with YOLOv8; this code also runs on the Raspberry Pi Zero 2 W.
4. The recognition result is sent to the MQTT server, which then forwards it to every subscribed node.
5. If a pet is found, the ESP32-S3 module placed in another area plays a sound and the ESP32-C6 module flashes a light to draw the pet out of the area.
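The publish/subscribe flow in steps 4 and 5 can be sketched with a tiny in-memory stand-in for the broker. This is only an illustration of the fan-out idea, not how Mosquitto works internally; the TinyBroker class and the "speaker" and "led" labels are made up for the example, while the topic name is the one the nodes subscribe to later in this post.

```python
from collections import defaultdict


class TinyBroker:
    """In-memory stand-in for an MQTT broker (illustration only)."""

    def __init__(self):
        # topic -> list of callbacks registered for that topic
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, payload):
        # Every subscriber of the topic receives the same payload.
        for callback in self._subscribers[topic]:
            callback(topic, payload)


TOPIC = "hmd/binary_sensor/dog/state"
events = []

broker = TinyBroker()
# Hypothetical stand-ins for the ESP32-S3 (sound) and ESP32-C6 (light) nodes.
broker.subscribe(TOPIC, lambda topic, payload: events.append(("speaker", payload)))
broker.subscribe(TOPIC, lambda topic, payload: events.append(("led", payload)))

# The detector publishes once; both nodes are notified.
broker.publish(TOPIC, "on")
print(events)
```

The point of the design is that the detector does not need to know how many nodes exist; adding another alarm node only requires one more subscription.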
3. Functional Description of Each Part
First, install the operating system on the Raspberry Pi. The official Raspberry Pi tutorial covers this; it is straightforward, so I will not go into detail here. Then install Docker:
# Add Docker's official GPG key:
sudo apt-get update
sudo apt-get install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
# Add the repository to Apt sources:
echo \
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update
sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
Next, use Docker Compose to deploy the service. The Compose file is as follows:
version: '3'
services:
  mosquitto:
    image: eclipse-mosquitto
    container_name: mosquitto
    ports:
      - 1883:1883
      - 9001:9001
    restart: unless-stopped
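After starting the stack, a quick way to confirm that the broker port is open is a plain TCP connection test. The helper below is a minimal sketch using only the Python standard library; the function name broker_reachable is my own, and it only checks that the port accepts connections, not that MQTT itself works. If it returns False even though the container is running, one thing to check is the listener configuration, since Mosquitto 2.x accepts only local connections unless a listener is configured.

```python
import socket


def broker_reachable(host, port=1883, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError on refusal, timeout, or DNS failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

On the Raspberry Pi this could be called as broker_reachable("localhost"); from another machine, pass the Pi's address instead.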
This completes the deployment of the service. Now let's write the code for each node, starting with the signal input. For this I use the ESP32-CAMERA module, which can run the official example directly.
The following is the image processing code, written in Python. If a pet appears in the field of view, a message will be reported to the MQTT server:
from ultralytics import YOLO
from ha_mqtt_discoverable import Settings
from ha_mqtt_discoverable.sensors import BinarySensor, BinarySensorInfo

CAM_URL = "http://192.168.1.102:81/stream"
MQTT_USERNAME = ""
MQTT_PASSWORD = ""

mqtt_settings = Settings.MQTT(
    host="localhost", username=MQTT_USERNAME, password=MQTT_PASSWORD
)
sensor_info = BinarySensorInfo(name="dog", off_delay=3)
mysensor = BinarySensor(Settings(mqtt=mqtt_settings, entity=sensor_info))

model = YOLO("yolov8n.pt")
results = model.predict(CAM_URL, stream=True, show=False, conf=0.5)

# loop over frames from the camera stream
for result in results:
    for box in result.boxes:
        class_id = result.names[int(box.cls[0].item())]
        cords = box.xyxy[0].tolist()
        cords = [round(x) for x in cords]
        confi = round(box.conf[0].item(), 2)
        print("Object type:", class_id)
        print("Coordinates:", cords)
        print("Probability:", confi)
        print("---")
        if class_id == "dog":
            mysensor.on()
In this way, as soon as the camera detects a dog, the script publishes the "on" state to the MQTT server.
Now for the ESP32-S3 and ESP32-C6 nodes. I mounted them on a breadboard for easy wiring. The ESP32-S3 is wired to the SPK2 module as follows:
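The script above reports a dog on every single frame that contains one, so a one-frame false positive would also trigger the alarm. One possible refinement, which is not part of my original code, is to require several consecutive positive frames before reporting. The sketch below shows the idea in plain Python; DetectionDebouncer and the default of 3 frames are assumptions for illustration.

```python
class DetectionDebouncer:
    """Report a detection only after `required` consecutive positive frames."""

    def __init__(self, required=3):
        self.required = required
        self._streak = 0  # number of positive frames seen in a row

    def update(self, detected):
        """Feed one frame's result; return True once the streak is long enough."""
        self._streak = self._streak + 1 if detected else 0
        return self._streak >= self.required


debouncer = DetectionDebouncer(required=3)
frames = [True, True, False, True, True, True, True]
decisions = [debouncer.update(seen) for seen in frames]
print(decisions)
```

In the detection loop, this would mean collecting the labels of one frame first and calling mysensor.on() only when debouncer.update("dog" in labels) returns True.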
IO4 - BCLK
IO6 - WS
IO5 - DATA
For the ESP32-C6 part I connected an LED to IO9.
First, the code for the ESP32-S3, written in CircuitPython. When it receives the dog-detected message published through the MQTT server, it plays a sound to attract the dog's attention.
Before writing the code, create a settings.toml file so that the board can join the network and connect to the MQTT server:
CIRCUITPY_WIFI_SSID=" "
CIRCUITPY_WIFI_PASSWORD=" "
MQTT_BROKER = ""
MQTT_PORT = 1883
MQTT_USER = ""
MQTT_PASSWORD = ""
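Since an empty or missing value in settings.toml tends to show up only later as a confusing connection error, it can help to validate the settings once at startup. The following is a minimal sketch, not part of my original program: load_mqtt_settings is a hypothetical helper, and it takes the lookup function as a parameter so that it can be tried on a PC as well. As far as I know, CircuitPython's os.getenv returns integer values from settings.toml as int, and the int() call also covers the case where the port arrives as a string.

```python
import os


def load_mqtt_settings(getenv=os.getenv):
    """Read the MQTT settings and fail early if the broker is missing."""
    broker = getenv("MQTT_BROKER")
    if not broker:
        raise ValueError("MQTT_BROKER is not set in settings.toml")
    port = getenv("MQTT_PORT")
    return {
        "broker": broker,
        # Fall back to the standard unencrypted MQTT port.
        "port": int(port) if port is not None else 1883,
        # Treat empty strings as "no credentials".
        "username": getenv("MQTT_USER") or None,
        "password": getenv("MQTT_PASSWORD") or None,
    }
```

The returned values could then be passed to the MQTT client constructor in place of the separate os.getenv calls.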
Here is the program code for the ESP32-S3:
import audiocore
import board
import audiobusio
import os
import wifi
import socketpool
import ssl
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import json
mqtt_topic = "hmd/binary_sensor/dog/state"
state = 0
# Define callback methods which are called when events occur
def connect(client, userdata, flags, rc):
    # This function will be called when the client is connected
    # successfully to the broker.
    print("Connected to MQTT Broker!")
    print("Flags: {0}\n RC: {1}".format(flags, rc))

def disconnect(client, userdata, rc):
    # This method is called when the client disconnects
    # from the broker.
    print("Disconnected from MQTT Broker!")

def subscribe(client, userdata, topic, granted_qos):
    # This method is called when the client subscribes to a new feed.
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))

def unsubscribe(client, userdata, topic, pid):
    # This method is called when the client unsubscribes from a feed.
    print("Unsubscribed from {0} with PID {1}".format(topic, pid))

def publish(client, userdata, topic, pid):
    # This method is called when the client publishes data to a feed.
    print("Published to {0} with PID {1}".format(topic, pid))

def message(client, topic, message):
    global state
    # This method is called when a topic the client is subscribed to
    # has a new message.
    print(f"New message on topic {topic}: {message}")
    if message == "on":
        state = 1

mqtt_client = MQTT.MQTT(
    broker=os.getenv("MQTT_BROKER"),
    port=os.getenv("MQTT_PORT"),
    username=os.getenv("MQTT_USER"),
    password=os.getenv("MQTT_PASSWORD"),
    is_ssl=False,
    socket_pool=socketpool.SocketPool(wifi.radio),
    ssl_context=ssl.create_default_context(),
)
# Connect callback handlers to client
mqtt_client.on_connect = connect
mqtt_client.on_disconnect = disconnect
mqtt_client.on_subscribe = subscribe
mqtt_client.on_unsubscribe = unsubscribe
mqtt_client.on_publish = publish
mqtt_client.on_message = message
print("Attempting to connect to %s" % mqtt_client.broker)
mqtt_client.connect()
print("Subscribing to %s" % mqtt_topic)
mqtt_client.subscribe(mqtt_topic)
wave = audiocore.WaveFile("music3.wav")
audio = audiobusio.I2SOut(board.IO4, board.IO6, board.IO5)
while True:
    mqtt_client.loop(1)
    if state:
        audio.play(wave, loop=False)
        state = 0
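Because the detector publishes "on" for every frame in which the dog is visible, the loop above may restart the sound again and again while the dog stays in view. A simple cooldown is one way to limit this. The sketch below is an assumption of mine rather than part of the tested program; the Cooldown class is hypothetical, and the clock is injectable so the logic can be tried without waiting.

```python
import time


class Cooldown:
    """Allow an action at most once every `seconds` seconds."""

    def __init__(self, seconds, clock=time.monotonic):
        self.seconds = seconds
        self._clock = clock
        self._last = None  # time of the last allowed action

    def ready(self):
        """Return True (and start a new cooldown) if enough time has passed."""
        now = self._clock()
        if self._last is None or now - self._last >= self.seconds:
            self._last = now
            return True
        return False
```

In the main loop, the condition could then become "if state and cooldown.ready():" with, for example, cooldown = Cooldown(10), while state is still reset to 0 on every pass.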
Finally, here is the ESP32-C6 code, also written in CircuitPython. When it receives the dog-detected message published through the MQTT server, it flashes the light to attract the dog's attention.
import os
import wifi
import socketpool
import ssl
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import json
import digitalio
import board
import time
mqtt_topic = "hmd/binary_sensor/dog/state"
led = digitalio.DigitalInOut(board.IO9)
led.direction = digitalio.Direction.OUTPUT
# Define callback methods which are called when events occur
def connect(client, userdata, flags, rc):
    # This function will be called when the client is connected
    # successfully to the broker.
    print("Connected to MQTT Broker!")
    print("Flags: {0}\n RC: {1}".format(flags, rc))

def disconnect(client, userdata, rc):
    # This method is called when the client disconnects
    # from the broker.
    print("Disconnected from MQTT Broker!")

def subscribe(client, userdata, topic, granted_qos):
    # This method is called when the client subscribes to a new feed.
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))

def unsubscribe(client, userdata, topic, pid):
    # This method is called when the client unsubscribes from a feed.
    print("Unsubscribed from {0} with PID {1}".format(topic, pid))

def publish(client, userdata, topic, pid):
    # This method is called when the client publishes data to a feed.
    print("Published to {0} with PID {1}".format(topic, pid))

def message(client, topic, message):
    # This method is called when a topic the client is subscribed to
    # has a new message.
    print(f"New message on topic {topic}: {message}")
    if message == "on":
        led.value = True

mqtt_client = MQTT.MQTT(
    broker=os.getenv("MQTT_BROKER"),
    port=os.getenv("MQTT_PORT"),
    username=os.getenv("MQTT_USER"),
    password=os.getenv("MQTT_PASSWORD"),
    is_ssl=False,
    socket_pool=socketpool.SocketPool(wifi.radio),
    ssl_context=ssl.create_default_context(),
)
# Connect callback handlers to client
mqtt_client.on_connect = connect
mqtt_client.on_disconnect = disconnect
mqtt_client.on_subscribe = subscribe
mqtt_client.on_unsubscribe = unsubscribe
mqtt_client.on_publish = publish
mqtt_client.on_message = message
print("Attempting to connect to %s" % mqtt_client.broker)
mqtt_client.connect()
print("Subscribing to %s" % mqtt_topic)
mqtt_client.subscribe(mqtt_topic)
# print("Publishing to %s" % mqtt_topic)
# mqtt_client.publish(mqtt_topic, "Hello Broker!")
# print("Unsubscribing from %s" % mqtt_topic)
# mqtt_client.unsubscribe(mqtt_topic)
# print("Disconnecting from %s" % mqtt_client.broker)
# mqtt_client.disconnect()
while True:
    led.value = False
    mqtt_client.loop(1)
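In the loop above, the LED is switched on inside the message callback and switched off again at the start of the next pass, so each message produces a single pulse. If a more visible blink pattern is wanted, a small helper could be used instead. This is a sketch under my own assumptions: blink, its defaults of 5 flashes and 0.2 s, and the set_led callable are hypothetical and not part of the tested program.

```python
import time


def blink(set_led, times=5, interval=0.2, sleep=time.sleep):
    """Switch the LED on and off `times` times via the set_led callable."""
    for _ in range(times):
        set_led(True)
        sleep(interval)
        set_led(False)
        sleep(interval)


# Desktop demonstration: record the states instead of driving a pin.
states = []
blink(states.append, times=2, interval=0, sleep=lambda seconds: None)
print(states)
```

On the board, the callback could set a flag and the main loop could call blink(lambda value: setattr(led, "value", value)) when the flag is set. Note that the MQTT loop is not serviced while the blink is running, so the pattern should be kept short.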
4. Source Code
5. Demonstration Video of the Work's Functions
October 20
6. Project Summary
Through this project, I tried coordinating multiple devices through MQTT and saw first-hand why MQTT holds such a firm place in the Internet of Things. The Word version of the report is attached below: