78 lines
3.0 KiB
Python
78 lines
3.0 KiB
Python
import datetime
|
|
import logging
|
|
import re
|
|
import json
|
|
from channels.generic.websocket import AsyncWebsocketConsumer
|
|
from booker.models import Scanner
|
|
from asgiref.sync import sync_to_async
|
|
from channels.db import database_sync_to_async
|
|
from django.contrib.auth import get_user_model
|
|
from homelog.system_user import get_system_user
|
|
from homelog import settings
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class ScannerAnnounceConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that relays barcode-scanner status announcements.

    On connect the consumer joins the ``scanner-announce`` channel-layer
    group and sends an ``mqtt.subscribe`` request to the channel named by
    ``settings.MQTT_CHANNEL_NAME`` (presumably an MQTT bridge worker --
    verify against the project setup).  MQTT messages handed to
    :meth:`mqtt_message` are forwarded to the WebSocket client as JSON, and
    a scanner that announces itself as online is recorded in ``Scanner``.
    """

    # Channel-layer group joined by every client of this consumer.  Kept as a
    # class attribute so ``disconnect`` can rely on it even if ``connect``
    # failed early.
    group_name = 'scanner-announce'

    # MQTT topic filter for scanner status messages.  The very same filter
    # must be used for subscribe and unsubscribe; ``+`` matches exactly one
    # topic level (``#`` would only be legal as the last level).
    STATUS_TOPIC_FILTER = "barcodescanner/+/status"

    # Extracts the scanner id (``barcodescan-`` + 6 lowercase hex digits)
    # from a concrete status topic.  Compiled once instead of per message.
    _STATUS_TOPIC_RE = re.compile(r'barcodescanner/(barcodescan-[0-9a-f]{6})/status')

    async def connect(self):
        """Join the announce group, request the MQTT subscription and accept."""
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        LOGGER.debug(
            "send mqtt_subscribe to channel layer %s to answer on channel %s (my channel is %s)",
            settings.MQTT_CHANNEL_NAME,
            self.group_name,
            self.channel_name,
        )
        await self.channel_layer.send(settings.MQTT_CHANNEL_NAME, {
            "type": "mqtt.subscribe",
            "topic": self.STATUS_TOPIC_FILTER,
            "group": self.group_name,
        })
        LOGGER.debug("ScannerAnnounceConsumer initialized")
        await self.accept()

    async def receive(self, text_data=None, bytes_data=None):
        """Ignore any frame sent by the WebSocket client.

        Channels calls this hook with the ``text_data`` / ``bytes_data``
        keyword arguments, so the signature has to accept them; otherwise
        every incoming frame raises ``TypeError``.
        """

    async def disconnect(self, code):
        """Request the MQTT unsubscription and leave the announce group.

        Args:
            code: WebSocket close code supplied by Channels.
        """
        # NOTE(review): all clients share one group, so depending on how the
        # receiver of ``mqtt.unsubscribe`` counts subscriptions this may also
        # stop delivery for clients that are still connected -- confirm.
        await self.channel_layer.send(settings.MQTT_CHANNEL_NAME, {
            "type": "mqtt.unsubscribe",
            "topic": self.STATUS_TOPIC_FILTER,
            "group": self.group_name,
        })
        LOGGER.debug("Disconnect from scanner-announce, unsubscribing topic; code=%s", code)
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def distribute(self, event):
        """Forward the ``text`` entry of a channel-layer event to the client."""
        await self.send(text_data=event['text'])

    @database_sync_to_async
    def get_or_create_scanner(self, named_id, topic):
        """Return the ``Scanner`` for *named_id*, creating it when missing.

        A newly created scanner is attributed to the system user and stores
        *topic* as its ``lwt_topic``.  For an already known scanner only
        ``last_online_ts`` is refreshed, so the "Updated" log line is true.

        Args:
            named_id: Identifier of the scanner, used as lookup key.
            topic: Topic stored as ``lwt_topic`` when the row is created.

        Returns:
            The existing or newly created ``Scanner`` instance.
        """
        # Function-scope import keeps this block self-contained.
        # ``timezone.now()`` is aware or naive according to ``USE_TZ``.
        from django.utils import timezone

        now = timezone.now()
        system_user = get_system_user()
        scanner, created = Scanner.objects.get_or_create(
            named_id=named_id,
            defaults={
                'lwt_topic': topic,
                'last_online_ts': now,
                'created_by': system_user,
                'changed_by': system_user,
            },
        )
        if created:
            LOGGER.debug("Created new scanner entry for %s", named_id)
        else:
            scanner.last_online_ts = now
            scanner.save(update_fields=['last_online_ts'])
            LOGGER.debug("Updated scanner entry for %s", named_id)
        return scanner

    async def mqtt_message(self, event):
        """Relay one MQTT message to the client and register online scanners.

        Every message is forwarded as JSON with the keys ``message``,
        ``scanner`` and ``topic``; ``scanner`` is ``'invalid'`` when the
        topic does not have the expected status-topic shape.  A payload of
        ``'Online'`` / ``'online'`` from a well-formed topic additionally
        creates or refreshes the matching ``Scanner`` row.

        Args:
            event: Channel-layer event whose ``message`` entry provides
                ``topic``, ``qos`` and ``payload``.
        """
        message = event['message']
        topic = message['topic']
        qos = message['qos']
        payload = message['payload']
        LOGGER.debug(
            "Received a message at topic: %s with payload %s and QOS %s",
            topic, payload, qos,
        )

        match = self._STATUS_TOPIC_RE.fullmatch(topic)
        named_id = match[1] if match else 'invalid'

        await self.send(text_data=json.dumps({
            'message': payload,
            'scanner': named_id,
            'topic': topic,
        }))
        LOGGER.debug("Got MQTT message from %s with payload %s", topic, payload)

        if payload not in ('Online', 'online'):
            return
        if match is None:
            # Do not persist a bogus scanner named 'invalid'.
            LOGGER.warning("Ignoring online announcement from unexpected topic %s", topic)
            return
        await self.get_or_create_scanner(named_id=named_id, topic=topic)
|