1

I'm coming from node where handling asynchronous design is as simple as adding a callback and getting on with your life. I'm trying to write some apps in python where I'm not having the same success and I'm struggling to find what to search for since there doesn't seem to be a direct equivalent.

Here's an example where I'm running an MQTT messaging client and waiting for a state change signal from a sensor.

import paho.mqtt.client as mqtt
from ouimeaux.environment import Environment
from ouimeaux.signals import receiver, statechange

def on_connect(client, userdata, rc):
    print('Connected with result code '+str(rc))
    client.subscribe('lights/#')

def turn_lights_on(client, userdata, rc):
    for (x, value) in enumerate(devices['switches']):
        devices['switches'][x].on()

def turn_lights_off(client, userdata, rc):
    for (x, value) in enumerate(devices['switches']):
        devices['switches'][x].off()

def reply_with_devices(client, userdata, rc):
    for (x, value) in enumerate(devices['switches']):
        client.publish('devices/new', switches[x])
    for (x, value) in enumerate(devices['motions']):
        client.publish('devices/new', motions[x])

def on_switch(switch):
    print "Switch found: ", switch.name
    devices['switches'].append(switch)

def on_motion(motion):
    print "Motion found: ", motion.name
    devices['motions'].append(motion)

client = mqtt.Client("wemo_controller")
client.on_connect = on_connect
client.message_callback_add('lights/on', turn_lights_on)
client.message_callback_add('lights/off', turn_lights_off)
client.message_callback_add('devices/discover', reply_with_devices)

client.connect('localhost', 1883, 60)

print 'Running WEMO controller - listening for messages on localhost:1883'

devices = { 'switches': [], 'motions': [] }

env = Environment(on_switch, on_motion)
env.start()
env.discover(seconds=3)

switch = env.get_switch('Desk lights')

@receiver(statechange)
def motion(sender, **kwargs):
    print 'A THING HAPPENED'
    print "{} state is {state}".format(sender.name, state="on" if kwargs.get('state') else "off")

env.wait()

client.loop_forever()

Both libraries seem to have their own way of holding up the thread but it seems like I can only have one blocking and listening at a time. I have a feeling that threading might be the answer, but I'm struggling to work out how to implement this and not sure if it's right. I'm also confused as to what wait() and loop_forever() actually do.

The answer I'm looking for is the 'python' way to solve this problem.

1
  • Python has a few frameworks that provide node-like behavior: twisted (as already mentioned), tornado, and asyncio, which is built into Python starting with version 3.4. It looks like there's a (poorly documented) MQTT library built using tornado. An another built on asyncio. Commented Aug 9, 2015 at 4:30

3 Answers 3

1

You may want to look at the Twisted framework

"Twisted is an event-driven networking engine written in Python" It is specifically designed for building asynchronous network applications.

In particular, read up on the reactor, and using Deffered() to register callbacks

Sign up to request clarification or add additional context in comments.

1 Comment

For reference, Node pulls inspiration from Twisted, among others.
1

Asynchronous programming have been integrated into python recently. So, if you are using python 3.3, then python provides an inbuilt library Asyncio especially for this purpose (which was previously called 'Tulips'). If you are using python 2.7, then you can use Trollius which is backporting of Asyncio. If nothing suits you, than you can obviously use full-fledged network programming framework Twisted as suggested in other answers.

Comments

1

I'm the author of HBMQTT, a MQTT broker/client library which uses Python asyncio API.

The client API doesn't need any callback. You can use the client API to subscribe for some topic and then run a loop for reading and processing incoming messages. Something like:

import asyncio
from hbmqtt.client import MQTTClient

C = MQTTClient()

@asyncio.coroutine
def test_coro():
   yield from C.connect(uri='mqtt://localhost/', username=None, password=None)
   # Adapt QOS as needed
   yield from C.subscribe([
             {'filter': 'lights/on', 'qos': 0x01},
             {'filter': 'lights/off', 'qos': 0x01},
             {'filter': 'devices/discover', 'qos': 0x01},
         ])
   while some_condition:
         # Wait until next PUBLISH message arrives
         message = yield from C.deliver_message()
         if message.variable_header.topic_name == 'lights/on':
             # Lights on
         elif message.variable_header.topic_name == 'lights/off':
             # Lights off
         yield from C.acknowledge_delivery(message.variable_header.packet_id)
   yield from C.disconnect()


if __name__ == '__main__':
    loop=asyncio.get_event_loop()
    loop.run_until_complete(test_coro())

HBMQTT is still under development. It requires Python 3.4.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.