diff mbox

[API-NEXT,PATCHv3,4/5] linux-gen: pktio ipc: make it work again

Message ID 1481745477-6137-5-git-send-email-maxim.uvarov@linaro.org
State Accepted
Commit 101e8188088b91e8d85e0fef0d6674dae05c306e
Headers show

Commit Message

Maxim Uvarov Dec. 14, 2016, 7:57 p.m. UTC
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>

---
 .../linux-generic/include/odp_buffer_internal.h    |  10 +-
 platform/linux-generic/include/odp_internal.h      |   1 -
 .../linux-generic/include/odp_packet_io_internal.h |   2 +
 .../include/odp_packet_io_ipc_internal.h           |  27 +-
 platform/linux-generic/odp_init.c                  |   5 +-
 platform/linux-generic/pktio/ipc.c                 | 534 +++++++++------------
 test/linux-generic/pktio_ipc/pktio_ipc_run.sh      |  35 +-
 7 files changed, 289 insertions(+), 325 deletions(-)

-- 
2.7.1.250.gff4ea60

Comments

Bill Fischofer Dec. 14, 2016, 9:31 p.m. UTC | #1
For the v3 series:

Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org>

On Wed, Dec 14, 2016 at 1:57 PM, Maxim Uvarov <maxim.uvarov@linaro.org> wrote:
> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>

> ---

>  .../linux-generic/include/odp_buffer_internal.h    |  10 +-

>  platform/linux-generic/include/odp_internal.h      |   1 -

>  .../linux-generic/include/odp_packet_io_internal.h |   2 +

>  .../include/odp_packet_io_ipc_internal.h           |  27 +-

>  platform/linux-generic/odp_init.c                  |   5 +-

>  platform/linux-generic/pktio/ipc.c                 | 534 +++++++++------------

>  test/linux-generic/pktio_ipc/pktio_ipc_run.sh      |  35 +-

>  7 files changed, 289 insertions(+), 325 deletions(-)

>

> diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h

> index 2064f7c..903f0a7 100644

> --- a/platform/linux-generic/include/odp_buffer_internal.h

> +++ b/platform/linux-generic/include/odp_buffer_internal.h

> @@ -64,6 +64,11 @@ struct odp_buffer_hdr_t {

>         struct {

>                 void     *hdr;

>                 uint8_t  *data;

> +#ifdef _ODP_PKTIO_IPC

> +       /* ipc mapped process can not walk over pointers,

> +        * offset has to be used */

> +               uint64_t ipc_data_offset;

> +#endif

>                 uint32_t  len;

>         } seg[CONFIG_PACKET_MAX_SEGS];

>

> @@ -94,11 +99,6 @@ struct odp_buffer_hdr_t {

>         uint32_t                 uarea_size; /* size of user area */

>         uint32_t                 segcount;   /* segment count */

>         uint32_t                 segsize;    /* segment size */

> -#ifdef _ODP_PKTIO_IPC

> -       /* ipc mapped process can not walk over pointers,

> -        * offset has to be used */

> -       uint64_t                 ipc_addr_offset[ODP_CONFIG_PACKET_MAX_SEGS];

> -#endif

>

>         /* Data or next header */

>         uint8_t data[0];

> diff --git a/platform/linux-generic/include/odp_internal.h b/platform/linux-generic/include/odp_internal.h

> index 6a80d9d..b313b1f 100644

> --- a/platform/linux-generic/include/odp_internal.h

> +++ b/platform/linux-generic/include/odp_internal.h

> @@ -50,7 +50,6 @@ struct odp_global_data_s {

>         odp_cpumask_t control_cpus;

>         odp_cpumask_t worker_cpus;

>         int num_cpus_installed;

> -       int ipc_ns;

>  };

>

>  enum init_stage {

> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h

> index bdf6316..2001c42 100644

> --- a/platform/linux-generic/include/odp_packet_io_internal.h

> +++ b/platform/linux-generic/include/odp_packet_io_internal.h

> @@ -102,6 +102,8 @@ typedef     struct {

>                                      packet, 0 - not yet ready */

>         void *pinfo;

>         odp_shm_t pinfo_shm;

> +       odp_shm_t remote_pool_shm; /**< shm of remote pool get with

> +                                       _ipc_map_remote_pool() */

>  } _ipc_pktio_t;

>

>  struct pktio_entry {

> diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h

> index 851114d..7cd2948 100644

> --- a/platform/linux-generic/include/odp_packet_io_ipc_internal.h

> +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h

> @@ -26,22 +26,31 @@

>   */

>  struct pktio_info {

>         struct {

> -               /* number of buffer in remote pool */

> -               int shm_pool_bufs_num;

> -               /* size of remote pool */

> -               size_t shm_pkt_pool_size;

> +               /* number of buffer*/

> +               int num;

>                 /* size of packet/segment in remote pool */

> -               uint32_t shm_pkt_size;

> +               uint32_t block_size;

>                 /* offset from shared memory block start

> -                * to pool_mdata_addr (odp-linux pool specific) */

> -               size_t mdata_offset;

> +                * to pool *base_addr in remote process.

> +                * (odp-linux pool specific) */

> +               size_t base_addr_offset;

>                 char pool_name[ODP_POOL_NAME_LEN];

> +               /* 1 if master finished creation of all shared objects */

> +               int init_done;

>         } master;

>         struct {

>                 /* offset from shared memory block start

> -                * to pool_mdata_addr in remote process.

> +                * to pool *base_addr in remote process.

>                  * (odp-linux pool specific) */

> -               size_t mdata_offset;

> +               size_t base_addr_offset;

> +               void *base_addr;

> +               uint32_t block_size;

>                 char pool_name[ODP_POOL_NAME_LEN];

> +               /* pid of the slave process written to shm and

> +                * used by master to look up memory created by

> +                * slave

> +                */

> +               int pid;

> +               int init_done;

>         } slave;

>  } ODP_PACKED;

> diff --git a/platform/linux-generic/odp_init.c b/platform/linux-generic/odp_init.c

> index d40a83c..1b0d8f8 100644

> --- a/platform/linux-generic/odp_init.c

> +++ b/platform/linux-generic/odp_init.c

> @@ -67,7 +67,7 @@ static int cleanup_files(const char *dirpath, int odp_pid)

>

>  int odp_init_global(odp_instance_t *instance,

>                     const odp_init_t *params,

> -                   const odp_platform_init_t *platform_params)

> +                   const odp_platform_init_t *platform_params ODP_UNUSED)

>  {

>         char *hpdir;

>

> @@ -75,9 +75,6 @@ int odp_init_global(odp_instance_t *instance,

>         odp_global_data.main_pid = getpid();

>         cleanup_files(_ODP_TMPDIR, odp_global_data.main_pid);

>

> -       if (platform_params)

> -               odp_global_data.ipc_ns = platform_params->ipc_ns;

> -

>         enum init_stage stage = NO_INIT;

>         odp_global_data.log_fn = odp_override_log;

>         odp_global_data.abort_fn = odp_override_abort;

> diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c

> index 0e99c6e..5e47a33 100644

> --- a/platform/linux-generic/pktio/ipc.c

> +++ b/platform/linux-generic/pktio/ipc.c

> @@ -3,150 +3,85 @@

>   *

>   * SPDX-License-Identifier:     BSD-3-Clause

>   */

> -#ifdef _ODP_PKTIO_IPC

>  #include <odp_packet_io_ipc_internal.h>

>  #include <odp_debug_internal.h>

>  #include <odp_packet_io_internal.h>

>  #include <odp/api/system_info.h>

>  #include <odp_shm_internal.h>

> +#include <_ishm_internal.h>

>

>  #include <sys/mman.h>

>  #include <sys/stat.h>

>  #include <fcntl.h>

>

> +#define IPC_ODP_DEBUG_PRINT 0

> +

> +#define IPC_ODP_DBG(fmt, ...) \

> +       do { \

> +               if (IPC_ODP_DEBUG_PRINT == 1) \

> +                       ODP_DBG(fmt, ##__VA_ARGS__);\

> +       } while (0)

> +

>  /* MAC address for the "ipc" interface */

>  static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12};

>

> -static void *_ipc_map_remote_pool(const char *name, size_t size);

> +static odp_shm_t _ipc_map_remote_pool(const char *name, int pid);

>

>  static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)

>  {

> -       pool_entry_t *pool;

> -       uint32_t pool_id;

> +       pool_t *pool;

>         odp_shm_t shm;

>         odp_shm_info_t info;

>

> -       pool_id = pool_handle_to_index(pool_hdl);

> -       pool    = get_pool_entry(pool_id);

> -       shm = pool->s.pool_shm;

> +       pool    = pool_entry_from_hdl(pool_hdl);

> +       shm = pool->shm;

>

>         odp_shm_info(shm, &info);

>

>         return info.name;

>  }

>

> -/**

> -* Look up for shared memory object.

> -*

> -* @param name   name of shm object

> -*

> -* @return 0 on success, otherwise non-zero

> -*/

> -static int _ipc_shm_lookup(const char *name)

> -{

> -       int shm;

> -       char shm_devname[SHM_DEVNAME_MAXLEN];

> -

> -       if (!odp_global_data.ipc_ns)

> -               ODP_ABORT("ipc_ns not set\n");

> -

> -       snprintf(shm_devname, SHM_DEVNAME_MAXLEN,

> -                SHM_DEVNAME_FORMAT,

> -                odp_global_data.ipc_ns, name);

> -

> -       shm = shm_open(shm_devname, O_RDWR, S_IRUSR | S_IWUSR);

> -       if (shm == -1) {

> -               if (errno == ENOENT) {

> -                       ODP_DBG("no file %s\n", shm_devname);

> -                       return -1;

> -               }

> -               ODP_ABORT("shm_open for %s err %s\n",

> -                         shm_devname, strerror(errno));

> -       }

> -       close(shm);

> -       return 0;

> -}

> -

> -static int _ipc_map_pktio_info(pktio_entry_t *pktio_entry,

> -                              const char *dev,

> -                              int *slave)

> -{

> -       struct pktio_info *pinfo;

> -       char name[ODP_POOL_NAME_LEN + sizeof("_info")];

> -       uint32_t flags;

> -       odp_shm_t shm;

> -

> -       /* Create info about remote pktio */

> -       snprintf(name, sizeof(name), "%s_info", dev);

> -

> -       flags = ODP_SHM_PROC | _ODP_SHM_O_EXCL;

> -

> -       shm = odp_shm_reserve(name, sizeof(struct pktio_info),

> -                             ODP_CACHE_LINE_SIZE,

> -                             flags);

> -       if (ODP_SHM_INVALID != shm) {

> -               pinfo = odp_shm_addr(shm);

> -               pinfo->master.pool_name[0] = 0;

> -               *slave = 0;

> -       } else {

> -               flags = _ODP_SHM_PROC_NOCREAT | _ODP_SHM_O_EXCL;

> -               shm = odp_shm_reserve(name, sizeof(struct pktio_info),

> -                                     ODP_CACHE_LINE_SIZE,

> -                                     flags);

> -               if (ODP_SHM_INVALID == shm)

> -                       ODP_ABORT("can not connect to shm\n");

> -

> -               pinfo = odp_shm_addr(shm);

> -               *slave = 1;

> -       }

> -

> -       pktio_entry->s.ipc.pinfo = pinfo;

> -       pktio_entry->s.ipc.pinfo_shm = shm;

> -

> -       return 0;

> -}

> -

>  static int _ipc_master_start(pktio_entry_t *pktio_entry)

>  {

>         struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo;

> -       int ret;

> -       void *ipc_pool_base;

> +       odp_shm_t shm;

>

> -       if (pinfo->slave.mdata_offset == 0)

> +       if (pinfo->slave.init_done == 0)

>                 return -1;

>

> -       ret = _ipc_shm_lookup(pinfo->slave.pool_name);

> -       if (ret) {

> -               ODP_DBG("no pool file %s\n", pinfo->slave.pool_name);

> +       shm = _ipc_map_remote_pool(pinfo->slave.pool_name,

> +                                  pinfo->slave.pid);

> +       if (shm == ODP_SHM_INVALID) {

> +               ODP_DBG("no pool file %s for pid %d\n",

> +                       pinfo->slave.pool_name, pinfo->slave.pid);

>                 return -1;

>         }

>

> -       ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,

> -                                            pinfo->master.shm_pkt_pool_size);

> -       pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +

> -                                            pinfo->slave.mdata_offset;

> +       pktio_entry->s.ipc.remote_pool_shm = shm;

> +       pktio_entry->s.ipc.pool_base = odp_shm_addr(shm);

> +       pktio_entry->s.ipc.pool_mdata_base = (char *)odp_shm_addr(shm) +

> +                                            pinfo->slave.base_addr_offset;

>

>         odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);

>

> -       ODP_DBG("%s started.\n",  pktio_entry->s.name);

> +       IPC_ODP_DBG("%s started.\n",  pktio_entry->s.name);

>         return 0;

>  }

>

>  static int _ipc_init_master(pktio_entry_t *pktio_entry,

>                             const char *dev,

> -                           odp_pool_t pool)

> +                           odp_pool_t pool_hdl)

>  {

>         char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")];

> -       pool_entry_t *pool_entry;

> -       uint32_t pool_id;

> +       pool_t *pool;

>         struct pktio_info *pinfo;

>         const char *pool_name;

>

> -       pool_id = pool_handle_to_index(pool);

> -       pool_entry    = get_pool_entry(pool_id);

> +       pool = pool_entry_from_hdl(pool_hdl);

> +       (void)pool;

>

>         if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_m_prod"))) {

> -               ODP_DBG("too big ipc name\n");

> +               ODP_ERR("too big ipc name\n");

>                 return -1;

>         }

>

> @@ -158,7 +93,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,

>                         PKTIO_IPC_ENTRIES,

>                         _RING_SHM_PROC | _RING_NO_LIST);

>         if (!pktio_entry->s.ipc.tx.send) {

> -               ODP_DBG("pid %d unable to create ipc ring %s name\n",

> +               ODP_ERR("pid %d unable to create ipc ring %s name\n",

>                         getpid(), ipc_shm_name);

>                 return -1;

>         }

> @@ -174,7 +109,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,

>                         PKTIO_IPC_ENTRIES,

>                         _RING_SHM_PROC | _RING_NO_LIST);

>         if (!pktio_entry->s.ipc.tx.free) {

> -               ODP_DBG("pid %d unable to create ipc ring %s name\n",

> +               ODP_ERR("pid %d unable to create ipc ring %s name\n",

>                         getpid(), ipc_shm_name);

>                 goto free_m_prod;

>         }

> @@ -187,7 +122,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,

>                         PKTIO_IPC_ENTRIES,

>                         _RING_SHM_PROC | _RING_NO_LIST);

>         if (!pktio_entry->s.ipc.rx.recv) {

> -               ODP_DBG("pid %d unable to create ipc ring %s name\n",

> +               ODP_ERR("pid %d unable to create ipc ring %s name\n",

>                         getpid(), ipc_shm_name);

>                 goto free_m_cons;

>         }

> @@ -200,7 +135,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,

>                         PKTIO_IPC_ENTRIES,

>                         _RING_SHM_PROC | _RING_NO_LIST);

>         if (!pktio_entry->s.ipc.rx.free) {

> -               ODP_DBG("pid %d unable to create ipc ring %s name\n",

> +               ODP_ERR("pid %d unable to create ipc ring %s name\n",

>                         getpid(), ipc_shm_name);

>                 goto free_s_prod;

>         }

> @@ -210,24 +145,23 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,

>

>         /* Set up pool name for remote info */

>         pinfo = pktio_entry->s.ipc.pinfo;

> -       pool_name = _ipc_odp_buffer_pool_shm_name(pool);

> +       pool_name = _ipc_odp_buffer_pool_shm_name(pool_hdl);

>         if (strlen(pool_name) > ODP_POOL_NAME_LEN) {

> -               ODP_DBG("pid %d ipc pool name %s is too big %d\n",

> +               ODP_ERR("pid %d ipc pool name %s is too big %d\n",

>                         getpid(), pool_name, strlen(pool_name));

>                 goto free_s_prod;

>         }

>

>         memcpy(pinfo->master.pool_name, pool_name, strlen(pool_name));

> -       pinfo->master.shm_pkt_pool_size = pool_entry->s.pool_size;

> -       pinfo->master.shm_pool_bufs_num = pool_entry->s.buf_num;

> -       pinfo->master.shm_pkt_size = pool_entry->s.seg_size;

> -       pinfo->master.mdata_offset =  pool_entry->s.pool_mdata_addr -

> -                              pool_entry->s.pool_base_addr;

> -       pinfo->slave.mdata_offset = 0;

> +       pinfo->slave.base_addr_offset = 0;

> +       pinfo->slave.base_addr = 0;

> +       pinfo->slave.pid = 0;

> +       pinfo->slave.init_done = 0;

>

> -       pktio_entry->s.ipc.pool = pool;

> +       pktio_entry->s.ipc.pool = pool_hdl;

>

>         ODP_DBG("Pre init... DONE.\n");

> +       pinfo->master.init_done = 1;

>

>         _ipc_master_start(pktio_entry);

>

> @@ -246,55 +180,42 @@ free_m_prod:

>  }

>

>  static void _ipc_export_pool(struct pktio_info *pinfo,

> -                            odp_pool_t pool)

> +                            odp_pool_t pool_hdl)

>  {

> -       pool_entry_t *pool_entry;

> -

> -       pool_entry = odp_pool_to_entry(pool);

> -       if (pool_entry->s.blk_size != pinfo->master.shm_pkt_size)

> -               ODP_ABORT("pktio for same name should have the same pool size\n");

> -       if (pool_entry->s.buf_num != (unsigned)pinfo->master.shm_pool_bufs_num)

> -               ODP_ABORT("pktio for same name should have the same pool size\n");

> +       pool_t *pool = pool_entry_from_hdl(pool_hdl);

>

>         snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s",

> -                pool_entry->s.name);

> -       pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -

> -                                   pool_entry->s.pool_base_addr;

> +                _ipc_odp_buffer_pool_shm_name(pool_hdl));

> +       pinfo->slave.pid = odp_global_data.main_pid;

> +       pinfo->slave.block_size = pool->block_size;

> +       pinfo->slave.base_addr = pool->base_addr;

>  }

>

> -static void *_ipc_map_remote_pool(const char *name, size_t size)

> +static odp_shm_t _ipc_map_remote_pool(const char *name, int pid)

>  {

>         odp_shm_t shm;

> -       void *addr;

> +       char rname[ODP_SHM_NAME_LEN];

>

> -       ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);

> -       shm = odp_shm_reserve(name,

> -                             size,

> -                             ODP_CACHE_LINE_SIZE,

> -                             _ODP_SHM_PROC_NOCREAT);

> -       if (shm == ODP_SHM_INVALID)

> -               ODP_ABORT("unable map %s\n", name);

> +       snprintf(rname, ODP_SHM_NAME_LEN, "remote-%s", name);

> +       shm = odp_shm_import(name, pid, rname);

> +       if (shm == ODP_SHM_INVALID) {

> +               ODP_ERR("unable map %s\n", name);

> +               return ODP_SHM_INVALID;

> +       }

>

> -       addr = odp_shm_addr(shm);

> -       ODP_DBG("MAP master: %p - %p size %ld, pool %s\n",

> -               addr, (char *)addr + size, size, name);

> -       return addr;

> +       IPC_ODP_DBG("Mapped remote pool %s to local %s\n", name, rname);

> +       return shm;

>  }

>

> -static void *_ipc_shm_map(char *name, size_t size)

> +static void *_ipc_shm_map(char *name, int pid)

>  {

>         odp_shm_t shm;

> -       int ret;

>

> -       ret = _ipc_shm_lookup(name);

> -       if (ret == -1)

> +       shm = odp_shm_import(name, pid, name);

> +       if (ODP_SHM_INVALID == shm) {

> +               ODP_ERR("unable to map: %s\n", name);

>                 return NULL;

> -

> -       shm = odp_shm_reserve(name, size,

> -                             ODP_CACHE_LINE_SIZE,

> -                             _ODP_SHM_PROC_NOCREAT);

> -       if (ODP_SHM_INVALID == shm)

> -               ODP_ABORT("unable to map: %s\n", name);

> +       }

>

>         return odp_shm_addr(shm);

>  }

> @@ -313,15 +234,21 @@ static int _ipc_init_slave(const char *dev,

>  static int _ipc_slave_start(pktio_entry_t *pktio_entry)

>  {

>         char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];

> -       size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +

> -                          sizeof(_ring_t);

>         struct pktio_info *pinfo;

> -       void *ipc_pool_base;

>         odp_shm_t shm;

> -       const char *dev = pktio_entry->s.name;

> +       char tail[ODP_POOL_NAME_LEN];

> +       char dev[ODP_POOL_NAME_LEN];

> +       int pid;

> +

> +       if (sscanf(pktio_entry->s.name, "ipc:%d:%s", &pid, tail) != 2) {

> +               ODP_ERR("wrong pktio name\n");

> +               return -1;

> +       }

> +

> +       sprintf(dev, "ipc:%s", tail);

>

>         snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);

> -       pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, ring_size);

> +       pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, pid);

>         if (!pktio_entry->s.ipc.rx.recv) {

>                 ODP_DBG("pid %d unable to find ipc ring %s name\n",

>                         getpid(), dev);

> @@ -333,9 +260,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)

>                 _ring_free_count(pktio_entry->s.ipc.rx.recv));

>

>         snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);

> -       pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, ring_size);

> +       pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, pid);

>         if (!pktio_entry->s.ipc.rx.free) {

> -               ODP_DBG("pid %d unable to find ipc ring %s name\n",

> +               ODP_ERR("pid %d unable to find ipc ring %s name\n",

>                         getpid(), dev);

>                 goto free_m_prod;

>         }

> @@ -344,9 +271,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)

>                 _ring_free_count(pktio_entry->s.ipc.rx.free));

>

>         snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);

> -       pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, ring_size);

> +       pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, pid);

>         if (!pktio_entry->s.ipc.tx.send) {

> -               ODP_DBG("pid %d unable to find ipc ring %s name\n",

> +               ODP_ERR("pid %d unable to find ipc ring %s name\n",

>                         getpid(), dev);

>                 goto free_m_cons;

>         }

> @@ -355,9 +282,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)

>                 _ring_free_count(pktio_entry->s.ipc.tx.send));

>

>         snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);

> -       pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, ring_size);

> +       pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, pid);

>         if (!pktio_entry->s.ipc.tx.free) {

> -               ODP_DBG("pid %d unable to find ipc ring %s name\n",

> +               ODP_ERR("pid %d unable to find ipc ring %s name\n",

>                         getpid(), dev);

>                 goto free_s_prod;

>         }

> @@ -367,15 +294,17 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)

>

>         /* Get info about remote pool */

>         pinfo = pktio_entry->s.ipc.pinfo;

> -       ipc_pool_base = _ipc_map_remote_pool(pinfo->master.pool_name,

> -                                            pinfo->master.shm_pkt_pool_size);

> -       pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +

> -                                            pinfo->master.mdata_offset;

> -       pktio_entry->s.ipc.pkt_size = pinfo->master.shm_pkt_size;

> +       shm = _ipc_map_remote_pool(pinfo->master.pool_name,

> +                                  pid);

> +       pktio_entry->s.ipc.remote_pool_shm = shm;

> +       pktio_entry->s.ipc.pool_mdata_base = (char *)odp_shm_addr(shm) +

> +                                            pinfo->master.base_addr_offset;

> +       pktio_entry->s.ipc.pkt_size = pinfo->master.block_size;

>

>         _ipc_export_pool(pinfo, pktio_entry->s.ipc.pool);

>

>         odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);

> +       pinfo->slave.init_done = 1;

>

>         ODP_DBG("%s started.\n",  pktio_entry->s.name);

>         return 0;

> @@ -401,7 +330,11 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,

>                           odp_pool_t pool)

>  {

>         int ret = -1;

> -       int slave;

> +       int pid ODP_UNUSED;

> +       struct pktio_info *pinfo;

> +       char name[ODP_POOL_NAME_LEN + sizeof("_info")];

> +       char tail[ODP_POOL_NAME_LEN];

> +       odp_shm_t shm;

>

>         ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == _RING_NAMESIZE,

>                           "mismatch pool and ring name arrays");

> @@ -411,65 +344,59 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,

>

>         odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0);

>

> -       _ipc_map_pktio_info(pktio_entry, dev, &slave);

> -       pktio_entry->s.ipc.type = (slave == 0) ? PKTIO_TYPE_IPC_MASTER :

> -                                                PKTIO_TYPE_IPC_SLAVE;

> +       /* Shared info about remote pktio */

> +       if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2) {

> +               pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_SLAVE;

>

> -       if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {

> +               snprintf(name, sizeof(name), "ipc:%s_info", tail);

> +               IPC_ODP_DBG("lookup for name %s for pid %d\n", name, pid);

> +               shm = odp_shm_import(name, pid, name);

> +               if (ODP_SHM_INVALID == shm)

> +                       return -1;

> +               pinfo = odp_shm_addr(shm);

> +

> +               if (!pinfo->master.init_done) {

> +                       odp_shm_free(shm);

> +                       return -1;

> +               }

> +               pktio_entry->s.ipc.pinfo = pinfo;

> +               pktio_entry->s.ipc.pinfo_shm = shm;

> +               ODP_DBG("process %d is slave\n", getpid());

> +               ret = _ipc_init_slave(name, pktio_entry, pool);

> +       } else {

> +               pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_MASTER;

> +               snprintf(name, sizeof(name), "%s_info", dev);

> +               shm = odp_shm_reserve(name, sizeof(struct pktio_info),

> +                                     ODP_CACHE_LINE_SIZE,

> +                                     _ODP_ISHM_EXPORT | _ODP_ISHM_LOCK);

> +               if (ODP_SHM_INVALID == shm) {

> +                       ODP_ERR("can not create shm %s\n", name);

> +                       return -1;

> +               }

> +

> +               pinfo = odp_shm_addr(shm);

> +               pinfo->master.init_done = 0;

> +               pinfo->master.pool_name[0] = 0;

> +               pktio_entry->s.ipc.pinfo = pinfo;

> +               pktio_entry->s.ipc.pinfo_shm = shm;

>                 ODP_DBG("process %d is master\n", getpid());

>                 ret = _ipc_init_master(pktio_entry, dev, pool);

> -       } else {

> -               ODP_DBG("process %d is slave\n", getpid());

> -               ret = _ipc_init_slave(dev, pktio_entry, pool);

>         }

>

>         return ret;

>  }

>

> -static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,

> -                                   uint32_t offset,

> -                                   uint32_t *seglen,

> -                                   uint32_t limit)

> +static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, _ring_t *r)

>  {

> -       int seg_index  = offset / buf->segsize;

> -       int seg_offset = offset % buf->segsize;

> -#ifdef _ODP_PKTIO_IPC

> -       void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];

> -#else

> -       /** buf_hdr.ipc_addr_offset defined only when ipc is

> -        *  enabled. */

> -       void *addr = NULL;

> -

> -       (void)seg_index;

> -#endif

> -       if (seglen) {

> -               uint32_t buf_left = limit - offset;

> -               *seglen = seg_offset + buf_left <= buf->segsize ?

> -                       buf_left : buf->segsize - seg_offset;

> -       }

> -

> -       return (void *)(seg_offset + (uint8_t *)addr);

> -}

> -

> -static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,

> -                                   uint32_t offset, uint32_t *seglen)

> -{

> -       if (offset > pkt_hdr->frame_len)

> -               return NULL;

> -

> -       return _ipc_buffer_map(&pkt_hdr->buf_hdr,

> -                         pkt_hdr->headroom + offset, seglen,

> -                         pkt_hdr->headroom + pkt_hdr->frame_len);

> -}

> -

> -static void _ipc_free_ring_packets(_ring_t *r)

> -{

> -       odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];

> +       uintptr_t offsets[PKTIO_IPC_ENTRIES];

>         int ret;

>         void **rbuf_p;

>         int i;

>

> -       rbuf_p = (void *)&r_p_pkts;

> +       if (!r)

> +               return;

> +

> +       rbuf_p = (void *)&offsets;

>

>         while (1) {

>                 ret = _ring_mc_dequeue_burst(r, rbuf_p,

> @@ -477,8 +404,13 @@ static void _ipc_free_ring_packets(_ring_t *r)

>                 if (0 == ret)

>                         break;

>                 for (i = 0; i < ret; i++) {

> -                       if (r_p_pkts[i] != ODP_PACKET_INVALID)

> -                               odp_packet_free(r_p_pkts[i]);

> +                       odp_packet_hdr_t *phdr;

> +                       odp_packet_t pkt;

> +                       void *mbase = pktio_entry->s.ipc.pool_mdata_base;

> +

> +                       phdr = (void *)((uint8_t *)mbase + offsets[i]);

> +                       pkt = (odp_packet_t)phdr->buf_hdr.handle.handle;

> +                       odp_packet_free(pkt);

>                 }

>         }

>  }

> @@ -490,22 +422,23 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,

>         int i;

>         _ring_t *r;

>         _ring_t *r_p;

> +       uintptr_t offsets[PKTIO_IPC_ENTRIES];

> +       void **ipcbufs_p = (void *)&offsets;

> +       uint32_t ready;

> +       int pkts_ring;

>

> -       odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];

> -       void **ipcbufs_p = (void *)&remote_pkts;

> -       uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);

> -

> +       ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);

>         if (odp_unlikely(!ready)) {

> -               ODP_DBG("start pktio is missing before usage?\n");

> -               return -1;

> +               IPC_ODP_DBG("start pktio is missing before usage?\n");

> +               return 0;

>         }

>

> -       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);

> +       _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);

>

>         r = pktio_entry->s.ipc.rx.recv;

>         pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len);

>         if (odp_unlikely(pkts < 0))

> -               ODP_ABORT("error to dequeue no packets\n");

> +               ODP_ABORT("internal error dequeue\n");

>

>         /* fast path */

>         if (odp_likely(0 == pkts))

> @@ -514,36 +447,21 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,

>         for (i = 0; i < pkts; i++) {

>                 odp_pool_t pool;

>                 odp_packet_t pkt;

> -               odp_packet_hdr_t phdr;

> -               void *ptr;

> -               odp_buffer_bits_t handle;

> -               int idx; /* Remote packet has coded pool and index.

> -                         * We need only index.*/

> +               odp_packet_hdr_t *phdr;

>                 void *pkt_data;

> -               void *remote_pkt_data;

> +               uint64_t data_pool_off;

> +               void *rmt_data_ptr;

>

> -               if (remote_pkts[i] == ODP_PACKET_INVALID)

> -                       continue;

> +               phdr = (void *)((uint8_t *)pktio_entry->s.ipc.pool_mdata_base +

> +                      offsets[i]);

>

> -               handle.handle = _odp_packet_to_buffer(remote_pkts[i]);

> -               idx = handle.index;

> -

> -               /* Link to packed data. To this line we have Zero-Copy between

> -                * processes, to simplify use packet copy in that version which

> -                * can be removed later with more advance buffer management

> -                * (ref counters).

> -                */

> -               /* reverse odp_buf_to_hdr() */

> -               ptr = (char *)pktio_entry->s.ipc.pool_mdata_base +

> -                     (idx * ODP_CACHE_LINE_SIZE);

> -               memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t));

> -

> -               /* Allocate new packet. Select*/

>                 pool = pktio_entry->s.ipc.pool;

>                 if (odp_unlikely(pool == ODP_POOL_INVALID))

>                         ODP_ABORT("invalid pool");

>

> -               pkt = odp_packet_alloc(pool, phdr.frame_len);

> +               data_pool_off = phdr->buf_hdr.seg[0].ipc_data_offset;

> +

> +               pkt = odp_packet_alloc(pool, phdr->frame_len);

>                 if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {

>                         /* Original pool might be smaller then

>                         *  PKTIO_IPC_ENTRIES. If packet can not be

> @@ -562,30 +480,40 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,

>                                   (PKTIO_TYPE_IPC_SLAVE ==

>                                         pktio_entry->s.ipc.type));

>

> -               remote_pkt_data = _ipc_packet_map(ptr, 0, NULL);

> -               if (odp_unlikely(!remote_pkt_data))

> -                       ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",

> -                                 (PKTIO_TYPE_IPC_SLAVE ==

> -                                       pktio_entry->s.ipc.type));

> -

>                 /* Copy packet data from shared pool to local pool. */

> -               memcpy(pkt_data, remote_pkt_data, phdr.frame_len);

> +               rmt_data_ptr = (uint8_t *)pktio_entry->s.ipc.pool_mdata_base +

> +                              data_pool_off;

> +               memcpy(pkt_data, rmt_data_ptr, phdr->frame_len);

>

>                 /* Copy packets L2, L3 parsed offsets and size */

> -               copy_packet_cls_metadata(&phdr, odp_packet_hdr(pkt));

> +               copy_packet_cls_metadata(phdr, odp_packet_hdr(pkt));

> +

> +               odp_packet_hdr(pkt)->frame_len = phdr->frame_len;

> +               odp_packet_hdr(pkt)->headroom = phdr->headroom;

> +               odp_packet_hdr(pkt)->tailroom = phdr->tailroom;

> +

> +               /* Take classification fields */

> +               odp_packet_hdr(pkt)->p = phdr->p;

>

> -               odp_packet_hdr(pkt)->frame_len = phdr.frame_len;

> -               odp_packet_hdr(pkt)->headroom = phdr.headroom;

> -               odp_packet_hdr(pkt)->tailroom = phdr.tailroom;

> -               odp_packet_hdr(pkt)->input = pktio_entry->s.handle;

>                 pkt_table[i] = pkt;

>         }

>

>         /* Now tell other process that we no longer need that buffers.*/

>         r_p = pktio_entry->s.ipc.rx.free;

> -       pkts = _ring_mp_enqueue_burst(r_p, ipcbufs_p, i);

> +

> +repeat:

> +       pkts_ring = _ring_mp_enqueue_burst(r_p, ipcbufs_p, pkts);

>         if (odp_unlikely(pkts < 0))

>                 ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");

> +       if (odp_unlikely(pkts != pkts_ring)) {

> +               IPC_ODP_DBG("odp_ring_full: %d, odp_ring_count %d,"

> +                           " _ring_free_count %d\n",

> +                           _ring_full(r_p), _ring_count(r_p),

> +                           _ring_free_count(r_p));

> +               ipcbufs_p = (void *)&offsets[pkts_ring - 1];

> +               pkts = pkts - pkts_ring;

> +               goto repeat;

> +       }

>

>         return pkts;

>  }

> @@ -614,26 +542,23 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,

>         uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);

>         odp_packet_t pkt_table_mapped[len]; /**< Ready to send packet has to be

>                                               * in memory mapped pool. */

> +       uintptr_t offsets[len];

>

>         if (odp_unlikely(!ready))

>                 return 0;

>

> -       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);

> +       _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);

>

> -       /* Prepare packets: calculate offset from address. */

> +       /* Copy packets to shm shared pool if they are in different */

>         for (i = 0; i < len; i++) {

> -               int j;

>                 odp_packet_t pkt =  pkt_table[i];

> -               odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);

> +               pool_t *ipc_pool = pool_entry_from_hdl(pktio_entry->s.ipc.pool);

>                 odp_buffer_bits_t handle;

> -               uint32_t cur_mapped_pool_id =

> -                        pool_handle_to_index(pktio_entry->s.ipc.pool);

> -               uint32_t pool_id;

> +               uint32_t pkt_pool_id;

>

> -               /* do copy if packet was allocated from not mapped pool */

>                 handle.handle = _odp_packet_to_buffer(pkt);

> -               pool_id = handle.pool_id;

> -               if (pool_id != cur_mapped_pool_id) {

> +               pkt_pool_id = handle.pool_id;

> +               if (pkt_pool_id != ipc_pool->pool_idx) {

>                         odp_packet_t newpkt;

>

>                         newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool);

> @@ -645,24 +570,30 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,

>                 } else {

>                         pkt_table_mapped[i] = pkt;

>                 }

> +       }

>

> -               /* buf_hdr.addr can not be used directly in remote process,

> -                * convert it to offset

> -                */

> -               for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) {

> -#ifdef _ODP_PKTIO_IPC

> -                       pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -

> -                               (char *)pkt_hdr->buf_hdr.addr[j];

> -#else

> -                       /** buf_hdr.ipc_addr_offset defined only when ipc is

> -                        *  enabled. */

> -                       (void)pkt_hdr;

> -#endif

> -               }

> +       /* Set offset to phdr for outgoing packets */

> +       for (i = 0; i < len; i++) {

> +               uint64_t data_pool_off;

> +               odp_packet_t pkt = pkt_table_mapped[i];

> +               odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);

> +               odp_pool_t pool_hdl = odp_packet_pool(pkt);

> +               pool_t *pool = pool_entry_from_hdl(pool_hdl);

> +

> +               offsets[i] = (uint8_t *)pkt_hdr -

> +                            (uint8_t *)odp_shm_addr(pool->shm);

> +               data_pool_off = (uint8_t *)pkt_hdr->buf_hdr.seg[0].data -

> +                               (uint8_t *)odp_shm_addr(pool->shm);

> +               pkt_hdr->buf_hdr.seg[0].ipc_data_offset = data_pool_off;

> +               IPC_ODP_DBG("%d/%d send packet %llx, pool %llx,"

> +                           "phdr = %p, offset %x\n",

> +                           i, len,

> +                           odp_packet_to_u64(pkt), odp_pool_to_u64(pool_hdl),

> +                           pkt_hdr, pkt_hdr->buf_hdr.seg[0].ipc_data_offset);

>         }

>

>         /* Put packets to ring to be processed by other process. */

> -       rbuf_p = (void *)&pkt_table_mapped[0];

> +       rbuf_p = (void *)&offsets[0];

>         r = pktio_entry->s.ipc.tx.send;

>         ret = _ring_mp_enqueue_burst(r, rbuf_p, len);

>         if (odp_unlikely(ret < 0)) {

> @@ -673,6 +604,7 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,

>                 ODP_ERR("odp_ring_full: %d, odp_ring_count %d, _ring_free_count %d\n",

>                         _ring_full(r), _ring_count(r),

>                         _ring_free_count(r));

> +               ODP_ABORT("Unexpected!\n");

>         }

>

>         return ret;

> @@ -722,22 +654,25 @@ static int ipc_start(pktio_entry_t *pktio_entry)

>

>  static int ipc_stop(pktio_entry_t *pktio_entry)

>  {

> -       unsigned tx_send, tx_free;

> +       unsigned tx_send = 0, tx_free = 0;

>

>         odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 0);

>

> -       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.send);

> +       if (pktio_entry->s.ipc.tx.send)

> +               _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.send);

>         /* other process can transfer packets from one ring to

>          * other, use delay here to free that packets. */

>         sleep(1);

> -       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);

> +       if (pktio_entry->s.ipc.tx.free)

> +               _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);

>

> -       tx_send = _ring_count(pktio_entry->s.ipc.tx.send);

> -       tx_free = _ring_count(pktio_entry->s.ipc.tx.free);

> +       if (pktio_entry->s.ipc.tx.send)

> +               tx_send = _ring_count(pktio_entry->s.ipc.tx.send);

> +       if (pktio_entry->s.ipc.tx.free)

> +               tx_free = _ring_count(pktio_entry->s.ipc.tx.free);

>         if (tx_send | tx_free) {

>                 ODP_DBG("IPC rings: tx send %d tx free %d\n",

> -                       _ring_free_count(pktio_entry->s.ipc.tx.send),

> -                       _ring_free_count(pktio_entry->s.ipc.tx.free));

> +                       tx_send, tx_free);

>         }

>

>         return 0;

> @@ -747,23 +682,31 @@ static int ipc_close(pktio_entry_t *pktio_entry)

>  {

>         char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")];

>         char *dev = pktio_entry->s.name;

> +       char name[ODP_POOL_NAME_LEN];

> +       char tail[ODP_POOL_NAME_LEN];

> +       int pid = 0;

>

>         ipc_stop(pktio_entry);

>

> -       if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {

> -               /* unlink this pktio info for both master and slave */

> -               odp_shm_free(pktio_entry->s.ipc.pinfo_shm);

> +       odp_shm_free(pktio_entry->s.ipc.remote_pool_shm);

>

> -               /* destroy rings */

> -               snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);

> -               _ring_destroy(ipc_shm_name);

> -               snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);

> -               _ring_destroy(ipc_shm_name);

> -               snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);

> -               _ring_destroy(ipc_shm_name);

> -               snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);

> -               _ring_destroy(ipc_shm_name);

> -       }

> +       if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2)

> +               snprintf(name, sizeof(name), "ipc:%s", tail);

> +       else

> +               snprintf(name, sizeof(name), "%s", dev);

> +

> +       /* unlink this pktio info for both master and slave */

> +       odp_shm_free(pktio_entry->s.ipc.pinfo_shm);

> +

> +       /* destroy rings */

> +       snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", name);

> +       _ring_destroy(ipc_shm_name);

> +       snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", name);

> +       _ring_destroy(ipc_shm_name);

> +       snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", name);

> +       _ring_destroy(ipc_shm_name);

> +       snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", name);

> +       _ring_destroy(ipc_shm_name);

>

>         return 0;

>  }

> @@ -795,4 +738,3 @@ const pktio_if_ops_t ipc_pktio_ops = {

>         .pktin_ts_from_ns = NULL,

>         .config = NULL

>  };

> -#endif

> diff --git a/test/linux-generic/pktio_ipc/pktio_ipc_run.sh b/test/linux-generic/pktio_ipc/pktio_ipc_run.sh

> index 3cd28f5..52e8d42 100755

> --- a/test/linux-generic/pktio_ipc/pktio_ipc_run.sh

> +++ b/test/linux-generic/pktio_ipc/pktio_ipc_run.sh

> @@ -25,19 +25,23 @@ run()

>         rm -rf /tmp/odp-* 2>&1 > /dev/null

>

>         echo "==== run pktio_ipc1 then pktio_ipc2 ===="

> -       pktio_ipc1${EXEEXT} -t 30 &

> +       pktio_ipc1${EXEEXT} -t 10 &

>         IPC_PID=$!

>

> -       pktio_ipc2${EXEEXT} -p ${IPC_PID} -t 10

> +       pktio_ipc2${EXEEXT} -p ${IPC_PID} -t 5

>         ret=$?

>         # pktio_ipc1 should do clean up and exit just

>         # after pktio_ipc2 exited. If it does not happen

>         # kill him in test.

> -       sleep 1

> -       kill ${IPC_PID} 2>&1 > /dev/null

> +       sleep 13

> +       (kill ${IPC_PID} 2>&1 > /dev/null ) > /dev/null

>         if [ $? -eq 0 ]; then

> -               ls -l /tmp/odp*

> +               echo "pktio_ipc1${EXEEXT} was killed"

> +               ls -l /tmp/odp* 2> /dev/null

>                 rm -rf /tmp/odp-${IPC_PID}* 2>&1 > /dev/null

> +       else

> +               echo "normal exit of 2 application"

> +               ls -l /tmp/odp* 2> /dev/null

>         fi

>

>         if [ $ret -ne 0 ]; then

> @@ -47,21 +51,32 @@ run()

>                 echo "First stage PASSED"

>         fi

>

> -

>         echo "==== run pktio_ipc2 then pktio_ipc1 ===="

> -       pktio_ipc2${EXEEXT} -t 20 &

> +       pktio_ipc2${EXEEXT} -t 10 &

>         IPC_PID=$!

>

> -       pktio_ipc1${EXEEXT} -p ${IPC_PID} -t 10

> +       pktio_ipc1${EXEEXT} -p ${IPC_PID} -t 5

>         ret=$?

> -       (kill ${IPC_PID} 2>&1 > /dev/null) > /dev/null || true

> +       # pktio_ipc2 do not exit on pktio_ipc1 disconnect

> +       # wait until it exits cleanly

> +       sleep 13

> +       (kill ${IPC_PID} 2>&1 > /dev/null ) > /dev/null

> +       if [ $? -eq 0 ]; then

> +               echo "pktio_ipc2${EXEEXT} was killed"

> +               ls -l /tmp/odp* 2> /dev/null

> +               rm -rf /tmp/odp-${IPC_PID}* 2>&1 > /dev/null

> +       else

> +               echo "normal exit of 2 application"

> +               ls -l /tmp/odp* 2> /dev/null

> +       fi

>

>         if [ $ret -ne 0 ]; then

>                 echo "!!! FAILED !!!"

> -               ls -l /tmp/odp*

> +               ls -l /tmp/odp* 2> /dev/null

>                 rm -rf /tmp/odp-${IPC_PID}* 2>&1 > /dev/null

>                 exit $ret

>         else

> +               ls -l /tmp/odp* 2> /dev/null

>                 echo "Second stage PASSED"

>         fi

>

> --

> 2.7.1.250.gff4ea60

>
Maxim Uvarov Dec. 16, 2016, 12:36 p.m. UTC | #2
Merged,
Maxim.

On 12/15/16 00:31, Bill Fischofer wrote:
> For the v3 series:

> 

> Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org>

> 

> On Wed, Dec 14, 2016 at 1:57 PM, Maxim Uvarov <maxim.uvarov@linaro.org> wrote:

>> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>

>> ---

>>  .../linux-generic/include/odp_buffer_internal.h    |  10 +-

>>  platform/linux-generic/include/odp_internal.h      |   1 -

>>  .../linux-generic/include/odp_packet_io_internal.h |   2 +

>>  .../include/odp_packet_io_ipc_internal.h           |  27 +-

>>  platform/linux-generic/odp_init.c                  |   5 +-

>>  platform/linux-generic/pktio/ipc.c                 | 534 +++++++++------------

>>  test/linux-generic/pktio_ipc/pktio_ipc_run.sh      |  35 +-

>>  7 files changed, 289 insertions(+), 325 deletions(-)

>>

>> diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h

>> index 2064f7c..903f0a7 100644

>> --- a/platform/linux-generic/include/odp_buffer_internal.h

>> +++ b/platform/linux-generic/include/odp_buffer_internal.h

>> @@ -64,6 +64,11 @@ struct odp_buffer_hdr_t {

>>         struct {

>>                 void     *hdr;

>>                 uint8_t  *data;

>> +#ifdef _ODP_PKTIO_IPC

>> +       /* ipc mapped process can not walk over pointers,

>> +        * offset has to be used */

>> +               uint64_t ipc_data_offset;

>> +#endif

>>                 uint32_t  len;

>>         } seg[CONFIG_PACKET_MAX_SEGS];

>>

>> @@ -94,11 +99,6 @@ struct odp_buffer_hdr_t {

>>         uint32_t                 uarea_size; /* size of user area */

>>         uint32_t                 segcount;   /* segment count */

>>         uint32_t                 segsize;    /* segment size */

>> -#ifdef _ODP_PKTIO_IPC

>> -       /* ipc mapped process can not walk over pointers,

>> -        * offset has to be used */

>> -       uint64_t                 ipc_addr_offset[ODP_CONFIG_PACKET_MAX_SEGS];

>> -#endif

>>

>>         /* Data or next header */

>>         uint8_t data[0];

>> diff --git a/platform/linux-generic/include/odp_internal.h b/platform/linux-generic/include/odp_internal.h

>> index 6a80d9d..b313b1f 100644

>> --- a/platform/linux-generic/include/odp_internal.h

>> +++ b/platform/linux-generic/include/odp_internal.h

>> @@ -50,7 +50,6 @@ struct odp_global_data_s {

>>         odp_cpumask_t control_cpus;

>>         odp_cpumask_t worker_cpus;

>>         int num_cpus_installed;

>> -       int ipc_ns;

>>  };

>>

>>  enum init_stage {

>> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h

>> index bdf6316..2001c42 100644

>> --- a/platform/linux-generic/include/odp_packet_io_internal.h

>> +++ b/platform/linux-generic/include/odp_packet_io_internal.h

>> @@ -102,6 +102,8 @@ typedef     struct {

>>                                      packet, 0 - not yet ready */

>>         void *pinfo;

>>         odp_shm_t pinfo_shm;

>> +       odp_shm_t remote_pool_shm; /**< shm of remote pool get with

>> +                                       _ipc_map_remote_pool() */

>>  } _ipc_pktio_t;

>>

>>  struct pktio_entry {

>> diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h

>> index 851114d..7cd2948 100644

>> --- a/platform/linux-generic/include/odp_packet_io_ipc_internal.h

>> +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h

>> @@ -26,22 +26,31 @@

>>   */

>>  struct pktio_info {

>>         struct {

>> -               /* number of buffer in remote pool */

>> -               int shm_pool_bufs_num;

>> -               /* size of remote pool */

>> -               size_t shm_pkt_pool_size;

>> +               /* number of buffer*/

>> +               int num;

>>                 /* size of packet/segment in remote pool */

>> -               uint32_t shm_pkt_size;

>> +               uint32_t block_size;

>>                 /* offset from shared memory block start

>> -                * to pool_mdata_addr (odp-linux pool specific) */

>> -               size_t mdata_offset;

>> +                * to pool *base_addr in remote process.

>> +                * (odp-linux pool specific) */

>> +               size_t base_addr_offset;

>>                 char pool_name[ODP_POOL_NAME_LEN];

>> +               /* 1 if master finished creation of all shared objects */

>> +               int init_done;

>>         } master;

>>         struct {

>>                 /* offset from shared memory block start

>> -                * to pool_mdata_addr in remote process.

>> +                * to pool *base_addr in remote process.

>>                  * (odp-linux pool specific) */

>> -               size_t mdata_offset;

>> +               size_t base_addr_offset;

>> +               void *base_addr;

>> +               uint32_t block_size;

>>                 char pool_name[ODP_POOL_NAME_LEN];

>> +               /* pid of the slave process written to shm and

>> +                * used by master to look up memory created by

>> +                * slave

>> +                */

>> +               int pid;

>> +               int init_done;

>>         } slave;

>>  } ODP_PACKED;

>> diff --git a/platform/linux-generic/odp_init.c b/platform/linux-generic/odp_init.c

>> index d40a83c..1b0d8f8 100644

>> --- a/platform/linux-generic/odp_init.c

>> +++ b/platform/linux-generic/odp_init.c

>> @@ -67,7 +67,7 @@ static int cleanup_files(const char *dirpath, int odp_pid)

>>

>>  int odp_init_global(odp_instance_t *instance,

>>                     const odp_init_t *params,

>> -                   const odp_platform_init_t *platform_params)

>> +                   const odp_platform_init_t *platform_params ODP_UNUSED)

>>  {

>>         char *hpdir;

>>

>> @@ -75,9 +75,6 @@ int odp_init_global(odp_instance_t *instance,

>>         odp_global_data.main_pid = getpid();

>>         cleanup_files(_ODP_TMPDIR, odp_global_data.main_pid);

>>

>> -       if (platform_params)

>> -               odp_global_data.ipc_ns = platform_params->ipc_ns;

>> -

>>         enum init_stage stage = NO_INIT;

>>         odp_global_data.log_fn = odp_override_log;

>>         odp_global_data.abort_fn = odp_override_abort;

>> diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c

>> index 0e99c6e..5e47a33 100644

>> --- a/platform/linux-generic/pktio/ipc.c

>> +++ b/platform/linux-generic/pktio/ipc.c

>> @@ -3,150 +3,85 @@

>>   *

>>   * SPDX-License-Identifier:     BSD-3-Clause

>>   */

>> -#ifdef _ODP_PKTIO_IPC

>>  #include <odp_packet_io_ipc_internal.h>

>>  #include <odp_debug_internal.h>

>>  #include <odp_packet_io_internal.h>

>>  #include <odp/api/system_info.h>

>>  #include <odp_shm_internal.h>

>> +#include <_ishm_internal.h>

>>

>>  #include <sys/mman.h>

>>  #include <sys/stat.h>

>>  #include <fcntl.h>

>>

>> +#define IPC_ODP_DEBUG_PRINT 0

>> +

>> +#define IPC_ODP_DBG(fmt, ...) \

>> +       do { \

>> +               if (IPC_ODP_DEBUG_PRINT == 1) \

>> +                       ODP_DBG(fmt, ##__VA_ARGS__);\

>> +       } while (0)

>> +

>>  /* MAC address for the "ipc" interface */

>>  static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12};

>>

>> -static void *_ipc_map_remote_pool(const char *name, size_t size);

>> +static odp_shm_t _ipc_map_remote_pool(const char *name, int pid);

>>

>>  static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)

>>  {

>> -       pool_entry_t *pool;

>> -       uint32_t pool_id;

>> +       pool_t *pool;

>>         odp_shm_t shm;

>>         odp_shm_info_t info;

>>

>> -       pool_id = pool_handle_to_index(pool_hdl);

>> -       pool    = get_pool_entry(pool_id);

>> -       shm = pool->s.pool_shm;

>> +       pool    = pool_entry_from_hdl(pool_hdl);

>> +       shm = pool->shm;

>>

>>         odp_shm_info(shm, &info);

>>

>>         return info.name;

>>  }

>>

>> -/**

>> -* Look up for shared memory object.

>> -*

>> -* @param name   name of shm object

>> -*

>> -* @return 0 on success, otherwise non-zero

>> -*/

>> -static int _ipc_shm_lookup(const char *name)

>> -{

>> -       int shm;

>> -       char shm_devname[SHM_DEVNAME_MAXLEN];

>> -

>> -       if (!odp_global_data.ipc_ns)

>> -               ODP_ABORT("ipc_ns not set\n");

>> -

>> -       snprintf(shm_devname, SHM_DEVNAME_MAXLEN,

>> -                SHM_DEVNAME_FORMAT,

>> -                odp_global_data.ipc_ns, name);

>> -

>> -       shm = shm_open(shm_devname, O_RDWR, S_IRUSR | S_IWUSR);

>> -       if (shm == -1) {

>> -               if (errno == ENOENT) {

>> -                       ODP_DBG("no file %s\n", shm_devname);

>> -                       return -1;

>> -               }

>> -               ODP_ABORT("shm_open for %s err %s\n",

>> -                         shm_devname, strerror(errno));

>> -       }

>> -       close(shm);

>> -       return 0;

>> -}

>> -

>> -static int _ipc_map_pktio_info(pktio_entry_t *pktio_entry,

>> -                              const char *dev,

>> -                              int *slave)

>> -{

>> -       struct pktio_info *pinfo;

>> -       char name[ODP_POOL_NAME_LEN + sizeof("_info")];

>> -       uint32_t flags;

>> -       odp_shm_t shm;

>> -

>> -       /* Create info about remote pktio */

>> -       snprintf(name, sizeof(name), "%s_info", dev);

>> -

>> -       flags = ODP_SHM_PROC | _ODP_SHM_O_EXCL;

>> -

>> -       shm = odp_shm_reserve(name, sizeof(struct pktio_info),

>> -                             ODP_CACHE_LINE_SIZE,

>> -                             flags);

>> -       if (ODP_SHM_INVALID != shm) {

>> -               pinfo = odp_shm_addr(shm);

>> -               pinfo->master.pool_name[0] = 0;

>> -               *slave = 0;

>> -       } else {

>> -               flags = _ODP_SHM_PROC_NOCREAT | _ODP_SHM_O_EXCL;

>> -               shm = odp_shm_reserve(name, sizeof(struct pktio_info),

>> -                                     ODP_CACHE_LINE_SIZE,

>> -                                     flags);

>> -               if (ODP_SHM_INVALID == shm)

>> -                       ODP_ABORT("can not connect to shm\n");

>> -

>> -               pinfo = odp_shm_addr(shm);

>> -               *slave = 1;

>> -       }

>> -

>> -       pktio_entry->s.ipc.pinfo = pinfo;

>> -       pktio_entry->s.ipc.pinfo_shm = shm;

>> -

>> -       return 0;

>> -}

>> -

>>  static int _ipc_master_start(pktio_entry_t *pktio_entry)

>>  {

>>         struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo;

>> -       int ret;

>> -       void *ipc_pool_base;

>> +       odp_shm_t shm;

>>

>> -       if (pinfo->slave.mdata_offset == 0)

>> +       if (pinfo->slave.init_done == 0)

>>                 return -1;

>>

>> -       ret = _ipc_shm_lookup(pinfo->slave.pool_name);

>> -       if (ret) {

>> -               ODP_DBG("no pool file %s\n", pinfo->slave.pool_name);

>> +       shm = _ipc_map_remote_pool(pinfo->slave.pool_name,

>> +                                  pinfo->slave.pid);

>> +       if (shm == ODP_SHM_INVALID) {

>> +               ODP_DBG("no pool file %s for pid %d\n",

>> +                       pinfo->slave.pool_name, pinfo->slave.pid);

>>                 return -1;

>>         }

>>

>> -       ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,

>> -                                            pinfo->master.shm_pkt_pool_size);

>> -       pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +

>> -                                            pinfo->slave.mdata_offset;

>> +       pktio_entry->s.ipc.remote_pool_shm = shm;

>> +       pktio_entry->s.ipc.pool_base = odp_shm_addr(shm);

>> +       pktio_entry->s.ipc.pool_mdata_base = (char *)odp_shm_addr(shm) +

>> +                                            pinfo->slave.base_addr_offset;

>>

>>         odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);

>>

>> -       ODP_DBG("%s started.\n",  pktio_entry->s.name);

>> +       IPC_ODP_DBG("%s started.\n",  pktio_entry->s.name);

>>         return 0;

>>  }

>>

>>  static int _ipc_init_master(pktio_entry_t *pktio_entry,

>>                             const char *dev,

>> -                           odp_pool_t pool)

>> +                           odp_pool_t pool_hdl)

>>  {

>>         char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")];

>> -       pool_entry_t *pool_entry;

>> -       uint32_t pool_id;

>> +       pool_t *pool;

>>         struct pktio_info *pinfo;

>>         const char *pool_name;

>>

>> -       pool_id = pool_handle_to_index(pool);

>> -       pool_entry    = get_pool_entry(pool_id);

>> +       pool = pool_entry_from_hdl(pool_hdl);

>> +       (void)pool;

>>

>>         if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_m_prod"))) {

>> -               ODP_DBG("too big ipc name\n");

>> +               ODP_ERR("too big ipc name\n");

>>                 return -1;

>>         }

>>

>> @@ -158,7 +93,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,

>>                         PKTIO_IPC_ENTRIES,

>>                         _RING_SHM_PROC | _RING_NO_LIST);

>>         if (!pktio_entry->s.ipc.tx.send) {

>> -               ODP_DBG("pid %d unable to create ipc ring %s name\n",

>> +               ODP_ERR("pid %d unable to create ipc ring %s name\n",

>>                         getpid(), ipc_shm_name);

>>                 return -1;

>>         }

>> @@ -174,7 +109,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,

>>                         PKTIO_IPC_ENTRIES,

>>                         _RING_SHM_PROC | _RING_NO_LIST);

>>         if (!pktio_entry->s.ipc.tx.free) {

>> -               ODP_DBG("pid %d unable to create ipc ring %s name\n",

>> +               ODP_ERR("pid %d unable to create ipc ring %s name\n",

>>                         getpid(), ipc_shm_name);

>>                 goto free_m_prod;

>>         }

>> @@ -187,7 +122,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,

>>                         PKTIO_IPC_ENTRIES,

>>                         _RING_SHM_PROC | _RING_NO_LIST);

>>         if (!pktio_entry->s.ipc.rx.recv) {

>> -               ODP_DBG("pid %d unable to create ipc ring %s name\n",

>> +               ODP_ERR("pid %d unable to create ipc ring %s name\n",

>>                         getpid(), ipc_shm_name);

>>                 goto free_m_cons;

>>         }

>> @@ -200,7 +135,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,

>>                         PKTIO_IPC_ENTRIES,

>>                         _RING_SHM_PROC | _RING_NO_LIST);

>>         if (!pktio_entry->s.ipc.rx.free) {

>> -               ODP_DBG("pid %d unable to create ipc ring %s name\n",

>> +               ODP_ERR("pid %d unable to create ipc ring %s name\n",

>>                         getpid(), ipc_shm_name);

>>                 goto free_s_prod;

>>         }

>> @@ -210,24 +145,23 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,

>>

>>         /* Set up pool name for remote info */

>>         pinfo = pktio_entry->s.ipc.pinfo;

>> -       pool_name = _ipc_odp_buffer_pool_shm_name(pool);

>> +       pool_name = _ipc_odp_buffer_pool_shm_name(pool_hdl);

>>         if (strlen(pool_name) > ODP_POOL_NAME_LEN) {

>> -               ODP_DBG("pid %d ipc pool name %s is too big %d\n",

>> +               ODP_ERR("pid %d ipc pool name %s is too big %d\n",

>>                         getpid(), pool_name, strlen(pool_name));

>>                 goto free_s_prod;

>>         }

>>

>>         memcpy(pinfo->master.pool_name, pool_name, strlen(pool_name));

>> -       pinfo->master.shm_pkt_pool_size = pool_entry->s.pool_size;

>> -       pinfo->master.shm_pool_bufs_num = pool_entry->s.buf_num;

>> -       pinfo->master.shm_pkt_size = pool_entry->s.seg_size;

>> -       pinfo->master.mdata_offset =  pool_entry->s.pool_mdata_addr -

>> -                              pool_entry->s.pool_base_addr;

>> -       pinfo->slave.mdata_offset = 0;

>> +       pinfo->slave.base_addr_offset = 0;

>> +       pinfo->slave.base_addr = 0;

>> +       pinfo->slave.pid = 0;

>> +       pinfo->slave.init_done = 0;

>>

>> -       pktio_entry->s.ipc.pool = pool;

>> +       pktio_entry->s.ipc.pool = pool_hdl;

>>

>>         ODP_DBG("Pre init... DONE.\n");

>> +       pinfo->master.init_done = 1;

>>

>>         _ipc_master_start(pktio_entry);

>>

>> @@ -246,55 +180,42 @@ free_m_prod:

>>  }

>>

>>  static void _ipc_export_pool(struct pktio_info *pinfo,

>> -                            odp_pool_t pool)

>> +                            odp_pool_t pool_hdl)

>>  {

>> -       pool_entry_t *pool_entry;

>> -

>> -       pool_entry = odp_pool_to_entry(pool);

>> -       if (pool_entry->s.blk_size != pinfo->master.shm_pkt_size)

>> -               ODP_ABORT("pktio for same name should have the same pool size\n");

>> -       if (pool_entry->s.buf_num != (unsigned)pinfo->master.shm_pool_bufs_num)

>> -               ODP_ABORT("pktio for same name should have the same pool size\n");

>> +       pool_t *pool = pool_entry_from_hdl(pool_hdl);

>>

>>         snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s",

>> -                pool_entry->s.name);

>> -       pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -

>> -                                   pool_entry->s.pool_base_addr;

>> +                _ipc_odp_buffer_pool_shm_name(pool_hdl));

>> +       pinfo->slave.pid = odp_global_data.main_pid;

>> +       pinfo->slave.block_size = pool->block_size;

>> +       pinfo->slave.base_addr = pool->base_addr;

>>  }

>>

>> -static void *_ipc_map_remote_pool(const char *name, size_t size)

>> +static odp_shm_t _ipc_map_remote_pool(const char *name, int pid)

>>  {

>>         odp_shm_t shm;

>> -       void *addr;

>> +       char rname[ODP_SHM_NAME_LEN];

>>

>> -       ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);

>> -       shm = odp_shm_reserve(name,

>> -                             size,

>> -                             ODP_CACHE_LINE_SIZE,

>> -                             _ODP_SHM_PROC_NOCREAT);

>> -       if (shm == ODP_SHM_INVALID)

>> -               ODP_ABORT("unable map %s\n", name);

>> +       snprintf(rname, ODP_SHM_NAME_LEN, "remote-%s", name);

>> +       shm = odp_shm_import(name, pid, rname);

>> +       if (shm == ODP_SHM_INVALID) {

>> +               ODP_ERR("unable map %s\n", name);

>> +               return ODP_SHM_INVALID;

>> +       }

>>

>> -       addr = odp_shm_addr(shm);

>> -       ODP_DBG("MAP master: %p - %p size %ld, pool %s\n",

>> -               addr, (char *)addr + size, size, name);

>> -       return addr;

>> +       IPC_ODP_DBG("Mapped remote pool %s to local %s\n", name, rname);

>> +       return shm;

>>  }

>>

>> -static void *_ipc_shm_map(char *name, size_t size)

>> +static void *_ipc_shm_map(char *name, int pid)

>>  {

>>         odp_shm_t shm;

>> -       int ret;

>>

>> -       ret = _ipc_shm_lookup(name);

>> -       if (ret == -1)

>> +       shm = odp_shm_import(name, pid, name);

>> +       if (ODP_SHM_INVALID == shm) {

>> +               ODP_ERR("unable to map: %s\n", name);

>>                 return NULL;

>> -

>> -       shm = odp_shm_reserve(name, size,

>> -                             ODP_CACHE_LINE_SIZE,

>> -                             _ODP_SHM_PROC_NOCREAT);

>> -       if (ODP_SHM_INVALID == shm)

>> -               ODP_ABORT("unable to map: %s\n", name);

>> +       }

>>

>>         return odp_shm_addr(shm);

>>  }

>> @@ -313,15 +234,21 @@ static int _ipc_init_slave(const char *dev,

>>  static int _ipc_slave_start(pktio_entry_t *pktio_entry)

>>  {

>>         char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];

>> -       size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +

>> -                          sizeof(_ring_t);

>>         struct pktio_info *pinfo;

>> -       void *ipc_pool_base;

>>         odp_shm_t shm;

>> -       const char *dev = pktio_entry->s.name;

>> +       char tail[ODP_POOL_NAME_LEN];

>> +       char dev[ODP_POOL_NAME_LEN];

>> +       int pid;

>> +

>> +       if (sscanf(pktio_entry->s.name, "ipc:%d:%s", &pid, tail) != 2) {

>> +               ODP_ERR("wrong pktio name\n");

>> +               return -1;

>> +       }

>> +

>> +       sprintf(dev, "ipc:%s", tail);

>>

>>         snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);

>> -       pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, ring_size);

>> +       pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, pid);

>>         if (!pktio_entry->s.ipc.rx.recv) {

>>                 ODP_DBG("pid %d unable to find ipc ring %s name\n",

>>                         getpid(), dev);

>> @@ -333,9 +260,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)

>>                 _ring_free_count(pktio_entry->s.ipc.rx.recv));

>>

>>         snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);

>> -       pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, ring_size);

>> +       pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, pid);

>>         if (!pktio_entry->s.ipc.rx.free) {

>> -               ODP_DBG("pid %d unable to find ipc ring %s name\n",

>> +               ODP_ERR("pid %d unable to find ipc ring %s name\n",

>>                         getpid(), dev);

>>                 goto free_m_prod;

>>         }

>> @@ -344,9 +271,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)

>>                 _ring_free_count(pktio_entry->s.ipc.rx.free));

>>

>>         snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);

>> -       pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, ring_size);

>> +       pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, pid);

>>         if (!pktio_entry->s.ipc.tx.send) {

>> -               ODP_DBG("pid %d unable to find ipc ring %s name\n",

>> +               ODP_ERR("pid %d unable to find ipc ring %s name\n",

>>                         getpid(), dev);

>>                 goto free_m_cons;

>>         }

>> @@ -355,9 +282,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)

>>                 _ring_free_count(pktio_entry->s.ipc.tx.send));

>>

>>         snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);

>> -       pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, ring_size);

>> +       pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, pid);

>>         if (!pktio_entry->s.ipc.tx.free) {

>> -               ODP_DBG("pid %d unable to find ipc ring %s name\n",

>> +               ODP_ERR("pid %d unable to find ipc ring %s name\n",

>>                         getpid(), dev);

>>                 goto free_s_prod;

>>         }

>> @@ -367,15 +294,17 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)

>>

>>         /* Get info about remote pool */

>>         pinfo = pktio_entry->s.ipc.pinfo;

>> -       ipc_pool_base = _ipc_map_remote_pool(pinfo->master.pool_name,

>> -                                            pinfo->master.shm_pkt_pool_size);

>> -       pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +

>> -                                            pinfo->master.mdata_offset;

>> -       pktio_entry->s.ipc.pkt_size = pinfo->master.shm_pkt_size;

>> +       shm = _ipc_map_remote_pool(pinfo->master.pool_name,

>> +                                  pid);

>> +       pktio_entry->s.ipc.remote_pool_shm = shm;

>> +       pktio_entry->s.ipc.pool_mdata_base = (char *)odp_shm_addr(shm) +

>> +                                            pinfo->master.base_addr_offset;

>> +       pktio_entry->s.ipc.pkt_size = pinfo->master.block_size;

>>

>>         _ipc_export_pool(pinfo, pktio_entry->s.ipc.pool);

>>

>>         odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);

>> +       pinfo->slave.init_done = 1;

>>

>>         ODP_DBG("%s started.\n",  pktio_entry->s.name);

>>         return 0;

>> @@ -401,7 +330,11 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,

>>                           odp_pool_t pool)

>>  {

>>         int ret = -1;

>> -       int slave;

>> +       int pid ODP_UNUSED;

>> +       struct pktio_info *pinfo;

>> +       char name[ODP_POOL_NAME_LEN + sizeof("_info")];

>> +       char tail[ODP_POOL_NAME_LEN];

>> +       odp_shm_t shm;

>>

>>         ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == _RING_NAMESIZE,

>>                           "mismatch pool and ring name arrays");

>> @@ -411,65 +344,59 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,

>>

>>         odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0);

>>

>> -       _ipc_map_pktio_info(pktio_entry, dev, &slave);

>> -       pktio_entry->s.ipc.type = (slave == 0) ? PKTIO_TYPE_IPC_MASTER :

>> -                                                PKTIO_TYPE_IPC_SLAVE;

>> +       /* Shared info about remote pktio */

>> +       if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2) {

>> +               pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_SLAVE;

>>

>> -       if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {

>> +               snprintf(name, sizeof(name), "ipc:%s_info", tail);

>> +               IPC_ODP_DBG("lookup for name %s for pid %d\n", name, pid);

>> +               shm = odp_shm_import(name, pid, name);

>> +               if (ODP_SHM_INVALID == shm)

>> +                       return -1;

>> +               pinfo = odp_shm_addr(shm);

>> +

>> +               if (!pinfo->master.init_done) {

>> +                       odp_shm_free(shm);

>> +                       return -1;

>> +               }

>> +               pktio_entry->s.ipc.pinfo = pinfo;

>> +               pktio_entry->s.ipc.pinfo_shm = shm;

>> +               ODP_DBG("process %d is slave\n", getpid());

>> +               ret = _ipc_init_slave(name, pktio_entry, pool);

>> +       } else {

>> +               pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_MASTER;

>> +               snprintf(name, sizeof(name), "%s_info", dev);

>> +               shm = odp_shm_reserve(name, sizeof(struct pktio_info),

>> +                                     ODP_CACHE_LINE_SIZE,

>> +                                     _ODP_ISHM_EXPORT | _ODP_ISHM_LOCK);

>> +               if (ODP_SHM_INVALID == shm) {

>> +                       ODP_ERR("can not create shm %s\n", name);

>> +                       return -1;

>> +               }

>> +

>> +               pinfo = odp_shm_addr(shm);

>> +               pinfo->master.init_done = 0;

>> +               pinfo->master.pool_name[0] = 0;

>> +               pktio_entry->s.ipc.pinfo = pinfo;

>> +               pktio_entry->s.ipc.pinfo_shm = shm;

>>                 ODP_DBG("process %d is master\n", getpid());

>>                 ret = _ipc_init_master(pktio_entry, dev, pool);

>> -       } else {

>> -               ODP_DBG("process %d is slave\n", getpid());

>> -               ret = _ipc_init_slave(dev, pktio_entry, pool);

>>         }

>>

>>         return ret;

>>  }

>>

>> -static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,

>> -                                   uint32_t offset,

>> -                                   uint32_t *seglen,

>> -                                   uint32_t limit)

>> +static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, _ring_t *r)

>>  {

>> -       int seg_index  = offset / buf->segsize;

>> -       int seg_offset = offset % buf->segsize;

>> -#ifdef _ODP_PKTIO_IPC

>> -       void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];

>> -#else

>> -       /** buf_hdr.ipc_addr_offset defined only when ipc is

>> -        *  enabled. */

>> -       void *addr = NULL;

>> -

>> -       (void)seg_index;

>> -#endif

>> -       if (seglen) {

>> -               uint32_t buf_left = limit - offset;

>> -               *seglen = seg_offset + buf_left <= buf->segsize ?

>> -                       buf_left : buf->segsize - seg_offset;

>> -       }

>> -

>> -       return (void *)(seg_offset + (uint8_t *)addr);

>> -}

>> -

>> -static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,

>> -                                   uint32_t offset, uint32_t *seglen)

>> -{

>> -       if (offset > pkt_hdr->frame_len)

>> -               return NULL;

>> -

>> -       return _ipc_buffer_map(&pkt_hdr->buf_hdr,

>> -                         pkt_hdr->headroom + offset, seglen,

>> -                         pkt_hdr->headroom + pkt_hdr->frame_len);

>> -}

>> -

>> -static void _ipc_free_ring_packets(_ring_t *r)

>> -{

>> -       odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];

>> +       uintptr_t offsets[PKTIO_IPC_ENTRIES];

>>         int ret;

>>         void **rbuf_p;

>>         int i;

>>

>> -       rbuf_p = (void *)&r_p_pkts;

>> +       if (!r)

>> +               return;

>> +

>> +       rbuf_p = (void *)&offsets;

>>

>>         while (1) {

>>                 ret = _ring_mc_dequeue_burst(r, rbuf_p,

>> @@ -477,8 +404,13 @@ static void _ipc_free_ring_packets(_ring_t *r)

>>                 if (0 == ret)

>>                         break;

>>                 for (i = 0; i < ret; i++) {

>> -                       if (r_p_pkts[i] != ODP_PACKET_INVALID)

>> -                               odp_packet_free(r_p_pkts[i]);

>> +                       odp_packet_hdr_t *phdr;

>> +                       odp_packet_t pkt;

>> +                       void *mbase = pktio_entry->s.ipc.pool_mdata_base;

>> +

>> +                       phdr = (void *)((uint8_t *)mbase + offsets[i]);

>> +                       pkt = (odp_packet_t)phdr->buf_hdr.handle.handle;

>> +                       odp_packet_free(pkt);

>>                 }

>>         }

>>  }

>> @@ -490,22 +422,23 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,

>>         int i;

>>         _ring_t *r;

>>         _ring_t *r_p;

>> +       uintptr_t offsets[PKTIO_IPC_ENTRIES];

>> +       void **ipcbufs_p = (void *)&offsets;

>> +       uint32_t ready;

>> +       int pkts_ring;

>>

>> -       odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];

>> -       void **ipcbufs_p = (void *)&remote_pkts;

>> -       uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);

>> -

>> +       ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);

>>         if (odp_unlikely(!ready)) {

>> -               ODP_DBG("start pktio is missing before usage?\n");

>> -               return -1;

>> +               IPC_ODP_DBG("start pktio is missing before usage?\n");

>> +               return 0;

>>         }

>>

>> -       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);

>> +       _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);

>>

>>         r = pktio_entry->s.ipc.rx.recv;

>>         pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len);

>>         if (odp_unlikely(pkts < 0))

>> -               ODP_ABORT("error to dequeue no packets\n");

>> +               ODP_ABORT("internal error dequeue\n");

>>

>>         /* fast path */

>>         if (odp_likely(0 == pkts))

>> @@ -514,36 +447,21 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,

>>         for (i = 0; i < pkts; i++) {

>>                 odp_pool_t pool;

>>                 odp_packet_t pkt;

>> -               odp_packet_hdr_t phdr;

>> -               void *ptr;

>> -               odp_buffer_bits_t handle;

>> -               int idx; /* Remote packet has coded pool and index.

>> -                         * We need only index.*/

>> +               odp_packet_hdr_t *phdr;

>>                 void *pkt_data;

>> -               void *remote_pkt_data;

>> +               uint64_t data_pool_off;

>> +               void *rmt_data_ptr;

>>

>> -               if (remote_pkts[i] == ODP_PACKET_INVALID)

>> -                       continue;

>> +               phdr = (void *)((uint8_t *)pktio_entry->s.ipc.pool_mdata_base +

>> +                      offsets[i]);

>>

>> -               handle.handle = _odp_packet_to_buffer(remote_pkts[i]);

>> -               idx = handle.index;

>> -

>> -               /* Link to packed data. To this line we have Zero-Copy between

>> -                * processes, to simplify use packet copy in that version which

>> -                * can be removed later with more advance buffer management

>> -                * (ref counters).

>> -                */

>> -               /* reverse odp_buf_to_hdr() */

>> -               ptr = (char *)pktio_entry->s.ipc.pool_mdata_base +

>> -                     (idx * ODP_CACHE_LINE_SIZE);

>> -               memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t));

>> -

>> -               /* Allocate new packet. Select*/

>>                 pool = pktio_entry->s.ipc.pool;

>>                 if (odp_unlikely(pool == ODP_POOL_INVALID))

>>                         ODP_ABORT("invalid pool");

>>

>> -               pkt = odp_packet_alloc(pool, phdr.frame_len);

>> +               data_pool_off = phdr->buf_hdr.seg[0].ipc_data_offset;

>> +

>> +               pkt = odp_packet_alloc(pool, phdr->frame_len);

>>                 if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {

>>                         /* Original pool might be smaller then

>>                         *  PKTIO_IPC_ENTRIES. If packet can not be

>> @@ -562,30 +480,40 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,

>>                                   (PKTIO_TYPE_IPC_SLAVE ==

>>                                         pktio_entry->s.ipc.type));

>>

>> -               remote_pkt_data = _ipc_packet_map(ptr, 0, NULL);

>> -               if (odp_unlikely(!remote_pkt_data))

>> -                       ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",

>> -                                 (PKTIO_TYPE_IPC_SLAVE ==

>> -                                       pktio_entry->s.ipc.type));

>> -

>>                 /* Copy packet data from shared pool to local pool. */

>> -               memcpy(pkt_data, remote_pkt_data, phdr.frame_len);

>> +               rmt_data_ptr = (uint8_t *)pktio_entry->s.ipc.pool_mdata_base +

>> +                              data_pool_off;

>> +               memcpy(pkt_data, rmt_data_ptr, phdr->frame_len);

>>

>>                 /* Copy packets L2, L3 parsed offsets and size */

>> -               copy_packet_cls_metadata(&phdr, odp_packet_hdr(pkt));

>> +               copy_packet_cls_metadata(phdr, odp_packet_hdr(pkt));

>> +

>> +               odp_packet_hdr(pkt)->frame_len = phdr->frame_len;

>> +               odp_packet_hdr(pkt)->headroom = phdr->headroom;

>> +               odp_packet_hdr(pkt)->tailroom = phdr->tailroom;

>> +

>> +               /* Take classification fields */

>> +               odp_packet_hdr(pkt)->p = phdr->p;

>>

>> -               odp_packet_hdr(pkt)->frame_len = phdr.frame_len;

>> -               odp_packet_hdr(pkt)->headroom = phdr.headroom;

>> -               odp_packet_hdr(pkt)->tailroom = phdr.tailroom;

>> -               odp_packet_hdr(pkt)->input = pktio_entry->s.handle;

>>                 pkt_table[i] = pkt;

>>         }

>>

>>         /* Now tell other process that we no longer need that buffers.*/

>>         r_p = pktio_entry->s.ipc.rx.free;

>> -       pkts = _ring_mp_enqueue_burst(r_p, ipcbufs_p, i);

>> +

>> +repeat:

>> +       pkts_ring = _ring_mp_enqueue_burst(r_p, ipcbufs_p, pkts);

>>         if (odp_unlikely(pkts < 0))

>>                 ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");

>> +       if (odp_unlikely(pkts != pkts_ring)) {

>> +               IPC_ODP_DBG("odp_ring_full: %d, odp_ring_count %d,"

>> +                           " _ring_free_count %d\n",

>> +                           _ring_full(r_p), _ring_count(r_p),

>> +                           _ring_free_count(r_p));

>> +               ipcbufs_p = (void *)&offsets[pkts_ring - 1];

>> +               pkts = pkts - pkts_ring;

>> +               goto repeat;

>> +       }

>>

>>         return pkts;

>>  }

>> @@ -614,26 +542,23 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,

>>         uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);

>>         odp_packet_t pkt_table_mapped[len]; /**< Ready to send packet has to be

>>                                               * in memory mapped pool. */

>> +       uintptr_t offsets[len];

>>

>>         if (odp_unlikely(!ready))

>>                 return 0;

>>

>> -       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);

>> +       _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);

>>

>> -       /* Prepare packets: calculate offset from address. */

>> +       /* Copy packets to shm shared pool if they are in different */

>>         for (i = 0; i < len; i++) {

>> -               int j;

>>                 odp_packet_t pkt =  pkt_table[i];

>> -               odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);

>> +               pool_t *ipc_pool = pool_entry_from_hdl(pktio_entry->s.ipc.pool);

>>                 odp_buffer_bits_t handle;

>> -               uint32_t cur_mapped_pool_id =

>> -                        pool_handle_to_index(pktio_entry->s.ipc.pool);

>> -               uint32_t pool_id;

>> +               uint32_t pkt_pool_id;

>>

>> -               /* do copy if packet was allocated from not mapped pool */

>>                 handle.handle = _odp_packet_to_buffer(pkt);

>> -               pool_id = handle.pool_id;

>> -               if (pool_id != cur_mapped_pool_id) {

>> +               pkt_pool_id = handle.pool_id;

>> +               if (pkt_pool_id != ipc_pool->pool_idx) {

>>                         odp_packet_t newpkt;

>>

>>                         newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool);

>> @@ -645,24 +570,30 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,

>>                 } else {

>>                         pkt_table_mapped[i] = pkt;

>>                 }

>> +       }

>>

>> -               /* buf_hdr.addr can not be used directly in remote process,

>> -                * convert it to offset

>> -                */

>> -               for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) {

>> -#ifdef _ODP_PKTIO_IPC

>> -                       pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -

>> -                               (char *)pkt_hdr->buf_hdr.addr[j];

>> -#else

>> -                       /** buf_hdr.ipc_addr_offset defined only when ipc is

>> -                        *  enabled. */

>> -                       (void)pkt_hdr;

>> -#endif

>> -               }

>> +       /* Set offset to phdr for outgoing packets */

>> +       for (i = 0; i < len; i++) {

>> +               uint64_t data_pool_off;

>> +               odp_packet_t pkt = pkt_table_mapped[i];

>> +               odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);

>> +               odp_pool_t pool_hdl = odp_packet_pool(pkt);

>> +               pool_t *pool = pool_entry_from_hdl(pool_hdl);

>> +

>> +               offsets[i] = (uint8_t *)pkt_hdr -

>> +                            (uint8_t *)odp_shm_addr(pool->shm);

>> +               data_pool_off = (uint8_t *)pkt_hdr->buf_hdr.seg[0].data -

>> +                               (uint8_t *)odp_shm_addr(pool->shm);

>> +               pkt_hdr->buf_hdr.seg[0].ipc_data_offset = data_pool_off;

>> +               IPC_ODP_DBG("%d/%d send packet %llx, pool %llx,"

>> +                           "phdr = %p, offset %x\n",

>> +                           i, len,

>> +                           odp_packet_to_u64(pkt), odp_pool_to_u64(pool_hdl),

>> +                           pkt_hdr, pkt_hdr->buf_hdr.seg[0].ipc_data_offset);

>>         }

>>

>>         /* Put packets to ring to be processed by other process. */

>> -       rbuf_p = (void *)&pkt_table_mapped[0];

>> +       rbuf_p = (void *)&offsets[0];

>>         r = pktio_entry->s.ipc.tx.send;

>>         ret = _ring_mp_enqueue_burst(r, rbuf_p, len);

>>         if (odp_unlikely(ret < 0)) {

>> @@ -673,6 +604,7 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,

>>                 ODP_ERR("odp_ring_full: %d, odp_ring_count %d, _ring_free_count %d\n",

>>                         _ring_full(r), _ring_count(r),

>>                         _ring_free_count(r));

>> +               ODP_ABORT("Unexpected!\n");

>>         }

>>

>>         return ret;

>> @@ -722,22 +654,25 @@ static int ipc_start(pktio_entry_t *pktio_entry)

>>

>>  static int ipc_stop(pktio_entry_t *pktio_entry)

>>  {

>> -       unsigned tx_send, tx_free;

>> +       unsigned tx_send = 0, tx_free = 0;

>>

>>         odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 0);

>>

>> -       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.send);

>> +       if (pktio_entry->s.ipc.tx.send)

>> +               _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.send);

>>         /* other process can transfer packets from one ring to

>>          * other, use delay here to free that packets. */

>>         sleep(1);

>> -       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);

>> +       if (pktio_entry->s.ipc.tx.free)

>> +               _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);

>>

>> -       tx_send = _ring_count(pktio_entry->s.ipc.tx.send);

>> -       tx_free = _ring_count(pktio_entry->s.ipc.tx.free);

>> +       if (pktio_entry->s.ipc.tx.send)

>> +               tx_send = _ring_count(pktio_entry->s.ipc.tx.send);

>> +       if (pktio_entry->s.ipc.tx.free)

>> +               tx_free = _ring_count(pktio_entry->s.ipc.tx.free);

>>         if (tx_send | tx_free) {

>>                 ODP_DBG("IPC rings: tx send %d tx free %d\n",

>> -                       _ring_free_count(pktio_entry->s.ipc.tx.send),

>> -                       _ring_free_count(pktio_entry->s.ipc.tx.free));

>> +                       tx_send, tx_free);

>>         }

>>

>>         return 0;

>> @@ -747,23 +682,31 @@ static int ipc_close(pktio_entry_t *pktio_entry)

>>  {

>>         char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")];

>>         char *dev = pktio_entry->s.name;

>> +       char name[ODP_POOL_NAME_LEN];

>> +       char tail[ODP_POOL_NAME_LEN];

>> +       int pid = 0;

>>

>>         ipc_stop(pktio_entry);

>>

>> -       if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {

>> -               /* unlink this pktio info for both master and slave */

>> -               odp_shm_free(pktio_entry->s.ipc.pinfo_shm);

>> +       odp_shm_free(pktio_entry->s.ipc.remote_pool_shm);

>>

>> -               /* destroy rings */

>> -               snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);

>> -               _ring_destroy(ipc_shm_name);

>> -               snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);

>> -               _ring_destroy(ipc_shm_name);

>> -               snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);

>> -               _ring_destroy(ipc_shm_name);

>> -               snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);

>> -               _ring_destroy(ipc_shm_name);

>> -       }

>> +       if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2)

>> +               snprintf(name, sizeof(name), "ipc:%s", tail);

>> +       else

>> +               snprintf(name, sizeof(name), "%s", dev);

>> +

>> +       /* unlink this pktio info for both master and slave */

>> +       odp_shm_free(pktio_entry->s.ipc.pinfo_shm);

>> +

>> +       /* destroy rings */

>> +       snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", name);

>> +       _ring_destroy(ipc_shm_name);

>> +       snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", name);

>> +       _ring_destroy(ipc_shm_name);

>> +       snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", name);

>> +       _ring_destroy(ipc_shm_name);

>> +       snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", name);

>> +       _ring_destroy(ipc_shm_name);

>>

>>         return 0;

>>  }

>> @@ -795,4 +738,3 @@ const pktio_if_ops_t ipc_pktio_ops = {

>>         .pktin_ts_from_ns = NULL,

>>         .config = NULL

>>  };

>> -#endif

>> diff --git a/test/linux-generic/pktio_ipc/pktio_ipc_run.sh b/test/linux-generic/pktio_ipc/pktio_ipc_run.sh

>> index 3cd28f5..52e8d42 100755

>> --- a/test/linux-generic/pktio_ipc/pktio_ipc_run.sh

>> +++ b/test/linux-generic/pktio_ipc/pktio_ipc_run.sh

>> @@ -25,19 +25,23 @@ run()

>>         rm -rf /tmp/odp-* 2>&1 > /dev/null

>>

>>         echo "==== run pktio_ipc1 then pktio_ipc2 ===="

>> -       pktio_ipc1${EXEEXT} -t 30 &

>> +       pktio_ipc1${EXEEXT} -t 10 &

>>         IPC_PID=$!

>>

>> -       pktio_ipc2${EXEEXT} -p ${IPC_PID} -t 10

>> +       pktio_ipc2${EXEEXT} -p ${IPC_PID} -t 5

>>         ret=$?

>>         # pktio_ipc1 should do clean up and exit just

>>         # after pktio_ipc2 exited. If it does not happen

>>         # kill him in test.

>> -       sleep 1

>> -       kill ${IPC_PID} 2>&1 > /dev/null

>> +       sleep 13

>> +       (kill ${IPC_PID} 2>&1 > /dev/null ) > /dev/null

>>         if [ $? -eq 0 ]; then

>> -               ls -l /tmp/odp*

>> +               echo "pktio_ipc1${EXEEXT} was killed"

>> +               ls -l /tmp/odp* 2> /dev/null

>>                 rm -rf /tmp/odp-${IPC_PID}* 2>&1 > /dev/null

>> +       else

>> +               echo "normal exit of 2 application"

>> +               ls -l /tmp/odp* 2> /dev/null

>>         fi

>>

>>         if [ $ret -ne 0 ]; then

>> @@ -47,21 +51,32 @@ run()

>>                 echo "First stage PASSED"

>>         fi

>>

>> -

>>         echo "==== run pktio_ipc2 then pktio_ipc1 ===="

>> -       pktio_ipc2${EXEEXT} -t 20 &

>> +       pktio_ipc2${EXEEXT} -t 10 &

>>         IPC_PID=$!

>>

>> -       pktio_ipc1${EXEEXT} -p ${IPC_PID} -t 10

>> +       pktio_ipc1${EXEEXT} -p ${IPC_PID} -t 5

>>         ret=$?

>> -       (kill ${IPC_PID} 2>&1 > /dev/null) > /dev/null || true

>> +       # pktio_ipc2 do not exit on pktio_ipc1 disconnect

>> +       # wait until it exits cleanly

>> +       sleep 13

>> +       (kill ${IPC_PID} 2>&1 > /dev/null ) > /dev/null

>> +       if [ $? -eq 0 ]; then

>> +               echo "pktio_ipc2${EXEEXT} was killed"

>> +               ls -l /tmp/odp* 2> /dev/null

>> +               rm -rf /tmp/odp-${IPC_PID}* 2>&1 > /dev/null

>> +       else

>> +               echo "normal exit of 2 application"

>> +               ls -l /tmp/odp* 2> /dev/null

>> +       fi

>>

>>         if [ $ret -ne 0 ]; then

>>                 echo "!!! FAILED !!!"

>> -               ls -l /tmp/odp*

>> +               ls -l /tmp/odp* 2> /dev/null

>>                 rm -rf /tmp/odp-${IPC_PID}* 2>&1 > /dev/null

>>                 exit $ret

>>         else

>> +               ls -l /tmp/odp* 2> /dev/null

>>                 echo "Second stage PASSED"

>>         fi

>>

>> --

>> 2.7.1.250.gff4ea60

>>
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index 2064f7c..903f0a7 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -64,6 +64,11 @@  struct odp_buffer_hdr_t {
 	struct {
 		void     *hdr;
 		uint8_t  *data;
+#ifdef _ODP_PKTIO_IPC
+	/* ipc mapped process can not walk over pointers,
+	 * offset has to be used */
+		uint64_t ipc_data_offset;
+#endif
 		uint32_t  len;
 	} seg[CONFIG_PACKET_MAX_SEGS];
 
@@ -94,11 +99,6 @@  struct odp_buffer_hdr_t {
 	uint32_t                 uarea_size; /* size of user area */
 	uint32_t                 segcount;   /* segment count */
 	uint32_t                 segsize;    /* segment size */
-#ifdef _ODP_PKTIO_IPC
-	/* ipc mapped process can not walk over pointers,
-	 * offset has to be used */
-	uint64_t		 ipc_addr_offset[ODP_CONFIG_PACKET_MAX_SEGS];
-#endif
 
 	/* Data or next header */
 	uint8_t data[0];
diff --git a/platform/linux-generic/include/odp_internal.h b/platform/linux-generic/include/odp_internal.h
index 6a80d9d..b313b1f 100644
--- a/platform/linux-generic/include/odp_internal.h
+++ b/platform/linux-generic/include/odp_internal.h
@@ -50,7 +50,6 @@  struct odp_global_data_s {
 	odp_cpumask_t control_cpus;
 	odp_cpumask_t worker_cpus;
 	int num_cpus_installed;
-	int ipc_ns;
 };
 
 enum init_stage {
diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
index bdf6316..2001c42 100644
--- a/platform/linux-generic/include/odp_packet_io_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_internal.h
@@ -102,6 +102,8 @@  typedef	struct {
 				     packet, 0 - not yet ready */
 	void *pinfo;
 	odp_shm_t pinfo_shm;
+	odp_shm_t remote_pool_shm; /**< shm of remote pool get with
+					_ipc_map_remote_pool() */
 } _ipc_pktio_t;
 
 struct pktio_entry {
diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
index 851114d..7cd2948 100644
--- a/platform/linux-generic/include/odp_packet_io_ipc_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
@@ -26,22 +26,31 @@ 
  */
 struct pktio_info {
 	struct {
-		/* number of buffer in remote pool */
-		int shm_pool_bufs_num;
-		/* size of remote pool */
-		size_t shm_pkt_pool_size;
+		/* number of buffer*/
+		int num;
 		/* size of packet/segment in remote pool */
-		uint32_t shm_pkt_size;
+		uint32_t block_size;
 		/* offset from shared memory block start
-		 * to pool_mdata_addr (odp-linux pool specific) */
-		size_t mdata_offset;
+		 * to pool *base_addr in remote process.
+		 * (odp-linux pool specific) */
+		size_t base_addr_offset;
 		char pool_name[ODP_POOL_NAME_LEN];
+		/* 1 if master finished creation of all shared objects */
+		int init_done;
 	} master;
 	struct {
 		/* offset from shared memory block start
-		 * to pool_mdata_addr in remote process.
+		 * to pool *base_addr in remote process.
 		 * (odp-linux pool specific) */
-		size_t mdata_offset;
+		size_t base_addr_offset;
+		void *base_addr;
+		uint32_t block_size;
 		char pool_name[ODP_POOL_NAME_LEN];
+		/* pid of the slave process written to shm and
+		 * used by master to look up memory created by
+		 * slave
+		 */
+		int pid;
+		int init_done;
 	} slave;
 } ODP_PACKED;
diff --git a/platform/linux-generic/odp_init.c b/platform/linux-generic/odp_init.c
index d40a83c..1b0d8f8 100644
--- a/platform/linux-generic/odp_init.c
+++ b/platform/linux-generic/odp_init.c
@@ -67,7 +67,7 @@  static int cleanup_files(const char *dirpath, int odp_pid)
 
 int odp_init_global(odp_instance_t *instance,
 		    const odp_init_t *params,
-		    const odp_platform_init_t *platform_params)
+		    const odp_platform_init_t *platform_params ODP_UNUSED)
 {
 	char *hpdir;
 
@@ -75,9 +75,6 @@  int odp_init_global(odp_instance_t *instance,
 	odp_global_data.main_pid = getpid();
 	cleanup_files(_ODP_TMPDIR, odp_global_data.main_pid);
 
-	if (platform_params)
-		odp_global_data.ipc_ns = platform_params->ipc_ns;
-
 	enum init_stage stage = NO_INIT;
 	odp_global_data.log_fn = odp_override_log;
 	odp_global_data.abort_fn = odp_override_abort;
diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c
index 0e99c6e..5e47a33 100644
--- a/platform/linux-generic/pktio/ipc.c
+++ b/platform/linux-generic/pktio/ipc.c
@@ -3,150 +3,85 @@ 
  *
  * SPDX-License-Identifier:     BSD-3-Clause
  */
-#ifdef _ODP_PKTIO_IPC
 #include <odp_packet_io_ipc_internal.h>
 #include <odp_debug_internal.h>
 #include <odp_packet_io_internal.h>
 #include <odp/api/system_info.h>
 #include <odp_shm_internal.h>
+#include <_ishm_internal.h>
 
 #include <sys/mman.h>
 #include <sys/stat.h>
 #include <fcntl.h>
 
+#define IPC_ODP_DEBUG_PRINT 0
+
+#define IPC_ODP_DBG(fmt, ...) \
+	do { \
+		if (IPC_ODP_DEBUG_PRINT == 1) \
+			ODP_DBG(fmt, ##__VA_ARGS__);\
+	} while (0)
+
 /* MAC address for the "ipc" interface */
 static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12};
 
-static void *_ipc_map_remote_pool(const char *name, size_t size);
+static odp_shm_t _ipc_map_remote_pool(const char *name, int pid);
 
 static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
 {
-	pool_entry_t *pool;
-	uint32_t pool_id;
+	pool_t *pool;
 	odp_shm_t shm;
 	odp_shm_info_t info;
 
-	pool_id = pool_handle_to_index(pool_hdl);
-	pool    = get_pool_entry(pool_id);
-	shm = pool->s.pool_shm;
+	pool    = pool_entry_from_hdl(pool_hdl);
+	shm = pool->shm;
 
 	odp_shm_info(shm, &info);
 
 	return info.name;
 }
 
-/**
-* Look up for shared memory object.
-*
-* @param name   name of shm object
-*
-* @return 0 on success, otherwise non-zero
-*/
-static int _ipc_shm_lookup(const char *name)
-{
-	int shm;
-	char shm_devname[SHM_DEVNAME_MAXLEN];
-
-	if (!odp_global_data.ipc_ns)
-		ODP_ABORT("ipc_ns not set\n");
-
-	snprintf(shm_devname, SHM_DEVNAME_MAXLEN,
-		 SHM_DEVNAME_FORMAT,
-		 odp_global_data.ipc_ns, name);
-
-	shm = shm_open(shm_devname, O_RDWR, S_IRUSR | S_IWUSR);
-	if (shm == -1) {
-		if (errno == ENOENT) {
-			ODP_DBG("no file %s\n", shm_devname);
-			return -1;
-		}
-		ODP_ABORT("shm_open for %s err %s\n",
-			  shm_devname, strerror(errno));
-	}
-	close(shm);
-	return 0;
-}
-
-static int _ipc_map_pktio_info(pktio_entry_t *pktio_entry,
-			       const char *dev,
-			       int *slave)
-{
-	struct pktio_info *pinfo;
-	char name[ODP_POOL_NAME_LEN + sizeof("_info")];
-	uint32_t flags;
-	odp_shm_t shm;
-
-	/* Create info about remote pktio */
-	snprintf(name, sizeof(name), "%s_info", dev);
-
-	flags = ODP_SHM_PROC | _ODP_SHM_O_EXCL;
-
-	shm = odp_shm_reserve(name, sizeof(struct pktio_info),
-			      ODP_CACHE_LINE_SIZE,
-			      flags);
-	if (ODP_SHM_INVALID != shm) {
-		pinfo = odp_shm_addr(shm);
-		pinfo->master.pool_name[0] = 0;
-		*slave = 0;
-	} else {
-		flags = _ODP_SHM_PROC_NOCREAT | _ODP_SHM_O_EXCL;
-		shm = odp_shm_reserve(name, sizeof(struct pktio_info),
-				      ODP_CACHE_LINE_SIZE,
-				      flags);
-		if (ODP_SHM_INVALID == shm)
-			ODP_ABORT("can not connect to shm\n");
-
-		pinfo = odp_shm_addr(shm);
-		*slave = 1;
-	}
-
-	pktio_entry->s.ipc.pinfo = pinfo;
-	pktio_entry->s.ipc.pinfo_shm = shm;
-
-	return 0;
-}
-
 static int _ipc_master_start(pktio_entry_t *pktio_entry)
 {
 	struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo;
-	int ret;
-	void *ipc_pool_base;
+	odp_shm_t shm;
 
-	if (pinfo->slave.mdata_offset == 0)
+	if (pinfo->slave.init_done == 0)
 		return -1;
 
-	ret = _ipc_shm_lookup(pinfo->slave.pool_name);
-	if (ret) {
-		ODP_DBG("no pool file %s\n", pinfo->slave.pool_name);
+	shm = _ipc_map_remote_pool(pinfo->slave.pool_name,
+				   pinfo->slave.pid);
+	if (shm == ODP_SHM_INVALID) {
+		ODP_DBG("no pool file %s for pid %d\n",
+			pinfo->slave.pool_name, pinfo->slave.pid);
 		return -1;
 	}
 
-	ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
-					     pinfo->master.shm_pkt_pool_size);
-	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
-					     pinfo->slave.mdata_offset;
+	pktio_entry->s.ipc.remote_pool_shm = shm;
+	pktio_entry->s.ipc.pool_base = odp_shm_addr(shm);
+	pktio_entry->s.ipc.pool_mdata_base = (char *)odp_shm_addr(shm) +
+					     pinfo->slave.base_addr_offset;
 
 	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
 
-	ODP_DBG("%s started.\n",  pktio_entry->s.name);
+	IPC_ODP_DBG("%s started.\n",  pktio_entry->s.name);
 	return 0;
 }
 
 static int _ipc_init_master(pktio_entry_t *pktio_entry,
 			    const char *dev,
-			    odp_pool_t pool)
+			    odp_pool_t pool_hdl)
 {
 	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")];
-	pool_entry_t *pool_entry;
-	uint32_t pool_id;
+	pool_t *pool;
 	struct pktio_info *pinfo;
 	const char *pool_name;
 
-	pool_id = pool_handle_to_index(pool);
-	pool_entry    = get_pool_entry(pool_id);
+	pool = pool_entry_from_hdl(pool_hdl);
+	(void)pool;
 
 	if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_m_prod"))) {
-		ODP_DBG("too big ipc name\n");
+		ODP_ERR("too big ipc name\n");
 		return -1;
 	}
 
@@ -158,7 +93,7 @@  static int _ipc_init_master(pktio_entry_t *pktio_entry,
 			PKTIO_IPC_ENTRIES,
 			_RING_SHM_PROC | _RING_NO_LIST);
 	if (!pktio_entry->s.ipc.tx.send) {
-		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+		ODP_ERR("pid %d unable to create ipc ring %s name\n",
 			getpid(), ipc_shm_name);
 		return -1;
 	}
@@ -174,7 +109,7 @@  static int _ipc_init_master(pktio_entry_t *pktio_entry,
 			PKTIO_IPC_ENTRIES,
 			_RING_SHM_PROC | _RING_NO_LIST);
 	if (!pktio_entry->s.ipc.tx.free) {
-		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+		ODP_ERR("pid %d unable to create ipc ring %s name\n",
 			getpid(), ipc_shm_name);
 		goto free_m_prod;
 	}
@@ -187,7 +122,7 @@  static int _ipc_init_master(pktio_entry_t *pktio_entry,
 			PKTIO_IPC_ENTRIES,
 			_RING_SHM_PROC | _RING_NO_LIST);
 	if (!pktio_entry->s.ipc.rx.recv) {
-		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+		ODP_ERR("pid %d unable to create ipc ring %s name\n",
 			getpid(), ipc_shm_name);
 		goto free_m_cons;
 	}
@@ -200,7 +135,7 @@  static int _ipc_init_master(pktio_entry_t *pktio_entry,
 			PKTIO_IPC_ENTRIES,
 			_RING_SHM_PROC | _RING_NO_LIST);
 	if (!pktio_entry->s.ipc.rx.free) {
-		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+		ODP_ERR("pid %d unable to create ipc ring %s name\n",
 			getpid(), ipc_shm_name);
 		goto free_s_prod;
 	}
@@ -210,24 +145,23 @@  static int _ipc_init_master(pktio_entry_t *pktio_entry,
 
 	/* Set up pool name for remote info */
 	pinfo = pktio_entry->s.ipc.pinfo;
-	pool_name = _ipc_odp_buffer_pool_shm_name(pool);
+	pool_name = _ipc_odp_buffer_pool_shm_name(pool_hdl);
 	if (strlen(pool_name) > ODP_POOL_NAME_LEN) {
-		ODP_DBG("pid %d ipc pool name %s is too big %d\n",
+		ODP_ERR("pid %d ipc pool name %s is too big %d\n",
 			getpid(), pool_name, strlen(pool_name));
 		goto free_s_prod;
 	}
 
 	memcpy(pinfo->master.pool_name, pool_name, strlen(pool_name));
-	pinfo->master.shm_pkt_pool_size = pool_entry->s.pool_size;
-	pinfo->master.shm_pool_bufs_num = pool_entry->s.buf_num;
-	pinfo->master.shm_pkt_size = pool_entry->s.seg_size;
-	pinfo->master.mdata_offset =  pool_entry->s.pool_mdata_addr -
-			       pool_entry->s.pool_base_addr;
-	pinfo->slave.mdata_offset = 0;
+	pinfo->slave.base_addr_offset = 0;
+	pinfo->slave.base_addr = 0;
+	pinfo->slave.pid = 0;
+	pinfo->slave.init_done = 0;
 
-	pktio_entry->s.ipc.pool = pool;
+	pktio_entry->s.ipc.pool = pool_hdl;
 
 	ODP_DBG("Pre init... DONE.\n");
+	pinfo->master.init_done = 1;
 
 	_ipc_master_start(pktio_entry);
 
@@ -246,55 +180,42 @@  free_m_prod:
 }
 
 static void _ipc_export_pool(struct pktio_info *pinfo,
-			     odp_pool_t pool)
+			     odp_pool_t pool_hdl)
 {
-	pool_entry_t *pool_entry;
-
-	pool_entry = odp_pool_to_entry(pool);
-	if (pool_entry->s.blk_size != pinfo->master.shm_pkt_size)
-		ODP_ABORT("pktio for same name should have the same pool size\n");
-	if (pool_entry->s.buf_num != (unsigned)pinfo->master.shm_pool_bufs_num)
-		ODP_ABORT("pktio for same name should have the same pool size\n");
+	pool_t *pool = pool_entry_from_hdl(pool_hdl);
 
 	snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s",
-		 pool_entry->s.name);
-	pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
-				    pool_entry->s.pool_base_addr;
+		 _ipc_odp_buffer_pool_shm_name(pool_hdl));
+	pinfo->slave.pid = odp_global_data.main_pid;
+	pinfo->slave.block_size = pool->block_size;
+	pinfo->slave.base_addr = pool->base_addr;
 }
 
-static void *_ipc_map_remote_pool(const char *name, size_t size)
+static odp_shm_t _ipc_map_remote_pool(const char *name, int pid)
 {
 	odp_shm_t shm;
-	void *addr;
+	char rname[ODP_SHM_NAME_LEN];
 
-	ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
-	shm = odp_shm_reserve(name,
-			      size,
-			      ODP_CACHE_LINE_SIZE,
-			      _ODP_SHM_PROC_NOCREAT);
-	if (shm == ODP_SHM_INVALID)
-		ODP_ABORT("unable map %s\n", name);
+	snprintf(rname, ODP_SHM_NAME_LEN, "remote-%s", name);
+	shm = odp_shm_import(name, pid, rname);
+	if (shm == ODP_SHM_INVALID) {
+		ODP_ERR("unable map %s\n", name);
+		return ODP_SHM_INVALID;
+	}
 
-	addr = odp_shm_addr(shm);
-	ODP_DBG("MAP master: %p - %p size %ld, pool %s\n",
-		addr, (char *)addr + size, size, name);
-	return addr;
+	IPC_ODP_DBG("Mapped remote pool %s to local %s\n", name, rname);
+	return shm;
 }
 
-static void *_ipc_shm_map(char *name, size_t size)
+static void *_ipc_shm_map(char *name, int pid)
 {
 	odp_shm_t shm;
-	int ret;
 
-	ret = _ipc_shm_lookup(name);
-	if (ret == -1)
+	shm = odp_shm_import(name, pid, name);
+	if (ODP_SHM_INVALID == shm) {
+		ODP_ERR("unable to map: %s\n", name);
 		return NULL;
-
-	shm = odp_shm_reserve(name, size,
-			      ODP_CACHE_LINE_SIZE,
-			      _ODP_SHM_PROC_NOCREAT);
-	if (ODP_SHM_INVALID == shm)
-		ODP_ABORT("unable to map: %s\n", name);
+	}
 
 	return odp_shm_addr(shm);
 }
@@ -313,15 +234,21 @@  static int _ipc_init_slave(const char *dev,
 static int _ipc_slave_start(pktio_entry_t *pktio_entry)
 {
 	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
-	size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
-			   sizeof(_ring_t);
 	struct pktio_info *pinfo;
-	void *ipc_pool_base;
 	odp_shm_t shm;
-	const char *dev = pktio_entry->s.name;
+	char tail[ODP_POOL_NAME_LEN];
+	char dev[ODP_POOL_NAME_LEN];
+	int pid;
+
+	if (sscanf(pktio_entry->s.name, "ipc:%d:%s", &pid, tail) != 2) {
+		ODP_ERR("wrong pktio name\n");
+		return -1;
+	}
+
+	sprintf(dev, "ipc:%s", tail);
 
 	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
-	pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, ring_size);
+	pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, pid);
 	if (!pktio_entry->s.ipc.rx.recv) {
 		ODP_DBG("pid %d unable to find ipc ring %s name\n",
 			getpid(), dev);
@@ -333,9 +260,9 @@  static int _ipc_slave_start(pktio_entry_t *pktio_entry)
 		_ring_free_count(pktio_entry->s.ipc.rx.recv));
 
 	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
-	pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, ring_size);
+	pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, pid);
 	if (!pktio_entry->s.ipc.rx.free) {
-		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+		ODP_ERR("pid %d unable to find ipc ring %s name\n",
 			getpid(), dev);
 		goto free_m_prod;
 	}
@@ -344,9 +271,9 @@  static int _ipc_slave_start(pktio_entry_t *pktio_entry)
 		_ring_free_count(pktio_entry->s.ipc.rx.free));
 
 	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
-	pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, ring_size);
+	pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, pid);
 	if (!pktio_entry->s.ipc.tx.send) {
-		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+		ODP_ERR("pid %d unable to find ipc ring %s name\n",
 			getpid(), dev);
 		goto free_m_cons;
 	}
@@ -355,9 +282,9 @@  static int _ipc_slave_start(pktio_entry_t *pktio_entry)
 		_ring_free_count(pktio_entry->s.ipc.tx.send));
 
 	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
-	pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, ring_size);
+	pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, pid);
 	if (!pktio_entry->s.ipc.tx.free) {
-		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+		ODP_ERR("pid %d unable to find ipc ring %s name\n",
 			getpid(), dev);
 		goto free_s_prod;
 	}
@@ -367,15 +294,17 @@  static int _ipc_slave_start(pktio_entry_t *pktio_entry)
 
 	/* Get info about remote pool */
 	pinfo = pktio_entry->s.ipc.pinfo;
-	ipc_pool_base = _ipc_map_remote_pool(pinfo->master.pool_name,
-					     pinfo->master.shm_pkt_pool_size);
-	pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
-					     pinfo->master.mdata_offset;
-	pktio_entry->s.ipc.pkt_size = pinfo->master.shm_pkt_size;
+	shm = _ipc_map_remote_pool(pinfo->master.pool_name,
+				   pid);
+	pktio_entry->s.ipc.remote_pool_shm = shm;
+	pktio_entry->s.ipc.pool_mdata_base = (char *)odp_shm_addr(shm) +
+					     pinfo->master.base_addr_offset;
+	pktio_entry->s.ipc.pkt_size = pinfo->master.block_size;
 
 	_ipc_export_pool(pinfo, pktio_entry->s.ipc.pool);
 
 	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
+	pinfo->slave.init_done = 1;
 
 	ODP_DBG("%s started.\n",  pktio_entry->s.name);
 	return 0;
@@ -401,7 +330,11 @@  static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
 			  odp_pool_t pool)
 {
 	int ret = -1;
-	int slave;
+	int pid ODP_UNUSED;
+	struct pktio_info *pinfo;
+	char name[ODP_POOL_NAME_LEN + sizeof("_info")];
+	char tail[ODP_POOL_NAME_LEN];
+	odp_shm_t shm;
 
 	ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == _RING_NAMESIZE,
 			  "mismatch pool and ring name arrays");
@@ -411,65 +344,59 @@  static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
 
 	odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0);
 
-	_ipc_map_pktio_info(pktio_entry, dev, &slave);
-	pktio_entry->s.ipc.type = (slave == 0) ? PKTIO_TYPE_IPC_MASTER :
-						 PKTIO_TYPE_IPC_SLAVE;
+	/* Shared info about remote pktio */
+	if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2) {
+		pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_SLAVE;
 
-	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {
+		snprintf(name, sizeof(name), "ipc:%s_info", tail);
+		IPC_ODP_DBG("lookup for name %s for pid %d\n", name, pid);
+		shm = odp_shm_import(name, pid, name);
+		if (ODP_SHM_INVALID == shm)
+			return -1;
+		pinfo = odp_shm_addr(shm);
+
+		if (!pinfo->master.init_done) {
+			odp_shm_free(shm);
+			return -1;
+		}
+		pktio_entry->s.ipc.pinfo = pinfo;
+		pktio_entry->s.ipc.pinfo_shm = shm;
+		ODP_DBG("process %d is slave\n", getpid());
+		ret = _ipc_init_slave(name, pktio_entry, pool);
+	} else {
+		pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_MASTER;
+		snprintf(name, sizeof(name), "%s_info", dev);
+		shm = odp_shm_reserve(name, sizeof(struct pktio_info),
+				      ODP_CACHE_LINE_SIZE,
+				      _ODP_ISHM_EXPORT | _ODP_ISHM_LOCK);
+		if (ODP_SHM_INVALID == shm) {
+			ODP_ERR("can not create shm %s\n", name);
+			return -1;
+		}
+
+		pinfo = odp_shm_addr(shm);
+		pinfo->master.init_done = 0;
+		pinfo->master.pool_name[0] = 0;
+		pktio_entry->s.ipc.pinfo = pinfo;
+		pktio_entry->s.ipc.pinfo_shm = shm;
 		ODP_DBG("process %d is master\n", getpid());
 		ret = _ipc_init_master(pktio_entry, dev, pool);
-	} else {
-		ODP_DBG("process %d is slave\n", getpid());
-		ret = _ipc_init_slave(dev, pktio_entry, pool);
 	}
 
 	return ret;
 }
 
-static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
-				    uint32_t offset,
-				    uint32_t *seglen,
-				    uint32_t limit)
+static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, _ring_t *r)
 {
-	int seg_index  = offset / buf->segsize;
-	int seg_offset = offset % buf->segsize;
-#ifdef _ODP_PKTIO_IPC
-	void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
-#else
-	/** buf_hdr.ipc_addr_offset defined only when ipc is
-	 *  enabled. */
-	void *addr = NULL;
-
-	(void)seg_index;
-#endif
-	if (seglen) {
-		uint32_t buf_left = limit - offset;
-		*seglen = seg_offset + buf_left <= buf->segsize ?
-			buf_left : buf->segsize - seg_offset;
-	}
-
-	return (void *)(seg_offset + (uint8_t *)addr);
-}
-
-static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
-				    uint32_t offset, uint32_t *seglen)
-{
-	if (offset > pkt_hdr->frame_len)
-		return NULL;
-
-	return _ipc_buffer_map(&pkt_hdr->buf_hdr,
-			  pkt_hdr->headroom + offset, seglen,
-			  pkt_hdr->headroom + pkt_hdr->frame_len);
-}
-
-static void _ipc_free_ring_packets(_ring_t *r)
-{
-	odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
+	uintptr_t offsets[PKTIO_IPC_ENTRIES];
 	int ret;
 	void **rbuf_p;
 	int i;
 
-	rbuf_p = (void *)&r_p_pkts;
+	if (!r)
+		return;
+
+	rbuf_p = (void *)&offsets;
 
 	while (1) {
 		ret = _ring_mc_dequeue_burst(r, rbuf_p,
@@ -477,8 +404,13 @@  static void _ipc_free_ring_packets(_ring_t *r)
 		if (0 == ret)
 			break;
 		for (i = 0; i < ret; i++) {
-			if (r_p_pkts[i] != ODP_PACKET_INVALID)
-				odp_packet_free(r_p_pkts[i]);
+			odp_packet_hdr_t *phdr;
+			odp_packet_t pkt;
+			void *mbase = pktio_entry->s.ipc.pool_mdata_base;
+
+			phdr = (void *)((uint8_t *)mbase + offsets[i]);
+			pkt = (odp_packet_t)phdr->buf_hdr.handle.handle;
+			odp_packet_free(pkt);
 		}
 	}
 }
@@ -490,22 +422,23 @@  static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
 	int i;
 	_ring_t *r;
 	_ring_t *r_p;
+	uintptr_t offsets[PKTIO_IPC_ENTRIES];
+	void **ipcbufs_p = (void *)&offsets;
+	uint32_t ready;
+	int pkts_ring;
 
-	odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
-	void **ipcbufs_p = (void *)&remote_pkts;
-	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
-
+	ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
 	if (odp_unlikely(!ready)) {
-		ODP_DBG("start pktio is missing before usage?\n");
-		return -1;
+		IPC_ODP_DBG("start pktio is missing before usage?\n");
+		return 0;
 	}
 
-	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+	_ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
 
 	r = pktio_entry->s.ipc.rx.recv;
 	pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len);
 	if (odp_unlikely(pkts < 0))
-		ODP_ABORT("error to dequeue no packets\n");
+		ODP_ABORT("internal error dequeue\n");
 
 	/* fast path */
 	if (odp_likely(0 == pkts))
@@ -514,36 +447,21 @@  static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
 	for (i = 0; i < pkts; i++) {
 		odp_pool_t pool;
 		odp_packet_t pkt;
-		odp_packet_hdr_t phdr;
-		void *ptr;
-		odp_buffer_bits_t handle;
-		int idx; /* Remote packet has coded pool and index.
-			  * We need only index.*/
+		odp_packet_hdr_t *phdr;
 		void *pkt_data;
-		void *remote_pkt_data;
+		uint64_t data_pool_off;
+		void *rmt_data_ptr;
 
-		if (remote_pkts[i] == ODP_PACKET_INVALID)
-			continue;
+		phdr = (void *)((uint8_t *)pktio_entry->s.ipc.pool_mdata_base +
+		       offsets[i]);
 
-		handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
-		idx = handle.index;
-
-		/* Link to packed data. To this line we have Zero-Copy between
-		 * processes, to simplify use packet copy in that version which
-		 * can be removed later with more advance buffer management
-		 * (ref counters).
-		 */
-		/* reverse odp_buf_to_hdr() */
-		ptr = (char *)pktio_entry->s.ipc.pool_mdata_base +
-		      (idx * ODP_CACHE_LINE_SIZE);
-		memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t));
-
-		/* Allocate new packet. Select*/
 		pool = pktio_entry->s.ipc.pool;
 		if (odp_unlikely(pool == ODP_POOL_INVALID))
 			ODP_ABORT("invalid pool");
 
-		pkt = odp_packet_alloc(pool, phdr.frame_len);
+		data_pool_off = phdr->buf_hdr.seg[0].ipc_data_offset;
+
+		pkt = odp_packet_alloc(pool, phdr->frame_len);
 		if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
 			/* Original pool might be smaller then
 			*  PKTIO_IPC_ENTRIES. If packet can not be
@@ -562,30 +480,40 @@  static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
 				  (PKTIO_TYPE_IPC_SLAVE ==
 					pktio_entry->s.ipc.type));
 
-		remote_pkt_data = _ipc_packet_map(ptr, 0, NULL);
-		if (odp_unlikely(!remote_pkt_data))
-			ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",
-				  (PKTIO_TYPE_IPC_SLAVE ==
-					pktio_entry->s.ipc.type));
-
 		/* Copy packet data from shared pool to local pool. */
-		memcpy(pkt_data, remote_pkt_data, phdr.frame_len);
+		rmt_data_ptr = (uint8_t *)pktio_entry->s.ipc.pool_mdata_base +
+			       data_pool_off;
+		memcpy(pkt_data, rmt_data_ptr, phdr->frame_len);
 
 		/* Copy packets L2, L3 parsed offsets and size */
-		copy_packet_cls_metadata(&phdr, odp_packet_hdr(pkt));
+		copy_packet_cls_metadata(phdr, odp_packet_hdr(pkt));
+
+		odp_packet_hdr(pkt)->frame_len = phdr->frame_len;
+		odp_packet_hdr(pkt)->headroom = phdr->headroom;
+		odp_packet_hdr(pkt)->tailroom = phdr->tailroom;
+
+		/* Take classification fields */
+		odp_packet_hdr(pkt)->p = phdr->p;
 
-		odp_packet_hdr(pkt)->frame_len = phdr.frame_len;
-		odp_packet_hdr(pkt)->headroom = phdr.headroom;
-		odp_packet_hdr(pkt)->tailroom = phdr.tailroom;
-		odp_packet_hdr(pkt)->input = pktio_entry->s.handle;
 		pkt_table[i] = pkt;
 	}
 
 	/* Now tell other process that we no longer need that buffers.*/
 	r_p = pktio_entry->s.ipc.rx.free;
-	pkts = _ring_mp_enqueue_burst(r_p, ipcbufs_p, i);
+
+repeat:
+	pkts_ring = _ring_mp_enqueue_burst(r_p, ipcbufs_p, pkts);
 	if (odp_unlikely(pkts < 0))
 		ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
+	if (odp_unlikely(pkts != pkts_ring)) {
+		IPC_ODP_DBG("odp_ring_full: %d, odp_ring_count %d,"
+			    " _ring_free_count %d\n",
+			    _ring_full(r_p), _ring_count(r_p),
+			    _ring_free_count(r_p));
+		ipcbufs_p = (void *)&offsets[pkts_ring - 1];
+		pkts = pkts - pkts_ring;
+		goto repeat;
+	}
 
 	return pkts;
 }
@@ -614,26 +542,23 @@  static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
 	uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
 	odp_packet_t pkt_table_mapped[len]; /**< Ready to send packet has to be
 					      * in memory mapped pool. */
+	uintptr_t offsets[len];
 
 	if (odp_unlikely(!ready))
 		return 0;
 
-	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+	_ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
 
-	/* Prepare packets: calculate offset from address. */
+	/* Copy packets to shm shared pool if they are in different */
 	for (i = 0; i < len; i++) {
-		int j;
 		odp_packet_t pkt =  pkt_table[i];
-		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
+		pool_t *ipc_pool = pool_entry_from_hdl(pktio_entry->s.ipc.pool);
 		odp_buffer_bits_t handle;
-		uint32_t cur_mapped_pool_id =
-			 pool_handle_to_index(pktio_entry->s.ipc.pool);
-		uint32_t pool_id;
+		uint32_t pkt_pool_id;
 
-		/* do copy if packet was allocated from not mapped pool */
 		handle.handle = _odp_packet_to_buffer(pkt);
-		pool_id = handle.pool_id;
-		if (pool_id != cur_mapped_pool_id) {
+		pkt_pool_id = handle.pool_id;
+		if (pkt_pool_id != ipc_pool->pool_idx) {
 			odp_packet_t newpkt;
 
 			newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool);
@@ -645,24 +570,30 @@  static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
 		} else {
 			pkt_table_mapped[i] = pkt;
 		}
+	}
 
-		/* buf_hdr.addr can not be used directly in remote process,
-		 * convert it to offset
-		 */
-		for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) {
-#ifdef _ODP_PKTIO_IPC
-			pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
-				(char *)pkt_hdr->buf_hdr.addr[j];
-#else
-			/** buf_hdr.ipc_addr_offset defined only when ipc is
-			 *  enabled. */
-			(void)pkt_hdr;
-#endif
-		}
+	/* Set offset to phdr for outgoing packets */
+	for (i = 0; i < len; i++) {
+		uint64_t data_pool_off;
+		odp_packet_t pkt = pkt_table_mapped[i];
+		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
+		odp_pool_t pool_hdl = odp_packet_pool(pkt);
+		pool_t *pool = pool_entry_from_hdl(pool_hdl);
+
+		offsets[i] = (uint8_t *)pkt_hdr -
+			     (uint8_t *)odp_shm_addr(pool->shm);
+		data_pool_off = (uint8_t *)pkt_hdr->buf_hdr.seg[0].data -
+				(uint8_t *)odp_shm_addr(pool->shm);
+		pkt_hdr->buf_hdr.seg[0].ipc_data_offset = data_pool_off;
+		IPC_ODP_DBG("%d/%d send packet %llx, pool %llx,"
+			    "phdr = %p, offset %x\n",
+			    i, len,
+			    odp_packet_to_u64(pkt), odp_pool_to_u64(pool_hdl),
+			    pkt_hdr, pkt_hdr->buf_hdr.seg[0].ipc_data_offset);
 	}
 
 	/* Put packets to ring to be processed by other process. */
-	rbuf_p = (void *)&pkt_table_mapped[0];
+	rbuf_p = (void *)&offsets[0];
 	r = pktio_entry->s.ipc.tx.send;
 	ret = _ring_mp_enqueue_burst(r, rbuf_p, len);
 	if (odp_unlikely(ret < 0)) {
@@ -673,6 +604,7 @@  static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
 		ODP_ERR("odp_ring_full: %d, odp_ring_count %d, _ring_free_count %d\n",
 			_ring_full(r), _ring_count(r),
 			_ring_free_count(r));
+		ODP_ABORT("Unexpected!\n");
 	}
 
 	return ret;
@@ -722,22 +654,25 @@  static int ipc_start(pktio_entry_t *pktio_entry)
 
 static int ipc_stop(pktio_entry_t *pktio_entry)
 {
-	unsigned tx_send, tx_free;
+	unsigned tx_send = 0, tx_free = 0;
 
 	odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 0);
 
-	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.send);
+	if (pktio_entry->s.ipc.tx.send)
+		_ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.send);
 	/* other process can transfer packets from one ring to
 	 * other, use delay here to free that packets. */
 	sleep(1);
-	_ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+	if (pktio_entry->s.ipc.tx.free)
+		_ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
 
-	tx_send = _ring_count(pktio_entry->s.ipc.tx.send);
-	tx_free = _ring_count(pktio_entry->s.ipc.tx.free);
+	if (pktio_entry->s.ipc.tx.send)
+		tx_send = _ring_count(pktio_entry->s.ipc.tx.send);
+	if (pktio_entry->s.ipc.tx.free)
+		tx_free = _ring_count(pktio_entry->s.ipc.tx.free);
 	if (tx_send | tx_free) {
 		ODP_DBG("IPC rings: tx send %d tx free %d\n",
-			_ring_free_count(pktio_entry->s.ipc.tx.send),
-			_ring_free_count(pktio_entry->s.ipc.tx.free));
+			tx_send, tx_free);
 	}
 
 	return 0;
@@ -747,23 +682,31 @@  static int ipc_close(pktio_entry_t *pktio_entry)
 {
 	char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")];
 	char *dev = pktio_entry->s.name;
+	char name[ODP_POOL_NAME_LEN];
+	char tail[ODP_POOL_NAME_LEN];
+	int pid = 0;
 
 	ipc_stop(pktio_entry);
 
-	if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {
-		/* unlink this pktio info for both master and slave */
-		odp_shm_free(pktio_entry->s.ipc.pinfo_shm);
+	odp_shm_free(pktio_entry->s.ipc.remote_pool_shm);
 
-		/* destroy rings */
-		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
-		_ring_destroy(ipc_shm_name);
-		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
-		_ring_destroy(ipc_shm_name);
-		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
-		_ring_destroy(ipc_shm_name);
-		snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
-		_ring_destroy(ipc_shm_name);
-	}
+	if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2)
+		snprintf(name, sizeof(name), "ipc:%s", tail);
+	else
+		snprintf(name, sizeof(name), "%s", dev);
+
+	/* unlink this pktio info for both master and slave */
+	odp_shm_free(pktio_entry->s.ipc.pinfo_shm);
+
+	/* destroy rings */
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", name);
+	_ring_destroy(ipc_shm_name);
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", name);
+	_ring_destroy(ipc_shm_name);
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", name);
+	_ring_destroy(ipc_shm_name);
+	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", name);
+	_ring_destroy(ipc_shm_name);
 
 	return 0;
 }
@@ -795,4 +738,3 @@  const pktio_if_ops_t ipc_pktio_ops = {
 	.pktin_ts_from_ns = NULL,
 	.config = NULL
 };
-#endif
diff --git a/test/linux-generic/pktio_ipc/pktio_ipc_run.sh b/test/linux-generic/pktio_ipc/pktio_ipc_run.sh
index 3cd28f5..52e8d42 100755
--- a/test/linux-generic/pktio_ipc/pktio_ipc_run.sh
+++ b/test/linux-generic/pktio_ipc/pktio_ipc_run.sh
@@ -25,19 +25,23 @@  run()
 	rm -rf /tmp/odp-* 2>&1 > /dev/null
 
 	echo "==== run pktio_ipc1 then pktio_ipc2 ===="
-	pktio_ipc1${EXEEXT} -t 30 &
+	pktio_ipc1${EXEEXT} -t 10 &
 	IPC_PID=$!
 
-	pktio_ipc2${EXEEXT} -p ${IPC_PID} -t 10
+	pktio_ipc2${EXEEXT} -p ${IPC_PID} -t 5
 	ret=$?
 	# pktio_ipc1 should do clean up and exit just
 	# after pktio_ipc2 exited. If it does not happen
 	# kill him in test.
-	sleep 1
-	kill ${IPC_PID} 2>&1 > /dev/null
+	sleep 13
+	(kill ${IPC_PID} 2>&1 > /dev/null ) > /dev/null
 	if [ $? -eq 0 ]; then
-		ls -l /tmp/odp*
+		echo "pktio_ipc1${EXEEXT} was killed"
+		ls -l /tmp/odp* 2> /dev/null
 		rm -rf /tmp/odp-${IPC_PID}* 2>&1 > /dev/null
+	else
+		echo "normal exit of 2 application"
+		ls -l /tmp/odp* 2> /dev/null
 	fi
 
 	if [ $ret -ne 0 ]; then
@@ -47,21 +51,32 @@  run()
 		echo "First stage PASSED"
 	fi
 
-
 	echo "==== run pktio_ipc2 then pktio_ipc1 ===="
-	pktio_ipc2${EXEEXT} -t 20 &
+	pktio_ipc2${EXEEXT} -t 10 &
 	IPC_PID=$!
 
-	pktio_ipc1${EXEEXT} -p ${IPC_PID} -t 10
+	pktio_ipc1${EXEEXT} -p ${IPC_PID} -t 5
 	ret=$?
-	(kill ${IPC_PID} 2>&1 > /dev/null) > /dev/null || true
+	# pktio_ipc2 do not exit on pktio_ipc1 disconnect
+	# wait until it exits cleanly
+	sleep 13
+	(kill ${IPC_PID} 2>&1 > /dev/null ) > /dev/null
+	if [ $? -eq 0 ]; then
+		echo "pktio_ipc2${EXEEXT} was killed"
+		ls -l /tmp/odp* 2> /dev/null
+		rm -rf /tmp/odp-${IPC_PID}* 2>&1 > /dev/null
+	else
+		echo "normal exit of 2 application"
+		ls -l /tmp/odp* 2> /dev/null
+	fi
 
 	if [ $ret -ne 0 ]; then
 		echo "!!! FAILED !!!"
-		ls -l /tmp/odp*
+		ls -l /tmp/odp* 2> /dev/null
 		rm -rf /tmp/odp-${IPC_PID}* 2>&1 > /dev/null
 		exit $ret
 	else
+		ls -l /tmp/odp* 2> /dev/null
 		echo "Second stage PASSED"
 	fi