Compare commits

...

2 Commits

  1. 35
      queue.c
  2. 4
      queue.h
  3. 140
      rrqnet.c

35
queue.c

@ -69,3 +69,38 @@ void Queue_initialize(Queue *list,int n,size_t size) {
exit( 1 );
}
}
QueueItem *Queue_findItem(
Queue *list,int (*test)(QueueItem*,void*),void *data)
{
QueueItem *item;
QueueItem *before = 0;
if ( sem_wait( &list->count ) ) {
perror( "FATAL" );
exit( 1 );
}
if ( pthread_mutex_lock( &list->mutex ) ) {
perror( "FATAL" );
exit( 1 );
}
for ( item = list->head; item && ( test( item, data ) == 0 );
before = item, item = item->next ) {
// nothing here
}
if ( item ) {
if ( before == 0 ) {
list->head = item->next;
} else {
before->next = item->next;
}
if ( item == list->last ) {
list->last = before;
}
}
if ( pthread_mutex_unlock( &list->mutex ) ) {
perror( "FATAL" );
exit( 1 );
}
return item;
}

4
queue.h

@ -21,5 +21,9 @@ extern void Queue_addItem(Queue *list,QueueItem *item);
extern QueueItem *Queue_getItem(Queue *list);
extern void Queue_initialize(Queue *list,int n,size_t size);
typedef int (*testfn)(QueueItem*,void*);
extern QueueItem *Queue_findItem(Queue *list,testfn test,void *data);
#endif

140
rrqnet.c

@ -84,6 +84,7 @@ typedef struct _PacketItem {
int fd;
struct SockAddr src;
ssize_t len;
char macpair[12];
unsigned char buffer[ BUFSIZE ];
} PacketItem;
@ -112,13 +113,22 @@ static struct {
unsigned int count;
} allowed;
// Actual remotes are kept in a hash table keyed by their +uaddr+
// field, and another hash table keps Interface records for all MAC
// addresses sourced from some remote, keyed by their +mac+ field. The
// latter is used both for resolving destinations for outgoing
// packets, and for limiting broadcast cycles. The former table is
// used for limiting incoming packets to allowed sources, and then
// decrypt the payload accordingly.
// Packet delivery makes use of three hashtables.
//
// A. One hashtable is for keeping track of actual remotes. This table
// is used for limiting incoming packets to allowed sources, and
// then decrypt the payload accordingly. Actual remotes are kept in
// the hash table keyed by their +uaddr+ field, and another.
//
// B. One hashtable keeps Interface records for all MAC addresses
// sourced from some remote, keyed by their +mac+ field. This is
// used both for resolving remote destinations for outgoing
// packets, and for limiting broadcast cycles.
//
// C. One hashtable keeps packet Count records for a +smac-dmac+ key,
// so as to ensure delivery for any pair of source and destination
// MAC addresses is duly serial according to incoming order.
static int hashcode_uaddr(struct _htable *table,unsigned char *key);
static int hashcode_mac(struct _htable *table,unsigned char *key);
static struct {
@ -126,7 +136,7 @@ static struct {
htable by_addr; // struct Remote hash table
} remotes = {
.by_mac = HTABLEINIT( struct Interface, mac, hashcode_mac ),
.by_addr = HTABLEINIT( struct Remote, uaddr, hashcode_uaddr )
.by_addr = HTABLEINIT( struct Remote, uaddr, hashcode_uaddr ),
};
#define Interface_LOCK if ( pthread_mutex_lock( &remotes.by_mac.lock ) ) { \
@ -255,6 +265,23 @@ static int hashcode_mac(struct _htable *table,unsigned char *key) {
return x;
}
// Compute a hashcode for the given MAC addr pair key
static int hashcode_macpair(struct _htable *table,unsigned char *key) {
int x = 0;
int i = 0;
if ( table->size == 256 ) {
for ( ; i < 12; i++ ) {
x += *(key++);
}
return x;
}
uint16_t *p = (uint16_t *) key;
for ( ; i < 6; i++ ) {
x += *( p++ );
}
return x;
}
// Make a text representation of bytes as ipv4 or ipv6
static char *inet_nmtoa(unsigned char *b,int w) {
static char buffer[20000];
@ -1168,13 +1195,100 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) {
static struct {
Queue full;
Queue free;
sem_t control;
} todolist;
// hashtable marking delivering in
struct {
htable delivering;
htable pending;
} serial = {
.delivering = HTABLEINIT( PacketItem, macpair, hashcode_macpair ),
.pending = HTABLEINIT( PacketItem, macpair, hashcode_macpair )
};
#define Delivering_LOCK \
if ( pthread_mutex_lock( &serial.delivering.lock )) { \
perror( "FATAL" ); exit( 1 ); }
#define Delivering_UNLOCK \
if ( pthread_mutex_unlock( &serial.delivering.lock ) ) { \
perror( "FATAL" ); exit( 1 ); }
#define Delivering_FIND(a,x) \
htfind( &serial.delivering, a, (unsigned char **)&x )
#define Delivering_ADD(c) \
htadd( &serial.delivering, (unsigned char *) c )
#define Delivering_DEL(c) \
htdelete( &serial.delivering, (unsigned char *) c )
#define Pending_FIND(a,x) \
htfind( &serial.pending, a, (unsigned char **)&x )
#define Pending_ADD(c) \
htadd( &serial.pending, (unsigned char *) c )
#define Pending_DEL(c) \
htdelete( &serial.pending, (unsigned char *) c )
static int deliverable(QueueItem *item,void *data) {
(void)data;
PacketItem *p = (PacketItem *) item;
PacketItem *delivering;
PacketItem *pending;
Delivering_FIND( p, delivering );
Pending_FIND( p, pending );
if ( delivering != 0 ) {
if ( pending == 0 ) {
Pending_ADD( p );
}
return 0;
}
if ( pending != 0 && pending != p ) {
return 0;
}
Pending_DEL( p );
Delivering_ADD( p );
return 1;
}
// Pick the first delivarable packet in the queue
static PacketItem *get_from_queue(Queue *queue) {
for ( ;; ) {
PacketItem *todo = (PacketItem *)
Queue_findItem( queue, deliverable, 0 );
if ( todo ) {
if ( sem_post( &todolist.control ) ) {
perror( "FATAL" );
exit( 1 );
}
return todo;
}
if ( sem_wait( &todolist.control ) ) {
perror( "FATAL" );
exit( 1 );
}
if ( sem_post( &queue->count ) ) {
perror( "FATAL" );
exit( 1 );
}
}
}
////
// The threadcontrol program for handling packets.
static void *packet_handler(void *data) {
(void) data;
for ( ;; ) {
PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.full );
PacketItem *todo = get_from_queue( &todolist.full );
if ( todo->fd == mcast.fd ) {
// Patch multicast address as source for multicast packet
route_packet( todo->buffer, todo->len, &mcast.sock );
@ -1184,6 +1298,11 @@ static void *packet_handler(void *data) {
}
route_packet( todo->buffer, todo->len, &todo->src );
}
Delivering_DEL( todo );
if ( sem_post( &todolist.control ) ) {
perror( "FATAL" );
exit( 1 );
}
Queue_addItem( &todolist.free, (QueueItem*) todo );
}
return 0;
@ -1201,6 +1320,7 @@ void todolist_initialize(int nbuf,int nthr) {
exit( 1 );
}
Queue_initialize( &todolist.free, nbuf, sizeof( PacketItem ) );
sem_init( &todolist.control, 0, 0 );
for ( ; nthr > 0; nthr-- ) {
pthread_t thread; // Temporary thread id
pthread_create( &thread, 0, packet_handler, 0 );
@ -1231,6 +1351,7 @@ static void doreadUDP(int fd) {
exit( 0 );
}
#endif
memcpy( todo->macpair, todo->buffer, 12 );
Queue_addItem( &todolist.full, (QueueItem*) todo );
}
@ -1251,6 +1372,7 @@ static void received_tap(unsigned char *buf, int len) {
sizeof( struct SockAddr ) );
todo->fd = 0;
todo->len = len;
memcpy( todo->macpair, todo->buffer, 12 );
Queue_addItem( &todolist.full, (QueueItem*) todo );
}

Loading…
Cancel
Save