diff mbox

[PATCHv11,6/7] linux-generic: add ipc pktio support

Message ID 1445507411-20556-7-git-send-email-maxim.uvarov@linaro.org
State Superseded
Headers show

Commit Message

Maxim Uvarov Oct. 22, 2015, 9:50 a.m. UTC
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
---
 platform/linux-generic/Makefile.am                 |   2 +
 .../linux-generic/include/odp_buffer_internal.h    |   3 +
 .../linux-generic/include/odp_packet_io_internal.h |  38 ++
 .../include/odp_packet_io_ipc_internal.h           |  47 ++
 platform/linux-generic/include/odp_shm_internal.h  |  21 +
 platform/linux-generic/odp_packet_io.c             |   1 +
 platform/linux-generic/odp_pool.c                  |  14 +-
 platform/linux-generic/odp_shared_memory.c         |  14 +-
 platform/linux-generic/pktio/io_ops.c              |   1 +
 platform/linux-generic/pktio/ipc.c                 | 729 +++++++++++++++++++++
 platform/linux-generic/pktio/ring.c                |   1 +
 11 files changed, 866 insertions(+), 5 deletions(-)
 create mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h
 create mode 100644 platform/linux-generic/include/odp_shm_internal.h
 create mode 100644 platform/linux-generic/pktio/ipc.c
 create mode 120000 platform/linux-generic/pktio/ring.c

\ No newline at end of file

Comments

Stuart Haslam Oct. 27, 2015, 4:32 p.m. UTC | #1
On Thu, Oct 22, 2015 at 12:50:10PM +0300, Maxim Uvarov wrote:
> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
> ---
>  platform/linux-generic/Makefile.am                 |   2 +
>  .../linux-generic/include/odp_buffer_internal.h    |   3 +
>  .../linux-generic/include/odp_packet_io_internal.h |  38 ++
>  .../include/odp_packet_io_ipc_internal.h           |  47 ++
>  platform/linux-generic/include/odp_shm_internal.h  |  21 +
>  platform/linux-generic/odp_packet_io.c             |   1 +
>  platform/linux-generic/odp_pool.c                  |  14 +-
>  platform/linux-generic/odp_shared_memory.c         |  14 +-
>  platform/linux-generic/pktio/io_ops.c              |   1 +
>  platform/linux-generic/pktio/ipc.c                 | 729 +++++++++++++++++++++
>  platform/linux-generic/pktio/ring.c                |   1 +
>  11 files changed, 866 insertions(+), 5 deletions(-)
>  create mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h
>  create mode 100644 platform/linux-generic/include/odp_shm_internal.h
>  create mode 100644 platform/linux-generic/pktio/ipc.c
>  create mode 120000 platform/linux-generic/pktio/ring.c
> 
> diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
> index dfb5a91..834354c 100644
> --- a/platform/linux-generic/Makefile.am
> +++ b/platform/linux-generic/Makefile.am
> @@ -151,10 +151,12 @@ __LIB__libodp_la_SOURCES = \
>  			   odp_packet_flags.c \
>  			   odp_packet_io.c \
>  			   pktio/io_ops.c \
> +			   pktio/ipc.c \
>  			   pktio/loop.c \
>  			   pktio/netmap.c \
>  			   pktio/socket.c \
>  			   pktio/socket_mmap.c \
> +			   pktio/ring.c \
>  			   odp_pool.c \
>  			   odp_queue.c \
>  			   odp_rwlock.c \
> diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
> index 4cacca1..a078e52 100644
> --- a/platform/linux-generic/include/odp_buffer_internal.h
> +++ b/platform/linux-generic/include/odp_buffer_internal.h
> @@ -132,6 +132,9 @@ struct odp_buffer_hdr_t {
>  	uint32_t                 uarea_size; /* size of user area */
>  	uint32_t                 segcount;   /* segment count */
>  	uint32_t                 segsize;    /* segment size */
> +	/* ipc mapped process can not walk over pointers,
> +	 * offset has to be used */
> +	uint64_t		 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
>  	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
>  	uint64_t                 order;      /* sequence for ordered queues */
>  	queue_entry_t           *origin_qe;  /* ordered queue origin */
> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
> index 4745bd5..54aa795 100644
> --- a/platform/linux-generic/include/odp_packet_io_internal.h
> +++ b/platform/linux-generic/include/odp_packet_io_internal.h
> @@ -25,6 +25,7 @@ extern "C" {
>  #include <odp_classification_datamodel.h>
>  #include <odp_align_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp/helper/ring.h>
>  
>  #include <odp/config.h>
>  #include <odp/hints.h>
> @@ -55,6 +56,41 @@ typedef struct {
>  } pkt_pcap_t;
>  #endif
>  
> +typedef	struct {
> +	/* TX */
> +	struct  {
> +		odph_ring_t	*send; /**< ODP ring for IPC msg packets
> +					    indexes transmitted to shared
> +					    memory */
> +		odph_ring_t	*free; /**< ODP ring for IPC msg packets
> +					    indexes already processed by remote
> +					    process */
> +	} tx;
> +	/* RX */
> +	struct {
> +		odph_ring_t	*recv; /**< ODP ring for IPC msg packets
> +					    indexes received from shared
> +					     memory (from remote process) */
> +		odph_ring_t	*free; /**< ODP ring for IPC msg packets
> +					    indexes already processed by
> +					    current process */
> +	} rx; /* slave */
> +	void		*pool_base;		/**< Remote pool base addr */
> +	void		*pool_mdata_base;	/**< Remote pool mdata base addr */
> +	uint64_t	pkt_size;		/**< Packet size in remote pool */
> +	odp_pool_t	pool;			/**< Pool of main process */
> +	enum {
> +		PKTIO_TYPE_IPC_MASTER = 0, /**< Master is the process which
> +						creates shm */
> +		PKTIO_TYPE_IPC_SLAVE	   /**< Slave is the process which
> +						connects to shm */
> +	} type; /**< define if it's master or slave process */
> +	odp_atomic_u32_t ready; /**< 1 - pktio is ready and can recv/send
> +				     packet, 0 - not yet ready */
> +	void *pinfo;
> +	odp_shm_t pinfo_shm;
> +} _ipc_pktio_t;
> +
>  struct pktio_entry {
>  	const struct pktio_if_ops *ops; /**< Implementation specific methods */
>  	odp_ticketlock_t lock;		/**< entry ticketlock */
> @@ -72,6 +108,7 @@ struct pktio_entry {
>  #ifdef HAVE_PCAP
>  		pkt_pcap_t pkt_pcap;		/**< Using pcap for IO */
>  #endif
> +		_ipc_pktio_t ipc;		/**< IPC pktio data */
>  	};
>  	enum {
>  		STATE_START = 0,
> @@ -151,6 +188,7 @@ extern const pktio_if_ops_t loopback_pktio_ops;
>  #ifdef HAVE_PCAP
>  extern const pktio_if_ops_t pcap_pktio_ops;
>  #endif
> +extern const pktio_if_ops_t ipc_pktio_ops;
>  extern const pktio_if_ops_t * const pktio_if_ops[];
>  
>  #ifdef __cplusplus
> diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> new file mode 100644
> index 0000000..bec2af6
> --- /dev/null
> +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> @@ -0,0 +1,47 @@
> +/* Copyright (c) 2015, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp/packet_io.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp/packet.h>
> +#include <odp_packet_internal.h>
> +#include <odp_internal.h>
> +#include <odp/shared_memory.h>
> +
> +#include <string.h>
> +#include <unistd.h>
> +#include <stdlib.h>
> +
> +/* IPC packet I/O over shared memory ring */
> +#include <odp/helper/ring.h>
> +
> +/* number of odp buffers in odp ring queue */
> +#define PKTIO_IPC_ENTRIES 4096
> +
> +/* that struct is exported to shared memory, so that processes can find
> + * each other.
> + */
> +struct pktio_info {
> +	struct {
> +		/* number of buffer in remote pool */
> +		int shm_pool_bufs_num;
> +		/* size of remote pool */
> +		size_t shm_pkt_pool_size;
> +		/* size of packet/segment in remote pool */
> +		uint32_t shm_pkt_size;
> +		/* offset from shared memory block start
> +		 * to pool_mdata_addr (linux-generic pool specific) */
> +		size_t mdata_offset;
> +		char pool_name[ODP_POOL_NAME_LEN];
> +	} master;
> +	struct {
> +		/* offset from shared memory block start
> +		 * to pool_mdata_addr in remote process.
> +		 * (linux-generic pool specific) */
> +		size_t mdata_offset;
> +		char pool_name[ODP_POOL_NAME_LEN];
> +	} slave;
> +} ODP_PACKED;
> diff --git a/platform/linux-generic/include/odp_shm_internal.h b/platform/linux-generic/include/odp_shm_internal.h
> new file mode 100644
> index 0000000..1fd7a3c
> --- /dev/null
> +++ b/platform/linux-generic/include/odp_shm_internal.h
> @@ -0,0 +1,21 @@
> +/* Copyright (c) 2013, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#ifndef ODP_SHM_INTERNAL_H_
> +#define ODP_SHM_INTERNAL_H_
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#define _ODP_SHM_PROC_NOCREAT 0x4  /**< Do not create shm if not exist */
> +#define _ODP_SHM_O_EXCL	      0x8  /**< Do not create shm if exist */
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif
> diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
> index 20c66f3..0c02f9e 100644
> --- a/platform/linux-generic/odp_packet_io.c
> +++ b/platform/linux-generic/odp_packet_io.c
> @@ -19,6 +19,7 @@
>  #include <odp_schedule_internal.h>
>  #include <odp_classification_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp_packet_io_ipc_internal.h>
>  
>  #include <string.h>
>  #include <sys/ioctl.h>
> diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
> index 2036c2a..220dcd2 100644
> --- a/platform/linux-generic/odp_pool.c
> +++ b/platform/linux-generic/odp_pool.c
> @@ -219,8 +219,11 @@ odp_pool_t _pool_create(const char *name,
>  			ODP_ALIGN_ROUNDUP(params->pkt.len, seg_len);
>  
>  		/* Reject create if pkt.len needs too many segments */
> -		if (blk_size / seg_len > ODP_BUFFER_MAX_SEG)
> +		if (blk_size / seg_len > ODP_BUFFER_MAX_SEG) {
> +			ODP_ERR("ODP_BUFFER_MAX_SEG exceed %d(%d)\n",
> +				blk_size / seg_len, ODP_BUFFER_MAX_SEG);
>  			return ODP_POOL_INVALID;
> +		}
>  
>  		p_udata_size = params->pkt.uarea_size;
>  		udata_stride = ODP_ALIGN_ROUNDUP(p_udata_size,
> @@ -241,8 +244,12 @@ odp_pool_t _pool_create(const char *name,
>  
>  	/* Validate requested number of buffers against addressable limits */
>  	if (buf_num >
> -	    (ODP_BUFFER_MAX_BUFFERS / (buf_stride / ODP_CACHE_LINE_SIZE)))
> +	    (ODP_BUFFER_MAX_BUFFERS / (buf_stride / ODP_CACHE_LINE_SIZE))) {
> +		ODP_ERR("buf_num %d > then expected %d\n",
> +			buf_num, ODP_BUFFER_MAX_BUFFERS /
> +			(buf_stride / ODP_CACHE_LINE_SIZE));
>  		return ODP_POOL_INVALID;
> +	}
>  
>  	/* Find an unused buffer pool slot and iniitalize it as requested */
>  	for (i = 0; i < ODP_CONFIG_POOLS; i++) {
> @@ -468,6 +475,9 @@ int odp_pool_destroy(odp_pool_t pool_hdl)
>  	/* Call fails if pool has allocated buffers */
>  	if (odp_atomic_load_u32(&pool->s.bufcount) < pool->s.buf_num) {
>  		POOL_UNLOCK(&pool->s.lock);
> +		ODP_DBG("error: pool has allocated buffers %d/%d\n",
> +			odp_atomic_load_u32(&pool->s.bufcount),
> +			pool->s.buf_num);
>  		return -1;
>  	}
>  
> diff --git a/platform/linux-generic/odp_shared_memory.c b/platform/linux-generic/odp_shared_memory.c
> index ab48dda..6a4b5ba 100644
> --- a/platform/linux-generic/odp_shared_memory.c
> +++ b/platform/linux-generic/odp_shared_memory.c
> @@ -15,6 +15,7 @@
>  #include <odp/debug.h>
>  #include <odp_debug_internal.h>
>  #include <odp_align_internal.h>
> +#include <odp_shm_internal.h>
>  #include <odp/config.h>
>  
>  #include <unistd.h>
> @@ -167,7 +168,7 @@ int odp_shm_free(odp_shm_t shm)
>  		return -1;
>  	}
>  
> -	if (block->flags & ODP_SHM_PROC) {
> +	if (block->flags & (ODP_SHM_PROC | _ODP_SHM_PROC_NOCREAT)) {
>  		ret = shm_unlink(block->name);
>  		if (0 != ret) {
>  			ODP_DBG("odp_shm_free: shm_unlink failed\n");
> @@ -189,7 +190,7 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
>  	int fd = -1;
>  	int map_flag = MAP_SHARED;
>  	/* If already exists: O_EXCL: error, O_TRUNC: truncate to zero */
> -	int oflag = O_RDWR | O_CREAT | O_TRUNC;
> +	int oflag = O_RDWR;
>  	uint64_t alloc_size;
>  	uint64_t page_sz, huge_sz;
>  #ifdef MAP_HUGETLB
> @@ -207,7 +208,14 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
>  	alloc_hp_size = (size + align + (huge_sz - 1)) & (-huge_sz);
>  #endif
>  
> -	if (flags & ODP_SHM_PROC) {
> +	if (flags & ODP_SHM_PROC)
> +		oflag |= O_CREAT | O_TRUNC;
> +	if (flags & _ODP_SHM_O_EXCL)
> +		oflag |= O_EXCL;
> +
> +	if (flags & (ODP_SHM_PROC | _ODP_SHM_PROC_NOCREAT)) {
> +		need_huge_page = 0;
> +
>  		/* Creates a file to /dev/shm */
>  		fd = shm_open(name, oflag,
>  			      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
> diff --git a/platform/linux-generic/pktio/io_ops.c b/platform/linux-generic/pktio/io_ops.c
> index 3b344e6..b7e6f7c 100644
> --- a/platform/linux-generic/pktio/io_ops.c
> +++ b/platform/linux-generic/pktio/io_ops.c
> @@ -18,6 +18,7 @@ const pktio_if_ops_t * const pktio_if_ops[]  = {
>  #ifdef HAVE_PCAP
>  	&pcap_pktio_ops,
>  #endif
> +	&ipc_pktio_ops,
>  	&sock_mmap_pktio_ops,
>  	&sock_mmsg_pktio_ops,
>  	NULL
> diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c
> new file mode 100644
> index 0000000..05fce2a
> --- /dev/null
> +++ b/platform/linux-generic/pktio/ipc.c
> @@ -0,0 +1,729 @@
> +/* Copyright (c) 2015, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp_packet_io_ipc_internal.h>
> +#include <odp_debug_internal.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp_spin_internal.h>
> +#include <odp/system_info.h>
> +#include <odp_shm_internal.h>
> +
> +#include <sys/mman.h>
> +#include <sys/stat.h>
> +#include <fcntl.h>
> +
> +/* MAC address for the "ipc" interface */
> +static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12};
> +
> +static void *_ipc_map_remote_pool(const char *name, size_t size);
> +
> +static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
> +{
> +	pool_entry_t *pool;
> +	uint32_t pool_id;
> +	odp_shm_t shm;
> +	odp_shm_info_t info;
> +
> +	pool_id = pool_handle_to_index(pool_hdl);
> +	pool    = get_pool_entry(pool_id);
> +	shm = pool->s.pool_shm;
> +
> +	odp_shm_info(shm, &info);
> +
> +	return info.name;
> +}
> +
> +/**
> +* Look up for shared memory object.
> +*
> +* @param name   name of shm object
> +*
> +* @return 0 on success, otherwise non-zero
> +*/
> +static int _ipc_shm_lookup(const char *name)
> +{
> +	int shm;
> +
> +	shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
> +	if (shm == -1) {
> +		if (errno == ENOENT)
> +			return -1;
> +		ODP_ABORT("shm_open for %s err %s\n",
> +			  name, strerror(errno));
> +	}
> +	close(shm);
> +	return 0;
> +}
> +
> +static int _ipc_map_pktio_info(pktio_entry_t *pktio_entry,
> +			       const char *dev,
> +			       int *slave)
> +{
> +	struct pktio_info *pinfo;
> +	char name[ODP_POOL_NAME_LEN + sizeof("_info")];
> +	uint32_t flags;
> +	odp_shm_t shm;
> +
> +	/* Create info about remote pktio */
> +	snprintf(name, sizeof(name), "%s_info", dev);
> +
> +	flags = ODP_SHM_PROC | _ODP_SHM_O_EXCL;
> +
> +	shm = odp_shm_reserve(name, sizeof(struct pktio_info),
> +			      ODP_CACHE_LINE_SIZE,
> +			      flags);
> +	if (ODP_SHM_INVALID != shm) {
> +		pinfo = odp_shm_addr(shm);
> +		pinfo->master.pool_name[0] = 0;
> +		*slave = 0;
> +	} else {
> +		flags = _ODP_SHM_PROC_NOCREAT | _ODP_SHM_O_EXCL;
> +		shm = odp_shm_reserve(name, sizeof(struct pktio_info),
> +				      ODP_CACHE_LINE_SIZE,
> +				      flags);
> +		if (ODP_SHM_INVALID == shm)
> +			ODP_ABORT("can not connect to shm\n");
> +
> +		pinfo = odp_shm_addr(shm);
> +		*slave = 1;
> +	}
> +
> +	pktio_entry->s.ipc.pinfo = pinfo;
> +	pktio_entry->s.ipc.pinfo_shm = shm;
> +
> +	return 0;
> +}
> +
> +static int _ipc_master_start(pktio_entry_t *pktio_entry)
> +{
> +	struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo;
> +	int ret;
> +	void *ipc_pool_base;
> +
> +	if (pinfo->slave.mdata_offset == 0)
> +		return -1;
> +
> +	ret = _ipc_shm_lookup(pinfo->slave.pool_name);
> +	if (ret) {
> +		ODP_DBG("no pool file %s\n", pinfo->slave.pool_name);
> +		return -1;
> +	}
> +
> +	ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
> +					     pinfo->master.shm_pkt_pool_size);
> +	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
> +					     pinfo->slave.mdata_offset;
> +
> +	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
> +
> +	ODP_DBG("%s started.\n",  pktio_entry->s.name);
> +	return 0;
> +}
> +
> +static int _ipc_init_master(pktio_entry_t *pktio_entry,
> +			    const char *dev,
> +			    odp_pool_t pool)
> +{
> +	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
> +	pool_entry_t *pool_entry;
> +	uint32_t pool_id;
> +	struct pktio_info *pinfo;
> +	const char *pool_name;
> +	odp_shm_t shm;
> +
> +	pool_id = pool_handle_to_index(pool);
> +	pool_entry    = get_pool_entry(pool_id);
> +
> +	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r"))) {
> +		ODP_DBG("too big ipc name\n");
> +		return -1;
> +	}
> +
> +	/* generate name in shm like ipc_pktio_r for
> +	 * to be processed packets ring.
> +	 */
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
> +	pktio_entry->s.ipc.tx.send = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
> +	if (!pktio_entry->s.ipc.tx.send) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);
> +		return -1;
> +	}
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.send),
> +		odph_ring_free_count(pktio_entry->s.ipc.tx.send));
> +
> +	/* generate name in shm like ipc_pktio_p for
> +	 * already processed packets
> +	 */
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
> +	pktio_entry->s.ipc.tx.free = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
> +	if (!pktio_entry->s.ipc.tx.free) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);
> +		goto free_m_prod;
> +	}
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.free),
> +		odph_ring_free_count(pktio_entry->s.ipc.tx.free));
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
> +	pktio_entry->s.ipc.rx.recv = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
> +	if (!pktio_entry->s.ipc.rx.recv) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);
> +		goto free_m_cons;
> +	}
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.recv),
> +		odph_ring_free_count(pktio_entry->s.ipc.rx.recv));
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
> +	pktio_entry->s.ipc.rx.free = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
> +	if (!pktio_entry->s.ipc.rx.free) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);
> +		goto free_s_prod;
> +	}
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.free),
> +		odph_ring_free_count(pktio_entry->s.ipc.rx.free));
> +
> +	/* Set up pool name for remote info */
> +	pinfo = pktio_entry->s.ipc.pinfo;
> +	pool_name = _ipc_odp_buffer_pool_shm_name(pool);
> +	memcpy(pinfo->master.pool_name, pool_name, strlen(pool_name));
> +	pinfo->master.shm_pkt_pool_size = pool_entry->s.pool_size;
> +	pinfo->master.shm_pool_bufs_num = pool_entry->s.buf_num;
> +	pinfo->master.shm_pkt_size = pool_entry->s.seg_size;
> +	pinfo->master.mdata_offset =  pool_entry->s.pool_mdata_addr -
> +			       pool_entry->s.pool_base_addr;
> +	pinfo->slave.mdata_offset = 0;
> +
> +	pktio_entry->s.ipc.pool = pool;
> +
> +	ODP_DBG("Pre init... DONE.\n");
> +
> +	_ipc_master_start(pktio_entry);
> +
> +	return 0;
> +
> +free_s_prod:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +free_m_cons:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +free_m_prod:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +	return -1;
> +}
> +
> +static void _ipc_export_pool(struct pktio_info *pinfo,
> +			     odp_pool_t pool)
> +{
> +	pool_entry_t *pool_entry;
> +
> +	pool_entry = odp_pool_to_entry(pool);
> +	if (pool_entry->s.blk_size != pinfo->master.shm_pkt_size)
> +		ODP_ABORT("pktio for same name should have the same pool size\n");
> +	if (pool_entry->s.buf_num != (unsigned)pinfo->master.shm_pool_bufs_num)
> +		ODP_ABORT("pktio for same name should have the same pool size\n");
> +
> +	snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s",
> +		 pool_entry->s.name);
> +	pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
> +				    pool_entry->s.pool_base_addr;
> +}
> +
> +static void *_ipc_map_remote_pool(const char *name, size_t size)
> +{
> +	odp_shm_t shm;
> +	void *addr;
> +
> +	ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
> +	shm = odp_shm_reserve(name,
> +			      size,
> +			      ODP_CACHE_LINE_SIZE,
> +			      _ODP_SHM_PROC_NOCREAT);
> +	if (shm == ODP_SHM_INVALID)
> +		ODP_ABORT("unable map %s\n", name);
> +
> +	addr = odp_shm_addr(shm);
> +	ODP_DBG("MAP master: %p - %p size %ld, pool %s\n",
> +		addr, (char *)addr + size, size, name);
> +	return addr;
> +}
> +
> +static void *_ipc_shm_map(char *name, size_t size)
> +{
> +	odp_shm_t shm;
> +	int ret;
> +
> +	ret = _ipc_shm_lookup(name);
> +	if (ret == -1)
> +		return NULL;
> +
> +	shm = odp_shm_reserve(name, size,
> +			      ODP_CACHE_LINE_SIZE,
> +			      _ODP_SHM_PROC_NOCREAT);
> +	if (ODP_SHM_INVALID == shm)
> +		ODP_ABORT("unable to map: %s\n", name);
> +
> +	return odp_shm_addr(shm);
> +}
> +
> +static int _ipc_init_slave(const char *dev,
> +			   pktio_entry_t *pktio_entry,
> +			   odp_pool_t pool)
> +{
> +	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r")))
> +		ODP_ABORT("too big ipc name\n");
> +
> +	pktio_entry->s.ipc.pool = pool;
> +	return 0;
> +}
> +
> +static int _ipc_slave_start(pktio_entry_t *pktio_entry)
> +{
> +	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
> +	size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
> +			   sizeof(odph_ring_t);
> +	struct pktio_info *pinfo;
> +	void *ipc_pool_base;
> +	odp_shm_t shm;
> +	const char *dev = pktio_entry->s.name;
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
> +	pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, ring_size);
> +	if (!pktio_entry->s.ipc.rx.recv) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		sleep(1);
> +		return -1;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.recv),
> +		odph_ring_free_count(pktio_entry->s.ipc.rx.recv));
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
> +	pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, ring_size);
> +	if (!pktio_entry->s.ipc.rx.free) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		goto free_m_prod;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.free),
> +		odph_ring_free_count(pktio_entry->s.ipc.rx.free));
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
> +	pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, ring_size);
> +	if (!pktio_entry->s.ipc.tx.send) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		goto free_m_cons;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.send),
> +		odph_ring_free_count(pktio_entry->s.ipc.tx.send));
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
> +	pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, ring_size);
> +	if (!pktio_entry->s.ipc.tx.free) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		goto free_s_prod;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.free),
> +		odph_ring_free_count(pktio_entry->s.ipc.tx.free));
> +
> +	/* Get info about remote pool */
> +	pinfo = pktio_entry->s.ipc.pinfo;
> +	ipc_pool_base = _ipc_map_remote_pool(pinfo->master.pool_name,
> +					     pinfo->master.shm_pkt_pool_size);
> +	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
> +					     pinfo->master.mdata_offset;
> +	pktio_entry->s.ipc.pkt_size = pinfo->master.shm_pkt_size;
> +
> +	/* @todo: to simplify in linux-generic implementation we create pool for
> +	 * packets from IPC queue. On receive implementation copies packets to
> +	 * that pool. Later we can try to reuse original pool without packets
> +	 * copying. (pkt refcounts needs to be implemented).
> +	 */
> +	_ipc_export_pool(pinfo, pktio_entry->s.ipc.pool);
> +
> +	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
> +
> +	ODP_DBG("%s started.\n",  pktio_entry->s.name);
> +	return 0;
> +
> +free_s_prod:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +free_m_cons:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +free_m_prod:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +	return -1;
> +}
> +
> +static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
> +			  pktio_entry_t *pktio_entry,
> +			  const char *dev,
> +			  odp_pool_t pool)
> +{
> +	int ret = -1;
> +	int slave;
> +
> +	_ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == ODPH_RING_NAMESIZE,
> +			   "mismatch pool and ring name arrays");
> +
> +	if (strncmp(dev, "ipc", 3))
> +		return -1;
> +
> +	odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0);
> +
> +	_ipc_map_pktio_info(pktio_entry, dev, &slave);
> +	pktio_entry->s.ipc.type = (slave == 0) ? PKTIO_TYPE_IPC_MASTER :
> +						 PKTIO_TYPE_IPC_SLAVE;
> +
> +	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {
> +		ODP_DBG("process %d is master\n", getpid());
> +		ret = _ipc_init_master(pktio_entry, dev, pool);
> +	} else {
> +		ODP_DBG("process %d is slave\n", getpid());
> +		ret = _ipc_init_slave(dev, pktio_entry, pool);
> +	}
> +
> +	return ret;
> +}
> +
> +static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
> +				    uint32_t offset,
> +				    uint32_t *seglen,
> +				    uint32_t limit)
> +{
> +	int seg_index  = offset / buf->segsize;
> +	int seg_offset = offset % buf->segsize;
> +	void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
> +
> +	if (seglen) {
> +		uint32_t buf_left = limit - offset;
> +		*seglen = seg_offset + buf_left <= buf->segsize ?
> +			buf_left : buf->segsize - seg_offset;
> +	}
> +
> +	return (void *)(seg_offset + (uint8_t *)addr);
> +}
> +
> +static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
> +				    uint32_t offset, uint32_t *seglen)
> +{
> +	if (offset > pkt_hdr->frame_len)
> +		return NULL;
> +
> +	return _ipc_buffer_map(&pkt_hdr->buf_hdr,
> +			  pkt_hdr->headroom + offset, seglen,
> +			  pkt_hdr->headroom + pkt_hdr->frame_len);
> +}
> +
> +static void _ipc_free_ring_packets(odph_ring_t *r)
> +{
> +	odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
> +	int ret;
> +	void **rbuf_p;
> +	int i;
> +
> +	rbuf_p = (void *)&r_p_pkts;
> +
> +	while (1) {
> +		ret = odph_ring_mc_dequeue_burst(r, rbuf_p,
> +						 PKTIO_IPC_ENTRIES);
> +		if (0 == ret)
> +			break;
> +		for (i = 0; i < ret; i++) {
> +			if (r_p_pkts[i] != ODP_PACKET_INVALID)
> +				odp_packet_free(r_p_pkts[i]);
> +		}
> +	}
> +}
> +
> +static int ipc_pktio_recv(pktio_entry_t *pktio_entry,
> +			  odp_packet_t pkt_table[], unsigned len)
> +{
> +	int pkts = 0;
> +	int i;
> +	odph_ring_t *r;
> +	odph_ring_t *r_p;
> +
> +	odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
> +	void **ipcbufs_p = (void *)&remote_pkts;
> +	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
> +
> +	if (odp_unlikely(!ready)) {
> +		ODP_DBG("start pktio is missing before usage?\n");
> +		return -1;
> +	}
> +
> +	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
> +
> +	r = pktio_entry->s.ipc.rx.recv;
> +	pkts = odph_ring_mc_dequeue_burst(r, ipcbufs_p, len);
> +	if (odp_unlikely(pkts < 0))
> +		ODP_ABORT("error to dequeue no packets\n");
> +
> +	/* fast path */
> +	if (odp_likely(0 == pkts))
> +		return 0;
> +
> +	for (i = 0; i < pkts; i++) {
> +		odp_pool_t pool;
> +		odp_packet_t pkt;
> +		odp_packet_hdr_t phdr;
> +		void *ptr;
> +		odp_buffer_bits_t handle;
> +		int idx; /* Remote packet has coded pool and index.
> +			  * We need only index.*/
> +		void *pkt_data;
> +		void *remote_pkt_data;
> +
> +		if (remote_pkts[i] == ODP_PACKET_INVALID)
> +			continue;
> +
> +		handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
> +		idx = handle.index;
> +
> +		/* Link to packed data. To this line we have Zero-Copy between
> +		 * processes, to simplify use packet copy in that version which
> +		 * can be removed later with more advance buffer management
> +		 * (ref counters).
> +		 */
> +		/* reverse odp_buf_to_hdr() */
> +		ptr = (char *)pktio_entry->s.ipc.pool_mdata_base +
> +		      (idx * ODP_CACHE_LINE_SIZE);
> +		memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t));
> +
> +		/* Allocate new packet. Select*/
> +		pool = pktio_entry->s.ipc.pool;
> +		if (odp_unlikely(pool == ODP_POOL_INVALID))
> +			ODP_ABORT("invalid pool");
> +
> +		pkt = odp_packet_alloc(pool, phdr.frame_len);
> +		if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
> +			/* Original pool might be smaller then
> +			*  PKTIO_IPC_ENTRIES. If packet can not be
> +			 * allocated from pool at this time,
> +			 * simple get in on next recv() call.
> +			 */
> +			if (i == 0)
> +				return 0;
> +			break;
> +		}
> +
> +		/* Copy packet data. */
> +		pkt_data = odp_packet_data(pkt);
> +		if (odp_unlikely(!pkt_data))
> +			ODP_ABORT("unable to map pkt_data ipc_slave %d\n",
> +				  (PKTIO_TYPE_IPC_SLAVE ==
> +					pktio_entry->s.ipc.type));
> +
> +		remote_pkt_data = _ipc_packet_map(ptr, 0, NULL);
> +		if (odp_unlikely(!remote_pkt_data))
> +			ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",
> +				  (PKTIO_TYPE_IPC_SLAVE ==
> +					pktio_entry->s.ipc.type));
> +
> +		/* @todo fix copy packet!!! */
> +		memcpy(pkt_data, remote_pkt_data, phdr.frame_len);
> +
> +		/* Copy packets L2, L3 parsed offsets and size */
> +		copy_packet_parser_metadata(&phdr, odp_packet_hdr(pkt));
> +
> +		odp_packet_hdr(pkt)->frame_len = phdr.frame_len;
> +		odp_packet_hdr(pkt)->headroom = phdr.headroom;
> +		odp_packet_hdr(pkt)->tailroom = phdr.tailroom;
> +		pkt_table[i] = pkt;
> +	}
> +
> +	/* Now tell other process that we no longer need that buffers.*/
> +	r_p = pktio_entry->s.ipc.rx.free;
> +	pkts = odph_ring_mp_enqueue_burst(r_p, ipcbufs_p, i);
> +	if (odp_unlikely(pkts < 0))
> +		ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
> +
> +	return pkts;
> +}
> +
> +static int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +			  unsigned len)
> +{
> +	odph_ring_t *r;
> +	void **rbuf_p;
> +	int ret;
> +	unsigned i;
> +	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
> +
> +	if (odp_unlikely(!ready))
> +		return 0;
> +
> +	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
> +
> +	/* Prepare packets: calculate offset from address. */
> +	for (i = 0; i < len; i++) {
> +		int j;
> +		odp_packet_t pkt =  pkt_table[i];
> +		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
> +		odp_buffer_bits_t handle;
> +		uint32_t cur_mapped_pool_id =
> +			 pool_handle_to_index(pktio_entry->s.ipc.pool);
> +		uint32_t pool_id;
> +
> +		/* do copy if packet was allocated from not mapped pool */
> +		handle.handle = _odp_packet_to_buffer(pkt);
> +		pool_id = handle.pool_id;
> +		if (pool_id != cur_mapped_pool_id) {
> +			odp_packet_t newpkt;
> +
> +			newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool);
> +			if (newpkt == ODP_PACKET_INVALID)
> +				ODP_ABORT("Unable to copy packet\n");
> +
> +			odp_packet_free(pkt);
> +			pkt_table[i] = newpkt;
> +		}
> +
> +		rbuf_p = (void *)&pkt;
> +
> +		/* buf_hdr.addr can not be used directly in remote process,
> +		 * convert it to offset
> +		 */
> +		for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) {
> +			pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
> +				(char *)pkt_hdr->buf_hdr.addr[j];
> +		}
> +	}
> +
> +	/* Put packets to ring to be processed by other process. */
> +	rbuf_p = (void *)&pkt_table[0];
> +	r = pktio_entry->s.ipc.tx.send;
> +	ret = odph_ring_mp_enqueue_burst(r, rbuf_p, len);
> +	if (odp_unlikely(ret < 0)) {
> +		ODP_ERR("pid %d odp_ring_mp_enqueue_bulk fail, ipc_slave %d, ret %d\n",
> +			getpid(),
> +			(PKTIO_TYPE_IPC_SLAVE == pktio_entry->s.ipc.type),
> +			ret);
> +		ODP_ERR("odp_ring_full: %d, odp_ring_count %d, odph_ring_free_count %d\n",
> +			odph_ring_full(r), odph_ring_count(r),
> +			odph_ring_free_count(r));
> +	}
> +
> +	return ret;
> +}
> +
> +static int ipc_mtu_get(pktio_entry_t *pktio_entry ODP_UNUSED)
> +{
> +	/* mtu not limited, pool settings are used. */
> +	return (9 * 1024);
> +}
> +
> +static int ipc_mac_addr_get(pktio_entry_t *pktio_entry ODP_UNUSED,
> +			    void *mac_addr)
> +{
> +	memcpy(mac_addr, pktio_ipc_mac, ETH_ALEN);
> +	return ETH_ALEN;
> +}
> +
> +static int ipc_start(pktio_entry_t *pktio_entry)
> +{
> +	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
> +
> +	if (ready) {
> +		ODP_ABORT("%s Already started\n", pktio_entry->s.name);
> +		return -1;
> +	}
> +
> +	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER)
> +		return _ipc_master_start(pktio_entry);
> +	else
> +		return _ipc_slave_start(pktio_entry);
> +}
> +
> +static int ipc_stop(pktio_entry_t *pktio_entry)
> +{
> +	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 0);
> +
> +	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.send);
> +	/* other process can transfer packets from one ring to
> +	 * other, use delay here to free that packets. */
> +	sleep(0.3);

sleep() takes an unsigned int so this is really doing sleep(0)

This could be racy (no matter what sleep value is used), what happens if
the slave is still around, do you just leak packets or something worse?

> +	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
> +
> +	return 0;
> +}
> +
> +static int ipc_close(pktio_entry_t *pktio_entry)
> +{
> +	ipc_stop(pktio_entry);
> +
> +	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {
> +		char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];

This isn't the right size.

> +		char *dev = pktio_entry->s.name;
> +		odp_shm_t shm;
> +
> +		/* unlink this pktio info */
> +		odp_shm_free(pktio_entry->s.ipc.pinfo_shm);
> +
> +		/* unlink rings */
> +		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
> +		shm = odp_shm_lookup(ipc_shm_name);
> +		odp_shm_free(shm);
> +		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
> +		shm = odp_shm_lookup(ipc_shm_name);
> +		odp_shm_free(shm);
> +		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
> +		shm = odp_shm_lookup(ipc_shm_name);
> +		odp_shm_free(shm);
> +		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
> +		shm = odp_shm_lookup(ipc_shm_name);
> +		odp_shm_free(shm);
> +	}
> +

Last time I asked about start/exit sequences and it looks like not all
of the sequences are supported -

  p1      p2

start
        start
        exit
exit

I suppose this works, but what if p1 (master) exits first?

What if p1 exits then p3 starts and connects to p2, are p2 and p3 now
both slaves? would this work?

> +	return 0;
> +}
> +
> +const pktio_if_ops_t ipc_pktio_ops = {
> +	.init = NULL,
> +	.term = NULL,
> +	.open = ipc_pktio_open,
> +	.close = ipc_close,
> +	.recv =  ipc_pktio_recv,
> +	.send = ipc_pktio_send,
> +	.start = ipc_start,
> +	.stop = ipc_stop,
> +	.mtu_get = ipc_mtu_get,
> +	.promisc_mode_set = NULL,
> +	.promisc_mode_get = NULL,
> +	.mac_get = ipc_mac_addr_get
> +};

I think all supported pktio types should have the pktio validation tests
run against them as part of make check. The test added in this series
touches most of the API but there are gaps like the promiscuous modes
being unsupported - actually it looks like it would crash if someone
called one those APIs.

This raises the question of whether it makes sense to support promiscuous
mode on this type of interface. I have a local patch to rework the tests
a bit to allow for interfaces that can't change their promiscuous state,
and also support not having a MAC address. I'll send these out tomorrow
(part of a series adding support for netmap VALE and pipe ports).
Maxim Uvarov Oct. 28, 2015, 10:30 a.m. UTC | #2
On 10/27/2015 19:32, Stuart Haslam wrote:
> On Thu, Oct 22, 2015 at 12:50:10PM +0300, Maxim Uvarov wrote:
>> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
>> ---
>>   platform/linux-generic/Makefile.am                 |   2 +
>>   .../linux-generic/include/odp_buffer_internal.h    |   3 +
>>   .../linux-generic/include/odp_packet_io_internal.h |  38 ++
>>   .../include/odp_packet_io_ipc_internal.h           |  47 ++
>>   platform/linux-generic/include/odp_shm_internal.h  |  21 +
>>   platform/linux-generic/odp_packet_io.c             |   1 +
>>   platform/linux-generic/odp_pool.c                  |  14 +-
>>   platform/linux-generic/odp_shared_memory.c         |  14 +-
>>   platform/linux-generic/pktio/io_ops.c              |   1 +
>>   platform/linux-generic/pktio/ipc.c                 | 729 +++++++++++++++++++++
>>   platform/linux-generic/pktio/ring.c                |   1 +
>>   11 files changed, 866 insertions(+), 5 deletions(-)
>>   create mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h
>>   create mode 100644 platform/linux-generic/include/odp_shm_internal.h
>>   create mode 100644 platform/linux-generic/pktio/ipc.c
>>   create mode 120000 platform/linux-generic/pktio/ring.c
>>
>> diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
>> index dfb5a91..834354c 100644
>> --- a/platform/linux-generic/Makefile.am
>> +++ b/platform/linux-generic/Makefile.am
>> @@ -151,10 +151,12 @@ __LIB__libodp_la_SOURCES = \
>>   			   odp_packet_flags.c \
>>   			   odp_packet_io.c \
>>   			   pktio/io_ops.c \
>> +			   pktio/ipc.c \
>>   			   pktio/loop.c \
>>   			   pktio/netmap.c \
>>   			   pktio/socket.c \
>>   			   pktio/socket_mmap.c \
>> +			   pktio/ring.c \
>>   			   odp_pool.c \
>>   			   odp_queue.c \
>>   			   odp_rwlock.c \
>> diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
>> index 4cacca1..a078e52 100644
>> --- a/platform/linux-generic/include/odp_buffer_internal.h
>> +++ b/platform/linux-generic/include/odp_buffer_internal.h
>> @@ -132,6 +132,9 @@ struct odp_buffer_hdr_t {
>>   	uint32_t                 uarea_size; /* size of user area */
>>   	uint32_t                 segcount;   /* segment count */
>>   	uint32_t                 segsize;    /* segment size */
>> +	/* ipc mapped process can not walk over pointers,
>> +	 * offset has to be used */
>> +	uint64_t		 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
>>   	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
>>   	uint64_t                 order;      /* sequence for ordered queues */
>>   	queue_entry_t           *origin_qe;  /* ordered queue origin */
>> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
>> index 4745bd5..54aa795 100644
>> --- a/platform/linux-generic/include/odp_packet_io_internal.h
>> +++ b/platform/linux-generic/include/odp_packet_io_internal.h
>> @@ -25,6 +25,7 @@ extern "C" {
>>   #include <odp_classification_datamodel.h>
>>   #include <odp_align_internal.h>
>>   #include <odp_debug_internal.h>
>> +#include <odp/helper/ring.h>
>>   
>>   #include <odp/config.h>
>>   #include <odp/hints.h>
>> @@ -55,6 +56,41 @@ typedef struct {
>>   } pkt_pcap_t;
>>   #endif
>>   
>> +typedef	struct {
>> +	/* TX */
>> +	struct  {
>> +		odph_ring_t	*send; /**< ODP ring for IPC msg packets
>> +					    indexes transmitted to shared
>> +					    memory */
>> +		odph_ring_t	*free; /**< ODP ring for IPC msg packets
>> +					    indexes already processed by remote
>> +					    process */
>> +	} tx;
>> +	/* RX */
>> +	struct {
>> +		odph_ring_t	*recv; /**< ODP ring for IPC msg packets
>> +					    indexes received from shared
>> +					     memory (from remote process) */
>> +		odph_ring_t	*free; /**< ODP ring for IPC msg packets
>> +					    indexes already processed by
>> +					    current process */
>> +	} rx; /* slave */
>> +	void		*pool_base;		/**< Remote pool base addr */
>> +	void		*pool_mdata_base;	/**< Remote pool mdata base addr */
>> +	uint64_t	pkt_size;		/**< Packet size in remote pool */
>> +	odp_pool_t	pool;			/**< Pool of main process */
>> +	enum {
>> +		PKTIO_TYPE_IPC_MASTER = 0, /**< Master is the process which
>> +						creates shm */
>> +		PKTIO_TYPE_IPC_SLAVE	   /**< Slave is the process which
>> +						connects to shm */
>> +	} type; /**< define if it's master or slave process */
>> +	odp_atomic_u32_t ready; /**< 1 - pktio is ready and can recv/send
>> +				     packet, 0 - not yet ready */
>> +	void *pinfo;
>> +	odp_shm_t pinfo_shm;
>> +} _ipc_pktio_t;
>> +
>>   struct pktio_entry {
>>   	const struct pktio_if_ops *ops; /**< Implementation specific methods */
>>   	odp_ticketlock_t lock;		/**< entry ticketlock */
>> @@ -72,6 +108,7 @@ struct pktio_entry {
>>   #ifdef HAVE_PCAP
>>   		pkt_pcap_t pkt_pcap;		/**< Using pcap for IO */
>>   #endif
>> +		_ipc_pktio_t ipc;		/**< IPC pktio data */
>>   	};
>>   	enum {
>>   		STATE_START = 0,
>> @@ -151,6 +188,7 @@ extern const pktio_if_ops_t loopback_pktio_ops;
>>   #ifdef HAVE_PCAP
>>   extern const pktio_if_ops_t pcap_pktio_ops;
>>   #endif
>> +extern const pktio_if_ops_t ipc_pktio_ops;
>>   extern const pktio_if_ops_t * const pktio_if_ops[];
>>   
>>   #ifdef __cplusplus
>> diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
>> new file mode 100644
>> index 0000000..bec2af6
>> --- /dev/null
>> +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
>> @@ -0,0 +1,47 @@
>> +/* Copyright (c) 2015, Linaro Limited
>> + * All rights reserved.
>> + *
>> + * SPDX-License-Identifier:     BSD-3-Clause
>> + */
>> +
>> +#include <odp/packet_io.h>
>> +#include <odp_packet_io_internal.h>
>> +#include <odp/packet.h>
>> +#include <odp_packet_internal.h>
>> +#include <odp_internal.h>
>> +#include <odp/shared_memory.h>
>> +
>> +#include <string.h>
>> +#include <unistd.h>
>> +#include <stdlib.h>
>> +
>> +/* IPC packet I/O over shared memory ring */
>> +#include <odp/helper/ring.h>
>> +
>> +/* number of odp buffers in odp ring queue */
>> +#define PKTIO_IPC_ENTRIES 4096
>> +
>> +/* that struct is exported to shared memory, so that processes can find
>> + * each other.
>> + */
>> +struct pktio_info {
>> +	struct {
>> +		/* number of buffer in remote pool */
>> +		int shm_pool_bufs_num;
>> +		/* size of remote pool */
>> +		size_t shm_pkt_pool_size;
>> +		/* size of packet/segment in remote pool */
>> +		uint32_t shm_pkt_size;
>> +		/* offset from shared memory block start
>> +		 * to pool_mdata_addr (linux-generic pool specific) */
>> +		size_t mdata_offset;
>> +		char pool_name[ODP_POOL_NAME_LEN];
>> +	} master;
>> +	struct {
>> +		/* offset from shared memory block start
>> +		 * to pool_mdata_addr in remote process.
>> +		 * (linux-generic pool specific) */
>> +		size_t mdata_offset;
>> +		char pool_name[ODP_POOL_NAME_LEN];
>> +	} slave;
>> +} ODP_PACKED;
>> diff --git a/platform/linux-generic/include/odp_shm_internal.h b/platform/linux-generic/include/odp_shm_internal.h
>> new file mode 100644
>> index 0000000..1fd7a3c
>> --- /dev/null
>> +++ b/platform/linux-generic/include/odp_shm_internal.h
>> @@ -0,0 +1,21 @@
>> +/* Copyright (c) 2013, Linaro Limited
>> + * All rights reserved.
>> + *
>> + * SPDX-License-Identifier:     BSD-3-Clause
>> + */
>> +
>> +#ifndef ODP_SHM_INTERNAL_H_
>> +#define ODP_SHM_INTERNAL_H_
>> +
>> +#ifdef __cplusplus
>> +extern "C" {
>> +#endif
>> +
>> +#define _ODP_SHM_PROC_NOCREAT 0x4  /**< Do not create shm if not exist */
>> +#define _ODP_SHM_O_EXCL	      0x8  /**< Do not create shm if exist */
>> +
>> +#ifdef __cplusplus
>> +}
>> +#endif
>> +
>> +#endif
>> diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
>> index 20c66f3..0c02f9e 100644
>> --- a/platform/linux-generic/odp_packet_io.c
>> +++ b/platform/linux-generic/odp_packet_io.c
>> @@ -19,6 +19,7 @@
>>   #include <odp_schedule_internal.h>
>>   #include <odp_classification_internal.h>
>>   #include <odp_debug_internal.h>
>> +#include <odp_packet_io_ipc_internal.h>
>>   
>>   #include <string.h>
>>   #include <sys/ioctl.h>
>> diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
>> index 2036c2a..220dcd2 100644
>> --- a/platform/linux-generic/odp_pool.c
>> +++ b/platform/linux-generic/odp_pool.c
>> @@ -219,8 +219,11 @@ odp_pool_t _pool_create(const char *name,
>>   			ODP_ALIGN_ROUNDUP(params->pkt.len, seg_len);
>>   
>>   		/* Reject create if pkt.len needs too many segments */
>> -		if (blk_size / seg_len > ODP_BUFFER_MAX_SEG)
>> +		if (blk_size / seg_len > ODP_BUFFER_MAX_SEG) {
>> +			ODP_ERR("ODP_BUFFER_MAX_SEG exceed %d(%d)\n",
>> +				blk_size / seg_len, ODP_BUFFER_MAX_SEG);
>>   			return ODP_POOL_INVALID;
>> +		}
>>   
>>   		p_udata_size = params->pkt.uarea_size;
>>   		udata_stride = ODP_ALIGN_ROUNDUP(p_udata_size,
>> @@ -241,8 +244,12 @@ odp_pool_t _pool_create(const char *name,
>>   
>>   	/* Validate requested number of buffers against addressable limits */
>>   	if (buf_num >
>> -	    (ODP_BUFFER_MAX_BUFFERS / (buf_stride / ODP_CACHE_LINE_SIZE)))
>> +	    (ODP_BUFFER_MAX_BUFFERS / (buf_stride / ODP_CACHE_LINE_SIZE))) {
>> +		ODP_ERR("buf_num %d > then expected %d\n",
>> +			buf_num, ODP_BUFFER_MAX_BUFFERS /
>> +			(buf_stride / ODP_CACHE_LINE_SIZE));
>>   		return ODP_POOL_INVALID;
>> +	}
>>   
>>   	/* Find an unused buffer pool slot and iniitalize it as requested */
>>   	for (i = 0; i < ODP_CONFIG_POOLS; i++) {
>> @@ -468,6 +475,9 @@ int odp_pool_destroy(odp_pool_t pool_hdl)
>>   	/* Call fails if pool has allocated buffers */
>>   	if (odp_atomic_load_u32(&pool->s.bufcount) < pool->s.buf_num) {
>>   		POOL_UNLOCK(&pool->s.lock);
>> +		ODP_DBG("error: pool has allocated buffers %d/%d\n",
>> +			odp_atomic_load_u32(&pool->s.bufcount),
>> +			pool->s.buf_num);
>>   		return -1;
>>   	}
>>   
>> diff --git a/platform/linux-generic/odp_shared_memory.c b/platform/linux-generic/odp_shared_memory.c
>> index ab48dda..6a4b5ba 100644
>> --- a/platform/linux-generic/odp_shared_memory.c
>> +++ b/platform/linux-generic/odp_shared_memory.c
>> @@ -15,6 +15,7 @@
>>   #include <odp/debug.h>
>>   #include <odp_debug_internal.h>
>>   #include <odp_align_internal.h>
>> +#include <odp_shm_internal.h>
>>   #include <odp/config.h>
>>   
>>   #include <unistd.h>
>> @@ -167,7 +168,7 @@ int odp_shm_free(odp_shm_t shm)
>>   		return -1;
>>   	}
>>   
>> -	if (block->flags & ODP_SHM_PROC) {
>> +	if (block->flags & (ODP_SHM_PROC | _ODP_SHM_PROC_NOCREAT)) {
>>   		ret = shm_unlink(block->name);
>>   		if (0 != ret) {
>>   			ODP_DBG("odp_shm_free: shm_unlink failed\n");
>> @@ -189,7 +190,7 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
>>   	int fd = -1;
>>   	int map_flag = MAP_SHARED;
>>   	/* If already exists: O_EXCL: error, O_TRUNC: truncate to zero */
>> -	int oflag = O_RDWR | O_CREAT | O_TRUNC;
>> +	int oflag = O_RDWR;
>>   	uint64_t alloc_size;
>>   	uint64_t page_sz, huge_sz;
>>   #ifdef MAP_HUGETLB
>> @@ -207,7 +208,14 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
>>   	alloc_hp_size = (size + align + (huge_sz - 1)) & (-huge_sz);
>>   #endif
>>   
>> -	if (flags & ODP_SHM_PROC) {
>> +	if (flags & ODP_SHM_PROC)
>> +		oflag |= O_CREAT | O_TRUNC;
>> +	if (flags & _ODP_SHM_O_EXCL)
>> +		oflag |= O_EXCL;
>> +
>> +	if (flags & (ODP_SHM_PROC | _ODP_SHM_PROC_NOCREAT)) {
>> +		need_huge_page = 0;
>> +
>>   		/* Creates a file to /dev/shm */
>>   		fd = shm_open(name, oflag,
>>   			      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
>> diff --git a/platform/linux-generic/pktio/io_ops.c b/platform/linux-generic/pktio/io_ops.c
>> index 3b344e6..b7e6f7c 100644
>> --- a/platform/linux-generic/pktio/io_ops.c
>> +++ b/platform/linux-generic/pktio/io_ops.c
>> @@ -18,6 +18,7 @@ const pktio_if_ops_t * const pktio_if_ops[]  = {
>>   #ifdef HAVE_PCAP
>>   	&pcap_pktio_ops,
>>   #endif
>> +	&ipc_pktio_ops,
>>   	&sock_mmap_pktio_ops,
>>   	&sock_mmsg_pktio_ops,
>>   	NULL
>> diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c
>> new file mode 100644
>> index 0000000..05fce2a
>> --- /dev/null
>> +++ b/platform/linux-generic/pktio/ipc.c
>> @@ -0,0 +1,729 @@
>> +/* Copyright (c) 2015, Linaro Limited
>> + * All rights reserved.
>> + *
>> + * SPDX-License-Identifier:     BSD-3-Clause
>> + */
>> +
>> +#include <odp_packet_io_ipc_internal.h>
>> +#include <odp_debug_internal.h>
>> +#include <odp_packet_io_internal.h>
>> +#include <odp_spin_internal.h>
>> +#include <odp/system_info.h>
>> +#include <odp_shm_internal.h>
>> +
>> +#include <sys/mman.h>
>> +#include <sys/stat.h>
>> +#include <fcntl.h>
>> +
>> +/* MAC address for the "ipc" interface */
>> +static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12};
>> +
>> +static void *_ipc_map_remote_pool(const char *name, size_t size);
>> +
>> +static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
>> +{
>> +	pool_entry_t *pool;
>> +	uint32_t pool_id;
>> +	odp_shm_t shm;
>> +	odp_shm_info_t info;
>> +
>> +	pool_id = pool_handle_to_index(pool_hdl);
>> +	pool    = get_pool_entry(pool_id);
>> +	shm = pool->s.pool_shm;
>> +
>> +	odp_shm_info(shm, &info);
>> +
>> +	return info.name;
>> +}
>> +
>> +/**
>> +* Look up for shared memory object.
>> +*
>> +* @param name   name of shm object
>> +*
>> +* @return 0 on success, otherwise non-zero
>> +*/
>> +static int _ipc_shm_lookup(const char *name)
>> +{
>> +	int shm;
>> +
>> +	shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
>> +	if (shm == -1) {
>> +		if (errno == ENOENT)
>> +			return -1;
>> +		ODP_ABORT("shm_open for %s err %s\n",
>> +			  name, strerror(errno));
>> +	}
>> +	close(shm);
>> +	return 0;
>> +}
>> +
>> +static int _ipc_map_pktio_info(pktio_entry_t *pktio_entry,
>> +			       const char *dev,
>> +			       int *slave)
>> +{
>> +	struct pktio_info *pinfo;
>> +	char name[ODP_POOL_NAME_LEN + sizeof("_info")];
>> +	uint32_t flags;
>> +	odp_shm_t shm;
>> +
>> +	/* Create info about remote pktio */
>> +	snprintf(name, sizeof(name), "%s_info", dev);
>> +
>> +	flags = ODP_SHM_PROC | _ODP_SHM_O_EXCL;
>> +
>> +	shm = odp_shm_reserve(name, sizeof(struct pktio_info),
>> +			      ODP_CACHE_LINE_SIZE,
>> +			      flags);
>> +	if (ODP_SHM_INVALID != shm) {
>> +		pinfo = odp_shm_addr(shm);
>> +		pinfo->master.pool_name[0] = 0;
>> +		*slave = 0;
>> +	} else {
>> +		flags = _ODP_SHM_PROC_NOCREAT | _ODP_SHM_O_EXCL;
>> +		shm = odp_shm_reserve(name, sizeof(struct pktio_info),
>> +				      ODP_CACHE_LINE_SIZE,
>> +				      flags);
>> +		if (ODP_SHM_INVALID == shm)
>> +			ODP_ABORT("can not connect to shm\n");
>> +
>> +		pinfo = odp_shm_addr(shm);
>> +		*slave = 1;
>> +	}
>> +
>> +	pktio_entry->s.ipc.pinfo = pinfo;
>> +	pktio_entry->s.ipc.pinfo_shm = shm;
>> +
>> +	return 0;
>> +}
>> +
>> +static int _ipc_master_start(pktio_entry_t *pktio_entry)
>> +{
>> +	struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo;
>> +	int ret;
>> +	void *ipc_pool_base;
>> +
>> +	if (pinfo->slave.mdata_offset == 0)
>> +		return -1;
>> +
>> +	ret = _ipc_shm_lookup(pinfo->slave.pool_name);
>> +	if (ret) {
>> +		ODP_DBG("no pool file %s\n", pinfo->slave.pool_name);
>> +		return -1;
>> +	}
>> +
>> +	ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
>> +					     pinfo->master.shm_pkt_pool_size);
>> +	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
>> +					     pinfo->slave.mdata_offset;
>> +
>> +	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
>> +
>> +	ODP_DBG("%s started.\n",  pktio_entry->s.name);
>> +	return 0;
>> +}
>> +
>> +static int _ipc_init_master(pktio_entry_t *pktio_entry,
>> +			    const char *dev,
>> +			    odp_pool_t pool)
>> +{
>> +	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
>> +	pool_entry_t *pool_entry;
>> +	uint32_t pool_id;
>> +	struct pktio_info *pinfo;
>> +	const char *pool_name;
>> +	odp_shm_t shm;
>> +
>> +	pool_id = pool_handle_to_index(pool);
>> +	pool_entry    = get_pool_entry(pool_id);
>> +
>> +	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r"))) {
>> +		ODP_DBG("too big ipc name\n");
>> +		return -1;
>> +	}
>> +
>> +	/* generate name in shm like ipc_pktio_r for
>> +	 * to be processed packets ring.
>> +	 */
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
>> +	pktio_entry->s.ipc.tx.send = odph_ring_create(ipc_shm_name,
>> +			PKTIO_IPC_ENTRIES,
>> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
>> +	if (!pktio_entry->s.ipc.tx.send) {
>> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
>> +			getpid(), ipc_shm_name);
>> +		return -1;
>> +	}
>> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
>> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.send),
>> +		odph_ring_free_count(pktio_entry->s.ipc.tx.send));
>> +
>> +	/* generate name in shm like ipc_pktio_p for
>> +	 * already processed packets
>> +	 */
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
>> +	pktio_entry->s.ipc.tx.free = odph_ring_create(ipc_shm_name,
>> +			PKTIO_IPC_ENTRIES,
>> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
>> +	if (!pktio_entry->s.ipc.tx.free) {
>> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
>> +			getpid(), ipc_shm_name);
>> +		goto free_m_prod;
>> +	}
>> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
>> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.free),
>> +		odph_ring_free_count(pktio_entry->s.ipc.tx.free));
>> +
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
>> +	pktio_entry->s.ipc.rx.recv = odph_ring_create(ipc_shm_name,
>> +			PKTIO_IPC_ENTRIES,
>> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
>> +	if (!pktio_entry->s.ipc.rx.recv) {
>> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
>> +			getpid(), ipc_shm_name);
>> +		goto free_m_cons;
>> +	}
>> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
>> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.recv),
>> +		odph_ring_free_count(pktio_entry->s.ipc.rx.recv));
>> +
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
>> +	pktio_entry->s.ipc.rx.free = odph_ring_create(ipc_shm_name,
>> +			PKTIO_IPC_ENTRIES,
>> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
>> +	if (!pktio_entry->s.ipc.rx.free) {
>> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
>> +			getpid(), ipc_shm_name);
>> +		goto free_s_prod;
>> +	}
>> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
>> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.free),
>> +		odph_ring_free_count(pktio_entry->s.ipc.rx.free));
>> +
>> +	/* Set up pool name for remote info */
>> +	pinfo = pktio_entry->s.ipc.pinfo;
>> +	pool_name = _ipc_odp_buffer_pool_shm_name(pool);
>> +	memcpy(pinfo->master.pool_name, pool_name, strlen(pool_name));
>> +	pinfo->master.shm_pkt_pool_size = pool_entry->s.pool_size;
>> +	pinfo->master.shm_pool_bufs_num = pool_entry->s.buf_num;
>> +	pinfo->master.shm_pkt_size = pool_entry->s.seg_size;
>> +	pinfo->master.mdata_offset =  pool_entry->s.pool_mdata_addr -
>> +			       pool_entry->s.pool_base_addr;
>> +	pinfo->slave.mdata_offset = 0;
>> +
>> +	pktio_entry->s.ipc.pool = pool;
>> +
>> +	ODP_DBG("Pre init... DONE.\n");
>> +
>> +	_ipc_master_start(pktio_entry);
>> +
>> +	return 0;
>> +
>> +free_s_prod:
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
>> +	shm = odp_shm_lookup(ipc_shm_name);
>> +	odp_shm_free(shm);
>> +free_m_cons:
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
>> +	shm = odp_shm_lookup(ipc_shm_name);
>> +	odp_shm_free(shm);
>> +free_m_prod:
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
>> +	shm = odp_shm_lookup(ipc_shm_name);
>> +	odp_shm_free(shm);
>> +	return -1;
>> +}
>> +
>> +static void _ipc_export_pool(struct pktio_info *pinfo,
>> +			     odp_pool_t pool)
>> +{
>> +	pool_entry_t *pool_entry;
>> +
>> +	pool_entry = odp_pool_to_entry(pool);
>> +	if (pool_entry->s.blk_size != pinfo->master.shm_pkt_size)
>> +		ODP_ABORT("pktio for same name should have the same pool size\n");
>> +	if (pool_entry->s.buf_num != (unsigned)pinfo->master.shm_pool_bufs_num)
>> +		ODP_ABORT("pktio for same name should have the same pool size\n");
>> +
>> +	snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s",
>> +		 pool_entry->s.name);
>> +	pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
>> +				    pool_entry->s.pool_base_addr;
>> +}
>> +
>> +static void *_ipc_map_remote_pool(const char *name, size_t size)
>> +{
>> +	odp_shm_t shm;
>> +	void *addr;
>> +
>> +	ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
>> +	shm = odp_shm_reserve(name,
>> +			      size,
>> +			      ODP_CACHE_LINE_SIZE,
>> +			      _ODP_SHM_PROC_NOCREAT);
>> +	if (shm == ODP_SHM_INVALID)
>> +		ODP_ABORT("unable map %s\n", name);
>> +
>> +	addr = odp_shm_addr(shm);
>> +	ODP_DBG("MAP master: %p - %p size %ld, pool %s\n",
>> +		addr, (char *)addr + size, size, name);
>> +	return addr;
>> +}
>> +
>> +static void *_ipc_shm_map(char *name, size_t size)
>> +{
>> +	odp_shm_t shm;
>> +	int ret;
>> +
>> +	ret = _ipc_shm_lookup(name);
>> +	if (ret == -1)
>> +		return NULL;
>> +
>> +	shm = odp_shm_reserve(name, size,
>> +			      ODP_CACHE_LINE_SIZE,
>> +			      _ODP_SHM_PROC_NOCREAT);
>> +	if (ODP_SHM_INVALID == shm)
>> +		ODP_ABORT("unable to map: %s\n", name);
>> +
>> +	return odp_shm_addr(shm);
>> +}
>> +
>> +static int _ipc_init_slave(const char *dev,
>> +			   pktio_entry_t *pktio_entry,
>> +			   odp_pool_t pool)
>> +{
>> +	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r")))
>> +		ODP_ABORT("too big ipc name\n");
>> +
>> +	pktio_entry->s.ipc.pool = pool;
>> +	return 0;
>> +}
>> +
>> +static int _ipc_slave_start(pktio_entry_t *pktio_entry)
>> +{
>> +	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
>> +	size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
>> +			   sizeof(odph_ring_t);
>> +	struct pktio_info *pinfo;
>> +	void *ipc_pool_base;
>> +	odp_shm_t shm;
>> +	const char *dev = pktio_entry->s.name;
>> +
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
>> +	pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, ring_size);
>> +	if (!pktio_entry->s.ipc.rx.recv) {
>> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
>> +			getpid(), dev);
>> +		sleep(1);
>> +		return -1;
>> +	}
>> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
>> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.recv),
>> +		odph_ring_free_count(pktio_entry->s.ipc.rx.recv));
>> +
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
>> +	pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, ring_size);
>> +	if (!pktio_entry->s.ipc.rx.free) {
>> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
>> +			getpid(), dev);
>> +		goto free_m_prod;
>> +	}
>> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
>> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.free),
>> +		odph_ring_free_count(pktio_entry->s.ipc.rx.free));
>> +
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
>> +	pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, ring_size);
>> +	if (!pktio_entry->s.ipc.tx.send) {
>> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
>> +			getpid(), dev);
>> +		goto free_m_cons;
>> +	}
>> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
>> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.send),
>> +		odph_ring_free_count(pktio_entry->s.ipc.tx.send));
>> +
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
>> +	pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, ring_size);
>> +	if (!pktio_entry->s.ipc.tx.free) {
>> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
>> +			getpid(), dev);
>> +		goto free_s_prod;
>> +	}
>> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
>> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.free),
>> +		odph_ring_free_count(pktio_entry->s.ipc.tx.free));
>> +
>> +	/* Get info about remote pool */
>> +	pinfo = pktio_entry->s.ipc.pinfo;
>> +	ipc_pool_base = _ipc_map_remote_pool(pinfo->master.pool_name,
>> +					     pinfo->master.shm_pkt_pool_size);
>> +	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
>> +					     pinfo->master.mdata_offset;
>> +	pktio_entry->s.ipc.pkt_size = pinfo->master.shm_pkt_size;
>> +
>> +	/* @todo: to simplify in linux-generic implementation we create pool for
>> +	 * packets from IPC queue. On receive implementation copies packets to
>> +	 * that pool. Later we can try to reuse original pool without packets
>> +	 * copying. (pkt refcounts needs to be implemented).
>> +	 */
>> +	_ipc_export_pool(pinfo, pktio_entry->s.ipc.pool);
>> +
>> +	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
>> +
>> +	ODP_DBG("%s started.\n",  pktio_entry->s.name);
>> +	return 0;
>> +
>> +free_s_prod:
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
>> +	shm = odp_shm_lookup(ipc_shm_name);
>> +	odp_shm_free(shm);
>> +free_m_cons:
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
>> +	shm = odp_shm_lookup(ipc_shm_name);
>> +	odp_shm_free(shm);
>> +free_m_prod:
>> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
>> +	shm = odp_shm_lookup(ipc_shm_name);
>> +	odp_shm_free(shm);
>> +	return -1;
>> +}
>> +
>> +static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
>> +			  pktio_entry_t *pktio_entry,
>> +			  const char *dev,
>> +			  odp_pool_t pool)
>> +{
>> +	int ret = -1;
>> +	int slave;
>> +
>> +	_ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == ODPH_RING_NAMESIZE,
>> +			   "mismatch pool and ring name arrays");
>> +
>> +	if (strncmp(dev, "ipc", 3))
>> +		return -1;
>> +
>> +	odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0);
>> +
>> +	_ipc_map_pktio_info(pktio_entry, dev, &slave);
>> +	pktio_entry->s.ipc.type = (slave == 0) ? PKTIO_TYPE_IPC_MASTER :
>> +						 PKTIO_TYPE_IPC_SLAVE;
>> +
>> +	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {
>> +		ODP_DBG("process %d is master\n", getpid());
>> +		ret = _ipc_init_master(pktio_entry, dev, pool);
>> +	} else {
>> +		ODP_DBG("process %d is slave\n", getpid());
>> +		ret = _ipc_init_slave(dev, pktio_entry, pool);
>> +	}
>> +
>> +	return ret;
>> +}
>> +
>> +static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
>> +				    uint32_t offset,
>> +				    uint32_t *seglen,
>> +				    uint32_t limit)
>> +{
>> +	int seg_index  = offset / buf->segsize;
>> +	int seg_offset = offset % buf->segsize;
>> +	void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
>> +
>> +	if (seglen) {
>> +		uint32_t buf_left = limit - offset;
>> +		*seglen = seg_offset + buf_left <= buf->segsize ?
>> +			buf_left : buf->segsize - seg_offset;
>> +	}
>> +
>> +	return (void *)(seg_offset + (uint8_t *)addr);
>> +}
>> +
>> +static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
>> +				    uint32_t offset, uint32_t *seglen)
>> +{
>> +	if (offset > pkt_hdr->frame_len)
>> +		return NULL;
>> +
>> +	return _ipc_buffer_map(&pkt_hdr->buf_hdr,
>> +			  pkt_hdr->headroom + offset, seglen,
>> +			  pkt_hdr->headroom + pkt_hdr->frame_len);
>> +}
>> +
>> +static void _ipc_free_ring_packets(odph_ring_t *r)
>> +{
>> +	odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
>> +	int ret;
>> +	void **rbuf_p;
>> +	int i;
>> +
>> +	rbuf_p = (void *)&r_p_pkts;
>> +
>> +	while (1) {
>> +		ret = odph_ring_mc_dequeue_burst(r, rbuf_p,
>> +						 PKTIO_IPC_ENTRIES);
>> +		if (0 == ret)
>> +			break;
>> +		for (i = 0; i < ret; i++) {
>> +			if (r_p_pkts[i] != ODP_PACKET_INVALID)
>> +				odp_packet_free(r_p_pkts[i]);
>> +		}
>> +	}
>> +}
>> +
>> +static int ipc_pktio_recv(pktio_entry_t *pktio_entry,
>> +			  odp_packet_t pkt_table[], unsigned len)
>> +{
>> +	int pkts = 0;
>> +	int i;
>> +	odph_ring_t *r;
>> +	odph_ring_t *r_p;
>> +
>> +	odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
>> +	void **ipcbufs_p = (void *)&remote_pkts;
>> +	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
>> +
>> +	if (odp_unlikely(!ready)) {
>> +		ODP_DBG("start pktio is missing before usage?\n");
>> +		return -1;
>> +	}
>> +
>> +	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
>> +
>> +	r = pktio_entry->s.ipc.rx.recv;
>> +	pkts = odph_ring_mc_dequeue_burst(r, ipcbufs_p, len);
>> +	if (odp_unlikely(pkts < 0))
>> +		ODP_ABORT("error to dequeue no packets\n");
>> +
>> +	/* fast path */
>> +	if (odp_likely(0 == pkts))
>> +		return 0;
>> +
>> +	for (i = 0; i < pkts; i++) {
>> +		odp_pool_t pool;
>> +		odp_packet_t pkt;
>> +		odp_packet_hdr_t phdr;
>> +		void *ptr;
>> +		odp_buffer_bits_t handle;
>> +		int idx; /* Remote packet has coded pool and index.
>> +			  * We need only index.*/
>> +		void *pkt_data;
>> +		void *remote_pkt_data;
>> +
>> +		if (remote_pkts[i] == ODP_PACKET_INVALID)
>> +			continue;
>> +
>> +		handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
>> +		idx = handle.index;
>> +
>> +		/* Link to packed data. To this line we have Zero-Copy between
>> +		 * processes, to simplify use packet copy in that version which
>> +		 * can be removed later with more advance buffer management
>> +		 * (ref counters).
>> +		 */
>> +		/* reverse odp_buf_to_hdr() */
>> +		ptr = (char *)pktio_entry->s.ipc.pool_mdata_base +
>> +		      (idx * ODP_CACHE_LINE_SIZE);
>> +		memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t));
>> +
>> +		/* Allocate new packet. Select*/
>> +		pool = pktio_entry->s.ipc.pool;
>> +		if (odp_unlikely(pool == ODP_POOL_INVALID))
>> +			ODP_ABORT("invalid pool");
>> +
>> +		pkt = odp_packet_alloc(pool, phdr.frame_len);
>> +		if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
>> +			/* Original pool might be smaller then
>> +			*  PKTIO_IPC_ENTRIES. If packet can not be
>> +			 * allocated from pool at this time,
>> +			 * simple get in on next recv() call.
>> +			 */
>> +			if (i == 0)
>> +				return 0;
>> +			break;
>> +		}
>> +
>> +		/* Copy packet data. */
>> +		pkt_data = odp_packet_data(pkt);
>> +		if (odp_unlikely(!pkt_data))
>> +			ODP_ABORT("unable to map pkt_data ipc_slave %d\n",
>> +				  (PKTIO_TYPE_IPC_SLAVE ==
>> +					pktio_entry->s.ipc.type));
>> +
>> +		remote_pkt_data = _ipc_packet_map(ptr, 0, NULL);
>> +		if (odp_unlikely(!remote_pkt_data))
>> +			ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",
>> +				  (PKTIO_TYPE_IPC_SLAVE ==
>> +					pktio_entry->s.ipc.type));
>> +
>> +		/* @todo fix copy packet!!! */
>> +		memcpy(pkt_data, remote_pkt_data, phdr.frame_len);
>> +
>> +		/* Copy packets L2, L3 parsed offsets and size */
>> +		copy_packet_parser_metadata(&phdr, odp_packet_hdr(pkt));
>> +
>> +		odp_packet_hdr(pkt)->frame_len = phdr.frame_len;
>> +		odp_packet_hdr(pkt)->headroom = phdr.headroom;
>> +		odp_packet_hdr(pkt)->tailroom = phdr.tailroom;
>> +		pkt_table[i] = pkt;
>> +	}
>> +
>> +	/* Now tell other process that we no longer need that buffers.*/
>> +	r_p = pktio_entry->s.ipc.rx.free;
>> +	pkts = odph_ring_mp_enqueue_burst(r_p, ipcbufs_p, i);
>> +	if (odp_unlikely(pkts < 0))
>> +		ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
>> +
>> +	return pkts;
>> +}
>> +
>> +static int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
>> +			  unsigned len)
>> +{
>> +	odph_ring_t *r;
>> +	void **rbuf_p;
>> +	int ret;
>> +	unsigned i;
>> +	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
>> +
>> +	if (odp_unlikely(!ready))
>> +		return 0;
>> +
>> +	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
>> +
>> +	/* Prepare packets: calculate offset from address. */
>> +	for (i = 0; i < len; i++) {
>> +		int j;
>> +		odp_packet_t pkt =  pkt_table[i];
>> +		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
>> +		odp_buffer_bits_t handle;
>> +		uint32_t cur_mapped_pool_id =
>> +			 pool_handle_to_index(pktio_entry->s.ipc.pool);
>> +		uint32_t pool_id;
>> +
>> +		/* do copy if packet was allocated from not mapped pool */
>> +		handle.handle = _odp_packet_to_buffer(pkt);
>> +		pool_id = handle.pool_id;
>> +		if (pool_id != cur_mapped_pool_id) {
>> +			odp_packet_t newpkt;
>> +
>> +			newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool);
>> +			if (newpkt == ODP_PACKET_INVALID)
>> +				ODP_ABORT("Unable to copy packet\n");
>> +
>> +			odp_packet_free(pkt);
>> +			pkt_table[i] = newpkt;
>> +		}
>> +
>> +		rbuf_p = (void *)&pkt;
>> +
>> +		/* buf_hdr.addr can not be used directly in remote process,
>> +		 * convert it to offset
>> +		 */
>> +		for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) {
>> +			pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
>> +				(char *)pkt_hdr->buf_hdr.addr[j];
>> +		}
>> +	}
>> +
>> +	/* Put packets to ring to be processed by other process. */
>> +	rbuf_p = (void *)&pkt_table[0];
>> +	r = pktio_entry->s.ipc.tx.send;
>> +	ret = odph_ring_mp_enqueue_burst(r, rbuf_p, len);
>> +	if (odp_unlikely(ret < 0)) {
>> +		ODP_ERR("pid %d odp_ring_mp_enqueue_bulk fail, ipc_slave %d, ret %d\n",
>> +			getpid(),
>> +			(PKTIO_TYPE_IPC_SLAVE == pktio_entry->s.ipc.type),
>> +			ret);
>> +		ODP_ERR("odp_ring_full: %d, odp_ring_count %d, odph_ring_free_count %d\n",
>> +			odph_ring_full(r), odph_ring_count(r),
>> +			odph_ring_free_count(r));
>> +	}
>> +
>> +	return ret;
>> +}
>> +
>> +static int ipc_mtu_get(pktio_entry_t *pktio_entry ODP_UNUSED)
>> +{
>> +	/* mtu not limited, pool settings are used. */
>> +	return (9 * 1024);
>> +}
>> +
>> +static int ipc_mac_addr_get(pktio_entry_t *pktio_entry ODP_UNUSED,
>> +			    void *mac_addr)
>> +{
>> +	memcpy(mac_addr, pktio_ipc_mac, ETH_ALEN);
>> +	return ETH_ALEN;
>> +}
>> +
>> +static int ipc_start(pktio_entry_t *pktio_entry)
>> +{
>> +	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
>> +
>> +	if (ready) {
>> +		ODP_ABORT("%s Already started\n", pktio_entry->s.name);
>> +		return -1;
>> +	}
>> +
>> +	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER)
>> +		return _ipc_master_start(pktio_entry);
>> +	else
>> +		return _ipc_slave_start(pktio_entry);
>> +}
>> +
>> +static int ipc_stop(pktio_entry_t *pktio_entry)
>> +{
>> +	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 0);
>> +
>> +	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.send);
>> +	/* other process can transfer packets from one ring to
>> +	 * other, use delay here to free that packets. */
>> +	sleep(0.3);
> sleep() takes an unsigned int so this is really doing sleep(0)
>
> This could be racy (no matter what sleep value is used), what happens if
> the slave is still around, do you just leak packets or something worse?
>
If slave is around than slave will and packets to tx.free later. And 
next round
that packets can be freed. Primary propose of that stop and pktio close here
to destroy pool with zero used buffers. So here I do what I can to 
release packets
back to pool.


>> +	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
>> +
>> +	return 0;
>> +}
>> +
>> +static int ipc_close(pktio_entry_t *pktio_entry)
>> +{
>> +	ipc_stop(pktio_entry);
>> +
>> +	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {
>> +		char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
> This isn't the right size.
ok.

>> +		char *dev = pktio_entry->s.name;
>> +		odp_shm_t shm;
>> +
>> +		/* unlink this pktio info */
>> +		odp_shm_free(pktio_entry->s.ipc.pinfo_shm);
>> +
>> +		/* unlink rings */
>> +		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
>> +		shm = odp_shm_lookup(ipc_shm_name);
>> +		odp_shm_free(shm);
>> +		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
>> +		shm = odp_shm_lookup(ipc_shm_name);
>> +		odp_shm_free(shm);
>> +		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
>> +		shm = odp_shm_lookup(ipc_shm_name);
>> +		odp_shm_free(shm);
>> +		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
>> +		shm = odp_shm_lookup(ipc_shm_name);
>> +		odp_shm_free(shm);
>> +	}
>> +
> Last time I asked about start/exit sequences and it looks like not all
> of the sequences are supported -
>
>    p1      p2
>
> start
>          start
>          exit
> exit
>
> I suppose this works, but what if p1 (master) exits first?
>
> What if p1 exits then p3 starts and connects to p2, are p2 and p3 now
> both slaves? would this work?
p3 is not supported in that version. For p1 and p2 I checked different 
orders,
all of them works as I expect.

>
>> +	return 0;
>> +}
>> +
>> +const pktio_if_ops_t ipc_pktio_ops = {
>> +	.init = NULL,
>> +	.term = NULL,
>> +	.open = ipc_pktio_open,
>> +	.close = ipc_close,
>> +	.recv =  ipc_pktio_recv,
>> +	.send = ipc_pktio_send,
>> +	.start = ipc_start,
>> +	.stop = ipc_stop,
>> +	.mtu_get = ipc_mtu_get,
>> +	.promisc_mode_set = NULL,
>> +	.promisc_mode_get = NULL,
>> +	.mac_get = ipc_mac_addr_get
>> +};
> I think all supported pktio types should have the pktio validation tests
> run against them as part of make check. The test added in this series
> touches most of the API but there are gaps like the promiscuous modes
> being unsupported - actually it looks like it would crash if someone
> called one those APIs.
>
> This raises the question of whether it makes sense to support promiscuous
> mode on this type of interface. I have a local patch to rework the tests
> a bit to allow for interfaces that can't change their promiscuous state,
> and also support not having a MAC address. I'll send these out tomorrow
> (part of a series adding support for netmap VALE and pipe ports).
>
I think that we can close these questions after implementing pktio 
capabilities
api. promisc mode is useless for that kind of pktio and well for loop. 
Mac addr is also
fake here.

To run pktio test for that pktio will be a little tricky. If it's not 
stopper I would add it later to
not repost such huge patches.

Maxim.
diff mbox

Patch

diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index dfb5a91..834354c 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -151,10 +151,12 @@  __LIB__libodp_la_SOURCES = \
 			   odp_packet_flags.c \
 			   odp_packet_io.c \
 			   pktio/io_ops.c \
+			   pktio/ipc.c \
 			   pktio/loop.c \
 			   pktio/netmap.c \
 			   pktio/socket.c \
 			   pktio/socket_mmap.c \
+			   pktio/ring.c \
 			   odp_pool.c \
 			   odp_queue.c \
 			   odp_rwlock.c \
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index 4cacca1..a078e52 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -132,6 +132,9 @@  struct odp_buffer_hdr_t {
 	uint32_t                 uarea_size; /* size of user area */
 	uint32_t                 segcount;   /* segment count */
 	uint32_t                 segsize;    /* segment size */
+	/* ipc mapped process can not walk over pointers,
+	 * offset has to be used */
+	uint64_t		 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
 	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
 	uint64_t                 order;      /* sequence for ordered queues */
 	queue_entry_t           *origin_qe;  /* ordered queue origin */
diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
index 4745bd5..54aa795 100644
--- a/platform/linux-generic/include/odp_packet_io_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_internal.h
@@ -25,6 +25,7 @@  extern "C" {
 #include <odp_classification_datamodel.h>
 #include <odp_align_internal.h>
 #include <odp_debug_internal.h>
+#include <odp/helper/ring.h>
 
 #include <odp/config.h>
 #include <odp/hints.h>
@@ -55,6 +56,41 @@  typedef struct {
 } pkt_pcap_t;
 #endif
 
+typedef	struct {
+	/* TX */
+	struct  {
+		odph_ring_t	*send; /**< ODP ring for IPC msg packets
+					    indexes transmitted to shared
+					    memory */
+		odph_ring_t	*free; /**< ODP ring for IPC msg packets
+					    indexes already processed by remote
+					    process */
+	} tx;
+	/* RX */
+	struct {
+		odph_ring_t	*recv; /**< ODP ring for IPC msg packets
+					    indexes received from shared
+					     memory (from remote process) */
+		odph_ring_t	*free; /**< ODP ring for IPC msg packets
+					    indexes already processed by
+					    current process */
+	} rx; /* slave */
+	void		*pool_base;		/**< Remote pool base addr */
+	void		*pool_mdata_base;	/**< Remote pool mdata base addr */
+	uint64_t	pkt_size;		/**< Packet size in remote pool */
+	odp_pool_t	pool;			/**< Pool of main process */
+	enum {
+		PKTIO_TYPE_IPC_MASTER = 0, /**< Master is the process which
+						creates shm */
+		PKTIO_TYPE_IPC_SLAVE	   /**< Slave is the process which
+						connects to shm */
+	} type; /**< define if it's master or slave process */
+	odp_atomic_u32_t ready; /**< 1 - pktio is ready and can recv/send
+				     packet, 0 - not yet ready */
+	void *pinfo;
+	odp_shm_t pinfo_shm;
+} _ipc_pktio_t;
+
 struct pktio_entry {
 	const struct pktio_if_ops *ops; /**< Implementation specific methods */
 	odp_ticketlock_t lock;		/**< entry ticketlock */
@@ -72,6 +108,7 @@  struct pktio_entry {
 #ifdef HAVE_PCAP
 		pkt_pcap_t pkt_pcap;		/**< Using pcap for IO */
 #endif
+		_ipc_pktio_t ipc;		/**< IPC pktio data */
 	};
 	enum {
 		STATE_START = 0,
@@ -151,6 +188,7 @@  extern const pktio_if_ops_t loopback_pktio_ops;
 #ifdef HAVE_PCAP
 extern const pktio_if_ops_t pcap_pktio_ops;
 #endif
+extern const pktio_if_ops_t ipc_pktio_ops;
 extern const pktio_if_ops_t * const pktio_if_ops[];
 
 #ifdef __cplusplus
diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
new file mode 100644
index 0000000..bec2af6
--- /dev/null
+++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
@@ -0,0 +1,47 @@ 
+/* Copyright (c) 2015, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#include <odp/packet_io.h>
+#include <odp_packet_io_internal.h>
+#include <odp/packet.h>
+#include <odp_packet_internal.h>
+#include <odp_internal.h>
+#include <odp/shared_memory.h>
+
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+
+/* IPC packet I/O over shared memory ring */
+#include <odp/helper/ring.h>
+
+/* number of odp buffers in odp ring queue */
+#define PKTIO_IPC_ENTRIES 4096
+
+/* that struct is exported to shared memory, so that processes can find
+ * each other.
+ */
+struct pktio_info {
+	struct {
+		/* number of buffer in remote pool */
+		int shm_pool_bufs_num;
+		/* size of remote pool */
+		size_t shm_pkt_pool_size;
+		/* size of packet/segment in remote pool */
+		uint32_t shm_pkt_size;
+		/* offset from shared memory block start
+		 * to pool_mdata_addr (linux-generic pool specific) */
+		size_t mdata_offset;
+		char pool_name[ODP_POOL_NAME_LEN];
+	} master;
+	struct {
+		/* offset from shared memory block start
+		 * to pool_mdata_addr in remote process.
+		 * (linux-generic pool specific) */
+		size_t mdata_offset;
+		char pool_name[ODP_POOL_NAME_LEN];
+	} slave;
+} ODP_PACKED;
diff --git a/platform/linux-generic/include/odp_shm_internal.h b/platform/linux-generic/include/odp_shm_internal.h
new file mode 100644
index 0000000..1fd7a3c
--- /dev/null
+++ b/platform/linux-generic/include/odp_shm_internal.h
@@ -0,0 +1,21 @@ 
+/* Copyright (c) 2013, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#ifndef ODP_SHM_INTERNAL_H_
+#define ODP_SHM_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define _ODP_SHM_PROC_NOCREAT 0x4  /**< Do not create shm if not exist */
+#define _ODP_SHM_O_EXCL	      0x8  /**< Do not create shm if exist */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index 20c66f3..0c02f9e 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -19,6 +19,7 @@ 
 #include <odp_schedule_internal.h>
 #include <odp_classification_internal.h>
 #include <odp_debug_internal.h>
+#include <odp_packet_io_ipc_internal.h>
 
 #include <string.h>
 #include <sys/ioctl.h>
diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
index 2036c2a..220dcd2 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -219,8 +219,11 @@  odp_pool_t _pool_create(const char *name,
 			ODP_ALIGN_ROUNDUP(params->pkt.len, seg_len);
 
 		/* Reject create if pkt.len needs too many segments */
-		if (blk_size / seg_len > ODP_BUFFER_MAX_SEG)
+		if (blk_size / seg_len > ODP_BUFFER_MAX_SEG) {
+			ODP_ERR("ODP_BUFFER_MAX_SEG exceed %d(%d)\n",
+				blk_size / seg_len, ODP_BUFFER_MAX_SEG);
 			return ODP_POOL_INVALID;
+		}
 
 		p_udata_size = params->pkt.uarea_size;
 		udata_stride = ODP_ALIGN_ROUNDUP(p_udata_size,
@@ -241,8 +244,12 @@  odp_pool_t _pool_create(const char *name,
 
 	/* Validate requested number of buffers against addressable limits */
 	if (buf_num >
-	    (ODP_BUFFER_MAX_BUFFERS / (buf_stride / ODP_CACHE_LINE_SIZE)))
+	    (ODP_BUFFER_MAX_BUFFERS / (buf_stride / ODP_CACHE_LINE_SIZE))) {
+		ODP_ERR("buf_num %d > then expected %d\n",
+			buf_num, ODP_BUFFER_MAX_BUFFERS /
+			(buf_stride / ODP_CACHE_LINE_SIZE));
 		return ODP_POOL_INVALID;
+	}
 
 	/* Find an unused buffer pool slot and iniitalize it as requested */
 	for (i = 0; i < ODP_CONFIG_POOLS; i++) {
@@ -468,6 +475,9 @@  int odp_pool_destroy(odp_pool_t pool_hdl)
 	/* Call fails if pool has allocated buffers */
 	if (odp_atomic_load_u32(&pool->s.bufcount) < pool->s.buf_num) {
 		POOL_UNLOCK(&pool->s.lock);
+		ODP_DBG("error: pool has allocated buffers %d/%d\n",
+			odp_atomic_load_u32(&pool->s.bufcount),
+			pool->s.buf_num);
 		return -1;
 	}
 
diff --git a/platform/linux-generic/odp_shared_memory.c b/platform/linux-generic/odp_shared_memory.c
index ab48dda..6a4b5ba 100644
--- a/platform/linux-generic/odp_shared_memory.c
+++ b/platform/linux-generic/odp_shared_memory.c
@@ -15,6 +15,7 @@ 
 #include <odp/debug.h>
 #include <odp_debug_internal.h>
 #include <odp_align_internal.h>
+#include <odp_shm_internal.h>
 #include <odp/config.h>
 
 #include <unistd.h>
@@ -167,7 +168,7 @@  int odp_shm_free(odp_shm_t shm)
 		return -1;
 	}
 
-	if (block->flags & ODP_SHM_PROC) {
+	if (block->flags & (ODP_SHM_PROC | _ODP_SHM_PROC_NOCREAT)) {
 		ret = shm_unlink(block->name);
 		if (0 != ret) {
 			ODP_DBG("odp_shm_free: shm_unlink failed\n");
@@ -189,7 +190,7 @@  odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
 	int fd = -1;
 	int map_flag = MAP_SHARED;
 	/* If already exists: O_EXCL: error, O_TRUNC: truncate to zero */
-	int oflag = O_RDWR | O_CREAT | O_TRUNC;
+	int oflag = O_RDWR;
 	uint64_t alloc_size;
 	uint64_t page_sz, huge_sz;
 #ifdef MAP_HUGETLB
@@ -207,7 +208,14 @@  odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
 	alloc_hp_size = (size + align + (huge_sz - 1)) & (-huge_sz);
 #endif
 
-	if (flags & ODP_SHM_PROC) {
+	if (flags & ODP_SHM_PROC)
+		oflag |= O_CREAT | O_TRUNC;
+	if (flags & _ODP_SHM_O_EXCL)
+		oflag |= O_EXCL;
+
+	if (flags & (ODP_SHM_PROC | _ODP_SHM_PROC_NOCREAT)) {
+		need_huge_page = 0;
+
 		/* Creates a file to /dev/shm */
 		fd = shm_open(name, oflag,
 			      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
diff --git a/platform/linux-generic/pktio/io_ops.c b/platform/linux-generic/pktio/io_ops.c
index 3b344e6..b7e6f7c 100644
--- a/platform/linux-generic/pktio/io_ops.c
+++ b/platform/linux-generic/pktio/io_ops.c
@@ -18,6 +18,7 @@  const pktio_if_ops_t * const pktio_if_ops[]  = {
 #ifdef HAVE_PCAP
 	&pcap_pktio_ops,
 #endif
+	&ipc_pktio_ops,
 	&sock_mmap_pktio_ops,
 	&sock_mmsg_pktio_ops,
 	NULL
diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c
new file mode 100644
index 0000000..05fce2a
--- /dev/null
+++ b/platform/linux-generic/pktio/ipc.c
@@ -0,0 +1,729 @@ 
+/* Copyright (c) 2015, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#include <odp_packet_io_ipc_internal.h>
+#include <odp_debug_internal.h>
+#include <odp_packet_io_internal.h>
+#include <odp_spin_internal.h>
+#include <odp/system_info.h>
+#include <odp_shm_internal.h>
+
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+/* MAC address for the "ipc" interface */
+static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12};
+
+static void *_ipc_map_remote_pool(const char *name, size_t size);
+
+static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
+{
+	pool_entry_t *pool;
+	uint32_t pool_id;
+	odp_shm_t shm;
+	odp_shm_info_t info;
+
+	pool_id = pool_handle_to_index(pool_hdl);
+	pool    = get_pool_entry(pool_id);
+	shm = pool->s.pool_shm;
+
+	odp_shm_info(shm, &info);
+
+	return info.name;
+}
+
+/**
+* Look up for shared memory object.
+*
+* @param name   name of shm object
+*
+* @return 0 on success, otherwise non-zero
+*/
+static int _ipc_shm_lookup(const char *name)
+{
+	int shm;
+
+	shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
+	if (shm == -1) {
+		if (errno == ENOENT)
+			return -1;
+		ODP_ABORT("shm_open for %s err %s\n",
+			  name, strerror(errno));
+	}
+	close(shm);
+	return 0;
+}
+
+static int _ipc_map_pktio_info(pktio_entry_t *pktio_entry,
+			       const char *dev,
+			       int *slave)
+{
+	struct pktio_info *pinfo;
+	char name[ODP_POOL_NAME_LEN + sizeof("_info")];
+	uint32_t flags;
+	odp_shm_t shm;
+
+	/* Create info about remote pktio */
+	snprintf(name, sizeof(name), "%s_info", dev);
+
+	flags = ODP_SHM_PROC | _ODP_SHM_O_EXCL;
+
+	shm = odp_shm_reserve(name, sizeof(struct pktio_info),
+			      ODP_CACHE_LINE_SIZE,
+			      flags);
+	if (ODP_SHM_INVALID != shm) {
+		pinfo = odp_shm_addr(shm);
+		pinfo->master.pool_name[0] = 0;
+		*slave = 0;
+	} else {
+		flags = _ODP_SHM_PROC_NOCREAT | _ODP_SHM_O_EXCL;
+		shm = odp_shm_reserve(name, sizeof(struct pktio_info),
+				      ODP_CACHE_LINE_SIZE,
+				      flags);
+		if (ODP_SHM_INVALID == shm)
+			ODP_ABORT("can not connect to shm\n");
+
+		pinfo = odp_shm_addr(shm);
+		*slave = 1;
+	}
+
+	pktio_entry->s.ipc.pinfo = pinfo;
+	pktio_entry->s.ipc.pinfo_shm = shm;
+
+	return 0;
+}
+
+static int _ipc_master_start(pktio_entry_t *pktio_entry)
+{
+	struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo;
+	int ret;
+	void *ipc_pool_base;
+
+	if (pinfo->slave.mdata_offset == 0)
+		return -1;
+
+	ret = _ipc_shm_lookup(pinfo->slave.pool_name);
+	if (ret) {
+		ODP_DBG("no pool file %s\n", pinfo->slave.pool_name);
+		return -1;
+	}
+
+	ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
+					     pinfo->master.shm_pkt_pool_size);
+	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
+					     pinfo->slave.mdata_offset;
+
+	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
+
+	ODP_DBG("%s started.\n",  pktio_entry->s.name);
+	return 0;
+}
+
+static int _ipc_init_master(pktio_entry_t *pktio_entry,
+			    const char *dev,
+			    odp_pool_t pool)
+{
+	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
+	pool_entry_t *pool_entry;
+	uint32_t pool_id;
+	struct pktio_info *pinfo;
+	const char *pool_name;
+	odp_shm_t shm;
+
+	pool_id = pool_handle_to_index(pool);
+	pool_entry    = get_pool_entry(pool_id);
+
+	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r"))) {
+		ODP_DBG("too big ipc name\n");
+		return -1;
+	}
+
+	/* generate name in shm like ipc_pktio_r for
+	 * to be processed packets ring.
+	 */
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
+	pktio_entry->s.ipc.tx.send = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
+	if (!pktio_entry->s.ipc.tx.send) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		return -1;
+	}
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.send),
+		odph_ring_free_count(pktio_entry->s.ipc.tx.send));
+
+	/* generate name in shm like ipc_pktio_p for
+	 * already processed packets
+	 */
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
+	pktio_entry->s.ipc.tx.free = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
+	if (!pktio_entry->s.ipc.tx.free) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		goto free_m_prod;
+	}
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.free),
+		odph_ring_free_count(pktio_entry->s.ipc.tx.free));
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
+	pktio_entry->s.ipc.rx.recv = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
+	if (!pktio_entry->s.ipc.rx.recv) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		goto free_m_cons;
+	}
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.recv),
+		odph_ring_free_count(pktio_entry->s.ipc.rx.recv));
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
+	pktio_entry->s.ipc.rx.free = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
+	if (!pktio_entry->s.ipc.rx.free) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		goto free_s_prod;
+	}
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.free),
+		odph_ring_free_count(pktio_entry->s.ipc.rx.free));
+
+	/* Set up pool name for remote info */
+	pinfo = pktio_entry->s.ipc.pinfo;
+	pool_name = _ipc_odp_buffer_pool_shm_name(pool);
+	memcpy(pinfo->master.pool_name, pool_name, strlen(pool_name));
+	pinfo->master.shm_pkt_pool_size = pool_entry->s.pool_size;
+	pinfo->master.shm_pool_bufs_num = pool_entry->s.buf_num;
+	pinfo->master.shm_pkt_size = pool_entry->s.seg_size;
+	pinfo->master.mdata_offset =  pool_entry->s.pool_mdata_addr -
+			       pool_entry->s.pool_base_addr;
+	pinfo->slave.mdata_offset = 0;
+
+	pktio_entry->s.ipc.pool = pool;
+
+	ODP_DBG("Pre init... DONE.\n");
+
+	_ipc_master_start(pktio_entry);
+
+	return 0;
+
+free_s_prod:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+free_m_cons:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+free_m_prod:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+	return -1;
+}
+
+static void _ipc_export_pool(struct pktio_info *pinfo,
+			     odp_pool_t pool)
+{
+	pool_entry_t *pool_entry;
+
+	pool_entry = odp_pool_to_entry(pool);
+	if (pool_entry->s.blk_size != pinfo->master.shm_pkt_size)
+		ODP_ABORT("pktio for same name should have the same pool size\n");
+	if (pool_entry->s.buf_num != (unsigned)pinfo->master.shm_pool_bufs_num)
+		ODP_ABORT("pktio for same name should have the same pool size\n");
+
+	snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s",
+		 pool_entry->s.name);
+	pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
+				    pool_entry->s.pool_base_addr;
+}
+
+static void *_ipc_map_remote_pool(const char *name, size_t size)
+{
+	odp_shm_t shm;
+	void *addr;
+
+	ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
+	shm = odp_shm_reserve(name,
+			      size,
+			      ODP_CACHE_LINE_SIZE,
+			      _ODP_SHM_PROC_NOCREAT);
+	if (shm == ODP_SHM_INVALID)
+		ODP_ABORT("unable map %s\n", name);
+
+	addr = odp_shm_addr(shm);
+	ODP_DBG("MAP master: %p - %p size %ld, pool %s\n",
+		addr, (char *)addr + size, size, name);
+	return addr;
+}
+
+static void *_ipc_shm_map(char *name, size_t size)
+{
+	odp_shm_t shm;
+	int ret;
+
+	ret = _ipc_shm_lookup(name);
+	if (ret == -1)
+		return NULL;
+
+	shm = odp_shm_reserve(name, size,
+			      ODP_CACHE_LINE_SIZE,
+			      _ODP_SHM_PROC_NOCREAT);
+	if (ODP_SHM_INVALID == shm)
+		ODP_ABORT("unable to map: %s\n", name);
+
+	return odp_shm_addr(shm);
+}
+
+static int _ipc_init_slave(const char *dev,
+			   pktio_entry_t *pktio_entry,
+			   odp_pool_t pool)
+{
+	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r")))
+		ODP_ABORT("too big ipc name\n");
+
+	pktio_entry->s.ipc.pool = pool;
+	return 0;
+}
+
+static int _ipc_slave_start(pktio_entry_t *pktio_entry)
+{
+	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
+	size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
+			   sizeof(odph_ring_t);
+	struct pktio_info *pinfo;
+	void *ipc_pool_base;
+	odp_shm_t shm;
+	const char *dev = pktio_entry->s.name;
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
+	pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, ring_size);
+	if (!pktio_entry->s.ipc.rx.recv) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		sleep(1);
+		return -1;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.recv),
+		odph_ring_free_count(pktio_entry->s.ipc.rx.recv));
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
+	pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, ring_size);
+	if (!pktio_entry->s.ipc.rx.free) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		goto free_m_prod;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.rx.free),
+		odph_ring_free_count(pktio_entry->s.ipc.rx.free));
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
+	pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, ring_size);
+	if (!pktio_entry->s.ipc.tx.send) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		goto free_m_cons;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.send),
+		odph_ring_free_count(pktio_entry->s.ipc.tx.send));
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
+	pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, ring_size);
+	if (!pktio_entry->s.ipc.tx.free) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		goto free_s_prod;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.tx.free),
+		odph_ring_free_count(pktio_entry->s.ipc.tx.free));
+
+	/* Get info about remote pool */
+	pinfo = pktio_entry->s.ipc.pinfo;
+	ipc_pool_base = _ipc_map_remote_pool(pinfo->master.pool_name,
+					     pinfo->master.shm_pkt_pool_size);
+	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
+					     pinfo->master.mdata_offset;
+	pktio_entry->s.ipc.pkt_size = pinfo->master.shm_pkt_size;
+
+	/* @todo: to simplify in linux-generic implementation we create pool for
+	 * packets from IPC queue. On receive implementation copies packets to
+	 * that pool. Later we can try to reuse original pool without packets
+	 * copying. (pkt refcounts needs to be implemented).
+	 */
+	_ipc_export_pool(pinfo, pktio_entry->s.ipc.pool);
+
+	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
+
+	ODP_DBG("%s started.\n",  pktio_entry->s.name);
+	return 0;
+
+free_s_prod:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+free_m_cons:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+free_m_prod:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+	return -1;
+}
+
+static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
+			  pktio_entry_t *pktio_entry,
+			  const char *dev,
+			  odp_pool_t pool)
+{
+	int ret = -1;
+	int slave;
+
+	_ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == ODPH_RING_NAMESIZE,
+			   "mismatch pool and ring name arrays");
+
+	if (strncmp(dev, "ipc", 3))
+		return -1;
+
+	odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0);
+
+	_ipc_map_pktio_info(pktio_entry, dev, &slave);
+	pktio_entry->s.ipc.type = (slave == 0) ? PKTIO_TYPE_IPC_MASTER :
+						 PKTIO_TYPE_IPC_SLAVE;
+
+	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {
+		ODP_DBG("process %d is master\n", getpid());
+		ret = _ipc_init_master(pktio_entry, dev, pool);
+	} else {
+		ODP_DBG("process %d is slave\n", getpid());
+		ret = _ipc_init_slave(dev, pktio_entry, pool);
+	}
+
+	return ret;
+}
+
+static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
+				    uint32_t offset,
+				    uint32_t *seglen,
+				    uint32_t limit)
+{
+	int seg_index  = offset / buf->segsize;
+	int seg_offset = offset % buf->segsize;
+	void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
+
+	if (seglen) {
+		uint32_t buf_left = limit - offset;
+		*seglen = seg_offset + buf_left <= buf->segsize ?
+			buf_left : buf->segsize - seg_offset;
+	}
+
+	return (void *)(seg_offset + (uint8_t *)addr);
+}
+
+static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
+				    uint32_t offset, uint32_t *seglen)
+{
+	if (offset > pkt_hdr->frame_len)
+		return NULL;
+
+	return _ipc_buffer_map(&pkt_hdr->buf_hdr,
+			  pkt_hdr->headroom + offset, seglen,
+			  pkt_hdr->headroom + pkt_hdr->frame_len);
+}
+
+static void _ipc_free_ring_packets(odph_ring_t *r)
+{
+	odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
+	int ret;
+	void **rbuf_p;
+	int i;
+
+	rbuf_p = (void *)&r_p_pkts;
+
+	while (1) {
+		ret = odph_ring_mc_dequeue_burst(r, rbuf_p,
+						 PKTIO_IPC_ENTRIES);
+		if (0 == ret)
+			break;
+		for (i = 0; i < ret; i++) {
+			if (r_p_pkts[i] != ODP_PACKET_INVALID)
+				odp_packet_free(r_p_pkts[i]);
+		}
+	}
+}
+
+static int ipc_pktio_recv(pktio_entry_t *pktio_entry,
+			  odp_packet_t pkt_table[], unsigned len)
+{
+	int pkts = 0;
+	int i;
+	odph_ring_t *r;
+	odph_ring_t *r_p;
+
+	odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
+	void **ipcbufs_p = (void *)&remote_pkts;
+	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
+
+	if (odp_unlikely(!ready)) {
+		ODP_DBG("start pktio is missing before usage?\n");
+		return -1;
+	}
+
+	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+
+	r = pktio_entry->s.ipc.rx.recv;
+	pkts = odph_ring_mc_dequeue_burst(r, ipcbufs_p, len);
+	if (odp_unlikely(pkts < 0))
+		ODP_ABORT("error to dequeue no packets\n");
+
+	/* fast path */
+	if (odp_likely(0 == pkts))
+		return 0;
+
+	for (i = 0; i < pkts; i++) {
+		odp_pool_t pool;
+		odp_packet_t pkt;
+		odp_packet_hdr_t phdr;
+		void *ptr;
+		odp_buffer_bits_t handle;
+		int idx; /* Remote packet has coded pool and index.
+			  * We need only index.*/
+		void *pkt_data;
+		void *remote_pkt_data;
+
+		if (remote_pkts[i] == ODP_PACKET_INVALID)
+			continue;
+
+		handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
+		idx = handle.index;
+
+		/* Link to packed data. To this line we have Zero-Copy between
+		 * processes, to simplify use packet copy in that version which
+		 * can be removed later with more advance buffer management
+		 * (ref counters).
+		 */
+		/* reverse odp_buf_to_hdr() */
+		ptr = (char *)pktio_entry->s.ipc.pool_mdata_base +
+		      (idx * ODP_CACHE_LINE_SIZE);
+		memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t));
+
+		/* Allocate new packet. Select*/
+		pool = pktio_entry->s.ipc.pool;
+		if (odp_unlikely(pool == ODP_POOL_INVALID))
+			ODP_ABORT("invalid pool");
+
+		pkt = odp_packet_alloc(pool, phdr.frame_len);
+		if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
+			/* Original pool might be smaller then
+			*  PKTIO_IPC_ENTRIES. If packet can not be
+			 * allocated from pool at this time,
+			 * simple get in on next recv() call.
+			 */
+			if (i == 0)
+				return 0;
+			break;
+		}
+
+		/* Copy packet data. */
+		pkt_data = odp_packet_data(pkt);
+		if (odp_unlikely(!pkt_data))
+			ODP_ABORT("unable to map pkt_data ipc_slave %d\n",
+				  (PKTIO_TYPE_IPC_SLAVE ==
+					pktio_entry->s.ipc.type));
+
+		remote_pkt_data = _ipc_packet_map(ptr, 0, NULL);
+		if (odp_unlikely(!remote_pkt_data))
+			ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",
+				  (PKTIO_TYPE_IPC_SLAVE ==
+					pktio_entry->s.ipc.type));
+
+		/* @todo fix copy packet!!! */
+		memcpy(pkt_data, remote_pkt_data, phdr.frame_len);
+
+		/* Copy packets L2, L3 parsed offsets and size */
+		copy_packet_parser_metadata(&phdr, odp_packet_hdr(pkt));
+
+		odp_packet_hdr(pkt)->frame_len = phdr.frame_len;
+		odp_packet_hdr(pkt)->headroom = phdr.headroom;
+		odp_packet_hdr(pkt)->tailroom = phdr.tailroom;
+		pkt_table[i] = pkt;
+	}
+
+	/* Now tell other process that we no longer need that buffers.*/
+	r_p = pktio_entry->s.ipc.rx.free;
+	pkts = odph_ring_mp_enqueue_burst(r_p, ipcbufs_p, i);
+	if (odp_unlikely(pkts < 0))
+		ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
+
+	return pkts;
+}
+
+static int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
+			  unsigned len)
+{
+	odph_ring_t *r;
+	void **rbuf_p;
+	int ret;
+	unsigned i;
+	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
+
+	if (odp_unlikely(!ready))
+		return 0;
+
+	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+
+	/* Prepare packets: calculate offset from address. */
+	for (i = 0; i < len; i++) {
+		int j;
+		odp_packet_t pkt =  pkt_table[i];
+		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
+		odp_buffer_bits_t handle;
+		uint32_t cur_mapped_pool_id =
+			 pool_handle_to_index(pktio_entry->s.ipc.pool);
+		uint32_t pool_id;
+
+		/* do copy if packet was allocated from not mapped pool */
+		handle.handle = _odp_packet_to_buffer(pkt);
+		pool_id = handle.pool_id;
+		if (pool_id != cur_mapped_pool_id) {
+			odp_packet_t newpkt;
+
+			newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool);
+			if (newpkt == ODP_PACKET_INVALID)
+				ODP_ABORT("Unable to copy packet\n");
+
+			odp_packet_free(pkt);
+			pkt_table[i] = newpkt;
+		}
+
+		rbuf_p = (void *)&pkt;
+
+		/* buf_hdr.addr can not be used directly in remote process,
+		 * convert it to offset
+		 */
+		for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) {
+			pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
+				(char *)pkt_hdr->buf_hdr.addr[j];
+		}
+	}
+
+	/* Put packets to ring to be processed by other process. */
+	rbuf_p = (void *)&pkt_table[0];
+	r = pktio_entry->s.ipc.tx.send;
+	ret = odph_ring_mp_enqueue_burst(r, rbuf_p, len);
+	if (odp_unlikely(ret < 0)) {
+		ODP_ERR("pid %d odp_ring_mp_enqueue_bulk fail, ipc_slave %d, ret %d\n",
+			getpid(),
+			(PKTIO_TYPE_IPC_SLAVE == pktio_entry->s.ipc.type),
+			ret);
+		ODP_ERR("odp_ring_full: %d, odp_ring_count %d, odph_ring_free_count %d\n",
+			odph_ring_full(r), odph_ring_count(r),
+			odph_ring_free_count(r));
+	}
+
+	return ret;
+}
+
+static int ipc_mtu_get(pktio_entry_t *pktio_entry ODP_UNUSED)
+{
+	/* mtu not limited, pool settings are used. */
+	return (9 * 1024);
+}
+
+static int ipc_mac_addr_get(pktio_entry_t *pktio_entry ODP_UNUSED,
+			    void *mac_addr)
+{
+	memcpy(mac_addr, pktio_ipc_mac, ETH_ALEN);
+	return ETH_ALEN;
+}
+
+static int ipc_start(pktio_entry_t *pktio_entry)
+{
+	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
+
+	if (ready) {
+		ODP_ABORT("%s Already started\n", pktio_entry->s.name);
+		return -1;
+	}
+
+	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER)
+		return _ipc_master_start(pktio_entry);
+	else
+		return _ipc_slave_start(pktio_entry);
+}
+
+static int ipc_stop(pktio_entry_t *pktio_entry)
+{
+	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 0);
+
+	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.send);
+	/* other process can transfer packets from one ring to
+	 * other, use delay here to free that packets. */
+	sleep(0.3);
+	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+
+	return 0;
+}
+
+static int ipc_close(pktio_entry_t *pktio_entry)
+{
+	ipc_stop(pktio_entry);
+
+	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {
+		char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
+		char *dev = pktio_entry->s.name;
+		odp_shm_t shm;
+
+		/* unlink this pktio info */
+		odp_shm_free(pktio_entry->s.ipc.pinfo_shm);
+
+		/* unlink rings */
+		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
+		shm = odp_shm_lookup(ipc_shm_name);
+		odp_shm_free(shm);
+		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
+		shm = odp_shm_lookup(ipc_shm_name);
+		odp_shm_free(shm);
+		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
+		shm = odp_shm_lookup(ipc_shm_name);
+		odp_shm_free(shm);
+		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
+		shm = odp_shm_lookup(ipc_shm_name);
+		odp_shm_free(shm);
+	}
+
+	return 0;
+}
+
+const pktio_if_ops_t ipc_pktio_ops = {
+	.init = NULL,
+	.term = NULL,
+	.open = ipc_pktio_open,
+	.close = ipc_close,
+	.recv =  ipc_pktio_recv,
+	.send = ipc_pktio_send,
+	.start = ipc_start,
+	.stop = ipc_stop,
+	.mtu_get = ipc_mtu_get,
+	.promisc_mode_set = NULL,
+	.promisc_mode_get = NULL,
+	.mac_get = ipc_mac_addr_get
+};
diff --git a/platform/linux-generic/pktio/ring.c b/platform/linux-generic/pktio/ring.c
new file mode 120000
index 0000000..d35c589
--- /dev/null
+++ b/platform/linux-generic/pktio/ring.c
@@ -0,0 +1 @@ 
+../../../helper/ring.c