extern_interface/logqueueconfig/dataqueue.cpp

538 lines
17 KiB
C++
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//#pragma once
#include <time.h>
#include "dataqueue.h"
#include <stdio.h>
#define BLOCK_PADDING_SIZE 64 //bytes
#define HEADER_BYTES 16//remain: block-bytes: 8 bytes int64_t + data bytes: 8bytes int64_t
#define CHECK_QUEUE_ERR 1
#define myMax(x,y) ((x)>(y)?(x):(y))
typedef void* voidPtr;
static char g_strDQCurTime[50] = "";
//----------------------------------------------------------------------------------------
inline char* GetCurTimeStr(char* strBuf, int iBufSize)// g_strDQCurTime
{
time_t tt;
tt = time(NULL);
strftime(strBuf, iBufSize, "%Y.%m.%d %H:%M:%S", localtime(&tt));//gener string with format:"YYYY-MM-DD hh:mm:ss"
return strBuf;
}
#define DATA_QUEUE_RETAIN_BLOCK_CNT 4
//----------------------------------------------------------------------------------------
DataQueue::DataQueue(int64_t iSizeInKB, int64_t iMaxBlockSize, bool bUseMutex, int iReadThreadCnt)
{
m_pDataBuf = NULL;
m_bUseMutex = bUseMutex;
if (m_bUseMutex)
pthread_mutex_init(&m_BufMutex, NULL);
InitQueue(iSizeInKB, iMaxBlockSize, iReadThreadCnt);//KB
}
//----------------------------------------------------------------------------------------
DataQueue::~DataQueue()
{
if (m_pDataBuf != NULL)
free((void*)m_pDataBuf);
if (m_bUseMutex)
pthread_mutex_destroy(&m_BufMutex);
}
//----------------------------------------------------------------------------------------
int DataQueue::InitQueue(int64_t iTotalKB4QueueBuf, int iMaxBlockBytes, int iReadThreadCnt)//KB
{
if (m_pDataBuf != NULL)
free((void*)m_pDataBuf);
m_pDataBuf = NULL;
m_iMaxBlockByte = iMaxBlockBytes;
m_iReservedBlockNum = iReadThreadCnt + DATA_QUEUE_RETAIN_BLOCK_CNT;
m_iReservedBufBytes = m_iMaxBlockByte * m_iReservedBlockNum; //4 times of m_iMaxBlockByte
int64_t iTotalByes = 0;
if (iTotalKB4QueueBuf > 0)
{
iTotalByes = iTotalKB4QueueBuf * 1024 + 2 * HEADER_BYTES + m_iReservedBufBytes; //KB;
m_pDataBuf = (uint8_t*)malloc(iTotalByes); //for safe sake in case of over step the border
if (m_pDataBuf == NULL)
{
fprintf(stderr, "\n----------------------------------------------------------------\n");
GetCurTimeStr(g_strDQCurTime, sizeof(g_strDQCurTime));
fprintf(stderr, "***Error: fail to malloc for data queue, mem:%ld @line:%d %s\n", iTotalKB4QueueBuf, __LINE__, g_strDQCurTime);
fprintf(stderr, "----------------------------------------------------------------\n");
fflush(stderr);
exit(4);
}
}
m_iFreeBytes = m_iQueueBufBytes = iTotalByes;
ResetDataQueue();
return 0;
}
//----------------------------------------------------------------------------------------
void DataQueue::ResetDataQueue()
{
if (m_bUseMutex)
pthread_mutex_lock(&m_BufMutex);
m_iBufOutPos = m_iBufInPos = 0;
m_iRcvBlocks = 0;
m_iRcvBytes = 0;
m_iProcBlocks = 0;
m_iEmptyTimes = 0;
m_iFullTimes = 0;
m_iPaddingBlocks = m_iProcPaddingBlocks = 0;
m_iFreeBytes = m_iQueueBufBytes;
if (m_bUseMutex)
pthread_mutex_unlock(&m_BufMutex);
}
//----------------------------------------------------------------------------------------
int64_t DataQueue::GetBufBytes()
{//Get buf size
int64_t iBytes;
iBytes = m_iQueueBufBytes - m_iReservedBufBytes;
if (iBytes < 0)
iBytes = 0;
return iBytes;
}
//----------------------------------------------------------------------------------------
int64_t DataQueue::GetQueueFreeBytes()
{
//int64_t iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : ((m_iBufOutPos - m_iBufInPos) ) );
int64_t iFreeBytes;
iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : ((m_iBufOutPos - m_iBufInPos)));
iFreeBytes -= m_iReservedBufBytes;
if (iFreeBytes < 0)//iFreeBytes never is 0 because of m_iReservedBufBytes, unless the beginning state where m_iBufOutPos and m_iBufInPos are same
iFreeBytes = 0;
return iFreeBytes;
}
//----------------------------------------------------------------------------------------
int64_t DataQueue::GetFreeBytes()
{
return GetQueueFreeBytes();
}
//----------------------------------------------------------------------------------------
inline bool DataQueue::CanRcv(int iNeedBytes)
{
bool bCanRcv;
m_iMaxBlockByte = iNeedBytes > m_iMaxBlockByte ? iNeedBytes : m_iMaxBlockByte;
m_iReservedBufBytes = m_iMaxBlockByte * m_iReservedBlockNum; //4<><34><EFBFBD><EFBFBD>m_iMaxBlockByte
bCanRcv = (GetQueueFreeBytes() >= iNeedBytes);
return bCanRcv;
}
//----------------------------------------------------------------------------------------
bool DataQueue::IsFull()
{
//---- 这种判断方式可能存在错误期间m_iBufOutPos到尾部折回到队列开头变得小于m_iBufInPos
//int iFreeBytes = (m_iBufInPos < m_iBufOutPos ? (m_iBufOutPos - m_iBufInPos) : (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)));//
bool bFull;
int64_t iFreeBytes;
iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : ((m_iBufOutPos - m_iBufInPos)));
bFull = (iFreeBytes <= m_iReservedBufBytes);
// if( bFull )
// printf("dataqueue is full, size:%ld free:%ld - %ld < retain:%ld in:%ld out:%ld\n",
// m_iQueueBufBytes,iFreeBytes, GetFreeBytes(),m_iReservedBufBytes,m_iBufInPos,m_iBufOutPos );
return bFull;
}
//----------------------------------------------------------------------------------------
bool DataQueue::IsEmpty()
{
return (m_iBufInPos == m_iBufOutPos);
}
//----------------------------------------------------------------------------------------
int64_t DataQueue::EnQueue(void* e, int64_t iDataBytes, void* extradata, int64_t iExtraDataBytes, void* extradata2, int64_t iExtraDataBytes2)
{ // >=0 for normal & success,
//<0: error
int64_t iBlockBytes;
void* pWritePos = NULL;
int64_t iPadding = 0;
if (iDataBytes < 0 || iExtraDataBytes < 0 || iExtraDataBytes2 < 0)
{
fprintf(stderr, "iDataBytes %ld iExtraDataBytes %ld iExtraDataBytes %ld\n",
iDataBytes, iExtraDataBytes, iExtraDataBytes2);
fflush(stderr);
throw __LINE__;
}
if (m_bUseMutex)
pthread_mutex_lock(&m_BufMutex);
int64_t iTotalDataBytes = iDataBytes + iExtraDataBytes + iExtraDataBytes2;
#if CHECK_QUEUE_ERR
CheckQueue(__LINE__, 0);
#endif
iBlockBytes = (((iTotalDataBytes + 7) >> 3) << 3)//8<>ı<EFBFBD><C4B1><EFBFBD>
+ HEADER_BYTES
+ BLOCK_PADDING_SIZE;// 32
if (!CanRcv(iBlockBytes))
{
if (m_bUseMutex)
pthread_mutex_unlock(&m_BufMutex);
return QUEUE_IS_FULL;
}
if (m_iBufInPos >= m_iBufOutPos)
{
if (m_iBufInPos + iBlockBytes > m_iQueueBufBytes)
{//create a padding block
*((int64_t*)(m_pDataBuf + m_iBufInPos)) = m_iQueueBufBytes - m_iBufInPos; //m_iQueueBufBytes: the next block is at the begining of the buf
*((int64_t*)(m_pDataBuf + m_iBufInPos + sizeof(int64_t))) = 0;
//m_iRcvBlocks++;
//printf("+iBlockBytes:%d, iDataByte:%d\n", m_iQueueBufBytes - m_iBufInPos, 0);
m_iPaddingBlocks++;
iPadding++;
m_iBufInPos = 0;
}
}
#if CHECK_QUEUE_ERR
CheckQueue(__LINE__, iBlockBytes);
#endif
pWritePos = m_pDataBuf + m_iBufInPos;
*((int64_t*)pWritePos) = iBlockBytes;
*((int64_t*)(pWritePos + sizeof(int64_t))) = iTotalDataBytes;
memcpy(pWritePos + HEADER_BYTES, e, iDataBytes);
if (extradata && iExtraDataBytes > 0)
memcpy(pWritePos + HEADER_BYTES + iDataBytes, extradata, iExtraDataBytes);//iExtraDataBytes
if (extradata2 && iExtraDataBytes2 > 0)
memcpy(pWritePos + HEADER_BYTES + iDataBytes + iExtraDataBytes, extradata2, iExtraDataBytes2);//iExtraDataBytes
m_iRcvBlocks++;
m_iRcvBytes += iTotalDataBytes;
m_iBufInPos = (m_iBufInPos + iBlockBytes) % m_iQueueBufBytes;
//#ifdef MY_DEBUG
// printf("+iBlockBytes:%d, iDataByte:%d\n", iBlockBytes, iDataBytes);
//#endif
if (m_bUseMutex)
pthread_mutex_unlock(&m_BufMutex);
return iTotalDataBytes;
}
//----------------------------------------------------------------------------------------
#ifdef ARM_VERSION //NO_PORN_FUNC
#define THERE_ARE_CONCURRENT_DEQUEUE_OP 1
#else
#define THERE_ARE_CONCURRENT_DEQUEUE_OP 1
#endif
int64_t DataQueue::DeQueue(void** e)
{//return value: Bytes, >0 if sucess, =0 no data, < 0 error. No data copy, only return the pointer of data area
void* p;
int iBlockBytes, iDataBytes;
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
if (m_bUseMutex)
{
pthread_mutex_lock(&m_BufMutex);
#if CHECK_QUEUE_ERR
CheckQueue(__LINE__, 0);
#endif
}
#endif
if (IsEmpty())
{
//#ifdef MY_DEBUG
// m_iEmptyTimes++;
// if ( m_iEmptyTimes <5 || ( m_iEmptyTimes % 50 == 0) )
// {
// printf("***Block Queue IsEmpty %1dth: m_iBufInPos=%lu, m_iBufOutPos=%lu, m_iQueueBufBytes=%lu, m_iReservedBufBytes=%lu\n",
// m_iEmptyTimes, m_iBufInPos, m_iBufOutPos, m_iQueueBufBytes, m_iReservedBufBytes); //<2F>ճ<EFBFBD>4<EFBFBD><34>
// fflush(stdout);
// }
//#endif
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
if (m_bUseMutex)
pthread_mutex_unlock(&m_BufMutex);
#endif
return EMPTY_QUEUE_NO_DATA;
}
p = m_pDataBuf + m_iBufOutPos;
iBlockBytes = *((int64_t*)p);
iDataBytes = *((int64_t*)(p + sizeof(int64_t)));
//int iDataBytes = *((int*)(p + sizeof(int)));
m_iBufOutPos = (m_iBufOutPos + iBlockBytes) % m_iQueueBufBytes;
//#ifdef MY_DEBUG
// printf("-iBlockBytes:%d, iDataByte:%d\n", iBlockBytes, iDataBytes);
//#endif
if (iDataBytes > 0)//not pading block
{
m_iProcBlocks++;
*e = (void*)(p + HEADER_BYTES);
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
if (m_bUseMutex)
pthread_mutex_unlock(&m_BufMutex);
#endif
return iDataBytes;
}
else//padding block
{
m_iProcPaddingBlocks++;
if (IsEmpty())
{
//printf("CSegmentPool IsEmpty\n");
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
if (m_bUseMutex)
pthread_mutex_unlock(&m_BufMutex);
#endif
return EMPTY_QUEUE_NO_DATA;
}
p = m_pDataBuf + m_iBufOutPos;
iBlockBytes = *((int64_t*)p);
iDataBytes = *((int64_t*)(p + sizeof(int64_t)));
//#ifdef MY_DEBUG
// printf("-iBlockBytes:%d, iDataByte:%d\n", iBlockBytes, iDataBytes);
//#endif
m_iBufOutPos = (m_iBufOutPos + iBlockBytes) % m_iQueueBufBytes;
#if CHECK_QUEUE_ERR
if (m_bUseMutex)
CheckQueue(__LINE__, 0);
#endif
if (iDataBytes > 0)//not pading block
{
m_iProcBlocks++;
*e = (void*)(p + HEADER_BYTES);
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
if (m_bUseMutex)
pthread_mutex_unlock(&m_BufMutex);
#endif
return iDataBytes;
}
else
{//pading block
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
if (m_bUseMutex)
pthread_mutex_unlock(&m_BufMutex);
#endif
m_iProcPaddingBlocks++;
char strMsg[200];
GetCurTimeStr(g_strDQCurTime, sizeof(g_strDQCurTime));
sprintf(strMsg, "***Error in DeQueue: 2 successive padding block!@line %d %s\n", __LINE__, g_strDQCurTime);
fprintf(stderr, "\n----------------------------------------------------------------\n");
fprintf(stderr, "%s", strMsg);
fprintf(stderr, "----------------------------------------------------------------\n");
fflush(stderr);
exit(0);
return EMPTY_QUEUE_NO_DATA;
}
}
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
if (m_bUseMutex)
pthread_mutex_unlock(&m_BufMutex);
#endif
char strMsg[200];
GetCurTimeStr(g_strDQCurTime, sizeof(g_strDQCurTime));
sprintf(strMsg, "***Error in DeQueue: @line %d %s\n", __LINE__, g_strDQCurTime);
fprintf(stderr, "\n----------------------------------------------------------------\n");
fprintf(stderr, "%s", strMsg);
fprintf(stderr, "----------------------------------------------------------------\n");
fflush(stderr);
exit(0);
return EMPTY_QUEUE_NO_DATA;
}
//----------------------------------------------------------------------------------------
void DataQueue::OutPutStatus(char* strBuf, int iBufLen)
{
int iLen = iBufLen, iStrLen;
char* p = strBuf;
snprintf(p, iLen, "m_iBufOutPos = % ld, m_iBufInPos = % ld, m_iFreeBytes = % ld, m_iQueueBufBytes = % ld, m_iRcvBlocks = % ld, m_iRcvBytes = % ld\n",
m_iBufOutPos, m_iBufInPos, m_iFreeBytes, m_iQueueBufBytes, m_iRcvBlocks, m_iRcvBytes);
iStrLen = strlen(p);
p += iStrLen;
iLen -= iStrLen;
snprintf(p, iLen, "\m_iProcBlocks=%ld, m_iPaddingBlocks=%ld, m_iProcPaddingBlocks=%ld, m_iMaxBlockByte=%ld, m_iReservedBufBytes=%ld, m_iReservedBlockNum=%ld,m_iEmptyTimes=%ld, m_iFullTimes=%ld\n",
m_iProcBlocks, m_iPaddingBlocks, m_iProcPaddingBlocks, m_iMaxBlockByte, m_iReservedBufBytes, m_iReservedBlockNum, m_iEmptyTimes, m_iFullTimes);
iStrLen = strlen(p);
p += iStrLen;
iLen -= iStrLen;
}
//----------------------------------------------------------------------------------------
bool DataQueue::CheckQueue(int iLineNo, int iNeedBytes)
{
int64_t iFreeBytes;
int iErr = 0;
//int iFreeBytes = (m_iBufInPos < m_iBufOutPos ? (m_iBufOutPos - m_iBufInPos) : (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) );//
iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : (m_iBufOutPos - m_iBufInPos));
if (m_iBufOutPos < 0)
iErr |= 0x1;
if (m_iBufInPos < 0)
iErr |= 0x2;
if (iFreeBytes < (m_iReservedBlockNum - 2) * m_iMaxBlockByte)
iErr |= 0x4;
if (iFreeBytes < iNeedBytes)
iErr |= 0x8;
if (m_bUseMutex && (m_iProcBlocks == m_iRcvBlocks)
&& (m_iPaddingBlocks == m_iProcPaddingBlocks)
&& (m_iBufOutPos != m_iBufInPos))
iErr |= 0x10;
if (iErr)
{
char strMsg[1000];
char strInfo[1000];
OutPutStatus(strMsg, sizeof(strMsg));
snprintf(strInfo, sizeof(strInfo), "******QueueError %d @line %d, iFreeBytes=%ld , iNeedBytes=%d:\n%s \n",
iErr, iLineNo, iFreeBytes, iNeedBytes, strMsg);
FILE* fp = fopen("err.log", "a+");
if (fp)
{
fprintf(fp, "%s\n", strInfo);
fflush(fp);
fclose(fp);
}
fprintf(stderr, "\n----------------------------------------------------------------\n");
fprintf(stderr, "%s\n", strInfo);
fprintf(stderr, "----------------------------------------------------------------\n");
fflush(stderr);
//throw __LINE__;
return false;
}
return true;
}
//----------------------------------------------------------------------------------------
extern "C" voidPtr __attribute__((visibility("default"))) OpenDataQueue(int64_t iTotalKB4QueueBuf, int iMaxBlockBytes, int iReadThreadCnt)
{
//DataQueue* p = new DataQueue(iTotalKB4QueueBuf, iMaxBlockBytes, bConcurrent);//bUseMutex = bConcurrent
DataQueue* p = new DataQueue(iTotalKB4QueueBuf, iMaxBlockBytes, true);//bUseMutex = true
return (void*)p;
}
//----------------------------------------------------------------------------------------
extern "C" bool __attribute__((visibility("default"))) GetDataQueueInfo(void* pQueue, int64_t iSize[5])
{//return the total data blocks, and set the buf with info string
int64_t iFreeBytes, iBufBytes;
DataQueue* p = (DataQueue*)pQueue;
if (!pQueue)
return false;
iSize[0] = p->GetBufBytes();
iSize[1] = p->GetFreeBytes();
iSize[2] = p->GetTotalRcvBlockCnt();
iSize[3] = p->GetProcBlockCnt();
iSize[4] = p->GetBlockCnt();
return true;
}
//----------------------------------------------------------------------------------------
#define LOG_QUEUE_INFO 0
extern "C" int64_t __attribute__((visibility("default"))) EnDataQueue(void* pQueue, void* e, int64_t iDataBytes, void* extradata1, int64_t iExtraDataBytes1, void* extradata2, int64_t iExtraDataBytes2)
{// int DataQueue::EnQueue(void* e, int64_t iDataBytes, void* extradata1, int64_t iExtraDataBytes1, void* extradata2, int64_t iExtraDataBytes2)
int64_t iRet =0;
if (pQueue)
{
#if LOG_QUEUE_INFO
char strBuf[500];
char strBuf2[500];
FILE* fp = NULL;
snprintf(strBuf, sizeof(strBuf), "queue%p.log", pQueue);
fp = fopen(strBuf, "a+");
if (fp)
{
snprintf(strBuf, sizeof(strBuf), "EnQueue1: iDataBytes=%ld, extradata1=%ld, extradata2=%ld\n",
iDataBytes, iExtraDataBytes1, iExtraDataBytes2);
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
fprintf(fp, "%s%s", strBuf, strBuf2);
}
#endif
iRet = ((DataQueue*)pQueue)->EnQueue(e, iDataBytes, extradata1, iExtraDataBytes1, extradata2, iExtraDataBytes2);
#if LOG_QUEUE_INFO
if (fp)
{
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
fprintf(fp, "EnQueue2: iRet=%ld\n%s\n\n", iRet, strBuf2);
fclose(fp);
}
#endif
}
return iRet;
}
extern "C" int64_t __attribute__((visibility("default"))) DeDataQueue(void* pQueue, void** e)
{
int64_t iRet = 0;
if (pQueue)
{
#if LOG_QUEUE_INFO
char strBuf[500];
char strBuf2[500];
FILE* fp = NULL;
snprintf(strBuf, sizeof(strBuf), "queue%p.log", pQueue);
fp = fopen(strBuf, "a+");
if (fp)
{
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
fprintf(fp, "DeQueue1: %s\n", strBuf2);
}
#endif
iRet = ((DataQueue*)pQueue)->DeQueue(e);
#if LOG_QUEUE_INFO
if (fp)
{
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
fprintf(fp, "DeQueue2: iRet=%ld\n%s\n\n", iRet, strBuf2);
fclose(fp);
}
#endif
}
return iRet;
}
extern "C" bool __attribute__((visibility("default"))) DataQueueIsFull(void* pQueue)
{
return ((DataQueue*)pQueue)->IsFull();
}
extern "C" void __attribute__((visibility("default"))) ResetDataQueue(void* pQueue)
{//delete all data in the queue, and reset all counters to 0
((DataQueue*)pQueue)->ResetDataQueue();
}
extern "C" void __attribute__((visibility("default"))) CloseDataQueue(void*& pQueue)
{
if (pQueue)
delete (DataQueue*)pQueue;
pQueue = NULL;
}