Real-time Feed Distribution using Shared Queue
Contents
Introduction
Real-time feed is a continuous flow of data that carries instantaneous information.
That feed can be associated to one or several clients, and can be associated as a whole to all clients,
or each client has its own parts (packets) of feed.
We can put them in points to clarify our scope:
- Feed is distributed to one client.
- Feed is distributed to all clients.
- Feed is distributed between clients (each one takes his belonging data).
- The whole feed is distributed to all clients.
id
in each sent packet.
A good example for that section is network router. Network routers receive packets from senders
and route them to the right destination.
Section (2-2) means the feed is forwarded to all clients, so no need for packets to include destination id.
Section (2-2) is the scope of this article. The final goal of the article is to distribute real-time feed
between multiple clients with different bandwidthes as in the following figure:
Feed Distribution
Real time stream distribution is a critical issue that needs a perfect data structure to store and distribute real time feed efficiently. The system receives real-time feed and forwards it to all clients as in the following figure.
- Store it in a temporary storage then forward it to all clients.
- Process feed (transfer-compress-encrypt) then store results in a temporary storage then send results to all clients.
- First, to isolate the receiving operation from the sending or processing operations.
- Second, to enable sending data in different threads so as to balance send operation depending on clients' bandwidths.
- Fast data insertion and removal.
- Receiving data doesn't be blocked while sending data from the same storage for each client.
- Each Client has different offset in the storage depending on its bandwidth.
Shared Queue
The best storage to achieve these goals is the pipe or the queue. Feed is received from one side and is sent from the other side; First In First Out (FIFO). A queue is a collection of entities linked in order. New entity is added to the tail of the queue and old entity is removed from the head terminal. So, queue is just a FIFO (First In First Out) data structure. The first added entity is the first manipulated one. Any Queue should keep its tail and head entities to handle addition and removal of them. Each entity in the queue points to the next entity.
- Insertion and removal is so fast with linked list structure.
- We isolate receiving data from sending it in a separate threads, because both are working on the tail element.
- The owner of the Shared Queue should keep an object for each client. This object is to keep a pointer to client
node
on the queue and the offset in that node. - So, we can manage the sending operation in many threads. Each thread sends to available clients. Each client object should pass its working element pointer and send offset to the Queue to send to each client depending on its location in the whole queue.
- Previous point is the key for the whole article as it is the solution for slow (low bandwidth) clients. As a result, fast clients are always working in the tail element and their send offset are synchronized with the "receive offset" of the tail element of the queue.
IsEmpty() |
checks if the queue is empty |
Enqueue() |
adds new element at queue tail if needed |
Dequeue() |
gets element to send its data |
Recv() |
receives new arrived data to tail node |
Send() |
sends data to one client |
MaxQueueLength |
max queue length |
Head |
points to head node of the queue |
Tail |
points to tail node of the queue |
Garbage |
points to first garbage node |
In our case we need some special cases to fulfill our need:
- In real situation clients, speeds are not equal. Clients have different bandwidth. Bandwidth affects the quantity that each client can receive instantaneously. Fast Clients are always ready to receive data. So, slow clients may delay the removal of head entity of the queue, in addition, the whole sending process. The distributor agent must handle this case in some way to make sure fast clients receive real time data on time, and slow clients receive real time data as bandwidth allows.
- Each queue element contains buffer of 32 K bytes to store received feed.
So, the Enqueue() function doesn't add new element unless tail element receives its full 32 K bytes,
then it adds a new element and adjusts the new tail.
#define MAX_ELEMENT_SIZE 32768 struct QElement { QElement(int nSequence) { m_nSequence = nSequence; m_nRecvOffset = 0; m_pNext = NULL; } // element creation sequence in the queue int m_nSequence; // offset of received data in element buffer int m_nRecvOffset; // pointer to the next element in the queue QElement* m_pNext; // buffer of the element char m_pBuffer[MAX_ELEMENT_SIZE]; };
Enqueue
The standard usage of the function Enqueue is to add new node at queue tail, but in our queue it does only that if the tail node is full of data. In other words the Queue uses theEnqueue
function to get the working (not full)
tail node. It also adds received buffer to queue element and increments "receive offset" (m_nRecvOffset) with amount of received data.
The Enqueue
function checks for tail "receive offset" to decide if it is full or not as in the following flow-chart figure:
//************************************
// Method: Enqueue
// Access: protected
// Returns: QElement*
// Purpose: add new element at queue tail if needed
//************************************
QElement* <code>Enqueue</code>()
{
QElement* pQE;
::<code>EnterCriticalSection</code>(&m_cs);
if(m_pTail == NULL)
// initialize first element in the list
pQE = m_pHead = m_pTail = m_pGarbage = new QElement(0);
// check if last received element reached its element end
else if(m_pTail->m_nRecvOffset >= MAX_ELEMENT_SIZE)
{ // add new element to the list and let last element points to it
pQE = m_pTail->m_pNext = new QElement(m_pTail->m_nSequence + 1);
// increment Tail to the new created element
m_pTail = pQE;
// increment Head
if(m_pTail->m_nSequence > m_nMaxQueueLength)
// advance head pointer to next element
m_pHead = m_pHead->m_pNext;
...
}
else
// in the middle of the element
pQE = m_pTail;
::<code>LeaveCriticalSection</code>(&m_cs);
return pQE;
}
Dequeue
The queue uses its protected functionDequeue()
to retrieve an element to send its data to client.
Each client must have a pointer (initialized to NULL) to its working node and the offset in this node.
The Dequeue
function uses these parameters to adjust next working node and offset as in the following flow-chart.
The Dequeue
function doesn't remove elements as in normal queue because shared queue sends data to multiple clients. Hence,
all queue nodes is kept untill they are removed by a garbage collector loop in the Enqueue
function. The last point to mention is
the type of the client, if it is a real-time then it should join the queue for the first time at its tail to receive last received data,
or if it is not real-time it should join the queue at its head to receive all data.
//************************************
// Method: Dequeue
// Access: protected
// Returns: bool
// Purpose: get head element to send its data
// Parameter: QElement * & pCurElement
// Parameter: int & nCurElementSendOffset
//************************************
bool <code>Dequeue</code>(QElement*& pCurElement, int& nCurElementSendOffset)
{
::<code>EnterCriticalSection</code>(&m_cs);
// pCurElement = NULL for new client
if(pCurElement == NULL)
// check if client need real time data or all out of date data
if(m_bRealTime)
// point to the tail directly
pCurElement = m_pTail, nCurElementSendOffset = m_pTail ? m_pTail->m_nRecvOffset : 0;
else
// point to the head to get all stored data (first in)
pCurElement = m_pHead, nCurElementSendOffset = 0;
// check if received element reach its storage end
else if(nCurElementSendOffset >= MAX_ELEMENT_SIZE && pCurElement->m_pNext)
// get next element and reset send offset
pCurElement = pCurElement->m_pNext, nCurElementSendOffset = 0;
::<code>LeaveCriticalSection</code>(&m_cs);
// success if an element is found
return pCurElement != NULL;
}
Recv
Shared queue receives data in two ways.First is
Recv
(const char* pBuffer, int nLength), which is simple call
with the buffer to be saved in the queue. Second is template<class RECEIVER> int
Recv
(RECEIVER& receiver),
which asks class of type RECEIVER to receive data.
Recv raw data
This is the first way to receive data. It receives buffer with any length and callsEnqueue
function to get the working
queue node to keep data.
So, in many cases it needs to loop if received length is greater than MAX_ELEMENT_SIZE.
//************************************
// Method: Recv
// Access: public
// Returns: int
// Purpose: ask class of type RECEIVER to receive data in queue nodes
// Parameter: const char * pBuffer
// Parameter: int nLength
//************************************
int <code>Recv</code>(const char* pBuffer, int nLength)
{
int nRecvLength = 0;
while(nRecvLength < nLength)
{
QElement* pQE = <code>Enqueue</code>();
if(pQE->m_nRecvOffset < MAX_ELEMENT_SIZE)
{ // receive in last element
int nRecv = min(MAX_ELEMENT_SIZE - pQE->m_nRecvOffset, nLength - nRecvLength);
<code>memcpy</code>(pQE->m_pBuffer+pQE->m_nRecvOffset, pBuffer+nRecvLength, nRecv);
// increment element offset with the received bytes
pQE->m_nRecvOffset += nRecv;
nRecvLength += nRecv;
}
}
// return total received bytes
return nRecvLength;
}
Recv template
This is the second way to receive data. It asks class of type RECEIVER to receive data. It callsEnqueue
function to get the working queue node to keep data.
This function loops to request data from the RECEIVER Recv
function till the enqueued node
"receive offset" reaches MAX_ELEMENT_SIZE.
//************************************
// Method: Recv
// Access: public
// Returns: int
// Purpose: ask class of type RECEIVER to receive data in queue nodes
// Parameter: RECEIVER& receiver
//************************************
template<class RECEIVER> int <code>Recv</code>(RECEIVER& receiver)
{ // get tail element to save received data
QElement* pQE = <code>Enqueue</code>();
int nRecvLength = 0, nRecv;
while(pQE->m_nRecvOffset < MAX_ELEMENT_SIZE)
{ // receive in last element
if((nRecv = receiver.<code>Recv</code>(pQE->m_pBuffer + pQE->m_nRecvOffset, MAX_ELEMENT_SIZE - pQE->m_nRecvOffset)) <= 0)
return nRecv;
// increment element offset with the received bytes
pQE->m_nRecvOffset += nRecv;
// increment total received bytes
nRecvLength += nRecv;
}
// return total received bytes
return nRecvLength;
}
Send template
AlthoughSend template
has a simple Flow-Chart and code, it is the key of this article.
To send to multiple clients, the owner of the queue should keep the "send element" and the "offset" in that element for each client.
The key here is that each client in independent of other clients. Therefore, if the sender is a socket class and has a good connection
it will be always synchronized with the receive process. If the sender connection is slow, it may lag, but it will not affect other clients.
Client information (SendElement, ElementOffset) are passed to the Send
function to be used as in the following Flow-Chart and Code.

//************************************
// Method: Send
// Access: public
// Returns: int
// Purpose: delegate queue buffer send to a sender of class SENDER
// Parameter: SENDER& sender
// the class that must implement Send(char*, int) function
// Parameter: QElement * & pCurElement
// client current element pointer
// Parameter: int & nCurElementSendOffset
// current offset in client element
//************************************
template<class SENDER> int <code>Send</code>(SENDER& sender, QElement*& pCurElement, int& nCurElementSendOffset)
{ // get head element to send its data
if(m_pHead == NULL || <code>Dequeue</code>(pCurElement, nCurElementSendOffset) == false)
// return false as no elements to send
return 0;
// calculate bytes to send
int nSendBytes = min(MAX_ELEMENT_SIZE/2, pCurElement->m_nRecvOffset - nCurElementSendOffset);
// send bytes to client
if(nSendBytes <= 0 || (nSendBytes = sender.<code>Send</code>(pCurElement->m_pBuffer + nCurElementSendOffset, nSendBytes)) <= 0)
return nSendBytes;
// increment sent bytes offset
nCurElementSendOffset += nSendBytes;
// return total sent bytes
return nSendBytes;
}
Garbage Collection
Shared Queue sends feed to multiple clients with various speeds, so some clients may have low bandwidth and may be delayed with a few nodes.
Normal dequeue
process frees sent nodes, but in our case we can't free nodes till we make sure that they are sent to all clients,
or we can delay freeing it to a certain limit.
In my queue, I follow the strategy of free nodes before queue Head
with a certain length as in the following figure:
Remember, Receiving data is done from tail side, so, freeing nodes from garbage side will not affect receiving feed. Also, for real-time clients,
send
is done with client node pointer which is always the tail node for normal and fast clients.
For slow non real-time clients, send
starts from Head
node and goes fast to tail node to work like real-time clients.
Consequently, Garbage Collection is safe. In order to make the process safer, I put garbage collection code in the Enqueue
code to group the allocation and free of nodes in the same function. Check garbage collection code underlined in the Enqueue
code:
//************************************
// Method: Enqueue
// Access: protected
// Returns: QElement*
// Purpose: add new element at queue tail if needed
//************************************
QElement* <code>Enqueue</code>()
{
QElement* pQE;
::<code>EnterCriticalSection</code>(&m_cs);
if(m_pTail == NULL)
// initialize first element in the list
pQE = m_pHead = m_pTail = m_pGarbage = new QElement(0);
// check if last received element reached its element end
else if(m_pTail->m_nRecvOffset >= MAX_ELEMENT_SIZE)
{ // add new element to the list and let last element points to it
pQE = m_pTail->m_pNext = new QElement(m_pTail->m_nSequence + 1);
// increment Tail to the new created element
m_pTail = pQE;
// increment Head
if(m_pTail->m_nSequence > m_nMaxQueueLength)
// advance head pointer to next element
m_pHead = m_pHead->m_pNext;
// increment Garbage
if(m_pHead->m_nSequence > min(m_nMaxQueueLength, MAX_QUEUE_LENGTH))
{ // keep next element
QElement* pNext = m_pGarbage->m_pNext;
// clear element
delete m_pGarbage;
// point to next element
m_pGarbage = pNext;
}
}
else
// in the middle of the element
pQE = m_pTail;
::<code>LeaveCriticalSection</code>(&m_cs);
return pQE;
}
Usage
You can define any number of queues in your application. The code listed here defines one queue, one thread to receive feed,
and one thread to distribute this feed to clients list. You can define many Send
threads, each having its own clients list. So
you can send to thousands of clients distributed between tens of threads.
SharedQueue sq;
struct CFeedClient
{
CFeedClient()
{ // initialize send pointer and offset
m_pCurSendElem = NULL;
m_nCurSendElemOffset = 0;
}
QElement *m_pCurSendElem;
int m_nCurSendElemOffset;
SOCKET m_sock;
int <code>Send</code>(const char* lpcs, int nLength)
{ return ::<code>send</code>(m_sock, lpcs, nLength, 0); }
...
};
void RecvThreadFunc(LPVOID pParam)
{
int nLength;
char szbuf[10240];
while(true)
{
// Recv feed from some where
... // (fill szBuf and nLength), data can be processed or transformed
// save received feed in the queue
sq.<code>Recv</code>(szbuf, nLength);
}
}
void SendThreadFunc(LPVOID pParam)
{
vector<CFeedClient>* pvClients = (vector<CFeedClient>*)pParam;
while(true)
// send saved feed to thread clients
for(vector<CFeedClient>::iterator client = pvClients->begin(); client != pvClients->end(); client++)
sq.<code>Send</code>(*client, client->m_pCurSendElem, client->m_nCurSendElemOffset);
}
void <code>Usage</code>()
{
vector<CFeedClient> vClients;
// create Recv thread
<code>_beginthread</code>(<code>RecvThreadFunc</code>, 0, NULL);
// create Send thread to send saved feed to thread clients
<code>_beginthread</code>(<code>SendThreadFunc</code>, 0, (LPVOID)&vClients);
// Note: you can create multiple threads with each has its own clients list
}
Points of Interest
-
Thread Safe
SharedQueue is a thread safe class as it synchronizes changes of its linked list with critical section. Queue Linked List changes are limited to add or remove new nodes to the queue, which is done by
Enqueue
,Dequeue
, orClear
functions. The other functionsRecv
andSend
are independent, so no need to synchronize their access, and they are the only callers forEnqueue
andDequeue
. So, the user of the shared queue can design one thread to receive feed and multiple threads to send it or its resultant as in section Feed Distribution. -
Send Timeout
Sender may set "send timeout" to a small value to make sure all clients have equal chances or sending data, but in this case slow clients (slow connection) may lag than fast clients (fast connection). Slow client may accept delayed data than losing data.
-
Sender Type
"Sender Type" that is passed to the Send template can be a Socket class, that has a function
Send
, or it can be any type that has aSend
function. User of this class can take data and transfer it to any type or do any processing like compression or encryption, then keep or send it. -
Thread Managment
To build a simple feed distribution system we need:
- One thread to receive feed in a primary queue.
- One thread to analyze feed and format a new feed suitable for clients and save it in a secondary queue.
- N threads to send feed from secondary queue to clients of each thread.
Updates
- 12/07/2011: Posted version v0.9000
References
Thanks to...
God
Post Comment
マストな新作アイテム続々入荷中…
長年の豊富な経験と実績を持ち、
ブラントブランドコピー品の完壁な品質を維持するために、
一流の素材を選択し、精巧な作り方でまるで本物のようなな製品を造ります。
高品質の商品を低価格で提供する、納期を厳守することは弊社の経営理念です。
今、マストなブランドコピー新作アイテム続々入荷中…
【シャネルコピー、ヴィトンコピー、コピーグッチ、エルメスコピー】
ブランド財布コピー、バッグコピー腕時計コピーぜひおすすめです。Кеды Converse имеют длительную и увлекательную историю. Это максимально комфортная, стильная и практичная спортивная обувь, в ней можно как просто передвигаться, так и заниматься спортом. Именно поэтому у этой обуви много поклонников. За весь период существования TM было произведено немало коллекций, предназначенных для самых разных видов спорта, кеды и для повседневного ношения и отдыха.
Главной особенностью обуви Converse является совершенная оригинальность и свобода. Для людей, кто ценит удобство и стиль, много общается и двигается, эта продукция есть идеальным вариантом. В ее производстве приняли участие самые лучшие дизайнеры. Данная обувь убирает границы между спортом и модой. В ассортименте на нашем сайте можно подыскать как высокие, так и низкие модели, имеющие отворот и без него, со стразами варианты для девушек, и строгие для мужчин, а также сугубо спортивные модели.
С помощью такой обуви их обладателю будет комфортно и удобно совершенно везде – в фитнес центре, на спортивной арене, на дискотеке, в гостях, либо просто на прогулке. За счет сочетания комфорта и оригинального дизайна, тот, кто кеды носит, будет испытывать ощущение, будто обувь была сделана только для него.