Files
Practical_Training_Assignment/backend/app/api/v1/endpoints/websocket_service.py

60 lines
2.2 KiB
Python

from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from typing import Set
from aip import AipSpeech
from app.constants.asr import APP_ID, API_KEY, SECRET_KEY
import json
# Router on which this module's WebSocket route is registered.
router = APIRouter()
# Every WebSocket that has been accepted and not yet detected as gone.
# Shared by all connection handlers; its size is what gets reported as
# "online_count".
active_connections: Set[WebSocket] = set()
# Single speech-recognition client shared by all connections, built from the
# credentials in app.constants.asr.
asr_client = AipSpeech(APP_ID, API_KEY, SECRET_KEY)
async def asr_buffer(buffer_data: bytes) -> str:
    """Transcribe a buffer of raw audio with the shared ``asr_client``.

    The audio is submitted as ``'pcm'`` at 16000 Hz with ``dev_pid`` 1537.

    Args:
        buffer_data: The accumulated audio bytes to recognise.

    Returns:
        The first recognition candidate on success, otherwise the fixed
        failure message ``'语音转换失败'``.
    """
    import asyncio

    failure_text = '语音转换失败'

    # Nothing was recorded: skip the remote call entirely.
    if not buffer_data:
        return failure_text

    # ``asr_client.asr`` is an ordinary (non-awaitable) call; presumably it
    # performs a blocking HTTP request. Running it in the default executor
    # keeps the event loop free to serve the other WebSocket connections.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None, asr_client.asr, buffer_data, 'pcm', 16000, {'dev_pid': 1537}
    )

    # Guard against malformed responses so a bad payload yields the failure
    # message instead of raising AttributeError/TypeError/IndexError.
    if not isinstance(result, dict) or result.get('err_msg') != 'success.':
        return failure_text
    candidates = result.get('result')
    if not candidates:
        return failure_text
    return candidates[0]
async def broadcast_online_count():
    """Send the current connection count to every registered WebSocket.

    Each socket receives ``{"online_count": <n>, "type": "count"}``. Sockets
    whose send raises are treated as dead and dropped from
    ``active_connections``.
    """
    data = {"online_count": len(active_connections), 'type': 'count'}
    stale = set()
    # Iterate over a snapshot: every ``await`` yields to the event loop, and
    # another handler may add or remove a connection in the meantime, which
    # would raise "Set changed size during iteration" on the live set.
    for ws in tuple(active_connections):
        try:
            await ws.send_json(data)
        except Exception:
            # Best-effort broadcast: one failing peer must not stop the rest.
            stale.add(ws)
    # In-place difference tolerates sockets that a concurrent handler has
    # already removed (``remove`` would raise KeyError for those).
    active_connections.difference_update(stale)
@router.websocket("/websocket")
async def websocket_online_count(websocket: WebSocket):
    """Serve one WebSocket client: online-count updates and speech recognition.

    Protocol, as handled below:

    * Binary frames are appended to a per-connection audio buffer.
    * Text frames must be JSON objects with a ``"type"`` field:
        - ``"ping"``    -> replies with the current online count.
        - ``"asr_end"`` -> transcribes the buffered audio, replies with
          ``{"type": "asr_result", "result": <text>}`` and clears the buffer.
      Text that is not valid JSON, or is not a JSON object, is ignored.

    The connection is registered in ``active_connections`` on accept and is
    always unregistered on exit, followed by a broadcast of the new count.
    """
    import logging

    await websocket.accept()
    active_connections.add(websocket)
    await broadcast_online_count()
    # bytearray grows in place; repeated ``bytes +=`` copies the whole buffer
    # on every incoming chunk.
    temp_buffer = bytearray()
    try:
        while True:
            message = await websocket.receive()
            kind = message.get("type")
            # ``receive()`` reports a closed socket as a message rather than
            # raising, so leave the loop explicitly instead of relying on the
            # next ``receive()`` call failing.
            if kind == "websocket.disconnect":
                break
            if kind != "websocket.receive":
                continue

            chunk = message.get("bytes")
            if chunk:
                temp_buffer += chunk
                continue

            text = message.get("text")
            if not text:
                continue
            try:
                data = json.loads(text)
            except (ValueError, RecursionError):
                # Malformed client payload: ignore it and keep the connection.
                continue
            # Valid JSON that is not an object (e.g. ``[1]``) has no ``.get``.
            if not isinstance(data, dict):
                continue

            msg_type = data.get("type")
            if msg_type == "ping":
                await websocket.send_json(
                    {"online_count": len(active_connections), "type": "count"}
                )
            elif msg_type == "asr_end":
                asr_text = await asr_buffer(bytes(temp_buffer))
                await websocket.send_json({"type": "asr_result", "result": asr_text})
                temp_buffer = bytearray()
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in ``finally``.
        pass
    except Exception:
        # Keep the original behaviour of not propagating, but leave a trace
        # instead of swallowing the error silently.
        logging.getLogger(__name__).exception("WebSocket handler failed")
    finally:
        # ``discard`` because a concurrent broadcast may already have pruned
        # this socket; ``remove`` would raise KeyError in that case.
        active_connections.discard(websocket)
        await broadcast_online_count()