-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcode.py
executable file
·221 lines (180 loc) · 6.71 KB
/
code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# SPDX-FileCopyrightText: 2022 Daniel Griswold
#
# SPDX-License-Identifier: MIT
"""
Air Quality Sensor with data published to MQTT.
"""
import time
import ssl
import json
from math import exp, log # pylint: disable=no-name-in-module
from secrets import secrets
import digitalio
import microcontroller
import watchdog
import wifi
import socketpool
import board
import adafruit_bme680
import adafruit_sgp30
from adafruit_pm25.i2c import PM25_I2C
import adafruit_vcnl4040
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import adafruit_dotstar
import foamyguy_nvm_helper as nvm_helper
microcontroller.watchdog.timeout = 30
microcontroller.watchdog.mode = watchdog.WatchDogMode.RESET
microcontroller.watchdog.feed()
RESET_PIN = None
UPDATE_INTERVAL = 60
BASELINE_UPDATE_INTERVAL = 1800
MQTT_ENVIRONMENT = secrets["mqtt_topic"] + "/environment"
MQTT_AIR_QUALITY = secrets["mqtt_topic"] + "/air-quality"
MQTT_SYSTEM = secrets["mqtt_topic"] + "/system"
MQTT_ERROR = secrets["mqtt_topic"] + "/error"
def sgp30_baseline_to_nvm(co2eq_base, tvoc_base):
"""Writes the baseline sgp30 values to non-volatile memory."""
nvm_helper.save_data(
{"co2eq_base": co2eq_base, "tvoc_base": tvoc_base}, test_run=False
)
def sgp30_nvm_to_baseline():
"""
Retrieves the baseline sgp30 values from non-volatile memory.
Sets default values if nvm values are missing or invalid.
"""
_data = nvm_helper.read_data()
try:
sgp30.set_iaq_baseline(_data["co2eq_base"], _data["tvoc_base"])
except (RuntimeError, ValueError, EOFError) as _error:
print("Could not get baseline from nvm, setting defaults. {}".format(_error))
sgp30.set_iaq_baseline(0x8973, 0x8AAE)
def sgp30_get_data(temperature, humidity):
"""sends temperature and humidity to sgp30, returns TVOC and eCO2"""
sgp30.set_iaq_humidity(compute_absolute_humidity(temperature, humidity))
sgp30.iaq_measure()
return sgp30.TVOC, sgp30.eCO2
def get_sensor_data():
"""Creates dictionary of sensor values."""
_temperature = bme680.temperature
_humidity = bme680.humidity
_pressure = bme680.pressure
_r_gas = bme680.gas
_tvoc, _eco2 = sgp30_get_data(_temperature, _humidity)
_data = {}
_data["temperature"] = _temperature
_data["humidity"] = _humidity
_data["pressure"] = _pressure
_data["light"] = vcnl4040.lux
_data["VOC"] = compute_indoor_air_quality(_r_gas, _humidity)
_data["TVOC"] = _tvoc
_data["eCO2"] = _eco2
return _data
def get_system_data():
"""Creates dictionary of system information"""
_data = {}
_data["reset_reason"] = str(microcontroller.cpu.reset_reason)[28:]
_data["time"] = time.monotonic()
_data["ip_address"] = wifi.radio.ipv4_address
_data["board_id"] = board.board_id
return _data
def compute_absolute_humidity(temperature, humidity):
"""Given a temperature and humidity, returns absolute humidity."""
_abs_temperature = temperature + 273.15
_abs_humidity = 6.112
_abs_humidity *= exp((17.67 * temperature) / (243.5 + temperature))
_abs_humidity *= humidity
_abs_humidity *= 2.1674
_abs_humidity /= _abs_temperature
return round(_abs_humidity, 2)
def compute_indoor_air_quality(resistance, humidity):
"""Calculates IAQ from BME680 gas and humidity."""
return log(resistance) + 0.04 * humidity
def handle_mqtt_exception(error):
""" Attempt MQTT reconnect, if that fails, reset the board """
global mqtt_client
print("Could not publish to mqtt broker. {}".format(error))
try:
mqtt_client.is_connected()
except MQTT.MMQTTException:
try:
mqtt_client.reconnect()
except Exception:
microcontroller.reset()
finally:
mqtt_client.publish(MQTT_ERROR, error, retain=True)
bme680 = adafruit_bme680.Adafruit_BME680_I2C(board.I2C())
sgp30 = adafruit_sgp30.Adafruit_SGP30(board.I2C())
pm25 = PM25_I2C(board.I2C(), RESET_PIN)
vcnl4040 = adafruit_vcnl4040.VCNL4040(board.I2C())
pixel = adafruit_dotstar.DotStar(
board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.1, auto_write=True
)
led = digitalio.DigitalInOut(board.LED)
led.direction = digitalio.Direction.OUTPUT
pixel[0] = (0, 255, 0)
try:
microcontroller.watchdog.feed()
print("Connecting to %s..." % secrets["ssid"])
wifi.radio.connect(secrets["ssid"], secrets["password"])
print("Connected to %s!" % secrets["ssid"])
pool = socketpool.SocketPool(wifi.radio)
except Exception as error:
print("Could not initialize network. {}".format(error))
raise
try:
microcontroller.watchdog.feed()
mqtt_client = MQTT.MQTT(
broker=secrets["mqtt_broker"],
port=secrets["mqtt_port"],
username=secrets["mqtt_username"],
password=secrets["mqtt_password"],
socket_pool=pool,
ssl_context=ssl.create_default_context(),
)
print("Connecting to %s" % secrets["mqtt_broker"])
mqtt_client.connect()
print("Connected to %s" % secrets["mqtt_broker"])
except MQTT.MMQTTException as error:
print("Could not connect to mqtt broker. {}".format(error))
raise
sgp30_nvm_to_baseline()
sgp30.iaq_init()
last_update = 0 # pylint: disable=invalid-name
last_iaq = 0 # pylint: disable=invalid-name
baseline_last_update = time.monotonic()
while True:
_now = time.monotonic()
microcontroller.watchdog.feed()
if last_iaq + 1 < _now:
# After the “sgp30_iaq_init” command, a
# “sgp30_measure_iaq” command has to be sent in regular intervals
# of 1s to ensure proper operation of the dynamic baseline
# compensation algorithm.
last_iaq = _now
sgp30_get_data(bme680.temperature, bme680.humidity)
if last_update + UPDATE_INTERVAL < _now:
led.value = True
last_update = _now
print("Publishing AQI data")
try:
mqtt_client.publish(MQTT_AIR_QUALITY, json.dumps(pm25.read()), retain=True)
except MQTT.MMQTTException as error:
handle_mqtt_exception(error)
except RuntimeError:
print("Could not read from PM25 sensor. {}".format(error))
print("Publishing environmental data")
try:
mqtt_client.publish(
MQTT_ENVIRONMENT, json.dumps(get_sensor_data()), retain=True
)
except MQTT.MMQTTException as error:
handle_mqtt_exception(error)
print("Publishing system data")
try:
mqtt_client.publish(MQTT_SYSTEM, json.dumps(get_system_data()), retain=True)
except MQTT.MMQTTException as error:
handle_mqtt_exception(error)
led.value = False
if baseline_last_update + BASELINE_UPDATE_INTERVAL < _now:
baseline_last_update = _now
sgp30_baseline_to_nvm(sgp30.baseline_eCO2, sgp30.baseline_TVOC)