[Digi-Key Follow me Issue 3] WiFi-based edge data logger
This post was last edited by Mrs. Lin on 2023-12-11 12:08
Required Task 1: Use MicroPython system
First, download the firmware from the official MicroPython website; the generic ESP32-C3 build is sufficient. Be sure to download the file in .bin format, because that is what esptool flashes in the steps below.
Next, download esptool from Espressif's official GitHub. It is a command-line tool used to flash .bin firmware to the C3.
Then move the downloaded .bin file to the esptool root directory and rename it to firmware.bin.
Plug in the XIAO C3 and look up its port number in Device Manager. Assume here that the port is COM3.
Finally, open cmd in the esptool root directory and run the following command to flash the firmware:
esptool.exe --chip esp32c3 --port COM3 --baud 921600 --before default_reset --after hard_reset --no-stub write_flash --flash_mode dio --flash_freq 80m 0x0 firmware.bin
If esptool runs to completion without reporting an error, the flashing succeeded.
After flashing, run a simple command to check that MicroPython is working. I use Thonny: type a Python command in the shell window, and if it returns a result, MicroPython is running properly.
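For instance, a quick check along these lines can be typed into the Thonny shell. It is only a sketch that uses the standard sys module; the values printed depend on the firmware build, so none are shown here.

```python
import sys

# On a MicroPython board, impl.name is expected to be "micropython";
# on a desktop interpreter it would be something like "cpython".
impl = sys.implementation
print("implementation:", impl.name)
print("version:", impl.version)
print("platform:", sys.platform)
```

If the three lines print without an error, the interpreter on the board is responding.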
Required Task 2: Drive the OLED screen on the expansion board
Driving the OLED requires a third-party library. Upload the ssd1306.py file to the root directory or the lib directory of the development board. At the same time, upload umqttsimple.py, a library that a later task needs. Then open the main program file main.py and import all the libraries used by the project:
import machine
import network
import ntptime
import time
from umqttsimple import MQTTClient
import json
from ssd1306 import SSD1306_I2C
import dht
Then add the following initialization and the screen is ready to use. To verify that it works, we draw two frames along the edge of the screen and display text in the middle.
i2c = machine.SoftI2C(sda=machine.Pin(9), scl=machine.Pin(10))
print(i2c.scan())
oled = SSD1306_I2C(128, 64, i2c)
oled.fill(1)
oled.rect(1, 1, 126, 62, 0)
oled.rect(3, 3, 122, 58, 0)
oled.text('Hello World!', 17, 29, 0)
oled.show()
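The text position above was picked by hand. As a rough sketch, it can also be computed, assuming the 8x8 pixel built-in font that the ssd1306 driver's text() uses and the 128x64 screen size from the initialization. The helper name centered_xy is made up for this example.

```python
FONT_W = 8  # assumed: the built-in framebuf font is 8 pixels wide per character
FONT_H = 8  # and 8 pixels high

def centered_xy(text, width=128, height=64):
    """Return the top-left (x, y) that centers one line of text on the screen."""
    x = (width - len(text) * FONT_W) // 2
    y = (height - FONT_H) // 2
    return x, y

x, y = centered_xy('Hello World!')
print(x, y)  # 16 28
# On the board: oled.text('Hello World!', x, y, 0)
```

The result (16, 28) is within one pixel of the hand-picked (17, 29).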
Required Task 3: Control the buzzer to play music
To play music, use a passive buzzer. An active buzzer has a built-in oscillator and can only sound at one fixed frequency, whereas a passive buzzer takes its frequency from the microcontroller's signal and can therefore produce different pitches. Here we use MicroPython's built-in PWM. First define the frequency for each note, then wrap playback in a function that sounds the notes one by one in the order they appear in a string. The score is written in numbered notation (1 to 7 for the notes, '-' for a rest) and played with that function.
Since I am using a breadboard and don't want to run extra wires, I use a workaround here: two GPIO pins serve as power and ground:
vcc0 = machine.Pin(3, mode=machine.Pin.OUT, value=1, drive=machine.Pin.DRIVE_3)
gnd0 = machine.Pin(2, mode=machine.Pin.OUT, value=0, drive=machine.Pin.DRIVE_3)
Both pins are set to digital output mode with the drive strength at its maximum; one outputs 1 and the other outputs 0. They can then serve as VCC and GND for loads that draw little current. So far this has worked without problems.
def play(pin, melody):
    # Frequency in Hz for each note of the numbered notation; '-' is a rest
    tones = {'1': 262, '2': 294, '3': 330, '4': 349, '5': 392, '6': 440, '7': 494, '-': 0}
    beeper = machine.PWM(machine.Pin(pin, machine.Pin.OUT))
    for tone in melody:
        freq = tones[tone]
        if freq:
            beeper.init(duty=1000, freq=freq)  # set the PWM frequency to sound the given pitch
        else:
            beeper.duty(0)  # rest: no output
        # Hold the note (4/4 time, two notes per second, with a short pause between notes)
        time.sleep_ms(400)
        beeper.duty(0)  # duty cycle 0, i.e. silent
        time.sleep_ms(100)
    beeper.deinit()  # release the PWM
melody = "1155665-4433221-5544332-5544332-1155665-4433221"
vcc0 = machine.Pin(3, mode=machine.Pin.OUT, value=1, drive=machine.Pin.DRIVE_3)
gnd0 = machine.Pin(2, mode=machine.Pin.OUT, value=0, drive=machine.Pin.DRIVE_3)
play(4, melody)
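The frequencies in the tones table match the C major scale starting at middle C in equal temperament. As a sketch of where the numbers come from, the table can be rebuilt from the standard formula f = 440 * 2^(n/12), where n is the distance in semitones from A4 (440 Hz). The names SEMITONES_FROM_A4 and note_freq are made up for this example.

```python
# Semitone offsets of the C major scale from A4 (440 Hz); C4 is 9 semitones below A4.
SEMITONES_FROM_A4 = {'1': -9, '2': -7, '3': -5, '4': -4, '5': -2, '6': 0, '7': 2}

def note_freq(semitones, a4=440.0):
    """Equal-temperament frequency in Hz, rounded to an integer for PWM."""
    return round(a4 * 2 ** (semitones / 12))

tones = {note: note_freq(n) for note, n in SEMITONES_FROM_A4.items()}
tones['-'] = 0  # rest
print(tones)
```

Adding 12 to every offset raises the whole scale by one octave, which doubles each frequency.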
Required Task 4: Connect to a WiFi network
Because of some issues in MicroPython itself, WiFi can get stuck in a state where it is not connected yet cannot connect again. The connection function therefore includes some exception handling. Once connected, it prints network information such as the IP address, which makes it easy to confirm the connection. To verify that WiFi works, I then use MicroPython's built-in ntptime library to synchronize the time over the Internet and show it on the screen.
def wifi():
    # ssid and password must be defined before this function is called
    sta_if = network.WLAN(network.STA_IF)
    if not sta_if.isconnected():
        print('connecting to network...')
        try:
            sta_if.active(True)
            sta_if.disconnect()
            sta_if.connect(ssid, password)
        except Exception as error:
            # Reset the interface if it is stuck
            print(error)
            sta_if.disconnect()
            sta_if.active(False)
        while not sta_if.isconnected():
            pass
    print('WiFi Connected:', sta_if.ifconfig())
    print("")
def ntp_get():
    for i in range(10):  # try at most 10 times
        try:
            ntptime.settime()  # set the RTC from network time (UTC)
            # Shift to UTC+8 via epoch seconds so that the date rolls over correctly
            _t = time.localtime(time.time() + 8 * 3600)
            machine.RTC().datetime((_t[0], _t[1], _t[2], _t[6], _t[3], _t[4], _t[5], 0))
            print("ntp time(BeiJing): ", machine.RTC().datetime())
            return True
        except Exception:
            print("Can not get time!")
            time.sleep_ms(500)
    return False
wifi()
ntp_get()
datetime = machine.RTC().datetime()
oled.fill(1)
oled.rect(1, 1, 126, 62, 0)
oled.rect(3, 3, 122, 58, 0)
oled.text('Date:', 5, 6, 0)
oled.text('{}-{:02d}-{:02d}'.format(datetime[0], datetime[1], datetime[2]), 5, 6 + 12, 0)
oled.text('Time:', 5, 33, 0)
oled.text('{:02d}:{:02d}:{:02d}'.format(datetime[4], datetime[5], datetime[6]), 5, 33 + 12, 0)
oled.show()
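The `while not sta_if.isconnected(): pass` loop in wifi() waits forever if the access point never answers. One possible variation, shown here only as a sketch, is to poll with a deadline. The helper wait_for and its parameters are made up for this example; it uses only time.time() and time.sleep(), so it can be tried on a PC as well as on the board.

```python
import time

def wait_for(predicate, timeout_s=15, poll_s=0.1):
    """Poll predicate() until it returns True or timeout_s seconds elapse.

    Returns True on success and False on timeout.
    """
    deadline = time.time() + timeout_s
    while not predicate():
        if time.time() >= deadline:
            return False
        time.sleep(poll_s)
    return True

# Hypothetical usage inside wifi(), replacing the busy-wait loop:
#     if not wait_for(sta_if.isconnected):
#         machine.reset()
```

Resetting on timeout is only one choice; retrying the connection would work as well.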
Required Task 5: Using External Sensors
The external sensor I chose for this project is the DHT11, which is common in Arduino projects. It also works at 3.3 V. Note that it is relatively slow and can be read only about once per second. In addition, a failed read in MicroPython raises an exception, which terminates the program if it is not handled. To keep one bad reading from bringing down the whole system, the read is wrapped in try/except. For continuous readings, make sure that successive reads are at least 1 second apart.
d = dht.DHT11(machine.Pin(7))
vcc1 = machine.Pin(21, mode=machine.Pin.OUT, value=1, drive=machine.Pin.DRIVE_3)
gnd1 = machine.Pin(6, mode=machine.Pin.OUT, value=0, drive=machine.Pin.DRIVE_3)
def measure():
    d.measure()
    oled.fill(1)
    oled.rect(1, 1, 126, 62, 0)
    oled.rect(3, 3, 122, 58, 0)
    oled.text('Temperature:', 5, 6, 0)
    oled.text(str(d.temperature()) + " degree", 5, 6 + 12, 0)
    oled.text('Humidity:', 5, 33, 0)
    oled.text(str(d.humidity()) + " %", 5, 33 + 12, 0)
    oled.show()
    return d.temperature(), d.humidity()
while True:
    try:
        measure()
        break
    except Exception as error:
        print(error)
        time.sleep(1)
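The loop above retries until a read succeeds. A bounded variant might look like the following sketch, where read_with_retry is a made-up helper that takes the read function as an argument, so it can be tried without the sensor attached.

```python
import time

def read_with_retry(read_fn, attempts=5, delay_s=1):
    """Call read_fn() until it succeeds, waiting delay_s after each failure.

    Returns read_fn()'s result, or None if every attempt raised an exception.
    """
    for attempt in range(attempts):
        try:
            return read_fn()
        except Exception as error:
            print("read failed (attempt %d): %s" % (attempt + 1, error))
            time.sleep(delay_s)
    return None

# Hypothetical usage on the board: reading = read_with_retry(measure)
```

The default delay of 1 second keeps successive reads within the DHT11's limit of about one read per second.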
Optional Task 6: Temperature and Humidity Data Logger
In addition to the XIAO C3, I got two Raspberry Pi Zeros for this project to build the data logger. The XIAO C3 is a sensor module that works in a relatively harsh environment, so it is not well suited to storing and recording data. The usual practice is for the XIAO C3, as an edge sensor, to handle measurement only and to send each measurement back to a server over the network; the server has dedicated storage hardware and does the recording. Here the two Raspberry Pi Zeros act as the central servers. One runs the MQTT server and relays the data; the other, like the XIAO C3, is an MQTT client and records the data.
Let's look at the XIAO C3 code first. The data is sent as a JSON string:
# client_id, mqtt_server, port, user, pwd, topic_sub, topic_pub and
# topic_availability must be defined before this code runs
def callback(topic, msg):
    print((topic, msg))
    if topic == topic_sub and msg == b'hello':
        print('ESP received hello message')
def connect_and_subscribe():
    client = MQTTClient(
        client_id = client_id,
        server = mqtt_server,
        port = port,
        user = user,
        password = pwd,
        keepalive = 10,
        lw_topic = topic_availability,
        lw_msg = "offline"
    )
    client.set_callback(callback)
    client.connect()
    client.subscribe(topic_sub)
    client.publish(topic_availability, "online")
    print('Connected to %s MQTT broker, subscribed to %s topic' % (mqtt_server, topic_sub))
    return client
def restart_and_reconnect():
    print('Failed to connect to MQTT broker. Reconnecting...')
    machine.reset()
try:
    client = connect_and_subscribe()
except Exception:
    restart_and_reconnect()
def loop():
    time.sleep(1)
    client.check_msg()  # handle any incoming message
    temperature, humidity = measure()
    msg = json.dumps({"temperature" : temperature, "humidity" : humidity})
    client.publish(topic_pub, msg)
    print("Published: " + msg)
while True:
    try:
        loop()
    except Exception as error:
        print(error)
        try:
            # Re-establish WiFi and the MQTT session; reset the board if that fails
            wifi()
            client = connect_and_subscribe()
        except Exception:
            restart_and_reconnect()
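The code above resets the board as soon as a reconnect fails. An alternative that is sometimes used, sketched here under the assumption that a few retries are acceptable before resetting, is to wait a little longer after each failure. The generator backoff_delays and its parameters are made up for this example.

```python
def backoff_delays(base_s=1, cap_s=60, attempts=8):
    """Yield wait times in seconds that double each attempt, capped at cap_s."""
    delay = base_s
    for _ in range(attempts):
        yield min(delay, cap_s)
        delay *= 2

print(list(backoff_delays()))  # [1, 2, 4, 8, 16, 32, 60, 60]

# Hypothetical usage on the board:
#     for delay in backoff_delays():
#         time.sleep(delay)
#         ...try wifi() and connect_and_subscribe(), break on success...
```

If every attempt fails, falling back to machine.reset() keeps the original behavior as a last resort.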
Next, we need to set up an MQTT server on the first Raspberry Pi. This is fairly simple. Run the following commands to install and configure it:
sudo apt update
sudo apt upgrade
sudo apt install mosquitto
echo "listener 1883" | sudo tee -a /etc/mosquitto/mosquitto.conf
echo "allow_anonymous true" | sudo tee -a /etc/mosquitto/mosquitto.conf
sudo systemctl restart mosquitto
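To confirm from another machine that the server is reachable, a small check like the following sketch can be run with desktop Python 3 (on a PC or on the second Raspberry Pi, not on the XIAO C3). The helper port_open is made up for this example; it only tests that the TCP port accepts connections and does not speak MQTT.

```python
import socket

def port_open(host, port=1883, timeout_s=3):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        conn = socket.create_connection((host, port), timeout_s)
    except OSError:
        return False
    conn.close()
    return True

# Example call, using the server address that appears in the mosquitto_sub command:
#     print(port_open("192.168.50.231"))
```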
Then install the MQTT client on the second Raspberry Pi, subscribe to the corresponding topic, and record the received data in the log file:
sudo apt update
sudo apt upgrade
sudo apt install mosquitto-clients
mosquitto_sub -h 192.168.50.231 -t "sensor/c3/pub" | tee -a log.txt
After some data has come in, we can stop the subscriber at any time and check the log; all of the data was recorded correctly.
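Since each line of log.txt is one JSON string, the log is easy to process afterwards. The following is a minimal sketch, assuming every record has the temperature and humidity keys used in the XIAO C3 code. The function summarize and the sample values are made up for illustration.

```python
import json

def summarize(lines):
    """Parse JSON log lines and return (count, mean temperature, mean humidity)."""
    temps, hums = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
            temperature = record["temperature"]
            humidity = record["humidity"]
        except (ValueError, KeyError, TypeError):
            continue  # skip lines that are not valid records
        temps.append(temperature)
        hums.append(humidity)
    if not temps:
        return 0, None, None
    return len(temps), sum(temps) / len(temps), sum(hums) / len(hums)

# Made-up sample lines; the last one stands in for a corrupted entry.
sample = ['{"temperature": 23, "humidity": 45}',
          '{"temperature": 25, "humidity": 47}',
          'garbage']
print(summarize(sample))  # (2, 24.0, 46.0)
```

On the Raspberry Pi, the real log can be passed in with `with open("log.txt") as f: print(summarize(f))`.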
Source code download: https://download.eeworld.com.cn/detail/%E6%9E%97%E5%A4%AA%E5%A4%AA/630254