feat(event): implement new event handling mechanism

- Rename and restructure event-related functions and data structures
- Introduce unique_ptr for automatic memory management of event arguments
- Add new event types and update existing ones
- Modify telemetry and host communication threads to use new event system
- Remove redundant code and optimize data handling
This commit is contained in:
ovizro 2024-11-30 18:26:46 +08:00
parent c9ef0edda6
commit 6fb6402219
6 changed files with 79 additions and 151 deletions

View File

@ -187,7 +187,7 @@ extern char g_strSystemLogPath[500];
extern char g_strPCMSavePath[500];
extern struct tm g_tCurLocalTime;
extern bool g_bKeepSysRuning;
extern bool g_bKeepEtifRuning;
extern long g_iSaveInputAudio;
extern long g_iSaveTTSAudio;
extern long g_iSaveHostAudio;

View File

@ -17,7 +17,10 @@ void* telemetry_host_com_thread(void* args);
void* upper_host_com_thread(void* arg);
extern int g_iHostCom_tty_id;
extern int g_iTelemetry_Com_tty_id;
extern int g_iEnableAlarmCode;
extern bool g_bKeepEtifRuning;
extern int g_iExternTimeDifference;
extern uint8_t g_iAlarmCode[4];
#ifdef __cplusplus

View File

@ -1,16 +1,27 @@
#ifndef _INCLUDE_EVENT_H_
#define _INCLUDE_EVENT_H_
#define CONCAT_(prefix, suffix) prefix##suffix
/// Concatenate `prefix, suffix` into `prefixsuffix`
#define CONCAT(prefix, suffix) CONCAT_(prefix, suffix)
/// Register event handler
#define ON_EVENT_EX(name, func, data) \
static void func (const char* event_name, void *args, void* user_data); \
int CONCAT(func, _register_ret) = etif_event_register(#name, func, data);\
static void func (const char* event_name, void *args, void* user_data)
#define ON_EVENT(name) ON_EVENT_EX(name, CONCAT(_event_ ## name ## _handle_line_, __LINE__), NULL)
#ifdef __cplusplus
extern "C" {
#endif
typedef void (*event_callback)(const char*, void*, void*);
void sys_event(const char *event_name, void *args);
int sys_event_register(const char *event_name, event_callback callback, void* user_data);
int sys_event_unregister(const char *event_name, event_callback callback);
void* sys_event_thread(void*);
void etif_event(const char *event_name, void *args, size_t args_size);
int etif_event_register(const char *event_name, event_callback callback, void* user_data);
int etif_event_unregister(const char *event_name, event_callback callback, void* user_data);
void* etif_event_thread(void*);
#ifdef __cplusplus
}

View File

@ -2,45 +2,63 @@
#include <string>
#include <vector>
#include <tuple>
#include <memory>
#include <assert.h>
#include <string.h>
#include "event.h"
#include "dataqueue.hpp"
std::unordered_map<std::string, std::vector<std::pair<event_callback, void*>>> _events;
DataQueue<std::tuple<const char*, event_callback, void*, void*>> _event_queue;
DataQueue<std::tuple<const char*, event_callback, std::unique_ptr<void>, void*>> _event_queue;
void sys_event(const char *event_name, void *args) {
void etif_event(const char *event_name, void *args, size_t args_size) {
auto it = _events.find(event_name);
if (it != _events.end()) {
for (auto &pair : it->second) {
pair.first(event_name, args, pair.second);
void* args_copy = malloc(args_size);
assert(args_copy != nullptr);
memcpy(args_copy, args, args_size);
_event_queue.Push(std::make_tuple(event_name, pair.first, std::unique_ptr<void>(args), pair.second));
}
}
}
int sys_event_register(const char *event_name, event_callback callback, void* user_data) {
int etif_event_register(const char *event_name, event_callback callback, void* user_data) {
auto it = _events.find(event_name);
if (it == _events.end()) {
_events[event_name] = std::vector<std::pair<event_callback, void*>>();
} else {
for (auto &pair : it->second) {
if (pair.first == callback && pair.second == user_data) {
return -1;
}
}
}
_events[event_name].push_back(std::make_pair(callback, user_data));
return 0;
}
int sys_event_unregister(const char *event_name, event_callback callback) {
int etif_event_unregister(const char *event_name, event_callback callback, void* user_data) {
auto it = _events.find(event_name);
if (it != _events.end()) {
for (auto it2 = it->second.begin(); it2 != it->second.end(); it2++) {
if (it2->first == callback) {
if (it2->first == callback && it2->second == user_data) {
it->second.erase(it2);
return 0;
}
}
}
return -1;
}
void* sys_event_thread(void*) {
while (true) {
void* etif_event_thread(void*) {
const char* event_name;
event_callback callback;
void* args;
std::unique_ptr<void> args;
void* user_data;
while (true) {
std::tie(event_name, callback, args, user_data) = _event_queue.Pop();
callback(event_name, args, user_data);
callback(event_name, args.get(), user_data);
}
return NULL;
}

View File

@ -12,7 +12,7 @@ int extern_interface_init(const char* config_path, et_callback_t cb) {
inicpp::IniManager manager(config_path);
et_callback = cb;
pthread_create(&event_thread, NULL, sys_event_thread, NULL);
pthread_create(&event_thread, NULL, etif_event_thread, NULL);
pthread_create(&upperhost_thread, NULL, upper_host_com_thread, NULL);
pthread_create(&telemetry_thread, NULL, telemetry_host_com_thread, NULL);
}
@ -20,10 +20,10 @@ int extern_interface_init(const char* config_path, et_callback_t cb) {
void extern_interface_send(uint32_t type, size_t len, union PacketData data) {
switch (type) {
case ET_TYPE_COMMAND:
sys_event("send_command", data.pd_pointer);
etif_event("send_command", data.pd_pointer, len);
break;
case ET_TYPE_AUDIO:
sys_event("send_audio", data.pd_pointer);
etif_event("send_audio", data.pd_pointer, len);
break;
default:
break;

View File

@ -1,6 +1,7 @@
#include <errno.h>
#include <string.h>
#include <list>
#include <sys/time.h>
#include <sys/sysinfo.h>
#include "telemetry.h"
#include "dataqueue.hpp"
@ -14,7 +15,7 @@ void play_text(char* pTextStr, int iRepeat_times);
// int32_t g_iExternTimeDifference = 0;
static TelemetryCommandInfo g_receiveCommandInfo;
static std::list<TelemetryCommandInfo> g_sendCommandList;
static void* g_pRcvCmdQueue = NULL;
static DataQueue<TelemetryCommandInfo> g_recvCommandQueue;
volatile static uint8_t g_iUpperHostTelemetryCount = 0;
volatile static uint8_t g_iTelemetryCount = 0;
@ -107,6 +108,8 @@ void set_telemetry_host_data(TelemetryData4UpperHost* pTelemetryData) {
/* create a telemetry msg data for telemetry, by gejp 2024.10.03 */
void set_telemetry_data(TelemetryData* pTelemetryHostData, TelemetryCommandInfo *pReceiveCommandInfo) {
TelemetryRequestData data;
get_telemetry_data(&data);
pTelemetryHostData->work_status = 0xAA;
pTelemetryHostData->com_status = 0xAA;
pTelemetryHostData->coprocessor1_status = 0xAA;
@ -114,17 +117,17 @@ void set_telemetry_data(TelemetryData* pTelemetryHostData, TelemetryCommandInfo
pTelemetryHostData->voice_circuit_status = 0xAA;
pTelemetryHostData->telemetry_count = g_iTelemetryCount++;
if (g_bCloseASR) {
if (data.close_asr) {
pTelemetryHostData->voice_mode = 0xAA;
}
else if (g_iSysState == SYS_STATUS_WAKE) {
else if (data.sys_state == ET_SYS_STATUS_WAKE) {
pTelemetryHostData->voice_mode = 0x55;
}
else {
pTelemetryHostData->voice_mode = 0xA5;
}
if (g_iSysState == SYS_STATUS_WAKE) {
if (data.sys_state == ET_SYS_STATUS_WAKE) {
pTelemetryHostData->recognition_status = 0x55;
}
else {
@ -153,28 +156,18 @@ void set_telemetry_data(TelemetryData* pTelemetryHostData, TelemetryCommandInfo
}
// the following same as set_telemetry_host_data
pTelemetryHostData->volume_key_status = g_bVolumeKeyPressed ? 0xAA : 0x55;
pTelemetryHostData->wake_key_status = g_bWakeupKeyPressed ? 0xAA : 0x55;
g_bVolumeKeyPressed = false;
g_bWakeupKeyPressed = false;
pTelemetryHostData->volume_key_status = data.volume_key_pressed ? 0xAA : 0x55;
pTelemetryHostData->wake_key_status = data.wakeup_key_pressed ? 0xAA : 0x55;
pTelemetryHostData->key_status_backup = 0x55;
if (g_iCurVolumeGrade == g_iVolumeGradeCnt-1) {//Large
pTelemetryHostData->current_volume = 0x3;
}
else if (g_iCurVolumeGrade == 0) {//Small
pTelemetryHostData->current_volume = 0x1;
}
else {//Medium
pTelemetryHostData->current_volume = 0x2;
}
pTelemetryHostData->current_volume = data.volume_grade;
pTelemetryHostData->system_version_high = g_iSysVerHigh;
pTelemetryHostData->system_version_low = g_iSysVerLow;
pTelemetryHostData->system_version_high = data.sys_ver_high;
pTelemetryHostData->system_version_low = data.sys_ver_low;
pTelemetryHostData->application_version_high = g_iAppVerHigh;
pTelemetryHostData->application_version_low = g_iAppVerLow;
pTelemetryHostData->application_version_high = data.app_ver_high;
pTelemetryHostData->application_version_low = data.app_ver_low;
}
__inline bool is_command(uint8_t* array, uint8_t* value, int size) {
@ -201,10 +194,10 @@ void* telemetry_host_com_thread(void* path) {//Integrated Business Unit, telemet
memset(&TeleCmdInfo2Send, 0, sizeof(TelemetryCommandInfo));
while (g_bKeepSysRuning) {
while (g_bKeepEtifRuning) {
// if ( (iTelemetryAnswerTimes % 3 == 0) && g_bNeedReboot) system("reboot");
ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, &offset, &cached_size, &g_bKeepSysRuning, true);
ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, &offset, &cached_size, &g_bKeepEtifRuning, true);
//ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, true);
if (ComFrame_Verify(frame, false)) {
if (SendTelemetryErrorMsg(tty_id, address, TELEMETRY_ERROR_CHECKSUM, true))
@ -221,9 +214,8 @@ void* telemetry_host_com_thread(void* path) {//Integrated Business Unit, telemet
continue;
}
if(iTelemetryAnswerTimes % 3 == 0 && g_pRcvCmdQueue != NULL) {
iRet = DeDataQueue(g_pRcvCmdQueue, &pDataBlock);
if (iRet < sizeof(TelemetryCommandInfo))
if(iTelemetryAnswerTimes % 3 == 0) {
if (g_recvCommandQueue.Empty())
{//empty
if (0)//2024.11.04 g_bNeedReboot)
{
@ -232,7 +224,7 @@ void* telemetry_host_com_thread(void* path) {//Integrated Business Unit, telemet
}
}else
{
TeleCmdInfo2Send = *((TelemetryCommandInfo*)pDataBlock);
TeleCmdInfo2Send = g_recvCommandQueue.Pop();
PrintFilePos(); printf("telemetry receive command (%d)\n", TeleCmdInfo2Send.seqid);
}
set_telemetry_data(&TelemetryData2Send, &TeleCmdInfo2Send);
@ -265,27 +257,6 @@ void* telemetry_host_com_thread(void* path) {//Integrated Business Unit, telemet
return NULL;
}
void SetCallMode(bool bCall)
{
if (bCall)
{
PrintFilePos(); printf("Open call with upper host\n");
g_bCallMode = true;
g_iCallRcvFrameCnt = 0;
g_iPlayedCallFrameCnt = 0;
g_iOrdinaryPlayStop = 1;
g_bCloseASR = true;
g_iInsertSlientSegCnt=0;
g_iPcmBlockBytes = g_iCallBytesPerBlockMS;
}
else
{
PrintFilePos(); printf("Close call with upper host\n");
g_bCallMode = false;
g_iPcmBlockBytes = 512 * g_iChannelCnt * 2;
}
}
//获取指令文本与报警码
struct AlarmNotice {
uint32_t iAlarmCode; //指令编码,小端序
@ -396,19 +367,10 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit
const uint32_t buffer_size = 1024 * 100; // TODO
const int64_t iHostAudioBufferSize = 1024 * 1024;
g_pRcvCmdQueue = OpenDataQueue(128, sizeof(struct TelemetryCommandInfo), true);
struct TTSInfo ttsInfo;
uint8_t* buffer = new uint8_t[buffer_size];
int64_t iEnqueueBytes;
bool bSucc;
char* pStrNotice;
AudioPlayInfo PlayInfo;
PlayInfo.m_iUrgent = 2;
//PlayInfo.m_iDataBytes = retLen;
PlayInfo.m_iSource = 0;
PlayInfo.m_iTimerValue = -1;
PlayInfo.m_iTimerID2Set = 0;
size_t offset = 0, cached_size = 0;
uint64_t iTelemetryAnswerTimes;
int16_t* ptrProcHostAudio = (int16_t*)malloc(iHostAudioBufferSize);
@ -419,9 +381,9 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit
TelemetryData4UpperHost UpperHostTeleData;
while (g_bKeepSysRuning) {
while (g_bKeepEtifRuning) {
ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, &offset, &cached_size, &g_bKeepSysRuning, true);
ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, &offset, &cached_size, &g_bKeepEtifRuning, true);
//ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, true);
//PrintFilePos(); printf("%04X \n", ComFrame_HEADER(frame)->type);
switch (ComFrame_HEADER(frame)->type)
@ -457,10 +419,7 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit
g_receiveCommandInfo.time = telemetryTimeSeconds;
if (!g_bNeedReboot) {
int iRet = EnDataQueue(g_pRcvCmdQueue, &g_receiveCommandInfo, sizeof(TelemetryCommandInfo), NULL, 0, NULL, 0);
if (iRet < sizeof(TelemetryCommandInfo)) {
PrintFilePos(); printf("EnDataQueue failed\n");
}
g_recvCommandQueue.Push(g_receiveCommandInfo);
}
/*-------START 2024.10.23 noted by zgb start
if (is_command(frame->payload, command_open_call, 6)) {
@ -487,27 +446,7 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit
else
//-------END 2024.10.23 noted by zgb
*/
if (is_command(frame->payload, command_wakeup, 6)) {
setSysWake("telemetry");
}
else if(is_command(frame->payload, command_sleep, 6)) {
setSysSleep("telemetry");
}
else if (is_command(frame->payload, command_volume, 6)) {
VolumeUpCycleFunc();
announce_state(0);
}
else if (is_command(frame->payload, command_reset, 6)) {
PrintFilePos(); printf("reboot flag set!\n");
#ifdef USE_TELEMETRY_REBOOT_CMD
g_bNeedReboot = true;// system("reboot");
if(strlen(g_strNoticeCmdText) > 1)
play_text(g_strNoticeCmdText, g_strNoticeCmdTextPlayTimes); // 播报相应内容
#else
if (strlen(g_strNoticeCmdText) > 1)
play_text(g_strNoticeCmdText, g_strNoticeCmdTextPlayTimes); // 播报相应内容
#endif
}
handle_pack(ET_TYPE_COMMAND, 6, {.pd_pointer = frame->payload});
break;
}
case COM_FRAME_TYPE_SPEECH_INJECTED:
@ -523,26 +462,7 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit
COM_FRAME_ADDRESS_VOIX,
COM_FRAME_TYPE_SPEECH_INJECTED,
payload, 6, true);
ttsInfo.m_iSource = 0;
ttsInfo.m_iDataBytes = ComFrame_PAYLOAD_LENGTH(frame);
ttsInfo.m_iTimerValue = -1;
g_iOrdinaryPlayStop = 1;
ttsInfo.m_iUrgent = 1;
iEnqueueBytes = EnDataQueue(g_pDataQueue[TEXT_QUEUE_FOR_TTS_URGENT], (void*)(&ttsInfo), sizeof(ttsInfo),
(void*)(frame->payload), ComFrame_PAYLOAD_LENGTH(frame), NULL, 0);
if (iEnqueueBytes <= 0)
{
ProcQueueFull(TEXT_QUEUE_FOR_TTS_URGENT, true);
iEnqueueBytes = EnDataQueue(g_pDataQueue[TEXT_QUEUE_FOR_TTS_URGENT], (void*)(&ttsInfo), sizeof(ttsInfo),
(void*)(frame->payload), ComFrame_PAYLOAD_LENGTH(frame), NULL, 0);
bSucc = false;
}
if (bSucc)
pStrNotice = "proc info-msg";
else
pStrNotice = "***Error: unknow info-msg ";
PrintFilePos(); WriteLog(g_pProcessLog, "%s from %s %s", pStrNotice, "upper-host", g_strCurTime);
handle_pack(ET_TYPE_TEXT, 6, {.pd_pointer = f_data->payload});
if (ComFrame_Send(tty_id, f_data, true)) {
PrintFilePos(); fprintf(stderr, "send telemetry data frame error: %s\n", strerror(errno));
}
@ -569,35 +489,11 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit
gettimeofday(&tCurTime, 0);
int iIntervalTime = (tCurTime.tv_sec - tLastTime.tv_sec) * 1000 + (tCurTime.tv_usec - tLastTime.tv_usec) / 1000;
if( iIntervalTime > 15 )
PrintFilePos(); WriteLog(g_pProcessLog, " com frame interval %d ms %s\n", iIntervalTime, g_strCurTime);
PrintFilePos(); fprintf(stderr, " com frame interval %d\n", iIntervalTime);
tLastTime = tCurTime;
if (g_bCallMode) {
iProcHostAudioSampleCnt = proc_audio_upper_host((uint16_t*)(frame->payload), (uint16_t*)ptrProcHostAudioEnd, ComFrame_PAYLOAD_LENGTH(frame) / 2, false);
ptrProcHostAudioEnd += iProcHostAudioSampleCnt;
if (ptrProcHostAudioEnd - ptrProcHostAudio >= 16 * ProcHostAudioLengthMS) {
iEnqueueBytes = EnDataQueue(g_pDataQueue[AUDIO_QUEUE_FOR_PLAY_URGENT], (void*)(&PlayInfo), sizeof(PlayInfo),
(void*)(ptrProcHostAudio), (ptrProcHostAudioEnd - ptrProcHostAudio) * 2, NULL, 0);// 16 * ProcHostAudioLengthMS * 2, NULL, 0);
if (iEnqueueBytes <= 0) {
ProcQueueFull(AUDIO_QUEUE_FOR_PLAY_URGENT, true);
iEnqueueBytes = EnDataQueue(g_pDataQueue[AUDIO_QUEUE_FOR_PLAY_URGENT], (void*)(&PlayInfo), sizeof(PlayInfo),
(void*)(ptrProcHostAudio), (ptrProcHostAudioEnd - ptrProcHostAudio) * 2, NULL, 0);// 16 * ProcHostAudioLengthMS * 2, NULL, 0);
}
//memcpy(ptrProcHostAudio, ptrProcHostAudio + 16 * ProcHostAudioLengthMS, (ptrProcHostAudioEnd - ptrProcHostAudio - 16 * ProcHostAudioLengthMS) * 2);
ptrProcHostAudioEnd = ptrProcHostAudio;// ptrProcHostAudioEnd -= ptrProcHostAudio;
ProcHostAudioLengthMS = 500;//restore to 100ms
}
handle_pack(ET_TYPE_AUDIO, ComFrame_PAYLOAD_LENGTH(frame), {.pd_pointer = frame->payload});
//iEnqueueBytes = EnDataQueue(g_pDataQueue[AUDIO_QUEUE_FOR_PLAY_URGENT], (void*)(&PlayInfo), sizeof(PlayInfo),
// (void*)(frame->payload), ComFrame_PAYLOAD_LENGTH(frame), NULL, 0);
//if (iEnqueueBytes <= 0) {
// ProcQueueFull(AUDIO_QUEUE_FOR_PLAY_URGENT, true);
// iEnqueueBytes = EnDataQueue(g_pDataQueue[AUDIO_QUEUE_FOR_PLAY_URGENT], (void*)(&PlayInfo), sizeof(PlayInfo),
// (void*)(frame->payload), ComFrame_PAYLOAD_LENGTH(frame), NULL, 0);
//}
//PrintFilePos(); printf("iProcHostAudioSampleCnt %d\n", iProcHostAudioSampleCnt);
}
break;
}
case COM_FRAME_TYPE_TELEMETRY_REQUEST: