Commit 09691a63 authored by Frank Heuer's avatar Frank Heuer

Added new functionality to the class and test.py. The constructor now accepts two parameters:

"timeout" lets you set the serial timeout for the call to serial.Serial()
"unit_of_measure" lets return values in different units: microgram/cubic meter (mg/m³) named MassConcentrationEuropean or in particles / 0.01 cubic foot (pcs/0.01cft) named ParticleConcentrationImperial. The latter is calculated by assuming different mean sphere diameters of pm10 or pm2.5 particles. Have a look at http://ir.uiowa.edu/cgi/viewcontent.cgi?article=5915&context=etd for details. Thanks to Teus Hagen the idea and for coding this part.

Also added more fine grained logging between standard levels DEBUG and INFO.

test.py is new in most cases. It is now callable as a script with parameters. Calling "python test.py" without parameters in a command window will tell how to use. It is based on an idea of Teus Hagen.

As a hole test.py and SDS011 class should now work with python2 and python3. Maybe you need to install enum34 (not
as enum!) which is a back port of of 3.4 library module supporting IntEnum.

Furthermore the exception file is no longer needed, only standard exceptions are used now and there are a lot of language corrections in comments.
parent f21bab07
This diff is collapsed.
"""
Copyright 2016, Frank Heuer, Germany
This file is part of SDS011.
SDS011 is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
SDS011 is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SDS011. If not, see <http://www.gnu.org/licenses/>.
Diese Datei ist Teil von SDS011.
SDS011 ist Freie Software: Sie können es unter den Bedingungen
der GNU General Public License, wie von der Free Software Foundation,
Version 3 der Lizenz oder (nach Ihrer Wahl) jeder späteren
veröffentlichten Version, weiterverbreiten und/oder modifizieren.
SDS011 wird in der Hoffnung, dass es nützlich sein wird, aber
OHNE JEDE GEWÄHRLEISTUNG, bereitgestellt; sogar ohne die implizite
Gewährleistung der MARKTFÄHIGKEIT oder EIGNUNG FÜR EINEN BESTIMMTEN ZWECK.
Siehe die GNU General Public License für weitere Details.
Sie sollten eine Kopie der GNU General Public License zusammen mit SDS011
erhalten haben. Wenn nicht, siehe <http://www.gnu.org/licenses/>.
"""
"""Moduel holding the exceptions for use in SDS011 class"""
class Error(Exception):
"""Base class for exceptions in this module."""
pass
class WorkStateError(Error):
"""Exception raised for errors in the workingmode."""
pass
class GetStatusError(Error):
"""Exception raised when initial getting the current sensor status won't work."""
pass
class ReportModeError(Error):
"""Exception raised when sensor is in wrong reportmode for requested operation."""
pass
class NoSensorResponse(Error):
"""Exception raised when sensor is unexpected not responding"""
pass
#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
Copyright 2016, Frank Heuer, Germany
test.py is demonstrating some capabilities of the SDS011 module.
......@@ -10,92 +12,176 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
'''
import sys
import time
from sds011 import SDS011
import logging
import logging.handlers
# Uncomment this if you want logging.
'''
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.handlers.RotatingFileHandler('SDS011.log', \
mode='a', maxBytes=1048576, backupCount=5)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - \
%(filename)s:%(lineno)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
'''
# Create a new sensor instance
'''
On Win, the path is one of the com ports. On Linux / Raspberry Pi
it depends. May be one of "/dev/ttyUSB..." or "/dev/ttyAMA...".
On the Pi make sure the login getty() is not using the serial interface.
Have a look at Win or Linux documentation.
Look e.g. via lsusb command for Qin Hen Electronics USB id.
'''
# Create an instance of your sensor
sensor = SDS011('COM3')
def printlog(level, string):
"""Change this to reflect the way logging is done."""
sys.stderr.write("%s: %s\n" % (level, string))
debug = 0 # debug level in sds011 class module
cycles = 4 # serial read timeout in seconds, dflt 2
timeout = 2 # timeout on serial line read
# print values in mass or pieces
unit_of_measure = SDS011.UnitsOfMeasure.MassConcentrationEuropean
for arg in range(len(sys.argv) - 1, 0, -1):
if sys.argv[arg][0:2] == '-d':
debug += 1
debug = int(sys.argv[arg + 1])
del sys.argv[arg + 1]
del sys.argv[arg]
elif sys.argv[arg][0:2] == '-c':
cycles = int(sys.argv[arg + 1])
del sys.argv[arg + 1]
del sys.argv[arg]
elif sys.argv[arg][0:2] == '-t':
timeout = int(sys.argv[arg + 1])
del sys.argv[arg + 1]
del sys.argv[arg]
elif sys.argv[arg][0:2] == '-u':
if sys.argv[arg + 1] == '0':
unit_of_measure = SDS011.UnitsOfMeasure.MassConcentrationEuropean
elif sys.argv[arg + 1] == '1':
unit_of_measure = SDS011.UnitsOfMeasure.ParticelConcentrationImperial
else:
raise RuntimeError("%s is not a valid unit of measure")
del sys.argv[arg + 1]
del sys.argv[arg]
print ('Argument List:', str(sys.argv))
if len(sys.argv) < 2:
sys.exit("Usage: python test.py [-d {1..5}] [-c cnt] [-t secs] [-u 0|1] com_port [duty_cycle 0..30]\n"
"com port e.g. /dev/ttyUSB0\n"
"-d(ebug) debug level (dflt 0). Use 10, 14, 16, 18, 20, 30, 40, 50. Low value means verbose, 0 means off\n"
"-c(nt) cnt defines the amount of test cycles (dflt 4).\n"
"-t(imeout) secs defines the timeout of serial line readings (dflt 2).\n"
"-u(nit_of_measure):\n\
\t0: output in µg/m3 (default);\n\
\t1: output in pcs/0.01qf (pieces per cubic feet)\n"
"\t\tduty cycle defines sensor measurement time in minutes (dflt 2).")
if debug > 0:
# Activate simple logging
import logging
import logging.handlers
logger = logging.getLogger()
# Available levels are the well known
# logging.INFO, logging.WARNING and so forth.
# Between INFO (=20)and DEBUG (=10) are fine grained
# messages with levels 14,16 and 18. You might want
# to use these values. Here is an Example with 16
# logger.setLevel(16)
# Activate simple logging
logger.setLevel(debug)
def printValues(timing, values, unit_of_measure):
if unit_of_measure == SDS011.UnitsOfMeasure.MassConcentrationEuropean:
unit = 'µg/m³'
else:
unit = 'pcs/0.01cft'
print("Waited %d secs\nValues measured in %s: PM2.5 " %
(timing, unit), values[1], ", PM10 ", values[0])
# print("Values measured in pcs/0.01sqf: PM2.5 %d, PM10 %d" % (Mass2Con('pm25',values[1]), Mass2Con('pm10',values[0])))
# simple parsing the command arguments for setting options
# Create an instance of your sensor
# options defaults: logging None, debug level 0, serial line timeout 2
# option unit_of_measure (default False) values in pcs/0.01sqf or mass ug/m3
sensor = SDS011(sys.argv[1], timeout=timeout, unit_of_measure=unit_of_measure)
# raise KeyboardInterrupt
# Now we have some details about it
print(sensor.device_id)
print(sensor.firmware)
print(sensor.dutycycle)
print("SDS011 sensor info:")
print("Device ID: ", sensor.device_id)
print("Device firmware: ", sensor.firmware)
print("Current device cycle (0 is permanent on): ", sensor.dutycycle)
print(sensor.workstate)
print(sensor.reportmode)
print("\n%d measurements in permanent measuring mode" % (cycles * 2))
print("This will make the sensor getting old. The TTL is just 8000 hours!")
print("Do you really need to use the permanent measurering mode?")
print("In sleep mode the fan will be turned off.")
# Set dutycyle to nocycle (permanent)
sensor.reset()
print("Permanent measureing (and make the senor get old. Just 8000 hours working!)\n \
Do you really need permanent measurering?")
for a in range(10):
for a in range(2 * cycles):
while True:
values = sensor.get_values()
if values is not None:
print("Values measured: ", values[0], "--", values[1])
printValues(0, values, sensor.unit_of_measure)
break
try:
# Example of switching the WorkState
for a in range(3):
print("waking up if sleeping with a={0}.".format(a))
print("\n%d X switching between measuring and sleeping mode:" % cycles)
print("\tMeasurement state: Read the values, on no read, wait 2 seconds and try again")
print("\tOn read success, put the mode into sleeping mode for 5 seconds, and loop again")
for a in range(cycles):
print("%d time: push it into wake state" % a)
sensor.workstate = SDS011.WorkStates.Measuring
# Just to demonstrate. Should be 60 seconds to get right values. Sensor
# has to warm up!
# Just to demonstrate. Should be 60 seconds to get qualified values.
# The sensor needs to warm up!
time.sleep(10)
last = time.time()
while True:
last1 = time.time()
values = sensor.get_values()
if values is not None:
print("Values measured: ", values[0], "--", values[1])
printValues(time.time() - last, values, sensor.unit_of_measure)
break
print("Waited %d seconds, no values read, wait 2 seconds, and try to read again" % (
time.time() - last1))
time.sleep(2)
print("Going to sleep a while")
print("Read was succesfull. Going to sleep for 5 seconds")
sensor.workstate = SDS011.WorkStates.Sleeping
time.sleep(5)
# Example of dutycycle
sensor.workstate = SDS011.WorkStates.Measuring
# Setting this to 0 means permanent (each second)
sensor.dutycycle = 2 # valid values between 0 and 30
print("You have to wait at most {0} minutes before the first measuring.".format(
# Example of duty cycle
DC = 2
if len(sys.argv) > 2 and sys.argv[2].isdigit():
if int(sys.argv[2]) >= 0 and int(sys.argv[2]) < 30:
DC = int(sys.argv[2])
else:
print("Invalid duty cycle %s. Using 2 minutes" % sys.argv[2])
print("\n%d X measurements with duty cycle of %d minutes." % (cycles, DC))
#sensor.workstate = SDS011.WorkStates.Measuring
# Setting this to 0 means permanent measurement for once each second
# sensor.dutycycle = DC # valid values between 0 and 30
print("Waiting time: at most {0} minutes before the measurement.".format(
sensor.dutycycle))
for a in range(2):
print("Dutycycle with a={0}.".format(a))
for a in range(cycles):
print("Duty cycle measurement nr {0}.".format(a))
sensor.workstate = SDS011.WorkStates.Measuring
sensor.dutycycle = DC # valid values between 0 and 30
last = time.time()
while True:
last1 = time.time()
values = sensor.get_values()
if values is not None:
print("Values measured: ", values[0], "--", values[1])
printValues(time.time() - last, values, sensor.unit_of_measure)
break
print("Waited %d secs, no values read, we try again" %
(time.time() - last1))
sensor.workstate = SDS011.WorkStates.Sleeping
print("Read was succesfull. Going to sleep for 5 seconds")
time.sleep(10)
# end of test
print("\nSensor reset to normal")
sensor.reset()
print("Sensor resetted normal")
sensor = None
except KeyboardInterrupt:
sensor.reset()
print("Sensor resetted because of KeyboardInterrupt")
sensor = None
sys.exit("Sensor reset due to a KeyboardInterrupt")
print("Finished")
print("Finished test")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment