#-----------------------------------------------------------------------------
# qwiic_ccs811.py
#
# Python module for the SparkFun qwiic CCS811 sensor.
#
#
# This sensor is available on the SparkFun Environmental Combo Breakout board.
# https://www.sparkfun.com/products/14348
#
#------------------------------------------------------------------------
#
# Written by SparkFun Electronics, May 2019
#
# This python library supports the SparkFun Electroncis qwiic
# qwiic sensor/board ecosystem.
#
# More information on qwiic is at https:# www.sparkfun.com/qwiic
#
# Do you like this library? Help support SparkFun. Buy a board!
#
#==================================================================================
# Copyright (c) 2019 SparkFun Electronics
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#==================================================================================
#
# This is mostly a port of existing Arduino functionaly, so pylint is sad.
# The goal is to keep the public interface pthonic, but internal is internal
#
# pylint: disable=line-too-long, bad-whitespace, invalid-name
# pylint: disable=too-many-public-methods, too-many-instance-attributes
#
"""
qwiic_ccs811
============
Python module for the qwiic ccs811 sensor, which is part of the [SparkFun Qwiic Environmental Combo Breakout](https://www.sparkfun.com/products/14348)
This python package is a port of the existing [SparkFun CCS811 Arduino Library](https://github.com/sparkfun/SparkFun_CCS811_Arduino_Library)
This package can be used in conjunction with the overall [SparkFun qwiic Python Package](https://github.com/sparkfun/Qwiic_Py)
New to qwiic? Take a look at the entire [SparkFun qwiic ecosystem](https://www.sparkfun.com/qwiic).
"""
from __future__ import print_function, division
import time
import math
import sys
import qwiic_i2c
#======================================================================
# NOTE: For Raspberry Pi
#======================================================================
# For this sensor to work on the Raspberry Pi, I2C clock stretching
# must be enabled.
#
# To do this:
# - Login as root to the target Raspberry Pi
# - Open the file /boot/config.txt in your favorite editor (vi, nano ...etc)
# - Scroll down until the bloct that contains the following is found:
# dtparam=i2c_arm=on
# dtparam=i2s=on
# dtparam=spi=on
# - Add the following line:
# # Enable I2C clock stretching
# dtparam=i2c_arm_baudrate=10000
#
# - Save the file
# - Reboot the raspberry pi
#======================================================================
def __checkIsOnRPi():
# Are we on a Pi? First Linux?
if sys.platform not in ('linux', 'linux2'):
return False
# we can find out if we are on a RPI by looking at the contents
# of /proc/device-tree/compatable
try:
with open('/proc/device-tree/compatible', 'r') as fCompat:
systype = fCompat.read()
return systype.find('raspberrypi') != -1
except IOError:
return False
# check if stretching is set if on a rpi
#
def _checkForRPiI2CClockStretch():
#are we on a rpi?
if not __checkIsOnRPi():
return
# read the boot config file and see if the clock stretch param is set
try:
with open('/boot/config.txt') as fConfig:
strConfig = fConfig.read()
for line in strConfig.split('\n'):
if line.find('i2c_arm_baudrate') == -1:
continue
# start with a comment?
if line.strip().startswith('#'):
break
# is the value less <= 10000
params = line.split('=')
if int(params[-1]) <= 10000:
# Stretching is enabled and set correctly.
return
break
except IOError:
pass
# if we are here, then we are on a Raspberry Pi and Clock Stretching isn't
# set correctly.
# Print out a message!
print("""
============================================================================
NOTE:
For the CCS811 sensor to work on the Raspberry Pi, I2C clock stretching
must be enabled.
The following line must be added to the file /boot/config.txt
dtparam=i2c_arm_baudrate=10000
For more information, see the note at:
https://github.com/sparkfun/Qwiic_CCS811_Py
============================================================================
""")
#======================================================================
# Define the device name and I2C addresses. These are set in the class defintion
# as class variables, making them avilable without having to create a class instance.
#
#
# The name of this device - note this is private
_DEFAULT_NAME = "Qwiic CCS811"
# Some devices have multiple available addresses - this is a list of these addresses.
# NOTE: The first address in this list is considered the default I2C address for the
# device.
_AVAILABLE_I2C_ADDRESS = [0x5B, 0x5A]
_validChipIDs = [0x81]
# Register addresses
CSS811_STATUS = 0x00
CSS811_MEAS_MODE = 0x01
CSS811_ALG_RESULT_DATA = 0x02
CSS811_RAW_DATA = 0x03
CSS811_ENV_DATA = 0x05
CSS811_NTC = 0x06
CSS811_THRESHOLDS = 0x10
CSS811_BASELINE = 0x11
CSS811_HW_ID = 0x20
CSS811_HW_VERSION = 0x21
CSS811_FW_BOOT_VERSION = 0x23
CSS811_FW_APP_VERSION = 0x24
CSS811_ERROR_ID = 0xE0
CSS811_APP_START = 0xF4
CSS811_SW_RESET = 0xFF
[docs]class QwiicCcs811(object):
"""
QwiicCccs811
:param address: The I2C address to use for the device.
If not provided, the default address is used.
:param i2c_driver: An existing i2c driver object. If not provided
a driver object is created.
:return: The Ccs811 device object.
:rtype: Object
"""
# Constructor
device_name = _DEFAULT_NAME
available_addresses = _AVAILABLE_I2C_ADDRESS
# The Arduino lib for this sensor defines return/status codes
# as an enum in the class. Mimicing this here using class variables
SENSOR_SUCCESS = 0
SENSOR_ID_ERROR = 1
SENSOR_I2C_ERROR = 2
SENSOR_INTERNAL_ERROR = 3
SENSOR_GENERIC_ERROR = 4
_RPiCheck = False
def __init__(self, address=None, i2c_driver=None):
# As noted above, to run this device on a Raspberry Pi,
# clock streching is needed.
#
# Lets check if it's enabled. This is done only once in
# the session
if not QwiicCcs811._RPiCheck:
_checkForRPiI2CClockStretch()
QwiicCcs811._RPiCheck = True
# Did the user specify an I2C address?
self.address = address if address is not None else self.available_addresses[0]
# load the I2C driver if one isn't provided
if i2c_driver is None:
self._i2c = qwiic_i2c.getI2CDriver()
if self._i2c is None:
print("Unable to load I2C driver for this platform.")
return
else:
self._i2c = i2c_driver
# qir quality values returned from the sensor
self.refResistance = 10000.
self._resistance = 0.0
self._TVOC = 0
self._CO2 = 0
self.vrefCounts = 0
self.ntcCounts = 0
self._temperature = 0.0
# ----------------------------------
# is_connected()
#
# Is an actual board connected to our system?
[docs] def is_connected(self):
"""
Determine if a CCS811 device is conntected to the system..
:return: True if the device is connected, otherwise False.
:rtype: bool
"""
return qwiic_i2c.isDeviceConnected(self.address)
connected = property(is_connected)
# ----------------------------------
# begin()
#
# Initialize the system/validate the board.
[docs] def begin(self):
"""
Initialize the operation of the Ccs811 module
:return: Returns SENSOR_SUCCESS on success, SENSOR_ID_ERROR on bad chip ID
or SENSOR_INTERNAL_ERROR.
:rtype: integer
"""
# wait for sensor to come up...
time.sleep(.1)
# found it's best to reset the device the try to check chipid.
# If the chip is in a bad state, the ID returns 0xFF and needs
# a kick.
data = [0x11, 0xE5, 0x72, 0x8A] # Reset key
self._i2c.writeBlock(self.address, CSS811_SW_RESET, data)
time.sleep(.5)
# are we who we need to be?
chipID = self._i2c.readByte(self.address, CSS811_HW_ID)
if chipID not in _validChipIDs:
print("Invalid Chip ID: 0x%.2X" % chipID)
return self.SENSOR_ID_ERROR
if self.check_status_error() or not self.app_valid():
return self.SENSOR_INTERNAL_ERROR
self._i2c.writeCommand(self.address, CSS811_APP_START)
return self.set_drive_mode(1) # read every second
#****************************************************************************#
#
# Sensor functions
#
# ****************************************************************************#
# Updates the total voltatile organic compounds (TVOC) in parts per billion (PPB)
# and the CO2 value
# Returns nothing
[docs] def read_algorithm_results(self):
"""
Reads the resutls from the sensor and stores internally
:return: SENSOR_SUCCESS
:rtype: integer
"""
data = self._i2c.readBlock(self.address, CSS811_ALG_RESULT_DATA, 4)
# Data ordered:
# co2MSB, co2LSB, tvocMSB, tvocLSB
self._CO2 = (data[0] << 8) | data[1]
self._TVOC = (data[2] << 8) | data[3]
return self.SENSOR_SUCCESS
#----------------------------------------------------
# Checks to see if error bit is set
[docs] def check_status_error(self):
"""
Returns if the Error bit on the sensor is set.
:return: value of Error bit
:rtype: integer
"""
# return the status bit
value = self._i2c.readByte(self.address, CSS811_STATUS)
return value & 1 << 0
#----------------------------------------------------
# Checks to see if DATA_READ flag is set in the status register
[docs] def data_available(self):
"""
Returns True if data is available on the sensor
:return: True if data is available.
:rtype: bool
"""
try:
value = self._i2c.readByte(self.address, CSS811_STATUS)
except IOError:
value = 0 # This will return 0
return value & 1 << 3 != 0
#----------------------------------------------------
# Checks to see if APP_VALID flag is set in the status register
[docs] def app_valid(self):
"""
Returns True if if the sensor APP_VALID bit is set in the status register
:return: True if APP_VALID is set
:rtype: bool
"""
try:
value = self._i2c.readByte(self.address, CSS811_STATUS)
except IOError:
value = 0 # This will return 0
return value & 1 << 4 != 0
[docs] def get_error_register(self):
"""
Returns the value of the sensors error Register
:return: Error register
:rtype: int
"""
try:
value = self._i2c.readByte(self.address, CSS811_ERROR_ID)
except IOError:
value = 0xFF
return value # Send all errors in the event of communication error
error_register = property(get_error_register)
# Returns the baseline value
# Used for telling sensor what 'clean' air is
# You must put the sensor in clean air and record this value
[docs] def get_baseline(self):
"""
Returns the baseline value
Used for telling sensor what 'clean' air is
You must put the sensor in clean air and record this value
:return: Baseline value for the sensor
:rtype: integer
"""
try:
value = self._i2c.readWord(self.address, CSS811_BASELINE)
except IOError:
value = 0
return value
#----------------------------------------------------
[docs] def set_baseline(self, input_val):
"""
Set the baseline value for the sensor
:return: SENSOR_SUCCESS
:rtype: integer
"""
data = bytearray(2)
data[0] = (input_val >> 8) & 0x00FF
data[1] = input_val & 0x00FF
self._i2c.writeWord(self.address, CSS811_BASELINE, input_val)
return self.SENSOR_SUCCESS
baseline = property(get_baseline, set_baseline)
#----------------------------------------------------
# Enable the nINT signal
[docs] def enable_interrupts(self):
"""
Set the Interrupt bit in the sensor and enable Interrupts
on the sensor
:return: SENSOR_SUCCESS
:rtype: integer
"""
value = self._i2c.readByte(self.address, CSS811_MEAS_MODE)
value |= (1 << 3) #Set INTERRUPT bit
self._i2c.writeByte(self.address, CSS811_MEAS_MODE, value)
return self.SENSOR_SUCCESS
#----------------------------------------------------
# Disable the nINT signal
[docs] def disable_interrupts(self):
"""
Clear the Interrupt bit in the sensor and disable Interrupts
on the sensor
:return: SENSOR_SUCCESS
:rtype: integer
"""
value = self._i2c.readByte(self.address, CSS811_MEAS_MODE)
value &= (~(1 << 3) ) & 0xFF #Set INTERRUPT bit. Just want first Byte
self._i2c.writeByte(self.address, CSS811_MEAS_MODE, value)
return self.SENSOR_SUCCESS
#----------------------------------------------------
# Mode 0 = Idle
# Mode 1 = read every 1s
# Mode 2 = every 10s
# Mode 3 = every 60s
# Mode 4 = RAW mode
[docs] def set_drive_mode(self, mode):
"""
Set the Drive mode for the sensor
:param mode: Valid values are:
0 = Idle, 1 = read every 1s, 2 = every 10s, 3 = every 60s, 4 = RAW mode
:return: SENSOR_SUCCESS
:rtype: integer
"""
if mode > 4:
mode = 4 # sanitize input
value = self._i2c.readByte(self.address, CSS811_MEAS_MODE)
value &= (~(0b00000111 << 4)) & 0xFF # Clear DRIVE_MODE bits. Just 1st byte
value |= (mode << 4) #Mask in mode
self._i2c.writeByte(self.address, CSS811_MEAS_MODE, value)
return self.SENSOR_SUCCESS
#----------------------------------------------------
## Given a temp and humidity, write this data to the CSS811 for better compensation
## This function expects the humidity and temp to come in as floats
[docs] def set_environmental_data(self, relativeHumidity, temperature):
"""
Given a temp and humidity, write this data to the CSS811 for better compensation
This function expects the humidity and temp to come in as floats
:param relativeHumidity: The relativity Humity for the sensor to use
:param temperature: The temperature for the sensor to use
:return: one of the SENSOR_ return codes.
:rtype: integer
"""
# Check for invalid temperatures
if temperature < -25 or temperature > 50:
return self.SENSOR_GENERIC_ERROR
# Check for invalid humidity
if relativeHumidity < 0 or relativeHumidity > 100:
return self.SENSOR_GENERIC_ERROR
rH = int(relativeHumidity * 1000) # 42.348 becomes 42348
temp = int(temperature * 1000) # 23.2 becomes 23200
envData = bytearray(4)
# Split value into 7-bit integer and 9-bit fractional
# Incorrect way from datasheet.
# envData[0] = ((rH % 1000) / 100) > 7 ? (rH / 1000 + 1) << 1 : (rH / 1000) << 1;
# envData[1] = 0; # CCS811 only supports increments of 0.5 so bits 7-0 will always be zero
# if (((rH % 1000) / 100) > 2 && (((rH % 1000) / 100) < 8))
# {
# envData[0] |= 1; # Set 9th bit of fractional to indicate 0.5%
# }
# Correct rounding. See issue 8:
# https://github.com/sparkfun/Qwiic_BME280_CCS811_Combo/issues/8
envData[0] = (rH + 250) // 500
envData[1] = 0 # CCS811 only supports increments of 0.5 so bits 7-0 will always be zero
temp += 25000 # Add the 25C offset
# Split value into 7-bit integer and 9-bit fractional
# envData[2] = ((temp % 1000) / 100) > 7 ? (temp / 1000 + 1) << 1 : (temp / 1000) << 1;
# envData[3] = 0;
# if (((temp % 1000) / 100) > 2 && (((temp % 1000) / 100) < 8))
# {
# envData[2] |= 1; # Set 9th bit of fractional to indicate 0.5C
# }
# Correct rounding
envData[2] = (temp + 250) // 500
envData[3] = 0
self._i2c.writeBlock(self.address, CSS811_ENV_DATA, envData)
return self.SENSOR_SUCCESS
#----------------------------------------------------
[docs] def set_reference_resistance(self, input_val):
"""
Set the sensors referance resistance
:param input: The referance resistance to set in the sensor
:return: No return value
"""
self.refResistance = input_val
[docs] def get_reference_resistance(self):
"""
Get the sensors referance resistance
:return: The current reference resistance
:rtype: integer
"""
return self.refResistance
referance_resistance = property(get_reference_resistance, set_reference_resistance)
#----------------------------------------------------
[docs] def read_ntc(self):
"""
Read the NTC values from the sensor and store for future calications.
NOTE: The qwiic CCS811 doesn't support this function, but other CCS811
sparkfun boards do.
:return: A SENSOR_ status code
:rtype: integer
"""
data = self._i2c.readBlock(self.address, CSS811_NTC, 4)
self.vrefCounts = (data[0] << 8) | data[1]
self.ntcCounts = (data[2] << 8) | data[3]
self._resistance = self.ntcCounts * self.refResistance / float(self.vrefCounts)
# Code from Milan Malesevic and Zoran Stupic, 2011,
# Modified by Max Mayfield,
if self._resistance == 0:
# we had an error of some sorts. Log 0 is not a happy value
print("Error - Invalid received from sensor")
return 1
self._temperature = math.log(int(self._resistance))
self._temperature = 1 / (0.001129148 + (0.000234125 * self._temperature) + \
(0.0000000876741 * self._temperature * self._temperature * self._temperature))
self._temperature = self._temperature - 273.15 # Convert Kelvin to Celsius
return self.SENSOR_SUCCESS
#----------------------------------------------------
# TVOC Value
[docs] def get_tvoc(self):
"""
Return the current TVOC value.
:return: The TVOC Value
:rtype: float
"""
return self._TVOC
TVOC = property(get_tvoc)
#----------------------------------------------------
# CO2 Value
[docs] def get_co2(self):
"""
Return the current CO2 value.
:return: The CO2 Value
:rtype: float
"""
return self._CO2
CO2 = property(get_co2)
#----------------------------------------------------
# Resistance Value
[docs] def get_resistance(self):
"""
Return the current resistance value.
:return: The resistance value
:rtype: float
"""
return self._resistance
resistance = property(get_resistance)
#----------------------------------------------------
# Temperature Value
[docs] def get_temperature(self):
"""
Return the current temperature value.
:return: The temperature Value
:rtype: float
"""
return self._temperature
temperature = property(get_temperature)