diff mbox

[PATCHv3,3/3] fork example

Message ID 1407163684-27844-4-git-send-email-maxim.uvarov@linaro.org
State New
Headers show

Commit Message

Maxim Uvarov Aug. 4, 2014, 2:48 p.m. UTC
Example how to create multi process odp application.

Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
---
 configure.ac             |   1 +
 example/Makefile.am      |   2 +-
 example/fork/Makefile.am |   5 +
 example/fork/README      |  32 +++
 example/fork/odp_fork.c  | 682 +++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 721 insertions(+), 1 deletion(-)
 create mode 100644 example/fork/Makefile.am
 create mode 100644 example/fork/README
 create mode 100644 example/fork/odp_fork.c

Comments

Anders Roxell Aug. 6, 2014, 9 p.m. UTC | #1
On 2014-08-04 18:48, Maxim Uvarov wrote:
> Example how to create multi process odp application.
> 
> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
> ---
>  configure.ac             |   1 +
>  example/Makefile.am      |   2 +-
>  example/fork/Makefile.am |   5 +
>  example/fork/README      |  32 +++
>  example/fork/odp_fork.c  | 682 +++++++++++++++++++++++++++++++++++++++++++++++
>  5 files changed, 721 insertions(+), 1 deletion(-)
>  create mode 100644 example/fork/Makefile.am
>  create mode 100644 example/fork/README
>  create mode 100644 example/fork/odp_fork.c
> 
> diff --git a/configure.ac b/configure.ac
> index 97089e9..c549182 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -110,6 +110,7 @@ AC_CONFIG_FILES([Makefile
>  		 platform/linux-keystone2/Makefile
>  		 platform/linux-dpdk/Makefile
>  		 example/Makefile
> +		 example/fork/Makefile
>  		 example/generator/Makefile
>  		 example/l2fwd/Makefile
>  		 example/odp_example/Makefile
> diff --git a/example/Makefile.am b/example/Makefile.am
> index 01a3305..e85507d 100644
> --- a/example/Makefile.am
> +++ b/example/Makefile.am
> @@ -1 +1 @@
> -SUBDIRS = generator l2fwd odp_example packet packet_netmap timer
> +SUBDIRS = generator l2fwd odp_example packet packet_netmap timer fork
> diff --git a/example/fork/Makefile.am b/example/fork/Makefile.am
> new file mode 100644
> index 0000000..f6c6a50
> --- /dev/null
> +++ b/example/fork/Makefile.am
> @@ -0,0 +1,5 @@
> +include $(top_srcdir)/example/Makefile.inc
> +
> +bin_PROGRAMS = odp_fork

Miss: odp_fork_LDFLAGS = $(AM_LDFLAGS) -static

> +
> +dist_odp_fork_SOURCES = odp_fork.c
> diff --git a/example/fork/README b/example/fork/README
> new file mode 100644
> index 0000000..0eefd63
> --- /dev/null
> +++ b/example/fork/README
> @@ -0,0 +1,32 @@
> +			ODP fork example
> +
> +This example shows how to use fork to implement scalable multi
> +process applications.
> +
> +Algorithm is following:
> +	- do standard initialization, i.e odp_init_global();
> +	- allocate odp ring in shared memory;
> +	- fork();
> +	- Process 1: creates X ingress threads and doing actual
> +packet I/O and places packets to odp_ring buffer (pktio_queue_thread()
> +and pktio_ifburst_thread() functions).
> +	- Process 2: creates X packet processing threads and does read
> +	  from odp_ring buffer. Then calls odp_buffer_free().
> +
> +Example:
> +./odp_fork -i eth0 -m 1 -c 1
> +On remote host run ping.
> +
> +[12178/1] enqueue buf 7921 size 98/1856, cnt 0
> +		[12176] dequeue buf 7921, size 98/1856, cnt 0
> +[12178/1] enqueue buf 7905 size 98/1856, cnt 1
> +		[12176] dequeue buf 7905, size 98/1856, cnt 1
> +[12178/1] enqueue buf 7889 size 98/1856, cnt 2
> +		[12176] dequeue buf 7889, size 98/1856, cnt 2
> +[12178/1] enqueue buf 7873 size 98/1856, cnt 3
> +		[12176] dequeue buf 7873, size 98/1856, cnt 3
> +[12178/1] enqueue buf 7857 size 98/1856, cnt 4
> +		[12176] dequeue buf 7857, size 98/1856, cnt 4
> +
> +Main PID/thread [12178/1] enqueues packets to buffer, child process [12176]
> +dequeues packets from the odp_ring.
> diff --git a/example/fork/odp_fork.c b/example/fork/odp_fork.c
> new file mode 100644
> index 0000000..d8ec36c
> --- /dev/null
> +++ b/example/fork/odp_fork.c
> @@ -0,0 +1,682 @@
> +/* Copyright (c) 2013, Linaro Limited

Wrong copyright year...

Cheers,
Anders

> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +/**
> + * @file
> + *
> + * @example odp_pktio.c  ODP basic packet IO loopback test application
> + */
> +
> +#include <stdlib.h>
> +#include <string.h>
> +#include <getopt.h>
> +#include <unistd.h>
> +
> +#include <odp.h>
> +#include <helper/odp_linux.h>
> +#include <helper/odp_packet_helper.h>
> +#include <helper/odp_eth.h>
> +#include <helper/odp_ip.h>
> +#include <helper/odp_ring.h>
> +
> +#define MAX_WORKERS            32
> +#define SHM_PKT_POOL_SIZE      (512*2048)
> +#define SHM_PKT_POOL_BUF_SIZE  1856
> +#define MAX_PKT_BURST          16
> +
> +#define APPL_MODE_PKT_BURST    0
> +#define APPL_MODE_PKT_QUEUE    1
> +
> +#define RING_SIZE 4096
> +#define ODP_RING_NAMESIZE 32
> +
> +#define PRINT_APPL_MODE(x) printf("%s(%i)\n", #x, (x))
> +
> +/** Get rid of path in filename - only for unix-type paths using '/' */
> +#define NO_PATH(file_name) (strrchr((file_name), '/') ? \
> +			    strrchr((file_name), '/') + 1 : (file_name))
> +/**
> + * Parsed command line application arguments
> + */
> +typedef struct {
> +	int core_count;
> +	int if_count;		/**< Number of interfaces to be used */
> +	char **if_names;	/**< Array of pointers to interface names */
> +	int mode;		/**< Packet IO mode */
> +	int type;		/**< Packet IO type */
> +	int fanout;		/**< Packet IO fanout */
> +	odp_buffer_pool_t pool;	/**< Buffer pool for packet IO */
> +} appl_args_t;
> +
> +/**
> + * Thread specific arguments
> + */
> +typedef struct {
> +	char *pktio_dev;	/**< Interface name to use */
> +	odp_buffer_pool_t pool;	/**< Buffer pool for packet IO */
> +	int mode;		/**< Thread mode */
> +	int type;		/**< Thread i/o type */
> +	int fanout;		/**< Thread i/o fanout */
> +	int tpid;
> +} thread_args_t;
> +
> +/**
> + * Grouping of both parsed CL args and thread specific args - alloc together
> + */
> +typedef struct {
> +	/** Application (parsed) arguments */
> +	appl_args_t appl;
> +	/** Thread specific arguments */
> +	thread_args_t thread[MAX_WORKERS];
> +} args_t;
> +
> +/** Global pointer to args */
> +static args_t *args;
> +
> +/* helper funcs */
> +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len);
> +static void parse_args(int argc, char *argv[], appl_args_t *appl_args);
> +static void print_info(char *progname, appl_args_t *appl_args);
> +static void usage(char *progname);
> +
> +static void *ring_thread(void *arg)
> +{
> +	thread_args_t *thr_args;
> +	thr_args = arg;
> +	odp_ring_t *r;
> +	int ret;
> +	odp_buffer_t buf;
> +	odp_packet_t pkt;
> +	unsigned long qp_cnt = 0;
> +
> +	r = odp_ring_lookup("odp_shm_ring");
> +	if (!r)
> +		ODP_ERR("forked pid %d ring lookup failed\n", getpid());
> +	else
> +		printf("forked pid %d ring lookup ok\n", getpid());
> +
> +	if (thr_args->tpid) {
> +		odp_buffer_t **buf_dst_p =
> +			(odp_buffer_t **)malloc(RING_SIZE *
> +						sizeof(odp_buffer_t));
> +
> +		while (1) {
> +			ret = odp_ring_mc_dequeue_bulk(r, (void **)buf_dst_p,
> +						       1);
> +			if (ret == 0) {
> +				memcpy(&buf, (void *)buf_dst_p,
> +				       sizeof(odp_buffer_t));
> +
> +				pkt = odp_packet_from_buffer(buf);
> +
> +				printf("\t\t[%d] dequeue buf %d, size %ld/%ld, cnt %ld\n",
> +				       getpid(), buf,
> +				       (unsigned long)odp_packet_get_len(pkt),
> +				       (unsigned long)odp_buffer_size(buf),
> +				       qp_cnt++);
> +
> +				odp_buffer_free(buf);
> +			}
> +		}
> +	}
> +
> +	/* unreachable */
> +	return NULL;
> +}
> +
> +
> +/**
> + * Packet IO loopback worker thread using ODP queues
> + *
> + * @param arg  thread arguments of type 'thread_args_t *'
> + */
> +static void *pktio_queue_thread(void *arg)
> +{
> +	int thr;
> +	odp_buffer_pool_t pkt_pool;
> +	odp_pktio_t pktio;
> +	thread_args_t *thr_args;
> +	odp_queue_t inq_def;
> +	char inq_name[ODP_QUEUE_NAME_LEN];
> +	odp_queue_param_t qparam;
> +	odp_packet_t pkt;
> +	odp_buffer_t buf;
> +	int ret;
> +	unsigned long pkt_cnt = 0;
> +	unsigned long err_cnt = 0;
> +	odp_pktio_params_t params;
> +	socket_params_t *sock_params = &params.sock_params;
> +	odp_ring_t *r;
> +
> +	thr_args = arg;
> +
> +	thr = odp_thread_id();
> +
> +	printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr,
> +	       thr_args->pktio_dev);
> +
> +	/* lookup ring from its name */
> +	/* Lookup the packet pool */
> +	pkt_pool = odp_buffer_pool_lookup("packet_pool");
> +	if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) {
> +		ODP_ERR("  [%02i] Error: pkt_pool not found\n", thr);
> +		return NULL;
> +	}
> +
> +	/* Open a packet IO instance for this thread */
> +	sock_params->type = thr_args->type;
> +	sock_params->fanout = thr_args->fanout;
> +	pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, &params);
> +	if (pktio == ODP_PKTIO_INVALID) {
> +		ODP_ERR("  [%02i] Error: pktio create failed\n", thr);
> +		return NULL;
> +	}
> +
> +	/*
> +	 * Create and set the default INPUT queue associated with the 'pktio'
> +	 * resource
> +	 */
> +	qparam.sched.prio  = ODP_SCHED_PRIO_DEFAULT;
> +	qparam.sched.sync  = ODP_SCHED_SYNC_ATOMIC;
> +	qparam.sched.group = ODP_SCHED_GROUP_DEFAULT;
> +	snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio);
> +	inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0';
> +
> +	inq_def = odp_queue_create(inq_name, ODP_QUEUE_TYPE_PKTIN, &qparam);
> +	if (inq_def == ODP_QUEUE_INVALID) {
> +		ODP_ERR("  [%02i] Error: pktio queue creation failed\n", thr);
> +		return NULL;
> +	}
> +
> +	ret = odp_pktio_inq_setdef(pktio, inq_def);
> +	if (ret != 0) {
> +		ODP_ERR("  [%02i] Error: default input-Q setup\n", thr);
> +		return NULL;
> +	}
> +
> +	printf("  [%02i] created pktio:%02i, queue mode (ATOMIC queues)\n"
> +	       "          default pktio%02i-INPUT queue:%u\n",
> +		thr, pktio, pktio, inq_def);
> +
> +	r = odp_ring_lookup("odp_shm_ring");
> +	if (!r)
> +		ODP_ERR("Main proc pid %d, thread %d ring lookup failed\n",
> +			getpid(), thr);
> +	else
> +		printf("Main proc pid %d, thread %d ring lookup OK\n",
> +		       getpid(), thr);
> +
> +
> +	void *rbuf = malloc(RING_SIZE * sizeof(odp_buffer_t));
> +	void **rbuf_p = rbuf;
> +
> +	/* Loop packets */
> +	for (;;) {
> +		/* Use schedule to get buf from any input queue */
> +		buf = odp_schedule(NULL, ODP_SCHED_WAIT);
> +
> +		pkt = odp_packet_from_buffer(buf);
> +
> +		/* Drop packets with errors */
> +		if (odp_unlikely(drop_err_pkts(&pkt, 1) == 0)) {
> +			ODP_ERR("Drop frame - err_cnt:%lu\n", ++err_cnt);
> +			continue;
> +		}
> +
> +		/* Place buffer to ODP ring queue */
> +		rbuf_p = (void *)&buf;
> +		ret = odp_ring_mp_enqueue_bulk(r, rbuf_p, 1);
> +		if (ret != 0)
> +			ODP_ERR("odp_ring_mp_enqueue_bulk fail\n");
> +		else
> +			printf("[%d/%d] enqueue buf %d size %ld/%ld, cnt %lu\n",
> +			       getpid(), thr, buf, (unsigned long)odp_packet_get_len(pkt),
> +			       (unsigned long)odp_buffer_size(buf), pkt_cnt++);
> +	}
> +
> +/* unreachable */
> +}
> +
> +/**
> + * Packet IO loopback worker thread using bursts from/to IO resources
> + *
> + * @param arg  thread arguments of type 'thread_args_t *'
> + */
> +static void *pktio_ifburst_thread(void *arg)
> +{
> +	int thr;
> +	odp_buffer_pool_t pkt_pool;
> +	odp_pktio_t pktio;
> +	thread_args_t *thr_args;
> +	int pkts, pkts_ok;
> +	odp_packet_t pkt_tbl[MAX_PKT_BURST];
> +	unsigned long pkt_cnt = 0;
> +	unsigned long err_cnt = 0;
> +	odp_pktio_params_t params;
> +	socket_params_t *sock_params = &params.sock_params;
> +	int ret;
> +	odp_ring_t *r;
> +
> +	thr = odp_thread_id();
> +	thr_args = arg;
> +
> +	printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr,
> +	       thr_args->pktio_dev);
> +
> +	/* Lookup the packet pool */
> +	pkt_pool = odp_buffer_pool_lookup("packet_pool");
> +	if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) {
> +		ODP_ERR("  [%02i] Error: pkt_pool not found\n", thr);
> +		return NULL;
> +	}
> +
> +	/* Open a packet IO instance for this thread */
> +	sock_params->type = thr_args->type;
> +	sock_params->fanout = thr_args->fanout;
> +	pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, &params);
> +	if (pktio == ODP_PKTIO_INVALID) {
> +		ODP_ERR("  [%02i] Error: pktio create failed.\n", thr);
> +		return NULL;
> +	}
> +
> +	printf("  [%02i] created pktio:%02i, burst mode\n",
> +	       thr, pktio);
> +
> +	r = odp_ring_lookup("odp_shm_ring");
> +	if (!r)
> +		ODP_ERR("Main proc pid %d, thread %d ring lookup failed\n",
> +			getpid(), thr);
> +	else
> +		printf("Main proc pid %d, thread %d ring lookup OK\n",
> +		       getpid(), thr);
> +
> +	void *rbuf = malloc(RING_SIZE * sizeof(odp_buffer_t));
> +	void **rbuf_p = rbuf;
> +
> +	/* Loop packets */
> +	for (;;) {
> +		pkts = odp_pktio_recv(pktio, pkt_tbl, MAX_PKT_BURST);
> +		if (pkts > 0) {
> +			/* Drop packets with errors */
> +			pkts_ok = drop_err_pkts(pkt_tbl, pkts);
> +			if (pkts_ok > 0) {
> +				/* Place buffer to ODP ring queue */
> +				rbuf_p = (void *)&pkt_tbl;
> +				ret = odp_ring_mp_enqueue_bulk(r, rbuf_p,
> +							       pkts_ok);
> +				pkt_cnt += pkts_ok;
> +				if (ret != 0) {
> +					ODP_ERR("odp_ring_mp_enqueue_bulk fail\n");
> +				} else {
> +					printf("[%d/%d] enqueue %d packets, first buf %d size %ld/%ld, cnt %lu\n",
> +					       getpid(), thr, pkts_ok,
> +					       pkt_tbl[0],
> +					       (unsigned long)odp_packet_get_len(pkt_tbl[0]),
> +					       (unsigned long)odp_buffer_size(pkt_tbl[0]),
> +					       pkt_cnt);
> +				}
> +			}
> +
> +			if (odp_unlikely(pkts_ok != pkts))
> +				ODP_ERR("Dropped frames:%u - err_cnt:%lu\n",
> +					pkts-pkts_ok, ++err_cnt);
> +		}
> +	}
> +
> +/* unreachable */
> +}
> +
> +/**
> + * ODP packet example main function
> + */
> +int main(int argc, char *argv[])
> +{
> +	odp_linux_pthread_t thread_tbl[MAX_WORKERS];
> +	odp_buffer_pool_t pool;
> +	int thr_id;
> +	int num_workers;
> +	void *pool_base;
> +	int i;
> +	int first_core;
> +	int core_count;
> +	odp_ring_t *r;
> +	char ring_name[ODP_RING_NAMESIZE];
> +
> +	/* Init ODP before calling anything else */
> +	if (odp_init_global()) {
> +		ODP_ERR("Error: ODP global init failed.\n");
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	args = malloc(sizeof(args_t));
> +	if (args == NULL) {
> +		ODP_ERR("Error: shared mem alloc failed.\n");
> +		exit(EXIT_FAILURE);
> +	}
> +	memset(args, 0, sizeof(*args));
> +
> +	/* Parse and store the application arguments */
> +	parse_args(argc, argv, &args->appl);
> +
> +	/* Print both system and application information */
> +	print_info(NO_PATH(argv[0]), &args->appl);
> +
> +	core_count  = odp_sys_core_count();
> +	num_workers = core_count;
> +
> +	if (args->appl.core_count)
> +		num_workers = args->appl.core_count;
> +
> +	if (num_workers > MAX_WORKERS)
> +		num_workers = MAX_WORKERS;
> +
> +	printf("Num worker threads: %i\n", num_workers);
> +
> +	/*
> +	 * By default core #0 runs Linux kernel background tasks.
> +	 * Start mapping thread from core #1
> +	 */
> +	first_core = 1;
> +
> +	if (core_count == 1)
> +		first_core = 0;
> +
> +	printf("First core:         %i\n\n", first_core);
> +
> +	/* Init this thread */
> +	thr_id = odp_thread_create(0);
> +	odp_init_local(thr_id);
> +
> +	/* Create packet pool */
> +	pool_base = odp_shm_reserve("shm_packet_pool",
> +				    SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE,
> +				    ODP_SHM_PROC);
> +	if (pool_base == NULL) {
> +		ODP_ERR("Error: packet pool mem alloc failed.\n");
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	pool = odp_buffer_pool_create("packet_pool", pool_base,
> +				      SHM_PKT_POOL_SIZE,
> +				      SHM_PKT_POOL_BUF_SIZE,
> +				      ODP_CACHE_LINE_SIZE,
> +				      ODP_BUFFER_TYPE_PACKET);
> +	if (pool == ODP_BUFFER_POOL_INVALID) {
> +		ODP_ERR("Error: packet pool create failed.\n");
> +		exit(EXIT_FAILURE);
> +	}
> +	odp_buffer_pool_print(pool);
> +
> +	/* Init ODP ring */
> +	odp_ring_tailq_init();
> +
> +	snprintf(ring_name, sizeof(ring_name), "odp_shm_ring");
> +	r = odp_ring_create(ring_name, RING_SIZE, ODP_RING_SHM_PROC);
> +	if (r == NULL) {
> +		ODP_ERR("ring create failed\n");
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	int f = fork();
> +
> +	/* Quick check that ring reachable after fork*/
> +	r = odp_ring_lookup("odp_shm_ring");
> +	if (!r)
> +		ODP_ERR("pid %d ring lookup failed\n", getpid());
> +
> +
> +	/* Create and init worker threads */
> +	memset(thread_tbl, 0, sizeof(thread_tbl));
> +	for (i = 0; i < num_workers; ++i) {
> +		void *(*thr_run_func) (void *);
> +		int core;
> +		int if_idx;
> +
> +		core = (first_core + i) % core_count;
> +
> +		if_idx = i % args->appl.if_count;
> +
> +		args->thread[i].pktio_dev = args->appl.if_names[if_idx];
> +		args->thread[i].pool = pool;
> +		args->thread[i].mode = args->appl.mode;
> +		args->thread[i].type = args->appl.type;
> +		args->thread[i].fanout = args->appl.fanout;
> +		args->thread[i].tpid = f;
> +
> +		if (f) {
> +			thr_run_func = ring_thread;
> +		} else {
> +			if (args->appl.mode == APPL_MODE_PKT_BURST)
> +				thr_run_func = pktio_ifburst_thread;
> +			else /* APPL_MODE_PKT_QUEUE */
> +				thr_run_func = pktio_queue_thread;
> +		}
> +		/*
> +		 * Create threads one-by-one instead of all-at-once,
> +		 * because each thread might get different arguments.
> +		 * Calls odp_thread_create(cpu) for each thread
> +		 */
> +		odp_linux_pthread_create(thread_tbl, 1, core, thr_run_func,
> +					 &args->thread[i]);
> +	}
> +
> +	/* Master thread waits for other threads to exit */
> +	odp_linux_pthread_join(thread_tbl, num_workers);
> +
> +	printf("Exit\n\n");
> +
> +	return 0;
> +}
> +
> +/**
> + * Drop packets which input parsing marked as containing errors.
> + *
> + * Frees packets with error and modifies pkt_tbl[] to only contain packets with
> + * no detected errors.
> + *
> + * @param pkt_tbl  Array of packet
> + * @param len      Length of pkt_tbl[]
> + *
> + * @return Number of packets with no detected error
> + */
> +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len)
> +{
> +	odp_packet_t pkt;
> +	unsigned pkt_cnt = len;
> +	unsigned i, j;
> +
> +	for (i = 0, j = 0; i < len; ++i) {
> +		pkt = pkt_tbl[i];
> +
> +		if (odp_unlikely(odp_packet_error(pkt))) {
> +			odp_packet_free(pkt); /* Drop */
> +			pkt_cnt--;
> +		} else if (odp_unlikely(i != j++)) {
> +			pkt_tbl[j-1] = pkt;
> +		}
> +	}
> +
> +	return pkt_cnt;
> +}
> +
> +/**
> + * Parse and store the command line arguments
> + *
> + * @param argc       argument count
> + * @param argv[]     argument vector
> + * @param appl_args  Store application arguments here
> + */
> +static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
> +{
> +	int opt;
> +	int long_index;
> +	char *names, *str, *token, *save;
> +	int i;
> +	int len;
> +	static struct option longopts[] = {
> +		{"count", required_argument, NULL, 'c'},
> +		{"interface", required_argument, NULL, 'i'},	/* return 'i' */
> +		{"mode", required_argument, NULL, 'm'},		/* return 'm' */
> +		{"help", no_argument, NULL, 'h'},		/* return 'h' */
> +		{NULL, 0, NULL, 0}
> +	};
> +
> +	appl_args->mode = -1; /* Invalid, must be changed by parsing */
> +	appl_args->type = 3;  /* 3: ODP_PKTIO_TYPE_SOCKET_MMAP */
> +	appl_args->fanout = 1; /* turn off fanout by default for mmap */
> +
> +	while (1) {
> +		opt = getopt_long(argc, argv, "+c:i:m:t:f:h",
> +				  longopts, &long_index);
> +
> +		if (opt == -1)
> +			break;	/* No more options */
> +
> +		switch (opt) {
> +		case 'c':
> +			appl_args->core_count = atoi(optarg);
> +			break;
> +			/* parse packet-io interface names */
> +		case 'i':
> +			len = strlen(optarg);
> +			if (len == 0) {
> +				usage(argv[0]);
> +				exit(EXIT_FAILURE);
> +			}
> +			len += 1;	/* add room for '\0' */
> +
> +			names = malloc(len);
> +			if (names == NULL) {
> +				usage(argv[0]);
> +				exit(EXIT_FAILURE);
> +			}
> +
> +			/* count the number of tokens separated by ',' */
> +			strcpy(names, optarg);
> +			for (str = names, i = 0;; str = NULL, i++) {
> +				token = strtok_r(str, ",", &save);
> +				if (token == NULL)
> +					break;
> +			}
> +			appl_args->if_count = i;
> +
> +			if (appl_args->if_count == 0) {
> +				usage(argv[0]);
> +				exit(EXIT_FAILURE);
> +			}
> +
> +			/* allocate storage for the if names */
> +			appl_args->if_names =
> +			    calloc(appl_args->if_count, sizeof(char *));
> +
> +			/* store the if names (reset names string) */
> +			strcpy(names, optarg);
> +			for (str = names, i = 0;; str = NULL, i++) {
> +				token = strtok_r(str, ",", &save);
> +				if (token == NULL)
> +					break;
> +				appl_args->if_names[i] = token;
> +			}
> +			break;
> +
> +		case 'm':
> +			i = atoi(optarg);
> +			if (i == 0)
> +				appl_args->mode = APPL_MODE_PKT_BURST;
> +			else
> +				appl_args->mode = APPL_MODE_PKT_QUEUE;
> +			break;
> +
> +		case 't':
> +			appl_args->type = atoi(optarg);
> +			break;
> +
> +		case 'f':
> +			appl_args->fanout = atoi(optarg);
> +			break;
> +
> +		case 'h':
> +			usage(argv[0]);
> +			exit(EXIT_SUCCESS);
> +			break;
> +
> +		default:
> +			break;
> +		}
> +	}
> +
> +	if (appl_args->if_count == 0 || appl_args->mode == -1) {
> +		usage(argv[0]);
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	optind = 1;		/* reset 'extern optind' from the getopt lib */
> +}
> +
> +/**
> + * Print system and application info
> + */
> +static void print_info(char *progname, appl_args_t *appl_args)
> +{
> +	int i;
> +
> +	printf("\n"
> +	       "ODP system info\n"
> +	       "---------------\n"
> +	       "ODP API version: %s\n"
> +	       "CPU model:       %s\n"
> +	       "CPU freq (hz):   %"PRIu64"\n"
> +	       "Cache line size: %i\n"
> +	       "Core count:      %i\n"
> +	       "\n",
> +	       odp_version_api_str(), odp_sys_cpu_model_str(), odp_sys_cpu_hz(),
> +	       odp_sys_cache_line_size(), odp_sys_core_count());
> +
> +	printf("Running ODP appl: \"%s\"\n"
> +	       "-----------------\n"
> +	       "IF-count:        %i\n"
> +	       "Using IFs:      ",
> +	       progname, appl_args->if_count);
> +	for (i = 0; i < appl_args->if_count; ++i)
> +		printf(" %s", appl_args->if_names[i]);
> +	printf("\n"
> +	       "Mode:            ");
> +	if (appl_args->mode == APPL_MODE_PKT_BURST)
> +		PRINT_APPL_MODE(APPL_MODE_PKT_BURST);
> +	else
> +		PRINT_APPL_MODE(APPL_MODE_PKT_QUEUE);
> +	printf("\n\n");
> +	fflush(NULL);
> +}
> +
> +/**
> + * Prinf usage information
> + */
> +static void usage(char *progname)
> +{
> +	printf("\n"
> +	       "Usage: %s OPTIONS\n"
> +	       "  E.g. %s -i eth1,eth2,eth3 -m 0\n"
> +	       "\n"
> +	       "OpenDataPlane example application.\n"
> +	       "\n"
> +	       "Mandatory OPTIONS:\n"
> +	       "  -i, --interface Eth interfaces (comma-separated, no spaces)\n"
> +	       "  -m, --mode      0: Burst send&receive packets (no queues)\n"
> +	       "                  1: Send&receive packets through ODP queues.\n"
> +	       " -t, --type   1: ODP_PKTIO_TYPE_SOCKET_BASIC\n"
> +	       "	      2: ODP_PKTIO_TYPE_SOCKET_MMSG\n"
> +	       "	      3: ODP_PKTIO_TYPE_SOCKET_MMAP\n"
> +	       "	      4: ODP_PKTIO_TYPE_NETMAP\n"
> +	       "	 Default: 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n"
> +	       " -f, --fanout 0: off 1: on (Default 1: on)\n"
> +	       "\n"
> +	       "Optional OPTIONS\n"
> +	       "  -c, --count <number> Core count.\n"
> +	       "  -h, --help           Display help and exit.\n"
> +	       "\n", NO_PATH(progname), NO_PATH(progname)
> +	    );
> +}
> -- 
> 1.8.5.1.163.gd7aced9
> 
> 
> _______________________________________________
> lng-odp mailing list
> lng-odp@lists.linaro.org
> http://lists.linaro.org/mailman/listinfo/lng-odp
diff mbox

Patch

diff --git a/configure.ac b/configure.ac
index 97089e9..c549182 100644
--- a/configure.ac
+++ b/configure.ac
@@ -110,6 +110,7 @@  AC_CONFIG_FILES([Makefile
 		 platform/linux-keystone2/Makefile
 		 platform/linux-dpdk/Makefile
 		 example/Makefile
+		 example/fork/Makefile
 		 example/generator/Makefile
 		 example/l2fwd/Makefile
 		 example/odp_example/Makefile
diff --git a/example/Makefile.am b/example/Makefile.am
index 01a3305..e85507d 100644
--- a/example/Makefile.am
+++ b/example/Makefile.am
@@ -1 +1 @@ 
-SUBDIRS = generator l2fwd odp_example packet packet_netmap timer
+SUBDIRS = generator l2fwd odp_example packet packet_netmap timer fork
diff --git a/example/fork/Makefile.am b/example/fork/Makefile.am
new file mode 100644
index 0000000..f6c6a50
--- /dev/null
+++ b/example/fork/Makefile.am
@@ -0,0 +1,5 @@ 
+include $(top_srcdir)/example/Makefile.inc
+
+bin_PROGRAMS = odp_fork
+
+dist_odp_fork_SOURCES = odp_fork.c
diff --git a/example/fork/README b/example/fork/README
new file mode 100644
index 0000000..0eefd63
--- /dev/null
+++ b/example/fork/README
@@ -0,0 +1,32 @@ 
+			ODP fork example
+
+This example shows how to use fork to implement scalable multi
+process applications.
+
+Algorithm is following:
+	- do standard initialization, i.e odp_init_global();
+	- allocate odp ring in shared memory;
+	- fork();
+	- Process 1: creates X ingress threads and doing actual
+packet I/O and places packets to odp_ring buffer (pktio_queue_thread()
+and pktio_ifburst_thread() functions).
+	- Process 2: creates X packet processing threads and does read
+	  from odp_ring buffer. Then calls odp_buffer_free().
+
+Example:
+./odp_fork -i eth0 -m 1 -c 1
+On remote host run ping.
+
+[12178/1] enqueue buf 7921 size 98/1856, cnt 0
+		[12176] dequeue buf 7921, size 98/1856, cnt 0
+[12178/1] enqueue buf 7905 size 98/1856, cnt 1
+		[12176] dequeue buf 7905, size 98/1856, cnt 1
+[12178/1] enqueue buf 7889 size 98/1856, cnt 2
+		[12176] dequeue buf 7889, size 98/1856, cnt 2
+[12178/1] enqueue buf 7873 size 98/1856, cnt 3
+		[12176] dequeue buf 7873, size 98/1856, cnt 3
+[12178/1] enqueue buf 7857 size 98/1856, cnt 4
+		[12176] dequeue buf 7857, size 98/1856, cnt 4
+
+Main PID/thread [12178/1] enqueues packets to buffer, child process [12176]
+dequeues packets from the odp_ring.
diff --git a/example/fork/odp_fork.c b/example/fork/odp_fork.c
new file mode 100644
index 0000000..d8ec36c
--- /dev/null
+++ b/example/fork/odp_fork.c
@@ -0,0 +1,682 @@ 
+/* Copyright (c) 2013, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+/**
+ * @file
+ *
+ * @example odp_pktio.c  ODP basic packet IO loopback test application
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <getopt.h>
+#include <unistd.h>
+
+#include <odp.h>
+#include <helper/odp_linux.h>
+#include <helper/odp_packet_helper.h>
+#include <helper/odp_eth.h>
+#include <helper/odp_ip.h>
+#include <helper/odp_ring.h>
+
+#define MAX_WORKERS            32
+#define SHM_PKT_POOL_SIZE      (512*2048)
+#define SHM_PKT_POOL_BUF_SIZE  1856
+#define MAX_PKT_BURST          16
+
+#define APPL_MODE_PKT_BURST    0
+#define APPL_MODE_PKT_QUEUE    1
+
+#define RING_SIZE 4096
+#define ODP_RING_NAMESIZE 32
+
+#define PRINT_APPL_MODE(x) printf("%s(%i)\n", #x, (x))
+
+/** Get rid of path in filename - only for unix-type paths using '/' */
+#define NO_PATH(file_name) (strrchr((file_name), '/') ? \
+			    strrchr((file_name), '/') + 1 : (file_name))
+/**
+ * Parsed command line application arguments
+ */
+typedef struct {
+	int core_count;
+	int if_count;		/**< Number of interfaces to be used */
+	char **if_names;	/**< Array of pointers to interface names */
+	int mode;		/**< Packet IO mode */
+	int type;		/**< Packet IO type */
+	int fanout;		/**< Packet IO fanout */
+	odp_buffer_pool_t pool;	/**< Buffer pool for packet IO */
+} appl_args_t;
+
+/**
+ * Thread specific arguments
+ */
+typedef struct {
+	char *pktio_dev;	/**< Interface name to use */
+	odp_buffer_pool_t pool;	/**< Buffer pool for packet IO */
+	int mode;		/**< Thread mode */
+	int type;		/**< Thread i/o type */
+	int fanout;		/**< Thread i/o fanout */
+	int tpid;
+} thread_args_t;
+
+/**
+ * Grouping of both parsed CL args and thread specific args - alloc together
+ */
+typedef struct {
+	/** Application (parsed) arguments */
+	appl_args_t appl;
+	/** Thread specific arguments */
+	thread_args_t thread[MAX_WORKERS];
+} args_t;
+
+/** Global pointer to args */
+static args_t *args;
+
+/* helper funcs */
+static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len);
+static void parse_args(int argc, char *argv[], appl_args_t *appl_args);
+static void print_info(char *progname, appl_args_t *appl_args);
+static void usage(char *progname);
+
+static void *ring_thread(void *arg)
+{
+	thread_args_t *thr_args;
+	thr_args = arg;
+	odp_ring_t *r;
+	int ret;
+	odp_buffer_t buf;
+	odp_packet_t pkt;
+	unsigned long qp_cnt = 0;
+
+	r = odp_ring_lookup("odp_shm_ring");
+	if (!r)
+		ODP_ERR("forked pid %d ring lookup failed\n", getpid());
+	else
+		printf("forked pid %d ring lookup ok\n", getpid());
+
+	if (thr_args->tpid) {
+		odp_buffer_t **buf_dst_p =
+			(odp_buffer_t **)malloc(RING_SIZE *
+						sizeof(odp_buffer_t));
+
+		while (1) {
+			ret = odp_ring_mc_dequeue_bulk(r, (void **)buf_dst_p,
+						       1);
+			if (ret == 0) {
+				memcpy(&buf, (void *)buf_dst_p,
+				       sizeof(odp_buffer_t));
+
+				pkt = odp_packet_from_buffer(buf);
+
+				printf("\t\t[%d] dequeue buf %d, size %ld/%ld, cnt %ld\n",
+				       getpid(), buf,
+				       (unsigned long)odp_packet_get_len(pkt),
+				       (unsigned long)odp_buffer_size(buf),
+				       qp_cnt++);
+
+				odp_buffer_free(buf);
+			}
+		}
+	}
+
+	/* unreachable */
+	return NULL;
+}
+
+
+/**
+ * Packet IO loopback worker thread using ODP queues
+ *
+ * @param arg  thread arguments of type 'thread_args_t *'
+ */
+static void *pktio_queue_thread(void *arg)
+{
+	int thr;
+	odp_buffer_pool_t pkt_pool;
+	odp_pktio_t pktio;
+	thread_args_t *thr_args;
+	odp_queue_t inq_def;
+	char inq_name[ODP_QUEUE_NAME_LEN];
+	odp_queue_param_t qparam;
+	odp_packet_t pkt;
+	odp_buffer_t buf;
+	int ret;
+	unsigned long pkt_cnt = 0;
+	unsigned long err_cnt = 0;
+	odp_pktio_params_t params;
+	socket_params_t *sock_params = &params.sock_params;
+	odp_ring_t *r;
+
+	thr_args = arg;
+
+	thr = odp_thread_id();
+
+	printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr,
+	       thr_args->pktio_dev);
+
+	/* lookup ring from its name */
+	/* Lookup the packet pool */
+	pkt_pool = odp_buffer_pool_lookup("packet_pool");
+	if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) {
+		ODP_ERR("  [%02i] Error: pkt_pool not found\n", thr);
+		return NULL;
+	}
+
+	/* Open a packet IO instance for this thread */
+	sock_params->type = thr_args->type;
+	sock_params->fanout = thr_args->fanout;
+	pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, &params);
+	if (pktio == ODP_PKTIO_INVALID) {
+		ODP_ERR("  [%02i] Error: pktio create failed\n", thr);
+		return NULL;
+	}
+
+	/*
+	 * Create and set the default INPUT queue associated with the 'pktio'
+	 * resource
+	 */
+	qparam.sched.prio  = ODP_SCHED_PRIO_DEFAULT;
+	qparam.sched.sync  = ODP_SCHED_SYNC_ATOMIC;
+	qparam.sched.group = ODP_SCHED_GROUP_DEFAULT;
+	snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio);
+	inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0';
+
+	inq_def = odp_queue_create(inq_name, ODP_QUEUE_TYPE_PKTIN, &qparam);
+	if (inq_def == ODP_QUEUE_INVALID) {
+		ODP_ERR("  [%02i] Error: pktio queue creation failed\n", thr);
+		return NULL;
+	}
+
+	ret = odp_pktio_inq_setdef(pktio, inq_def);
+	if (ret != 0) {
+		ODP_ERR("  [%02i] Error: default input-Q setup\n", thr);
+		return NULL;
+	}
+
+	printf("  [%02i] created pktio:%02i, queue mode (ATOMIC queues)\n"
+	       "          default pktio%02i-INPUT queue:%u\n",
+		thr, pktio, pktio, inq_def);
+
+	r = odp_ring_lookup("odp_shm_ring");
+	if (!r)
+		ODP_ERR("Main proc pid %d, thread %d ring lookup failed\n",
+			getpid(), thr);
+	else
+		printf("Main proc pid %d, thread %d ring lookup OK\n",
+		       getpid(), thr);
+
+
+	void *rbuf = malloc(RING_SIZE * sizeof(odp_buffer_t));
+	void **rbuf_p = rbuf;
+
+	/* Loop packets */
+	for (;;) {
+		/* Use schedule to get buf from any input queue */
+		buf = odp_schedule(NULL, ODP_SCHED_WAIT);
+
+		pkt = odp_packet_from_buffer(buf);
+
+		/* Drop packets with errors */
+		if (odp_unlikely(drop_err_pkts(&pkt, 1) == 0)) {
+			ODP_ERR("Drop frame - err_cnt:%lu\n", ++err_cnt);
+			continue;
+		}
+
+		/* Place buffer to ODP ring queue */
+		rbuf_p = (void *)&buf;
+		ret = odp_ring_mp_enqueue_bulk(r, rbuf_p, 1);
+		if (ret != 0)
+			ODP_ERR("odp_ring_mp_enqueue_bulk fail\n");
+		else
+			printf("[%d/%d] enqueue buf %d size %ld/%ld, cnt %lu\n",
+			       getpid(), thr, buf, (unsigned long)odp_packet_get_len(pkt),
+			       (unsigned long)odp_buffer_size(buf), pkt_cnt++);
+	}
+
+/* unreachable */
+}
+
+/**
+ * Packet IO loopback worker thread using bursts from/to IO resources
+ *
+ * @param arg  thread arguments of type 'thread_args_t *'
+ */
+static void *pktio_ifburst_thread(void *arg)
+{
+	int thr;
+	odp_buffer_pool_t pkt_pool;
+	odp_pktio_t pktio;
+	thread_args_t *thr_args;
+	int pkts, pkts_ok;
+	odp_packet_t pkt_tbl[MAX_PKT_BURST];
+	unsigned long pkt_cnt = 0;
+	unsigned long err_cnt = 0;
+	odp_pktio_params_t params;
+	socket_params_t *sock_params = &params.sock_params;
+	int ret;
+	odp_ring_t *r;
+
+	thr = odp_thread_id();
+	thr_args = arg;
+
+	printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr,
+	       thr_args->pktio_dev);
+
+	/* Lookup the packet pool */
+	pkt_pool = odp_buffer_pool_lookup("packet_pool");
+	if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) {
+		ODP_ERR("  [%02i] Error: pkt_pool not found\n", thr);
+		return NULL;
+	}
+
+	/* Open a packet IO instance for this thread */
+	sock_params->type = thr_args->type;
+	sock_params->fanout = thr_args->fanout;
+	pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, &params);
+	if (pktio == ODP_PKTIO_INVALID) {
+		ODP_ERR("  [%02i] Error: pktio create failed.\n", thr);
+		return NULL;
+	}
+
+	printf("  [%02i] created pktio:%02i, burst mode\n",
+	       thr, pktio);
+
+	r = odp_ring_lookup("odp_shm_ring");
+	if (!r)
+		ODP_ERR("Main proc pid %d, thread %d ring lookup failed\n",
+			getpid(), thr);
+	else
+		printf("Main proc pid %d, thread %d ring lookup OK\n",
+		       getpid(), thr);
+
+	void *rbuf = malloc(RING_SIZE * sizeof(odp_buffer_t));
+	void **rbuf_p = rbuf;
+
+	/* Loop packets */
+	for (;;) {
+		pkts = odp_pktio_recv(pktio, pkt_tbl, MAX_PKT_BURST);
+		if (pkts > 0) {
+			/* Drop packets with errors */
+			pkts_ok = drop_err_pkts(pkt_tbl, pkts);
+			if (pkts_ok > 0) {
+				/* Place buffer to ODP ring queue */
+				rbuf_p = (void *)&pkt_tbl;
+				ret = odp_ring_mp_enqueue_bulk(r, rbuf_p,
+							       pkts_ok);
+				pkt_cnt += pkts_ok;
+				if (ret != 0) {
+					ODP_ERR("odp_ring_mp_enqueue_bulk fail\n");
+				} else {
+					printf("[%d/%d] enqueue %d packets, first buf %d size %ld/%ld, cnt %lu\n",
+					       getpid(), thr, pkts_ok,
+					       pkt_tbl[0],
+					       (unsigned long)odp_packet_get_len(pkt_tbl[0]),
+					       (unsigned long)odp_buffer_size(pkt_tbl[0]),
+					       pkt_cnt);
+				}
+			}
+
+			if (odp_unlikely(pkts_ok != pkts))
+				ODP_ERR("Dropped frames:%u - err_cnt:%lu\n",
+					pkts-pkts_ok, ++err_cnt);
+		}
+	}
+
+/* unreachable */
+}
+
+/**
+ * ODP packet example main function
+ */
+int main(int argc, char *argv[])
+{
+	odp_linux_pthread_t thread_tbl[MAX_WORKERS];
+	odp_buffer_pool_t pool;
+	int thr_id;
+	int num_workers;
+	void *pool_base;
+	int i;
+	int first_core;
+	int core_count;
+	odp_ring_t *r;
+	char ring_name[ODP_RING_NAMESIZE];
+
+	/* Init ODP before calling anything else */
+	if (odp_init_global()) {
+		ODP_ERR("Error: ODP global init failed.\n");
+		exit(EXIT_FAILURE);
+	}
+
+	args = malloc(sizeof(args_t));
+	if (args == NULL) {
+		ODP_ERR("Error: shared mem alloc failed.\n");
+		exit(EXIT_FAILURE);
+	}
+	memset(args, 0, sizeof(*args));
+
+	/* Parse and store the application arguments */
+	parse_args(argc, argv, &args->appl);
+
+	/* Print both system and application information */
+	print_info(NO_PATH(argv[0]), &args->appl);
+
+	core_count  = odp_sys_core_count();
+	num_workers = core_count;
+
+	if (args->appl.core_count)
+		num_workers = args->appl.core_count;
+
+	if (num_workers > MAX_WORKERS)
+		num_workers = MAX_WORKERS;
+
+	printf("Num worker threads: %i\n", num_workers);
+
+	/*
+	 * By default core #0 runs Linux kernel background tasks.
+	 * Start mapping thread from core #1
+	 */
+	first_core = 1;
+
+	if (core_count == 1)
+		first_core = 0;
+
+	printf("First core:         %i\n\n", first_core);
+
+	/* Init this thread */
+	thr_id = odp_thread_create(0);
+	odp_init_local(thr_id);
+
+	/* Create packet pool */
+	pool_base = odp_shm_reserve("shm_packet_pool",
+				    SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE,
+				    ODP_SHM_PROC);
+	if (pool_base == NULL) {
+		ODP_ERR("Error: packet pool mem alloc failed.\n");
+		exit(EXIT_FAILURE);
+	}
+
+	pool = odp_buffer_pool_create("packet_pool", pool_base,
+				      SHM_PKT_POOL_SIZE,
+				      SHM_PKT_POOL_BUF_SIZE,
+				      ODP_CACHE_LINE_SIZE,
+				      ODP_BUFFER_TYPE_PACKET);
+	if (pool == ODP_BUFFER_POOL_INVALID) {
+		ODP_ERR("Error: packet pool create failed.\n");
+		exit(EXIT_FAILURE);
+	}
+	odp_buffer_pool_print(pool);
+
+	/* Init ODP ring */
+	odp_ring_tailq_init();
+
+	snprintf(ring_name, sizeof(ring_name), "odp_shm_ring");
+	r = odp_ring_create(ring_name, RING_SIZE, ODP_RING_SHM_PROC);
+	if (r == NULL) {
+		ODP_ERR("ring create failed\n");
+		exit(EXIT_FAILURE);
+	}
+
+	int f = fork();
+
+	/* Quick check that ring reachable after fork*/
+	r = odp_ring_lookup("odp_shm_ring");
+	if (!r)
+		ODP_ERR("pid %d ring lookup failed\n", getpid());
+
+
+	/* Create and init worker threads */
+	memset(thread_tbl, 0, sizeof(thread_tbl));
+	for (i = 0; i < num_workers; ++i) {
+		void *(*thr_run_func) (void *);
+		int core;
+		int if_idx;
+
+		core = (first_core + i) % core_count;
+
+		if_idx = i % args->appl.if_count;
+
+		args->thread[i].pktio_dev = args->appl.if_names[if_idx];
+		args->thread[i].pool = pool;
+		args->thread[i].mode = args->appl.mode;
+		args->thread[i].type = args->appl.type;
+		args->thread[i].fanout = args->appl.fanout;
+		args->thread[i].tpid = f;
+
+		if (f) {
+			thr_run_func = ring_thread;
+		} else {
+			if (args->appl.mode == APPL_MODE_PKT_BURST)
+				thr_run_func = pktio_ifburst_thread;
+			else /* APPL_MODE_PKT_QUEUE */
+				thr_run_func = pktio_queue_thread;
+		}
+		/*
+		 * Create threads one-by-one instead of all-at-once,
+		 * because each thread might get different arguments.
+		 * Calls odp_thread_create(cpu) for each thread
+		 */
+		odp_linux_pthread_create(thread_tbl, 1, core, thr_run_func,
+					 &args->thread[i]);
+	}
+
+	/* Master thread waits for other threads to exit */
+	odp_linux_pthread_join(thread_tbl, num_workers);
+
+	printf("Exit\n\n");
+
+	return 0;
+}
+
+/**
+ * Drop packets which input parsing marked as containing errors.
+ *
+ * Frees packets with error and modifies pkt_tbl[] to only contain packets with
+ * no detected errors.
+ *
+ * @param pkt_tbl  Array of packet
+ * @param len      Length of pkt_tbl[]
+ *
+ * @return Number of packets with no detected error
+ */
+static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len)
+{
+	odp_packet_t pkt;
+	unsigned pkt_cnt = len;
+	unsigned i, j;
+
+	for (i = 0, j = 0; i < len; ++i) {
+		pkt = pkt_tbl[i];
+
+		if (odp_unlikely(odp_packet_error(pkt))) {
+			odp_packet_free(pkt); /* Drop */
+			pkt_cnt--;
+		} else if (odp_unlikely(i != j++)) {
+			pkt_tbl[j-1] = pkt;
+		}
+	}
+
+	return pkt_cnt;
+}
+
+/**
+ * Parse and store the command line arguments
+ *
+ * @param argc       argument count
+ * @param argv[]     argument vector
+ * @param appl_args  Store application arguments here
+ */
+static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
+{
+	int opt;
+	int long_index;
+	char *names, *str, *token, *save;
+	int i;
+	int len;
+	static struct option longopts[] = {
+		{"count", required_argument, NULL, 'c'},
+		{"interface", required_argument, NULL, 'i'},	/* return 'i' */
+		{"mode", required_argument, NULL, 'm'},		/* return 'm' */
+		{"help", no_argument, NULL, 'h'},		/* return 'h' */
+		{NULL, 0, NULL, 0}
+	};
+
+	appl_args->mode = -1; /* Invalid, must be changed by parsing */
+	appl_args->type = 3;  /* 3: ODP_PKTIO_TYPE_SOCKET_MMAP */
+	appl_args->fanout = 1; /* turn off fanout by default for mmap */
+
+	while (1) {
+		opt = getopt_long(argc, argv, "+c:i:m:t:f:h",
+				  longopts, &long_index);
+
+		if (opt == -1)
+			break;	/* No more options */
+
+		switch (opt) {
+		case 'c':
+			appl_args->core_count = atoi(optarg);
+			break;
+			/* parse packet-io interface names */
+		case 'i':
+			len = strlen(optarg);
+			if (len == 0) {
+				usage(argv[0]);
+				exit(EXIT_FAILURE);
+			}
+			len += 1;	/* add room for '\0' */
+
+			names = malloc(len);
+			if (names == NULL) {
+				usage(argv[0]);
+				exit(EXIT_FAILURE);
+			}
+
+			/* count the number of tokens separated by ',' */
+			strcpy(names, optarg);
+			for (str = names, i = 0;; str = NULL, i++) {
+				token = strtok_r(str, ",", &save);
+				if (token == NULL)
+					break;
+			}
+			appl_args->if_count = i;
+
+			if (appl_args->if_count == 0) {
+				usage(argv[0]);
+				exit(EXIT_FAILURE);
+			}
+
+			/* allocate storage for the if names */
+			appl_args->if_names =
+			    calloc(appl_args->if_count, sizeof(char *));
+
+			/* store the if names (reset names string) */
+			strcpy(names, optarg);
+			for (str = names, i = 0;; str = NULL, i++) {
+				token = strtok_r(str, ",", &save);
+				if (token == NULL)
+					break;
+				appl_args->if_names[i] = token;
+			}
+			break;
+
+		case 'm':
+			i = atoi(optarg);
+			if (i == 0)
+				appl_args->mode = APPL_MODE_PKT_BURST;
+			else
+				appl_args->mode = APPL_MODE_PKT_QUEUE;
+			break;
+
+		case 't':
+			appl_args->type = atoi(optarg);
+			break;
+
+		case 'f':
+			appl_args->fanout = atoi(optarg);
+			break;
+
+		case 'h':
+			usage(argv[0]);
+			exit(EXIT_SUCCESS);
+			break;
+
+		default:
+			break;
+		}
+	}
+
+	if (appl_args->if_count == 0 || appl_args->mode == -1) {
+		usage(argv[0]);
+		exit(EXIT_FAILURE);
+	}
+
+	optind = 1;		/* reset 'extern optind' from the getopt lib */
+}
+
+/**
+ * Print system and application info
+ */
+static void print_info(char *progname, appl_args_t *appl_args)
+{
+	int i;
+
+	printf("\n"
+	       "ODP system info\n"
+	       "---------------\n"
+	       "ODP API version: %s\n"
+	       "CPU model:       %s\n"
+	       "CPU freq (hz):   %"PRIu64"\n"
+	       "Cache line size: %i\n"
+	       "Core count:      %i\n"
+	       "\n",
+	       odp_version_api_str(), odp_sys_cpu_model_str(), odp_sys_cpu_hz(),
+	       odp_sys_cache_line_size(), odp_sys_core_count());
+
+	printf("Running ODP appl: \"%s\"\n"
+	       "-----------------\n"
+	       "IF-count:        %i\n"
+	       "Using IFs:      ",
+	       progname, appl_args->if_count);
+	for (i = 0; i < appl_args->if_count; ++i)
+		printf(" %s", appl_args->if_names[i]);
+	printf("\n"
+	       "Mode:            ");
+	if (appl_args->mode == APPL_MODE_PKT_BURST)
+		PRINT_APPL_MODE(APPL_MODE_PKT_BURST);
+	else
+		PRINT_APPL_MODE(APPL_MODE_PKT_QUEUE);
+	printf("\n\n");
+	fflush(NULL);
+}
+
+/**
+ * Prinf usage information
+ */
+static void usage(char *progname)
+{
+	printf("\n"
+	       "Usage: %s OPTIONS\n"
+	       "  E.g. %s -i eth1,eth2,eth3 -m 0\n"
+	       "\n"
+	       "OpenDataPlane example application.\n"
+	       "\n"
+	       "Mandatory OPTIONS:\n"
+	       "  -i, --interface Eth interfaces (comma-separated, no spaces)\n"
+	       "  -m, --mode      0: Burst send&receive packets (no queues)\n"
+	       "                  1: Send&receive packets through ODP queues.\n"
+	       " -t, --type   1: ODP_PKTIO_TYPE_SOCKET_BASIC\n"
+	       "	      2: ODP_PKTIO_TYPE_SOCKET_MMSG\n"
+	       "	      3: ODP_PKTIO_TYPE_SOCKET_MMAP\n"
+	       "	      4: ODP_PKTIO_TYPE_NETMAP\n"
+	       "	 Default: 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n"
+	       " -f, --fanout 0: off 1: on (Default 1: on)\n"
+	       "\n"
+	       "Optional OPTIONS\n"
+	       "  -c, --count <number> Core count.\n"
+	       "  -h, --help           Display help and exit.\n"
+	       "\n", NO_PATH(progname), NO_PATH(progname)
+	    );
+}