538 lines
17 KiB
C++
Executable File
538 lines
17 KiB
C++
Executable File
//#pragma once
|
||
#include <time.h>
|
||
#include "dataqueue.h"
|
||
#include <stdio.h>
|
||
|
||
// Extra bytes added to the size of every block stored in the queue.
#define BLOCK_PADDING_SIZE 64
// Per-block header: an int64_t holding the whole block size, followed by an
// int64_t holding the payload size.
#define HEADER_BYTES 16
// Non-zero compiles the CheckQueue() consistency checks into EnQueue/DeQueue.
#define CHECK_QUEUE_ERR 1
// Function-like max; NOTE: the larger argument is evaluated twice.
#define myMax(x,y) ((x)>(y)?(x):(y))

// Opaque handle type returned by OpenDataQueue().
typedef void* voidPtr;

// File-local scratch buffer that receives the timestamp used in error messages.
static char g_strDQCurTime[50] = "";
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
/**
 * Writes the current local time into strBuf as "YYYY.MM.DD hh:mm:ss".
 *
 * @param strBuf   destination buffer (needs at least 20 bytes for a full timestamp)
 * @param iBufSize size of strBuf in bytes
 * @return strBuf. If the buffer is too small or the time cannot be converted,
 *         strBuf is set to the empty string; a NULL or zero-sized buffer is
 *         returned untouched.
 */
inline char* GetCurTimeStr(char* strBuf, int iBufSize)
{
    if (strBuf == NULL || iBufSize <= 0)
        return strBuf;

    const time_t tNow = time(NULL);
    struct tm tmLocal;

    // localtime_r fills a caller-owned struct; plain localtime() hands back shared
    // static storage that another thread may overwrite while we are formatting.
    // strftime returns 0 (buffer contents unspecified) when the result does not fit.
    if (localtime_r(&tNow, &tmLocal) == NULL
        || strftime(strBuf, static_cast<size_t>(iBufSize), "%Y.%m.%d %H:%M:%S", &tmLocal) == 0)
    {
        strBuf[0] = '\0';
    }
    return strBuf;
}
|
||
#define DATA_QUEUE_RETAIN_BLOCK_CNT 4
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
DataQueue::DataQueue(int64_t iSizeInKB, int64_t iMaxBlockSize, bool bUseMutex, int iReadThreadCnt)
|
||
{
|
||
m_pDataBuf = NULL;
|
||
m_bUseMutex = bUseMutex;
|
||
|
||
if (m_bUseMutex)
|
||
pthread_mutex_init(&m_BufMutex, NULL);
|
||
InitQueue(iSizeInKB, iMaxBlockSize, iReadThreadCnt);//KB
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
/** Frees the data buffer and, if the queue was created with locking, destroys the mutex. */
DataQueue::~DataQueue()
{
    if (m_pDataBuf)
    {
        free((void*)m_pDataBuf);
    }

    if (!m_bUseMutex)
        return;

    pthread_mutex_destroy(&m_BufMutex);
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
int DataQueue::InitQueue(int64_t iTotalKB4QueueBuf, int iMaxBlockBytes, int iReadThreadCnt)//KB
|
||
{
|
||
if (m_pDataBuf != NULL)
|
||
free((void*)m_pDataBuf);
|
||
m_pDataBuf = NULL;
|
||
m_iMaxBlockByte = iMaxBlockBytes;
|
||
m_iReservedBlockNum = iReadThreadCnt + DATA_QUEUE_RETAIN_BLOCK_CNT;
|
||
m_iReservedBufBytes = m_iMaxBlockByte * m_iReservedBlockNum; //4 times of m_iMaxBlockByte
|
||
int64_t iTotalByes = 0;
|
||
if (iTotalKB4QueueBuf > 0)
|
||
{
|
||
iTotalByes = iTotalKB4QueueBuf * 1024 + 2 * HEADER_BYTES + m_iReservedBufBytes; //KB;
|
||
m_pDataBuf = (uint8_t*)malloc(iTotalByes); //for safe sake in case of over step the border
|
||
if (m_pDataBuf == NULL)
|
||
{
|
||
fprintf(stderr, "\n----------------------------------------------------------------\n");
|
||
GetCurTimeStr(g_strDQCurTime, sizeof(g_strDQCurTime));
|
||
fprintf(stderr, "***Error: fail to malloc for data queue, mem:%ld @line:%d %s\n", iTotalKB4QueueBuf, __LINE__, g_strDQCurTime);
|
||
fprintf(stderr, "----------------------------------------------------------------\n");
|
||
fflush(stderr);
|
||
exit(4);
|
||
}
|
||
}
|
||
m_iFreeBytes = m_iQueueBufBytes = iTotalByes;
|
||
ResetDataQueue();
|
||
|
||
return 0;
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
void DataQueue::ResetDataQueue()
|
||
{
|
||
if (m_bUseMutex)
|
||
pthread_mutex_lock(&m_BufMutex);
|
||
m_iBufOutPos = m_iBufInPos = 0;
|
||
m_iRcvBlocks = 0;
|
||
m_iRcvBytes = 0;
|
||
m_iProcBlocks = 0;
|
||
m_iEmptyTimes = 0;
|
||
m_iFullTimes = 0;
|
||
m_iPaddingBlocks = m_iProcPaddingBlocks = 0;
|
||
m_iFreeBytes = m_iQueueBufBytes;
|
||
|
||
if (m_bUseMutex)
|
||
pthread_mutex_unlock(&m_BufMutex);
|
||
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
int64_t DataQueue::GetBufBytes()
|
||
{//Get buf size
|
||
int64_t iBytes;
|
||
iBytes = m_iQueueBufBytes - m_iReservedBufBytes;
|
||
if (iBytes < 0)
|
||
iBytes = 0;
|
||
return iBytes;
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
int64_t DataQueue::GetQueueFreeBytes()
|
||
{
|
||
//int64_t iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : ((m_iBufOutPos - m_iBufInPos) ) );
|
||
int64_t iFreeBytes;
|
||
|
||
iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : ((m_iBufOutPos - m_iBufInPos)));
|
||
iFreeBytes -= m_iReservedBufBytes;
|
||
if (iFreeBytes < 0)//iFreeBytes never is 0 because of m_iReservedBufBytes, unless the beginning state where m_iBufOutPos and m_iBufInPos are same
|
||
iFreeBytes = 0;
|
||
|
||
return iFreeBytes;
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
int64_t DataQueue::GetFreeBytes()
|
||
{
|
||
return GetQueueFreeBytes();
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
inline bool DataQueue::CanRcv(int iNeedBytes)
|
||
{
|
||
bool bCanRcv;
|
||
m_iMaxBlockByte = iNeedBytes > m_iMaxBlockByte ? iNeedBytes : m_iMaxBlockByte;
|
||
m_iReservedBufBytes = m_iMaxBlockByte * m_iReservedBlockNum; //4<><34><EFBFBD><EFBFBD>m_iMaxBlockByte
|
||
|
||
bCanRcv = (GetQueueFreeBytes() >= iNeedBytes);
|
||
|
||
return bCanRcv;
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
bool DataQueue::IsFull()
|
||
{
|
||
//---- 这种判断方式,可能存在错误,即:期间m_iBufOutPos到尾部,折回到队列开头,变得小于m_iBufInPos
|
||
//int iFreeBytes = (m_iBufInPos < m_iBufOutPos ? (m_iBufOutPos - m_iBufInPos) : (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)));//
|
||
bool bFull;
|
||
int64_t iFreeBytes;
|
||
iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : ((m_iBufOutPos - m_iBufInPos)));
|
||
bFull = (iFreeBytes <= m_iReservedBufBytes);
|
||
// if( bFull )
|
||
// printf("dataqueue is full, size:%ld free:%ld - %ld < retain:%ld in:%ld out:%ld\n",
|
||
// m_iQueueBufBytes,iFreeBytes, GetFreeBytes(),m_iReservedBufBytes,m_iBufInPos,m_iBufOutPos );
|
||
return bFull;
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
bool DataQueue::IsEmpty()
|
||
{
|
||
return (m_iBufInPos == m_iBufOutPos);
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
int64_t DataQueue::EnQueue(void* e, int64_t iDataBytes, void* extradata, int64_t iExtraDataBytes, void* extradata2, int64_t iExtraDataBytes2)
|
||
{ // >=0 for normal & success,
|
||
//<0: error
|
||
int64_t iBlockBytes;
|
||
void* pWritePos = NULL;
|
||
int64_t iPadding = 0;
|
||
if (iDataBytes < 0 || iExtraDataBytes < 0 || iExtraDataBytes2 < 0)
|
||
{
|
||
fprintf(stderr, "iDataBytes %ld iExtraDataBytes %ld iExtraDataBytes %ld\n",
|
||
iDataBytes, iExtraDataBytes, iExtraDataBytes2);
|
||
fflush(stderr);
|
||
throw __LINE__;
|
||
}
|
||
if (m_bUseMutex)
|
||
pthread_mutex_lock(&m_BufMutex);
|
||
|
||
int64_t iTotalDataBytes = iDataBytes + iExtraDataBytes + iExtraDataBytes2;
|
||
#if CHECK_QUEUE_ERR
|
||
CheckQueue(__LINE__, 0);
|
||
#endif
|
||
|
||
iBlockBytes = (((iTotalDataBytes + 7) >> 3) << 3)//8<>ı<EFBFBD><C4B1><EFBFBD>
|
||
+ HEADER_BYTES
|
||
+ BLOCK_PADDING_SIZE;// 32
|
||
|
||
if (!CanRcv(iBlockBytes))
|
||
{
|
||
if (m_bUseMutex)
|
||
pthread_mutex_unlock(&m_BufMutex);
|
||
return QUEUE_IS_FULL;
|
||
}
|
||
|
||
if (m_iBufInPos >= m_iBufOutPos)
|
||
{
|
||
if (m_iBufInPos + iBlockBytes > m_iQueueBufBytes)
|
||
{//create a padding block
|
||
*((int64_t*)(m_pDataBuf + m_iBufInPos)) = m_iQueueBufBytes - m_iBufInPos; //m_iQueueBufBytes: the next block is at the begining of the buf
|
||
*((int64_t*)(m_pDataBuf + m_iBufInPos + sizeof(int64_t))) = 0;
|
||
//m_iRcvBlocks++;
|
||
//printf("+iBlockBytes:%d, iDataByte:%d\n", m_iQueueBufBytes - m_iBufInPos, 0);
|
||
m_iPaddingBlocks++;
|
||
iPadding++;
|
||
m_iBufInPos = 0;
|
||
}
|
||
}
|
||
|
||
#if CHECK_QUEUE_ERR
|
||
CheckQueue(__LINE__, iBlockBytes);
|
||
#endif
|
||
|
||
pWritePos = m_pDataBuf + m_iBufInPos;
|
||
*((int64_t*)pWritePos) = iBlockBytes;
|
||
*((int64_t*)(pWritePos + sizeof(int64_t))) = iTotalDataBytes;
|
||
|
||
memcpy(pWritePos + HEADER_BYTES, e, iDataBytes);
|
||
|
||
if (extradata && iExtraDataBytes > 0)
|
||
memcpy(pWritePos + HEADER_BYTES + iDataBytes, extradata, iExtraDataBytes);//iExtraDataBytes
|
||
|
||
if (extradata2 && iExtraDataBytes2 > 0)
|
||
memcpy(pWritePos + HEADER_BYTES + iDataBytes + iExtraDataBytes, extradata2, iExtraDataBytes2);//iExtraDataBytes
|
||
|
||
m_iRcvBlocks++;
|
||
m_iRcvBytes += iTotalDataBytes;
|
||
m_iBufInPos = (m_iBufInPos + iBlockBytes) % m_iQueueBufBytes;
|
||
|
||
//#ifdef MY_DEBUG
|
||
// printf("+iBlockBytes:%d, iDataByte:%d\n", iBlockBytes, iDataBytes);
|
||
//#endif
|
||
|
||
if (m_bUseMutex)
|
||
pthread_mutex_unlock(&m_BufMutex);
|
||
|
||
return iTotalDataBytes;
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
#ifdef ARM_VERSION //NO_PORN_FUNC
|
||
#define THERE_ARE_CONCURRENT_DEQUEUE_OP 1
|
||
#else
|
||
#define THERE_ARE_CONCURRENT_DEQUEUE_OP 1
|
||
#endif
|
||
|
||
int64_t DataQueue::DeQueue(void** e)
|
||
{//return value: Bytes, >0 if sucess, =0 no data, < 0 error. No data copy, only return the pointer of data area
|
||
void* p;
|
||
int iBlockBytes, iDataBytes;
|
||
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
|
||
if (m_bUseMutex)
|
||
{
|
||
pthread_mutex_lock(&m_BufMutex);
|
||
#if CHECK_QUEUE_ERR
|
||
CheckQueue(__LINE__, 0);
|
||
#endif
|
||
}
|
||
#endif
|
||
|
||
if (IsEmpty())
|
||
{
|
||
//#ifdef MY_DEBUG
|
||
// m_iEmptyTimes++;
|
||
// if ( m_iEmptyTimes <5 || ( m_iEmptyTimes % 50 == 0) )
|
||
// {
|
||
// printf("***Block Queue IsEmpty %1dth: m_iBufInPos=%lu, m_iBufOutPos=%lu, m_iQueueBufBytes=%lu, m_iReservedBufBytes=%lu\n",
|
||
// m_iEmptyTimes, m_iBufInPos, m_iBufOutPos, m_iQueueBufBytes, m_iReservedBufBytes); //<2F>ճ<EFBFBD>4<EFBFBD><34>
|
||
// fflush(stdout);
|
||
// }
|
||
//#endif
|
||
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
|
||
if (m_bUseMutex)
|
||
pthread_mutex_unlock(&m_BufMutex);
|
||
#endif
|
||
return EMPTY_QUEUE_NO_DATA;
|
||
}
|
||
|
||
p = m_pDataBuf + m_iBufOutPos;
|
||
iBlockBytes = *((int64_t*)p);
|
||
iDataBytes = *((int64_t*)(p + sizeof(int64_t)));
|
||
//int iDataBytes = *((int*)(p + sizeof(int)));
|
||
m_iBufOutPos = (m_iBufOutPos + iBlockBytes) % m_iQueueBufBytes;
|
||
//#ifdef MY_DEBUG
|
||
// printf("-iBlockBytes:%d, iDataByte:%d\n", iBlockBytes, iDataBytes);
|
||
//#endif
|
||
|
||
if (iDataBytes > 0)//not pading block
|
||
{
|
||
m_iProcBlocks++;
|
||
*e = (void*)(p + HEADER_BYTES);
|
||
|
||
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
|
||
if (m_bUseMutex)
|
||
pthread_mutex_unlock(&m_BufMutex);
|
||
#endif
|
||
return iDataBytes;
|
||
}
|
||
else//padding block
|
||
{
|
||
m_iProcPaddingBlocks++;
|
||
if (IsEmpty())
|
||
{
|
||
//printf("CSegmentPool IsEmpty\n");
|
||
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
|
||
if (m_bUseMutex)
|
||
pthread_mutex_unlock(&m_BufMutex);
|
||
#endif
|
||
|
||
return EMPTY_QUEUE_NO_DATA;
|
||
}
|
||
p = m_pDataBuf + m_iBufOutPos;
|
||
iBlockBytes = *((int64_t*)p);
|
||
iDataBytes = *((int64_t*)(p + sizeof(int64_t)));
|
||
//#ifdef MY_DEBUG
|
||
// printf("-iBlockBytes:%d, iDataByte:%d\n", iBlockBytes, iDataBytes);
|
||
//#endif
|
||
m_iBufOutPos = (m_iBufOutPos + iBlockBytes) % m_iQueueBufBytes;
|
||
|
||
#if CHECK_QUEUE_ERR
|
||
if (m_bUseMutex)
|
||
CheckQueue(__LINE__, 0);
|
||
#endif
|
||
|
||
if (iDataBytes > 0)//not pading block
|
||
{
|
||
m_iProcBlocks++;
|
||
*e = (void*)(p + HEADER_BYTES);
|
||
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
|
||
if (m_bUseMutex)
|
||
pthread_mutex_unlock(&m_BufMutex);
|
||
#endif
|
||
return iDataBytes;
|
||
}
|
||
else
|
||
{//pading block
|
||
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
|
||
if (m_bUseMutex)
|
||
pthread_mutex_unlock(&m_BufMutex);
|
||
#endif
|
||
m_iProcPaddingBlocks++;
|
||
char strMsg[200];
|
||
GetCurTimeStr(g_strDQCurTime, sizeof(g_strDQCurTime));
|
||
sprintf(strMsg, "***Error in DeQueue: 2 successive padding block!@line %d %s\n", __LINE__, g_strDQCurTime);
|
||
fprintf(stderr, "\n----------------------------------------------------------------\n");
|
||
fprintf(stderr, "%s", strMsg);
|
||
fprintf(stderr, "----------------------------------------------------------------\n");
|
||
fflush(stderr);
|
||
exit(0);
|
||
return EMPTY_QUEUE_NO_DATA;
|
||
}
|
||
}
|
||
|
||
#if THERE_ARE_CONCURRENT_DEQUEUE_OP
|
||
if (m_bUseMutex)
|
||
pthread_mutex_unlock(&m_BufMutex);
|
||
#endif
|
||
char strMsg[200];
|
||
GetCurTimeStr(g_strDQCurTime, sizeof(g_strDQCurTime));
|
||
sprintf(strMsg, "***Error in DeQueue: @line %d %s\n", __LINE__, g_strDQCurTime);
|
||
fprintf(stderr, "\n----------------------------------------------------------------\n");
|
||
fprintf(stderr, "%s", strMsg);
|
||
fprintf(stderr, "----------------------------------------------------------------\n");
|
||
fflush(stderr);
|
||
exit(0);
|
||
return EMPTY_QUEUE_NO_DATA;
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
void DataQueue::OutPutStatus(char* strBuf, int iBufLen)
|
||
{
|
||
int iLen = iBufLen, iStrLen;
|
||
char* p = strBuf;
|
||
snprintf(p, iLen, "m_iBufOutPos = % ld, m_iBufInPos = % ld, m_iFreeBytes = % ld, m_iQueueBufBytes = % ld, m_iRcvBlocks = % ld, m_iRcvBytes = % ld\n",
|
||
m_iBufOutPos, m_iBufInPos, m_iFreeBytes, m_iQueueBufBytes, m_iRcvBlocks, m_iRcvBytes);
|
||
iStrLen = strlen(p);
|
||
p += iStrLen;
|
||
iLen -= iStrLen;
|
||
|
||
snprintf(p, iLen, "\m_iProcBlocks=%ld, m_iPaddingBlocks=%ld, m_iProcPaddingBlocks=%ld, m_iMaxBlockByte=%ld, m_iReservedBufBytes=%ld, m_iReservedBlockNum=%ld,m_iEmptyTimes=%ld, m_iFullTimes=%ld\n",
|
||
m_iProcBlocks, m_iPaddingBlocks, m_iProcPaddingBlocks, m_iMaxBlockByte, m_iReservedBufBytes, m_iReservedBlockNum, m_iEmptyTimes, m_iFullTimes);
|
||
iStrLen = strlen(p);
|
||
p += iStrLen;
|
||
iLen -= iStrLen;
|
||
}
|
||
//----------------------------------------------------------------------------------------
|
||
bool DataQueue::CheckQueue(int iLineNo, int iNeedBytes)
|
||
{
|
||
int64_t iFreeBytes;
|
||
int iErr = 0;
|
||
//int iFreeBytes = (m_iBufInPos < m_iBufOutPos ? (m_iBufOutPos - m_iBufInPos) : (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) );//
|
||
|
||
iFreeBytes = (m_iBufInPos >= m_iBufOutPos ? (m_iQueueBufBytes - (m_iBufInPos - m_iBufOutPos)) : (m_iBufOutPos - m_iBufInPos));
|
||
if (m_iBufOutPos < 0)
|
||
iErr |= 0x1;
|
||
if (m_iBufInPos < 0)
|
||
iErr |= 0x2;
|
||
if (iFreeBytes < (m_iReservedBlockNum - 2) * m_iMaxBlockByte)
|
||
iErr |= 0x4;
|
||
if (iFreeBytes < iNeedBytes)
|
||
iErr |= 0x8;
|
||
|
||
if (m_bUseMutex && (m_iProcBlocks == m_iRcvBlocks)
|
||
&& (m_iPaddingBlocks == m_iProcPaddingBlocks)
|
||
&& (m_iBufOutPos != m_iBufInPos))
|
||
iErr |= 0x10;
|
||
if (iErr)
|
||
{
|
||
char strMsg[1000];
|
||
char strInfo[1000];
|
||
OutPutStatus(strMsg, sizeof(strMsg));
|
||
snprintf(strInfo, sizeof(strInfo), "******QueueError %d @line %d, iFreeBytes=%ld , iNeedBytes=%d:\n%s \n",
|
||
iErr, iLineNo, iFreeBytes, iNeedBytes, strMsg);
|
||
|
||
FILE* fp = fopen("err.log", "a+");
|
||
if (fp)
|
||
{
|
||
fprintf(fp, "%s\n", strInfo);
|
||
fflush(fp);
|
||
fclose(fp);
|
||
}
|
||
|
||
fprintf(stderr, "\n----------------------------------------------------------------\n");
|
||
fprintf(stderr, "%s\n", strInfo);
|
||
fprintf(stderr, "----------------------------------------------------------------\n");
|
||
fflush(stderr);
|
||
//throw __LINE__;
|
||
return false;
|
||
}
|
||
return true;
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
/**
 * C entry point: creates a mutex-protected DataQueue and returns it as an
 * opaque handle.
 *
 * @param iTotalKB4QueueBuf requested capacity in KB
 * @param iMaxBlockBytes    initial largest expected block size in bytes
 * @param iReadThreadCnt    number of reader threads; negative values are treated as 0
 * @return the new queue, to be released with CloseDataQueue()
 */
extern "C" voidPtr __attribute__((visibility("default"))) OpenDataQueue(int64_t iTotalKB4QueueBuf, int iMaxBlockBytes, int iReadThreadCnt)
{
    // The reader-thread count used to be dropped here, so the queue was always
    // built with the constructor's default instead of the caller's value.
    const int iReaders = (iReadThreadCnt < 0) ? 0 : iReadThreadCnt;

    DataQueue* pNewQueue = new DataQueue(iTotalKB4QueueBuf, iMaxBlockBytes, true, iReaders);//bUseMutex = true
    return (void*)pNewQueue;
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
extern "C" bool __attribute__((visibility("default"))) GetDataQueueInfo(void* pQueue, int64_t iSize[5])
|
||
{//return the total data blocks, and set the buf with info string
|
||
int64_t iFreeBytes, iBufBytes;
|
||
DataQueue* p = (DataQueue*)pQueue;
|
||
if (!pQueue)
|
||
return false;
|
||
iSize[0] = p->GetBufBytes();
|
||
iSize[1] = p->GetFreeBytes();
|
||
iSize[2] = p->GetTotalRcvBlockCnt();
|
||
iSize[3] = p->GetProcBlockCnt();
|
||
iSize[4] = p->GetBlockCnt();
|
||
return true;
|
||
}
|
||
|
||
//----------------------------------------------------------------------------------------
|
||
#define LOG_QUEUE_INFO 0
|
||
extern "C" int64_t __attribute__((visibility("default"))) EnDataQueue(void* pQueue, void* e, int64_t iDataBytes, void* extradata1, int64_t iExtraDataBytes1, void* extradata2, int64_t iExtraDataBytes2)
|
||
{// int DataQueue::EnQueue(void* e, int64_t iDataBytes, void* extradata1, int64_t iExtraDataBytes1, void* extradata2, int64_t iExtraDataBytes2)
|
||
int64_t iRet =0;
|
||
if (pQueue)
|
||
{
|
||
#if LOG_QUEUE_INFO
|
||
char strBuf[500];
|
||
char strBuf2[500];
|
||
FILE* fp = NULL;
|
||
|
||
snprintf(strBuf, sizeof(strBuf), "queue%p.log", pQueue);
|
||
fp = fopen(strBuf, "a+");
|
||
if (fp)
|
||
{
|
||
snprintf(strBuf, sizeof(strBuf), "EnQueue1: iDataBytes=%ld, extradata1=%ld, extradata2=%ld\n",
|
||
iDataBytes, iExtraDataBytes1, iExtraDataBytes2);
|
||
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
|
||
fprintf(fp, "%s%s", strBuf, strBuf2);
|
||
}
|
||
#endif
|
||
iRet = ((DataQueue*)pQueue)->EnQueue(e, iDataBytes, extradata1, iExtraDataBytes1, extradata2, iExtraDataBytes2);
|
||
#if LOG_QUEUE_INFO
|
||
if (fp)
|
||
{
|
||
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
|
||
fprintf(fp, "EnQueue2: iRet=%ld\n%s\n\n", iRet, strBuf2);
|
||
fclose(fp);
|
||
}
|
||
#endif
|
||
}
|
||
return iRet;
|
||
}
|
||
|
||
extern "C" int64_t __attribute__((visibility("default"))) DeDataQueue(void* pQueue, void** e)
|
||
{
|
||
int64_t iRet = 0;
|
||
|
||
if (pQueue)
|
||
{
|
||
#if LOG_QUEUE_INFO
|
||
char strBuf[500];
|
||
char strBuf2[500];
|
||
FILE* fp = NULL;
|
||
|
||
snprintf(strBuf, sizeof(strBuf), "queue%p.log", pQueue);
|
||
fp = fopen(strBuf, "a+");
|
||
if (fp)
|
||
{
|
||
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
|
||
fprintf(fp, "DeQueue1: %s\n", strBuf2);
|
||
}
|
||
#endif
|
||
|
||
iRet = ((DataQueue*)pQueue)->DeQueue(e);
|
||
|
||
#if LOG_QUEUE_INFO
|
||
if (fp)
|
||
{
|
||
((DataQueue*)pQueue)->OutPutStatus(strBuf2, sizeof(strBuf2));
|
||
fprintf(fp, "DeQueue2: iRet=%ld\n%s\n\n", iRet, strBuf2);
|
||
fclose(fp);
|
||
}
|
||
#endif
|
||
}
|
||
|
||
return iRet;
|
||
}
|
||
|
||
extern "C" bool __attribute__((visibility("default"))) DataQueueIsFull(void* pQueue)
|
||
{
|
||
return ((DataQueue*)pQueue)->IsFull();
|
||
}
|
||
extern "C" void __attribute__((visibility("default"))) ResetDataQueue(void* pQueue)
|
||
{//delete all data in the queue, and reset all counters to 0
|
||
((DataQueue*)pQueue)->ResetDataQueue();
|
||
}
|
||
|
||
|
||
extern "C" void __attribute__((visibility("default"))) CloseDataQueue(void*& pQueue)
|
||
{
|
||
if (pQueue)
|
||
delete (DataQueue*)pQueue;
|
||
pQueue = NULL;
|
||
}
|
||
|
||
|
||
|