
[PATCHv9,7/8] linux-generic: add ipc pktio support

Message ID 1444391944-20772-8-git-send-email-maxim.uvarov@linaro.org
State New

Commit Message

Maxim Uvarov Oct. 9, 2015, 11:59 a.m. UTC
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
---
 platform/linux-generic/Makefile.am                 |   2 +
 .../linux-generic/include/odp_buffer_internal.h    |   3 +
 .../linux-generic/include/odp_packet_io_internal.h |  35 +
 .../include/odp_packet_io_ipc_internal.h           |  51 ++
 platform/linux-generic/include/odp_shm_internal.h  |  20 +
 platform/linux-generic/odp_packet_io.c             |   1 +
 platform/linux-generic/odp_pool.c                  |  11 +-
 platform/linux-generic/odp_shared_memory.c         |  10 +-
 platform/linux-generic/pktio/io_ops.c              |   1 +
 platform/linux-generic/pktio/ipc.c                 | 720 +++++++++++++++++++++
 platform/linux-generic/pktio/ring.c                |   1 +
 11 files changed, 851 insertions(+), 4 deletions(-)
 create mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h
 create mode 100644 platform/linux-generic/include/odp_shm_internal.h
 create mode 100644 platform/linux-generic/pktio/ipc.c
 create mode 120000 platform/linux-generic/pktio/ring.c


Comments

Nicolas Morey-Chaisemartin Oct. 13, 2015, 1:50 p.m. UTC | #1
On 10/09/2015 01:59 PM, Maxim Uvarov wrote:
> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
> ---
>  platform/linux-generic/Makefile.am                 |   2 +
>  .../linux-generic/include/odp_buffer_internal.h    |   3 +
>  .../linux-generic/include/odp_packet_io_internal.h |  35 +
>  .../include/odp_packet_io_ipc_internal.h           |  51 ++
>  platform/linux-generic/include/odp_shm_internal.h  |  20 +
>  platform/linux-generic/odp_packet_io.c             |   1 +
>  platform/linux-generic/odp_pool.c                  |  11 +-
>  platform/linux-generic/odp_shared_memory.c         |  10 +-
>  platform/linux-generic/pktio/io_ops.c              |   1 +
>  platform/linux-generic/pktio/ipc.c                 | 720 +++++++++++++++++++++
>  platform/linux-generic/pktio/ring.c                |   1 +
>  11 files changed, 851 insertions(+), 4 deletions(-)
>  create mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h
>  create mode 100644 platform/linux-generic/include/odp_shm_internal.h
>  create mode 100644 platform/linux-generic/pktio/ipc.c
>  create mode 120000 platform/linux-generic/pktio/ring.c
>
> diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
> index b9ed3b0..71353dd 100644
> --- a/platform/linux-generic/Makefile.am
> +++ b/platform/linux-generic/Makefile.am
> @@ -151,9 +151,11 @@ __LIB__libodp_la_SOURCES = \
>  			   odp_packet_flags.c \
>  			   odp_packet_io.c \
>  			   pktio/io_ops.c \
> +			   pktio/ipc.c \
>  			   pktio/loop.c \
>  			   pktio/socket.c \
>  			   pktio/socket_mmap.c \
> +			   pktio/ring.c \
>  			   odp_pool.c \
>  			   odp_queue.c \
>  			   odp_rwlock.c \
> diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
> index 4cacca1..a078e52 100644
> --- a/platform/linux-generic/include/odp_buffer_internal.h
> +++ b/platform/linux-generic/include/odp_buffer_internal.h
> @@ -132,6 +132,9 @@ struct odp_buffer_hdr_t {
>  	uint32_t                 uarea_size; /* size of user area */
>  	uint32_t                 segcount;   /* segment count */
>  	uint32_t                 segsize;    /* segment size */
> +	/* ipc mapped process can not walk over pointers,
> +	 * offset has to be used */
> +	uint64_t		 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
>  	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
>  	uint64_t                 order;      /* sequence for ordered queues */
>  	queue_entry_t           *origin_qe;  /* ordered queue origin */
I haven't been through everything yet, but should this be a union with the addr?
The odp_buffer_hdr_t is already quite large as it is.

Nicolas
Maxim Uvarov Oct. 13, 2015, 2:52 p.m. UTC | #2
On 10/13/2015 16:50, Nicolas Morey-Chaisemartin wrote:
>
> On 10/09/2015 01:59 PM, Maxim Uvarov wrote:
>> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
>> ---
>>   platform/linux-generic/Makefile.am                 |   2 +
>>   .../linux-generic/include/odp_buffer_internal.h    |   3 +
>>   .../linux-generic/include/odp_packet_io_internal.h |  35 +
>>   .../include/odp_packet_io_ipc_internal.h           |  51 ++
>>   platform/linux-generic/include/odp_shm_internal.h  |  20 +
>>   platform/linux-generic/odp_packet_io.c             |   1 +
>>   platform/linux-generic/odp_pool.c                  |  11 +-
>>   platform/linux-generic/odp_shared_memory.c         |  10 +-
>>   platform/linux-generic/pktio/io_ops.c              |   1 +
>>   platform/linux-generic/pktio/ipc.c                 | 720 +++++++++++++++++++++
>>   platform/linux-generic/pktio/ring.c                |   1 +
>>   11 files changed, 851 insertions(+), 4 deletions(-)
>>   create mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h
>>   create mode 100644 platform/linux-generic/include/odp_shm_internal.h
>>   create mode 100644 platform/linux-generic/pktio/ipc.c
>>   create mode 120000 platform/linux-generic/pktio/ring.c
>>
>> diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
>> index b9ed3b0..71353dd 100644
>> --- a/platform/linux-generic/Makefile.am
>> +++ b/platform/linux-generic/Makefile.am
>> @@ -151,9 +151,11 @@ __LIB__libodp_la_SOURCES = \
>>   			   odp_packet_flags.c \
>>   			   odp_packet_io.c \
>>   			   pktio/io_ops.c \
>> +			   pktio/ipc.c \
>>   			   pktio/loop.c \
>>   			   pktio/socket.c \
>>   			   pktio/socket_mmap.c \
>> +			   pktio/ring.c \
>>   			   odp_pool.c \
>>   			   odp_queue.c \
>>   			   odp_rwlock.c \
>> diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
>> index 4cacca1..a078e52 100644
>> --- a/platform/linux-generic/include/odp_buffer_internal.h
>> +++ b/platform/linux-generic/include/odp_buffer_internal.h
>> @@ -132,6 +132,9 @@ struct odp_buffer_hdr_t {
>>   	uint32_t                 uarea_size; /* size of user area */
>>   	uint32_t                 segcount;   /* segment count */
>>   	uint32_t                 segsize;    /* segment size */
>> +	/* ipc mapped process can not walk over pointers,
>> +	 * offset has to be used */
>> +	uint64_t		 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
>>   	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
>>   	uint64_t                 order;      /* sequence for ordered queues */
>>   	queue_entry_t           *origin_qe;  /* ordered queue origin */
> I haven't been through everything yet but should this be an union with the addr ?
> The odp_buffer_hdr_t is already quite large as it is.
>
> Nicolas
Yes, that can be union with addr, I think. Will update.

Maxim.
Stuart Haslam Oct. 16, 2015, 6:16 p.m. UTC | #3
On Fri, Oct 09, 2015 at 02:59:03PM +0300, Maxim Uvarov wrote:
> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
> ---
>  platform/linux-generic/Makefile.am                 |   2 +
>  .../linux-generic/include/odp_buffer_internal.h    |   3 +
>  .../linux-generic/include/odp_packet_io_internal.h |  35 +
>  .../include/odp_packet_io_ipc_internal.h           |  51 ++
>  platform/linux-generic/include/odp_shm_internal.h  |  20 +
>  platform/linux-generic/odp_packet_io.c             |   1 +
>  platform/linux-generic/odp_pool.c                  |  11 +-
>  platform/linux-generic/odp_shared_memory.c         |  10 +-
>  platform/linux-generic/pktio/io_ops.c              |   1 +
>  platform/linux-generic/pktio/ipc.c                 | 720 +++++++++++++++++++++
>  platform/linux-generic/pktio/ring.c                |   1 +
>  11 files changed, 851 insertions(+), 4 deletions(-)
>  create mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h
>  create mode 100644 platform/linux-generic/include/odp_shm_internal.h
>  create mode 100644 platform/linux-generic/pktio/ipc.c
>  create mode 120000 platform/linux-generic/pktio/ring.c
> 
> diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
> index b9ed3b0..71353dd 100644
> --- a/platform/linux-generic/Makefile.am
> +++ b/platform/linux-generic/Makefile.am
> @@ -151,9 +151,11 @@ __LIB__libodp_la_SOURCES = \
>  			   odp_packet_flags.c \
>  			   odp_packet_io.c \
>  			   pktio/io_ops.c \
> +			   pktio/ipc.c \
>  			   pktio/loop.c \
>  			   pktio/socket.c \
>  			   pktio/socket_mmap.c \
> +			   pktio/ring.c \
>  			   odp_pool.c \
>  			   odp_queue.c \
>  			   odp_rwlock.c \
> diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
> index 4cacca1..a078e52 100644
> --- a/platform/linux-generic/include/odp_buffer_internal.h
> +++ b/platform/linux-generic/include/odp_buffer_internal.h
> @@ -132,6 +132,9 @@ struct odp_buffer_hdr_t {
>  	uint32_t                 uarea_size; /* size of user area */
>  	uint32_t                 segcount;   /* segment count */
>  	uint32_t                 segsize;    /* segment size */
> +	/* ipc mapped process can not walk over pointers,
> +	 * offset has to be used */
> +	uint64_t		 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
>  	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
>  	uint64_t                 order;      /* sequence for ordered queues */
>  	queue_entry_t           *origin_qe;  /* ordered queue origin */
> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
> index 6b03051..62e6829 100644
> --- a/platform/linux-generic/include/odp_packet_io_internal.h
> +++ b/platform/linux-generic/include/odp_packet_io_internal.h
> @@ -23,6 +23,7 @@ extern "C" {
>  #include <odp_classification_datamodel.h>
>  #include <odp_align_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp/helper/ring.h>
>  
>  #include <odp/config.h>
>  #include <odp/hints.h>
> @@ -36,6 +37,38 @@ typedef struct {
>  	odp_bool_t promisc;		/**< promiscuous mode state */
>  } pkt_loop_t;
>  
> +typedef	struct {
> +	/* TX */
> +	struct  {
> +		odph_ring_t	*prod; /**< ODP ring for IPC msg packets
> +					  indexes transmitted to shared
> +					  memory */
> +		odph_ring_t	*cons; /**< ODP ring for IPC msg packets
> +					    indexes already processed by remote
> +					    process */
> +	} m; /* master */
> +	/* RX */
> +	struct {
> +		odph_ring_t	*prod; /**< ODP ring for IPC msg packets
> +					    indexes received from shared
> +					     memory (from remote process) */
> +		odph_ring_t	*cons; /**< ODP ring for IPC msg packets
> +					    indexes already processed by
> +					    current process */
> +	} s; /* slave */

It would be better to do away with the master/slave naming and instead use
names that match their function, e.g. rx.recv/rx.free and tx.send/tx.free

> +	void		*pool_base;		/**< Remote pool base addr */
> +	void		*pool_mdata_base;	/**< Remote pool mdata base addr */
> +	uint64_t	pkt_size;		/**< Packet size in remote pool */
> +	odp_pool_t	pool;			/**< Pool of main process */
> +	odp_shm_t	pool_shm;		/**< Shm memory for remote pool */
> +	enum {
> +		ODP_PKTIO_TYPE_IPC = 0,
> +		ODP_PKTIO_TYPE_IPC_SLAVE

Shouldn't have the ODP_ prefix

> +	} type; /**< define if it's master or slave process */
> +	int  ready; /**< 1 - pktio is ready and can recv/send packet, 0 - not yet ready */

This needs to be atomic (unless you can get rid of it by moving the
_post_init()s?).
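
A minimal sketch of what that could look like with the standard odp/atomic.h
API (field and function names around it come from the patch, not tested):

	odp_atomic_u32_t ready; /**< 0 = not ready, 1 = can recv/send packets */

	/* end of master/slave post-init: publish readiness */
	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);

	/* start of ipc_pktio_recv()/ipc_pktio_send(): check it */
	if (!odp_atomic_load_u32(&pktio_entry->s.ipc.ready))
		return 0;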

> +	void *pinfo;
> +} _ipc_pktio_t;
> +
>  struct pktio_entry {
>  	const struct pktio_if_ops *ops; /**< Implementation specific methods */
>  	odp_spinlock_t lock;		/**< entry spinlock */
> @@ -49,6 +82,7 @@ struct pktio_entry {
>  		pkt_sock_t pkt_sock;		/**< using socket API for IO */
>  		pkt_sock_mmap_t pkt_sock_mmap;	/**< using socket mmap
>  						 *   API for IO */
> +		_ipc_pktio_t ipc;		/**< IPC pktio data */
>  	};
>  	enum {
>  		STATE_START = 0,
> @@ -124,6 +158,7 @@ int pktin_poll(pktio_entry_t *entry);
>  extern const pktio_if_ops_t sock_mmsg_pktio_ops;
>  extern const pktio_if_ops_t sock_mmap_pktio_ops;
>  extern const pktio_if_ops_t loopback_pktio_ops;
> +extern const pktio_if_ops_t ipc_pktio_ops;
>  extern const pktio_if_ops_t * const pktio_if_ops[];
>  
>  #ifdef __cplusplus
> diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> new file mode 100644
> index 0000000..25fd1d6
> --- /dev/null
> +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> @@ -0,0 +1,51 @@
> +/* Copyright (c) 2015, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp/packet_io.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp/packet.h>
> +#include <odp_packet_internal.h>
> +#include <odp_internal.h>
> +#include <odp/shared_memory.h>
> +
> +#include <string.h>
> +#include <unistd.h>
> +#include <stdlib.h>
> +
> +/* IPC packet I/O over shared memory ring */
> +#include <odp/helper/ring.h>
> +
> +#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
> +					odp ring queue */
> +
> +/* that struct is exported to shared memory, so that 2 processes can find
> + * each other.
> + */
> +struct pktio_info {
> +	char remote_pool_name[ODP_POOL_NAME_LEN];
> +	int shm_pool_bufs_num;	/*< number of buffer in remote pool */
> +	size_t shm_pkt_pool_size; /*< size of remote pool */
> +	uint32_t shm_pkt_size; /*< size of packet/segment in remote pool */
> +	odp_shm_t shm;	/*< current structure stored in this shm */
> +	size_t mdata_offset; /*< offset from shared memory block start
> +			      *to pool_mdata_addr
> +			      * (linux-generic pool specific) */
> +	struct {
> +		size_t mdata_offset; /*< offset from shared memory block start
> +				      * to pool_mdata_addr in remote process.
> +				      * (linux-generic pool specific) */
> +		char   pool_name[ODP_POOL_NAME_LEN];
> +	} slave;
> +} __packed;

ODP_PACKED

> +
> +int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
> +		   odp_pool_t pool);
> +
> +int ipc_pktio_recv(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +		   unsigned len);
> +
> +int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +		   unsigned len);
> diff --git a/platform/linux-generic/include/odp_shm_internal.h b/platform/linux-generic/include/odp_shm_internal.h
> new file mode 100644
> index 0000000..f3ce892
> --- /dev/null
> +++ b/platform/linux-generic/include/odp_shm_internal.h
> @@ -0,0 +1,20 @@
> +/* Copyright (c) 2013, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#ifndef ODP_SHM_INTERNAL_H_
> +#define ODP_SHM_INTERNAL_H_
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#define _ODP_SHM_PROC_NOCREAT 0x4  /**< Do not create shm if not exist */
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif
> diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
> index 2e7b199..bedf221 100644
> --- a/platform/linux-generic/odp_packet_io.c
> +++ b/platform/linux-generic/odp_packet_io.c
> @@ -18,6 +18,7 @@
>  #include <odp_schedule_internal.h>
>  #include <odp_classification_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp_packet_io_ipc_internal.h>
>  
>  #include <string.h>
>  #include <sys/ioctl.h>
> diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
> index 2036c2a..e757235 100644
> --- a/platform/linux-generic/odp_pool.c
> +++ b/platform/linux-generic/odp_pool.c
> @@ -219,8 +219,11 @@ odp_pool_t _pool_create(const char *name,
>  			ODP_ALIGN_ROUNDUP(params->pkt.len, seg_len);
>  
>  		/* Reject create if pkt.len needs too many segments */
> -		if (blk_size / seg_len > ODP_BUFFER_MAX_SEG)
> +		if (blk_size / seg_len > ODP_BUFFER_MAX_SEG) {
> +			ODP_ERR("ODP_BUFFER_MAX_SEG exceed %d(%d)\n",
> +				blk_size / seg_len, ODP_BUFFER_MAX_SEG);
>  			return ODP_POOL_INVALID;
> +		}
>  
>  		p_udata_size = params->pkt.uarea_size;
>  		udata_stride = ODP_ALIGN_ROUNDUP(p_udata_size,
> @@ -241,8 +244,12 @@ odp_pool_t _pool_create(const char *name,
>  
>  	/* Validate requested number of buffers against addressable limits */
>  	if (buf_num >
> -	    (ODP_BUFFER_MAX_BUFFERS / (buf_stride / ODP_CACHE_LINE_SIZE)))
> +	    (ODP_BUFFER_MAX_BUFFERS / (buf_stride / ODP_CACHE_LINE_SIZE))) {
> +		ODP_ERR("buf_num %d > then expected %d\n",
> +			buf_num, ODP_BUFFER_MAX_BUFFERS /
> +			(buf_stride / ODP_CACHE_LINE_SIZE));
>  		return ODP_POOL_INVALID;
> +	}
>  
>  	/* Find an unused buffer pool slot and iniitalize it as requested */
>  	for (i = 0; i < ODP_CONFIG_POOLS; i++) {
> diff --git a/platform/linux-generic/odp_shared_memory.c b/platform/linux-generic/odp_shared_memory.c
> index ab48dda..5de48d3 100644
> --- a/platform/linux-generic/odp_shared_memory.c
> +++ b/platform/linux-generic/odp_shared_memory.c
> @@ -15,6 +15,7 @@
>  #include <odp/debug.h>
>  #include <odp_debug_internal.h>
>  #include <odp_align_internal.h>
> +#include <odp_shm_internal.h>
>  #include <odp/config.h>
>  
>  #include <unistd.h>
> @@ -189,7 +190,7 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
>  	int fd = -1;
>  	int map_flag = MAP_SHARED;
>  	/* If already exists: O_EXCL: error, O_TRUNC: truncate to zero */
> -	int oflag = O_RDWR | O_CREAT | O_TRUNC;
> +	int oflag = O_RDWR;
>  	uint64_t alloc_size;
>  	uint64_t page_sz, huge_sz;
>  #ifdef MAP_HUGETLB
> @@ -207,7 +208,12 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
>  	alloc_hp_size = (size + align + (huge_sz - 1)) & (-huge_sz);
>  #endif
>  
> -	if (flags & ODP_SHM_PROC) {
> +	if (flags & ODP_SHM_PROC)
> +		oflag |= O_CREAT | O_TRUNC;
> +
> +	if (flags & (ODP_SHM_PROC | _ODP_SHM_PROC_NOCREAT)) {
> +		need_huge_page = 0;
> +
>  		/* Creates a file to /dev/shm */
>  		fd = shm_open(name, oflag,
>  			      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
> diff --git a/platform/linux-generic/pktio/io_ops.c b/platform/linux-generic/pktio/io_ops.c
> index 1d47e74..5d8d4a5 100644
> --- a/platform/linux-generic/pktio/io_ops.c
> +++ b/platform/linux-generic/pktio/io_ops.c
> @@ -12,6 +12,7 @@
>   * Array must be NULL terminated */
>  const pktio_if_ops_t * const pktio_if_ops[]  = {
>  	&loopback_pktio_ops,
> +	&ipc_pktio_ops,
>  	&sock_mmap_pktio_ops,
>  	&sock_mmsg_pktio_ops,
>  	NULL
> diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c
> new file mode 100644
> index 0000000..70e8854
> --- /dev/null
> +++ b/platform/linux-generic/pktio/ipc.c
> @@ -0,0 +1,720 @@
> +/* Copyright (c) 2015, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp_packet_io_ipc_internal.h>
> +#include <odp_debug_internal.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp_spin_internal.h>
> +#include <odp/system_info.h>
> +#include <odp_shm_internal.h>
> +
> +#include <sys/mman.h>
> +#include <sys/stat.h>
> +#include <fcntl.h>
> +
> +/* MAC address for the "ipc" interface */
> +static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12};
> +
> +static void *_ipc_map_remote_pool(const char *name, size_t size);
> +
> +static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
> +{
> +	pool_entry_t *pool;
> +	uint32_t pool_id;
> +	odp_shm_t shm;
> +	odp_shm_info_t info;
> +
> +	pool_id = pool_handle_to_index(pool_hdl);
> +	pool    = get_pool_entry(pool_id);
> +	shm = pool->s.pool_shm;
> +
> +	odp_shm_info(shm, &info);
> +
> +	return info.name;
> +}
> +
> +/**
> +* Look up for shared memory object.
> +*
> +* @param name   name of shm object
> +*
> +* @return 0 on success, otherwise non-zero
> +*/
> +static int _odp_shm_lookup_ipc(const char *name)
> +{
> +	int shm;
> +
> +	shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
> +	if (shm == -1) {
> +		if (errno == ENOENT)
> +			return -1;
> +		ODP_ABORT("shm_open for %s err %s\n",
> +			  name, strerror(errno));
> +	}
> +	close(shm);
> +	return 0;
> +}
> +
> +static struct pktio_info *_ipc_map_pool_info(pktio_entry_t *pktio_entry,
> +					     const char *pool_name,
> +					     int flag)
> +{
> +	struct pktio_info *pinfo;
> +	char name[ODP_POOL_NAME_LEN + sizeof("_info")];
> +
> +	/* Create info about remote pktio */
> +	snprintf(name, sizeof(name), "%s_info", pool_name);
> +	odp_shm_t shm = odp_shm_reserve(name, sizeof(struct pktio_info),
> +			ODP_CACHE_LINE_SIZE,
> +			flag);
> +	if (ODP_SHM_INVALID == shm)
> +		ODP_ABORT("unable to reserve memory for shm info");
> +	pinfo = odp_shm_addr(shm);
> +	if (flag != _ODP_SHM_PROC_NOCREAT)
> +		pinfo->remote_pool_name[0] = 0;
> +
> +	pktio_entry->s.ipc.pool_shm = shm;
> +	return pinfo;
> +}
> +
> +static int master_post_init(pktio_entry_t *pktio_entry)
> +{
> +	struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo;
> +	int ret;
> +	void *ipc_pool_base;
> +
> +	if (pinfo->slave.mdata_offset == 0)
> +		return -1;
> +
> +	ret = _odp_shm_lookup_ipc(pinfo->slave.pool_name);
> +	if (ret) {
> +		ODP_DBG("no pool file %s\n", pinfo->slave.pool_name);
> +		return -1;
> +	}
> +
> +	ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
> +					     pinfo->shm_pkt_pool_size);
> +	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
> +					     pinfo->slave.mdata_offset;
> +
> +	pktio_entry->s.ipc.ready = 1;
> +
> +	ODP_DBG("Post init... DONE.\n");
> +	return 0;
> +}
> +
> +static int _ipc_pktio_init_master(pktio_entry_t *pktio_entry, const char *dev,
> +				  odp_pool_t pool)
> +{
> +	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
> +	pool_entry_t *pool_entry;
> +	uint32_t pool_id;
> +	struct pktio_info *pinfo;
> +	const char *pool_name;
> +	odp_shm_t shm;
> +
> +	pool_id = pool_handle_to_index(pool);
> +	pool_entry    = get_pool_entry(pool_id);
> +
> +	if (ODP_POOL_NAME_LEN != ODPH_RING_NAMESIZE)
> +		ODP_ABORT("");

This should be an _ODP_STATIC_ASSERT()
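
A minimal sketch of the compile-time check being asked for (the macro name is
taken from the comment; its signature is assumed to mirror
ODP_STATIC_ASSERT(cond, msg)):

	_ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == ODPH_RING_NAMESIZE,
			   "ODP_POOL_NAME_LEN_must_equal_ODPH_RING_NAMESIZE");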

> +
> +	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r"))) {
> +		ODP_DBG("too big ipc name\n");
> +		return -1;
> +	}
> +
> +	/* generate name in shm like ipc_pktio_r for
> +	 * to be processed packets ring.
> +	 */
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
> +	pktio_entry->s.ipc.m.prod = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
> +	if (!pktio_entry->s.ipc.m.prod) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);
> +		return -1;
> +	}
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.m.prod),
> +		odph_ring_free_count(pktio_entry->s.ipc.m.prod));
> +
> +	/* generate name in shm like ipc_pktio_p for
> +	 * already processed packets
> +	 */
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
> +	pktio_entry->s.ipc.m.cons = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
> +	if (!pktio_entry->s.ipc.m.cons) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);
> +		goto free_m_prod;
> +	}
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.m.cons),
> +		odph_ring_free_count(pktio_entry->s.ipc.m.cons));
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
> +	pktio_entry->s.ipc.s.prod = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
> +	if (!pktio_entry->s.ipc.s.prod) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);
> +		goto free_m_cons;
> +	}
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.s.prod),
> +		odph_ring_free_count(pktio_entry->s.ipc.s.prod));
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
> +	pktio_entry->s.ipc.s.cons = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
> +	if (!pktio_entry->s.ipc.s.cons) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);
> +		goto free_s_prod;
> +	}
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.s.cons),
> +		odph_ring_free_count(pktio_entry->s.ipc.s.cons));
> +
> +	/* Memory to store information about exported pool */
> +	pinfo = _ipc_map_pool_info(pktio_entry, dev, ODP_SHM_PROC);
> +
> +	/* Set up pool name for remote info */
> +	pool_name = _ipc_odp_buffer_pool_shm_name(pool);
> +	memcpy(pinfo->remote_pool_name, pool_name, strlen(pool_name));
> +	pinfo->shm_pkt_pool_size = pool_entry->s.pool_size;
> +	pinfo->shm_pool_bufs_num = pool_entry->s.buf_num;
> +	pinfo->shm_pkt_size = pool_entry->s.seg_size;
> +	pinfo->mdata_offset =  pool_entry->s.pool_mdata_addr -
> +			       pool_entry->s.pool_base_addr;
> +	pinfo->slave.mdata_offset = 0;
> +
> +	pktio_entry->s.ipc.pinfo = pinfo;
> +	pktio_entry->s.ipc.pool = pool;
> +
> +	ODP_DBG("Pre init... DONE.\n");
> +
> +	master_post_init(pktio_entry);
> +
> +	return 0;
> +
> +free_s_prod:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +free_m_cons:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +free_m_prod:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +	return -1;
> +}
> +
> +static void _odp_ipc_export_pool(struct pktio_info *pinfo,
> +				 odp_pool_t pool)
> +{
> +	pool_entry_t *pool_entry;
> +
> +	pool_entry = odp_pool_to_entry(pool);
> +	if (pool_entry->s.blk_size != pinfo->shm_pkt_size)
> +		ODP_ABORT("pktio for same name should have the same pool size\n");
> +	if (pool_entry->s.buf_num != (unsigned)pinfo->shm_pool_bufs_num)
> +		ODP_ABORT("pktio for same name should have the same pool size\n");
> +
> +	snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s",
> +		 pool_entry->s.name);
> +	pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
> +				    pool_entry->s.pool_base_addr;
> +}
> +
> +static void _verify_addr(void *addr, size_t size)
> +{
> +	char *x = addr;
> +	unsigned int i;
> +
> +	for (i = 0; i < size; i++)
> +		*x += 1;
> +	for (i = 0; i < size; i++)
> +		*x -= 1;
> +}

Is this just debug code that can be removed/ifdef'd?

> +
> +static void *_ipc_map_remote_pool(const char *name, size_t size)
> +{
> +	odp_shm_t shm;
> +	void *addr;
> +
> +	ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
> +	shm = odp_shm_reserve(name,
> +			      size,
> +			      ODP_CACHE_LINE_SIZE,
> +			      _ODP_SHM_PROC_NOCREAT);
> +	if (shm == ODP_SHM_INVALID)
> +		ODP_ABORT("unable map %s\n", name);
> +
> +	addr = odp_shm_addr(shm);
> +	ODP_DBG("MAP master: %p - %p size %ld, pool %s\n",
> +		addr, (char *)addr + size, size, name);
> +	_verify_addr(addr, size);
> +	return addr;
> +}
> +
> +static void *_ipc_shm_map(char *name, size_t size, int timeout)
> +{
> +	odp_shm_t shm;
> +	int ret;
> +
> +	while (1) {
> +		ret = _odp_shm_lookup_ipc(name);
> +		if (!ret)
> +			break;
> +		ODP_DBG("Waiting for %s\n", name);
> +		if (timeout <= 0)
> +			return NULL;
> +		timeout--;
> +		sleep(1);
> +	}
> +
> +	shm = odp_shm_reserve(name, size,
> +			      ODP_CACHE_LINE_SIZE,
> +			      _ODP_SHM_PROC_NOCREAT);
> +	if (ODP_SHM_INVALID == shm)
> +		ODP_ABORT("unable to map: %s\n", name);
> +
> +	return odp_shm_addr(shm);
> +}
> +
> +static int _ipc_pktio_init_slave(const char *dev, pktio_entry_t *pktio_entry,
> +				 odp_pool_t pool)
> +{
> +	if (ODP_POOL_NAME_LEN != ODPH_RING_NAMESIZE)
> +		ODP_ABORT("");

_ODP_STATIC_ASSERT

> +
> +	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r")))
> +		ODP_ABORT("too big ipc name\n");
> +
> +	pktio_entry->s.ipc.pool = pool;
> +	return 0;
> +}
> +
> +static int slave_post_init(pktio_entry_t *pktio_entry)
> +{
> +	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
> +	size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
> +			   sizeof(odph_ring_t);
> +	struct pktio_info *pinfo;
> +	void *ipc_pool_base;
> +	odp_shm_t shm;
> +	const char *dev = pktio_entry->s.name;
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
> +	pktio_entry->s.ipc.m.prod  = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +	if (!pktio_entry->s.ipc.m.prod) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		return -1;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.m.prod),
> +		odph_ring_free_count(pktio_entry->s.ipc.m.prod));
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
> +	pktio_entry->s.ipc.m.cons = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +	if (!pktio_entry->s.ipc.m.cons) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		goto free_m_prod;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.m.cons),
> +		odph_ring_free_count(pktio_entry->s.ipc.m.cons));
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
> +	pktio_entry->s.ipc.s.prod = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +	if (!pktio_entry->s.ipc.s.prod) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		goto free_m_cons;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.s.prod),
> +		odph_ring_free_count(pktio_entry->s.ipc.s.prod));
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
> +	pktio_entry->s.ipc.s.cons = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +	if (!pktio_entry->s.ipc.s.cons) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		goto free_s_prod;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.s.cons),
> +		odph_ring_free_count(pktio_entry->s.ipc.s.cons));
> +
> +	/* Get info about remote pool */
> +	pinfo = _ipc_map_pool_info(pktio_entry, dev, _ODP_SHM_PROC_NOCREAT);
> +
> +	ipc_pool_base = _ipc_map_remote_pool(pinfo->remote_pool_name,
> +					     pinfo->shm_pkt_pool_size);
> +	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
> +					     pinfo->mdata_offset;
> +	pktio_entry->s.ipc.pkt_size = pinfo->shm_pkt_size;
> +
> +	/* @todo: to simplify in linux-generic implementation we create pool for
> +	 * packets from IPC queue. On receive implementation copies packets to
> +	 * that pool. Later we can try to reuse original pool without packets
> +	 * copying. (pkt refcounts needs to be implemented).
> +	 */
> +	_odp_ipc_export_pool(pinfo, pktio_entry->s.ipc.pool);
> +
> +	pktio_entry->s.ipc.ready = 1;
> +	ODP_DBG("Post init... DONE.\n");
> +	return 0;
> +
> +free_s_prod:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +free_m_cons:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +free_m_prod:
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
> +	shm = odp_shm_lookup(ipc_shm_name);
> +	odp_shm_free(shm);
> +	return -1;
> +}
> +
> +static int _ipc_is_master(const char *dev)
> +{
> +	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
> +	int ret = ODP_PKTIO_TYPE_IPC;
> +	FILE *f;
> +
> +	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "/dev/shm/%s_m_prod", dev);
> +	f = fopen(ipc_shm_name, "r");
> +	if (f) {
> +		ret = ODP_PKTIO_TYPE_IPC_SLAVE;
> +		fclose(f);
> +	}

I've not grokked exactly how the master/slave stuff is synchronised, but
this doesn't look right. If two processes are calling ipc_pktio_open() at
the same time they may both end up thinking they're master, and then one
will fail when it tries to create the shm later. Is it not better to just
always attempt to create the shm (with O_EXCL), so whichever attempt
succeeds is the master? You could then get rid of the sleep in
_ipc_shm_map() as it should always succeed first time.

And a more general question: are multiple slaves supported, or is this
intended to be a 1-1 pipe? I can't see anything explicitly preventing
multiple slaves, but it looks like all slaves would map the same rings.
The rings are accessed with mp/mc-safe functions, so I guess it will work,
but the master/slave relationship is exposed (packets sent by the master
go to only one of the slaves, while all packets sent by the slaves go to
the master), and the application has no way of knowing which it is.
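
A minimal sketch of the O_EXCL idea, assuming the role is decided by whoever
manages to create the "<dev>_m_prod" shm object first (error handling and the
exact object name are illustrative only):

	static int _ipc_is_master(const char *dev)
	{
		char name[ODP_POOL_NAME_LEN + sizeof("_m_prod")];
		int fd;

		snprintf(name, sizeof(name), "/%s_m_prod", dev);
		/* only one process can win the O_EXCL create */
		fd = shm_open(name, O_RDWR | O_CREAT | O_EXCL,
			      S_IRUSR | S_IWUSR);
		if (fd >= 0) {
			close(fd);
			return ODP_PKTIO_TYPE_IPC;       /* master */
		}
		if (errno == EEXIST)
			return ODP_PKTIO_TYPE_IPC_SLAVE; /* slave  */
		return -1;
	}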

> +
> +	ODP_DBG("checking file: %s, ret %d\n", ipc_shm_name, ret);
> +	return ret;
> +}
> +
> +static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
> +			  pktio_entry_t *pktio_entry,
> +			  const char *dev,
> +			  odp_pool_t pool)
> +{
> +	int ret = -1;
> +
> +	if (strncmp(dev, "ipc", 3))
> +		return -1;
> +
> +	pktio_entry->s.ipc.ready = 0;
> +	pktio_entry->s.ipc.type = _ipc_is_master(dev);
> +
> +	switch (pktio_entry->s.ipc.type) {
> +	case ODP_PKTIO_TYPE_IPC:
> +		ODP_DBG("process %d is master\n", getpid());
> +		ret = _ipc_pktio_init_master(pktio_entry, dev, pool);
> +		break;
> +	case ODP_PKTIO_TYPE_IPC_SLAVE:
> +		ODP_DBG("process %d is slave\n", getpid());
> +		ret = _ipc_pktio_init_slave(dev, pktio_entry, pool);
> +		break;
> +	default:
> +		ODP_ABORT("");

I was going to say this needs a message in here, but actually I don't think
you need this abort; this switch looks like a hangover from previous
code and you'd now be better off with an if/else.

> +	}
> +
> +	return ret;
> +}
> +
> +static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
> +				    uint32_t offset,
> +				    uint32_t *seglen,
> +				    uint32_t limit)
> +{
> +	int seg_index  = offset / buf->segsize;
> +	int seg_offset = offset % buf->segsize;
> +	void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
> +
> +	if (seglen) {
> +		uint32_t buf_left = limit - offset;
> +		*seglen = seg_offset + buf_left <= buf->segsize ?
> +			buf_left : buf->segsize - seg_offset;
> +	}
> +
> +	return (void *)(seg_offset + (uint8_t *)addr);
> +}
> +
> +static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
> +				    uint32_t offset, uint32_t *seglen)
> +{
> +	if (offset > pkt_hdr->frame_len)
> +		return NULL;
> +
> +	return _ipc_buffer_map(&pkt_hdr->buf_hdr,
> +			  pkt_hdr->headroom + offset, seglen,
> +			  pkt_hdr->headroom + pkt_hdr->frame_len);
> +}
> +
> +int ipc_pktio_recv(pktio_entry_t *pktio_entry,
> +		   odp_packet_t pkt_table[], unsigned len)
> +{
> +	int pkts = 0;
> +	int i;
> +	odph_ring_t *r;	  /* link to ring to receive from */
> +	odph_ring_t *r_p; /* link to ring with produced packes */
> +	odph_ring_t *r_p_send; /* link to produced packets while send */

These names aren't very helpful IMO. Also, if you change the struct members
as I suggested above, then during _open you can set the pointers appropriately
for the direction and just use pktio_entry->s.ipc.rx.recv without having
to check whether it's master or slave.

> +	odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
> +	void **ipcbufs_p = (void *)&remote_pkts;
> +
> +	if (pktio_entry->s.ipc.type == ODP_PKTIO_TYPE_IPC) {
> +		if (!pktio_entry->s.ipc.ready) {
> +			if (master_post_init(pktio_entry))
> +				return 0;

How would this happen? It looks like master_post_init() is already called
during odp_pktio_open().

> +		}
> +		r = pktio_entry->s.ipc.s.prod;
> +		r_p = pktio_entry->s.ipc.s.cons;
> +		r_p_send = pktio_entry->s.ipc.m.cons;
> +	} else if (pktio_entry->s.ipc.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
> +		if (!pktio_entry->s.ipc.ready) {
> +			if (slave_post_init(pktio_entry))
> +				return 0;

But this isn't called during open, why not? Maybe you just need an
atomic flag set at the end of the master init and then wait on it in the
slave during open. Worst case, it waits for the amount of time it takes the
master to run through its init sequence.
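
Roughly what that flag could look like (the init_done field is hypothetical,
it is not in the posted patch; pinfo is the shared pktio_info block):

	/* master, at the end of its init */
	odp_atomic_store_u32(&pinfo->init_done, 1);

	/* slave, during open */
	while (!odp_atomic_load_u32(&pinfo->init_done))
		odp_spin();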

> +		}
> +		r = pktio_entry->s.ipc.m.prod;
> +		r_p = pktio_entry->s.ipc.m.cons;
> +		r_p_send = pktio_entry->s.ipc.s.cons;
> +	} else {
> +		ODP_ABORT("wrong type: %d\n", pktio_entry->s.ipc.type);
> +	}
> +
> +	/* Free already processed packets, for send */
> +	while (1) {
> +		odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
> +		int ret;
> +		void **rbuf_p;
> +
> +		rbuf_p = (void *)&r_p_pkts;
> +		ret = odph_ring_mc_dequeue_burst(r_p_send, rbuf_p,
> +						 PKTIO_IPC_ENTRIES);
> +		if (0 == ret)
> +			break;
> +		for (i = 0; i < ret; i++) {
> +			if (r_p_pkts[i] != ODP_PACKET_INVALID)
> +				odp_packet_free(r_p_pkts[i]);
> +		}
> +	}

The above could do with going in a helper.
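
A possible shape for that helper, reusing the names from the patch (untested
sketch; the same drain-and-free loop could then be shared by recv and send):

	static void _ipc_free_ring_packets(odph_ring_t *r)
	{
		odp_packet_t pkts[PKTIO_IPC_ENTRIES];
		void **rbuf_p = (void *)&pkts;
		int i, n;

		/* drain packets the peer has finished with and free them */
		while ((n = odph_ring_mc_dequeue_burst(r, rbuf_p,
						       PKTIO_IPC_ENTRIES)) > 0) {
			for (i = 0; i < n; i++)
				if (pkts[i] != ODP_PACKET_INVALID)
					odp_packet_free(pkts[i]);
		}
	}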

> +
> +	pkts = odph_ring_mc_dequeue_burst(r, ipcbufs_p, len);
> +	if (odp_unlikely(pkts < 0))
> +		ODP_ABORT("error to dequeue no packets\n");
> +
> +	/* fast path */
> +	if (odp_likely(0 == pkts))
> +		return 0;
> +
> +	for (i = 0; i < pkts; i++) {
> +		odp_pool_t pool;
> +		odp_packet_t pkt;
> +		odp_packet_hdr_t phdr;
> +		void *ptr;
> +		odp_buffer_bits_t handle;
> +		int idx; /* Remote packet has coded pool and index.
> +			  * We need only index.*/
> +		void *pkt_data;
> +		void *remote_pkt_data;
> +
> +		if (remote_pkts[i] == ODP_PACKET_INVALID)
> +			continue;
> +
> +		handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
> +		idx = handle.index;
> +
> +		/* Link to packed data. To this line we have Zero-Copy between
> +		 * processes, to simplify use packet copy in that version which
> +		 * can be removed later with more advance buffer management
> +		 * (ref counters).
> +		 */
> +		/* reverse odp_buf_to_hdr() */
> +		ptr = (char *)pktio_entry->s.ipc.pool_mdata_base +
> +		      (idx * ODP_CACHE_LINE_SIZE);
> +		memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t));
> +
> +		/* Allocate new packet. Select*/
> +		pool = pktio_entry->s.ipc.pool;
> +		if (odp_unlikely(pool == ODP_POOL_INVALID))
> +			ODP_ABORT("invalid pool");
> +
> +		pkt = odp_packet_alloc(pool, phdr.frame_len);
> +		if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
> +			/* Original pool might be smaller then
> +			*  PKTIO_IPC_ENTRIES. If packet can not be
> +			 * allocated from pool at this time,
> +			 * simple get in on next recv() call.
> +			 */
> +			if (i == 0)
> +				return 0;
> +			break;
> +		}
> +
> +		/* Copy packet data. */
> +		pkt_data = odp_packet_data(pkt);
> +		if (odp_unlikely(!pkt_data))
> +			ODP_ABORT("unable to map pkt_data ipc_slave %d\n",
> +				  (ODP_PKTIO_TYPE_IPC_SLAVE ==
> +					pktio_entry->s.ipc.type));
> +
> +		remote_pkt_data =  _ipc_packet_map(ptr, 0, NULL);
> +		if (odp_unlikely(!remote_pkt_data))
> +			ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",
> +				  (ODP_PKTIO_TYPE_IPC_SLAVE ==
> +					pktio_entry->s.ipc.type));
> +
> +		/* @todo fix copy packet!!! */
> +		memcpy(pkt_data, remote_pkt_data, phdr.frame_len);
> +
> +		/* Copy packets L2, L3 parsed offsets and size */
> +		copy_packet_parser_metadata(&phdr, odp_packet_hdr(pkt));
> +
> +		odp_packet_hdr(pkt)->frame_len = phdr.frame_len;
> +		odp_packet_hdr(pkt)->headroom = phdr.headroom;
> +		odp_packet_hdr(pkt)->tailroom = phdr.tailroom;
> +		pkt_table[i] = pkt;
> +	}
> +
> +	/* Now tell other process that we no longer need that buffers.*/
> +	pkts = odph_ring_mp_enqueue_burst(r_p, ipcbufs_p, i);
> +	if (odp_unlikely(pkts < 0))
> +		ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
> +
> +	return pkts;
> +}
> +
> +int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +		   unsigned len)
> +{
> +	odph_ring_t *r;
> +	odph_ring_t *r_p;
> +	void **rbuf_p;
> +	int ret;
> +	unsigned i;
> +
> +	if (!pktio_entry->s.ipc.ready)
> +		return 0;
> +
> +	if (pktio_entry->s.ipc.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
> +		r = pktio_entry->s.ipc.s.prod;
> +		r_p = pktio_entry->s.ipc.s.cons;
> +	} else if (pktio_entry->s.ipc.type == ODP_PKTIO_TYPE_IPC) {
> +		r = pktio_entry->s.ipc.m.prod;
> +		r_p = pktio_entry->s.ipc.m.cons;
> +	} else {
> +		ODP_ABORT("wrong type: %d\n", pktio_entry->s.ipc.type);
> +	}
> +
> +	/* Free already processed packets, if any */
> +	while (1) {
> +		odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
> +
> +		rbuf_p = (void *)&r_p_pkts;
> +		ret = odph_ring_mc_dequeue_burst(r_p, rbuf_p,
> +						 PKTIO_IPC_ENTRIES);
> +		if (0 == ret)
> +			break;
> +		for (i = 0; i < (unsigned)ret; i++) {
> +			if (r_p_pkts[i] != ODP_PACKET_INVALID)
> +				odp_packet_free(r_p_pkts[i]);
> +		}
> +	}
> +
> +	/* Prepare packets: calculate offset from address. */
> +	for (i = 0; i < len; i++) {
> +		int j;
> +		odp_packet_t pkt =  pkt_table[i];
> +		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
> +		odp_buffer_bits_t handle;
> +		uint32_t cur_mapped_pool_id =
> +			 pool_handle_to_index(pktio_entry->s.ipc.pool);
> +		uint32_t pool_id;
> +
> +		/* do copy if packet was allocated from not mapped pool */
> +		handle.handle = _odp_packet_to_buffer(pkt);
> +		pool_id = handle.pool_id;
> +		if (pool_id != cur_mapped_pool_id) {
> +			odp_packet_t newpkt;
> +
> +			newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool);
> +			if (newpkt == ODP_PACKET_INVALID)
> +				ODP_ABORT("Unable to copy packet\n");
> +
> +			odp_packet_free(pkt);
> +			pkt_table[i] = newpkt;
> +		}
> +
> +		rbuf_p = (void *)&pkt;
> +
> +		/* buf_hdr.addr can not be used directly in remote process,
> +		 * convert it to offset
> +		 */
> +		for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) {
> +			pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
> +				(char *)pkt_hdr->buf_hdr.addr[j];
> +		}
> +	}
> +
> +	/* Put packets to ring to be processed in other process. */
> +	rbuf_p = (void *)&pkt_table[0];
> +	ret = odph_ring_mp_enqueue_burst(r, rbuf_p, len);
> +	if (odp_unlikely(ret < 0)) {
> +		ODP_ERR("pid %d odp_ring_mp_enqueue_bulk fail, ipc_slave %d, ret %d\n",
> +			getpid(),
> +			(ODP_PKTIO_TYPE_IPC_SLAVE == pktio_entry->s.ipc.type),
> +			ret);
> +		ODP_ERR("odp_ring_full: %d, odp_ring_count %d, odph_ring_free_count %d\n",
> +			odph_ring_full(r), odph_ring_count(r),
> +			odph_ring_free_count(r));
> +	}
> +
> +	return ret;
> +}
> +
> +static int ipc_mtu_get(pktio_entry_t *pktio_entry ODP_UNUSED)
> +{
> +	/* mtu not limited, pool settings are used. */
> +	return (9 * 1024);
> +}
> +
> +static int ipc_mac_addr_get(pktio_entry_t *pktio_entry ODP_UNUSED,
> +			    void *mac_addr)
> +{
> +	memcpy(mac_addr, pktio_ipc_mac, ETH_ALEN);
> +	return ETH_ALEN;
> +}
> +
> +const pktio_if_ops_t ipc_pktio_ops = {
> +	.init = NULL,
> +	.term = NULL,
> +	.open = ipc_pktio_open,
> +	.close = NULL,
> +	.recv =  ipc_pktio_recv,
> +	.send = ipc_pktio_send,
> +	.mtu_get = ipc_mtu_get,
> +	.promisc_mode_set = NULL,
> +	.promisc_mode_get = NULL,
> +	.mac_get = ipc_mac_addr_get
> +};
> diff --git a/platform/linux-generic/pktio/ring.c b/platform/linux-generic/pktio/ring.c
> new file mode 120000
> index 0000000..d35c589
> --- /dev/null
> +++ b/platform/linux-generic/pktio/ring.c
> @@ -0,0 +1 @@
> +../../../helper/ring.c
> \ No newline at end of file
> -- 
> 1.9.1
>
Maxim Uvarov Oct. 19, 2015, 7:56 a.m. UTC | #4
On 10/16/2015 21:16, Stuart Haslam wrote:
> It would be better to do away with the master/slave naming and instead use
> names that match their function, e.g.; rx.recv/rx.free and tx.recv/tx.free

Stuart, you have roughly the same question below in the code. I also
don't like the naming ipc.rx.recv and ipc.tx.free.

That is not rx or tx because the pktio is bidirectional. There is a
process which creates the shm and processes which connect to that shm.
They are very symmetrical; the difference is only the flag used and some
information that is expected to be already filled in. I decided to name
them master and slave, i.e. the master is the process which creates the
shm and the others are the processes which reuse it.

Changing the name is an easy find-and-replace in the patch. But we can
try to think about which names are more suitable. How about:

ipc.proc0
ipc.procX

ipc.parent
ipc.child

ipc.initial
ipc.secondary

ipc.batman
ipc.robin

?

In this version I support only 2 processes connected to each other using
a single pktio. But we can update it to a multi-process version. For that
we need to export information about the 3rd process to the first and
second. It should not be complex, but I would like to do it in a separate
patch after this series is included.

The other comments look reasonable. Will send updated patches.

Maxim.
Maxim Uvarov Oct. 19, 2015, 10:28 a.m. UTC | #5
On 10/16/2015 21:16, Stuart Haslam wrote:
> And a more general question, are multiple slaves supported or is this
> intended to be a 1-1 pipe? I can't see anything explicitly preventing
> multiple slaves but it looks like all slaves would map the same rings.
> The rings as accessed with mp/mc safe functions so I guess it will work
> but the master/slave relationship is exposed (packets sent be the master
> go to only one of the slaves, all packets sent by the slaves go to the
> master) but the application has no way of knowing which it is.
I want to have a separate patch for it.

In general we need some array for the rings plus a process identifier in
the main shared memory.

| p0 packet pool |
| p0 tx ring|
| p0 rx ring|
| p1 packet pool |
| p1 tx ring|
| p1 rx ring|
| p2 packet pool |
| p2 tx ring |
| p2 rx ring |

Because p0 (process 0) can place packets to p1 and p2, it has to be
connected with its own rx/tx link (which is done as an odp ring). So it
will be something like:

| p0 packet pool |
| int number of processes |
| p0 tx ring array |
| p0 rx ring array |

Plus some notification that a new process has opened a pktio with the
same name and wants to connect to it.

But because it's not clear to me how to define which process should take
a packet when there are several processes, I think it is reasonable for
the first version to be 1-1, assuming that you can create a bunch of
different pktios: ipc-1, ipc-2, ipc-3, etc. If we accept Nikita's idea to
support hw queues for pktio, then I can reuse that API for ipc.

Maxim.
Stuart Haslam Oct. 19, 2015, 10:43 a.m. UTC | #6
On Mon, Oct 19, 2015 at 10:56:01AM +0300, Maxim Uvarov wrote:
> On 10/16/2015 21:16, Stuart Haslam wrote:
> >It would be better to do away with the master/slave naming and instead use
> >names that match their function, e.g.; rx.recv/rx.free and tx.recv/tx.free
> 
> Stuart you have also about the same question bellow in the code. I also
> don't line naming ipc.rx.recv and ipc.tx.free.
> 
> That is not rx or tx because of pkio is bidirectional. There is
> process with creates
> shm and processes which connect to that shm. They are very
> symmetrical and difference
> is only used flag and expected some information to be already
> filled. I deceived to name
> it master and slave. I.e. master is process which creates shm and
> other processes which reuse it.

Yes I got that, I'm suggesting that the conditional check for whether
this end of the pktio is a master or a slave can be done only once at
open time rather than in every send/recv, since it doesn't change
after the open.

You have four rings with different uses depending on which end of the
link you're on -

m.prod   master: enqueue to send
         slave:  dequeue to recv
m.cons   master: dequeue to free transmitted packets
         slave:  enqueue to request free of received packets
s.prod   master: dequeue to recv
         slave:  enqueue to send
s.cons   master: enqueue to request free of received packets
         slave:  dequeue to free transmitted packets

So the pointers can be set up like this on open -

if (master) {
	tx.send = m.prod;
	tx.free = m.cons;
	rx.recv = s.prod;
	rx.free = s.cons;
} else {
	tx.send = s.prod;
	tx.free = s.cons;
	rx.recv = m.prod;
	rx.free = m.cons;
}

Obviously not exactly like that, but I hope you get the idea.

> 
> Change name is easy find and replace in the patch. But we can try to
> think which name is more
> suitable. How about:
> 
> ipc.proc0
> ipc.procX
> 
> ipc.parent
> ipc.child
> 
> ipc.initial
> ipc.secondary
> 
> ipc.batman
> ipc.robin
> 
> ?
> 
> In that version I support only 2 processes connected to each other
> using single pktio. But we can update it
> to use multi version. For that needed to export information about
> 3-rd process to first and second. It should
> not be complex but I would like to do it in separate patch after
> that series included.
> 
> Other comments looks reasonable. Will send updated patches.
> 
> Maxim.
> 
>
Maxim Uvarov Oct. 19, 2015, 10:58 a.m. UTC | #7
On 10/19/2015 13:43, Stuart Haslam wrote:
> On Mon, Oct 19, 2015 at 10:56:01AM +0300, Maxim Uvarov wrote:
>> On 10/16/2015 21:16, Stuart Haslam wrote:
>>> It would be better to do away with the master/slave naming and instead use
>>> names that match their function, e.g.; rx.recv/rx.free and tx.recv/tx.free
>> Stuart you have also about the same question bellow in the code. I also
>> don't line naming ipc.rx.recv and ipc.tx.free.
>>
>> That is not rx or tx because of pkio is bidirectional. There is
>> process with creates
>> shm and processes which connect to that shm. They are very
>> symmetrical and difference
>> is only used flag and expected some information to be already
>> filled. I deceived to name
>> it master and slave. I.e. master is process which creates shm and
>> other processes which reuse it.
> Yes I got that, I'm suggesting that the conditional check for whether
> this end of the pktio is a master or a slave can be done only once at
> open time rather than in every send/recv, since it doesn't change
> after the open.
>
> You have four rings with different uses depending on which end of the
> link you're on -
>
> m.prod   master: enqueue to send
>           slave:  dequeue to recv
> m.cons   master: dequeue to free transmitted packets
>           slave:  enqueue to request free of received packets
> s.prod   master: dequeue to recv
>           slave:  enqueue to send
> s.cons   master: enqueue to request free of received packets
>           slave:  dequeue to free transmitted packets
>
> So the pointers can be setup like this on open -
>
> if (master) {
> 	tx.send = m.prod;
> 	tx.free = m.cons;
> 	rx.recv = s.prod;
> 	rx.free = s.cons;
> } else {
> 	tx.send = s.prod;
> 	tx.free = s.cons;
> 	rx.recv = m.prod;
> 	rx.free = m.cons;
> }
>
> Obviously not exactly like that, but I hope you get the idea.

Ok, that looks reasonable.

Maxim.
>> Change name is easy find and replace in the patch. But we can try to
>> think which name is more
>> suitable. How about:
>>
>> ipc.proc0
>> ipc.procX
>>
>> ipc.parent
>> ipc.child
>>
>> ipc.initial
>> ipc.secondary
>>
>> ipc.batman
>> ipc.robin
>>
>> ?
>>
>> In that version I support only 2 processes connected to each other
>> using single pktio. But we can update it
>> to use multi version. For that needed to export information about
>> 3-rd process to first and second. It should
>> not be complex but I would like to do it in separate patch after
>> that series included.
>>
>> Other comments looks reasonable. Will send updated patches.
>>
>> Maxim.
>>
>>
Maxim Uvarov Oct. 19, 2015, 4:03 p.m. UTC | #8
On 10/13/2015 16:50, Nicolas Morey-Chaisemartin wrote:
>
> On 10/09/2015 01:59 PM, Maxim Uvarov wrote:
>> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
>> ---
>>   platform/linux-generic/Makefile.am                 |   2 +
>>   .../linux-generic/include/odp_buffer_internal.h    |   3 +
>>   .../linux-generic/include/odp_packet_io_internal.h |  35 +
>>   .../include/odp_packet_io_ipc_internal.h           |  51 ++
>>   platform/linux-generic/include/odp_shm_internal.h  |  20 +
>>   platform/linux-generic/odp_packet_io.c             |   1 +
>>   platform/linux-generic/odp_pool.c                  |  11 +-
>>   platform/linux-generic/odp_shared_memory.c         |  10 +-
>>   platform/linux-generic/pktio/io_ops.c              |   1 +
>>   platform/linux-generic/pktio/ipc.c                 | 720 +++++++++++++++++++++
>>   platform/linux-generic/pktio/ring.c                |   1 +
>>   11 files changed, 851 insertions(+), 4 deletions(-)
>>   create mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h
>>   create mode 100644 platform/linux-generic/include/odp_shm_internal.h
>>   create mode 100644 platform/linux-generic/pktio/ipc.c
>>   create mode 120000 platform/linux-generic/pktio/ring.c
>>
>> diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
>> index b9ed3b0..71353dd 100644
>> --- a/platform/linux-generic/Makefile.am
>> +++ b/platform/linux-generic/Makefile.am
>> @@ -151,9 +151,11 @@ __LIB__libodp_la_SOURCES = \
>>   			   odp_packet_flags.c \
>>   			   odp_packet_io.c \
>>   			   pktio/io_ops.c \
>> +			   pktio/ipc.c \
>>   			   pktio/loop.c \
>>   			   pktio/socket.c \
>>   			   pktio/socket_mmap.c \
>> +			   pktio/ring.c \
>>   			   odp_pool.c \
>>   			   odp_queue.c \
>>   			   odp_rwlock.c \
>> diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
>> index 4cacca1..a078e52 100644
>> --- a/platform/linux-generic/include/odp_buffer_internal.h
>> +++ b/platform/linux-generic/include/odp_buffer_internal.h
>> @@ -132,6 +132,9 @@ struct odp_buffer_hdr_t {
>>   	uint32_t                 uarea_size; /* size of user area */
>>   	uint32_t                 segcount;   /* segment count */
>>   	uint32_t                 segsize;    /* segment size */
>> +	/* ipc mapped process can not walk over pointers,
>> +	 * offset has to be used */
>> +	uint64_t		 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
>>   	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
>>   	uint64_t                 order;      /* sequence for ordered queues */
>>   	queue_entry_t           *origin_qe;  /* ordered queue origin */
> I haven't been through everything yet but should this be an union with the addr ?
> The odp_buffer_hdr_t is already quite large as it is.
>
> Nicolas
Unfortunately that can not be a union, because the pool can be used by
the current process and also by another process which has mapped this
pool. I will keep the same code in the new version until I find a better
solution to lower the size of this struct. Maybe we don't need u64 for
the offsets here.

Maxim.

Patch

diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index b9ed3b0..71353dd 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -151,9 +151,11 @@  __LIB__libodp_la_SOURCES = \
 			   odp_packet_flags.c \
 			   odp_packet_io.c \
 			   pktio/io_ops.c \
+			   pktio/ipc.c \
 			   pktio/loop.c \
 			   pktio/socket.c \
 			   pktio/socket_mmap.c \
+			   pktio/ring.c \
 			   odp_pool.c \
 			   odp_queue.c \
 			   odp_rwlock.c \
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index 4cacca1..a078e52 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -132,6 +132,9 @@  struct odp_buffer_hdr_t {
 	uint32_t                 uarea_size; /* size of user area */
 	uint32_t                 segcount;   /* segment count */
 	uint32_t                 segsize;    /* segment size */
+	/* ipc mapped process can not walk over pointers,
+	 * offset has to be used */
+	uint64_t		 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
 	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
 	uint64_t                 order;      /* sequence for ordered queues */
 	queue_entry_t           *origin_qe;  /* ordered queue origin */
diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
index 6b03051..62e6829 100644
--- a/platform/linux-generic/include/odp_packet_io_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_internal.h
@@ -23,6 +23,7 @@  extern "C" {
 #include <odp_classification_datamodel.h>
 #include <odp_align_internal.h>
 #include <odp_debug_internal.h>
+#include <odp/helper/ring.h>
 
 #include <odp/config.h>
 #include <odp/hints.h>
@@ -36,6 +37,38 @@  typedef struct {
 	odp_bool_t promisc;		/**< promiscuous mode state */
 } pkt_loop_t;
 
+typedef	struct {
+	/* TX */
+	struct  {
+		odph_ring_t	*prod; /**< ODP ring for IPC msg packets
+					  indexes transmitted to shared
+					  memory */
+		odph_ring_t	*cons; /**< ODP ring for IPC msg packets
+					    indexes already processed by remote
+					    process */
+	} m; /* master */
+	/* RX */
+	struct {
+		odph_ring_t	*prod; /**< ODP ring with indexes of packets
+					    received from shared
+					    memory (from the remote process) */
+		odph_ring_t	*cons; /**< ODP ring with indexes of packets
+					    already processed by the
+					    current process */
+	} s; /* slave */
+	void		*pool_base;		/**< Remote pool base addr */
+	void		*pool_mdata_base;	/**< Remote pool mdata base addr */
+	uint64_t	pkt_size;		/**< Packet size in remote pool */
+	odp_pool_t	pool;			/**< Pool of main process */
+	odp_shm_t	pool_shm;		/**< Shm memory for remote pool */
+	enum {
+		ODP_PKTIO_TYPE_IPC = 0,
+		ODP_PKTIO_TYPE_IPC_SLAVE
+	} type; /**< define if it's master or slave process */
+	int  ready; /**< 1 - pktio is ready and can recv/send packet, 0 - not yet ready */
+	void *pinfo;
+} _ipc_pktio_t;
+
 struct pktio_entry {
 	const struct pktio_if_ops *ops; /**< Implementation specific methods */
 	odp_spinlock_t lock;		/**< entry spinlock */
@@ -49,6 +82,7 @@  struct pktio_entry {
 		pkt_sock_t pkt_sock;		/**< using socket API for IO */
 		pkt_sock_mmap_t pkt_sock_mmap;	/**< using socket mmap
 						 *   API for IO */
+		_ipc_pktio_t ipc;		/**< IPC pktio data */
 	};
 	enum {
 		STATE_START = 0,
@@ -124,6 +158,7 @@  int pktin_poll(pktio_entry_t *entry);
 extern const pktio_if_ops_t sock_mmsg_pktio_ops;
 extern const pktio_if_ops_t sock_mmap_pktio_ops;
 extern const pktio_if_ops_t loopback_pktio_ops;
+extern const pktio_if_ops_t ipc_pktio_ops;
 extern const pktio_if_ops_t * const pktio_if_ops[];
 
 #ifdef __cplusplus
diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
new file mode 100644
index 0000000..25fd1d6
--- /dev/null
+++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
@@ -0,0 +1,51 @@ 
+/* Copyright (c) 2015, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#include <odp/packet_io.h>
+#include <odp_packet_io_internal.h>
+#include <odp/packet.h>
+#include <odp_packet_internal.h>
+#include <odp_internal.h>
+#include <odp/shared_memory.h>
+
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+
+/* IPC packet I/O over shared memory ring */
+#include <odp/helper/ring.h>
+
+#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
+					odp ring queue */
+
+/* This struct is exported to shared memory so that two processes can find
+ * each other.
+ */
+struct pktio_info {
+	char remote_pool_name[ODP_POOL_NAME_LEN];
+	int shm_pool_bufs_num;	/*< number of buffers in remote pool */
+	size_t shm_pkt_pool_size; /*< size of remote pool */
+	uint32_t shm_pkt_size; /*< size of packet/segment in remote pool */
+	odp_shm_t shm;	/*< current structure stored in this shm */
+	size_t mdata_offset; /*< offset from shared memory block start
+			      * to pool_mdata_addr
+			      * (linux-generic pool specific) */
+	struct {
+		size_t mdata_offset; /*< offset from shared memory block start
+				      * to pool_mdata_addr in remote process.
+				      * (linux-generic pool specific) */
+		char   pool_name[ODP_POOL_NAME_LEN];
+	} slave;
+} __packed;
+
+int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
+		   odp_pool_t pool);
+
+int ipc_pktio_recv(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
+		   unsigned len);
+
+int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
+		   unsigned len);
diff --git a/platform/linux-generic/include/odp_shm_internal.h b/platform/linux-generic/include/odp_shm_internal.h
new file mode 100644
index 0000000..f3ce892
--- /dev/null
+++ b/platform/linux-generic/include/odp_shm_internal.h
@@ -0,0 +1,20 @@ 
+/* Copyright (c) 2013, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#ifndef ODP_SHM_INTERNAL_H_
+#define ODP_SHM_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define _ODP_SHM_PROC_NOCREAT 0x4  /**< Do not create shm if not exist */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index 2e7b199..bedf221 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -18,6 +18,7 @@ 
 #include <odp_schedule_internal.h>
 #include <odp_classification_internal.h>
 #include <odp_debug_internal.h>
+#include <odp_packet_io_ipc_internal.h>
 
 #include <string.h>
 #include <sys/ioctl.h>
diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
index 2036c2a..e757235 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -219,8 +219,11 @@  odp_pool_t _pool_create(const char *name,
 			ODP_ALIGN_ROUNDUP(params->pkt.len, seg_len);
 
 		/* Reject create if pkt.len needs too many segments */
-		if (blk_size / seg_len > ODP_BUFFER_MAX_SEG)
+		if (blk_size / seg_len > ODP_BUFFER_MAX_SEG) {
+			ODP_ERR("ODP_BUFFER_MAX_SEG exceeded: %d (max %d)\n",
+				blk_size / seg_len, ODP_BUFFER_MAX_SEG);
 			return ODP_POOL_INVALID;
+		}
 
 		p_udata_size = params->pkt.uarea_size;
 		udata_stride = ODP_ALIGN_ROUNDUP(p_udata_size,
@@ -241,8 +244,12 @@  odp_pool_t _pool_create(const char *name,
 
 	/* Validate requested number of buffers against addressable limits */
 	if (buf_num >
-	    (ODP_BUFFER_MAX_BUFFERS / (buf_stride / ODP_CACHE_LINE_SIZE)))
+	    (ODP_BUFFER_MAX_BUFFERS / (buf_stride / ODP_CACHE_LINE_SIZE))) {
+		ODP_ERR("buf_num %d exceeds the maximum %d\n",
+			buf_num, ODP_BUFFER_MAX_BUFFERS /
+			(buf_stride / ODP_CACHE_LINE_SIZE));
 		return ODP_POOL_INVALID;
+	}
 
 	/* Find an unused buffer pool slot and iniitalize it as requested */
 	for (i = 0; i < ODP_CONFIG_POOLS; i++) {
diff --git a/platform/linux-generic/odp_shared_memory.c b/platform/linux-generic/odp_shared_memory.c
index ab48dda..5de48d3 100644
--- a/platform/linux-generic/odp_shared_memory.c
+++ b/platform/linux-generic/odp_shared_memory.c
@@ -15,6 +15,7 @@ 
 #include <odp/debug.h>
 #include <odp_debug_internal.h>
 #include <odp_align_internal.h>
+#include <odp_shm_internal.h>
 #include <odp/config.h>
 
 #include <unistd.h>
@@ -189,7 +190,7 @@  odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
 	int fd = -1;
 	int map_flag = MAP_SHARED;
 	/* If already exists: O_EXCL: error, O_TRUNC: truncate to zero */
-	int oflag = O_RDWR | O_CREAT | O_TRUNC;
+	int oflag = O_RDWR;
 	uint64_t alloc_size;
 	uint64_t page_sz, huge_sz;
 #ifdef MAP_HUGETLB
@@ -207,7 +208,12 @@  odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
 	alloc_hp_size = (size + align + (huge_sz - 1)) & (-huge_sz);
 #endif
 
-	if (flags & ODP_SHM_PROC) {
+	if (flags & ODP_SHM_PROC)
+		oflag |= O_CREAT | O_TRUNC;
+
+	if (flags & (ODP_SHM_PROC | _ODP_SHM_PROC_NOCREAT)) {
+		need_huge_page = 0;
+
 		/* Creates a file to /dev/shm */
 		fd = shm_open(name, oflag,
 			      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
diff --git a/platform/linux-generic/pktio/io_ops.c b/platform/linux-generic/pktio/io_ops.c
index 1d47e74..5d8d4a5 100644
--- a/platform/linux-generic/pktio/io_ops.c
+++ b/platform/linux-generic/pktio/io_ops.c
@@ -12,6 +12,7 @@ 
  * Array must be NULL terminated */
 const pktio_if_ops_t * const pktio_if_ops[]  = {
 	&loopback_pktio_ops,
+	&ipc_pktio_ops,
 	&sock_mmap_pktio_ops,
 	&sock_mmsg_pktio_ops,
 	NULL
diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c
new file mode 100644
index 0000000..70e8854
--- /dev/null
+++ b/platform/linux-generic/pktio/ipc.c
@@ -0,0 +1,720 @@ 
+/* Copyright (c) 2015, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#include <odp_packet_io_ipc_internal.h>
+#include <odp_debug_internal.h>
+#include <odp_packet_io_internal.h>
+#include <odp_spin_internal.h>
+#include <odp/system_info.h>
+#include <odp_shm_internal.h>
+
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+/* MAC address for the "ipc" interface */
+static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12};
+
+static void *_ipc_map_remote_pool(const char *name, size_t size);
+
+static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
+{
+	pool_entry_t *pool;
+	uint32_t pool_id;
+	odp_shm_t shm;
+	odp_shm_info_t info;
+
+	pool_id = pool_handle_to_index(pool_hdl);
+	pool    = get_pool_entry(pool_id);
+	shm = pool->s.pool_shm;
+
+	odp_shm_info(shm, &info);
+
+	return info.name;
+}
+
+/**
+* Look up a shared memory object.
+*
+* @param name   name of shm object
+*
+* @return 0 on success, otherwise non-zero
+*/
+static int _odp_shm_lookup_ipc(const char *name)
+{
+	int shm;
+
+	shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
+	if (shm == -1) {
+		if (errno == ENOENT)
+			return -1;
+		ODP_ABORT("shm_open for %s err %s\n",
+			  name, strerror(errno));
+	}
+	close(shm);
+	return 0;
+}
+
+static struct pktio_info *_ipc_map_pool_info(pktio_entry_t *pktio_entry,
+					     const char *pool_name,
+					     int flag)
+{
+	struct pktio_info *pinfo;
+	char name[ODP_POOL_NAME_LEN + sizeof("_info")];
+
+	/* Create info about remote pktio */
+	snprintf(name, sizeof(name), "%s_info", pool_name);
+	odp_shm_t shm = odp_shm_reserve(name, sizeof(struct pktio_info),
+			ODP_CACHE_LINE_SIZE,
+			flag);
+	if (ODP_SHM_INVALID == shm)
+		ODP_ABORT("unable to reserve memory for shm info");
+	pinfo = odp_shm_addr(shm);
+	if (flag != _ODP_SHM_PROC_NOCREAT)
+		pinfo->remote_pool_name[0] = 0;
+
+	pktio_entry->s.ipc.pool_shm = shm;
+	return pinfo;
+}
+
+static int master_post_init(pktio_entry_t *pktio_entry)
+{
+	struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo;
+	int ret;
+	void *ipc_pool_base;
+
+	if (pinfo->slave.mdata_offset == 0)
+		return -1;
+
+	ret = _odp_shm_lookup_ipc(pinfo->slave.pool_name);
+	if (ret) {
+		ODP_DBG("no pool file %s\n", pinfo->slave.pool_name);
+		return -1;
+	}
+
+	ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
+					     pinfo->shm_pkt_pool_size);
+	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
+					     pinfo->slave.mdata_offset;
+
+	pktio_entry->s.ipc.ready = 1;
+
+	ODP_DBG("Post init... DONE.\n");
+	return 0;
+}
+
+static int _ipc_pktio_init_master(pktio_entry_t *pktio_entry, const char *dev,
+				  odp_pool_t pool)
+{
+	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
+	pool_entry_t *pool_entry;
+	uint32_t pool_id;
+	struct pktio_info *pinfo;
+	const char *pool_name;
+	odp_shm_t shm;
+
+	pool_id = pool_handle_to_index(pool);
+	pool_entry    = get_pool_entry(pool_id);
+
+	if (ODP_POOL_NAME_LEN != ODPH_RING_NAMESIZE)
+		ODP_ABORT("");
+
+	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r"))) {
+		ODP_DBG("ipc name is too long\n");
+		return -1;
+	}
+
+	/* Generate a name in shm (e.g. <dev>_m_prod) for the ring of
+	 * packets to be processed.
+	 */
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
+	pktio_entry->s.ipc.m.prod = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
+	if (!pktio_entry->s.ipc.m.prod) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		return -1;
+	}
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.m.prod),
+		odph_ring_free_count(pktio_entry->s.ipc.m.prod));
+
+	/* Generate a name in shm (e.g. <dev>_m_cons) for the ring of
+	 * already processed packets.
+	 */
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
+	pktio_entry->s.ipc.m.cons = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
+	if (!pktio_entry->s.ipc.m.cons) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		goto free_m_prod;
+	}
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.m.cons),
+		odph_ring_free_count(pktio_entry->s.ipc.m.cons));
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
+	pktio_entry->s.ipc.s.prod = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
+	if (!pktio_entry->s.ipc.s.prod) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		goto free_m_cons;
+	}
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.s.prod),
+		odph_ring_free_count(pktio_entry->s.ipc.s.prod));
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
+	pktio_entry->s.ipc.s.cons = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC | ODPH_RING_NO_LIST);
+	if (!pktio_entry->s.ipc.s.cons) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		goto free_s_prod;
+	}
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.s.cons),
+		odph_ring_free_count(pktio_entry->s.ipc.s.cons));
+
+	/* Memory to store information about exported pool */
+	pinfo = _ipc_map_pool_info(pktio_entry, dev, ODP_SHM_PROC);
+
+	/* Set up pool name for remote info */
+	pool_name = _ipc_odp_buffer_pool_shm_name(pool);
+	memcpy(pinfo->remote_pool_name, pool_name, strlen(pool_name));
+	pinfo->shm_pkt_pool_size = pool_entry->s.pool_size;
+	pinfo->shm_pool_bufs_num = pool_entry->s.buf_num;
+	pinfo->shm_pkt_size = pool_entry->s.seg_size;
+	pinfo->mdata_offset =  pool_entry->s.pool_mdata_addr -
+			       pool_entry->s.pool_base_addr;
+	pinfo->slave.mdata_offset = 0;
+
+	pktio_entry->s.ipc.pinfo = pinfo;
+	pktio_entry->s.ipc.pool = pool;
+
+	ODP_DBG("Pre init... DONE.\n");
+
+	master_post_init(pktio_entry);
+
+	return 0;
+
+free_s_prod:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+free_m_cons:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+free_m_prod:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+	return -1;
+}
+
+static void _odp_ipc_export_pool(struct pktio_info *pinfo,
+				 odp_pool_t pool)
+{
+	pool_entry_t *pool_entry;
+
+	pool_entry = odp_pool_to_entry(pool);
+	if (pool_entry->s.blk_size != pinfo->shm_pkt_size)
+		ODP_ABORT("pktio for same name should have the same pool size\n");
+	if (pool_entry->s.buf_num != (unsigned)pinfo->shm_pool_bufs_num)
+		ODP_ABORT("pktio for same name should have the same pool size\n");
+
+	snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s",
+		 pool_entry->s.name);
+	pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
+				    pool_entry->s.pool_base_addr;
+}
+
+static void _verify_addr(void *addr, size_t size)
+{
+	char *x = addr;
+	unsigned int i;
+
+	for (i = 0; i < size; i++)
+		*x += 1;
+	for (i = 0; i < size; i++)
+		*x -= 1;
+}
+
+static void *_ipc_map_remote_pool(const char *name, size_t size)
+{
+	odp_shm_t shm;
+	void *addr;
+
+	ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
+	shm = odp_shm_reserve(name,
+			      size,
+			      ODP_CACHE_LINE_SIZE,
+			      _ODP_SHM_PROC_NOCREAT);
+	if (shm == ODP_SHM_INVALID)
+		ODP_ABORT("unable to map %s\n", name);
+
+	addr = odp_shm_addr(shm);
+	ODP_DBG("MAP master: %p - %p size %ld, pool %s\n",
+		addr, (char *)addr + size, size, name);
+	_verify_addr(addr, size);
+	return addr;
+}
+
+static void *_ipc_shm_map(char *name, size_t size, int timeout)
+{
+	odp_shm_t shm;
+	int ret;
+
+	while (1) {
+		ret = _odp_shm_lookup_ipc(name);
+		if (!ret)
+			break;
+		ODP_DBG("Waiting for %s\n", name);
+		if (timeout <= 0)
+			return NULL;
+		timeout--;
+		sleep(1);
+	}
+
+	shm = odp_shm_reserve(name, size,
+			      ODP_CACHE_LINE_SIZE,
+			      _ODP_SHM_PROC_NOCREAT);
+	if (ODP_SHM_INVALID == shm)
+		ODP_ABORT("unable to map: %s\n", name);
+
+	return odp_shm_addr(shm);
+}
+
+static int _ipc_pktio_init_slave(const char *dev, pktio_entry_t *pktio_entry,
+				 odp_pool_t pool)
+{
+	if (ODP_POOL_NAME_LEN != ODPH_RING_NAMESIZE)
+		ODP_ABORT("");
+
+	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r")))
+		ODP_ABORT("ipc name is too long\n");
+
+	pktio_entry->s.ipc.pool = pool;
+	return 0;
+}
+
+static int slave_post_init(pktio_entry_t *pktio_entry)
+{
+	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
+	size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
+			   sizeof(odph_ring_t);
+	struct pktio_info *pinfo;
+	void *ipc_pool_base;
+	odp_shm_t shm;
+	const char *dev = pktio_entry->s.name;
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
+	pktio_entry->s.ipc.m.prod  = _ipc_shm_map(ipc_shm_name, ring_size, 10);
+	if (!pktio_entry->s.ipc.m.prod) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		return -1;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.m.prod),
+		odph_ring_free_count(pktio_entry->s.ipc.m.prod));
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
+	pktio_entry->s.ipc.m.cons = _ipc_shm_map(ipc_shm_name, ring_size, 10);
+	if (!pktio_entry->s.ipc.m.cons) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		goto free_m_prod;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.m.cons),
+		odph_ring_free_count(pktio_entry->s.ipc.m.cons));
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
+	pktio_entry->s.ipc.s.prod = _ipc_shm_map(ipc_shm_name, ring_size, 10);
+	if (!pktio_entry->s.ipc.s.prod) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		goto free_m_cons;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.s.prod),
+		odph_ring_free_count(pktio_entry->s.ipc.s.prod));
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
+	pktio_entry->s.ipc.s.cons = _ipc_shm_map(ipc_shm_name, ring_size, 10);
+	if (!pktio_entry->s.ipc.s.cons) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		goto free_s_prod;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc.s.cons),
+		odph_ring_free_count(pktio_entry->s.ipc.s.cons));
+
+	/* Get info about remote pool */
+	pinfo = _ipc_map_pool_info(pktio_entry, dev, _ODP_SHM_PROC_NOCREAT);
+
+	ipc_pool_base = _ipc_map_remote_pool(pinfo->remote_pool_name,
+					     pinfo->shm_pkt_pool_size);
+	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
+					     pinfo->mdata_offset;
+	pktio_entry->s.ipc.pkt_size = pinfo->shm_pkt_size;
+
+	/* @todo: to keep the linux-generic implementation simple, we create a
+	 * pool for packets coming from the IPC queue and copy packets into it
+	 * on receive. Later we can try to reuse the original pool without
+	 * copying (packet refcounts need to be implemented first).
+	 */
+	_odp_ipc_export_pool(pinfo, pktio_entry->s.ipc.pool);
+
+	pktio_entry->s.ipc.ready = 1;
+	ODP_DBG("Post init... DONE.\n");
+	return 0;
+
+free_s_prod:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+free_m_cons:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+free_m_prod:
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
+	shm = odp_shm_lookup(ipc_shm_name);
+	odp_shm_free(shm);
+	return -1;
+}
+
+static int _ipc_is_master(const char *dev)
+{
+	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
+	int ret = ODP_PKTIO_TYPE_IPC;
+	FILE *f;
+
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "/dev/shm/%s_m_prod", dev);
+	f = fopen(ipc_shm_name, "r");
+	if (f) {
+		ret = ODP_PKTIO_TYPE_IPC_SLAVE;
+		fclose(f);
+	}
+
+	ODP_DBG("checking file: %s, ret %d\n", ipc_shm_name, ret);
+	return ret;
+}
+
+static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
+			  pktio_entry_t *pktio_entry,
+			  const char *dev,
+			  odp_pool_t pool)
+{
+	int ret = -1;
+
+	if (strncmp(dev, "ipc", 3))
+		return -1;
+
+	pktio_entry->s.ipc.ready = 0;
+	pktio_entry->s.ipc.type = _ipc_is_master(dev);
+
+	switch (pktio_entry->s.ipc.type) {
+	case ODP_PKTIO_TYPE_IPC:
+		ODP_DBG("process %d is master\n", getpid());
+		ret = _ipc_pktio_init_master(pktio_entry, dev, pool);
+		break;
+	case ODP_PKTIO_TYPE_IPC_SLAVE:
+		ODP_DBG("process %d is slave\n", getpid());
+		ret = _ipc_pktio_init_slave(dev, pktio_entry, pool);
+		break;
+	default:
+		ODP_ABORT("");
+	}
+
+	return ret;
+}
+
+static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
+				    uint32_t offset,
+				    uint32_t *seglen,
+				    uint32_t limit)
+{
+	int seg_index  = offset / buf->segsize;
+	int seg_offset = offset % buf->segsize;
+	void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
+
+	if (seglen) {
+		uint32_t buf_left = limit - offset;
+		*seglen = seg_offset + buf_left <= buf->segsize ?
+			buf_left : buf->segsize - seg_offset;
+	}
+
+	return (void *)(seg_offset + (uint8_t *)addr);
+}
+
+static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
+				    uint32_t offset, uint32_t *seglen)
+{
+	if (offset > pkt_hdr->frame_len)
+		return NULL;
+
+	return _ipc_buffer_map(&pkt_hdr->buf_hdr,
+			  pkt_hdr->headroom + offset, seglen,
+			  pkt_hdr->headroom + pkt_hdr->frame_len);
+}
+
+int ipc_pktio_recv(pktio_entry_t *pktio_entry,
+		   odp_packet_t pkt_table[], unsigned len)
+{
+	int pkts = 0;
+	int i;
+	odph_ring_t *r;	  /* link to ring to receive from */
+	odph_ring_t *r_p; /* link to ring with produced packets */
+	odph_ring_t *r_p_send; /* link to produced packets while send */
+	odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
+	void **ipcbufs_p = (void *)&remote_pkts;
+
+	if (pktio_entry->s.ipc.type == ODP_PKTIO_TYPE_IPC) {
+		if (!pktio_entry->s.ipc.ready) {
+			if (master_post_init(pktio_entry))
+				return 0;
+		}
+		r = pktio_entry->s.ipc.s.prod;
+		r_p = pktio_entry->s.ipc.s.cons;
+		r_p_send = pktio_entry->s.ipc.m.cons;
+	} else if (pktio_entry->s.ipc.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
+		if (!pktio_entry->s.ipc.ready) {
+			if (slave_post_init(pktio_entry))
+				return 0;
+		}
+		r = pktio_entry->s.ipc.m.prod;
+		r_p = pktio_entry->s.ipc.m.cons;
+		r_p_send = pktio_entry->s.ipc.s.cons;
+	} else {
+		ODP_ABORT("wrong type: %d\n", pktio_entry->s.ipc.type);
+	}
+
+	/* Free packets sent earlier that the peer has already processed */
+	while (1) {
+		odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
+		int ret;
+		void **rbuf_p;
+
+		rbuf_p = (void *)&r_p_pkts;
+		ret = odph_ring_mc_dequeue_burst(r_p_send, rbuf_p,
+						 PKTIO_IPC_ENTRIES);
+		if (0 == ret)
+			break;
+		for (i = 0; i < ret; i++) {
+			if (r_p_pkts[i] != ODP_PACKET_INVALID)
+				odp_packet_free(r_p_pkts[i]);
+		}
+	}
+
+	pkts = odph_ring_mc_dequeue_burst(r, ipcbufs_p, len);
+	if (odp_unlikely(pkts < 0))
+		ODP_ABORT("failed to dequeue packets\n");
+
+	/* fast path */
+	if (odp_likely(0 == pkts))
+		return 0;
+
+	for (i = 0; i < pkts; i++) {
+		odp_pool_t pool;
+		odp_packet_t pkt;
+		odp_packet_hdr_t phdr;
+		void *ptr;
+		odp_buffer_bits_t handle;
+		int idx; /* The remote packet handle encodes pool and index;
+			  * we need only the index. */
+		void *pkt_data;
+		void *remote_pkt_data;
+
+		if (remote_pkts[i] == ODP_PACKET_INVALID)
+			continue;
+
+		handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
+		idx = handle.index;
+
+		/* Link to packet data. Up to this line we have zero-copy
+		 * between processes; to keep it simple this version copies
+		 * the packet, which can be removed later with more advanced
+		 * buffer management (ref counters).
+		 */
+		/* reverse odp_buf_to_hdr() */
+		ptr = (char *)pktio_entry->s.ipc.pool_mdata_base +
+		      (idx * ODP_CACHE_LINE_SIZE);
+		memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t));
+
+		/* Allocate a new packet from the local pool. */
+		pool = pktio_entry->s.ipc.pool;
+		if (odp_unlikely(pool == ODP_POOL_INVALID))
+			ODP_ABORT("invalid pool");
+
+		pkt = odp_packet_alloc(pool, phdr.frame_len);
+		if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
+			/* The original pool might be smaller than
+			 * PKTIO_IPC_ENTRIES. If a packet can not be
+			 * allocated from the pool at this time,
+			 * simply pick it up on the next recv() call.
+			 */
+			if (i == 0)
+				return 0;
+			break;
+		}
+
+		/* Copy packet data. */
+		pkt_data = odp_packet_data(pkt);
+		if (odp_unlikely(!pkt_data))
+			ODP_ABORT("unable to map pkt_data ipc_slave %d\n",
+				  (ODP_PKTIO_TYPE_IPC_SLAVE ==
+					pktio_entry->s.ipc.type));
+
+		remote_pkt_data =  _ipc_packet_map(ptr, 0, NULL);
+		if (odp_unlikely(!remote_pkt_data))
+			ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",
+				  (ODP_PKTIO_TYPE_IPC_SLAVE ==
+					pktio_entry->s.ipc.type));
+
+		/* @todo fix copy packet!!! */
+		memcpy(pkt_data, remote_pkt_data, phdr.frame_len);
+
+		/* Copy packets L2, L3 parsed offsets and size */
+		copy_packet_parser_metadata(&phdr, odp_packet_hdr(pkt));
+
+		odp_packet_hdr(pkt)->frame_len = phdr.frame_len;
+		odp_packet_hdr(pkt)->headroom = phdr.headroom;
+		odp_packet_hdr(pkt)->tailroom = phdr.tailroom;
+		pkt_table[i] = pkt;
+	}
+
+	/* Now tell the other process that we no longer need those buffers. */
+	pkts = odph_ring_mp_enqueue_burst(r_p, ipcbufs_p, i);
+	if (odp_unlikely(pkts < 0))
+		ODP_ABORT("ipc: odph_ring_mp_enqueue_burst r_p failed\n");
+
+	return pkts;
+}
+
+int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
+		   unsigned len)
+{
+	odph_ring_t *r;
+	odph_ring_t *r_p;
+	void **rbuf_p;
+	int ret;
+	unsigned i;
+
+	if (!pktio_entry->s.ipc.ready)
+		return 0;
+
+	if (pktio_entry->s.ipc.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
+		r = pktio_entry->s.ipc.s.prod;
+		r_p = pktio_entry->s.ipc.s.cons;
+	} else if (pktio_entry->s.ipc.type == ODP_PKTIO_TYPE_IPC) {
+		r = pktio_entry->s.ipc.m.prod;
+		r_p = pktio_entry->s.ipc.m.cons;
+	} else {
+		ODP_ABORT("wrong type: %d\n", pktio_entry->s.ipc.type);
+	}
+
+	/* Free already processed packets, if any */
+	while (1) {
+		odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
+
+		rbuf_p = (void *)&r_p_pkts;
+		ret = odph_ring_mc_dequeue_burst(r_p, rbuf_p,
+						 PKTIO_IPC_ENTRIES);
+		if (0 == ret)
+			break;
+		for (i = 0; i < (unsigned)ret; i++) {
+			if (r_p_pkts[i] != ODP_PACKET_INVALID)
+				odp_packet_free(r_p_pkts[i]);
+		}
+	}
+
+	/* Prepare packets: calculate offset from address. */
+	for (i = 0; i < len; i++) {
+		int j;
+		odp_packet_t pkt =  pkt_table[i];
+		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
+		odp_buffer_bits_t handle;
+		uint32_t cur_mapped_pool_id =
+			 pool_handle_to_index(pktio_entry->s.ipc.pool);
+		uint32_t pool_id;
+
+		/* copy the packet if it is not from the mapped pool */
+		handle.handle = _odp_packet_to_buffer(pkt);
+		pool_id = handle.pool_id;
+		if (pool_id != cur_mapped_pool_id) {
+			odp_packet_t newpkt;
+
+			newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool);
+			if (newpkt == ODP_PACKET_INVALID)
+				ODP_ABORT("Unable to copy packet\n");
+
+			odp_packet_free(pkt);
+			pkt_table[i] = newpkt;
+		}
+
+		rbuf_p = (void *)&pkt;
+
+		/* buf_hdr.addr can not be used directly in the remote process,
+		 * so convert it to an offset
+		 */
+		for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) {
+			pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
+				(char *)pkt_hdr->buf_hdr.addr[j];
+		}
+	}
+
+	/* Put packets to ring to be processed in other process. */
+	rbuf_p = (void *)&pkt_table[0];
+	ret = odph_ring_mp_enqueue_burst(r, rbuf_p, len);
+	if (odp_unlikely(ret < 0)) {
+		ODP_ERR("pid %d odph_ring_mp_enqueue_burst failed, ipc_slave %d, ret %d\n",
+			getpid(),
+			(ODP_PKTIO_TYPE_IPC_SLAVE == pktio_entry->s.ipc.type),
+			ret);
+		ODP_ERR("odp_ring_full: %d, odp_ring_count %d, odph_ring_free_count %d\n",
+			odph_ring_full(r), odph_ring_count(r),
+			odph_ring_free_count(r));
+	}
+
+	return ret;
+}
+
+static int ipc_mtu_get(pktio_entry_t *pktio_entry ODP_UNUSED)
+{
+	/* mtu not limited, pool settings are used. */
+	return (9 * 1024);
+}
+
+static int ipc_mac_addr_get(pktio_entry_t *pktio_entry ODP_UNUSED,
+			    void *mac_addr)
+{
+	memcpy(mac_addr, pktio_ipc_mac, ETH_ALEN);
+	return ETH_ALEN;
+}
+
+const pktio_if_ops_t ipc_pktio_ops = {
+	.init = NULL,
+	.term = NULL,
+	.open = ipc_pktio_open,
+	.close = NULL,
+	.recv =  ipc_pktio_recv,
+	.send = ipc_pktio_send,
+	.mtu_get = ipc_mtu_get,
+	.promisc_mode_set = NULL,
+	.promisc_mode_get = NULL,
+	.mac_get = ipc_mac_addr_get
+};
diff --git a/platform/linux-generic/pktio/ring.c b/platform/linux-generic/pktio/ring.c
new file mode 120000
index 0000000..d35c589
--- /dev/null
+++ b/platform/linux-generic/pktio/ring.c
@@ -0,0 +1 @@ 
+../../../helper/ring.c