//#pragma once #include #include "dataqueue.h" #include #define BLOCK_PADDING_SIZE 64 //bytes #define HEADER_BYTES 16//remain: block-bytes: 8 bytes int64_t + data bytes: 8bytes int64_t #define CHECK_QUEUE_ERR 1 #define myMax(x,y) ((x)>(y)?(x):(y)) typedef void* voidPtr; static char g_strDQCurTime[50] = ""; //---------------------------------------------------------------------------------------- inline char* GetCurTimeStr(char* strBuf, int iBufSize)// g_strDQCurTime { time_t tt; tt = time(NULL); strftime(strBuf, iBufSize, "%Y.%m.%d %H:%M:%S", localtime(&tt));//gener string with format:"YYYY-MM-DD hh:mm:ss" return strBuf; } #define DATA_QUEUE_RETAIN_BLOCK_CNT 4 //---------------------------------------------------------------------------------------- DataQueue::DataQueue(int64_t iSizeInKB, int64_t iMaxBlockSize, bool bUseMutex, int iReadThreadCnt) { m_pDataBuf = NULL; m_bUseMutex = bUseMutex; if (m_bUseMutex) pthread_mutex_init(&m_BufMutex, NULL); InitQueue(iSizeInKB, iMaxBlockSize, iReadThreadCnt);//KB } //---------------------------------------------------------------------------------------- DataQueue::~DataQueue() { if (m_pDataBuf != NULL) free((void*)m_pDataBuf); if (m_bUseMutex) pthread_mutex_destroy(&m_BufMutex); } //---------------------------------------------------------------------------------------- int DataQueue::InitQueue(int64_t iTotalKB4QueueBuf, int iMaxBlockBytes, int iReadThreadCnt)//KB { if (m_pDataBuf != NULL) free((void*)m_pDataBuf); m_pDataBuf = NULL; m_iMaxBlockByte = iMaxBlockBytes; m_iReservedBlockNum = iReadThreadCnt + DATA_QUEUE_RETAIN_BLOCK_CNT; m_iReservedBufBytes = m_iMaxBlockByte * m_iReservedBlockNum; //4 times of m_iMaxBlockByte int64_t iTotalByes = 0; if (iTotalKB4QueueBuf > 0) { iTotalByes = iTotalKB4QueueBuf * 1024 + 2 * HEADER_BYTES + m_iReservedBufBytes; //KB; m_pDataBuf = (uint8_t*)malloc(iTotalByes); //for safe sake in case of over step the border if (m_pDataBuf == NULL) { fprintf(stderr, "\n----------------------------------------------------------------\n"); GetCurTimeStr(g_strDQCurTime, sizeof(g_strDQCurTime)); fprintf(stderr, "***Error: fail to malloc for data queue, mem:%ld @line:%d %s\n", iTotalKB4QueueBuf, __LINE__, g_strDQCurTime); fprintf(stderr, "----------------------------------------------------------------\n"); fflush(stderr); exit(4); } } m_iFreeBytes = m_iQueueBufBytes = iTotalByes; ResetDataQueue(); return 0; } //---------------------------------------------------------------------------------------- void DataQueue::ResetDataQueue() { if (m_bUseMutex) pthread_mutex_lock(&m_BufMutex); m_iBufOutPos = m_iBufInPos = 0; m_iRcvBlocks = 0; m_iRcvBytes = 0; m_iProcBlocks = 0; m_iEmptyTimes = 0; m_iFullTimes = 0; m_iPaddingBlocks = m_iProcPaddingBlocks = 0; m_iFreeBytes = m_iQueueBufBytes; if (m_bUseMutex) pthread_mutex_unlock(&m_BufMutex); } //---------------------------------------------------------------------------------------- int64_t DataQueue::GetBufBytes() {//Get buf size int64_t iBytes; iBytes = m_iQueueBufBytes - m_iReservedBufBytes; if (iBytes < 0) iBytes = 0; return iBytes; } //---------------------------------------------------------------------------------------- int64_t DataQueue::GetQueueFreeBytes() { //int64_t iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : ((m_iBufOutPos - m_iBufInPos) ) ); int64_t iFreeBytes; iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : ((m_iBufOutPos - m_iBufInPos))); iFreeBytes -= m_iReservedBufBytes; if (iFreeBytes < 0)//iFreeBytes never is 0 because of m_iReservedBufBytes, unless the beginning state where m_iBufOutPos and m_iBufInPos are same iFreeBytes = 0; return iFreeBytes; } //---------------------------------------------------------------------------------------- int64_t DataQueue::GetFreeBytes() { return GetQueueFreeBytes(); } //---------------------------------------------------------------------------------------- inline bool DataQueue::CanRcv(int iNeedBytes) { bool bCanRcv; m_iMaxBlockByte = iNeedBytes > m_iMaxBlockByte ? iNeedBytes : m_iMaxBlockByte; m_iReservedBufBytes = m_iMaxBlockByte * m_iReservedBlockNum; //4����m_iMaxBlockByte bCanRcv = (GetQueueFreeBytes() >= iNeedBytes); return bCanRcv; } //---------------------------------------------------------------------------------------- bool DataQueue::IsFull() { //---- 这种判断方式,可能存在错误,即:期间m_iBufOutPos到尾部,折回到队列开头,变得小于m_iBufInPos //int iFreeBytes = (m_iBufInPos < m_iBufOutPos ? (m_iBufOutPos - m_iBufInPos) : (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)));// bool bFull; int64_t iFreeBytes; iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : ((m_iBufOutPos - m_iBufInPos))); bFull = (iFreeBytes <= m_iReservedBufBytes); // if( bFull ) // printf("dataqueue is full, size:%ld free:%ld - %ld < retain:%ld in:%ld out:%ld\n", // m_iQueueBufBytes,iFreeBytes, GetFreeBytes(),m_iReservedBufBytes,m_iBufInPos,m_iBufOutPos ); return bFull; } //---------------------------------------------------------------------------------------- bool DataQueue::IsEmpty() { return (m_iBufInPos == m_iBufOutPos); } //---------------------------------------------------------------------------------------- int64_t DataQueue::EnQueue(void* e, int64_t iDataBytes, void* extradata, int64_t iExtraDataBytes, void* extradata2, int64_t iExtraDataBytes2) { // >=0 for normal & success, //<0: error int64_t iBlockBytes; void* pWritePos = NULL; int64_t iPadding = 0; if (iDataBytes < 0 || iExtraDataBytes < 0 || iExtraDataBytes2 < 0) { fprintf(stderr, "iDataBytes %ld iExtraDataBytes %ld iExtraDataBytes %ld\n", iDataBytes, iExtraDataBytes, iExtraDataBytes2); fflush(stderr); throw __LINE__; } if (m_bUseMutex) pthread_mutex_lock(&m_BufMutex); int64_t iTotalDataBytes = iDataBytes + iExtraDataBytes + iExtraDataBytes2; #if CHECK_QUEUE_ERR CheckQueue(__LINE__, 0); #endif iBlockBytes = (((iTotalDataBytes + 7) >> 3) << 3)//8�ı��� + HEADER_BYTES + BLOCK_PADDING_SIZE;// 32 if (!CanRcv(iBlockBytes)) { if (m_bUseMutex) pthread_mutex_unlock(&m_BufMutex); return QUEUE_IS_FULL; } if (m_iBufInPos >= m_iBufOutPos) { if (m_iBufInPos + iBlockBytes > m_iQueueBufBytes) {//create a padding block *((int64_t*)(m_pDataBuf + m_iBufInPos)) = m_iQueueBufBytes - m_iBufInPos; //m_iQueueBufBytes: the next block is at the begining of the buf *((int64_t*)(m_pDataBuf + m_iBufInPos + sizeof(int64_t))) = 0; //m_iRcvBlocks++; //printf("+iBlockBytes:%d, iDataByte:%d\n", m_iQueueBufBytes - m_iBufInPos, 0); m_iPaddingBlocks++; iPadding++; m_iBufInPos = 0; } } #if CHECK_QUEUE_ERR CheckQueue(__LINE__, iBlockBytes); #endif pWritePos = m_pDataBuf + m_iBufInPos; *((int64_t*)pWritePos) = iBlockBytes; *((int64_t*)(pWritePos + sizeof(int64_t))) = iTotalDataBytes; memcpy(pWritePos + HEADER_BYTES, e, iDataBytes); if (extradata && iExtraDataBytes > 0) memcpy(pWritePos + HEADER_BYTES + iDataBytes, extradata, iExtraDataBytes);//iExtraDataBytes if (extradata2 && iExtraDataBytes2 > 0) memcpy(pWritePos + HEADER_BYTES + iDataBytes + iExtraDataBytes, extradata2, iExtraDataBytes2);//iExtraDataBytes m_iRcvBlocks++; m_iRcvBytes += iTotalDataBytes; m_iBufInPos = (m_iBufInPos + iBlockBytes) % m_iQueueBufBytes; //#ifdef MY_DEBUG // printf("+iBlockBytes:%d, iDataByte:%d\n", iBlockBytes, iDataBytes); //#endif if (m_bUseMutex) pthread_mutex_unlock(&m_BufMutex); return iTotalDataBytes; } //---------------------------------------------------------------------------------------- #ifdef ARM_VERSION //NO_PORN_FUNC #define THERE_ARE_CONCURRENT_DEQUEUE_OP 1 #else #define THERE_ARE_CONCURRENT_DEQUEUE_OP 1 #endif int64_t DataQueue::DeQueue(void** e) {//return value: Bytes, >0 if sucess, =0 no data, < 0 error. No data copy, only return the pointer of data area void* p; int iBlockBytes, iDataBytes; #if THERE_ARE_CONCURRENT_DEQUEUE_OP if (m_bUseMutex) { pthread_mutex_lock(&m_BufMutex); #if CHECK_QUEUE_ERR CheckQueue(__LINE__, 0); #endif } #endif if (IsEmpty()) { //#ifdef MY_DEBUG // m_iEmptyTimes++; // if ( m_iEmptyTimes <5 || ( m_iEmptyTimes % 50 == 0) ) // { // printf("***Block Queue IsEmpty %1dth: m_iBufInPos=%lu, m_iBufOutPos=%lu, m_iQueueBufBytes=%lu, m_iReservedBufBytes=%lu\n", // m_iEmptyTimes, m_iBufInPos, m_iBufOutPos, m_iQueueBufBytes, m_iReservedBufBytes); //�ճ�4�� // fflush(stdout); // } //#endif #if THERE_ARE_CONCURRENT_DEQUEUE_OP if (m_bUseMutex) pthread_mutex_unlock(&m_BufMutex); #endif return EMPTY_QUEUE_NO_DATA; } p = m_pDataBuf + m_iBufOutPos; iBlockBytes = *((int64_t*)p); iDataBytes = *((int64_t*)(p + sizeof(int64_t))); //int iDataBytes = *((int*)(p + sizeof(int))); m_iBufOutPos = (m_iBufOutPos + iBlockBytes) % m_iQueueBufBytes; //#ifdef MY_DEBUG // printf("-iBlockBytes:%d, iDataByte:%d\n", iBlockBytes, iDataBytes); //#endif if (iDataBytes > 0)//not pading block { m_iProcBlocks++; *e = (void*)(p + HEADER_BYTES); #if THERE_ARE_CONCURRENT_DEQUEUE_OP if (m_bUseMutex) pthread_mutex_unlock(&m_BufMutex); #endif return iDataBytes; } else//padding block { m_iProcPaddingBlocks++; if (IsEmpty()) { //printf("CSegmentPool IsEmpty\n"); #if THERE_ARE_CONCURRENT_DEQUEUE_OP if (m_bUseMutex) pthread_mutex_unlock(&m_BufMutex); #endif return EMPTY_QUEUE_NO_DATA; } p = m_pDataBuf + m_iBufOutPos; iBlockBytes = *((int64_t*)p); iDataBytes = *((int64_t*)(p + sizeof(int64_t))); //#ifdef MY_DEBUG // printf("-iBlockBytes:%d, iDataByte:%d\n", iBlockBytes, iDataBytes); //#endif m_iBufOutPos = (m_iBufOutPos + iBlockBytes) % m_iQueueBufBytes; #if CHECK_QUEUE_ERR if (m_bUseMutex) CheckQueue(__LINE__, 0); #endif if (iDataBytes > 0)//not pading block { m_iProcBlocks++; *e = (void*)(p + HEADER_BYTES); #if THERE_ARE_CONCURRENT_DEQUEUE_OP if (m_bUseMutex) pthread_mutex_unlock(&m_BufMutex); #endif return iDataBytes; } else {//pading block #if THERE_ARE_CONCURRENT_DEQUEUE_OP if (m_bUseMutex) pthread_mutex_unlock(&m_BufMutex); #endif m_iProcPaddingBlocks++; char strMsg[200]; GetCurTimeStr(g_strDQCurTime, sizeof(g_strDQCurTime)); sprintf(strMsg, "***Error in DeQueue: 2 successive padding block!@line %d %s\n", __LINE__, g_strDQCurTime); fprintf(stderr, "\n----------------------------------------------------------------\n"); fprintf(stderr, "%s", strMsg); fprintf(stderr, "----------------------------------------------------------------\n"); fflush(stderr); exit(0); return EMPTY_QUEUE_NO_DATA; } } #if THERE_ARE_CONCURRENT_DEQUEUE_OP if (m_bUseMutex) pthread_mutex_unlock(&m_BufMutex); #endif char strMsg[200]; GetCurTimeStr(g_strDQCurTime, sizeof(g_strDQCurTime)); sprintf(strMsg, "***Error in DeQueue: @line %d %s\n", __LINE__, g_strDQCurTime); fprintf(stderr, "\n----------------------------------------------------------------\n"); fprintf(stderr, "%s", strMsg); fprintf(stderr, "----------------------------------------------------------------\n"); fflush(stderr); exit(0); return EMPTY_QUEUE_NO_DATA; } //---------------------------------------------------------------------------------------- void DataQueue::OutPutStatus(char* strBuf, int iBufLen) { int iLen = iBufLen, iStrLen; char* p = strBuf; snprintf(p, iLen, "m_iBufOutPos = % ld, m_iBufInPos = % ld, m_iFreeBytes = % ld, m_iQueueBufBytes = % ld, m_iRcvBlocks = % ld, m_iRcvBytes = % ld\n", m_iBufOutPos, m_iBufInPos, m_iFreeBytes, m_iQueueBufBytes, m_iRcvBlocks, m_iRcvBytes); iStrLen = strlen(p); p += iStrLen; iLen -= iStrLen; snprintf(p, iLen, "\m_iProcBlocks=%ld, m_iPaddingBlocks=%ld, m_iProcPaddingBlocks=%ld, m_iMaxBlockByte=%ld, m_iReservedBufBytes=%ld, m_iReservedBlockNum=%ld,m_iEmptyTimes=%ld, m_iFullTimes=%ld\n", m_iProcBlocks, m_iPaddingBlocks, m_iProcPaddingBlocks, m_iMaxBlockByte, m_iReservedBufBytes, m_iReservedBlockNum, m_iEmptyTimes, m_iFullTimes); iStrLen = strlen(p); p += iStrLen; iLen -= iStrLen; } //---------------------------------------------------------------------------------------- bool DataQueue::CheckQueue(int iLineNo, int iNeedBytes) { int64_t iFreeBytes; int iErr = 0; //int iFreeBytes = (m_iBufInPos < m_iBufOutPos ? (m_iBufOutPos - m_iBufInPos) : (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) );// iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : (m_iBufOutPos - m_iBufInPos)); if (m_iBufOutPos < 0) iErr |= 0x1; if (m_iBufInPos < 0) iErr |= 0x2; if (iFreeBytes < (m_iReservedBlockNum - 2) * m_iMaxBlockByte) iErr |= 0x4; if (iFreeBytes < iNeedBytes) iErr |= 0x8; if (m_bUseMutex && (m_iProcBlocks == m_iRcvBlocks) && (m_iPaddingBlocks == m_iProcPaddingBlocks) && (m_iBufOutPos != m_iBufInPos)) iErr |= 0x10; if (iErr) { char strMsg[1000]; char strInfo[1000]; OutPutStatus(strMsg, sizeof(strMsg)); snprintf(strInfo, sizeof(strInfo), "******QueueError %d @line %d, iFreeBytes=%ld , iNeedBytes=%d:\n%s \n", iErr, iLineNo, iFreeBytes, iNeedBytes, strMsg); FILE* fp = fopen("err.log", "a+"); if (fp) { fprintf(fp, "%s\n", strInfo); fflush(fp); fclose(fp); } fprintf(stderr, "\n----------------------------------------------------------------\n"); fprintf(stderr, "%s\n", strInfo); fprintf(stderr, "----------------------------------------------------------------\n"); fflush(stderr); //throw __LINE__; return false; } return true; } //---------------------------------------------------------------------------------------- extern "C" voidPtr __attribute__((visibility("default"))) OpenDataQueue(int64_t iTotalKB4QueueBuf, int iMaxBlockBytes, int iReadThreadCnt) { //DataQueue* p = new DataQueue(iTotalKB4QueueBuf, iMaxBlockBytes, bConcurrent);//bUseMutex = bConcurrent DataQueue* p = new DataQueue(iTotalKB4QueueBuf, iMaxBlockBytes, true);//bUseMutex = true return (void*)p; } //---------------------------------------------------------------------------------------- extern "C" bool __attribute__((visibility("default"))) GetDataQueueInfo(void* pQueue, int64_t iSize[5]) {//return the total data blocks, and set the buf with info string int64_t iFreeBytes, iBufBytes; DataQueue* p = (DataQueue*)pQueue; if (!pQueue) return false; iSize[0] = p->GetBufBytes(); iSize[1] = p->GetFreeBytes(); iSize[2] = p->GetTotalRcvBlockCnt(); iSize[3] = p->GetProcBlockCnt(); iSize[4] = p->GetBlockCnt(); return true; } //---------------------------------------------------------------------------------------- #define LOG_QUEUE_INFO 0 extern "C" int64_t __attribute__((visibility("default"))) EnDataQueue(void* pQueue, void* e, int64_t iDataBytes, void* extradata1, int64_t iExtraDataBytes1, void* extradata2, int64_t iExtraDataBytes2) {// int DataQueue::EnQueue(void* e, int64_t iDataBytes, void* extradata1, int64_t iExtraDataBytes1, void* extradata2, int64_t iExtraDataBytes2) int64_t iRet =0; if (pQueue) { #if LOG_QUEUE_INFO char strBuf[500]; char strBuf2[500]; FILE* fp = NULL; snprintf(strBuf, sizeof(strBuf), "queue%p.log", pQueue); fp = fopen(strBuf, "a+"); if (fp) { snprintf(strBuf, sizeof(strBuf), "EnQueue1: iDataBytes=%ld, extradata1=%ld, extradata2=%ld\n", iDataBytes, iExtraDataBytes1, iExtraDataBytes2); ((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2)); fprintf(fp, "%s%s", strBuf, strBuf2); } #endif iRet = ((DataQueue*)pQueue)->EnQueue(e, iDataBytes, extradata1, iExtraDataBytes1, extradata2, iExtraDataBytes2); #if LOG_QUEUE_INFO if (fp) { ((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2)); fprintf(fp, "EnQueue2: iRet=%ld\n%s\n\n", iRet, strBuf2); fclose(fp); } #endif } return iRet; } extern "C" int64_t __attribute__((visibility("default"))) DeDataQueue(void* pQueue, void** e) { int64_t iRet = 0; if (pQueue) { #if LOG_QUEUE_INFO char strBuf[500]; char strBuf2[500]; FILE* fp = NULL; snprintf(strBuf, sizeof(strBuf), "queue%p.log", pQueue); fp = fopen(strBuf, "a+"); if (fp) { ((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2)); fprintf(fp, "DeQueue1: %s\n", strBuf2); } #endif iRet = ((DataQueue*)pQueue)->DeQueue(e); #if LOG_QUEUE_INFO if (fp) { ((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2)); fprintf(fp, "DeQueue2: iRet=%ld\n%s\n\n", iRet, strBuf2); fclose(fp); } #endif } return iRet; } extern "C" bool __attribute__((visibility("default"))) DataQueueIsFull(void* pQueue) { return ((DataQueue*)pQueue)->IsFull(); } extern "C" void __attribute__((visibility("default"))) ResetDataQueue(void* pQueue) {//delete all data in the queue, and reset all counters to 0 ((DataQueue*)pQueue)->ResetDataQueue(); } extern "C" void __attribute__((visibility("default"))) CloseDataQueue(void*& pQueue) { if (pQueue) delete (DataQueue*)pQueue; pQueue = NULL; }