From 6fb64022197d3d3b27935ba74bdf6783d63f17ec Mon Sep 17 00:00:00 2001 From: ovizro Date: Sat, 30 Nov 2024 18:26:46 +0800 Subject: [PATCH] feat(event): implement new event handling mechanism - Rename and restructure event-related functions and data structures - Introduce unique_ptr for automatic memory management of event arguments - Add new event types and update existing ones - Modify telemetry and host communication threads to use new event system - Remove redundant code and optimize data handling --- include/defines.h | 2 +- include/etif_detail.h | 3 + include/event.h | 19 ++++- src/event.cpp | 42 ++++++++--- src/extern_interface.cpp | 6 +- src/host_com.cpp | 158 +++++++-------------------------------- 6 files changed, 79 insertions(+), 151 deletions(-) diff --git a/include/defines.h b/include/defines.h index b636dae..21e446b 100755 --- a/include/defines.h +++ b/include/defines.h @@ -187,7 +187,7 @@ extern char g_strSystemLogPath[500]; extern char g_strPCMSavePath[500]; extern struct tm g_tCurLocalTime; -extern bool g_bKeepSysRuning; +extern bool g_bKeepEtifRuning; extern long g_iSaveInputAudio; extern long g_iSaveTTSAudio; extern long g_iSaveHostAudio; diff --git a/include/etif_detail.h b/include/etif_detail.h index c188b10..1cbee7d 100644 --- a/include/etif_detail.h +++ b/include/etif_detail.h @@ -17,7 +17,10 @@ void* telemetry_host_com_thread(void* args); void* upper_host_com_thread(void* arg); extern int g_iHostCom_tty_id; +extern int g_iTelemetry_Com_tty_id; extern int g_iEnableAlarmCode; +extern bool g_bKeepEtifRuning; +extern int g_iExternTimeDifference; extern uint8_t g_iAlarmCode[4]; #ifdef __cplusplus diff --git a/include/event.h b/include/event.h index 59cdccb..bad8136 100644 --- a/include/event.h +++ b/include/event.h @@ -1,16 +1,27 @@ #ifndef _INCLUDE_EVENT_H_ #define _INCLUDE_EVENT_H_ +#define CONCAT_(prefix, suffix) prefix##suffix +/// Concatenate `prefix, suffix` into `prefixsuffix` +#define CONCAT(prefix, suffix) CONCAT_(prefix, suffix) + +/// Register event handler +#define ON_EVENT_EX(name, func, data) \ + static void func (const char* event_name, void *args, void* user_data); \ + int CONCAT(func, _register_ret) = etif_event_register(#name, func, data);\ + static void func (const char* event_name, void *args, void* user_data) +#define ON_EVENT(name) ON_EVENT_EX(name, CONCAT(_event_ ## name ## _handle_line_, __LINE__), NULL) + #ifdef __cplusplus extern "C" { #endif typedef void (*event_callback)(const char*, void*, void*); -void sys_event(const char *event_name, void *args); -int sys_event_register(const char *event_name, event_callback callback, void* user_data); -int sys_event_unregister(const char *event_name, event_callback callback); -void* sys_event_thread(void*); +void etif_event(const char *event_name, void *args, size_t args_size); +int etif_event_register(const char *event_name, event_callback callback, void* user_data); +int etif_event_unregister(const char *event_name, event_callback callback, void* user_data); +void* etif_event_thread(void*); #ifdef __cplusplus } diff --git a/src/event.cpp b/src/event.cpp index 5935e99..0c4fcad 100644 --- a/src/event.cpp +++ b/src/event.cpp @@ -2,45 +2,63 @@ #include #include #include +#include +#include +#include #include "event.h" #include "dataqueue.hpp" std::unordered_map>> _events; -DataQueue> _event_queue; +DataQueue, void*>> _event_queue; -void sys_event(const char *event_name, void *args) { +void etif_event(const char *event_name, void *args, size_t args_size) { auto it = _events.find(event_name); if (it != _events.end()) { for (auto &pair : it->second) { - pair.first(event_name, args, pair.second); + void* args_copy = malloc(args_size); + assert(args_copy != nullptr); + memcpy(args_copy, args, args_size); + _event_queue.Push(std::make_tuple(event_name, pair.first, std::unique_ptr(args), pair.second)); } } } -int sys_event_register(const char *event_name, event_callback callback, void* user_data) { +int etif_event_register(const char *event_name, event_callback callback, void* user_data) { + auto it = _events.find(event_name); + if (it == _events.end()) { + _events[event_name] = std::vector>(); + } else { + for (auto &pair : it->second) { + if (pair.first == callback && pair.second == user_data) { + return -1; + } + } + } _events[event_name].push_back(std::make_pair(callback, user_data)); + return 0; } -int sys_event_unregister(const char *event_name, event_callback callback) { +int etif_event_unregister(const char *event_name, event_callback callback, void* user_data) { auto it = _events.find(event_name); if (it != _events.end()) { for (auto it2 = it->second.begin(); it2 != it->second.end(); it2++) { - if (it2->first == callback) { + if (it2->first == callback && it2->second == user_data) { it->second.erase(it2); return 0; } } } + return -1; } -void* sys_event_thread(void*) { +void* etif_event_thread(void*) { + const char* event_name; + event_callback callback; + std::unique_ptr args; + void* user_data; while (true) { - const char* event_name; - event_callback callback; - void* args; - void* user_data; std::tie(event_name, callback, args, user_data) = _event_queue.Pop(); - callback(event_name, args, user_data); + callback(event_name, args.get(), user_data); } return NULL; } diff --git a/src/extern_interface.cpp b/src/extern_interface.cpp index c3e6bdf..c8e692c 100644 --- a/src/extern_interface.cpp +++ b/src/extern_interface.cpp @@ -12,7 +12,7 @@ int extern_interface_init(const char* config_path, et_callback_t cb) { inicpp::IniManager manager(config_path); et_callback = cb; - pthread_create(&event_thread, NULL, sys_event_thread, NULL); + pthread_create(&event_thread, NULL, etif_event_thread, NULL); pthread_create(&upperhost_thread, NULL, upper_host_com_thread, NULL); pthread_create(&telemetry_thread, NULL, telemetry_host_com_thread, NULL); } @@ -20,10 +20,10 @@ int extern_interface_init(const char* config_path, et_callback_t cb) { void extern_interface_send(uint32_t type, size_t len, union PacketData data) { switch (type) { case ET_TYPE_COMMAND: - sys_event("send_command", data.pd_pointer); + etif_event("send_command", data.pd_pointer, len); break; case ET_TYPE_AUDIO: - sys_event("send_audio", data.pd_pointer); + etif_event("send_audio", data.pd_pointer, len); break; default: break; diff --git a/src/host_com.cpp b/src/host_com.cpp index 913fceb..f34308b 100644 --- a/src/host_com.cpp +++ b/src/host_com.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include "telemetry.h" #include "dataqueue.hpp" @@ -14,7 +15,7 @@ void play_text(char* pTextStr, int iRepeat_times); // int32_t g_iExternTimeDifference = 0; static TelemetryCommandInfo g_receiveCommandInfo; static std::list g_sendCommandList; -static void* g_pRcvCmdQueue = NULL; +static DataQueue g_recvCommandQueue; volatile static uint8_t g_iUpperHostTelemetryCount = 0; volatile static uint8_t g_iTelemetryCount = 0; @@ -107,6 +108,8 @@ void set_telemetry_host_data(TelemetryData4UpperHost* pTelemetryData) { /* create a telemetry msg data for telemetry, by gejp 2024.10.03 */ void set_telemetry_data(TelemetryData* pTelemetryHostData, TelemetryCommandInfo *pReceiveCommandInfo) { + TelemetryRequestData data; + get_telemetry_data(&data); pTelemetryHostData->work_status = 0xAA; pTelemetryHostData->com_status = 0xAA; pTelemetryHostData->coprocessor1_status = 0xAA; @@ -114,17 +117,17 @@ void set_telemetry_data(TelemetryData* pTelemetryHostData, TelemetryCommandInfo pTelemetryHostData->voice_circuit_status = 0xAA; pTelemetryHostData->telemetry_count = g_iTelemetryCount++; - if (g_bCloseASR) { + if (data.close_asr) { pTelemetryHostData->voice_mode = 0xAA; } - else if (g_iSysState == SYS_STATUS_WAKE) { + else if (data.sys_state == ET_SYS_STATUS_WAKE) { pTelemetryHostData->voice_mode = 0x55; } else { pTelemetryHostData->voice_mode = 0xA5; } - if (g_iSysState == SYS_STATUS_WAKE) { + if (data.sys_state == ET_SYS_STATUS_WAKE) { pTelemetryHostData->recognition_status = 0x55; } else { @@ -153,28 +156,18 @@ void set_telemetry_data(TelemetryData* pTelemetryHostData, TelemetryCommandInfo } // the following same as set_telemetry_host_data - pTelemetryHostData->volume_key_status = g_bVolumeKeyPressed ? 0xAA : 0x55; - pTelemetryHostData->wake_key_status = g_bWakeupKeyPressed ? 0xAA : 0x55; - g_bVolumeKeyPressed = false; - g_bWakeupKeyPressed = false; + pTelemetryHostData->volume_key_status = data.volume_key_pressed ? 0xAA : 0x55; + pTelemetryHostData->wake_key_status = data.wakeup_key_pressed ? 0xAA : 0x55; pTelemetryHostData->key_status_backup = 0x55; - if (g_iCurVolumeGrade == g_iVolumeGradeCnt-1) {//Large - pTelemetryHostData->current_volume = 0x3; - } - else if (g_iCurVolumeGrade == 0) {//Small - pTelemetryHostData->current_volume = 0x1; - } - else {//Medium - pTelemetryHostData->current_volume = 0x2; - } + pTelemetryHostData->current_volume = data.volume_grade; - pTelemetryHostData->system_version_high = g_iSysVerHigh; - pTelemetryHostData->system_version_low = g_iSysVerLow; + pTelemetryHostData->system_version_high = data.sys_ver_high; + pTelemetryHostData->system_version_low = data.sys_ver_low; - pTelemetryHostData->application_version_high = g_iAppVerHigh; - pTelemetryHostData->application_version_low = g_iAppVerLow; + pTelemetryHostData->application_version_high = data.app_ver_high; + pTelemetryHostData->application_version_low = data.app_ver_low; } __inline bool is_command(uint8_t* array, uint8_t* value, int size) { @@ -201,10 +194,10 @@ void* telemetry_host_com_thread(void* path) {//Integrated Business Unit, telemet memset(&TeleCmdInfo2Send, 0, sizeof(TelemetryCommandInfo)); - while (g_bKeepSysRuning) { + while (g_bKeepEtifRuning) { // if ( (iTelemetryAnswerTimes % 3 == 0) && g_bNeedReboot) system("reboot"); - ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, &offset, &cached_size, &g_bKeepSysRuning, true); + ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, &offset, &cached_size, &g_bKeepEtifRuning, true); //ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, true); if (ComFrame_Verify(frame, false)) { if (SendTelemetryErrorMsg(tty_id, address, TELEMETRY_ERROR_CHECKSUM, true)) @@ -221,9 +214,8 @@ void* telemetry_host_com_thread(void* path) {//Integrated Business Unit, telemet continue; } - if(iTelemetryAnswerTimes % 3 == 0 && g_pRcvCmdQueue != NULL) { - iRet = DeDataQueue(g_pRcvCmdQueue, &pDataBlock); - if (iRet < sizeof(TelemetryCommandInfo)) + if(iTelemetryAnswerTimes % 3 == 0) { + if (g_recvCommandQueue.Empty()) {//empty if (0)//2024.11.04 g_bNeedReboot) { @@ -232,8 +224,8 @@ void* telemetry_host_com_thread(void* path) {//Integrated Business Unit, telemet } }else { - TeleCmdInfo2Send = *((TelemetryCommandInfo*)pDataBlock); - PrintFilePos(); printf("telemetry receive command (%d)\n", TeleCmdInfo2Send.seqid); + TeleCmdInfo2Send = g_recvCommandQueue.Pop(); + PrintFilePos(); printf("telemetry receive command (%d)\n", TeleCmdInfo2Send.seqid); } set_telemetry_data(&TelemetryData2Send, &TeleCmdInfo2Send); } @@ -265,27 +257,6 @@ void* telemetry_host_com_thread(void* path) {//Integrated Business Unit, telemet return NULL; } -void SetCallMode(bool bCall) -{ - if (bCall) - { - PrintFilePos(); printf("Open call with upper host\n"); - g_bCallMode = true; - g_iCallRcvFrameCnt = 0; - g_iPlayedCallFrameCnt = 0; - g_iOrdinaryPlayStop = 1; - g_bCloseASR = true; - g_iInsertSlientSegCnt=0; - - g_iPcmBlockBytes = g_iCallBytesPerBlockMS; - } - else - { - PrintFilePos(); printf("Close call with upper host\n"); - g_bCallMode = false; - g_iPcmBlockBytes = 512 * g_iChannelCnt * 2; - } -} //获取指令文本与报警码 struct AlarmNotice { uint32_t iAlarmCode; //指令编码,小端序 @@ -396,19 +367,10 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit const uint32_t buffer_size = 1024 * 100; // TODO const int64_t iHostAudioBufferSize = 1024 * 1024; - g_pRcvCmdQueue = OpenDataQueue(128, sizeof(struct TelemetryCommandInfo), true); - - struct TTSInfo ttsInfo; uint8_t* buffer = new uint8_t[buffer_size]; int64_t iEnqueueBytes; bool bSucc; char* pStrNotice; - AudioPlayInfo PlayInfo; - PlayInfo.m_iUrgent = 2; - //PlayInfo.m_iDataBytes = retLen; - PlayInfo.m_iSource = 0; - PlayInfo.m_iTimerValue = -1; - PlayInfo.m_iTimerID2Set = 0; size_t offset = 0, cached_size = 0; uint64_t iTelemetryAnswerTimes; int16_t* ptrProcHostAudio = (int16_t*)malloc(iHostAudioBufferSize); @@ -419,9 +381,9 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit TelemetryData4UpperHost UpperHostTeleData; - while (g_bKeepSysRuning) { + while (g_bKeepEtifRuning) { - ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, &offset, &cached_size, &g_bKeepSysRuning, true); + ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, &offset, &cached_size, &g_bKeepEtifRuning, true); //ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, true); //PrintFilePos(); printf("%04X \n", ComFrame_HEADER(frame)->type); switch (ComFrame_HEADER(frame)->type) @@ -457,10 +419,7 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit g_receiveCommandInfo.time = telemetryTimeSeconds; if (!g_bNeedReboot) { - int iRet = EnDataQueue(g_pRcvCmdQueue, &g_receiveCommandInfo, sizeof(TelemetryCommandInfo), NULL, 0, NULL, 0); - if (iRet < sizeof(TelemetryCommandInfo)) { - PrintFilePos(); printf("EnDataQueue failed\n"); - } + g_recvCommandQueue.Push(g_receiveCommandInfo); } /*-------START 2024.10.23 noted by zgb start if (is_command(frame->payload, command_open_call, 6)) { @@ -487,27 +446,7 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit else //-------END 2024.10.23 noted by zgb */ - if (is_command(frame->payload, command_wakeup, 6)) { - setSysWake("telemetry"); - } - else if(is_command(frame->payload, command_sleep, 6)) { - setSysSleep("telemetry"); - } - else if (is_command(frame->payload, command_volume, 6)) { - VolumeUpCycleFunc(); - announce_state(0); - } - else if (is_command(frame->payload, command_reset, 6)) { - PrintFilePos(); printf("reboot flag set!\n"); -#ifdef USE_TELEMETRY_REBOOT_CMD - g_bNeedReboot = true;// system("reboot"); - if(strlen(g_strNoticeCmdText) > 1) - play_text(g_strNoticeCmdText, g_strNoticeCmdTextPlayTimes); // 播报相应内容 -#else - if (strlen(g_strNoticeCmdText) > 1) - play_text(g_strNoticeCmdText, g_strNoticeCmdTextPlayTimes); // 播报相应内容 -#endif - } + handle_pack(ET_TYPE_COMMAND, 6, {.pd_pointer = frame->payload}); break; } case COM_FRAME_TYPE_SPEECH_INJECTED: @@ -523,26 +462,7 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit COM_FRAME_ADDRESS_VOIX, COM_FRAME_TYPE_SPEECH_INJECTED, payload, 6, true); - ttsInfo.m_iSource = 0; - ttsInfo.m_iDataBytes = ComFrame_PAYLOAD_LENGTH(frame); - ttsInfo.m_iTimerValue = -1; - g_iOrdinaryPlayStop = 1; - ttsInfo.m_iUrgent = 1; - iEnqueueBytes = EnDataQueue(g_pDataQueue[TEXT_QUEUE_FOR_TTS_URGENT], (void*)(&ttsInfo), sizeof(ttsInfo), - (void*)(frame->payload), ComFrame_PAYLOAD_LENGTH(frame), NULL, 0); - if (iEnqueueBytes <= 0) - { - ProcQueueFull(TEXT_QUEUE_FOR_TTS_URGENT, true); - iEnqueueBytes = EnDataQueue(g_pDataQueue[TEXT_QUEUE_FOR_TTS_URGENT], (void*)(&ttsInfo), sizeof(ttsInfo), - (void*)(frame->payload), ComFrame_PAYLOAD_LENGTH(frame), NULL, 0); - bSucc = false; - } - if (bSucc) - pStrNotice = "proc info-msg"; - else - pStrNotice = "***Error: unknow info-msg "; - PrintFilePos(); WriteLog(g_pProcessLog, "%s from %s %s", pStrNotice, "upper-host", g_strCurTime); - + handle_pack(ET_TYPE_TEXT, 6, {.pd_pointer = f_data->payload}); if (ComFrame_Send(tty_id, f_data, true)) { PrintFilePos(); fprintf(stderr, "send telemetry data frame error: %s\n", strerror(errno)); } @@ -569,35 +489,11 @@ void* upper_host_com_thread(void* arg) {//hand-control display processing unit gettimeofday(&tCurTime, 0); int iIntervalTime = (tCurTime.tv_sec - tLastTime.tv_sec) * 1000 + (tCurTime.tv_usec - tLastTime.tv_usec) / 1000; if( iIntervalTime > 15 ) - PrintFilePos(); WriteLog(g_pProcessLog, " com frame interval %d ms %s\n", iIntervalTime, g_strCurTime); + PrintFilePos(); fprintf(stderr, " com frame interval %d\n", iIntervalTime); tLastTime = tCurTime; - if (g_bCallMode) { - iProcHostAudioSampleCnt = proc_audio_upper_host((uint16_t*)(frame->payload), (uint16_t*)ptrProcHostAudioEnd, ComFrame_PAYLOAD_LENGTH(frame) / 2, false); - ptrProcHostAudioEnd += iProcHostAudioSampleCnt; - if (ptrProcHostAudioEnd - ptrProcHostAudio >= 16 * ProcHostAudioLengthMS) { - iEnqueueBytes = EnDataQueue(g_pDataQueue[AUDIO_QUEUE_FOR_PLAY_URGENT], (void*)(&PlayInfo), sizeof(PlayInfo), - (void*)(ptrProcHostAudio), (ptrProcHostAudioEnd - ptrProcHostAudio) * 2, NULL, 0);// 16 * ProcHostAudioLengthMS * 2, NULL, 0); - if (iEnqueueBytes <= 0) { - ProcQueueFull(AUDIO_QUEUE_FOR_PLAY_URGENT, true); - iEnqueueBytes = EnDataQueue(g_pDataQueue[AUDIO_QUEUE_FOR_PLAY_URGENT], (void*)(&PlayInfo), sizeof(PlayInfo), - (void*)(ptrProcHostAudio), (ptrProcHostAudioEnd - ptrProcHostAudio) * 2, NULL, 0);// 16 * ProcHostAudioLengthMS * 2, NULL, 0); - } - //memcpy(ptrProcHostAudio, ptrProcHostAudio + 16 * ProcHostAudioLengthMS, (ptrProcHostAudioEnd - ptrProcHostAudio - 16 * ProcHostAudioLengthMS) * 2); - ptrProcHostAudioEnd = ptrProcHostAudio;// ptrProcHostAudioEnd -= ptrProcHostAudio; - ProcHostAudioLengthMS = 500;//restore to 100ms - } + handle_pack(ET_TYPE_AUDIO, ComFrame_PAYLOAD_LENGTH(frame), {.pd_pointer = frame->payload}); - //iEnqueueBytes = EnDataQueue(g_pDataQueue[AUDIO_QUEUE_FOR_PLAY_URGENT], (void*)(&PlayInfo), sizeof(PlayInfo), - // (void*)(frame->payload), ComFrame_PAYLOAD_LENGTH(frame), NULL, 0); - //if (iEnqueueBytes <= 0) { - // ProcQueueFull(AUDIO_QUEUE_FOR_PLAY_URGENT, true); - // iEnqueueBytes = EnDataQueue(g_pDataQueue[AUDIO_QUEUE_FOR_PLAY_URGENT], (void*)(&PlayInfo), sizeof(PlayInfo), - // (void*)(frame->payload), ComFrame_PAYLOAD_LENGTH(frame), NULL, 0); - //} - //PrintFilePos(); printf("iProcHostAudioSampleCnt %d\n", iProcHostAudioSampleCnt); - - } break; } case COM_FRAME_TYPE_TELEMETRY_REQUEST: