extern_interface/logqueueconfig/dataqueue.cpp

538 lines
17 KiB
C++
Raw Permalink Normal View History

2024-11-28 08:31:00 +00:00
//#pragma once
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "dataqueue.h"
#define BLOCK_PADDING_SIZE 64 //bytes
#define HEADER_BYTES 16//remain: block-bytes: 8 bytes int64_t + data bytes: 8bytes int64_t
#define CHECK_QUEUE_ERR 1
#define myMax(x,y) ((x)>(y)?(x):(y))
typedef void* voidPtr;
static char g_strDQCurTime[50] = "";
//----------------------------------------------------------------------------------------
/**
 * Formats the current local time as "YYYY.MM.DD hh:mm:ss" into a caller buffer.
 *
 * @param strBuf    destination buffer (typically g_strDQCurTime)
 * @param iBufSize  size of strBuf in bytes; 20 or more is needed for the full stamp
 * @return strBuf. On any failure (NULL/empty buffer excluded) the buffer holds
 *         an empty string instead of indeterminate bytes.
 */
inline char* GetCurTimeStr(char* strBuf, int iBufSize)// g_strDQCurTime
{
    if (strBuf == NULL || iBufSize <= 0)
        return strBuf;
    strBuf[0] = '\0';

    time_t tt = time(NULL);
    struct tm tmLocal;
    // localtime_r writes into our own struct; plain localtime() returns a
    // pointer to shared static storage and may return NULL.
    if (localtime_r(&tt, &tmLocal) == NULL)
        return strBuf;

    // strftime returns 0 and leaves the buffer unspecified when it does not fit.
    if (strftime(strBuf, (size_t)iBufSize, "%Y.%m.%d %H:%M:%S", &tmLocal) == 0)
        strBuf[0] = '\0';
    return strBuf;
}
#define DATA_QUEUE_RETAIN_BLOCK_CNT 4
//----------------------------------------------------------------------------------------
// Constructs the queue: records the locking mode, creates the mutex when
// locking is requested, then allocates the buffer through InitQueue().
//   iSizeInKB      - payload capacity in KB
//   iMaxBlockSize  - initial largest expected block, in bytes
//   bUseMutex      - when true, queue operations take m_BufMutex
//   iReadThreadCnt - added to DATA_QUEUE_RETAIN_BLOCK_CNT to size the reserve
DataQueue::DataQueue(int64_t iSizeInKB, int64_t iMaxBlockSize, bool bUseMutex, int iReadThreadCnt)
{
    // The buffer pointer must be NULL before InitQueue(), which frees any
    // non-NULL buffer it finds.
    m_bUseMutex = bUseMutex;
    m_pDataBuf = NULL;

    if (m_bUseMutex)
    {
        pthread_mutex_init(&m_BufMutex, NULL);
    }

    InitQueue(iSizeInKB, iMaxBlockSize, iReadThreadCnt);
}
//----------------------------------------------------------------------------------------
// Releases the data buffer (if one was allocated) and destroys the mutex
// when the queue was created with locking enabled.
DataQueue::~DataQueue()
{
    if (m_pDataBuf)
    {
        free((void*)m_pDataBuf);
    }

    if (m_bUseMutex)
    {
        pthread_mutex_destroy(&m_BufMutex);
    }
}
//----------------------------------------------------------------------------------------
/**
 * (Re)allocates the ring buffer and resets all queue state.
 *
 * The logical queue size is
 *   iTotalKB4QueueBuf * 1024 + 2 * HEADER_BYTES + reserved bytes,
 * where reserved bytes = iMaxBlockBytes * (iReadThreadCnt + DATA_QUEUE_RETAIN_BLOCK_CNT).
 *
 * @param iTotalKB4QueueBuf  payload capacity in KB; <= 0 leaves the queue without a buffer
 * @param iMaxBlockBytes     initial largest expected block, in bytes
 * @param iReadThreadCnt     number of reader threads, used to size the reserve
 * @return always 0; the process exits with status 4 when the allocation fails
 */
int DataQueue::InitQueue(int64_t iTotalKB4QueueBuf, int iMaxBlockBytes, int iReadThreadCnt)//KB
{
    if (m_pDataBuf != NULL)
        free((void*)m_pDataBuf);
    m_pDataBuf = NULL;

    m_iMaxBlockByte = iMaxBlockBytes;
    m_iReservedBlockNum = iReadThreadCnt + DATA_QUEUE_RETAIN_BLOCK_CNT;
    m_iReservedBufBytes = m_iMaxBlockByte * m_iReservedBlockNum;

    int64_t iTotalBytes = 0;
    if (iTotalKB4QueueBuf > 0)
    {
        iTotalBytes = iTotalKB4QueueBuf * 1024 + 2 * HEADER_BYTES + m_iReservedBufBytes;
        // EnQueue() writes a HEADER_BYTES-sized padding header at the write
        // position when a block does not fit before the end of the buffer.
        // Fewer than HEADER_BYTES may remain there, so allocate one header of
        // slack beyond the logical size to keep that write inside the allocation.
        m_pDataBuf = (uint8_t*)malloc((size_t)(iTotalBytes + HEADER_BYTES));
        if (m_pDataBuf == NULL)
        {
            fprintf(stderr, "\n----------------------------------------------------------------\n");
            GetCurTimeStr(g_strDQCurTime, sizeof(g_strDQCurTime));
            // Cast so the argument type matches %ld regardless of how int64_t is defined.
            fprintf(stderr, "***Error: fail to malloc for data queue, mem:%ld @line:%d %s\n", (long)iTotalKB4QueueBuf, __LINE__, g_strDQCurTime);
            fprintf(stderr, "----------------------------------------------------------------\n");
            fflush(stderr);
            exit(4);
        }
    }

    // The logical size excludes the slack; all position arithmetic wraps on it.
    m_iFreeBytes = m_iQueueBufBytes = iTotalBytes;
    ResetDataQueue();
    return 0;
}
//----------------------------------------------------------------------------------------
// Empties the queue: rewinds both positions to 0, clears every counter and
// marks the whole buffer as free. Takes the mutex when locking is enabled.
void DataQueue::ResetDataQueue()
{
    if (m_bUseMutex)
    {
        pthread_mutex_lock(&m_BufMutex);
    }

    // read/write positions
    m_iBufInPos = 0;
    m_iBufOutPos = 0;

    // block and byte counters
    m_iRcvBlocks = 0;
    m_iRcvBytes = 0;
    m_iProcBlocks = 0;
    m_iProcPaddingBlocks = 0;
    m_iPaddingBlocks = 0;

    // empty/full statistics
    m_iEmptyTimes = 0;
    m_iFullTimes = 0;

    m_iFreeBytes = m_iQueueBufBytes;

    if (m_bUseMutex)
    {
        pthread_mutex_unlock(&m_BufMutex);
    }
}
//----------------------------------------------------------------------------------------
// Returns the buffer size minus the reserved bytes, never less than 0.
int64_t DataQueue::GetBufBytes()
{
    const int64_t iUsable = m_iQueueBufBytes - m_iReservedBufBytes;
    return (iUsable < 0) ? 0 : iUsable;
}
//----------------------------------------------------------------------------------------
// Returns the bytes still available to writers: the gap between the write
// and read positions, minus the reserved bytes, clamped at 0.
int64_t DataQueue::GetQueueFreeBytes()
{
    int64_t iGap;
    if (m_iBufInPos < m_iBufOutPos)
    {
        // writer has wrapped and sits behind the reader
        iGap = m_iBufOutPos - m_iBufInPos;
    }
    else
    {
        // writer is at or ahead of the reader (equal positions = empty queue)
        iGap = m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos);
    }

    iGap -= m_iReservedBufBytes;
    // Because of the reserve, the result only reaches the clamp when the
    // gap is no larger than the reserved area.
    return (iGap < 0) ? 0 : iGap;
}
//----------------------------------------------------------------------------------------
// Alias for GetQueueFreeBytes().
int64_t DataQueue::GetFreeBytes()
{
    const int64_t iAvailable = GetQueueFreeBytes();
    return iAvailable;
}
//----------------------------------------------------------------------------------------
// Reports whether iNeedBytes fit into the free space.
// Side effect: raises m_iMaxBlockByte to iNeedBytes when the request is larger
// than any block seen so far, and recomputes the reserve from it.
inline bool DataQueue::CanRcv(int iNeedBytes)
{
    if (iNeedBytes > m_iMaxBlockByte)
    {
        m_iMaxBlockByte = iNeedBytes;
    }
    // reserve = m_iReservedBlockNum blocks of the largest size
    m_iReservedBufBytes = m_iMaxBlockByte * m_iReservedBlockNum;

    return GetQueueFreeBytes() >= iNeedBytes;
}
//----------------------------------------------------------------------------------------
// Reports whether the gap between write and read positions has shrunk to the
// reserved size or below.
bool DataQueue::IsFull()
{
    // Original author's note (translated): this way of judging may be wrong
    // during the period when m_iBufOutPos reaches the tail, wraps to the start
    // of the queue and becomes smaller than m_iBufInPos.
    int64_t iGap;
    if (m_iBufInPos < m_iBufOutPos)
    {
        iGap = m_iBufOutPos - m_iBufInPos;
    }
    else
    {
        iGap = m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos);
    }

    return iGap <= m_iReservedBufBytes;
}
//----------------------------------------------------------------------------------------
// The queue is empty exactly when the read and write positions coincide.
bool DataQueue::IsEmpty()
{
    const bool bNoData = (m_iBufOutPos == m_iBufInPos);
    return bNoData;
}
//----------------------------------------------------------------------------------------
/**
 * Appends one block made of up to three concatenated byte ranges.
 *
 * Block layout in the buffer: [int64 block bytes][int64 data bytes][payload],
 * where block bytes = payload rounded up to 8 + HEADER_BYTES + BLOCK_PADDING_SIZE.
 * When the block does not fit before the end of the buffer, a padding block
 * (data bytes == 0) is written first and the block is placed at offset 0.
 *
 * @param e                main data
 * @param iDataBytes       size of e in bytes
 * @param extradata        optional second range (skipped when NULL)
 * @param iExtraDataBytes  size of extradata in bytes
 * @param extradata2       optional third range (skipped when NULL)
 * @param iExtraDataBytes2 size of extradata2 in bytes
 * @return total payload bytes stored (0 when there is nothing to store),
 *         or QUEUE_IS_FULL when the block cannot be accepted
 * @throws int (the source line number) when any size is negative
 */
int64_t DataQueue::EnQueue(void* e, int64_t iDataBytes, void* extradata, int64_t iExtraDataBytes, void* extradata2, int64_t iExtraDataBytes2)
{
    if (iDataBytes < 0 || iExtraDataBytes < 0 || iExtraDataBytes2 < 0)
    {
        fprintf(stderr, "iDataBytes %ld iExtraDataBytes %ld iExtraDataBytes2 %ld\n",
            (long)iDataBytes, (long)iExtraDataBytes, (long)iExtraDataBytes2);
        fflush(stderr);
        throw __LINE__;
    }

    const int64_t iTotalDataBytes = iDataBytes + iExtraDataBytes + iExtraDataBytes2;
    // A block with 0 data bytes is indistinguishable from a padding block for
    // DeQueue(), and two of them in a row hit its fatal "2 successive padding
    // block" path. Nothing needs to be stored, so report 0 bytes right away.
    if (iTotalDataBytes == 0)
        return 0;

    const int64_t iBlockBytes = (((iTotalDataBytes + 7) >> 3) << 3) // round up to a multiple of 8
        + HEADER_BYTES
        + BLOCK_PADDING_SIZE;
    // CanRcv() and CheckQueue() take an int; a larger block would be silently
    // truncated there and could pass the free-space test by accident.
    if ((int64_t)(int)iBlockBytes != iBlockBytes)
        return QUEUE_IS_FULL;

    if (m_bUseMutex)
        pthread_mutex_lock(&m_BufMutex);
#if CHECK_QUEUE_ERR
    CheckQueue(__LINE__, 0);
#endif
    if (!CanRcv((int)iBlockBytes))
    {
        if (m_bUseMutex)
            pthread_mutex_unlock(&m_BufMutex);
        return QUEUE_IS_FULL;
    }

    if (m_iBufInPos >= m_iBufOutPos && m_iBufInPos + iBlockBytes > m_iQueueBufBytes)
    {
        // Not enough room before the end: mark the tail as a padding block
        // that spans to the end of the buffer, and continue at the start.
        int64_t* pPadHeader = (int64_t*)(m_pDataBuf + m_iBufInPos);
        pPadHeader[0] = m_iQueueBufBytes - m_iBufInPos;
        pPadHeader[1] = 0;
        m_iPaddingBlocks++;
        m_iBufInPos = 0;
    }
#if CHECK_QUEUE_ERR
    CheckQueue(__LINE__, (int)iBlockBytes);
#endif

    // Byte pointer: arithmetic on void* is a compiler extension, not standard C++.
    uint8_t* pWritePos = (uint8_t*)(m_pDataBuf + m_iBufInPos);
    int64_t* pHeader = (int64_t*)pWritePos;
    pHeader[0] = iBlockBytes;
    pHeader[1] = iTotalDataBytes;

    uint8_t* pPayload = pWritePos + HEADER_BYTES;
    if (iDataBytes > 0) // avoid memcpy with a possibly NULL source and size 0
        memcpy(pPayload, e, (size_t)iDataBytes);
    if (extradata && iExtraDataBytes > 0)
        memcpy(pPayload + iDataBytes, extradata, (size_t)iExtraDataBytes);
    if (extradata2 && iExtraDataBytes2 > 0)
        memcpy(pPayload + iDataBytes + iExtraDataBytes, extradata2, (size_t)iExtraDataBytes2);

    m_iRcvBlocks++;
    m_iRcvBytes += iTotalDataBytes;
    m_iBufInPos = (m_iBufInPos + iBlockBytes) % m_iQueueBufBytes;

    if (m_bUseMutex)
        pthread_mutex_unlock(&m_BufMutex);
    return iTotalDataBytes;
}
//----------------------------------------------------------------------------------------
#ifdef ARM_VERSION //NO_PORN_FUNC
#define THERE_ARE_CONCURRENT_DEQUEUE_OP 1
#else
#define THERE_ARE_CONCURRENT_DEQUEUE_OP 1
#endif
/**
 * Removes the oldest data block from the queue without copying it.
 *
 * At most one padding block (data bytes <= 0) is skipped before the data
 * block. Two padding blocks in a row mean the buffer is corrupt; this is
 * reported on stderr and the process exits with EXIT_FAILURE.
 *
 * @param e  receives a pointer to the payload inside the queue buffer
 * @return payload size in bytes (> 0) on success, or EMPTY_QUEUE_NO_DATA
 *         when the queue holds no data block
 */
int64_t DataQueue::DeQueue(void** e)
{
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
    if (m_bUseMutex)
    {
        pthread_mutex_lock(&m_BufMutex);
#if CHECK_QUEUE_ERR
        CheckQueue(__LINE__, 0);
#endif
    }
#endif

    // Pass 0 reads the block at the read position; pass 1 runs only when
    // pass 0 consumed a padding block.
    for (int iPass = 0; iPass < 2; ++iPass)
    {
        if (IsEmpty())
        {
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
            if (m_bUseMutex)
                pthread_mutex_unlock(&m_BufMutex);
#endif
            return EMPTY_QUEUE_NO_DATA;
        }

        // Byte pointer: arithmetic on void* is a compiler extension.
        uint8_t* pBlock = (uint8_t*)(m_pDataBuf + m_iBufOutPos);
        const int64_t* pHeader = (const int64_t*)pBlock;
        // Keep the full 64-bit header values; int locals would truncate them.
        const int64_t iBlockBytes = pHeader[0];
        const int64_t iDataBytes = pHeader[1];
        m_iBufOutPos = (m_iBufOutPos + iBlockBytes) % m_iQueueBufBytes;
#if CHECK_QUEUE_ERR
        if (iPass > 0 && m_bUseMutex)
            CheckQueue(__LINE__, 0);
#endif

        if (iDataBytes > 0) // data block
        {
            m_iProcBlocks++;
            *e = (void*)(pBlock + HEADER_BYTES);
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
            if (m_bUseMutex)
                pthread_mutex_unlock(&m_BufMutex);
#endif
            return iDataBytes;
        }

        // padding block: count it while still holding the lock
        m_iProcPaddingBlocks++;
    }

#if THERE_ARE_CONCURRENT_DEQUEUE_OP
    if (m_bUseMutex)
        pthread_mutex_unlock(&m_BufMutex);
#endif
    char strMsg[200];
    GetCurTimeStr(g_strDQCurTime, sizeof(g_strDQCurTime));
    snprintf(strMsg, sizeof(strMsg), "***Error in DeQueue: 2 successive padding block!@line %d %s\n", __LINE__, g_strDQCurTime);
    fprintf(stderr, "\n----------------------------------------------------------------\n");
    fprintf(stderr, "%s", strMsg);
    fprintf(stderr, "----------------------------------------------------------------\n");
    fflush(stderr);
    // Fatal corruption must not look like a clean shutdown to the parent process.
    exit(EXIT_FAILURE);
    return EMPTY_QUEUE_NO_DATA;
}
//----------------------------------------------------------------------------------------
/**
 * Writes a two-line, human-readable dump of the queue positions and counters.
 *
 * @param strBuf   destination buffer; left untouched when NULL
 * @param iBufLen  size of strBuf in bytes; nothing is written when <= 0.
 *                 Output that does not fit is truncated and NUL-terminated.
 */
void DataQueue::OutPutStatus(char* strBuf, int iBufLen)
{
    if (strBuf == NULL || iBufLen <= 0)
        return;

    // Arguments are cast to long so each one matches its %ld conversion.
    const int iFirstLen = snprintf(strBuf, (size_t)iBufLen, "m_iBufOutPos = % ld, m_iBufInPos = % ld, m_iFreeBytes = % ld, m_iQueueBufBytes = % ld, m_iRcvBlocks = % ld, m_iRcvBytes = % ld\n",
        (long)m_iBufOutPos, (long)m_iBufInPos, (long)m_iFreeBytes, (long)m_iQueueBufBytes, (long)m_iRcvBlocks, (long)m_iRcvBytes);
    if (iFirstLen < 0)
    {
        strBuf[0] = '\0';
        return;
    }
    if (iFirstLen >= iBufLen)
        return; // first line already filled the buffer

    // The original literal began with "\m", which is not a valid escape
    // sequence; GCC reads it as a plain 'm', so the text is unchanged.
    snprintf(strBuf + iFirstLen, (size_t)(iBufLen - iFirstLen), "m_iProcBlocks=%ld, m_iPaddingBlocks=%ld, m_iProcPaddingBlocks=%ld, m_iMaxBlockByte=%ld, m_iReservedBufBytes=%ld, m_iReservedBlockNum=%ld,m_iEmptyTimes=%ld, m_iFullTimes=%ld\n",
        (long)m_iProcBlocks, (long)m_iPaddingBlocks, (long)m_iProcPaddingBlocks, (long)m_iMaxBlockByte, (long)m_iReservedBufBytes, (long)m_iReservedBlockNum, (long)m_iEmptyTimes, (long)m_iFullTimes);
}
//----------------------------------------------------------------------------------------
// Sanity-checks the queue state and reports any violation to err.log and stderr.
//   iLineNo    - source line of the caller, included in the report
//   iNeedBytes - bytes the caller is about to write (0 when none)
// Returns true when every check passes, false otherwise.
// Error bits: 0x1 read position negative, 0x2 write position negative,
//             0x4 gap below (m_iReservedBlockNum - 2) * m_iMaxBlockByte,
//             0x8 gap below iNeedBytes,
//             0x10 (locking mode only) all counters balanced but positions differ.
bool DataQueue::CheckQueue(int iLineNo, int iNeedBytes)
{
    int64_t iFreeBytes;
    if (m_iBufInPos < m_iBufOutPos)
        iFreeBytes = m_iBufOutPos - m_iBufInPos;
    else
        iFreeBytes = m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos);

    int iErrMask = 0;
    if (m_iBufOutPos < 0)
    {
        iErrMask |= 0x1;
    }
    if (m_iBufInPos < 0)
    {
        iErrMask |= 0x2;
    }
    if (iFreeBytes < (m_iReservedBlockNum - 2) * m_iMaxBlockByte)
    {
        iErrMask |= 0x4;
    }
    if (iFreeBytes < iNeedBytes)
    {
        iErrMask |= 0x8;
    }
    if (m_bUseMutex && (m_iProcBlocks == m_iRcvBlocks)
        && (m_iPaddingBlocks == m_iProcPaddingBlocks)
        && (m_iBufOutPos != m_iBufInPos))
    {
        iErrMask |= 0x10;
    }

    if (iErrMask == 0)
    {
        return true;
    }

    char strStatus[1000];
    char strReport[1000];
    OutPutStatus(strStatus, sizeof(strStatus));
    snprintf(strReport, sizeof(strReport), "******QueueError %d @line %d, iFreeBytes=%ld , iNeedBytes=%d:\n%s \n",
        iErrMask, iLineNo, iFreeBytes, iNeedBytes, strStatus);

    // Append the report to err.log in the working directory (best effort).
    FILE* pLog = fopen("err.log", "a+");
    if (pLog)
    {
        fprintf(pLog, "%s\n", strReport);
        fflush(pLog);
        fclose(pLog);
    }

    fprintf(stderr, "\n----------------------------------------------------------------\n");
    fprintf(stderr, "%s\n", strReport);
    fprintf(stderr, "----------------------------------------------------------------\n");
    fflush(stderr);
    return false;
}
//----------------------------------------------------------------------------------------
/**
 * C entry point: creates a mutex-protected DataQueue and returns it as an
 * opaque handle. Release it with CloseDataQueue().
 *
 * @param iTotalKB4QueueBuf  payload capacity in KB
 * @param iMaxBlockBytes     initial largest expected block, in bytes
 * @param iReadThreadCnt     number of reader threads; sizes the reserved area
 * @return opaque queue handle
 */
extern "C" voidPtr __attribute__((visibility("default"))) OpenDataQueue(int64_t iTotalKB4QueueBuf, int iMaxBlockBytes, int iReadThreadCnt)
{
    // Forward iReadThreadCnt: it was previously accepted but dropped, so the
    // queue was always built with the constructor's default reader count.
    DataQueue* p = new DataQueue(iTotalKB4QueueBuf, iMaxBlockBytes, true, iReadThreadCnt);//bUseMutex = true
    return (void*)p;
}
//----------------------------------------------------------------------------------------
/**
 * C entry point: copies five queue figures into iSize.
 *
 *   iSize[0] = GetBufBytes()
 *   iSize[1] = GetFreeBytes()
 *   iSize[2] = GetTotalRcvBlockCnt()
 *   iSize[3] = GetProcBlockCnt()
 *   iSize[4] = GetBlockCnt()
 *
 * @param pQueue  handle returned by OpenDataQueue()
 * @param iSize   output array with room for at least 5 values
 * @return true when the values were written, false when pQueue or iSize is NULL
 */
extern "C" bool __attribute__((visibility("default"))) GetDataQueueInfo(void* pQueue, int64_t iSize[5])
{
    if (!pQueue || !iSize)
        return false;

    DataQueue* p = (DataQueue*)pQueue;
    iSize[0] = p->GetBufBytes();
    iSize[1] = p->GetFreeBytes();
    iSize[2] = p->GetTotalRcvBlockCnt();
    iSize[3] = p->GetProcBlockCnt();
    iSize[4] = p->GetBlockCnt();
    return true;
}
//----------------------------------------------------------------------------------------
#define LOG_QUEUE_INFO 0
extern "C" int64_t __attribute__((visibility("default"))) EnDataQueue(void* pQueue, void* e, int64_t iDataBytes, void* extradata1, int64_t iExtraDataBytes1, void* extradata2, int64_t iExtraDataBytes2)
{// int DataQueue::EnQueue(void* e, int64_t iDataBytes, void* extradata1, int64_t iExtraDataBytes1, void* extradata2, int64_t iExtraDataBytes2)
int64_t iRet =0;
if (pQueue)
{
#if LOG_QUEUE_INFO
char strBuf[500];
char strBuf2[500];
FILE* fp = NULL;
snprintf(strBuf, sizeof(strBuf), "queue%p.log", pQueue);
fp = fopen(strBuf, "a+");
if (fp)
{
snprintf(strBuf, sizeof(strBuf), "EnQueue1: iDataBytes=%ld, extradata1=%ld, extradata2=%ld\n",
iDataBytes, iExtraDataBytes1, iExtraDataBytes2);
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
fprintf(fp, "%s%s", strBuf, strBuf2);
}
#endif
iRet = ((DataQueue*)pQueue)->EnQueue(e, iDataBytes, extradata1, iExtraDataBytes1, extradata2, iExtraDataBytes2);
#if LOG_QUEUE_INFO
if (fp)
{
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
fprintf(fp, "EnQueue2: iRet=%ld\n%s\n\n", iRet, strBuf2);
fclose(fp);
}
#endif
}
return iRet;
}
extern "C" int64_t __attribute__((visibility("default"))) DeDataQueue(void* pQueue, void** e)
{
int64_t iRet = 0;
if (pQueue)
{
#if LOG_QUEUE_INFO
char strBuf[500];
char strBuf2[500];
FILE* fp = NULL;
snprintf(strBuf, sizeof(strBuf), "queue%p.log", pQueue);
fp = fopen(strBuf, "a+");
if (fp)
{
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
fprintf(fp, "DeQueue1: %s\n", strBuf2);
}
#endif
iRet = ((DataQueue*)pQueue)->DeQueue(e);
#if LOG_QUEUE_INFO
if (fp)
{
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
fprintf(fp, "DeQueue2: iRet=%ld\n%s\n\n", iRet, strBuf2);
fclose(fp);
}
#endif
}
return iRet;
}
// C entry point wrapping DataQueue::IsFull().
// A NULL handle is reported as full (it cannot accept data) instead of being
// dereferenced; the other entry points in this file also tolerate NULL.
extern "C" bool __attribute__((visibility("default"))) DataQueueIsFull(void* pQueue)
{
    if (!pQueue)
        return true;
    return ((DataQueue*)pQueue)->IsFull();
}
// C entry point: deletes all data in the queue and resets all counters to 0.
// A NULL handle is ignored instead of being dereferenced.
extern "C" void __attribute__((visibility("default"))) ResetDataQueue(void* pQueue)
{
    if (!pQueue)
        return;
    ((DataQueue*)pQueue)->ResetDataQueue();
}
extern "C" void __attribute__((visibility("default"))) CloseDataQueue(void*& pQueue)
{
if (pQueue)
delete (DataQueue*)pQueue;
pQueue = NULL;
}