feat(external_interface): implement transport layer and refactor handlers

- Add transport layer abstraction for COM and UDP communication
- Refactor handler definitions and processing logic
- Improve error handling and logging
- Remove unnecessary global variables and pthread usage
This commit is contained in:
ovizro 2024-12-18 23:08:16 +08:00
parent 53a5fe6d50
commit 02ca1e00b3
30 changed files with 2038 additions and 949 deletions

View File

@ -33,6 +33,7 @@ endif()
include_directories(./include) include_directories(./include)
aux_source_directory(${CMAKE_SOURCE_DIR}/src SRC_LIST) aux_source_directory(${CMAKE_SOURCE_DIR}/src SRC_LIST)
aux_source_directory(${CMAKE_SOURCE_DIR}/src/handler SRC_LIST)
add_library(exint SHARED ${SRC_LIST}) add_library(exint SHARED ${SRC_LIST})
add_library(exint_static STATIC ${SRC_LIST}) add_library(exint_static STATIC ${SRC_LIST})

5
config/exint.cfg Normal file
View File

@ -0,0 +1,5 @@
[External-Interface]
HostInfo_Transport_Open = 1 ; 0: Disable, 1: Enable
HostInfo_Transport_Mode = COM ; COM, UDP, UNIX_UDP
HostInfo_Transport_COM_Path = /dev/ttyUSB0
HostInfo_Transport_COM_Baudrate = 115200

View File

@ -27,8 +27,8 @@
// added by gejp start 2024.10.03 // added by gejp start 2024.10.03
#define COM_FRAME_TYPE_GRANT_TIME 0x0085 #define COM_FRAME_TYPE_GRANT_TIME 0x0085
#define COM_FRAME_TYPE_VOICE_ANNOUNCEMENT_AND_ALARM_CODE_REQUEST 0x0083 #define COM_FRAME_TYPE_ALARM 0x0083
#define COM_FRAME_TYPE_VOICE_ANNOUNCEMENT_AND_ALARM_CODE_RESPONSE 0x0083 #define COM_FRAME_TYPE_ALARM_RESPONSE 0x0083
#define byteswapl(x) (0xff000000 & x << 24) \ #define byteswapl(x) (0xff000000 & x << 24) \
|(0x00ff0000 & x << 8) \ |(0x00ff0000 & x << 8) \

View File

@ -2,30 +2,27 @@
#define _INCLUDE_EXTERN_INTERFACE_DETAIL_H_ #define _INCLUDE_EXTERN_INTERFACE_DETAIL_H_
#include <stdint.h> #include <stdint.h>
#include <memory>
#include "external_interface.h" #include "external_interface.h"
#include "exint/protocol.hpp"
#define PrintFilePos() #define PrintFilePos()
#ifdef __cplusplus
extern "C" { extern "C" {
#endif
void exint_handle_pack(uint32_t type, size_t len, void* data); void exint_handle_pack(uint32_t type, size_t len, void* data);
int exint_init_from_tty(int host_com_tty, int telemetry_com_tty, et_callback_t cb);
void* telemetry_host_com_thread(void* args); void* telemetry_host_com_thread(void* args);
void* upper_host_com_thread(void* arg); void* upper_host_com_thread(void* arg);
extern int g_iHostCom_tty_id; }
extern int g_iTelemetry_Com_tty_id;
extern int g_iEnableAlarmCode;
extern bool g_bKeepExintRuning; extern bool g_bKeepExintRuning;
extern int g_iExternTimeDifference; extern int g_iExternTimeDifference;
extern int g_iUseHostComForTelemetry;
extern uint8_t g_iAlarmCode[4]; extern uint8_t g_iAlarmCode[4];
#ifdef __cplusplus extern std::unique_ptr<Transport> g_upperhost_transport;
} extern std::unique_ptr<Transport> g_telemetry_transport;
#endif void exint_setup(std::unique_ptr<Transport>&& upperhost_transport, std::unique_ptr<Transport>&& telemetry_transport, et_callback_t cb);
#endif #endif

View File

@ -16,9 +16,29 @@ extern "C" {
struct EtHandlerDef { struct EtHandlerDef {
uint16_t hd_type; uint16_t hd_type;
uint16_t hd_flags; uint16_t hd_flags;
void (*hd_handler)(struct EtHandlerDef *handler, ComFrame* frame); ComFrame* (*hd_handler)(struct EtHandlerDef *handler, ComFrame* frame);
}; };
static inline struct EtHandlerDef* find_handler(uint16_t type, struct EtHandlerDef *handlers) {
while (handlers->hd_handler) {
if (handlers->hd_type == type) {
return handlers;
}
handlers++;
}
return nullptr;
}
// command.cpp
ComFrame* command_handler(EtHandlerDef *hdl, ComFrame* frame);
ComFrame* request_handler(EtHandlerDef *hdl, ComFrame* frame);
void send_command_event_callback(const char* event_name, size_t args_size, void* args, void* user_data);
// alarm.cpp
ComFrame* alarm_handler(EtHandlerDef *hdl, ComFrame* frame);
// telemetry.cpp
ComFrame* telemetry_request_handler(EtHandlerDef* hdl, ComFrame* frame);
ComFrame* upperhost_telemetry_request_handler(EtHandlerDef* hdl, ComFrame* frame);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -0,0 +1,44 @@
#ifndef _INCLUDE_EXINT_PROTOCOL_
#define _INCLUDE_EXINT_PROTOCOL_
#include <memory>
#include "transport/base.hpp"
#include "comframe.h"
class ComframeProtocol {
public:
typedef std::shared_ptr<ComFrame> FrameType;
static inline ssize_t pred_size(void* buf, size_t size) {
if (buf == nullptr) return sizeof(ComFrameHeader);
if (size < sizeof(ComFrameHeader)) return -1;
if (ComFrameHeader_Check(ComFrame_HEADER(buf), true)) {
return -1;
}
return ComFrame_Length((ComFrame*)buf, true);
}
static inline FrameType make_frame(void* buf, size_t size) {
if (buf == nullptr || size < sizeof(ComFrameHeader))
return FrameType();
ComFrameHeader_SwapEndian(ComFrame_HEADER(buf));
auto frame_len = ComFrame_LENGTH(buf);
if (size < frame_len)
return FrameType();
auto frame = ComFrame_NewUninited(frame_len);
memcpy(frame, buf, frame_len);
return std::shared_ptr<ComFrame>(frame, ComFrame_Del);
}
static inline size_t frame_size(FrameType frame) {
return ComFrame_Length(frame.get(), true);
}
static inline void* frame_data(FrameType frame) {
return frame.get();
}
};
using Transport = transport::BaseTransport<ComframeProtocol>;
#endif

View File

@ -33,41 +33,100 @@ public:
constexpr static const char* namesep = "::"; constexpr static const char* namesep = "::";
explicit Logger(Level level = Level::INFO); explicit Logger(Level level = Level::INFO) : _parent(nullptr), _level(level) {}
Logger(const Logger&) = delete; Logger(const Logger&) = delete;
virtual ~Logger() = default; virtual ~Logger() = default;
void add_stream(std::ostream& stream); inline Level level() const
Level level() const; {
const std::string& name() const; if (_level == Logger::Level::UNKNOWN) {
Logger* parent() const; if (_parent) {
size_t children_count() const; return _parent->level();
void set_level(Level level); }
return Logger::Level::INFO;
}
return _level;
}
inline const std::string& name() const
{
return _name;
}
inline Logger* parent() const
{
return _parent;
}
inline size_t children_count() const
{
return logger_cache.size();
}
inline void set_level(Level level)
{
_level = level;
}
template<Level level> template<Level level>
void log(const char* fmt, ...) { inline void log(const char* fmt, ...) {
va_list args; va_list args;
va_start(args, fmt); va_start(args, fmt);
vlog(level, fmt, args); vlog(level, fmt, args);
va_end(args); va_end(args);
} }
inline void log(Level level, const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(level, fmt, args);
va_end(args);
}
inline void debug(const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(Logger::Level::DEBUG, fmt, args);
va_end(args);
}
void info(const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(Logger::Level::INFO, fmt, args);
va_end(args);
}
void warn(const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(Logger::Level::WARN, fmt, args);
va_end(args);
}
void error(const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(Logger::Level::ERROR, fmt, args);
va_end(args);
}
void fatal(const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(Logger::Level::FATAL, fmt, args);
va_end(args);
}
template<Level level> template<Level level>
void vlog(const char* fmt, va_list args) { inline void vlog(const char* fmt, va_list args) {
vlog(level, fmt, args); vlog(level, fmt, args);
} }
template<typename E = std::runtime_error> template<typename E = std::runtime_error>
void raise_from_errno(const char* msg) { inline void raise_from_errno(const char* msg) {
error("%s: %s", msg, strerror(errno)); error("%s: %s", msg, strerror(errno));
throw E(msg); throw E(msg);
} }
void log(Level level, const char* fmt, ...);
void debug(const char* fmt, ...);
void info(const char* fmt, ...);
void warn(const char* fmt, ...);
void error(const char* fmt, ...);
void fatal(const char* fmt, ...);
void vlog(Level level, const char* fmt, va_list args); void vlog(Level level, const char* fmt, va_list args);
virtual void log_message(Level level, const std::string& msg); virtual void log_message(Level level, const std::string& msg);
@ -107,6 +166,20 @@ public:
return logger->get_child(name.substr(sep_pos + seqlen), level); return logger->get_child(name.substr(sep_pos + seqlen), level);
} }
template <typename T>
inline void add_stream(T&& stream)
{
_streams.push_back(std::unique_ptr<std::ostream>(new typename std::remove_reference<T>::type(std::move(stream))));
}
inline std::vector<std::ostream*> streams() const
{
std::vector<std::ostream*> streams;
for (auto& stream : _streams)
streams.push_back(stream.get());
return streams;
}
protected: protected:
explicit Logger(const std::string& name, Level level = Level::INFO, Logger* parent = nullptr); explicit Logger(const std::string& name, Level level = Level::INFO, Logger* parent = nullptr);

View File

@ -5,13 +5,13 @@
#include <stddef.h> #include <stddef.h>
#include "comframe.h" #include "comframe.h"
#define TELEMETRY_REQUEST_PAYLOAD 0xFF11 #define TELEMETRY_REQUEST_PAYLOAD {0xFF, 0x11}
#define TELEMETRY_STATUS_OK 0xAA #define TELEMETRY_STATUS_OK 0xAA
#define TELEMETRY_STATUS_ERROR 0x55 #define TELEMETRY_STATUS_ERROR 0x55
#define TELEMETRY_ERROR_TYPE 0x00E1 #define TELEMETRY_ERROR_TYPE {0x00, 0xE1}
#define TELEMETRY_ERROR_CHECKSUM 0x00E2 #define TELEMETRY_ERROR_CHECKSUM {0x00, 0xE2}
#define TELEMETRY_SEND_COMMAND_INFO_LENGTH 5 #define TELEMETRY_SEND_COMMAND_INFO_LENGTH 5
@ -172,7 +172,7 @@ static __inline void TelemetryData_SwapEndian(struct TelemetryData* tele_data) {
} }
static __inline ComFrame* NewTelemetryRequestMsg(uint16_t address, const bool swap_endian) { static __inline ComFrame* NewTelemetryRequestMsg(uint16_t address, const bool swap_endian) {
uint16_t tele_request_payload = (swap_endian) ? byteswaps(TELEMETRY_REQUEST_PAYLOAD) : TELEMETRY_REQUEST_PAYLOAD; uint8_t tele_request_payload[] = TELEMETRY_REQUEST_PAYLOAD;
return ComFrame_New(address, COM_FRAME_TYPE_TELEMETRY_REQUEST, &tele_request_payload, sizeof(uint16_t), swap_endian); return ComFrame_New(address, COM_FRAME_TYPE_TELEMETRY_REQUEST, &tele_request_payload, sizeof(uint16_t), swap_endian);
} }
@ -183,13 +183,10 @@ static __inline ComFrame* NewTelemetryAnswerData(uint16_t address, const struct
return frame; return frame;
} }
static __inline ComFrame* NewTelemetryErrorMsg(uint16_t address, uint16_t code, const bool swap_endian) { static __inline ComFrame* NewTelemetryErrorMsg(uint16_t address, uint8_t code[2], const bool swap_endian) {
uint16_t tele_error_payload = (swap_endian) ? byteswaps(code) : code; return ComFrame_New(address, COM_FRAME_TYPE_TELEMETRY_ERROR, code, 2, swap_endian);
return ComFrame_New(address, COM_FRAME_TYPE_TELEMETRY_REQUEST, &tele_error_payload, sizeof(uint16_t), swap_endian);
} }
ssize_t SendTelemetryErrorMsg(int tty_id, uint16_t address, uint16_t code, const bool swap_endian);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

164
include/transport/base.hpp Normal file
View File

@ -0,0 +1,164 @@
#ifndef _INCLUDE_TRANSPORT_BASE_
#define _INCLUDE_TRANSPORT_BASE_
#include <stdint.h>
#include <chrono>
#include <memory>
#include <thread>
#include <functional>
#include "dataqueue.hpp"
#include "logging/logger.hpp"
#include "protocol.hpp"
#define TRANSPORT_MAX_RETRY 5
#define TRANSPORT_TIMEOUT 1000
namespace transport
{
template <typename P>
class BaseTransport;
class _transport_base {
public:
_transport_base() : is_open(false), is_closed(false) {}
_transport_base(const _transport_base&) = delete;
virtual ~_transport_base() {
close();
}
virtual void open() {
if (is_open)
return;
is_open = true;
std::thread([this] {
try {
send_backend();
} catch (const QueueCleared&) {}
}).detach();
std::thread([this] {
try {
receive_backend();
} catch (const QueueCleared&) {}
}).detach();
}
virtual void close() {
is_closed = true;
}
bool closed() const {
return is_closed;
}
protected:
void ensure_open() {
auto& logger = *logging::get_logger("transport");
if (is_closed)
{
logger.fatal("transport closed");
throw std::runtime_error("transport closed");
}
if (!is_open)
{
open();
}
}
virtual void send_backend() = 0;
virtual void receive_backend() = 0;
bool is_open;
bool is_closed;
};
class TransportToken
{
public:
explicit TransportToken(_transport_base *transport) : transport_(transport) {}
template <typename P>
BaseTransport<P> *transport() const {
return dynamic_cast<BaseTransport<P>*>(transport_);
}
virtual bool operator==(const TransportToken &other) const {
return transport_ == other.transport_;
}
protected:
_transport_base *transport_;
friend class _transport_base;
friend std::hash<TransportToken>;
};
template <typename P>
class BaseTransport : public _transport_base
{
public:
typedef P Protocol;
typedef typename P::FrameType FrameType;
~BaseTransport() override
{
close();
}
template <typename Rep = uint64_t, typename Period = std::milli>
inline void send(typename P::FrameType frame, std::shared_ptr<TransportToken> token = nullptr)
{
ensure_open();
send_que.Push(std::make_pair(frame, token));
}
template <typename Rep = uint64_t, typename Period = std::milli>
inline std::pair<typename P::FrameType, std::shared_ptr<TransportToken>> receive(std::chrono::duration<Rep, Period> dur = std::chrono::milliseconds(0))
{
ensure_open();
DataPair frame_pair;
if (!dur.count())
frame_pair = recv_que.Pop();
else
frame_pair = recv_que.Pop(dur);
return frame_pair;
}
template <typename Rep = uint64_t, typename Period = std::milli>
inline typename P::FrameType request(typename P::FrameType frame, int max_retry = TRANSPORT_MAX_RETRY, std::chrono::duration<Rep, Period> dur = std::chrono::milliseconds(TRANSPORT_TIMEOUT))
{
ensure_open();
while (max_retry--)
{
send_que.Push(std::make_pair(frame, nullptr));
DataPair frame_pair = recv_que.Pop(dur);
if (frame_pair.first) {
return frame_pair.first;
}
auto& logger = *logging::get_logger("transport");
logger.warn("request timeout, retrying...");
}
return nullptr;
}
void close() override {
is_closed = true;
recv_que.Clear();
send_que.Clear();
}
typedef std::pair<typename P::FrameType, std::shared_ptr<TransportToken>> DataPair;
protected:
DataQueue<DataPair> send_que;
DataQueue<DataPair> recv_que;
};
}
namespace std {
template <>
struct hash<transport::TransportToken>
{
size_t operator()(const transport::TransportToken &token) const
{
return std::hash<uintptr_t>()(reinterpret_cast<uintptr_t>(token.transport_));
}
};
}
#endif

View File

@ -0,0 +1,35 @@
#ifndef _INCLUDE_TRANSPORT_PROTOCOL_
#define _INCLUDE_TRANSPORT_PROTOCOL_
#include <stdint.h>
#include <vector>
namespace transport
{
class Protocol {
public:
typedef std::vector<uint8_t> FrameType;
static ssize_t pred_size(void* buf, size_t size) {
if (buf == nullptr) return 1;
return size;
}
static FrameType make_frame(void* buf, size_t size) {
if (buf == nullptr) return FrameType();
return FrameType((uint8_t*)buf, (uint8_t*)buf + size);
}
static size_t frame_size(FrameType frame) {
return frame.size();
}
static void* frame_data(FrameType frame) {
return frame.data();
}
};
}
#endif

View File

@ -0,0 +1,306 @@
#ifndef _INCLUDE_TRANSPORT_TTY_
#define _INCLUDE_TRANSPORT_TTY_
#include <fcntl.h>
#include <termios.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <assert.h>
#include "base.hpp"
#define TRANSPORT_SERIAL_PORT_BUFFER_SIZE 1024 * 1024
// #define TRANSPORT_SERIAL_PORT_DEBUG
#ifdef COM_FRAME_DEBUG
#define TRANSPORT_SERIAL_PORT_DEBUG
#endif
namespace transport
{
template <typename P>
class SerialPortTransport : public BaseTransport<P>
{
public:
SerialPortTransport(const std::string &path, int baudrate = 115200, size_t buffer_size = TRANSPORT_SERIAL_PORT_BUFFER_SIZE)
: path(path), tty_id(-1), baudrate(baudrate), buffer_size(buffer_size) {}
SerialPortTransport(int tty_id, int baudrate = 115200, size_t buffer_size = 1024)
: tty_id(tty_id), baudrate(baudrate), buffer_size(buffer_size) {}
~SerialPortTransport() override
{
close();
}
void open() override
{
auto &logger = *logging::get_logger("transport");
if (this->is_open)
{
return;
}
else if (this->is_closed)
{
logger.info("reopen serial port transport");
this->is_open = false;
this->is_closed = false;
}
if (tty_id < 0)
{
logger.info("open serial port %s", path.c_str());
tty_id = ::open(path.c_str(), O_RDWR | O_NOCTTY | O_NONBLOCK);
if (tty_id < 0)
{
logger.raise_from_errno("open serial port failed");
}
}
else
{
logger.info("use serial port %d", tty_id);
}
struct termios options, old;
// deploy usart par
memset(&options, 0, sizeof(options));
int ret = tcgetattr(tty_id, &old);
if (ret != 0)
{
logger.error("tcgetattr failed: %s", strerror(errno));
goto end;
}
tcflush(tty_id, TCIOFLUSH);
switch (baudrate)
{
case 9600:
cfsetispeed(&options, B9600);
cfsetospeed(&options, B9600);
break;
case 19200:
cfsetispeed(&options, B19200);
cfsetospeed(&options, B19200);
break;
case 38400:
cfsetispeed(&options, B38400);
cfsetospeed(&options, B38400);
break;
case 57600:
cfsetispeed(&options, B57600);
cfsetospeed(&options, B57600);
break;
case 115200:
cfsetispeed(&options, B115200);
cfsetospeed(&options, B115200);
break;
case 576000:
cfsetispeed(&options, B576000);
cfsetospeed(&options, B576000);
break;
case 921600:
cfsetispeed(&options, B921600);
cfsetospeed(&options, B921600);
break;
case 2000000:
cfsetispeed(&options, B2000000);
cfsetospeed(&options, B2000000);
break;
case 3000000:
cfsetispeed(&options, B3000000);
cfsetospeed(&options, B3000000);
break;
default:
logger.error("bad baud rate %u", baudrate);
break;
}
switch (1)
{
case 0:
options.c_cflag &= ~PARENB;
options.c_cflag &= ~INPCK;
break;
case 1:
options.c_cflag |= (PARODD // 使用奇校验代替偶校验
| PARENB); // 校验位有效
options.c_iflag |= INPCK; // 校验有效
break;
case 2:
options.c_cflag |= PARENB;
options.c_cflag &= ~PARODD;
options.c_iflag |= INPCK;
break;
case 3:
options.c_cflag &= ~PARENB;
options.c_cflag &= ~CSTOPB;
break;
default:
options.c_cflag &= ~PARENB;
break;
}
options.c_cflag |= (CLOCAL | CREAD);
options.c_cflag &= ~CSIZE;
options.c_cflag &= ~CRTSCTS;
options.c_cflag |= CS8;
options.c_cflag &= ~CSTOPB;
options.c_oflag = 0;
options.c_lflag = 0;
options.c_cc[VTIME] = 0;
options.c_cc[VMIN] = 0;
// 启用输出的XON/XOFF控制字符
// Enable software flow control (XON/XOFF) for both input and output
options.c_iflag |= (IXON | IXOFF); // Enable input and output XON/XOFF control characters
options.c_oflag |= (IXON | IXOFF); // Enable input and output XON/XOFF control characters
tcflush(tty_id, TCIFLUSH);
if ((tcsetattr(tty_id, TCSANOW, &options)) != 0)
{
logger.error("tcsetattr failed: %s", strerror(errno));
}
end:
super::open();
}
void close() override
{
if (this->is_open && !this->closed())
{
auto &logger = *logging::get_logger("transport");
logger.info("close serial port %s", path.c_str());
::close(tty_id);
}
super::close();
}
protected:
void send_backend() override
{
this->ensure_open();
auto &logger = *logging::get_logger("transport");
logger.debug("start serial port send backend");
while (!this->is_closed)
{
auto frame_pair = this->send_que.Pop();
auto frame = frame_pair.first;
if (frame_pair.second && frame_pair.second->template transport<P>() != this)
{
logger.error("invalid token received");
continue;
}
size_t remaining_size = P::frame_size(frame);
if (remaining_size == 0) {
continue;
}
logger.debug("send data %zu", remaining_size);
size_t offset = 0;
while (remaining_size > 0)
{
auto written_size = write(tty_id, static_cast<uint8_t*>(P::frame_data(frame)) + offset, remaining_size);
if (written_size < 0)
{
logger.error("write serial port failed: %s", strerror(errno));
}
else
{
remaining_size -= written_size;
offset += written_size;
}
}
}
}
void receive_backend() override
{
auto &logger = *logging::get_logger("transport");
this->ensure_open();
logger.debug("start serial port receive backend");
bool find_head = false;
size_t min_size = P::pred_size(nullptr, 0);
if (!min_size) min_size = 1;
ssize_t recv_size;
size_t pred_size = min_size * 2; // min buffer size to keep,
size_t offset = 0; // scanned data size
size_t cached_size = 0; // read data size
assert(buffer_size >= pred_size);
uint8_t *buffer = new uint8_t[buffer_size];
while (!this->is_closed)
{
recv_size = read(tty_id, ((uint8_t *)buffer) + cached_size, buffer_size - cached_size);
if (recv_size <= 0)
{
if (cached_size == offset) {
usleep(10);
continue;
}
recv_size = 0;
}
#ifdef TRANSPORT_SERIAL_PORT_DEBUG
printf("receive com data (received=%zd,cached=%zu)\nbuffer: ",
recv_size, cached_size);
for (size_t i = 0; i < cached_size + recv_size; ++i)
{
printf("%02x", ((uint8_t *)buffer)[i]);
}
putchar('\n');
#endif
cached_size += recv_size;
if (!find_head)
{
// update offset to scan the header
for (; offset + min_size < cached_size; ++offset)
{
ssize_t pred = P::pred_size(((uint8_t *)buffer) + offset, cached_size - offset);
find_head = pred > 0;
if (find_head)
{
pred_size = pred;
logger.debug("find valid data (length=%zu)", pred_size);
if (pred_size > buffer_size)
{
logger.error("data size is too large (%zu)\n", pred_size);
find_head = false;
pred_size = min_size * 2;
continue;
}
break;
}
}
}
if (find_head && cached_size >= pred_size + offset)
{
// all data received
auto frame = P::make_frame((uint8_t *)buffer + offset, pred_size);
logger.debug("receive data %zu", pred_size);
this->recv_que.Push(std::make_pair(std::move(frame), std::make_shared<TransportToken>(this)));
offset += pred_size; // update offset for next run
}
// clear the cache when the remaining length of the cache is
if (offset && buffer_size - cached_size < pred_size)
{
cached_size -= offset;
memmove(buffer, ((uint8_t *)buffer) + offset, cached_size);
offset = 0;
}
}
delete[] buffer;
}
private:
typedef BaseTransport<P> super;
std::string path;
int tty_id;
int baudrate;
size_t buffer_size;
};
}
#endif

246
include/transport/udp.hpp Normal file
View File

@ -0,0 +1,246 @@
#ifndef _INCLUDE_TRANSPORT_UDP_
#define _INCLUDE_TRANSPORT_UDP_
#include <memory>
#include <string>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#include "base.hpp"
#define TRANSPORT_UDP_BUFFER_SIZE 1024
namespace transport {
class DatagramTransportToken : public TransportToken {
public:
explicit DatagramTransportToken(_transport_base* transport, const struct sockaddr_in& addr, socklen_t addr_len)
: TransportToken(transport), addr(addr), addr_len(addr_len) {}
bool operator==(const TransportToken& other) const override
{
auto other_token = dynamic_cast<const DatagramTransportToken*>(&other);
if (!other_token)
{
return false;
}
if (transport_ != other_token->transport_ || addr_len != other_token->addr_len)
{
return false;
}
return memcmp(&addr, &other_token->addr, addr_len) == 0;
}
protected:
struct sockaddr_in addr;
socklen_t addr_len;
friend std::hash<DatagramTransportToken>;
template<typename P>
friend class DatagramTransport;
};
template<typename P>
class DatagramTransport : public BaseTransport<P> {
public:
explicit DatagramTransport(size_t buffer_size = TRANSPORT_UDP_BUFFER_SIZE)
: sockfd(-1), buffer_size(buffer_size)
{
memset(&bind_addr, 0, sizeof(bind_addr));
memset(&connect_addr, 0, sizeof(connect_addr));
bind_addr.sin_family = AF_INET;
connect_addr.sin_family = AF_INET;
}
DatagramTransport(std::pair<const char*, int> local_addr, std::pair<const char*, int> remote_addr, size_t buffer_size = TRANSPORT_UDP_BUFFER_SIZE)
: DatagramTransport(buffer_size)
{
bind_addr.sin_port = htons(local_addr.second);
resolve_hostname(local_addr.first, bind_addr);
connect_addr.sin_port = htons(remote_addr.second);
resolve_hostname(remote_addr.first, connect_addr);
}
~DatagramTransport()
{
close();
}
void open() override
{
auto &logger = *logging::get_logger("transport");
if (this->is_open)
{
return;
} else if (this->is_closed)
{
logger.info("reopen datagram transport");
this->is_open = false;
this->is_closed = false;
}
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0)
{
logger.raise_from_errno("failed to create socket");
}
super::open();
if (bind_addr.sin_port)
{
if (::bind(sockfd, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0)
{
logger.raise_from_errno("failed to bind socket");
}
logger.info("listening on %s:%d", inet_ntoa(bind_addr.sin_addr), ntohs(bind_addr.sin_port));
}
}
void close() override
{
if (this->is_open && !this->closed())
{
auto &logger = *logging::get_logger("transport");
logger.info("close socket fd %d", sockfd);
::close(sockfd);
}
super::close();
}
void bind(const std::string& address, int port)
{
this->ensure_open();
auto &logger = *logging::get_logger("transport");
resolve_hostname(address, bind_addr);
bind_addr.sin_port = htons(port);
if (::bind(sockfd, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0)
{
logger.raise_from_errno("failed to bind socket");
}
logger[logging::LogLevel::INFO] << "listening on " << address << ":" << port << std::endl;
}
void connect(const std::string& address, int port)
{
auto &logger = *logging::get_logger("transport");
resolve_hostname(address, connect_addr);
connect_addr.sin_port = htons(port);
logger[logging::LogLevel::INFO] << "connecting to " << address << ":" << port << std::endl;
}
constexpr static std::pair<const char*, int> nulladdr = {"", 0};
protected:
void send_backend() override
{
this->ensure_open();
auto &logger = *logging::get_logger("transport");
logger.debug("start datagram send backend");
while (!this->is_closed)
{
auto frame_pair = this->send_que.Pop();
auto frame = frame_pair.first;
if (!P::frame_size(frame))
continue;
auto token = dynamic_cast<DatagramTransportToken *>((frame_pair.second.get()));
struct sockaddr* addr = (struct sockaddr *)((token) ? &token->addr : &connect_addr);
socklen_t addr_len = (token) ? token->addr_len : sizeof(connect_addr);
ssize_t sent_size = sendto(sockfd, P::frame_data(frame), P::frame_size(frame), 0,
addr, addr_len);
logger.debug("send data %zd", sent_size);
if (sent_size < 0)
{
logger.error("udp send failed: %s", strerror(errno));
}
else if ((size_t)sent_size < P::frame_size(frame))
{
logger.warn("sendto failed, only %zd bytes sent", sent_size);
}
}
}
void receive_backend() override
{
this->ensure_open();
auto &logger = *logging::get_logger("transport");
logger.debug("start datagram receive backend");
uint8_t *buffer = new uint8_t[buffer_size];
while (!this->is_closed)
{
struct sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
ssize_t recv_size = recvfrom(sockfd, buffer, buffer_size, 0,
(struct sockaddr *)&addr, &addr_len);
if (recv_size < 0)
{
logger.error("udp recv failed: %s", strerror(errno));
continue;
}
logger.debug("receive data %zd", recv_size);
ssize_t pred_size = P::pred_size(buffer, recv_size);
if (pred_size < 0)
{
logger.error("invalid frame received");
continue;
}
auto frame = P::make_frame(buffer, recv_size);
this->recv_que.Push(std::make_pair(frame, std::make_shared<DatagramTransportToken>(this, addr, addr_len)));
}
delete[] buffer;
}
private:
static void resolve_hostname(const std::string& hostname, struct sockaddr_in& result)
{
if (hostname.empty())
{
result.sin_addr.s_addr = INADDR_ANY;
return;
}
std::string hostname_str(hostname);
struct hostent *he = gethostbyname(hostname_str.c_str());
if (he == nullptr)
{
auto &logger = *logging::get_logger("transport");
logger.error("failed to resolve hostname %s: %s", hostname_str.c_str(), hstrerror(h_errno));
throw std::runtime_error("Failed to resolve hostname");
}
memcpy(&result.sin_addr, he->h_addr_list[0], he->h_length);
}
typedef BaseTransport<P> super;
int sockfd;
struct sockaddr_in bind_addr;
struct sockaddr_in connect_addr;
size_t buffer_size;
};
}
namespace std {
template<>
struct hash<transport::DatagramTransportToken> {
size_t operator()(const transport::DatagramTransportToken &token) const
{
std::size_t hash1 = std::hash<transport::TransportToken>()(token);
std::size_t hash2 = std::hash<in_addr_t>()(token.addr.sin_addr.s_addr);
std::size_t hash3 = std::hash<in_port_t>()(token.addr.sin_port);
hash1 ^= (hash2 + 0x9e3779b9 + (hash1 << 6) + (hash1 >> 2));
hash1 ^= (hash3 + 0x9e3779b9 + (hash1 << 6) + (hash1 >> 2));
return hash1;
}
};
}
#endif

View File

@ -0,0 +1,229 @@
#ifndef _INCLUDE_TRANSPORT_UNIX_UDP_
#define _INCLUDE_TRANSPORT_UNIX_UDP_
#include <memory>
#include <string>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netdb.h>
#include <sys/socket.h>
#include "base.hpp"
#define TRANSPORT_UDP_BUFFER_SIZE 1024
namespace transport {
class UnixDatagramTransportToken : public TransportToken {
public:
explicit UnixDatagramTransportToken(_transport_base* transport, const struct sockaddr_un& addr, socklen_t addr_len)
: TransportToken(transport), addr(addr), addr_len(addr_len) {}
bool operator==(const TransportToken& other) const override
{
auto other_token = dynamic_cast<const UnixDatagramTransportToken*>(&other);
if (!other_token)
{
return false;
}
if (transport_ != other_token->transport_ || addr_len != other_token->addr_len)
{
return false;
}
return memcmp(&addr, &other_token->addr, addr_len) == 0;
}
protected:
struct sockaddr_un addr;
socklen_t addr_len;
friend std::hash<UnixDatagramTransportToken>;
template <typename P>
friend class UnixDatagramTransport;
};
template <typename P>
class UnixDatagramTransport : public BaseTransport<P> {
public:
explicit UnixDatagramTransport(size_t buffer_size = TRANSPORT_UDP_BUFFER_SIZE)
: sockfd(-1), buffer_size(buffer_size)
{
memset(&bind_addr, 0, sizeof(bind_addr));
memset(&connect_addr, 0, sizeof(connect_addr));
bind_addr.sun_family = AF_UNIX;
connect_addr.sun_family = AF_UNIX;
}
UnixDatagramTransport(const std::string& local_addr, const std::string& remote_addr, size_t buffer_size = TRANSPORT_UDP_BUFFER_SIZE)
: UnixDatagramTransport(buffer_size)
{
set_sock_path(local_addr, bind_addr);
set_sock_path(remote_addr, connect_addr);
}
~UnixDatagramTransport()
{
close();
}
void open() override
{
auto& logger = *logging::get_logger("transport");
if (this->is_open)
{
return;
}
else if (this->is_closed)
{
logger.info("reopen datagram transport");
this->is_open = false;
this->is_closed = false;
}
sockfd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (sockfd < 0)
{
logger.raise_from_errno("failed to create socket");
}
super::open();
if (*bind_addr.sun_path)
{
_bind();
}
}
void close() override
{
if (this->is_open && !this->closed())
{
auto &logger = *logging::get_logger("transport");
logger.info("close socket fd %d", sockfd);
::close(sockfd);
if (*bind_addr.sun_path)
unlink(bind_addr.sun_path);
}
super::close();
}
void bind(const std::string& address)
{
this->ensure_open();
set_sock_path(address, bind_addr);
_bind();
}
void connect(const std::string& address)
{
set_sock_path(address, connect_addr);
auto& logger = *logging::get_logger("transport");
logger[logging::LogLevel::INFO] << "connecting to " << address << std::endl;
}
protected:
void send_backend() override
{
this->ensure_open();
auto& logger = *logging::get_logger("transport");
logger.debug("start datagram send backend");
while (!this->is_closed)
{
auto frame_pair = this->send_que.Pop();
auto frame = frame_pair.first;
if (!P::frame_size(frame))
continue;
auto token = dynamic_cast<UnixDatagramTransportToken *>((frame_pair.second.get()));
struct sockaddr* addr = (struct sockaddr *)((token) ? &token->addr : &connect_addr);
socklen_t addr_len = (token) ? token->addr_len : sizeof(connect_addr);
ssize_t sent_size = sendto(sockfd, P::frame_data(frame), P::frame_size(frame), 0,
addr, addr_len);
logger.debug("send data %zd", sent_size);
if (sent_size < 0)
{
logger.error("unix udp send failed: %s", strerror(errno));
}
else if ((size_t)sent_size < P::frame_size(frame))
{
logger.warn("sendto failed, only %zd bytes sent", sent_size);
}
}
}
void receive_backend() override
{
this->ensure_open();
auto& logger = *logging::get_logger("transport");
logger.debug("start datagram receive backend");
uint8_t *buffer = new uint8_t[buffer_size];
while (!this->is_closed)
{
struct sockaddr_un addr;
socklen_t addr_len = sizeof(addr);
ssize_t recv_size = recvfrom(sockfd, buffer, buffer_size, 0,
(struct sockaddr *)&addr, &addr_len);
if (recv_size < 0)
{
logger.error("unix udp recv failed: %s", strerror(errno));
continue;
}
logger.debug("receive data %zd", recv_size);
ssize_t pred_size = P::pred_size(buffer, recv_size);
if (pred_size < 0)
{
logger.error("invalid frame received");
continue;
}
auto frame = P::make_frame(buffer, recv_size);
this->recv_que.Push(std::make_pair(frame, std::make_shared<UnixDatagramTransportToken>(this, addr, addr_len)));
}
delete[] buffer;
}
private:
static void set_sock_path(const std::string& path, struct sockaddr_un& result)
{
if (path.size() + 1 >= sizeof(result.sun_path))
{
auto& logger = *logging::get_logger("transport");
logger.fatal("socket path too long");
throw std::runtime_error("socket path too long");
}
strncpy(result.sun_path, path.data(), path.size());
result.sun_path[path.size()] = '\0';
}
void _bind()
{
auto& logger = *logging::get_logger("transport");
unlink(bind_addr.sun_path);
if (::bind(sockfd, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0)
{
logger.raise_from_errno("failed to bind socket");
}
logger.info("listening on %s", bind_addr.sun_path);
}
typedef BaseTransport<P> super;
int sockfd;
struct sockaddr_un bind_addr;
struct sockaddr_un connect_addr;
size_t buffer_size;
};
}
namespace std {
template<>
struct hash<transport::UnixDatagramTransportToken> {
size_t operator()(const transport::UnixDatagramTransportToken &token) const
{
std::size_t hash1 = std::hash<transport::TransportToken>()(token);
std::size_t hash2 = std::hash<std::string>()(token.addr.sun_path);
hash1 ^= (hash2 + 0x9e3779b9 + (hash1 << 6) + (hash1 >> 2));
return hash1;
}
};
}
#endif

View File

@ -215,13 +215,6 @@ ssize_t ComFrame_Send(int tty_id, const ComFrame* frame, const bool swap_endian)
return 0; return 0;
} }
ssize_t SendTelemetryErrorMsg(int tty_id, uint16_t address, uint16_t code, const bool swap_endian) {
ComFrame* f_error = NewTelemetryErrorMsg(address, code, true);
ssize_t wr = ComFrame_Send(tty_id, f_error, swap_endian);
ComFrame_Del(f_error);
return wr;
}
#define COM_FRAME_RECEIVE_PRED_SIZE (2 * sizeof(ComFrameHeader)) #define COM_FRAME_RECEIVE_PRED_SIZE (2 * sizeof(ComFrameHeader))
ComFrame* ComFrame_ReceiveEx(int tty_id, void* buffer, const size_t buffer_size, size_t* p_offset, size_t* p_cached_size, bool* p_run_flag, const bool swap_endian) { ComFrame* ComFrame_ReceiveEx(int tty_id, void* buffer, const size_t buffer_size, size_t* p_offset, size_t* p_cached_size, bool* p_run_flag, const bool swap_endian) {

View File

@ -10,9 +10,9 @@
#include "logging/logger.hpp" #include "logging/logger.hpp"
#include "dataqueue.hpp" #include "dataqueue.hpp"
#include "exint/event.h" #include "exint/event.h"
#include "exint/detail.h" #include "exint/detail.hpp"
using EventCallbackData = std::tuple<std::string, event_callback, size_t, std::unique_ptr<uint8_t[]>, void*>; using EventCallbackData = std::tuple<std::string, event_callback, size_t, std::shared_ptr<uint8_t[]>, void*>;
std::atomic_size_t _event_thread_count { 0 }; std::atomic_size_t _event_thread_count { 0 };
DataQueue<EventCallbackData> _event_queue; DataQueue<EventCallbackData> _event_queue;
@ -25,21 +25,26 @@ static std::unordered_map<std::string, std::vector<std::pair<event_callback, voi
void exint_event(const char *event_name, size_t args_size, void *args) { void exint_event(const char *event_name, size_t args_size, void *args) {
auto& event_map = get_event_map(); auto& event_map = get_event_map();
auto it = event_map.find(event_name); auto it = event_map.find(event_name);
if (it != event_map.end()) { if (it == event_map.end()) {
for (auto &pair : it->second) { logger.warn("Event '%s' not found", event_name);
if (!_event_thread_count.load()) { return;
pair.first(event_name, args_size, args, pair.second); }
continue; std::shared_ptr<uint8_t[]> data(new uint8_t[args_size]);
} memcpy(data.get(), args, args_size);
uint8_t* args_copy;
if (args_size == 0) { for (auto &pair : it->second) {
args_copy = NULL; if (!_event_thread_count.load()) {
} else { pair.first(event_name, args_size, args, pair.second);
args_copy = new uint8_t[args_size]; continue;
memcpy(args_copy, args, args_size);
}
_event_queue.Push(std::make_tuple(event_name, pair.first, args_size, std::unique_ptr<uint8_t[]>(args_copy), pair.second));
} }
uint8_t* args_copy;
if (args_size == 0) {
args_copy = NULL;
} else {
args_copy = new uint8_t[args_size];
memcpy(args_copy, args, args_size);
}
_event_queue.Push(std::make_tuple(event_name, pair.first, args_size, data, pair.second));
} }
} }
@ -77,7 +82,7 @@ void* exint_event_thread(void*) {
std::string event_name; std::string event_name;
event_callback callback; event_callback callback;
size_t args_size; size_t args_size;
std::unique_ptr<uint8_t[]> args; std::shared_ptr<uint8_t[]> args;
void* user_data; void* user_data;
queue_epoch_t queue_epoch = _event_queue.GetEpoch(); queue_epoch_t queue_epoch = _event_queue.GetEpoch();
_event_thread_count.fetch_add(1); _event_thread_count.fetch_add(1);

View File

@ -3,84 +3,177 @@
#include "comframe.h" #include "comframe.h"
#include "CCfgFileParser.h" #include "CCfgFileParser.h"
#include "inicpp.hpp" #include "inicpp.hpp"
#include "exint/detail.h" #include "transport/serial_port.hpp"
#include "exint/detail.hpp"
#include "exint/event.h" #include "exint/event.h"
#include "exint/handler.h"
#include "exint/protocol.hpp"
static auto& logger = *logging::get_logger("exint"); static auto& logger = *logging::get_logger("exint");
static et_callback_t et_callback; static et_callback_t et_callback;
static pthread_t upperhost_thread; std::unique_ptr<Transport> g_upperhost_transport;
static pthread_t telemetry_thread; std::unique_ptr<Transport> g_telemetry_transport;
int g_iHostCom_tty_id = 0;
int g_iTelemetry_Com_tty_id = 0;
int g_iEnableAlarmCode = true; int g_iEnableAlarmCode = true;
bool g_bTelemetry_Open = true;
bool g_bKeepExintRuning = false; bool g_bKeepExintRuning = false;
int g_iExternTimeDifference = 0; int g_iExternTimeDifference = 0;
int g_iUseHostComForTelemetry = true; int g_iUseHostComForTelemetry = true;
uint8_t g_iAlarmCode[4] = {0xAA, 0xAA, 0xAA, 0xAA}; uint8_t g_iAlarmCode[4] = {0xAA, 0xAA, 0xAA, 0xAA};
EtHandlerDef upper_host_handler_def[] = {
{COM_FRAME_TYPE_COMMAND, ET_HDL_FLAG_DEFAULT, command_handler},
{COM_FRAME_TYPE_REQUEST, ET_HDL_FLAG_DEFAULT, request_handler},
{COM_FRAME_TYPE_ALARM, ET_HDL_FLAG_DEFAULT, alarm_handler},
{COM_FRAME_TYPE_TELEMETRY_REQUEST, ET_HDL_FLAG_NOVERIFY, upperhost_telemetry_request_handler},
{0, 0, NULL}
};
EtHandlerDef telemetry_handler_def[] = {
{COM_FRAME_TYPE_TELEMETRY_REQUEST, ET_HDL_FLAG_NOVERIFY, telemetry_request_handler},
{0, 0, NULL}
};
static int read_config(const char* config_path) { static int read_config(const char* config_path) {
const char* config_section = "External-Interface";
const char* log_section = "General-Setting";
CCfgFileParser config; CCfgFileParser config;
if (config.parseFile(config_path) < 0) { if (config.parseFile(config_path) < 0) {
logger.error("read config file %s failed", config_path); logger.error("read config file %s failed", config_path);
return -1; return -1;
} }
const char* host_com_path = NULL;
if (!config.getValue("System-Setting", "HostInfo_COM_Path", host_com_path)) { const char* log_path;
logger.error("fail to read [System-Setting]:HostInfo_ExchangeMode from %s", config_path); if (config.getValue(log_section, "Log_Path", log_path)) {
logging::get_global_logger().add_stream(std::ofstream(std::string(log_path) + "/external_interface.log"));
} }
long host_com_baudrate = 115200; const char* log_level = "WARN";
if (!config.getIntValue("System-Setting", "HostInfo_COM_Baud_Rate", host_com_baudrate)) { if (config.getValue(log_section, "Log_Level", log_level)) {
logger.error("fail to read [System-Setting]:HostInfo_COM_Baud_Rate from %s", config_path); logging::get_global_logger().set_level(logging::str2level(log_level));
} }
if (host_com_path != NULL)
g_iHostCom_tty_id = get_com_tty_id(host_com_path, host_com_baudrate); #define read_transport(prefix, transport_var) do { \
long transport_open;\
if (config.getIntValue(config_section, #prefix "_Transport_Open", transport_open)) {\
logger.error("failed to read [%s]:" #prefix "_Transport_Open in %s", config_section, config_path); \
} else if (transport_open) { \
const char* transport_mode = "COM"; \
if (!config.getValue(config_section, #prefix "_Transport_Mode", transport_mode)) {\
logger.error("failed to read [%s]:" #prefix "_Transport_Mode in %s", config_section, config_path); \
}\
if (strcasecmp(transport_mode, "com") == 0) {\
const char* com_path;\
if (!config.getValue(config_section, #prefix "_COM_Path", com_path)) {\
logger.error("failed to read [%s]:" #prefix "_COM_Path in %s", config_section, config_path); \
return -1;\
}\
long baudrate = 115200;\
if (!config.getIntValue(config_section, #prefix "_COM_Baudrate", baudrate)) {\
logger.error("failed to read [%s]:" #prefix "_COM_Baudrate in %s", config_section, config_path); \
}\
transport_var = std::unique_ptr<Transport>(\
new transport::SerialPortTransport<ComframeProtocol>(com_path, baudrate)\
);\
} else if (strcasecmp(transport_mode, "datagram") == 0 || strcasecmp(transport_mode, "udp")) {\
const char* host = "0.0.0.0";\
if (!config.getValue(config_section, #prefix "_UDP_Host", host)) {\
logger.error("failed to read [%s]:" #prefix "_UDP_Host in %s", config_section, config_path); \
return -1;\
}\
long port = 5000; \
if (!config.getIntValue(config_section, #prefix "_UDP_Port", port)) {\
logger.error("failed to read [%s]:" #prefix "_UDP_Port in %s", config_section, config_path); \
}\
} else {\
logger.error("unknown transport mode: %s", transport_mode);\
}\
}\
} while (0)
read_transport(HostInfo, g_upperhost_transport);
read_transport(Telemetry, g_telemetry_transport);
#undef read_transport
long use_host_com_for_telemetry = 1; long use_host_com_for_telemetry = 1;
if (!config.getIntValue("System-Setting", "Use_HostCOM_for_Telemetry", use_host_com_for_telemetry)) { if (!config.getIntValue(config_section, "Use_HostCOM_for_Telemetry", use_host_com_for_telemetry)) {
logger.error("fail to read [System-Setting]:Use_HostCOM_for_Telemetry from %s", config_path); logger.error("fail to read [%s]:Use_HostCOM_for_Telemetry from %s", config_section, config_path);
} }
g_iUseHostComForTelemetry = use_host_com_for_telemetry == 1; if (!use_host_com_for_telemetry) {
long telemetry_open = 1; find_handler(ET_TYPE_TELEMETRY_REQUEST, upper_host_handler_def)->hd_flags |= ET_HDL_FLAG_DISABLED;
if (!config.getIntValue("System-Setting", "Telemetry_Open", telemetry_open)) {
logger.error("fail to read [System-Setting]:Telemetry_Open from %s", config_path);
}
g_bTelemetry_Open = telemetry_open == 1;
if (g_bTelemetry_Open) {
const char* telemetry_com_path = NULL;
if (!config.getValue("System-Setting", "Telemetry_COM_Path", telemetry_com_path)) {
logger.error("fail to read [System-Setting]:Telemetry_COM_Path from %s", config_path);
}
long telemetry_com_baudrate = 115200;
if (!config.getIntValue("System-Setting", "Telemetry_COM_Baud_Rate", telemetry_com_baudrate)) {
logger.error("fail to read [System-Setting]:Telemetry_COM_Baud_Rate from %s", config_path);
}
if (telemetry_com_path != NULL)
g_iTelemetry_Com_tty_id = get_com_tty_id(telemetry_com_path, telemetry_com_baudrate);
} }
return 0; return 0;
} }
void exint_handler(Transport& transport, struct EtHandlerDef *handlers) {
transport.open();
while (!transport.closed()) {
Transport::DataPair data_pair;
try {
data_pair = transport.receive();
} catch (const QueueCleared &e) {
logger.debug("Transport closed, exiting");
break;
}
auto frame = data_pair.first;
auto token = data_pair.second;
if (!frame) continue;
auto frame_type = ComFrame_TYPE(frame.get());
logger.debug("Received frame of type %d", frame_type);
auto handler = find_handler(frame_type, handlers);
if (!handler)
{
logger.warn("No handler for frame type %d", frame_type);
}
else if (handler->hd_flags & ET_HDL_FLAG_DISABLED)
{
logger.debug("Handler for frame type %d is disabled", frame_type);
}
else if (!(handler->hd_flags & ET_HDL_FLAG_NOVERIFY) && ComFrame_Verify(frame.get(), false))
{
logger.warn("Frame verification failed");
}
else if (handler->hd_flags & ET_HDL_FLAG_ASYNC)
{
std::thread([handler, frame, token, &transport]()
{
auto reply = handler->hd_handler(handler, frame.get());
if (reply) {
transport.send(std::shared_ptr<ComFrame>(reply, ComFrame_Del), token);
} })
.detach();
}
else
{
auto reply = handler->hd_handler(handler, data_pair.first.get());
if (reply)
{
transport.send(std::shared_ptr<ComFrame>(reply, ComFrame_Del), token);
}
}
}
}
void exint_setup(std::unique_ptr<Transport>&& upperhost_transport, std::unique_ptr<Transport>&& telemetry_transport, et_callback_t cb) {
et_callback = cb;
if (upperhost_transport)
g_upperhost_transport = std::move(upperhost_transport);
if (telemetry_transport)
g_telemetry_transport = std::move(telemetry_transport);
g_bKeepExintRuning = true;
exint_event_thread_start(1);
exint_event_register("send_command", send_command_event_callback, g_upperhost_transport.get());
std::thread(
std::bind(exint_handler, std::ref(*g_upperhost_transport.get()), upper_host_handler_def)
).detach();
std::thread(
std::bind(exint_handler, std::ref(*g_telemetry_transport.get()), telemetry_handler_def)
).detach();
}
int exint_initialize(const char* config_path, et_callback_t cb) { int exint_initialize(const char* config_path, et_callback_t cb) {
if (read_config(config_path)) return -1; if (read_config(config_path)) return -1;
et_callback = cb; exint_setup(nullptr, nullptr, cb);
g_bKeepExintRuning = true;
exint_event_thread_start(1);
pthread_create(&upperhost_thread, NULL, upper_host_com_thread, NULL);
pthread_create(&telemetry_thread, NULL, telemetry_host_com_thread, NULL);
return 0;
}
int exint_init_from_tty(int host_com_tty, int telemetry_com_tty, et_callback_t cb) {
et_callback = cb;
g_iHostCom_tty_id = host_com_tty;
g_iTelemetry_Com_tty_id = telemetry_com_tty;
g_bKeepExintRuning = true;
exint_event_thread_start(1);
pthread_create(&upperhost_thread, NULL, upper_host_com_thread, NULL);
pthread_create(&telemetry_thread, NULL, telemetry_host_com_thread, NULL);
return 0; return 0;
} }
@ -94,6 +187,7 @@ void exint_send(uint32_t type, size_t len, void* data) {
event_name = "send_audio"; event_name = "send_audio";
break; break;
default: default:
logger.error("Unknown type %d", type);
return; return;
} }
exint_event(event_name, len, data); exint_event(event_name, len, data);
@ -101,8 +195,8 @@ void exint_send(uint32_t type, size_t len, void* data) {
void exint_finialize() { void exint_finialize() {
g_bKeepExintRuning = false; g_bKeepExintRuning = false;
pthread_cancel(upperhost_thread); g_upperhost_transport->close();
pthread_cancel(telemetry_thread); g_telemetry_transport->close();
exint_event_thread_stop(); exint_event_thread_stop();
} }

128
src/handler/alarm.cpp Normal file
View File

@ -0,0 +1,128 @@
#include "logging/logger.hpp"
#include "exint/detail.hpp"
#include "exint/handler.h"
static auto& logger = *logging::get_logger("exint::alarm");
//获取指令文本与报警码
AlarmData g_AlarmTable[] = {
{0x61006100, "热控A回路压力报警"},
{0x62006200, "热控B回路压力报警"},
{0x63006300, "热控分系统,参数异常"},
{0x64006400, "环控水箱满提醒"},
{0x65006500, "氮瓶压力过低报警"},
{0x66006600, "氧瓶压力过低报警"},
{0x67006700, "返回舱,火灾报警"},
{0x68006800, "返回舱,总压告警"},
{0x69006900, "返回舱,氧分压告警"},
{0x70007000, "返回舱,二氧化碳,分压告警"},
{0x71007100, "内回路泵,壁,转速报警"},
{0x72007200, "循环副风机转速报警"},
{0x73007300, "着陆器密封舱,总压报警"},
{0x74007400, "着陆器密封舱,氧分压报警"},
{0x75007500, "着陆器密封舱,二氧化碳分压报警"},
{0x76007600, "着陆器密封舱,火灾报警"},
{0x77007700, "着陆器辐射器,泄漏报警"},
{0x01000100, "起飞"},
{0x02000200, "船箭分离"},
{0x03000300, "组合体停靠"},
{0x04000400, "组合体分离"},
{0x05000500, "服返分离"},
{0x06000600, "回收开始"},
{0x07000700, "着陆"},
{0x08000800, "逃逸塔逃逸"},
{0x09000900, "预留"},
{0x0A000A00, "逃逸塔分离"},
{0x11001100, "整船逃逸"},
{0x71177117, "五分钟准备"},
{0x72177217, "一分钟准备"},
{0x73177317, "对接准备"},
{0x74177417, "对接环接触"},
{0x75177517, "分离完成"},
{0x76177617, "整流罩分离"},
{0x77177717, "帆板展开"},
{0x78177817, "船箭分离准备"},
{0x79177917, "帆板展开准备"},
{0x7A177A17, ""}, // 预留
{0x7B177B17, "三十分钟后变轨"},
{0x7C177C17, "十分钟后变轨"},
{0x7D177D17, "五分钟后变轨"},
{0x7E177E17, "轨控发动机,开机指令发出"},
{0x7F177F17, "轨控发动机,关机指令发出"},
{0x80178017, "变轨结束"},
{0x81178117, "GNC转入自主控制模式"},
{0x82178217, "开始接近飞行"},
{0x83178317, "开始偏航调姿"},
{0x84178417, "偏航调姿完成"},
{0x85178517, "转平移,靠拢段"},
{0x86178617, "进入停泊点"},
{0x87178717, "开始接近飞行"},
{0x88178817, "开始俯仰调姿"},
{0x89178917, "俯仰调姿完成"},
{0x8A178A17, "最后靠拢飞行"},
{0x8B178B17, ""}, //预留
{0x8C178C17, ""}, //预留
{0x8D178D17, "转芬离,撤离段"},
{0x8E178E17, ""}, // 预留
{0x8F178F17, "开始撤退飞行"},
{0x90179017, "转返回准备段"},
{0x91179117, "开始调姿"},
{0x92179217, ""}, //预留
{0x93179317, ""}, //预留
{0x94179417, ""}, //预留
{0x95179517, "2分钟后服返分离"},
{0x96179617, ""}, //预留
{0x97179717, ""}, //预留
{0x98179817, ""}, //预留
{0x99179917, "2分钟后回收启动"},
{0x9A179A17, "回收开关接通"},
{0x9B179B17, ""}, //预留
{0x9C179C17, "着陆准备"},
{0x9D179D17, ""}, //预留
{0x9E179E17, "三十分钟准备"},
{0x9F179F17, "制动发动机开机,指令发出"},
{0xA017A017, "制动发动机关机,指令发出"},
{0xA117A117, "逃逸转入值班"},
{0, NULL}
};
//获取指令文本与报警码
AlarmData* GetAlarmTextByCode(uint8_t cmd_code[], uint8_t iAlarmCode[4])
{
uint32_t iCode = *(uint32_t*)cmd_code ;
uint32_t* pAlarmCode = (uint32_t*)iAlarmCode;
AlarmData* pAlarmData = g_AlarmTable;
while (pAlarmData->alarm_id != 0)
{
if (pAlarmData->alarm_id == iCode)
{
*pAlarmCode = iCode;
return pAlarmData;
}
pAlarmData++;
}
*pAlarmCode = 0;
logger.warn("Unknown alarm code: %08X", iCode);
return NULL;
}
ComFrame* alarm_handler(EtHandlerDef *hdl, ComFrame* frame) {
if (frame->payload[0] != 0xAA || frame->payload[1] != 0x55) {
return NULL;
}
auto alarm = GetAlarmTextByCode(frame->payload+2, g_iAlarmCode);
if (!alarm)
return NULL;
exint_handle_pack(ET_TYPE_TEXT, sizeof(alarm), alarm);
uint8_t payload[6] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
return ComFrame_New(
COM_FRAME_ADDRESS_VOIX,
COM_FRAME_TYPE_ALARM_RESPONSE,
payload, 6, true
);
}

View File

@ -1 +0,0 @@
#include "exint/handler.h"

116
src/handler/command.cpp Normal file
View File

@ -0,0 +1,116 @@
#include <atomic>
#include <mutex>
#include <deque>
#include <memory>
#include <chrono>
#include "logging/logger.hpp"
#include "exint/detail.hpp"
#include "exint/event.h"
#include "exint/handler.h"
#include "exint/protocol.hpp"
#define COMMAND_RESEND_INTERVAL 250
#define COMMAND_RESEND_MAX_COUNT 3
struct _CommandResendToken {
Transport* transport;
uint8_t data[6];
_CommandResendToken(Transport* transport, void* data) : transport(transport) {
memcpy(this->data, data, 6);
}
};
struct _CommandResend {
std::chrono::time_point<std::chrono::steady_clock> last_send_time;
uint8_t token_id;
int count;
_CommandResend(uint8_t token_id) : token_id(token_id) {
last_send_time = std::chrono::steady_clock::now();
count = 0;
}
};
static auto& logger = *logging::get_logger("exint::command");
static std::mutex g_command_resend_mutex;
static std::deque<std::pair<uint8_t, _CommandResendToken>> g_command_resend_tokens;
static std::atomic<uint8_t> g_command_resend_token_id;
ComFrame* command_handler(EtHandlerDef *hdl, ComFrame* frame) {
if (ComFrame_PAYLOAD_LENGTH(frame) != 6) {
return NULL;
}
exint_event("recv_command", 6, ComFrame_PAYLOAD(frame));
return NULL;
}
ComFrame* request_handler(EtHandlerDef *hdl, ComFrame* frame) {
if (g_command_resend_tokens.empty())
return NULL;
std::lock_guard<std::mutex> lock(g_command_resend_mutex);
if (!g_command_resend_tokens.empty()) {
(void)g_command_resend_tokens.pop_front();
}
return NULL;
}
void send_command_event_callback(const char* event_name, size_t args_size, void* args, void* user_data) {
Transport* transport = (Transport*)user_data;
transport->send(std::shared_ptr<ComFrame>(
ComFrame_New(COM_FRAME_ADDRESS_VOIX, COM_FRAME_TYPE_REQUEST, args, args_size, true),
ComFrame_Del
));
auto token_id = g_command_resend_token_id.fetch_add(1);
{
std::lock_guard<std::mutex> lock(g_command_resend_mutex);
g_command_resend_tokens.push_back(
std::make_pair(token_id, _CommandResendToken(transport, args))
);
}
_CommandResend resend_data(token_id);
exint_event("_resend_command", sizeof(resend_data), &resend_data);
}
ON_EVENT(_resend_command) {
_CommandResend resend_data = *(_CommandResend*)args;
auto now = std::chrono::steady_clock::now();
if (now - resend_data.last_send_time < std::chrono::milliseconds(COMMAND_RESEND_INTERVAL)) {
return;
}
_CommandResendToken* token = NULL;
for (auto& item : g_command_resend_tokens) {
if (item.first == resend_data.token_id) {
token = &item.second;
break;
}
}
if (token == NULL) {
return;
}
auto transport = token->transport;
transport->send(std::shared_ptr<ComFrame>(
ComFrame_New(COM_FRAME_ADDRESS_VOIX, COM_FRAME_TYPE_REQUEST, token->data, 6, true),
ComFrame_Del
));
resend_data.count++;
resend_data.last_send_time = now;
if (resend_data.count > 3) {
char buffer[13];
for (int i = 0; i < 6; i++) {
sprintf(buffer + i * 2, "%02x", token->data[i]);
}
buffer[12] = '\0';
logger.warn("Command %s send failed", buffer);
} else {
exint_event("_resend_command", sizeof(resend_data), &resend_data);
}
}
ON_EVENT(recv_command) {
exint_handle_pack(ET_TYPE_COMMAND, args_size, args);
}

0
src/handler/misc.cpp Normal file
View File

237
src/handler/telemetry.cpp Normal file
View File

@ -0,0 +1,237 @@
#include <deque>
#include <sys/sysinfo.h>
#include "dataqueue.hpp"
#include "logging/logger.hpp"
#include "telemetry.h"
#include "exint/event.h"
#include "exint/detail.hpp"
#include "exint/handler.h"
static auto& logger = *logging::get_logger("exint::telemetry");
static TelemetryCommandInfo g_receiveCommandInfo;
static std::deque<TelemetryCommandInfo> g_sendCommandList;
static DataQueue<TelemetryCommandInfo> g_recvCommandQueue;
void get_telemetry_data(TelemetryRequestData* data) {
exint_handle_pack(ET_TYPE_TELEMETRY_REQUEST, sizeof(TelemetryRequestData), data);
}
/* create a msg data for upper host telemetry*/
void set_telemetry_host_data(TelemetryData4UpperHost* pTelemetryData) {
static uint8_t iTelemetryCount = 0;
TelemetryRequestData data;
get_telemetry_data(&data);
pTelemetryData->work_status = 0xAA;
pTelemetryData->com_status = 0xAA;
pTelemetryData->coprocessor1_status = 0xAA;
pTelemetryData->coprocessor2_status = 0xAA;
pTelemetryData->voice_circuit_status = 0xAA;
pTelemetryData->telemetry_count = iTelemetryCount++;
if (data.close_asr) {
pTelemetryData->voice_mode = 0xAA;
}
else if (data.sys_state == ET_SYS_STATUS_WAKE) {
pTelemetryData->voice_mode = 0x55;
}
else {
pTelemetryData->voice_mode = 0xA5;
}
if (data.sys_state == ET_SYS_STATUS_WAKE) {
pTelemetryData->recognition_status = 0x55;
}
else {
pTelemetryData->recognition_status = 0xAA;
}
for (int i = 0; i < 8; i++) {
pTelemetryData->recognition_info[i] = 0;
}
pTelemetryData->particle_detection1 = 0xAA;
pTelemetryData->particle_detection2 = 0xAA;
pTelemetryData->particle_detection3 = 0xAA;
memcpy(&pTelemetryData->receive_command, &g_receiveCommandInfo, sizeof(TelemetryCommandInfo));
if (g_sendCommandList.empty()) {
memset(&pTelemetryData->send_command, 0x00, sizeof(TelemetryCommandInfo));
} else {
memcpy(&pTelemetryData->send_command, &g_sendCommandList.front(), sizeof(TelemetrySendCommandInfo));
}
pTelemetryData->volume_key_status = data.volume_key_pressed ? 0xAA : 0x55;
pTelemetryData->wake_key_status = data.wakeup_key_pressed ? 0xAA : 0x55;
pTelemetryData->key_status_backup = 0x55;
pTelemetryData->current_volume = data.volume_grade;
pTelemetryData->system_version_high = data.sys_ver_high;
pTelemetryData->system_version_low = data.sys_ver_low;
pTelemetryData->application_version_high = data.app_ver_high;
pTelemetryData->application_version_low = data.app_ver_low;
pTelemetryData->alarm_code[0] = g_iAlarmCode[0];
pTelemetryData->alarm_code[1] = g_iAlarmCode[1];
pTelemetryData->alarm_code[2] = g_iAlarmCode[2];
pTelemetryData->alarm_code[3] = g_iAlarmCode[3];
for (int i = 0; i < 5; i++) {
pTelemetryData->telemetry_backup[i] = 0xAA;
}
}
/* create a telemetry msg data for telemetry, by gejp 2024.10.03 */
void set_telemetry_data(TelemetryData* pTelemetryHostData, TelemetryCommandInfo *pReceiveCommandInfo) {
static uint8_t iTelemetryCount = 0;
TelemetryRequestData data;
get_telemetry_data(&data);
pTelemetryHostData->work_status = 0xAA;
pTelemetryHostData->com_status = 0xAA;
pTelemetryHostData->coprocessor1_status = 0xAA;
pTelemetryHostData->coprocessor2_status = 0xAA;
pTelemetryHostData->voice_circuit_status = 0xAA;
pTelemetryHostData->telemetry_count = iTelemetryCount++;
if (data.close_asr) {
pTelemetryHostData->voice_mode = 0xAA;
}
else if (data.sys_state == ET_SYS_STATUS_WAKE) {
pTelemetryHostData->voice_mode = 0x55;
}
else {
pTelemetryHostData->voice_mode = 0xA5;
}
if (data.sys_state == ET_SYS_STATUS_WAKE) {
pTelemetryHostData->recognition_status = 0x55;
}
else {
pTelemetryHostData->recognition_status = 0xAA;
}
pTelemetryHostData->particle_detection1 = 0xAA;
pTelemetryHostData->particle_detection2 = 0xAA;
pTelemetryHostData->particle_detection3 = 0xAA;
memcpy(&(pTelemetryHostData->receive_command), (void*)pReceiveCommandInfo, sizeof(TelemetryCommandInfo));
// above same as set_telemetry_host_data
// 5 cmd info in history
size_t i = 0;
for (auto& send_cmd : g_sendCommandList) {
pTelemetryHostData->send_command[i].seqid = send_cmd.seqid & 0xFF;
pTelemetryHostData->send_command[i].flag = 0x66;
pTelemetryHostData->send_command[i].time = send_cmd.time;
memcpy(pTelemetryHostData->send_command[i].code, send_cmd.code, 2);
i++;
}
for (; i < 5; i++) {
memset(pTelemetryHostData->send_command + i, 0x00, sizeof(TelemetrySendCommandInfo));
}
// the following same as set_telemetry_host_data
pTelemetryHostData->volume_key_status = data.volume_key_pressed ? 0xAA : 0x55;
pTelemetryHostData->wake_key_status = data.wakeup_key_pressed ? 0xAA : 0x55;
pTelemetryHostData->key_status_backup = 0x55;
pTelemetryHostData->current_volume = data.volume_grade;
pTelemetryHostData->system_version_high = data.sys_ver_high;
pTelemetryHostData->system_version_low = data.sys_ver_low;
pTelemetryHostData->application_version_high = data.app_ver_high;
pTelemetryHostData->application_version_low = data.app_ver_low;
}
ComFrame* telemetry_request_handler(EtHandlerDef* hdl, ComFrame* frame) {
static uint64_t iTelemetryAnswerTimes = 0;
uint16_t address = ComFrame_ADDRESS(frame);
TelemetryCommandInfo TeleCmdInfo2Send;
TelemetryData TelemetryData2Send;
if (hdl->hd_flags & ET_HDL_FLAG_NOVERIFY && ComFrame_Verify(frame, false)) {
uint8_t code[] = TELEMETRY_ERROR_CHECKSUM;
return NewTelemetryErrorMsg(address, code, true);
}
if (ComFrame_PAYLOAD_LENGTH(frame) != 2 || frame->payload[0] != 0xFF || frame->payload[1] != 0x11) {
uint8_t code[] = TELEMETRY_ERROR_TYPE;
return NewTelemetryErrorMsg(address, code, true);
}
if (iTelemetryAnswerTimes % 3 == 0)
{
if (g_recvCommandQueue.Empty())
{ // empty
if (0) // 2024.11.04 g_bNeedReboot)
{
system("reboot");
return NULL;
}
}
else
{
TeleCmdInfo2Send = g_recvCommandQueue.Pop();
}
set_telemetry_data(&TelemetryData2Send, &TeleCmdInfo2Send);
}
//keep same every 3 answers, count from 0, 1, ...
TelemetryData2Send.send_count = iTelemetryAnswerTimes; //count from 0, 1, ..
iTelemetryAnswerTimes++;
ComFrame* pTeleAnswerMsg = ComFrame_New(address, COM_FRAME_TYPE_TELEMETRY_ANSWER, &TelemetryData2Send, sizeof(struct TelemetryData), true);
TelemetryData_SwapEndian((struct TelemetryData*)(pTeleAnswerMsg->payload));
return pTeleAnswerMsg;
}
ComFrame* upperhost_telemetry_request_handler(EtHandlerDef* hdl, ComFrame* frame) {
uint16_t address = ComFrame_ADDRESS(frame);
TelemetryData4UpperHost UpperHostTeleData;
if (hdl->hd_flags & ET_HDL_FLAG_NOVERIFY && ComFrame_Verify(frame, false)) {
uint8_t code[] = TELEMETRY_ERROR_CHECKSUM;
return NewTelemetryErrorMsg(address, code, true);
}
if (ComFrame_PAYLOAD_LENGTH(frame) != 2 || frame->payload[0] != 0xFF || frame->payload[1] != 0x11) {
uint8_t code[] = TELEMETRY_ERROR_TYPE;
return NewTelemetryErrorMsg(address, code, true);
}
set_telemetry_host_data(&UpperHostTeleData); // count from 0, 1, ..
return NewTelemetryAnswerData(address, &UpperHostTeleData, true);
}
ON_EVENT(send_comand) {
if (args_size != 6) return;
uint16_t send_seq = (g_sendCommandList.empty()) ? 0 : g_sendCommandList.front().seqid;
struct sysinfo info;
if (sysinfo(&info)) {
logger.raise_from_errno("sysinfo() failed");
return;
}
uint32_t telemetryTimeSeconds = info.uptime + g_iExternTimeDifference;
TelemetryCommandInfo telemetryCommandInfo = {telemetryTimeSeconds, {((uint8_t*)args)[3], ((uint8_t*)args)[4]}, (uint16_t)(send_seq + 1u)};
g_sendCommandList.push_front(telemetryCommandInfo);
if (g_sendCommandList.size() > TELEMETRY_SEND_COMMAND_INFO_LENGTH) {
g_sendCommandList.pop_back();
}
}
ON_EVENT(recv_command) {
if (args_size != 6) return;
struct sysinfo info;
if (sysinfo(&info)) {
logger.raise_from_errno("sysinfo() failed");
return;
}
uint32_t telemetryTimeSeconds = info.uptime + g_iExternTimeDifference;
uint8_t* payload = (uint8_t*)args;
g_receiveCommandInfo.seqid++;
g_receiveCommandInfo.code[0] = payload[4];
g_receiveCommandInfo.code[1] = payload[5];
g_receiveCommandInfo.time = telemetryTimeSeconds;
g_recvCommandQueue.Push(g_receiveCommandInfo);
}

View File

@ -1,684 +0,0 @@
#include <errno.h>
#include <string.h>
#include <list>
#include <sys/time.h>
#include <sys/sysinfo.h>
#include <unistd.h>
#include "telemetry.h"
#include "dataqueue.hpp"
#include "command.h"
#include "exint/event.h"
#include "exint/detail.h"
#define PrintFilePos()
// int32_t g_iExternTimeDifference = 0;
static TelemetryCommandInfo g_receiveCommandInfo;
static std::list<TelemetryCommandInfo> g_sendCommandList;
static DataQueue<TelemetryCommandInfo> g_recvCommandQueue;
volatile static uint8_t g_iUpperHostTelemetryCount = 0;
volatile static uint8_t g_iTelemetryCount = 0;
// volatile static uint8_t telemetry_3package_loop_count[2] = {0, 0}; // added by gejp
// volatile static uint8_t telemetry_3package_count[2] = {0, 0}; // added by gejp
volatile bool need_resend = false;
volatile bool g_bNeedReboot = false;
volatile int32_t g_iMS2Reboot = 1000;
void get_telemetry_data(TelemetryRequestData* data) {
exint_handle_pack(ET_TYPE_TELEMETRY_REQUEST, sizeof(TelemetryRequestData), data);
}
/* create a msg data for upper host telemetry*/
void set_telemetry_host_data(TelemetryData4UpperHost* pTelemetryData) {
TelemetryRequestData data;
get_telemetry_data(&data);
pTelemetryData->work_status = 0xAA;
pTelemetryData->com_status = 0xAA;
pTelemetryData->coprocessor1_status = 0xAA;
pTelemetryData->coprocessor2_status = 0xAA;
pTelemetryData->voice_circuit_status = 0xAA;
pTelemetryData->telemetry_count = g_iUpperHostTelemetryCount++;
if (data.close_asr) {
pTelemetryData->voice_mode = 0xAA;
}
else if (data.sys_state == ET_SYS_STATUS_WAKE) {
pTelemetryData->voice_mode = 0x55;
}
else {
pTelemetryData->voice_mode = 0xA5;
}
if (data.sys_state == ET_SYS_STATUS_WAKE) {
pTelemetryData->recognition_status = 0x55;
}
else {
pTelemetryData->recognition_status = 0xAA;
}
for (int i = 0; i < 8; i++) {
pTelemetryData->recognition_info[i] = 0;
}
pTelemetryData->particle_detection1 = 0xAA;
pTelemetryData->particle_detection2 = 0xAA;
pTelemetryData->particle_detection3 = 0xAA;
memcpy(&pTelemetryData->receive_command, &g_receiveCommandInfo, sizeof(TelemetryCommandInfo));
if (g_sendCommandList.empty()) {
memset(&pTelemetryData->send_command, 0x00, sizeof(TelemetryCommandInfo));
} else {
memcpy(&pTelemetryData->send_command, &g_sendCommandList.front(), sizeof(TelemetrySendCommandInfo));
}
//if (g_bVolumeKeyState) {
// pTelemetryData->volume_key_status = 0xAA;
//}
//else {
// pTelemetryData->volume_key_status = 0x55;
//}
//if (g_bWakeupKeyState) {
// pTelemetryData->wake_key_status = 0xAA;
//}
//else {
// pTelemetryData->wake_key_status = 0x55;
//}
//pTelemetryData->volume_key_status = g_bVolumeKeyState ? 0xAA : 0x55;
//pTelemetryData->wake_key_status = g_bWakeupKeyState ? 0xAA : 0x55;
pTelemetryData->volume_key_status = data.volume_key_pressed ? 0xAA : 0x55;
pTelemetryData->wake_key_status = data.wakeup_key_pressed ? 0xAA : 0x55;
pTelemetryData->key_status_backup = 0x55;
pTelemetryData->current_volume = data.volume_grade;
pTelemetryData->system_version_high = data.sys_ver_high;
pTelemetryData->system_version_low = data.sys_ver_low;
pTelemetryData->application_version_high = data.app_ver_high;
pTelemetryData->application_version_low = data.app_ver_low;
// 2024.10.10
if (g_iEnableAlarmCode == 0) {
pTelemetryData->alarm_code[0] = 0xAA;
pTelemetryData->alarm_code[1] = 0xAA;
pTelemetryData->alarm_code[2] = 0xAA;
pTelemetryData->alarm_code[3] = 0xAA;
} else {
pTelemetryData->alarm_code[0] = g_iAlarmCode[0];
pTelemetryData->alarm_code[1] = g_iAlarmCode[1];
pTelemetryData->alarm_code[2] = g_iAlarmCode[2];
pTelemetryData->alarm_code[3] = g_iAlarmCode[3];
}
for (int i = 0; i < 5; i++) {
pTelemetryData->telemetry_backup[i] = 0xAA;
}
}
/* create a telemetry msg data for telemetry, by gejp 2024.10.03 */
void set_telemetry_data(TelemetryData* pTelemetryHostData, TelemetryCommandInfo *pReceiveCommandInfo) {
TelemetryRequestData data;
get_telemetry_data(&data);
pTelemetryHostData->work_status = 0xAA;
pTelemetryHostData->com_status = 0xAA;
pTelemetryHostData->coprocessor1_status = 0xAA;
pTelemetryHostData->coprocessor2_status = 0xAA;
pTelemetryHostData->voice_circuit_status = 0xAA;
pTelemetryHostData->telemetry_count = g_iTelemetryCount++;
if (data.close_asr) {
pTelemetryHostData->voice_mode = 0xAA;
}
else if (data.sys_state == ET_SYS_STATUS_WAKE) {
pTelemetryHostData->voice_mode = 0x55;
}
else {
pTelemetryHostData->voice_mode = 0xA5;
}
if (data.sys_state == ET_SYS_STATUS_WAKE) {
pTelemetryHostData->recognition_status = 0x55;
}
else {
pTelemetryHostData->recognition_status = 0xAA;
}
pTelemetryHostData->particle_detection1 = 0xAA;
pTelemetryHostData->particle_detection2 = 0xAA;
pTelemetryHostData->particle_detection3 = 0xAA;
memcpy(&(pTelemetryHostData->receive_command), (void*)pReceiveCommandInfo, sizeof(TelemetryCommandInfo));
// above same as set_telemetry_host_data
// 5 cmd info in history
size_t i = 0;
for (auto& send_cmd : g_sendCommandList) {
pTelemetryHostData->send_command[i].seqid = send_cmd.seqid & 0xFF;
pTelemetryHostData->send_command[i].flag = 0x66;
pTelemetryHostData->send_command[i].time = send_cmd.time;
memcpy(pTelemetryHostData->send_command[i].code, send_cmd.code, 2);
i++;
}
for (; i < 5; i++) {
memset(pTelemetryHostData->send_command + i, 0x00, sizeof(TelemetrySendCommandInfo));
}
// the following same as set_telemetry_host_data
pTelemetryHostData->volume_key_status = data.volume_key_pressed ? 0xAA : 0x55;
pTelemetryHostData->wake_key_status = data.wakeup_key_pressed ? 0xAA : 0x55;
pTelemetryHostData->key_status_backup = 0x55;
pTelemetryHostData->current_volume = data.volume_grade;
pTelemetryHostData->system_version_high = data.sys_ver_high;
pTelemetryHostData->system_version_low = data.sys_ver_low;
pTelemetryHostData->application_version_high = data.app_ver_high;
pTelemetryHostData->application_version_low = data.app_ver_low;
}
__inline bool is_command(uint8_t* array, uint8_t* value, int size) {
for (int i = 0; i < size; i++) {
if (array[i] != value[i]) {
return false;
}
}
return true;
}
void* telemetry_host_com_thread(void* path) {//Integrated Business Unit, telemetry_com综合业务单元遥测口
int tty_id = g_iTelemetry_Com_tty_id; // TODO
const uint32_t buffer_size = 1024 * 100; // TODO
const uint16_t address = COM_FRAME_ADDRESS_VOIX; // TODO
size_t offset = 0, cached_size = 0;
uint8_t* buffer = new uint8_t[buffer_size];
uint64_t iTelemetryAnswerTimes = 0;
TelemetryCommandInfo TeleCmdInfo2Send;
TelemetryData TelemetryData2Send; // gejp 2024.10.03
memset(&TeleCmdInfo2Send, 0, sizeof(TelemetryCommandInfo));
while (g_bKeepExintRuning) {
// if ( (iTelemetryAnswerTimes % 3 == 0) && g_bNeedReboot) system("reboot");
ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, &offset, &cached_size, &g_bKeepExintRuning, true);
if (frame == NULL) break;
//ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, true);
if (ComFrame_Verify(frame, false)) {
if (SendTelemetryErrorMsg(tty_id, address, TELEMETRY_ERROR_CHECKSUM, true))
PrintFilePos(); fprintf(stderr, "send telemetry exception frame error: %s\n", strerror(errno));
continue;
}
if (ComFrame_HEADER(frame)->type != COM_FRAME_TYPE_TELEMETRY_REQUEST ||
ComFrame_PAYLOAD_LENGTH(frame) != 2 ) {
continue;
}
if (frame->payload[0] != 0xFF || frame->payload[1] != 0x11) {
if (SendTelemetryErrorMsg(tty_id, address, TELEMETRY_ERROR_TYPE, true))
PrintFilePos(); fprintf(stderr, "send telemetry exception frame error: %s\n", strerror(errno));
continue;
}
if(iTelemetryAnswerTimes % 3 == 0) {
if (g_recvCommandQueue.Empty())
{//empty
if (0)//2024.11.04 g_bNeedReboot)
{
system("reboot");
break;
}
}else
{
TeleCmdInfo2Send = g_recvCommandQueue.Pop();
PrintFilePos(); printf("telemetry receive command (%d)\n", TeleCmdInfo2Send.seqid);
}
set_telemetry_data(&TelemetryData2Send, &TeleCmdInfo2Send);
}
//keep same every 3 answers, count from 0, 1, ...
TelemetryData2Send.send_count = iTelemetryAnswerTimes; //count from 0, 1, ..
iTelemetryAnswerTimes++;
ComFrame* pTeleAnswerMsg = ComFrame_New(address, COM_FRAME_TYPE_TELEMETRY_ANSWER, &TelemetryData2Send, sizeof(struct TelemetryData), true);
TelemetryData_SwapEndian((struct TelemetryData*)(pTeleAnswerMsg->payload));
if (ComFrame_Send(tty_id, pTeleAnswerMsg, true)) {
PrintFilePos(); fprintf(stderr, "send telemetry data frame error: %s\n", strerror(errno));
}
// added by gejp start
// if (telemetry_3package_loop_count[0] == 0) {
// telemetry_3package_count[0] += 1;
// }
// telemetry_3package_loop_count[0] += 1;
// if (telemetry_3package_loop_count[0] == 3) {
// telemetry_3package_loop_count[0] = 0;
// }
// added by gejp end
ComFrame_Del(pTeleAnswerMsg);
//ComFrame_Del(frame);
//ComFrame_ReceiveFlushBuffer(ComFrame_LENGTH(frame), buffer, &cached_size);
}
delete [] buffer;
return NULL;
}
//获取指令文本与报警码
AlarmData g_AlarmTable[] = {
{0x61006100, "热控A回路压力报警"},
{0x62006200, "热控B回路压力报警"},
{0x63006300, "热控分系统,参数异常"},
{0x64006400, "环控水箱满提醒"},
{0x65006500, "氮瓶压力过低报警"},
{0x66006600, "氧瓶压力过低报警"},
{0x67006700, "返回舱,火灾报警"},
{0x68006800, "返回舱,总压告警"},
{0x69006900, "返回舱,氧分压告警"},
{0x70007000, "返回舱,二氧化碳,分压告警"},
{0x71007100, "内回路泵,壁,转速报警"},
{0x72007200, "循环副风机转速报警"},
{0x73007300, "着陆器密封舱,总压报警"},
{0x74007400, "着陆器密封舱,氧分压报警"},
{0x75007500, "着陆器密封舱,二氧化碳分压报警"},
{0x76007600, "着陆器密封舱,火灾报警"},
{0x77007700, "着陆器辐射器,泄漏报警"},
{0x01000100, "起飞"},
{0x02000200, "船箭分离"},
{0x03000300, "组合体停靠"},
{0x04000400, "组合体分离"},
{0x05000500, "服返分离"},
{0x06000600, "回收开始"},
{0x07000700, "着陆"},
{0x08000800, "逃逸塔逃逸"},
{0x09000900, "预留"},
{0x0A000A00, "逃逸塔分离"},
{0x11001100, "整船逃逸"},
{0x71177117, "五分钟准备"},
{0x72177217, "一分钟准备"},
{0x73177317, "对接准备"},
{0x74177417, "对接环接触"},
{0x75177517, "分离完成"},
{0x76177617, "整流罩分离"},
{0x77177717, "帆板展开"},
{0x78177817, "船箭分离准备"},
{0x79177917, "帆板展开准备"},
{0x7A177A17, NULL}, // 预留
{0x7B177B17, "三十分钟后变轨"},
{0x7C177C17, "十分钟后变轨"},
{0x7D177D17, "五分钟后变轨"},
{0x7E177E17, "轨控发动机,开机指令发出"},
{0x7F177F17, "轨控发动机,关机指令发出"},
{0x80178017, "变轨结束"},
{0x81178117, "GNC转入自主控制模式"},
{0x82178217, "开始接近飞行"},
{0x83178317, "开始偏航调姿"},
{0x84178417, "偏航调姿完成"},
{0x85178517, "转平移,靠拢段"},
{0x86178617, "进入停泊点"},
{0x87178717, "开始接近飞行"},
{0x88178817, "开始俯仰调姿"},
{0x89178917, "俯仰调姿完成"},
{0x8A178A17, "最后靠拢飞行"},
{0x8B178B17, NULL}, //预留
{0x8C178C17, NULL}, //预留
{0x8D178D17, "转芬离,撤离段"},
{0x8E178E17, NULL}, // 预留
{0x8F178F17, "开始撤退飞行"},
{0x90179017, "转返回准备段"},
{0x91179117, "开始调姿"},
{0x92179217, NULL}, //预留
{0x93179317, NULL}, //预留
{0x94179417, NULL}, //预留
{0x95179517, "2分钟后服返分离"},
{0x96179617, NULL}, //预留
{0x97179717, NULL}, //预留
{0x98179817, NULL}, //预留
{0x99179917, "2分钟后回收启动"},
{0x9A179A17, "回收开关接通"},
{0x9B179B17, NULL}, //预留
{0x9C179C17, "着陆准备"},
{0x9D179D17, NULL}, //预留
{0x9E179E17, "三十分钟准备"},
{0x9F179F17, "制动发动机开机,指令发出"},
{0xA017A017, "制动发动机关机,指令发出"},
{0xA117A117, "逃逸转入值班"}
};
//获取指令文本与报警码
AlarmData* GetAlarmTextByCode(uint8_t cmd_code[], uint8_t iAlarmCode[4])
{
uint32_t iCode = *(uint32_t*)cmd_code ;
uint32_t* pAlarmCode = (uint32_t*)iAlarmCode;
int n = sizeof(g_AlarmTable) / sizeof(struct AlarmData);
for (int i = 0; i<n; ++i) {
if (g_AlarmTable[i].alarm_id == iCode) {
*pAlarmCode = iCode;
return &g_AlarmTable[i];
}
}
*pAlarmCode = 0;
printf("0083: no fit code ");
return NULL;
}
/*----*/
void* upper_host_com_thread(void* arg) {//hand-control display processing unit 手控显示处理单元
int tty_id = g_iHostCom_tty_id;
const uint16_t address = COM_FRAME_ADDRESS_VOIX;
const uint32_t buffer_size = 1024 * 100;
const int64_t iHostAudioBufferSize = 1024 * 1024;
uint8_t* buffer = new uint8_t[buffer_size];
size_t offset = 0, cached_size = 0;
int16_t* ptrProcHostAudio = (int16_t*)malloc(iHostAudioBufferSize);
timeval tLastTime, tCurTime;
TelemetryData4UpperHost UpperHostTeleData;
while (g_bKeepExintRuning) {
ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, &offset, &cached_size, &g_bKeepExintRuning, true);
//ComFrame* frame = ComFrame_ReceiveEx(tty_id, buffer, buffer_size, true);
//PrintFilePos(); printf("%04X \n", ComFrame_HEADER(frame)->type);
if (frame == NULL) break;
switch (ComFrame_HEADER(frame)->type)
{
case COM_FRAME_TYPE_COMMAND:
{
//PrintFilePos(); printf("123\n");
// 2024.10.25 disable command frame verify
#ifdef USE_TELE_CTRL_CHECK_SUM
if (ComFrame_Verify(frame, false)) {
PrintFilePos(); fprintf(stderr, "com frame checksum error\n");
// //ComFrame_Del(frame);
// //ComFrame_ReceiveFlushBuffer(ComFrame_LENGTH(frame), buffer, &cached_size);
continue;
}
#endif
if (ComFrame_PAYLOAD_LENGTH(frame) != 6) {
break;
}
struct sysinfo info;
if (sysinfo(&info)) {
fprintf(stderr, "Failed to get sysinfo, errno:%u, reason:%s\n",errno, strerror(errno));
break;
}
uint32_t telemetryTimeSeconds = info.uptime + g_iExternTimeDifference;
g_receiveCommandInfo.seqid++;
g_receiveCommandInfo.code[0] = ComFrame_PAYLOAD(frame)[4];
g_receiveCommandInfo.code[1] = ComFrame_PAYLOAD(frame)[5];
g_receiveCommandInfo.time = telemetryTimeSeconds;
if (!g_bNeedReboot) {
g_recvCommandQueue.Push(g_receiveCommandInfo);
}
/*-------START 2024.10.23 noted by zgb start
if (is_command(frame->payload, command_open_call, 6)) {
// ProcHostAudioLengthMS = 500;//first audio block at least 200ms
// gettimeofday(&tLastTime, 0);
// SetCallMode(true);
}
else if (is_command(frame->payload, command_close_call, 6)) {
// SetCallMode(false);
}
else if (is_command(frame->payload, command_open_asr, 6)) {
PrintFilePos(); printf("Open Asr\n");
setSysWake("host");
PrintFilePos(); printf("...wakeup by host\n");
g_bCloseASR = false;
SetCallMode(false); //g_bCall = false;
}
else if (is_command(frame->payload, command_close_asr, 6)) {
PrintFilePos(); printf("Close Asr\n");
setSysSleep("by host");
PrintFilePos(); printf("...sleep by host\n");
g_bCloseASR = true;
}
else
//-------END 2024.10.23 noted by zgb
*/
exint_handle_pack(ET_TYPE_COMMAND, 6, frame->payload);
break;
}
case COM_FRAME_TYPE_SPEECH_INJECTED:
{
if (ComFrame_Verify(frame, false)) {
PrintFilePos(); fprintf(stderr, "com frame checksum error\n");
//ComFrame_Del(frame);
//ComFrame_ReceiveFlushBuffer(ComFrame_LENGTH(frame), buffer, &cached_size);
continue;
}
uint8_t payload[6] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
ComFrame* f_data = ComFrame_New(
COM_FRAME_ADDRESS_VOIX,
COM_FRAME_TYPE_SPEECH_INJECTED,
payload, 6, true);
exint_handle_pack(ET_TYPE_TEXT, 6, f_data->payload);
if (ComFrame_Send(tty_id, f_data, true)) {
PrintFilePos(); fprintf(stderr, "send telemetry data frame error: %s\n", strerror(errno));
}
ComFrame_Del(f_data);
break;
}
case COM_FRAME_TYPE_REQUEST:
{
if (ComFrame_Verify(frame, false)) {
PrintFilePos(); fprintf(stderr, "com frame checksum error\n");
continue;
}
need_resend = false;
break;
}
case COM_FRAME_TYPE_AUDIO:
{
//if (ComFrame_Verify(frame, false)) {
// PrintFilePos(); fprintf(stderr, "com frame checksum error\n");
// //ComFrame_Del(frame);
// ComFrame_ReceiveFlushBuffer(ComFrame_LENGTH(frame), buffer, &cached_size);
// continue;
//}
gettimeofday(&tCurTime, 0);
int iIntervalTime = (tCurTime.tv_sec - tLastTime.tv_sec) * 1000 + (tCurTime.tv_usec - tLastTime.tv_usec) / 1000;
if( iIntervalTime > 15 )
PrintFilePos(); fprintf(stderr, " com frame interval %d\n", iIntervalTime);
tLastTime = tCurTime;
exint_handle_pack(ET_TYPE_AUDIO, ComFrame_PAYLOAD_LENGTH(frame), frame->payload);
break;
}
case COM_FRAME_TYPE_TELEMETRY_REQUEST:
{
if (g_iUseHostComForTelemetry != 1) {
break;
}
if (ComFrame_Verify(frame, false)) {
if (SendTelemetryErrorMsg(tty_id, address, TELEMETRY_ERROR_CHECKSUM, true))
PrintFilePos(); fprintf(stderr, "send telemetry exception frame error: %s\n", strerror(errno));
continue;
}
if (frame->payload[0] != 0xFF || frame->payload[1] != 0x11) {
if (SendTelemetryErrorMsg(tty_id, address, TELEMETRY_ERROR_TYPE, true))
PrintFilePos(); fprintf(stderr, "send telemetry exception frame error: %s\n", strerror(errno));
continue;
}
if (ComFrame_HEADER(frame)->type != COM_FRAME_TYPE_TELEMETRY_REQUEST ||
ComFrame_PAYLOAD_LENGTH(frame) != 2) {
continue;
}
set_telemetry_host_data(&UpperHostTeleData); // count from 0, 1, ..
ComFrame* pUpperHostTeleAnswerMsg = NewTelemetryAnswerData(address, &UpperHostTeleData, true);
if (ComFrame_Send(tty_id, pUpperHostTeleAnswerMsg, true)) {
PrintFilePos(); fprintf(stderr, "send telemetry data frame error: %s\n", strerror(errno));
}
// added by gejp start
// if (telemetry_3package_loop_count[1] == 0) {
// telemetry_3package_count[1] += 1;
// }
// telemetry_3package_loop_count[1] += 1;
// if (telemetry_3package_loop_count[1] == 3) {
// telemetry_3package_loop_count[1] = 0;
// }
// added by gejp end
ComFrame_Del(pUpperHostTeleAnswerMsg);
break;
}
case COM_FRAME_TYPE_VOICE_ANNOUNCEMENT_AND_ALARM_CODE_REQUEST: // gejp 2024.10.03
{
if (g_iEnableAlarmCode == 0) {
continue;
}
if (ComFrame_Verify(frame, false)) {
PrintFilePos(); fprintf(stderr, "com frame checksum error\n");
continue;
}
if (frame->payload[0] != 0xAA || frame->payload[1] != 0x55) { // data error
// printf("alarm code: data error");
continue;
}
int64_t iEnqueueBytes;
int iTextBytes;
const char *pNoticeStr;
//语音通报与报警码
auto alarm = GetAlarmTextByCode(frame->payload+2, g_iAlarmCode);
if(!alarm)
continue;
exint_handle_pack(ET_TYPE_TEXT, sizeof(alarm), alarm);
uint8_t payload[6] = {0xAA, 0x55, 0xAA, 0x55, 0xAA, 0x55};
ComFrame* f_data = ComFrame_New(
COM_FRAME_ADDRESS_VOIX,
COM_FRAME_TYPE_VOICE_ANNOUNCEMENT_AND_ALARM_CODE_RESPONSE,
payload, 6, true);
if (ComFrame_Send(tty_id, f_data, true)) {
PrintFilePos(); fprintf(stderr, "send telemetry data frame error: %s\n", strerror(errno));
}
ComFrame_Del(f_data);
break;
}
case COM_FRAME_TYPE_GRANT_TIME: { // gejp 2024.10.03
if (ComFrame_Verify(frame, false)) {
PrintFilePos(); fprintf(stderr, "com frame checksum error\n");
continue;
}
if (frame->payload[0] != 0x66) { // data error
printf("grant time: data error");
continue;
}
int secondsAbs = *((int*)(frame->payload+1));
secondsAbs = byteswapl(secondsAbs);
struct sysinfo info;
if (sysinfo(&info)) {
fprintf(stderr, "Failed to get sysinfo, errno:%u, reason:%s\n",errno, strerror(errno));
break;
}
PrintFilePos(); printf("---> uptime changed\n");
PrintFilePos(); printf("old uptime: %d\n", info.uptime + g_iExternTimeDifference);
g_iExternTimeDifference = secondsAbs - info.uptime;
PrintFilePos(); printf("new uptime: %d\n", info.uptime + g_iExternTimeDifference);
break;
};
default:
break;
}
}
delete[] buffer;
free(ptrProcHostAudio);
return NULL;
}
void send_command_upper_host(int length, void* payload) {
int tty_id = g_iHostCom_tty_id;
need_resend = true;
int sendtime = 0;
ComFrame* f_data = ComFrame_New(
COM_FRAME_ADDRESS_VOIX,
COM_FRAME_TYPE_REQUEST,
payload, length, true);
struct timeval tCurTime;
uint64_t last_send_time;
while (need_resend && g_bKeepExintRuning) {
gettimeofday(&tCurTime, NULL);
uint64_t tCurTimeMs = tCurTime.tv_sec * 1000 + tCurTime.tv_usec / 1000;
if (sendtime == 0) {
last_send_time = tCurTimeMs;
if (ComFrame_Send(tty_id, f_data, true)) {
PrintFilePos(); fprintf(stderr, "send telemetry data frame error: %s\n", strerror(errno));
}
sendtime += 1;
}
else if (sendtime == 3) {
break;
}
else {
if (tCurTimeMs > last_send_time + 250) {
last_send_time = tCurTimeMs;
if (ComFrame_Send(tty_id, f_data, true)) {
PrintFilePos(); fprintf(stderr, "send telemetry data frame error: %s\n", strerror(errno));
}
sendtime += 1;
}
else {
usleep(500);
}
}
}
ComFrame_Del(f_data);
if (length != 6) return;
uint16_t send_seq = (g_sendCommandList.empty()) ? 0 : g_sendCommandList.front().seqid;
struct sysinfo info;
if (sysinfo(&info)) {
fprintf(stderr, "Failed to get sysinfo, errno:%u, reason:%s\n",errno, strerror(errno));
return;
}
uint32_t telemetryTimeSeconds = info.uptime + g_iExternTimeDifference;
TelemetryCommandInfo telemetryCommandInfo = {telemetryTimeSeconds, {((uint8_t*)payload)[3], ((uint8_t*)payload)[4]}, send_seq + 1};
g_sendCommandList.push_front(telemetryCommandInfo);
if (g_sendCommandList.size() > TELEMETRY_SEND_COMMAND_INFO_LENGTH) {
g_sendCommandList.pop_back();
}
}
void send_audio_upper_host(int length, void* payload) {
int tty_id = g_iHostCom_tty_id;
ComFrame* f_data = ComFrame_New(
COM_FRAME_ADDRESS_VOIX,
COM_FRAME_TYPE_AUDIO,
payload, length, true);
if (ComFrame_Send(tty_id, f_data, true)) {
PrintFilePos(); fprintf(stderr, "send telemetry data frame error: %s\n", strerror(errno));
}
ComFrame_Del(f_data);
}
ON_EVENT(send_command) {
send_command_upper_host(args_size, args);
}
ON_EVENT(send_audio) {
send_audio_upper_host(args_size, args);
}

View File

@ -12,7 +12,7 @@ using namespace logging;
std::unique_ptr<Logger>& logging::_get_global_logger() std::unique_ptr<Logger>& logging::_get_global_logger()
{ {
static std::unique_ptr<Logger> global_logger(new Logger(LogLevel::WARN)); static std::unique_ptr<Logger> global_logger(new Logger(LogLevel::INFO));
return global_logger; return global_logger;
} }
@ -28,9 +28,6 @@ void logging::set_global_logger(std::unique_ptr<Logger>&& logger)
global_logger = std::move(logger); global_logger = std::move(logger);
} }
Logger::Logger(Level level) : _parent(nullptr), _level(level)
{}
Logger::Logger(const std::string& name, Level level, Logger* parent) Logger::Logger(const std::string& name, Level level, Logger* parent)
: _parent(parent), _level(level) : _parent(parent), _level(level)
{ {
@ -41,87 +38,6 @@ Logger::Logger(const std::string& name, Level level, Logger* parent)
} }
} }
void Logger::add_stream(std::ostream& stream)
{
_streams.push_back(std::unique_ptr<std::ostream>(new std::ostream(stream.rdbuf())));
}
Logger::Level Logger::level() const
{
if (_level == Logger::Level::UNKNOWN) {
if (_parent) {
return _parent->level();
}
return Logger::Level::INFO;
}
return _level;
}
void Logger::set_level(Level level)
{
_level = level;
}
const std::string& Logger::name() const {
return _name;
}
Logger* Logger::parent() const {
return _parent;
}
size_t Logger::children_count() const {
return logger_cache.size();
}
void Logger::log(Level level, const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(level, fmt, args);
va_end(args);
}
void Logger::debug(const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(Logger::Level::DEBUG, fmt, args);
va_end(args);
}
void Logger::info(const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(Logger::Level::INFO, fmt, args);
va_end(args);
}
void Logger::warn(const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(Logger::Level::WARN, fmt, args);
va_end(args);
}
void Logger::error(const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(Logger::Level::ERROR, fmt, args);
va_end(args);
}
void Logger::fatal(const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
vlog(Logger::Level::FATAL, fmt, args);
va_end(args);
}
void Logger::vlog(Level level, const char* fmt, va_list args) void Logger::vlog(Level level, const char* fmt, va_list args)
{ {
if (level < this->level()) if (level < this->level())
@ -195,7 +111,6 @@ LoggerOStream Logger::operator[](Logger::Level level)
return LoggerOStream(*this, level); return LoggerOStream(*this, level);
} }
LoggerOStream Logger::operator[](int level) LoggerOStream Logger::operator[](int level)
{ {
return (*this)[static_cast<Logger::Level>(level)]; return (*this)[static_cast<Logger::Level>(level)];

View File

@ -2,7 +2,7 @@
#include <thread> #include <thread>
#include <unistd.h> #include <unistd.h>
#include "exint/event.h" #include "exint/event.h"
#include "exint/detail.h" #include "exint/detail.hpp"
#include "c_testcase.h" #include "c_testcase.h"
static std::vector<int> g_vec; static std::vector<int> g_vec;

View File

@ -5,13 +5,33 @@
#include "comframe.h" #include "comframe.h"
#include "telemetry.h" #include "telemetry.h"
#include "dataqueue.hpp" #include "dataqueue.hpp"
#include "exint/detail.h" #include "logging/interface.h"
#include "transport/base.hpp"
#include "exint/protocol.hpp"
#include "exint/detail.hpp"
#include "c_testcase.h" #include "c_testcase.h"
class MockTransport: public Transport {
public:
void send_backend() override {}
void receive_backend() override {}
Transport::FrameType get_sent_frame() {
return super::send_que.Pop().first; // std::chrono::seconds(1)
}
void set_received_frame(ComFrame* frame) {
super::recv_que.Push(std::make_pair(std::shared_ptr<ComFrame>(frame, ComFrame_Del), nullptr));
}
private:
typedef Transport super;
};
static int g_telemetry_request_count = 0; static int g_telemetry_request_count = 0;
static DataQueue<std::tuple<uint32_t, size_t, void*>> g_data_queue; static DataQueue<std::tuple<uint32_t, size_t, void*>> g_data_queue;
static int host_com_master_id; static MockTransport* g_upperhost_mock_transport;
static int telemetry_com_master_id; static MockTransport* g_telemetry_mock_transport;
void exint_data_callback(uint32_t type, size_t size, void* data) { void exint_data_callback(uint32_t type, size_t size, void* data) {
if (type == ET_TYPE_TELEMETRY_REQUEST) { if (type == ET_TYPE_TELEMETRY_REQUEST) {
@ -25,65 +45,45 @@ void exint_data_callback(uint32_t type, size_t size, void* data) {
} }
SETUP { SETUP {
int host_com_slave_id, telemetry_com_slave_id; g_upperhost_mock_transport = new MockTransport();
if (openpty(&host_com_master_id, &host_com_slave_id, NULL, NULL, NULL) < 0) { g_telemetry_mock_transport = new MockTransport();
return 1; exint_setup(std::unique_ptr<Transport>(g_upperhost_mock_transport), std::unique_ptr<Transport>(g_telemetry_mock_transport), exint_data_callback);
}
if (openpty(&telemetry_com_master_id, &telemetry_com_slave_id, NULL, NULL, NULL) < 0) {
return 1;
}
init_tty(host_com_master_id, 115200);
init_tty(telemetry_com_master_id, 115200);
// init_tty(host_com_slave_id, 115200);
// init_tty(telemetry_com_slave_id, 115200);
exint_init_from_tty(host_com_slave_id, telemetry_com_slave_id, exint_data_callback);
g_telemetry_request_count = 0; g_telemetry_request_count = 0;
log_set_level(LOG_LEVEL_DEBUG);
return 0; return 0;
} }
TEARDOWN { TEARDOWN {
exint_finialize(); exint_finialize();
close(host_com_master_id);
close(telemetry_com_master_id);
g_data_queue.Clear(); g_data_queue.Clear();
return 0; return 0;
} }
TEST_CASE(test_init) { TEST_CASE(test_init) {
assert(host_com_master_id);
assert(telemetry_com_master_id);
END_TEST; END_TEST;
} }
TEST_CASE(test_host_telemetry) { TEST_CASE(test_host_telemetry) {
auto request_msg = NewTelemetryRequestMsg(COM_FRAME_ADDRESS_VOIX, true); auto request_msg = NewTelemetryRequestMsg(COM_FRAME_ADDRESS_VOIX, false);
ComFrame_Send(telemetry_com_master_id, request_msg, true); g_telemetry_mock_transport->set_received_frame(request_msg);
ComFrame_Del(request_msg);
bool timeout_flag = true; auto telemetry_reply = g_telemetry_mock_transport->get_sent_frame();
std::thread([&]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
timeout_flag = false;
}).detach();
size_t offset = 0;
size_t cached_size = 0;
auto telemetry_reply = ComFrame_ReceiveEx(telemetry_com_master_id, NULL, 100, &offset, &cached_size, &timeout_flag, true);
assert(telemetry_reply); assert(telemetry_reply);
assert_eq(g_telemetry_request_count, 1); assert_eq(g_telemetry_request_count, 1);
TelemetryData* telemetry_data = (TelemetryData*)ComFrame_PAYLOAD(telemetry_reply); TelemetryData* telemetry_data = (TelemetryData*)ComFrame_PAYLOAD(telemetry_reply);
assert_eq(ComFrame_TYPE(telemetry_reply), COM_FRAME_TYPE_TELEMETRY_ANSWER); assert_eq(ComFrame_TYPE(telemetry_reply.get()), byteswaps(COM_FRAME_TYPE_TELEMETRY_ANSWER));
assert_eq(ComFrame_PAYLOAD_LENGTH(telemetry_reply), sizeof(TelemetryData)); assert_eq(ComFrame_Length(telemetry_reply.get(), true), sizeof(ComFrameHeader) + sizeof(TelemetryData) + sizeof(uint16_t));
assert_eq(telemetry_data->application_version_high, 1); assert_eq(telemetry_data->application_version_high, 1);
assert_eq(telemetry_data->application_version_low, 2);
END_TEST; END_TEST;
} }
TEST_CASE(test_callback) { TEST_CASE(test_callback) {
uint8_t command[] = { 0x00, 0x01, 0x02, 0x03, 0xAA, 0xAA}; uint8_t command[] = { 0x00, 0x01, 0x02, 0x03, 0xAA, 0xAA};
auto command_msg = ComFrame_New(COM_FRAME_ADDRESS_VOIX, COM_FRAME_TYPE_COMMAND, command, sizeof(command), true); auto command_msg = ComFrame_New(COM_FRAME_ADDRESS_VOIX, COM_FRAME_TYPE_COMMAND, command, sizeof(command), false);
assert(!ComFrame_Send(host_com_master_id, command_msg, true)); g_upperhost_mock_transport->set_received_frame(command_msg);
uint32_t tp; uint32_t tp;
size_t len; size_t len;
@ -96,28 +96,13 @@ TEST_CASE(test_callback) {
} }
TEST_CASE(test_send_command) { TEST_CASE(test_send_command) {
ComFrame* command_frame = NULL;
bool timeout_flag = true;
std::thread([&]() {
size_t offset = 0;
size_t cached_size = 0;
command_frame = ComFrame_ReceiveEx(host_com_master_id, NULL, 100, &offset, &cached_size, &timeout_flag, true);
}).detach();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
char command_data[] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05}; char command_data[] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05};
exint_send(ET_TYPE_COMMAND, 6, command_data ); exint_send(ET_TYPE_COMMAND, 6, command_data);
auto command_frame = g_upperhost_mock_transport->get_sent_frame();
for (int i = 0; i < 100; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (command_frame) {
break;
}
}
timeout_flag = false;
assert(command_frame); assert(command_frame);
assert_eq(ComFrame_TYPE(command_frame), COM_FRAME_TYPE_REQUEST); assert_eq(ComFrame_TYPE(command_frame.get()), byteswaps(COM_FRAME_TYPE_REQUEST));
assert_eq(ComFrame_PAYLOAD_LENGTH(command_frame), 6); assert_eq(ComFrame_Length(command_frame.get(), true), 6 + sizeof(ComFrameHeader) + 2);
assert_mem_eq(ComFrame_PAYLOAD(command_frame), command_data, 6); assert_mem_eq(ComFrame_PAYLOAD(command_frame), command_data, 6);
END_TEST; END_TEST;
} }

View File

@ -34,9 +34,10 @@ TEST_CASE(test_module_logger)
} }
TEST_CASE(test_add_stream) { TEST_CASE(test_add_stream) {
std::stringstream ss;
log_init(NULL); log_init(NULL);
get_global_logger().add_stream(ss); auto& global_logger = get_global_logger();
global_logger.add_stream(std::stringstream());
auto& ss = dynamic_cast<std::stringstream&>(*global_logger.streams()[0]);
log_info("test_stream"); log_info("test_stream");
auto s = ss.str(); auto s = ss.str();
@ -79,17 +80,16 @@ TEST_CASE(test_add_stream) {
TEST_CASE(test_log_stream) { TEST_CASE(test_log_stream) {
log_init(NULL); log_init(NULL);
auto& logger = get_global_logger(); auto& global_logger = get_global_logger();
global_logger.add_stream(std::stringstream());
auto& ss = dynamic_cast<std::stringstream&>(*global_logger.streams()[0]);
std::stringstream ss; global_logger[LOG_LEVEL_INFO] << "test_log_stream" << std::endl;
logger.add_stream(ss);
logger[LOG_LEVEL_INFO] << "test_log_stream" << std::endl;
auto s = ss.str(); auto s = ss.str();
assert_str_eq(s.c_str() + (s.length() - 23), "[INFO] test_log_stream\n"); assert_str_eq(s.c_str() + (s.length() - 23), "[INFO] test_log_stream\n");
ss.str(""); ss.str("");
logger[LOG_LEVEL_DEBUG] << "test_log_stream debug" << std::endl; global_logger[LOG_LEVEL_DEBUG] << "test_log_stream debug" << std::endl;
s = ss.str(); s = ss.str();
assert_eq(s.length(), 0); assert_eq(s.length(), 0);

View File

@ -0,0 +1,40 @@
#include "transport/base.hpp"
#include "transport/protocol.hpp"
#include "c_testcase.h"
using namespace transport;
class TestTransport: public BaseTransport<Protocol> {
protected:
void send_backend() override {
while (!is_closed) {
DataPair pair = send_que.Pop();
recv_que.Push(std::move(pair));
}
}
void receive_backend() override {}
};
const int timeout = 3;
TEST_CASE(test_init) {
TestTransport t;
END_TEST;
}
TEST_CASE(test_transport) {
TestTransport t;
assert(!t.closed());
t.open();
t.send(std::vector<uint8_t>(10, 1));
auto data_pair = t.receive(std::chrono::seconds(timeout));
assert_eq(data_pair.first.size(), 10);
assert_eq(data_pair.first[1], 1);
assert(!data_pair.second);
t.close();
assert(t.closed());
END_TEST;
}

View File

@ -0,0 +1,40 @@
#include "transport/udp.hpp"
#include "transport/protocol.hpp"
#include "c_testcase.h"
using namespace transport;
TEST_CASE(test_init) {
DatagramTransport<Protocol> transport;
transport.open();
END_TEST;
}
TEST_CASE(test_send_recv) {
DatagramTransport<Protocol> transport_server;
transport_server.open();
transport_server.bind("127.0.0.1", 12345);
DatagramTransport<Protocol> transport_client;
transport_client.open();
transport_client.connect("127.0.0.1", 12345);
transport_client.send(std::vector<uint8_t>{0x01, 0x02, 0x03, 0x04});
auto [frame, token] = transport_server.receive(std::chrono::seconds(3));
assert_eq(frame.size(), 4);
assert(token);
assert_eq(frame[0], 0x01);
assert_eq(frame[1], 0x02);
assert_eq(frame[2], 0x03);
assert_eq(frame[3], 0x04);
transport_server.send(std::vector<uint8_t>{0x04, 0x03, 0x02, 0x01}, token);
auto [frame2, token2] = transport_client.receive(std::chrono::seconds(3));
assert_eq(frame2.size(), 4);
assert(token2);
assert_eq(frame2[0], 0x04);
assert_eq(frame2[1], 0x03);
assert_eq(frame2[2], 0x02);
assert_eq(frame2[3], 0x01);
END_TEST;
}

View File

@ -0,0 +1,66 @@
#include <pty.h>
#include <fcntl.h>
#include <thread>
#include "c_testcase.h"
#include "transport/serial_port.hpp"
#include "transport/protocol.hpp"
using namespace transport;
int master_fd = -1;
int slave_fd = -1;
const int timeout = 3;
SETUP {
if (openpty(&master_fd, &slave_fd, NULL, NULL, NULL) < 0) {
throw std::runtime_error("openpty failed");
}
fcntl(master_fd, F_SETFL, fcntl(master_fd, F_GETFL) | O_NONBLOCK);
fcntl(slave_fd, F_SETFL, fcntl(slave_fd, F_GETFL) | O_NONBLOCK);
return 0;
}
TEARDOWN {
close(master_fd);
close(slave_fd);
master_fd = -1;
slave_fd = -1;
return 0;
}
TEST_CASE(test_pty) {
char buffer[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
ssize_t ret;
ret = write(slave_fd, buffer, 10);
assert_eq(ret, 10);
ret = read(master_fd, buffer, 10);
assert_eq(ret, 10);
for (int i = 0; i < 10; i++) {
assert_eq(buffer[i], i);
}
END_TEST;
}
TEST_CASE(test_serial_port) {
SerialPortTransport<Protocol> t1(master_fd);
SerialPortTransport<Protocol> t2(slave_fd);
assert(!t1.closed());
assert(!t2.closed());
std::pair<Protocol::FrameType, std::shared_ptr<TransportToken>> data_pair;
std::thread ([&] {
t2.open();
t2.send(std::vector<uint8_t>(10, 2));
}).detach();
t1.open();
data_pair = t1.receive(std::chrono::seconds(timeout));
assert_eq(data_pair.first.size(), 10);
assert_eq(data_pair.first[0], 2);
assert(data_pair.second);
assert_eq(data_pair.second->transport<Protocol>(), &t1);
t1.close();
assert(t1.closed());
END_TEST;
}

View File

@ -0,0 +1,38 @@
#include "transport/unix_udp.hpp"
#include "transport/protocol.hpp"
#include "c_testcase.h"
using namespace transport;
TEST_CASE(test_init) {
UnixDatagramTransport<Protocol> transport;
transport.open();
END_TEST;
}
TEST_CASE(test_send_recv) {
UnixDatagramTransport<Protocol> transport_server("/tmp/vxup_test.sock", "");
UnixDatagramTransport<Protocol> transport_client("/tmp/vxup_test1.sock", "/tmp/vxup_test.sock");
transport_server.open();
transport_client.open();
transport_client.send(std::vector<uint8_t>{0x01, 0x02, 0x03, 0x04});
auto [frame, token] = transport_server.receive(std::chrono::seconds(3));
assert_eq(frame.size(), 4);
assert(token);
assert_eq(frame[0], 0x01);
assert_eq(frame[1], 0x02);
assert_eq(frame[2], 0x03);
assert_eq(frame[3], 0x04);
transport_server.send(std::vector<uint8_t>{0x04, 0x03, 0x02, 0x01}, token);
auto [frame2, token2] = transport_client.receive(std::chrono::seconds(3));
assert_eq(frame2.size(), 4);
assert(token2);
assert_eq(frame2[0], 0x04);
assert_eq(frame2[1], 0x03);
assert_eq(frame2[2], 0x02);
assert_eq(frame2[3], 0x01);
END_TEST;
END_TEST;
}