[Digi-Key Follow me Issue 2] Pomodoro Calendar Clock

 

This post is from DigiKey Technology Zone

This post was last edited by ltpop on 2023-12-28 16:46

1. Introduction Video


2. Project Summary

I am a smart home enthusiast. I have played with HomeAssistant, ESPHome, Tasmota and other platforms, and used ESP8266 and ESP32 to DIY my own devices. Usually I only use existing platforms for configuration and rarely write hardware code directly. I am very grateful to DigiKey and EEWORLD for the Follow me event, which gave me the opportunity to play with the underlying code of smart devices.

Like most other participants, I chose CircuitPython as the development platform and language. Its community resources are rich, the barrier to entry is low, and debugging is convenient, which suits a novice like me. To meet the event requirements and build something complete, I chose to make a calendar + weather + Pomodoro timer. It can be used in daily work and study to improve efficiency with the Pomodoro Technique.

This is my first time using CircuitPython, and I learned it while completing the project. My approach was to first fix the project goal, that is, the minimum functionality to implement. Then, for each function, I studied the official examples and the relevant libraries while debugging the code. Once each function's code passed its test, I merged it into the final complete code.

The project covers many functions. In addition to the WiFi configuration, network requests, Chinese display, and NeoPixel LED control required by the event, it also uses screen control, coroutines, and touch sensors. The difficult parts are screen control and coroutines. As I am a CircuitPython novice, the current code only implements the functions and leaves a lot of room for optimization. I will keep exploring and practicing, and everyone is welcome to discuss with me.

Project Objectives:

  1. Complete the activity requirements
    1. Network data request
    2. Chinese display
    3. Neopixel Control
  2. Implement the Pomodoro calendar functions:
    1. Date and time display, and display lunar calendar
    2. Weather display
    3. Pomodoro control function
      1. You can set the length of the Pomodoro
      2. Show Pomodoro Countdown
      3. After completion, the LED prompts the user, who then confirms
      4. Pomodoro count
  3. Implementation
    1. WIFI connection to the Internet
    2. Get the date and time from the NTP service
    3. Get lunar date from OpenCalendar API
    4. Get local weather from the Open Weather API
    5. Use coroutine functions and shared variables to update data and control screen display
    6. Use the ESP32S3's touch sensor for input control to start and confirm the Pomodoro
    7. Use different colors and states of Neopixel LED to indicate the status of the device, such as network status, Pomodoro running, confirmation stage, etc.
  4. Project Finished Product Display
  5. Completed post: [DigiKey Follow me Issue 2] Pomodoro Calendar Clock - DigiKey Technology Zone - Electronic Engineering World - Forum (eeworld.com.cn)

Event Mission

Follow me Issue 2! Unlock the superpowers of development boards with Digi-Key! (eeworld.com.cn)

Task 1: Control the screen to display Chinese

Complete the screen control and display Chinese

Task 2: Network Function Usage

Complete the use of network functions, be able to create hotspots and connect to WiFi

Task 3: Controlling the WS2812B

Use buttons to control the display and color switching of the onboard Neopixel LED

Task 4: Subtask 1: Calendar & Clock

Complete a perpetual calendar clock that can be updated via the Internet and display local weather information

Project Materials

ESP32S3 TFT Overview | Adafruit ESP32-S3 TFT Feather | Adafruit Learning System

Environment Preparation

Installing CircuitPython

Double-click the reset button and copy the .uf2 file to the FTHRS2BOOT disk. It will automatically restart and complete the installation.

Use the Mu editor or VSCode with a plug-in to connect to the development board and start developing. Both make debugging in REPL mode very convenient. The Mu connection is more stable, while VSCode is more powerful for writing code.

The development process is generally to find a similar official example, debug the code snippet in REPL mode, and, once it works, write it into the code.py file for a complete test.

In normal mode, the development board automatically restarts and loads the code after code.py is modified. To reset the board manually from REPL mode, you can press the RST button on the board, press Ctrl+D, or execute the following code.

import microcontroller
microcontroller.reset()

Install Lib

Some of the libraries used in the code are built in and can be imported directly. The others must be installed manually. The official library page is Libraries (circuitpython.org), or [link hidden by the forum]. Download the bundle that matches your CircuitPython version, such as [link hidden by the forum].

The installation method is very simple. After decompression, find the corresponding .mpy file in the lib directory and copy it to the lib directory in the CIRCUITPY disk. If you need the source code, you can also download the original py file.

Implementation of Each Function

Chinese font display (Task 1)

Official example: Introduction — Adafruit Bitmap_Font Library 1.0 documentation (circuitpython.org)

Requires additional lib: adafruit_display_text, adafruit_bitmap_font

The key to displaying Chinese characters is finding a Chinese font. Following code shared by other users, I used the open source fonts here: [link hidden by the forum]. Because storage on the development board is limited, I chose wenquanyi_13px.pcf, which has a suitable size, and saved it in the font directory of the CIRCUITPY disk.

Core code

import board
import displayio
import terminalio
# Requires the adafruit_display_text library
from adafruit_display_text import bitmap_label
# Requires the adafruit_bitmap_font library
from adafruit_bitmap_font import bitmap_font

display = board.DISPLAY
# The Chinese font file is stored in the font directory
font_file = "font/wenquanyi_13px.pcf"
font = bitmap_font.load_font(font_file)

group = displayio.Group()
# Lunar date label (top center)
lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
lunar_label.anchor_point = (0.5, 0.0)
lunar_label.anchored_position = (display.width // 2, 5)
lunar_label.text = "十月初一"
group.append(lunar_label)

display.root_group = group

The lunar calendar and weather information in the project can display Chinese content normally, as shown in the following figure (upper middle and lower left corner)

Connecting to WiFi (Task 2)

This is relatively simple and requires no additional library. It is better, however, to retry the connection several times so that a failed attempt at power-on does not leave the board offline.

Core code

import os
import wifi
import time

# Connect to WiFi
# Configure WiFi in settings.toml beforehand: CIRCUITPY_WIFI_SSID=<wifi ssid> and CIRCUITPY_WIFI_PASSWORD=<wifi password>

failure_count = 0
while not wifi.radio.connected:
    try:
        wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
    except OSError as error:
        print("Failed to connect, retrying\n", error)
        failure_count += 1
        # Wait before the next attempt; no delay is needed after a success
        time.sleep(5)
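
The retry idea can also be tried on a desktop without the board. The following is a minimal sketch, assuming a hypothetical helper connect_with_retry that is not part of the project source; on the board, the connect argument could wrap wifi.radio.connect, and the retry limit and delay are illustrative values.

```python
import time


def connect_with_retry(connect, max_attempts=5, delay=5.0, sleep=time.sleep):
    """Call connect() until it succeeds or max_attempts is reached.

    connect: zero-argument callable that raises OSError on failure.
    Returns the number of failed attempts before success.
    Re-raises the last OSError if every attempt fails.
    """
    failures = 0
    while True:
        try:
            connect()
            return failures
        except OSError as error:
            failures += 1
            print("Failed to connect, attempt", failures, error)
            if failures >= max_attempts:
                raise
            sleep(delay)


# Desktop stand-in for the radio: fails twice, then succeeds.
attempts = {"n": 0}


def fake_connect():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise OSError("no AP found")


failed = connect_with_retry(fake_connect, delay=0, sleep=lambda seconds: None)
print("connected after", failed, "failures")
```

Bounding the number of attempts lets the caller decide what to do when the network stays down, for example showing the "not connected" state on the screen.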

The project can be displayed differently under different network conditions. If it is not connected to the Internet, it will be displayed as follows (the lower left corner reminds you that you are not connected to WIFI)

Controlling the WS2812B (Task 3)

Official examples: Adafruit CircuitPython NeoPixel — Adafruit CircuitPython NeoPixel Library 1.0 documentation

Requires additional Lib: neopixel, adafruit_led_animation

NeoPixel LEDs are very flexible, and the official animation libraries provide many ready-made effects. The project uses different colors to indicate the current state of the development board: for example, steady red while a Pomodoro is running and flashing cyan while waiting for confirmation that a Pomodoro has finished.

Core code

import board
# Requires the neopixel library
import neopixel
# Requires the adafruit_led_animation library
from adafruit_led_animation.animation.blink import Blink
from adafruit_led_animation import color

pixel_pin = board.NEOPIXEL
pixel_num = 1
pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0.05, auto_write=False)

# Steady red
pixels.fill((255, 0, 0))
pixels.show()

# Flashing cyan; animate() must be called repeatedly to advance the animation
blink = Blink(pixels, 0.5, color.CYAN)
while True:
    blink.animate()

During the Pomodoro confirmation stage, the LED flashes cyan to remind the user to confirm the Pomodoro.
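
One way to keep the state-to-color rules in one place is a small lookup table. The following is a minimal sketch that runs on a desktop; the state names and the led_color helper are hypothetical and not taken from the project source, while the red and cyan values follow the colors described above.

```python
# Hypothetical state names; the colors follow the description in this post.
RED = (255, 0, 0)
CYAN = (0, 255, 255)
OFF = (0, 0, 0)

LED_STATES = {
    "idle": {"color": OFF, "blink": False},
    "running": {"color": RED, "blink": False},     # steady red while timing
    "confirming": {"color": CYAN, "blink": True},  # flash until confirmed
}


def led_color(state, tick):
    """Return the RGB color to show for `state` on animation step `tick`.

    Blinking states alternate between their color and off on each tick.
    """
    entry = LED_STATES[state]
    if entry["blink"] and tick % 2 == 1:
        return OFF
    return entry["color"]


print(led_color("running", 0))
print(led_color("confirming", 0), led_color("confirming", 1))
```

On the board, the returned tuple could be passed to pixels.fill() followed by pixels.show().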

Get date and time (Task 4)

Official example: [link hidden by the forum]

Requires additional Lib: adafruit_ntp, adafruit_requests

Because the development board has no real-time clock module, the time must be obtained from an NTP server, and an official library is available for this. For stability, an NTP server located in China can be used. The NTP data contains the date and time but not the lunar date, so the lunar date is fetched from a separate API. As with the weather API, you need to register an account and access it with an API key.

Core code

import os
import wifi
import time
import ssl
import socketpool
# Requires the adafruit_ntp library
import adafruit_ntp
# Requires the adafruit_requests library
import adafruit_requests as requests

tz_offset = os.getenv("TIME_ZONE")
ntp = None
while True:
    # WiFi must already be connected before this point
    pool = socketpool.SocketPool(wifi.radio)
    # Create the NTP client once and reuse it
    if not ntp:
        ntp = adafruit_ntp.NTP(pool, server="cn.ntp.org.cn", tz_offset=tz_offset)

    # Get the lunar calendar date
    https = requests.Session(pool, ssl.create_default_context())
    lunar_response = https.get(f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
    lunar_json = lunar_response.json()
    if lunar_json['code'] == 200:
        lunar_data = lunar_json['data']
        week_day = lunar_data['week_no']
        week_name = lunar_data['week_name']
        # The original read 'week_name' here, which looks like a copy-paste slip.
        # 'ganzhi_year' is an assumed field name; check it against the API response.
        lunar_year_ganzhi = lunar_data.get('ganzhi_year', '')
        lunar_month_chinese = lunar_data['lunar_month_chinese']
        lunar_day_chinese = lunar_data['lunar_day_chinese']

        print(f"Lunar date: {lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
        print(f"Day of week: {week_name}")

    # The time is fetched from the network here, so retry on failure
    try:
        dt = ntp.datetime
    except OSError as error:
        print("NTP failed, retrying\n", error)
        time.sleep(5)
        continue
    print(f"Current time: {dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
    # Update again only when the next day starts: seconds left until midnight
    wait_seconds = (23 - dt.tm_hour) * 3600 + (59 - dt.tm_min) * 60 + (60 - dt.tm_sec)
    time.sleep(wait_seconds)
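
The loop above sleeps until the next day before refreshing the lunar date. As a sketch of that arithmetic on its own, the hypothetical helper below (not part of the project source) returns the number of seconds left until midnight and can be checked with desktop Python.

```python
def seconds_until_midnight(hour, minute, second):
    """Return the seconds from hour:minute:second to 00:00:00 of the next day."""
    elapsed = hour * 3600 + minute * 60 + second
    return 24 * 3600 - elapsed


print(seconds_until_midnight(23, 59, 30))  # 30
print(seconds_until_midnight(0, 0, 0))     # 86400
```

Working in a single unit (seconds) avoids mixing hours, minutes, and seconds in one expression.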

The top of the screen in the project displays the date, lunar calendar, and time in sequence, as shown below

Obtaining weather information (Task 4)

Official example: [link placeholder EEWORLDLINKTK4 in the source]. You need to register an account and obtain an API key before you can use it.

Secrets such as passwords and API keys can be saved in the settings.toml file in the root directory in the format <variable name> = <value> and then read in the code with os.getenv("<variable name>"), which is safe and convenient.

So that the city can be changed easily, the weather city is also set in the toml file. If it is not set, the city is looked up from the external IP address of the current network through a geolocation API. See the code for details.
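
Reading settings with a fallback keeps the rest of the code free of None checks. The following is a minimal sketch, assuming a hypothetical get_setting helper that is not part of the project source. WEATHER_CITY and POMODORO_TIMEOUT are key names that appear in this post; the default of 25 * 60 and the unit of seconds are assumptions for illustration.

```python
import os


def get_setting(name, default=None, cast=str):
    """Read a setting with os.getenv, falling back to `default` when unset.

    `cast` converts the raw value (for example int for a timeout).
    """
    value = os.getenv(name)
    if value is None or value == "":
        return default
    return cast(value)


# An empty city means "look the city up from the IP address instead".
weather_city = get_setting("WEATHER_CITY", default="")
# Assumed unit: seconds. The default is illustrative only.
pomodoro_timeout = get_setting("POMODORO_TIMEOUT", default=25 * 60, cast=int)
print(weather_city, pomodoro_timeout)
```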

Core code:

import os
import wifi
import time
import ssl
import socketpool
# Requires the adafruit_requests library
import adafruit_requests as requests

# Get weather information
# Weather API key
weather_api_key = os.getenv("WEATHER_API_KEY")
# Weather city, read from the configuration file
weather_city = os.getenv("WEATHER_CITY")
while True:
    # Create the HTTPS session
    pool = socketpool.SocketPool(wifi.radio)
    https = requests.Session(pool, ssl.create_default_context())
    # If no city is configured, use the city of the current IP address
    if not weather_city:
        # Look up the current external IP address and city
        ip_city_response = https.get("https://myip.ipip.net/json")
        ip_city_json = ip_city_response.json()
        if ip_city_json["ret"] == "ok":
            weather_city = ip_city_json['data']['location'][2]
            print(f"City from IP: {weather_city}")

    # Current weather
    weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
    weather_now_response = https.get(weather_now_url)
    weather_json = weather_now_response.json()
    # An error response may have no "results" key, so use .get()
    if weather_json.get("results"):
        now_weather = weather_json["results"][0]["now"]["text"]
        now_temperature = weather_json["results"][0]["now"]["temperature"]
        # Print only when the values exist
        print(f"Current weather: {now_weather}, temperature: {now_temperature}℃")
    time.sleep(300)  # refresh every 5 minutes
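
The parsing step can be separated from the network request so it can be checked without the board. The following is a minimal sketch, assuming a hypothetical parse_weather helper that is not part of the project source; the sample dictionary is hand-written test data shaped like the fields read above, not a real API response.

```python
def parse_weather(weather_json):
    """Return (text, temperature) from a 'now' weather response, or None.

    Expected shape, mirroring the fields read in the code above:
    {"results": [{"now": {"text": ..., "temperature": ...}}]}
    """
    results = weather_json.get("results")
    if not results:
        return None
    now = results[0].get("now", {})
    if "text" not in now or "temperature" not in now:
        return None
    return now["text"], now["temperature"]


# Hand-written sample data, not a real API response.
sample = {"results": [{"now": {"text": "晴", "temperature": "12"}}]}
print(parse_weather(sample))
print(parse_weather({"status": "error"}))
```

Returning None for an unexpected response lets the display task keep showing the previous values instead of failing.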

The weather and temperature information in the project is displayed at the bottom of the screen, as shown below

Screen display control

Official example links: Introduction | CircuitPython Display Support Using displayio | Adafruit Learning System

Requires additional libs: board, displayio, terminalio

Screen control has two parts. The first is drawing the screen layout, which can be done with the official displayio library. The second is controlling the displayed content: the basic idea is to update the content of each element (text, numbers, pictures, and so on) in each loop.

In the project, this part has to handle the WiFi connection, network time, weather, and so on, and keep the screen display in sync with them, so it takes many lines of code. Because the countdown updates the screen every second, the display is not very smooth with the current code and occasionally freezes for 1-2 seconds. The code still needs to be optimized.
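
One possible optimization is to assign a label's text only when it actually changes, since each assignment may trigger a redraw. The following is a minimal sketch that runs on a desktop: FakeLabel is a stand-in for a display label, and set_text_if_changed and format_countdown are hypothetical helpers, not part of the project source. Whether this removes the stutter on the board has not been measured here.

```python
class FakeLabel:
    """Desktop stand-in for a display label that counts redraws."""

    def __init__(self):
        self._text = ""
        self.redraws = 0

    @property
    def text(self):
        return self._text

    @text.setter
    def text(self, value):
        self._text = value
        self.redraws += 1  # on the board, rendering cost would be paid here


def set_text_if_changed(label, new_text):
    """Assign label.text only when the content differs; return True if updated."""
    if label.text != new_text:
        label.text = new_text
        return True
    return False


def format_countdown(seconds_left):
    """Format a remaining time in seconds as zero-padded MM:SS."""
    minutes, seconds = divmod(max(0, seconds_left), 60)
    return f"{minutes:02d}:{seconds:02d}"


label = FakeLabel()
for remaining in (900, 900, 899, 899, 898):
    set_text_if_changed(label, format_countdown(remaining))
print(label.text, label.redraws)  # 14:58 3
```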

The hierarchy of the components in the screen layout is shown in the figure below. Simply put, a Display contains Groups, and a Group contains Bitmaps.

Screen coordinates start from the upper left corner

The anchor_point of a bitmap_label is the reference point within the element, given as fractions of its width and height: (0, 0) is the element's upper left corner and (1, 1) is its lower right corner. The anchored_position is the screen coordinate where that reference point is placed. For example, an element in the upper left corner of the screen can use anchor_point (0, 0), and one in the lower right corner can use (1, 1). The core code shows each case.
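
The anchor arithmetic can be sketched in plain Python. This is a simplified model, not the library's actual implementation: the hypothetical top_left_from_anchor helper ignores scale and font offsets, and the 240x135 screen and 60x16 label sizes are assumed values for illustration.

```python
def top_left_from_anchor(anchor_point, anchored_position, width, height):
    """Return the (x, y) of an element's top-left corner.

    anchor_point: (ax, ay) fractions of the element's size, 0.0 to 1.0.
    anchored_position: (px, py) screen pixel where that anchor is pinned.
    width, height: size of the element's bounding box in pixels.
    """
    ax, ay = anchor_point
    px, py = anchored_position
    return (px - round(ax * width), py - round(ay * height))


# Assumed sizes for illustration.
SCREEN_W, SCREEN_H = 240, 135
print(top_left_from_anchor((0.0, 0.0), (5, 5), 60, 16))                             # (5, 5)
print(top_left_from_anchor((1.0, 1.0), (SCREEN_W - 2, SCREEN_H - 2), 60, 16))       # (178, 117)
print(top_left_from_anchor((0.5, 0.5), (SCREEN_W // 2, SCREEN_H // 2), 60, 16))     # (90, 59)
```

With anchor (1, 1) the label's lower right corner lands on the given position, so the text stays inside the screen regardless of its length.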

Core code:

import asyncio
import os
import board
import displayio
import terminalio
# Requires the adafruit_display_text library
from adafruit_display_text import bitmap_label
# Requires the adafruit_bitmap_font library
from adafruit_bitmap_font import bitmap_font

display = board.DISPLAY
# The Chinese font file is stored in the font directory
font_file = "font/wenquanyi_13px.pcf"
font = bitmap_font.load_font(font_file)

group = displayio.Group()

# Date label (upper left)
date_label = bitmap_label.Label(terminalio.FONT, scale=1)
date_label.anchor_point = (0.0, 0.0)
date_label.anchored_position = (5, 5)
date_label.text = "2023-11-11"
group.append(date_label)


# Time label (upper right)
time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
time_label.anchor_point = (1.0, 0.0)
time_label.anchored_position = (display.width - 2, 2)
time_label.text = "11:30"
group.append(time_label)

# Lunar date label (top center)
lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
lunar_label.anchor_point = (0.5, 0.0)
lunar_label.anchored_position = (display.width // 2, 5)
lunar_label.text = "十月初一"
group.append(lunar_label)


# Weather label (lower left)
weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
weather_label.anchor_point = (0.0, 1.0)
weather_label.anchored_position = (2, display.height - 5)
weather_label.text = "晴"
group.append(weather_label)

# Temperature label (bottom center)
temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
temperature_label.anchor_point = (0.5, 1.0)
temperature_label.anchored_position = (display.width // 2, display.height - 5)
temperature_label.text = "0℃"
group.append(temperature_label)


# Pomodoro countdown label (center)
pomodoro_label = bitmap_label.Label(terminalio.FONT, color=0xFF00FF, scale=7)
# Position
pomodoro_label.anchor_point = (0.5, 0.5)
pomodoro_label.anchored_position = (display.width // 2, display.height // 2)
pomodoro_label.text = "15:00"
group.append(pomodoro_label)

# Pomodoro count label (lower right)
count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
# Position
count_label.anchor_point = (1, 1)
count_label.anchored_position = (display.width - 2, display.height - 2)
count_label.text = "0"
group.append(count_label)

# Pomodoro length, read from settings.toml
pomodoro_timeout = os.getenv("POMODORO_TIMEOUT")

# Define main_group
main_group = displayio.Group()
main_group.append(group)
# The display shows only one root group at a time
display.root_group = main_group

# Update the screen content. This is sample code: internet, ntp_datetime and
# weather are shared objects filled in by other tasks in the full project, and
# lunar_month_chinese / lunar_day_chinese come from the lunar API, so it cannot
# run on its own.
async def update_display(internet, ntp_datetime, weather):
    timeout = 0.1
    while True:
        if internet.state and ntp_datetime.ntp:
            dt = ntp_datetime.ntp.datetime
            # Date text
            date_label.text = f"{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday}"
            # Time text, zero-padded
            time_label.text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}"
            # Lunar date text
            lunar_label.text = f"{lunar_month_chinese}月{lunar_day_chinese}"

        if weather.text:
            # Weather text
            weather_label.text = f"{weather.text}"
            # Temperature text
            temperature_label.text = f"{weather.temperature}℃"
        else:
            # No weather data yet: show "please connect to WiFi first"
            weather_label.text = "请先连接WIFI"

        await asyncio.sleep(timeout)

Rendering

Multi-task concurrency control

Official examples: Concurrent Tasks | Cooperative Multitasking in CircuitPython with asyncio | Adafruit Learning System

Requires additional Lib: asyncio, adafruit_ticks

The project has to manage several network requests and show different content on the screen. Implementing all of this in one loop would be very complicated, so concurrency control is needed to decouple the events and simplify the code. In Python this is asynchronous IO (asyncio). Following the official documentation, the implementation comes down to these steps:

  • Add the async keyword before a normal function to turn it into a coroutine function
  • Use await asyncio.sleep instead of time.sleep for delays, so the coroutine yields control to other tasks while waiting
  • After defining each coroutine function, schedule it with asyncio.create_task(some_coroutine(arg1, arg2, ...))
  • Use await asyncio.gather(task1, task2, ...) to wait for the coroutine tasks to complete
  • Once again, don't forget to use await
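
The steps above can be sketched with two coroutines that share one object. This is a minimal desktop example, not the project code: the Shared class and both coroutines are hypothetical, and the short tick values only keep the demo fast.

```python
import asyncio


class Shared:
    """Shared state that several tasks read and write."""

    def __init__(self):
        self.seconds_left = 3
        self.log = []


async def countdown(shared, tick=0.01):
    """Decrease the shared counter once per tick until it reaches zero."""
    while shared.seconds_left > 0:
        await asyncio.sleep(tick)  # yields control to other tasks
        shared.seconds_left -= 1


async def display(shared, tick=0.005):
    """Record the counter whenever it changes, standing in for a screen update."""
    last = None
    while True:
        if shared.seconds_left != last:
            last = shared.seconds_left
            shared.log.append(last)
        if last == 0:
            return
        await asyncio.sleep(tick)


async def main():
    shared = Shared()
    countdown_task = asyncio.create_task(countdown(shared))
    display_task = asyncio.create_task(display(shared))
    await asyncio.gather(countdown_task, display_task)
    return shared.log


log = asyncio.run(main())
print(log)
```

Neither coroutine knows about the other; they communicate only through the shared object, which is the same pattern the project uses for the network, timer, and screen tasks.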

Core code: This is the official example. For the actual code, please refer to the complete project source code below.

# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT

import asyncio
import board
import digitalio
import keypad


class Interval:
    """Simple class to hold an interval value. Use .value to read or write."""

    def __init__(self, initial_interval):
        self.value = initial_interval


async def monitor_interval_buttons(pin_slower, pin_faster, interval):
    """Monitor two buttons: one lengthens the interval, the other shortens it.
    Change interval.value as appropriate.
    """
    # Assume buttons are active low.
    with keypad.Keys(
        (pin_slower, pin_faster), value_when_pressed=False, pull=True
    ) as keys:
        while True:
            key_event = keys.events.get()
            if key_event and key_event.pressed:
                if key_event.key_number == 0:
                    # Lengthen the interval.
                    interval.value += 0.1
                else:
                    # Shorten the interval.
                    interval.value = max(0.1, interval.value - 0.1)
                print("interval is now", interval.value)
            # Let another task run.
            await asyncio.sleep(0)


async def blink(pin, interval):
    """Blink the given pin forever.
    The blinking rate is controlled by the supplied Interval object.
    """
    with digitalio.DigitalInOut(pin) as led:
        led.switch_to_output()
        while True:
            led.value = not led.value
            await asyncio.sleep(interval.value)


async def main():
    interval1 = Interval(0.5)
    interval2 = Interval(1.0)

    led1_task = asyncio.create_task(blink(board.D1, interval1))
    led2_task = asyncio.create_task(blink(board.D2, interval2))
    interval1_task = asyncio.create_task(
        monitor_interval_buttons(board.D3, board.D4, interval1)
    )
    interval2_task = asyncio.create_task(
        monitor_interval_buttons(board.D5, board.D6, interval2)
    )

    await asyncio.gather(led1_task, led2_task, interval1_task, interval2_task)


asyncio.run(main())

ESP32S3 Touch Sensor Usage

Official examples: CircuitPython Cap Touch | CircuitPython Essentials | Adafruit Learning System

The ESP32S3 has several built-in touch sensors, which allow input control without external components. In this project they start the Pomodoro timer and confirm its completion. To use one, connect a wire to a GPIO that supports touch, or touch the pad on the PCB directly. The following official code lists which GPIOs support touch.

Capacitive Touch | Adafruit ESP32-S3 TFT Feather | Adafruit Learning System

The execution results are:

# Only GPIOs that support touch are shown
Touch on: A4
Touch on: A5
Touch on: TX
Touch on: D10
Touch on: D11
Touch on: D12
Touch on: LED
Touch on: RX
Touch on: D5
Touch on: D6
Touch on: D9

The test reports 11 pins, but not all of them are usable. For example, A4 and A5 always read as touched; I do not know the reason, and comments from anyone who does are welcome. The project uses D10, located at the middle of the upper edge of the screen. In testing, its touch response is very sensitive.

Core code

import board
import touchio
import time

touch = touchio.TouchIn(board.D10)
while True:
    if touch.value:
        # Touch detected: start the Pomodoro timer
        print("start")
    time.sleep(0.1)
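
How touch presses might drive the timer can be sketched as a small state machine. The Pomodoro class below is hypothetical and not taken from the project source; it assumes three states (idle, running, confirming) that match the start and confirm steps described in this post, and it runs on a desktop without the board.

```python
class Pomodoro:
    """Tiny state machine: idle -> running -> confirming -> idle."""

    def __init__(self, duration):
        self.duration = duration      # length of one Pomodoro in seconds
        self.state = "idle"
        self.seconds_left = duration
        self.count = 0                # completed and confirmed Pomodoros

    def touch(self):
        """Handle one touch press: start when idle, confirm when finished."""
        if self.state == "idle":
            self.state = "running"
            self.seconds_left = self.duration
        elif self.state == "confirming":
            self.count += 1
            self.state = "idle"
            self.seconds_left = self.duration
        # Presses while running are ignored in this sketch.

    def tick(self):
        """Advance the timer by one second."""
        if self.state == "running":
            self.seconds_left -= 1
            if self.seconds_left <= 0:
                self.seconds_left = 0
                self.state = "confirming"


timer = Pomodoro(duration=3)
timer.touch()                    # start
for _ in range(3):
    timer.tick()
print(timer.state)               # confirming
timer.touch()                    # confirm
print(timer.state, timer.count)  # idle 1
```

On the board, touch() would be called when touch.value becomes true and tick() once per second from the countdown task.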

Multi-device MQTT communication

I also ordered two other ESP32 microcontrollers in this event, which are used here to simulate multi-device communication.

This experiment uses MQTT for communication between the devices. The equipment list is as follows:

Device 1: The main controller of this project, an Adafruit ESP32-S3 TFT Feather, simulates a central control panel that displays the on/off status of the light.

Device 2: Another ESP32S3 development board, an AtomS3 Lite ESP32S3, simulates a smart light.

Device 3: An ESP32C3 module, esp32-c3-wroom-02, which can simulate a smart switch after some simple peripheral circuitry is soldered on.

Results:

Device 1: Displays the status of the light, on or off, in the lower right corner.

Device 2: Uses the LED on the back to simulate a smart light, as shown below.

Device 3: Simulates a smart switch. A touch button added to the module acts as the switch.

A brief note on wiring the module: because this is a bare ESP32C3 module, it cannot be used directly. The necessary peripherals and a switch are soldered on, and the module is connected through a USB2TTL adapter, as shown in the figure below. 3V3, GND, TXD, and RXD connect directly to the USB2TTL adapter. EN, IO2, and IO8 need to be held high, so three 10K resistors pull them up to 3V3. The two ends of the key switch connect to IO9 and GND: holding the key down at power-on enters firmware flashing mode, otherwise the module boots normally. (Conditions were limited, so this setup is for experiments only.)

Environment Setup

An MQTT server is built locally with mosquitto. The process is relatively simple and well covered by tutorials, so I will not go into details.

The design of MQTT is as follows:

Two MQTT topics are defined: cmd/d2/led1 and state/d2/led1, where cmd carries commands to the light and state carries status updates from it. d2 identifies the device; this experiment only controls Device 2, which is named d2. led1 is the specific module in the device. (The code below builds the status topic with the shorter prefix stat/.)

Device 1 subscribes to the state/d2/led1 topic. A message of 1 means the light is on, and 0 means it is off. The screen displays on or off accordingly.

Device 2 subscribes to the cmd/d2/led1 topic. A message of 1 turns the light on, 0 turns it off, and 2 toggles it. The device then sets its led1 accordingly and publishes the resulting state, 0 or 1, to the topic state/d2/led1. The light can also be switched with the button on the device, and the corresponding MQTT message is sent in that case too.

Device 3 monitors its local key. When the key is pressed, it publishes a message to the topic cmd/d2/led1 with the content 2, which toggles the light.
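
The message protocol can be sketched without a broker. The helpers below (topic, apply_command, state_payload) are hypothetical and not part of the project source; they only model how a command payload changes the light state and which payload would be published afterwards.

```python
def topic(kind, device, module):
    """Build a topic such as cmd/d2/led1 from its three parts."""
    return f"{kind}/{device}/{module}"


def apply_command(power, message):
    """Return the new light state after a command payload.

    "0" turns the light off, "1" turns it on, "2" toggles it;
    any other payload leaves the state unchanged.
    """
    if message == "0":
        return False
    if message == "1":
        return True
    if message == "2":
        return not power
    return power


def state_payload(power):
    """Payload published on the state topic: "1" for on, "0" for off."""
    return "1" if power else "0"


power = False
for command in ("1", "2", "2", "x", "0"):
    power = apply_command(power, command)
    print(topic("state", "d2", "led1"), state_payload(power))
```

Keeping the toggle command (2) separate from on and off lets the switch stay stateless: it never needs to know whether the light is currently on.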

Core code

Device 1 and Device 2 both use ESP32S3 chips and run CircuitPython directly; Device 3 is an ESP32C3 and runs MicroPython.

Device 1 (central control terminal, responsible for displaying the light status)

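# Core code only; see the project source for the complete code
import asyncio
import os
import board
import displayio
import socketpool
import terminalio
import wifi
# Requires the adafruit_display_text library
from adafruit_display_text import bitmap_label
# Requires the adafruit_minimqtt library
import adafruit_minimqtt.adafruit_minimqtt as MQTT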
# MQTT
class MqttClient:
    def __init__(self):
        self.mqtt = None
        self.is_connected = False

        self.host = os.getenv("MQTT_HOST")
        self.port = os.getenv("MQTT_PORT")
        self.user = os.getenv("MQTT_USER")
        self.password = os.getenv("MQTT_PASSWORD")

        self.client_id =  os.getenv("MQTT_CLIENT_ID")
        self.topic =  os.getenv("MQTT_TOPIC")
        self.led = "led1"
        self.cmd_topic = f"cmd/{self.topic}/{self.led}"
        self.state_topic = f"stat/{self.topic}/{self.led}"

        self.led = None
        self.wait = 0.1

    def connect(self, mqtt_cli, userdata, flags, rc):
        print(f"Connected to MQTT {self.host}")
        self.is_connected = True
        mqtt_cli.subscribe(self.state_topic)

    def disconnect(self, mqtt_cli, userdata, rc):
        print(f"Disconnected from MQTT {self.host}")
        self.is_connected = False

    def message(self, client, topic, message):
        print(f"New message on topic {topic}: {message}")
        # 0 = off, 1 = on, 2 = toggle
        if topic == self.state_topic:
            if message == '0':
                self.led = False
            elif message == '1':
                self.led = True


async def mqtt_connect(mqtt_client):
    while True:
        if not mqtt_client.mqtt:
            print("Set up a MiniMQTT Client")
            pool = socketpool.SocketPool(wifi.radio)
            mqtt = MQTT.MQTT(
                broker=mqtt_client.host,
                username=mqtt_client.user,
                password=mqtt_client.password,
                socket_pool=pool,
                client_id=mqtt_client.client_id
            )

            mqtt.on_connect = mqtt_client.connect
            mqtt.on_disconnect = mqtt_client.disconnect
            mqtt.on_message = mqtt_client.message
            mqtt_client.mqtt = mqtt
            mqtt.connect()

        try:
            mqtt_client.mqtt.loop()
        except (ValueError, RuntimeError) as e:
            print("Failed to get data, retrying\n", e)
            mqtt_client.mqtt.reconnect()
            continue

        await asyncio.sleep(mqtt_client.wait)


# Screen display
async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro):

    display = board.DISPLAY
    group = displayio.Group()

    # MQTT status label (bottom, right of center)
    mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1)
    mqtt_label.anchor_point = (0.5, 1.0)
    mqtt_label.anchored_position = (int(display.width / 1.3), display.height - 5)
    mqtt_label.text = "-"
    group.append(mqtt_label)

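    # Other code omitted; see the project source for the complete code

    # Create the root group
    main_group = displayio.Group()
    main_group.append(group)
    # Show it
    display.root_group = main_group

    while True:
        # MQTT LED status; use the flag maintained by the connect/disconnect callbacks
        if mqtt_client.is_connected:
            if mqtt_client.led:
                # Show "on" in red when the light is on
                led_text = "on"
                mqtt_label.color = 0xFF0000
            else:
                led_text = "off"
                mqtt_label.color = 0xFFFFFF

            if mqtt_label.text != led_text:
                mqtt_label.text = led_text

        # Other code omitted; see the project source for the complete code
        await asyncio.sleep(pomodoro.wait)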

Device 2 (Smart Light)

import board
import os
import wifi
import socketpool
from digitalio import DigitalInOut, Direction, Pull
# Requires the adafruit_minimqtt library
import adafruit_minimqtt.adafruit_minimqtt as MQTT
# Requires the neopixel library
import neopixel
# Requires the asyncio and adafruit_ticks libraries
import asyncio


# MQTT
class MqttClient:
    def __init__(self):
        self.mqtt = None
        self.is_connected = False

        self.host = os.getenv("MQTT_HOST")
        self.port = os.getenv("MQTT_PORT")
        self.user = os.getenv("MQTT_USER")
        self.password = os.getenv("MQTT_PASSWORD")

        self.client_id =  os.getenv("MQTT_CLIENT_ID")
        self.topic =  os.getenv("MQTT_TOPIC")
        self.led = "led1"
        self.cmd_topic = f"cmd/{self.topic}/{self.led}"
        self.state_topic = f"stat/{self.topic}/{self.led}"

        self.led = None
        self.wait = 0.1

    def connect(self, mqtt_cli, userdata, flags, rc):
        print(f"Connected to MQTT {self.host}")
        self.is_connected = True
        mqtt_cli.subscribe(self.cmd_topic)

    def disconnect(self, mqtt_cli, userdata, rc):
        print(f"Disconnected from MQTT {self.host}")
        self.is_connected = False

    def message(self, client, topic, message):
        print(f"New message on topic {topic}: {message}")
        # 0 = off, 1 = on, 2 = toggle
        print(f"topic: {topic}")
        if topic == self.cmd_topic:
            print(f"message: {message}")
            if self.led:
                if message == "0":
                    self.led.power = False
                elif message == "1":
                    self.led.power = True
                elif message == "2":
                    self.led.power = not self.led.power


# LED
class LED:
    def __init__(self):
        self.pixels = None
        self.power = False
        self.wait = 0.05



# Connect to WiFi
async def wifi_connect(internet):
    # Configure WiFi in settings.toml beforehand: CIRCUITPY_WIFI_SSID=<wifi ssid> and CIRCUITPY_WIFI_PASSWORD=<wifi password>
    failure_count = 0
    while True:
        internet.state = wifi.radio.connected
        if not wifi.radio.connected:
            try:
                wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv(
                    "CIRCUITPY_WIFI_PASSWORD"))
            except OSError as error:
                print("Failed to connect, retrying\n", error)
                failure_count += 1
        await asyncio.sleep(internet.wait)

    # If WiFi does not connect normally, switch to AP mode
    # if not wifi_state:
    #     wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')


async def mqtt_connect(mqtt_client):
    while True:
        if not mqtt_client.mqtt:
            print("Set up a MiniMQTT Client")
            pool = socketpool.SocketPool(wifi.radio)
            mqtt = MQTT.MQTT(
                broker=mqtt_client.host,
                username=mqtt_client.user,
                password=mqtt_client.password,
                socket_pool=pool,
                client_id=mqtt_client.client_id
            )

            mqtt.on_connect = mqtt_client.connect
            mqtt.on_disconnect = mqtt_client.disconnect
            mqtt.on_message = mqtt_client.message
            mqtt_client.mqtt = mqtt
            mqtt.connect()

        try:
            mqtt_client.mqtt.loop()
        except (ValueError, RuntimeError) as e:
            print("Failed to get data, retrying\n", e)
            mqtt_client.mqtt.reconnect()
            continue

        await asyncio.sleep(mqtt_client.wait)

# Button press detection
async def monitor_buttons(led):
    button = DigitalInOut(board.BTN)
    while True:
        if not button.value:
            # Button pressed; debounce (currently disabled)
            # await asyncio.sleep(led.wait)
            # if not button.value:
            print("button pressed")
            led.power = not led.power
        await asyncio.sleep(led.wait)

# LED control
async def pixels_led(led, mqtt_client):
    if not led.pixels:
        # NeoPixel LED setup
        pixel_pin = board.NEOPIXEL
        pixel_num = 1
        led.pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0, auto_write=True)
        led.pixels.fill((255, 255, 255))
    # Always go through the shared instance so `pixels` is defined even when it was created earlier
    pixels = led.pixels

    last_power = None
    mqtt_client.led = led
    while True:
        if led.power:
            pixels.brightness = 1
        else:
            pixels.brightness = 0
        
        if last_power != led.power:
            print("led toggle")
            last_power = led.power
            print(mqtt_client)
            if mqtt_client.is_connected:
                print(mqtt_client.state_topic)  
                mqtt_client.mqtt.publish(mqtt_client.state_topic, 1 if led.power else 0)

        await asyncio.sleep(led.wait)

async def main():
    # Shared state objects
    mqtt_client = MqttClient()
    led = LED()

    # Create the coroutine tasks
    mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client))
    monitor_buttons_task = asyncio.create_task(monitor_buttons(led))
    pixels_led_task = asyncio.create_task(pixels_led(led,mqtt_client))

    # Run the tasks
    await asyncio.gather(mqtt_task, monitor_buttons_task, pixels_led_task)


asyncio.run(main())
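
A note on the button handling above: with the debounce lines commented out, holding the button keeps flipping led.power on every poll. A minimal sketch of edge detection, which fires only once per press, is below. EdgeDetector is a hypothetical helper name (not part of the project code or of CircuitPython), and the samples are simulated rather than read from a pin.

```python
class EdgeDetector:
    """Reports a press only on the released -> pressed transition."""

    def __init__(self):
        self.was_pressed = False

    def update(self, pressed):
        # True only on the first sample where the button is down
        fired = pressed and not self.was_pressed
        self.was_pressed = pressed
        return fired


# Simulated polls: the button is held for three samples, released, then tapped once
detector = EdgeDetector()
power = False
toggles = 0
for pressed in [False, True, True, True, False, True, False]:
    if detector.update(pressed):
        power = not power
        toggles += 1

print("toggles:", toggles, "power:", power)
```

In monitor_buttons this would presumably be fed with `not button.value`, since the button reads low while pressed.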

Device 3 (Smart Switch)

import ujson
import network
from umqtt.simple import MQTTClient
from machine import Pin
from time import sleep_ms

# Save the Wi-Fi and MQTT settings to the config file
def save_wifi_config(ssid, password, mqtt_host, mqtt_port, mqtt_user, mqtt_password):
    config = {
        'wifi_ssid': ssid,
        'wifi_password': password,
        'mqtt_host': mqtt_host,
        'mqtt_port': mqtt_port,
        'mqtt_user': mqtt_user,
        'mqtt_password': mqtt_password
    }
    with open('config.json', 'w') as f:
        ujson.dump(config, f)

# Read the Wi-Fi settings from the config file
def load_wifi_config():
    try:
        with open('config.json', 'r') as f:
            config = ujson.load(f)
            return config['wifi_ssid'], config['wifi_password']
    except OSError:
        return None, None
    
# Read the MQTT settings from the config file
def load_mqtt_config():
    try:
        with open('config.json', 'r') as f:
            config = ujson.load(f)
            return config['mqtt_host'], config['mqtt_port'], config['mqtt_user'], config['mqtt_password']
    except OSError:
        return None, None, None, None

# Example usage: save the Wi-Fi and MQTT settings to the config file
# wifi_ssid = 'your_wifi_ssid'
# wifi_password = 'your_wifi_password'
# mqtt_host = ''
# mqtt_port = ''
# mqtt_user = ''
# mqtt_password = ''

# save_wifi_config(wifi_ssid, wifi_password, mqtt_host, mqtt_port, mqtt_user, mqtt_password)

# Example usage: read the Wi-Fi settings from the config file
# saved_wifi_ssid, saved_wifi_password = load_wifi_config()


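# Set up the Wi-Fi connection
wifi_ssid, wifi_password = load_wifi_config()
station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(wifi_ssid, wifi_password)
# connect() returns immediately, so wait for the link before using the network
while not station.isconnected():
    sleep_ms(100)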

# Set up the MQTT client
mqtt_broker, mqtt_port, mqtt_username, mqtt_password = load_mqtt_config()
mqtt_client_id = 'd2'
mqtt_topic = 'cmd/d1/led1'

mqtt_client = MQTTClient(mqtt_client_id, mqtt_broker, port=mqtt_port, user=mqtt_username, password=mqtt_password)
mqtt_client.connect()



# Set up the button pin
button_pin = Pin(9, Pin.IN, Pin.PULL_UP)

# Button debounce delay (milliseconds)
debounce_delay = 100


# Check the button state and handle a press
def handle_button_press():
    if not button_pin.value():
        sleep_ms(debounce_delay)
        if not button_pin.value():
            # Button is pressed: publish the MQTT toggle command
            mqtt_client.publish(mqtt_topic, '2')

while True:
    handle_button_press()

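
The two load_* functions above only catch OSError, so a config.json that exists but is malformed or missing a key would still raise. Below is a minimal sketch of a single loader that handles those cases too. load_config and REQUIRED_KEYS are hypothetical names, not part of the original code, and the sketch assumes the firmware provides a `json` module (on builds that only ship ujson, import that instead).

```python
import json

# Keys written by save_wifi_config() in the listing above
REQUIRED_KEYS = (
    "wifi_ssid", "wifi_password",
    "mqtt_host", "mqtt_port", "mqtt_user", "mqtt_password",
)


def load_config(path="config.json"):
    """Return the config dict, or None if the file is missing, invalid, or incomplete."""
    try:
        with open(path, "r") as f:
            config = json.load(f)
    except (OSError, ValueError):
        # OSError: file not found; ValueError: not valid JSON
        return None
    if not isinstance(config, dict):
        return None
    for key in REQUIRED_KEYS:
        if key not in config:
            return None
    return config


config = load_config()
if config is None:
    print("config.json is missing or incomplete; run save_wifi_config() first")
```

Reading the file once and passing the dict around also avoids opening config.json separately for the Wi-Fi and MQTT settings.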
The final complete code of the project

The project is finally finished. The complete code is as follows:

"""
Pomodoro Calendar Clock
ltpop@163.com
20231126
"""

# os.getenv reads the TOML config file (settings.toml)
import os
import time
import rtc
import board
import displayio
import terminalio
import touchio
import wifi
import ssl
import socketpool

# Requires the asyncio and adafruit_ticks libraries
import asyncio
# Requires the neopixel library
import neopixel
# Requires the adafruit_ntp library
import adafruit_ntp
# Requires the adafruit_display_text library
from adafruit_display_text import bitmap_label
# Requires the adafruit_bitmap_font library
from adafruit_bitmap_font import bitmap_font
# Requires the adafruit_imageload library
import adafruit_imageload
# Requires the adafruit_requests library
import adafruit_requests as requests
# Requires the adafruit_led_animation library
from adafruit_led_animation.animation.blink import Blink
from adafruit_led_animation.animation.rainbow import Rainbow
from adafruit_led_animation.sequence import AnimationSequence
import adafruit_led_animation.color as color
# Requires the adafruit_minimqtt library
import adafruit_minimqtt.adafruit_minimqtt as MQTT


# Network connection state of this device
class Internet:
    def __init__(self):
        self.state = False
        self.wait = 30.0

# MQTT
class MqttClient:
    def __init__(self):
        self.mqtt = None
        self.is_connected = False

        self.host = os.getenv("MQTT_HOST")
        self.port = os.getenv("MQTT_PORT")
        self.user = os.getenv("MQTT_USER")
        self.password = os.getenv("MQTT_PASSWORD")

        self.client_id =  os.getenv("MQTT_CLIENT_ID")
        self.topic =  os.getenv("MQTT_TOPIC")
        self.led = "led1"
        self.cmd_topic = f"cmd/{self.topic}/{self.led}"
        self.state_topic = f"stat/{self.topic}/{self.led}"

        self.led = None
        self.wait = 0.1

    def connect(self, mqtt_cli, userdata, flags, rc):
        print(f"Connected to MQTT {self.host}")
        self.is_connected = True
        mqtt_cli.subscribe(self.state_topic)

    def disconnect(self, mqtt_cli, userdata, rc):
        print(f"Disconnected from MQTT {self.host}")
        self.is_connected = False

    def message(self, client, topic, message):
        print(f"New message on topic {topic}: {message}")
        # 0 = off, 1 = on, 2 = toggle
        if topic == self.state_topic:
            if message == '0':
                self.led = False
            elif message == '1':
                self.led = True

# Current weather state
class Weather:
    def __init__(self):
        self.city = ''
        self.text = None
        self.temperature = 20.0
        self.wait = 3600.0


# Current date and time state
class NTPDatetime:
    def __init__(self):
        self.datetime = None
        self.ntp = None
        self.ntp_sync = None
        self.next_lunar_request_time = None
        self.weekday = None
        self.week_name = None
        self.lunar_year_ganzhi = None
        self.lunar_month_chinese = None
        self.lunar_day_chinese = None
        self.wait = 3600.0
        self.retry = 10


# Pomodoro statistics
class Pomodoro:
    def __init__(self):
        self.count = 0
        self.run = False
        self.end = 0
        self.confirming = False
        self.confirmed = False
        self.time = os.getenv("POMODORO_TIME")
        self.timeout = os.getenv("POMODORO_TIMEOUT")
        self.wait = 0.2


# Connect to Wi-Fi
async def wifi_connect(internet):
    # Set the Wi-Fi credentials in settings.toml first: CIRCUITPY_WIFI_SSID=<Wi-Fi SSID> and CIRCUITPY_WIFI_PASSWORD=<Wi-Fi password>
    failure_count = 0
    while True:
        internet.state = wifi.radio.connected
        if not wifi.radio.connected:
            try:
                wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv(
                    "CIRCUITPY_WIFI_PASSWORD"))
            except OSError as error:
                print("Failed to connect, retrying\n", error)
                failure_count += 1
        await asyncio.sleep(internet.wait)

    # If Wi-Fi fails to connect, switch to AP mode
    # if not wifi_state:
    #     wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')


async def mqtt_connect(mqtt_client):
    while True:
        if not mqtt_client.mqtt:
            print("Set up a MiniMQTT Client")
            pool = socketpool.SocketPool(wifi.radio)
            mqtt = MQTT.MQTT(
                broker=mqtt_client.host,
                username=mqtt_client.user,
                password=mqtt_client.password,
                socket_pool=pool,
                client_id=mqtt_client.client_id
            )

            mqtt.on_connect = mqtt_client.connect
            mqtt.on_disconnect = mqtt_client.disconnect
            mqtt.on_message = mqtt_client.message
            mqtt_client.mqtt = mqtt
            mqtt.connect()

        try:
            mqtt_client.mqtt.loop()
        except (ValueError, RuntimeError) as e:
            print("Failed to get data, retrying\n", e)
            mqtt_client.mqtt.reconnect()
            continue

        await asyncio.sleep(mqtt_client.wait)


# Fetch the time
async def fetch_time(internet, ntp_datetime):
    tz_offset = os.getenv("TIME_ZONE")
    the_rtc = rtc.RTC()
    ntp = None
    while True:
        if internet.state:
            ntp_ok = False
            lunar_ok = False
            pool = socketpool.SocketPool(wifi.radio)
            # Get the current NTP time
            if not ntp:
                try:
                    ntp = adafruit_ntp.NTP(
                        pool, server="ntp.ntsc.ac.cn", tz_offset=tz_offset)
                    # Update the system clock
                    the_rtc.datetime = ntp.datetime
                    ntp_datetime.ntp = ntp
                    ntp_ok = True
                    ntp_datetime.ntp_sync = True
                except OSError as error:
                    print("NTP failed, retrying\n", error)
                    ntp = None
                    
            lunar_need_update = False
            if not ntp_datetime.next_lunar_request_time:
                lunar_need_update = True
            elif time.time() > ntp_datetime.next_lunar_request_time:
                # The scheduled time for the next request has passed
                lunar_need_update = True
                
            if lunar_need_update:
                # Fetch the lunar calendar
                try:
                    https = requests.Session(pool, ssl.create_default_context())
                    lunar_response = https.get(
                        f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
                except RuntimeError as error:
                    print("Lunar request failed, retrying\n", error)
                    lunar_response = None
                if lunar_response:
                    lunar_json = lunar_response.json()
                    if lunar_json['code'] == 200:
                        lunar_data = lunar_json['data']
                        # print(lunar_data)
                        week_day = lunar_data['week_no']
                        if week_day:
                            # NTPDatetime declares this field as `weekday`
                            ntp_datetime.weekday = week_day

                        week_name = lunar_data['week_name']
                        if week_name:
                            ntp_datetime.week_name = week_name

                        lunar_year_ganzhi = lunar_data['ganzhi_year']
                        if lunar_year_ganzhi:
                            ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi

                        lunar_month_chinese = lunar_data['lunar_month_chinese']
                        if lunar_month_chinese:
                            ntp_datetime.lunar_month_chinese = lunar_month_chinese

                        lunar_day_chinese = lunar_data['lunar_day_chinese']
                        if lunar_day_chinese:
                            ntp_datetime.lunar_day_chinese = lunar_day_chinese
                        lunar_ok = True
                        print(
                            f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
                        print(f"当前星期:{week_name}")
                        # Schedule the next update for 00:00 the following day
                        current_time = time.localtime()
                        ntp_datetime.next_lunar_request_time = time.mktime((current_time.tm_year, current_time.tm_mon, current_time.tm_mday + 1, 0, 0, 0, 0, 0, 0))
            # Retry soon on failure; otherwise the lunar data is refreshed only on the next day
            if not ntp_ok or not lunar_ok:
                await asyncio.sleep(ntp_datetime.retry)
            else:
                # dt = time.localtime()
                # print(
                #     f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
                # wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec)
                await asyncio.sleep(ntp_datetime.wait)
        else:
            await asyncio.sleep(internet.wait)

# Fetch the weather information
async def fetch_weather(internet, weather):
    # Weather API key
    weather_api_key = os.getenv("WEATHER_API_KEY")
    # Weather city: read the city setting from the config file
    weather_city = os.getenv("WEATHER_CITY")
    while True:
        if internet.state:
            # Weather information
            pool = socketpool.SocketPool(wifi.radio)
            https = requests.Session(pool, ssl.create_default_context())
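            # If no city is configured, look up the city of the current public IP
            if not weather_city:
                # Get the current public IP and its city
                try:
                    ip_city_response = https.get("https://myip.ipip.net/json")
                except RuntimeError as error:
                    print("IP city request failed, retrying\n", error)
                    # Keep the name defined so the check below does not raise NameError
                    ip_city_response = None
                if ip_city_response: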
                    ip_city_json = ip_city_response.json()
                    if ip_city_json["ret"] == "ok":
                        weather_city = ip_city_json['data']['location'][2]
                        print(f"当前IP城市:{weather_city}")
            weather.city = weather_city

            # Current weather
            weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            try:
                weather_now_response = https.get(weather_now_url)
            except RuntimeError as error:
                print("Weather request failed, retrying\n", error)
                weather_now_response = None
            if weather_now_response:
                weather_json = weather_now_response.json()
                # Update only when the API returned results, so the values below are always defined
                if weather_json["results"]:
                    now_weather = weather_json["results"][0]["now"]["text"]
                    now_temperature = weather_json["results"][0]["now"]["temperature"]
                    weather.text = now_weather
                    weather.temperature = now_temperature
                    print(f"当前天气:{now_weather},气温:{now_temperature}℃")

            # Forecast for the coming days
            # weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            # weather_response = https.get(weather_daily_url)
            # weather_json = weather_response.json()
            # if weather_json["results"]:
            #     today_weather = weather_json["results"][0]["daily"][0]["text_day"]
            #     today_temprature_low = weather_json["results"][0]["daily"][0]["low"]
            #     today_temprature_high = weather_json["results"][0]["daily"][0]["high"]
            #     today_humidity = weather_json["results"][0]["daily"][0]["humidity"]

            # print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,温度:{today_humidity}%")

            await asyncio.sleep(weather.wait)
        else:
            await asyncio.sleep(internet.wait)

# LED display
async def pixels_led(internet, pomodoro):
    # NeoPixel LED control
    pixel_pin = board.NEOPIXEL
    pixel_num = 1
    pixels = neopixel.NeoPixel(
        pixel_pin, pixel_num, brightness=0.05, auto_write=False)
    rainbow = Rainbow(pixels, speed=0.1, period=2)
    blink = Blink(pixels, 0.5, color.GREEN)
    animations = AnimationSequence(
        rainbow,
        advance_interval=5,
        auto_clear=True,
    )

    while True:
        # Pomodoro running: solid red
        if pomodoro.run:
            # Pomodoro waiting for confirmation: blinking green
            if pomodoro.confirming:
                blink.speed = 0.5
                blink.color = color.GREEN
                blink.animate()
            else:
                pixels.fill((255, 0, 0))
                pixels.show()
        elif not internet.state:
            # No network connection: blinking red
            blink.speed = 2
            blink.color = color.RED
            blink.animate()
        else:
            # Otherwise show the rainbow animation
            animations.animate()
        await asyncio.sleep(pomodoro.wait)


# Screen display
async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro):

    display = board.DISPLAY
    # The Chinese font file is stored in the font directory
    font_file = "font/wenquanyi_13px.pcf"
    font = bitmap_font.load_font(font_file)

    group = displayio.Group()

    # Date label (top left)
    date_label = bitmap_label.Label(terminalio.FONT, scale=1)
    date_label.anchor_point = (0.0, 0.0)
    date_label.anchored_position = (5, 5)
    date_label.text = "2023-11-11"
    group.append(date_label)

    # Time label (top right)
    time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
    time_label.anchor_point = (1.0, 0.0)
    time_label.anchored_position = (display.width - 2, 2)
    time_label.text = "11:30"
    group.append(time_label)

    # Lunar date label (top center)
    lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
    lunar_label.anchor_point = (0.5, 0.0)
    lunar_label.anchored_position = (display.width // 2, 5)
    lunar_label.text = "九月廿八"
    group.append(lunar_label)

    # Weather label (bottom left)
    weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
    weather_label.anchor_point = (0.0, 1.0)
    weather_label.anchored_position = (2, display.height - 5)
    weather_label.text = "晴"
    group.append(weather_label)

    # Temperature label (bottom center)
    temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
    temperature_label.anchor_point = (0.5, 1.0)
    temperature_label.anchored_position = (
        display.width // 2, display.height - 5)
    temperature_label.text = "5℃"
    group.append(temperature_label)

    # MQTT status label (bottom, right of center)
    mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1)
    mqtt_label.anchor_point = (0.5, 1.0)
    mqtt_label.anchored_position = (display.width // 1.3, display.height - 5)
    mqtt_label.text = "-"
    group.append(mqtt_label)

    # Pomodoro countdown label (center)
    pomodoro_label = bitmap_label.Label(
        terminalio.FONT, color=0xFF00FF, scale=7)
    # Position
    pomodoro_label.anchor_point = (0.5, 0.5)
    pomodoro_label.anchored_position = (
        display.width // 2, display.height // 2)
    pomodoro_label.text = "15:00"
    group.append(pomodoro_label)


    # Completed-Pomodoro counter (bottom right)
    # with open("img/tomato.bmp", "rb") as f:
    #     image, palette = adafruit_imageload.load(f, bitmap=displayio.Bitmap, palette=displayio.Palette)
    #     sprite = displayio.TileGrid(image, pixel_shader=palette)
    #     group.append(sprite)

    count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
    # Position
    count_label.anchor_point = (1, 1)
    count_label.anchored_position = (display.width - 2, display.height - 2)
    count_label.text = "0"
    group.append(count_label)

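    # Confirmation timeout after the countdown ends: a Pomodoro that is not confirmed in time is not counted.
    # The value (POMODORO_TIMEOUT) is read in the Pomodoro class, so nothing needs to be loaded here.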

    # Create the root group
    main_group = displayio.Group()
    main_group.append(group)
    # Show it on the display
    display.root_group = main_group

    while True:
        if ntp_datetime.ntp_sync:
            dt = time.localtime()
            # text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃'
            # Set the date text
            date_text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}"
            if date_label.text != date_text:
                date_label.text = date_text
            # Set the time text
            time_text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}"
            if time_label.text != time_text:
                time_label.text = time_text
            # Set the lunar date text
            if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese:
                lunar_text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}"
                if lunar_label.text != lunar_text:
                    lunar_label.text = lunar_text

        if weather.text:
            # Set the weather text
            if weather_label.text != weather.text:
                weather_label.text = weather.text
            
            # Set the temperature text
            temperature_text = f"{weather.temperature}℃"
            if temperature_label.text != temperature_text:
                temperature_label.text = temperature_text
        else:
            weather_text = f'请先连接WIFI'
            if weather_label.text != weather_text:
                weather_label.text = weather_text

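        # MQTT LED state
        # Use the wrapper's own flag: MiniMQTT's is_connected is a method, so the bare attribute is always truthy
        if mqtt_client.is_connected:
            if mqtt_client.led:
                # LED on: show the text in red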
                led_text =  "on"
                mqtt_label.color=0xFF0000
            else:
                led_text =  "off"
                mqtt_label.color=0xFFFFFF

            if mqtt_label.text != led_text:
                mqtt_label.text = led_text
        # Update the Pomodoro display
        pomodoro_label.color = 0x00FFFF
        count_text = f"{pomodoro.count}"
        if count_label.text != count_text:
            count_label.text = count_text
        if pomodoro.run:
            left_seconds = pomodoro.end - time.monotonic()
            if left_seconds >= 0:
                minute = int(left_seconds / 60)
                second = int(left_seconds % 60)
                # The countdown text changes once per second
                pomodoro_text = f"{minute:02d}:{second:02d}"
                if pomodoro_label.text != pomodoro_text:
                    pomodoro_label.text = pomodoro_text
            else:
                # When a Pomodoro finishes, it counts as completed only if confirmed by a touch within the timeout
                timeout_seconds = abs(left_seconds)
                if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout:
                    pomodoro.confirming = True
                    weather_label.text = f'番茄等待确认'
                    # Show the confirmation countdown in red
                    pomodoro_label.color = 0xFF0000
                    pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}"
                else:
                    pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}'
                    pomodoro.confirming = False
                    pomodoro.confirmed = False
                    pomodoro.run = False

        await asyncio.sleep(pomodoro.wait)


async def monitor_touch_buttons(pomodoro):
    touch = touchio.TouchIn(board.D10)
    while True:
        if touch.value:
            await asyncio.sleep(0.1)
            if touch.value:
                # A touch starts the Pomodoro timer
                if not pomodoro.run:
                    pomodoro.run = True
                    pomodoro.end = time.monotonic() + pomodoro.time
                # While waiting for confirmation, a touch adds 1 to the Pomodoro count
                elif pomodoro.confirming:
                    pomodoro.confirmed = True
                    pomodoro.count += 1
        await asyncio.sleep(0.1)


async def main():
    # Shared state objects
    internet = Internet()
    mqtt_client = MqttClient()
    ntp_datetime = NTPDatetime()
    weather = Weather()
    pomodoro = Pomodoro()

    # Create the coroutine tasks
    internet_task = asyncio.create_task(wifi_connect(internet))
    mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client))
    fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime))
    fetch_weather_task = asyncio.create_task(fetch_weather(internet, weather))
    pixels_led_task = asyncio.create_task(pixels_led(internet, pomodoro))
    lcd_display_task = asyncio.create_task(
        lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro))
    monitor_touch_buttons_task = asyncio.create_task(
        monitor_touch_buttons(pomodoro))

    # Run the tasks
    await asyncio.gather(internet_task, mqtt_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task)


asyncio.run(main())
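
The countdown and confirmation logic inside lcd_display is easier to follow when pulled out into plain functions. The sketch below mirrors that logic under stated assumptions: format_countdown and pomodoro_phase are hypothetical helper names that do not exist in the code above, and the times are plain numbers rather than time.monotonic() readings.

```python
def format_countdown(left_seconds):
    """Format the remaining seconds as MM:SS, as shown on the center label."""
    minute = int(left_seconds // 60)
    second = int(left_seconds % 60)
    return f"{minute:02d}:{second:02d}"


def pomodoro_phase(now, end, timeout, confirmed):
    """Return 'running', 'confirming', or 'finished' for a started Pomodoro."""
    left_seconds = end - now
    if left_seconds >= 0:
        return "running"
    # Past the end: wait for a touch until the confirmation timeout runs out
    if not confirmed and -left_seconds < timeout:
        return "confirming"
    return "finished"


# A 900 s Pomodoro started at t=0 with a 100 s confirmation window
print(format_countdown(900))                  # 15:00
print(pomodoro_phase(10, 900, 100, False))    # running
print(pomodoro_phase(950, 900, 100, False))   # confirming
print(pomodoro_phase(950, 900, 100, True))    # finished
```

Keeping this decision separate from the label updates would also make it possible to test the timing without a display attached.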

The contents of settings.toml are attached below:

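CIRCUITPY_WIFI_SSID = "<change to match your setup>"
CIRCUITPY_WIFI_PASSWORD = "<change to match your setup>"
CIRCUITPY_WEB_API_PASSWORD = "passw0rd"  # unused
CIRCUITPY_WEB_API_PORT = 80  # unused

TIME_ZONE = 8

# https://www.alapi.cn/api/view/54
LUNAR_API_KEY = "<change to match your setup>"
WEATHER_API_KEY = "<change to match your setup>"
WEATHER_CITY = "<change to match your setup; the city's Chinese name or pinyin both work>"

# Length of one Pomodoro, in seconds
POMODORO_TIME = 900
# Seconds allowed for manual confirmation after a Pomodoro finishes
POMODORO_TIMEOUT = 100

MQTT_HOST = "<change to match your setup>"
MQTT_PORT = 1883
MQTT_USER = "<change to match your setup>"
MQTT_PASSWORD = "<change to match your setup>"

MQTT_TOPIC = "d1"
MQTT_CLIENT_ID = "<change to match your setup>"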
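
If a key such as POMODORO_TIME is missing from settings.toml, os.getenv returns None and the later arithmetic on pomodoro.time would fail. A minimal sketch of reading a numeric setting with a fallback is below. get_int_setting is a hypothetical helper (not part of the project), and the default values shown are only illustrative assumptions.

```python
import os


def get_int_setting(name, default):
    """Read an integer setting, falling back to `default` if it is missing or invalid."""
    value = os.getenv(name)
    if value is None:
        return default
    try:
        # CircuitPython already returns an int for integer TOML values;
        # on desktop Python the environment variable arrives as a string.
        return int(value)
    except (TypeError, ValueError):
        return default


# Illustrative defaults: 900 s per Pomodoro, 100 s to confirm
pomodoro_time = get_int_setting("POMODORO_TIME", 900)
pomodoro_timeout = get_int_setting("POMODORO_TIMEOUT", 100)
print(pomodoro_time, pomodoro_timeout)
```

The same pattern could be used for TIME_ZONE and MQTT_PORT, so a typo in the config degrades to a default instead of stopping the program at startup.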

3. Source code download

Source code package (not the latest version; for the updated code, refer to the listings in this post):

ESP32S3-TFT-CircuitPython-Tomato Calendar Clock Source Code-Embedded Development Related Materials Download-EEWORLD Download Center

END

This post was last edited by ltpop on 2023-11-17 22:49

Demo Video


Project source code

"""
Pomodoro Calendar Clock
ltpop@163.com
202311
"""

# os.getenv reads the TOML config file (settings.toml)
import os
import time
import rtc
import board
import displayio
import terminalio
import touchio
import wifi
import ssl
import socketpool

# Requires the asyncio and adafruit_ticks libraries
import asyncio
# Requires the neopixel library
import neopixel
# Requires the adafruit_ntp library
import adafruit_ntp
# Requires the adafruit_display_text library
from adafruit_display_text import bitmap_label
# Requires the adafruit_bitmap_font library
from adafruit_bitmap_font import bitmap_font
# Requires the adafruit_requests library
import adafruit_requests as requests
# Requires the adafruit_led_animation library
from adafruit_led_animation.animation.blink import Blink
from adafruit_led_animation.animation.rainbow import Rainbow
from adafruit_led_animation.sequence import AnimationSequence
import adafruit_led_animation.color as color

# Network connection state of this device
class Internet:
    def __init__(self):
        self.state = False
        self.wait = 30.0


# Current weather state
class Weather:
    def __init__(self):
        self.city = ''
        self.text = None
        self.temperature = 20.0
        self.wait = 300.0


# Current date and time state
class NTPDatetime:
    def __init__(self):
        self.datetime = None
        self.ntp = None
        self.weekday = None
        self.week_name = None
        self.lunar_year_ganzhi = None
        self.lunar_month_chinese = None
        self.lunar_day_chinese = None
        self.wait = 3600.0
        self.retry = 10


# Pomodoro statistics
class Pomodoro:
    def __init__(self):
        self.count = 0
        self.run = False
        self.end = 0
        self.confirming = False
        self.confirmed = False
        self.time = os.getenv("POMODORO_TIME")
        self.timeout = os.getenv("POMODORO_TIMEOUT")
        self.wait = 0.1

# Connect to Wi-Fi
async def wifi_connect(internet):
    # Set the Wi-Fi credentials in settings.toml first: CIRCUITPY_WIFI_SSID=<Wi-Fi SSID> and CIRCUITPY_WIFI_PASSWORD=<Wi-Fi password>
    failure_count = 0
    while True:
        internet.state = wifi.radio.connected
        if not wifi.radio.connected:
            try:
                wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
            except OSError as error:
                print("Failed to connect, retrying\n", error)
                failure_count += 1
        await asyncio.sleep(internet.wait)


    # If Wi-Fi fails to connect, switch to AP mode
    # if not wifi_state:
    #     wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')



# Fetch the time
async def fetch_time(internet, ntp_datetime):
    tz_offset=os.getenv("TIME_ZONE")
    the_rtc = rtc.RTC()
    ntp = None
    while True:
        if internet.state:
            ntp_ok = False
            lunar_ok = False
            # Get the current NTP time
            if not ntp:
                pool = socketpool.SocketPool(wifi.radio)
                try:
                    ntp = adafruit_ntp.NTP(pool, server = "cn.ntp.org.cn", tz_offset = tz_offset)
                    # Update the system clock
                    the_rtc.datetime = ntp.datetime
                    ntp_datetime.ntp = ntp
                    ntp_ok = True
                except OSError as error:
                    print("NTP failed, retrying\n", error)
                    ntp = None
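            else:
                # NTP was already synced on an earlier pass, so count it as OK;
                # otherwise the loop would retry every few seconds forever
                ntp_ok = True
                # Fetch the lunar calendar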
                pool = socketpool.SocketPool(wifi.radio)
                https = requests.Session(pool, ssl.create_default_context())
                lunar_response = https.get(f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
                
                if lunar_response:
                    lunar_json = lunar_response.json()
                    if lunar_json['code'] == 200:
                        lunar_data = lunar_json['data']
                        # print(lunar_data)
                        week_day = lunar_data['week_no']
                        if week_day:
                            ntp_datetime.week_day = week_day

                        week_name = lunar_data['week_name']
                        if week_name:
                            ntp_datetime.week_name = week_name

                        lunar_year_ganzhi = lunar_data['ganzhi_year']
                        if lunar_year_ganzhi:
                            ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi

                        lunar_month_chinese = lunar_data['lunar_month_chinese']
                        if lunar_month_chinese:
                            ntp_datetime.lunar_month_chinese = lunar_month_chinese

                        lunar_day_chinese = lunar_data['lunar_day_chinese']
                        if lunar_day_chinese:
                            ntp_datetime.lunar_day_chinese = lunar_day_chinese
                        lunar_ok = True
                        print(f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
                        print(f"当前星期:{week_name}")
            # Retry soon on failure; otherwise update again only on the next day
            if not ntp_ok or not lunar_ok:
                await asyncio.sleep(ntp_datetime.retry)
            else:
                dt = time.localtime()
                print(f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
                # Seconds remaining until midnight
                wait_seconds = (23 - dt.tm_hour) * 3600 + (59 - dt.tm_min) * 60 + (60 - dt.tm_sec)
                await asyncio.sleep(wait_seconds)
        else:
            await asyncio.sleep(internet.wait)

# Fetch the weather information
async def fetch_weather(internet, weather):
    # Weather API key
    weather_api_key = os.getenv("WEATHER_API_KEY")
    # Weather city: read the city setting from the config file
    weather_city = os.getenv("WEATHER_CITY")
    while True:
        if internet.state:
            # Weather information
            pool = socketpool.SocketPool(wifi.radio)
            https = requests.Session(pool, ssl.create_default_context())
            # If no city is configured, look up the city of the current public IP
            if not weather_city:
                # Get the current public IP and its city
                ip_city_response = https.get("https://myip.ipip.net/json")
                ip_city_json = ip_city_response.json()
                if ip_city_json["ret"] == "ok":
                    weather_city = ip_city_json['data']['location'][2]
                    print(f"当前IP城市:{weather_city}")
            weather.city = weather_city

            # Current weather
            weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            weather_now_response = https.get(weather_now_url)
            weather_json = weather_now_response.json()
            # Update only when the API returned results, so the values below are always defined
            if weather_json["results"]:
                now_weather = weather_json["results"][0]["now"]["text"]
                now_temperature = weather_json["results"][0]["now"]["temperature"]
                weather.text = now_weather
                weather.temperature = now_temperature
                print(f"当前天气:{now_weather},气温:{now_temperature}℃")


            # Forecast for the coming days
            # weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            # weather_response = https.get(weather_daily_url)
            # weather_json = weather_response.json()
            # if weather_json["results"]:
            #     today_weather = weather_json["results"][0]["daily"][0]["text_day"]
            #     today_temprature_low = weather_json["results"][0]["daily"][0]["low"]
            #     today_temprature_high = weather_json["results"][0]["daily"][0]["high"]
            #     today_humidity = weather_json["results"][0]["daily"][0]["humidity"]

            # print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,湿度:{today_humidity}%")

            await asyncio.sleep(weather.wait)
        else:
            await asyncio.sleep(internet.wait)

# LED display
async def pixels_led(internet, pomodoro):
    # NeoPixel LED control
    pixel_pin = board.NEOPIXEL
    pixel_num = 1
    pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0.05, auto_write=False)
    rainbow = Rainbow(pixels, speed=0.1, period=2)
    # Cyan blink, shown while a finished pomodoro awaits confirmation
    blink = Blink(pixels, 0.5, color.CYAN)
    animations = AnimationSequence(
        rainbow,
        advance_interval=5,
        auto_clear=True,
    )

    while True:
        if pomodoro.run:
            if pomodoro.confirming:
                # Pomodoro awaiting confirmation: blink cyan
                blink.animate()
            else:
                # Pomodoro in progress: solid red
                pixels.fill((255, 0, 0))
                pixels.show()
        else:
            # Otherwise show the rainbow animation
            animations.animate()
        await asyncio.sleep(pomodoro.wait)


# Screen display
async def lcd_display(internet, ntp_datetime, weather, pomodoro):

    display = board.DISPLAY
    # The Chinese font file is stored in the font directory
    font_file = "font/wenquanyi_13px.pcf"
    font = bitmap_font.load_font(font_file)

    group = displayio.Group()

    # Date label (top left)
    date_label = bitmap_label.Label(terminalio.FONT, scale=1)
    date_label.anchor_point = (0.0, 0.0)
    date_label.anchored_position = (5, 5)
    date_label.text = "2023-11-11"
    group.append(date_label)


    # Time label (top right)
    time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
    time_label.anchor_point = (1.0, 0.0)
    time_label.anchored_position = (display.width - 2, 2)
    time_label.text = "11:30"
    group.append(time_label)

    # Lunar date label (top centre)
    lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
    lunar_label.anchor_point = (0.5, 0.0)
    lunar_label.anchored_position = (display.width // 2, 5)
    lunar_label.text = "九月廿八"
    group.append(lunar_label)


    # Weather label (bottom left)
    weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
    weather_label.anchor_point = (0.0, 1.0)
    weather_label.anchored_position = (2, display.height - 5)
    weather_label.text = "晴"
    group.append(weather_label)

    # Temperature label (bottom centre)
    temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
    temperature_label.anchor_point = (0.5, 1.0)
    temperature_label.anchored_position = (display.width // 2, display.height - 5)
    temperature_label.text = "5℃"
    group.append(temperature_label)


    # Pomodoro countdown label (centre)
    pomodoro_label = bitmap_label.Label(terminalio.FONT, color=0xFF00FF, scale=7)
    # Position
    pomodoro_label.anchor_point = (0.5, 0.5)
    pomodoro_label.anchored_position = (display.width // 2, display.height // 2)
    pomodoro_label.text = "15:00"
    group.append(pomodoro_label)

    # Pomodoro count label (bottom right)
    count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
    # Position
    count_label.anchor_point = (1, 1)
    count_label.anchored_position = (display.width - 2, display.height - 2)
    count_label.text = "0"
    group.append(count_label)

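    # The confirmation timeout after a countdown ends is held in pomodoro.timeout
    # (POMODORO_TIMEOUT); a pomodoro that is not confirmed in time is not counted

    # Create the root group
    main_group = displayio.Group()
    main_group.append(group)
    # Show it on the display
    display.root_group = main_group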


    while True:
        if internet.state:
            dt = time.localtime()
            # text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃'
            # Update the date text
            date_label.text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}"
            # Update the time text
            time_label.text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}"
            # Update the lunar date text
            if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese:
                lunar_label.text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}"

        if weather.text:
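            # Update the weather text
            weather_label.text = f"{weather.text}"
            # Update the temperature text
            temperature_label.text = f"{weather.temperature}℃"
        else:
            # Placeholder shown until weather data arrives ("Please connect to WiFi first")
            weather_label.text = '请先连接WIFI'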

        timeout = pomodoro.wait
        # Update the pomodoro display
        pomodoro_label.color = 0x00FFFF
        count_label.text = f"{pomodoro.count}"
        if pomodoro.run:
            left_seconds = pomodoro.end - time.monotonic()
            if left_seconds >= 0:
                minute = int(left_seconds / 60)
                second = int(left_seconds % 60)
                # Refresh the countdown once per second, waking up on the next whole second
                sec = left_seconds % 1
                timeout = 1.0 - sec
                pomodoro_label.text = f"{minute:02d}:{second:02d}"
            else:
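                # When a pomodoro finishes, a touch within the timeout is required for it to count as completed
                timeout_seconds = abs(left_seconds)
                if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout:
                    pomodoro.confirming = True
                    # Status text: "Pomodoro awaiting confirmation"
                    weather_label.text = '番茄等待确认'
                    # Show the confirmation countdown in red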
                    pomodoro_label.color = 0xFF0000
                    pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}"
                else:
                    pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}'
                    pomodoro.confirming = False
                    pomodoro.confirmed = False
                    pomodoro.run = False

        else:
            timeout = pomodoro.wait

        await asyncio.sleep(timeout)



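async def monitor_touch_buttons(pomodoro):
    touch = touchio.TouchIn(board.D10)
    while True:
        if touch.value:
            # A touch starts the pomodoro countdown
            if not pomodoro.run:
                pomodoro.run = True
                pomodoro.end = time.monotonic() + pomodoro.time
            # While awaiting confirmation, a touch adds one to the pomodoro count;
            # the confirmed check stops a held touch from counting twice
            elif pomodoro.confirming and not pomodoro.confirmed:
                pomodoro.confirmed = True
                pomodoro.count += 1
        await asyncio.sleep(pomodoro.wait)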


async def main():
    # Shared state objects
    internet = Internet()
    ntp_datetime = NTPDatetime()
    weather = Weather()
    pomodoro = Pomodoro()

    # Create the coroutine tasks
    internet_task = asyncio.create_task(wifi_connect(internet))
    fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime))
    fetch_weather_task = asyncio.create_task(fetch_weather(internet,weather))
    pixels_led_task = asyncio.create_task(pixels_led(internet,pomodoro))
    lcd_display_task = asyncio.create_task(lcd_display(internet, ntp_datetime, weather, pomodoro))
    monitor_touch_buttons_task = asyncio.create_task(monitor_touch_buttons(pomodoro))

    # Run the coroutines
    await asyncio.gather(internet_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task)


asyncio.run(main())
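
The time task above sleeps until the next day before it refreshes the date. As a minimal sketch of that wait calculation, here is a standalone helper that also runs on desktop Python. The name `seconds_until_midnight` is hypothetical and not part of the original code; it only illustrates the arithmetic.

```python
def seconds_until_midnight(hour, minute, second):
    """Return the number of seconds from hour:minute:second to the next 00:00:00."""
    # Seconds already elapsed since the start of the day
    elapsed = hour * 3600 + minute * 60 + second
    # A full day has 24 * 3600 seconds
    return 24 * 3600 - elapsed


# Example: at 23:59:30 there are 30 seconds left in the day
print(seconds_until_midnight(23, 59, 30))
```

On the board, the arguments would come from `time.localtime()` (`tm_hour`, `tm_min`, `tm_sec`), and the result could be passed to `asyncio.sleep`.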

The post was written in Markdown. For some reason the code indentation is lost when the post is displayed, although it looks correct in editing mode.

Thank you for your hard work. Thank you for sharing such good technology.

This post was last edited by ltpop on 2023-11-27 12:02

Final version of the code (with correct indentation)

"""
Pomodoro Calendar Clock
ltpop@163.com
202311 v26
"""

# os.getenv reads the TOML settings file (settings.toml)
import os
import time
import rtc
import board
import displayio
import terminalio
import touchio
import wifi
import ssl
import socketpool

# Requires the asyncio and adafruit_ticks libraries
import asyncio
# Requires the neopixel library
import neopixel
# Requires the adafruit_ntp library
import adafruit_ntp
# Requires the adafruit_display_text library
from adafruit_display_text import bitmap_label
# Requires the adafruit_bitmap_font library
from adafruit_bitmap_font import bitmap_font
# Requires the adafruit_imageload library
import adafruit_imageload
# Requires the adafruit_requests library
import adafruit_requests as requests
# Requires the adafruit_led_animation library
from adafruit_led_animation.animation.blink import Blink
from adafruit_led_animation.animation.rainbow import Rainbow
from adafruit_led_animation.sequence import AnimationSequence
import adafruit_led_animation.color as color
# Requires the adafruit_minimqtt library
import adafruit_minimqtt.adafruit_minimqtt as MQTT


# Network connection state of the device
class Internet:
    def __init__(self):
        self.state = False
        self.wait = 30.0

# MQTT
class MqttClient:
    def __init__(self):
        self.mqtt = None
        self.is_connected = False

        self.host = os.getenv("MQTT_HOST")
        self.port = os.getenv("MQTT_PORT")
        self.user = os.getenv("MQTT_USER")
        self.password = os.getenv("MQTT_PASSWORD")

        self.client_id =  os.getenv("MQTT_CLIENT_ID")
        self.topic =  os.getenv("MQTT_TOPIC")
        self.led = "led1"
        self.cmd_topic = f"cmd/{self.topic}/{self.led}"
        self.state_topic = f"stat/{self.topic}/{self.led}"

        self.led = None
        self.wait = 0.1

    def connect(self, mqtt_cli, userdata, flags, rc):
        print(f"Connected to MQTT {self.host}")
        self.is_connected = True
        mqtt_cli.subscribe(self.state_topic)

    def disconnect(self, mqtt_cli, userdata, rc):
        print(f"Disconnected from MQTT {self.host}")
        self.is_connected = False

    def message(self, client, topic, message):
        print(f"New message on topic {topic}: {message}")
        # 0 = off, 1 = on (2 = toggle is not handled here)
        if topic == self.state_topic:
            if message == '0':
                self.led = False
            elif message == '1':
                self.led = True

# Current weather state
class Weather:
    def __init__(self):
        self.city = ''
        self.text = None
        self.temperature = 20.0
        self.wait = 3600.0


# Current date and time state
class NTPDatetime:
    def __init__(self):
        self.datetime = None
        self.ntp = None
        self.ntp_sync = None
        self.next_lunar_request_time = None
        self.weekday = None
        self.week_name = None
        self.lunar_year_ganzhi = None
        self.lunar_month_chinese = None
        self.lunar_day_chinese = None
        self.wait = 3600.0
        self.retry = 10


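# Pomodoro statistics
class Pomodoro:
    def __init__(self):
        self.count = 0
        self.run = False
        self.end = 0
        self.confirming = False
        self.confirmed = False
        # Countdown length and confirmation timeout in seconds, from settings.toml.
        # int() also accepts values stored as quoted strings. The fallback values
        # ("900" and "30") are assumed defaults, not taken from the original post.
        self.time = int(os.getenv("POMODORO_TIME", "900"))
        self.timeout = int(os.getenv("POMODORO_TIMEOUT", "30"))
        self.wait = 0.2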


# Connect to WiFi
async def wifi_connect(internet):
    # Set the WiFi credentials in settings.toml beforehand: CIRCUITPY_WIFI_SSID=<WiFi SSID> and CIRCUITPY_WIFI_PASSWORD=<WiFi password>
    failure_count = 0
    while True:
        internet.state = wifi.radio.connected
        if not wifi.radio.connected:
            try:
                wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv(
                    "CIRCUITPY_WIFI_PASSWORD"))
            except OSError as error:
                print("Failed to connect, retrying\n", error)
                failure_count += 1
        await asyncio.sleep(internet.wait)

    # If WiFi cannot connect, switch to AP mode (disabled)
    # if not wifi_state:
    #     wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')


async def mqtt_connect(mqtt_client):
    while True:
        if not mqtt_client.mqtt:
            print("Set up a MiniMQTT Client")
            pool = socketpool.SocketPool(wifi.radio)
            mqtt = MQTT.MQTT(
                broker=mqtt_client.host,
                username=mqtt_client.user,
                password=mqtt_client.password,
                socket_pool=pool,
                client_id=mqtt_client.client_id
            )

            mqtt.on_connect = mqtt_client.connect
            mqtt.on_disconnect = mqtt_client.disconnect
            mqtt.on_message = mqtt_client.message
            mqtt_client.mqtt = mqtt
            mqtt.connect()

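        try:
            mqtt_client.mqtt.loop()
        except (ValueError, RuntimeError) as e:
            print("Failed to get data, retrying\n", e)
            mqtt_client.mqtt.reconnect()

        # Always yield to the scheduler, even after a failed loop(), so a
        # persistent error cannot starve the other coroutines
        await asyncio.sleep(mqtt_client.wait)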


# Fetch the time
async def fetch_time(internet, ntp_datetime):
    tz_offset = os.getenv("TIME_ZONE")
    the_rtc = rtc.RTC()
    ntp = None
    while True:
        if internet.state:
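            # Treat an already initialised NTP client as synced, so later
            # passes do not fall back to the short retry interval
            ntp_ok = ntp is not None
            lunar_ok = False
            pool = socketpool.SocketPool(wifi.radio)
            # Fetch the current NTP time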
            if not ntp:
                try:
                    ntp = adafruit_ntp.NTP(
                        pool, server="ntp.ntsc.ac.cn", tz_offset=tz_offset)
                    # Update the system clock
                    the_rtc.datetime = ntp.datetime
                    ntp_datetime.ntp = ntp
                    ntp_ok = True
                    ntp_datetime.ntp_sync = True
                except OSError as error:
                    print("NTP failed, retrying\n", error)
                    ntp = None
                    
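            # The lunar date needs refreshing on the first pass and once the
            # scheduled request time has passed
            lunar_need_update = (
                not ntp_datetime.next_lunar_request_time
                or time.time() > ntp_datetime.next_lunar_request_time
            )
            # Nothing to fetch counts as success for the retry logic below
            lunar_ok = not lunar_need_update

            if lunar_need_update:
                # Fetch the lunar calendar date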
                try:
                    https = requests.Session(pool, ssl.create_default_context())
                    lunar_response = https.get(
                        f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
                except (RuntimeError, OSError) as error:
                    print("Lunar request failed, retrying\n", error)
                    lunar_response = None
                if lunar_response:
                    lunar_json = lunar_response.json()
                    if lunar_json['code'] == 200:
                        lunar_data = lunar_json['data']
                        # print(lunar_data)
                        week_day = lunar_data['week_no']
                        if week_day:
                            ntp_datetime.weekday = week_day

                        week_name = lunar_data['week_name']
                        if week_name:
                            ntp_datetime.week_name = week_name

                        lunar_year_ganzhi = lunar_data['ganzhi_year']
                        if lunar_year_ganzhi:
                            ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi

                        lunar_month_chinese = lunar_data['lunar_month_chinese']
                        if lunar_month_chinese:
                            ntp_datetime.lunar_month_chinese = lunar_month_chinese

                        lunar_day_chinese = lunar_data['lunar_day_chinese']
                        if lunar_day_chinese:
                            ntp_datetime.lunar_day_chinese = lunar_day_chinese
                        lunar_ok = True
                        print(
                            f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
                        print(f"当前星期:{week_name}")
                        # Schedule the next update for 00:00 the following day
                        current_time = time.localtime()
                        ntp_datetime.next_lunar_request_time = time.mktime((current_time.tm_year, current_time.tm_mon, current_time.tm_mday + 1, 0, 0, 0, 0, 0, 0))
            # Sleep until the next check; the lunar date is only refreshed on the following day
            if not ntp_ok or not lunar_ok:
                await asyncio.sleep(ntp_datetime.retry)
            else:
                # dt = time.localtime()
                # print(
                #     f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
                # wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec)
                await asyncio.sleep(ntp_datetime.wait)
        else:
            await asyncio.sleep(internet.wait)

# Fetch weather information
async def fetch_weather(internet, weather):
    # Weather API key
    weather_api_key = os.getenv("WEATHER_API_KEY")
    # Weather city: read from the settings file
    weather_city = os.getenv("WEATHER_CITY")
    while True:
        if internet.state:
            # Weather information
            pool = socketpool.SocketPool(wifi.radio)
            https = requests.Session(pool, ssl.create_default_context())
            # If no city is configured, look up the city of the current IP
            if not weather_city:
                # Get the current public IP and its city
                ip_city_response = None
                try:
                    ip_city_response = https.get("https://myip.ipip.net/json")
                except (RuntimeError, OSError) as error:
                    print("IP city request failed, retrying\n", error)
                if ip_city_response:
                    ip_city_json = ip_city_response.json()
                    if ip_city_json["ret"] == "ok":
                        weather_city = ip_city_json['data']['location'][2]
                        print(f"当前IP城市:{weather_city}")
            weather.city = weather_city

            # Current weather
            weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            try:
                weather_now_response = https.get(weather_now_url)
            except (RuntimeError, OSError) as error:
                print("Weather request failed, retrying\n", error)
                weather_now_response = None
            if weather_now_response:
                weather_json = weather_now_response.json()
                # An error reply may lack the "results" key, so use .get()
                if weather_json.get("results"):
                    now_weather = weather_json["results"][0]["now"]["text"]
                    now_temperature = weather_json["results"][0]["now"]["temperature"]
                    # Update the shared state only when results came back
                    weather.text = now_weather
                    weather.temperature = now_temperature
                    print(f"当前天气:{now_weather},气温:{now_temperature}℃")

            # Forecast for the coming days (disabled)
            # weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            # weather_response = https.get(weather_daily_url)
            # weather_json = weather_response.json()
            # if weather_json["results"]:
            #     today_weather = weather_json["results"][0]["daily"][0]["text_day"]
            #     today_temprature_low = weather_json["results"][0]["daily"][0]["low"]
            #     today_temprature_high = weather_json["results"][0]["daily"][0]["high"]
            #     today_humidity = weather_json["results"][0]["daily"][0]["humidity"]

            # print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,湿度:{today_humidity}%")

            await asyncio.sleep(weather.wait)
        else:
            await asyncio.sleep(internet.wait)

# LED display
async def pixels_led(internet, pomodoro):
    # NeoPixel LED control
    pixel_pin = board.NEOPIXEL
    pixel_num = 1
    pixels = neopixel.NeoPixel(
        pixel_pin, pixel_num, brightness=0.05, auto_write=False)
    rainbow = Rainbow(pixels, speed=0.1, period=2)
    blink = Blink(pixels, 0.5, color.GREEN)
    animations = AnimationSequence(
        rainbow,
        advance_interval=5,
        auto_clear=True,
    )

    while True:
        # While a pomodoro is running, show solid red
        if pomodoro.run:
            # While it awaits confirmation, blink green instead
            if pomodoro.confirming:
                blink.speed = 0.5
                blink.color = color.GREEN
                blink.animate()
            else:
                pixels.fill((255, 0, 0))
                pixels.show()
        elif not internet.state:
            blink.speed = 2
            blink.color = color.RED
            blink.animate()
        else:
            # Otherwise show the rainbow animation
            animations.animate()
        await asyncio.sleep(pomodoro.wait)


# Screen display
async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro):

    display = board.DISPLAY
    # The Chinese font file is stored in the font directory
    font_file = "font/wenquanyi_13px.pcf"
    font = bitmap_font.load_font(font_file)

    group = displayio.Group()

    # Date label (top left)
    date_label = bitmap_label.Label(terminalio.FONT, scale=1)
    date_label.anchor_point = (0.0, 0.0)
    date_label.anchored_position = (5, 5)
    date_label.text = "2023-11-11"
    group.append(date_label)

    # Time label (top right)
    time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
    time_label.anchor_point = (1.0, 0.0)
    time_label.anchored_position = (display.width - 2, 2)
    time_label.text = "11:30"
    group.append(time_label)

    # Lunar date label (top centre)
    lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
    lunar_label.anchor_point = (0.5, 0.0)
    lunar_label.anchored_position = (display.width // 2, 5)
    lunar_label.text = "九月廿八"
    group.append(lunar_label)

    # Weather label (bottom left)
    weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
    weather_label.anchor_point = (0.0, 1.0)
    weather_label.anchored_position = (2, display.height - 5)
    weather_label.text = "晴"
    group.append(weather_label)

    # Temperature label (bottom centre)
    temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
    temperature_label.anchor_point = (0.5, 1.0)
    temperature_label.anchored_position = (
        display.width // 2, display.height - 5)
    temperature_label.text = "5℃"
    group.append(temperature_label)

    # MQTT state label (bottom, right of centre)
    mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1)
    mqtt_label.anchor_point = (0.5, 1.0)
    mqtt_label.anchored_position = (display.width // 1.3, display.height - 5)
    mqtt_label.text = "-"
    group.append(mqtt_label)

    # Pomodoro countdown label (centre)
    pomodoro_label = bitmap_label.Label(
        terminalio.FONT, color=0xFF00FF, scale=7)
    # Position
    pomodoro_label.anchor_point = (0.5, 0.5)
    pomodoro_label.anchored_position = (
        display.width // 2, display.height // 2)
    pomodoro_label.text = "15:00"
    group.append(pomodoro_label)


    # Pomodoro count label (bottom right)
    # with open("img/tomato.bmp", "rb") as f:
    #     image, palette = adafruit_imageload.load(f, bitmap=displayio.Bitmap, palette=displayio.Palette)
    #     sprite = displayio.TileGrid(image, pixel_shader=palette)
    #     group.append(sprite)

    count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
    # Position
    count_label.anchor_point = (1, 1)
    count_label.anchored_position = (display.width - 2, display.height - 2)
    count_label.text = "0"
    group.append(count_label)

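    # The confirmation timeout after a countdown ends is held in pomodoro.timeout
    # (POMODORO_TIMEOUT); a pomodoro that is not confirmed in time is not counted

    # Create the root group
    main_group = displayio.Group()
    main_group.append(group)
    # Show it on the display
    display.root_group = main_group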

    while True:
        if ntp_datetime.ntp_sync:
            dt = time.localtime()
            # text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃'
            # Update the date text only when it has changed
            date_text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}"
            if date_label.text != date_text:
                date_label.text = date_text
            # Update the time text
            time_text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}"
            if time_label.text != time_text:
                time_label.text = time_text
            # Update the lunar date text
            if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese:
                lunar_text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}"
                if lunar_label.text != lunar_text:
                    lunar_label.text = lunar_text

        if weather.text:
            # Update the weather text
            if weather_label.text != weather.text:
                weather_label.text = weather.text
            
            # Update the temperature text
            temperature_text = f"{weather.temperature}℃"
            if temperature_label.text != temperature_text:
                temperature_label.text = temperature_text
        else:
            # Placeholder shown until weather data arrives ("Please connect to WiFi first")
            weather_text = '请先连接WIFI'
            if weather_label.text != weather_text:
                weather_label.text = weather_text

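        # MQTT LED state; use the flag maintained by the connect/disconnect callbacks,
        # because mqtt_client.mqtt is None until the client has been created
        if mqtt_client.is_connected:
            if mqtt_client.led:
                # LED on: show the label in red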
                led_text =  "on"
                mqtt_label.color=0xFF0000
            else:
                led_text =  "off"
                mqtt_label.color=0xFFFFFF

            if mqtt_label.text != led_text:
                mqtt_label.text = led_text
        # Update the pomodoro display
        pomodoro_label.color = 0x00FFFF
        count_text = f"{pomodoro.count}"
        if count_label.text != count_text:
            count_label.text = count_text
        if pomodoro.run:
            left_seconds = pomodoro.end - time.monotonic()
            if left_seconds >= 0:
                minute = int(left_seconds / 60)
                second = int(left_seconds % 60)
                # The countdown text changes once per second
                pomodoro_text = f"{minute:02d}:{second:02d}"
                if pomodoro_label.text != pomodoro_text:
                    pomodoro_label.text = pomodoro_text
            else:
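                # When a pomodoro finishes, a touch within the timeout is required for it to count as completed
                timeout_seconds = abs(left_seconds)
                if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout:
                    pomodoro.confirming = True
                    # Status text: "Pomodoro awaiting confirmation"
                    weather_label.text = '番茄等待确认'
                    # Show the confirmation countdown in red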
                    pomodoro_label.color = 0xFF0000
                    pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}"
                else:
                    pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}'
                    pomodoro.confirming = False
                    pomodoro.confirmed = False
                    pomodoro.run = False

        await asyncio.sleep(pomodoro.wait)


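async def monitor_touch_buttons(pomodoro):
    touch = touchio.TouchIn(board.D10)
    while True:
        if touch.value:
            # Debounce: read the pad again after 0.1 s
            await asyncio.sleep(0.1)
            if touch.value:
                # A touch starts the pomodoro countdown
                if not pomodoro.run:
                    pomodoro.run = True
                    pomodoro.end = time.monotonic() + pomodoro.time
                # While awaiting confirmation, a touch adds one to the pomodoro count;
                # the confirmed check stops a held touch from counting twice
                elif pomodoro.confirming and not pomodoro.confirmed:
                    pomodoro.confirmed = True
                    pomodoro.count += 1
        await asyncio.sleep(0.1)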


async def main():
    # Shared state objects
    internet = Internet()
    mqtt_client = MqttClient()
    ntp_datetime = NTPDatetime()
    weather = Weather()
    pomodoro = Pomodoro()

    # Create the coroutine tasks
    internet_task = asyncio.create_task(wifi_connect(internet))
    mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client))
    fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime))
    fetch_weather_task = asyncio.create_task(fetch_weather(internet, weather))
    pixels_led_task = asyncio.create_task(pixels_led(internet, pomodoro))
    lcd_display_task = asyncio.create_task(
        lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro))
    monitor_touch_buttons_task = asyncio.create_task(
        monitor_touch_buttons(pomodoro))

    # Run the coroutines
    await asyncio.gather(internet_task, mqtt_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task)


asyncio.run(main())
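
The structure used above, several coroutines that communicate through shared state objects, can be tried on desktop Python without any hardware. The sketch below is only an illustration under assumptions: `Countdown`, `ticker`, `watcher`, `format_mmss` and `demo` are hypothetical names that do not appear in the project, and the timings are shortened so that it finishes quickly.

```python
import asyncio
import time


class Countdown:
    """Shared state, playing the role of the Pomodoro object above."""

    def __init__(self, duration):
        self.end = time.monotonic() + duration
        self.run = True
        self.log = []


def format_mmss(seconds):
    """Format a non-negative number of seconds as MM:SS."""
    seconds = int(seconds)
    return f"{seconds // 60:02d}:{seconds % 60:02d}"


async def ticker(state):
    # Writer: marks the countdown as finished once the deadline has passed
    while state.run:
        if time.monotonic() >= state.end:
            state.run = False
        await asyncio.sleep(0.01)


async def watcher(state):
    # Reader: records the remaining time while the countdown is running
    while state.run:
        left = max(0.0, state.end - time.monotonic())
        state.log.append(format_mmss(left))
        await asyncio.sleep(0.02)


async def demo():
    state = Countdown(duration=0.1)
    # Both coroutines share the same state object, as the tasks in main() do
    await asyncio.gather(ticker(state), watcher(state))
    return state


state = asyncio.run(demo())
print(state.run, format_mmss(1500))
```

Neither coroutine calls the other; they only read and write attributes of the shared object and give up control at each `await asyncio.sleep(...)`. That is why every loop in the project needs to reach an `await` on each pass.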

Everyone is welcome to learn from this code and to share feedback.
