feat(event): implement event handling mechanism

- Add event registration and triggering functionality
- Implement event callback mechanism
- Update exint_send to support event type
- Modify alarm and command handlers to use event mechanism
- Update test cases for event handling
This commit is contained in:
ovizro 2024-12-24 22:50:03 +08:00
parent 9e4fdf3b13
commit f46f39b2b2
16 changed files with 303 additions and 133 deletions

View File

@ -88,7 +88,9 @@ public:
while (m_Queue.empty())
{
if (m_Cond.wait_for(lock, timeout) == std::cv_status::timeout) {
throw QueueTimeout(this);
if (m_Queue.empty()) {
throw QueueTimeout(this);
}
}
if (epoch != m_CurrEpoch) {
throw QueueCleared(this);

View File

@ -8,13 +8,13 @@
#define PrintFilePos()
#define COMMAND_RESEND_INTERVAL 250
#define COMMAND_RESEND_MAX_COUNT 3
extern "C" {
void exint_handle_pack(uint32_t type, size_t len, void* data);
void* telemetry_host_com_thread(void* args);
void* upper_host_com_thread(void* arg);
}
extern int g_iExternTimeDifference;

View File

@ -1,8 +1,11 @@
#ifndef _INCLUDE_EVENT_H_
#define _INCLUDE_EVENT_H_
#ifndef _INCLUDE_EXINT_EVENT_
#define _INCLUDE_EXINT_EVENT_
#define EXINT_EVENT
#include <stdint.h>
#include <stddef.h>
#include "external_interface_event.h"
#define CONCAT_(prefix, suffix) prefix##suffix
/// Concatenate `prefix, suffix` into `prefixsuffix`
@ -19,8 +22,6 @@
extern "C" {
#endif
typedef void (*event_callback)(const char*, size_t, void*, void*);
void exint_event(const char *event_name, size_t args_size, void *args);
int exint_event_register(const char *event_name, event_callback callback, void* user_data);
int exint_event_unregister(const char *event_name, event_callback callback, void* user_data);

View File

@ -48,7 +48,7 @@ int exint_initialize(const char* config_path, et_callback_t cb);
EXTERN_INTERFACE_PUBLIC
void exint_send(uint32_t type, size_t len, void* data);
EXTERN_INTERFACE_PUBLIC
void exint_finialize();
int exint_finalize();
#ifdef __cplusplus
}

View File

@ -0,0 +1,79 @@
#ifndef _INCLUDE_EXTERNAL_INTERFACE_EVENT_
#define _INCLUDE_EXTERNAL_INTERFACE_EVENT_
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include "external_interface.h"
#define ET_TYPE_EVENT 32
#define ET_EVENT_TRIGGER_BUFFER_SIZE 32
#ifdef __cplusplus
extern "C" {
#endif
typedef void (*event_callback)(const char*, size_t, void*, void*);
typedef struct _EventRegister {
const char* name;
event_callback callback;
void* user_data;
} EventRegister, EventUnregister;
typedef struct _EventTrigger {
const char* name;
size_t args_size;
uint8_t args[ET_EVENT_TRIGGER_BUFFER_SIZE];
} EventTrigger;
#define EXINT_EVENT_PUBLIC static __inline
#ifndef EXINT_EVENT
#define EXINT_EVENT_PUBLIC_FUNC_NAME(name) exint_ ## name
#else
#define EXINT_EVENT_PUBLIC_FUNC_NAME(name) exint_public_ ## name
#endif
#define EXINT_EVENT_PUBLIC_FUNC(name) EXINT_EVENT_PUBLIC void EXINT_EVENT_PUBLIC_FUNC_NAME(name)
EXINT_EVENT_PUBLIC_FUNC(event)
(const char* name, const size_t args_size, void* args) {
if (args_size <= ET_EVENT_TRIGGER_BUFFER_SIZE) {
EventTrigger trigger = {name, args_size, {0}};
memcpy(trigger.args, args, args_size);
exint_send(
ET_TYPE_EVENT,
sizeof(EventTrigger) + args_size - ET_EVENT_TRIGGER_BUFFER_SIZE,
&trigger
);
} else {
EventTrigger* trigger = (EventTrigger*)malloc(sizeof(EventTrigger) + args_size - ET_EVENT_TRIGGER_BUFFER_SIZE);
if (trigger == NULL) return;
trigger->name = name;
trigger->args_size = args_size;
memcpy(trigger->args, args, args_size);
exint_send(
ET_TYPE_EVENT,
sizeof(EventTrigger) + args_size - ET_EVENT_TRIGGER_BUFFER_SIZE,
trigger
);
free(trigger);
}
}
EXINT_EVENT_PUBLIC_FUNC(event_register)
(const char *event_name, event_callback callback, void* user_data) {
EventRegister register_ = {event_name, callback, user_data};
EXINT_EVENT_PUBLIC_FUNC_NAME(event)("event_register", sizeof(EventRegister), &register_);
}
EXINT_EVENT_PUBLIC_FUNC(event_unregister)
(const char *event_name, event_callback callback, void* user_data) {
EventUnregister unregister = {event_name, callback, user_data};
EXINT_EVENT_PUBLIC_FUNC_NAME(event)("event_unregister", sizeof(EventUnregister), &unregister);
}
#ifdef __cplusplus
}
#endif
#endif

View File

@ -4,6 +4,6 @@ mkdir -p dist
NAME="dist/exint-build-$(date +%Y%m%d-%H%M%S).zip"
zip -r $NAME bin/ lib/ config/\
scripts/ README.md include/external_interface.h
scripts/ README.md include/external_interface*
echo "Created $NAME"

View File

@ -123,3 +123,25 @@ void exint_event_thread_stop() {
}
logger.info("stopped event threads");
}
ON_EVENT(event_register) {
auto data = (EventRegister*)args;
logger.debug("event_register: %s", data->name);
if (exint_event_register(data->name, data->callback, data->user_data)) {
logger.warn("%s already registered", data->name);
}
}
ON_EVENT(event_unregister) {
auto data = (EventUnregister*)args;
logger.debug("event_unregister: %s", data->name);
if (exint_event_unregister(data->name, data->callback, data->user_data)) {
logger.warn("%s not registered", data->name);
}
}
ON_EVENT(event_trigger) {
auto data = (EventTrigger*)args;
logger.debug("event_trigger: %s", data->name);
exint_event(data->name, data->args_size, data->args);
}

View File

@ -22,7 +22,7 @@ EtHandlerDef upper_host_handler_def[] = {
{COM_FRAME_TYPE_COMMAND, ET_HDL_FLAG_DEFAULT, command_handler},
{COM_FRAME_TYPE_REQUEST, ET_HDL_FLAG_DEFAULT, request_handler},
{COM_FRAME_TYPE_SPEECH_TEXT, ET_HDL_FLAG_DEFAULT, text_handler},
{COM_FRAME_TYPE_SPEECH_PCM, ET_HDL_FLAG_DEFAULT, audio_handler},
{COM_FRAME_TYPE_SPEECH_PCM, ET_HDL_FLAG_NOVERIFY, audio_handler},
{COM_FRAME_TYPE_TELEMETRY_REQUEST, ET_HDL_FLAG_NOVERIFY, upperhost_telemetry_request_handler},
{COM_FRAME_TYPE_ALARM, ET_HDL_FLAG_DEFAULT, alarm_handler},
{COM_FRAME_TYPE_GRANT_TIME, ET_HDL_FLAG_DEFAULT, grant_time_handler},
@ -185,6 +185,7 @@ void exint_setup(std::unique_ptr<Transport>&& upperhost_transport, std::unique_p
std::bind(exint_handler, std::ref(*g_upperhost_transport.get()), upper_host_handler_def)
).detach();
exint_event_register("send_command", send_command_event_callback, g_upperhost_transport.get());
exint_event_register("send_audio", send_audio_event_callback, g_upperhost_transport.get());
}
if (g_telemetry_transport) {
std::thread(
@ -208,6 +209,9 @@ void exint_send(uint32_t type, size_t len, void* data) {
case ET_TYPE_AUDIO:
event_name = "send_audio";
break;
case ET_TYPE_EVENT:
event_name = "event_trigger";
break;
default:
logger.error("Unknown type %d", type);
return;
@ -215,12 +219,15 @@ void exint_send(uint32_t type, size_t len, void* data) {
exint_event(event_name, len, data);
}
void exint_finialize() {
if (g_upperhost_transport)
int exint_finalize() {
if (g_upperhost_transport) {
g_upperhost_transport->close();
if (g_telemetry_transport)
}
if (g_telemetry_transport) {
g_telemetry_transport->close();
}
exint_event_thread_stop();
return 0;
}
void exint_handle_pack(uint32_t type, size_t len, void* data) {

View File

@ -117,7 +117,7 @@ ComFrame* alarm_handler(EtHandlerDef *hdl, ComFrame* frame) {
if (!alarm)
return NULL;
exint_handle_pack(ET_TYPE_TEXT, sizeof(alarm), alarm);
exint_handle_pack(ET_TYPE_ALARM, sizeof(alarm), alarm);
uint8_t payload[6] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
return ComFrame_New(

View File

@ -9,9 +9,6 @@
#include "exint/handler.h"
#include "exint/protocol.hpp"
#define COMMAND_RESEND_INTERVAL 250
#define COMMAND_RESEND_MAX_COUNT 3
struct _CommandResendToken {
Transport* transport;
uint8_t data[6];
@ -57,6 +54,10 @@ ComFrame* request_handler(EtHandlerDef *hdl, ComFrame* frame) {
void send_command_event_callback(const char* event_name, size_t args_size, void* args, void* user_data) {
Transport* transport = (Transport*)user_data;
if (!transport) {
logger.error("transport is NULL");
return;
}
transport->send(std::shared_ptr<ComFrame>(
ComFrame_New(COM_FRAME_ADDRESS_VOIX, COM_FRAME_TYPE_REQUEST, args, args_size, true),
ComFrame_Del
@ -79,6 +80,7 @@ ON_EVENT(_resend_command) {
auto now = std::chrono::steady_clock::now();
if (now - resend_data.last_send_time < std::chrono::milliseconds(COMMAND_RESEND_INTERVAL)) {
exint_event("_resend_command", sizeof(resend_data), &resend_data);
return;
}
@ -99,14 +101,15 @@ ON_EVENT(_resend_command) {
));
resend_data.count++;
resend_data.last_send_time = now;
char buffer[13];
for (int i = 0; i < 6; i++) {
sprintf(buffer + i * 2, "%02x", token->data[i]);
}
buffer[12] = '\0';
if (resend_data.count > 3) {
char buffer[13];
for (int i = 0; i < 6; i++) {
sprintf(buffer + i * 2, "%02x", token->data[i]);
}
buffer[12] = '\0';
logger.warn("Command %s send failed", buffer);
} else {
logger.warn("Command %s send failed, retry %d times...", buffer, resend_data.count);
exint_event("_resend_command", sizeof(resend_data), &resend_data);
}
}

View File

@ -19,6 +19,11 @@ ComFrame* audio_handler(EtHandlerDef *hdl, ComFrame* frame) {
void send_audio_event_callback(const char* event_name, size_t args_size, void* args, void* user_data) {
Transport* transport = (Transport*)user_data;
if (!transport) {
auto& logger = *logging::get_logger("exint::audio");
logger.error("transport is NULL");
return;
}
ComFrame* frame = ComFrame_New(
COM_FRAME_ADDRESS_VOIX,
COM_FRAME_TYPE_AUDIO,

56
tests/handler/setup.inc Normal file
View File

@ -0,0 +1,56 @@
#pragma once
#include "c_testcase.h"
#include "logging/interface.h"
#include "exint/detail.hpp"
#include "exint/protocol.hpp"
class MockTransport: public Transport {
public:
void send_backend() override {}
void receive_backend() override {}
Transport::FrameType get_sent_frame() {
return super::send_que.Pop(std::chrono::seconds(1)).first; //
}
void set_received_frame(ComFrame* frame) {
super::recv_que.Push(std::make_pair(std::shared_ptr<ComFrame>(frame, ComFrame_Del), nullptr));
}
private:
typedef Transport super;
};
static int g_telemetry_request_count = 0;
static DataQueue<std::tuple<uint32_t, size_t, void*>> g_data_queue;
static MockTransport* g_upperhost_mock_transport;
static MockTransport* g_telemetry_mock_transport;
void exint_data_callback(uint32_t type, size_t size, void* data) {
if (type == ET_TYPE_TELEMETRY_REQUEST) {
g_telemetry_request_count++;
memset(data, 0, size);
((TelemetryRequestData*)data)->app_ver_high = 1;
((TelemetryRequestData*)data)->app_ver_low = 2;
} else {
g_data_queue.Push(std::make_tuple(type, size, data));
}
}
SETUP {
g_upperhost_mock_transport = new MockTransport();
g_telemetry_mock_transport = new MockTransport();
exint_setup(std::unique_ptr<Transport>(g_upperhost_mock_transport), std::unique_ptr<Transport>(g_telemetry_mock_transport), exint_data_callback);
g_telemetry_request_count = 0;
log_set_level(LOG_LEVEL_DEBUG);
return 0;
}
TEARDOWN {
exint_finalize();
g_data_queue.Clear();
return 0;
}

View File

@ -0,0 +1,70 @@
#include "setup.inc"
TEST_CASE(test_send)
{
char command_data[] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05};
exint_send(ET_TYPE_COMMAND, 6, command_data);
auto command_frame = g_upperhost_mock_transport->get_sent_frame();
assert(command_frame);
assert_eq(ComFrame_TYPE(command_frame.get()), byteswaps(COM_FRAME_TYPE_REQUEST));
assert_eq(ComFrame_Length(command_frame.get(), true), 6 + sizeof(ComFrameHeader) + 2);
assert_mem_eq(ComFrame_PAYLOAD(command_frame), command_data, 6);
END_TEST;
}
TEST_CASE(test_resend)
{
char command_data[] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05};
exint_send(ET_TYPE_COMMAND, 6, command_data);
// send first command frame
auto command_frame = g_upperhost_mock_transport->get_sent_frame();
assert(command_frame);
assert_eq(ComFrame_TYPE(command_frame.get()), byteswaps(COM_FRAME_TYPE_REQUEST));
assert_eq(ComFrame_Length(command_frame.get(), true), 6 + sizeof(ComFrameHeader) + 2);
assert_mem_eq(ComFrame_PAYLOAD(command_frame), command_data, 6);
// resend frame
command_frame = g_upperhost_mock_transport->get_sent_frame();
assert(command_frame);
assert_eq(ComFrame_TYPE(command_frame.get()), byteswaps(COM_FRAME_TYPE_REQUEST));
assert_eq(ComFrame_Length(command_frame.get(), true), 6 + sizeof(ComFrameHeader) + 2);
assert_mem_eq(ComFrame_PAYLOAD(command_frame), command_data, 6);\
// resend frame again
command_frame = g_upperhost_mock_transport->get_sent_frame();
assert(command_frame);
assert_eq(ComFrame_TYPE(command_frame.get()), byteswaps(COM_FRAME_TYPE_REQUEST));
assert_eq(ComFrame_Length(command_frame.get(), true), 6 + sizeof(ComFrameHeader) + 2);
assert_mem_eq(ComFrame_PAYLOAD(command_frame), command_data, 6);
// responce
uint8_t payload[] = {0xAA, 0xAA};
g_upperhost_mock_transport->set_received_frame(
ComFrame_New(COM_FRAME_ADDRESS_VOIX, COM_FRAME_TYPE_REQUEST, payload, sizeof(payload), false)
);
try {
g_upperhost_mock_transport->get_sent_frame();
} catch (const QueueTimeout&) {
END_TEST;
}
printf("bad resend frame");
return 1;
}
TEST_CASE(test_recv) {
uint8_t command[] = { 0x00, 0x01, 0x02, 0x03, 0xAA, 0xAA};
auto command_msg = ComFrame_New(COM_FRAME_ADDRESS_VOIX, COM_FRAME_TYPE_COMMAND, command, sizeof(command), false);
g_upperhost_mock_transport->set_received_frame(command_msg);
uint32_t tp;
size_t len;
void* pack;
std::tie(tp, len, pack) = g_data_queue.Pop(std::chrono::milliseconds(1000));
assert_eq(tp, ET_TYPE_COMMAND);
assert_eq(len, sizeof(command));
assert_mem_eq(command, pack, len);
END_TEST;
}

View File

@ -0,0 +1,19 @@
#include "telemetry.h"
#include "setup.inc"
TEST_CASE(test_host_telemetry) {
auto request_msg = NewTelemetryRequestMsg(COM_FRAME_ADDRESS_VOIX, false);
g_telemetry_mock_transport->set_received_frame(request_msg);
auto telemetry_reply = g_telemetry_mock_transport->get_sent_frame();
assert(telemetry_reply);
assert_eq(g_telemetry_request_count, 1);
TelemetryData* telemetry_data = (TelemetryData*)ComFrame_PAYLOAD(telemetry_reply);
assert_eq(ComFrame_TYPE(telemetry_reply.get()), byteswaps(COM_FRAME_TYPE_TELEMETRY_ANSWER));
assert_eq(ComFrame_Length(telemetry_reply.get(), true), sizeof(ComFrameHeader) + sizeof(TelemetryData) + sizeof(uint16_t));
assert_eq(telemetry_data->application_version_high, 1);
assert_eq(telemetry_data->application_version_low, 2);
END_TEST;
}

View File

@ -7,7 +7,7 @@
static std::vector<int> g_vec;
ON_EVENT(test) {
ON_EVENT_EX(test, _test_handler, nullptr) {
int* arg = (int*)args;
g_vec.push_back(*arg);
}
@ -38,6 +38,7 @@ TEST_CASE(test_event) {
TEST_CASE(test_event_thread) {
exint_event_thread_start(2);
int i = 2;
int ret = 0;
exint_event("test", sizeof(int), &i);
for (int i = 0; i < 10; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@ -47,9 +48,22 @@ TEST_CASE(test_event_thread) {
assert_eq(g_vec[0], 2);
goto end;
}
assert(false);
ret = 1;
end:
exint_event_thread_stop();
return ret;
}
TEST_CASE(test_public)
{
assert_eq(g_vec.size(), 0);
exint_public_event_register("test_public", _test_handler, nullptr);
exint_public_event("test_public", 0, NULL);
assert_eq(g_vec.size(), 1);
assert_eq(g_vec[0], 0);
exint_public_event_unregister("test_public", _test_handler, nullptr);
exint_public_event("test_public", 0, NULL);
assert_eq(g_vec.size(), 1);
END_TEST;
}

View File

@ -1,108 +0,0 @@
#include <thread>
#include <pty.h>
#include <fcntl.h>
#include <unistd.h>
#include "comframe.h"
#include "telemetry.h"
#include "dataqueue.hpp"
#include "logging/interface.h"
#include "transport/base.hpp"
#include "exint/protocol.hpp"
#include "exint/detail.hpp"
#include "c_testcase.h"
class MockTransport: public Transport {
public:
void send_backend() override {}
void receive_backend() override {}
Transport::FrameType get_sent_frame() {
return super::send_que.Pop().first; // std::chrono::seconds(1)
}
void set_received_frame(ComFrame* frame) {
super::recv_que.Push(std::make_pair(std::shared_ptr<ComFrame>(frame, ComFrame_Del), nullptr));
}
private:
typedef Transport super;
};
static int g_telemetry_request_count = 0;
static DataQueue<std::tuple<uint32_t, size_t, void*>> g_data_queue;
static MockTransport* g_upperhost_mock_transport;
static MockTransport* g_telemetry_mock_transport;
void exint_data_callback(uint32_t type, size_t size, void* data) {
if (type == ET_TYPE_TELEMETRY_REQUEST) {
g_telemetry_request_count++;
memset(data, 0, size);
((TelemetryRequestData*)data)->app_ver_high = 1;
((TelemetryRequestData*)data)->app_ver_low = 2;
} else {
g_data_queue.Push(std::make_tuple(type, size, data));
}
}
SETUP {
g_upperhost_mock_transport = new MockTransport();
g_telemetry_mock_transport = new MockTransport();
exint_setup(std::unique_ptr<Transport>(g_upperhost_mock_transport), std::unique_ptr<Transport>(g_telemetry_mock_transport), exint_data_callback);
g_telemetry_request_count = 0;
log_set_level(LOG_LEVEL_DEBUG);
return 0;
}
TEARDOWN {
exint_finialize();
g_data_queue.Clear();
return 0;
}
TEST_CASE(test_init) {
END_TEST;
}
TEST_CASE(test_host_telemetry) {
auto request_msg = NewTelemetryRequestMsg(COM_FRAME_ADDRESS_VOIX, false);
g_telemetry_mock_transport->set_received_frame(request_msg);
auto telemetry_reply = g_telemetry_mock_transport->get_sent_frame();
assert(telemetry_reply);
assert_eq(g_telemetry_request_count, 1);
TelemetryData* telemetry_data = (TelemetryData*)ComFrame_PAYLOAD(telemetry_reply);
assert_eq(ComFrame_TYPE(telemetry_reply.get()), byteswaps(COM_FRAME_TYPE_TELEMETRY_ANSWER));
assert_eq(ComFrame_Length(telemetry_reply.get(), true), sizeof(ComFrameHeader) + sizeof(TelemetryData) + sizeof(uint16_t));
assert_eq(telemetry_data->application_version_high, 1);
assert_eq(telemetry_data->application_version_low, 2);
END_TEST;
}
TEST_CASE(test_callback) {
uint8_t command[] = { 0x00, 0x01, 0x02, 0x03, 0xAA, 0xAA};
auto command_msg = ComFrame_New(COM_FRAME_ADDRESS_VOIX, COM_FRAME_TYPE_COMMAND, command, sizeof(command), false);
g_upperhost_mock_transport->set_received_frame(command_msg);
uint32_t tp;
size_t len;
void* pack;
std::tie(tp, len, pack) = g_data_queue.Pop(std::chrono::milliseconds(1000));
assert_eq(tp, ET_TYPE_COMMAND);
assert_eq(len, sizeof(command));
assert_mem_eq(command, pack, len);
END_TEST;
}
TEST_CASE(test_send_command) {
char command_data[] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05};
exint_send(ET_TYPE_COMMAND, 6, command_data);
auto command_frame = g_upperhost_mock_transport->get_sent_frame();
assert(command_frame);
assert_eq(ComFrame_TYPE(command_frame.get()), byteswaps(COM_FRAME_TYPE_REQUEST));
assert_eq(ComFrame_Length(command_frame.get(), true), 6 + sizeof(ComFrameHeader) + 2);
assert_mem_eq(ComFrame_PAYLOAD(command_frame), command_data, 6);
END_TEST;
}