A reliable communication protocol
In computer communication, there are many cases in which a protocol
needs to cope with packet loss. In order to provide a reliable
channel, lost packets must be retransmitted. This example shows a
simple communication protocol that uses (the absence of)
acknowledgment packets to detect a lost packet. If a lost packet is
detected, the packet is retransmitted.
The example shows two protothreads, one for the sender and one for the
receiver.
#include "pt.h"
/*
 * Sender protothread.
 *
 * Sends a packet and then waits for either an acknowledgment or a
 * timeout. Every time the wait ends with the timer expired the packet
 * is sent again; the protothread ends once a wait finishes while the
 * timer is still running.
 */
PT_THREAD(sender(struct pt *pt))
{
  PT_BEGIN(pt);

  for(;;) {
    send_packet();

    /* Arm the retransmission timer, then block until an
       acknowledgment has been received or the timer has expired. */
    timer_set(&timer, TIMEOUT);
    PT_WAIT_UNTIL(pt, acknowledgment_received() || timer_expired(&timer));

    /* The timer has not expired, so the wait was not ended by the
       timeout: no retransmission is needed. */
    if(!timer_expired(&timer)) {
      break;
    }

    /* Timeout: go around the loop and send the packet again. */
  }

  PT_END(pt);
}
/*
 * Receiver protothread.
 *
 * Stays blocked for as long as no packet has been received, then sends
 * one acknowledgment and ends.
 */
PT_THREAD(receiver(struct pt *pt))
{
  PT_BEGIN(pt);

  /* Block while there is no received packet to acknowledge. */
  PT_WAIT_WHILE(pt, !packet_received());

  send_acknowledgement();

  PT_END(pt);
}
Delays - text scrolling on an LCD panel
Protothreads can be used for introducing delays inside a function,
without using a full threading model. The following example shows a
function writing text to a one-line LCD panel. If the text is longer
than the size of the panel, the text is scrolled in from the
right, one step per second.
#include "pt.h"
#include "timer.h"
/*
 * State kept by one display_text() protothread between invocations.
 */
struct state {
/* The string that display_text() writes to the LCD. */
char *text;
/* While scrolling: points to the first character to be displayed. */
char *scrollptr;
/* Protothread control structure handed to the PT_*() macros. */
struct pt pt;
/* Timer used for the delay between two scroll steps. */
struct timer timer;
};
/*
 * Protothread that writes s->text to a one-line LCD panel.
 *
 * A text of at most LCD_SIZE characters is displayed right away. A
 * longer text is scrolled: the display start (s->scrollptr) advances
 * one character per step, with a delay of ONE_SECOND between steps,
 * until the window that ends with the last character of the text has
 * been shown.
 *
 * s: per-instance state; s->text must point to a NUL-terminated
 *    string, the remaining fields are managed by this function.
 */
PT_THREAD(display_text(struct state *s))
{
  PT_BEGIN(&s->pt);

  if(strlen(s->text) <= LCD_SIZE) {
    /* The whole text fits on the panel: show it without scrolling. */
    lcd_display_text(s->text);
  } else {
    /*
     * Scroll the text. The loop variable is the pointer to the first
     * character to be displayed. The loop keeps going while at least
     * LCD_SIZE characters remain (">=", not ">"), so that the final
     * full window - the one containing the last character of the
     * text - is displayed as well.
     * NOTE(review): assumes lcd_display_text() shows the first
     * LCD_SIZE characters of the string it is given - confirm.
     */
    for(s->scrollptr = s->text;
        strlen(s->scrollptr) >= LCD_SIZE;
        ++s->scrollptr) {
      lcd_display_text(s->scrollptr);

      /* Wait for one second before the next scroll step. */
      timer_set(&s->timer, ONE_SECOND);
      PT_WAIT_UNTIL(&s->pt, timer_expired(&s->timer));
    }
  }

  PT_END(&s->pt);
}
A code lock
This example is a bit more complicated and shows how to implement a
simple code lock - the kind of device that is placed next to a door and
into which you have to key a four-digit number in order to unlock the
door.
The code lock waits for key presses from a numeric keyboard and,
if the correct code is entered, the lock is unlocked. At most one
second may pass between two consecutive key presses, and after the
correct code has been entered, no further key may be pressed for 0.5
seconds before the lock is opened.
The code uses external functions for timers and for checking the
keyboard. These functions are included in the full source code for
this example, which is included in the downloadable
tarball.
/*
 * The secret code: the keys that have to be entered, in this order,
 * to open the lock. The array length is taken from the initializer.
 */
static const char code[] = { '1', '4', '2', '3' };
/*
 * The protothread function implementing the code lock logic.
 *
 * It is defined with the PT_THREAD() macro and carries the "static"
 * keyword because it is local to this file. Its single argument, pt,
 * is a pointer to the struct pt of this protothread.
 */
static
PT_THREAD(codelock_thread(struct pt *pt))
{
  /*
   * Number of keys that have been pressed so far. Declared "static"
   * to make sure that it is *not* allocated on the stack.
   */
  static int keys;

  /* Every protothread starts with PT_BEGIN(). */
  PT_BEGIN(pt);

  /* Loop until the protothread is explicitly exited with PT_EXIT(). */
  for(;;) {

    /* Collect keys until the right number of correct keys was seen. */
    for(keys = 0; keys < sizeof(code); ++keys) {

      if(keys != 0) {
        /*
         * At least one correct key has already been entered ("keys"
         * is larger than zero). Wait for the next key, but only for
         * as long as the timer set here has not expired.
         */
        timer_set(&codelock_timer, 1000);

        /* A compound blocking condition: key press or timeout. */
        PT_WAIT_UNTIL(pt, key_pressed() || timer_expired(&codelock_timer));

        if(timer_expired(&codelock_timer)) {
          printf("Code lock timer expired.\n");
          /*
           * Leave the for() loop; the enclosing loop then starts
           * reading keys from the beginning again.
           */
          break;
        }
      } else {
        /* No key yet: block until key_pressed() becomes true. */
        PT_WAIT_UNTIL(pt, key_pressed());
      }

      /* Compare the pressed key with the expected one. */
      if(key == code[keys]) {
        printf("Correct key '%c' found\n", key);
      } else {
        printf("Incorrect key '%c' found\n", key);
        /* A wrong key aborts the current attempt. */
        break;
      }
    }

    /* The for() loop ran to completion only if every key was correct. */
    if(keys == sizeof(code)) {
      printf("Correct code entered, waiting for 500 ms before unlocking.\n");

      /*
       * The correct code was entered. To rule out that this was just
       * a fluke of luck by an intruder, wait before opening the lock:
       * if another key is pressed before the timer expires, the lock
       * stays closed.
       */
      timer_set(&codelock_timer, 500);
      PT_WAIT_UNTIL(pt, key_pressed() || timer_expired(&codelock_timer));

      if(timer_expired(&codelock_timer)) {
        /* The timer expired: open the lock and exit the protothread. */
        printf("Code lock unlocked.\n");
        PT_EXIT(pt);
      } else {
        /* The wait ended without the timer having expired. */
        printf("Key pressed during final wait, code lock locked again.\n");
      }
    }
  }

  /* Every protothread ends with PT_END(). */
  PT_END(pt);
}
The bounded buffer with protothread semaphores
The following example shows how to implement the bounded buffer
problem using the protothreads semaphore library. The example uses
three protothreads: one producer() protothread that produces
items, one consumer() protothread that consumes items, and one
driver_thread() that schedules the producer and consumer
protothreads.
Note that there is no need for a mutex to guard the
add_to_buffer() and get_from_buffer() functions because
of the implicit locking semantics of protothreads - a protothread will
never be preempted and will never block except in an explicit PT_WAIT
statement.
#include "pt-sem.h"
/* Number of items the producer adds and the consumer removes before each of them ends. */
#define NUM_ITEMS 32
/* Initial count of the "full" semaphore; presumably the capacity of the buffer - confirm against add_to_buffer(). */
#define BUFSIZE 8
/*
 * Semaphores coordinating producer() and consumer(). Note the naming:
 * "full" starts at BUFSIZE, is waited on by the producer before it
 * adds an item and signalled by the consumer after it removes one;
 * "empty" starts at 0, is waited on by the consumer before it removes
 * an item and signalled by the producer after it adds one.
 */
static struct pt_sem full, empty;
/*
 * Producer protothread: adds NUM_ITEMS items to the buffer.
 *
 * Before each item it waits on the "full" semaphore, and after the
 * item has been added it signals the "empty" semaphore.
 */
static
PT_THREAD(producer(struct pt *pt))
{
  /* Items added so far; static so it is not kept on the stack. */
  static int count;

  PT_BEGIN(pt);

  count = 0;
  while(count < NUM_ITEMS) {
    PT_SEM_WAIT(pt, &full);
    add_to_buffer(produce_item());
    PT_SEM_SIGNAL(pt, &empty);
    ++count;
  }

  PT_END(pt);
}
/*
 * Consumer protothread: removes NUM_ITEMS items from the buffer.
 *
 * Before each item it waits on the "empty" semaphore, and after the
 * item has been consumed it signals the "full" semaphore.
 */
static
PT_THREAD(consumer(struct pt *pt))
{
  /* Items removed so far; static so it is not kept on the stack. */
  static int count;

  PT_BEGIN(pt);

  count = 0;
  while(count < NUM_ITEMS) {
    PT_SEM_WAIT(pt, &empty);
    consume_item(get_from_buffer());
    PT_SEM_SIGNAL(pt, &full);
    ++count;
  }

  PT_END(pt);
}
/*
 * Driver protothread: initializes the two semaphores and the two
 * child protothreads, then keeps running producer() and consumer()
 * until PT_WAIT_THREAD() lets it continue.
 */
static
PT_THREAD(driver_thread(struct pt *pt))
{
  /* Control structures of the two child protothreads. */
  static struct pt pt_producer, pt_consumer;

  PT_BEGIN(pt);

  PT_INIT(&pt_producer);
  PT_INIT(&pt_consumer);

  /* "full" starts at BUFSIZE, "empty" starts at zero. */
  PT_SEM_INIT(&full, BUFSIZE);
  PT_SEM_INIT(&empty, 0);

  /*
   * The bitwise & (rather than &&) does not short-circuit, so both
   * protothreads are called every time the condition is evaluated.
   * NOTE(review): C leaves the evaluation order of the two operands
   * unspecified, so which of the two calls runs first is up to the
   * compiler.
   */
  PT_WAIT_THREAD(pt, producer(&pt_producer) & consumer(&pt_consumer));

  PT_END(pt);
}
A radio driver written both with protothreads
and events
This example shows an interrupt handler in a device driver for a TR1001 radio
chip. The driver receives incoming data in bytes and constructs a
frame that is covered by a CRC checksum. The driver is implemented
both with protothreads and with an explicit state machine. The state
machine has 11 states and is implemented using the C switch()
statement. In contrast, the protothreads-based implementation does not
have any explicit states.
The flow of control in the state machine-based implementation is quite
hard to follow from inspection of the code, whereas the flow of
control is evident in the protothreads-based implementation.
Protothreads-based implementation
/*
 * Receive handler for the TR1001 radio, written as a protothread.
 *
 * Called once for every incoming byte. It waits for the SYNCH1/SYNCH2
 * preamble, then reads the header, the payload and the frame CRC.
 * Every decoded byte is built from two Manchester encoded incoming
 * bytes; an invalid encoded byte restarts the protothread. When the
 * received CRC matches the calculated one, the driver is asked to
 * poll and the handler blocks until tr1001_rxstate is no longer
 * RXSTATE_FULL.
 *
 * incoming_byte: the byte that was just received.
 */
PT_THREAD(tr1001_rxhandler(unsigned char incoming_byte))
{
  PT_YIELDING();
  /* Static so the values are kept between invocations: rxtmp holds
     the first decoded half of a byte, tmppos the buffer position. */
  static unsigned char rxtmp, tmppos;

  PT_BEGIN(&rxhandler_pt);

  while(1) {

    /* Wait until we receive the first synchronization byte. */
    PT_WAIT_UNTIL(&rxhandler_pt, incoming_byte == SYNCH1);

    tr1001_rxstate = RXSTATE_RECEVING;

    /* Read all incoming synchronization bytes. */
    PT_WAIT_WHILE(&rxhandler_pt, incoming_byte == SYNCH1);

    /* We should receive the second synch byte by now, otherwise we'll
       restart the protothread. */
    if(incoming_byte != SYNCH2) {
      PT_RESTART(&rxhandler_pt);
    }

    /* Reset the CRC. */
    rxcrc = 0xffff;

    /* Read packet header. */
    for(tmppos = 0; tmppos < TR1001_HDRLEN; ++tmppos) {

      /* Wait for the first encoded byte to arrive. */
      PT_YIELD(&rxhandler_pt);

      /* If the incoming byte isn't a valid Manchester encoded byte,
         we start again from the beginning. */
      if(!me_valid(incoming_byte)) {
        PT_RESTART(&rxhandler_pt);
      }

      rxtmp = me_decode8(incoming_byte);

      /* Wait for the second encoded byte to arrive. */
      PT_YIELD(&rxhandler_pt);

      if(!me_valid(incoming_byte)) {
        PT_RESTART(&rxhandler_pt);
      }

      /* Put together the two decoded halves into a single byte. */
      tr1001_rxbuf[tmppos] = (rxtmp << 4) | me_decode8(incoming_byte);

      /* Calculate the CRC. */
      rxcrc = crc16_add(tr1001_rxbuf[tmppos], rxcrc);
    }

    /* Since we've got the header, we can grab the length from it. */
    tr1001_rxlen = ((((struct tr1001_hdr *)tr1001_rxbuf)->len[0] << 8) +
                    ((struct tr1001_hdr *)tr1001_rxbuf)->len[1]);

    /* If the length is longer than we can handle, we'll start from
       the beginning. (tmppos equals TR1001_HDRLEN at this point.) */
    if(tmppos + tr1001_rxlen > sizeof(tr1001_rxbuf)) {
      PT_RESTART(&rxhandler_pt);
    }

    /* Read packet data. The payload is stored directly after the
       header, so the loop starts at TR1001_HDRLEN - the same constant
       that its upper bound is computed from - instead of a
       hard-coded header length. */
    for(tmppos = TR1001_HDRLEN;
        tmppos < tr1001_rxlen + TR1001_HDRLEN;
        ++tmppos) {
      PT_YIELD(&rxhandler_pt);

      if(!me_valid(incoming_byte)) {
        PT_RESTART(&rxhandler_pt);
      }

      rxtmp = me_decode8(incoming_byte);

      PT_YIELD(&rxhandler_pt);

      if(!me_valid(incoming_byte)) {
        PT_RESTART(&rxhandler_pt);
      }

      tr1001_rxbuf[tmppos] = (rxtmp << 4) | me_decode8(incoming_byte);
      rxcrc = crc16_add(tr1001_rxbuf[tmppos], rxcrc);
    }

    /* Read the frame CRC: four encoded bytes, each shifted in by four
       bits. Start from zero so that no bits of a previous frame can
       remain in rxcrctmp (only matters if rxcrctmp is wider than 16
       bits - its declaration is not visible here). */
    rxcrctmp = 0;
    for(tmppos = 0; tmppos < 4; ++tmppos) {
      PT_YIELD(&rxhandler_pt);

      if(!me_valid(incoming_byte)) {
        PT_RESTART(&rxhandler_pt);
      }

      rxcrctmp = (rxcrctmp << 4) | me_decode8(incoming_byte);
    }

    if(rxcrctmp == rxcrc) {
      /* A full packet has been received and the CRC checks out. We'll
         request the driver to take care of the incoming data. */
      tr1001_drv_request_poll();

      /* We'll set the receive state flag to signal that a full frame
         is present in the buffer, and we'll wait until the buffer has
         been taken care of. */
      tr1001_rxstate = RXSTATE_FULL;
      PT_WAIT_UNTIL(&rxhandler_pt, tr1001_rxstate != RXSTATE_FULL);
    }
  }

  PT_END(&rxhandler_pt);
}
Event-based implementation
/* No bytes read, waiting for the first synch byte (SYNCH1). */
#define RXSTATE_READY 0
/* First synch byte read; further SYNCH1 bytes are skipped and SYNCH2
   moves on to the header. */
#define RXSTATE_START 1
/* Reading packet header, first Manchester encoded byte. */
#define RXSTATE_HEADER1 2
/* Reading packet header, second Manchester encoded byte. */
#define RXSTATE_HEADER2 3
/* Reading packet data, first Manchester encoded byte. */
#define RXSTATE_DATA1 4
/* Reading packet data, second Manchester encoded byte. */
#define RXSTATE_DATA2 5
/* Receiving CRC16: one state per Manchester encoded byte, each of
   which contributes four bits to the received checksum. */
#define RXSTATE_CRC1 6
#define RXSTATE_CRC2 7
#define RXSTATE_CRC3 8
#define RXSTATE_CRC4 9
/* A full packet has been received; incoming bytes are dropped while
   the state machine is in this state. */
#define RXSTATE_FULL 10
/*
 * Receive handler for the TR1001 radio, written as an explicit state
 * machine.
 *
 * Called once for every incoming byte. The current state is kept in
 * tr1001_rxstate and the write position in rxpos. Every decoded byte
 * is built from two Manchester encoded incoming bytes. Any invalid
 * encoded byte, an over-long length field or a CRC mismatch returns
 * the machine to RXSTATE_READY. When a frame with a matching CRC has
 * been received the state becomes RXSTATE_FULL and the driver is
 * asked to poll; bytes arriving in RXSTATE_FULL are dropped.
 *
 * c: the byte that was just received.
 */
void
tr1001_rxhandler(unsigned char c)
{
  switch(tr1001_rxstate) {
  case RXSTATE_READY:
    if(c == SYNCH1) {
      tr1001_rxstate = RXSTATE_START;
      rxpos = 0;
    }
    break;
  case RXSTATE_START:
    if(c == SYNCH1) {
      /* Still in the run of synch bytes. */
      tr1001_rxstate = RXSTATE_START;
    } else if(c == SYNCH2) {
      tr1001_rxstate = RXSTATE_HEADER1;
      rxcrc = 0xffff;
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_HEADER1:
    if(me_valid(c)) {
      tr1001_rxbuf[rxpos] = me_decode8(c);
      tr1001_rxstate = RXSTATE_HEADER2;
    } else {
      /* Same recovery as every other invalid byte: start over. */
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_HEADER2:
    if(me_valid(c)) {
      tr1001_rxbuf[rxpos] = (tr1001_rxbuf[rxpos] << 4) |
        me_decode8(c);
      rxcrc = crc16_add(tr1001_rxbuf[rxpos], rxcrc);
      ++rxpos;
      if(rxpos >= TR1001_HDRLEN) {
        /* The header is complete, so the length can be read from it. */
        tr1001_rxlen = ((((struct tr1001_hdr *)tr1001_rxbuf)->len[0] << 8) +
                        ((struct tr1001_hdr *)tr1001_rxbuf)->len[1]);
        if(rxpos + tr1001_rxlen > sizeof(tr1001_rxbuf)) {
          /* The frame does not fit into the buffer: start over. */
          tr1001_rxstate = RXSTATE_READY;
        } else if(tr1001_rxlen == 0) {
          /* No payload: the CRC follows the header directly. */
          tr1001_rxstate = RXSTATE_CRC1;
        } else {
          tr1001_rxstate = RXSTATE_DATA1;
        }
      } else {
        tr1001_rxstate = RXSTATE_HEADER1;
      }
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_DATA1:
    if(me_valid(c)) {
      tr1001_rxbuf[rxpos] = me_decode8(c);
      tr1001_rxstate = RXSTATE_DATA2;
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_DATA2:
    if(me_valid(c)) {
      tr1001_rxbuf[rxpos] = (tr1001_rxbuf[rxpos] << 4) |
        me_decode8(c);
      rxcrc = crc16_add(tr1001_rxbuf[rxpos], rxcrc);
      ++rxpos;
      if(rxpos == tr1001_rxlen + TR1001_HDRLEN) {
        /* The whole payload has been read. */
        tr1001_rxstate = RXSTATE_CRC1;
      } else if(rxpos >= sizeof(tr1001_rxbuf)) {
        /* The next byte would be written past the end of the buffer. */
        tr1001_rxstate = RXSTATE_READY;
      } else {
        tr1001_rxstate = RXSTATE_DATA1;
      }
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_CRC1:
    if(me_valid(c)) {
      /* The first part of the CRC replaces any previous value. */
      rxcrctmp = me_decode8(c);
      tr1001_rxstate = RXSTATE_CRC2;
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_CRC2:
    if(me_valid(c)) {
      rxcrctmp = (rxcrctmp << 4) | me_decode8(c);
      tr1001_rxstate = RXSTATE_CRC3;
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_CRC3:
    if(me_valid(c)) {
      rxcrctmp = (rxcrctmp << 4) | me_decode8(c);
      tr1001_rxstate = RXSTATE_CRC4;
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_CRC4:
    if(me_valid(c)) {
      rxcrctmp = (rxcrctmp << 4) | me_decode8(c);
      if(rxcrctmp == rxcrc) {
        /* The CRC checks out: hand the frame over to the driver. */
        tr1001_rxstate = RXSTATE_FULL;
        tr1001_drv_request_poll();
      } else {
        tr1001_rxstate = RXSTATE_READY;
      }
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_FULL:
    /* Just drop the incoming byte. */
    break;
  default:
    /* Unknown state: drop the incoming byte and start over. */
    tr1001_rxstate = RXSTATE_READY;
    break;
  }
}
Last updated: $Date: 2005/03/19 21:57:01 $ (CEST)
Adam Dunkels
(contact)