
/*****************************************************************************
*  qing - QoS testing tool                                                   *
*                                                                            *
* This tool sends udp packets to another hosts. If these packets get bounced *
* back, the round trip time is measured and lost packets are detected.       * 
* Mean latency of the last N packets received, the standard deviation and    *
* the number of missing packets are printed at regular intervals.            *
*                                                                            * 
*                                                                            * 
*                             created by folkert@feedface.com, october 2005  *
*                                                                            * 
*****************************************************************************/



/*        TO BUILD: gcc -std=c99 -Wall -pedantic -lm -o qing qing.c         */



#define QINGPORT          (0xffff)

#define MINBUFFERSECS          (1)
#define DEFBUFFERSECS          (2)
#define MAXBUFFERSECS         (20)

#define MINPKTSIZE            (16)
#define DEFPKTSIZE           (240)
#define MAXPKTSIZE          (1500)

#define MINPKTDELTA         (1000)
#define DEFPKTDELTA      (30*1000)
#define MAXPKTDELTA    (5000*1000)

#define MINREFRESHTIME   (10*1000)
#define DEFREFRESHTIME (1000*1000)
#define MAXREFRESHTIME (1000*1000)

#undef GNUPLOT_FORMAT 


#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/time.h>
#include <signal.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <math.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>


/* the header of the test packets sent */
struct qing_hdr {
	uint32_t   sequence;        /* sequence number of the packet            */
	uint32_t   timestamp[2];    /* 64 bit timestamp, splitted in two longs  */
};



/* globals */
static int verbose= 0;
static int breakflag= 0;


/* helper functions */
void error(char *str);
void sigint();
void usage(void);
void version(void);
void prepare_address(struct sockaddr_in6 *result, char *addr, uint16_t port);


/* struct timevals are clumsy, so we use 64 bit integers for timestamps */
typedef int64_t time64_t;    //stores microseconds
time64_t get_time(struct timeval *tv);
void     set_time64(struct timeval *tv, time64_t tt);
time64_t gettime64ofday(void);


/* a ringbuffer is needed for jitter calculation */
void ringbuffer_init(uint32_t pkt_size);
void ringbuffer_fini(void);
void ringbuffer_add(double val);
double ringbuffer_mean(void);
double ringbuffer_standard_deviation(void);


uint32_t packets_sent= 0;              //for counting
uint32_t packets_recv= 0;              //for counting
uint32_t packets_bounced= 0;           //for counting
double overall_latency= 0.0;           //for statistics



/*                               Wenn Wellen schwingen, ferne Stimmen singen */
/*                                                                -Kraftwerk */




 /*****************************************************************************
 * xceiver()                                                                  *
 * in xceiver mode, we send packets of a given size with a given frequency.   *
 * we also listen for packets bounced back to us. such packets are read and   *
 * their round trip time is determined. lost packets are detected also. at a  *
 * given interval, statistics about the last N packets received are printed.  *
 *****************************************************************************/
void
xceiver(
	char *dst_addr,         /* address to which to send               */
	uint16_t port,          /* port from/to which to send/receive     */
	uint32_t pkt_size,      /* size of a packet (bytes)               */
	time64_t pkt_delta,     /* time between packets (microseconds)    */
	uint32_t buf_time,      /* desired size of ringbuffer (seconds)   */
	time64_t refresh_delta  /* output refresh interval (microseconds) */
)
{
	struct sockaddr_in6 src,dst;           //addresses
	int sock;                              //the socket
	uint8_t *send_buf, *recv_buf;          //buffers
	struct qing_hdr *send_hdr;             //overlay to fill in header data
	struct qing_hdr *recv_hdr;             //overlay to read out header data

	time64_t wait;         //keep track of time until next packet should be sent
	time64_t prev;         //keep track of time when last packet was sent
	
	time64_t start;            //keep track of time started
	time64_t last_update= 0;   //keep track of when last statistics shown 	
	uint32_t missing= 0;       //keep track of packets not yet returned
	uint32_t expected_next= 1; //keep track of next packet to be received

	char valid_update= 0;     //keep track of whether buffer has changed
	double last_latency= 0.0; //keep track of latency of last packet received

	if (verbose)
		printf("entering xceiver mode\n");
	if (verbose) 
		printf("sending %u bytes every %lld ms\n",pkt_size,pkt_delta/1000l);


	{ /* initialize ringbuffer */
		uint32_t size= buf_time * (1000*1000) / pkt_delta; 
		ringbuffer_init(size);
		if (verbose) 
			printf("using %d samples for %d seconds buffer\n",size,buf_time);
	}
	

	/* prepare socket */		
	sock= socket(PF_INET6,SOCK_DGRAM,0);
	if (sock < 0)
		error("could not open socket");
		
	prepare_address(&src,NULL,port);
	prepare_address(&dst,dst_addr,port);
	
	/* bind to src port */
	if (bind(sock,(struct sockaddr*) &src,sizeof(struct sockaddr_in6)) < 0)
		error("could not bind to any address");


	/* prepare memory for send */
	send_buf= (uint8_t*) malloc(pkt_size);
	if (!send_buf) 
		error("could not allocate send buffer");
	memset((void*)send_buf,0xff,pkt_size);
	send_hdr= (struct qing_hdr*) send_buf;	

	/* prepare memory for receive */
	recv_buf= (uint8_t*) malloc(pkt_size);
	if (!recv_buf)
		error("could not allocate receive buffer");
	memset((void*)recv_buf,0x00,pkt_size);
	recv_hdr= (struct qing_hdr*) recv_buf;	


	/* remember time when started */
	start= gettime64ofday();

	/* prepare timer */
	wait= pkt_delta;

	while (!breakflag) { //mainloop
		struct timeval tv;
		time64_t now;
		fd_set rset;
		int r;

		FD_ZERO(&rset);
		FD_SET(sock,&rset);
	
		/* wait for either: a packet to arrive */
		/* or: until it is time to send the next packet */
		set_time64(&tv,	wait);
		r= select(sock+1,&rset,NULL,NULL,&tv);
		now= gettime64ofday();	

		if (r > 0) { //descriptor ready
			ssize_t n;

			n= recv(sock, recv_buf, pkt_size, 0);

			if (n < 0)
				error("could not receive datagram");
				
							
			if ( n == pkt_size) { //make sure its looks like a correct packet
				uint32_t seq;
			
				seq= ntohl(recv_hdr->sequence);
				if (seq >= expected_next) { //else packet is too late, ignore
					time64_t then;
					double latency;
					
					if (seq > expected_next) //missed some packets in between
						missing+= seq - expected_next;
			
					/* calculate round trip time */
					then= ntohl(recv_hdr->timestamp[0]);
					then<<= 32;
					then|= ntohl(recv_hdr->timestamp[1]);
					latency= ((double)(now - then)) / 2.0;
	
					last_latency= latency;
					overall_latency+= latency;
					ringbuffer_add(latency);
					expected_next= seq+1;
					packets_recv++;
					valid_update= 1;
				} 
				
			}
			
			/* adjust timer */
			if (now - prev > pkt_delta)
				wait= 0;
			else
				wait= pkt_delta - (now - prev);	

		} 
		
		else if (r == 0) { //timer expired

			/* fill header for packet to be sent */
			send_hdr->sequence= htonl(++packets_sent);
			send_hdr->timestamp[0]= htonl((int32_t)(now>>32));
			send_hdr->timestamp[1]= htonl((int32_t)(now & 0x00000000ffffffff));

			if (sendto(sock,send_buf,pkt_size,0,(struct sockaddr*)&dst,sizeof(dst)) < 0)
				error("could not send datagram");

			/* update time of last packet sent */
			prev= now;

			/* reset timer */
			wait= pkt_delta;
			
		}
		
		else if (errno != EINTR)  //something went wrong
			error("could not multiplex synchronously");
		
		/* print current statistics */
		/* FIXME: */
		/* update should be done even if no packet was sent or received? */ 
		if (
		 valid_update                           //buffer changed since last display
		 && now - last_update > refresh_delta   //refresh interval over
		 && now - start > (buf_time*1000*1000)  //buffer is not empty
		 ) {
			double jitter= ringbuffer_standard_deviation()/1000.0;
			double mean= ringbuffer_mean()/1000.0;
			
#ifdef GNUPLOT_FORMAT
			if (missing) printf("1");
			printf("\n");
			printf("%08lld",(now-start)/1000l);
			printf(" %03.3f",mean);
			printf(" %03.3f",jitter);
			printf(" %03.3f",last_latency/1000.0);
			printf(" %lld",missing*pkt_delta/1000l);
			printf(" ");
			if (missing) printf("1");
#else
			printf("%08lld",(now-start)/1000l);
			printf("\tmean %03.3f",mean);
			printf("\tjitter %03.3f",jitter);
			printf("\tlatency %03.3f",last_latency/1000.0);
			if (missing)
				printf("\t missing %d packets (%lld ms)",
				 missing,missing*(pkt_delta/1000l)
				);
			printf("\n");	
#endif				
			missing= 0;				       
			last_update= now;
			valid_update= 0;
		}
		
		
		fflush(stdout);
		
	} //mainloop

	if (verbose) printf("\nleaving xceiver mode\n");

	/* clean up */
	free(send_buf);
	free(recv_buf);	
	if (close(sock) < 0)
		error("could not close socket");
	ringbuffer_fini();
	
	{ /* output statistics */
		double loss_rate= 1.0 - ((double)packets_recv) / ((double)packets_sent);
		double mean_latency= ((double)overall_latency) / ((double)packets_recv);
		
		printf("\nsent %u packets\nrecv %u packets\n",packets_sent,packets_recv);
		printf("packet loss rate was %02.3f\n",loss_rate);
		printf("mean latency was %03.3f ms\n",mean_latency/1000.0);
	}
	
}





 /*****************************************************************************
 * bouncer()                                                                  *
 * in bouncer mode, we wait (blocking) for a packet to arrive on the          *
 * port and send it back to the sender without changing anything.             *
 *****************************************************************************/
void
bouncer(
	uint16_t port   /* port on which to bounce packets */
)
{
	int sock;
	uint8_t *buf;
	struct sockaddr_in6 src;
	
	/* TODO: */
	/* might want to daemonize when in bounce mode? */
	
	if (verbose) printf("entering bouncer mode\n");
	
	/* prepare buffer */
	buf= (uint8_t*) malloc(MAXPKTSIZE);
	if (!buf)
		error("could not allocate buffer");
	memset(buf,0x00,MAXPKTSIZE);
	
	/* prepare socket */
	sock= socket(PF_INET6,SOCK_DGRAM,0);
	if (sock < 0)
		error("could not open socket");
		
	
	prepare_address(&src,NULL,port);

	/* bind socket to port */
	if (bind(sock,(struct sockaddr*)&src,sizeof(struct sockaddr_in6)) < 0)
		error("could not bind to any address");
	
	while (!breakflag) { //mainloop
		ssize_t r;
		struct sockaddr_in6 peer_addr;
		socklen_t sl;

		sl= sizeof(struct sockaddr_in6);
		r= recvfrom(sock,buf,MAXPKTSIZE,0,(struct sockaddr*)&peer_addr,&sl);
	
		if (r > 0) { //we got signal
			assert(sl == sizeof(struct sockaddr_in6));
			/* FIXME: */
			/* we should answer with the same address we were addressed with? */
			if (sendto(sock,buf,r,0,(struct sockaddr*)&peer_addr,sl) < 0) {
				if (errno == EHOSTUNREACH) {
					if (verbose) printf("unreachable!");
				} else
					error("could not bounce datagram");
			}		
			packets_bounced++;	
			if (verbose > 1) printf("bounce!");
		}
	
		else if (r < 0 && errno != EINTR) //something went wrong
			error("could not receive datagram");
			
		fflush(stdout);
		
	} //mainloop
	
	if (verbose) printf("\nleaving bouncer mode\n");
	
	/* clean up */
	free(buf);
	if (close(sock) < 0)
		error("could not close socket");

	printf("\nbounced %u packets\n",packets_bounced);
	return;
}




 /*** main function *********************************************************/
int
main(int argc, 	char **args)
{
	int mode_bouncer= 0;
	char dst_addr[0x40];
	uint16_t port= QINGPORT;
	time64_t refresh_delta= DEFREFRESHTIME;
	uint32_t pkt_size= DEFPKTSIZE;
	time64_t pkt_delta= DEFPKTDELTA;
	uint32_t buf_time= DEFBUFFERSECS;
	
	{ /* parse parameters */
		int c= 0;
		opterr= 0;
		while ( c != -1) {
			c= getopt(argc, args, "VvBs:d:r:t:");
			switch (c) {
				case 'V':
					version();
					break;
				case 'B': 
					mode_bouncer= 1;
					break;
				case 'v':
					verbose++;
					break;
				case 't':
						if (sscanf(optarg,"%u",&buf_time) != 1)
							usage();
						if (buf_time < MINBUFFERSECS || buf_time > MAXBUFFERSECS)
							usage();
					break;
				case 'r':
						if (sscanf(optarg,"%lld",&refresh_delta) != 1)
							usage();
						refresh_delta*= 1000l; //CLI param is milli, all else micro	
						if (refresh_delta < MINREFRESHTIME || refresh_delta > MAXREFRESHTIME)
							usage();
					break;					
				case 's':
					if (sscanf(optarg,"%u",&pkt_size) != 1)
						usage();
					if (pkt_size < MINPKTSIZE || pkt_size > MAXPKTSIZE)
						usage();
					break;
				case 'd':
					if (sscanf(optarg,"%lld",&pkt_delta) != 1)
						usage();
					pkt_delta*= 1000l; //CLI param is milli, all else micro	
					if (pkt_delta < MINPKTDELTA || pkt_delta > MAXPKTDELTA)
						usage();
					break;
				case -1:
					break;
				case '?':
				case 'h':	
				default:
					usage();
					break;			
			}
		}	
		argc-= optind;
		args+= optind;			
	}
	
		
	{ /* register sighup handler */
		struct sigaction sa;
		sigemptyset(&(sa.sa_mask));
		sa.sa_flags= 0;
		sa.sa_handler= sigint;
		if (sigaction(SIGINT,&sa,NULL) < 0)
			error("could not install signal handler");
	}
	

	/* enter desired mode */	
	if (mode_bouncer) {
		if (argc > 1) usage();				
		if (argc == 1) //custom port given
			if (sscanf(args[argc-1],"%hu",&port) <= 0)
				usage();
		bouncer(port);
	}	
	else {
		if (argc < 1 || argc > 2) usage();
		strncpy(dst_addr,args[0],0x40);
		if (argc == 2) //custom port given
			if (sscanf(args[1],"%hu",&port) != 1)
				usage();
		xceiver(dst_addr,port,pkt_size,pkt_delta,buf_time,refresh_delta);
	}	
	
	exit(0);	
} 



 /*****************************************************************************
 * prepare_address()                                                          *
 * fills the given sockaddr_in6 result with the needed parameters             *
 * if an addr string is given, it is resolved/converted and used as address   *
 * else, the unspecified  address is used                                     *
 *****************************************************************************/
void
prepare_address(struct sockaddr_in6 *result, char *addr, uint16_t port)
{	
	if (addr) { //resolution needed 
		struct addrinfo hints;
		struct addrinfo *res= NULL;
		int err= -1;

		/* get address for string representation */
		memset(&hints,0x00,sizeof(struct addrinfo));
		hints.ai_family= AF_INET6;
		err= getaddrinfo(addr,NULL,&hints,&res);
		if (err || !res) {
			fprintf(stderr,
				"\n##### ERROR #####\n# %s is not a valid address:\n# %s\n",
				addr,gai_strerror(err)
			);
			exit(-1);
		}
	
		/* copy address into result */
		assert(res->ai_addrlen == sizeof(struct sockaddr_in6));
		memcpy(result,res->ai_addr,sizeof(struct sockaddr_in6));

		freeaddrinfo(res);
	}

	/* fill result */
	result->sin6_family= AF_INET6;
	if (!addr) 
		result->sin6_addr= in6addr_any;
	result->sin6_port= htons(port);
	result->sin6_flowinfo= 0;
	result->sin6_scope_id= 0;
	result->sin6_len= sizeof(struct sockaddr_in6);
		
	return;
}

 /*** prints version information and exits ***********************************/
void
version(void)
{
	printf("qing version 1.0\n");
	printf("10/2005 folkert@feedface.com\n");
	exit(0);
}


 /*** prints short parameter overview and exits ******************************/
void 
usage(void)
{
	fprintf(stderr,"bounce: qing [-v] -B [port]\n");
	fprintf(stderr,"xceive: qing [-s size (byte)] [-d delta (ms)]\n");
	fprintf(stderr,"         [-t buffersize (s)] [-r refresh (ms)]\n");
	fprintf(stderr,"         [-v] address [port]\n");
	exit(-1);
}


 /*** prints an error message and exit ***************************************/
void
error(char *str)
{
	fprintf(stderr,"\n##### ERROR #####\n# %s:\n# %s\n",str,strerror(errno));
	exit(-1);
}


void 
sigint()
 /*** handler for SIGINT *****************************************************/
{
	breakflag= 1;
}



static double *ringbuffer;  //array of samples
static uint32_t rb_index;   //current position in ringbuffer
static uint32_t rb_size;    //number of samples in ringbuffer


 /*****************************************************************************
 * ringbuffer_init()                                                          *
 * initializes the ring buffer data structure                                 *
 *****************************************************************************/
void
ringbuffer_init(uint32_t size)
{
	rb_size= size;
	ringbuffer= (double*) malloc(sizeof(double) * rb_size);
	if (!ringbuffer)
		error("could not allocate ringbuffer");
	memset(ringbuffer,0x00,sizeof(double) * rb_size);
	rb_index= 0;		
}

 /*****************************************************************************
 * ringbuffer_fini()                                                          *
 * cleans up the ring buffer data structure                                   *
 *****************************************************************************/
void
ringbuffer_fini(void)
{
	free(ringbuffer);
}

 /*****************************************************************************
 * ringbuffer_add()                                                           *
 * adds the given value to the ringbuffer, overwriting the oldest entry       *
 *****************************************************************************/
void
ringbuffer_add(double val)
{
	ringbuffer[rb_index]= val;
	rb_index++;
	rb_index%= rb_size;
}


 /*****************************************************************************
 * ringbuffer_mean()                                                          *
 * returns the arithmetic mean of the values in the buffer                    *
 *****************************************************************************/
double
ringbuffer_mean(void)
{
	double m= 0.0;
	for (uint32_t i= 0; i < rb_size; i++)
		m+= ringbuffer[i];
	m/= rb_size;
	return m;
}


 /*****************************************************************************
 * ringbuffer_standard_deviation()                                            *
 * returns the standard deviation of the values in the buffer                 *
 *****************************************************************************/
double
ringbuffer_standard_deviation(void)
{
	double ret= 0.0;
	double mean= 0.0;
	double variance= 0.0;
	
	mean= ringbuffer_mean();	
	
	for (uint32_t i= 0; i < rb_size; i++)
		variance+= ((ringbuffer[i] - mean) * (ringbuffer[i] - mean));
	variance/= rb_size;	

	ret= sqrt(variance);
	return ret;	
}






 /*****************************************************************************
 * get_time64()                                                               *
 * returns the value of the given struct timeval as a time64_t value          *
 *****************************************************************************/
time64_t
get_time64(struct timeval *tv)
{
	time64_t ret;
	ret= tv->tv_sec;
	ret*= (1000 * 1000);
	ret+= tv->tv_usec;
	return ret;
}


 /*****************************************************************************
 * set_time64()                                                               *
 * stores the given time64_t value in the given timeval struct                *
 *****************************************************************************/
void
set_time64(struct timeval *tv, time64_t tt)
{
	tv->tv_sec=  (tt/(1000*1000));
	tv->tv_usec= (tt%(1000*1000));
	return;
}


 /*****************************************************************************
 * gettimeofday()                                                             *
 * returns the current microseconds since January 1st, 1970, midnight (UTC)   *
 *****************************************************************************/
time64_t
gettime64ofday(void)
{
	time64_t ret;
	struct timeval tv;
	gettimeofday(&tv,NULL);
	ret= get_time64(&tv);
	return ret;
}




