Přeskočit obsah

Zmáčkni a běž (Touch and go)

Ukázka použití MQTT. Pro tento projekt je třeba mít funkční sít a dostupný MQTT broker.

Variace na hru wack-a-mole. Krabičky se náhodně rozsvěcují, K rozsvícení další krabičky je třeba zmáčknout tlačítko "X" na té, která svítí.

Master

Následující kód nahrad do jedné krabičky, ze které se bude řídit hra.

Nezapoměň upravit heslo na wifi a cestu k brokeru.

import random, machine, ubinascii
from time import sleep

from umqtt.simple import MQTTClient
from dtbox.network.shortcuts import network
from dtbox.led.shortcuts import led_red, led_amber, led_green
from dtbox.button.shortcuts import button_o, button_x
from dtbox.display.shortcuts import display

# use custom wifi connection
# network._networks.append({"ssid":"mynetwork", "psk":"mypassword"})

class Channels:
    root = "dtbox"
    register = f"{root}/register"
    win = f"{root}/flash"
    pressed = f"{root}/press"
    activate = f"{root}/activate"

client_id = ubinascii.hexlify(machine.unique_id()).decode()
c = MQTTClient(client_id, "broker.hivemq.com", ssl=0)   # change this

class Game:
    set_length = 0
    _remeaning_presses = 0
    _current_client = None
    clients = []
    started = False

    def __init__(self, length):
        self.set_length = length

    def register_client(self, client_id):
        print("Attempting to register", client_id)
        if client_id in self.clients: return

        self.clients.append(client_id)
        display.show(len(self.clients))
        print("Registered", client_id)

    def press_button(self, client_id):
        print("Pressing button", client_id)

        if client_id not in self.clients or client_id != self._current_client:
            print("Client not registered or not active", client_id)
            return

        if self._remeaning_presses == 0: return self.stop()

        next_client = client_id
        while next_client == client_id:
            next_client = self.clients[random.randint(0, len(self.clients) - 1)]

        self._current_client = next_client

        print("Next client", next_client)
        c.publish(Channels.activate, next_client)

        self._remeaning_presses -= 1
        display.show(self._remeaning_presses)


    def receive_message(self, topic, msg):
        topic = topic.decode()
        msg = msg.decode()
        print('received message', topic, msg)

        if topic == Channels.register:
            self.register_client(msg)

        elif topic == Channels.pressed and self.started:
            self.press_button(msg)

    def stop(self):
        print("Stopping game")
        c.publish(Channels.win, "1")

        self.started = False
        self._remeaning_presses = self.set_length

        display.show(len(self.clients))

        led_green.value(0)
        led_amber.value(1)

    def start(self):
        if self.started: return

        if not self.clients:
            return

        self.started = True
        self._remeaning_presses = self.set_length
        c.publish(Channels.win, "0")

        display.show(self._remeaning_presses)

        led_amber.value(0)
        led_green.value(1)

        self._current_client = self.clients[random.randint(0, len(self.clients) - 1)]
        c.publish(Channels.activate, self._current_client)


def ensure_connected(network, c):
    wifi_connected = False
    broker_connected = False
    while not (wifi_connected and broker_connected):
        try:
            if not wifi_connected:
                print("w")
                wifi_connected = network.connect()
            if not broker_connected:
                print("b")
                c.connect()
                broker_connected = True
                c.subscribe(Channels.register)
                c.subscribe(Channels.pressed)
        except (OSError, IndexError):
            pass


game = Game(10)
c.set_callback(game.receive_message)

led_red.value(1)
ensure_connected(network, c)
led_red.value(0)

button_x.on_press(game.stop)
button_o.on_press(game.start)
led_red.value(0)
led_amber.value(1)

while True:
    try:
        c.check_msg()
        sleep(0.1)

        print(".",end="")
    except (OSError, IndexError):
        led_red.value(1)
        ensure_connected(network, c)
        led_red.value(0)

Client

Tento kód nahraj do každé herní krabičky do main.py.

Nezapoměň upravit heslo na wifi a cestu k brokeru.

from umqtt.simple import MQTTClient
import machine, ubinascii
from dtbox.network.shortcuts import network
from dtbox.wsled.shortcuts import wsled
from dtbox.colors import *
from time import sleep
from dtbox.led.shortcuts import led_red, led_amber, led_green
from dtbox.button.shortcuts import button_o

# network._networks.append({"ssid":"mynetwork", "psk":"mypassword"})

client_id = ubinascii.hexlify(machine.unique_id()).decode()

broker = "broker.hivemq.com" # change this
register_topic = f"dtbox/register"
subscribe_topic = f"dtbox/activate"
publish_topic = f"dtbox/press"
win_topic = "dtbox/flash"

c = MQTTClient(client_id, broker, ssl=0)

def receive_message(topic, msg):
    topic = topic.decode()
    msg = msg.decode()
    print('received message', topic, msg)
    if topic == win_topic:
        if msg == "1":
            wsled.color(GREEN)
        elif msg == "0":
            wsled.color(BLACK)                
    if topic == subscribe_topic:
        if msg == client_id:
            wsled.color(RED)
        else:        
            wsled.color(BLACK)                

def button():
    global c
    global publish_topic
    global client_id
    c.publish(publish_topic, client_id)



def ensure_connected(network, c):
    wifi_connected = False
    broker_connected = False
    while not (wifi_connected and broker_connected):
        try:
            if not wifi_connected:
                print("w")
                wifi_connected = network.connect()
            if not broker_connected:
                print("b")
                c.connect()
                broker_connected = True
                c.subscribe(subscribe_topic)
                c.subscribe(win_topic)
        except (OSError, IndexError):
            pass


button_o.on_press(button)
c.set_callback(receive_message)
wsled.color(BLACK)                


led_red.value(1)
ensure_connected(network, c)
led_red.value(0)

c.publish(register_topic, client_id)


while True:
    try:
        c.check_msg()
        sleep(0.1)
        print(".",end="")

    except (OSError, IndexError):
        led_red.value(1)
        ensure_connected(network, c)
        led_red.value(0)