diff mbox

[lng,ODP,2/2] linux-dpdk: Rework buffer management

Message ID 1410792681-12583-3-git-send-email-ciprian.barbu@linaro.org
State Superseded
Headers show

Commit Message

Ciprian Barbu Sept. 15, 2014, 2:51 p.m. UTC
Signed-off-by: Ciprian Barbu <ciprian.barbu@linaro.org>
---
 platform/linux-dpdk/Makefile.am                    |   2 +-
 platform/linux-dpdk/include/api/odp_buffer.h       |   2 +-
 platform/linux-dpdk/include/api/odp_buffer_pool.h  |   2 +-
 platform/linux-dpdk/include/api/odp_packet.h       |  17 +
 platform/linux-dpdk/include/odp_buffer_internal.h  |   8 +-
 platform/linux-dpdk/include/odp_packet_internal.h  |   7 +-
 .../linux-dpdk/include/odp_packet_io_internal.h    |   5 -
 platform/linux-dpdk/odp_buffer.c                   |  26 +-
 platform/linux-dpdk/odp_buffer_pool.c              | 150 +++++++-
 platform/linux-dpdk/odp_packet.c                   | 148 +++++---
 platform/linux-dpdk/odp_packet_dpdk.c              |  17 +-
 platform/linux-dpdk/odp_queue.c                    |  20 +-
 platform/linux-dpdk/odp_schedule.c                 | 417 +++++++++++++++++++++
 13 files changed, 716 insertions(+), 105 deletions(-)
 create mode 100644 platform/linux-dpdk/odp_schedule.c

Comments

Venkatesh Vivekanandan Sept. 24, 2014, 10:31 a.m. UTC | #1
On 15 September 2014 20:21, Ciprian Barbu <ciprian.barbu@linaro.org> wrote:

> Signed-off-by: Ciprian Barbu <ciprian.barbu@linaro.org>
> ---
>  platform/linux-dpdk/Makefile.am                    |   2 +-
>  platform/linux-dpdk/include/api/odp_buffer.h       |   2 +-
>  platform/linux-dpdk/include/api/odp_buffer_pool.h  |   2 +-
>  platform/linux-dpdk/include/api/odp_packet.h       |  17 +
>  platform/linux-dpdk/include/odp_buffer_internal.h  |   8 +-
>  platform/linux-dpdk/include/odp_packet_internal.h  |   7 +-
>  .../linux-dpdk/include/odp_packet_io_internal.h    |   5 -
>  platform/linux-dpdk/odp_buffer.c                   |  26 +-
>  platform/linux-dpdk/odp_buffer_pool.c              | 150 +++++++-
>  platform/linux-dpdk/odp_packet.c                   | 148 +++++---
>  platform/linux-dpdk/odp_packet_dpdk.c              |  17 +-
>  platform/linux-dpdk/odp_queue.c                    |  20 +-
>  platform/linux-dpdk/odp_schedule.c                 | 417
> +++++++++++++++++++++
>  13 files changed, 716 insertions(+), 105 deletions(-)
>  create mode 100644 platform/linux-dpdk/odp_schedule.c
>
> diff --git a/platform/linux-dpdk/Makefile.am
> b/platform/linux-dpdk/Makefile.am
> index 1eabd9f..e128bf8 100644
> --- a/platform/linux-dpdk/Makefile.am
> +++ b/platform/linux-dpdk/Makefile.am
> @@ -79,7 +79,7 @@ __LIB__libodp_la_SOURCES = \
>                            odp_queue.c \
>                            ../linux-generic/odp_ring.c \
>                            ../linux-generic/odp_rwlock.c \
> -                          ../linux-generic/odp_schedule.c \
> +                          odp_schedule.c \
>                            ../linux-generic/odp_shared_memory.c \
>                            ../linux-generic/odp_spinlock.c \
>                            ../linux-generic/odp_system_info.c \
> diff --git a/platform/linux-dpdk/include/api/odp_buffer.h
> b/platform/linux-dpdk/include/api/odp_buffer.h
> index 9ea1ed8..b2fbc76 100644
> --- a/platform/linux-dpdk/include/api/odp_buffer.h
> +++ b/platform/linux-dpdk/include/api/odp_buffer.h
> @@ -32,7 +32,7 @@ extern "C" {
>  typedef unsigned long odp_buffer_t;
>
>
> -#define ODP_BUFFER_INVALID (0xffffffff) /**< Invalid buffer */
> +#define ODP_BUFFER_INVALID (unsigned long)(-1L) /**< Invalid buffer */
>
>
>  /**
> diff --git a/platform/linux-dpdk/include/api/odp_buffer_pool.h
> b/platform/linux-dpdk/include/api/odp_buffer_pool.h
> index 4b75cf5..382f4f0 100644
> --- a/platform/linux-dpdk/include/api/odp_buffer_pool.h
> +++ b/platform/linux-dpdk/include/api/odp_buffer_pool.h
> @@ -27,7 +27,7 @@ extern "C" {
>  #define ODP_BUFFER_POOL_NAME_LEN  32
>
>  /** Invalid buffer pool */
> -#define ODP_BUFFER_POOL_INVALID  (0xffffffff)
> +#define ODP_BUFFER_POOL_INVALID  (unsigned long)(-1L)
>
>  /** ODP buffer pool */
>  typedef unsigned long odp_buffer_pool_t;
> diff --git a/platform/linux-dpdk/include/api/odp_packet.h
> b/platform/linux-dpdk/include/api/odp_packet.h
> index 5545bdc..79503a5 100644
> --- a/platform/linux-dpdk/include/api/odp_packet.h
> +++ b/platform/linux-dpdk/include/api/odp_packet.h
> @@ -80,6 +80,23 @@ void odp_packet_set_len(odp_packet_t pkt, size_t len);
>  size_t odp_packet_get_len(odp_packet_t pkt);
>
>  /**
> + * Set packet user context
> + *
> + * @param buf      Packet handle
> + * @param ctx      User context
> + *
> + */
> +void odp_packet_set_ctx(odp_packet_t buf, const void *ctx);
> +
> +/**
> + * Get packet user context
> + *
> + * @param buf      Packet handle
> + *
> + * @return User context
> + */
> +void *odp_packet_get_ctx(odp_packet_t buf);
> +/**
>   * Get address to the start of the packet buffer
>   *
>   * The address of the packet buffer is not necessarily the same as the
> start
> diff --git a/platform/linux-dpdk/include/odp_buffer_internal.h
> b/platform/linux-dpdk/include/odp_buffer_internal.h
> index f87ec80..5406606 100644
> --- a/platform/linux-dpdk/include/odp_buffer_internal.h
> +++ b/platform/linux-dpdk/include/odp_buffer_internal.h
> @@ -59,8 +59,12 @@ typedef union odp_buffer_bits_t {
>  struct odp_buffer_hdr_t;
>
>
> -typedef struct rte_mbuf odp_buffer_hdr_t;
> -
> +typedef struct odp_buffer_hdr_t {
> +       struct rte_mbuf mb;            /* Underlying DPDK rte_mbuf */
> +       struct odp_buffer_hdr_t *next; /* Next buf in a list */
> +       int type;                      /* ODP buffer type; not DPDK buf
> type */
> +       uint32_t index;                /* Index in the rte_mempool */
> +} odp_buffer_hdr_t;
>
>  int odp_buffer_snprint(char *str, size_t n, odp_buffer_t buf);
>
> diff --git a/platform/linux-dpdk/include/odp_packet_internal.h
> b/platform/linux-dpdk/include/odp_packet_internal.h
> index 9357f90..d7f505b 100644
> --- a/platform/linux-dpdk/include/odp_packet_internal.h
> +++ b/platform/linux-dpdk/include/odp_packet_internal.h
> @@ -113,13 +113,8 @@ typedef struct {
>         uint32_t l3_offset; /**< offset to L3 hdr, e.g. IPv4, IPv6 */
>         uint32_t l4_offset; /**< offset to L4 hdr (TCP, UDP, SCTP, also
> ICMP) */
>
> -       uint32_t frame_len;
> -
>         odp_pktio_t input;
> -
> -       uint32_t pad;
> -       uint8_t payload[];
> -
> +       uint64_t user_ctx;  /**< user context */
>  } odp_packet_hdr_t;
>
>  /**
> diff --git a/platform/linux-dpdk/include/odp_packet_io_internal.h
> b/platform/linux-dpdk/include/odp_packet_io_internal.h
> index 08abea7..9263349 100644
> --- a/platform/linux-dpdk/include/odp_packet_io_internal.h
> +++ b/platform/linux-dpdk/include/odp_packet_io_internal.h
> @@ -31,11 +31,6 @@ struct pktio_entry {
>         odp_queue_t inq_default;        /**< default input queue, if set */
>         odp_queue_t outq_default;       /**< default out queue */
>         odp_pktio_params_t params;      /**< pktio parameters */
> -       pkt_sock_t pkt_sock;            /**< using socket API for IO */
> -       pkt_sock_mmap_t pkt_sock_mmap;  /**< using socket mmap API for IO
> */
> -#ifdef ODP_HAVE_NETMAP
> -       pkt_netmap_t pkt_nm;            /**< using netmap API for IO */
> -#endif
>         pkt_dpdk_t pkt_dpdk;            /**< using DPDK API for IO */
>  };
>
> diff --git a/platform/linux-dpdk/odp_buffer.c
> b/platform/linux-dpdk/odp_buffer.c
> index e2f8942..e2657e4 100644
> --- a/platform/linux-dpdk/odp_buffer.c
> +++ b/platform/linux-dpdk/odp_buffer.c
> @@ -16,7 +16,7 @@ void *odp_buffer_addr(odp_buffer_t buf)
>  {
>         odp_buffer_hdr_t *hdr = odp_buf_to_hdr(buf);
>
> -       return hdr->buf_addr;
> +       return hdr->mb.buf_addr;
>  }
>
>
> @@ -24,7 +24,7 @@ size_t odp_buffer_size(odp_buffer_t buf)
>  {
>         odp_buffer_hdr_t *hdr = odp_buf_to_hdr(buf);
>
> -       return hdr->buf_len;
> +       return hdr->mb.buf_len;
>  }
>
>
> @@ -38,11 +38,9 @@ int odp_buffer_type(odp_buffer_t buf)
>
>  int odp_buffer_is_valid(odp_buffer_t buf)
>  {
> -       odp_buffer_bits_t handle;
> -
> -       handle.u32 = buf;
> -
> -       return (handle.index != ODP_BUFFER_INVALID_INDEX);
> +       /* We could call rte_mbuf_sanity_check, but that panics
> +        * and aborts the program */
> +       return (void*)buf != NULL;
>  }
>
>
> @@ -61,17 +59,19 @@ int odp_buffer_snprint(char *str, size_t n,
> odp_buffer_t buf)
>         len += snprintf(&str[len], n-len,
>                         "Buffer\n");
>         len += snprintf(&str[len], n-len,
> -                       "  pool         %"PRIu64"\n", (int64_t) hdr->pool);
> +                       "  pool         %"PRIu64"\n", (int64_t)
> hdr->mb.pool);
> +       len += snprintf(&str[len], n-len,
> +                       "  phy_addr     %"PRIu64"\n",
> hdr->mb.buf_physaddr);
>         len += snprintf(&str[len], n-len,
> -                       "  phy_addr     %"PRIu64"\n", hdr->buf_physaddr);
> +                       "  addr         %p\n",        hdr->mb.buf_addr);
>         len += snprintf(&str[len], n-len,
> -                       "  addr         %p\n",        hdr->buf_addr);
> +                       "  size         %u\n",        hdr->mb.buf_len);
>         len += snprintf(&str[len], n-len,
> -                       "  size         %u\n",        hdr->buf_len);
> +                       "  ref_count    %i\n",        hdr->mb.refcnt);
>         len += snprintf(&str[len], n-len,
> -                       "  ref_count    %i\n",        hdr->refcnt);
> +                       "  dpdk type    %i\n",        hdr->mb.type);
>         len += snprintf(&str[len], n-len,
> -                       "  type         %i\n",        hdr->type);
> +                       "  odp type     %i\n",        hdr->type);
>
>         return len;
>  }
> diff --git a/platform/linux-dpdk/odp_buffer_pool.c
> b/platform/linux-dpdk/odp_buffer_pool.c
> index 805ce68..f044b5d 100644
> --- a/platform/linux-dpdk/odp_buffer_pool.c
> +++ b/platform/linux-dpdk/odp_buffer_pool.c
> @@ -9,6 +9,7 @@
>  #include <odp_buffer_pool_internal.h>
>  #include <odp_buffer_internal.h>
>  #include <odp_packet_internal.h>
> +#include <odp_timer_internal.h>
>  #include <odp_shared_memory.h>
>  #include <odp_align.h>
>  #include <odp_internal.h>
> @@ -44,6 +45,13 @@
>
>  #define NULL_INDEX ((uint32_t)-1)
>
> +union buffer_type_any_u {
> +       odp_buffer_hdr_t  buf;
> +       odp_packet_hdr_t  pkt;
> +       odp_timeout_hdr_t tmo;
> +};
> +
> +typedef union buffer_type_any_u odp_any_buffer_hdr_t;
>
>  typedef union pool_entry_u {
>         struct pool_entry_s s;
> @@ -59,7 +67,7 @@ typedef struct pool_table_t {
>  } pool_table_t;
>
>
> -/* The pool table */
> +/* The pool table ptr - resides in shared memory */
>  static pool_table_t *pool_tbl;
>
>  /* Pool entry pointers (for inlining) */
> @@ -98,31 +106,151 @@ int odp_buffer_pool_init_global(void)
>         return 0;
>  }
>
> +struct mbuf_ctor_arg {
> +       uint16_t seg_buf_offset; /* To skip the ODP buf/pkt/tmo header */
> +       uint16_t seg_buf_size;   /* total sz: offset + user sz + HDROOM */
> +       int buf_type;
> +};
> +
> +struct mbuf_pool_ctor_arg {
> +       uint16_t seg_buf_size; /* size of mbuf: user specified sz + HDROOM
> */
> +};
> +
> +static void
> +odp_dpdk_mbuf_pool_ctor(struct rte_mempool *mp,
> +                       void *opaque_arg)
> +{
> +       struct mbuf_pool_ctor_arg      *mbp_ctor_arg;
> +       struct rte_pktmbuf_pool_private *mbp_priv;
> +
> +       if (mp->private_data_size < sizeof(struct
> rte_pktmbuf_pool_private)) {
> +               ODP_ERR("%s(%s) private_data_size %d < %d",
> +                       __func__, mp->name, (int) mp->private_data_size,
> +                       (int) sizeof(struct rte_pktmbuf_pool_private));
> +               return;
> +       }
> +       mbp_ctor_arg = (struct mbuf_pool_ctor_arg *) opaque_arg;
> +       mbp_priv = rte_mempool_get_priv(mp);
> +       mbp_priv->mbuf_data_room_size = mbp_ctor_arg->seg_buf_size;
> +}
> +
> +/* ODP DPDK mbuf constructor.
> + * This is a combination of rte_pktmbuf_init in rte_mbuf.c
> + * and testpmd_mbuf_ctor in testpmd.c
> + */
> +static void
> +odp_dpdk_mbuf_ctor(struct rte_mempool *mp,
> +                  void *opaque_arg,
> +                  void *raw_mbuf,
> +                  unsigned i)
> +{
> +       struct mbuf_ctor_arg *mb_ctor_arg;
> +       struct rte_mbuf *mb = raw_mbuf;
> +       struct odp_buffer_hdr_t *buf_hdr;
> +
> +       /* The rte_mbuf is at the beginning in all cases */
> +       mb_ctor_arg = (struct mbuf_ctor_arg *) opaque_arg;
> +       mb = (struct rte_mbuf *) raw_mbuf;
> +
> +       RTE_MBUF_ASSERT(mp->elt_size >= sizeof(struct rte_mbuf));
> +
> +       memset(mb, 0, mp->elt_size);
> +
> +       /* Start of buffer is just after the ODP type specific header
> +        * which contains in the very beginning the rte_mbuf struct */
> +       mb->buf_addr     = (char *)mb + mb_ctor_arg->seg_buf_offset;
> +       mb->buf_physaddr = rte_mempool_virt2phy(mp, mb) +
> +                       mb_ctor_arg->seg_buf_offset;
> +       mb->buf_len      = mb_ctor_arg->seg_buf_size;
> +
> +       /* keep some headroom between start of buffer and data */
> +       if (mb_ctor_arg->buf_type == ODP_BUFFER_TYPE_PACKET ||
> +           mb_ctor_arg->buf_type == ODP_BUFFER_TYPE_ANY)
> +               mb->pkt.data = (char *) mb->buf_addr +
> RTE_PKTMBUF_HEADROOM;
> +       else
> +               mb->pkt.data = mb->buf_addr;
> +
> +       /* init some constant fields */
> +       mb->type         = RTE_MBUF_PKT;
> +       mb->pool         = mp;
> +       mb->pkt.nb_segs  = 1;
> +       mb->pkt.in_port  = 0xff;
> +       mb->ol_flags     = 0;
> +       mb->pkt.vlan_macip.data = 0;
> +       mb->pkt.hash.rss = 0;
> +
> +       /* Save index, might be useful for debugging purposes */
> +       buf_hdr = (struct odp_buffer_hdr_t*) raw_mbuf;
> +       buf_hdr->index = i;
> +}
>
>  odp_buffer_pool_t odp_buffer_pool_create(const char *name,
>                                          void *base_addr, uint64_t size,
>                                          size_t buf_size, size_t buf_align,
>                                          int buf_type)
>  {
> -       struct rte_mempool *pktmbuf_pool = NULL;
> +       struct rte_mempool *pool = NULL;
> +       struct mbuf_pool_ctor_arg mbp_ctor_arg;
> +       struct mbuf_ctor_arg mb_ctor_arg;
> +       unsigned mb_size;
> +
> +       /* Not used for rte_mempool; the new ODP buffer management
> introduces
> +        * rte_mempool_create_from_region where base_addr makes sense */
> +       (void)base_addr;
> +
> +       /* buf_align will be removed soon, no need to worry about it */
> +       (void)buf_align;
> +
>         ODP_DBG("odp_buffer_pool_create: %s, %lx, %u, %u, %u, %d\n", name,
>                 (uint64_t) base_addr, (unsigned) size,
>                 (unsigned) buf_size, (unsigned) buf_align,
>                 buf_type);
>
> -       pktmbuf_pool =
> -               rte_mempool_create(name, NB_MBUF,
> -                                  MBUF_SIZE, MAX_PKT_BURST,
> -                                  sizeof(struct rte_pktmbuf_pool_private),
> -                                  rte_pktmbuf_pool_init, NULL,
> -                                  rte_pktmbuf_init, NULL,
> -                                  rte_socket_id(), 0);
> -       if (pktmbuf_pool == NULL) {
> +       switch (buf_type) {
> +       case ODP_BUFFER_TYPE_RAW:
> +               mb_ctor_arg.seg_buf_offset =
> +                       (uint16_t)
> CACHE_LINE_ROUNDUP(sizeof(odp_buffer_hdr_t));
> +               mbp_ctor_arg.seg_buf_size = (uint16_t) buf_size;
> +               break;
> +       case ODP_BUFFER_TYPE_PACKET:
> +               mb_ctor_arg.seg_buf_offset =
> +                       (uint16_t)
> CACHE_LINE_ROUNDUP(sizeof(odp_packet_hdr_t));
> +               mbp_ctor_arg.seg_buf_size =
> +                       (uint16_t) (RTE_PKTMBUF_HEADROOM + buf_size);
> +               break;
> +       case ODP_BUFFER_TYPE_TIMEOUT:
> +               mb_ctor_arg.seg_buf_offset =
> +                       (uint16_t)
> CACHE_LINE_ROUNDUP(sizeof(odp_timeout_hdr_t));
> +               mbp_ctor_arg.seg_buf_size = (uint16_t) buf_size;
> +               break;
> +       case ODP_BUFFER_TYPE_ANY:
> +               mb_ctor_arg.seg_buf_offset =
> +                       (uint16_t)
> CACHE_LINE_ROUNDUP(sizeof(odp_any_buffer_hdr_t));
> +               mbp_ctor_arg.seg_buf_size =
> +                       (uint16_t) (RTE_PKTMBUF_HEADROOM + buf_size);
> +               break;
> +       default:
> +               ODP_ERR("odp_buffer_pool_create: Bad type %i\n", buf_type);
> +               exit(0);
> +               break;
> +       }
> +
> +       mb_ctor_arg.seg_buf_size = mbp_ctor_arg.seg_buf_size;
> +       mb_ctor_arg.buf_type = buf_type;
> +       mb_size = mb_ctor_arg.seg_buf_offset + mb_ctor_arg.seg_buf_size;
> +
> +       pool = rte_mempool_create(name, NB_MBUF,
> +                                 mb_size, MAX_PKT_BURST,
> +                                 sizeof(struct rte_pktmbuf_pool_private),
> +                                 odp_dpdk_mbuf_pool_ctor, &mbp_ctor_arg,
> +                                 odp_dpdk_mbuf_ctor, &mb_ctor_arg,
> +                                 rte_socket_id(), 0);
> +       if (pool == NULL) {
>                 ODP_ERR("Cannot init DPDK mbuf pool\n");
>                 return -1;
>         }
>
> -       return (odp_buffer_pool_t) pktmbuf_pool;
> +       return (odp_buffer_pool_t) pool;
>  }
>
>
> diff --git a/platform/linux-dpdk/odp_packet.c
> b/platform/linux-dpdk/odp_packet.c
> index edfd06d..7afaba6 100644
> --- a/platform/linux-dpdk/odp_packet.c
> +++ b/platform/linux-dpdk/odp_packet.c
> @@ -23,13 +23,13 @@ static inline uint8_t parse_ipv6(odp_packet_hdr_t
> *pkt_hdr,
>  void odp_packet_init(odp_packet_t pkt)
>  {
>         odp_packet_hdr_t *const pkt_hdr = odp_packet_hdr(pkt);
> -       const size_t start_offset = ODP_FIELD_SIZEOF(odp_packet_hdr_t,
> buf_hdr);
> -       uint8_t *start;
> -       size_t len;
> +       struct rte_mbuf *mb;
> +       void *start;
>
> -       start = (uint8_t *)pkt_hdr + start_offset;
> -       len = ODP_OFFSETOF(odp_packet_hdr_t, payload) - start_offset;
> -       memset(start, 0, len);
> +       mb = &pkt_hdr->buf_hdr.mb;
> +
> +       start = mb->buf_addr;
> +       memset(start, 0, mb->buf_len);
>
>         pkt_hdr->l2_offset = (uint32_t) ODP_PACKET_OFFSET_INVALID;
>         pkt_hdr->l3_offset = (uint32_t) ODP_PACKET_OFFSET_INVALID;
> @@ -46,18 +46,47 @@ odp_buffer_t odp_buffer_from_packet(odp_packet_t pkt)
>         return (odp_buffer_t)pkt;
>  }
>
> -void odp_packet_set_len(odp_packet_t pkt, size_t len)
> +/* Advance the pkt data pointer and set len in one call */
> +static int odp_packet_set_offset_len(odp_packet_t pkt, size_t
> frame_offset,
> +                                     size_t len)
>  {
> -       /* for rte_pktmbuf */
> -       odp_buffer_hdr_t *buf_hdr =
> odp_buf_to_hdr(odp_buffer_from_packet(pkt));
> -       buf_hdr->pkt.data_len = len;
> +       struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
> +       uint16_t offset;
> +       uint16_t data_len;
> +
> +       /* The pkt buf may have been pulled back into the headroom
> +        * so we cannot rely on finding the data right after the
> +        * ODP header and HEADROOM */
> +       offset = (uint16_t)((unsigned long)mb->pkt.data -
> +                           (unsigned long)mb->buf_addr);
> +       ODP_ASSERT(mb->buf_len >= offset, "Corrupted mbuf");
> +       data_len = mb->buf_len - offset;
> +
> +       if (data_len < frame_offset) {
> +               ODP_ERR("Frame offset too big");
> +               return -1;
> +       }
> +       mb->pkt.data = (void*)((char*)mb->pkt.data + frame_offset);
> +       data_len -= frame_offset;
>
> -       odp_packet_hdr(pkt)->frame_len = len;
> +       if (data_len < len) {
> +               ODP_ERR("Packet len too big");
> +               return -1;
> +       }
> +       mb->pkt.pkt_len = len;
> +
> +       return 0;
> +}
> +
> +void odp_packet_set_len(odp_packet_t pkt, size_t len)
> +{
> +       (void)odp_packet_set_offset_len(pkt, 0, len);
>  }
>
>  size_t odp_packet_get_len(odp_packet_t pkt)
>  {
> -       return odp_packet_hdr(pkt)->frame_len;
> +       struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
> +       return mb->pkt.pkt_len;
>  }
>
>  uint8_t *odp_packet_buf_addr(odp_packet_t pkt)
> @@ -67,7 +96,8 @@ uint8_t *odp_packet_buf_addr(odp_packet_t pkt)
>
>  uint8_t *odp_packet_start(odp_packet_t pkt)
>  {
> -       return odp_packet_buf_addr(pkt) +
> odp_packet_hdr(pkt)->frame_offset;
> +       struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
> +       return mb->pkt.data;
>  }
>
>
> @@ -78,7 +108,7 @@ uint8_t *odp_packet_l2(odp_packet_t pkt)
>         if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
>                 return NULL;
>
> -       return odp_packet_buf_addr(pkt) + offset;
> +       return odp_packet_start(pkt) + offset;
>  }
>
>  size_t odp_packet_l2_offset(odp_packet_t pkt)
> @@ -98,7 +128,7 @@ uint8_t *odp_packet_l3(odp_packet_t pkt)
>         if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
>                 return NULL;
>
> -       return odp_packet_buf_addr(pkt) + offset;
> +       return odp_packet_start(pkt) + offset;
>  }
>
>  size_t odp_packet_l3_offset(odp_packet_t pkt)
> @@ -118,7 +148,7 @@ uint8_t *odp_packet_l4(odp_packet_t pkt)
>         if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
>                 return NULL;
>
> -       return odp_packet_buf_addr(pkt) + offset;
> +       return odp_packet_start(pkt) + offset;
>  }
>
>  size_t odp_packet_l4_offset(odp_packet_t pkt)
> @@ -152,9 +182,13 @@ void odp_packet_parse(odp_packet_t pkt, size_t len,
> size_t frame_offset)
>         size_t offset = 0;
>         uint8_t ip_proto = 0;
>
> +       /* The frame_offset is not relevant for frames from DPDK */
>         pkt_hdr->input_flags.eth = 1;
> -       pkt_hdr->frame_offset = frame_offset;
> -       pkt_hdr->frame_len = len;
> +       (void) frame_offset;
> +       pkt_hdr->frame_offset = 0;
> +       if (odp_packet_set_offset_len(pkt, 0, len)) {
> +               return;
> +       }
>
>         if (odp_unlikely(len < ODPH_ETH_LEN_MIN)) {
>                 pkt_hdr->error_flags.frame_len = 1;
> @@ -165,7 +199,7 @@ void odp_packet_parse(odp_packet_t pkt, size_t len,
> size_t frame_offset)
>
>         /* Assume valid L2 header, no CRC/FCS check in SW */
>         pkt_hdr->input_flags.l2 = 1;
> -       pkt_hdr->l2_offset = frame_offset;
> +       pkt_hdr->l2_offset = 0;
>
>         eth = (odph_ethhdr_t *)odp_packet_start(pkt);
>         ethtype = odp_be_to_cpu_16(eth->type);
> @@ -189,7 +223,7 @@ void odp_packet_parse(odp_packet_t pkt, size_t len,
> size_t frame_offset)
>         case ODPH_ETHTYPE_IPV4:
>                 pkt_hdr->input_flags.ipv4 = 1;
>                 pkt_hdr->input_flags.l3 = 1;
> -               pkt_hdr->l3_offset = frame_offset + ODPH_ETHHDR_LEN +
> offset;
> +               pkt_hdr->l3_offset = ODPH_ETHHDR_LEN + offset;
>                 ipv4 = (odph_ipv4hdr_t *)odp_packet_l3(pkt);
>                 ip_proto = parse_ipv4(pkt_hdr, ipv4, &offset);
>                 break;
> @@ -304,6 +338,7 @@ void odp_packet_print(odp_packet_t pkt)
>  {
>         int max_len = 512;
>         char str[max_len];
> +       uint8_t *p;
>         int len = 0;
>         int n = max_len-1;
>         odp_packet_hdr_t *hdr = odp_packet_hdr(pkt);
> @@ -325,50 +360,69 @@ void odp_packet_print(odp_packet_t pkt)
>         len += snprintf(&str[len], n-len,
>                         "  l4_offset    %u\n", hdr->l4_offset);
>         len += snprintf(&str[len], n-len,
> -                       "  frame_len    %u\n", hdr->frame_len);
> +                       "  frame_len    %u\n",
> hdr->buf_hdr.mb.pkt.pkt_len);
>         len += snprintf(&str[len], n-len,
>                         "  input        %u\n", hdr->input);
>         str[len] = '\0';
>
>         printf("\n%s\n", str);
> +       rte_pktmbuf_dump(&hdr->buf_hdr.mb, 32);
> +
> +       p = odp_packet_start(pkt);
> +       printf("00000000: %02X %02X %02X %02X %02X %02X %02X %02X\
> +              %02X %02X %02X %02X %02X %02X %02X %02X\n",
> +              p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7],
> +              p[8], p[9], p[10], p[11], p[12], p[13], p[14], p[15]);
> +
>  }
>
> +/* For now we can only copy between packets of the same segment size
> + * We should probably refine this API, maybe introduce a clone API */
>  int odp_packet_copy(odp_packet_t pkt_dst, odp_packet_t pkt_src)
>  {
> -       odp_packet_hdr_t *const pkt_hdr_dst = odp_packet_hdr(pkt_dst);
> -       odp_packet_hdr_t *const pkt_hdr_src = odp_packet_hdr(pkt_src);
> -       const size_t start_offset = ODP_FIELD_SIZEOF(odp_packet_hdr_t,
> buf_hdr);
> -       uint8_t *start_src;
> -       uint8_t *start_dst;
> -       size_t len;
> +       struct rte_mbuf *mb_dst, *mb_src;
> +       uint8_t nb_segs, i;
> +
> +       ODP_ASSERT(odp_buffer_type(pkt_dst) == ODP_BUFFER_TYPE_PACKET &&
> +                  odp_buffer_type(pkt_src) == ODP_BUFFER_TYPE_PACKET,
> +                  "dst_pkt or src_pkt not of type
> ODP_BUFFER_TYPE_PACKET");
>
>         if (pkt_dst == ODP_PACKET_INVALID || pkt_src == ODP_PACKET_INVALID)
>                 return -1;
>
> -       /* if (pkt_hdr_dst->buf_hdr.size < */
> -       /*      pkt_hdr_src->frame_len + pkt_hdr_src->frame_offset) */
> -       if (pkt_hdr_dst->buf_hdr.buf_len <
> -               pkt_hdr_src->frame_len + pkt_hdr_src->frame_offset)
> +       mb_dst = &(odp_packet_hdr(pkt_dst)->buf_hdr.mb);
> +       mb_src = &(odp_packet_hdr(pkt_src)->buf_hdr.mb);
> +
> +       if (mb_dst->pkt.nb_segs != mb_src->pkt.nb_segs) {
> +               ODP_ERR("Different nb_segs in pkt_dst and pkt_src");
>                 return -1;
> +       }
>
> -       /* Copy packet header */
> -       start_dst = (uint8_t *)pkt_hdr_dst + start_offset;
> -       start_src = (uint8_t *)pkt_hdr_src + start_offset;
> -       len = ODP_OFFSETOF(odp_packet_hdr_t, payload) - start_offset;
> -       memcpy(start_dst, start_src, len);
> +       nb_segs = mb_src->pkt.nb_segs;
>
> -       /* Copy frame payload */
> -       start_dst = (uint8_t *)odp_packet_start(pkt_dst);
> -       start_src = (uint8_t *)odp_packet_start(pkt_src);
> -       len = pkt_hdr_src->frame_len;
> -       memcpy(start_dst, start_src, len);
> +       if (mb_dst->buf_len < mb_src->buf_len) {
> +               ODP_ERR("dst_pkt smaller than src_pkt");
> +               return -1;
> +       }
>
> -       /* Copy useful things from the buffer header */
> -       /* pkt_hdr_dst->buf_hdr.cur_offset =
> pkt_hdr_src->buf_hdr.cur_offset; */
> +       for (i = 0; i < nb_segs; i++) {
> +               if (mb_src == NULL || mb_dst == NULL) {
> +                       ODP_ERR("Corrupted packets");
> +                       return -1;
> +               }
> +               memcpy(mb_dst->buf_addr, mb_src->buf_addr,
> mb_src->buf_len);
> +               mb_dst = mb_dst->pkt.next;
> +               mb_src = mb_src->pkt.next;
> +       }
> +       return 0;
> +}
>
> -       /* Create a copy of the scatter list */
> -       /* odp_buffer_copy_scatter(odp_buffer_from_packet(pkt_dst), */
> -       /*                      odp_buffer_from_packet(pkt_src)); */
> +void odp_packet_set_ctx(odp_packet_t pkt, const void *ctx)
> +{
> +       odp_packet_hdr(pkt)->user_ctx = (intptr_t)ctx;
> +}
>
> -       return 0;
> +void *odp_packet_get_ctx(odp_packet_t pkt)
> +{
> +       return (void *)(intptr_t)odp_packet_hdr(pkt)->user_ctx;
>  }
> diff --git a/platform/linux-dpdk/odp_packet_dpdk.c
> b/platform/linux-dpdk/odp_packet_dpdk.c
> index d5c8e80..ea83580 100644
> --- a/platform/linux-dpdk/odp_packet_dpdk.c
> +++ b/platform/linux-dpdk/odp_packet_dpdk.c
> @@ -82,7 +82,7 @@ int setup_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, const
> char *netdev,
>         static struct ether_addr eth_addr[RTE_MAX_ETHPORTS];
>         static int portinit[RTE_MAX_ETHPORTS];
>         static int qid[RTE_MAX_ETHPORTS];
> -       uint8_t portid = 0, num_intf = 2;
> +       uint8_t portid = 0;
>         uint16_t nbrxq = 0, nbtxq = 0;
>         int ret, i;
>
> @@ -93,7 +93,7 @@ int setup_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, const
> char *netdev,
>         pkt_dpdk->pool = pool;
>         printf("dpdk portid: %u\n", portid);
>
> -       nbrxq = odp_sys_core_count() / num_intf;
> +       nbrxq = odp_sys_core_count();
>         nbtxq = nbrxq;
>         if (portinit[portid] == 0) {
>                 fflush(stdout);
> @@ -157,17 +157,18 @@ int close_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk)
>  }
>
>  int recv_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, odp_packet_t pkt_table[],
> -               unsigned len)
> +                 unsigned len)
>  {
> -       struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
>         uint16_t nb_rx, i = 0;
>
> -       memset(pkts_burst, 0 , sizeof(pkts_burst));
>         nb_rx = rte_eth_rx_burst((uint8_t)pkt_dpdk->portid,
>                                  (uint16_t)pkt_dpdk->queueid,
> -                                (struct rte_mbuf **)pkts_burst,
> (uint16_t)len);
> -       for (i = 0; i < nb_rx; i++)
> -               pkt_table[i] = (odp_packet_t)pkts_burst[i];
> +                                (struct rte_mbuf **)pkt_table,
> (uint16_t)len);
> +       for (i = 0; i < nb_rx; i++) {
> +               odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt_table[i]);
> +               struct rte_mbuf *mb = &pkt_hdr->buf_hdr.mb;
> +               odp_packet_parse(pkt_table[i], mb->pkt.pkt_len, 0);
> +       }
>         return nb_rx;
>  }
>
> diff --git a/platform/linux-dpdk/odp_queue.c
> b/platform/linux-dpdk/odp_queue.c
> index 554b8ea..29fae8f 100644
> --- a/platform/linux-dpdk/odp_queue.c
> +++ b/platform/linux-dpdk/odp_queue.c
> @@ -239,11 +239,11 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t
> *buf_hdr)
>                 /* Empty queue */
>                 queue->s.head = buf_hdr;
>                 queue->s.tail = buf_hdr;
> -               buf_hdr->pkt.next = NULL;
> +               buf_hdr->next = NULL;
>         } else {
> -               queue->s.tail->pkt.next = buf_hdr;
> +               queue->s.tail->next = buf_hdr;
>                 queue->s.tail = buf_hdr;
> -               buf_hdr->pkt.next = NULL;
> +               buf_hdr->next = NULL;
>         }
>
>         if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
> @@ -267,17 +267,17 @@ int queue_enq_multi(queue_entry_t *queue,
> odp_buffer_hdr_t *buf_hdr[], int num)
>         odp_buffer_hdr_t *tail;
>
>         for (i = 0; i < num - 1; i++)
> -               buf_hdr[i]->pkt.next = buf_hdr[i+1];
> +               buf_hdr[i]->next = buf_hdr[i+1];
>
>         tail = buf_hdr[num-1];
> -       buf_hdr[num-1]->pkt.next = NULL;
> +       buf_hdr[num-1]->next = NULL;
>
>         LOCK(&queue->s.lock);
>         /* Empty queue */
>         if (queue->s.head == NULL)
>                 queue->s.head = buf_hdr[0];
>         else
> -               queue->s.tail->pkt.next = buf_hdr[0];
> +               queue->s.tail->next = buf_hdr[0];
>
>         queue->s.tail = tail;
>
> @@ -338,8 +338,8 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
>                         queue->s.status = QUEUE_STATUS_NOTSCHED;
>         } else {
>                 buf_hdr       = queue->s.head;
> -               queue->s.head = buf_hdr->pkt.next;
> -               buf_hdr->pkt.next = NULL;
> +               queue->s.head = buf_hdr->next;
> +               buf_hdr->next = NULL;
>
>                 if (queue->s.head == NULL) {
>                         /* Queue is now empty */
> @@ -370,8 +370,8 @@ int queue_deq_multi(queue_entry_t *queue,
> odp_buffer_hdr_t *buf_hdr[], int num)
>                 for (; i < num && hdr; i++) {
>                         buf_hdr[i]       = hdr;
>                         /* odp_prefetch(hdr->addr); */
> -                       hdr              = hdr->pkt.next;
> -                       buf_hdr[i]->pkt.next = NULL;
> +                       hdr              = hdr->next;
> +                       buf_hdr[i]->next = NULL;
>                 }
>
>                 queue->s.head = hdr;
> diff --git a/platform/linux-dpdk/odp_schedule.c
> b/platform/linux-dpdk/odp_schedule.c
> new file mode 100644
> index 0000000..462b8eb
> --- /dev/null
> +++ b/platform/linux-dpdk/odp_schedule.c
> @@ -0,0 +1,417 @@
> +/* Copyright (c) 2013, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp_schedule.h>
> +#include <odp_schedule_internal.h>
> +#include <odp_align.h>
> +#include <odp_queue.h>
> +#include <odp_shared_memory.h>
> +#include <odp_buffer.h>
> +#include <odp_buffer_pool.h>
> +#include <odp_internal.h>
> +#include <odp_config.h>
> +#include <odp_debug.h>
> +#include <odp_thread.h>
> +#include <odp_time.h>
> +#include <odp_spinlock.h>
> +#include <odp_hints.h>
> +
> +#include <odp_queue_internal.h>
> +
> +
> +/* Limits to number of scheduled queues */
> +#define SCHED_POOL_SIZE (256*1024)
> +
> +/* Scheduler sub queues */
> +#define QUEUES_PER_PRIO  4
> +
> +/* TODO: random or queue based selection */
> +#define SEL_PRI_QUEUE(x) ((QUEUES_PER_PRIO-1) & (queue_to_id(x)))
> +
> +/* Maximum number of dequeues */
> +#define MAX_DEQ 4
> +
> +
> +/* Mask of queues per priority */
> +typedef uint8_t pri_mask_t;
> +
> +ODP_STATIC_ASSERT((8*sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
> "pri_mask_t_is_too_small");
> +
> +
> +typedef struct {
> +       odp_queue_t       pri_queue[ODP_CONFIG_SCHED_PRIOS][QUEUES_PER_PRIO];
> +       pri_mask_t        pri_mask[ODP_CONFIG_SCHED_PRIOS];
> +       odp_spinlock_t    mask_lock;
> +       odp_buffer_pool_t pool;
> +} sched_t;
> +
> +typedef struct {
> +       odp_queue_t queue;
> +
> +} queue_desc_t;
> +
> +typedef struct {
> +       odp_queue_t  pri_queue;
> +       odp_buffer_t desc_buf;
> +
> +       odp_buffer_t buf[MAX_DEQ];
> +       int num;
> +       int index;
> +       odp_queue_t queue;
> +       int pause;
> +
> +} sched_local_t;
> +
> +/* Global scheduler context */
> +static sched_t *sched;
> +
> +/* Thread local scheduler context */
> +static __thread sched_local_t sched_local;
> +
> +
> +static inline odp_queue_t select_pri_queue(odp_queue_t queue, int prio)
> +{
> +       int id = SEL_PRI_QUEUE(queue);
> +       return sched->pri_queue[prio][id];
> +}
> +
> +
> +int odp_schedule_init_global(void)
> +{
> +       odp_buffer_pool_t pool;
> +       int i, j;
> +
> +       ODP_DBG("Schedule init ... ");
> +
> +       sched = odp_shm_reserve("odp_scheduler",
> +                               sizeof(sched_t),
> +                               ODP_CACHE_LINE_SIZE);
> +
> +       if (sched == NULL) {
> +               ODP_ERR("Schedule init: Shm reserve failed.\n");
> +               return -1;
> +       }
> +
> +       pool = odp_buffer_pool_create("odp_sched_pool", NULL,
> +                                     SCHED_POOL_SIZE,
> sizeof(queue_desc_t),
> +                                     ODP_CACHE_LINE_SIZE,
> +                                     ODP_BUFFER_TYPE_RAW);
> +
> +       if (pool == ODP_BUFFER_POOL_INVALID) {
> +               ODP_ERR("Schedule init: Pool create failed.\n");
> +               return -1;
> +       }
> +
> +       sched->pool = pool;
> +       odp_spinlock_init(&sched->mask_lock);
> +
> +       for (i = 0; i < ODP_CONFIG_SCHED_PRIOS; i++) {
> +               odp_queue_t queue;
> +               char name[] = "odp_priXX_YY";
> +
> +               name[7] = '0' + i / 10;
> +               name[8] = '0' + i - 10*(i / 10);
> +
> +               for (j = 0; j < QUEUES_PER_PRIO; j++) {
> +                       name[10] = '0' + j / 10;
> +                       name[11] = '0' + j - 10*(j / 10);
> +
> +                       queue = odp_queue_create(name,
> +                                                ODP_QUEUE_TYPE_POLL,
> NULL);
> +
> +                       if (queue == ODP_QUEUE_INVALID) {
> +                               ODP_ERR("Sched init: Queue create failed.\n");
> +                               return -1;
> +                       }
> +
> +                       sched->pri_queue[i][j] = queue;
> +                       sched->pri_mask[i]     = 0;
> +               }
> +       }
> +
> +       ODP_DBG("done\n");
> +
> +       return 0;
> +}
> +
> +
> +int odp_schedule_init_local(void)
> +{
> +       int i;
> +
> +       sched_local.pri_queue = ODP_QUEUE_INVALID;
> +       sched_local.desc_buf  = ODP_BUFFER_INVALID;
> +
> +       for (i = 0; i < MAX_DEQ; i++)
> +               sched_local.buf[i] = ODP_BUFFER_INVALID;
> +
> +       sched_local.num   = 0;
> +       sched_local.index = 0;
> +       sched_local.queue = ODP_QUEUE_INVALID;
> +       sched_local.pause = 0;
> +
> +       return 0;
> +}
> +
> +
> +void odp_schedule_mask_set(odp_queue_t queue, int prio)
> +{
> +       int id = SEL_PRI_QUEUE(queue);
> +
> +       odp_spinlock_lock(&sched->mask_lock);
> +       sched->pri_mask[prio] |= 1 << id;
> +       odp_spinlock_unlock(&sched->mask_lock);
> +}
> +
> +
> +odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue)
> +{
> +       odp_buffer_t buf;
> +
> +       buf = odp_buffer_alloc(sched->pool);
> +
> +       if (buf != ODP_BUFFER_INVALID) {
> +               queue_desc_t *desc;
> +               desc        = odp_buffer_addr(buf);
> +               desc->queue = queue;
> +       }
> +
> +       return buf;
> +}
> +
> +
> +void odp_schedule_queue(odp_queue_t queue, int prio)
> +{
> +       odp_buffer_t desc_buf;
> +       odp_queue_t  pri_queue;
> +
> +       pri_queue = select_pri_queue(queue, prio);
> +       desc_buf  = queue_sched_buf(queue);
> +
> +       odp_queue_enq(pri_queue, desc_buf);
> +}
> +
> +
> +void odp_schedule_release_atomic(void)
> +{
> +       if (sched_local.pri_queue != ODP_QUEUE_INVALID &&
> +           sched_local.num       == 0) {
> +               /* Release current atomic queue */
> +               odp_queue_enq(sched_local.pri_queue, sched_local.desc_buf);
> +               sched_local.pri_queue = ODP_QUEUE_INVALID;
> +       }
> +}
> +
> +
> +static inline int copy_bufs(odp_buffer_t out_buf[], unsigned int max)
> +{
> +       int i = 0;
> +
> +       while (sched_local.num && max) {
> +               out_buf[i] = sched_local.buf[sched_local.index];
> +               sched_local.index++;
> +               sched_local.num--;
> +               max--;
> +               i++;
> +       }
> +
> +       return i;
> +}
> +
> +
> +/*
> + * Schedule queues
> + *
> + * TODO: SYNC_ORDERED not implemented yet
> + */
> +static int schedule(odp_queue_t *out_queue, odp_buffer_t out_buf[],
> +                   unsigned int max_num, unsigned int max_deq)
> +{
> +       int i, j;
> +       int thr;
> +       int ret;
> +
> +       if (sched_local.num) {
> +               ret = copy_bufs(out_buf, max_num);
> +
> +               if (out_queue)
> +                       *out_queue = sched_local.queue;
> +
> +               return ret;
> +       }
> +
> +       odp_schedule_release_atomic();
> +
> +       if (odp_unlikely(sched_local.pause))
> +               return 0;
> +
> +       thr = odp_thread_id();
> +
> +       for (i = 0; i < ODP_CONFIG_SCHED_PRIOS; i++) {
> +               int id;
> +
> +               if (sched->pri_mask[i] == 0)
> +                       continue;
> +
> +               id = thr & (QUEUES_PER_PRIO-1);
> +
> +               for (j = 0; j < QUEUES_PER_PRIO; j++, id++) {
> +                       odp_queue_t  pri_q;
> +                       odp_buffer_t desc_buf;
> +
> +                       if (id >= QUEUES_PER_PRIO)
> +                               id = 0;
> +
> +                       if (odp_unlikely((sched->pri_mask[i] & (1 << id))
> == 0))
> +                               continue;
> +
> +                       pri_q    = sched->pri_queue[i][id];
> +                       desc_buf = odp_queue_deq(pri_q);
> +
> +                       if (desc_buf != ODP_BUFFER_INVALID) {
> +                               queue_desc_t *desc;
> +                               odp_queue_t queue;
> +                               int num;
> +
> +                               desc  = odp_buffer_addr(desc_buf);
> +                               queue = desc->queue;
> +
> +                               num = odp_queue_deq_multi(queue,
> +                                                         sched_local.buf,
> +                                                         max_deq);
> +
> +                               if (num == 0) {
> +                                       /* Remove empty queue from scheduling,
> +                                        * except packet input queues
> +                                        */
> +                                       if (odp_queue_type(queue) ==
> +                                           ODP_QUEUE_TYPE_PKTIN)
> +                                               odp_queue_enq(pri_q,
> desc_buf);
> +
> +                                       continue;
> +                               }
> +
> +                               sched_local.num   = num;
> +                               sched_local.index = 0;
> +                               ret = copy_bufs(out_buf, max_num);
> +
> +                               sched_local.queue = queue;
> +
> +                               if (queue_sched_atomic(queue)) {
> +                                       /* Hold queue during atomic access */
> +                                       sched_local.pri_queue = pri_q;
> +                                       sched_local.desc_buf  = desc_buf;
> +                               } else {
> +                                       /* Continue scheduling the queue */
> +                                       odp_queue_enq(pri_q, desc_buf);
> +                               }
> +
> +                               /* Output the source queue handle */
> +                               if (out_queue)
> +                                       *out_queue = queue;
> +
> +                               return ret;
> +                       }
> +               }
> +       }
> +
> +       return 0;
> +}
> +
> +
> +static int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
> +                         odp_buffer_t out_buf[],
> +                         unsigned int max_num, unsigned int max_deq)
> +{
> +       uint64_t start_cycle, cycle, diff;
> +       int ret;
> +
> +       start_cycle = 0;
> +
> +       while (1) {
> +               ret = schedule(out_queue, out_buf, max_num, max_deq);
> +
> +               if (ret)
> +                       break;
> +
> +               if (wait == ODP_SCHED_WAIT)
> +                       continue;
> +
> +               if (wait == ODP_SCHED_NO_WAIT)
> +                       break;
> +
> +               if (start_cycle == 0) {
> +                       start_cycle = odp_time_get_cycles();
> +                       continue;
> +               }
> +
> +               cycle = odp_time_get_cycles();
> +               diff  = odp_time_diff_cycles(start_cycle, cycle);
> +
> +               if (wait < diff)
> +                       break;
> +       }
> +
> +       return ret;
> +}
> +
> +
> +odp_buffer_t odp_schedule(odp_queue_t *out_queue, uint64_t wait)
> +{
> +       odp_buffer_t buf;
> +
> +       buf = ODP_BUFFER_INVALID;
> +
> +       schedule_loop(out_queue, wait, &buf, 1, MAX_DEQ);
> +
> +       return buf;
> +}
> +
> +
> +odp_buffer_t odp_schedule_one(odp_queue_t *out_queue, uint64_t wait)
> +{
> +       odp_buffer_t buf;
> +
> +       buf = ODP_BUFFER_INVALID;
> +
> +       schedule_loop(out_queue, wait, &buf, 1, 1);
> +
> +       return buf;
> +}
> +
> +
> +int odp_schedule_multi(odp_queue_t *out_queue, uint64_t wait,
> +                      odp_buffer_t out_buf[], unsigned int num)
> +{
> +       return schedule_loop(out_queue, wait, out_buf, num, MAX_DEQ);
> +}
> +
> +
> +void odp_schedule_pause(void)
> +{
> +       sched_local.pause = 1;
> +}
> +
> +
> +void odp_schedule_resume(void)
> +{
> +       sched_local.pause = 0;
> +}
> +
> +
> +uint64_t odp_schedule_wait_time(uint64_t ns)
> +{
> +       if (ns <= ODP_SCHED_NO_WAIT)
> +               ns = ODP_SCHED_NO_WAIT + 1;
> +
> +       return odp_time_ns_to_cycles(ns);
> +}
> +
> +
> +int odp_schedule_num_prio(void)
> +{
> +       return ODP_CONFIG_SCHED_PRIOS;
> +}
>

Reviewed-by: Venkatesh Vivekanandan <venkatesh.vivekanandan@linaro.org>

> --
> 1.8.3.2
>
>
> _______________________________________________
> lng-odp mailing list
> lng-odp@lists.linaro.org
> http://lists.linaro.org/mailman/listinfo/lng-odp
>
Ciprian Barbu Sept. 24, 2014, 11:16 a.m. UTC | #2
On Wed, Sep 24, 2014 at 1:31 PM, Venkatesh Vivekanandan
<venkatesh.vivekanandan@linaro.org> wrote:
>
>
> On 15 September 2014 20:21, Ciprian Barbu <ciprian.barbu@linaro.org> wrote:
>>
>> Signed-off-by: Ciprian Barbu <ciprian.barbu@linaro.org>
>> ---
>>  platform/linux-dpdk/Makefile.am                    |   2 +-
>>  platform/linux-dpdk/include/api/odp_buffer.h       |   2 +-
>>  platform/linux-dpdk/include/api/odp_buffer_pool.h  |   2 +-
>>  platform/linux-dpdk/include/api/odp_packet.h       |  17 +
>>  platform/linux-dpdk/include/odp_buffer_internal.h  |   8 +-
>>  platform/linux-dpdk/include/odp_packet_internal.h  |   7 +-
>>  .../linux-dpdk/include/odp_packet_io_internal.h    |   5 -
>>  platform/linux-dpdk/odp_buffer.c                   |  26 +-
>>  platform/linux-dpdk/odp_buffer_pool.c              | 150 +++++++-
>>  platform/linux-dpdk/odp_packet.c                   | 148 +++++---
>>  platform/linux-dpdk/odp_packet_dpdk.c              |  17 +-
>>  platform/linux-dpdk/odp_queue.c                    |  20 +-
>>  platform/linux-dpdk/odp_schedule.c                 | 417
>> +++++++++++++++++++++
>>  13 files changed, 716 insertions(+), 105 deletions(-)
>>  create mode 100644 platform/linux-dpdk/odp_schedule.c
>>
>> diff --git a/platform/linux-dpdk/Makefile.am
>> b/platform/linux-dpdk/Makefile.am
>> index 1eabd9f..e128bf8 100644
>> --- a/platform/linux-dpdk/Makefile.am
>> +++ b/platform/linux-dpdk/Makefile.am
>> @@ -79,7 +79,7 @@ __LIB__libodp_la_SOURCES = \
>>                            odp_queue.c \
>>                            ../linux-generic/odp_ring.c \
>>                            ../linux-generic/odp_rwlock.c \
>> -                          ../linux-generic/odp_schedule.c \
>> +                          odp_schedule.c \
>>                            ../linux-generic/odp_shared_memory.c \
>>                            ../linux-generic/odp_spinlock.c \
>>                            ../linux-generic/odp_system_info.c \
>> diff --git a/platform/linux-dpdk/include/api/odp_buffer.h
>> b/platform/linux-dpdk/include/api/odp_buffer.h
>> index 9ea1ed8..b2fbc76 100644
>> --- a/platform/linux-dpdk/include/api/odp_buffer.h
>> +++ b/platform/linux-dpdk/include/api/odp_buffer.h
>> @@ -32,7 +32,7 @@ extern "C" {
>>  typedef unsigned long odp_buffer_t;
>>
>>
>> -#define ODP_BUFFER_INVALID (0xffffffff) /**< Invalid buffer */
>> +#define ODP_BUFFER_INVALID (unsigned long)(-1L) /**< Invalid buffer */
>>
>>
>>  /**
>> diff --git a/platform/linux-dpdk/include/api/odp_buffer_pool.h
>> b/platform/linux-dpdk/include/api/odp_buffer_pool.h
>> index 4b75cf5..382f4f0 100644
>> --- a/platform/linux-dpdk/include/api/odp_buffer_pool.h
>> +++ b/platform/linux-dpdk/include/api/odp_buffer_pool.h
>> @@ -27,7 +27,7 @@ extern "C" {
>>  #define ODP_BUFFER_POOL_NAME_LEN  32
>>
>>  /** Invalid buffer pool */
>> -#define ODP_BUFFER_POOL_INVALID  (0xffffffff)
>> +#define ODP_BUFFER_POOL_INVALID  (unsigned long)(-1L)
>>
>>  /** ODP buffer pool */
>>  typedef unsigned long odp_buffer_pool_t;
>> diff --git a/platform/linux-dpdk/include/api/odp_packet.h
>> b/platform/linux-dpdk/include/api/odp_packet.h
>> index 5545bdc..79503a5 100644
>> --- a/platform/linux-dpdk/include/api/odp_packet.h
>> +++ b/platform/linux-dpdk/include/api/odp_packet.h
>> @@ -80,6 +80,23 @@ void odp_packet_set_len(odp_packet_t pkt, size_t len);
>>  size_t odp_packet_get_len(odp_packet_t pkt);
>>
>>  /**
>> + * Set packet user context
>> + *
>> + * @param buf      Packet handle
>> + * @param ctx      User context
>> + *
>> + */
>> +void odp_packet_set_ctx(odp_packet_t buf, const void *ctx);
>> +
>> +/**
>> + * Get packet user context
>> + *
>> + * @param buf      Packet handle
>> + *
>> + * @return User context
>> + */
>> +void *odp_packet_get_ctx(odp_packet_t buf);
>> +/**
>>   * Get address to the start of the packet buffer
>>   *
>>   * The address of the packet buffer is not necessarily the same as the
>> start
>> diff --git a/platform/linux-dpdk/include/odp_buffer_internal.h
>> b/platform/linux-dpdk/include/odp_buffer_internal.h
>> index f87ec80..5406606 100644
>> --- a/platform/linux-dpdk/include/odp_buffer_internal.h
>> +++ b/platform/linux-dpdk/include/odp_buffer_internal.h
>> @@ -59,8 +59,12 @@ typedef union odp_buffer_bits_t {
>>  struct odp_buffer_hdr_t;
>>
>>
>> -typedef struct rte_mbuf odp_buffer_hdr_t;
>> -
>> +typedef struct odp_buffer_hdr_t {
>> +       struct rte_mbuf mb;            /* Underlying DPDK rte_mbuf */
>> +       struct odp_buffer_hdr_t *next; /* Next buf in a list */
>> +       int type;                      /* ODP buffer type; not DPDK buf
>> type */
>> +       uint32_t index;                /* Index in the rte_mempool */
>> +} odp_buffer_hdr_t;
>>
>>  int odp_buffer_snprint(char *str, size_t n, odp_buffer_t buf);
>>
>> diff --git a/platform/linux-dpdk/include/odp_packet_internal.h
>> b/platform/linux-dpdk/include/odp_packet_internal.h
>> index 9357f90..d7f505b 100644
>> --- a/platform/linux-dpdk/include/odp_packet_internal.h
>> +++ b/platform/linux-dpdk/include/odp_packet_internal.h
>> @@ -113,13 +113,8 @@ typedef struct {
>>         uint32_t l3_offset; /**< offset to L3 hdr, e.g. IPv4, IPv6 */
>>         uint32_t l4_offset; /**< offset to L4 hdr (TCP, UDP, SCTP, also
>> ICMP) */
>>
>> -       uint32_t frame_len;
>> -
>>         odp_pktio_t input;
>> -
>> -       uint32_t pad;
>> -       uint8_t payload[];
>> -
>> +       uint64_t user_ctx;  /**< user context */
>>  } odp_packet_hdr_t;
>>
>>  /**
>> diff --git a/platform/linux-dpdk/include/odp_packet_io_internal.h
>> b/platform/linux-dpdk/include/odp_packet_io_internal.h
>> index 08abea7..9263349 100644
>> --- a/platform/linux-dpdk/include/odp_packet_io_internal.h
>> +++ b/platform/linux-dpdk/include/odp_packet_io_internal.h
>> @@ -31,11 +31,6 @@ struct pktio_entry {
>>         odp_queue_t inq_default;        /**< default input queue, if set
>> */
>>         odp_queue_t outq_default;       /**< default out queue */
>>         odp_pktio_params_t params;      /**< pktio parameters */
>> -       pkt_sock_t pkt_sock;            /**< using socket API for IO */
>> -       pkt_sock_mmap_t pkt_sock_mmap;  /**< using socket mmap API for IO
>> */
>> -#ifdef ODP_HAVE_NETMAP
>> -       pkt_netmap_t pkt_nm;            /**< using netmap API for IO */
>> -#endif
>>         pkt_dpdk_t pkt_dpdk;            /**< using DPDK API for IO */
>>  };
>>
>> diff --git a/platform/linux-dpdk/odp_buffer.c
>> b/platform/linux-dpdk/odp_buffer.c
>> index e2f8942..e2657e4 100644
>> --- a/platform/linux-dpdk/odp_buffer.c
>> +++ b/platform/linux-dpdk/odp_buffer.c
>> @@ -16,7 +16,7 @@ void *odp_buffer_addr(odp_buffer_t buf)
>>  {
>>         odp_buffer_hdr_t *hdr = odp_buf_to_hdr(buf);
>>
>> -       return hdr->buf_addr;
>> +       return hdr->mb.buf_addr;
>>  }
>>
>>
>> @@ -24,7 +24,7 @@ size_t odp_buffer_size(odp_buffer_t buf)
>>  {
>>         odp_buffer_hdr_t *hdr = odp_buf_to_hdr(buf);
>>
>> -       return hdr->buf_len;
>> +       return hdr->mb.buf_len;
>>  }
>>
>>
>> @@ -38,11 +38,9 @@ int odp_buffer_type(odp_buffer_t buf)
>>
>>  int odp_buffer_is_valid(odp_buffer_t buf)
>>  {
>> -       odp_buffer_bits_t handle;
>> -
>> -       handle.u32 = buf;
>> -
>> -       return (handle.index != ODP_BUFFER_INVALID_INDEX);
>> +       /* We could call rte_mbuf_sanity_check, but that panics
>> +        * and aborts the program */
>> +       return (void*)buf != NULL;
>>  }
>>
>>
>> @@ -61,17 +59,19 @@ int odp_buffer_snprint(char *str, size_t n,
>> odp_buffer_t buf)
>>         len += snprintf(&str[len], n-len,
>>                         "Buffer\n");
>>         len += snprintf(&str[len], n-len,
>> -                       "  pool         %"PRIu64"\n", (int64_t)
>> hdr->pool);
>> +                       "  pool         %"PRIu64"\n", (int64_t)
>> hdr->mb.pool);
>> +       len += snprintf(&str[len], n-len,
>> +                       "  phy_addr     %"PRIu64"\n",
>> hdr->mb.buf_physaddr);
>>         len += snprintf(&str[len], n-len,
>> -                       "  phy_addr     %"PRIu64"\n", hdr->buf_physaddr);
>> +                       "  addr         %p\n",        hdr->mb.buf_addr);
>>         len += snprintf(&str[len], n-len,
>> -                       "  addr         %p\n",        hdr->buf_addr);
>> +                       "  size         %u\n",        hdr->mb.buf_len);
>>         len += snprintf(&str[len], n-len,
>> -                       "  size         %u\n",        hdr->buf_len);
>> +                       "  ref_count    %i\n",        hdr->mb.refcnt);
>>         len += snprintf(&str[len], n-len,
>> -                       "  ref_count    %i\n",        hdr->refcnt);
>> +                       "  dpdk type    %i\n",        hdr->mb.type);
>>         len += snprintf(&str[len], n-len,
>> -                       "  type         %i\n",        hdr->type);
>> +                       "  odp type     %i\n",        hdr->type);
>>
>>         return len;
>>  }
>> diff --git a/platform/linux-dpdk/odp_buffer_pool.c
>> b/platform/linux-dpdk/odp_buffer_pool.c
>> index 805ce68..f044b5d 100644
>> --- a/platform/linux-dpdk/odp_buffer_pool.c
>> +++ b/platform/linux-dpdk/odp_buffer_pool.c
>> @@ -9,6 +9,7 @@
>>  #include <odp_buffer_pool_internal.h>
>>  #include <odp_buffer_internal.h>
>>  #include <odp_packet_internal.h>
>> +#include <odp_timer_internal.h>
>>  #include <odp_shared_memory.h>
>>  #include <odp_align.h>
>>  #include <odp_internal.h>
>> @@ -44,6 +45,13 @@
>>
>>  #define NULL_INDEX ((uint32_t)-1)
>>
>> +union buffer_type_any_u {
>> +       odp_buffer_hdr_t  buf;
>> +       odp_packet_hdr_t  pkt;
>> +       odp_timeout_hdr_t tmo;
>> +};
>> +
>> +typedef union buffer_type_any_u odp_any_buffer_hdr_t;
>>
>>  typedef union pool_entry_u {
>>         struct pool_entry_s s;
>> @@ -59,7 +67,7 @@ typedef struct pool_table_t {
>>  } pool_table_t;
>>
>>
>> -/* The pool table */
>> +/* The pool table ptr - resides in shared memory */
>>  static pool_table_t *pool_tbl;
>>
>>  /* Pool entry pointers (for inlining) */
>> @@ -98,31 +106,151 @@ int odp_buffer_pool_init_global(void)
>>         return 0;
>>  }
>>
>> +struct mbuf_ctor_arg {
>> +       uint16_t seg_buf_offset; /* To skip the ODP buf/pkt/tmo header */
>> +       uint16_t seg_buf_size;   /* total sz: offset + user sz + HDROOM */
>> +       int buf_type;
>> +};
>> +
>> +struct mbuf_pool_ctor_arg {
>> +       uint16_t seg_buf_size; /* size of mbuf: user specified sz + HDROOM
>> */
>> +};
>> +
>> +static void
>> +odp_dpdk_mbuf_pool_ctor(struct rte_mempool *mp,
>> +                       void *opaque_arg)
>> +{
>> +       struct mbuf_pool_ctor_arg      *mbp_ctor_arg;
>> +       struct rte_pktmbuf_pool_private *mbp_priv;
>> +
>> +       if (mp->private_data_size < sizeof(struct
>> rte_pktmbuf_pool_private)) {
>> +               ODP_ERR("%s(%s) private_data_size %d < %d",
>> +                       __func__, mp->name, (int) mp->private_data_size,
>> +                       (int) sizeof(struct rte_pktmbuf_pool_private));
>> +               return;
>> +       }
>> +       mbp_ctor_arg = (struct mbuf_pool_ctor_arg *) opaque_arg;
>> +       mbp_priv = rte_mempool_get_priv(mp);
>> +       mbp_priv->mbuf_data_room_size = mbp_ctor_arg->seg_buf_size;
>> +}
>> +
>> +/* ODP DPDK mbuf constructor.
>> + * This is a combination of rte_pktmbuf_init in rte_mbuf.c
>> + * and testpmd_mbuf_ctor in testpmd.c
>> + */
>> +static void
>> +odp_dpdk_mbuf_ctor(struct rte_mempool *mp,
>> +                  void *opaque_arg,
>> +                  void *raw_mbuf,
>> +                  unsigned i)
>> +{
>> +       struct mbuf_ctor_arg *mb_ctor_arg;
>> +       struct rte_mbuf *mb = raw_mbuf;
>> +       struct odp_buffer_hdr_t *buf_hdr;
>> +
>> +       /* The rte_mbuf is at the beginning in all cases */
>> +       mb_ctor_arg = (struct mbuf_ctor_arg *) opaque_arg;
>> +       mb = (struct rte_mbuf *) raw_mbuf;
>> +
>> +       RTE_MBUF_ASSERT(mp->elt_size >= sizeof(struct rte_mbuf));
>> +
>> +       memset(mb, 0, mp->elt_size);
>> +
>> +       /* Start of buffer is just after the ODP type specific header
>> +        * which contains in the very beginning the rte_mbuf struct */
>> +       mb->buf_addr     = (char *)mb + mb_ctor_arg->seg_buf_offset;
>> +       mb->buf_physaddr = rte_mempool_virt2phy(mp, mb) +
>> +                       mb_ctor_arg->seg_buf_offset;
>> +       mb->buf_len      = mb_ctor_arg->seg_buf_size;
>> +
>> +       /* keep some headroom between start of buffer and data */
>> +       if (mb_ctor_arg->buf_type == ODP_BUFFER_TYPE_PACKET ||
>> +           mb_ctor_arg->buf_type == ODP_BUFFER_TYPE_ANY)
>> +               mb->pkt.data = (char *) mb->buf_addr +
>> RTE_PKTMBUF_HEADROOM;
>> +       else
>> +               mb->pkt.data = mb->buf_addr;
>> +
>> +       /* init some constant fields */
>> +       mb->type         = RTE_MBUF_PKT;
>> +       mb->pool         = mp;
>> +       mb->pkt.nb_segs  = 1;
>> +       mb->pkt.in_port  = 0xff;
>> +       mb->ol_flags     = 0;
>> +       mb->pkt.vlan_macip.data = 0;
>> +       mb->pkt.hash.rss = 0;
>> +
>> +       /* Save index, might be useful for debugging purposes */
>> +       buf_hdr = (struct odp_buffer_hdr_t*) raw_mbuf;
>> +       buf_hdr->index = i;
>> +}
>>
>>  odp_buffer_pool_t odp_buffer_pool_create(const char *name,
>>                                          void *base_addr, uint64_t size,
>>                                          size_t buf_size, size_t
>> buf_align,
>>                                          int buf_type)
>>  {
>> -       struct rte_mempool *pktmbuf_pool = NULL;
>> +       struct rte_mempool *pool = NULL;
>> +       struct mbuf_pool_ctor_arg mbp_ctor_arg;
>> +       struct mbuf_ctor_arg mb_ctor_arg;
>> +       unsigned mb_size;
>> +
>> +       /* Not used for rte_mempool; the new ODP buffer management
>> introduces
>> +        * rte_mempool_create_from_region where base_addr makes sense */
>> +       (void)base_addr;
>> +
>> +       /* buf_align will be removed soon, no need to worry about it */
>> +       (void)buf_align;
>> +
>>         ODP_DBG("odp_buffer_pool_create: %s, %lx, %u, %u, %u, %d\n", name,
>>                 (uint64_t) base_addr, (unsigned) size,
>>                 (unsigned) buf_size, (unsigned) buf_align,
>>                 buf_type);
>>
>> -       pktmbuf_pool =
>> -               rte_mempool_create(name, NB_MBUF,
>> -                                  MBUF_SIZE, MAX_PKT_BURST,
>> -                                  sizeof(struct
>> rte_pktmbuf_pool_private),
>> -                                  rte_pktmbuf_pool_init, NULL,
>> -                                  rte_pktmbuf_init, NULL,
>> -                                  rte_socket_id(), 0);
>> -       if (pktmbuf_pool == NULL) {
>> +       switch (buf_type) {
>> +       case ODP_BUFFER_TYPE_RAW:
>> +               mb_ctor_arg.seg_buf_offset =
>> +                       (uint16_t)
>> CACHE_LINE_ROUNDUP(sizeof(odp_buffer_hdr_t));
>> +               mbp_ctor_arg.seg_buf_size = (uint16_t) buf_size;
>> +               break;
>> +       case ODP_BUFFER_TYPE_PACKET:
>> +               mb_ctor_arg.seg_buf_offset =
>> +                       (uint16_t)
>> CACHE_LINE_ROUNDUP(sizeof(odp_packet_hdr_t));
>> +               mbp_ctor_arg.seg_buf_size =
>> +                       (uint16_t) (RTE_PKTMBUF_HEADROOM + buf_size);
>> +               break;
>> +       case ODP_BUFFER_TYPE_TIMEOUT:
>> +               mb_ctor_arg.seg_buf_offset =
>> +                       (uint16_t)
>> CACHE_LINE_ROUNDUP(sizeof(odp_timeout_hdr_t));
>> +               mbp_ctor_arg.seg_buf_size = (uint16_t) buf_size;
>> +               break;
>> +       case ODP_BUFFER_TYPE_ANY:
>> +               mb_ctor_arg.seg_buf_offset =
>> +                       (uint16_t)
>> CACHE_LINE_ROUNDUP(sizeof(odp_any_buffer_hdr_t));
>> +               mbp_ctor_arg.seg_buf_size =
>> +                       (uint16_t) (RTE_PKTMBUF_HEADROOM + buf_size);
>> +               break;
>> +       default:
>> +               ODP_ERR("odp_buffer_pool_create: Bad type %i\n",
>> buf_type);
>> +               exit(0);
>> +               break;
>> +       }
>> +
>> +       mb_ctor_arg.seg_buf_size = mbp_ctor_arg.seg_buf_size;
>> +       mb_ctor_arg.buf_type = buf_type;
>> +       mb_size = mb_ctor_arg.seg_buf_offset + mb_ctor_arg.seg_buf_size;
>> +
>> +       pool = rte_mempool_create(name, NB_MBUF,
>> +                                 mb_size, MAX_PKT_BURST,
>> +                                 sizeof(struct rte_pktmbuf_pool_private),
>> +                                 odp_dpdk_mbuf_pool_ctor, &mbp_ctor_arg,
>> +                                 odp_dpdk_mbuf_ctor, &mb_ctor_arg,
>> +                                 rte_socket_id(), 0);
>> +       if (pool == NULL) {
>>                 ODP_ERR("Cannot init DPDK mbuf pool\n");
>>                 return -1;
>>         }
>>
>> -       return (odp_buffer_pool_t) pktmbuf_pool;
>> +       return (odp_buffer_pool_t) pool;
>>  }
>>
>>
>> diff --git a/platform/linux-dpdk/odp_packet.c
>> b/platform/linux-dpdk/odp_packet.c
>> index edfd06d..7afaba6 100644
>> --- a/platform/linux-dpdk/odp_packet.c
>> +++ b/platform/linux-dpdk/odp_packet.c
>> @@ -23,13 +23,13 @@ static inline uint8_t parse_ipv6(odp_packet_hdr_t
>> *pkt_hdr,
>>  void odp_packet_init(odp_packet_t pkt)
>>  {
>>         odp_packet_hdr_t *const pkt_hdr = odp_packet_hdr(pkt);
>> -       const size_t start_offset = ODP_FIELD_SIZEOF(odp_packet_hdr_t,
>> buf_hdr);
>> -       uint8_t *start;
>> -       size_t len;
>> +       struct rte_mbuf *mb;
>> +       void *start;
>>
>> -       start = (uint8_t *)pkt_hdr + start_offset;
>> -       len = ODP_OFFSETOF(odp_packet_hdr_t, payload) - start_offset;
>> -       memset(start, 0, len);
>> +       mb = &pkt_hdr->buf_hdr.mb;
>> +
>> +       start = mb->buf_addr;
>> +       memset(start, 0, mb->buf_len);
>>
>>         pkt_hdr->l2_offset = (uint32_t) ODP_PACKET_OFFSET_INVALID;
>>         pkt_hdr->l3_offset = (uint32_t) ODP_PACKET_OFFSET_INVALID;
>> @@ -46,18 +46,47 @@ odp_buffer_t odp_buffer_from_packet(odp_packet_t pkt)
>>         return (odp_buffer_t)pkt;
>>  }
>>
>> -void odp_packet_set_len(odp_packet_t pkt, size_t len)
>> +/* Advance the pkt data pointer and set len in one call */
>> +static int odp_packet_set_offset_len(odp_packet_t pkt, size_t
>> frame_offset,
>> +                                     size_t len)
>>  {
>> -       /* for rte_pktmbuf */
>> -       odp_buffer_hdr_t *buf_hdr =
>> odp_buf_to_hdr(odp_buffer_from_packet(pkt));
>> -       buf_hdr->pkt.data_len = len;
>> +       struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
>> +       uint16_t offset;
>> +       uint16_t data_len;
>> +
>> +       /* The pkt buf may have been pulled back into the headroom
>> +        * so we cannot rely on finding the data right after the
>> +        * ODP header and HEADROOM */
>> +       offset = (uint16_t)((unsigned long)mb->pkt.data -
>> +                           (unsigned long)mb->buf_addr);
>> +       ODP_ASSERT(mb->buf_len >= offset, "Corrupted mbuf");
>> +       data_len = mb->buf_len - offset;
>> +
>> +       if (data_len < frame_offset) {
>> +               ODP_ERR("Frame offset too big");
>> +               return -1;
>> +       }
>> +       mb->pkt.data = (void*)((char*)mb->pkt.data + frame_offset);
>> +       data_len -= frame_offset;
>>
>> -       odp_packet_hdr(pkt)->frame_len = len;
>> +       if (data_len < len) {
>> +               ODP_ERR("Packet len too big");
>> +               return -1;
>> +       }
>> +       mb->pkt.pkt_len = len;
>> +
>> +       return 0;
>> +}
>> +
>> +void odp_packet_set_len(odp_packet_t pkt, size_t len)
>> +{
>> +       (void)odp_packet_set_offset_len(pkt, 0, len);
>>  }
>>
>>  size_t odp_packet_get_len(odp_packet_t pkt)
>>  {
>> -       return odp_packet_hdr(pkt)->frame_len;
>> +       struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
>> +       return mb->pkt.pkt_len;
>>  }
>>
>>  uint8_t *odp_packet_buf_addr(odp_packet_t pkt)
>> @@ -67,7 +96,8 @@ uint8_t *odp_packet_buf_addr(odp_packet_t pkt)
>>
>>  uint8_t *odp_packet_start(odp_packet_t pkt)
>>  {
>> -       return odp_packet_buf_addr(pkt) +
>> odp_packet_hdr(pkt)->frame_offset;
>> +       struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
>> +       return mb->pkt.data;
>>  }
>>
>>
>> @@ -78,7 +108,7 @@ uint8_t *odp_packet_l2(odp_packet_t pkt)
>>         if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
>>                 return NULL;
>>
>> -       return odp_packet_buf_addr(pkt) + offset;
>> +       return odp_packet_start(pkt) + offset;
>>  }
>>
>>  size_t odp_packet_l2_offset(odp_packet_t pkt)
>> @@ -98,7 +128,7 @@ uint8_t *odp_packet_l3(odp_packet_t pkt)
>>         if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
>>                 return NULL;
>>
>> -       return odp_packet_buf_addr(pkt) + offset;
>> +       return odp_packet_start(pkt) + offset;
>>  }
>>
>>  size_t odp_packet_l3_offset(odp_packet_t pkt)
>> @@ -118,7 +148,7 @@ uint8_t *odp_packet_l4(odp_packet_t pkt)
>>         if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
>>                 return NULL;
>>
>> -       return odp_packet_buf_addr(pkt) + offset;
>> +       return odp_packet_start(pkt) + offset;
>>  }
>>
>>  size_t odp_packet_l4_offset(odp_packet_t pkt)
>> @@ -152,9 +182,13 @@ void odp_packet_parse(odp_packet_t pkt, size_t len,
>> size_t frame_offset)
>>         size_t offset = 0;
>>         uint8_t ip_proto = 0;
>>
>> +       /* The frame_offset is not relevant for frames from DPDK */
>>         pkt_hdr->input_flags.eth = 1;
>> -       pkt_hdr->frame_offset = frame_offset;
>> -       pkt_hdr->frame_len = len;
>> +       (void) frame_offset;
>> +       pkt_hdr->frame_offset = 0;
>> +       if (odp_packet_set_offset_len(pkt, 0, len)) {
>> +               return;
>> +       }
>>
>>         if (odp_unlikely(len < ODPH_ETH_LEN_MIN)) {
>>                 pkt_hdr->error_flags.frame_len = 1;
>> @@ -165,7 +199,7 @@ void odp_packet_parse(odp_packet_t pkt, size_t len,
>> size_t frame_offset)
>>
>>         /* Assume valid L2 header, no CRC/FCS check in SW */
>>         pkt_hdr->input_flags.l2 = 1;
>> -       pkt_hdr->l2_offset = frame_offset;
>> +       pkt_hdr->l2_offset = 0;
>>
>>         eth = (odph_ethhdr_t *)odp_packet_start(pkt);
>>         ethtype = odp_be_to_cpu_16(eth->type);
>> @@ -189,7 +223,7 @@ void odp_packet_parse(odp_packet_t pkt, size_t len,
>> size_t frame_offset)
>>         case ODPH_ETHTYPE_IPV4:
>>                 pkt_hdr->input_flags.ipv4 = 1;
>>                 pkt_hdr->input_flags.l3 = 1;
>> -               pkt_hdr->l3_offset = frame_offset + ODPH_ETHHDR_LEN +
>> offset;
>> +               pkt_hdr->l3_offset = ODPH_ETHHDR_LEN + offset;
>>                 ipv4 = (odph_ipv4hdr_t *)odp_packet_l3(pkt);
>>                 ip_proto = parse_ipv4(pkt_hdr, ipv4, &offset);
>>                 break;
>> @@ -304,6 +338,7 @@ void odp_packet_print(odp_packet_t pkt)
>>  {
>>         int max_len = 512;
>>         char str[max_len];
>> +       uint8_t *p;
>>         int len = 0;
>>         int n = max_len-1;
>>         odp_packet_hdr_t *hdr = odp_packet_hdr(pkt);
>> @@ -325,50 +360,69 @@ void odp_packet_print(odp_packet_t pkt)
>>         len += snprintf(&str[len], n-len,
>>                         "  l4_offset    %u\n", hdr->l4_offset);
>>         len += snprintf(&str[len], n-len,
>> -                       "  frame_len    %u\n", hdr->frame_len);
>> +                       "  frame_len    %u\n",
>> hdr->buf_hdr.mb.pkt.pkt_len);
>>         len += snprintf(&str[len], n-len,
>>                         "  input        %u\n", hdr->input);
>>         str[len] = '\0';
>>
>>         printf("\n%s\n", str);
>> +       rte_pktmbuf_dump(&hdr->buf_hdr.mb, 32);
>> +
>> +       p = odp_packet_start(pkt);
>> +       printf("00000000: %02X %02X %02X %02X %02X %02X %02X %02X\
>> +              %02X %02X %02X %02X %02X %02X %02X %02X\n",
>> +              p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7],
>> +              p[8], p[9], p[10], p[11], p[12], p[13], p[14], p[15]);
>> +
>>  }
>>
>> +/* For now we can only copy between packets of the same segment size
>> + * We should probably refine this API, maybe introduce a clone API */
>>  int odp_packet_copy(odp_packet_t pkt_dst, odp_packet_t pkt_src)
>>  {
>> -       odp_packet_hdr_t *const pkt_hdr_dst = odp_packet_hdr(pkt_dst);
>> -       odp_packet_hdr_t *const pkt_hdr_src = odp_packet_hdr(pkt_src);
>> -       const size_t start_offset = ODP_FIELD_SIZEOF(odp_packet_hdr_t,
>> buf_hdr);
>> -       uint8_t *start_src;
>> -       uint8_t *start_dst;
>> -       size_t len;
>> +       struct rte_mbuf *mb_dst, *mb_src;
>> +       uint8_t nb_segs, i;
>> +
>> +       ODP_ASSERT(odp_buffer_type(pkt_dst) == ODP_BUFFER_TYPE_PACKET &&
>> +                  odp_buffer_type(pkt_src) == ODP_BUFFER_TYPE_PACKET,
>> +                  "dst_pkt or src_pkt not of type
>> ODP_BUFFER_TYPE_PACKET");
>>
>>         if (pkt_dst == ODP_PACKET_INVALID || pkt_src ==
>> ODP_PACKET_INVALID)
>>                 return -1;
>>
>> -       /* if (pkt_hdr_dst->buf_hdr.size < */
>> -       /*      pkt_hdr_src->frame_len + pkt_hdr_src->frame_offset) */
>> -       if (pkt_hdr_dst->buf_hdr.buf_len <
>> -               pkt_hdr_src->frame_len + pkt_hdr_src->frame_offset)
>> +       mb_dst = &(odp_packet_hdr(pkt_dst)->buf_hdr.mb);
>> +       mb_src = &(odp_packet_hdr(pkt_src)->buf_hdr.mb);
>> +
>> +       if (mb_dst->pkt.nb_segs != mb_src->pkt.nb_segs) {
>> +               ODP_ERR("Different nb_segs in pkt_dst and pkt_src");
>>                 return -1;
>> +       }
>>
>> -       /* Copy packet header */
>> -       start_dst = (uint8_t *)pkt_hdr_dst + start_offset;
>> -       start_src = (uint8_t *)pkt_hdr_src + start_offset;
>> -       len = ODP_OFFSETOF(odp_packet_hdr_t, payload) - start_offset;
>> -       memcpy(start_dst, start_src, len);
>> +       nb_segs = mb_src->pkt.nb_segs;
>>
>> -       /* Copy frame payload */
>> -       start_dst = (uint8_t *)odp_packet_start(pkt_dst);
>> -       start_src = (uint8_t *)odp_packet_start(pkt_src);
>> -       len = pkt_hdr_src->frame_len;
>> -       memcpy(start_dst, start_src, len);
>> +       if (mb_dst->buf_len < mb_src->buf_len) {
>> +               ODP_ERR("dst_pkt smaller than src_pkt");
>> +               return -1;
>> +       }
>>
>> -       /* Copy useful things from the buffer header */
>> -       /* pkt_hdr_dst->buf_hdr.cur_offset =
>> pkt_hdr_src->buf_hdr.cur_offset; */
>> +       for (i = 0; i < nb_segs; i++) {
>> +               if (mb_src == NULL || mb_dst == NULL) {
>> +                       ODP_ERR("Corrupted packets");
>> +                       return -1;
>> +               }
>> +               memcpy(mb_dst->buf_addr, mb_src->buf_addr,
>> mb_src->buf_len);
>> +               mb_dst = mb_dst->pkt.next;
>> +               mb_src = mb_src->pkt.next;
>> +       }
>> +       return 0;
>> +}
>>
>> -       /* Create a copy of the scatter list */
>> -       /* odp_buffer_copy_scatter(odp_buffer_from_packet(pkt_dst), */
>> -       /*                      odp_buffer_from_packet(pkt_src)); */
>> +void odp_packet_set_ctx(odp_packet_t pkt, const void *ctx)
>> +{
>> +       odp_packet_hdr(pkt)->user_ctx = (intptr_t)ctx;
>> +}
>>
>> -       return 0;
>> +void *odp_packet_get_ctx(odp_packet_t pkt)
>> +{
>> +       return (void *)(intptr_t)odp_packet_hdr(pkt)->user_ctx;
>>  }
>> diff --git a/platform/linux-dpdk/odp_packet_dpdk.c
>> b/platform/linux-dpdk/odp_packet_dpdk.c
>> index d5c8e80..ea83580 100644
>> --- a/platform/linux-dpdk/odp_packet_dpdk.c
>> +++ b/platform/linux-dpdk/odp_packet_dpdk.c
>> @@ -82,7 +82,7 @@ int setup_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, const
>> char *netdev,
>>         static struct ether_addr eth_addr[RTE_MAX_ETHPORTS];
>>         static int portinit[RTE_MAX_ETHPORTS];
>>         static int qid[RTE_MAX_ETHPORTS];
>> -       uint8_t portid = 0, num_intf = 2;
>> +       uint8_t portid = 0;
>>         uint16_t nbrxq = 0, nbtxq = 0;
>>         int ret, i;
>>
>> @@ -93,7 +93,7 @@ int setup_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, const
>> char *netdev,
>>         pkt_dpdk->pool = pool;
>>         printf("dpdk portid: %u\n", portid);
>>
>> -       nbrxq = odp_sys_core_count() / num_intf;
>> +       nbrxq = odp_sys_core_count();

The change here might cause l2fwd to stop working; I didn't get a
chance to test it last week. Have you had a chance to verify it?

>>         nbtxq = nbrxq;
>>         if (portinit[portid] == 0) {
>>                 fflush(stdout);
>> @@ -157,17 +157,18 @@ int close_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk)
>>  }
>>
>>  int recv_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, odp_packet_t pkt_table[],
>> -               unsigned len)
>> +                 unsigned len)
>>  {
>> -       struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
>>         uint16_t nb_rx, i = 0;
>>
>> -       memset(pkts_burst, 0 , sizeof(pkts_burst));
>>         nb_rx = rte_eth_rx_burst((uint8_t)pkt_dpdk->portid,
>>                                  (uint16_t)pkt_dpdk->queueid,
>> -                                (struct rte_mbuf **)pkts_burst,
>> (uint16_t)len);
>> -       for (i = 0; i < nb_rx; i++)
>> -               pkt_table[i] = (odp_packet_t)pkts_burst[i];
>> +                                (struct rte_mbuf **)pkt_table,
>> (uint16_t)len);
>> +       for (i = 0; i < nb_rx; i++) {
>> +               odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt_table[i]);
>> +               struct rte_mbuf *mb = &pkt_hdr->buf_hdr.mb;
>> +               odp_packet_parse(pkt_table[i], mb->pkt.pkt_len, 0);
>> +       }
>>         return nb_rx;
>>  }
>>
>> diff --git a/platform/linux-dpdk/odp_queue.c
>> b/platform/linux-dpdk/odp_queue.c
>> index 554b8ea..29fae8f 100644
>> --- a/platform/linux-dpdk/odp_queue.c
>> +++ b/platform/linux-dpdk/odp_queue.c
>> @@ -239,11 +239,11 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t
>> *buf_hdr)
>>                 /* Empty queue */
>>                 queue->s.head = buf_hdr;
>>                 queue->s.tail = buf_hdr;
>> -               buf_hdr->pkt.next = NULL;
>> +               buf_hdr->next = NULL;
>>         } else {
>> -               queue->s.tail->pkt.next = buf_hdr;
>> +               queue->s.tail->next = buf_hdr;
>>                 queue->s.tail = buf_hdr;
>> -               buf_hdr->pkt.next = NULL;
>> +               buf_hdr->next = NULL;
>>         }
>>
>>         if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
>> @@ -267,17 +267,17 @@ int queue_enq_multi(queue_entry_t *queue,
>> odp_buffer_hdr_t *buf_hdr[], int num)
>>         odp_buffer_hdr_t *tail;
>>
>>         for (i = 0; i < num - 1; i++)
>> -               buf_hdr[i]->pkt.next = buf_hdr[i+1];
>> +               buf_hdr[i]->next = buf_hdr[i+1];
>>
>>         tail = buf_hdr[num-1];
>> -       buf_hdr[num-1]->pkt.next = NULL;
>> +       buf_hdr[num-1]->next = NULL;
>>
>>         LOCK(&queue->s.lock);
>>         /* Empty queue */
>>         if (queue->s.head == NULL)
>>                 queue->s.head = buf_hdr[0];
>>         else
>> -               queue->s.tail->pkt.next = buf_hdr[0];
>> +               queue->s.tail->next = buf_hdr[0];
>>
>>         queue->s.tail = tail;
>>
>> @@ -338,8 +338,8 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
>>                         queue->s.status = QUEUE_STATUS_NOTSCHED;
>>         } else {
>>                 buf_hdr       = queue->s.head;
>> -               queue->s.head = buf_hdr->pkt.next;
>> -               buf_hdr->pkt.next = NULL;
>> +               queue->s.head = buf_hdr->next;
>> +               buf_hdr->next = NULL;
>>
>>                 if (queue->s.head == NULL) {
>>                         /* Queue is now empty */
>> @@ -370,8 +370,8 @@ int queue_deq_multi(queue_entry_t *queue,
>> odp_buffer_hdr_t *buf_hdr[], int num)
>>                 for (; i < num && hdr; i++) {
>>                         buf_hdr[i]       = hdr;
>>                         /* odp_prefetch(hdr->addr); */
>> -                       hdr              = hdr->pkt.next;
>> -                       buf_hdr[i]->pkt.next = NULL;
>> +                       hdr              = hdr->next;
>> +                       buf_hdr[i]->next = NULL;
>>                 }
>>
>>                 queue->s.head = hdr;
>> diff --git a/platform/linux-dpdk/odp_schedule.c
>> b/platform/linux-dpdk/odp_schedule.c
>> new file mode 100644
>> index 0000000..462b8eb
>> --- /dev/null
>> +++ b/platform/linux-dpdk/odp_schedule.c
>> @@ -0,0 +1,417 @@
>> +/* Copyright (c) 2013, Linaro Limited
>> + * All rights reserved.
>> + *
>> + * SPDX-License-Identifier:     BSD-3-Clause
>> + */
>> +
>> +#include <odp_schedule.h>
>> +#include <odp_schedule_internal.h>
>> +#include <odp_align.h>
>> +#include <odp_queue.h>
>> +#include <odp_shared_memory.h>
>> +#include <odp_buffer.h>
>> +#include <odp_buffer_pool.h>
>> +#include <odp_internal.h>
>> +#include <odp_config.h>
>> +#include <odp_debug.h>
>> +#include <odp_thread.h>
>> +#include <odp_time.h>
>> +#include <odp_spinlock.h>
>> +#include <odp_hints.h>
>> +
>> +#include <odp_queue_internal.h>
>> +
>> +
>> +/* Limits to number of scheduled queues */
>> +#define SCHED_POOL_SIZE (256*1024)
>> +
>> +/* Scheduler sub queues */
>> +#define QUEUES_PER_PRIO  4
>> +
>> +/* TODO: random or queue based selection */
>> +#define SEL_PRI_QUEUE(x) ((QUEUES_PER_PRIO-1) & (queue_to_id(x)))
>> +
>> +/* Maximum number of dequeues */
>> +#define MAX_DEQ 4
>> +
>> +
>> +/* Mask of queues per priority */
>> +typedef uint8_t pri_mask_t;
>> +
>> +ODP_STATIC_ASSERT((8*sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
>> "pri_mask_t_is_too_small");
>> +
>> +
>> +typedef struct {
>> +       odp_queue_t
>> pri_queue[ODP_CONFIG_SCHED_PRIOS][QUEUES_PER_PRIO];
>> +       pri_mask_t        pri_mask[ODP_CONFIG_SCHED_PRIOS];
>> +       odp_spinlock_t    mask_lock;
>> +       odp_buffer_pool_t pool;
>> +} sched_t;
>> +
>> +typedef struct {
>> +       odp_queue_t queue;
>> +
>> +} queue_desc_t;
>> +
>> +typedef struct {
>> +       odp_queue_t  pri_queue;
>> +       odp_buffer_t desc_buf;
>> +
>> +       odp_buffer_t buf[MAX_DEQ];
>> +       int num;
>> +       int index;
>> +       odp_queue_t queue;
>> +       int pause;
>> +
>> +} sched_local_t;
>> +
>> +/* Global scheduler context */
>> +static sched_t *sched;
>> +
>> +/* Thread local scheduler context */
>> +static __thread sched_local_t sched_local;
>> +
>> +
>> +static inline odp_queue_t select_pri_queue(odp_queue_t queue, int prio)
>> +{
>> +       int id = SEL_PRI_QUEUE(queue);
>> +       return sched->pri_queue[prio][id];
>> +}
>> +
>> +
>> +int odp_schedule_init_global(void)
>> +{
>> +       odp_buffer_pool_t pool;
>> +       int i, j;
>> +
>> +       ODP_DBG("Schedule init ... ");
>> +
>> +       sched = odp_shm_reserve("odp_scheduler",
>> +                               sizeof(sched_t),
>> +                               ODP_CACHE_LINE_SIZE);
>> +
>> +       if (sched == NULL) {
>> +               ODP_ERR("Schedule init: Shm reserve failed.\n");
>> +               return -1;
>> +       }
>> +
>> +       pool = odp_buffer_pool_create("odp_sched_pool", NULL,
>> +                                     SCHED_POOL_SIZE,
>> sizeof(queue_desc_t),
>> +                                     ODP_CACHE_LINE_SIZE,
>> +                                     ODP_BUFFER_TYPE_RAW);
>> +
>> +       if (pool == ODP_BUFFER_POOL_INVALID) {
>> +               ODP_ERR("Schedule init: Pool create failed.\n");
>> +               return -1;
>> +       }
>> +
>> +       sched->pool = pool;
>> +       odp_spinlock_init(&sched->mask_lock);
>> +
>> +       for (i = 0; i < ODP_CONFIG_SCHED_PRIOS; i++) {
>> +               odp_queue_t queue;
>> +               char name[] = "odp_priXX_YY";
>> +
>> +               name[7] = '0' + i / 10;
>> +               name[8] = '0' + i - 10*(i / 10);
>> +
>> +               for (j = 0; j < QUEUES_PER_PRIO; j++) {
>> +                       name[10] = '0' + j / 10;
>> +                       name[11] = '0' + j - 10*(j / 10);
>> +
>> +                       queue = odp_queue_create(name,
>> +                                                ODP_QUEUE_TYPE_POLL,
>> NULL);
>> +
>> +                       if (queue == ODP_QUEUE_INVALID) {
>> +                               ODP_ERR("Sched init: Queue create
>> failed.\n");
>> +                               return -1;
>> +                       }
>> +
>> +                       sched->pri_queue[i][j] = queue;
>> +                       sched->pri_mask[i]     = 0;
>> +               }
>> +       }
>> +
>> +       ODP_DBG("done\n");
>> +
>> +       return 0;
>> +}
>> +
>> +
>> +int odp_schedule_init_local(void)
>> +{
>> +       int i;
>> +
>> +       sched_local.pri_queue = ODP_QUEUE_INVALID;
>> +       sched_local.desc_buf  = ODP_BUFFER_INVALID;
>> +
>> +       for (i = 0; i < MAX_DEQ; i++)
>> +               sched_local.buf[i] = ODP_BUFFER_INVALID;
>> +
>> +       sched_local.num   = 0;
>> +       sched_local.index = 0;
>> +       sched_local.queue = ODP_QUEUE_INVALID;
>> +       sched_local.pause = 0;
>> +
>> +       return 0;
>> +}
>> +
>> +
>> +void odp_schedule_mask_set(odp_queue_t queue, int prio)
>> +{
>> +       int id = SEL_PRI_QUEUE(queue);
>> +
>> +       odp_spinlock_lock(&sched->mask_lock);
>> +       sched->pri_mask[prio] |= 1 << id;
>> +       odp_spinlock_unlock(&sched->mask_lock);
>> +}
>> +
>> +
>> +odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue)
>> +{
>> +       odp_buffer_t buf;
>> +
>> +       buf = odp_buffer_alloc(sched->pool);
>> +
>> +       if (buf != ODP_BUFFER_INVALID) {
>> +               queue_desc_t *desc;
>> +               desc        = odp_buffer_addr(buf);
>> +               desc->queue = queue;
>> +       }
>> +
>> +       return buf;
>> +}
>> +
>> +
>> +void odp_schedule_queue(odp_queue_t queue, int prio)
>> +{
>> +       odp_buffer_t desc_buf;
>> +       odp_queue_t  pri_queue;
>> +
>> +       pri_queue = select_pri_queue(queue, prio);
>> +       desc_buf  = queue_sched_buf(queue);
>> +
>> +       odp_queue_enq(pri_queue, desc_buf);
>> +}
>> +
>> +
>> +void odp_schedule_release_atomic(void)
>> +{
>> +       if (sched_local.pri_queue != ODP_QUEUE_INVALID &&
>> +           sched_local.num       == 0) {
>> +               /* Release current atomic queue */
>> +               odp_queue_enq(sched_local.pri_queue,
>> sched_local.desc_buf);
>> +               sched_local.pri_queue = ODP_QUEUE_INVALID;
>> +       }
>> +}
>> +
>> +
>> +static inline int copy_bufs(odp_buffer_t out_buf[], unsigned int max)
>> +{
>> +       int i = 0;
>> +
>> +       while (sched_local.num && max) {
>> +               out_buf[i] = sched_local.buf[sched_local.index];
>> +               sched_local.index++;
>> +               sched_local.num--;
>> +               max--;
>> +               i++;
>> +       }
>> +
>> +       return i;
>> +}
>> +
>> +
>> +/*
>> + * Schedule queues
>> + *
>> + * TODO: SYNC_ORDERED not implemented yet
>> + */
>> +static int schedule(odp_queue_t *out_queue, odp_buffer_t out_buf[],
>> +                   unsigned int max_num, unsigned int max_deq)
>> +{
>> +       int i, j;
>> +       int thr;
>> +       int ret;
>> +
>> +       if (sched_local.num) {
>> +               ret = copy_bufs(out_buf, max_num);
>> +
>> +               if (out_queue)
>> +                       *out_queue = sched_local.queue;
>> +
>> +               return ret;
>> +       }
>> +
>> +       odp_schedule_release_atomic();
>> +
>> +       if (odp_unlikely(sched_local.pause))
>> +               return 0;
>> +
>> +       thr = odp_thread_id();
>> +
>> +       for (i = 0; i < ODP_CONFIG_SCHED_PRIOS; i++) {
>> +               int id;
>> +
>> +               if (sched->pri_mask[i] == 0)
>> +                       continue;
>> +
>> +               id = thr & (QUEUES_PER_PRIO-1);
>> +
>> +               for (j = 0; j < QUEUES_PER_PRIO; j++, id++) {
>> +                       odp_queue_t  pri_q;
>> +                       odp_buffer_t desc_buf;
>> +
>> +                       if (id >= QUEUES_PER_PRIO)
>> +                               id = 0;
>> +
>> +                       if (odp_unlikely((sched->pri_mask[i] & (1 << id))
>> == 0))
>> +                               continue;
>> +
>> +                       pri_q    = sched->pri_queue[i][id];
>> +                       desc_buf = odp_queue_deq(pri_q);
>> +
>> +                       if (desc_buf != ODP_BUFFER_INVALID) {
>> +                               queue_desc_t *desc;
>> +                               odp_queue_t queue;
>> +                               int num;
>> +
>> +                               desc  = odp_buffer_addr(desc_buf);
>> +                               queue = desc->queue;
>> +
>> +                               num = odp_queue_deq_multi(queue,
>> +                                                         sched_local.buf,
>> +                                                         max_deq);
>> +
>> +                               if (num == 0) {
>> +                                       /* Remove empty queue from
>> scheduling,
>> +                                        * except packet input queues
>> +                                        */
>> +                                       if (odp_queue_type(queue) ==
>> +                                           ODP_QUEUE_TYPE_PKTIN)
>> +                                               odp_queue_enq(pri_q,
>> desc_buf);
>> +
>> +                                       continue;
>> +                               }
>> +
>> +                               sched_local.num   = num;
>> +                               sched_local.index = 0;
>> +                               ret = copy_bufs(out_buf, max_num);
>> +
>> +                               sched_local.queue = queue;
>> +
>> +                               if (queue_sched_atomic(queue)) {
>> +                                       /* Hold queue during atomic access
>> */
>> +                                       sched_local.pri_queue = pri_q;
>> +                                       sched_local.desc_buf  = desc_buf;
>> +                               } else {
>> +                                       /* Continue scheduling the queue
>> */
>> +                                       odp_queue_enq(pri_q, desc_buf);
>> +                               }
>> +
>> +                               /* Output the source queue handle */
>> +                               if (out_queue)
>> +                                       *out_queue = queue;
>> +
>> +                               return ret;
>> +                       }
>> +               }
>> +       }
>> +
>> +       return 0;
>> +}
>> +
>> +
>> +static int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
>> +                         odp_buffer_t out_buf[],
>> +                         unsigned int max_num, unsigned int max_deq)
>> +{
>> +       uint64_t start_cycle, cycle, diff;
>> +       int ret;
>> +
>> +       start_cycle = 0;
>> +
>> +       while (1) {
>> +               ret = schedule(out_queue, out_buf, max_num, max_deq);
>> +
>> +               if (ret)
>> +                       break;
>> +
>> +               if (wait == ODP_SCHED_WAIT)
>> +                       continue;
>> +
>> +               if (wait == ODP_SCHED_NO_WAIT)
>> +                       break;
>> +
>> +               if (start_cycle == 0) {
>> +                       start_cycle = odp_time_get_cycles();
>> +                       continue;
>> +               }
>> +
>> +               cycle = odp_time_get_cycles();
>> +               diff  = odp_time_diff_cycles(start_cycle, cycle);
>> +
>> +               if (wait < diff)
>> +                       break;
>> +       }
>> +
>> +       return ret;
>> +}
>> +
>> +
>> +odp_buffer_t odp_schedule(odp_queue_t *out_queue, uint64_t wait)
>> +{
>> +       odp_buffer_t buf;
>> +
>> +       buf = ODP_BUFFER_INVALID;
>> +
>> +       schedule_loop(out_queue, wait, &buf, 1, MAX_DEQ);
>> +
>> +       return buf;
>> +}
>> +
>> +
>> +odp_buffer_t odp_schedule_one(odp_queue_t *out_queue, uint64_t wait)
>> +{
>> +       odp_buffer_t buf;
>> +
>> +       buf = ODP_BUFFER_INVALID;
>> +
>> +       schedule_loop(out_queue, wait, &buf, 1, 1);
>> +
>> +       return buf;
>> +}
>> +
>> +
>> +int odp_schedule_multi(odp_queue_t *out_queue, uint64_t wait,
>> +                      odp_buffer_t out_buf[], unsigned int num)
>> +{
>> +       return schedule_loop(out_queue, wait, out_buf, num, MAX_DEQ);
>> +}
>> +
>> +
>> +void odp_schedule_pause(void)
>> +{
>> +       sched_local.pause = 1;
>> +}
>> +
>> +
>> +void odp_schedule_resume(void)
>> +{
>> +       sched_local.pause = 0;
>> +}
>> +
>> +
>> +uint64_t odp_schedule_wait_time(uint64_t ns)
>> +{
>> +       if (ns <= ODP_SCHED_NO_WAIT)
>> +               ns = ODP_SCHED_NO_WAIT + 1;
>> +
>> +       return odp_time_ns_to_cycles(ns);
>> +}
>> +
>> +
>> +int odp_schedule_num_prio(void)
>> +{
>> +       return ODP_CONFIG_SCHED_PRIOS;
>> +}
>
>
> Reviewed-by: Venkatesh Vivekanandan <venkatesh.vivekanandan@linaro.org>
>>
>> --
>> 1.8.3.2
>>
>>
>> _______________________________________________
>> lng-odp mailing list
>> lng-odp@lists.linaro.org
>> http://lists.linaro.org/mailman/listinfo/lng-odp
>
>
Venkatesh Vivekanandan Sept. 24, 2014, 2:32 p.m. UTC | #3
I didn't test the patch before adding my "Reviewed-by".

On 24 September 2014 16:46, Ciprian Barbu <ciprian.barbu@linaro.org> wrote:

> On Wed, Sep 24, 2014 at 1:31 PM, Venkatesh Vivekanandan
> <venkatesh.vivekanandan@linaro.org> wrote:
> >
> >
> > On 15 September 2014 20:21, Ciprian Barbu <ciprian.barbu@linaro.org>
> wrote:
> >>
> >> Signed-off-by: Ciprian Barbu <ciprian.barbu@linaro.org>
> >> ---
> >>  platform/linux-dpdk/Makefile.am                    |   2 +-
> >>  platform/linux-dpdk/include/api/odp_buffer.h       |   2 +-
> >>  platform/linux-dpdk/include/api/odp_buffer_pool.h  |   2 +-
> >>  platform/linux-dpdk/include/api/odp_packet.h       |  17 +
> >>  platform/linux-dpdk/include/odp_buffer_internal.h  |   8 +-
> >>  platform/linux-dpdk/include/odp_packet_internal.h  |   7 +-
> >>  .../linux-dpdk/include/odp_packet_io_internal.h    |   5 -
> >>  platform/linux-dpdk/odp_buffer.c                   |  26 +-
> >>  platform/linux-dpdk/odp_buffer_pool.c              | 150 +++++++-
> >>  platform/linux-dpdk/odp_packet.c                   | 148 +++++---
> >>  platform/linux-dpdk/odp_packet_dpdk.c              |  17 +-
> >>  platform/linux-dpdk/odp_queue.c                    |  20 +-
> >>  platform/linux-dpdk/odp_schedule.c                 | 417
> >> +++++++++++++++++++++
> >>  13 files changed, 716 insertions(+), 105 deletions(-)
> >>  create mode 100644 platform/linux-dpdk/odp_schedule.c
> >>
> >> diff --git a/platform/linux-dpdk/Makefile.am
> >> b/platform/linux-dpdk/Makefile.am
> >> index 1eabd9f..e128bf8 100644
> >> --- a/platform/linux-dpdk/Makefile.am
> >> +++ b/platform/linux-dpdk/Makefile.am
> >> @@ -79,7 +79,7 @@ __LIB__libodp_la_SOURCES = \
> >>                            odp_queue.c \
> >>                            ../linux-generic/odp_ring.c \
> >>                            ../linux-generic/odp_rwlock.c \
> >> -                          ../linux-generic/odp_schedule.c \
> >> +                          odp_schedule.c \
> >>                            ../linux-generic/odp_shared_memory.c \
> >>                            ../linux-generic/odp_spinlock.c \
> >>                            ../linux-generic/odp_system_info.c \
> >> diff --git a/platform/linux-dpdk/include/api/odp_buffer.h
> >> b/platform/linux-dpdk/include/api/odp_buffer.h
> >> index 9ea1ed8..b2fbc76 100644
> >> --- a/platform/linux-dpdk/include/api/odp_buffer.h
> >> +++ b/platform/linux-dpdk/include/api/odp_buffer.h
> >> @@ -32,7 +32,7 @@ extern "C" {
> >>  typedef unsigned long odp_buffer_t;
> >>
> >>
> >> -#define ODP_BUFFER_INVALID (0xffffffff) /**< Invalid buffer */
> >> +#define ODP_BUFFER_INVALID (unsigned long)(-1L) /**< Invalid buffer */
> >>
> >>
> >>  /**
> >> diff --git a/platform/linux-dpdk/include/api/odp_buffer_pool.h
> >> b/platform/linux-dpdk/include/api/odp_buffer_pool.h
> >> index 4b75cf5..382f4f0 100644
> >> --- a/platform/linux-dpdk/include/api/odp_buffer_pool.h
> >> +++ b/platform/linux-dpdk/include/api/odp_buffer_pool.h
> >> @@ -27,7 +27,7 @@ extern "C" {
> >>  #define ODP_BUFFER_POOL_NAME_LEN  32
> >>
> >>  /** Invalid buffer pool */
> >> -#define ODP_BUFFER_POOL_INVALID  (0xffffffff)
> >> +#define ODP_BUFFER_POOL_INVALID  (unsigned long)(-1L)
> >>
> >>  /** ODP buffer pool */
> >>  typedef unsigned long odp_buffer_pool_t;
> >> diff --git a/platform/linux-dpdk/include/api/odp_packet.h
> >> b/platform/linux-dpdk/include/api/odp_packet.h
> >> index 5545bdc..79503a5 100644
> >> --- a/platform/linux-dpdk/include/api/odp_packet.h
> >> +++ b/platform/linux-dpdk/include/api/odp_packet.h
> >> @@ -80,6 +80,23 @@ void odp_packet_set_len(odp_packet_t pkt, size_t
> len);
> >>  size_t odp_packet_get_len(odp_packet_t pkt);
> >>
> >>  /**
> >> + * Set packet user context
> >> + *
> >> + * @param buf      Packet handle
> >> + * @param ctx      User context
> >> + *
> >> + */
> >> +void odp_packet_set_ctx(odp_packet_t buf, const void *ctx);
> >> +
> >> +/**
> >> + * Get packet user context
> >> + *
> >> + * @param buf      Packet handle
> >> + *
> >> + * @return User context
> >> + */
> >> +void *odp_packet_get_ctx(odp_packet_t buf);
> >> +/**
> >>   * Get address to the start of the packet buffer
> >>   *
> >>   * The address of the packet buffer is not necessarily the same as the
> >> start
> >> diff --git a/platform/linux-dpdk/include/odp_buffer_internal.h
> >> b/platform/linux-dpdk/include/odp_buffer_internal.h
> >> index f87ec80..5406606 100644
> >> --- a/platform/linux-dpdk/include/odp_buffer_internal.h
> >> +++ b/platform/linux-dpdk/include/odp_buffer_internal.h
> >> @@ -59,8 +59,12 @@ typedef union odp_buffer_bits_t {
> >>  struct odp_buffer_hdr_t;
> >>
> >>
> >> -typedef struct rte_mbuf odp_buffer_hdr_t;
> >> -
> >> +typedef struct odp_buffer_hdr_t {
> >> +       struct rte_mbuf mb;            /* Underlying DPDK rte_mbuf */
> >> +       struct odp_buffer_hdr_t *next; /* Next buf in a list */
> >> +       int type;                      /* ODP buffer type; not DPDK buf
> >> type */
> >> +       uint32_t index;                /* Index in the rte_mempool */
> >> +} odp_buffer_hdr_t;
> >>
> >>  int odp_buffer_snprint(char *str, size_t n, odp_buffer_t buf);
> >>
> >> diff --git a/platform/linux-dpdk/include/odp_packet_internal.h
> >> b/platform/linux-dpdk/include/odp_packet_internal.h
> >> index 9357f90..d7f505b 100644
> >> --- a/platform/linux-dpdk/include/odp_packet_internal.h
> >> +++ b/platform/linux-dpdk/include/odp_packet_internal.h
> >> @@ -113,13 +113,8 @@ typedef struct {
> >>         uint32_t l3_offset; /**< offset to L3 hdr, e.g. IPv4, IPv6 */
> >>         uint32_t l4_offset; /**< offset to L4 hdr (TCP, UDP, SCTP, also
> >> ICMP) */
> >>
> >> -       uint32_t frame_len;
> >> -
> >>         odp_pktio_t input;
> >> -
> >> -       uint32_t pad;
> >> -       uint8_t payload[];
> >> -
> >> +       uint64_t user_ctx;  /**< user context */
> >>  } odp_packet_hdr_t;
> >>
> >>  /**
> >> diff --git a/platform/linux-dpdk/include/odp_packet_io_internal.h
> >> b/platform/linux-dpdk/include/odp_packet_io_internal.h
> >> index 08abea7..9263349 100644
> >> --- a/platform/linux-dpdk/include/odp_packet_io_internal.h
> >> +++ b/platform/linux-dpdk/include/odp_packet_io_internal.h
> >> @@ -31,11 +31,6 @@ struct pktio_entry {
> >>         odp_queue_t inq_default;        /**< default input queue, if set
> >> */
> >>         odp_queue_t outq_default;       /**< default out queue */
> >>         odp_pktio_params_t params;      /**< pktio parameters */
> >> -       pkt_sock_t pkt_sock;            /**< using socket API for IO */
> >> -       pkt_sock_mmap_t pkt_sock_mmap;  /**< using socket mmap API for
> IO
> >> */
> >> -#ifdef ODP_HAVE_NETMAP
> >> -       pkt_netmap_t pkt_nm;            /**< using netmap API for IO */
> >> -#endif
> >>         pkt_dpdk_t pkt_dpdk;            /**< using DPDK API for IO */
> >>  };
> >>
> >> diff --git a/platform/linux-dpdk/odp_buffer.c
> >> b/platform/linux-dpdk/odp_buffer.c
> >> index e2f8942..e2657e4 100644
> >> --- a/platform/linux-dpdk/odp_buffer.c
> >> +++ b/platform/linux-dpdk/odp_buffer.c
> >> @@ -16,7 +16,7 @@ void *odp_buffer_addr(odp_buffer_t buf)
> >>  {
> >>         odp_buffer_hdr_t *hdr = odp_buf_to_hdr(buf);
> >>
> >> -       return hdr->buf_addr;
> >> +       return hdr->mb.buf_addr;
> >>  }
> >>
> >>
> >> @@ -24,7 +24,7 @@ size_t odp_buffer_size(odp_buffer_t buf)
> >>  {
> >>         odp_buffer_hdr_t *hdr = odp_buf_to_hdr(buf);
> >>
> >> -       return hdr->buf_len;
> >> +       return hdr->mb.buf_len;
> >>  }
> >>
> >>
> >> @@ -38,11 +38,9 @@ int odp_buffer_type(odp_buffer_t buf)
> >>
> >>  int odp_buffer_is_valid(odp_buffer_t buf)
> >>  {
> >> -       odp_buffer_bits_t handle;
> >> -
> >> -       handle.u32 = buf;
> >> -
> >> -       return (handle.index != ODP_BUFFER_INVALID_INDEX);
> >> +       /* We could call rte_mbuf_sanity_check, but that panics
> >> +        * and aborts the program */
> >> +       return (void*)buf != NULL;
> >>  }
> >>
> >>
> >> @@ -61,17 +59,19 @@ int odp_buffer_snprint(char *str, size_t n,
> >> odp_buffer_t buf)
> >>         len += snprintf(&str[len], n-len,
> >>                         "Buffer\n");
> >>         len += snprintf(&str[len], n-len,
> >> -                       "  pool         %"PRIu64"\n", (int64_t)
> >> hdr->pool);
> >> +                       "  pool         %"PRIu64"\n", (int64_t)
> >> hdr->mb.pool);
> >> +       len += snprintf(&str[len], n-len,
> >> +                       "  phy_addr     %"PRIu64"\n",
> >> hdr->mb.buf_physaddr);
> >>         len += snprintf(&str[len], n-len,
> >> -                       "  phy_addr     %"PRIu64"\n",
> hdr->buf_physaddr);
> >> +                       "  addr         %p\n",        hdr->mb.buf_addr);
> >>         len += snprintf(&str[len], n-len,
> >> -                       "  addr         %p\n",        hdr->buf_addr);
> >> +                       "  size         %u\n",        hdr->mb.buf_len);
> >>         len += snprintf(&str[len], n-len,
> >> -                       "  size         %u\n",        hdr->buf_len);
> >> +                       "  ref_count    %i\n",        hdr->mb.refcnt);
> >>         len += snprintf(&str[len], n-len,
> >> -                       "  ref_count    %i\n",        hdr->refcnt);
> >> +                       "  dpdk type    %i\n",        hdr->mb.type);
> >>         len += snprintf(&str[len], n-len,
> >> -                       "  type         %i\n",        hdr->type);
> >> +                       "  odp type     %i\n",        hdr->type);
> >>
> >>         return len;
> >>  }
> >> diff --git a/platform/linux-dpdk/odp_buffer_pool.c
> >> b/platform/linux-dpdk/odp_buffer_pool.c
> >> index 805ce68..f044b5d 100644
> >> --- a/platform/linux-dpdk/odp_buffer_pool.c
> >> +++ b/platform/linux-dpdk/odp_buffer_pool.c
> >> @@ -9,6 +9,7 @@
> >>  #include <odp_buffer_pool_internal.h>
> >>  #include <odp_buffer_internal.h>
> >>  #include <odp_packet_internal.h>
> >> +#include <odp_timer_internal.h>
> >>  #include <odp_shared_memory.h>
> >>  #include <odp_align.h>
> >>  #include <odp_internal.h>
> >> @@ -44,6 +45,13 @@
> >>
> >>  #define NULL_INDEX ((uint32_t)-1)
> >>
> >> +union buffer_type_any_u {
> >> +       odp_buffer_hdr_t  buf;
> >> +       odp_packet_hdr_t  pkt;
> >> +       odp_timeout_hdr_t tmo;
> >> +};
> >> +
> >> +typedef union buffer_type_any_u odp_any_buffer_hdr_t;
> >>
> >>  typedef union pool_entry_u {
> >>         struct pool_entry_s s;
> >> @@ -59,7 +67,7 @@ typedef struct pool_table_t {
> >>  } pool_table_t;
> >>
> >>
> >> -/* The pool table */
> >> +/* The pool table ptr - resides in shared memory */
> >>  static pool_table_t *pool_tbl;
> >>
> >>  /* Pool entry pointers (for inlining) */
> >> @@ -98,31 +106,151 @@ int odp_buffer_pool_init_global(void)
> >>         return 0;
> >>  }
> >>
> >> +struct mbuf_ctor_arg {
> >> +       uint16_t seg_buf_offset; /* To skip the ODP buf/pkt/tmo header
> */
> >> +       uint16_t seg_buf_size;   /* total sz: offset + user sz + HDROOM
> */
> >> +       int buf_type;
> >> +};
> >> +
> >> +struct mbuf_pool_ctor_arg {
> >> +       uint16_t seg_buf_size; /* size of mbuf: user specified sz +
> HDROOM
> >> */
> >> +};
> >> +
> >> +static void
> >> +odp_dpdk_mbuf_pool_ctor(struct rte_mempool *mp,
> >> +                       void *opaque_arg)
> >> +{
> >> +       struct mbuf_pool_ctor_arg      *mbp_ctor_arg;
> >> +       struct rte_pktmbuf_pool_private *mbp_priv;
> >> +
> >> +       if (mp->private_data_size < sizeof(struct
> >> rte_pktmbuf_pool_private)) {
> >> +               ODP_ERR("%s(%s) private_data_size %d < %d",
> >> +                       __func__, mp->name, (int) mp->private_data_size,
> >> +                       (int) sizeof(struct rte_pktmbuf_pool_private));
> >> +               return;
> >> +       }
> >> +       mbp_ctor_arg = (struct mbuf_pool_ctor_arg *) opaque_arg;
> >> +       mbp_priv = rte_mempool_get_priv(mp);
> >> +       mbp_priv->mbuf_data_room_size = mbp_ctor_arg->seg_buf_size;
> >> +}
> >> +
> >> +/* ODP DPDK mbuf constructor.
> >> + * This is a combination of rte_pktmbuf_init in rte_mbuf.c
> >> + * and testpmd_mbuf_ctor in testpmd.c
> >> + */
> >> +static void
> >> +odp_dpdk_mbuf_ctor(struct rte_mempool *mp,
> >> +                  void *opaque_arg,
> >> +                  void *raw_mbuf,
> >> +                  unsigned i)
> >> +{
> >> +       struct mbuf_ctor_arg *mb_ctor_arg;
> >> +       struct rte_mbuf *mb = raw_mbuf;
> >> +       struct odp_buffer_hdr_t *buf_hdr;
> >> +
> >> +       /* The rte_mbuf is at the begninning in all cases */
> >> +       mb_ctor_arg = (struct mbuf_ctor_arg *) opaque_arg;
> >> +       mb = (struct rte_mbuf *) raw_mbuf;
> >> +
> >> +       RTE_MBUF_ASSERT(mp->elt_size >= sizeof(struct rte_mbuf));
> >> +
> >> +       memset(mb, 0, mp->elt_size);
> >> +
> >> +       /* Start of buffer is just after the ODP type specific header
> >> +        * which contains in the very beginning the rte_mbuf struct */
> >> +       mb->buf_addr     = (char *)mb + mb_ctor_arg->seg_buf_offset;
> >> +       mb->buf_physaddr = rte_mempool_virt2phy(mp, mb) +
> >> +                       mb_ctor_arg->seg_buf_offset;
> >> +       mb->buf_len      = mb_ctor_arg->seg_buf_size;
> >> +
> >> +       /* keep some headroom between start of buffer and data */
> >> +       if (mb_ctor_arg->buf_type == ODP_BUFFER_TYPE_PACKET ||
> >> +           mb_ctor_arg->buf_type == ODP_BUFFER_TYPE_ANY)
> >> +               mb->pkt.data = (char *) mb->buf_addr +
> >> RTE_PKTMBUF_HEADROOM;
> >> +       else
> >> +               mb->pkt.data = mb->buf_addr;
> >> +
> >> +       /* init some constant fields */
> >> +       mb->type         = RTE_MBUF_PKT;
> >> +       mb->pool         = mp;
> >> +       mb->pkt.nb_segs  = 1;
> >> +       mb->pkt.in_port  = 0xff;
> >> +       mb->ol_flags     = 0;
> >> +       mb->pkt.vlan_macip.data = 0;
> >> +       mb->pkt.hash.rss = 0;
> >> +
> >> +       /* Save index, might be useful for debugging purposes */
> >> +       buf_hdr = (struct odp_buffer_hdr_t*) raw_mbuf;
> >> +       buf_hdr->index = i;
> >> +}
> >>
> >>  odp_buffer_pool_t odp_buffer_pool_create(const char *name,
> >>                                          void *base_addr, uint64_t size,
> >>                                          size_t buf_size, size_t
> >> buf_align,
> >>                                          int buf_type)
> >>  {
> >> -       struct rte_mempool *pktmbuf_pool = NULL;
> >> +       struct rte_mempool *pool = NULL;
> >> +       struct mbuf_pool_ctor_arg mbp_ctor_arg;
> >> +       struct mbuf_ctor_arg mb_ctor_arg;
> >> +       unsigned mb_size;
> >> +
> >> +       /* Not used for rte_mempool; the new ODP buffer management
> >> introduces
> >> +        * rte_mempool_create_from_region where base_addr makes sense */
> >> +       (void)base_addr;
> >> +
> >> +       /* buf_align will be removed soon, no need to wory about it */
> >> +       (void)buf_align;
> >> +
> >>         ODP_DBG("odp_buffer_pool_create: %s, %lx, %u, %u, %u, %d\n",
> name,
> >>                 (uint64_t) base_addr, (unsigned) size,
> >>                 (unsigned) buf_size, (unsigned) buf_align,
> >>                 buf_type);
> >>
> >> -       pktmbuf_pool =
> >> -               rte_mempool_create(name, NB_MBUF,
> >> -                                  MBUF_SIZE, MAX_PKT_BURST,
> >> -                                  sizeof(struct
> >> rte_pktmbuf_pool_private),
> >> -                                  rte_pktmbuf_pool_init, NULL,
> >> -                                  rte_pktmbuf_init, NULL,
> >> -                                  rte_socket_id(), 0);
> >> -       if (pktmbuf_pool == NULL) {
> >> +       switch (buf_type) {
> >> +       case ODP_BUFFER_TYPE_RAW:
> >> +               mb_ctor_arg.seg_buf_offset =
> >> +                       (uint16_t)
> >> CACHE_LINE_ROUNDUP(sizeof(odp_buffer_hdr_t));
> >> +               mbp_ctor_arg.seg_buf_size = (uint16_t) buf_size;
> >> +               break;
> >> +       case ODP_BUFFER_TYPE_PACKET:
> >> +               mb_ctor_arg.seg_buf_offset =
> >> +                       (uint16_t)
> >> CACHE_LINE_ROUNDUP(sizeof(odp_packet_hdr_t));
> >> +               mbp_ctor_arg.seg_buf_size =
> >> +                       (uint16_t) (RTE_PKTMBUF_HEADROOM + buf_size);
> >> +               break;
> >> +       case ODP_BUFFER_TYPE_TIMEOUT:
> >> +               mb_ctor_arg.seg_buf_offset =
> >> +                       (uint16_t)
> >> CACHE_LINE_ROUNDUP(sizeof(odp_timeout_hdr_t));
> >> +               mbp_ctor_arg.seg_buf_size = (uint16_t) buf_size;
> >> +               break;
> >> +       case ODP_BUFFER_TYPE_ANY:
> >> +               mb_ctor_arg.seg_buf_offset =
> >> +                       (uint16_t)
> >> CACHE_LINE_ROUNDUP(sizeof(odp_any_buffer_hdr_t));
> >> +               mbp_ctor_arg.seg_buf_size =
> >> +                       (uint16_t) (RTE_PKTMBUF_HEADROOM + buf_size);
> >> +               break;
> >> +       default:
> >> +               ODP_ERR("odp_buffer_pool_create: Bad type %i\n",
> >> buf_type);
> >> +               exit(0);
> >> +               break;
> >> +       }
> >> +
> >> +       mb_ctor_arg.seg_buf_size = mbp_ctor_arg.seg_buf_size;
> >> +       mb_ctor_arg.buf_type = buf_type;
> >> +       mb_size = mb_ctor_arg.seg_buf_offset + mb_ctor_arg.seg_buf_size;
> >> +
> >> +       pool = rte_mempool_create(name, NB_MBUF,
> >> +                                 mb_size, MAX_PKT_BURST,
> >> +                                 sizeof(struct
> rte_pktmbuf_pool_private),
> >> +                                 odp_dpdk_mbuf_pool_ctor,
> &mbp_ctor_arg,
> >> +                                 odp_dpdk_mbuf_ctor, &mb_ctor_arg,
> >> +                                 rte_socket_id(), 0);
> >> +       if (pool == NULL) {
> >>                 ODP_ERR("Cannot init DPDK mbuf pool\n");
> >>                 return -1;
> >>         }
> >>
> >> -       return (odp_buffer_pool_t) pktmbuf_pool;
> >> +       return (odp_buffer_pool_t) pool;
> >>  }
> >>
> >>
> >> diff --git a/platform/linux-dpdk/odp_packet.c
> >> b/platform/linux-dpdk/odp_packet.c
> >> index edfd06d..7afaba6 100644
> >> --- a/platform/linux-dpdk/odp_packet.c
> >> +++ b/platform/linux-dpdk/odp_packet.c
> >> @@ -23,13 +23,13 @@ static inline uint8_t parse_ipv6(odp_packet_hdr_t
> >> *pkt_hdr,
> >>  void odp_packet_init(odp_packet_t pkt)
> >>  {
> >>         odp_packet_hdr_t *const pkt_hdr = odp_packet_hdr(pkt);
> >> -       const size_t start_offset = ODP_FIELD_SIZEOF(odp_packet_hdr_t,
> >> buf_hdr);
> >> -       uint8_t *start;
> >> -       size_t len;
> >> +       struct rte_mbuf *mb;
> >> +       void *start;
> >>
> >> -       start = (uint8_t *)pkt_hdr + start_offset;
> >> -       len = ODP_OFFSETOF(odp_packet_hdr_t, payload) - start_offset;
> >> -       memset(start, 0, len);
> >> +       mb = &pkt_hdr->buf_hdr.mb;
> >> +
> >> +       start = mb->buf_addr;
> >> +       memset(start, 0, mb->buf_len);
> >>
> >>         pkt_hdr->l2_offset = (uint32_t) ODP_PACKET_OFFSET_INVALID;
> >>         pkt_hdr->l3_offset = (uint32_t) ODP_PACKET_OFFSET_INVALID;
> >> @@ -46,18 +46,47 @@ odp_buffer_t odp_buffer_from_packet(odp_packet_t
> pkt)
> >>         return (odp_buffer_t)pkt;
> >>  }
> >>
> >> -void odp_packet_set_len(odp_packet_t pkt, size_t len)
> >> +/* Advance the pkt data pointer and set len in one call */
> >> +static int odp_packet_set_offset_len(odp_packet_t pkt, size_t
> >> frame_offset,
> >> +                                     size_t len)
> >>  {
> >> -       /* for rte_pktmbuf */
> >> -       odp_buffer_hdr_t *buf_hdr =
> >> odp_buf_to_hdr(odp_buffer_from_packet(pkt));
> >> -       buf_hdr->pkt.data_len = len;
> >> +       struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
> >> +       uint16_t offset;
> >> +       uint16_t data_len;
> >> +
> >> +       /* The pkt buf may have been pulled back into the headroom
> >> +        * so we cannot rely on finding the data right after the
> >> +        * ODP header and HEADROOM */
> >> +       offset = (uint16_t)((unsigned long)mb->pkt.data -
> >> +                           (unsigned long)mb->buf_addr);
> >> +       ODP_ASSERT(mb->buf_len >= offset, "Corrupted mbuf");
> >> +       data_len = mb->buf_len - offset;
> >> +
> >> +       if (data_len < frame_offset) {
> >> +               ODP_ERR("Frame offset too big");
> >> +               return -1;
> >> +       }
> >> +       mb->pkt.data = (void*)((char*)mb->pkt.data + frame_offset);
> >> +       data_len -= frame_offset;
> >>
> >> -       odp_packet_hdr(pkt)->frame_len = len;
> >> +       if (data_len < len) {
> >> +               ODP_ERR("Packet len too big");
> >> +               return -1;
> >> +       }
> >> +       mb->pkt.pkt_len = len;
> >> +
> >> +       return 0;
> >> +}
> >> +
> >> +void odp_packet_set_len(odp_packet_t pkt, size_t len)
> >> +{
> >> +       (void)odp_packet_set_offset_len(pkt, 0, len);
> >>  }
> >>
> >>  size_t odp_packet_get_len(odp_packet_t pkt)
> >>  {
> >> -       return odp_packet_hdr(pkt)->frame_len;
> >> +       struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
> >> +       return mb->pkt.pkt_len;
> >>  }
> >>
> >>  uint8_t *odp_packet_buf_addr(odp_packet_t pkt)
> >> @@ -67,7 +96,8 @@ uint8_t *odp_packet_buf_addr(odp_packet_t pkt)
> >>
> >>  uint8_t *odp_packet_start(odp_packet_t pkt)
> >>  {
> >> -       return odp_packet_buf_addr(pkt) +
> >> odp_packet_hdr(pkt)->frame_offset;
> >> +       struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
> >> +       return mb->pkt.data;
> >>  }
> >>
> >>
> >> @@ -78,7 +108,7 @@ uint8_t *odp_packet_l2(odp_packet_t pkt)
> >>         if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
> >>                 return NULL;
> >>
> >> -       return odp_packet_buf_addr(pkt) + offset;
> >> +       return odp_packet_start(pkt) + offset;
> >>  }
> >>
> >>  size_t odp_packet_l2_offset(odp_packet_t pkt)
> >> @@ -98,7 +128,7 @@ uint8_t *odp_packet_l3(odp_packet_t pkt)
> >>         if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
> >>                 return NULL;
> >>
> >> -       return odp_packet_buf_addr(pkt) + offset;
> >> +       return odp_packet_start(pkt) + offset;
> >>  }
> >>
> >>  size_t odp_packet_l3_offset(odp_packet_t pkt)
> >> @@ -118,7 +148,7 @@ uint8_t *odp_packet_l4(odp_packet_t pkt)
> >>         if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
> >>                 return NULL;
> >>
> >> -       return odp_packet_buf_addr(pkt) + offset;
> >> +       return odp_packet_start(pkt) + offset;
> >>  }
> >>
> >>  size_t odp_packet_l4_offset(odp_packet_t pkt)
> >> @@ -152,9 +182,13 @@ void odp_packet_parse(odp_packet_t pkt, size_t len,
> >> size_t frame_offset)
> >>         size_t offset = 0;
> >>         uint8_t ip_proto = 0;
> >>
> >> +       /* The frame_offset is not relevant for frames from DPDK */
> >>         pkt_hdr->input_flags.eth = 1;
> >> -       pkt_hdr->frame_offset = frame_offset;
> >> -       pkt_hdr->frame_len = len;
> >> +       (void) frame_offset;
> >> +       pkt_hdr->frame_offset = 0;
> >> +       if (odp_packet_set_offset_len(pkt, 0, len)) {
> >> +               return;
> >> +       }
> >>
> >>         if (odp_unlikely(len < ODPH_ETH_LEN_MIN)) {
> >>                 pkt_hdr->error_flags.frame_len = 1;
> >> @@ -165,7 +199,7 @@ void odp_packet_parse(odp_packet_t pkt, size_t len,
> >> size_t frame_offset)
> >>
> >>         /* Assume valid L2 header, no CRC/FCS check in SW */
> >>         pkt_hdr->input_flags.l2 = 1;
> >> -       pkt_hdr->l2_offset = frame_offset;
> >> +       pkt_hdr->l2_offset = 0;
> >>
> >>         eth = (odph_ethhdr_t *)odp_packet_start(pkt);
> >>         ethtype = odp_be_to_cpu_16(eth->type);
> >> @@ -189,7 +223,7 @@ void odp_packet_parse(odp_packet_t pkt, size_t len,
> >> size_t frame_offset)
> >>         case ODPH_ETHTYPE_IPV4:
> >>                 pkt_hdr->input_flags.ipv4 = 1;
> >>                 pkt_hdr->input_flags.l3 = 1;
> >> -               pkt_hdr->l3_offset = frame_offset + ODPH_ETHHDR_LEN +
> >> offset;
> >> +               pkt_hdr->l3_offset = ODPH_ETHHDR_LEN + offset;
> >>                 ipv4 = (odph_ipv4hdr_t *)odp_packet_l3(pkt);
> >>                 ip_proto = parse_ipv4(pkt_hdr, ipv4, &offset);
> >>                 break;
> >> @@ -304,6 +338,7 @@ void odp_packet_print(odp_packet_t pkt)
> >>  {
> >>         int max_len = 512;
> >>         char str[max_len];
> >> +       uint8_t *p;
> >>         int len = 0;
> >>         int n = max_len-1;
> >>         odp_packet_hdr_t *hdr = odp_packet_hdr(pkt);
> >> @@ -325,50 +360,69 @@ void odp_packet_print(odp_packet_t pkt)
> >>         len += snprintf(&str[len], n-len,
> >>                         "  l4_offset    %u\n", hdr->l4_offset);
> >>         len += snprintf(&str[len], n-len,
> >> -                       "  frame_len    %u\n", hdr->frame_len);
> >> +                       "  frame_len    %u\n",
> >> hdr->buf_hdr.mb.pkt.pkt_len);
> >>         len += snprintf(&str[len], n-len,
> >>                         "  input        %u\n", hdr->input);
> >>         str[len] = '\0';
> >>
> >>         printf("\n%s\n", str);
> >> +       rte_pktmbuf_dump(&hdr->buf_hdr.mb, 32);
> >> +
> >> +       p = odp_packet_start(pkt);
> >> +       printf("00000000: %02X %02X %02X %02X %02X %02X %02X %02X\
> >> +              %02X %02X %02X %02X %02X %02X %02X %02X\n",
> >> +              p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7],
> >> +              p[8], p[9], p[10], p[11], p[12], p[13], p[14], p[15]);
> >> +
> >>  }
> >>
> >> +/* For now we can only copy between packets of the same segment size
> >> + * We should probably refine this API, maybe introduce a clone API */
> >>  int odp_packet_copy(odp_packet_t pkt_dst, odp_packet_t pkt_src)
> >>  {
> >> -       odp_packet_hdr_t *const pkt_hdr_dst = odp_packet_hdr(pkt_dst);
> >> -       odp_packet_hdr_t *const pkt_hdr_src = odp_packet_hdr(pkt_src);
> >> -       const size_t start_offset = ODP_FIELD_SIZEOF(odp_packet_hdr_t,
> >> buf_hdr);
> >> -       uint8_t *start_src;
> >> -       uint8_t *start_dst;
> >> -       size_t len;
> >> +       struct rte_mbuf *mb_dst, *mb_src;
> >> +       uint8_t nb_segs, i;
> >> +
> >> +       ODP_ASSERT(odp_buffer_type(pkt_dst) == ODP_BUFFER_TYPE_PACKET &&
> >> +                  odp_buffer_type(pkt_src) == ODP_BUFFER_TYPE_PACKET,
> >> +                  "dst_pkt or src_pkt not of type
> >> ODP_BUFFER_TYPE_PACKET");
> >>
> >>         if (pkt_dst == ODP_PACKET_INVALID || pkt_src ==
> >> ODP_PACKET_INVALID)
> >>                 return -1;
> >>
> >> -       /* if (pkt_hdr_dst->buf_hdr.size < */
> >> -       /*      pkt_hdr_src->frame_len + pkt_hdr_src->frame_offset) */
> >> -       if (pkt_hdr_dst->buf_hdr.buf_len <
> >> -               pkt_hdr_src->frame_len + pkt_hdr_src->frame_offset)
> >> +       mb_dst = &(odp_packet_hdr(pkt_dst)->buf_hdr.mb);
> >> +       mb_src = &(odp_packet_hdr(pkt_src)->buf_hdr.mb);
> >> +
> >> +       if (mb_dst->pkt.nb_segs != mb_src->pkt.nb_segs) {
> >> +               ODP_ERR("Different nb_segs in pkt_dst and pkt_src");
> >>                 return -1;
> >> +       }
> >>
> >> -       /* Copy packet header */
> >> -       start_dst = (uint8_t *)pkt_hdr_dst + start_offset;
> >> -       start_src = (uint8_t *)pkt_hdr_src + start_offset;
> >> -       len = ODP_OFFSETOF(odp_packet_hdr_t, payload) - start_offset;
> >> -       memcpy(start_dst, start_src, len);
> >> +       nb_segs = mb_src->pkt.nb_segs;
> >>
> >> -       /* Copy frame payload */
> >> -       start_dst = (uint8_t *)odp_packet_start(pkt_dst);
> >> -       start_src = (uint8_t *)odp_packet_start(pkt_src);
> >> -       len = pkt_hdr_src->frame_len;
> >> -       memcpy(start_dst, start_src, len);
> >> +       if (mb_dst->buf_len < mb_src->buf_len) {
> >> +               ODP_ERR("dst_pkt smaller than src_pkt");
> >> +               return -1;
> >> +       }
> >>
> >> -       /* Copy useful things from the buffer header */
> >> -       /* pkt_hdr_dst->buf_hdr.cur_offset =
> >> pkt_hdr_src->buf_hdr.cur_offset; */
> >> +       for (i = 0; i < nb_segs; i++) {
> >> +               if (mb_src == NULL || mb_dst == NULL) {
> >> +                       ODP_ERR("Corrupted packets");
> >> +                       return -1;
> >> +               }
> >> +               memcpy(mb_dst->buf_addr, mb_src->buf_addr,
> >> mb_src->buf_len);
> >> +               mb_dst = mb_dst->pkt.next;
> >> +               mb_src = mb_src->pkt.next;
> >> +       }
> >> +       return 0;
> >> +}
> >>
> >> -       /* Create a copy of the scatter list */
> >> -       /* odp_buffer_copy_scatter(odp_buffer_from_packet(pkt_dst), */
> >> -       /*                      odp_buffer_from_packet(pkt_src)); */
> >> +void odp_packet_set_ctx(odp_packet_t pkt, const void *ctx)
> >> +{
> >> +       odp_packet_hdr(pkt)->user_ctx = (intptr_t)ctx;
> >> +}
> >>
> >> -       return 0;
> >> +void *odp_packet_get_ctx(odp_packet_t pkt)
> >> +{
> >> +       return (void *)(intptr_t)odp_packet_hdr(pkt)->user_ctx;
> >>  }
>

set/get_ctx causes a compilation error, as the same changes went into the
mainline in a different patch.

>> diff --git a/platform/linux-dpdk/odp_packet_dpdk.c
> >> b/platform/linux-dpdk/odp_packet_dpdk.c
> >> index d5c8e80..ea83580 100644
> >> --- a/platform/linux-dpdk/odp_packet_dpdk.c
> >> +++ b/platform/linux-dpdk/odp_packet_dpdk.c
> >> @@ -82,7 +82,7 @@ int setup_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, const
> >> char *netdev,
> >>         static struct ether_addr eth_addr[RTE_MAX_ETHPORTS];
> >>         static int portinit[RTE_MAX_ETHPORTS];
> >>         static int qid[RTE_MAX_ETHPORTS];
> >> -       uint8_t portid = 0, num_intf = 2;
> >> +       uint8_t portid = 0;
> >>         uint16_t nbrxq = 0, nbtxq = 0;
> >>         int ret, i;
> >>
> >> @@ -93,7 +93,7 @@ int setup_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, const
> >> char *netdev,
> >>         pkt_dpdk->pool = pool;
> >>         printf("dpdk portid: %u\n", portid);
> >>
> >> -       nbrxq = odp_sys_core_count() / num_intf;
> >> +       nbrxq = odp_sys_core_count();
>
> The above change removing num_intf causes traffic to hang, as the
per-interface, per-core queue relationship is lost. We can keep this
hard-coded for now; it will be replaced with a parameter passed from the
command line.


> The change here might cause l2fwd to not work anymore, I didn't get a
> chance to test it last week. Have you had a chance to verify it?


> >>         nbtxq = nbrxq;
> >>         if (portinit[portid] == 0) {
> >>                 fflush(stdout);
> >> @@ -157,17 +157,18 @@ int close_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk)
> >>  }
> >>
> >>  int recv_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, odp_packet_t
> pkt_table[],
> >> -               unsigned len)
> >> +                 unsigned len)
> >>  {
> >> -       struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
> >>         uint16_t nb_rx, i = 0;
> >>
> >> -       memset(pkts_burst, 0 , sizeof(pkts_burst));
> >>         nb_rx = rte_eth_rx_burst((uint8_t)pkt_dpdk->portid,
> >>                                  (uint16_t)pkt_dpdk->queueid,
> >> -                                (struct rte_mbuf **)pkts_burst,
> >> (uint16_t)len);
> >> -       for (i = 0; i < nb_rx; i++)
> >> -               pkt_table[i] = (odp_packet_t)pkts_burst[i];
> >> +                                (struct rte_mbuf **)pkt_table,
> >> (uint16_t)len);
> >> +       for (i = 0; i < nb_rx; i++) {
> >> +               odp_packet_hdr_t *pkt_hdr =
> odp_packet_hdr(pkt_table[i]);
> >> +               struct rte_mbuf *mb = &pkt_hdr->buf_hdr.mb;
> >> +               odp_packet_parse(pkt_table[i], mb->pkt.pkt_len, 0);
> >> +       }
> >>         return nb_rx;
> >>  }
> >>
> >> diff --git a/platform/linux-dpdk/odp_queue.c
> >> b/platform/linux-dpdk/odp_queue.c
> >> index 554b8ea..29fae8f 100644
> >> --- a/platform/linux-dpdk/odp_queue.c
> >> +++ b/platform/linux-dpdk/odp_queue.c
> >> @@ -239,11 +239,11 @@ int queue_enq(queue_entry_t *queue,
> odp_buffer_hdr_t
> >> *buf_hdr)
> >>                 /* Empty queue */
> >>                 queue->s.head = buf_hdr;
> >>                 queue->s.tail = buf_hdr;
> >> -               buf_hdr->pkt.next = NULL;
> >> +               buf_hdr->next = NULL;
> >>         } else {
> >> -               queue->s.tail->pkt.next = buf_hdr;
> >> +               queue->s.tail->next = buf_hdr;
> >>                 queue->s.tail = buf_hdr;
> >> -               buf_hdr->pkt.next = NULL;
> >> +               buf_hdr->next = NULL;
> >>         }
> >>
> >>         if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
> >> @@ -267,17 +267,17 @@ int queue_enq_multi(queue_entry_t *queue,
> >> odp_buffer_hdr_t *buf_hdr[], int num)
> >>         odp_buffer_hdr_t *tail;
> >>
> >>         for (i = 0; i < num - 1; i++)
> >> -               buf_hdr[i]->pkt.next = buf_hdr[i+1];
> >> +               buf_hdr[i]->next = buf_hdr[i+1];
> >>
> >>         tail = buf_hdr[num-1];
> >> -       buf_hdr[num-1]->pkt.next = NULL;
> >> +       buf_hdr[num-1]->next = NULL;
> >>
> >>         LOCK(&queue->s.lock);
> >>         /* Empty queue */
> >>         if (queue->s.head == NULL)
> >>                 queue->s.head = buf_hdr[0];
> >>         else
> >> -               queue->s.tail->pkt.next = buf_hdr[0];
> >> +               queue->s.tail->next = buf_hdr[0];
> >>
> >>         queue->s.tail = tail;
> >>
> >> @@ -338,8 +338,8 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
> >>                         queue->s.status = QUEUE_STATUS_NOTSCHED;
> >>         } else {
> >>                 buf_hdr       = queue->s.head;
> >> -               queue->s.head = buf_hdr->pkt.next;
> >> -               buf_hdr->pkt.next = NULL;
> >> +               queue->s.head = buf_hdr->next;
> >> +               buf_hdr->next = NULL;
> >>
> >>                 if (queue->s.head == NULL) {
> >>                         /* Queue is now empty */
> >> @@ -370,8 +370,8 @@ int queue_deq_multi(queue_entry_t *queue,
> >> odp_buffer_hdr_t *buf_hdr[], int num)
> >>                 for (; i < num && hdr; i++) {
> >>                         buf_hdr[i]       = hdr;
> >>                         /* odp_prefetch(hdr->addr); */
> >> -                       hdr              = hdr->pkt.next;
> >> -                       buf_hdr[i]->pkt.next = NULL;
> >> +                       hdr              = hdr->next;
> >> +                       buf_hdr[i]->next = NULL;
> >>                 }
> >>
> >>                 queue->s.head = hdr;
>

odp_schedule.c was updated after this patch was sent, which again causes a
compilation issue. Please pull in the latest odp_schedule.c from linux-generic.

>> diff --git a/platform/linux-dpdk/odp_schedule.c
> >> b/platform/linux-dpdk/odp_schedule.c
> >> new file mode 100644
> >> index 0000000..462b8eb
> >> --- /dev/null
> >> +++ b/platform/linux-dpdk/odp_schedule.c
> >> @@ -0,0 +1,417 @@
> >> +/* Copyright (c) 2013, Linaro Limited
> >> + * All rights reserved.
> >> + *
> >> + * SPDX-License-Identifier:     BSD-3-Clause
> >> + */
> >> +
> >> +#include <odp_schedule.h>
> >> +#include <odp_schedule_internal.h>
> >> +#include <odp_align.h>
> >> +#include <odp_queue.h>
> >> +#include <odp_shared_memory.h>
> >> +#include <odp_buffer.h>
> >> +#include <odp_buffer_pool.h>
> >> +#include <odp_internal.h>
> >> +#include <odp_config.h>
> >> +#include <odp_debug.h>
> >> +#include <odp_thread.h>
> >> +#include <odp_time.h>
> >> +#include <odp_spinlock.h>
> >> +#include <odp_hints.h>
> >> +
> >> +#include <odp_queue_internal.h>
> >> +
> >> +
> >> +/* Limits to number of scheduled queues */
> >> +#define SCHED_POOL_SIZE (256*1024)
> >> +
> >> +/* Scheduler sub queues */
> >> +#define QUEUES_PER_PRIO  4
> >> +
> >> +/* TODO: random or queue based selection */
> >> +#define SEL_PRI_QUEUE(x) ((QUEUES_PER_PRIO-1) & (queue_to_id(x)))
> >> +
> >> +/* Maximum number of dequeues */
> >> +#define MAX_DEQ 4
> >> +
> >> +
> >> +/* Mask of queues per priority */
> >> +typedef uint8_t pri_mask_t;
> >> +
> >> +ODP_STATIC_ASSERT((8*sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
> >> "pri_mask_t_is_too_small");
> >> +
> >> +
> >> +typedef struct {
> >> +       odp_queue_t
> >> pri_queue[ODP_CONFIG_SCHED_PRIOS][QUEUES_PER_PRIO];
> >> +       pri_mask_t        pri_mask[ODP_CONFIG_SCHED_PRIOS];
> >> +       odp_spinlock_t    mask_lock;
> >> +       odp_buffer_pool_t pool;
> >> +} sched_t;
> >> +
> >> +typedef struct {
> >> +       odp_queue_t queue;
> >> +
> >> +} queue_desc_t;
> >> +
> >> +typedef struct {
> >> +       odp_queue_t  pri_queue;
> >> +       odp_buffer_t desc_buf;
> >> +
> >> +       odp_buffer_t buf[MAX_DEQ];
> >> +       int num;
> >> +       int index;
> >> +       odp_queue_t queue;
> >> +       int pause;
> >> +
> >> +} sched_local_t;
> >> +
> >> +/* Global scheduler context */
> >> +static sched_t *sched;
> >> +
> >> +/* Thread local scheduler context */
> >> +static __thread sched_local_t sched_local;
> >> +
> >> +
> >> +static inline odp_queue_t select_pri_queue(odp_queue_t queue, int prio)
> >> +{
> >> +       int id = SEL_PRI_QUEUE(queue);
> >> +       return sched->pri_queue[prio][id];
> >> +}
> >> +
> >> +
> >> +int odp_schedule_init_global(void)
> >> +{
> >> +       odp_buffer_pool_t pool;
> >> +       int i, j;
> >> +
> >> +       ODP_DBG("Schedule init ... ");
> >> +
> >> +       sched = odp_shm_reserve("odp_scheduler",
> >> +                               sizeof(sched_t),
> >> +                               ODP_CACHE_LINE_SIZE);
> >> +
> >> +       if (sched == NULL) {
> >> +               ODP_ERR("Schedule init: Shm reserve failed.\n");
> >> +               return -1;
> >> +       }
> >> +
> >> +       pool = odp_buffer_pool_create("odp_sched_pool", NULL,
> >> +                                     SCHED_POOL_SIZE,
> >> sizeof(queue_desc_t),
> >> +                                     ODP_CACHE_LINE_SIZE,
> >> +                                     ODP_BUFFER_TYPE_RAW);
> >> +
> >> +       if (pool == ODP_BUFFER_POOL_INVALID) {
> >> +               ODP_ERR("Schedule init: Pool create failed.\n");
> >> +               return -1;
> >> +       }
> >> +
> >> +       sched->pool = pool;
> >> +       odp_spinlock_init(&sched->mask_lock);
> >> +
> >> +       for (i = 0; i < ODP_CONFIG_SCHED_PRIOS; i++) {
> >> +               odp_queue_t queue;
> >> +               char name[] = "odp_priXX_YY";
> >> +
> >> +               name[7] = '0' + i / 10;
> >> +               name[8] = '0' + i - 10*(i / 10);
> >> +
> >> +               for (j = 0; j < QUEUES_PER_PRIO; j++) {
> >> +                       name[10] = '0' + j / 10;
> >> +                       name[11] = '0' + j - 10*(j / 10);
> >> +
> >> +                       queue = odp_queue_create(name,
> >> +                                                ODP_QUEUE_TYPE_POLL,
> >> NULL);
> >> +
> >> +                       if (queue == ODP_QUEUE_INVALID) {
> >> +                               ODP_ERR("Sched init: Queue create
> >> failed.\n");
> >> +                               return -1;
> >> +                       }
> >> +
> >> +                       sched->pri_queue[i][j] = queue;
> >> +                       sched->pri_mask[i]     = 0;
> >> +               }
> >> +       }
> >> +
> >> +       ODP_DBG("done\n");
> >> +
> >> +       return 0;
> >> +}
> >> +
> >> +
> >> +int odp_schedule_init_local(void)
> >> +{
> >> +       int i;
> >> +
> >> +       sched_local.pri_queue = ODP_QUEUE_INVALID;
> >> +       sched_local.desc_buf  = ODP_BUFFER_INVALID;
> >> +
> >> +       for (i = 0; i < MAX_DEQ; i++)
> >> +               sched_local.buf[i] = ODP_BUFFER_INVALID;
> >> +
> >> +       sched_local.num   = 0;
> >> +       sched_local.index = 0;
> >> +       sched_local.queue = ODP_QUEUE_INVALID;
> >> +       sched_local.pause = 0;
> >> +
> >> +       return 0;
> >> +}
> >> +
> >> +
> >> +void odp_schedule_mask_set(odp_queue_t queue, int prio)
> >> +{
> >> +       int id = SEL_PRI_QUEUE(queue);
> >> +
> >> +       odp_spinlock_lock(&sched->mask_lock);
> >> +       sched->pri_mask[prio] |= 1 << id;
> >> +       odp_spinlock_unlock(&sched->mask_lock);
> >> +}
> >> +
> >> +
> >> +odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue)
> >> +{
> >> +       odp_buffer_t buf;
> >> +
> >> +       buf = odp_buffer_alloc(sched->pool);
> >> +
> >> +       if (buf != ODP_BUFFER_INVALID) {
> >> +               queue_desc_t *desc;
> >> +               desc        = odp_buffer_addr(buf);
> >> +               desc->queue = queue;
> >> +       }
> >> +
> >> +       return buf;
> >> +}
> >> +
> >> +
> >> +void odp_schedule_queue(odp_queue_t queue, int prio)
> >> +{
> >> +       odp_buffer_t desc_buf;
> >> +       odp_queue_t  pri_queue;
> >> +
> >> +       pri_queue = select_pri_queue(queue, prio);
> >> +       desc_buf  = queue_sched_buf(queue);
> >> +
> >> +       odp_queue_enq(pri_queue, desc_buf);
> >> +}
> >> +
> >> +
> >> +void odp_schedule_release_atomic(void)
> >> +{
> >> +       if (sched_local.pri_queue != ODP_QUEUE_INVALID &&
> >> +           sched_local.num       == 0) {
> >> +               /* Release current atomic queue */
> >> +               odp_queue_enq(sched_local.pri_queue,
> >> sched_local.desc_buf);
> >> +               sched_local.pri_queue = ODP_QUEUE_INVALID;
> >> +       }
> >> +}
> >> +
> >> +
> >> +static inline int copy_bufs(odp_buffer_t out_buf[], unsigned int max)
> >> +{
> >> +       int i = 0;
> >> +
> >> +       while (sched_local.num && max) {
> >> +               out_buf[i] = sched_local.buf[sched_local.index];
> >> +               sched_local.index++;
> >> +               sched_local.num--;
> >> +               max--;
> >> +               i++;
> >> +       }
> >> +
> >> +       return i;
> >> +}
> >> +
> >> +
> >> +/*
> >> + * Schedule queues
> >> + *
> >> + * TODO: SYNC_ORDERED not implemented yet
> >> + */
> >> +static int schedule(odp_queue_t *out_queue, odp_buffer_t out_buf[],
> >> +                   unsigned int max_num, unsigned int max_deq)
> >> +{
> >> +       int i, j;
> >> +       int thr;
> >> +       int ret;
> >> +
> >> +       if (sched_local.num) {
> >> +               ret = copy_bufs(out_buf, max_num);
> >> +
> >> +               if (out_queue)
> >> +                       *out_queue = sched_local.queue;
> >> +
> >> +               return ret;
> >> +       }
> >> +
> >> +       odp_schedule_release_atomic();
> >> +
> >> +       if (odp_unlikely(sched_local.pause))
> >> +               return 0;
> >> +
> >> +       thr = odp_thread_id();
> >> +
> >> +       for (i = 0; i < ODP_CONFIG_SCHED_PRIOS; i++) {
> >> +               int id;
> >> +
> >> +               if (sched->pri_mask[i] == 0)
> >> +                       continue;
> >> +
> >> +               id = thr & (QUEUES_PER_PRIO-1);
> >> +
> >> +               for (j = 0; j < QUEUES_PER_PRIO; j++, id++) {
> >> +                       odp_queue_t  pri_q;
> >> +                       odp_buffer_t desc_buf;
> >> +
> >> +                       if (id >= QUEUES_PER_PRIO)
> >> +                               id = 0;
> >> +
> >> +                       if (odp_unlikely((sched->pri_mask[i] & (1 <<
> id))
> >> == 0))
> >> +                               continue;
> >> +
> >> +                       pri_q    = sched->pri_queue[i][id];
> >> +                       desc_buf = odp_queue_deq(pri_q);
> >> +
> >> +                       if (desc_buf != ODP_BUFFER_INVALID) {
> >> +                               queue_desc_t *desc;
> >> +                               odp_queue_t queue;
> >> +                               int num;
> >> +
> >> +                               desc  = odp_buffer_addr(desc_buf);
> >> +                               queue = desc->queue;
> >> +
> >> +                               num = odp_queue_deq_multi(queue,
> >> +
>  sched_local.buf,
> >> +                                                         max_deq);
> >> +
> >> +                               if (num == 0) {
> >> +                                       /* Remove empty queue from
> >> scheduling,
> >> +                                        * except packet input queues
> >> +                                        */
> >> +                                       if (odp_queue_type(queue) ==
> >> +                                           ODP_QUEUE_TYPE_PKTIN)
> >> +                                               odp_queue_enq(pri_q,
> >> desc_buf);
> >> +
> >> +                                       continue;
> >> +                               }
> >> +
> >> +                               sched_local.num   = num;
> >> +                               sched_local.index = 0;
> >> +                               ret = copy_bufs(out_buf, max_num);
> >> +
> >> +                               sched_local.queue = queue;
> >> +
> >> +                               if (queue_sched_atomic(queue)) {
> >> +                                       /* Hold queue during atomic
> access
> >> */
> >> +                                       sched_local.pri_queue = pri_q;
> >> +                                       sched_local.desc_buf  =
> desc_buf;
> >> +                               } else {
> >> +                                       /* Continue scheduling the queue
> >> */
> >> +                                       odp_queue_enq(pri_q, desc_buf);
> >> +                               }
> >> +
> >> +                               /* Output the source queue handle */
> >> +                               if (out_queue)
> >> +                                       *out_queue = queue;
> >> +
> >> +                               return ret;
> >> +                       }
> >> +               }
> >> +       }
> >> +
> >> +       return 0;
> >> +}
> >> +
> >> +
> >> +static int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
> >> +                         odp_buffer_t out_buf[],
> >> +                         unsigned int max_num, unsigned int max_deq)
> >> +{
> >> +       uint64_t start_cycle, cycle, diff;
> >> +       int ret;
> >> +
> >> +       start_cycle = 0;
> >> +
> >> +       while (1) {
> >> +               ret = schedule(out_queue, out_buf, max_num, max_deq);
> >> +
> >> +               if (ret)
> >> +                       break;
> >> +
> >> +               if (wait == ODP_SCHED_WAIT)
> >> +                       continue;
> >> +
> >> +               if (wait == ODP_SCHED_NO_WAIT)
> >> +                       break;
> >> +
> >> +               if (start_cycle == 0) {
> >> +                       start_cycle = odp_time_get_cycles();
> >> +                       continue;
> >> +               }
> >> +
> >> +               cycle = odp_time_get_cycles();
> >> +               diff  = odp_time_diff_cycles(start_cycle, cycle);
> >> +
> >> +               if (wait < diff)
> >> +                       break;
> >> +       }
> >> +
> >> +       return ret;
> >> +}
> >> +
> >> +
> >> +odp_buffer_t odp_schedule(odp_queue_t *out_queue, uint64_t wait)
> >> +{
> >> +       odp_buffer_t buf;
> >> +
> >> +       buf = ODP_BUFFER_INVALID;
> >> +
> >> +       schedule_loop(out_queue, wait, &buf, 1, MAX_DEQ);
> >> +
> >> +       return buf;
> >> +}
> >> +
> >> +
> >> +odp_buffer_t odp_schedule_one(odp_queue_t *out_queue, uint64_t wait)
> >> +{
> >> +       odp_buffer_t buf;
> >> +
> >> +       buf = ODP_BUFFER_INVALID;
> >> +
> >> +       schedule_loop(out_queue, wait, &buf, 1, 1);
> >> +
> >> +       return buf;
> >> +}
> >> +
> >> +
> >> +int odp_schedule_multi(odp_queue_t *out_queue, uint64_t wait,
> >> +                      odp_buffer_t out_buf[], unsigned int num)
> >> +{
> >> +       return schedule_loop(out_queue, wait, out_buf, num, MAX_DEQ);
> >> +}
> >> +
> >> +
> >> +void odp_schedule_pause(void)
> >> +{
> >> +       sched_local.pause = 1;
> >> +}
> >> +
> >> +
> >> +void odp_schedule_resume(void)
> >> +{
> >> +       sched_local.pause = 0;
> >> +}
> >> +
> >> +
> >> +uint64_t odp_schedule_wait_time(uint64_t ns)
> >> +{
> >> +       if (ns <= ODP_SCHED_NO_WAIT)
> >> +               ns = ODP_SCHED_NO_WAIT + 1;
> >> +
> >> +       return odp_time_ns_to_cycles(ns);
> >> +}
> >> +
> >> +
> >> +int odp_schedule_num_prio(void)
> >> +{
> >> +       return ODP_CONFIG_SCHED_PRIOS;
> >> +}
> >
> >
> > Reviewed-by: Venkatesh Vivekanandan <venkatesh.vivekanandan@linaro.org>
> >>
> >> --
> >> 1.8.3.2
> >>
> >>
> >> _______________________________________________
> >> lng-odp mailing list
> >> lng-odp@lists.linaro.org
> >> http://lists.linaro.org/mailman/listinfo/lng-odp
> >
> >
>
diff mbox

Patch

diff --git a/platform/linux-dpdk/Makefile.am b/platform/linux-dpdk/Makefile.am
index 1eabd9f..e128bf8 100644
--- a/platform/linux-dpdk/Makefile.am
+++ b/platform/linux-dpdk/Makefile.am
@@ -79,7 +79,7 @@  __LIB__libodp_la_SOURCES = \
 			   odp_queue.c \
 			   ../linux-generic/odp_ring.c \
 			   ../linux-generic/odp_rwlock.c \
-			   ../linux-generic/odp_schedule.c \
+			   odp_schedule.c \
 			   ../linux-generic/odp_shared_memory.c \
 			   ../linux-generic/odp_spinlock.c \
 			   ../linux-generic/odp_system_info.c \
diff --git a/platform/linux-dpdk/include/api/odp_buffer.h b/platform/linux-dpdk/include/api/odp_buffer.h
index 9ea1ed8..b2fbc76 100644
--- a/platform/linux-dpdk/include/api/odp_buffer.h
+++ b/platform/linux-dpdk/include/api/odp_buffer.h
@@ -32,7 +32,7 @@  extern "C" {
 typedef unsigned long odp_buffer_t;
 
 
-#define ODP_BUFFER_INVALID (0xffffffff) /**< Invalid buffer */
+#define ODP_BUFFER_INVALID (unsigned long)(-1L) /**< Invalid buffer */
 
 
 /**
diff --git a/platform/linux-dpdk/include/api/odp_buffer_pool.h b/platform/linux-dpdk/include/api/odp_buffer_pool.h
index 4b75cf5..382f4f0 100644
--- a/platform/linux-dpdk/include/api/odp_buffer_pool.h
+++ b/platform/linux-dpdk/include/api/odp_buffer_pool.h
@@ -27,7 +27,7 @@  extern "C" {
 #define ODP_BUFFER_POOL_NAME_LEN  32
 
 /** Invalid buffer pool */
-#define ODP_BUFFER_POOL_INVALID  (0xffffffff)
+#define ODP_BUFFER_POOL_INVALID  (unsigned long)(-1L)
 
 /** ODP buffer pool */
 typedef unsigned long odp_buffer_pool_t;
diff --git a/platform/linux-dpdk/include/api/odp_packet.h b/platform/linux-dpdk/include/api/odp_packet.h
index 5545bdc..79503a5 100644
--- a/platform/linux-dpdk/include/api/odp_packet.h
+++ b/platform/linux-dpdk/include/api/odp_packet.h
@@ -80,6 +80,23 @@  void odp_packet_set_len(odp_packet_t pkt, size_t len);
 size_t odp_packet_get_len(odp_packet_t pkt);
 
 /**
+ * Set packet user context
+ *
+ * @param buf      Packet handle
+ * @param ctx      User context
+ *
+ */
+void odp_packet_set_ctx(odp_packet_t buf, const void *ctx);
+
+/**
+ * Get packet user context
+ *
+ * @param buf      Packet handle
+ *
+ * @return User context
+ */
+void *odp_packet_get_ctx(odp_packet_t buf);
+/**
  * Get address to the start of the packet buffer
  *
  * The address of the packet buffer is not necessarily the same as the start
diff --git a/platform/linux-dpdk/include/odp_buffer_internal.h b/platform/linux-dpdk/include/odp_buffer_internal.h
index f87ec80..5406606 100644
--- a/platform/linux-dpdk/include/odp_buffer_internal.h
+++ b/platform/linux-dpdk/include/odp_buffer_internal.h
@@ -59,8 +59,12 @@  typedef union odp_buffer_bits_t {
 struct odp_buffer_hdr_t;
 
 
-typedef struct rte_mbuf odp_buffer_hdr_t;
-
+typedef struct odp_buffer_hdr_t {
+	struct rte_mbuf mb;            /* Underlying DPDK rte_mbuf */
+	struct odp_buffer_hdr_t *next; /* Next buf in a list */
+	int type;                      /* ODP buffer type; not DPDK buf type */
+	uint32_t index;                /* Index in the rte_mempool */
+} odp_buffer_hdr_t;
 
 int odp_buffer_snprint(char *str, size_t n, odp_buffer_t buf);
 
diff --git a/platform/linux-dpdk/include/odp_packet_internal.h b/platform/linux-dpdk/include/odp_packet_internal.h
index 9357f90..d7f505b 100644
--- a/platform/linux-dpdk/include/odp_packet_internal.h
+++ b/platform/linux-dpdk/include/odp_packet_internal.h
@@ -113,13 +113,8 @@  typedef struct {
 	uint32_t l3_offset; /**< offset to L3 hdr, e.g. IPv4, IPv6 */
 	uint32_t l4_offset; /**< offset to L4 hdr (TCP, UDP, SCTP, also ICMP) */
 
-	uint32_t frame_len;
-
 	odp_pktio_t input;
-
-	uint32_t pad;
-	uint8_t payload[];
-
+	uint64_t user_ctx;  /**< user context */
 } odp_packet_hdr_t;
 
 /**
diff --git a/platform/linux-dpdk/include/odp_packet_io_internal.h b/platform/linux-dpdk/include/odp_packet_io_internal.h
index 08abea7..9263349 100644
--- a/platform/linux-dpdk/include/odp_packet_io_internal.h
+++ b/platform/linux-dpdk/include/odp_packet_io_internal.h
@@ -31,11 +31,6 @@  struct pktio_entry {
 	odp_queue_t inq_default;	/**< default input queue, if set */
 	odp_queue_t outq_default;	/**< default out queue */
 	odp_pktio_params_t params;	/**< pktio parameters */
-	pkt_sock_t pkt_sock;		/**< using socket API for IO */
-	pkt_sock_mmap_t pkt_sock_mmap;	/**< using socket mmap API for IO */
-#ifdef ODP_HAVE_NETMAP
-	pkt_netmap_t pkt_nm;		/**< using netmap API for IO */
-#endif
 	pkt_dpdk_t pkt_dpdk;		/**< using DPDK API for IO */
 };
 
diff --git a/platform/linux-dpdk/odp_buffer.c b/platform/linux-dpdk/odp_buffer.c
index e2f8942..e2657e4 100644
--- a/platform/linux-dpdk/odp_buffer.c
+++ b/platform/linux-dpdk/odp_buffer.c
@@ -16,7 +16,7 @@  void *odp_buffer_addr(odp_buffer_t buf)
 {
 	odp_buffer_hdr_t *hdr = odp_buf_to_hdr(buf);
 
-	return hdr->buf_addr;
+	return hdr->mb.buf_addr;
 }
 
 
@@ -24,7 +24,7 @@  size_t odp_buffer_size(odp_buffer_t buf)
 {
 	odp_buffer_hdr_t *hdr = odp_buf_to_hdr(buf);
 
-	return hdr->buf_len;
+	return hdr->mb.buf_len;
 }
 
 
@@ -38,11 +38,9 @@  int odp_buffer_type(odp_buffer_t buf)
 
 int odp_buffer_is_valid(odp_buffer_t buf)
 {
-	odp_buffer_bits_t handle;
-
-	handle.u32 = buf;
-
-	return (handle.index != ODP_BUFFER_INVALID_INDEX);
+	/* We could call rte_mbuf_sanity_check, but that panics
+	 * and aborts the program */
+	return (void*)buf != NULL;
 }
 
 
@@ -61,17 +59,19 @@  int odp_buffer_snprint(char *str, size_t n, odp_buffer_t buf)
 	len += snprintf(&str[len], n-len,
 			"Buffer\n");
 	len += snprintf(&str[len], n-len,
-			"  pool         %"PRIu64"\n", (int64_t) hdr->pool);
+			"  pool         %"PRIu64"\n", (int64_t) hdr->mb.pool);
+	len += snprintf(&str[len], n-len,
+			"  phy_addr     %"PRIu64"\n", hdr->mb.buf_physaddr);
 	len += snprintf(&str[len], n-len,
-			"  phy_addr     %"PRIu64"\n", hdr->buf_physaddr);
+			"  addr         %p\n",        hdr->mb.buf_addr);
 	len += snprintf(&str[len], n-len,
-			"  addr         %p\n",        hdr->buf_addr);
+			"  size         %u\n",        hdr->mb.buf_len);
 	len += snprintf(&str[len], n-len,
-			"  size         %u\n",        hdr->buf_len);
+			"  ref_count    %i\n",        hdr->mb.refcnt);
 	len += snprintf(&str[len], n-len,
-			"  ref_count    %i\n",        hdr->refcnt);
+			"  dpdk type    %i\n",        hdr->mb.type);
 	len += snprintf(&str[len], n-len,
-			"  type         %i\n",        hdr->type);
+			"  odp type     %i\n",        hdr->type);
 
 	return len;
 }
diff --git a/platform/linux-dpdk/odp_buffer_pool.c b/platform/linux-dpdk/odp_buffer_pool.c
index 805ce68..f044b5d 100644
--- a/platform/linux-dpdk/odp_buffer_pool.c
+++ b/platform/linux-dpdk/odp_buffer_pool.c
@@ -9,6 +9,7 @@ 
 #include <odp_buffer_pool_internal.h>
 #include <odp_buffer_internal.h>
 #include <odp_packet_internal.h>
+#include <odp_timer_internal.h>
 #include <odp_shared_memory.h>
 #include <odp_align.h>
 #include <odp_internal.h>
@@ -44,6 +45,13 @@ 
 
 #define NULL_INDEX ((uint32_t)-1)
 
+union buffer_type_any_u {
+	odp_buffer_hdr_t  buf;
+	odp_packet_hdr_t  pkt;
+	odp_timeout_hdr_t tmo;
+};
+
+typedef union buffer_type_any_u odp_any_buffer_hdr_t;
 
 typedef union pool_entry_u {
 	struct pool_entry_s s;
@@ -59,7 +67,7 @@  typedef struct pool_table_t {
 } pool_table_t;
 
 
-/* The pool table */
+/* The pool table ptr - resides in shared memory */
 static pool_table_t *pool_tbl;
 
 /* Pool entry pointers (for inlining) */
@@ -98,31 +106,151 @@  int odp_buffer_pool_init_global(void)
 	return 0;
 }
 
+struct mbuf_ctor_arg {
+	uint16_t seg_buf_offset; /* To skip the ODP buf/pkt/tmo header */
+	uint16_t seg_buf_size;   /* total sz: offset + user sz + HDROOM */
+	int buf_type;
+};
+
+struct mbuf_pool_ctor_arg {
+	uint16_t seg_buf_size; /* size of mbuf: user specified sz + HDROOM */
+};
+
+static void
+odp_dpdk_mbuf_pool_ctor(struct rte_mempool *mp,
+			void *opaque_arg)
+{
+	struct mbuf_pool_ctor_arg      *mbp_ctor_arg;
+	struct rte_pktmbuf_pool_private *mbp_priv;
+
+	if (mp->private_data_size < sizeof(struct rte_pktmbuf_pool_private)) {
+		ODP_ERR("%s(%s) private_data_size %d < %d",
+			__func__, mp->name, (int) mp->private_data_size,
+			(int) sizeof(struct rte_pktmbuf_pool_private));
+		return;
+	}
+	mbp_ctor_arg = (struct mbuf_pool_ctor_arg *) opaque_arg;
+	mbp_priv = rte_mempool_get_priv(mp);
+	mbp_priv->mbuf_data_room_size = mbp_ctor_arg->seg_buf_size;
+}
+
+/* ODP DPDK mbuf constructor.
+ * This is a combination of rte_pktmbuf_init in rte_mbuf.c
+ * and testpmd_mbuf_ctor in testpmd.c
+ */
+static void
+odp_dpdk_mbuf_ctor(struct rte_mempool *mp,
+		   void *opaque_arg,
+		   void *raw_mbuf,
+		   unsigned i)
+{
+	struct mbuf_ctor_arg *mb_ctor_arg;
+	struct rte_mbuf *mb = raw_mbuf;
+	struct odp_buffer_hdr_t *buf_hdr;
+
+	/* The rte_mbuf is at the beginning in all cases */
+	mb_ctor_arg = (struct mbuf_ctor_arg *) opaque_arg;
+	mb = (struct rte_mbuf *) raw_mbuf;
+
+	RTE_MBUF_ASSERT(mp->elt_size >= sizeof(struct rte_mbuf));
+
+	memset(mb, 0, mp->elt_size);
+
+	/* Start of buffer is just after the ODP type specific header
+	 * which contains in the very beginning the rte_mbuf struct */
+	mb->buf_addr     = (char *)mb + mb_ctor_arg->seg_buf_offset;
+	mb->buf_physaddr = rte_mempool_virt2phy(mp, mb) +
+			mb_ctor_arg->seg_buf_offset;
+	mb->buf_len      = mb_ctor_arg->seg_buf_size;
+	
+	/* keep some headroom between start of buffer and data */
+	if (mb_ctor_arg->buf_type == ODP_BUFFER_TYPE_PACKET ||
+	    mb_ctor_arg->buf_type == ODP_BUFFER_TYPE_ANY)
+		mb->pkt.data = (char *) mb->buf_addr + RTE_PKTMBUF_HEADROOM;
+	else
+		mb->pkt.data = mb->buf_addr;
+
+	/* init some constant fields */
+	mb->type         = RTE_MBUF_PKT;
+	mb->pool         = mp;
+	mb->pkt.nb_segs  = 1;
+	mb->pkt.in_port  = 0xff;
+	mb->ol_flags     = 0;
+	mb->pkt.vlan_macip.data = 0;
+	mb->pkt.hash.rss = 0;
+
+	/* Save index, might be useful for debugging purposes */
+	buf_hdr = (struct odp_buffer_hdr_t*) raw_mbuf;
+	buf_hdr->index = i;
+}
 
 odp_buffer_pool_t odp_buffer_pool_create(const char *name,
 					 void *base_addr, uint64_t size,
 					 size_t buf_size, size_t buf_align,
 					 int buf_type)
 {
-	struct rte_mempool *pktmbuf_pool = NULL;
+	struct rte_mempool *pool = NULL;
+	struct mbuf_pool_ctor_arg mbp_ctor_arg;
+	struct mbuf_ctor_arg mb_ctor_arg;
+	unsigned mb_size;
+
+	/* Not used for rte_mempool; the new ODP buffer management introduces
+	 * rte_mempool_create_from_region where base_addr makes sense */
+	(void)base_addr;
+
+	/* buf_align will be removed soon, no need to worry about it */
+	(void)buf_align;
+
 	ODP_DBG("odp_buffer_pool_create: %s, %lx, %u, %u, %u, %d\n", name,
 		(uint64_t) base_addr, (unsigned) size,
 		(unsigned) buf_size, (unsigned) buf_align,
 		buf_type);
 
-	pktmbuf_pool =
-		rte_mempool_create(name, NB_MBUF,
-				   MBUF_SIZE, MAX_PKT_BURST,
-				   sizeof(struct rte_pktmbuf_pool_private),
-				   rte_pktmbuf_pool_init, NULL,
-				   rte_pktmbuf_init, NULL,
-				   rte_socket_id(), 0);
-	if (pktmbuf_pool == NULL) {
+	switch (buf_type) {
+	case ODP_BUFFER_TYPE_RAW:
+		mb_ctor_arg.seg_buf_offset = 
+			(uint16_t) CACHE_LINE_ROUNDUP(sizeof(odp_buffer_hdr_t));
+		mbp_ctor_arg.seg_buf_size = (uint16_t) buf_size;
+		break;
+	case ODP_BUFFER_TYPE_PACKET:
+		mb_ctor_arg.seg_buf_offset = 
+			(uint16_t) CACHE_LINE_ROUNDUP(sizeof(odp_packet_hdr_t));
+		mbp_ctor_arg.seg_buf_size =
+			(uint16_t) (RTE_PKTMBUF_HEADROOM + buf_size);
+		break;
+	case ODP_BUFFER_TYPE_TIMEOUT:
+		mb_ctor_arg.seg_buf_offset =
+			(uint16_t) CACHE_LINE_ROUNDUP(sizeof(odp_timeout_hdr_t));
+		mbp_ctor_arg.seg_buf_size = (uint16_t) buf_size;
+		break;
+	case ODP_BUFFER_TYPE_ANY:
+		mb_ctor_arg.seg_buf_offset =
+			(uint16_t) CACHE_LINE_ROUNDUP(sizeof(odp_any_buffer_hdr_t));
+		mbp_ctor_arg.seg_buf_size =
+			(uint16_t) (RTE_PKTMBUF_HEADROOM + buf_size);
+		break;
+	default:
+		ODP_ERR("odp_buffer_pool_create: Bad type %i\n", buf_type);
+		exit(0);
+		break;
+	}
+
+	mb_ctor_arg.seg_buf_size = mbp_ctor_arg.seg_buf_size;
+	mb_ctor_arg.buf_type = buf_type;
+	mb_size = mb_ctor_arg.seg_buf_offset + mb_ctor_arg.seg_buf_size;
+
+	pool = rte_mempool_create(name, NB_MBUF,
+				  mb_size, MAX_PKT_BURST,
+				  sizeof(struct rte_pktmbuf_pool_private),
+				  odp_dpdk_mbuf_pool_ctor, &mbp_ctor_arg,
+				  odp_dpdk_mbuf_ctor, &mb_ctor_arg,
+				  rte_socket_id(), 0);
+	if (pool == NULL) {
 		ODP_ERR("Cannot init DPDK mbuf pool\n");
 		return -1;
 	}
 
-	return (odp_buffer_pool_t) pktmbuf_pool;
+	return (odp_buffer_pool_t) pool;
 }
 
 
diff --git a/platform/linux-dpdk/odp_packet.c b/platform/linux-dpdk/odp_packet.c
index edfd06d..7afaba6 100644
--- a/platform/linux-dpdk/odp_packet.c
+++ b/platform/linux-dpdk/odp_packet.c
@@ -23,13 +23,13 @@  static inline uint8_t parse_ipv6(odp_packet_hdr_t *pkt_hdr,
 void odp_packet_init(odp_packet_t pkt)
 {
 	odp_packet_hdr_t *const pkt_hdr = odp_packet_hdr(pkt);
-	const size_t start_offset = ODP_FIELD_SIZEOF(odp_packet_hdr_t, buf_hdr);
-	uint8_t *start;
-	size_t len;
+	struct rte_mbuf *mb;
+	void *start;
 
-	start = (uint8_t *)pkt_hdr + start_offset;
-	len = ODP_OFFSETOF(odp_packet_hdr_t, payload) - start_offset;
-	memset(start, 0, len);
+	mb = &pkt_hdr->buf_hdr.mb;
+
+	start = mb->buf_addr;
+	memset(start, 0, mb->buf_len);
 
 	pkt_hdr->l2_offset = (uint32_t) ODP_PACKET_OFFSET_INVALID;
 	pkt_hdr->l3_offset = (uint32_t) ODP_PACKET_OFFSET_INVALID;
@@ -46,18 +46,47 @@  odp_buffer_t odp_buffer_from_packet(odp_packet_t pkt)
 	return (odp_buffer_t)pkt;
 }
 
-void odp_packet_set_len(odp_packet_t pkt, size_t len)
+/* Advance the pkt data pointer and set len in one call */
+static int odp_packet_set_offset_len(odp_packet_t pkt, size_t frame_offset,
+                                     size_t len)
 {
-	/* for rte_pktmbuf */
-	odp_buffer_hdr_t *buf_hdr = odp_buf_to_hdr(odp_buffer_from_packet(pkt));
-	buf_hdr->pkt.data_len = len;
+	struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
+	uint16_t offset;
+	uint16_t data_len;
+
+	/* The pkt buf may have been pulled back into the headroom
+	 * so we cannot rely on finding the data right after the 
+	 * ODP header and HEADROOM */
+	offset = (uint16_t)((unsigned long)mb->pkt.data -
+			    (unsigned long)mb->buf_addr);
+	ODP_ASSERT(mb->buf_len >= offset, "Corrupted mbuf");
+	data_len = mb->buf_len - offset;
+
+	if (data_len < frame_offset) {
+		ODP_ERR("Frame offset too big");
+		return -1;
+	}
+	mb->pkt.data = (void*)((char*)mb->pkt.data + frame_offset);
+	data_len -= frame_offset;
 
-	odp_packet_hdr(pkt)->frame_len = len;
+	if (data_len < len) {
+		ODP_ERR("Packet len too big");
+		return -1;
+	}
+	mb->pkt.pkt_len = len;
+
+	return 0;
+}
+
+void odp_packet_set_len(odp_packet_t pkt, size_t len)
+{
+	(void)odp_packet_set_offset_len(pkt, 0, len);
 }
 
 size_t odp_packet_get_len(odp_packet_t pkt)
 {
-	return odp_packet_hdr(pkt)->frame_len;
+	struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
+	return mb->pkt.pkt_len;
 }
 
 uint8_t *odp_packet_buf_addr(odp_packet_t pkt)
@@ -67,7 +96,8 @@  uint8_t *odp_packet_buf_addr(odp_packet_t pkt)
 
 uint8_t *odp_packet_start(odp_packet_t pkt)
 {
-	return odp_packet_buf_addr(pkt) + odp_packet_hdr(pkt)->frame_offset;
+	struct rte_mbuf *mb = &(odp_packet_hdr(pkt)->buf_hdr.mb);
+	return mb->pkt.data;
 }
 
 
@@ -78,7 +108,7 @@  uint8_t *odp_packet_l2(odp_packet_t pkt)
 	if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
 		return NULL;
 
-	return odp_packet_buf_addr(pkt) + offset;
+	return odp_packet_start(pkt) + offset;
 }
 
 size_t odp_packet_l2_offset(odp_packet_t pkt)
@@ -98,7 +128,7 @@  uint8_t *odp_packet_l3(odp_packet_t pkt)
 	if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
 		return NULL;
 
-	return odp_packet_buf_addr(pkt) + offset;
+	return odp_packet_start(pkt) + offset;
 }
 
 size_t odp_packet_l3_offset(odp_packet_t pkt)
@@ -118,7 +148,7 @@  uint8_t *odp_packet_l4(odp_packet_t pkt)
 	if (odp_unlikely(offset == ODP_PACKET_OFFSET_INVALID))
 		return NULL;
 
-	return odp_packet_buf_addr(pkt) + offset;
+	return odp_packet_start(pkt) + offset;
 }
 
 size_t odp_packet_l4_offset(odp_packet_t pkt)
@@ -152,9 +182,13 @@  void odp_packet_parse(odp_packet_t pkt, size_t len, size_t frame_offset)
 	size_t offset = 0;
 	uint8_t ip_proto = 0;
 
+	/* The frame_offset is not relevant for frames from DPDK */
 	pkt_hdr->input_flags.eth = 1;
-	pkt_hdr->frame_offset = frame_offset;
-	pkt_hdr->frame_len = len;
+	(void) frame_offset;
+	pkt_hdr->frame_offset = 0;
+	if (odp_packet_set_offset_len(pkt, 0, len)) {
+		return;
+	}
 
 	if (odp_unlikely(len < ODPH_ETH_LEN_MIN)) {
 		pkt_hdr->error_flags.frame_len = 1;
@@ -165,7 +199,7 @@  void odp_packet_parse(odp_packet_t pkt, size_t len, size_t frame_offset)
 
 	/* Assume valid L2 header, no CRC/FCS check in SW */
 	pkt_hdr->input_flags.l2 = 1;
-	pkt_hdr->l2_offset = frame_offset;
+	pkt_hdr->l2_offset = 0;
 
 	eth = (odph_ethhdr_t *)odp_packet_start(pkt);
 	ethtype = odp_be_to_cpu_16(eth->type);
@@ -189,7 +223,7 @@  void odp_packet_parse(odp_packet_t pkt, size_t len, size_t frame_offset)
 	case ODPH_ETHTYPE_IPV4:
 		pkt_hdr->input_flags.ipv4 = 1;
 		pkt_hdr->input_flags.l3 = 1;
-		pkt_hdr->l3_offset = frame_offset + ODPH_ETHHDR_LEN + offset;
+		pkt_hdr->l3_offset = ODPH_ETHHDR_LEN + offset;
 		ipv4 = (odph_ipv4hdr_t *)odp_packet_l3(pkt);
 		ip_proto = parse_ipv4(pkt_hdr, ipv4, &offset);
 		break;
@@ -304,6 +338,7 @@  void odp_packet_print(odp_packet_t pkt)
 {
 	int max_len = 512;
 	char str[max_len];
+	uint8_t *p;
 	int len = 0;
 	int n = max_len-1;
 	odp_packet_hdr_t *hdr = odp_packet_hdr(pkt);
@@ -325,50 +360,69 @@  void odp_packet_print(odp_packet_t pkt)
 	len += snprintf(&str[len], n-len,
 			"  l4_offset    %u\n", hdr->l4_offset);
 	len += snprintf(&str[len], n-len,
-			"  frame_len    %u\n", hdr->frame_len);
+			"  frame_len    %u\n", hdr->buf_hdr.mb.pkt.pkt_len);
 	len += snprintf(&str[len], n-len,
 			"  input        %u\n", hdr->input);
 	str[len] = '\0';
 
 	printf("\n%s\n", str);
+	rte_pktmbuf_dump(&hdr->buf_hdr.mb, 32);
+
+	p = odp_packet_start(pkt);
+	printf("00000000: %02X %02X %02X %02X %02X %02X %02X %02X\
+	       %02X %02X %02X %02X %02X %02X %02X %02X\n",
+	       p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7],
+	       p[8], p[9], p[10], p[11], p[12], p[13], p[14], p[15]);
+
 }
 
+/* For now we can only copy between packets of the same segment size.
+ * We should probably refine this API, maybe introduce a clone API */
 int odp_packet_copy(odp_packet_t pkt_dst, odp_packet_t pkt_src)
 {
-	odp_packet_hdr_t *const pkt_hdr_dst = odp_packet_hdr(pkt_dst);
-	odp_packet_hdr_t *const pkt_hdr_src = odp_packet_hdr(pkt_src);
-	const size_t start_offset = ODP_FIELD_SIZEOF(odp_packet_hdr_t, buf_hdr);
-	uint8_t *start_src;
-	uint8_t *start_dst;
-	size_t len;
+	struct rte_mbuf *mb_dst, *mb_src;
+	uint8_t nb_segs, i;
+
+	ODP_ASSERT(odp_buffer_type(pkt_dst) == ODP_BUFFER_TYPE_PACKET &&
+		   odp_buffer_type(pkt_src) == ODP_BUFFER_TYPE_PACKET,
+		   "dst_pkt or src_pkt not of type ODP_BUFFER_TYPE_PACKET");
 
 	if (pkt_dst == ODP_PACKET_INVALID || pkt_src == ODP_PACKET_INVALID)
 		return -1;
 
-	/* if (pkt_hdr_dst->buf_hdr.size < */
-	/*	pkt_hdr_src->frame_len + pkt_hdr_src->frame_offset) */
-	if (pkt_hdr_dst->buf_hdr.buf_len <
-		pkt_hdr_src->frame_len + pkt_hdr_src->frame_offset)
+	mb_dst = &(odp_packet_hdr(pkt_dst)->buf_hdr.mb);
+	mb_src = &(odp_packet_hdr(pkt_src)->buf_hdr.mb);
+
+	if (mb_dst->pkt.nb_segs != mb_src->pkt.nb_segs) {
+		ODP_ERR("Different nb_segs in pkt_dst and pkt_src");
 		return -1;
+	}
 
-	/* Copy packet header */
-	start_dst = (uint8_t *)pkt_hdr_dst + start_offset;
-	start_src = (uint8_t *)pkt_hdr_src + start_offset;
-	len = ODP_OFFSETOF(odp_packet_hdr_t, payload) - start_offset;
-	memcpy(start_dst, start_src, len);
+	nb_segs = mb_src->pkt.nb_segs;
 
-	/* Copy frame payload */
-	start_dst = (uint8_t *)odp_packet_start(pkt_dst);
-	start_src = (uint8_t *)odp_packet_start(pkt_src);
-	len = pkt_hdr_src->frame_len;
-	memcpy(start_dst, start_src, len);
+	if (mb_dst->buf_len < mb_src->buf_len) {
+		ODP_ERR("dst_pkt smaller than src_pkt");
+		return -1;
+	}
 
-	/* Copy useful things from the buffer header */
-	/* pkt_hdr_dst->buf_hdr.cur_offset = pkt_hdr_src->buf_hdr.cur_offset; */
+	for (i = 0; i < nb_segs; i++) {
+		if (mb_src == NULL || mb_dst == NULL) {
+			ODP_ERR("Corrupted packets");
+			return -1;
+		}
+		memcpy(mb_dst->buf_addr, mb_src->buf_addr, mb_src->buf_len);
+		mb_dst = mb_dst->pkt.next;
+		mb_src = mb_src->pkt.next;
+	}
+	return 0;
+}
 
-	/* Create a copy of the scatter list */
-	/* odp_buffer_copy_scatter(odp_buffer_from_packet(pkt_dst), */
-	/*			odp_buffer_from_packet(pkt_src)); */
+void odp_packet_set_ctx(odp_packet_t pkt, const void *ctx)
+{
+	odp_packet_hdr(pkt)->user_ctx = (intptr_t)ctx;
+}
 
-	return 0;
+void *odp_packet_get_ctx(odp_packet_t pkt)
+{
+	return (void *)(intptr_t)odp_packet_hdr(pkt)->user_ctx;
 }
diff --git a/platform/linux-dpdk/odp_packet_dpdk.c b/platform/linux-dpdk/odp_packet_dpdk.c
index d5c8e80..ea83580 100644
--- a/platform/linux-dpdk/odp_packet_dpdk.c
+++ b/platform/linux-dpdk/odp_packet_dpdk.c
@@ -82,7 +82,7 @@  int setup_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, const char *netdev,
 	static struct ether_addr eth_addr[RTE_MAX_ETHPORTS];
 	static int portinit[RTE_MAX_ETHPORTS];
 	static int qid[RTE_MAX_ETHPORTS];
-	uint8_t portid = 0, num_intf = 2;
+	uint8_t portid = 0;
 	uint16_t nbrxq = 0, nbtxq = 0;
 	int ret, i;
 
@@ -93,7 +93,7 @@  int setup_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, const char *netdev,
 	pkt_dpdk->pool = pool;
 	printf("dpdk portid: %u\n", portid);
 
-	nbrxq = odp_sys_core_count() / num_intf;
+	nbrxq = odp_sys_core_count();
 	nbtxq = nbrxq;
 	if (portinit[portid] == 0) {
 		fflush(stdout);
@@ -157,17 +157,18 @@  int close_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk)
 }
 
 int recv_pkt_dpdk(pkt_dpdk_t * const pkt_dpdk, odp_packet_t pkt_table[],
-		unsigned len)
+		  unsigned len)
 {
-	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
 	uint16_t nb_rx, i = 0;
 
-	memset(pkts_burst, 0 , sizeof(pkts_burst));
 	nb_rx = rte_eth_rx_burst((uint8_t)pkt_dpdk->portid,
 				 (uint16_t)pkt_dpdk->queueid,
-				 (struct rte_mbuf **)pkts_burst, (uint16_t)len);
-	for (i = 0; i < nb_rx; i++)
-		pkt_table[i] = (odp_packet_t)pkts_burst[i];
+				 (struct rte_mbuf **)pkt_table, (uint16_t)len);
+	for (i = 0; i < nb_rx; i++) {
+		odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt_table[i]);
+		struct rte_mbuf *mb = &pkt_hdr->buf_hdr.mb;
+		odp_packet_parse(pkt_table[i], mb->pkt.pkt_len, 0);
+	}
 	return nb_rx;
 }
 
diff --git a/platform/linux-dpdk/odp_queue.c b/platform/linux-dpdk/odp_queue.c
index 554b8ea..29fae8f 100644
--- a/platform/linux-dpdk/odp_queue.c
+++ b/platform/linux-dpdk/odp_queue.c
@@ -239,11 +239,11 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 		/* Empty queue */
 		queue->s.head = buf_hdr;
 		queue->s.tail = buf_hdr;
-		buf_hdr->pkt.next = NULL;
+		buf_hdr->next = NULL;
 	} else {
-		queue->s.tail->pkt.next = buf_hdr;
+		queue->s.tail->next = buf_hdr;
 		queue->s.tail = buf_hdr;
-		buf_hdr->pkt.next = NULL;
+		buf_hdr->next = NULL;
 	}
 
 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
@@ -267,17 +267,17 @@  int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 	odp_buffer_hdr_t *tail;
 
 	for (i = 0; i < num - 1; i++)
-		buf_hdr[i]->pkt.next = buf_hdr[i+1];
+		buf_hdr[i]->next = buf_hdr[i+1];
 
 	tail = buf_hdr[num-1];
-	buf_hdr[num-1]->pkt.next = NULL;
+	buf_hdr[num-1]->next = NULL;
 
 	LOCK(&queue->s.lock);
 	/* Empty queue */
 	if (queue->s.head == NULL)
 		queue->s.head = buf_hdr[0];
 	else
-		queue->s.tail->pkt.next = buf_hdr[0];
+		queue->s.tail->next = buf_hdr[0];
 
 	queue->s.tail = tail;
 
@@ -338,8 +338,8 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 			queue->s.status = QUEUE_STATUS_NOTSCHED;
 	} else {
 		buf_hdr       = queue->s.head;
-		queue->s.head = buf_hdr->pkt.next;
-		buf_hdr->pkt.next = NULL;
+		queue->s.head = buf_hdr->next;
+		buf_hdr->next = NULL;
 
 		if (queue->s.head == NULL) {
 			/* Queue is now empty */
@@ -370,8 +370,8 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 		for (; i < num && hdr; i++) {
 			buf_hdr[i]       = hdr;
 			/* odp_prefetch(hdr->addr); */
-			hdr              = hdr->pkt.next;
-			buf_hdr[i]->pkt.next = NULL;
+			hdr              = hdr->next;
+			buf_hdr[i]->next = NULL;
 		}
 
 		queue->s.head = hdr;
diff --git a/platform/linux-dpdk/odp_schedule.c b/platform/linux-dpdk/odp_schedule.c
new file mode 100644
index 0000000..462b8eb
--- /dev/null
+++ b/platform/linux-dpdk/odp_schedule.c
@@ -0,0 +1,417 @@ 
+/* Copyright (c) 2013, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#include <odp_schedule.h>
+#include <odp_schedule_internal.h>
+#include <odp_align.h>
+#include <odp_queue.h>
+#include <odp_shared_memory.h>
+#include <odp_buffer.h>
+#include <odp_buffer_pool.h>
+#include <odp_internal.h>
+#include <odp_config.h>
+#include <odp_debug.h>
+#include <odp_thread.h>
+#include <odp_time.h>
+#include <odp_spinlock.h>
+#include <odp_hints.h>
+
+#include <odp_queue_internal.h>
+
+
+/* Size of the scheduler descriptor pool; limits number of scheduled queues */
+#define SCHED_POOL_SIZE (256*1024)
+
+/* Sub queues per priority; used as a bit mask, so keep it a power of two */
+#define QUEUES_PER_PRIO  4
+
+/* TODO: random or queue based selection (now: low bits of the queue id) */
+#define SEL_PRI_QUEUE(x) ((QUEUES_PER_PRIO-1) & (queue_to_id(x)))
+
+/* Maximum number of buffers dequeued (and cached per thread) in one go */
+#define MAX_DEQ 4
+
+
+/* Bit mask with one bit per sub queue of a priority level */
+typedef uint8_t pri_mask_t;
+
+ODP_STATIC_ASSERT((8*sizeof(pri_mask_t)) >= QUEUES_PER_PRIO, "pri_mask_t_is_too_small");
+
+
+typedef struct { /* Global scheduler state, kept in shared memory */
+	odp_queue_t       pri_queue[ODP_CONFIG_SCHED_PRIOS][QUEUES_PER_PRIO];
+	pri_mask_t        pri_mask[ODP_CONFIG_SCHED_PRIOS];
+	odp_spinlock_t    mask_lock; /* taken when setting bits in pri_mask */
+	odp_buffer_pool_t pool;      /* source of queue descriptor buffers */
+} sched_t;
+
+/* Payload of a descriptor buffer: the queue the descriptor stands for */
+typedef struct {
+	odp_queue_t queue;
+} queue_desc_t;
+
+/* Per-thread scheduler state */
+typedef struct {
+	odp_queue_t  pri_queue; /* priority queue of a held atomic queue */
+	odp_buffer_t desc_buf;  /* descriptor to re-enqueue on release */
+	odp_buffer_t buf[MAX_DEQ]; /* buffers dequeued but not yet delivered */
+	int num;           /* number of undelivered buffers left in buf[] */
+	int index;         /* next position in buf[] to hand out */
+	odp_queue_t queue; /* source queue of the buffers in buf[] */
+	int pause;         /* non-zero: do not pick up new work */
+
+} sched_local_t;
+
+/* Global scheduler context, reserved in odp_schedule_init_global() */
+static sched_t *sched;
+
+/* Thread local scheduler context, reset by odp_schedule_init_local() */
+static __thread sched_local_t sched_local;
+
+
+/* Return the sub queue of priority level 'prio' that serves 'queue' */
+static inline odp_queue_t select_pri_queue(odp_queue_t queue, int prio)
+{
+	return sched->pri_queue[prio][SEL_PRI_QUEUE(queue)];
+}
+
+
+/* Init global scheduler state; returns 0 on success, -1 on any failure */
+int odp_schedule_init_global(void)
+{
+	odp_buffer_pool_t dpool;
+	int prio, sub;
+
+	ODP_DBG("Schedule init ... ");
+
+	sched = odp_shm_reserve("odp_scheduler", sizeof(sched_t),
+				ODP_CACHE_LINE_SIZE);
+
+	if (sched == NULL) {
+		ODP_ERR("Schedule init: Shm reserve failed.\n");
+		return -1;
+	}
+
+	dpool = odp_buffer_pool_create("odp_sched_pool", NULL,
+				       SCHED_POOL_SIZE, sizeof(queue_desc_t),
+				       ODP_CACHE_LINE_SIZE,
+				       ODP_BUFFER_TYPE_RAW);
+
+	if (dpool == ODP_BUFFER_POOL_INVALID) {
+		ODP_ERR("Schedule init: Pool create failed.\n");
+		return -1;
+	}
+
+	sched->pool = dpool;
+	odp_spinlock_init(&sched->mask_lock);
+
+	for (prio = 0; prio < ODP_CONFIG_SCHED_PRIOS; prio++) {
+		odp_queue_t queue;
+		char name[] = "odp_priXX_YY"; /* XX = prio, YY = sub queue */
+
+		name[7] = '0' + prio / 10;
+		name[8] = '0' + prio % 10;
+
+		for (sub = 0; sub < QUEUES_PER_PRIO; sub++) {
+			name[10] = '0' + sub / 10;
+			name[11] = '0' + sub % 10;
+
+			queue = odp_queue_create(name,
+						 ODP_QUEUE_TYPE_POLL, NULL);
+
+			if (queue == ODP_QUEUE_INVALID) {
+				ODP_ERR("Sched init: Queue create failed.\n");
+				return -1;
+			}
+
+			sched->pri_queue[prio][sub] = queue;
+			sched->pri_mask[prio]       = 0;
+		}
+	}
+
+	ODP_DBG("done\n");
+
+	return 0;
+}
+
+
+/* Reset the calling thread's scheduler context; always returns 0 */
+int odp_schedule_init_local(void)
+{
+	int slot = MAX_DEQ;
+
+	while (slot--)
+		sched_local.buf[slot] = ODP_BUFFER_INVALID;
+
+	sched_local.queue     = ODP_QUEUE_INVALID;
+	sched_local.pri_queue = ODP_QUEUE_INVALID;
+	sched_local.desc_buf  = ODP_BUFFER_INVALID;
+	sched_local.index     = 0;
+	sched_local.num       = 0;
+	sched_local.pause     = 0;
+
+	return 0;
+}
+
+
+void odp_schedule_mask_set(odp_queue_t queue, int prio)
+{
+	const pri_mask_t bit = 1 << SEL_PRI_QUEUE(queue); /* sub queue's bit */
+
+	odp_spinlock_lock(&sched->mask_lock);
+	sched->pri_mask[prio] |= bit; /* mark sub queue as in use */
+	odp_spinlock_unlock(&sched->mask_lock);
+}
+
+
+/* Allocate a scheduler descriptor for 'queue' (invalid handle on failure) */
+odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue)
+{
+	odp_buffer_t desc_buf = odp_buffer_alloc(sched->pool);
+	queue_desc_t *desc;
+
+	if (desc_buf == ODP_BUFFER_INVALID)
+		return ODP_BUFFER_INVALID;
+
+	/* Remember which queue this descriptor represents */
+	desc        = odp_buffer_addr(desc_buf);
+	desc->queue = queue;
+	return desc_buf;
+}
+
+
+/*
+ * Make 'queue' visible to the scheduler by enqueueing its descriptor
+ * buffer to the priority queue selected for 'queue' and 'prio'.
+ */
+void odp_schedule_queue(odp_queue_t queue, int prio)
+{
+	odp_queue_t target = select_pri_queue(queue, prio);
+
+	odp_queue_enq(target, queue_sched_buf(queue));
+}
+
+
+/* Hand a held atomic queue back once all its cached buffers are consumed */
+void odp_schedule_release_atomic(void)
+{
+	if (sched_local.pri_queue == ODP_QUEUE_INVALID || sched_local.num != 0)
+		return;
+
+	odp_queue_enq(sched_local.pri_queue, sched_local.desc_buf);
+	sched_local.pri_queue = ODP_QUEUE_INVALID;
+}
+
+
+/*
+ * Move up to 'max' buffers from the thread local cache to out_buf[].
+ * Returns the number of buffers copied.
+ */
+static inline int copy_bufs(odp_buffer_t out_buf[], unsigned int max)
+{
+	int copied;
+
+	for (copied = 0; sched_local.num && max; copied++, max--) {
+		out_buf[copied] = sched_local.buf[sched_local.index++];
+		sched_local.num--;
+	}
+	return copied;
+}
+
+
+/*
+ * Core scheduling pass: fill out_buf[] with up to max_num buffers, from the
+ * thread local cache first, else from the first non-empty scheduled queue.
+ * Returns the number of buffers written. TODO: SYNC_ORDERED not implemented
+ */
+static int schedule(odp_queue_t *out_queue, odp_buffer_t out_buf[],
+		    unsigned int max_num, unsigned int max_deq)
+{
+	int i, j;
+	int thr;
+	int ret;
+
+	if (sched_local.num) { /* leftovers of an earlier dequeue go first */
+		ret = copy_bufs(out_buf, max_num);
+
+		if (out_queue)
+			*out_queue = sched_local.queue;
+
+		return ret;
+	}
+
+	odp_schedule_release_atomic(); /* cache is empty: free a held queue */
+
+	if (odp_unlikely(sched_local.pause))
+		return 0;
+
+	thr = odp_thread_id();
+
+	for (i = 0; i < ODP_CONFIG_SCHED_PRIOS; i++) {
+		int id;
+
+		if (sched->pri_mask[i] == 0)
+			continue;
+
+		id = thr & (QUEUES_PER_PRIO-1); /* first sub queue to try */
+
+		for (j = 0; j < QUEUES_PER_PRIO; j++, id++) {
+			odp_queue_t  pri_q;
+			odp_buffer_t desc_buf;
+
+			if (id >= QUEUES_PER_PRIO) /* wrap around */
+				id = 0;
+
+			if (odp_unlikely((sched->pri_mask[i] & (1 << id)) == 0))
+				continue;
+
+			pri_q    = sched->pri_queue[i][id];
+			desc_buf = odp_queue_deq(pri_q);
+
+			if (desc_buf != ODP_BUFFER_INVALID) {
+				queue_desc_t *desc;
+				odp_queue_t queue;
+				int num;
+
+				desc  = odp_buffer_addr(desc_buf);
+				queue = desc->queue;
+
+				num = odp_queue_deq_multi(queue,
+							  sched_local.buf,
+							  max_deq);
+
+				if (num == 0) {
+					/* Remove empty queue from scheduling,
+					 * except packet input queues
+					 */
+					if (odp_queue_type(queue) ==
+					    ODP_QUEUE_TYPE_PKTIN)
+						odp_queue_enq(pri_q, desc_buf);
+
+					continue;
+				}
+
+				sched_local.num   = num;
+				sched_local.index = 0;
+				ret = copy_bufs(out_buf, max_num);
+
+				sched_local.queue = queue;
+
+				if (queue_sched_atomic(queue)) {
+					/* Hold queue during atomic access */
+					sched_local.pri_queue = pri_q;
+					sched_local.desc_buf  = desc_buf;
+				} else {
+					/* Continue scheduling the queue */
+					odp_queue_enq(pri_q, desc_buf);
+				}
+
+				/* Output the source queue handle */
+				if (out_queue)
+					*out_queue = queue;
+
+				return ret;
+			}
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+ * Retry schedule() until it returns buffers, 'wait' is ODP_SCHED_NO_WAIT, or
+ * more than 'wait' cycles passed since the first empty attempt.
+ */
+static int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
+			  odp_buffer_t out_buf[],
+			  unsigned int max_num, unsigned int max_deq)
+{
+	uint64_t t0 = 0;
+	uint64_t elapsed;
+	int got;
+
+	for (;;) {
+		got = schedule(out_queue, out_buf, max_num, max_deq);
+		if (got)
+			break;
+
+		if (wait == ODP_SCHED_WAIT)
+			continue;
+
+		if (wait == ODP_SCHED_NO_WAIT)
+			break;
+
+		if (t0 == 0) {
+			t0 = odp_time_get_cycles();
+			continue;
+		}
+
+		elapsed = odp_time_diff_cycles(t0, odp_time_get_cycles());
+		if (elapsed > wait)
+			break;
+	}
+
+	return got;
+}
+
+
+/* Schedule one buffer (up to MAX_DEQ may be dequeued and cached locally) */
+odp_buffer_t odp_schedule(odp_queue_t *out_queue, uint64_t wait)
+{
+	odp_buffer_t out = ODP_BUFFER_INVALID;
+
+	/* 'out' stays invalid when the loop gives up without a buffer */
+	(void)schedule_loop(out_queue, wait, &out, 1, MAX_DEQ);
+
+	return out;
+}
+
+
+/* Schedule one buffer, dequeueing at most one buffer from the source queue */
+odp_buffer_t odp_schedule_one(odp_queue_t *out_queue, uint64_t wait)
+{
+	odp_buffer_t out = ODP_BUFFER_INVALID;
+
+	(void)schedule_loop(out_queue, wait, &out, 1, 1);
+
+	return out;
+}
+
+
+/* Schedule up to 'num' buffers into out_buf[]; returns how many were stored */
+int odp_schedule_multi(odp_queue_t *out_queue, uint64_t wait,
+		       odp_buffer_t out_buf[], unsigned int num)
+{
+	return schedule_loop(out_queue, wait, out_buf, num, MAX_DEQ);
+}
+
+
+void odp_schedule_pause(void)
+{
+	sched_local.pause = 1; /* schedule() only drains cached buffers now */
+}
+
+
+void odp_schedule_resume(void)
+{
+	sched_local.pause = 0; /* schedule() may pick up new work again */
+}
+
+
+/* Convert 'ns' to cycles; ns at or below ODP_SCHED_NO_WAIT is bumped first */
+uint64_t odp_schedule_wait_time(uint64_t ns)
+{
+	uint64_t t = (ns > ODP_SCHED_NO_WAIT) ? ns : ODP_SCHED_NO_WAIT + 1;
+
+	return odp_time_ns_to_cycles(t);
+}
+
+
+int odp_schedule_num_prio(void)
+{
+	return ODP_CONFIG_SCHED_PRIOS; /* count of scheduler priority levels */
+}