« It's parshing county, Nevada. Wild wild west | Main | Communicating Processes II »

May I be the ring master?... Please!

One of the old tool of the trade is to craft a ring buffer for quite a few things to be used it for! If you, the reader did not hear it yet, you will hear it soon!!!

So what is it? And what is the use?

Well, sometime you just want to know the recent history of certain things, say kernel events, or the recent acitivities of a server or even your application. So you want to constantly  pump out trace messages to a buffer, but your buffer might have some restrictions in terms of how much memory you should allocate for it. And optionally, you might have a backing up procedure which will do the backup ( or rather write to a file) intermittently so as to assume that if you loose the buffer information, at least you expect the file would be there to take a peek.

While this can be used in a variety of situations, we need to understand one thing is that the paradigm I try to follow is "Coding for debugging"... So anyone can have an infrastructure, more on this later, to be dropped and use in your software for debugging and other stuff...

When I try to develop some code, one thing I try to keep in my head is to have it as a reusable code. Then the otherthing I want to follow is that can I try it in user mode code, before bringing it in to kernel.... And finally can I have some way to introduce debugablity into it, from the primitive printf and cousines to elegantly using language features!!!

In this example, the ring size is fixed, in terms of how many elements it can hold. For dynamic sizing it would have to be changed. There are situations, depending on how much memory is available in the computer, the ring size has to be self tunned, which necessiates it to be dynamically sized ring. Another missing part is intermittent flushing into a file, and yet another part is missing is how to use the language features to debug this infrastructure...

I would not go into the dynamic sizing, it could be another note based on request(s).

For intermittent flushing to a file, one of the approach is to run another thread that will act as a consumer, and take the ring buffer to a file. Not very difficult to do that either.

For the language use, why do we need this? Well, the message producer could be an aribitrary source, and then the ring buffer thread would be an intermediaries ( like the way cache works), and consumer would be another thread. So one question would be -  how can I bypass the ring, and want to see the producer is producing, and consumer is directly taking it from producer and writing to file. For this an elegant solution is to have virual functions that can be redirected on the fly using lazy binding. So it would take only one line change to direct the messages to file, instead to ring....

Succently, the producer of the ring could produce nothing at all or it could produce to the ring or it could produce directly to final destination, the file or it could produce to both with or without any assistence from cosumer of the ring. And if we cover all these cases, it would be easy to find the problem component of the infrastructure.

The base code for such a ring is given as an example, but full implemenation for handling dynamic binding and debuggin including producing in a lifo or fifo are intentionallly left out for now. A c++ class based using pure virtual functions and threading works just fine for me -

 

#include <stdio.h>
#include <Windows.h>

#define SIZE_DMESG 256

//ouput level
typedef enum _DMESG_LVL{
DMESG_DISBLED,
DMESG_QUITE,
DMESG_MODERATE,
DMESG_VERBOSE,
DMESG_UNKNWN_LVL
}DMESG_LVL;

typedef struct _DMESG{
DMESG_LVL lvl;
size_t msglen;
CHAR * msg;
}DMESG, *PDMESG;

typedef struct _DMESG_RING{
ULONG producerIdx; //current producer index
ULONG ringSize; // Number of dmesg messages the ring will hold at most.
KSPIN_LOCK queuedSpinLock; //multi-proc efficient spin-lock
DMESG **pDmesgArray; // ring Array of dmesges
}DMESG_RING, *PDMESG_RING;

PDMESG_RING g_pDmesgRing;


PDMESG_RING
AllocdmesgRing( ULONG ringSize )
{
PDMESG_RING pdmesgRing;

//allocate/init Ring structure that holds meta data and the ringbuffer
pdmesgRing =(PDMESG_RING) malloc(sizeof(DMESG_RING));

if ( !pdmesgRing){
goto ErrorExit;
}
memset(pdmesgRing, 0, sizeof(DMESG_RING));
pdmesgRing->ringSize = ringSize;
pdmesgRing->producerIdx = 0;

//Now allocate the buffer array ptrs to DMESG of length ringSize
pdmesgRing->pDmesgArray =
(DMESG**)malloc( ringSize * sizeof(DMESG*));

//set to null, it's being checked before putting stuff in
//to make sure the existing msg gets cleaned.
memset(pdmesgRing->pDmesgArray, 0, ringSize * sizeof(PDMESG));
if ( ! pdmesgRing->pDmesgArray ){
goto ErrorExit;
}

// Init the ring's queued spinlock
//KeInitializeSpinLock(&pdmesgRing->queuedSpinLock);
//tie it with global ring pointer
g_pDmesgRing = pdmesgRing;

return pdmesgRing;

ErrorExit: //coming here => error

if (pdmesgRing->pDmesgArray ){
free(pdmesgRing->pDmesgArray );
}

if (pdmesgRing){
free(pdmesgRing );
}

return NULL;
}


VOID
AddmesgToRing (DMESG_LVL verbosity, CHAR * mesg)
{
PDMESG pdmesg;

//KLOCK_QUEUE_HANDLE lockHandle;
//NTSTATUS ntStatus;
size_t countByte;

//first check for existing element in the ring
if (g_pDmesgRing->pDmesgArray[g_pDmesgRing->producerIdx]){
pdmesg = g_pDmesgRing->pDmesgArray[g_pDmesgRing->producerIdx];
if (pdmesg->msg){
free ( pdmesg->msg);
}

}else{ //create a new
g_pDmesgRing->pDmesgArray[g_pDmesgRing->producerIdx]= (PDMESG)malloc( sizeof(DMESG));
}

if ( !g_pDmesgRing->pDmesgArray[g_pDmesgRing->producerIdx]) {
goto ErrorExit;
}

countByte = strlen(mesg);

//init the msg entry
g_pDmesgRing->pDmesgArray[g_pDmesgRing->producerIdx]->msglen = countByte;
g_pDmesgRing->pDmesgArray[g_pDmesgRing->producerIdx]->lvl = verbosity;
g_pDmesgRing->pDmesgArray[g_pDmesgRing->producerIdx]->msg = mesg;

//Acquire lock
//KeAcquireInStackQueuedSpinLock(&g_pDmesgRing->queuedSpinLock, &lockHandle);
g_pDmesgRing->producerIdx++;
g_pDmesgRing->producerIdx %= g_pDmesgRing->ringSize;
//KeReleaseInStackQueuedSpinLock(&lockHandle);
//release lock
return;
ErrorExit:
if (pdmesg){
free(pdmesg);
}
}

#define NUM_RING_ELEMENTS 2
void freeAll()
{
PDMESG pdmesg;
int i;
for ( i =0; i < NUM_RING_ELEMENTS; i++){

pdmesg = g_pDmesgRing->pDmesgArray[i];
free(pdmesg->msg);
pdmesg->msg = 0;
free(pdmesg);

}

for ( i =0; i < NUM_RING_ELEMENTS; i++){
free(*g_pDmesgRing->pDmesgArray + i);
}

free (g_pDmesgRing);
}

void main()
{
//char msgs[512];
char *buf;
int i;
AllocdmesgRing( NUM_RING_ELEMENTS );
for ( i =0; i < 3 ; i++){
buf = (char*)malloc(SIZE_DMESG );
memset(buf, 'a'+i, SIZE_DMESG);
buf[SIZE_DMESG -1] = '\0';
AddmesgToRing( DMESG_VERBOSE, buf );
}

for (i =0; i < NUM_RING_ELEMENTS; i++){
PDMESG pdmesg;
pdmesg = g_pDmesgRing->pDmesgArray[i];
printf("lvl=%d Verbosity=%d, msg=%s\n", pdmesg->lvl, pdmesg->msglen,pdmesg->msg);

}
freeAll();


}

 

 

 

 

 

 

Posted on Monday, May 27, 2013 at 08:49AM by Registered CommenterProkash Sinha | Comments Off