I’ve seen plenty of articles about this topic already, but I’d like to talk about it on my blog as well. It’s a pretty important topic in my opinion.

Let’s say you have a microcontroller peripheral that sends and/or receives a bunch of data. A good example of this type of microcontroller peripheral is a UART (universal asynchronous receiver/transmitter). That’s essentially a fancy name for a serial port–you know, the old 9-pin connectors you’d find on PCs. They were commonly used for connecting mice and modems back before PS/2, USB, and broadband took over. A UART basically does two things: receives data and transmits data. When you want to transmit data, you write a byte to a register in the UART peripheral. The UART converts the 8 bits into a serial data stream and spits it out over a single wire at the baud rate you have configured. Likewise, when it detects an incoming serial data stream on a wire, it converts the stream into a byte and gives you a chance to read it.

I’m not going to go into detail about UARTs yet, because there’s a different concept you need to understand first: the interrupt-safe circular buffer. This post is dedicated to that data structure, and my next post will build on it when I go into more detail about UARTs. OK, let’s get started with circular buffers.

A common problem when you are receiving or transmitting a lot of data is that you need to buffer it up. For example, let’s say you want to transmit 50 bytes of data. You have to write the bytes one by one into the peripheral’s data register to send them out. Every time you write a byte, you have to wait until it has been transmitted before writing the next one. (Note: Some microcontroller peripherals have a hardware buffer, so you can write up to 16 bytes [for example] before you have to wait, but that hardware buffer can still fill up, so the concept I’m describing still applies.)

A typical busy loop sending the data would look like this:

char data[50]; // filled with data you're transmitting

for (int x = 0; x < 50; x++) {
    while (peripheral_is_busy());
    peripheral_send_byte(data[x]);
}

This loop is simple. For each of the 50 bytes, it first waits until the peripheral isn’t busy, then tells the peripheral to send it. You can imagine what implementations of the peripheral_is_busy() and peripheral_send_byte() functions might look like.
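
For the curious, here is a rough sketch of what those two helpers might look like. The register names (UART_STATUS, UART_DATA) and the busy bit are made up for illustration; a real peripheral defines its own registers at fixed addresses in its datasheet. They are ordinary variables here so the sketch compiles anywhere.

```c
#include <stdint.h>

/* Hypothetical registers. On real hardware these would be fixed
   addresses taken from the datasheet, not ordinary variables. */
volatile uint8_t UART_STATUS;   /* assumed: bit 0 is set while a byte is being sent */
volatile uint8_t UART_DATA;     /* assumed: writing here starts a transmission */

#define UART_STATUS_TX_BUSY 0x01

/* Nonzero while the (pretend) transmitter is still shifting out a byte. */
int peripheral_is_busy(void) {
    return (UART_STATUS & UART_STATUS_TX_BUSY) != 0;
}

/* Hand one byte to the (pretend) transmitter. */
void peripheral_send_byte(char c) {
    UART_DATA = (uint8_t)c;
}
```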

While you’re transmitting these 50 bytes, the rest of your program can’t run because you’re busy in this loop making sure all of the bytes are sent correctly. What a waste, especially if the data transmission rate is much slower than your microcontroller! (Typically, that will be the case.) There are so many more important tasks your microcontroller could be doing in the meantime than sitting in a loop waiting for a slow transmission to complete. The solution is to buffer the data and allow it to be sent in the background while the rest of your program does other things.

So how do you buffer the data? You create a buffer that will store data waiting to be transmitted. If the peripheral is busy, rather than waiting around for it to finish, you put your data into the buffer. When the peripheral finishes transmitting a byte, it fires an interrupt. Your interrupt handler takes the next byte from the buffer and sends it to the peripheral, then immediately returns back to your program. Your program can then continue to do other things while the peripheral is transmitting. You will periodically be interrupted to send another byte, but it will be a very short period of time — all the interrupt handler has to do is grab the next byte waiting to be transmitted and tell the peripheral to send it off. Then your program can get back to doing more important stuff. This is called interrupt-driven I/O, and it’s awesome. The original code I showed above is called polled I/O.

You can probably imagine how useful this idea is. It will make your program much more efficient. If you’re a computer science person, you’re probably thinking,  “Doug, the abstract data type you should use for your buffer is a queue!” You would be absolutely correct. A queue is a first-in, first-out (FIFO) data structure. Everything will come out of the queue in the same order it went in. There’s no cutting in line! (Well, unless you have a bug in your code, which I’m not afraid to admit has happened to me enough times that I finally bothered to write up this article so I’ll have a reference for myself.) Anyway, that’s exactly how we want it to be. If I queue up “BRITNEYSPEARS” I don’t want it to be transmitted as “PRESBYTERIANS”. (Yes, amazingly enough, that is an anagram.)

A really easy way to implement a queue is by creating a ring buffer, also called a circular buffer or a circular queue. It’s a regular old array, but when you reach the end of the array, you wrap back around to the beginning. You keep two indexes: head and tail. The head is updated when an item is inserted into the queue, and it is the index of the next free location in the ring buffer. The tail is updated when an item is removed from the queue, and it is the index of the next item available for reading from the buffer. When the head and tail are the same, the buffer is empty. As you add things to the buffer, the head index increases. If the head wraps all the way back around to the point where it’s right behind the tail, the buffer is considered full and there is no room to add any more items until something is removed. As items are removed, the tail index increases until it reaches the head and the buffer is empty again. The head and tail endlessly follow this circular pattern. The tail is always trying to catch up with the head, and it will catch up unless you’re constantly queuing new data so quickly that the tail is always busy chasing the head.
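
The empty and full conditions described above can be written down directly. This is only a sketch: the helper names and the 8-slot size are my own choices for illustration, not part of the code later in this post.

```c
#include <stdint.h>

#define SLOTS 8   /* illustrative size; a power of two */

/* Empty: head and tail point at the same slot. */
int ring_is_empty(uint8_t head, uint8_t tail) {
    return head == tail;
}

/* Full: advancing the head one more step would land on the tail. */
int ring_is_full(uint8_t head, uint8_t tail) {
    return (uint8_t)((head + 1) % SLOTS) == tail;
}

/* Number of items waiting to be read, accounting for wrap-around.
   Unsigned arithmetic keeps the subtraction well defined when head < tail. */
uint8_t ring_count(uint8_t head, uint8_t tail) {
    return (uint8_t)(((unsigned)head - (unsigned)tail) & (SLOTS - 1u));
}
```

For example, with head at 2 and tail at 6, ring_count() returns 4: the items sit in slots 6, 7, 0 and 1. With head at 7 and tail at 0 the buffer is full and holds 7 items, because with this scheme one slot always stays unused.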

Anyway, we’ve determined that you need three things:

  1. An array
  2. A head
  3. A tail

These will all be accessed by both the main loop and the interrupt handler, so they should all be declared as volatile. Also, each update to the head and each update to the tail needs to be an atomic operation, so the index type should be no wider than the native word size of your architecture; that way a single instruction reads or writes the whole index. For example, if you’re on an 8-bit processor like an AVR, it should be a uint8_t (which also means the ring can have at most 256 slots). On a 16-bit processor it can be a uint16_t, and so on. Let’s assume we’re on an 8-bit processor here, so ring_pos_t in the code below is defined to be a uint8_t.

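#include <stdint.h>   /* for uint8_t */

#define RING_SIZE   64
typedef uint8_t ring_pos_t;
volatile ring_pos_t ring_head;        /* index of the next free slot */
volatile ring_pos_t ring_tail;        /* index of the next item to read */
volatile char ring_data[RING_SIZE];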

One final thing before I give you code: it’s a really good idea to use a power of two for your ring size (16, 32, 64, 128, etc.). The reason is that the wrapping operation (where index 63 wraps back around to index 0, for example) is much quicker if the size is a power of two. I’ll explain why. Normally a programmer would use the modulo (%) operator to do the wrapping. For example:

ring_tail = (ring_tail + 1) % 64;

If your tail began at 60 and you repeated this line above multiple times, the tail would do the following:

61 -> 62 -> 63 -> 0 -> 1 -> …

That works perfectly, but the problem with this approach is that modulo can be pretty slow because it’s a divide operation. Division is slow on most processors, and many small microcontrollers (AVR, for one) have no hardware divide instruction at all, so it gets done in software. It turns out that when the divisor is a power of two, you can do the equivalent of a modulo on a non-negative value with a bitwise AND, which is a much quicker operation. It works because if you take a power of two and subtract one, you get a number whose binary representation is a string of all 1 bits (63 is 111111, for example). In the case of a queue of size 64, bitwise ANDing the head or tail with 63 will always keep the index between 0 and 63. So you can do the wrap-around like so:

ring_tail = (ring_tail + 1) & 63;

A good compiler will automatically convert the modulo to the faster AND operation if it’s a power of two, so you can just use my first example with the “% 64” since it makes the intent of the code clearer. I’ve been told I have “a lot of faith in compilers” but it’s true–if you compile with optimizations enabled, GCC will correctly optimize my first example into the assembly equivalent of my second example. Looking at the assembly code that your compiler generates is a very valuable tool for you to have available.
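
If you want to convince yourself that the two forms agree, and guard against someone later changing the size to something that isn’t a power of two, a sketch like the following works. The helper names wrap_mod() and wrap_and() are just for illustration, and _Static_assert assumes a C11 compiler.

```c
#include <stdint.h>

#define RING_SIZE 64

/* Compile-time checks (C11): the size is a power of two,
   and every index fits in a uint8_t. */
_Static_assert(RING_SIZE > 1 && (RING_SIZE & (RING_SIZE - 1)) == 0,
               "RING_SIZE must be a power of two");
_Static_assert(RING_SIZE <= 256, "RING_SIZE is too large for a uint8_t index");

/* Advance an index using modulo. */
uint8_t wrap_mod(uint8_t pos) {
    return (uint8_t)((pos + 1) % RING_SIZE);
}

/* Advance an index using a bitwise AND with (size - 1). */
uint8_t wrap_and(uint8_t pos) {
    return (uint8_t)((pos + 1) & (RING_SIZE - 1));
}
```

Both functions take 60 to 61 and 63 to 0. If you change RING_SIZE to something like 50, the first _Static_assert stops the build instead of letting the AND version silently compute the wrong index.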

OK. Now that I’ve explained everything, here are my add() and remove() functions for the queue:

int add(char c) {
    ring_pos_t next_head = (ring_head + 1) % RING_SIZE;
    if (next_head != ring_tail) {
        /* there is room */
        ring_data[ring_head] = c;
        ring_head = next_head;
        return 0;
    } else {
        /* no room left in the buffer */
        return -1;
    }
}

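/* Note: this name clashes with remove() from <stdio.h>; rename it if you include that header. */
int remove(void) {
    if (ring_head != ring_tail) {
        /* cast so bytes >= 0x80 can't come back negative and look like the -1 "empty" result */
        int c = (unsigned char)ring_data[ring_tail];
        ring_tail = (ring_tail + 1) % RING_SIZE;
        return c;
    } else {
        /* buffer is empty */
        return -1;
    }
}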

The add function calculates the next position for the head, ensures that there’s room in the buffer, writes the character to the end of the queue, and finally updates the head. The remove function ensures the buffer isn’t empty, grabs the first character waiting to be read out of the queue, and finally updates the tail. The code is pretty straightforward, but there are a couple of details you should be aware of:

  • I only modify the head index in the add function, and I only modify the tail index in the remove function.
  • I only modify the index after reading/writing the data in the buffer.

By doing both of the above, I have successfully ensured that this is an interrupt-safe, lock-free ring buffer (if there is a single consumer and a single producer). What I mean is you can add to the queue from an interrupt handler and remove from the queue in your main loop (or vice-versa), and you don’t have to worry about interrupt safety in your main loop. This is a really cool thing! It means that you don’t have to temporarily disable interrupts in order to update the buffer from your main loop. It’s all because of the two bullets above. If the interrupt only modifies the head, and the main loop only modifies the tail, there’s no conflict between the two of them that would require disabling interrupts. As for my second bullet point, updating the head or tail index only after the read or write is only necessary in whatever side of the queue is working in the main loop — it doesn’t matter in the interrupt handler. But if I follow that convention in both the add() and remove() functions, they can be swapped around as needed — in one program add() could be used in an interrupt handler and remove() could be used in the main loop, but in a different program, remove() would be in the interrupt handler and add() would be in the main loop.
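
To see the queue behave the way I’ve described, here is a small self-contained sketch. It uses a deliberately tiny 8-slot ring and hypothetical names (ring_add, ring_remove, ring_add_string) so it doesn’t collide with anything else; the logic mirrors add() and remove() above.

```c
#include <stdint.h>

#define RING_SIZE 8   /* small on purpose so it fills quickly */
typedef uint8_t ring_pos_t;
volatile ring_pos_t ring_head;
volatile ring_pos_t ring_tail;
volatile char ring_data[RING_SIZE];

/* Mirrors add() above, under a hypothetical name. */
int ring_add(char c) {
    ring_pos_t next_head = (ring_head + 1) % RING_SIZE;
    if (next_head == ring_tail) {
        return -1;                  /* full */
    }
    ring_data[ring_head] = c;       /* write the data first... */
    ring_head = next_head;          /* ...then publish it by moving the head */
    return 0;
}

/* Mirrors remove() above; the cast keeps the returned byte non-negative. */
int ring_remove(void) {
    if (ring_head == ring_tail) {
        return -1;                  /* empty */
    }
    int c = (unsigned char)ring_data[ring_tail];   /* read the data first... */
    ring_tail = (ring_tail + 1) % RING_SIZE;        /* ...then free the slot */
    return c;
}

/* Queue as much of s as fits; returns how many characters were accepted. */
int ring_add_string(const char *s) {
    int n = 0;
    while (s[n] != '\0' && ring_add(s[n]) == 0) {
        n++;
    }
    return n;
}
```

Calling ring_add_string("BRITNEYSPEARS") on an empty 8-slot ring returns 7, because with this head/tail scheme one slot always stays unused, so a ring of size N holds N - 1 items. Seven calls to ring_remove() then hand back B, R, I, T, N, E, Y in that order, and the eighth returns -1.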

OK, so back to the original example of transmitting 50 bytes. In this case, the main loop will use the add() function. You will add 50 bytes to the queue by calling add() 50 times. In the meantime, the microcontroller peripheral will interrupt every time it successfully sends a byte. The interrupt handler will call remove(), tell the peripheral to transmit the character it just removed, and give control back to the main loop. Later on, the transmission of that character will complete, the interrupt will occur again, it will send the next character, and so on until all 50 bytes have been sent, at which point the queue will be empty. It works like a charm!

The only problem remaining is that the first time you need to send a character, the transmitter isn’t running yet. So you have to “prime the pump” by telling the transmitter to start. Sometimes a peripheral will have a way to induce the first interrupt to get the ball rolling. Otherwise, you may have to add some extra logic that forces the first transmission to occur from the main loop. Since this technically breaks the “single consumer and single producer” rule, this one exception to the rule may still require some careful interrupt safety. I’ll talk more about that in a later post. The main purpose of this post is just to get you familiar with ring buffers and their inherent interrupt safety (aside from that one “gotcha”). They are very, very useful and very, very common. I’ve used them in drivers for UARTs and CANbus, and as I understand it, they are very common in audio applications as well.
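
Here is one possible way the transmit side could be wired up, including the “prime the pump” step. Treat it as a sketch under assumptions, not the definitive approach: tx_running, send_char(), tx_send_next() and tx_complete_isr() are names I made up, the peripheral is faked with an array called wire so the code compiles anywhere, and disable_interrupts()/enable_interrupts() are empty stand-ins for whatever your platform provides.

```c
#include <stdint.h>

#define RING_SIZE 64
typedef uint8_t ring_pos_t;
volatile ring_pos_t ring_head;
volatile ring_pos_t ring_tail;
volatile char ring_data[RING_SIZE];
volatile uint8_t tx_running;    /* hypothetical flag: nonzero while a byte is in flight */

/* Stand-ins for the hardware so the sketch compiles anywhere. */
char wire[128];                 /* bytes "sent", in order */
int wire_len;
void peripheral_send_byte(char c) {
    if (wire_len < (int)sizeof wire) wire[wire_len++] = c;
}
void disable_interrupts(void) { /* e.g. cli() on AVR */ }
void enable_interrupts(void)  { /* e.g. sei() on AVR */ }

/* Take the next queued byte and start sending it, or mark the transmitter idle. */
static void tx_send_next(void) {
    if (ring_head != ring_tail) {
        char c = ring_data[ring_tail];
        ring_tail = (ring_tail + 1) % RING_SIZE;
        tx_running = 1;
        peripheral_send_byte(c);
    } else {
        tx_running = 0;
    }
}

/* "Byte sent" interrupt handler: the normal consumer. */
void tx_complete_isr(void) {
    tx_send_next();
}

/* Main-loop side: queue a byte, then start the transmitter if it is idle. */
int send_char(char c) {
    ring_pos_t next_head = (ring_head + 1) % RING_SIZE;
    if (next_head == ring_tail) {
        return -1;               /* no room */
    }
    ring_data[ring_head] = c;
    ring_head = next_head;

    disable_interrupts();        /* the main loop is briefly a second consumer here */
    if (!tx_running) {
        tx_send_next();          /* prime the pump */
    }
    enable_interrupts();
    return 0;
}
```

The first call to send_char() finds the transmitter idle and sends the byte itself; after that, each “byte sent” interrupt pulls the next byte, and the interrupt that finds the queue empty clears tx_running so the next send_char() primes the pump again. Interrupts are disabled only around the idle check, since that is the one place where the main loop touches the tail. A real driver would probably save and restore the previous interrupt state rather than unconditionally re-enabling interrupts.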

I hope this made some sense. If it didn’t, I hope my next posting about UARTs will help clear things up.

Note: The information I have given may not be completely correct if you’re dealing with a system that has more than one processor or core, or other weird stuff like that. In that case you may need something extra such as a memory barrier between writing the queue data and updating the index. That’s beyond the scope of this article, which is targeted toward microcontrollers. This technique should work fine on microcontrollers with a single processor core.
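
For readers who do end up on a multi-core system, here is a rough sketch of what “something extra” could look like using C11 atomics. This is not the code from this article: the names add_mc() and remove_mc() are hypothetical, it assumes a compiler with <stdatomic.h>, and it still assumes exactly one producer and one consumer.

```c
#include <stdatomic.h>
#include <stdint.h>

#define RING_SIZE 64
char ring_data[RING_SIZE];
_Atomic uint8_t ring_head;
_Atomic uint8_t ring_tail;

/* Producer: write the data, then publish the new head with release ordering
   so the consumer cannot see the new head before the data itself. */
int add_mc(char c) {
    uint8_t head = atomic_load_explicit(&ring_head, memory_order_relaxed);
    uint8_t next = (uint8_t)((head + 1) % RING_SIZE);
    if (next == atomic_load_explicit(&ring_tail, memory_order_acquire)) {
        return -1;   /* full */
    }
    ring_data[head] = c;
    atomic_store_explicit(&ring_head, next, memory_order_release);
    return 0;
}

/* Consumer: read the head with acquire ordering before touching the data,
   then release the slot by publishing the new tail. */
int remove_mc(void) {
    uint8_t tail = atomic_load_explicit(&ring_tail, memory_order_relaxed);
    if (tail == atomic_load_explicit(&ring_head, memory_order_acquire)) {
        return -1;   /* empty */
    }
    int c = (unsigned char)ring_data[tail];
    atomic_store_explicit(&ring_tail, (uint8_t)((tail + 1) % RING_SIZE),
                          memory_order_release);
    return c;
}
```

The release store paired with the acquire load plays the role of the memory barrier between writing the queue data and updating the index.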

23 comments

  1. Thank You for this good Work.
    Greetings from Germany.
    I have made a Thread on my Forum for my C_Can
    on the LPC11C24/301.
    I learn from the National Semi the CHIP CAN COP888BC
    how to handle the IRQ flags there.
    There are cool drawings of the CAN architecture there.

  2. Alan Jacobs @ 2014-03-18 22:46

    Doug….I was reading this great article and came to the “Britney” anagram and fell out of my chair laughing.

    I have a beast of a problem with an interrupt driven parallel port phased array sonar system. The equipment consists of arrays of ultrasonic transducers pinged by gate arrays to scan and do beam forming. The ping returns pass through an A/D converter to a 8051 family processor. The 8051 in turn connects to a Windows laptop through interrupt driven parallel port using one of two drivers: “wrtdev0.VxD” ( if Win .9xx0 or Blue Water Systems “WinRT.sys if WinNT ). The .exe GUI app on the laptop displays a RADAR like scanning display of obstructions underwater ahead of the boat. The GUI can send configuration settings to the 8051 to control scan rate, resolution, beam position, etc. This all works great on a Win 95, Win 98, NT3.5, NT 4.0, Win200, or WinXP Home platform ( does NOT work on Win XP PRO).
    Now the problem. The high G forces of slamming waves crash a mechanical hard disk head. I need to use a solid state disk, preferably a USB flash drive, and an OS friendly to being installed on removable media ( Micro$oft uses removable media bit to block such installs to prevent piracy). I run Ubuntu 10.10. I have tried running the .exe in the Wine environment on Linux; the app works but cannot recognize port 0378 and hangs.
    Any Ideas how I might proceed to solve this? I am willing to retain a consultant if necessary. If interested contact me at the email and/or phone below.
    Cheers.

    Alan Jacobs
    Green Bay, Wisconsin, USA
    alan.jacobs@att.net
    USA land line (920) 654-4477

  3. Alan,

    Have you considered running a Windows 98/XP virtual machine inside of Ubuntu? I know that VMware can forward raw parallel port access to guest VMs, although I do not know if it will work with interrupt-driven stuff. VMware Player is free for non-commercial use, and they have a commercial version as well. Off the top of my head I do not know how to solve this in Wine.

  4. Ali Ebrahimi @ 2014-04-14 05:02

    Just telling my thanks from Iran for your great Explanation my dear DUde DOug.Britney was a nice example here..hehe

  5. Doug Brown, thanks much for the VMWare suggestion. I tried Oracle VirtualBox but no luck. I did not try VMWare…..I will. I wish I was skilled in this area. One person told me a basic problem is that .VxDs in general and wrtdevo.VxD in particular run in “ring 0 root level” (what ever that means ) and linux won’t allow I/O at that level ( that’s gibberish to me ). I don’t understand why an app running in a virtual machine or an API emulator like Wine simply cannot pipe to “parportX”
    Thanks again for your interest.

  6. Alan Jacobs @ 2014-04-18 20:34

    Doug…..FYI….I just checked our VMware documentation. Here is a pertinent paragraph for interrupt driven parallel ports

    “Workstation provides only partial emulation of PS/2 hardware. Interrupts that a device connected to a physical port requests are not passed to the virtual machine. The guest operating system cannot use direct memory access (DMA) to move data to or from the port. For this reason, not all devices that attach to a parallel port work correctly. Do not use virtual parallel ports to connect parallel port storage devices or other types of parallel port devices to a virtual machine ” hmmmm….

  7. Hi Alan,

    Darn…I didn’t realize that. I guess it probably won’t work then 🙁 Yeah, drivers just don’t seem to be able to work with Wine. I’d almost have to recommend that you stick with an older machine (with a built-in parallel port) that can run 98/XP and just use something solid state like a CompactFlash card in an IDE to CompactFlash adapter or something like that. Doing it that way, the solid state disk should still appear as a hard drive so Windows would install to it.

  8. Alan Jacobs @ 2014-05-02 21:14

    Doug….seems we think along the same lines. I purchased a 16GB KingSpec brand 40 pin IDE flash DOM ( disk on module ) for that very purpose….assuming A) no removable media bit to block Windows B) a standard Seagate type HDD controller embedded in the DOM which BIOS and WinXP would see as an IDE HDD. The motherboard I am using is a 2007 vintage ITX format board by Intel…. “Little Valley” model D201GLY2 Celeron 220. The Intel BIOS did not even detect the presence of the DOM. I went to a computer shop and had them plug it into 3 other old Intel boxes; every BIOS still failed to detect the KingSpec DOM. KingSpec does NOT offer any manual and has NOT responded to my emails for tech support. Can you suggest a reliable manufacturer of 16GB IDE flash DOM modules?
    Other approaches:
    A) I purchased a Tektronix 128 channel logic analyzer and constructed a parallel port data traffic “sniffer” to read the traffic between Windows application and the 8051 controller’s parallel port UART. I’m in process of mapping their conversation…then I can write my own linux Python or C version of the executable application
    B) decompile or disassemble the WinXP .exe of the application to map the algorithm to talk to the 8051 for same reason as item A above.

    Wish I knew a reverse engineer guy more skilled than myself.
    Cheers.
    Alan Jacobs

  9. Alan, sorry for taking so long to get back to you on this. Here’s what I would recommend:

    Use a normal CF card (yes, I know it has the removable media bit set). But…if you use a TrueIDE adapter, it should hide the removable bit from your computer and allow Windows to be installed on it. Just a thought.

  10. Well laid-out article (with code examples) explaining ring buffers, good work my friend!

  11. This is what i was looking for…
    Good work.

  12. Alex Villa @ 2015-02-27 17:03

    Won’t these functions only work if ring head and ring tail are made to be global variables so that the functions can recognize them? Since you’re not passing them by reference.

  13. Hi Alex,

    Yes — all three of the variables (ring_head, ring_tail, and ring_data) should be declared outside of the functions so that both functions have access to them. You could do it as a global, but even better, make them static. That way, they won’t pollute the global namespace and they will only be directly accessible from that file.

  14. Alex Villa @ 2015-02-28 09:43

    Thanks so much for your help! Honestly, your site is more helpful than stack overflow haha

  15. Sanjay Kemkar @ 2015-08-01 19:36

    Daug . Yes it is very important concept” RING BUFFER” . Can it be also done like tail index is handled in timer interrupt. Time between two Timer interrupt can be adjusted conveniently.So head index is handled in serial interrupt and tail index in timer interrupt.End of message can be checked by say () and flag can be raised to indicate main program that message is ready!

  16. Maybe I’m not quite understanding your add function, but won’t you never quite fill up the buffer? For example, with a buffer of size 8, if your head is at position 7 and tail is at position 0 (i.e. only one place left in the buffer) and you try to add, you will calculate the next head as position 0 and not add to the buffer.

  17. Hey Sean,

    You’re absolutely right — I probably should have mentioned that in the post. You lose one position in the ring buffer by using this approach. If your max size is set to 32, you can only put 31 items in. Possible workarounds include creating a new variable called “count” that keeps track of how many items are currently in the buffer and using that instead of the head-to-tail comparisons, but then you have to start being careful about disabling interrupts in several places because they can be temporarily out of sync during an add/remove operation. I personally just live with the capacity being one smaller than advertised. 🙂

  18. Deep Shah @ 2016-01-27 15:28

    Hey ! Great article.

    Also, one more thing. Assume size of buffer 8. Suppose tail is at 0. and head is at 7.
    Now when remove function gets called.

    if (ring_head != ring_tail) { — /* check passed */
    int c = ring_data[ring_tail]; — /* gets value at index 0. */
    ring_tail = (ring_tail + 1) % RING_SIZE; — before this instruction could execute, interrupt occurs, and it tries to add an element. But
    ring_pos_t next_head = (ring_head + 1) % RING_SIZE; — This check will fail because head is not updated yet. Hence, it wont be able to add the element in buffer even when there is space. This is not harmful, but the extra byte gets lost or needs to be handled in some way.

    Please correct me if I am wrong.

  19. Hi Deep Shah,

    You’re correct — in that case, until ring_tail is updated in the remove function, the add function won’t be able to add anything else to the buffer. As you pointed out, there is no harm in that failure — the buffer will stay in a consistent state.

    The solution to that problem is simply to make sure that your buffer is big enough to handle the worst-case amount of data that it will need to store before the “consumer” can get around to handling it.

  20. Just thanks for your robust implementation.
    That helped me a lot working with canbus frames.

  21. I wonder about thread-safety when it comes to the various comparison expressions ( supposing a typical single processor microcontroler) :

    if (next_head != ring_tail) { … }

    Can I expect the comparison operation to be atomic ? If not, wouldn’t that break the thread-safety guarantee ?

    Also, suppose I need to check for emptiness:

    bool isEmpty() {
    return ring_head == ring_tail;
    }

    If the comparison is not atomic, an interrupt could occur somewhere during the processing of the corresponding machine instructions and add to the queue. The result could then be wrong.

    So aren’t there still critical sections in your code that need to be protected ( by disabling interrupts) ?

  22. Hi Peter,

    In my experience, your first example typically turns into several instructions (or something similar):

    1. Load next_head into register X
    2. Load ring_tail into register Y
    3. Compare register X against register Y
    4. Conditional jump based on the comparison result

    Together, the comparison is definitely not atomic because it is several instructions, but we can at least be guaranteed that the two load instructions are each individually atomic since they are ring_pos_t, so you know that you loaded a valid value for each (even though one of them might have been incremented sometime in the middle of the four instructions). However, this still doesn’t break the thread-safe guarantee.

    ring_tail might change while this is happening, but it’s never going to change enough to make something bad happen. If next_head and ring_tail were not equal before the comparison began, then they will not be equal afterward either, because the code in remove() that can potentially run in a different thread won’t allow ring_tail to increment any further than equaling ring_head. Next_head is 1 greater than ring_head, so we’re still fine. And we are already guaranteed that ring_head won’t be changed anywhere else, because of the requirement of a single producer and single consumer. The single producer is already busy in add(), so nothing else can happen that will modify ring_head. If next_head and ring_tail were equal before the comparison began but then they weren’t equal after the comparison, then we’re just out of luck and it means the buffer was too full, even if the situation changed while the comparison was in progress. Next time we try to add something, there will be room in the buffer.

    Your isEmpty() example is not a problem either, because that’s how it’s designed to work. If the queue happens to have something added while the comparison is busy, and isEmpty() returns true, then we’ll just catch whatever was added to the queue the next time we check. Typically, you would have a periodic task somewhere (or a spot in your main loop) that checks the result of isEmpty() and processes the data until isEmpty() returns true.

  23. Thanks, Doug, excellent, I greatly appreciate the thorough explanation – Peter
