Compare commits

...

1 Commits

Author SHA1 Message Date
Soulter
6c18971f00 feat: 初步接入米家小爱音箱 2025-01-25 02:30:00 +08:00
11 changed files with 755 additions and 0 deletions

View File

@@ -105,6 +105,17 @@ CONFIG_METADATA_2 = {
"host": "localhost", "host": "localhost",
"port": 11451, "port": 11451,
}, },
"mispeaker(小爱音箱)": {
"id": "mispeaker",
"type": "mispeaker",
"enable": False,
"username": "",
"password": "",
"did": "",
"activate_word": "测试",
"deactivate_word": "停止",
"interval": 1,
},
}, },
"items": { "items": {
"id": { "id": {

View File

@@ -27,6 +27,8 @@ class PlatformManager():
from .sources.vchat.vchat_platform_adapter import VChatPlatformAdapter # noqa: F401 from .sources.vchat.vchat_platform_adapter import VChatPlatformAdapter # noqa: F401
case "gewechat": case "gewechat":
from .sources.gewechat.gewechat_platform_adapter import GewechatPlatformAdapter # noqa: F401 from .sources.gewechat.gewechat_platform_adapter import GewechatPlatformAdapter # noqa: F401
case "mispeaker":
from .sources.mispeaker.mispeaker_adapter import MiSpeakerPlatformAdapter # noqa: F401
async def initialize(self): async def initialize(self):

View File

@@ -0,0 +1,137 @@
import os
import json
import asyncio
import aiohttp
import time
import traceback
from .miservice import MiAccount, MiNAService, MiIOService, miio_command, miio_command_help
from astrbot.core import logger
from astrbot.api.platform import AstrBotMessage, MessageMember, MessageType
from astrbot.api.message_components import Plain, Image, At
class SimpleMiSpeakerClient():
'''
@author: Soulter
@references: https://github.com/yihong0618/xiaogpt/blob/main/xiaogpt/xiaogpt.py
'''
def __init__(self, config: dict):
self.username = config['username']
self.password = config['password']
self.did = config['did']
self.store = os.path.join("data", '.mi.token')
self.interval = float(config.get('interval', 1))
self.conv_query_cookies = {
'userId': '',
'deviceId': '',
'serviceToken': ''
}
self.MI_CONVERSATION_URL = "https://userprofile.mina.mi.com/device_profile/v2/conversation?source=dialogu&hardware={hardware}&timestamp={timestamp}&limit=1"
self.session = aiohttp.ClientSession()
self.activate_word = config.get('activate_word', '测试')
self.deactivate_word = config.get('deactivate_word', '停止')
self.entered = False
async def initialize(self):
account = MiAccount(self.session, self.username, self.password, self.store)
self.miio_service = MiIOService(account) # 小米设备服务
self.mina_service = MiNAService(account) # 小爱音箱服务
device = await self.get_mina_device()
self.deviceID = device['deviceID']
self.hardware = device['hardware']
with open(self.store, 'r') as f:
data = json.load(f)
self.userId = data['userId']
self.serviceToken = data['micoapi'][1]
self.conv_query_cookies['userId'] = self.userId
self.conv_query_cookies['deviceId'] = self.deviceID
self.conv_query_cookies['serviceToken'] = self.serviceToken
logger.info(f"MiSpeakerClient initialized. Conv cookies: {self.conv_query_cookies}. Hardware: {self.hardware}")
async def get_mina_device(self) -> dict:
devices = await self.mina_service.device_list()
for device in devices:
if device['miotDID'] == self.did:
logger.info(f"找到设备 {device['alias']}({device['name']}) 了!")
return device
async def get_conv(self) -> str:
# 时区请确保为北京时间
async with aiohttp.ClientSession() as session:
session.cookie_jar.update_cookies(self.conv_query_cookies)
query_ts = int(time.time())*1000
logger.debug(f"Querying conversation at {query_ts}")
async with session.get(self.MI_CONVERSATION_URL.format(hardware=self.hardware, timestamp=str(query_ts))) as resp:
json_blob = await resp.json()
if json_blob['code'] == 0:
data = json.loads(json_blob['data'])
records = data.get('records', None)
for record in records:
if record['time'] >= query_ts - self.interval*1000:
return record['query']
else:
logger.error(f"Failed to get conversation: {json_blob}")
return None
async def start_pooling(self):
while True:
await asyncio.sleep(self.interval)
try:
query = await self.get_conv()
if not query:
continue
# is wake
if query == self.activate_word:
self.entered = True
await self.stop_playing()
await self.send("我来啦!")
continue
elif query == self.deactivate_word:
self.entered = False
await self.stop_playing()
await self.send("再见,欢迎给个 Star。")
continue
if not self.entered:
continue
await self.send("")
abm = await self._convert(query)
if abm:
coro = getattr(self, "on_event_received")
if coro:
await coro(abm)
except BaseException as e:
traceback.print_exc()
logger.error(e)
async def _convert(self, query: str):
abm = AstrBotMessage()
abm.message = [Plain(query)]
abm.message_id = str(int(time.time()))
abm.message_str = query
abm.raw_message = query
abm.session_id = f"{self.hardware}_{self.did}_{self.username}"
abm.sender = MessageMember(self.username, "主人")
abm.self_id = f"{self.hardware}_{self.did}"
abm.type = MessageType.FRIEND_MESSAGE
return abm
async def send(self, message: str):
text = f'5 {message}'
await miio_command(self.miio_service, self.did, text, 'astrbot')
async def stop_playing(self):
text = f'3-2'
await miio_command(self.miio_service, self.did, text, 'astrbot')

View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021-2022 Yonsm
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,5 @@
from .miaccount import MiAccount, MiTokenStore
from .minaservice import MiNAService
from .miioservice import MiIOService
from .miiocommand import miio_command, miio_command_help

View File

@@ -0,0 +1,135 @@
import base64
import hashlib
import json
import logging
import os
import random
import string
from urllib import parse
from aiohttp import ClientSession
from aiofiles import open as async_open
_LOGGER = logging.getLogger(__package__)
def get_random(length):
return ''.join(random.sample(string.ascii_letters + string.digits, length))
class MiTokenStore:
def __init__(self, token_path):
self.token_path = token_path
async def load_token(self):
if os.path.isfile(self.token_path):
try:
async with async_open(self.token_path) as f:
return json.loads(await f.read())
except Exception as e:
_LOGGER.exception("Exception on load token from %s: %s", self.token_path, e)
return None
async def save_token(self, token=None):
if token:
try:
async with async_open(self.token_path, 'w') as f:
await f.write(json.dumps(token, indent=2))
except Exception as e:
_LOGGER.exception("Exception on save token to %s: %s", self.token_path, e)
elif os.path.isfile(self.token_path):
os.remove(self.token_path)
class MiAccount:
def __init__(self, session: ClientSession, username, password, token_store='.mi.token'):
self.session = session
self.username = username
self.password = password
self.token_store = MiTokenStore(token_store) if isinstance(token_store, str) else token_store
self.token = None
async def login(self, sid):
if not self.token:
self.token = {'deviceId': get_random(16).upper()}
try:
resp = await self._serviceLogin(f'serviceLogin?sid={sid}&_json=true')
if resp['code'] != 0:
data = {
'_json': 'true',
'qs': resp['qs'],
'sid': resp['sid'],
'_sign': resp['_sign'],
'callback': resp['callback'],
'user': self.username,
'hash': hashlib.md5(self.password.encode()).hexdigest().upper()
}
resp = await self._serviceLogin('serviceLoginAuth2', data)
if resp['code'] != 0:
raise Exception(resp)
self.token['userId'] = resp['userId']
self.token['passToken'] = resp['passToken']
serviceToken = await self._securityTokenService(resp['location'], resp['nonce'], resp['ssecurity'])
self.token[sid] = (resp['ssecurity'], serviceToken)
if self.token_store:
await self.token_store.save_token(self.token)
return True
except Exception as e:
self.token = None
if self.token_store:
await self.token_store.save_token()
_LOGGER.exception("Exception on login %s: %s", self.username, e)
return False
async def _serviceLogin(self, uri, data=None):
headers = {'User-Agent': 'APP/com.xiaomi.mihome APPV/6.0.103 iosPassportSDK/3.9.0 iOS/14.4 miHSTS'}
cookies = {'sdkVersion': '3.9', 'deviceId': self.token['deviceId']}
if 'passToken' in self.token:
cookies['userId'] = self.token['userId']
cookies['passToken'] = self.token['passToken']
url = 'https://account.xiaomi.com/pass/' + uri
async with self.session.request('GET' if data is None else 'POST', url, data=data, cookies=cookies, headers=headers) as r:
raw = await r.read()
resp = json.loads(raw[11:])
_LOGGER.debug("%s: %s", uri, resp)
return resp
async def _securityTokenService(self, location, nonce, ssecurity):
nsec = 'nonce=' + str(nonce) + '&' + ssecurity
clientSign = base64.b64encode(hashlib.sha1(nsec.encode()).digest()).decode()
async with self.session.get(location + '&clientSign=' + parse.quote(clientSign)) as r:
serviceToken = r.cookies['serviceToken'].value
if not serviceToken:
raise Exception(await r.text())
return serviceToken
async def mi_request(self, sid, url, data, headers, relogin=True):
if self.token is None and self.token_store is not None:
self.token = await self.token_store.load_token()
if (self.token and sid in self.token) or await self.login(sid): # Ensure login
cookies = {'userId': self.token['userId'], 'serviceToken': self.token[sid][1]}
content = data(self.token, cookies) if callable(data) else data
method = 'GET' if data is None else 'POST'
_LOGGER.debug("%s %s", url, content)
async with self.session.request(method, url, data=content, cookies=cookies, headers=headers) as r:
status = r.status
if status == 200:
resp = await r.json(content_type=None)
code = resp['code']
if code == 0:
return resp
if 'auth' in resp.get('message', '').lower():
status = 401
else:
resp = await r.text()
if status == 401 and relogin:
_LOGGER.warn("Auth error on request %s %s, relogin...", url, resp)
self.token = None # Auth error, reset login
return await self.mi_request(sid, url, data, headers, False)
else:
resp = "Login failed"
raise Exception(f"Error {url}: {resp}")

View File

@@ -0,0 +1,104 @@
import json
from .miioservice import MiIOService
def twins_split(string, sep, default=None):
pos = string.find(sep)
return (string, default) if pos == -1 else (string[0:pos], string[pos+1:])
def string_to_value(string):
if string[0] in '"\'#':
return string[1:-1] if string[-1] in '"\'#' else string[1:]
elif string == 'null':
return None
elif string == 'false':
return False
elif string == 'true':
return True
elif string.isdigit():
return int(string)
try:
return float(string)
except:
return string
def miio_command_help(did=None, prefix='?'):
quote = '' if prefix == '?' else "'"
return f'\
Get Props: {prefix}<siid[-piid]>[,...]\n\
{prefix}1,1-2,1-3,1-4,2-1,2-2,3\n\
Set Props: {prefix}<siid[-piid]=[#]value>[,...]\n\
{prefix}2=60,2-1=#60,2-2=false,2-3="null",3=test\n\
Do Action: {prefix}<siid[-piid]> <arg1|[]> [...] \n\
{prefix}2 []\n\
{prefix}5 Hello\n\
{prefix}5-4 Hello 1\n\n\
Call MIoT: {prefix}<cmd=prop/get|/prop/set|action> <params>\n\
{prefix}action {quote}{{"did":"{did or "267090026"}","siid":5,"aiid":1,"in":["Hello"]}}{quote}\n\n\
Call MiIO: {prefix}/<uri> <data>\n\
{prefix}/home/device_list {quote}{{"getVirtualModel":false,"getHuamiDevices":1}}{quote}\n\n\
Devs List: {prefix}list [name=full|name_keyword] [getVirtualModel=false|true] [getHuamiDevices=0|1]\n\
{prefix}list Light true 0\n\n\
MIoT Spec: {prefix}spec [model_keyword|type_urn] [format=text|python|json]\n\
{prefix}spec\n\
{prefix}spec speaker\n\
{prefix}spec xiaomi.wifispeaker.lx04\n\
{prefix}spec urn:miot-spec-v2:device:speaker:0000A015:xiaomi-lx04:1\n\n\
MIoT Decode: {prefix}decode <ssecurity> <nonce> <data> [gzip]\n\
'
async def miio_command(service: MiIOService, did, text, prefix='?'):
cmd, arg = twins_split(text, ' ')
if cmd.startswith('/'):
return await service.miio_request(cmd, arg)
if cmd.startswith('prop') or cmd == 'action':
return await service.miot_request(cmd, json.loads(arg) if arg else None)
argv = arg.split(' ') if arg else []
argc = len(argv)
if cmd == 'list':
return await service.device_list(argc > 0 and argv[0], argc > 1 and string_to_value(argv[1]), argc > 2 and argv[2])
if cmd == 'spec':
return await service.miot_spec(argc > 0 and argv[0], argc > 1 and argv[1])
if cmd == 'decode':
return MiIOService.miot_decode(argv[0], argv[1], argv[2], argc > 3 and argv[3] == 'gzip')
if not did or not cmd or cmd == '?' or cmd == '' or cmd == 'help' or cmd == '-h' or cmd == '--help':
return miio_command_help(did, prefix)
if not did.isdigit():
devices = await service.device_list(did)
if not devices:
return "Device not found: " + did
did = devices[0]['did']
props = []
setp = True
miot = True
for item in cmd.split(','):
key, value = twins_split(item, '=')
siid, iid = twins_split(key, '-', '1')
if siid.isdigit() and iid.isdigit():
prop = [int(siid), int(iid)]
else:
prop = [key]
miot = False
if value is None:
setp = False
elif setp:
prop.append(string_to_value(value))
props.append(prop)
if miot and argc > 0:
args = [] if arg == '[]' else [string_to_value(a) for a in argv]
return await service.miot_action(did, props[0], args)
do_props = ((service.home_get_props, service.miot_get_props), (service.home_set_props, service.miot_set_props))[setp][miot]
return await do_props(did, props if miot or setp else [p[0] for p in props])

View File

@@ -0,0 +1,197 @@
import os
import time
import base64
import hashlib
import hmac
import json
# REGIONS = ['cn', 'de', 'i2', 'ru', 'sg', 'us']
class MiIOService:
def __init__(self, account=None, region=None):
self.account = account
self.server = 'https://' + ('' if region is None or region == 'cn' else region + '.') + 'api.io.mi.com/app'
async def miio_request(self, uri, data):
def prepare_data(token, cookies):
cookies['PassportDeviceId'] = token['deviceId']
return MiIOService.sign_data(uri, data, token['xiaomiio'][0])
headers = {'User-Agent': 'iOS-14.4-6.0.103-iPhone12,3--D7744744F7AF32F0544445285880DD63E47D9BE9-8816080-84A3F44E137B71AE-iPhone', 'x-xiaomi-protocal-flag-cli': 'PROTOCAL-HTTP2'}
resp = await self.account.mi_request('xiaomiio', self.server + uri, prepare_data, headers)
if 'result' not in resp:
raise Exception(f"Error {uri}: {resp}")
return resp['result']
async def home_request(self, did, method, params):
return await self.miio_request('/home/rpc/' + did, {'id': 1, 'method': method, "accessKey": "IOS00026747c5acafc2", 'params': params})
async def home_get_props(self, did, props):
return await self.home_request(did, 'get_prop', props)
async def home_set_props(self, did, props):
return [await self.home_set_prop(did, i[0], i[1]) for i in props]
async def home_get_prop(self, did, prop):
return (await self.home_get_props(did, [prop]))[0]
async def home_set_prop(self, did, prop, value):
result = (await self.home_request(did, 'set_' + prop, value if isinstance(value, list) else [value]))[0]
return 0 if result == 'ok' else result
async def miot_request(self, cmd, params):
return await self.miio_request('/miotspec/' + cmd, {'params': params})
async def miot_get_props(self, did, iids):
params = [{'did': did, 'siid': i[0], 'piid': i[1]} for i in iids]
result = await self.miot_request('prop/get', params)
return [it.get('value') if it.get('code') == 0 else None for it in result]
async def miot_set_props(self, did, props):
params = [{'did': did, 'siid': i[0], 'piid': i[1], 'value': i[2]} for i in props]
result = await self.miot_request('prop/set', params)
return [it.get('code', -1) for it in result]
async def miot_get_prop(self, did, iid):
return (await self.miot_get_props(did, [iid]))[0]
async def miot_set_prop(self, did, iid, value):
return (await self.miot_set_props(did, [(iid[0], iid[1], value)]))[0]
async def miot_action(self, did, iid, args=[]):
result = await self.miot_request('action', {'did': did, 'siid': iid[0], 'aiid': iid[1], 'in': args})
return result.get('code', -1)
async def device_list(self, name=None, getVirtualModel=False, getHuamiDevices=0):
result = await self.miio_request('/home/device_list', {'getVirtualModel': bool(getVirtualModel), 'getHuamiDevices': int(getHuamiDevices)})
result = result['list']
return result if name == 'full' else [{'name': i['name'], 'model': i['model'], 'did': i['did'], 'token': i['token']} for i in result if not name or name in i['name']]
async def miot_spec(self, type=None, format=None):
if not type or not type.startswith('urn'):
def get_spec(all):
if not type:
return all
ret = {}
for m, t in all.items():
if type == m:
return {m: t}
elif type in m:
ret[m] = t
return ret
import tempfile
path = os.path.join(tempfile.gettempdir(), 'miservice_miot_specs.json')
try:
with open(path) as f:
result = get_spec(json.load(f))
except:
result = None
if not result:
async with self.account.session.get('http://miot-spec.org/miot-spec-v2/instances?status=all') as r:
all = {i['model']: i['type'] for i in (await r.json())['instances']}
with open(path, 'w') as f:
json.dump(all, f)
result = get_spec(all)
if len(result) != 1:
return result
type = list(result.values())[0]
url = 'http://miot-spec.org/miot-spec-v2/instance?type=' + type
async with self.account.session.get(url) as r:
result = await r.json()
def parse_desc(node):
desc = node['description']
# pos = desc.find(' ')
# if pos != -1:
# return (desc[:pos], ' # ' + desc[pos + 2:])
name = ''
for i in range(len(desc)):
d = desc[i]
if d in '-—{「[【(<《':
return (name, ' # ' + desc[i:])
name += '_' if d == ' ' else d
return (name, '')
def make_line(siid, iid, desc, comment, readable=False):
value = f"({siid}, {iid})" if format == 'python' else iid
return f" {'' if readable else '_'}{desc} = {value}{comment}\n"
if format != 'json':
STR_HEAD, STR_SRV, STR_VALUE = ('from enum import Enum\n\n', '\nclass {}(tuple, Enum):\n', '\nclass {}(int, Enum):\n') if format == 'python' else ('', '{} = {}\n', '{}\n')
text = '# Generated by https://github.com/Yonsm/MiService\n# ' + url + '\n\n' + STR_HEAD
svcs = []
vals = []
for s in result['services']:
siid = s['iid']
svc = s['description'].replace(' ', '_')
svcs.append(svc)
text += STR_SRV.format(svc, siid)
for p in s.get('properties', []):
name, comment = parse_desc(p)
access = p['access']
comment += ''.join([' # ' + k for k, v in [(p['format'], 'string'), (''.join([a[0] for a in access]), 'r')] if k and k != v])
text += make_line(siid, p['iid'], name, comment, 'read' in access)
if 'value-range' in p:
valuer = p['value-range']
length = min(3, len(valuer))
values = {['MIN', 'MAX', 'STEP'][i]: valuer[i] for i in range(length) if i != 2 or valuer[i] != 1}
elif 'value-list' in p:
values = {i['description'].replace(' ', '_') if i['description'] else str(i['value']): i['value'] for i in p['value-list']}
else:
continue
vals.append((svc + '_' + name, values))
if 'actions' in s:
text += '\n'
for a in s['actions']:
name, comment = parse_desc(a)
comment += ''.join([f" # {io}={a[io]}" for io in ['in', 'out'] if a[io]])
text += make_line(siid, a['iid'], name, comment)
text += '\n'
for name, values in vals:
text += STR_VALUE.format(name)
for k, v in values.items():
text += f" {'_' + k if k.isdigit() else k} = {v}\n"
text += '\n'
if format == 'python':
text += '\nALL_SVCS = (' + ', '.join(svcs) + ')\n'
result = text
return result
@staticmethod
def miot_decode(ssecurity, nonce, data, gzip=False):
from Crypto.Cipher import ARC4
r = ARC4.new(base64.b64decode(MiIOService.sign_nonce(ssecurity, nonce)))
r.encrypt(bytes(1024))
decrypted = r.encrypt(base64.b64decode(data))
if gzip:
try:
from io import BytesIO
from gzip import GzipFile
compressed = BytesIO()
compressed.write(decrypted)
compressed.seek(0)
decrypted = GzipFile(fileobj=compressed, mode='rb').read()
except:
pass
return json.loads(decrypted.decode())
@staticmethod
def sign_nonce(ssecurity, nonce):
m = hashlib.sha256()
m.update(base64.b64decode(ssecurity))
m.update(base64.b64decode(nonce))
return base64.b64encode(m.digest()).decode()
@staticmethod
def sign_data(uri, data, ssecurity):
if not isinstance(data, str):
data = json.dumps(data)
nonce = base64.b64encode(os.urandom(8) + int(time.time() / 60).to_bytes(4, 'big')).decode()
snonce = MiIOService.sign_nonce(ssecurity, nonce)
msg = '&'.join([uri, snonce, nonce, 'data=' + data])
sign = hmac.new(key=base64.b64decode(snonce), msg=msg.encode(), digestmod=hashlib.sha256).digest()
return {'_nonce': nonce, 'data': data, 'signature': base64.b64encode(sign).decode()}

View File

@@ -0,0 +1,50 @@
import json
from .miaccount import MiAccount, get_random
import logging
_LOGGER = logging.getLogger(__package__)
class MiNAService:
def __init__(self, account: MiAccount):
self.account = account
async def mina_request(self, uri, data=None):
requestId = 'app_ios_' + get_random(30)
if data is not None:
data['requestId'] = requestId
else:
uri += '&requestId=' + requestId
headers = {'User-Agent': 'MiHome/6.0.103 (com.xiaomi.mihome; build:6.0.103.1; iOS 14.4.0) Alamofire/6.0.103 MICO/iOSApp/appStore/6.0.103'}
return await self.account.mi_request('micoapi', 'https://api2.mina.mi.com' + uri, data, headers)
async def device_list(self, master=0):
result = await self.mina_request('/admin/v2/device_list?master=' + str(master))
return result.get('data') if result else None
async def ubus_request(self, deviceId, method, path, message):
message = json.dumps(message)
result = await self.mina_request('/remote/ubus', {'deviceId': deviceId, 'message': message, 'method': method, 'path': path})
return result and result.get('code') == 0
async def text_to_speech(self, deviceId, text):
return await self.ubus_request(deviceId, 'text_to_speech', 'mibrain', {'text': text})
async def player_set_volume(self, deviceId, volume):
return await self.ubus_request(deviceId, 'player_set_volume', 'mediaplayer', {'volume': volume, 'media': 'app_ios'})
async def send_message(self, devices, devno, message, volume=None): # -1/0/1...
result = False
for i in range(0, len(devices)):
if devno == -1 or devno != i + 1 or devices[i]['capabilities'].get('yunduantts'):
_LOGGER.debug("Send to devno=%d index=%d: %s", devno, i, message or volume)
deviceId = devices[i]['deviceID']
result = True if volume is None else await self.player_set_volume(deviceId, volume)
if result and message:
result = await self.text_to_speech(deviceId, message)
if not result:
_LOGGER.error("Send failed: %s", message or volume)
if devno != -1 or not result:
break
return result

View File

@@ -0,0 +1,63 @@
import logging
import time
import asyncio
import os
from astrbot.api.platform import Platform, AstrBotMessage, MessageMember, MessageType, PlatformMetadata
from astrbot.api.event import MessageChain
from typing import Union, List
from astrbot.api.message_components import Image, Plain, At
from astrbot.core.platform.astr_message_event import MessageSesion
from ...register import register_platform_adapter
from astrbot.core.message.components import BaseMessageComponent
from .client import SimpleMiSpeakerClient
from .mispeaker_event import MiSpeakerPlatformEvent
from astrbot.core import logger
@register_platform_adapter("mispeaker", "小爱音箱")
class MiSpeakerPlatformAdapter(Platform):
def __init__(self, platform_config: dict, platform_settings: dict, event_queue: asyncio.Queue) -> None:
super().__init__(event_queue)
self.config = platform_config
async def send_by_session(self, session: MessageSesion, message_chain: MessageChain):
pass
def meta(self) -> PlatformMetadata:
return PlatformMetadata(
"mispeaker",
"小爱音箱",
)
async def handle_msg(self, message: AstrBotMessage):
message_event = MiSpeakerPlatformEvent(
message_str=message.message_str,
message_obj=message,
platform_meta=self.meta(),
session_id=message.session_id,
client=self.client
)
self.commit_event(message_event)
def run(self):
self.client = SimpleMiSpeakerClient(
self.config
)
async def on_event_received(abm: AstrBotMessage):
logger.info(f"on_event_received: {abm}")
await self.handle_msg(abm)
self.client.on_event_received = on_event_received
return self._run()
async def _run(self):
await self.client.initialize()
await self.client.start_pooling()

View File

@@ -0,0 +1,30 @@
import random
import asyncio
from astrbot.api import logger
from astrbot.api.event import AstrMessageEvent, MessageChain
from astrbot.api.platform import AstrBotMessage, PlatformMetadata
from astrbot.api.message_components import Plain, Image
from .client import SimpleMiSpeakerClient
class MiSpeakerPlatformEvent(AstrMessageEvent):
def __init__(
self,
message_str: str,
message_obj: AstrBotMessage,
platform_meta: PlatformMetadata,
session_id: str,
client: SimpleMiSpeakerClient
):
super().__init__(message_str, message_obj, platform_meta, session_id)
self.client = client
@staticmethod
async def send_with_client(message: MessageChain, user_name: str):
pass
async def send(self, message: MessageChain):
for comp in message.chain:
if isinstance(comp, Plain):
await self.client.send(comp.text)
await super().send(message)