Browse Source

use per-socket reader threads

wip/readerthreads
Ralph Rönnquist 3 months ago
parent
commit
9b24d4e275
  1. 257
      rrqnet.c

257
rrqnet.c

@ -87,6 +87,10 @@ typedef struct _PacketItem {
unsigned char buffer[ BUFSIZE ];
} PacketItem;
typedef struct _ReaderData {
int fd;
} ReaderData;
// heartbeat interval, in seconds
#define HEARTBEAT 30
#define HEARTBEAT_MICROS ( HEARTBEAT * 1000000 )
@ -211,14 +215,6 @@ static pthread_mutex_t printing = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
#define VERBOSE3OUT(fmt, ...) \
if ( verbose >= 3 ) PRINT( fprintf( stderr, fmt, ##__VA_ARGS__ ) )
// A buffer for reading stdin in fragmented way, to allow outgoing
// packets during the reading of stdin packets, if it's fragmented.
static struct {
unsigned char buffer[ BUFSIZE ]; // Packet data
unsigned int end; // Packet size
unsigned int cur; // Amount read so far
} input;
// The actual name of this program (argv[0])
static unsigned char *progname;
@ -1168,6 +1164,7 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) {
static struct {
Queue full;
Queue free;
sem_t reading;
} todolist;
// The threadcontrol program for handling packets.
@ -1200,6 +1197,10 @@ void todolist_initialize(int nbuf,int nthr) {
perror( "FATAL" );
exit( 1 );
}
if ( sem_init( &todolist.reading, 0, 1 ) ) {
perror( "FATAL" );
exit( 1 );
}
Queue_initialize( &todolist.free, nbuf, sizeof( PacketItem ) );
for ( ; nthr > 0; nthr-- ) {
pthread_t thread; // Temporary thread id
@ -1213,45 +1214,29 @@ void todolist_initialize(int nbuf,int nthr) {
// is updated for the new MAC address, However, if there is then a MAC
// address clash in the connection table, then the associated remote
// is removed, and the packet is dropped.
static void doreadUDP(int fd) {
PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free );
socklen_t addrlen =
udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 );
memset( &todo->src, 0, sizeof( todo->src ) );
todo->fd = fd;
todo->len = recvfrom(
fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen );
if ( todo->len == -1) {
perror( "Receiving UDP" );
exit( 1 );
}
static void *doreadUDP(void *data) {
int fd = ((ReaderData *) data)->fd;
while ( 1 ) {
PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free );
socklen_t addrlen =
udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 );
memset( &todo->src, 0, sizeof( todo->src ) );
todo->fd = fd;
todo->len = recvfrom(
fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen );
if ( todo->len == -1) {
perror( "Receiving UDP" );
exit( 1 );
}
#ifdef GPROF
if ( todo->len == 17 &&
memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) {
exit( 0 );
}
#endif
Queue_addItem( &todolist.full, (QueueItem*) todo );
}
// Handle packet received on the tap/stdio channel
static void received_tap(unsigned char *buf, int len) {
static struct Remote *tap_remote = 0;
if ( tap_remote == 0 ) {
Remote_LOCK;
if ( tap_remote == 0 ) {
tap_remote = add_remote( 0, 0 );
if ( todo->len == 17 &&
memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) {
exit( 0 );
}
Remote_UNLOCK;
#endif
Queue_addItem( &todolist.full, (QueueItem*) todo );
}
PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free );
memcpy( (void*)todo->buffer, (void*)buf, len );
memcpy( (void*)&todo->src,
(void*)&tap_remote->uaddr,
sizeof( struct SockAddr ) );
todo->fd = 0;
todo->len = len;
Queue_addItem( &todolist.full, (QueueItem*) todo );
return 0;
}
// Read up to n bytes from the given file descriptor into the buffer
@ -1312,17 +1297,6 @@ static void heartbeat(int fd) {
Remote_UNLOCK;
}
// The threadcontrol program for issuing heartbeat packets.
// Regular heartbeat
static void *hearbeater_thread(void *data) {
(void) data;
for ( ;; ) {
sleep( HEARTBEAT );
heartbeat( udp_fd );
}
return 0;
}
// Tell how to use this program and exit with failure.
static void usage(void) {
fprintf( stderr, "Packet tunneling over UDP, multiple channels, " );
@ -1356,102 +1330,60 @@ static int tun_alloc(char *dev, int flags) {
return fd;
}
static void doreadTap() {
if ( stdio ) {
if ( input.end == 0 ) {
// Handle packet received on the tap/stdio channel
static void initialize_tap() {
// Ensure there is a Remote for this
static struct Remote *tap_remote = 0;
if ( tap_remote == 0 ) {
Remote_LOCK;
if ( tap_remote == 0 ) {
tap_remote = add_remote( 0, 0 );
}
Remote_UNLOCK;
}
}
// Thread to handle tap/stdio input
static void *doreadTap(void *data) {
int fd = ((ReaderData*) data)->fd;
unsigned int end = 0; // Packet size
unsigned int cur = 0; // Amount read so far
size_t e;
PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free );
while ( 1 ) {
if ( stdio ) {
uint16_t plength;
int n = read_into( 0, (unsigned char *) &plength,
sizeof( plength ), 0 );
sizeof( plength ), 0 );
if ( n == 0 ) {
// Tap/stdio closed => exit silently
exit( 0 );
}
input.end = ntohs( plength );
input.cur = 0;
}
size_t e = input.end - input.cur;
unsigned char *p = input.buffer + input.cur;
if ( input.end > BUFSIZE ) {
// Oversize packets should be read and discarded
if ( e > BUFSIZE ) {
e = BUFSIZE;
}
p = input.buffer;
}
input.cur += read_into( 0, p, e, 1 );
} else {
input.end = doread( tap_fd, input.buffer, BUFSIZE );
input.cur = input.end;
}
if ( input.end == input.cur ) {
VERBOSE3OUT( "TAP/stdio input %d bytes\n", input.end );
if ( input.end <= BUFSIZE ) {
received_tap( input.buffer, input.end );
}
input.end = 0; // Ready for next packet
}
// End handling tap
}
// MAXFD is Finalized before multi-threading, and then remains constant.
static int MAXFD;
// This is the main packet handling loop
static int packet_loop() {
// A packet buffer for receiving UDP
unsigned char buffer[ BUFSIZE ];
int n;
//time_t next_heartbeat = time(0);
int cycle = 0; // which channel to check first
while( 1 ) {
fd_set rd_set;
FD_ZERO( &rd_set );
if ( mcast.fd ) {
FD_SET( mcast.fd, &rd_set );
}
FD_SET( udp_fd, &rd_set );
if ( udp6 ) {
FD_SET( udp_fd, &rd_set );
}
if ( tap ) { // tap/stdio
FD_SET( tap_fd, &rd_set );
}
n = select( MAXFD, &rd_set, NULL, NULL, NULL );
VERBOSE3OUT( "select got %d\n", n );
if ( n < 0 ) {
if ( errno == EINTR ) {
continue;
end = ntohs( plength );
cur = 0;
while ( ( e = ( end - cur ) ) != 0 ) {
unsigned char *p = todo->buffer + cur;
if ( end > BUFSIZE ) {
// Oversize packets should be read and discarded
if ( e > BUFSIZE ) {
e = BUFSIZE;
}
p = todo->buffer;
}
cur += read_into( 0, p, e, 1 );
}
perror("select");
exit(1);
} else {
end = doread( fd, todo->buffer, BUFSIZE );
cur = end;
}
if ( n == 0 ) {
continue;
}
// Process input with alternating priority across channels
for ( ;; cycle++ ) {
if ( cycle >= 3 ) {
cycle = 0;
}
if ( cycle == 0 && FD_ISSET( udp_fd, &rd_set ) ) {
// Check and process UDP socket
doreadUDP( udp_fd ) ;
cycle = 1;
break;
}
if ( cycle == 1 && FD_ISSET( mcast.fd, &rd_set ) ) {
// Check and process multicast socket
doreadUDP( mcast.fd ) ;
cycle = 2;
break;
}
if ( cycle == 2 && FD_ISSET( tap_fd, &rd_set ) ) {
// Check and process tap/stdio socket
doreadTap( tap_fd, buffer );
cycle = 0;
break;
}
VERBOSE3OUT( "TAP/stdio input %d bytes\n", end );
if ( end <= BUFSIZE ) {
todo->fd = 0;
todo->len = end;
Queue_addItem( &todolist.full, (QueueItem*) todo );
todo = (PacketItem*) Queue_getItem( &todolist.free );
}
// End handling tap
}
return 0;
}
@ -1464,6 +1396,7 @@ static int packet_loop() {
// remote = [ipv6(/maskwidth)](:port)(=key)
// ip = ipv4 | [ipv6]
int main(int argc, char *argv[]) {
pthread_t thread; // Temporary thread id
int port, i;
progname = (unsigned char *) argv[0];
///// Parse command line arguments
@ -1559,7 +1492,6 @@ int main(int argc, char *argv[]) {
}
todolist_initialize( buffers_count, threads_count );
MAXFD = 0;
// Set up the tap/stdio channel
if ( tap ) {
// set up the nominated tap
@ -1570,15 +1502,13 @@ int main(int argc, char *argv[]) {
exit(1);
}
VERBOSEOUT( "Using tap %s at %d\n", tap, tap_fd );
MAXFD = tap_fd;
stdio = 0;
// pretend a zero packet on the tap, for initializing.
received_tap( 0, 0 );
initialize_tap();
} else {
// set up for stdin/stdout local traffix
setbuf( stdout, NULL ); // No buffering on stdout.
tap_fd = 0; // actually stdin
MAXFD = 0;
stdio = 1;
}
} else {
@ -1591,9 +1521,6 @@ int main(int argc, char *argv[]) {
VERBOSEOUT( "Using multicast %s:%d at %d\n",
inet_nmtoa( x, 4 ), ntohs( mcast.sock.in4.sin_port ),
mcast.fd );
if ( mcast.fd > MAXFD ) {
MAXFD = mcast.fd;
}
}
// Set up the unicast UPD channel (all interfaces)
if ( udp6 == 0 ) {
@ -1629,10 +1556,6 @@ int main(int argc, char *argv[]) {
}
VERBOSEOUT( "Using ipv6 UDP at %d\n", udp_fd );
}
if ( udp_fd > MAXFD ) {
MAXFD = udp_fd;
}
MAXFD++ ;
// If not using stdio for local traffic, then stdin and stdout are
// closed here, so as to avoid that any other traffic channel gets
// 0 or 1 as its file descriptor. Note: stderr (2) is left open.
@ -1640,13 +1563,27 @@ int main(int argc, char *argv[]) {
close( 0 );
close( 1 );
}
VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d max=%d\n",
tap_fd, mcast.fd, udp_fd, MAXFD );
// Start heartbeater thread
pthread_t thread; // Temporary thread id -- not used
pthread_create( &thread, 0, hearbeater_thread, 0 );
VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d\n",
tap_fd, mcast.fd, udp_fd );
// Handle packets
return packet_loop();
ReaderData udp_reader = { .fd = udp_fd };
pthread_create( &thread, 0, doreadUDP, &udp_reader );
if ( mcast.group.imr_multiaddr.s_addr ) {
ReaderData mcast_reader = { .fd = mcast.fd };
pthread_create( &thread, 0, doreadUDP, &mcast_reader );
}
if ( tap_fd || stdio ) {
ReaderData tap_reader = { .fd = tap_fd };
pthread_create( &thread, 0, doreadTap, &tap_reader );
}
// Start heartbeating to uplinks
for ( ;; ) {
sleep( HEARTBEAT );
heartbeat( udp_fd );
}
return 0;
}

Loading…
Cancel
Save