Python script to output status information to a small OLED screen connected to a Raspberry Pi.
I'm relatively new at writing python so although the app works and outputs correct information it is far from the clean code I encountered when looking at other people's code.
I think I covered most of PEP8. I am most interested in code structure, 'the python way', refactoring and cleanup.
# -*- coding:UTF-8 -*-
#
# | file : showStats.py
# | version : V1.0
# | date : 2018-10-01
# | function : Show statistics on Waveshare 1.5inch OLED
# | To preserve OLED screen life the display is turned off unless
# | motion is detected in the server room. (via MQTT).
# | - Listens on MQTT topic
# | + home/basement/serverroom/motionsensor
#
# Requires psUtil. Install: sudo apt-get python-psutil
# Requires pahoMQTT. Install: sudo apt install python-pip
# pip install paho-mqtt
#
# INFO:
# Collects various information about the system and displays the info on the screen
#
# Logging
import logging
import logging.config
import commands
import os
import psutil
import platform
import datetime
import time
# MQTT
import paho.mqtt.client as paho
# RPI
import RPi.GPIO as GPIO
# specific to OLED screen
import DEV_Config
import OLED_Driver
import Image
import ImageDraw
import ImageFont
import ImageColor
# CONSTANTS
LOGGING_CONFIG_FILE = "logging_config.ini"
MQTT_SUBSCRIBE_TOPIC = "home/basement/serverroom/motionsensor"
MQTT_BROKER = ("192.168.1.170", 1883, 60) # (host, port, timeout)
REFRESH_INTERVAL = 2 # seconds
#GLOBAL
OLED = None
mqtt_client = None
show_on_screen = False
# Logging setup
logging.config.fileConfig(fname=LOGGING_CONFIG_FILE, disable_existing_loggers=True)
logger = logging.getLogger()
try:
def main():
logging.addLevelName(100, 'START')
logging.log(100, "============================")
logging.log(100, "STARTING UP")
logger.info("Initialize OLED screen")
global OLED
OLED = OLED_Driver.OLED()
oled_scan_dir = OLED_Driver.SCAN_DIR_DFT # SCAN_DIR_DFT = D2U_L2R
OLED.OLED_Init(oled_scan_dir)
OLED.OLED_Clear()
DEV_Config.Driver_Delay_ms(500)
global mqtt_client
mqtt_client = paho.Client(client_id="RPI-A_OLEDScreen", clean_session=False, userdata=None)
mqtt_client.enable_logger(logger)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message
mqtt_client.connect(MQTT_BROKER[0], MQTT_BROKER[1], MQTT_BROKER[2])
mqtt_client.loop_start()
logger.info("Setting up OLED display area")
DEV_Config.Driver_Delay_ms(500)
image = Image.new("L", (OLED.OLED_Dis_Column, OLED.OLED_Dis_Page), 0) # grayscale (luminance)
ImageDraw.Draw(image) # draw = ImageDraw.Draw(image)
logger.info("Fetch platform info once@startup")
os_name, name, version, _, _, _ = platform.uname()
boot_time = datetime.datetime.fromtimestamp(psutil.boot_time())
mac_address = get_mac_address('eth0')
logging.log(100, "OS: " + os_name)
logging.log(100, "Device name: " + name)
logging.log(100, "version: " + version)
logging.log(100, "Logging config file: " + LOGGING_CONFIG_FILE)
logging.log(100, "============================")
while True:
update_display(os_name, name, version, boot_time, mac_address, show_on_screen)
time.sleep(REFRESH_INTERVAL) # seconds
def on_connect(client, userdata, flags, rc):
logger.info("CONNACK received with code %d." % rc)
client.subscribe(MQTT_SUBSCRIBE_TOPIC, qos=1)
def on_subscribe(client, userdata, mid, granted_qos):
logger.info("Subscribed: " + str(mid) + " " + str(granted_qos))
def on_message(client, userdata, msg):
global show_on_screen
logger.info(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
if msg.topic == MQTT_SUBSCRIBE_TOPIC and msg.payload == "motion":
show_on_screen = True
else:
show_on_screen = False
def update_display(os_name, name, version, boot_time, mac_address, show_on_screen):
if not show_on_screen:
OLED.OLED_Clear()
return
else:
ip_address = None
fs_total, fs_used, fs_free, fs_percent = (None, None, None, None)
# update values every loop
current_time = datetime.datetime.now()
cpu_percent = str(psutil.cpu_percent()) + "%" # first run gives a wrong value due to CPU load while initializing PSUTIL
cpu_temp = get_cpu_temperature()
# update every hour
if current_time.minute % 59:
ip_address = commands.getoutput('hostname -I')
if current_time.hour % 23:
# update every day
fs_total, fs_used, fs_free, fs_percent = psutil.disk_usage("/")
# temporary debug code. Replace with 'draw.text()' and 'draw.line()'
print(os_name, name, version)
print("Local IP address: " + ip_address)
print("MAC: " + mac_address)
print("CPU %:", cpu_percent)
print("CPU temp:", cpu_temp)
print(sizeof_fmt(fs_used), "of", sizeof_fmt(fs_total), "(", fs_percent, "%)")
print("up time:", friendly_time_delta(boot_time, current_time))
logger.info("Done updating OLED screen")
# Return friendly TimeDelta string
# Example: "4 days, 8 hours 2 minutes"
def friendly_time_delta(start, end=datetime.datetime.now()):
up_time = end - start
years, reminder = divmod(up_time.total_seconds(), 31556926)
days, reminder = divmod(reminder, 86400)
hours, reminder = divmod(reminder, 3600)
minutes, seconds = divmod(reminder, 60)
ret = ""
if years > 1:
ret = str(int(years)) + " years, "
elif years == 1:
ret = "1 year, "
if days > 1:
ret += str(int(days)) + " days, "
elif days == 1:
ret += "1 day, "
if hours > 1:
ret += str(int(hours)) + " hours"
elif hours == 1:
ret += str(int(hours)) + " hour"
if ret == "" and minutes > 0:
ret += str(int(minutes)) + " minutes"
if ret == "" and seconds > 0:
ret += str(int(seconds)) + " seconds"
return ret
# Return the MAC address of the specified interface
def get_mac_address(interface='eth0'):
try:
raw_address = open('/sys/class/net/%s/address' % interface).read()
except:
raw_address = "00:00:00:00:00:00"
return raw_address[0:17]
# return friendly byte size string.
# Example: 1024 -> 1KiB
def sizeof_fmt(num, suffix='B'):
for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
if abs(num) < 1024.0:
return "%3.1f%s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f%s%s" % (num, 'Yi', suffix)
# Return CPU temperature as a character string
def get_cpu_temperature():
res = os.popen('vcgencmd measure_temp').readline()
return res.replace("temp=", "").replace("'C\n", "")
if __name__ == "__main__":
main()
except KeyboardInterrupt:
logger.info("exiting program")
except:
logger.error('Exception occurred', exc_info=True)
raise
finally:
GPIO.cleanup() # this ensures a clean exit
mqtt_client.loop_stop()