diff mbox

[API-NEXT,4/4] linux-gen: pktio ipc: make it work again

Message ID 1481313005-9281-5-git-send-email-maxim.uvarov@linaro.org
State Superseded
Headers show

Commit Message

Maxim Uvarov Dec. 9, 2016, 7:50 p.m. UTC
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>

---
 .../linux-generic/include/odp_buffer_internal.h    |  10 +-
 platform/linux-generic/include/odp_internal.h      |   1 -
 .../include/odp_packet_io_ipc_internal.h           |  27 +-
 platform/linux-generic/odp_init.c                  |   5 +-
 platform/linux-generic/pktio/ipc.c                 | 488 +++++++++------------
 5 files changed, 236 insertions(+), 295 deletions(-)

-- 
2.7.1.250.gff4ea60
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index 4e75908..d0eb597 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -64,6 +64,11 @@  struct odp_buffer_hdr_t {
 	struct {
 		void     *hdr;
 		uint8_t  *data;
+#ifdef _ODP_PKTIO_IPC
+	/* ipc mapped process can not walk over pointers,
+	 * offset has to be used */
+		uint64_t ipc_data_offset;
+#endif
 		uint32_t  len;
 	} seg[CONFIG_PACKET_MAX_SEGS];
 
@@ -101,11 +106,6 @@  struct odp_buffer_hdr_t {
 		queue_entry_t   *target_qe;  /* ordered queue target */
 		uint64_t         sync[SCHEDULE_ORDERED_LOCKS_PER_QUEUE];
 	};
-#ifdef _ODP_PKTIO_IPC
-	/* ipc mapped process can not walk over pointers,
-	 * offset has to be used */
-	uint64_t		 ipc_addr_offset[ODP_CONFIG_PACKET_MAX_SEGS];
-#endif
 
 	/* Data or next header */
 	uint8_t data[0];
diff --git a/platform/linux-generic/include/odp_internal.h b/platform/linux-generic/include/odp_internal.h
index 6a80d9d..b313b1f 100644
--- a/platform/linux-generic/include/odp_internal.h
+++ b/platform/linux-generic/include/odp_internal.h
@@ -50,7 +50,6 @@  struct odp_global_data_s {
 	odp_cpumask_t control_cpus;
 	odp_cpumask_t worker_cpus;
 	int num_cpus_installed;
-	int ipc_ns;
 };
 
 enum init_stage {
diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
index 851114d..7cd2948 100644
--- a/platform/linux-generic/include/odp_packet_io_ipc_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
@@ -26,22 +26,31 @@ 
  */
 struct pktio_info {
 	struct {
-		/* number of buffer in remote pool */
-		int shm_pool_bufs_num;
-		/* size of remote pool */
-		size_t shm_pkt_pool_size;
+		/* number of buffer*/
+		int num;
 		/* size of packet/segment in remote pool */
-		uint32_t shm_pkt_size;
+		uint32_t block_size;
 		/* offset from shared memory block start
-		 * to pool_mdata_addr (odp-linux pool specific) */
-		size_t mdata_offset;
+		 * to pool *base_addr in remote process.
+		 * (odp-linux pool specific) */
+		size_t base_addr_offset;
 		char pool_name[ODP_POOL_NAME_LEN];
+		/* 1 if master finished creation of all shared objects */
+		int init_done;
 	} master;
 	struct {
 		/* offset from shared memory block start
-		 * to pool_mdata_addr in remote process.
+		 * to pool *base_addr in remote process.
 		 * (odp-linux pool specific) */
-		size_t mdata_offset;
+		size_t base_addr_offset;
+		void *base_addr;
+		uint32_t block_size;
 		char pool_name[ODP_POOL_NAME_LEN];
+		/* pid of the slave process written to shm and
+		 * used by master to look up memory created by
+		 * slave
+		 */
+		int pid;
+		int init_done;
 	} slave;
 } ODP_PACKED;
diff --git a/platform/linux-generic/odp_init.c b/platform/linux-generic/odp_init.c
index d40a83c..1b0d8f8 100644
--- a/platform/linux-generic/odp_init.c
+++ b/platform/linux-generic/odp_init.c
@@ -67,7 +67,7 @@  static int cleanup_files(const char *dirpath, int odp_pid)
 
 int odp_init_global(odp_instance_t *instance,
 		    const odp_init_t *params,
-		    const odp_platform_init_t *platform_params)
+		    const odp_platform_init_t *platform_params ODP_UNUSED)
 {
 	char *hpdir;
 
@@ -75,9 +75,6 @@  int odp_init_global(odp_instance_t *instance,
 	odp_global_data.main_pid = getpid();
 	cleanup_files(_ODP_TMPDIR, odp_global_data.main_pid);
 
-	if (platform_params)
-		odp_global_data.ipc_ns = platform_params->ipc_ns;
-
 	enum init_stage stage = NO_INIT;
 	odp_global_data.log_fn = odp_override_log;
 	odp_global_data.abort_fn = odp_override_abort;
diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c
index 0e99c6e..5e5e3b2 100644
--- a/platform/linux-generic/pktio/ipc.c
+++ b/platform/linux-generic/pktio/ipc.c
@@ -3,150 +3,84 @@ 
  *
  * SPDX-License-Identifier:     BSD-3-Clause
  */
-#ifdef _ODP_PKTIO_IPC
 #include <odp_packet_io_ipc_internal.h>
 #include <odp_debug_internal.h>
 #include <odp_packet_io_internal.h>
 #include <odp/api/system_info.h>
 #include <odp_shm_internal.h>
+#include <_ishm_internal.h>
 
 #include <sys/mman.h>
 #include <sys/stat.h>
 #include <fcntl.h>
 
+#define IPC_ODP_DEBUG_PRINT 0
+
+#define IPC_ODP_DBG(fmt, ...) \
+	do { \
+		if (IPC_ODP_DEBUG_PRINT == 1) \
+			ODP_DBG(fmt, ##__VA_ARGS__);\
+	} while (0)
+
 /* MAC address for the "ipc" interface */
 static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12};
 
-static void *_ipc_map_remote_pool(const char *name, size_t size);
+static void *_ipc_map_remote_pool(const char *name, int pid);
 
 static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
 {
-	pool_entry_t *pool;
-	uint32_t pool_id;
+	pool_t *pool;
 	odp_shm_t shm;
 	odp_shm_info_t info;
 
-	pool_id = pool_handle_to_index(pool_hdl);
-	pool    = get_pool_entry(pool_id);
-	shm = pool->s.pool_shm;
+	pool    = pool_entry_from_hdl(pool_hdl);
+	shm = pool->shm;
 
 	odp_shm_info(shm, &info);
 
 	return info.name;
 }
 
-/**
-* Look up for shared memory object.
-*
-* @param name   name of shm object
-*
-* @return 0 on success, otherwise non-zero
-*/
-static int _ipc_shm_lookup(const char *name)
-{
-	int shm;
-	char shm_devname[SHM_DEVNAME_MAXLEN];
-
-	if (!odp_global_data.ipc_ns)
-		ODP_ABORT("ipc_ns not set\n");
-
-	snprintf(shm_devname, SHM_DEVNAME_MAXLEN,
-		 SHM_DEVNAME_FORMAT,
-		 odp_global_data.ipc_ns, name);
-
-	shm = shm_open(shm_devname, O_RDWR, S_IRUSR | S_IWUSR);
-	if (shm == -1) {
-		if (errno == ENOENT) {
-			ODP_DBG("no file %s\n", shm_devname);
-			return -1;
-		}
-		ODP_ABORT("shm_open for %s err %s\n",
-			  shm_devname, strerror(errno));
-	}
-	close(shm);
-	return 0;
-}
-
-static int _ipc_map_pktio_info(pktio_entry_t *pktio_entry,
-			       const char *dev,
-			       int *slave)
-{
-	struct pktio_info *pinfo;
-	char name[ODP_POOL_NAME_LEN + sizeof("_info")];
-	uint32_t flags;
-	odp_shm_t shm;
-
-	/* Create info about remote pktio */
-	snprintf(name, sizeof(name), "%s_info", dev);
-
-	flags = ODP_SHM_PROC | _ODP_SHM_O_EXCL;
-
-	shm = odp_shm_reserve(name, sizeof(struct pktio_info),
-			      ODP_CACHE_LINE_SIZE,
-			      flags);
-	if (ODP_SHM_INVALID != shm) {
-		pinfo = odp_shm_addr(shm);
-		pinfo->master.pool_name[0] = 0;
-		*slave = 0;
-	} else {
-		flags = _ODP_SHM_PROC_NOCREAT | _ODP_SHM_O_EXCL;
-		shm = odp_shm_reserve(name, sizeof(struct pktio_info),
-				      ODP_CACHE_LINE_SIZE,
-				      flags);
-		if (ODP_SHM_INVALID == shm)
-			ODP_ABORT("can not connect to shm\n");
-
-		pinfo = odp_shm_addr(shm);
-		*slave = 1;
-	}
-
-	pktio_entry->s.ipc.pinfo = pinfo;
-	pktio_entry->s.ipc.pinfo_shm = shm;
-
-	return 0;
-}
-
 static int _ipc_master_start(pktio_entry_t *pktio_entry)
 {
 	struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo;
-	int ret;
 	void *ipc_pool_base;
 
-	if (pinfo->slave.mdata_offset == 0)
+	if (pinfo->slave.init_done == 0)
 		return -1;
 
-	ret = _ipc_shm_lookup(pinfo->slave.pool_name);
-	if (ret) {
-		ODP_DBG("no pool file %s\n", pinfo->slave.pool_name);
-		return -1;
-	}
-
 	ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
-					     pinfo->master.shm_pkt_pool_size);
+					     pinfo->slave.pid);
+	if (ipc_pool_base == NULL) {
+		ODP_DBG("no pool file %s for pid %d\n",
+			pinfo->slave.pool_name, pinfo->slave.pid);
+		return -1;
+	}
+
+	pktio_entry->s.ipc.pool_base = ipc_pool_base;
 	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
-					     pinfo->slave.mdata_offset;
+					     pinfo->slave.base_addr_offset;
 
 	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
 
-	ODP_DBG("%s started.\n",  pktio_entry->s.name);
+	IPC_ODP_DBG("%s started.\n",  pktio_entry->s.name);
 	return 0;
 }
 
 static int _ipc_init_master(pktio_entry_t *pktio_entry,
 			    const char *dev,
-			    odp_pool_t pool)
+			    odp_pool_t pool_hdl)
 {
 	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")];
-	pool_entry_t *pool_entry;
-	uint32_t pool_id;
+	pool_t *pool;
 	struct pktio_info *pinfo;
 	const char *pool_name;
 
-	pool_id = pool_handle_to_index(pool);
-	pool_entry    = get_pool_entry(pool_id);
+	pool = pool_entry_from_hdl(pool_hdl);
+	(void)pool;
 
 	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_m_prod"))) {
-		ODP_DBG("too big ipc name\n");
+		ODP_ERR("too big ipc name\n");
 		return -1;
 	}
 
@@ -158,7 +92,7 @@  static int _ipc_init_master(pktio_entry_t *pktio_entry,
 			PKTIO_IPC_ENTRIES,
 			_RING_SHM_PROC | _RING_NO_LIST);
 	if (!pktio_entry->s.ipc.tx.send) {
-		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+		ODP_ERR("pid %d unable to create ipc ring %s name\n",
 			getpid(), ipc_shm_name);
 		return -1;
 	}
@@ -174,7 +108,7 @@  static int _ipc_init_master(pktio_entry_t *pktio_entry,
 			PKTIO_IPC_ENTRIES,
 			_RING_SHM_PROC | _RING_NO_LIST);
 	if (!pktio_entry->s.ipc.tx.free) {
-		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+		ODP_ERR("pid %d unable to create ipc ring %s name\n",
 			getpid(), ipc_shm_name);
 		goto free_m_prod;
 	}
@@ -187,7 +121,7 @@  static int _ipc_init_master(pktio_entry_t *pktio_entry,
 			PKTIO_IPC_ENTRIES,
 			_RING_SHM_PROC | _RING_NO_LIST);
 	if (!pktio_entry->s.ipc.rx.recv) {
-		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+		ODP_ERR("pid %d unable to create ipc ring %s name\n",
 			getpid(), ipc_shm_name);
 		goto free_m_cons;
 	}
@@ -200,7 +134,7 @@  static int _ipc_init_master(pktio_entry_t *pktio_entry,
 			PKTIO_IPC_ENTRIES,
 			_RING_SHM_PROC | _RING_NO_LIST);
 	if (!pktio_entry->s.ipc.rx.free) {
-		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+		ODP_ERR("pid %d unable to create ipc ring %s name\n",
 			getpid(), ipc_shm_name);
 		goto free_s_prod;
 	}
@@ -210,24 +144,23 @@  static int _ipc_init_master(pktio_entry_t *pktio_entry,
 
 	/* Set up pool name for remote info */
 	pinfo = pktio_entry->s.ipc.pinfo;
-	pool_name = _ipc_odp_buffer_pool_shm_name(pool);
+	pool_name = _ipc_odp_buffer_pool_shm_name(pool_hdl);
 	if (strlen(pool_name) > ODP_POOL_NAME_LEN) {
-		ODP_DBG("pid %d ipc pool name %s is too big %d\n",
+		ODP_ERR("pid %d ipc pool name %s is too big %d\n",
 			getpid(), pool_name, strlen(pool_name));
 		goto free_s_prod;
 	}
 
 	memcpy(pinfo->master.pool_name, pool_name, strlen(pool_name));
-	pinfo->master.shm_pkt_pool_size = pool_entry->s.pool_size;
-	pinfo->master.shm_pool_bufs_num = pool_entry->s.buf_num;
-	pinfo->master.shm_pkt_size = pool_entry->s.seg_size;
-	pinfo->master.mdata_offset =  pool_entry->s.pool_mdata_addr -
-			       pool_entry->s.pool_base_addr;
-	pinfo->slave.mdata_offset = 0;
+	pinfo->slave.base_addr_offset = 0;
+	pinfo->slave.base_addr = 0;
+	pinfo->slave.pid = 0;
+	pinfo->slave.init_done = 0;
 
-	pktio_entry->s.ipc.pool = pool;
+	pktio_entry->s.ipc.pool = pool_hdl;
 
 	ODP_DBG("Pre init... DONE.\n");
+	pinfo->master.init_done = 1;
 
 	_ipc_master_start(pktio_entry);
 
@@ -246,55 +179,45 @@  free_m_prod:
 }
 
 static void _ipc_export_pool(struct pktio_info *pinfo,
-			     odp_pool_t pool)
+			     odp_pool_t pool_hdl)
 {
-	pool_entry_t *pool_entry;
-
-	pool_entry = odp_pool_to_entry(pool);
-	if (pool_entry->s.blk_size != pinfo->master.shm_pkt_size)
-		ODP_ABORT("pktio for same name should have the same pool size\n");
-	if (pool_entry->s.buf_num != (unsigned)pinfo->master.shm_pool_bufs_num)
-		ODP_ABORT("pktio for same name should have the same pool size\n");
+	pool_t *pool = pool_entry_from_hdl(pool_hdl);
 
 	snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s",
-		 pool_entry->s.name);
-	pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
-				    pool_entry->s.pool_base_addr;
+		 _ipc_odp_buffer_pool_shm_name(pool_hdl));
+	pinfo->slave.pid = odp_global_data.main_pid;
+	pinfo->slave.block_size = pool->block_size;
+	pinfo->slave.base_addr = pool->base_addr;
 }
 
-static void *_ipc_map_remote_pool(const char *name, size_t size)
+static void *_ipc_map_remote_pool(const char *name, int pid)
 {
 	odp_shm_t shm;
 	void *addr;
+	char rname[ODP_SHM_NAME_LEN];
 
-	ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
-	shm = odp_shm_reserve(name,
-			      size,
-			      ODP_CACHE_LINE_SIZE,
-			      _ODP_SHM_PROC_NOCREAT);
-	if (shm == ODP_SHM_INVALID)
-		ODP_ABORT("unable map %s\n", name);
+	snprintf(rname, ODP_SHM_NAME_LEN, "remote-%s", name);
+	shm = odp_shm_import(name, pid, rname);
+	if (shm == ODP_SHM_INVALID) {
+		ODP_ERR("unable map %s\n", name);
+		return NULL;
+	}
 
 	addr = odp_shm_addr(shm);
-	ODP_DBG("MAP master: %p - %p size %ld, pool %s\n",
-		addr, (char *)addr + size, size, name);
+
+	IPC_ODP_DBG("Mapped remote pool %s to local %s\n", name, rname);
 	return addr;
 }
 
-static void *_ipc_shm_map(char *name, size_t size)
+static void *_ipc_shm_map(char *name, int pid)
 {
 	odp_shm_t shm;
-	int ret;
 
-	ret = _ipc_shm_lookup(name);
-	if (ret == -1)
+	shm = odp_shm_import(name, pid, name);
+	if (ODP_SHM_INVALID == shm) {
+		ODP_ERR("unable to map: %s\n", name);
 		return NULL;
-
-	shm = odp_shm_reserve(name, size,
-			      ODP_CACHE_LINE_SIZE,
-			      _ODP_SHM_PROC_NOCREAT);
-	if (ODP_SHM_INVALID == shm)
-		ODP_ABORT("unable to map: %s\n", name);
+	}
 
 	return odp_shm_addr(shm);
 }
@@ -313,15 +236,22 @@  static int _ipc_init_slave(const char *dev,
 static int _ipc_slave_start(pktio_entry_t *pktio_entry)
 {
 	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
-	size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
-			   sizeof(_ring_t);
 	struct pktio_info *pinfo;
 	void *ipc_pool_base;
 	odp_shm_t shm;
-	const char *dev = pktio_entry->s.name;
+	char tail[ODP_POOL_NAME_LEN];
+	char dev[ODP_POOL_NAME_LEN];
+	int pid;
+
+	if (sscanf(pktio_entry->s.name, "ipc:%d:%s", &pid, tail) != 2) {
+		ODP_ERR("wrong pktio name\n");
+		return -1;
+	}
+
+	sprintf(dev, "ipc:%s", tail);
 
 	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
-	pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, ring_size);
+	pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, pid);
 	if (!pktio_entry->s.ipc.rx.recv) {
 		ODP_DBG("pid %d unable to find ipc ring %s name\n",
 			getpid(), dev);
@@ -333,9 +263,9 @@  static int _ipc_slave_start(pktio_entry_t *pktio_entry)
 		_ring_free_count(pktio_entry->s.ipc.rx.recv));
 
 	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
-	pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, ring_size);
+	pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, pid);
 	if (!pktio_entry->s.ipc.rx.free) {
-		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+		ODP_ERR("pid %d unable to find ipc ring %s name\n",
 			getpid(), dev);
 		goto free_m_prod;
 	}
@@ -344,9 +274,9 @@  static int _ipc_slave_start(pktio_entry_t *pktio_entry)
 		_ring_free_count(pktio_entry->s.ipc.rx.free));
 
 	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
-	pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, ring_size);
+	pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, pid);
 	if (!pktio_entry->s.ipc.tx.send) {
-		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+		ODP_ERR("pid %d unable to find ipc ring %s name\n",
 			getpid(), dev);
 		goto free_m_cons;
 	}
@@ -355,9 +285,9 @@  static int _ipc_slave_start(pktio_entry_t *pktio_entry)
 		_ring_free_count(pktio_entry->s.ipc.tx.send));
 
 	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
-	pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, ring_size);
+	pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, pid);
 	if (!pktio_entry->s.ipc.tx.free) {
-		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+		ODP_ERR("pid %d unable to find ipc ring %s name\n",
 			getpid(), dev);
 		goto free_s_prod;
 	}
@@ -368,14 +298,15 @@  static int _ipc_slave_start(pktio_entry_t *pktio_entry)
 	/* Get info about remote pool */
 	pinfo = pktio_entry->s.ipc.pinfo;
 	ipc_pool_base = _ipc_map_remote_pool(pinfo->master.pool_name,
-					     pinfo->master.shm_pkt_pool_size);
+					     pid);
 	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
-					     pinfo->master.mdata_offset;
-	pktio_entry->s.ipc.pkt_size = pinfo->master.shm_pkt_size;
+					     pinfo->master.base_addr_offset;
+	pktio_entry->s.ipc.pkt_size = pinfo->master.block_size;
 
 	_ipc_export_pool(pinfo, pktio_entry->s.ipc.pool);
 
 	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
+	pinfo->slave.init_done = 1;
 
 	ODP_DBG("%s started.\n",  pktio_entry->s.name);
 	return 0;
@@ -401,7 +332,11 @@  static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
 			  odp_pool_t pool)
 {
 	int ret = -1;
-	int slave;
+	int pid ODP_UNUSED;
+	struct pktio_info *pinfo;
+	char name[ODP_POOL_NAME_LEN + sizeof("_info")];
+	char tail[ODP_POOL_NAME_LEN];
+	odp_shm_t shm;
 
 	ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == _RING_NAMESIZE,
 			  "mismatch pool and ring name arrays");
@@ -411,65 +346,59 @@  static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
 
 	odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0);
 
-	_ipc_map_pktio_info(pktio_entry, dev, &slave);
-	pktio_entry->s.ipc.type = (slave == 0) ? PKTIO_TYPE_IPC_MASTER :
-						 PKTIO_TYPE_IPC_SLAVE;
+	/* Shared info about remote pktio */
+	if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2) {
+		pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_SLAVE;
 
-	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {
+		snprintf(name, sizeof(name), "ipc:%s_info", tail);
+		IPC_ODP_DBG("lookup for name %s for pid %d\n", name, pid);
+		shm = odp_shm_import(name, pid, name);
+		if (ODP_SHM_INVALID == shm)
+			return -1;
+		pinfo = odp_shm_addr(shm);
+
+		if (!pinfo->master.init_done) {
+			odp_shm_free(shm);
+			return -1;
+		}
+		pktio_entry->s.ipc.pinfo = pinfo;
+		pktio_entry->s.ipc.pinfo_shm = shm;
+		ODP_DBG("process %d is slave\n", getpid());
+		ret = _ipc_init_slave(name, pktio_entry, pool);
+	} else {
+		pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_MASTER;
+		snprintf(name, sizeof(name), "%s_info", dev);
+		shm = odp_shm_reserve(name, sizeof(struct pktio_info),
+				      ODP_CACHE_LINE_SIZE,
+				      _ODP_ISHM_EXPORT | _ODP_ISHM_LOCK);
+		if (ODP_SHM_INVALID == shm) {
+			ODP_ERR("can not create shm %s\n", name);
+			return -1;
+		}
+
+		pinfo = odp_shm_addr(shm);
+		pinfo->master.init_done = 0;
+		pinfo->master.pool_name[0] = 0;
+		pktio_entry->s.ipc.pinfo = pinfo;
+		pktio_entry->s.ipc.pinfo_shm = shm;
 		ODP_DBG("process %d is master\n", getpid());
 		ret = _ipc_init_master(pktio_entry, dev, pool);
-	} else {
-		ODP_DBG("process %d is slave\n", getpid());
-		ret = _ipc_init_slave(dev, pktio_entry, pool);
 	}
 
 	return ret;
 }
 
-static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
-				    uint32_t offset,
-				    uint32_t *seglen,
-				    uint32_t limit)
+static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, _ring_t *r)
 {
-	int seg_index  = offset / buf->segsize;
-	int seg_offset = offset % buf->segsize;
-#ifdef _ODP_PKTIO_IPC
-	void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
-#else
-	/** buf_hdr.ipc_addr_offset defined only when ipc is
-	 *  enabled. */
-	void *addr = NULL;
-
-	(void)seg_index;
-#endif
-	if (seglen) {
-		uint32_t buf_left = limit - offset;
-		*seglen = seg_offset + buf_left <= buf->segsize ?
-			buf_left : buf->segsize - seg_offset;
-	}
-
-	return (void *)(seg_offset + (uint8_t *)addr);
-}
-
-static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
-				    uint32_t offset, uint32_t *seglen)
-{
-	if (offset > pkt_hdr->frame_len)
-		return NULL;
-
-	return _ipc_buffer_map(&pkt_hdr->buf_hdr,
-			  pkt_hdr->headroom + offset, seglen,
-			  pkt_hdr->headroom + pkt_hdr->frame_len);
-}
-
-static void _ipc_free_ring_packets(_ring_t *r)
-{
-	odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
+	uint64_t offsets[PKTIO_IPC_ENTRIES];
 	int ret;
 	void **rbuf_p;
 	int i;
 
-	rbuf_p = (void *)&r_p_pkts;
+	if (!r)
+		return;
+
+	rbuf_p = (void *)&offsets;
 
 	while (1) {
 		ret = _ring_mc_dequeue_burst(r, rbuf_p,
@@ -477,8 +406,13 @@  static void _ipc_free_ring_packets(_ring_t *r)
 		if (0 == ret)
 			break;
 		for (i = 0; i < ret; i++) {
-			if (r_p_pkts[i] != ODP_PACKET_INVALID)
-				odp_packet_free(r_p_pkts[i]);
+			odp_packet_hdr_t *phdr;
+			odp_packet_t pkt;
+			void *mbase = pktio_entry->s.ipc.pool_mdata_base;
+
+			phdr = (void *)((uint8_t *)mbase + offsets[i]);
+			pkt = (odp_packet_t)phdr->buf_hdr.handle.handle;
+			odp_packet_free(pkt);
 		}
 	}
 }
@@ -490,22 +424,23 @@  static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
 	int i;
 	_ring_t *r;
 	_ring_t *r_p;
+	uint64_t offsets[PKTIO_IPC_ENTRIES];
+	void **ipcbufs_p = (void *)&offsets;
+	uint32_t ready;
+	int pkts_ring;
 
-	odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
-	void **ipcbufs_p = (void *)&remote_pkts;
-	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
-
+	ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
 	if (odp_unlikely(!ready)) {
-		ODP_DBG("start pktio is missing before usage?\n");
+		IPC_ODP_DBG("start pktio is missing before usage?\n");
 		return -1;
 	}
 
-	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+	_ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
 
 	r = pktio_entry->s.ipc.rx.recv;
 	pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len);
 	if (odp_unlikely(pkts < 0))
-		ODP_ABORT("error to dequeue no packets\n");
+		ODP_ABORT("internal error dequeue\n");
 
 	/* fast path */
 	if (odp_likely(0 == pkts))
@@ -514,36 +449,21 @@  static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
 	for (i = 0; i < pkts; i++) {
 		odp_pool_t pool;
 		odp_packet_t pkt;
-		odp_packet_hdr_t phdr;
-		void *ptr;
-		odp_buffer_bits_t handle;
-		int idx; /* Remote packet has coded pool and index.
-			  * We need only index.*/
+		odp_packet_hdr_t *phdr;
 		void *pkt_data;
-		void *remote_pkt_data;
+		uint64_t data_pool_off;
+		void *rmt_data_ptr;
 
-		if (remote_pkts[i] == ODP_PACKET_INVALID)
-			continue;
+		phdr = (void *)((uint8_t *)pktio_entry->s.ipc.pool_mdata_base +
+		       offsets[i]);
 
-		handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
-		idx = handle.index;
-
-		/* Link to packed data. To this line we have Zero-Copy between
-		 * processes, to simplify use packet copy in that version which
-		 * can be removed later with more advance buffer management
-		 * (ref counters).
-		 */
-		/* reverse odp_buf_to_hdr() */
-		ptr = (char *)pktio_entry->s.ipc.pool_mdata_base +
-		      (idx * ODP_CACHE_LINE_SIZE);
-		memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t));
-
-		/* Allocate new packet. Select*/
 		pool = pktio_entry->s.ipc.pool;
 		if (odp_unlikely(pool == ODP_POOL_INVALID))
 			ODP_ABORT("invalid pool");
 
-		pkt = odp_packet_alloc(pool, phdr.frame_len);
+		data_pool_off = phdr->buf_hdr.seg[0].ipc_data_offset;
+
+		pkt = odp_packet_alloc(pool, phdr->frame_len);
 		if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
 			/* Original pool might be smaller then
 			*  PKTIO_IPC_ENTRIES. If packet can not be
@@ -562,30 +482,40 @@  static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
 				  (PKTIO_TYPE_IPC_SLAVE ==
 					pktio_entry->s.ipc.type));
 
-		remote_pkt_data = _ipc_packet_map(ptr, 0, NULL);
-		if (odp_unlikely(!remote_pkt_data))
-			ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",
-				  (PKTIO_TYPE_IPC_SLAVE ==
-					pktio_entry->s.ipc.type));
-
 		/* Copy packet data from shared pool to local pool. */
-		memcpy(pkt_data, remote_pkt_data, phdr.frame_len);
+		rmt_data_ptr = (uint8_t *)pktio_entry->s.ipc.pool_mdata_base +
+			       data_pool_off;
+		memcpy(pkt_data, rmt_data_ptr, phdr->frame_len);
 
 		/* Copy packets L2, L3 parsed offsets and size */
-		copy_packet_cls_metadata(&phdr, odp_packet_hdr(pkt));
+		copy_packet_cls_metadata(phdr, odp_packet_hdr(pkt));
+
+		odp_packet_hdr(pkt)->frame_len = phdr->frame_len;
+		odp_packet_hdr(pkt)->headroom = phdr->headroom;
+		odp_packet_hdr(pkt)->tailroom = phdr->tailroom;
+
+		/* Take classification fields */
+		odp_packet_hdr(pkt)->p = phdr->p;
 
-		odp_packet_hdr(pkt)->frame_len = phdr.frame_len;
-		odp_packet_hdr(pkt)->headroom = phdr.headroom;
-		odp_packet_hdr(pkt)->tailroom = phdr.tailroom;
-		odp_packet_hdr(pkt)->input = pktio_entry->s.handle;
 		pkt_table[i] = pkt;
 	}
 
 	/* Now tell other process that we no longer need that buffers.*/
 	r_p = pktio_entry->s.ipc.rx.free;
-	pkts = _ring_mp_enqueue_burst(r_p, ipcbufs_p, i);
+
+repeat:
+	pkts_ring = _ring_mp_enqueue_burst(r_p, ipcbufs_p, pkts);
 	if (odp_unlikely(pkts < 0))
 		ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
+	if (odp_unlikely(pkts != pkts_ring)) {
+		IPC_ODP_DBG("odp_ring_full: %d, odp_ring_count %d,"
+			    " _ring_free_count %d\n",
+			    _ring_full(r_p), _ring_count(r_p),
+			    _ring_free_count(r_p));
+		ipcbufs_p = (void *)&offsets[pkts_ring - 1];
+		pkts = pkts - pkts_ring;
+		goto repeat;
+	}
 
 	return pkts;
 }
@@ -614,26 +544,23 @@  static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
 	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
 	odp_packet_t pkt_table_mapped[len]; /**< Ready to send packet has to be
 					      * in memory mapped pool. */
+	uint64_t offsets[len];
 
 	if (odp_unlikely(!ready))
 		return 0;
 
-	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+	_ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
 
-	/* Prepare packets: calculate offset from address. */
+	/* Copy packets to shm shared pool if they are in different */
 	for (i = 0; i < len; i++) {
-		int j;
 		odp_packet_t pkt =  pkt_table[i];
-		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
+		pool_t *ipc_pool = pool_entry_from_hdl(pktio_entry->s.ipc.pool);
 		odp_buffer_bits_t handle;
-		uint32_t cur_mapped_pool_id =
-			 pool_handle_to_index(pktio_entry->s.ipc.pool);
-		uint32_t pool_id;
+		uint32_t pkt_pool_id;
 
-		/* do copy if packet was allocated from not mapped pool */
 		handle.handle = _odp_packet_to_buffer(pkt);
-		pool_id = handle.pool_id;
-		if (pool_id != cur_mapped_pool_id) {
+		pkt_pool_id = handle.pool_id;
+		if (pkt_pool_id != ipc_pool->pool_idx) {
 			odp_packet_t newpkt;
 
 			newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool);
@@ -645,24 +572,30 @@  static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
 		} else {
 			pkt_table_mapped[i] = pkt;
 		}
+	}
 
-		/* buf_hdr.addr can not be used directly in remote process,
-		 * convert it to offset
-		 */
-		for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) {
-#ifdef _ODP_PKTIO_IPC
-			pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
-				(char *)pkt_hdr->buf_hdr.addr[j];
-#else
-			/** buf_hdr.ipc_addr_offset defined only when ipc is
-			 *  enabled. */
-			(void)pkt_hdr;
-#endif
-		}
+	/* Set offset to phdr for outgoing packets */
+	for (i = 0; i < len; i++) {
+		uint64_t data_pool_off;
+		odp_packet_t pkt = pkt_table_mapped[i];
+		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
+		odp_pool_t pool_hdl = odp_packet_pool(pkt);
+		pool_t *pool = pool_entry_from_hdl(pool_hdl);
+
+		offsets[i] = (uint8_t *)pkt_hdr -
+			     (uint8_t *)odp_shm_addr(pool->shm);
+		data_pool_off = (uint8_t *)pkt_hdr->buf_hdr.seg[0].data -
+				(uint8_t *)odp_shm_addr(pool->shm);
+		pkt_hdr->buf_hdr.seg[0].ipc_data_offset = data_pool_off;
+		IPC_ODP_DBG("%d/%d send packet %llx, pool %llx,"
+			    "phdr = %p, offset %x\n",
+			    i, len,
+			    odp_packet_to_u64(pkt), odp_pool_to_u64(pool_hdl),
+			    pkt_hdr, pkt_hdr->buf_hdr.seg[0].ipc_data_offset);
 	}
 
 	/* Put packets to ring to be processed by other process. */
-	rbuf_p = (void *)&pkt_table_mapped[0];
+	rbuf_p = (void *)&offsets[0];
 	r = pktio_entry->s.ipc.tx.send;
 	ret = _ring_mp_enqueue_burst(r, rbuf_p, len);
 	if (odp_unlikely(ret < 0)) {
@@ -673,6 +606,7 @@  static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
 		ODP_ERR("odp_ring_full: %d, odp_ring_count %d, _ring_free_count %d\n",
 			_ring_full(r), _ring_count(r),
 			_ring_free_count(r));
+		ODP_ABORT("Unexpected!\n");
 	}
 
 	return ret;
@@ -722,22 +656,25 @@  static int ipc_start(pktio_entry_t *pktio_entry)
 
 static int ipc_stop(pktio_entry_t *pktio_entry)
 {
-	unsigned tx_send, tx_free;
+	unsigned tx_send = 0, tx_free = 0;
 
 	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 0);
 
-	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.send);
+	if (pktio_entry->s.ipc.tx.send)
+		_ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.send);
 	/* other process can transfer packets from one ring to
 	 * other, use delay here to free that packets. */
 	sleep(1);
-	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+	if (pktio_entry->s.ipc.tx.free)
+		_ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
 
-	tx_send = _ring_count(pktio_entry->s.ipc.tx.send);
-	tx_free = _ring_count(pktio_entry->s.ipc.tx.free);
+	if (pktio_entry->s.ipc.tx.send)
+		tx_send = _ring_count(pktio_entry->s.ipc.tx.send);
+	if (pktio_entry->s.ipc.tx.free)
+		tx_free = _ring_count(pktio_entry->s.ipc.tx.free);
 	if (tx_send | tx_free) {
 		ODP_DBG("IPC rings: tx send %d tx free %d\n",
-			_ring_free_count(pktio_entry->s.ipc.tx.send),
-			_ring_free_count(pktio_entry->s.ipc.tx.free));
+			tx_send, tx_free);
 	}
 
 	return 0;
@@ -795,4 +732,3 @@  const pktio_if_ops_t ipc_pktio_ops = {
 	.pktin_ts_from_ns = NULL,
 	.config = NULL
 };
-#endif