diff mbox

[API-NEXT,6/7] linux-generic: add ipc pktio support

Message ID 1429811225-10239-7-git-send-email-maxim.uvarov@linaro.org
State New
Headers show

Commit Message

Maxim Uvarov April 23, 2015, 5:47 p.m. UTC
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
---
 platform/linux-generic/Makefile.am                 |   2 +
 .../linux-generic/include/odp_buffer_internal.h    |   3 +
 .../linux-generic/include/odp_packet_io_internal.h |  15 +
 .../include/odp_packet_io_ipc_internal.h           |  47 ++
 platform/linux-generic/odp_packet_io.c             |  30 +-
 platform/linux-generic/odp_packet_io_ipc.c         | 590 +++++++++++++++++++++
 6 files changed, 686 insertions(+), 1 deletion(-)
 create mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h
 create mode 100644 platform/linux-generic/odp_packet_io_ipc.c

Comments

Ciprian Barbu April 27, 2015, 12:24 p.m. UTC | #1
On Thu, Apr 23, 2015 at 8:47 PM, Maxim Uvarov <maxim.uvarov@linaro.org> wrote:
> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
> ---
>  platform/linux-generic/Makefile.am                 |   2 +
>  .../linux-generic/include/odp_buffer_internal.h    |   3 +
>  .../linux-generic/include/odp_packet_io_internal.h |  15 +
>  .../include/odp_packet_io_ipc_internal.h           |  47 ++
>  platform/linux-generic/odp_packet_io.c             |  30 +-
>  platform/linux-generic/odp_packet_io_ipc.c         | 590 +++++++++++++++++++++
>  6 files changed, 686 insertions(+), 1 deletion(-)
>  create mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h
>  create mode 100644 platform/linux-generic/odp_packet_io_ipc.c
>
> diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
> index 66f0474..055f75c 100644
> --- a/platform/linux-generic/Makefile.am
> +++ b/platform/linux-generic/Makefile.am
> @@ -120,6 +120,7 @@ noinst_HEADERS = \
>                   ${top_srcdir}/platform/linux-generic/include/odp_internal.h \
>                   ${top_srcdir}/platform/linux-generic/include/odp_packet_internal.h \
>                   ${top_srcdir}/platform/linux-generic/include/odp_packet_io_internal.h \
> +                 ${top_srcdir}/platform/linux-generic/include/odp_packet_io_ipc_internal.h \
>                   ${top_srcdir}/platform/linux-generic/include/odp_packet_io_queue.h \
>                   ${top_srcdir}/platform/linux-generic/include/odp_packet_socket.h \
>                   ${top_srcdir}/platform/linux-generic/include/odp_pool_internal.h \
> @@ -155,6 +156,7 @@ __LIB__libodp_la_SOURCES = \
>                            odp_packet.c \
>                            odp_packet_flags.c \
>                            odp_packet_io.c \
> +                          odp_packet_io_ipc.c \
>                            odp_packet_socket.c \
>                            odp_pool.c \
>                            odp_queue.c \
> diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
> index 3a3d2a2..4ea7c62 100644
> --- a/platform/linux-generic/include/odp_buffer_internal.h
> +++ b/platform/linux-generic/include/odp_buffer_internal.h
> @@ -129,6 +129,9 @@ typedef struct odp_buffer_hdr_t {
>         size_t                   udata_size; /* size of user metadata */
>         uint32_t                 segcount;   /* segment count */
>         uint32_t                 segsize;    /* segment size */
> +       /* ipc mapped process can not walk over pointers,
> +        * offset has to be used */
> +       uint64_t                 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
>         void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
>  } odp_buffer_hdr_t;
>
> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
> index 18b59ef..744f438 100644
> --- a/platform/linux-generic/include/odp_packet_io_internal.h
> +++ b/platform/linux-generic/include/odp_packet_io_internal.h
> @@ -23,6 +23,7 @@ extern "C" {
>  #include <odp_classification_datamodel.h>
>  #include <odp_align_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp/helper/ring.h>
>
>  #include <odp/config.h>
>  #include <odp/hints.h>
> @@ -36,6 +37,8 @@ typedef enum {
>         ODP_PKTIO_TYPE_SOCKET_MMSG,
>         ODP_PKTIO_TYPE_SOCKET_MMAP,
>         ODP_PKTIO_TYPE_LOOPBACK,
> +       ODP_PKTIO_TYPE_IPC,
> +       ODP_PKTIO_TYPE_IPC_SLAVE,
>  } odp_pktio_type_t;
>
>  struct pktio_entry {
> @@ -53,6 +56,18 @@ struct pktio_entry {
>         char name[IFNAMSIZ];            /**< name of pktio provided to
>                                            pktio_open() */
>         odp_bool_t promisc;             /**< promiscuous mode state */
> +       odph_ring_t     *ipc_r;         /**< ODP ring for IPC mgs packets

typo: mgs

> +                                       indexes transmitted to shared memory */
> +       odph_ring_t     *ipc_p;         /**< ODP ring for IPC msg packets
> +                                       indexes already processed by remote process */
> +       void            *ipc_pool_base; /**< IPC Remote pool base addr */
> +       void            *ipc_pool_mdata_base; /**< IPC Remote pool mdata base addr */
> +       uint64_t        ipc_pkt_size;   /**< IPC: packet size in remote pool */
> +
> +       odph_ring_t     *ipc_r_slave;
> +       odph_ring_t     *ipc_p_slave;
> +
> +       odp_pool_t      ipc_pool;       /**< IPC: Pool of main process */
>  };
>
>  typedef union {
> diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> new file mode 100644
> index 0000000..a766519
> --- /dev/null
> +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> @@ -0,0 +1,47 @@
> +/* Copyright (c) 2015, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp/packet_io.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp/packet.h>
> +#include <odp_packet_internal.h>
> +#include <odp_internal.h>
> +#include <odp/shared_memory.h>
> +
> +#include <string.h>
> +#include <unistd.h>
> +#include <stdlib.h>
> +
> +/* IPC packet I/O over odph_ring */
> +#include <odp/helper/ring.h>
> +
> +#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
> +                                       odp ring queue */
> +
> +/* that struct is exported to shared memory, so that 2 processes can find
> + * each other.
> + */
> +struct pktio_info {
> +       char remote_pool_name[30];
> +       int shm_pool_num;
> +       size_t shm_pkt_pool_size;
> +       size_t shm_pkt_size;
> +       size_t mdata_offset; /*< offset from shared memory block start
> +                             *to pool_mdata_addr */
> +       struct {
> +               size_t mdata_offset;
> +               char   pool_name[30];
> +       } slave;
> +} __packed;
> +
> +int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
> +                  odp_pool_t pool);
> +
> +int ipc_pktio_recv(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +                  unsigned len);
> +
> +int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +                  unsigned len);
> diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
> index cfe5b71..01077fb 100644
> --- a/platform/linux-generic/odp_packet_io.c
> +++ b/platform/linux-generic/odp_packet_io.c
> @@ -18,6 +18,7 @@
>  #include <odp_schedule_internal.h>
>  #include <odp_classification_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp_packet_io_ipc_internal.h>
>
>  #include <string.h>
>  #include <sys/ioctl.h>
> @@ -25,6 +26,15 @@
>  #include <ifaddrs.h>
>  #include <errno.h>
>
> +#include <sys/types.h>
> +#include <unistd.h>
> +
> +/* IPC packet I/O over odph_ring */
> +#include <odp/helper/ring.h>
> +
> +#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
> +                                       odp ring queue */
> +
>  /* MTU to be reported for the "loop" interface */
>  #define PKTIO_LOOP_MTU 1500
>  /* MAC address for the "loop" interface */
> @@ -263,7 +273,12 @@ static odp_pktio_t setup_pktio_entry(const char *dev, odp_pool_t pool)
>
>         if (strcmp(dev, "loop") == 0)
>                 ret = init_loop(pktio_entry, id);
> -       else
> +       else if (!strncmp(dev, "ipc", 3)) {
> +               ret = ipc_pktio_init(pktio_entry, dev, pool);
> +               if (ret != 0)
> +                       ODP_ABORT("unable to init ipc for %s, pool %" PRIu64 "\n",
> +                                 dev, pool);
> +       } else
>                 ret = init_socket(pktio_entry, dev, pool);
>
>         if (ret != 0) {
> @@ -285,6 +300,10 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_pool_t pool)
>  {
>         odp_pktio_t id;
>
> +       /* no local table lookup for ipc case */
> +       if (pool == NULL && !memcmp(dev, "ipc", 3))
> +               goto no_local_lookup;

I'm not sure about this, don't we need this lookup in case the slave
process tries to open the ipc pktio multiple times?

> +
>         id = odp_pktio_lookup(dev);
>         if (id != ODP_PKTIO_INVALID) {
>                 /* interface is already open */
> @@ -292,6 +311,7 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_pool_t pool)
>                 return ODP_PKTIO_INVALID;
>         }
>
> +no_local_lookup:
>         odp_spinlock_lock(&pktio_tbl->lock);
>         id = setup_pktio_entry(dev, pool);
>         odp_spinlock_unlock(&pktio_tbl->lock);
> @@ -408,6 +428,10 @@ int odp_pktio_recv(odp_pktio_t id, odp_packet_t pkt_table[], int len)
>         case ODP_PKTIO_TYPE_LOOPBACK:
>                 pkts = deq_loopback(pktio_entry, pkt_table, len);
>                 break;
> +       case ODP_PKTIO_TYPE_IPC_SLAVE:
> +       case ODP_PKTIO_TYPE_IPC:
> +               pkts = ipc_pktio_recv(pktio_entry, pkt_table, len);
> +               break;
>         default:
>                 pkts = -1;
>                 break;
> @@ -462,6 +486,10 @@ int odp_pktio_send(odp_pktio_t id, odp_packet_t pkt_table[], int len)
>         case ODP_PKTIO_TYPE_LOOPBACK:
>                 pkts = enq_loopback(pktio_entry, pkt_table, len);
>                 break;
> +       case ODP_PKTIO_TYPE_IPC:
> +       case ODP_PKTIO_TYPE_IPC_SLAVE:
> +               pkts = ipc_pktio_send(pktio_entry, pkt_table, len);
> +               break;
>         default:
>                 pkts = -1;
>         }
> diff --git a/platform/linux-generic/odp_packet_io_ipc.c b/platform/linux-generic/odp_packet_io_ipc.c
> new file mode 100644
> index 0000000..312ad3a
> --- /dev/null
> +++ b/platform/linux-generic/odp_packet_io_ipc.c
> @@ -0,0 +1,590 @@
> +/* Copyright (c) 2015, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp_packet_io_ipc_internal.h>
> +#include <odp_debug_internal.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp_spin_internal.h>
> +#include <odp/system_info.h>
> +
> +#include <sys/mman.h>
> +#include <sys/stat.h>
> +#include <fcntl.h>
> +
> +static void *_ipc_map_remote_pool(const char *name, size_t size);
> +
> +static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
> +{
> +       pool_entry_t *pool;
> +       uint32_t pool_id;
> +       odp_shm_t shm;
> +       odp_shm_info_t info;
> +
> +       pool_id = pool_handle_to_index(pool_hdl);
> +       pool    = get_pool_entry(pool_id);
> +       shm = pool->s.pool_shm;
> +
> +       odp_shm_info(shm, &info);
> +
> +       return info.name;
> +}
> +
> +/**
> +* Look up for shared memory object.
> +*
> +* @param name   name of shm object
> +*
> +* @return 0 on success, otherwise non-zero
> +*/
> +static int _odp_shm_lookup_ipc(const char *name)
> +{
> +       int shm;
> +
> +       shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
> +       if (shm == -1) {
> +               ODP_DBG("IPC shm_open for %s not found\n", name);
> +               return -1;
> +       }
> +       close(shm);
> +       return 0;
> +}
> +
> +static struct pktio_info *_ipc_map_pool_info(const char *pool_name, int flag)
> +{
> +       char *name;
> +       struct pktio_info *pinfo;
> +
> +       /* Create info about remote pktio */
> +       name = (char *)malloc(strlen(pool_name) + sizeof("_info"));
> +       memcpy(name, pool_name, strlen(pool_name));
> +       memcpy(name + strlen(pool_name), "_info", sizeof("_info"));
> +       odp_shm_t shm = odp_shm_reserve(name, sizeof(struct pktio_info),
> +                       ODP_CACHE_LINE_SIZE,
> +                       flag);
> +       if (ODP_SHM_INVALID == shm)
> +               ODP_ABORT("unable to reserve memory for shm info");
> +       free(name);
> +       pinfo = odp_shm_addr(shm);
> +       if (flag != ODP_SHM_PROC_NOCREAT)
> +               memset(pinfo->remote_pool_name, 0, 30);
> +       return pinfo;
> +}
> +
> +static int _ipc_pktio_init_master(pktio_entry_t *pktio_entry, const char *dev,
> +                          odp_pool_t pool)
> +{
> +       char ipc_shm_name[ODPH_RING_NAMESIZE];
> +       pool_entry_t *pool_entry;
> +       uint32_t pool_id;
> +       void *ipc_pool_base;
> +       struct pktio_info *pinfo;
> +       const char *pool_name;
> +
> +       pool_id = pool_handle_to_index(pool);
> +       pool_entry    = get_pool_entry(pool_id);
> +
> +       /* generate name in shm like ipc_pktio_r for
> +        * to be processed packets ring.
> +        */
> +       memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
> +       memcpy(ipc_shm_name, dev, strlen(dev));
> +       memcpy(ipc_shm_name + strlen(dev), "_r", 2);
> +
> +       pktio_entry->s.ipc_r = odph_ring_create(ipc_shm_name,
> +                       PKTIO_IPC_ENTRIES,
> +                       ODPH_RING_SHM_PROC);
> +       if (!pktio_entry->s.ipc_r) {
> +               ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +                       getpid(), ipc_shm_name);
> +               return -1;
> +       }
> +       ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r),
> +               odph_ring_free_count(pktio_entry->s.ipc_r));
> +
> +       /* generate name in shm like ipc_pktio_p for
> +        * already processed packets
> +        */
> +       memcpy(ipc_shm_name + strlen(dev), "_p", 2);
> +
> +       pktio_entry->s.ipc_p = odph_ring_create(ipc_shm_name,
> +                       PKTIO_IPC_ENTRIES,
> +                       ODPH_RING_SHM_PROC);
> +       if (!pktio_entry->s.ipc_p) {
> +               ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +                       getpid(), ipc_shm_name);
> +               return -1;
> +       }
> +
> +       ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p),
> +               odph_ring_free_count(pktio_entry->s.ipc_p));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_slave_r", 8);
> +       pktio_entry->s.ipc_r_slave = odph_ring_create(ipc_shm_name,
> +                       PKTIO_IPC_ENTRIES,
> +                       ODPH_RING_SHM_PROC);
> +       if (!pktio_entry->s.ipc_r_slave) {
> +               ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +                       getpid(), ipc_shm_name);
> +               return -1;
> +       }
> +       ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r_slave),
> +               odph_ring_free_count(pktio_entry->s.ipc_r_slave));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_slave_p", 8);
> +       pktio_entry->s.ipc_p_slave = odph_ring_create(ipc_shm_name,
> +                       PKTIO_IPC_ENTRIES,
> +                       ODPH_RING_SHM_PROC);
> +       if (!pktio_entry->s.ipc_p_slave) {
> +               ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +                       getpid(), ipc_shm_name);
> +               return -1;
> +       }
> +       ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p_slave),
> +               odph_ring_free_count(pktio_entry->s.ipc_p_slave));
> +
> +       /* Memory to store information about exported pool */
> +       pinfo = _ipc_map_pool_info(dev, ODP_SHM_PROC);
> +
> +       /* Set up pool name for remote info */
> +       pool_name = _ipc_odp_buffer_pool_shm_name(pool);
> +       memcpy(pinfo->remote_pool_name, pool_name, strlen(pool_name));
> +       pinfo->shm_pkt_pool_size = pool_entry->s.pool_size;
> +       pinfo->shm_pool_num = pool_entry->s.buf_num;
> +       pinfo->shm_pkt_size = pool_entry->s.seg_size;
> +       pinfo->mdata_offset =  pool_entry->s.pool_mdata_addr -
> +                              pool_entry->s.pool_base_addr;
> +       pinfo->slave.mdata_offset = 0;
> +       ODP_DBG("Master waiting for slave to be connected now..\n");
> +
> +       /* Wait for remote process to export his pool. */
> +       ODP_DBG("Wait for second process set mdata_offset...\n");
> +       while (pinfo->slave.mdata_offset == 0)
> +               odp_spin();
> +
> +       ODP_DBG("Wait for second process set mdata_offset... DONE.\n");
> +
> +       while (1) {
> +               int ret = _odp_shm_lookup_ipc(pinfo->slave.pool_name);
> +               if (!ret)
> +                       break;
> +               ODP_DBG("Master looking for %s\n", pinfo->slave.pool_name);
> +               sleep(1);
> +       }
> +
> +       ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
> +                                            pinfo->shm_pkt_pool_size);
> +       pktio_entry->s.ipc_pool_mdata_base = (char *)ipc_pool_base +
> +                                            pinfo->slave.mdata_offset;
> +       pktio_entry->s.ipc_pool = pool;
> +
> +       return 0;
> +}
> +
> +static odp_pool_t _ipc_odp_alloc_and_create_pool_slave(struct pktio_info *pinfo)
> +{
> +       odp_pool_t pool;
> +       char *pool_name;
> +       odp_pool_param_t params;
> +       int num = pinfo->shm_pool_num;
> +       uint64_t buf_size = pinfo->shm_pkt_size;
> +       pool_entry_t *pool_entry;
> +
> +       pool_name = calloc(1, strlen(pinfo->remote_pool_name) +
> +                          sizeof("ipc_pool_slave_"));
> +       sprintf(pool_name, "ipc_pool_slave_%s", pinfo->remote_pool_name);
> +
> +       ODP_DBG("slave uses pool %s\n", pool_name);
> +
> +       memset(&params, 0, sizeof(params));
> +       params.pkt.num = num;
> +       params.pkt.len = buf_size;
> +       params.pkt.seg_len = buf_size;
> +       params.type = ODP_POOL_PACKET;
> +       params.shm_flags = ODP_SHM_PROC;
> +
> +       pool = odp_pool_create(pool_name, ODP_SHM_NULL, &params);
> +       if (pool == ODP_POOL_INVALID)
> +               ODP_ABORT("Error: packet pool create failed.\n"
> +                       "num %d, len %d, seg_len %d\n",
> +                       params.pkt.num, params.pkt.len, params.pkt.seg_len);
> +
> +       /* Export info so that master can connect to that pool*/
> +       snprintf(pinfo->slave.pool_name, 30, "%s", pool_name);
> +       pool_entry = odp_pool_to_entry(pool);
> +       pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
> +                                   pool_entry->s.pool_base_addr;
> +       free(pool_name);
> +
> +       return pool;
> +}
> +
> +static void *_ipc_map_remote_pool(const char *name, size_t size)
> +{
> +       odp_shm_t shm;
> +
> +       ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
> +       shm = odp_shm_reserve(name,
> +                       size,
> +                       ODP_CACHE_LINE_SIZE,
> +                       ODP_SHM_PROC_NOCREAT);
> +       if (shm == ODP_SHM_INVALID)
> +               ODP_ABORT("unable map %s\n", name);
> +       return odp_shm_addr(shm);
> +}
> +
> +static void *_ipc_shm_map(char *name, size_t size, int timeout)
> +{
> +       odp_shm_t shm;
> +       int ret;
> +
> +       while (1) {
> +               ret = _odp_shm_lookup_ipc(name);
> +               if (!ret)
> +                       break;
> +               ODP_DBG("Waiting for %s\n", name);
> +               if (timeout <= 0)
> +                       return NULL;
> +               timeout--;
> +               sleep(1);
> +       }
> +
> +       shm = odp_shm_reserve(name, size,
> +                       ODP_CACHE_LINE_SIZE,
> +                       ODP_SHM_PROC_NOCREAT);
> +       if (ODP_SHM_INVALID == shm)
> +               ODP_ABORT("unable to map: %s\n", name);
> +
> +       return odp_shm_addr(shm);
> +}
> +
> +static int _ipc_pktio_init_slave(const char *dev, pktio_entry_t *pktio_entry)
> +{
> +       int ret = -1;
> +       char ipc_shm_name[ODPH_RING_NAMESIZE];
> +       size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
> +                          sizeof(odph_ring_t);
> +       struct pktio_info *pinfo;
> +       void *ipc_pool_base;
> +
> +       memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
> +       memcpy(ipc_shm_name, dev, strlen(dev));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_r", 2);
> +       pktio_entry->s.ipc_r  = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +       if (!pktio_entry->s.ipc_r) {
> +               ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +                       getpid(), dev);
> +               goto error;
> +       }
> +       ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r),
> +               odph_ring_free_count(pktio_entry->s.ipc_r));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_p", 2);
> +       pktio_entry->s.ipc_p = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +       if (!pktio_entry->s.ipc_p) {
> +               ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +                       getpid(), dev);
> +               goto error;
> +       }
> +       ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p),
> +               odph_ring_free_count(pktio_entry->s.ipc_p));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_slave_r", 8);
> +       pktio_entry->s.ipc_r_slave = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +       if (!pktio_entry->s.ipc_r_slave) {
> +               ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +                       getpid(), dev);
> +               goto error;
> +       }
> +       ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r_slave),
> +               odph_ring_free_count(pktio_entry->s.ipc_r_slave));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_slave_p", 8);
> +       pktio_entry->s.ipc_p_slave = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +       if (!pktio_entry->s.ipc_p_slave) {
> +               ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +                       getpid(), dev);
> +               goto error;
> +       }
> +       ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p_slave),
> +               odph_ring_free_count(pktio_entry->s.ipc_p_slave));
> +
> +
> +       /* Get info about remote pool */
> +       pinfo = _ipc_map_pool_info(dev, ODP_SHM_PROC_NOCREAT);
> +
> +       ipc_pool_base = _ipc_map_remote_pool(pinfo->remote_pool_name,
> +                                            pinfo->shm_pkt_pool_size);
> +       pktio_entry->s.ipc_pool_mdata_base = (char *)ipc_pool_base +
> +                                            pinfo->mdata_offset;
> +       pktio_entry->s.ipc_pkt_size = pinfo->shm_pkt_size;
> +
> +       /* @todo: to simplify in linux-generic implementation we create pool for
> +        * packets from IPC queue. On receive implementation copies packets to
> +        * that pool. Later we can try to reuse original pool without packets
> +        * copying.
> +        */
> +       pktio_entry->s.ipc_pool = _ipc_odp_alloc_and_create_pool_slave(pinfo);
> +
> +       ret = 0;
> +       ODP_DBG("%s OK.\n", __func__);
> +error:
> +       /* @todo free shm on error (api not impemented yet) */
> +       return ret;
> +}
> +
> +int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
> +                          odp_pool_t pool)
> +{
> +       int ret;
> +
> +       /* if pool is zero we assume that it's slave process connects
> +        * to shared memory already created by main process.
> +        */
> +       if (pool) {
> +               pktio_entry->s.type = ODP_PKTIO_TYPE_IPC;
> +               ret = _ipc_pktio_init_master(pktio_entry, dev, pool);
> +       } else {
> +               pktio_entry->s.type = ODP_PKTIO_TYPE_IPC_SLAVE;
> +               ret = _ipc_pktio_init_slave(dev, pktio_entry);
> +       }
> +
> +       return ret;
> +}
> +
> +
> +static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
> +                              uint32_t offset,
> +                              uint32_t *seglen,
> +                              uint32_t limit)
> +{
> +       int seg_index  = offset / buf->segsize;
> +       int seg_offset = offset % buf->segsize;
> +       void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
> +
> +       if (seglen != NULL) {
> +               uint32_t buf_left = limit - offset;
> +               *seglen = seg_offset + buf_left <= buf->segsize ?
> +                       buf_left : buf->segsize - seg_offset;
> +       }
> +
> +       return (void *)(seg_offset + (uint8_t *)addr);
> +}
> +
> +
> +static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
> +                              uint32_t offset, uint32_t *seglen)
> +{
> +       if (offset > pkt_hdr->frame_len)
> +               return NULL;
> +
> +       return _ipc_buffer_map(&pkt_hdr->buf_hdr,
> +                         pkt_hdr->headroom + offset, seglen,
> +                         pkt_hdr->headroom + pkt_hdr->frame_len);
> +}
> +
> +int ipc_pktio_recv(pktio_entry_t *pktio_entry,
> +                     odp_packet_t pkt_table[], unsigned len)
> +{
> +       int pkts = 0;
> +       int ret;
> +       int i;
> +       odph_ring_t *r;
> +       odph_ring_t *r_p;
> +       odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
> +       void **ipcbufs_p = (void *)&remote_pkts;
> +       unsigned ring_len;
> +       int p_free;
> +
> +       if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC) {
> +               r = pktio_entry->s.ipc_r_slave;
> +               r_p = pktio_entry->s.ipc_p_slave;
> +       } else if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
> +               r = pktio_entry->s.ipc_r;
> +               r_p = pktio_entry->s.ipc_p;
> +       } else {
> +               ODP_ABORT("wrong type: %d\n", pktio_entry->s.type);
> +       }
> +
> +       ring_len = odph_ring_count(r);
> +
> +       pkts = len;
> +       if (len > ring_len)
> +               pkts = ring_len;
> +
> +       /* Do not dequeue more then we can put to producer ring */
> +       p_free = odph_ring_free_count(r_p);
> +       if (pkts > p_free)
> +               pkts = p_free;
> +
> +       ret = odph_ring_mc_dequeue_bulk(r, ipcbufs_p, pkts);
> +       if (ret != 0) {
> +               ODP_DBG("error to dequeue no packets\n");
> +               pkts = -1;
> +               return pkts;
> +       }
> +
> +       if (pkts == 0)
> +               return 0;
> +
> +       for (i = 0; i < pkts; i++) {
> +               odp_pool_t pool;
> +               odp_packet_t pkt;
> +               odp_packet_hdr_t *phdr;
> +               odp_buffer_bits_t handle;
> +               int idx; /* Remote packet has coded pool and index.
> +                         * We need only index.*/
> +               void *pkt_data;
> +               void *remote_pkt_data;
> +
> +               handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
> +               idx = handle.index;
> +
> +               /* Link to packed data. To this line we have Zero-Copy between
> +                * processes, to simplify use packet copy in that version which
> +                * can be removed later with more advance buffer management
> +                * (ref counters).
> +                */
> +               /* reverse odp_buf_to_hdr() */
> +               phdr = (odp_packet_hdr_t *)(
> +                               (char *)pktio_entry->s.ipc_pool_mdata_base +
> +                               (idx * ODP_CACHE_LINE_SIZE));
> +
> +               /* Allocate new packet. Select*/
> +               pool = pktio_entry->s.ipc_pool;
> +               if (odp_unlikely(pool == ODP_POOL_INVALID))
> +                       ODP_ABORT("invalid pool");
> +
> +               pkt = odp_packet_alloc(pool, phdr->frame_len);
> +               if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
> +                       /* Original pool might be smaller then
> +                       *  PKTIO_IPC_ENTRIES. If packet can not be
> +                        * allocated from pool at this time,
> +                        * simple get in on next recv() call.
> +                        */
> +                       pkts = i - 1;
> +                       break;
> +               }
> +
> +               /* Copy packet data. */
> +               pkt_data = odp_packet_data(pkt);
> +               if (odp_unlikely(pkt_data == NULL))
> +                       ODP_ABORT("unable to map pkt_data ipc_slave %d\n",
> +                                 (ODP_PKTIO_TYPE_IPC_SLAVE ==
> +                                       pktio_entry->s.type));
> +
> +               remote_pkt_data =  _ipc_packet_map(phdr, 0, NULL);
> +               if (odp_unlikely(remote_pkt_data == NULL))
> +                       ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",
> +                                 (ODP_PKTIO_TYPE_IPC_SLAVE ==
> +                                       pktio_entry->s.type));
> +
> +               /* @todo fix copy packet!!! */
> +               memcpy(pkt_data, remote_pkt_data, phdr->frame_len);
> +
> +               /* Copy packets L2, L3 parsed offsets and size */
> +               odp_packet_hdr(pkt)->l2_offset = phdr->l2_offset;
> +               odp_packet_hdr(pkt)->l3_offset = phdr->l3_offset;
> +               odp_packet_hdr(pkt)->l4_offset = phdr->l4_offset;
> +               odp_packet_hdr(pkt)->payload_offset = phdr->payload_offset;
> +
> +               odp_packet_hdr(pkt)->vlan_s_tag =  phdr->vlan_s_tag;
> +               odp_packet_hdr(pkt)->vlan_c_tag =  phdr->vlan_c_tag;
> +               odp_packet_hdr(pkt)->l3_protocol = phdr->l3_protocol;
> +               odp_packet_hdr(pkt)->l3_len = phdr->l3_len;
> +
> +               odp_packet_hdr(pkt)->frame_len = phdr->frame_len;
> +               odp_packet_hdr(pkt)->headroom = phdr->headroom;
> +               odp_packet_hdr(pkt)->tailroom = phdr->tailroom;
> +               pkt_table[i] = pkt;
> +       }
> +
> +       /* Now tell other process that we no longer need that buffers.*/
> +       ret = odph_ring_mp_enqueue_bulk(r_p, ipcbufs_p, pkts);
> +       if (ret != 0)
> +               ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
> +
> +       return pkts;
> +}
> +
> +int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +                     unsigned len)
> +{
> +       odph_ring_t *r;
> +       odph_ring_t *r_p;
> +       void **rbuf_p;
> +       int ret;
> +       unsigned i;
> +
> +       if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
> +               r = pktio_entry->s.ipc_r_slave;
> +               r_p = pktio_entry->s.ipc_p_slave;
> +       } else if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC) {
> +               r = pktio_entry->s.ipc_r;
> +               r_p = pktio_entry->s.ipc_p;
> +       } else {
> +               ODP_ABORT("wrong type: %d\n", pktio_entry->s.type);
> +       }
> +
> +       /* Free already processed packets, if any */
> +       {
> +               unsigned complete_packets = odph_ring_count(r_p);
> +               odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
> +
> +               if (complete_packets > 0) {
> +                       rbuf_p = (void *)&r_p_pkts;
> +                       ret = odph_ring_mc_dequeue_bulk(r_p, rbuf_p,
> +                                       complete_packets);
> +                       if (ret == 0) {
> +                               for (i = 0; i < complete_packets; i++)
> +                                       odp_packet_free(r_p_pkts[i]);
> +                       }
> +               }
> +       }
> +
> +       /* wait while second process takes packet from the ring.*/
> +       i = 3;
> +       while (odph_ring_free_count(r) < len && i) {
> +               i--;
> +               sleep(1);
> +       }
> +
> +       /* Put packets to ring to be processed in other process. */
> +       for (i = 0; i < len; i++) {
> +               int j;
> +               odp_packet_t pkt =  pkt_table[i];
> +               rbuf_p = (void *)&pkt;
> +               odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
> +
> +               /* buf_hdr.addr can not be used directly in remote process,
> +                * convert it to offset
> +                */
> +               for (j = 0; j < ODP_BUFFER_MAX_SEG; j++)
> +                       pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
> +                               (char *)pkt_hdr->buf_hdr.addr[j];
> +
> +               ret = odph_ring_mp_enqueue_bulk(r, rbuf_p, 1);
> +               if (odp_unlikely(ret != 0)) {
> +                       ODP_ERR("pid %d odp_ring_mp_enqueue_bulk fail, ipc_slave %d, ret %d\n",
> +                               getpid(),
> +                               (ODP_PKTIO_TYPE_IPC_SLAVE ==
> +                                pktio_entry->s.type),
> +                               ret);
> +                       ODP_ERR("odp_ring_full: %d, odp_ring_count %d, odph_ring_free_count %d\n",
> +                               odph_ring_full(r), odph_ring_count(r),
> +                               odph_ring_free_count(r));
> +               }
> +       }
> +       return len;
> +}
> --
> 1.9.1
>
> _______________________________________________
> lng-odp mailing list
> lng-odp@lists.linaro.org
> https://lists.linaro.org/mailman/listinfo/lng-odp
Stuart Haslam April 27, 2015, 1:03 p.m. UTC | #2
On Thu, Apr 23, 2015 at 08:47:04PM +0300, Maxim Uvarov wrote:
> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
> ---
>  platform/linux-generic/Makefile.am                 |   2 +
>  .../linux-generic/include/odp_buffer_internal.h    |   3 +
>  .../linux-generic/include/odp_packet_io_internal.h |  15 +
>  .../include/odp_packet_io_ipc_internal.h           |  47 ++
>  platform/linux-generic/odp_packet_io.c             |  30 +-
>  platform/linux-generic/odp_packet_io_ipc.c         | 590 +++++++++++++++++++++
>  6 files changed, 686 insertions(+), 1 deletion(-)
>  create mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h
>  create mode 100644 platform/linux-generic/odp_packet_io_ipc.c
> 
> diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
> index 66f0474..055f75c 100644
> --- a/platform/linux-generic/Makefile.am
> +++ b/platform/linux-generic/Makefile.am
> @@ -120,6 +120,7 @@ noinst_HEADERS = \
>  		  ${top_srcdir}/platform/linux-generic/include/odp_internal.h \
>  		  ${top_srcdir}/platform/linux-generic/include/odp_packet_internal.h \
>  		  ${top_srcdir}/platform/linux-generic/include/odp_packet_io_internal.h \
> +		  ${top_srcdir}/platform/linux-generic/include/odp_packet_io_ipc_internal.h \
>  		  ${top_srcdir}/platform/linux-generic/include/odp_packet_io_queue.h \
>  		  ${top_srcdir}/platform/linux-generic/include/odp_packet_socket.h \
>  		  ${top_srcdir}/platform/linux-generic/include/odp_pool_internal.h \
> @@ -155,6 +156,7 @@ __LIB__libodp_la_SOURCES = \
>  			   odp_packet.c \
>  			   odp_packet_flags.c \
>  			   odp_packet_io.c \
> +			   odp_packet_io_ipc.c \
>  			   odp_packet_socket.c \
>  			   odp_pool.c \
>  			   odp_queue.c \
> diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
> index 3a3d2a2..4ea7c62 100644
> --- a/platform/linux-generic/include/odp_buffer_internal.h
> +++ b/platform/linux-generic/include/odp_buffer_internal.h
> @@ -129,6 +129,9 @@ typedef struct odp_buffer_hdr_t {
>  	size_t                   udata_size; /* size of user metadata */
>  	uint32_t                 segcount;   /* segment count */
>  	uint32_t                 segsize;    /* segment size */
> +	/* ipc mapped process can not walk over pointers,
> +	 * offset has to be used */
> +	uint64_t		 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
>  	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
>  } odp_buffer_hdr_t;
>  
> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
> index 18b59ef..744f438 100644
> --- a/platform/linux-generic/include/odp_packet_io_internal.h
> +++ b/platform/linux-generic/include/odp_packet_io_internal.h
> @@ -23,6 +23,7 @@ extern "C" {
>  #include <odp_classification_datamodel.h>
>  #include <odp_align_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp/helper/ring.h>
>  
>  #include <odp/config.h>
>  #include <odp/hints.h>
> @@ -36,6 +37,8 @@ typedef enum {
>  	ODP_PKTIO_TYPE_SOCKET_MMSG,
>  	ODP_PKTIO_TYPE_SOCKET_MMAP,
>  	ODP_PKTIO_TYPE_LOOPBACK,
> +	ODP_PKTIO_TYPE_IPC,
> +	ODP_PKTIO_TYPE_IPC_SLAVE,
>  } odp_pktio_type_t;
>  
>  struct pktio_entry {
> @@ -53,6 +56,18 @@ struct pktio_entry {
>  	char name[IFNAMSIZ];		/**< name of pktio provided to
>  					   pktio_open() */
>  	odp_bool_t promisc;		/**< promiscuous mode state */
> +	odph_ring_t	*ipc_r;		/**< ODP ring for IPC mgs packets
> +					indexes transmitted to shared memory */
> +	odph_ring_t	*ipc_p;		/**< ODP ring for IPC msg packets
> +					indexes already processed by remote process */
> +	void		*ipc_pool_base; /**< IPC Remote pool base addr */
> +	void		*ipc_pool_mdata_base; /**< IPC Remote pool mdata base addr */
> +	uint64_t	ipc_pkt_size;	/**< IPC: packet size in remote pool */

We generally use uint32_t for packet length.

> +
> +	odph_ring_t	*ipc_r_slave;
> +	odph_ring_t	*ipc_p_slave;

Why are these needed?.. having looked at the rest of the patch I still
don't understand why both ends of the pipe need both sets (with and
without the _slave suffix) of pointers.

> +
> +	odp_pool_t	ipc_pool;	/**< IPC: Pool of main process */
>  };
>  
>  typedef union {
> diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> new file mode 100644
> index 0000000..a766519
> --- /dev/null
> +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> @@ -0,0 +1,47 @@
> +/* Copyright (c) 2015, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp/packet_io.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp/packet.h>
> +#include <odp_packet_internal.h>
> +#include <odp_internal.h>
> +#include <odp/shared_memory.h>
> +
> +#include <string.h>
> +#include <unistd.h>
> +#include <stdlib.h>
> +
> +/* IPC packet I/O over odph_ring */
> +#include <odp/helper/ring.h>
> +
> +#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
> +					odp ring queue */
> +
> +/* that struct is exported to shared memory, so that 2 processes can find
> + * each other.
> + */
> +struct pktio_info {
> +	char remote_pool_name[30];

ODP_POOL_NAME_LEN

> +	int shm_pool_num;
> +	size_t shm_pkt_pool_size;
> +	size_t shm_pkt_size;
> +	size_t mdata_offset; /*< offset from shared memory block start
> +			      *to pool_mdata_addr */
> +	struct {
> +		size_t mdata_offset;
> +		char   pool_name[30];

ODP_POOL_NAME_LEN

> +	} slave;
> +} __packed;
> +
> +int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
> +		   odp_pool_t pool);
> +
> +int ipc_pktio_recv(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +		   unsigned len);
> +
> +int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +		   unsigned len);
> diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
> index cfe5b71..01077fb 100644
> --- a/platform/linux-generic/odp_packet_io.c
> +++ b/platform/linux-generic/odp_packet_io.c
> @@ -18,6 +18,7 @@
>  #include <odp_schedule_internal.h>
>  #include <odp_classification_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp_packet_io_ipc_internal.h>
>  
>  #include <string.h>
>  #include <sys/ioctl.h>
> @@ -25,6 +26,15 @@
>  #include <ifaddrs.h>
>  #include <errno.h>
>  
> +#include <sys/types.h>
> +#include <unistd.h>
> +
> +/* IPC packet I/O over odph_ring */
> +#include <odp/helper/ring.h>
> +
> +#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
> +					odp ring queue */

This is defined in odp_packet_io_ipc_internal.h too.

> +
>  /* MTU to be reported for the "loop" interface */
>  #define PKTIO_LOOP_MTU 1500
>  /* MAC address for the "loop" interface */
> @@ -263,7 +273,12 @@ static odp_pktio_t setup_pktio_entry(const char *dev, odp_pool_t pool)
>  
>  	if (strcmp(dev, "loop") == 0)
>  		ret = init_loop(pktio_entry, id);
> -	else
> +	else if (!strncmp(dev, "ipc", 3)) {
> +		ret = ipc_pktio_init(pktio_entry, dev, pool);
> +		if (ret != 0)
> +			ODP_ABORT("unable to init ipc for %s, pool %" PRIu64 "\n",
> +				  dev, pool);
> +	} else
>  		ret = init_socket(pktio_entry, dev, pool);
>  
>  	if (ret != 0) {
> @@ -285,6 +300,10 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_pool_t pool)
>  {
>  	odp_pktio_t id;
>  
> +	/* no local table lookup for ipc case */
> +	if (pool == NULL && !memcmp(dev, "ipc", 3))
> +		goto no_local_lookup;
> +
>  	id = odp_pktio_lookup(dev);
>  	if (id != ODP_PKTIO_INVALID) {
>  		/* interface is already open */
> @@ -292,6 +311,7 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_pool_t pool)
>  		return ODP_PKTIO_INVALID;
>  	}
>  
> +no_local_lookup:
>  	odp_spinlock_lock(&pktio_tbl->lock);
>  	id = setup_pktio_entry(dev, pool);
>  	odp_spinlock_unlock(&pktio_tbl->lock);
> @@ -408,6 +428,10 @@ int odp_pktio_recv(odp_pktio_t id, odp_packet_t pkt_table[], int len)
>  	case ODP_PKTIO_TYPE_LOOPBACK:
>  		pkts = deq_loopback(pktio_entry, pkt_table, len);
>  		break;
> +	case ODP_PKTIO_TYPE_IPC_SLAVE:
> +	case ODP_PKTIO_TYPE_IPC:
> +		pkts = ipc_pktio_recv(pktio_entry, pkt_table, len);
> +		break;
>  	default:
>  		pkts = -1;
>  		break;
> @@ -462,6 +486,10 @@ int odp_pktio_send(odp_pktio_t id, odp_packet_t pkt_table[], int len)
>  	case ODP_PKTIO_TYPE_LOOPBACK:
>  		pkts = enq_loopback(pktio_entry, pkt_table, len);
>  		break;
> +	case ODP_PKTIO_TYPE_IPC:
> +	case ODP_PKTIO_TYPE_IPC_SLAVE:
> +		pkts = ipc_pktio_send(pktio_entry, pkt_table, len);
> +		break;
>  	default:
>  		pkts = -1;
>  	}
> diff --git a/platform/linux-generic/odp_packet_io_ipc.c b/platform/linux-generic/odp_packet_io_ipc.c
> new file mode 100644
> index 0000000..312ad3a
> --- /dev/null
> +++ b/platform/linux-generic/odp_packet_io_ipc.c
> @@ -0,0 +1,590 @@
> +/* Copyright (c) 2015, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp_packet_io_ipc_internal.h>
> +#include <odp_debug_internal.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp_spin_internal.h>
> +#include <odp/system_info.h>
> +
> +#include <sys/mman.h>
> +#include <sys/stat.h>
> +#include <fcntl.h>
> +
> +static void *_ipc_map_remote_pool(const char *name, size_t size);
> +
> +static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
> +{
> +	pool_entry_t *pool;
> +	uint32_t pool_id;
> +	odp_shm_t shm;
> +	odp_shm_info_t info;
> +
> +	pool_id = pool_handle_to_index(pool_hdl);
> +	pool    = get_pool_entry(pool_id);
> +	shm = pool->s.pool_shm;
> +
> +	odp_shm_info(shm, &info);
> +
> +	return info.name;
> +}
> +
> +/**
> +* Look up for shared memory object.
> +*
> +* @param name   name of shm object
> +*
> +* @return 0 on success, otherwise non-zero
> +*/
> +static int _odp_shm_lookup_ipc(const char *name)
> +{
> +	int shm;
> +
> +	shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
> +	if (shm == -1) {
> +		ODP_DBG("IPC shm_open for %s not found\n", name);

The open could have failed for some reason other than "not found", in
which case this error message will be misleading.

> +		return -1;
> +	}
> +	close(shm);
> +	return 0;
> +}
> +
> +static struct pktio_info *_ipc_map_pool_info(const char *pool_name, int flag)
> +{
> +	char *name;
> +	struct pktio_info *pinfo;
> +
> +	/* Create info about remote pktio */
> +	name = (char *)malloc(strlen(pool_name) + sizeof("_info"));

Not enough room for the terminator.

> +	memcpy(name, pool_name, strlen(pool_name));
> +	memcpy(name + strlen(pool_name), "_info", sizeof("_info"));

Wouldn't it be simpler and safer just to do this on the stack;

char name[ODP_POOL_NAME_LEN + sizeof("_info")];
snprintf(name, sizeof(name), "%s_info", pool_name);

> +	odp_shm_t shm = odp_shm_reserve(name, sizeof(struct pktio_info),
> +			ODP_CACHE_LINE_SIZE,
> +			flag);
> +	if (ODP_SHM_INVALID == shm)
> +		ODP_ABORT("unable to reserve memory for shm info");
> +	free(name);
> +	pinfo = odp_shm_addr(shm);
> +	if (flag != ODP_SHM_PROC_NOCREAT)
> +		memset(pinfo->remote_pool_name, 0, 30);

There's no need to memset the entire array, just initialise the first
element.

> +	return pinfo;

What happens to shm?.. the caller doesn't get a reference to it so can't
free it as far as I can tell.

> +}
> +
> +static int _ipc_pktio_init_master(pktio_entry_t *pktio_entry, const char *dev,
> +			   odp_pool_t pool)
> +{
> +	char ipc_shm_name[ODPH_RING_NAMESIZE];
> +	pool_entry_t *pool_entry;
> +	uint32_t pool_id;
> +	void *ipc_pool_base;
> +	struct pktio_info *pinfo;
> +	const char *pool_name;
> +
> +	pool_id = pool_handle_to_index(pool);
> +	pool_entry    = get_pool_entry(pool_id);
> +
> +	/* generate name in shm like ipc_pktio_r for
> +	 * to be processed packets ring.
> +	 */
> +	memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
> +	memcpy(ipc_shm_name, dev, strlen(dev));
> +	memcpy(ipc_shm_name + strlen(dev), "_r", 2);

This may overflow and/or not be terminated, again snprintf would be
better.

> +
> +	pktio_entry->s.ipc_r = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC);
> +	if (!pktio_entry->s.ipc_r) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);
> +		return -1;
> +	}
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r),
> +		odph_ring_free_count(pktio_entry->s.ipc_r));
> +
> +	/* generate name in shm like ipc_pktio_p for
> +	 * already processed packets
> +	 */
> +	memcpy(ipc_shm_name + strlen(dev), "_p", 2);

and here

> +
> +	pktio_entry->s.ipc_p = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC);
> +	if (!pktio_entry->s.ipc_p) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);
> +		return -1;

What happens to ipc_r?

> +	}
> +
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p),
> +		odph_ring_free_count(pktio_entry->s.ipc_p));
> +
> +	memcpy(ipc_shm_name + strlen(dev), "_slave_r", 8);
> +	pktio_entry->s.ipc_r_slave = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC);
> +	if (!pktio_entry->s.ipc_r_slave) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);

and again, leaking ipc_r + ipc_p.. same thing below.

> +		return -1;
> +	}
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r_slave),
> +		odph_ring_free_count(pktio_entry->s.ipc_r_slave));
> +
> +	memcpy(ipc_shm_name + strlen(dev), "_slave_p", 8);
> +	pktio_entry->s.ipc_p_slave = odph_ring_create(ipc_shm_name,
> +			PKTIO_IPC_ENTRIES,
> +			ODPH_RING_SHM_PROC);
> +	if (!pktio_entry->s.ipc_p_slave) {
> +		ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +			getpid(), ipc_shm_name);
> +		return -1;
> +	}
> +	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p_slave),
> +		odph_ring_free_count(pktio_entry->s.ipc_p_slave));
> +
> +	/* Memory to store information about exported pool */
> +	pinfo = _ipc_map_pool_info(dev, ODP_SHM_PROC);
> +
> +	/* Set up pool name for remote info */
> +	pool_name = _ipc_odp_buffer_pool_shm_name(pool);
> +	memcpy(pinfo->remote_pool_name, pool_name, strlen(pool_name));
> +	pinfo->shm_pkt_pool_size = pool_entry->s.pool_size;
> +	pinfo->shm_pool_num = pool_entry->s.buf_num;
> +	pinfo->shm_pkt_size = pool_entry->s.seg_size;
> +	pinfo->mdata_offset =  pool_entry->s.pool_mdata_addr -
> +			       pool_entry->s.pool_base_addr;
> +	pinfo->slave.mdata_offset = 0;
> +	ODP_DBG("Master waiting for slave to be connected now..\n");
> +
> +	/* Wait for remote process to export his pool. */
> +	ODP_DBG("Wait for second process set mdata_offset...\n");
> +	while (pinfo->slave.mdata_offset == 0)
> +		odp_spin();

Here you're spinning on a value in a non-volatile without a barrier,
which isn't going to work reliably.

> +
> +	ODP_DBG("Wait for second process set mdata_offset... DONE.\n");
> +
> +	while (1) {
> +		int ret = _odp_shm_lookup_ipc(pinfo->slave.pool_name);
> +		if (!ret)
> +			break;
> +		ODP_DBG("Master looking for %s\n", pinfo->slave.pool_name);
> +		sleep(1);
> +	}
> +
> +	ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
> +					     pinfo->shm_pkt_pool_size);
> +	pktio_entry->s.ipc_pool_mdata_base = (char *)ipc_pool_base +
> +					     pinfo->slave.mdata_offset;
> +	pktio_entry->s.ipc_pool = pool;
> +
> +	return 0;
> +}
> +
> +static odp_pool_t _ipc_odp_alloc_and_create_pool_slave(struct pktio_info *pinfo)
> +{
> +	odp_pool_t pool;
> +	char *pool_name;
> +	odp_pool_param_t params;
> +	int num = pinfo->shm_pool_num;
> +	uint64_t buf_size = pinfo->shm_pkt_size;
> +	pool_entry_t *pool_entry;
> +
> +	pool_name = calloc(1, strlen(pinfo->remote_pool_name) +
> +			   sizeof("ipc_pool_slave_"));
> +	sprintf(pool_name, "ipc_pool_slave_%s", pinfo->remote_pool_name);
> +
> +	ODP_DBG("slave uses pool %s\n", pool_name);
> +
> +	memset(&params, 0, sizeof(params));
> +	params.pkt.num = num;
> +	params.pkt.len = buf_size;
> +	params.pkt.seg_len = buf_size;
> +	params.type = ODP_POOL_PACKET;
> +	params.shm_flags = ODP_SHM_PROC;
> +
> +	pool = odp_pool_create(pool_name, ODP_SHM_NULL, &params);
> +	if (pool == ODP_POOL_INVALID)
> +		ODP_ABORT("Error: packet pool create failed.\n"
> +			"num %d, len %d, seg_len %d\n",
> +			params.pkt.num, params.pkt.len, params.pkt.seg_len);
> +
> +	/* Export info so that master can connect to that pool*/
> +	snprintf(pinfo->slave.pool_name, 30, "%s", pool_name);
> +	pool_entry = odp_pool_to_entry(pool);
> +	pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
> +				    pool_entry->s.pool_base_addr;
> +	free(pool_name);
> +
> +	return pool;
> +}
> +
> +static void *_ipc_map_remote_pool(const char *name, size_t size)
> +{
> +	odp_shm_t shm;
> +
> +	ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
> +	shm = odp_shm_reserve(name,
> +			size,
> +			ODP_CACHE_LINE_SIZE,
> +			ODP_SHM_PROC_NOCREAT);
> +	if (shm == ODP_SHM_INVALID)
> +		ODP_ABORT("unable map %s\n", name);
> +	return odp_shm_addr(shm);
> +}
> +
> +static void *_ipc_shm_map(char *name, size_t size, int timeout)
> +{
> +	odp_shm_t shm;
> +	int ret;
> +
> +	while (1) {
> +		ret = _odp_shm_lookup_ipc(name);
> +		if (!ret)
> +			break;
> +		ODP_DBG("Waiting for %s\n", name);
> +		if (timeout <= 0)
> +			return NULL;
> +		timeout--;
> +		sleep(1);
> +	}
> +
> +	shm = odp_shm_reserve(name, size,
> +			ODP_CACHE_LINE_SIZE,
> +			ODP_SHM_PROC_NOCREAT);
> +	if (ODP_SHM_INVALID == shm)
> +		ODP_ABORT("unable to map: %s\n", name);
> +
> +	return odp_shm_addr(shm);
> +}
> +
> +static int _ipc_pktio_init_slave(const char *dev, pktio_entry_t *pktio_entry)
> +{
> +	int ret = -1;
> +	char ipc_shm_name[ODPH_RING_NAMESIZE];
> +	size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
> +			   sizeof(odph_ring_t);
> +	struct pktio_info *pinfo;
> +	void *ipc_pool_base;
> +
> +	memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
> +	memcpy(ipc_shm_name, dev, strlen(dev));
> +
> +	memcpy(ipc_shm_name + strlen(dev), "_r", 2);
> +	pktio_entry->s.ipc_r  = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +	if (!pktio_entry->s.ipc_r) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		goto error;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r),
> +		odph_ring_free_count(pktio_entry->s.ipc_r));
> +
> +	memcpy(ipc_shm_name + strlen(dev), "_p", 2);
> +	pktio_entry->s.ipc_p = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +	if (!pktio_entry->s.ipc_p) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		goto error;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p),
> +		odph_ring_free_count(pktio_entry->s.ipc_p));
> +
> +	memcpy(ipc_shm_name + strlen(dev), "_slave_r", 8);
> +	pktio_entry->s.ipc_r_slave = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +	if (!pktio_entry->s.ipc_r_slave) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		goto error;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r_slave),
> +		odph_ring_free_count(pktio_entry->s.ipc_r_slave));
> +
> +	memcpy(ipc_shm_name + strlen(dev), "_slave_p", 8);
> +	pktio_entry->s.ipc_p_slave = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +	if (!pktio_entry->s.ipc_p_slave) {
> +		ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +			getpid(), dev);
> +		goto error;
> +	}
> +	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p_slave),
> +		odph_ring_free_count(pktio_entry->s.ipc_p_slave));
> +
> +
> +	/* Get info about remote pool */
> +	pinfo = _ipc_map_pool_info(dev, ODP_SHM_PROC_NOCREAT);
> +
> +	ipc_pool_base = _ipc_map_remote_pool(pinfo->remote_pool_name,
> +					     pinfo->shm_pkt_pool_size);
> +	pktio_entry->s.ipc_pool_mdata_base = (char *)ipc_pool_base +
> +					     pinfo->mdata_offset;
> +	pktio_entry->s.ipc_pkt_size = pinfo->shm_pkt_size;
> +
> +	/* @todo: to simplify in linux-generic implementation we create pool for
> +	 * packets from IPC queue. On receive implementation copies packets to
> +	 * that pool. Later we can try to reuse original pool without packets
> +	 * copying.
> +	 */
> +	pktio_entry->s.ipc_pool = _ipc_odp_alloc_and_create_pool_slave(pinfo);
> +
> +	ret = 0;
> +	ODP_DBG("%s OK.\n", __func__);
> +error:
> +	/* @todo free shm on error (api not impemented yet) */

It is now.

> +	return ret;
> +}
> +
> +int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
> +			   odp_pool_t pool)
> +{
> +	int ret;
> +
> +	/* if pool is zero we assume that it's slave process connects
> +	 * to shared memory already created by main process.
> +	 */
> +	if (pool) {

Should be pool != ODP_POOL_INVALID

> +		pktio_entry->s.type = ODP_PKTIO_TYPE_IPC;
> +		ret = _ipc_pktio_init_master(pktio_entry, dev, pool);
> +	} else {
> +		pktio_entry->s.type = ODP_PKTIO_TYPE_IPC_SLAVE;
> +		ret = _ipc_pktio_init_slave(dev, pktio_entry);
> +	}
> +
> +	return ret;
> +}
> +
> +
> +static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
> +			       uint32_t offset,
> +			       uint32_t *seglen,
> +			       uint32_t limit)
> +{
> +	int seg_index  = offset / buf->segsize;
> +	int seg_offset = offset % buf->segsize;
> +	void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
> +
> +	if (seglen != NULL) {
> +		uint32_t buf_left = limit - offset;
> +		*seglen = seg_offset + buf_left <= buf->segsize ?
> +			buf_left : buf->segsize - seg_offset;
> +	}
> +
> +	return (void *)(seg_offset + (uint8_t *)addr);
> +}
> +
> +
> +static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
> +			       uint32_t offset, uint32_t *seglen)
> +{
> +	if (offset > pkt_hdr->frame_len)
> +		return NULL;
> +
> +	return _ipc_buffer_map(&pkt_hdr->buf_hdr,
> +			  pkt_hdr->headroom + offset, seglen,
> +			  pkt_hdr->headroom + pkt_hdr->frame_len);
> +}
> +
> +int ipc_pktio_recv(pktio_entry_t *pktio_entry,
> +		      odp_packet_t pkt_table[], unsigned len)
> +{
> +	int pkts = 0;
> +	int ret;
> +	int i;
> +	odph_ring_t *r;
> +	odph_ring_t *r_p;
> +	odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
> +	void **ipcbufs_p = (void *)&remote_pkts;
> +	unsigned ring_len;
> +	int p_free;
> +
> +	if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC) {
> +		r = pktio_entry->s.ipc_r_slave;
> +		r_p = pktio_entry->s.ipc_p_slave;
> +	} else if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
> +		r = pktio_entry->s.ipc_r;
> +		r_p = pktio_entry->s.ipc_p;
> +	} else {
> +		ODP_ABORT("wrong type: %d\n", pktio_entry->s.type);
> +	}
> +
> +	ring_len = odph_ring_count(r);
> +
> +	pkts = len;
> +	if (len > ring_len)
> +		pkts = ring_len;
> +
> +	/* Do not dequeue more then we can put to producer ring */
> +	p_free = odph_ring_free_count(r_p);

This is going to be racy, by the time you've got around to enqueueing to
r_p it may have been changed via another thread. It may be protected by
a lock somewhere at the minute, but I'm assuming that'll not always be
the case as you're using the _mc_/_mp_ calls.

> +	if (pkts > p_free)
> +		pkts = p_free;
> +
> +	ret = odph_ring_mc_dequeue_bulk(r, ipcbufs_p, pkts);

It looks like this call will return -ENOENT if there are < pkts buffers
available, which will cause batching issues. odp_pktio_recv() should
return as many packets as are immediately available (up to the provided
len).

> +	if (ret != 0) {
> +		ODP_DBG("error to dequeue no packets\n");
> +		pkts = -1;
> +		return pkts;

A return value of -1 is an error, but having no packets to receive
shouldn't be treated as an error, so should just return 0;

> +	}
> +
> +	if (pkts == 0)
> +		return 0;

This check doesn't make sense here since the dequeue doesn't modify it,
it should be moved further up.

> +
> +	for (i = 0; i < pkts; i++) {
> +		odp_pool_t pool;
> +		odp_packet_t pkt;
> +		odp_packet_hdr_t *phdr;
> +		odp_buffer_bits_t handle;
> +		int idx; /* Remote packet has coded pool and index.
> +			  * We need only index.*/
> +		void *pkt_data;
> +		void *remote_pkt_data;
> +
> +		handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
> +		idx = handle.index;
> +
> +		/* Link to packed data. To this line we have Zero-Copy between
> +		 * processes, to simplify use packet copy in that version which
> +		 * can be removed later with more advance buffer management
> +		 * (ref counters).
> +		 */
> +		/* reverse odp_buf_to_hdr() */
> +		phdr = (odp_packet_hdr_t *)(
> +				(char *)pktio_entry->s.ipc_pool_mdata_base +
> +				(idx * ODP_CACHE_LINE_SIZE));
> +
> +		/* Allocate new packet. Select*/
> +		pool = pktio_entry->s.ipc_pool;
> +		if (odp_unlikely(pool == ODP_POOL_INVALID))
> +			ODP_ABORT("invalid pool");
> +
> +		pkt = odp_packet_alloc(pool, phdr->frame_len);
> +		if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
> +			/* Original pool might be smaller then
> +			*  PKTIO_IPC_ENTRIES. If packet can not be
> +			 * allocated from pool at this time,
> +			 * simple get in on next recv() call.
> +			 */
> +			pkts = i - 1;
> +			break;
> +		}
> +
> +		/* Copy packet data. */
> +		pkt_data = odp_packet_data(pkt);
> +		if (odp_unlikely(pkt_data == NULL))
> +			ODP_ABORT("unable to map pkt_data ipc_slave %d\n",
> +				  (ODP_PKTIO_TYPE_IPC_SLAVE ==
> +					pktio_entry->s.type));
> +
> +		remote_pkt_data =  _ipc_packet_map(phdr, 0, NULL);
> +		if (odp_unlikely(remote_pkt_data == NULL))
> +			ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",
> +				  (ODP_PKTIO_TYPE_IPC_SLAVE ==
> +					pktio_entry->s.type));
> +
> +		/* @todo fix copy packet!!! */
> +		memcpy(pkt_data, remote_pkt_data, phdr->frame_len);
> +
> +		/* Copy packets L2, L3 parsed offsets and size */
> +		odp_packet_hdr(pkt)->l2_offset = phdr->l2_offset;
> +		odp_packet_hdr(pkt)->l3_offset = phdr->l3_offset;
> +		odp_packet_hdr(pkt)->l4_offset = phdr->l4_offset;
> +		odp_packet_hdr(pkt)->payload_offset = phdr->payload_offset;
> +
> +		odp_packet_hdr(pkt)->vlan_s_tag =  phdr->vlan_s_tag;
> +		odp_packet_hdr(pkt)->vlan_c_tag =  phdr->vlan_c_tag;
> +		odp_packet_hdr(pkt)->l3_protocol = phdr->l3_protocol;
> +		odp_packet_hdr(pkt)->l3_len = phdr->l3_len;
> +
> +		odp_packet_hdr(pkt)->frame_len = phdr->frame_len;
> +		odp_packet_hdr(pkt)->headroom = phdr->headroom;
> +		odp_packet_hdr(pkt)->tailroom = phdr->tailroom;
> +		pkt_table[i] = pkt;
> +	}
> +
> +	/* Now tell other process that we no longer need that buffers.*/
> +	ret = odph_ring_mp_enqueue_bulk(r_p, ipcbufs_p, pkts);
> +	if (ret != 0)
> +		ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
> +
> +	return pkts;
> +}
> +
> +int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +		      unsigned len)
> +{
> +	odph_ring_t *r;
> +	odph_ring_t *r_p;
> +	void **rbuf_p;
> +	int ret;
> +	unsigned i;
> +
> +	if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
> +		r = pktio_entry->s.ipc_r_slave;
> +		r_p = pktio_entry->s.ipc_p_slave;
> +	} else if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC) {
> +		r = pktio_entry->s.ipc_r;
> +		r_p = pktio_entry->s.ipc_p;
> +	} else {
> +		ODP_ABORT("wrong type: %d\n", pktio_entry->s.type);
> +	}
> +
> +	/* Free already processed packets, if any */
> +	{
> +		unsigned complete_packets = odph_ring_count(r_p);
> +		odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
> +
> +		if (complete_packets > 0) {
> +			rbuf_p = (void *)&r_p_pkts;
> +			ret = odph_ring_mc_dequeue_bulk(r_p, rbuf_p,
> +					complete_packets);
> +			if (ret == 0) {
> +				for (i = 0; i < complete_packets; i++)
> +					odp_packet_free(r_p_pkts[i]);
> +			}
> +		}
> +	}

The above block should go into a helper function.

> +
> +	/* wait while second process takes packet from the ring.*/
> +	i = 3;
> +	while (odph_ring_free_count(r) < len && i) {
> +		i--;
> +		sleep(1);
> +	}

If there's not enough room we shouldn't attempt to deal with it here,
just enqueue as many as we can.

> +
> +	/* Put packets to ring to be processed in other process. */
> +	for (i = 0; i < len; i++) {
> +		int j;
> +		odp_packet_t pkt =  pkt_table[i];
> +		rbuf_p = (void *)&pkt;
> +		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
> +
> +		/* buf_hdr.addr can not be used directly in remote process,
> +		 * convert it to offset
> +		 */
> +		for (j = 0; j < ODP_BUFFER_MAX_SEG; j++)
> +			pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
> +				(char *)pkt_hdr->buf_hdr.addr[j];
> +
> +		ret = odph_ring_mp_enqueue_bulk(r, rbuf_p, 1);

Why enqueue 1 at a time?

> +		if (odp_unlikely(ret != 0)) {
> +			ODP_ERR("pid %d odp_ring_mp_enqueue_bulk fail, ipc_slave %d, ret %d\n",
> +				getpid(),
> +				(ODP_PKTIO_TYPE_IPC_SLAVE ==
> +				 pktio_entry->s.type),
> +				ret);
> +			ODP_ERR("odp_ring_full: %d, odp_ring_count %d, odph_ring_free_count %d\n",
> +				odph_ring_full(r), odph_ring_count(r),
> +				odph_ring_free_count(r));
> +		}

As with the recv side I think the semantics are wrong, we should send as
many as can be sent immediately rather than blocking for an unbounded
length of time. It looks like the ring implementation has a mode that
does this;

enum odph_ring_queue_behavior {
	ODPH_RING_QUEUE_FIXED = 0, /**< Enq/Deq a fixed number
				of items from a ring */
	ODPH_RING_QUEUE_VARIABLE   /**< Enq/Deq as many items
				a possible from ring */
};

> +	}
> +	return len;
> +}
Ola Liljedahl April 27, 2015, 1:54 p.m. UTC | #3
On 27 April 2015 at 15:03, Stuart Haslam <stuart.haslam@linaro.org> wrote:

> On Thu, Apr 23, 2015 at 08:47:04PM +0300, Maxim Uvarov wrote:
> > Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
> > ---
> >  platform/linux-generic/Makefile.am                 |   2 +
> >  .../linux-generic/include/odp_buffer_internal.h    |   3 +
> >  .../linux-generic/include/odp_packet_io_internal.h |  15 +
> >  .../include/odp_packet_io_ipc_internal.h           |  47 ++
> >  platform/linux-generic/odp_packet_io.c             |  30 +-
> >  platform/linux-generic/odp_packet_io_ipc.c         | 590
> +++++++++++++++++++++
> >  6 files changed, 686 insertions(+), 1 deletion(-)
> >  create mode 100644
> platform/linux-generic/include/odp_packet_io_ipc_internal.h
> >  create mode 100644 platform/linux-generic/odp_packet_io_ipc.c
> >
> > diff --git a/platform/linux-generic/Makefile.am
> b/platform/linux-generic/Makefile.am
> > index 66f0474..055f75c 100644
> > --- a/platform/linux-generic/Makefile.am
> > +++ b/platform/linux-generic/Makefile.am
> > @@ -120,6 +120,7 @@ noinst_HEADERS = \
> >
>  ${top_srcdir}/platform/linux-generic/include/odp_internal.h \
> >
>  ${top_srcdir}/platform/linux-generic/include/odp_packet_internal.h \
> >
>  ${top_srcdir}/platform/linux-generic/include/odp_packet_io_internal.h \
> > +
>  ${top_srcdir}/platform/linux-generic/include/odp_packet_io_ipc_internal.h \
> >
>  ${top_srcdir}/platform/linux-generic/include/odp_packet_io_queue.h \
> >
>  ${top_srcdir}/platform/linux-generic/include/odp_packet_socket.h \
> >
>  ${top_srcdir}/platform/linux-generic/include/odp_pool_internal.h \
> > @@ -155,6 +156,7 @@ __LIB__libodp_la_SOURCES = \
> >                          odp_packet.c \
> >                          odp_packet_flags.c \
> >                          odp_packet_io.c \
> > +                        odp_packet_io_ipc.c \
> >                          odp_packet_socket.c \
> >                          odp_pool.c \
> >                          odp_queue.c \
> > diff --git a/platform/linux-generic/include/odp_buffer_internal.h
> b/platform/linux-generic/include/odp_buffer_internal.h
> > index 3a3d2a2..4ea7c62 100644
> > --- a/platform/linux-generic/include/odp_buffer_internal.h
> > +++ b/platform/linux-generic/include/odp_buffer_internal.h
> > @@ -129,6 +129,9 @@ typedef struct odp_buffer_hdr_t {
> >       size_t                   udata_size; /* size of user metadata */
> >       uint32_t                 segcount;   /* segment count */
> >       uint32_t                 segsize;    /* segment size */
> > +     /* ipc mapped process can not walk over pointers,
> > +      * offset has to be used */
> > +     uint64_t                 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
> >       void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs
> */
> >  } odp_buffer_hdr_t;
> >
> > diff --git a/platform/linux-generic/include/odp_packet_io_internal.h
> b/platform/linux-generic/include/odp_packet_io_internal.h
> > index 18b59ef..744f438 100644
> > --- a/platform/linux-generic/include/odp_packet_io_internal.h
> > +++ b/platform/linux-generic/include/odp_packet_io_internal.h
> > @@ -23,6 +23,7 @@ extern "C" {
> >  #include <odp_classification_datamodel.h>
> >  #include <odp_align_internal.h>
> >  #include <odp_debug_internal.h>
> > +#include <odp/helper/ring.h>
> >
> >  #include <odp/config.h>
> >  #include <odp/hints.h>
> > @@ -36,6 +37,8 @@ typedef enum {
> >       ODP_PKTIO_TYPE_SOCKET_MMSG,
> >       ODP_PKTIO_TYPE_SOCKET_MMAP,
> >       ODP_PKTIO_TYPE_LOOPBACK,
> > +     ODP_PKTIO_TYPE_IPC,
> > +     ODP_PKTIO_TYPE_IPC_SLAVE,
> >  } odp_pktio_type_t;
> >
> >  struct pktio_entry {
> > @@ -53,6 +56,18 @@ struct pktio_entry {
> >       char name[IFNAMSIZ];            /**< name of pktio provided to
> >                                          pktio_open() */
> >       odp_bool_t promisc;             /**< promiscuous mode state */
> > +     odph_ring_t     *ipc_r;         /**< ODP ring for IPC mgs packets
> > +                                     indexes transmitted to shared
> memory */
> > +     odph_ring_t     *ipc_p;         /**< ODP ring for IPC msg packets
> > +                                     indexes already processed by
> remote process */
> > +     void            *ipc_pool_base; /**< IPC Remote pool base addr */
> > +     void            *ipc_pool_mdata_base; /**< IPC Remote pool mdata
> base addr */
> > +     uint64_t        ipc_pkt_size;   /**< IPC: packet size in remote
> pool */
>
> We generally use uint32_t for packet length.
>
> > +
> > +     odph_ring_t     *ipc_r_slave;
> > +     odph_ring_t     *ipc_p_slave;
>
> Why are these needed?.. having looked at the rest of the patch I still
> don't understand why both ends of the pipe need both sets (with and
> without the _slave suffix) of pointers.
>
> > +
> > +     odp_pool_t      ipc_pool;       /**< IPC: Pool of main process */
> >  };
> >
> >  typedef union {
> > diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> > new file mode 100644
> > index 0000000..a766519
> > --- /dev/null
> > +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> > @@ -0,0 +1,47 @@
> > +/* Copyright (c) 2015, Linaro Limited
> > + * All rights reserved.
> > + *
> > + * SPDX-License-Identifier:     BSD-3-Clause
> > + */
> > +
> > +#include <odp/packet_io.h>
> > +#include <odp_packet_io_internal.h>
> > +#include <odp/packet.h>
> > +#include <odp_packet_internal.h>
> > +#include <odp_internal.h>
> > +#include <odp/shared_memory.h>
> > +
> > +#include <string.h>
> > +#include <unistd.h>
> > +#include <stdlib.h>
> > +
> > +/* IPC packet I/O over odph_ring */
> > +#include <odp/helper/ring.h>
> > +
> > +#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
> > +                                     odp ring queue */
> > +
> > +/* that struct is exported to shared memory, so that 2 processes can
> find
> > + * each other.
> > + */
> > +struct pktio_info {
> > +     char remote_pool_name[30];
>
> ODP_POOL_NAME_LEN
>
> > +     int shm_pool_num;
> > +     size_t shm_pkt_pool_size;
> > +     size_t shm_pkt_size;
> > +     size_t mdata_offset; /*< offset from shared memory block start
> > +                           *to pool_mdata_addr */
> > +     struct {
> > +             size_t mdata_offset;
> > +             char   pool_name[30];
>
> ODP_POOL_NAME_LEN
>
> > +     } slave;
> > +} __packed;
> > +
> > +int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
> > +                odp_pool_t pool);
> > +
> > +int ipc_pktio_recv(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> > +                unsigned len);
> > +
> > +int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> > +                unsigned len);
> > diff --git a/platform/linux-generic/odp_packet_io.c
> b/platform/linux-generic/odp_packet_io.c
> > index cfe5b71..01077fb 100644
> > --- a/platform/linux-generic/odp_packet_io.c
> > +++ b/platform/linux-generic/odp_packet_io.c
> > @@ -18,6 +18,7 @@
> >  #include <odp_schedule_internal.h>
> >  #include <odp_classification_internal.h>
> >  #include <odp_debug_internal.h>
> > +#include <odp_packet_io_ipc_internal.h>
> >
> >  #include <string.h>
> >  #include <sys/ioctl.h>
> > @@ -25,6 +26,15 @@
> >  #include <ifaddrs.h>
> >  #include <errno.h>
> >
> > +#include <sys/types.h>
> > +#include <unistd.h>
> > +
> > +/* IPC packet I/O over odph_ring */
> > +#include <odp/helper/ring.h>
> > +
> > +#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
> > +                                     odp ring queue */
>
> This is defined in odp_packet_io_ipc_internal.h too.
>
> > +
> >  /* MTU to be reported for the "loop" interface */
> >  #define PKTIO_LOOP_MTU 1500
> >  /* MAC address for the "loop" interface */
> > @@ -263,7 +273,12 @@ static odp_pktio_t setup_pktio_entry(const char
> *dev, odp_pool_t pool)
> >
> >       if (strcmp(dev, "loop") == 0)
> >               ret = init_loop(pktio_entry, id);
> > -     else
> > +     else if (!strncmp(dev, "ipc", 3)) {
> > +             ret = ipc_pktio_init(pktio_entry, dev, pool);
> > +             if (ret != 0)
> > +                     ODP_ABORT("unable to init ipc for %s, pool %"
> PRIu64 "\n",
> > +                               dev, pool);
> > +     } else
> >               ret = init_socket(pktio_entry, dev, pool);
> >
> >       if (ret != 0) {
> > @@ -285,6 +300,10 @@ odp_pktio_t odp_pktio_open(const char *dev,
> odp_pool_t pool)
> >  {
> >       odp_pktio_t id;
> >
> > +     /* no local table lookup for ipc case */
> > +     if (pool == NULL && !memcmp(dev, "ipc", 3))
> > +             goto no_local_lookup;
> > +
> >       id = odp_pktio_lookup(dev);
> >       if (id != ODP_PKTIO_INVALID) {
> >               /* interface is already open */
> > @@ -292,6 +311,7 @@ odp_pktio_t odp_pktio_open(const char *dev,
> odp_pool_t pool)
> >               return ODP_PKTIO_INVALID;
> >       }
> >
> > +no_local_lookup:
> >       odp_spinlock_lock(&pktio_tbl->lock);
> >       id = setup_pktio_entry(dev, pool);
> >       odp_spinlock_unlock(&pktio_tbl->lock);
> > @@ -408,6 +428,10 @@ int odp_pktio_recv(odp_pktio_t id, odp_packet_t
> pkt_table[], int len)
> >       case ODP_PKTIO_TYPE_LOOPBACK:
> >               pkts = deq_loopback(pktio_entry, pkt_table, len);
> >               break;
> > +     case ODP_PKTIO_TYPE_IPC_SLAVE:
> > +     case ODP_PKTIO_TYPE_IPC:
> > +             pkts = ipc_pktio_recv(pktio_entry, pkt_table, len);
> > +             break;
> >       default:
> >               pkts = -1;
> >               break;
> > @@ -462,6 +486,10 @@ int odp_pktio_send(odp_pktio_t id, odp_packet_t
> pkt_table[], int len)
> >       case ODP_PKTIO_TYPE_LOOPBACK:
> >               pkts = enq_loopback(pktio_entry, pkt_table, len);
> >               break;
> > +     case ODP_PKTIO_TYPE_IPC:
> > +     case ODP_PKTIO_TYPE_IPC_SLAVE:
> > +             pkts = ipc_pktio_send(pktio_entry, pkt_table, len);
> > +             break;
> >       default:
> >               pkts = -1;
> >       }
> > diff --git a/platform/linux-generic/odp_packet_io_ipc.c
> b/platform/linux-generic/odp_packet_io_ipc.c
> > new file mode 100644
> > index 0000000..312ad3a
> > --- /dev/null
> > +++ b/platform/linux-generic/odp_packet_io_ipc.c
> > @@ -0,0 +1,590 @@
> > +/* Copyright (c) 2015, Linaro Limited
> > + * All rights reserved.
> > + *
> > + * SPDX-License-Identifier:     BSD-3-Clause
> > + */
> > +
> > +#include <odp_packet_io_ipc_internal.h>
> > +#include <odp_debug_internal.h>
> > +#include <odp_packet_io_internal.h>
> > +#include <odp_spin_internal.h>
> > +#include <odp/system_info.h>
> > +
> > +#include <sys/mman.h>
> > +#include <sys/stat.h>
> > +#include <fcntl.h>
> > +
> > +static void *_ipc_map_remote_pool(const char *name, size_t size);
> > +
> > +static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
> > +{
> > +     pool_entry_t *pool;
> > +     uint32_t pool_id;
> > +     odp_shm_t shm;
> > +     odp_shm_info_t info;
> > +
> > +     pool_id = pool_handle_to_index(pool_hdl);
> > +     pool    = get_pool_entry(pool_id);
> > +     shm = pool->s.pool_shm;
> > +
> > +     odp_shm_info(shm, &info);
> > +
> > +     return info.name;
> > +}
> > +
> > +/**
> > +* Look up for shared memory object.
> > +*
> > +* @param name   name of shm object
> > +*
> > +* @return 0 on success, otherwise non-zero
> > +*/
> > +static int _odp_shm_lookup_ipc(const char *name)
> > +{
> > +     int shm;
> > +
> > +     shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
> > +     if (shm == -1) {
> > +             ODP_DBG("IPC shm_open for %s not found\n", name);
>
> The open could have failed for some reason other than "not found", in
> which case this error message will be misleading.
>
strerror(odp_errno())?


>
> > +             return -1;
> > +     }
> > +     close(shm);
> > +     return 0;
> > +}
> > +
> > +static struct pktio_info *_ipc_map_pool_info(const char *pool_name, int
> flag)
> > +{
> > +     char *name;
> > +     struct pktio_info *pinfo;
> > +
> > +     /* Create info about remote pktio */
> > +     name = (char *)malloc(strlen(pool_name) + sizeof("_info"));
>
> Not enough room for the terminator.
>
> > +     memcpy(name, pool_name, strlen(pool_name));
> > +     memcpy(name + strlen(pool_name), "_info", sizeof("_info"));
>
> Wouldn't it be simpler and safer just to do this on the stack;
>
> char name[ODP_POOL_NAME_LEN + sizeof("_info")];
> snprintf(name, sizeof(name), "%s_info", pool_name);
>
> > +     odp_shm_t shm = odp_shm_reserve(name, sizeof(struct pktio_info),
> > +                     ODP_CACHE_LINE_SIZE,
> > +                     flag);
> > +     if (ODP_SHM_INVALID == shm)
> > +             ODP_ABORT("unable to reserve memory for shm info");
> > +     free(name);
> > +     pinfo = odp_shm_addr(shm);
> > +     if (flag != ODP_SHM_PROC_NOCREAT)
> > +             memset(pinfo->remote_pool_name, 0, 30);
>
> There's no need to memset the entire array, just initialise the first
> element.
>
> > +     return pinfo;
>
> What happens to shm?.. the caller doesn't get a reference to it so can't
> free it as far as I can tell.
>
> > +}
> > +
> > +static int _ipc_pktio_init_master(pktio_entry_t *pktio_entry, const
> char *dev,
> > +                        odp_pool_t pool)
> > +{
> > +     char ipc_shm_name[ODPH_RING_NAMESIZE];
> > +     pool_entry_t *pool_entry;
> > +     uint32_t pool_id;
> > +     void *ipc_pool_base;
> > +     struct pktio_info *pinfo;
> > +     const char *pool_name;
> > +
> > +     pool_id = pool_handle_to_index(pool);
> > +     pool_entry    = get_pool_entry(pool_id);
> > +
> > +     /* generate name in shm like ipc_pktio_r for
> > +      * to be processed packets ring.
> > +      */
> > +     memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
> > +     memcpy(ipc_shm_name, dev, strlen(dev));
> > +     memcpy(ipc_shm_name + strlen(dev), "_r", 2);
>
> This may overflow and/or not be terminated, again snprintf would be
> better.
>
> > +
> > +     pktio_entry->s.ipc_r = odph_ring_create(ipc_shm_name,
> > +                     PKTIO_IPC_ENTRIES,
> > +                     ODPH_RING_SHM_PROC);
> > +     if (!pktio_entry->s.ipc_r) {
> > +             ODP_DBG("pid %d unable to create ipc ring %s name\n",
> > +                     getpid(), ipc_shm_name);
> > +             return -1;
> > +     }
> > +     ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> > +             ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r),
> > +             odph_ring_free_count(pktio_entry->s.ipc_r));
> > +
> > +     /* generate name in shm like ipc_pktio_p for
> > +      * already processed packets
> > +      */
> > +     memcpy(ipc_shm_name + strlen(dev), "_p", 2);
>
> and here
>
> > +
> > +     pktio_entry->s.ipc_p = odph_ring_create(ipc_shm_name,
> > +                     PKTIO_IPC_ENTRIES,
> > +                     ODPH_RING_SHM_PROC);
> > +     if (!pktio_entry->s.ipc_p) {
> > +             ODP_DBG("pid %d unable to create ipc ring %s name\n",
> > +                     getpid(), ipc_shm_name);
> > +             return -1;
>
> What happens to ipc_r?
>
> > +     }
> > +
> > +     ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> > +             ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p),
> > +             odph_ring_free_count(pktio_entry->s.ipc_p));
> > +
> > +     memcpy(ipc_shm_name + strlen(dev), "_slave_r", 8);
> > +     pktio_entry->s.ipc_r_slave = odph_ring_create(ipc_shm_name,
> > +                     PKTIO_IPC_ENTRIES,
> > +                     ODPH_RING_SHM_PROC);
> > +     if (!pktio_entry->s.ipc_r_slave) {
> > +             ODP_DBG("pid %d unable to create ipc ring %s name\n",
> > +                     getpid(), ipc_shm_name);
>
> and again, leaking ipc_r + ipc_p.. same thing below.
>
> > +             return -1;
> > +     }
> > +     ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> > +             ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r_slave),
> > +             odph_ring_free_count(pktio_entry->s.ipc_r_slave));
> > +
> > +     memcpy(ipc_shm_name + strlen(dev), "_slave_p", 8);
> > +     pktio_entry->s.ipc_p_slave = odph_ring_create(ipc_shm_name,
> > +                     PKTIO_IPC_ENTRIES,
> > +                     ODPH_RING_SHM_PROC);
> > +     if (!pktio_entry->s.ipc_p_slave) {
> > +             ODP_DBG("pid %d unable to create ipc ring %s name\n",
> > +                     getpid(), ipc_shm_name);
> > +             return -1;
> > +     }
> > +     ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> > +             ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p_slave),
> > +             odph_ring_free_count(pktio_entry->s.ipc_p_slave));
> > +
> > +     /* Memory to store information about exported pool */
> > +     pinfo = _ipc_map_pool_info(dev, ODP_SHM_PROC);
> > +
> > +     /* Set up pool name for remote info */
> > +     pool_name = _ipc_odp_buffer_pool_shm_name(pool);
> > +     memcpy(pinfo->remote_pool_name, pool_name, strlen(pool_name));
> > +     pinfo->shm_pkt_pool_size = pool_entry->s.pool_size;
> > +     pinfo->shm_pool_num = pool_entry->s.buf_num;
> > +     pinfo->shm_pkt_size = pool_entry->s.seg_size;
> > +     pinfo->mdata_offset =  pool_entry->s.pool_mdata_addr -
> > +                            pool_entry->s.pool_base_addr;
> > +     pinfo->slave.mdata_offset = 0;
> > +     ODP_DBG("Master waiting for slave to be connected now..\n");
> > +
> > +     /* Wait for remote process to export his pool. */
> > +     ODP_DBG("Wait for second process set mdata_offset...\n");
> > +     while (pinfo->slave.mdata_offset == 0)
> > +             odp_spin();
>
> Here you're spinning on a value in a non-volatile without a barrier,
> which isn't going to work reliably.

 Use _odp_atomic_flag_t and _odp_atomic_flag_tas() for a acquiring the
remote updates.
Use _odp_atomic_flag_init(true) and _odp_atomic_flag_clear() for releasing
your updates.


> > +
> > +     ODP_DBG("Wait for second process set mdata_offset... DONE.\n");
> > +
> > +     while (1) {
> > +             int ret = _odp_shm_lookup_ipc(pinfo->slave.pool_name);
> > +             if (!ret)
> > +                     break;
> > +             ODP_DBG("Master looking for %s\n", pinfo->slave.pool_name);
> > +             sleep(1);
> > +     }
> > +
> > +     ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
> > +                                          pinfo->shm_pkt_pool_size);
> > +     pktio_entry->s.ipc_pool_mdata_base = (char *)ipc_pool_base +
> > +                                          pinfo->slave.mdata_offset;
> > +     pktio_entry->s.ipc_pool = pool;
> > +
> > +     return 0;
> > +}
> > +
> > +static odp_pool_t _ipc_odp_alloc_and_create_pool_slave(struct
> pktio_info *pinfo)
> > +{
> > +     odp_pool_t pool;
> > +     char *pool_name;
> > +     odp_pool_param_t params;
> > +     int num = pinfo->shm_pool_num;
> > +     uint64_t buf_size = pinfo->shm_pkt_size;
> > +     pool_entry_t *pool_entry;
> > +
> > +     pool_name = calloc(1, strlen(pinfo->remote_pool_name) +
> > +                        sizeof("ipc_pool_slave_"));
> > +     sprintf(pool_name, "ipc_pool_slave_%s", pinfo->remote_pool_name);
> > +
> > +     ODP_DBG("slave uses pool %s\n", pool_name);
> > +
> > +     memset(&params, 0, sizeof(params));
> > +     params.pkt.num = num;
> > +     params.pkt.len = buf_size;
> > +     params.pkt.seg_len = buf_size;
> > +     params.type = ODP_POOL_PACKET;
> > +     params.shm_flags = ODP_SHM_PROC;
> > +
> > +     pool = odp_pool_create(pool_name, ODP_SHM_NULL, &params);
> > +     if (pool == ODP_POOL_INVALID)
> > +             ODP_ABORT("Error: packet pool create failed.\n"
> > +                     "num %d, len %d, seg_len %d\n",
> > +                     params.pkt.num, params.pkt.len,
> params.pkt.seg_len);
> > +
> > +     /* Export info so that master can connect to that pool*/
> > +     snprintf(pinfo->slave.pool_name, 30, "%s", pool_name);
> > +     pool_entry = odp_pool_to_entry(pool);
> > +     pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
> > +                                 pool_entry->s.pool_base_addr;
> > +     free(pool_name);
> > +
> > +     return pool;
> > +}
> > +
> > +static void *_ipc_map_remote_pool(const char *name, size_t size)
> > +{
> > +     odp_shm_t shm;
> > +
> > +     ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
> > +     shm = odp_shm_reserve(name,
> > +                     size,
> > +                     ODP_CACHE_LINE_SIZE,
> > +                     ODP_SHM_PROC_NOCREAT);
> > +     if (shm == ODP_SHM_INVALID)
> > +             ODP_ABORT("unable map %s\n", name);
> > +     return odp_shm_addr(shm);
> > +}
> > +
> > +static void *_ipc_shm_map(char *name, size_t size, int timeout)
> > +{
> > +     odp_shm_t shm;
> > +     int ret;
> > +
> > +     while (1) {
> > +             ret = _odp_shm_lookup_ipc(name);
> > +             if (!ret)
> > +                     break;
> > +             ODP_DBG("Waiting for %s\n", name);
> > +             if (timeout <= 0)
> > +                     return NULL;
> > +             timeout--;
> > +             sleep(1);
> > +     }
> > +
> > +     shm = odp_shm_reserve(name, size,
> > +                     ODP_CACHE_LINE_SIZE,
> > +                     ODP_SHM_PROC_NOCREAT);
> > +     if (ODP_SHM_INVALID == shm)
> > +             ODP_ABORT("unable to map: %s\n", name);
> > +
> > +     return odp_shm_addr(shm);
> > +}
> > +
> > +static int _ipc_pktio_init_slave(const char *dev, pktio_entry_t
> *pktio_entry)
> > +{
> > +     int ret = -1;
> > +     char ipc_shm_name[ODPH_RING_NAMESIZE];
> > +     size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
> > +                        sizeof(odph_ring_t);
> > +     struct pktio_info *pinfo;
> > +     void *ipc_pool_base;
> > +
> > +     memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
> > +     memcpy(ipc_shm_name, dev, strlen(dev));
> > +
> > +     memcpy(ipc_shm_name + strlen(dev), "_r", 2);
> > +     pktio_entry->s.ipc_r  = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> > +     if (!pktio_entry->s.ipc_r) {
> > +             ODP_DBG("pid %d unable to find ipc ring %s name\n",
> > +                     getpid(), dev);
> > +             goto error;
> > +     }
> > +     ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> > +             ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r),
> > +             odph_ring_free_count(pktio_entry->s.ipc_r));
> > +
> > +     memcpy(ipc_shm_name + strlen(dev), "_p", 2);
> > +     pktio_entry->s.ipc_p = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> > +     if (!pktio_entry->s.ipc_p) {
> > +             ODP_DBG("pid %d unable to find ipc ring %s name\n",
> > +                     getpid(), dev);
> > +             goto error;
> > +     }
> > +     ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> > +             ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p),
> > +             odph_ring_free_count(pktio_entry->s.ipc_p));
> > +
> > +     memcpy(ipc_shm_name + strlen(dev), "_slave_r", 8);
> > +     pktio_entry->s.ipc_r_slave = _ipc_shm_map(ipc_shm_name, ring_size,
> 10);
> > +     if (!pktio_entry->s.ipc_r_slave) {
> > +             ODP_DBG("pid %d unable to find ipc ring %s name\n",
> > +                     getpid(), dev);
> > +             goto error;
> > +     }
> > +     ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> > +             ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r_slave),
> > +             odph_ring_free_count(pktio_entry->s.ipc_r_slave));
> > +
> > +     memcpy(ipc_shm_name + strlen(dev), "_slave_p", 8);
> > +     pktio_entry->s.ipc_p_slave = _ipc_shm_map(ipc_shm_name, ring_size,
> 10);
> > +     if (!pktio_entry->s.ipc_p_slave) {
> > +             ODP_DBG("pid %d unable to find ipc ring %s name\n",
> > +                     getpid(), dev);
> > +             goto error;
> > +     }
> > +     ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> > +             ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p_slave),
> > +             odph_ring_free_count(pktio_entry->s.ipc_p_slave));
> > +
> > +
> > +     /* Get info about remote pool */
> > +     pinfo = _ipc_map_pool_info(dev, ODP_SHM_PROC_NOCREAT);
> > +
> > +     ipc_pool_base = _ipc_map_remote_pool(pinfo->remote_pool_name,
> > +                                          pinfo->shm_pkt_pool_size);
> > +     pktio_entry->s.ipc_pool_mdata_base = (char *)ipc_pool_base +
> > +                                          pinfo->mdata_offset;
> > +     pktio_entry->s.ipc_pkt_size = pinfo->shm_pkt_size;
> > +
> > +     /* @todo: to simplify in linux-generic implementation we create
> pool for
> > +      * packets from IPC queue. On receive implementation copies
> packets to
> > +      * that pool. Later we can try to reuse original pool without
> packets
> > +      * copying.
> > +      */
> > +     pktio_entry->s.ipc_pool =
> _ipc_odp_alloc_and_create_pool_slave(pinfo);
> > +
> > +     ret = 0;
> > +     ODP_DBG("%s OK.\n", __func__);
> > +error:
> > +     /* @todo free shm on error (api not impemented yet) */
>
> It is now.
>
> > +     return ret;
> > +}
> > +
> > +int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
> > +                        odp_pool_t pool)
> > +{
> > +     int ret;
> > +
> > +     /* if pool is zero we assume that it's slave process connects
> > +      * to shared memory already created by main process.
> > +      */
> > +     if (pool) {
>
> Should be pool != ODP_POOL_INVALID
>
> > +             pktio_entry->s.type = ODP_PKTIO_TYPE_IPC;
> > +             ret = _ipc_pktio_init_master(pktio_entry, dev, pool);
> > +     } else {
> > +             pktio_entry->s.type = ODP_PKTIO_TYPE_IPC_SLAVE;
> > +             ret = _ipc_pktio_init_slave(dev, pktio_entry);
> > +     }
> > +
> > +     return ret;
> > +}
> > +
> > +
> > +static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
> > +                            uint32_t offset,
> > +                            uint32_t *seglen,
> > +                            uint32_t limit)
> > +{
> > +     int seg_index  = offset / buf->segsize;
> > +     int seg_offset = offset % buf->segsize;
> > +     void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
> > +
> > +     if (seglen != NULL) {
> > +             uint32_t buf_left = limit - offset;
> > +             *seglen = seg_offset + buf_left <= buf->segsize ?
> > +                     buf_left : buf->segsize - seg_offset;
> > +     }
> > +
> > +     return (void *)(seg_offset + (uint8_t *)addr);
> > +}
> > +
> > +
> > +static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
> > +                            uint32_t offset, uint32_t *seglen)
> > +{
> > +     if (offset > pkt_hdr->frame_len)
> > +             return NULL;
> > +
> > +     return _ipc_buffer_map(&pkt_hdr->buf_hdr,
> > +                       pkt_hdr->headroom + offset, seglen,
> > +                       pkt_hdr->headroom + pkt_hdr->frame_len);
> > +}
> > +
> > +int ipc_pktio_recv(pktio_entry_t *pktio_entry,
> > +                   odp_packet_t pkt_table[], unsigned len)
> > +{
> > +     int pkts = 0;
> > +     int ret;
> > +     int i;
> > +     odph_ring_t *r;
> > +     odph_ring_t *r_p;
> > +     odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
> > +     void **ipcbufs_p = (void *)&remote_pkts;
> > +     unsigned ring_len;
> > +     int p_free;
> > +
> > +     if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC) {
> > +             r = pktio_entry->s.ipc_r_slave;
> > +             r_p = pktio_entry->s.ipc_p_slave;
> > +     } else if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
> > +             r = pktio_entry->s.ipc_r;
> > +             r_p = pktio_entry->s.ipc_p;
> > +     } else {
> > +             ODP_ABORT("wrong type: %d\n", pktio_entry->s.type);
> > +     }
> > +
> > +     ring_len = odph_ring_count(r);
> > +
> > +     pkts = len;
> > +     if (len > ring_len)
> > +             pkts = ring_len;
> > +
> > +     /* Do not dequeue more then we can put to producer ring */
> > +     p_free = odph_ring_free_count(r_p);
>
> This is going to be racy, by the time you've got around to enqueueing to
> r_p it may have been changed via another thread. It may be protected by
> a lock somewhere at the minute, but I'm assuming that'll not always be
> the case as you're using the _mc_/_mp_ calls.
>
> > +     if (pkts > p_free)
> > +             pkts = p_free;
> > +
> > +     ret = odph_ring_mc_dequeue_bulk(r, ipcbufs_p, pkts);
>
> It looks like this call will return -ENOENT if there are < pkts buffers
> available, which will cause batching issues. odp_pktio_recv() should
> return as many packets as are immediately available (up to the provided
> len).
>
> > +     if (ret != 0) {
> > +             ODP_DBG("error to dequeue no packets\n");
> > +             pkts = -1;
> > +             return pkts;
>
> A return value of -1 is an error, but having no packets to receive
> shouldn't be treated as an error, so should just return 0;
>
And drop the debug output!


>
> > +     }
> > +
> > +     if (pkts == 0)
> > +             return 0;
>
> This check doesn't make sense here since the dequeue doesn't modify it,
> it should be moved further up.
>
> > +
> > +     for (i = 0; i < pkts; i++) {
> > +             odp_pool_t pool;
> > +             odp_packet_t pkt;
> > +             odp_packet_hdr_t *phdr;
> > +             odp_buffer_bits_t handle;
> > +             int idx; /* Remote packet has coded pool and index.
> > +                       * We need only index.*/
> > +             void *pkt_data;
> > +             void *remote_pkt_data;
> > +
> > +             handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
> > +             idx = handle.index;
> > +
> > +             /* Link to packed data. To this line we have Zero-Copy
> between
> > +              * processes, to simplify use packet copy in that version
> which
> > +              * can be removed later with more advance buffer management
> > +              * (ref counters).
> > +              */
> > +             /* reverse odp_buf_to_hdr() */
> > +             phdr = (odp_packet_hdr_t *)(
> > +                             (char *)pktio_entry->s.ipc_pool_mdata_base
> +
> > +                             (idx * ODP_CACHE_LINE_SIZE));
> > +
> > +             /* Allocate new packet. Select*/
> > +             pool = pktio_entry->s.ipc_pool;
> > +             if (odp_unlikely(pool == ODP_POOL_INVALID))
> > +                     ODP_ABORT("invalid pool");
> > +
> > +             pkt = odp_packet_alloc(pool, phdr->frame_len);
> > +             if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
> > +                     /* Original pool might be smaller then
> > +                     *  PKTIO_IPC_ENTRIES. If packet can not be
> > +                      * allocated from pool at this time,
> > +                      * simple get in on next recv() call.
> > +                      */
> > +                     pkts = i - 1;
> > +                     break;
> > +             }
> > +
> > +             /* Copy packet data. */
> > +             pkt_data = odp_packet_data(pkt);
> > +             if (odp_unlikely(pkt_data == NULL))
> > +                     ODP_ABORT("unable to map pkt_data ipc_slave %d\n",
> > +                               (ODP_PKTIO_TYPE_IPC_SLAVE ==
> > +                                     pktio_entry->s.type));
> > +
> > +             remote_pkt_data =  _ipc_packet_map(phdr, 0, NULL);
> > +             if (odp_unlikely(remote_pkt_data == NULL))
> > +                     ODP_ABORT("unable to map remote_pkt_data,
> ipc_slave %d\n",
> > +                               (ODP_PKTIO_TYPE_IPC_SLAVE ==
> > +                                     pktio_entry->s.type));
> > +
> > +             /* @todo fix copy packet!!! */
> > +             memcpy(pkt_data, remote_pkt_data, phdr->frame_len);
> > +
> > +             /* Copy packets L2, L3 parsed offsets and size */
> > +             odp_packet_hdr(pkt)->l2_offset = phdr->l2_offset;
> > +             odp_packet_hdr(pkt)->l3_offset = phdr->l3_offset;
> > +             odp_packet_hdr(pkt)->l4_offset = phdr->l4_offset;
> > +             odp_packet_hdr(pkt)->payload_offset = phdr->payload_offset;
> > +
> > +             odp_packet_hdr(pkt)->vlan_s_tag =  phdr->vlan_s_tag;
> > +             odp_packet_hdr(pkt)->vlan_c_tag =  phdr->vlan_c_tag;
> > +             odp_packet_hdr(pkt)->l3_protocol = phdr->l3_protocol;
> > +             odp_packet_hdr(pkt)->l3_len = phdr->l3_len;
> > +
> > +             odp_packet_hdr(pkt)->frame_len = phdr->frame_len;
> > +             odp_packet_hdr(pkt)->headroom = phdr->headroom;
> > +             odp_packet_hdr(pkt)->tailroom = phdr->tailroom;
> > +             pkt_table[i] = pkt;
> > +     }
> > +
> > +     /* Now tell other process that we no longer need that buffers.*/
> > +     ret = odph_ring_mp_enqueue_bulk(r_p, ipcbufs_p, pkts);
> > +     if (ret != 0)
> > +             ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
> > +
> > +     return pkts;
> > +}
> > +
> > +int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> > +                   unsigned len)
> > +{
> > +     odph_ring_t *r;
> > +     odph_ring_t *r_p;
> > +     void **rbuf_p;
> > +     int ret;
> > +     unsigned i;
> > +
> > +     if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
> > +             r = pktio_entry->s.ipc_r_slave;
> > +             r_p = pktio_entry->s.ipc_p_slave;
> > +     } else if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC) {
> > +             r = pktio_entry->s.ipc_r;
> > +             r_p = pktio_entry->s.ipc_p;
> > +     } else {
> > +             ODP_ABORT("wrong type: %d\n", pktio_entry->s.type);
> > +     }
> > +
> > +     /* Free already processed packets, if any */
> > +     {
> > +             unsigned complete_packets = odph_ring_count(r_p);
> > +             odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
> > +
> > +             if (complete_packets > 0) {
> > +                     rbuf_p = (void *)&r_p_pkts;
> > +                     ret = odph_ring_mc_dequeue_bulk(r_p, rbuf_p,
> > +                                     complete_packets);
> > +                     if (ret == 0) {
> > +                             for (i = 0; i < complete_packets; i++)
> > +                                     odp_packet_free(r_p_pkts[i]);
> > +                     }
> > +             }
> > +     }
>
> The above block should go into a helper function.
>
> > +
> > +     /* wait while second process takes packet from the ring.*/
> > +     i = 3;
> > +     while (odph_ring_free_count(r) < len && i) {
> > +             i--;
> > +             sleep(1);
> > +     }
>
> If there's not enough room we shouldn't attempt to deal with it here,
> just enqueue as many as we can.
>

The application decides when to sleep or block, not hidden inside arbitrary
API's.


>
> > +
> > +     /* Put packets to ring to be processed in other process. */
> > +     for (i = 0; i < len; i++) {
> > +             int j;
> > +             odp_packet_t pkt =  pkt_table[i];
> > +             rbuf_p = (void *)&pkt;
> > +             odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
> > +
> > +             /* buf_hdr.addr can not be used directly in remote process,
> > +              * convert it to offset
> > +              */
> > +             for (j = 0; j < ODP_BUFFER_MAX_SEG; j++)
> > +                     pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char
> *)pkt_hdr -
> > +                             (char *)pkt_hdr->buf_hdr.addr[j];
> > +
> > +             ret = odph_ring_mp_enqueue_bulk(r, rbuf_p, 1);
>
> Why enqueue 1 at a time?
>
> > +             if (odp_unlikely(ret != 0)) {
> > +                     ODP_ERR("pid %d odp_ring_mp_enqueue_bulk fail,
> ipc_slave %d, ret %d\n",
> > +                             getpid(),
> > +                             (ODP_PKTIO_TYPE_IPC_SLAVE ==
> > +                              pktio_entry->s.type),
> > +                             ret);
> > +                     ODP_ERR("odp_ring_full: %d, odp_ring_count %d,
> odph_ring_free_count %d\n",
> > +                             odph_ring_full(r), odph_ring_count(r),
> > +                             odph_ring_free_count(r));
> > +             }
>
> As with the recv side I think the semantics are wrong, we should send as
> many as can be sent immediately rather than blocking for an unbounded
> length of time. It looks like the ring implementation has a mode that
> does this;
>
> enum odph_ring_queue_behavior {
>         ODPH_RING_QUEUE_FIXED = 0, /**< Enq/Deq a fixed number
>                                 of items from a ring */
>         ODPH_RING_QUEUE_VARIABLE   /**< Enq/Deq as many items
>                                 a possible from ring */
> };
>
> > +     }
> > +     return len;
> > +}
>
> --
> Stuart.
> _______________________________________________
> lng-odp mailing list
> lng-odp@lists.linaro.org
> https://lists.linaro.org/mailman/listinfo/lng-odp
>
Ola Liljedahl April 28, 2015, 1:11 p.m. UTC | #4
On 23 April 2015 at 19:47, Maxim Uvarov <maxim.uvarov@linaro.org> wrote:

> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
> ---
>  platform/linux-generic/Makefile.am                 |   2 +
>  .../linux-generic/include/odp_buffer_internal.h    |   3 +
>  .../linux-generic/include/odp_packet_io_internal.h |  15 +
>  .../include/odp_packet_io_ipc_internal.h           |  47 ++
>  platform/linux-generic/odp_packet_io.c             |  30 +-
>  platform/linux-generic/odp_packet_io_ipc.c         | 590
> +++++++++++++++++++++
>  6 files changed, 686 insertions(+), 1 deletion(-)
>  create mode 100644
> platform/linux-generic/include/odp_packet_io_ipc_internal.h
>  create mode 100644 platform/linux-generic/odp_packet_io_ipc.c
>
> diff --git a/platform/linux-generic/Makefile.am
> b/platform/linux-generic/Makefile.am
> index 66f0474..055f75c 100644
> --- a/platform/linux-generic/Makefile.am
> +++ b/platform/linux-generic/Makefile.am
> @@ -120,6 +120,7 @@ noinst_HEADERS = \
>
> ${top_srcdir}/platform/linux-generic/include/odp_internal.h \
>
> ${top_srcdir}/platform/linux-generic/include/odp_packet_internal.h \
>
> ${top_srcdir}/platform/linux-generic/include/odp_packet_io_internal.h \
> +
>  ${top_srcdir}/platform/linux-generic/include/odp_packet_io_ipc_internal.h \
>
> ${top_srcdir}/platform/linux-generic/include/odp_packet_io_queue.h \
>
> ${top_srcdir}/platform/linux-generic/include/odp_packet_socket.h \
>
> ${top_srcdir}/platform/linux-generic/include/odp_pool_internal.h \
> @@ -155,6 +156,7 @@ __LIB__libodp_la_SOURCES = \
>                            odp_packet.c \
>                            odp_packet_flags.c \
>                            odp_packet_io.c \
> +                          odp_packet_io_ipc.c \
>                            odp_packet_socket.c \
>                            odp_pool.c \
>                            odp_queue.c \
> diff --git a/platform/linux-generic/include/odp_buffer_internal.h
> b/platform/linux-generic/include/odp_buffer_internal.h
> index 3a3d2a2..4ea7c62 100644
> --- a/platform/linux-generic/include/odp_buffer_internal.h
> +++ b/platform/linux-generic/include/odp_buffer_internal.h
> @@ -129,6 +129,9 @@ typedef struct odp_buffer_hdr_t {
>         size_t                   udata_size; /* size of user metadata */
>         uint32_t                 segcount;   /* segment count */
>         uint32_t                 segsize;    /* segment size */
> +       /* ipc mapped process can not walk over pointers,
> +        * offset has to be used */
> +       uint64_t                 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
>         void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs
> */
>  } odp_buffer_hdr_t;
>
> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h
> b/platform/linux-generic/include/odp_packet_io_internal.h
> index 18b59ef..744f438 100644
> --- a/platform/linux-generic/include/odp_packet_io_internal.h
> +++ b/platform/linux-generic/include/odp_packet_io_internal.h
> @@ -23,6 +23,7 @@ extern "C" {
>  #include <odp_classification_datamodel.h>
>  #include <odp_align_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp/helper/ring.h>
>
>  #include <odp/config.h>
>  #include <odp/hints.h>
> @@ -36,6 +37,8 @@ typedef enum {
>         ODP_PKTIO_TYPE_SOCKET_MMSG,
>         ODP_PKTIO_TYPE_SOCKET_MMAP,
>         ODP_PKTIO_TYPE_LOOPBACK,
> +       ODP_PKTIO_TYPE_IPC,
> +       ODP_PKTIO_TYPE_IPC_SLAVE,
>
I see IPC as peer to peer, not master/slave. Does your implementation only
support some master/slave configuration?
As long as this master/slave relationship does not leak out through the
API, I am happy.


>  } odp_pktio_type_t;
>
>  struct pktio_entry {
> @@ -53,6 +56,18 @@ struct pktio_entry {
>         char name[IFNAMSIZ];            /**< name of pktio provided to
>                                            pktio_open() */
>         odp_bool_t promisc;             /**< promiscuous mode state */
> +       odph_ring_t     *ipc_r;         /**< ODP ring for IPC mgs packets
> +                                       indexes transmitted to shared
> memory */
> +       odph_ring_t     *ipc_p;         /**< ODP ring for IPC msg packets
> +                                       indexes already processed by
> remote process */
> +       void            *ipc_pool_base; /**< IPC Remote pool base addr */
> +       void            *ipc_pool_mdata_base; /**< IPC Remote pool mdata
> base addr */
> +       uint64_t        ipc_pkt_size;   /**< IPC: packet size in remote
> pool */
> +
> +       odph_ring_t     *ipc_r_slave;
> +       odph_ring_t     *ipc_p_slave;
> +
> +       odp_pool_t      ipc_pool;       /**< IPC: Pool of main process */
>  };
>
>  typedef union {
> diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> new file mode 100644
> index 0000000..a766519
> --- /dev/null
> +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
> @@ -0,0 +1,47 @@
> +/* Copyright (c) 2015, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp/packet_io.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp/packet.h>
> +#include <odp_packet_internal.h>
> +#include <odp_internal.h>
> +#include <odp/shared_memory.h>
> +
> +#include <string.h>
> +#include <unistd.h>
> +#include <stdlib.h>
> +
> +/* IPC packet I/O over odph_ring */
> +#include <odp/helper/ring.h>
> +
> +#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
> +                                       odp ring queue */
> +
> +/* that struct is exported to shared memory, so that 2 processes can find
> + * each other.
> + */
> +struct pktio_info {
> +       char remote_pool_name[30];
> +       int shm_pool_num;
> +       size_t shm_pkt_pool_size;
> +       size_t shm_pkt_size;
> +       size_t mdata_offset; /*< offset from shared memory block start
> +                             *to pool_mdata_addr */
> +       struct {
> +               size_t mdata_offset;
> +               char   pool_name[30];
> +       } slave;
> +} __packed;
> +
> +int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
> +                  odp_pool_t pool);
> +
> +int ipc_pktio_recv(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +                  unsigned len);
> +
> +int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +                  unsigned len);
> diff --git a/platform/linux-generic/odp_packet_io.c
> b/platform/linux-generic/odp_packet_io.c
> index cfe5b71..01077fb 100644
> --- a/platform/linux-generic/odp_packet_io.c
> +++ b/platform/linux-generic/odp_packet_io.c
> @@ -18,6 +18,7 @@
>  #include <odp_schedule_internal.h>
>  #include <odp_classification_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp_packet_io_ipc_internal.h>
>
>  #include <string.h>
>  #include <sys/ioctl.h>
> @@ -25,6 +26,15 @@
>  #include <ifaddrs.h>
>  #include <errno.h>
>
> +#include <sys/types.h>
> +#include <unistd.h>
> +
> +/* IPC packet I/O over odph_ring */
> +#include <odp/helper/ring.h>
> +
> +#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
> +                                       odp ring queue */
> +
>  /* MTU to be reported for the "loop" interface */
>  #define PKTIO_LOOP_MTU 1500
>  /* MAC address for the "loop" interface */
> @@ -263,7 +273,12 @@ static odp_pktio_t setup_pktio_entry(const char *dev,
> odp_pool_t pool)
>
>         if (strcmp(dev, "loop") == 0)
>                 ret = init_loop(pktio_entry, id);
> -       else
> +       else if (!strncmp(dev, "ipc", 3)) {
> +               ret = ipc_pktio_init(pktio_entry, dev, pool);
> +               if (ret != 0)
> +                       ODP_ABORT("unable to init ipc for %s, pool %"
> PRIu64 "\n",
> +                                 dev, pool);
> +       } else
>                 ret = init_socket(pktio_entry, dev, pool);
>
>         if (ret != 0) {
> @@ -285,6 +300,10 @@ odp_pktio_t odp_pktio_open(const char *dev,
> odp_pool_t pool)
>  {
>         odp_pktio_t id;
>
> +       /* no local table lookup for ipc case */
> +       if (pool == NULL && !memcmp(dev, "ipc", 3))
> +               goto no_local_lookup;
> +
>         id = odp_pktio_lookup(dev);
>         if (id != ODP_PKTIO_INVALID) {
>                 /* interface is already open */
> @@ -292,6 +311,7 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_pool_t
> pool)
>                 return ODP_PKTIO_INVALID;
>         }
>
> +no_local_lookup:
>         odp_spinlock_lock(&pktio_tbl->lock);
>         id = setup_pktio_entry(dev, pool);
>         odp_spinlock_unlock(&pktio_tbl->lock);
> @@ -408,6 +428,10 @@ int odp_pktio_recv(odp_pktio_t id, odp_packet_t
> pkt_table[], int len)
>         case ODP_PKTIO_TYPE_LOOPBACK:
>                 pkts = deq_loopback(pktio_entry, pkt_table, len);
>                 break;
> +       case ODP_PKTIO_TYPE_IPC_SLAVE:
> +       case ODP_PKTIO_TYPE_IPC:
> +               pkts = ipc_pktio_recv(pktio_entry, pkt_table, len);
> +               break;
>         default:
>                 pkts = -1;
>                 break;
> @@ -462,6 +486,10 @@ int odp_pktio_send(odp_pktio_t id, odp_packet_t
> pkt_table[], int len)
>         case ODP_PKTIO_TYPE_LOOPBACK:
>                 pkts = enq_loopback(pktio_entry, pkt_table, len);
>                 break;
> +       case ODP_PKTIO_TYPE_IPC:
> +       case ODP_PKTIO_TYPE_IPC_SLAVE:
> +               pkts = ipc_pktio_send(pktio_entry, pkt_table, len);
> +               break;
>         default:
>                 pkts = -1;
>         }
> diff --git a/platform/linux-generic/odp_packet_io_ipc.c
> b/platform/linux-generic/odp_packet_io_ipc.c
> new file mode 100644
> index 0000000..312ad3a
> --- /dev/null
> +++ b/platform/linux-generic/odp_packet_io_ipc.c
> @@ -0,0 +1,590 @@
> +/* Copyright (c) 2015, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp_packet_io_ipc_internal.h>
> +#include <odp_debug_internal.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp_spin_internal.h>
> +#include <odp/system_info.h>
> +
> +#include <sys/mman.h>
> +#include <sys/stat.h>
> +#include <fcntl.h>
> +
> +static void *_ipc_map_remote_pool(const char *name, size_t size);
> +
> +static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
> +{
> +       pool_entry_t *pool;
> +       uint32_t pool_id;
> +       odp_shm_t shm;
> +       odp_shm_info_t info;
> +
> +       pool_id = pool_handle_to_index(pool_hdl);
> +       pool    = get_pool_entry(pool_id);
> +       shm = pool->s.pool_shm;
> +
> +       odp_shm_info(shm, &info);
> +
> +       return info.name;
> +}
> +
> +/**
> +* Look up for shared memory object.
> +*
> +* @param name   name of shm object
> +*
> +* @return 0 on success, otherwise non-zero
> +*/
> +static int _odp_shm_lookup_ipc(const char *name)
> +{
> +       int shm;
> +
> +       shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
> +       if (shm == -1) {
> +               ODP_DBG("IPC shm_open for %s not found\n", name);
> +               return -1;
> +       }
> +       close(shm);
> +       return 0;
> +}
> +
> +static struct pktio_info *_ipc_map_pool_info(const char *pool_name, int
> flag)
> +{
> +       char *name;
> +       struct pktio_info *pinfo;
> +
> +       /* Create info about remote pktio */
> +       name = (char *)malloc(strlen(pool_name) + sizeof("_info"));
> +       memcpy(name, pool_name, strlen(pool_name));
> +       memcpy(name + strlen(pool_name), "_info", sizeof("_info"));
> +       odp_shm_t shm = odp_shm_reserve(name, sizeof(struct pktio_info),
> +                       ODP_CACHE_LINE_SIZE,
> +                       flag);
> +       if (ODP_SHM_INVALID == shm)
> +               ODP_ABORT("unable to reserve memory for shm info");
> +       free(name);
> +       pinfo = odp_shm_addr(shm);
> +       if (flag != ODP_SHM_PROC_NOCREAT)
> +               memset(pinfo->remote_pool_name, 0, 30);
> +       return pinfo;
> +}
> +
> +static int _ipc_pktio_init_master(pktio_entry_t *pktio_entry, const char
> *dev,
> +                          odp_pool_t pool)
> +{
> +       char ipc_shm_name[ODPH_RING_NAMESIZE];
> +       pool_entry_t *pool_entry;
> +       uint32_t pool_id;
> +       void *ipc_pool_base;
> +       struct pktio_info *pinfo;
> +       const char *pool_name;
> +
> +       pool_id = pool_handle_to_index(pool);
> +       pool_entry    = get_pool_entry(pool_id);
> +
> +       /* generate name in shm like ipc_pktio_r for
> +        * to be processed packets ring.
> +        */
> +       memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
> +       memcpy(ipc_shm_name, dev, strlen(dev));
> +       memcpy(ipc_shm_name + strlen(dev), "_r", 2);
> +
> +       pktio_entry->s.ipc_r = odph_ring_create(ipc_shm_name,
> +                       PKTIO_IPC_ENTRIES,
> +                       ODPH_RING_SHM_PROC);
> +       if (!pktio_entry->s.ipc_r) {
> +               ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +                       getpid(), ipc_shm_name);
> +               return -1;
> +       }
> +       ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r),
> +               odph_ring_free_count(pktio_entry->s.ipc_r));
> +
> +       /* generate name in shm like ipc_pktio_p for
> +        * already processed packets
> +        */
> +       memcpy(ipc_shm_name + strlen(dev), "_p", 2);
> +
> +       pktio_entry->s.ipc_p = odph_ring_create(ipc_shm_name,
> +                       PKTIO_IPC_ENTRIES,
> +                       ODPH_RING_SHM_PROC);
> +       if (!pktio_entry->s.ipc_p) {
> +               ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +                       getpid(), ipc_shm_name);
> +               return -1;
> +       }
> +
> +       ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p),
> +               odph_ring_free_count(pktio_entry->s.ipc_p));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_slave_r", 8);
> +       pktio_entry->s.ipc_r_slave = odph_ring_create(ipc_shm_name,
> +                       PKTIO_IPC_ENTRIES,
> +                       ODPH_RING_SHM_PROC);
> +       if (!pktio_entry->s.ipc_r_slave) {
> +               ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +                       getpid(), ipc_shm_name);
> +               return -1;
> +       }
> +       ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r_slave),
> +               odph_ring_free_count(pktio_entry->s.ipc_r_slave));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_slave_p", 8);
> +       pktio_entry->s.ipc_p_slave = odph_ring_create(ipc_shm_name,
> +                       PKTIO_IPC_ENTRIES,
> +                       ODPH_RING_SHM_PROC);
> +       if (!pktio_entry->s.ipc_p_slave) {
> +               ODP_DBG("pid %d unable to create ipc ring %s name\n",
> +                       getpid(), ipc_shm_name);
> +               return -1;
> +       }
> +       ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p_slave),
> +               odph_ring_free_count(pktio_entry->s.ipc_p_slave));
> +
> +       /* Memory to store information about exported pool */
> +       pinfo = _ipc_map_pool_info(dev, ODP_SHM_PROC);
> +
> +       /* Set up pool name for remote info */
> +       pool_name = _ipc_odp_buffer_pool_shm_name(pool);
> +       memcpy(pinfo->remote_pool_name, pool_name, strlen(pool_name));
> +       pinfo->shm_pkt_pool_size = pool_entry->s.pool_size;
> +       pinfo->shm_pool_num = pool_entry->s.buf_num;
> +       pinfo->shm_pkt_size = pool_entry->s.seg_size;
> +       pinfo->mdata_offset =  pool_entry->s.pool_mdata_addr -
> +                              pool_entry->s.pool_base_addr;
> +       pinfo->slave.mdata_offset = 0;
> +       ODP_DBG("Master waiting for slave to be connected now..\n");
> +
> +       /* Wait for remote process to export his pool. */
> +       ODP_DBG("Wait for second process set mdata_offset...\n");
> +       while (pinfo->slave.mdata_offset == 0)
> +               odp_spin();
> +
> +       ODP_DBG("Wait for second process set mdata_offset... DONE.\n");
> +
> +       while (1) {
> +               int ret = _odp_shm_lookup_ipc(pinfo->slave.pool_name);
> +               if (!ret)
> +                       break;
> +               ODP_DBG("Master looking for %s\n", pinfo->slave.pool_name);
> +               sleep(1);
> +       }
> +
> +       ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
> +                                            pinfo->shm_pkt_pool_size);
> +       pktio_entry->s.ipc_pool_mdata_base = (char *)ipc_pool_base +
> +                                            pinfo->slave.mdata_offset;
> +       pktio_entry->s.ipc_pool = pool;
> +
> +       return 0;
> +}
> +
> +static odp_pool_t _ipc_odp_alloc_and_create_pool_slave(struct pktio_info
> *pinfo)
> +{
> +       odp_pool_t pool;
> +       char *pool_name;
> +       odp_pool_param_t params;
> +       int num = pinfo->shm_pool_num;
> +       uint64_t buf_size = pinfo->shm_pkt_size;
> +       pool_entry_t *pool_entry;
> +
> +       pool_name = calloc(1, strlen(pinfo->remote_pool_name) +
> +                          sizeof("ipc_pool_slave_"));
> +       sprintf(pool_name, "ipc_pool_slave_%s", pinfo->remote_pool_name);
> +
> +       ODP_DBG("slave uses pool %s\n", pool_name);
> +
> +       memset(&params, 0, sizeof(params));
> +       params.pkt.num = num;
> +       params.pkt.len = buf_size;
> +       params.pkt.seg_len = buf_size;
> +       params.type = ODP_POOL_PACKET;
> +       params.shm_flags = ODP_SHM_PROC;
> +
> +       pool = odp_pool_create(pool_name, ODP_SHM_NULL, &params);
> +       if (pool == ODP_POOL_INVALID)
> +               ODP_ABORT("Error: packet pool create failed.\n"
> +                       "num %d, len %d, seg_len %d\n",
> +                       params.pkt.num, params.pkt.len,
> params.pkt.seg_len);
> +
> +       /* Export info so that master can connect to that pool*/
> +       snprintf(pinfo->slave.pool_name, 30, "%s", pool_name);
> +       pool_entry = odp_pool_to_entry(pool);
> +       pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
> +                                   pool_entry->s.pool_base_addr;
> +       free(pool_name);
> +
> +       return pool;
> +}
> +
> +static void *_ipc_map_remote_pool(const char *name, size_t size)
> +{
> +       odp_shm_t shm;
> +
> +       ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
> +       shm = odp_shm_reserve(name,
> +                       size,
> +                       ODP_CACHE_LINE_SIZE,
> +                       ODP_SHM_PROC_NOCREAT);
> +       if (shm == ODP_SHM_INVALID)
> +               ODP_ABORT("unable map %s\n", name);
> +       return odp_shm_addr(shm);
> +}
> +
> +static void *_ipc_shm_map(char *name, size_t size, int timeout)
> +{
> +       odp_shm_t shm;
> +       int ret;
> +
> +       while (1) {
> +               ret = _odp_shm_lookup_ipc(name);
> +               if (!ret)
> +                       break;
> +               ODP_DBG("Waiting for %s\n", name);
> +               if (timeout <= 0)
> +                       return NULL;
> +               timeout--;
> +               sleep(1);
> +       }
> +
> +       shm = odp_shm_reserve(name, size,
> +                       ODP_CACHE_LINE_SIZE,
> +                       ODP_SHM_PROC_NOCREAT);
> +       if (ODP_SHM_INVALID == shm)
> +               ODP_ABORT("unable to map: %s\n", name);
> +
> +       return odp_shm_addr(shm);
> +}
> +
> +static int _ipc_pktio_init_slave(const char *dev, pktio_entry_t
> *pktio_entry)
> +{
> +       int ret = -1;
> +       char ipc_shm_name[ODPH_RING_NAMESIZE];
> +       size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
> +                          sizeof(odph_ring_t);
> +       struct pktio_info *pinfo;
> +       void *ipc_pool_base;
> +
> +       memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
> +       memcpy(ipc_shm_name, dev, strlen(dev));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_r", 2);
> +       pktio_entry->s.ipc_r  = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +       if (!pktio_entry->s.ipc_r) {
> +               ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +                       getpid(), dev);
> +               goto error;
> +       }
> +       ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r),
> +               odph_ring_free_count(pktio_entry->s.ipc_r));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_p", 2);
> +       pktio_entry->s.ipc_p = _ipc_shm_map(ipc_shm_name, ring_size, 10);
> +       if (!pktio_entry->s.ipc_p) {
> +               ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +                       getpid(), dev);
> +               goto error;
> +       }
> +       ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p),
> +               odph_ring_free_count(pktio_entry->s.ipc_p));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_slave_r", 8);
> +       pktio_entry->s.ipc_r_slave = _ipc_shm_map(ipc_shm_name, ring_size,
> 10);
> +       if (!pktio_entry->s.ipc_r_slave) {
> +               ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +                       getpid(), dev);
> +               goto error;
> +       }
> +       ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r_slave),
> +               odph_ring_free_count(pktio_entry->s.ipc_r_slave));
> +
> +       memcpy(ipc_shm_name + strlen(dev), "_slave_p", 8);
> +       pktio_entry->s.ipc_p_slave = _ipc_shm_map(ipc_shm_name, ring_size,
> 10);
> +       if (!pktio_entry->s.ipc_p_slave) {
> +               ODP_DBG("pid %d unable to find ipc ring %s name\n",
> +                       getpid(), dev);
> +               goto error;
> +       }
> +       ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
> +               ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p_slave),
> +               odph_ring_free_count(pktio_entry->s.ipc_p_slave));
> +
> +
> +       /* Get info about remote pool */
> +       pinfo = _ipc_map_pool_info(dev, ODP_SHM_PROC_NOCREAT);
> +
> +       ipc_pool_base = _ipc_map_remote_pool(pinfo->remote_pool_name,
> +                                            pinfo->shm_pkt_pool_size);
> +       pktio_entry->s.ipc_pool_mdata_base = (char *)ipc_pool_base +
> +                                            pinfo->mdata_offset;
> +       pktio_entry->s.ipc_pkt_size = pinfo->shm_pkt_size;
> +
> +       /* @todo: to simplify in linux-generic implementation we create
> pool for
> +        * packets from IPC queue. On receive implementation copies
> packets to
> +        * that pool. Later we can try to reuse original pool without
> packets
> +        * copying.
> +        */
> +       pktio_entry->s.ipc_pool =
> _ipc_odp_alloc_and_create_pool_slave(pinfo);
> +
> +       ret = 0;
> +       ODP_DBG("%s OK.\n", __func__);
> +error:
> +       /* @todo free shm on error (api not impemented yet) */
> +       return ret;
> +}
> +
> +int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
> +                          odp_pool_t pool)
> +{
> +       int ret;
> +
> +       /* if pool is zero we assume that it's slave process connects
> +        * to shared memory already created by main process.
> +        */
> +       if (pool) {
> +               pktio_entry->s.type = ODP_PKTIO_TYPE_IPC;
> +               ret = _ipc_pktio_init_master(pktio_entry, dev, pool);
> +       } else {
> +               pktio_entry->s.type = ODP_PKTIO_TYPE_IPC_SLAVE;
> +               ret = _ipc_pktio_init_slave(dev, pktio_entry);
> +       }
> +
> +       return ret;
> +}
> +
> +
> +static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
> +                              uint32_t offset,
> +                              uint32_t *seglen,
> +                              uint32_t limit)
> +{
> +       int seg_index  = offset / buf->segsize;
> +       int seg_offset = offset % buf->segsize;
> +       void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
> +
> +       if (seglen != NULL) {
> +               uint32_t buf_left = limit - offset;
> +               *seglen = seg_offset + buf_left <= buf->segsize ?
> +                       buf_left : buf->segsize - seg_offset;
> +       }
> +
> +       return (void *)(seg_offset + (uint8_t *)addr);
> +}
> +
> +
> +static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
> +                              uint32_t offset, uint32_t *seglen)
> +{
> +       if (offset > pkt_hdr->frame_len)
> +               return NULL;
> +
> +       return _ipc_buffer_map(&pkt_hdr->buf_hdr,
> +                         pkt_hdr->headroom + offset, seglen,
> +                         pkt_hdr->headroom + pkt_hdr->frame_len);
> +}
> +
> +int ipc_pktio_recv(pktio_entry_t *pktio_entry,
> +                     odp_packet_t pkt_table[], unsigned len)
> +{
> +       int pkts = 0;
> +       int ret;
> +       int i;
> +       odph_ring_t *r;
> +       odph_ring_t *r_p;
> +       odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
> +       void **ipcbufs_p = (void *)&remote_pkts;
> +       unsigned ring_len;
> +       int p_free;
> +
> +       if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC) {
> +               r = pktio_entry->s.ipc_r_slave;
> +               r_p = pktio_entry->s.ipc_p_slave;
> +       } else if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
> +               r = pktio_entry->s.ipc_r;
> +               r_p = pktio_entry->s.ipc_p;
> +       } else {
> +               ODP_ABORT("wrong type: %d\n", pktio_entry->s.type);
> +       }
> +
> +       ring_len = odph_ring_count(r);
> +
> +       pkts = len;
> +       if (len > ring_len)
> +               pkts = ring_len;
> +
> +       /* Do not dequeue more then we can put to producer ring */
> +       p_free = odph_ring_free_count(r_p);
> +       if (pkts > p_free)
> +               pkts = p_free;
> +
> +       ret = odph_ring_mc_dequeue_bulk(r, ipcbufs_p, pkts);
> +       if (ret != 0) {
> +               ODP_DBG("error to dequeue no packets\n");
> +               pkts = -1;
> +               return pkts;
> +       }
> +
> +       if (pkts == 0)
> +               return 0;
> +
> +       for (i = 0; i < pkts; i++) {
> +               odp_pool_t pool;
> +               odp_packet_t pkt;
> +               odp_packet_hdr_t *phdr;
> +               odp_buffer_bits_t handle;
> +               int idx; /* Remote packet has coded pool and index.
> +                         * We need only index.*/
> +               void *pkt_data;
> +               void *remote_pkt_data;
> +
> +               handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
> +               idx = handle.index;
> +
> +               /* Link to packed data. To this line we have Zero-Copy
> between
> +                * processes, to simplify use packet copy in that version
> which
> +                * can be removed later with more advance buffer management
> +                * (ref counters).
> +                */
> +               /* reverse odp_buf_to_hdr() */
> +               phdr = (odp_packet_hdr_t *)(
> +                               (char *)pktio_entry->s.ipc_pool_mdata_base
> +
> +                               (idx * ODP_CACHE_LINE_SIZE));
> +
> +               /* Allocate new packet. Select*/
> +               pool = pktio_entry->s.ipc_pool;
> +               if (odp_unlikely(pool == ODP_POOL_INVALID))
> +                       ODP_ABORT("invalid pool");
> +
> +               pkt = odp_packet_alloc(pool, phdr->frame_len);
> +               if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
> +                       /* Original pool might be smaller then
> +                       *  PKTIO_IPC_ENTRIES. If packet can not be
> +                        * allocated from pool at this time,
> +                        * simple get in on next recv() call.
> +                        */
> +                       pkts = i - 1;
> +                       break;
> +               }
> +
> +               /* Copy packet data. */
> +               pkt_data = odp_packet_data(pkt);
> +               if (odp_unlikely(pkt_data == NULL))
> +                       ODP_ABORT("unable to map pkt_data ipc_slave %d\n",
> +                                 (ODP_PKTIO_TYPE_IPC_SLAVE ==
> +                                       pktio_entry->s.type));
> +
> +               remote_pkt_data =  _ipc_packet_map(phdr, 0, NULL);
> +               if (odp_unlikely(remote_pkt_data == NULL))
> +                       ODP_ABORT("unable to map remote_pkt_data,
> ipc_slave %d\n",
> +                                 (ODP_PKTIO_TYPE_IPC_SLAVE ==
> +                                       pktio_entry->s.type));
> +
> +               /* @todo fix copy packet!!! */
> +               memcpy(pkt_data, remote_pkt_data, phdr->frame_len);
> +
> +               /* Copy packets L2, L3 parsed offsets and size */
> +               odp_packet_hdr(pkt)->l2_offset = phdr->l2_offset;
> +               odp_packet_hdr(pkt)->l3_offset = phdr->l3_offset;
> +               odp_packet_hdr(pkt)->l4_offset = phdr->l4_offset;
> +               odp_packet_hdr(pkt)->payload_offset = phdr->payload_offset;
> +
> +               odp_packet_hdr(pkt)->vlan_s_tag =  phdr->vlan_s_tag;
> +               odp_packet_hdr(pkt)->vlan_c_tag =  phdr->vlan_c_tag;
> +               odp_packet_hdr(pkt)->l3_protocol = phdr->l3_protocol;
> +               odp_packet_hdr(pkt)->l3_len = phdr->l3_len;
> +
> +               odp_packet_hdr(pkt)->frame_len = phdr->frame_len;
> +               odp_packet_hdr(pkt)->headroom = phdr->headroom;
> +               odp_packet_hdr(pkt)->tailroom = phdr->tailroom;
> +               pkt_table[i] = pkt;
> +       }
> +
> +       /* Now tell other process that we no longer need that buffers.*/
> +       ret = odph_ring_mp_enqueue_bulk(r_p, ipcbufs_p, pkts);
> +       if (ret != 0)
> +               ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
> +
> +       return pkts;
> +}
> +
> +int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
> +                     unsigned len)
> +{
> +       odph_ring_t *r;
> +       odph_ring_t *r_p;
> +       void **rbuf_p;
> +       int ret;
> +       unsigned i;
> +
> +       if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
> +               r = pktio_entry->s.ipc_r_slave;
> +               r_p = pktio_entry->s.ipc_p_slave;
> +       } else if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC) {
> +               r = pktio_entry->s.ipc_r;
> +               r_p = pktio_entry->s.ipc_p;
> +       } else {
> +               ODP_ABORT("wrong type: %d\n", pktio_entry->s.type);
> +       }
> +
> +       /* Free already processed packets, if any */
> +       {
> +               unsigned complete_packets = odph_ring_count(r_p);
> +               odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
> +
> +               if (complete_packets > 0) {
> +                       rbuf_p = (void *)&r_p_pkts;
> +                       ret = odph_ring_mc_dequeue_bulk(r_p, rbuf_p,
> +                                       complete_packets);
> +                       if (ret == 0) {
> +                               for (i = 0; i < complete_packets; i++)
> +                                       odp_packet_free(r_p_pkts[i]);
> +                       }
> +               }
> +       }
> +
> +       /* wait while second process takes packet from the ring.*/
> +       i = 3;
> +       while (odph_ring_free_count(r) < len && i) {
> +               i--;
> +               sleep(1);
> +       }
> +
> +       /* Put packets to ring to be processed in other process. */
> +       for (i = 0; i < len; i++) {
> +               int j;
> +               odp_packet_t pkt =  pkt_table[i];
> +               rbuf_p = (void *)&pkt;
> +               odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
> +
> +               /* buf_hdr.addr can not be used directly in remote process,
> +                * convert it to offset
> +                */
> +               for (j = 0; j < ODP_BUFFER_MAX_SEG; j++)
> +                       pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char
> *)pkt_hdr -
> +                               (char *)pkt_hdr->buf_hdr.addr[j];
> +
> +               ret = odph_ring_mp_enqueue_bulk(r, rbuf_p, 1);
> +               if (odp_unlikely(ret != 0)) {
> +                       ODP_ERR("pid %d odp_ring_mp_enqueue_bulk fail,
> ipc_slave %d, ret %d\n",
> +                               getpid(),
> +                               (ODP_PKTIO_TYPE_IPC_SLAVE ==
> +                                pktio_entry->s.type),
> +                               ret);
> +                       ODP_ERR("odp_ring_full: %d, odp_ring_count %d,
> odph_ring_free_count %d\n",
> +                               odph_ring_full(r), odph_ring_count(r),
> +                               odph_ring_free_count(r));
> +               }
> +       }
> +       return len;
> +}
> --
> 1.9.1
>
> _______________________________________________
> lng-odp mailing list
> lng-odp@lists.linaro.org
> https://lists.linaro.org/mailman/listinfo/lng-odp
>
Maxim Uvarov May 5, 2015, 10:27 a.m. UTC | #5
On 04/27/2015 16:03, Stuart Haslam wrote:
>> @@ -53,6 +56,18 @@ struct pktio_entry {
>> >  	char name[IFNAMSIZ];		/**< name of pktio provided to
>> >  					   pktio_open() */
>> >  	odp_bool_t promisc;		/**< promiscuous mode state */
>> >+	odph_ring_t	*ipc_r;		/**< ODP ring for IPC mgs packets
>> >+					indexes transmitted to shared memory */
>> >+	odph_ring_t	*ipc_p;		/**< ODP ring for IPC msg packets
>> >+					indexes already processed by remote process */
>> >+	void		*ipc_pool_base; /**< IPC Remote pool base addr */
>> >+	void		*ipc_pool_mdata_base; /**< IPC Remote pool mdata base addr */
>> >+	uint64_t	ipc_pkt_size;	/**< IPC: packet size in remote pool */
> We generally use uint32_t for packet length.
>
>> >+
>> >+	odph_ring_t	*ipc_r_slave;
>> >+	odph_ring_t	*ipc_p_slave;
> Why are these needed?.. having looked at the rest of the patch I still
> don't understand why both ends of the pipe nee

Because it's bi directional pktio ans slave can not free packet from the 
master. It can only ask
master to free that buffer.

TX
consumer (ipc_r)  ----new packet--->
producer  (ipc_p) <---free packet----

RX
consumer (ipc_r_slave) <--new packet-------
producer  (ipc_p_slave)  ---free packet------>


Maxim.
diff mbox

Patch

diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 66f0474..055f75c 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -120,6 +120,7 @@  noinst_HEADERS = \
 		  ${top_srcdir}/platform/linux-generic/include/odp_internal.h \
 		  ${top_srcdir}/platform/linux-generic/include/odp_packet_internal.h \
 		  ${top_srcdir}/platform/linux-generic/include/odp_packet_io_internal.h \
+		  ${top_srcdir}/platform/linux-generic/include/odp_packet_io_ipc_internal.h \
 		  ${top_srcdir}/platform/linux-generic/include/odp_packet_io_queue.h \
 		  ${top_srcdir}/platform/linux-generic/include/odp_packet_socket.h \
 		  ${top_srcdir}/platform/linux-generic/include/odp_pool_internal.h \
@@ -155,6 +156,7 @@  __LIB__libodp_la_SOURCES = \
 			   odp_packet.c \
 			   odp_packet_flags.c \
 			   odp_packet_io.c \
+			   odp_packet_io_ipc.c \
 			   odp_packet_socket.c \
 			   odp_pool.c \
 			   odp_queue.c \
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index 3a3d2a2..4ea7c62 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -129,6 +129,9 @@  typedef struct odp_buffer_hdr_t {
 	size_t                   udata_size; /* size of user metadata */
 	uint32_t                 segcount;   /* segment count */
 	uint32_t                 segsize;    /* segment size */
+	/* ipc mapped process can not walk over pointers,
+	 * offset has to be used */
+	uint64_t		 ipc_addr_offset[ODP_BUFFER_MAX_SEG];
 	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
 } odp_buffer_hdr_t;
 
diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
index 18b59ef..744f438 100644
--- a/platform/linux-generic/include/odp_packet_io_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_internal.h
@@ -23,6 +23,7 @@  extern "C" {
 #include <odp_classification_datamodel.h>
 #include <odp_align_internal.h>
 #include <odp_debug_internal.h>
+#include <odp/helper/ring.h>
 
 #include <odp/config.h>
 #include <odp/hints.h>
@@ -36,6 +37,8 @@  typedef enum {
 	ODP_PKTIO_TYPE_SOCKET_MMSG,
 	ODP_PKTIO_TYPE_SOCKET_MMAP,
 	ODP_PKTIO_TYPE_LOOPBACK,
+	ODP_PKTIO_TYPE_IPC,
+	ODP_PKTIO_TYPE_IPC_SLAVE,
 } odp_pktio_type_t;
 
 struct pktio_entry {
@@ -53,6 +56,18 @@  struct pktio_entry {
 	char name[IFNAMSIZ];		/**< name of pktio provided to
 					   pktio_open() */
 	odp_bool_t promisc;		/**< promiscuous mode state */
+	odph_ring_t	*ipc_r;		/**< ODP ring for IPC mgs packets
+					indexes transmitted to shared memory */
+	odph_ring_t	*ipc_p;		/**< ODP ring for IPC msg packets
+					indexes already processed by remote process */
+	void		*ipc_pool_base; /**< IPC Remote pool base addr */
+	void		*ipc_pool_mdata_base; /**< IPC Remote pool mdata base addr */
+	uint64_t	ipc_pkt_size;	/**< IPC: packet size in remote pool */
+
+	odph_ring_t	*ipc_r_slave;
+	odph_ring_t	*ipc_p_slave;
+
+	odp_pool_t	ipc_pool;	/**< IPC: Pool of main process */
 };
 
 typedef union {
diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
new file mode 100644
index 0000000..a766519
--- /dev/null
+++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
@@ -0,0 +1,47 @@ 
+/* Copyright (c) 2015, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#include <odp/packet_io.h>
+#include <odp_packet_io_internal.h>
+#include <odp/packet.h>
+#include <odp_packet_internal.h>
+#include <odp_internal.h>
+#include <odp/shared_memory.h>
+
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+
+/* IPC packet I/O over odph_ring */
+#include <odp/helper/ring.h>
+
+#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
+					odp ring queue */
+
+/* that struct is exported to shared memory, so that 2 processes can find
+ * each other.
+ */
+struct pktio_info {
+	char remote_pool_name[30];
+	int shm_pool_num;
+	size_t shm_pkt_pool_size;
+	size_t shm_pkt_size;
+	size_t mdata_offset; /*< offset from shared memory block start
+			      *to pool_mdata_addr */
+	struct {
+		size_t mdata_offset;
+		char   pool_name[30];
+	} slave;
+} __packed;
+
+int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
+		   odp_pool_t pool);
+
+int ipc_pktio_recv(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
+		   unsigned len);
+
+int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
+		   unsigned len);
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index cfe5b71..01077fb 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -18,6 +18,7 @@ 
 #include <odp_schedule_internal.h>
 #include <odp_classification_internal.h>
 #include <odp_debug_internal.h>
+#include <odp_packet_io_ipc_internal.h>
 
 #include <string.h>
 #include <sys/ioctl.h>
@@ -25,6 +26,15 @@ 
 #include <ifaddrs.h>
 #include <errno.h>
 
+#include <sys/types.h>
+#include <unistd.h>
+
+/* IPC packet I/O over odph_ring */
+#include <odp/helper/ring.h>
+
+#define PKTIO_IPC_ENTRIES     4096 /**< number of odp buffers in
+					odp ring queue */
+
 /* MTU to be reported for the "loop" interface */
 #define PKTIO_LOOP_MTU 1500
 /* MAC address for the "loop" interface */
@@ -263,7 +273,12 @@  static odp_pktio_t setup_pktio_entry(const char *dev, odp_pool_t pool)
 
 	if (strcmp(dev, "loop") == 0)
 		ret = init_loop(pktio_entry, id);
-	else
+	else if (!strncmp(dev, "ipc", 3)) {
+		ret = ipc_pktio_init(pktio_entry, dev, pool);
+		if (ret != 0)
+			ODP_ABORT("unable to init ipc for %s, pool %" PRIu64 "\n",
+				  dev, pool);
+	} else
 		ret = init_socket(pktio_entry, dev, pool);
 
 	if (ret != 0) {
@@ -285,6 +300,10 @@  odp_pktio_t odp_pktio_open(const char *dev, odp_pool_t pool)
 {
 	odp_pktio_t id;
 
+	/* no local table lookup for ipc case */
+	if (pool == NULL && !memcmp(dev, "ipc", 3))
+		goto no_local_lookup;
+
 	id = odp_pktio_lookup(dev);
 	if (id != ODP_PKTIO_INVALID) {
 		/* interface is already open */
@@ -292,6 +311,7 @@  odp_pktio_t odp_pktio_open(const char *dev, odp_pool_t pool)
 		return ODP_PKTIO_INVALID;
 	}
 
+no_local_lookup:
 	odp_spinlock_lock(&pktio_tbl->lock);
 	id = setup_pktio_entry(dev, pool);
 	odp_spinlock_unlock(&pktio_tbl->lock);
@@ -408,6 +428,10 @@  int odp_pktio_recv(odp_pktio_t id, odp_packet_t pkt_table[], int len)
 	case ODP_PKTIO_TYPE_LOOPBACK:
 		pkts = deq_loopback(pktio_entry, pkt_table, len);
 		break;
+	case ODP_PKTIO_TYPE_IPC_SLAVE:
+	case ODP_PKTIO_TYPE_IPC:
+		pkts = ipc_pktio_recv(pktio_entry, pkt_table, len);
+		break;
 	default:
 		pkts = -1;
 		break;
@@ -462,6 +486,10 @@  int odp_pktio_send(odp_pktio_t id, odp_packet_t pkt_table[], int len)
 	case ODP_PKTIO_TYPE_LOOPBACK:
 		pkts = enq_loopback(pktio_entry, pkt_table, len);
 		break;
+	case ODP_PKTIO_TYPE_IPC:
+	case ODP_PKTIO_TYPE_IPC_SLAVE:
+		pkts = ipc_pktio_send(pktio_entry, pkt_table, len);
+		break;
 	default:
 		pkts = -1;
 	}
diff --git a/platform/linux-generic/odp_packet_io_ipc.c b/platform/linux-generic/odp_packet_io_ipc.c
new file mode 100644
index 0000000..312ad3a
--- /dev/null
+++ b/platform/linux-generic/odp_packet_io_ipc.c
@@ -0,0 +1,590 @@ 
+/* Copyright (c) 2015, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#include <odp_packet_io_ipc_internal.h>
+#include <odp_debug_internal.h>
+#include <odp_packet_io_internal.h>
+#include <odp_spin_internal.h>
+#include <odp/system_info.h>
+
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+static void *_ipc_map_remote_pool(const char *name, size_t size);
+
+static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
+{
+	pool_entry_t *pool;
+	uint32_t pool_id;
+	odp_shm_t shm;
+	odp_shm_info_t info;
+
+	pool_id = pool_handle_to_index(pool_hdl);
+	pool    = get_pool_entry(pool_id);
+	shm = pool->s.pool_shm;
+
+	odp_shm_info(shm, &info);
+
+	return info.name;
+}
+
+/**
+* Look up for shared memory object.
+*
+* @param name   name of shm object
+*
+* @return 0 on success, otherwise non-zero
+*/
+static int _odp_shm_lookup_ipc(const char *name)
+{
+	int shm;
+
+	shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
+	if (shm == -1) {
+		ODP_DBG("IPC shm_open for %s not found\n", name);
+		return -1;
+	}
+	close(shm);
+	return 0;
+}
+
+static struct pktio_info *_ipc_map_pool_info(const char *pool_name, int flag)
+{
+	char *name;
+	struct pktio_info *pinfo;
+
+	/* Create info about remote pktio */
+	name = (char *)malloc(strlen(pool_name) + sizeof("_info"));
+	memcpy(name, pool_name, strlen(pool_name));
+	memcpy(name + strlen(pool_name), "_info", sizeof("_info"));
+	odp_shm_t shm = odp_shm_reserve(name, sizeof(struct pktio_info),
+			ODP_CACHE_LINE_SIZE,
+			flag);
+	if (ODP_SHM_INVALID == shm)
+		ODP_ABORT("unable to reserve memory for shm info");
+	free(name);
+	pinfo = odp_shm_addr(shm);
+	if (flag != ODP_SHM_PROC_NOCREAT)
+		memset(pinfo->remote_pool_name, 0, 30);
+	return pinfo;
+}
+
+static int _ipc_pktio_init_master(pktio_entry_t *pktio_entry, const char *dev,
+			   odp_pool_t pool)
+{
+	char ipc_shm_name[ODPH_RING_NAMESIZE];
+	pool_entry_t *pool_entry;
+	uint32_t pool_id;
+	void *ipc_pool_base;
+	struct pktio_info *pinfo;
+	const char *pool_name;
+
+	pool_id = pool_handle_to_index(pool);
+	pool_entry    = get_pool_entry(pool_id);
+
+	/* generate name in shm like ipc_pktio_r for
+	 * to be processed packets ring.
+	 */
+	memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
+	memcpy(ipc_shm_name, dev, strlen(dev));
+	memcpy(ipc_shm_name + strlen(dev), "_r", 2);
+
+	pktio_entry->s.ipc_r = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC);
+	if (!pktio_entry->s.ipc_r) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		return -1;
+	}
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r),
+		odph_ring_free_count(pktio_entry->s.ipc_r));
+
+	/* generate name in shm like ipc_pktio_p for
+	 * already processed packets
+	 */
+	memcpy(ipc_shm_name + strlen(dev), "_p", 2);
+
+	pktio_entry->s.ipc_p = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC);
+	if (!pktio_entry->s.ipc_p) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		return -1;
+	}
+
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p),
+		odph_ring_free_count(pktio_entry->s.ipc_p));
+
+	memcpy(ipc_shm_name + strlen(dev), "_slave_r", 8);
+	pktio_entry->s.ipc_r_slave = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC);
+	if (!pktio_entry->s.ipc_r_slave) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		return -1;
+	}
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r_slave),
+		odph_ring_free_count(pktio_entry->s.ipc_r_slave));
+
+	memcpy(ipc_shm_name + strlen(dev), "_slave_p", 8);
+	pktio_entry->s.ipc_p_slave = odph_ring_create(ipc_shm_name,
+			PKTIO_IPC_ENTRIES,
+			ODPH_RING_SHM_PROC);
+	if (!pktio_entry->s.ipc_p_slave) {
+		ODP_DBG("pid %d unable to create ipc ring %s name\n",
+			getpid(), ipc_shm_name);
+		return -1;
+	}
+	ODP_DBG("Created IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p_slave),
+		odph_ring_free_count(pktio_entry->s.ipc_p_slave));
+
+	/* Memory to store information about exported pool */
+	pinfo = _ipc_map_pool_info(dev, ODP_SHM_PROC);
+
+	/* Set up pool name for remote info */
+	pool_name = _ipc_odp_buffer_pool_shm_name(pool);
+	memcpy(pinfo->remote_pool_name, pool_name, strlen(pool_name));
+	pinfo->shm_pkt_pool_size = pool_entry->s.pool_size;
+	pinfo->shm_pool_num = pool_entry->s.buf_num;
+	pinfo->shm_pkt_size = pool_entry->s.seg_size;
+	pinfo->mdata_offset =  pool_entry->s.pool_mdata_addr -
+			       pool_entry->s.pool_base_addr;
+	pinfo->slave.mdata_offset = 0;
+	ODP_DBG("Master waiting for slave to be connected now..\n");
+
+	/* Wait for remote process to export his pool. */
+	ODP_DBG("Wait for second process set mdata_offset...\n");
+	while (pinfo->slave.mdata_offset == 0)
+		odp_spin();
+
+	ODP_DBG("Wait for second process set mdata_offset... DONE.\n");
+
+	while (1) {
+		int ret = _odp_shm_lookup_ipc(pinfo->slave.pool_name);
+		if (!ret)
+			break;
+		ODP_DBG("Master looking for %s\n", pinfo->slave.pool_name);
+		sleep(1);
+	}
+
+	ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
+					     pinfo->shm_pkt_pool_size);
+	pktio_entry->s.ipc_pool_mdata_base = (char *)ipc_pool_base +
+					     pinfo->slave.mdata_offset;
+	pktio_entry->s.ipc_pool = pool;
+
+	return 0;
+}
+
+static odp_pool_t _ipc_odp_alloc_and_create_pool_slave(struct pktio_info *pinfo)
+{
+	odp_pool_t pool;
+	char *pool_name;
+	odp_pool_param_t params;
+	int num = pinfo->shm_pool_num;
+	uint64_t buf_size = pinfo->shm_pkt_size;
+	pool_entry_t *pool_entry;
+
+	pool_name = calloc(1, strlen(pinfo->remote_pool_name) +
+			   sizeof("ipc_pool_slave_"));
+	sprintf(pool_name, "ipc_pool_slave_%s", pinfo->remote_pool_name);
+
+	ODP_DBG("slave uses pool %s\n", pool_name);
+
+	memset(&params, 0, sizeof(params));
+	params.pkt.num = num;
+	params.pkt.len = buf_size;
+	params.pkt.seg_len = buf_size;
+	params.type = ODP_POOL_PACKET;
+	params.shm_flags = ODP_SHM_PROC;
+
+	pool = odp_pool_create(pool_name, ODP_SHM_NULL, &params);
+	if (pool == ODP_POOL_INVALID)
+		ODP_ABORT("Error: packet pool create failed.\n"
+			"num %d, len %d, seg_len %d\n",
+			params.pkt.num, params.pkt.len, params.pkt.seg_len);
+
+	/* Export info so that master can connect to that pool*/
+	snprintf(pinfo->slave.pool_name, 30, "%s", pool_name);
+	pool_entry = odp_pool_to_entry(pool);
+	pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
+				    pool_entry->s.pool_base_addr;
+	free(pool_name);
+
+	return pool;
+}
+
+static void *_ipc_map_remote_pool(const char *name, size_t size)
+{
+	odp_shm_t shm;
+
+	ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
+	shm = odp_shm_reserve(name,
+			size,
+			ODP_CACHE_LINE_SIZE,
+			ODP_SHM_PROC_NOCREAT);
+	if (shm == ODP_SHM_INVALID)
+		ODP_ABORT("unable map %s\n", name);
+	return odp_shm_addr(shm);
+}
+
+static void *_ipc_shm_map(char *name, size_t size, int timeout)
+{
+	odp_shm_t shm;
+	int ret;
+
+	while (1) {
+		ret = _odp_shm_lookup_ipc(name);
+		if (!ret)
+			break;
+		ODP_DBG("Waiting for %s\n", name);
+		if (timeout <= 0)
+			return NULL;
+		timeout--;
+		sleep(1);
+	}
+
+	shm = odp_shm_reserve(name, size,
+			ODP_CACHE_LINE_SIZE,
+			ODP_SHM_PROC_NOCREAT);
+	if (ODP_SHM_INVALID == shm)
+		ODP_ABORT("unable to map: %s\n", name);
+
+	return odp_shm_addr(shm);
+}
+
+static int _ipc_pktio_init_slave(const char *dev, pktio_entry_t *pktio_entry)
+{
+	int ret = -1;
+	char ipc_shm_name[ODPH_RING_NAMESIZE];
+	size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
+			   sizeof(odph_ring_t);
+	struct pktio_info *pinfo;
+	void *ipc_pool_base;
+
+	memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
+	memcpy(ipc_shm_name, dev, strlen(dev));
+
+	memcpy(ipc_shm_name + strlen(dev), "_r", 2);
+	pktio_entry->s.ipc_r  = _ipc_shm_map(ipc_shm_name, ring_size, 10);
+	if (!pktio_entry->s.ipc_r) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		goto error;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r),
+		odph_ring_free_count(pktio_entry->s.ipc_r));
+
+	memcpy(ipc_shm_name + strlen(dev), "_p", 2);
+	pktio_entry->s.ipc_p = _ipc_shm_map(ipc_shm_name, ring_size, 10);
+	if (!pktio_entry->s.ipc_p) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		goto error;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p),
+		odph_ring_free_count(pktio_entry->s.ipc_p));
+
+	memcpy(ipc_shm_name + strlen(dev), "_slave_r", 8);
+	pktio_entry->s.ipc_r_slave = _ipc_shm_map(ipc_shm_name, ring_size, 10);
+	if (!pktio_entry->s.ipc_r_slave) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		goto error;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_r_slave),
+		odph_ring_free_count(pktio_entry->s.ipc_r_slave));
+
+	memcpy(ipc_shm_name + strlen(dev), "_slave_p", 8);
+	pktio_entry->s.ipc_p_slave = _ipc_shm_map(ipc_shm_name, ring_size, 10);
+	if (!pktio_entry->s.ipc_p_slave) {
+		ODP_DBG("pid %d unable to find ipc ring %s name\n",
+			getpid(), dev);
+		goto error;
+	}
+	ODP_DBG("Connected IPC ring: %s, count %d, free %d\n",
+		ipc_shm_name, odph_ring_count(pktio_entry->s.ipc_p_slave),
+		odph_ring_free_count(pktio_entry->s.ipc_p_slave));
+
+
+	/* Get info about remote pool */
+	pinfo = _ipc_map_pool_info(dev, ODP_SHM_PROC_NOCREAT);
+
+	ipc_pool_base = _ipc_map_remote_pool(pinfo->remote_pool_name,
+					     pinfo->shm_pkt_pool_size);
+	pktio_entry->s.ipc_pool_mdata_base = (char *)ipc_pool_base +
+					     pinfo->mdata_offset;
+	pktio_entry->s.ipc_pkt_size = pinfo->shm_pkt_size;
+
+	/* @todo: to simplify in linux-generic implementation we create pool for
+	 * packets from IPC queue. On receive implementation copies packets to
+	 * that pool. Later we can try to reuse original pool without packets
+	 * copying.
+	 */
+	pktio_entry->s.ipc_pool = _ipc_odp_alloc_and_create_pool_slave(pinfo);
+
+	ret = 0;
+	ODP_DBG("%s OK.\n", __func__);
+error:
+	/* @todo free shm on error (api not impemented yet) */
+	return ret;
+}
+
+int ipc_pktio_init(pktio_entry_t *pktio_entry, const char *dev,
+			   odp_pool_t pool)
+{
+	int ret;
+
+	/* if pool is zero we assume that it's slave process connects
+	 * to shared memory already created by main process.
+	 */
+	if (pool) {
+		pktio_entry->s.type = ODP_PKTIO_TYPE_IPC;
+		ret = _ipc_pktio_init_master(pktio_entry, dev, pool);
+	} else {
+		pktio_entry->s.type = ODP_PKTIO_TYPE_IPC_SLAVE;
+		ret = _ipc_pktio_init_slave(dev, pktio_entry);
+	}
+
+	return ret;
+}
+
+
+static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
+			       uint32_t offset,
+			       uint32_t *seglen,
+			       uint32_t limit)
+{
+	int seg_index  = offset / buf->segsize;
+	int seg_offset = offset % buf->segsize;
+	void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
+
+	if (seglen != NULL) {
+		uint32_t buf_left = limit - offset;
+		*seglen = seg_offset + buf_left <= buf->segsize ?
+			buf_left : buf->segsize - seg_offset;
+	}
+
+	return (void *)(seg_offset + (uint8_t *)addr);
+}
+
+
+static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
+			       uint32_t offset, uint32_t *seglen)
+{
+	if (offset > pkt_hdr->frame_len)
+		return NULL;
+
+	return _ipc_buffer_map(&pkt_hdr->buf_hdr,
+			  pkt_hdr->headroom + offset, seglen,
+			  pkt_hdr->headroom + pkt_hdr->frame_len);
+}
+
+int ipc_pktio_recv(pktio_entry_t *pktio_entry,
+		      odp_packet_t pkt_table[], unsigned len)
+{
+	int pkts = 0;
+	int ret;
+	int i;
+	odph_ring_t *r;
+	odph_ring_t *r_p;
+	odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
+	void **ipcbufs_p = (void *)&remote_pkts;
+	unsigned ring_len;
+	int p_free;
+
+	if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC) {
+		r = pktio_entry->s.ipc_r_slave;
+		r_p = pktio_entry->s.ipc_p_slave;
+	} else if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
+		r = pktio_entry->s.ipc_r;
+		r_p = pktio_entry->s.ipc_p;
+	} else {
+		ODP_ABORT("wrong type: %d\n", pktio_entry->s.type);
+	}
+
+	ring_len = odph_ring_count(r);
+
+	pkts = len;
+	if (len > ring_len)
+		pkts = ring_len;
+
+	/* Do not dequeue more then we can put to producer ring */
+	p_free = odph_ring_free_count(r_p);
+	if (pkts > p_free)
+		pkts = p_free;
+
+	ret = odph_ring_mc_dequeue_bulk(r, ipcbufs_p, pkts);
+	if (ret != 0) {
+		ODP_DBG("error to dequeue no packets\n");
+		pkts = -1;
+		return pkts;
+	}
+
+	if (pkts == 0)
+		return 0;
+
+	for (i = 0; i < pkts; i++) {
+		odp_pool_t pool;
+		odp_packet_t pkt;
+		odp_packet_hdr_t *phdr;
+		odp_buffer_bits_t handle;
+		int idx; /* Remote packet has coded pool and index.
+			  * We need only index.*/
+		void *pkt_data;
+		void *remote_pkt_data;
+
+		handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
+		idx = handle.index;
+
+		/* Link to packed data. To this line we have Zero-Copy between
+		 * processes, to simplify use packet copy in that version which
+		 * can be removed later with more advance buffer management
+		 * (ref counters).
+		 */
+		/* reverse odp_buf_to_hdr() */
+		phdr = (odp_packet_hdr_t *)(
+				(char *)pktio_entry->s.ipc_pool_mdata_base +
+				(idx * ODP_CACHE_LINE_SIZE));
+
+		/* Allocate new packet. Select*/
+		pool = pktio_entry->s.ipc_pool;
+		if (odp_unlikely(pool == ODP_POOL_INVALID))
+			ODP_ABORT("invalid pool");
+
+		pkt = odp_packet_alloc(pool, phdr->frame_len);
+		if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
+			/* Original pool might be smaller then
+			*  PKTIO_IPC_ENTRIES. If packet can not be
+			 * allocated from pool at this time,
+			 * simple get in on next recv() call.
+			 */
+			pkts = i - 1;
+			break;
+		}
+
+		/* Copy packet data. */
+		pkt_data = odp_packet_data(pkt);
+		if (odp_unlikely(pkt_data == NULL))
+			ODP_ABORT("unable to map pkt_data ipc_slave %d\n",
+				  (ODP_PKTIO_TYPE_IPC_SLAVE ==
+					pktio_entry->s.type));
+
+		remote_pkt_data =  _ipc_packet_map(phdr, 0, NULL);
+		if (odp_unlikely(remote_pkt_data == NULL))
+			ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",
+				  (ODP_PKTIO_TYPE_IPC_SLAVE ==
+					pktio_entry->s.type));
+
+		/* @todo fix copy packet!!! */
+		memcpy(pkt_data, remote_pkt_data, phdr->frame_len);
+
+		/* Copy packets L2, L3 parsed offsets and size */
+		odp_packet_hdr(pkt)->l2_offset = phdr->l2_offset;
+		odp_packet_hdr(pkt)->l3_offset = phdr->l3_offset;
+		odp_packet_hdr(pkt)->l4_offset = phdr->l4_offset;
+		odp_packet_hdr(pkt)->payload_offset = phdr->payload_offset;
+
+		odp_packet_hdr(pkt)->vlan_s_tag =  phdr->vlan_s_tag;
+		odp_packet_hdr(pkt)->vlan_c_tag =  phdr->vlan_c_tag;
+		odp_packet_hdr(pkt)->l3_protocol = phdr->l3_protocol;
+		odp_packet_hdr(pkt)->l3_len = phdr->l3_len;
+
+		odp_packet_hdr(pkt)->frame_len = phdr->frame_len;
+		odp_packet_hdr(pkt)->headroom = phdr->headroom;
+		odp_packet_hdr(pkt)->tailroom = phdr->tailroom;
+		pkt_table[i] = pkt;
+	}
+
+	/* Now tell other process that we no longer need that buffers.*/
+	ret = odph_ring_mp_enqueue_bulk(r_p, ipcbufs_p, pkts);
+	if (ret != 0)
+		ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
+
+	return pkts;
+}
+
+int ipc_pktio_send(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[],
+		      unsigned len)
+{
+	odph_ring_t *r;
+	odph_ring_t *r_p;
+	void **rbuf_p;
+	int ret;
+	unsigned i;
+
+	if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC_SLAVE) {
+		r = pktio_entry->s.ipc_r_slave;
+		r_p = pktio_entry->s.ipc_p_slave;
+	} else if (pktio_entry->s.type == ODP_PKTIO_TYPE_IPC) {
+		r = pktio_entry->s.ipc_r;
+		r_p = pktio_entry->s.ipc_p;
+	} else {
+		ODP_ABORT("wrong type: %d\n", pktio_entry->s.type);
+	}
+
+	/* Free already processed packets, if any */
+	{
+		unsigned complete_packets = odph_ring_count(r_p);
+		odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
+
+		if (complete_packets > 0) {
+			rbuf_p = (void *)&r_p_pkts;
+			ret = odph_ring_mc_dequeue_bulk(r_p, rbuf_p,
+					complete_packets);
+			if (ret == 0) {
+				for (i = 0; i < complete_packets; i++)
+					odp_packet_free(r_p_pkts[i]);
+			}
+		}
+	}
+
+	/* wait while second process takes packet from the ring.*/
+	i = 3;
+	while (odph_ring_free_count(r) < len && i) {
+		i--;
+		sleep(1);
+	}
+
+	/* Put packets to ring to be processed in other process. */
+	for (i = 0; i < len; i++) {
+		int j;
+		odp_packet_t pkt =  pkt_table[i];
+		rbuf_p = (void *)&pkt;
+		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
+
+		/* buf_hdr.addr can not be used directly in remote process,
+		 * convert it to offset
+		 */
+		for (j = 0; j < ODP_BUFFER_MAX_SEG; j++)
+			pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
+				(char *)pkt_hdr->buf_hdr.addr[j];
+
+		ret = odph_ring_mp_enqueue_bulk(r, rbuf_p, 1);
+		if (odp_unlikely(ret != 0)) {
+			ODP_ERR("pid %d odp_ring_mp_enqueue_bulk fail, ipc_slave %d, ret %d\n",
+				getpid(),
+				(ODP_PKTIO_TYPE_IPC_SLAVE ==
+				 pktio_entry->s.type),
+				ret);
+			ODP_ERR("odp_ring_full: %d, odp_ring_count %d, odph_ring_free_count %d\n",
+				odph_ring_full(r), odph_ring_count(r),
+				odph_ring_free_count(r));
+		}
+	}
+	return len;
+}