Zmáčkni a běž (Touch and go)
Ukázka použití MQTT. Pro tento projekt je třeba mít funkční sít a dostupný MQTT broker.
Variace na hru wack-a-mole. Krabičky se náhodně rozsvěcují, K rozsvícení další krabičky je třeba zmáčknout tlačítko "X" na té, která svítí.
Master
Následující kód nahrad do jedné krabičky, ze které se bude řídit hra.
Nezapoměň upravit heslo na wifi a cestu k brokeru.
import random, machine, ubinascii
from time import sleep
from umqtt.simple import MQTTClient
from dtbox.network.shortcuts import network
from dtbox.led.shortcuts import led_red, led_amber, led_green
from dtbox.button.shortcuts import button_o, button_x
from dtbox.display.shortcuts import display
# use custom wifi connection
# network._networks.append({"ssid":"mynetwork", "psk":"mypassword"})
class Channels:
root = "dtbox"
register = f"{root}/register"
win = f"{root}/flash"
pressed = f"{root}/press"
activate = f"{root}/activate"
client_id = ubinascii.hexlify(machine.unique_id()).decode()
c = MQTTClient(client_id, "broker.hivemq.com", ssl=0) # change this
class Game:
set_length = 0
_remeaning_presses = 0
_current_client = None
clients = []
started = False
def __init__(self, length):
self.set_length = length
def register_client(self, client_id):
print("Attempting to register", client_id)
if client_id in self.clients: return
self.clients.append(client_id)
display.show(len(self.clients))
print("Registered", client_id)
def press_button(self, client_id):
print("Pressing button", client_id)
if client_id not in self.clients or client_id != self._current_client:
print("Client not registered or not active", client_id)
return
if self._remeaning_presses == 0: return self.stop()
next_client = client_id
while next_client == client_id:
next_client = self.clients[random.randint(0, len(self.clients) - 1)]
self._current_client = next_client
print("Next client", next_client)
c.publish(Channels.activate, next_client)
self._remeaning_presses -= 1
display.show(self._remeaning_presses)
def receive_message(self, topic, msg):
topic = topic.decode()
msg = msg.decode()
print('received message', topic, msg)
if topic == Channels.register:
self.register_client(msg)
elif topic == Channels.pressed and self.started:
self.press_button(msg)
def stop(self):
print("Stopping game")
c.publish(Channels.win, "1")
self.started = False
self._remeaning_presses = self.set_length
display.show(len(self.clients))
led_green.value(0)
led_amber.value(1)
def start(self):
if self.started: return
if not self.clients:
return
self.started = True
self._remeaning_presses = self.set_length
c.publish(Channels.win, "0")
display.show(self._remeaning_presses)
led_amber.value(0)
led_green.value(1)
self._current_client = self.clients[random.randint(0, len(self.clients) - 1)]
c.publish(Channels.activate, self._current_client)
def ensure_connected(network, c):
wifi_connected = False
broker_connected = False
while not (wifi_connected and broker_connected):
try:
if not wifi_connected:
print("w")
wifi_connected = network.connect()
if not broker_connected:
print("b")
c.connect()
broker_connected = True
c.subscribe(Channels.register)
c.subscribe(Channels.pressed)
except (OSError, IndexError):
pass
game = Game(10)
c.set_callback(game.receive_message)
led_red.value(1)
ensure_connected(network, c)
led_red.value(0)
button_x.on_press(game.stop)
button_o.on_press(game.start)
led_red.value(0)
led_amber.value(1)
while True:
try:
c.check_msg()
sleep(0.1)
print(".",end="")
except (OSError, IndexError):
led_red.value(1)
ensure_connected(network, c)
led_red.value(0)
Client
Tento kód nahraj do každé herní krabičky do main.py
.
Nezapoměň upravit heslo na wifi a cestu k brokeru.
from umqtt.simple import MQTTClient
import machine, ubinascii
from dtbox.network.shortcuts import network
from dtbox.wsled.shortcuts import wsled
from dtbox.colors import *
from time import sleep
from dtbox.led.shortcuts import led_red, led_amber, led_green
from dtbox.button.shortcuts import button_o
# network._networks.append({"ssid":"mynetwork", "psk":"mypassword"})
client_id = ubinascii.hexlify(machine.unique_id()).decode()
broker = "broker.hivemq.com" # change this
register_topic = f"dtbox/register"
subscribe_topic = f"dtbox/activate"
publish_topic = f"dtbox/press"
win_topic = "dtbox/flash"
c = MQTTClient(client_id, broker, ssl=0)
def receive_message(topic, msg):
topic = topic.decode()
msg = msg.decode()
print('received message', topic, msg)
if topic == win_topic:
if msg == "1":
wsled.color(GREEN)
elif msg == "0":
wsled.color(BLACK)
if topic == subscribe_topic:
if msg == client_id:
wsled.color(RED)
else:
wsled.color(BLACK)
def button():
global c
global publish_topic
global client_id
c.publish(publish_topic, client_id)
def ensure_connected(network, c):
wifi_connected = False
broker_connected = False
while not (wifi_connected and broker_connected):
try:
if not wifi_connected:
print("w")
wifi_connected = network.connect()
if not broker_connected:
print("b")
c.connect()
broker_connected = True
c.subscribe(subscribe_topic)
c.subscribe(win_topic)
except (OSError, IndexError):
pass
button_o.on_press(button)
c.set_callback(receive_message)
wsled.color(BLACK)
led_red.value(1)
ensure_connected(network, c)
led_red.value(0)
c.publish(register_topic, client_id)
while True:
try:
c.check_msg()
sleep(0.1)
print(".",end="")
except (OSError, IndexError):
led_red.value(1)
ensure_connected(network, c)
led_red.value(0)