3
\$\begingroup\$

This is a Python script that outputs status information to a small OLED screen connected to a Raspberry Pi.

I'm relatively new to writing Python, so although the app works and outputs correct information, it is far from the clean code I have encountered when looking at other people's code.

I think I have covered most of PEP 8. I am most interested in code structure, 'the Python way', refactoring, and cleanup.

# -*- coding:UTF-8 -*-
#
# | file       :   showStats.py
# | version    :   V1.0
# | date       :   2018-10-01
# | function   :   Show statistics on Waveshare 1.5inch OLED
# |                 To preserve OLED screen life the display is turned off unless
# |                 motion is detected in the server room. (via MQTT).
# |                 - Listens on MQTT topic
# |                     + home/basement/serverroom/motionsensor
#
# Requires psutil.   Install:   sudo apt-get install python-psutil
# Requires pahoMQTT. Install:   sudo apt install python-pip
#                               pip install paho-mqtt
#
# INFO:
# Collects various information about the system and displays the info on the screen
#

# Logging
import logging
import logging.config
import commands
import os
import psutil
import platform
import datetime
import time
# MQTT
import paho.mqtt.client as paho
# RPI
import RPi.GPIO as GPIO
# specific to OLED screen
import DEV_Config
import OLED_Driver
import Image
import ImageDraw
import ImageFont
import ImageColor

# CONSTANTS
# Path handed to logging.config.fileConfig() below.
LOGGING_CONFIG_FILE = "logging_config.ini"
# Topic subscribed to in on_connect() and matched against in on_message().
MQTT_SUBSCRIBE_TOPIC = "home/basement/serverroom/motionsensor"
# The three values are passed positionally to mqtt_client.connect() in main().
# NOTE(review): paho names the third connect() argument "keepalive" - confirm
# that "timeout" is the intended meaning.
MQTT_BROKER = ("192.168.1.170", 1883, 60)  # (host, port, timeout)
# Pause between two update_display() calls in the main loop.
REFRESH_INTERVAL = 2  # seconds


# GLOBAL
# Module-level state shared between main(), the MQTT callbacks and the
# clean-up code at the bottom of the file.
OLED = None  # assigned in main()
mqtt_client = None  # assigned in main(); stays None until then
show_on_screen = False  # set by on_message(), read by the loop in main()


# Logging setup
# Runs at import time; the configuration file must be readable from the
# current working directory because the path above is relative.
logging.config.fileConfig(fname=LOGGING_CONFIG_FILE, disable_existing_loggers=True)
logger = logging.getLogger()  # no name given, so this is the root logger


try:
    def main():
        """Initialise the OLED and the MQTT client, then refresh the display forever.

        Start-up order: register the custom START log level, initialise and
        clear the OLED, create the MQTT client and start its network loop,
        collect the platform facts that are only read once, and finally enter
        an endless loop that calls update_display() every REFRESH_INTERVAL
        seconds. The loop has no exit condition, so the function only ends
        through an exception (for example KeyboardInterrupt).

        Assigns the module-level globals ``OLED`` and ``mqtt_client``.
        """
        # Level 100 is above CRITICAL (50), so these banner lines pass any
        # level threshold configured for the root logger.
        logging.addLevelName(100, 'START')
        logging.log(100, "============================")
        logging.log(100, "STARTING UP")

        logger.info("Initialize OLED screen")
        global OLED
        OLED = OLED_Driver.OLED()
        oled_scan_dir = OLED_Driver.SCAN_DIR_DFT  # SCAN_DIR_DFT = D2U_L2R
        OLED.OLED_Init(oled_scan_dir)
        OLED.OLED_Clear()
        DEV_Config.Driver_Delay_ms(500)

        # The callbacks are attached before connect() so that on_connect can
        # subscribe as soon as the broker acknowledges the connection.
        global mqtt_client
        mqtt_client = paho.Client(client_id="RPI-A_OLEDScreen", clean_session=False, userdata=None)
        mqtt_client.enable_logger(logger)
        mqtt_client.on_connect = on_connect
        mqtt_client.on_subscribe = on_subscribe
        mqtt_client.on_message = on_message
        mqtt_client.connect(MQTT_BROKER[0], MQTT_BROKER[1], MQTT_BROKER[2])
        # Started here, stopped by loop_stop() in the clean-up code at the
        # bottom of the file.
        mqtt_client.loop_start()

        logger.info("Setting up OLED display area")
        DEV_Config.Driver_Delay_ms(500)
        image = Image.new("L", (OLED.OLED_Dis_Column, OLED.OLED_Dis_Page), 0)  # grayscale (luminance)
        # NOTE(review): the Draw object is created but not kept, and ``image``
        # is not used afterwards - drawing to the OLED is not implemented yet.
        ImageDraw.Draw(image)  # draw = ImageDraw.Draw(image)

        logger.info("Fetch platform info once@startup")
        os_name, name, version, _, _, _ = platform.uname()
        boot_time = datetime.datetime.fromtimestamp(psutil.boot_time())
        mac_address = get_mac_address('eth0')
        logging.log(100, "OS: " + os_name)
        logging.log(100, "Device name: " + name)
        logging.log(100, "version: " + version)
        logging.log(100, "Logging config file: " + LOGGING_CONFIG_FILE)
        logging.log(100, "============================")

        # ``show_on_screen`` is not assigned in this function, so each
        # iteration reads the module-level flag that on_message() updates.
        while True:
            update_display(os_name, name, version, boot_time, mac_address, show_on_screen)
            time.sleep(REFRESH_INTERVAL)  # seconds


    def on_connect(client, userdata, flags, rc):
        """MQTT connect callback: log the CONNACK code and subscribe.

        ``client`` is subscribed to MQTT_SUBSCRIBE_TOPIC with QoS 1.
        ``userdata`` and ``flags`` are accepted but not used.
        """
        connack_message = "CONNACK received with code %d." % rc
        logger.info(connack_message)
        client.subscribe(MQTT_SUBSCRIBE_TOPIC, qos=1)


    def on_subscribe(client, userdata, mid, granted_qos):
        """MQTT subscribe callback: log the message id and the granted QoS.

        ``client`` and ``userdata`` are accepted but not used.
        """
        details = " ".join((str(mid), str(granted_qos)))
        logger.info("Subscribed: " + details)


    def on_message(client, userdata, msg):
        """MQTT message callback: log the message and update ``show_on_screen``.

        The module-level flag becomes True only when the message arrived on
        MQTT_SUBSCRIBE_TOPIC with the payload "motion"; every other message
        sets it to False. ``client`` and ``userdata`` are not used.
        """
        global show_on_screen
        log_fields = (msg.topic, str(msg.qos), str(msg.payload))
        logger.info(" ".join(log_fields))
        # NOTE(review): the payload is compared with a text literal. If the
        # MQTT library delivers bytes (as it would on Python 3) this never
        # matches - confirm before porting.
        is_motion = msg.topic == MQTT_SUBSCRIBE_TOPIC and msg.payload == "motion"
        show_on_screen = True if is_motion else False


    # Slowly changing values shown on the display, keyed by name.
    # Each entry is a (fetched_at, value) tuple maintained by _cached_value().
    _display_cache = {}


    def _cached_value(key, max_age, fetch, now):
        """Return the cached value for *key*, calling fetch() when it is missing or stale."""
        entry = _display_cache.get(key)
        if entry is not None:
            age = now - entry[0]
            # A negative age means the wall clock jumped backwards; refresh then too.
            if datetime.timedelta(0) <= age < max_age:
                return entry[1]
        value = fetch()
        _display_cache[key] = (now, value)
        return value


    def update_display(os_name, name, version, boot_time, mac_address, show_on_screen):
        """Refresh the status output, or clear the OLED when it should be off.

        Parameters:
            os_name, name, version: platform facts collected once at start-up.
            boot_time: datetime of the last boot, used for the up-time line.
            mac_address: MAC address string to show.
            show_on_screen: when false the OLED is cleared and nothing else
                happens.

        CPU load and temperature are read on every call. The IP address is
        refreshed at most once per hour and the disk usage at most once per
        day; in between the previously fetched values are reused. Both are
        fetched on the first call, so they are never None when printed.
        """
        if not show_on_screen:
            OLED.OLED_Clear()
            return

        # update values every loop
        current_time = datetime.datetime.now()
        # first run gives a wrong value due to CPU load while initializing psutil
        cpu_percent = str(psutil.cpu_percent()) + "%"
        cpu_temp = get_cpu_temperature()

        # update every hour
        ip_address = _cached_value(
            "ip_address", datetime.timedelta(hours=1),
            lambda: commands.getoutput('hostname -I'), current_time)

        # update every day
        fs_total, fs_used, _, fs_percent = _cached_value(
            "disk_usage", datetime.timedelta(days=1),
            lambda: psutil.disk_usage("/"), current_time)

        # temporary debug code. Replace with 'draw.text()' and 'draw.line()'
        print(os_name, name, version)
        print("Local IP address: " + ip_address)
        print("MAC: " + mac_address)
        print("CPU %:", cpu_percent)
        print("CPU temp:", cpu_temp)
        print(sizeof_fmt(fs_used), "of", sizeof_fmt(fs_total), "(", fs_percent, "%)")
        print("up time:", friendly_time_delta(boot_time, current_time))
        logger.info("Done updating OLED screen")

    # Return friendly TimeDelta string
    # Example: "4 days, 8 hours"
    def friendly_time_delta(start, end=None):
        """Describe the time elapsed between *start* and *end* in words.

        Parameters:
            start: datetime marking the beginning of the interval.
            end: datetime marking the end of the interval. Defaults to the
                current time, evaluated on every call.

        Returns:
            Years, days and hours joined by ", " with zero-valued units left
            out, e.g. "1 year, 2 days, 3 hours". Intervals shorter than one
            hour are given in minutes only, and intervals shorter than one
            minute in seconds only. An interval of zero, or a negative one
            (end before start), yields "0 seconds".
        """
        if end is None:
            # Resolved here rather than in the signature: a default argument
            # is evaluated only once, when the function is defined.
            end = datetime.datetime.now()

        total_seconds = max(int((end - start).total_seconds()), 0)
        years, remainder = divmod(total_seconds, 31556926)  # seconds per year
        days, remainder = divmod(remainder, 86400)
        hours, remainder = divmod(remainder, 3600)
        minutes, seconds = divmod(remainder, 60)

        def plural(count, unit):
            # "1 day" but "2 days" / "0 seconds"
            return "%d %s%s" % (count, unit, "" if count == 1 else "s")

        units = ((years, "year"), (days, "day"), (hours, "hour"))
        parts = [plural(count, unit) for count, unit in units if count]
        if parts:
            return ", ".join(parts)
        if minutes:
            return plural(minutes, "minute")
        return plural(seconds, "second")


    # Return the MAC address of the specified interface
    def get_mac_address(interface='eth0'):
        """Return the MAC address of a network interface as a string.

        Parameters:
            interface: interface name; its address is read from
                /sys/class/net/<interface>/address.

        Returns:
            The first 17 characters of that file, or "00:00:00:00:00:00"
            when the file cannot be opened or read.
        """
        try:
            # "with" closes the file handle even if read() fails.
            with open('/sys/class/net/%s/address' % interface) as address_file:
                raw_address = address_file.read()
        except (IOError, OSError):
            # Unknown interface or unreadable file: fall back to an all-zero
            # address instead of failing the whole status display.
            raw_address = "00:00:00:00:00:00"
        return raw_address[0:17]


    # Return a human-readable size string using binary (1024-based) prefixes.
    # Example: 1024 -> 1.0KiB
    def sizeof_fmt(num, suffix='B'):
        """Format *num* with the largest binary prefix that keeps it below 1024.

        Parameters:
            num: the quantity to format (a number).
            suffix: unit text appended after the prefix.

        Returns:
            The scaled value with one decimal place followed by prefix and
            suffix, e.g. "1.5KiB". Values too large for 'Zi' are expressed
            in 'Yi'.
        """
        prefixes = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi')
        value = num
        index = 0
        while index < len(prefixes):
            if abs(value) < 1024.0:
                return "%3.1f%s%s" % (value, prefixes[index], suffix)
            value = value / 1024.0
            index += 1
        return "%.1f%s%s" % (value, 'Yi', suffix)


    # Return CPU temperature as a character string
    def get_cpu_temperature():
        """Return the CPU temperature reported by ``vcgencmd measure_temp``.

        Reads the first line of the command's output and removes the "temp="
        prefix and the trailing 'C plus newline, so a line such as
        temp=47.2'C yields "47.2". When the command produces no output the
        result is an empty string.
        """
        pipe = os.popen('vcgencmd measure_temp')
        try:
            res = pipe.readline()
        finally:
            # Close the pipe explicitly instead of leaving it to the garbage
            # collector; this function is called on every display refresh.
            pipe.close()
        return res.replace("temp=", "").replace("'C\n", "")


    if __name__ == "__main__":
        main()

except KeyboardInterrupt:
    logger.info("exiting program")
except:
    logger.error('Exception occurred', exc_info=True)
    raise
finally:
    GPIO.cleanup()  # this ensures a clean exit
    mqtt_client.loop_stop()
\$\endgroup\$
1
  • \$\begingroup\$ Please check your indentation. The easiest way to post code is to paste it into the question editor, highlight it, and press Ctrl-K to mark it as a code block. \$\endgroup\$ Commented Oct 3, 2018 at 13:43

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.