feat(exint): support UDP and Unix socket transport for telemetry

- Add UDP and Unix socket transport options for telemetry data
- Implement new handler functions for text and audio data
- Update configuration file format to support new transport options
- Refactor existing code to accommodate new functionality
This commit is contained in:
ovizro 2024-12-19 18:00:09 +08:00
parent 02ca1e00b3
commit 9e4fdf3b13
17 changed files with 195 additions and 63 deletions

View File

@ -1,5 +1,22 @@
[General-Setting]
Log_Path = ./log
Log_Level = DEBUG
[External-Interface]
Enable_Alarm_Code = 1 ; 0: Disable, 1: Enable
HostInfo_Transport_Open = 1 ; 0: Disable, 1: Enable
HostInfo_Transport_Mode = COM ; COM, UDP, UNIX_UDP
HostInfo_Transport_COM_Path = /dev/ttyUSB0
HostInfo_Transport_COM_Baudrate = 115200
HostInfo_COM_Path = /dev/ttyUSB0
HostInfo_COM_Baudrate = 115200
HostInfo_Enable_Telemetry = 1 ; 0: Disable, 1: Enable
Telemetry_Transport_Open = 1 ; 0: Disable, 1: Enable
Telemetry_Transport_Mode = UDP ; COM, UDP, UNIX_UDP
Telemetry_COM_Path = /dev/ttyUSB1
Telemetry_COM_Baudrate = 115200
Telemetry_UDP_Local_Host = localhost
Telemetry_UDP_Local_Port = 5000
Telemetry_UDP_Remote_Host = localhost
Telemetry_UDP_Remote_Port = 5001
Telemetry_Enable_Telemetry = 1 ; 0: Disable, 1: Enable

View File

@ -21,8 +21,8 @@ public:
char chRecordEMark = ';', //record end-mark
char chCommentMark = '#'
)
:m_base64(base64), m_chSectionBMark(chSectionBMark), m_chSectionEMark(chSectionEMark),
m_chRecordEMark(chRecordEMark), m_chCommentMark(chCommentMark), m_bShowError(false)
:m_base64(base64), m_bShowError(false), m_chSectionBMark(chSectionBMark), m_chSectionEMark(chSectionEMark),
m_chRecordEMark(chRecordEMark), m_chCommentMark(chCommentMark)
{
}

View File

@ -5,6 +5,24 @@
#include <stdio.h>
#include <string.h>
#ifndef __ASSERT_FUNCTION
/* Version 2.4 and later of GCC define a magical variable `__PRETTY_FUNCTION__'
which contains the name of the function currently being defined.
This is broken in G++ before version 2.6.
C9x has a similar variable called __func__, but prefer the GCC one since
it demangles C++ function names. */
# if defined __cplusplus ? __GNUC_PREREQ (2, 6) : __GNUC_PREREQ (2, 4)
# define __ASSERT_FUNCTION __extension__ __PRETTY_FUNCTION__
# else
# if defined __STDC_VERSION__ && __STDC_VERSION__ >= 199901L
# define __ASSERT_FUNCTION __func__
# else
# define __ASSERT_FUNCTION ((const char *) 0)
# endif
# endif
#endif
#ifdef __cplusplus
#include <iostream>

View File

@ -17,7 +17,6 @@ void* upper_host_com_thread(void* arg);
}
extern bool g_bKeepExintRuning;
extern int g_iExternTimeDifference;
extern uint8_t g_iAlarmCode[4];

View File

@ -33,11 +33,17 @@ static inline struct EtHandlerDef* find_handler(uint16_t type, struct EtHandlerD
ComFrame* command_handler(EtHandlerDef *hdl, ComFrame* frame);
ComFrame* request_handler(EtHandlerDef *hdl, ComFrame* frame);
void send_command_event_callback(const char* event_name, size_t args_size, void* args, void* user_data);
// alarm.cpp
ComFrame* alarm_handler(EtHandlerDef *hdl, ComFrame* frame);
// text_audio.cpp
ComFrame* text_handler(EtHandlerDef *hdl, ComFrame* frame);
ComFrame* audio_handler(EtHandlerDef *hdl, ComFrame* frame);
void send_audio_event_callback(const char* event_name, size_t args_size, void* args, void* user_data);
// telemetry.cpp
ComFrame* telemetry_request_handler(EtHandlerDef* hdl, ComFrame* frame);
ComFrame* upperhost_telemetry_request_handler(EtHandlerDef* hdl, ComFrame* frame);
// alarm.cpp
ComFrame* alarm_handler(EtHandlerDef *hdl, ComFrame* frame);
// time.cpp
ComFrame* grant_time_handler(EtHandlerDef *hdl, ComFrame* frame);
#ifdef __cplusplus
}

View File

@ -166,6 +166,11 @@ public:
return logger->get_child(name.substr(sep_pos + seqlen), level);
}
inline void add_stream(std::ostream& stream)
{
_streams.push_back(std::unique_ptr<std::ostream>(new std::ostream(stream.rdbuf())));
}
template <typename T>
inline void add_stream(T&& stream)
{

View File

@ -177,7 +177,7 @@ public:
protected:
void send_backend() override
{
this->ensure_open();
// this->ensure_open();
auto &logger = *logging::get_logger("transport");
logger.debug("start serial port send backend");
while (!this->is_closed)
@ -213,8 +213,8 @@ protected:
void receive_backend() override
{
// this->ensure_open();
auto &logger = *logging::get_logger("transport");
this->ensure_open();
logger.debug("start serial port receive backend");
bool find_head = false;
size_t min_size = P::pred_size(nullptr, 0);

View File

@ -11,7 +11,7 @@
#include <sys/socket.h>
#include "base.hpp"
#define TRANSPORT_UDP_BUFFER_SIZE 1024
#define TRANSPORT_UDP_BUFFER_SIZE 1024 * 64
namespace transport {
@ -85,9 +85,10 @@ public:
}
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0)
{
if (sockfd < 0) {
logger.raise_from_errno("failed to create socket");
} else {
logger.info("open socket fd %d", sockfd);
}
super::open();
@ -142,7 +143,7 @@ public:
protected:
void send_backend() override
{
this->ensure_open();
// this->ensure_open();
auto &logger = *logging::get_logger("transport");
logger.debug("start datagram send backend");
while (!this->is_closed)
@ -171,7 +172,7 @@ protected:
void receive_backend() override
{
this->ensure_open();
// this->ensure_open();
auto &logger = *logging::get_logger("transport");
logger.debug("start datagram receive backend");
uint8_t *buffer = new uint8_t[buffer_size];

View File

@ -80,9 +80,10 @@ public:
}
sockfd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (sockfd < 0)
{
if (sockfd < 0) {
logger.raise_from_errno("failed to create socket");
} else {
logger.info("open socket fd %d", sockfd);
}
super::open();
@ -123,7 +124,7 @@ public:
protected:
void send_backend() override
{
this->ensure_open();
// this->ensure_open();
auto& logger = *logging::get_logger("transport");
logger.debug("start datagram send backend");
while (!this->is_closed)
@ -152,7 +153,7 @@ protected:
void receive_backend() override
{
this->ensure_open();
// this->ensure_open();
auto& logger = *logging::get_logger("transport");
logger.debug("start datagram receive backend");
uint8_t *buffer = new uint8_t[buffer_size];

View File

@ -92,6 +92,7 @@ void* exint_event_thread(void*) {
} catch (const QueueException&) {
break;
}
logger.debug("Run event %s", event_name.c_str());
callback(event_name.c_str(), args_size, args.get(), user_data);
}
_event_thread_count.fetch_sub(1);

View File

@ -4,6 +4,7 @@
#include "CCfgFileParser.h"
#include "inicpp.hpp"
#include "transport/serial_port.hpp"
#include "transport/udp.hpp"
#include "exint/detail.hpp"
#include "exint/event.h"
#include "exint/handler.h"
@ -14,17 +15,17 @@ static et_callback_t et_callback;
std::unique_ptr<Transport> g_upperhost_transport;
std::unique_ptr<Transport> g_telemetry_transport;
int g_iEnableAlarmCode = true;
bool g_bKeepExintRuning = false;
int g_iExternTimeDifference = 0;
int g_iUseHostComForTelemetry = true;
uint8_t g_iAlarmCode[4] = {0xAA, 0xAA, 0xAA, 0xAA};
EtHandlerDef upper_host_handler_def[] = {
{COM_FRAME_TYPE_COMMAND, ET_HDL_FLAG_DEFAULT, command_handler},
{COM_FRAME_TYPE_REQUEST, ET_HDL_FLAG_DEFAULT, request_handler},
{COM_FRAME_TYPE_ALARM, ET_HDL_FLAG_DEFAULT, alarm_handler},
{COM_FRAME_TYPE_SPEECH_TEXT, ET_HDL_FLAG_DEFAULT, text_handler},
{COM_FRAME_TYPE_SPEECH_PCM, ET_HDL_FLAG_DEFAULT, audio_handler},
{COM_FRAME_TYPE_TELEMETRY_REQUEST, ET_HDL_FLAG_NOVERIFY, upperhost_telemetry_request_handler},
{COM_FRAME_TYPE_ALARM, ET_HDL_FLAG_DEFAULT, alarm_handler},
{COM_FRAME_TYPE_GRANT_TIME, ET_HDL_FLAG_DEFAULT, grant_time_handler},
{0, 0, NULL}
};
@ -43,23 +44,32 @@ static int read_config(const char* config_path) {
}
const char* log_path;
auto& global_logger = logging::get_global_logger();
if (config.getValue(log_section, "Log_Path", log_path)) {
logging::get_global_logger().add_stream(std::ofstream(std::string(log_path) + "/external_interface.log"));
global_logger.add_stream(std::cerr);
global_logger.add_stream(std::ofstream(std::string(log_path) + "/external_interface.log", std::ios::out | std::ios::app));
}
const char* log_level = "WARN";
const char* log_level = "INFO";
if (config.getValue(log_section, "Log_Level", log_level)) {
logging::get_global_logger().set_level(logging::str2level(log_level));
global_logger.set_level(logging::str2level(log_level));
}
#define read_transport(prefix, transport_var) do { \
#define read_transport(prefix, transport_var, hdl_def) do { \
long transport_open;\
if (config.getIntValue(config_section, #prefix "_Transport_Open", transport_open)) {\
if (!config.getIntValue(config_section, #prefix "_Transport_Open", transport_open)) {\
logger.error("failed to read [%s]:" #prefix "_Transport_Open in %s", config_section, config_path); \
} else if (transport_open) { \
const char* transport_mode = "COM"; \
if (!config.getValue(config_section, #prefix "_Transport_Mode", transport_mode)) {\
logger.error("failed to read [%s]:" #prefix "_Transport_Mode in %s", config_section, config_path); \
}\
long use_telemetry = 1;\
if (!config.getIntValue(config_section, #prefix "_Enable_Telemetry", use_telemetry)) {\
logger.error("failed to read [%s]:" #prefix "_Enable_Telemetry in %s", config_section, config_path); \
}\
else if (!use_telemetry) {\
find_handler(COM_FRAME_TYPE_TELEMETRY_REQUEST, hdl_def)->hd_flags |= ET_HDL_FLAG_DISABLED;\
}\
if (strcasecmp(transport_mode, "com") == 0) {\
const char* com_path;\
if (!config.getValue(config_section, #prefix "_COM_Path", com_path)) {\
@ -75,31 +85,41 @@ static int read_config(const char* config_path) {
);\
} else if (strcasecmp(transport_mode, "datagram") == 0 || strcasecmp(transport_mode, "udp")) {\
const char* host = "0.0.0.0";\
if (!config.getValue(config_section, #prefix "_UDP_Host", host)) {\
logger.error("failed to read [%s]:" #prefix "_UDP_Host in %s", config_section, config_path); \
return -1;\
if (!config.getValue(config_section, #prefix "_UDP_Local_Host", host)) {\
logger.error("failed to read [%s]:" #prefix "_UDP_Local_Host in %s", config_section, config_path); \
}\
long port = 5000; \
if (!config.getIntValue(config_section, #prefix "_UDP_Port", port)) {\
logger.error("failed to read [%s]:" #prefix "_UDP_Port in %s", config_section, config_path); \
if (!config.getIntValue(config_section, #prefix "_UDP_Local_Port", port)) {\
logger.error("failed to read [%s]:" #prefix "_UDP_Local_Port in %s", config_section, config_path); \
}\
const char* remote_host = "";\
if (!config.getValue(config_section, #prefix "_UDP_Remote_Host", remote_host)) {\
logger.error("failed to read [%s]:" #prefix "_UDP_Remote_Host in %s", config_section, config_path); \
}\
long remote_port = 5000; \
if (!config.getIntValue(config_section, #prefix "_UDP_Remote_Port", remote_port)) {\
logger.error("failed to read [%s]:" #prefix "_UDP_Remote_Port in %s", config_section, config_path); \
}\
transport_var = std::unique_ptr<Transport>(\
new transport::DatagramTransport<ComframeProtocol>(std::make_pair(host, port), std::make_pair(remote_host, remote_port))\
);\
} else {\
logger.error("unknown transport mode: %s", transport_mode);\
}\
}\
} while (0)
read_transport(HostInfo, g_upperhost_transport);
read_transport(Telemetry, g_telemetry_transport);
read_transport(HostInfo, g_upperhost_transport, upper_host_handler_def);
read_transport(Telemetry, g_telemetry_transport, telemetry_handler_def);
#undef read_transport
long use_host_com_for_telemetry = 1;
if (!config.getIntValue(config_section, "Use_HostCOM_for_Telemetry", use_host_com_for_telemetry)) {
logger.error("fail to read [%s]:Use_HostCOM_for_Telemetry from %s", config_section, config_path);
long enable_alarm = 1;
if (!config.getIntValue(config_section, "Enable_Alarm_Code", enable_alarm)) {
logger.error("fail to read [%s]:Enable_Alarm_Code from %s", config_section, config_path);
}
if (!use_host_com_for_telemetry) {
find_handler(ET_TYPE_TELEMETRY_REQUEST, upper_host_handler_def)->hd_flags |= ET_HDL_FLAG_DISABLED;
else if (!enable_alarm) {
find_handler(COM_FRAME_TYPE_ALARM, upper_host_handler_def)->hd_flags |= ET_HDL_FLAG_DISABLED;
}
return 0;
}
@ -134,13 +154,12 @@ void exint_handler(Transport& transport, struct EtHandlerDef *handlers) {
}
else if (handler->hd_flags & ET_HDL_FLAG_ASYNC)
{
std::thread([handler, frame, token, &transport]()
{
std::thread([handler, frame, token, &transport]() {
auto reply = handler->hd_handler(handler, frame.get());
if (reply) {
transport.send(std::shared_ptr<ComFrame>(reply, ComFrame_Del), token);
} })
.detach();
}
}).detach();
}
else
{
@ -160,15 +179,18 @@ void exint_setup(std::unique_ptr<Transport>&& upperhost_transport, std::unique_p
g_upperhost_transport = std::move(upperhost_transport);
if (telemetry_transport)
g_telemetry_transport = std::move(telemetry_transport);
g_bKeepExintRuning = true;
exint_event_thread_start(1);
exint_event_register("send_command", send_command_event_callback, g_upperhost_transport.get());
std::thread(
std::bind(exint_handler, std::ref(*g_upperhost_transport.get()), upper_host_handler_def)
).detach();
std::thread(
std::bind(exint_handler, std::ref(*g_telemetry_transport.get()), telemetry_handler_def)
).detach();
if (g_upperhost_transport) {
std::thread(
std::bind(exint_handler, std::ref(*g_upperhost_transport.get()), upper_host_handler_def)
).detach();
exint_event_register("send_command", send_command_event_callback, g_upperhost_transport.get());
}
if (g_telemetry_transport) {
std::thread(
std::bind(exint_handler, std::ref(*g_telemetry_transport.get()), telemetry_handler_def)
).detach();
}
}
int exint_initialize(const char* config_path, et_callback_t cb) {
@ -194,9 +216,10 @@ void exint_send(uint32_t type, size_t len, void* data) {
}
void exint_finialize() {
g_bKeepExintRuning = false;
g_upperhost_transport->close();
g_telemetry_transport->close();
if (g_upperhost_transport)
g_upperhost_transport->close();
if (g_telemetry_transport)
g_telemetry_transport->close();
exint_event_thread_stop();
}

View File

View File

@ -148,25 +148,19 @@ ComFrame* telemetry_request_handler(EtHandlerDef* hdl, ComFrame* frame) {
TelemetryCommandInfo TeleCmdInfo2Send;
TelemetryData TelemetryData2Send;
if (hdl->hd_flags & ET_HDL_FLAG_NOVERIFY && ComFrame_Verify(frame, false)) {
logger.warn("Invalid checksum");
uint8_t code[] = TELEMETRY_ERROR_CHECKSUM;
return NewTelemetryErrorMsg(address, code, true);
}
if (ComFrame_PAYLOAD_LENGTH(frame) != 2 || frame->payload[0] != 0xFF || frame->payload[1] != 0x11) {
logger.warn("Invalid payload");
uint8_t code[] = TELEMETRY_ERROR_TYPE;
return NewTelemetryErrorMsg(address, code, true);
}
if (iTelemetryAnswerTimes % 3 == 0)
{
if (g_recvCommandQueue.Empty())
{ // empty
if (0) // 2024.11.04 g_bNeedReboot)
{
system("reboot");
return NULL;
}
}
else
if (!g_recvCommandQueue.Empty())
{
TeleCmdInfo2Send = g_recvCommandQueue.Pop();
}

View File

@ -0,0 +1,37 @@
#include "exint/event.h"
#include "exint/handler.h"
#include "exint/detail.hpp"
ComFrame* text_handler(EtHandlerDef *hdl, ComFrame* frame) {
exint_event("recv_text", ComFrame_PAYLOAD_LENGTH(frame), ComFrame_PAYLOAD(frame));
uint8_t payload[6] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
return ComFrame_New(
COM_FRAME_ADDRESS_VOIX,
COM_FRAME_TYPE_SPEECH_INJECTED,
payload, 6, true
);
}
ComFrame* audio_handler(EtHandlerDef *hdl, ComFrame* frame) {
exint_event("recv_audio", ComFrame_PAYLOAD_LENGTH(frame), ComFrame_PAYLOAD(frame));
return NULL;
}
void send_audio_event_callback(const char* event_name, size_t args_size, void* args, void* user_data) {
Transport* transport = (Transport*)user_data;
ComFrame* frame = ComFrame_New(
COM_FRAME_ADDRESS_VOIX,
COM_FRAME_TYPE_AUDIO,
args, args_size, true
);
transport->send(std::shared_ptr<ComFrame>(frame, ComFrame_Del));
}
ON_EVENT(recv_text) {
exint_handle_pack(ET_TYPE_TEXT, args_size, args);
}
ON_EVENT(recv_audio) {
exint_handle_pack(ET_TYPE_AUDIO, args_size, args);
}

26
src/handler/time.cpp Normal file
View File

@ -0,0 +1,26 @@
#include <sys/sysinfo.h>
#include "exint/handler.h"
#include "exint/detail.hpp"
static auto& logger = *logging::get_logger("exint::time");
ComFrame* grant_time_handler(EtHandlerDef *hdl, ComFrame* frame) {
if (frame->payload[0] != 0x66) { // data error
logger.warn("Time data error");
return NULL;
}
int secondsAbs = *((int*)(frame->payload+1));
secondsAbs = byteswapl(secondsAbs);
struct sysinfo info;
if (sysinfo(&info)) {
logger.raise_from_errno("sysinfo() failed");
return NULL;
}
auto old_uptime = info.uptime + g_iExternTimeDifference;
g_iExternTimeDifference = secondsAbs - info.uptime;
logger.info("Uptime changed, old: %d, new: %d", old_uptime, info.uptime + g_iExternTimeDifference);
return NULL;
}

View File

@ -7,6 +7,8 @@ using namespace transport;
TEST_CASE(test_init) {
DatagramTransport<Protocol> transport;
transport.open();
transport.close();
assert_eq(transport.closed(), true);
END_TEST;
}

View File

@ -7,6 +7,8 @@ using namespace transport;
TEST_CASE(test_init) {
UnixDatagramTransport<Protocol> transport;
transport.open();
transport.close();
assert_eq(transport.closed(), true);
END_TEST;
}