diff mbox

[4/4] linux-gen: pktio: don't allocate new packets in classifier

Message ID 1464784423-7713-4-git-send-email-matias.elo@nokia.com
State Superseded
Headers show

Commit Message

Elo, Matias (Nokia - FI/Espoo) June 1, 2016, 12:33 p.m. UTC
Don't allocate new packets inside of the internal
classifier helpers _odp_packet_cls_enq() and
_odp_packet_classifier(). Instead, save destination queue to
the parsed packet header and return correct packet pool.
This enables zero-copy packet classification.

Added new internal pktio helper pktin_recv_buf(), which
enqueues packets to classifier queues if necessary.

All pktio types use now a common cls_classify_packet()
helper function.

Signed-off-by: Matias Elo <matias.elo@nokia.com>
---
 .../include/odp_classification_internal.h          |  21 +---
 .../linux-generic/include/odp_packet_internal.h    |   5 +
 .../linux-generic/include/odp_packet_io_internal.h |   3 -
 platform/linux-generic/odp_classification.c        | 112 ++++++++++-----------
 platform/linux-generic/odp_packet.c                |   5 -
 platform/linux-generic/odp_packet_io.c             |  77 ++++++++------
 platform/linux-generic/pktio/dpdk.c                |  55 +++++-----
 platform/linux-generic/pktio/loop.c                |  86 ++++++++--------
 platform/linux-generic/pktio/netmap.c              |  45 ++++-----
 platform/linux-generic/pktio/pktio_common.c        |  67 ------------
 platform/linux-generic/pktio/socket.c              |  29 ++++--
 platform/linux-generic/pktio/socket_mmap.c         |  52 ++++++----
 12 files changed, 250 insertions(+), 307 deletions(-)

Comments

Maxim Uvarov June 1, 2016, 8:52 p.m. UTC | #1
Hello Matias,

thanks for fixing
https://bugs.linaro.org/show_bug.cgi?id=2297

I have one question bellow.

On 06/01/16 15:33, Matias Elo wrote:
> Don't allocate new packets inside of the internal
> classifier helpers _odp_packet_cls_enq() and
> _odp_packet_classifier(). Instead, save destination queue to
> the parsed packet header and return correct packet pool.
> This enables zero-copy packet classification.
>
> Added new internal pktio helper pktin_recv_buf(), which
> enqueues packets to classifier queues if necessary.
>
> All pktio types use now a common cls_classify_packet()
> helper function.
>
> Signed-off-by: Matias Elo <matias.elo@nokia.com>
> ---
>   .../include/odp_classification_internal.h          |  21 +---
>   .../linux-generic/include/odp_packet_internal.h    |   5 +
>   .../linux-generic/include/odp_packet_io_internal.h |   3 -
>   platform/linux-generic/odp_classification.c        | 112 ++++++++++-----------
>   platform/linux-generic/odp_packet.c                |   5 -
>   platform/linux-generic/odp_packet_io.c             |  77 ++++++++------
>   platform/linux-generic/pktio/dpdk.c                |  55 +++++-----
>   platform/linux-generic/pktio/loop.c                |  86 ++++++++--------
>   platform/linux-generic/pktio/netmap.c              |  45 ++++-----
>   platform/linux-generic/pktio/pktio_common.c        |  67 ------------
>   platform/linux-generic/pktio/socket.c              |  29 ++++--
>   platform/linux-generic/pktio/socket_mmap.c         |  52 ++++++----
>   12 files changed, 250 insertions(+), 307 deletions(-)
>
> diff --git a/platform/linux-generic/include/odp_classification_internal.h b/platform/linux-generic/include/odp_classification_internal.h
> index 3a88462..d6d6904 100644
> --- a/platform/linux-generic/include/odp_classification_internal.h
> +++ b/platform/linux-generic/include/odp_classification_internal.h
> @@ -30,21 +30,6 @@ extern "C" {
>   
>   /**
>   @internal
> -Select a CoS for the given Packet based on pktio
> -
> -This function will call all the PMRs associated with a pktio for
> -a given packet and will return the matched COS object.
> -This function will check PMR, L2 and L3 QoS COS object associated
> -with the PKTIO interface.
> -
> -Returns the default cos if the packet does not match any PMR
> -Returns the error_cos if the packet has an error
> -**/
> -cos_t *pktio_select_cos(pktio_entry_t *pktio, const uint8_t *pkt_addr,
> -			odp_packet_hdr_t *pkt_hdr);
> -
> -/**
> -@internal
>   match_qos_cos
>   
>   Select a CoS for the given Packet based on QoS values
> @@ -60,10 +45,10 @@ Packet Classifier
>   
>   Start function for Packet Classifier
>   This function calls Classifier module internal functions for a given packet and
> -enqueues the packet to specific Queue based on PMR and CoS selected.
> -The packet is allocated from the pool associated with the CoS
> +selects destination queue and packet pool based on selected PMR and CoS.
>   **/
> -int _odp_packet_classifier(pktio_entry_t *entry, odp_packet_t pkt);
> +int cls_classify_packet(pktio_entry_t *entry, const uint8_t *base, uint16_t len,
> +			odp_pool_t *pool, odp_packet_hdr_t *pkt_hdr);
>   
>   /**
>   Packet IO classifier init
> diff --git a/platform/linux-generic/include/odp_packet_internal.h b/platform/linux-generic/include/odp_packet_internal.h
> index 67ee34d..d5ace12 100644
> --- a/platform/linux-generic/include/odp_packet_internal.h
> +++ b/platform/linux-generic/include/odp_packet_internal.h
> @@ -39,6 +39,7 @@ typedef union {
>   	struct {
>   		uint64_t parsed_l2:1; /**< L2 parsed */
>   		uint64_t parsed_all:1;/**< Parsing complete */
> +		uint64_t dst_queue:1; /**< Dst queue present */
>   
>   		uint64_t flow_hash:1; /**< Flow hash present */
>   		uint64_t timestamp:1; /**< Timestamp present */
> @@ -156,6 +157,8 @@ typedef struct {
>   	uint32_t l3_len;         /**< Layer 3 length */
>   	uint32_t l4_len;         /**< Layer 4 length */
>   
> +	odp_queue_t dst_queue;   /**< Classifier destination queue */
> +
>   	uint32_t flow_hash;      /**< Flow hash value */
>   	odp_time_t timestamp;    /**< Timestamp value */
>   
> @@ -187,6 +190,8 @@ static inline void copy_packet_parser_metadata(odp_packet_hdr_t *src_hdr,
>   
>   	dst_hdr->l3_len         = src_hdr->l3_len;
>   	dst_hdr->l4_len         = src_hdr->l4_len;
> +
> +	dst_hdr->dst_queue      = src_hdr->dst_queue;
>   }
>   
>   static inline void *packet_map(odp_packet_hdr_t *pkt_hdr,
> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
> index 139b1bd..53683b3 100644
> --- a/platform/linux-generic/include/odp_packet_io_internal.h
> +++ b/platform/linux-generic/include/odp_packet_io_internal.h
> @@ -207,9 +207,6 @@ typedef struct pktio_if_ops {
>   				    const odp_pktout_queue_param_t *p);
>   } pktio_if_ops_t;
>   
> -int _odp_packet_cls_enq(pktio_entry_t *pktio_entry, const uint8_t *base,
> -			uint16_t buf_len, odp_time_t *ts);
> -
>   extern void *pktio_entry_ptr[];
>   
>   static inline int pktio_to_id(odp_pktio_t pktio)
> diff --git a/platform/linux-generic/odp_classification.c b/platform/linux-generic/odp_classification.c
> index a3fe545..7520bdc 100644
> --- a/platform/linux-generic/odp_classification.c
> +++ b/platform/linux-generic/odp_classification.c
> @@ -748,66 +748,19 @@ int pktio_classifier_init(pktio_entry_t *entry)
>   }
>   
>   /**
> - * Classify packet and enqueue to the right queue
> - *
> - * entry		pktio where it arrived
> - * pkt		packet handle
> - *
> - * Return values:
> - * 0 on success, packet is consumed
> - * -ENOENT CoS dropped the packet
> - * -EFAULT Bug, packet is released
> - * -EINVAL Config error, packet is NOT released
> - * -ENOMEM Target CoS pool exhausted, packet is NOT released
> - */
> -
> -int _odp_packet_classifier(pktio_entry_t *entry, odp_packet_t pkt)
> -{
> -	queue_entry_t *queue;
> -	cos_t *cos;
> -	odp_packet_hdr_t *pkt_hdr;
> -	odp_packet_t new_pkt;
> -	uint8_t *pkt_addr;
> -
> -	if (entry == NULL) {
> -		odp_packet_free(pkt);
> -		return -EFAULT;
> -	}
> -
> -	pkt_hdr = odp_packet_hdr(pkt);
> -	pkt_addr = odp_packet_data(pkt);
> -
> -	/* Matching PMR and selecting the CoS for the packet*/
> -	cos = pktio_select_cos(entry, pkt_addr, pkt_hdr);
> -	if (cos == NULL)
> -		return -EINVAL;
> -
> -	if (cos->s.queue == NULL || cos->s.pool == NULL) {
> -		odp_packet_free(pkt);
> -		return -ENOENT;
> -	}
> -
> -	if (odp_packet_pool(pkt) != cos->s.pool->s.pool_hdl) {
> -		new_pkt = odp_packet_copy(pkt, cos->s.pool->s.pool_hdl);
> -		if (new_pkt == ODP_PACKET_INVALID)
> -			return -ENOMEM;
> -		odp_packet_free(pkt);
> -	} else {
> -		new_pkt = pkt;
> -	}
> -
> -	/* Enqueuing the Packet based on the CoS */
> -	queue = cos->s.queue;
> -	if (queue_enq(queue, odp_buf_to_hdr((odp_buffer_t)new_pkt), 0)) {
> -		odp_packet_free(new_pkt);
> -		return -EFAULT;
> -	} else {
> -		return 0;
> -	}
> -}
> -
> -cos_t *pktio_select_cos(pktio_entry_t *entry, const uint8_t *pkt_addr,
> -			odp_packet_hdr_t *pkt_hdr)
> +Select a CoS for the given Packet based on pktio
> +
> +This function will call all the PMRs associated with a pktio for
> +a given packet and will return the matched COS object.
> +This function will check PMR, L2 and L3 QoS COS object associated
> +with the PKTIO interface.
> +
> +Returns the default cos if the packet does not match any PMR
> +Returns the error_cos if the packet has an error
> +**/
> +static inline cos_t *cls_select_cos(pktio_entry_t *entry,
> +				    const uint8_t *pkt_addr,
> +				    odp_packet_hdr_t *pkt_hdr)
>   {
>   	pmr_t *pmr;
>   	cos_t *cos;
> @@ -841,6 +794,45 @@ cos_t *pktio_select_cos(pktio_entry_t *entry, const uint8_t *pkt_addr,
>   	return cls->default_cos;
>   }
>   
> +/**
> + * Classify packet
> + *
> + * @param pktio_entry	Ingress pktio
> + * @param base		Packet data
> + * @param len		Packet length
> + * @param pool[out]	Packet pool
> + * @param pkt_hdr[out]	Packet header
> + *
> + * @retval 0 on success
> + * @retval -EFAULT Bug
> + * @retval -EINVAL Config error
> + *
> + * @note *base is not released
> + */
> +int cls_classify_packet(pktio_entry_t *entry, const uint8_t *base, uint16_t len,
> +			odp_pool_t *pool, odp_packet_hdr_t *pkt_hdr)
> +{
> +	cos_t *cos;
> +
> +	packet_parse_reset(pkt_hdr);
> +	pkt_hdr->frame_len = len;
> +
> +	_odp_parse_common(pkt_hdr, base);
> +	cos = cls_select_cos(entry, base, pkt_hdr);
> +
> +	if (cos == NULL)
> +		return -EINVAL;
> +
> +	if (cos->s.queue == NULL || cos->s.pool == NULL)
> +		return -EFAULT;
> +
> +	*pool = cos->s.pool->s.pool_hdl;
> +	pkt_hdr->input_flags.dst_queue = 1;
> +	pkt_hdr->dst_queue = cos->s.queue->s.handle;
> +
> +	return 0;
> +}
> +
>   cos_t *match_qos_l3_cos(pmr_l3_cos_t *l3_cos, const uint8_t *pkt_addr,
>   			odp_packet_hdr_t *hdr)
>   {
> diff --git a/platform/linux-generic/odp_packet.c b/platform/linux-generic/odp_packet.c
> index 5b11af6..2868736 100644
> --- a/platform/linux-generic/odp_packet.c
> +++ b/platform/linux-generic/odp_packet.c
> @@ -1295,11 +1295,6 @@ parse_exit:
>   	return pkt_hdr->error_flags.all != 0;
>   }
>   
> -int _odp_cls_parse(odp_packet_hdr_t *pkt_hdr, const uint8_t *parseptr)
> -{
> -	return _odp_parse_common(pkt_hdr, parseptr);
> -}
> -
>   /**
>    * Simple packet parser
>    */
> diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
> index 652709e..6de39b6 100644
> --- a/platform/linux-generic/odp_packet_io.c
> +++ b/platform/linux-generic/odp_packet_io.c
> @@ -531,6 +531,40 @@ odp_pktio_t odp_pktio_lookup(const char *name)
>   	return hdl;
>   }
>   
> +static inline int pktin_recv_buf(odp_pktin_queue_t queue,
> +				 odp_buffer_hdr_t *buffer_hdrs[], int num)
> +{
> +	odp_packet_t pkt;
> +	odp_packet_t packets[num];
> +	odp_packet_hdr_t *pkt_hdr;
> +	odp_buffer_hdr_t *buf_hdr;
> +	odp_buffer_t buf;
> +	int i;
> +	int ret;
> +	int num_rx = 0;
> +
> +	ret = odp_pktin_recv(queue, packets, num);
> +
> +	for (i = 0; i < ret; i++) {
> +		pkt = packets[i];
> +		pkt_hdr = odp_packet_hdr(pkt);
> +		buf = _odp_packet_to_buffer(pkt);
> +		buf_hdr = odp_buf_to_hdr(buf);
> +
> +		if (pkt_hdr->input_flags.dst_queue) {
> +			queue_entry_t *dst_queue;
> +
> +			dst_queue = queue_to_qentry(pkt_hdr->dst_queue);
> +			ret = queue_enq(dst_queue, buf_hdr, 0);
> +			if (ret < 0)
> +				odp_packet_free(pkt);
> +			continue;
> +		}
> +		buffer_hdrs[num_rx++] = buf_hdr;
> +	}
> +	return num_rx;
> +}
> +
>   int pktout_enqueue(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr)
>   {
>   	odp_packet_t pkt = _odp_packet_from_buffer(buf_hdr->handle.handle);
> @@ -579,25 +613,18 @@ int pktin_enqueue(queue_entry_t *qentry ODP_UNUSED,
>   odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
>   {
>   	odp_buffer_hdr_t *buf_hdr;
> -	odp_buffer_t buf;
> -	odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
>   	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
> -	int pkts, i;
> +	int pkts;
>   
>   	buf_hdr = queue_deq(qentry);
>   	if (buf_hdr != NULL)
>   		return buf_hdr;
>   
> -	pkts = odp_pktin_recv(qentry->s.pktin, pkt_tbl, QUEUE_MULTI_MAX);
> +	pkts = pktin_recv_buf(qentry->s.pktin, hdr_tbl, QUEUE_MULTI_MAX);
>   
>   	if (pkts <= 0)
>   		return NULL;
>   
> -	for (i = 0; i < pkts; i++) {
> -		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> -		hdr_tbl[i] = odp_buf_to_hdr(buf);
> -	}
> -
>   	if (pkts > 1)
>   		queue_enq_multi(qentry, &hdr_tbl[1], pkts - 1, 0);
>   	buf_hdr = hdr_tbl[0];
> @@ -615,15 +642,12 @@ int pktin_enq_multi(queue_entry_t *qentry ODP_UNUSED,
>   int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
>   {
>   	int nbr;
> -	odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
>   	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
> -	odp_buffer_t buf;
>   	int pkts, i, j;
>   
>   	nbr = queue_deq_multi(qentry, buf_hdr, num);
>   	if (odp_unlikely(nbr > num))
> -		ODP_ABORT("queue_deq_multi req: %d, returned %d\n",
> -			num, nbr);
> +		ODP_ABORT("queue_deq_multi req: %d, returned %d\n", num, nbr);
>   
>   	/** queue already has number of requsted buffers,
>   	 *  do not do receive in that case.
> @@ -631,20 +655,16 @@ int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
>   	if (nbr == num)
>   		return nbr;
>   
> -	pkts = odp_pktin_recv(qentry->s.pktin, pkt_tbl, QUEUE_MULTI_MAX);
> +	pkts = pktin_recv_buf(qentry->s.pktin, hdr_tbl, QUEUE_MULTI_MAX);
>   	if (pkts <= 0)
>   		return nbr;
>   
> -	/* Fill in buf_hdr first */
> -	for (i = 0; i < pkts && nbr < num; i++, nbr++) {
> -		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> -		buf_hdr[nbr] = odp_buf_to_hdr(buf);
> -	}
> +	for (i = 0; i < pkts && nbr < num; i++, nbr++)
> +		buf_hdr[nbr] = hdr_tbl[i];
> +
>   	/* Queue the rest for later */
> -	for (j = 0; i < pkts; i++, j++) {
> -		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> -		hdr_tbl[j] = odp_buf_to_hdr(buf);
> -	}
> +	for (j = 0; i < pkts; i++, j++)
> +		hdr_tbl[j] = hdr_tbl[i];
>   
>   	if (j)
>   		queue_enq_multi(qentry, hdr_tbl, j, 0);
> @@ -653,10 +673,8 @@ int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
>   
>   int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
>   {
> -	odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
>   	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
> -	int num, i, idx;
> -	odp_buffer_t buf;
> +	int num, idx;
>   	pktio_entry_t *entry;
>   
>   	entry = pktio_entry_by_index(pktio_index);
> @@ -678,7 +696,7 @@ int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
>   		odp_queue_t queue;
>   		odp_pktin_queue_t pktin = entry->s.in_queue[index[idx]].pktin;
>   
> -		num = odp_pktin_recv(pktin, pkt_tbl, QUEUE_MULTI_MAX);
> +		num = pktin_recv_buf(pktin, hdr_tbl, QUEUE_MULTI_MAX);
>   
>   		if (num == 0)
>   			continue;
> @@ -688,11 +706,6 @@ int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
>   			return -1;
>   		}
>   
> -		for (i = 0; i < num; i++) {
> -			buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> -			hdr_tbl[i] = odp_buf_to_hdr(buf);
> -		}
> -
>   		queue = entry->s.in_queue[index[idx]].queue;
>   		qentry = queue_to_qentry(queue);
>   		queue_enq_multi(qentry, hdr_tbl, num, 0);
> diff --git a/platform/linux-generic/pktio/dpdk.c b/platform/linux-generic/pktio/dpdk.c
> index 25dde7b..6d4e287 100644
> --- a/platform/linux-generic/pktio/dpdk.c
> +++ b/platform/linux-generic/pktio/dpdk.c
> @@ -15,6 +15,7 @@
>   #include <odp/api/cpumask.h>
>   
>   #include <odp_packet_io_internal.h>
> +#include <odp_classification_internal.h>
>   #include <odp_packet_dpdk.h>
>   #include <odp_debug_internal.h>
>   
> @@ -706,6 +707,9 @@ static inline int mbuf_to_pkt(pktio_entry_t *pktio_entry,
>   	int nb_pkts = 0;
>   
>   	for (i = 0; i < num; i++) {
> +		odp_pool_t pool = pktio_entry->s.pkt_dpdk.pool;
> +		odp_packet_hdr_t parsed_hdr;
> +
>   		mbuf = mbuf_table[i];
>   		if (odp_unlikely(mbuf->nb_segs != 1)) {
>   			ODP_ERR("Segmented buffers not supported\n");
> @@ -718,50 +722,43 @@ static inline int mbuf_to_pkt(pktio_entry_t *pktio_entry,
>   		pkt_len = rte_pktmbuf_pkt_len(mbuf);
>   
>   		if (pktio_cls_enabled(pktio_entry)) {
> -			int ret;
> -
> -			ret = _odp_packet_cls_enq(pktio_entry,
> -						  (const uint8_t *)buf,
> -						  pkt_len, ts);
> -			if (ret && ret != -ENOENT)
> -				nb_pkts = ret;
> -		} else {
> -			pkt = packet_alloc(pktio_entry->s.pkt_dpdk.pool,
> -					   pkt_len, 1);
> -			if (pkt == ODP_PACKET_INVALID) {
> -				ODP_ERR("packet_alloc failed\n");
> +			if (cls_classify_packet(pktio_entry,
> +						(const uint8_t *)buf,
> +						pkt_len, &pool, &parsed_hdr))
>   				goto fail;
> -			}
> +		}
> +		pkt = packet_alloc(pool, pkt_len, 1);
> +		if (pkt == ODP_PACKET_INVALID)
> +			goto fail;
>   
> -			pkt_hdr = odp_packet_hdr(pkt);
> +		pkt_hdr = odp_packet_hdr(pkt);
>   
> -			/* For now copy the data in the mbuf,
> -			   worry about zero-copy later */
> -			if (odp_packet_copy_from_mem(pkt, 0, pkt_len,
> -						     buf) != 0) {
> -				ODP_ERR("odp_packet_copy_from_mem failed\n");
> -				odp_packet_free(pkt);
> -				goto fail;
> -			}
> +		/* For now copy the data in the mbuf,
> +		   worry about zero-copy later */
> +		if (odp_packet_copy_from_mem(pkt, 0, pkt_len, buf) != 0) {
> +			odp_packet_free(pkt);
> +			goto fail;
> +		}
> +		pkt_hdr->input = pktio_entry->s.handle;
>   
> +		if (pktio_cls_enabled(pktio_entry))
> +			copy_packet_parser_metadata(&parsed_hdr, pkt_hdr);
> +		else
>   			packet_parse_l2(pkt_hdr);
>   
> -			pkt_hdr->input = pktio_entry->s.handle;
> +		if (mbuf->ol_flags & PKT_RX_RSS_HASH)
> +			odp_packet_flow_hash_set(pkt, mbuf->hash.rss);
>   
> -			if (mbuf->ol_flags & PKT_RX_RSS_HASH)
> -				odp_packet_flow_hash_set(pkt, mbuf->hash.rss);
> +		packet_set_ts(pkt_hdr, ts);
>   
> -			packet_set_ts(pkt_hdr, ts);
> +		pkt_table[nb_pkts++] = pkt;
>   
> -			pkt_table[nb_pkts++] = pkt;
> -		}
>   		rte_pktmbuf_free(mbuf);
>   	}
>   
>   	return nb_pkts;
>   
>   fail:
> -	ODP_ERR("Creating ODP packet failed\n");
>   	for (j = i; j < num; j++)
>   		rte_pktmbuf_free(mbuf_table[j]);
>   
> diff --git a/platform/linux-generic/pktio/loop.c b/platform/linux-generic/pktio/loop.c
> index cdf5326..75f6a0a 100644
> --- a/platform/linux-generic/pktio/loop.c
> +++ b/platform/linux-generic/pktio/loop.c
> @@ -55,9 +55,12 @@ static int loopback_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>   	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
>   	queue_entry_t *qentry;
>   	odp_packet_hdr_t *pkt_hdr;
> +	odp_packet_hdr_t parsed_hdr;
>   	odp_packet_t pkt;
>   	odp_time_t ts_val;
>   	odp_time_t *ts = NULL;
> +	int num_rx = 0;
> +	int failed = 0;
>   
>   	if (odp_unlikely(len > QUEUE_MULTI_MAX))
>   		len = QUEUE_MULTI_MAX;
> @@ -73,59 +76,58 @@ static int loopback_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>   		ts = &ts_val;
>   	}
>   
> -	if (pktio_cls_enabled(pktio_entry)) {
> -		int failed = 0, discarded = 0;
> +	for (i = 0; i < nbr; i++) {
> +		pkt = _odp_packet_from_buffer(odp_hdr_to_buf(hdr_tbl[i]));
>   
> -		for (i = 0; i < nbr; i++) {
> +		if (pktio_cls_enabled(pktio_entry)) {
> +			odp_packet_t new_pkt;
> +			odp_pool_t new_pool;
> +			uint8_t *pkt_addr;
>   			int ret;
> -			pkt = _odp_packet_from_buffer(odp_hdr_to_buf
> -						      (hdr_tbl[i]));
> -			pkt_hdr = odp_packet_hdr(pkt);
> -			packet_parse_reset(pkt_hdr);
> -			packet_parse_l2(pkt_hdr);
> -			ret = _odp_packet_classifier(pktio_entry, pkt);
> -			switch (ret) {
> -			case 0:
> -				packet_set_ts(pkt_hdr, ts);
> -				pkt_hdr->input = pktio_entry->s.handle;
> -				pktio_entry->s.stats.in_octets +=
> -					odp_packet_len(pkt);
> -				break;
> -			case -ENOENT:
> -				discarded++;
> -				break;
> -			case -EFAULT:
> +
> +			pkt_addr = odp_packet_data(pkt);
> +			ret = cls_classify_packet(pktio_entry, pkt_addr,
> +						  odp_packet_len(pkt),
> +						  &new_pool, &parsed_hdr);
> +			if (ret) {
>   				failed++;
> -				break;
> -			default:
> -				ret = queue_enq(qentry, hdr_tbl[i], 0);
> +				odp_packet_free(pkt);
> +				continue;
> +			}
> +			if (new_pool != odp_packet_pool(pkt)) {
> +				new_pkt = odp_packet_copy(pkt, new_pool);
> +
> +				odp_packet_free(pkt);
> +
> +				if (new_pkt == ODP_PACKET_INVALID) {
> +					failed++;
> +					continue;
> +				}
> +				pkt = new_pkt;
>   			}
>   		}
> -		pktio_entry->s.stats.in_errors += failed;
> -		pktio_entry->s.stats.in_discards += discarded;
> -		pktio_entry->s.stats.in_ucast_pkts += nbr - failed - discarded;
> +		pkt_hdr = odp_packet_hdr(pkt);
>   
> -		odp_ticketlock_unlock(&pktio_entry->s.rxl);
> +		pkt_hdr->input = pktio_entry->s.handle;
>   
> -		return -failed;
> -	} else {
> -		for (i = 0; i < nbr; ++i) {
> -			pkts[i] = _odp_packet_from_buffer(odp_hdr_to_buf
> -							  (hdr_tbl[i]));
> -			pkt_hdr = odp_packet_hdr(pkts[i]);
> -			packet_parse_reset(pkt_hdr);
> +		if (pktio_cls_enabled(pktio_entry))
> +			copy_packet_parser_metadata(&parsed_hdr, pkt_hdr);
> +		else
>   			packet_parse_l2(pkt_hdr);
> -			packet_set_ts(pkt_hdr, ts);
> -			pkt_hdr->input = pktio_entry->s.handle;
> -			pktio_entry->s.stats.in_octets +=
> -				odp_packet_len(pkts[i]);
> -		}
> -		pktio_entry->s.stats.in_ucast_pkts += nbr;
>   
> -		odp_ticketlock_unlock(&pktio_entry->s.rxl);
> +		packet_set_ts(pkt_hdr, ts);
>   
> -		return nbr;
> +		pktio_entry->s.stats.in_octets += odp_packet_len(pkt);
> +
> +		pkts[num_rx++] = pkt;
>   	}
> +
> +	pktio_entry->s.stats.in_errors += failed;
> +	pktio_entry->s.stats.in_ucast_pkts += num_rx - failed;
> +
> +	odp_ticketlock_unlock(&pktio_entry->s.rxl);
> +
> +	return num_rx;
>   }
>   
>   static int loopback_send(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
> diff --git a/platform/linux-generic/pktio/netmap.c b/platform/linux-generic/pktio/netmap.c
> index 59cb276..4ea0d4a 100644
> --- a/platform/linux-generic/pktio/netmap.c
> +++ b/platform/linux-generic/pktio/netmap.c
> @@ -593,7 +593,9 @@ static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
>   				    uint16_t len, odp_time_t *ts)
>   {
>   	odp_packet_t pkt;
> -	int ret;
> +	odp_pool_t pool = pktio_entry->s.pkt_nm.pool;
> +	odp_packet_hdr_t *pkt_hdr;
> +	odp_packet_hdr_t parsed_hdr;
>   
>   	if (odp_unlikely(len > pktio_entry->s.pkt_nm.max_frame_len)) {
>   		ODP_ERR("RX: frame too big %" PRIu16 " %zu!\n", len,
> @@ -607,35 +609,32 @@ static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
>   	}
>   
>   	if (pktio_cls_enabled(pktio_entry)) {
> -		ret = _odp_packet_cls_enq(pktio_entry, (const uint8_t *)buf,
> -					  len, ts);
> -		if (ret && ret != -ENOENT)
> -			return ret;
> -		return 0;
> -	} else {
> -		odp_packet_hdr_t *pkt_hdr;
> -
> -		pkt = packet_alloc(pktio_entry->s.pkt_nm.pool, len, 1);
> -		if (pkt == ODP_PACKET_INVALID)
> +		if (cls_classify_packet(pktio_entry, (const uint8_t *)buf, len,
> +					&pool, &parsed_hdr))
>   			return -1;
> +	}
> +	pkt = packet_alloc(pool, len, 1);
> +	if (pkt == ODP_PACKET_INVALID)
> +		return -1;
>   
> -		pkt_hdr = odp_packet_hdr(pkt);
> +	pkt_hdr = odp_packet_hdr(pkt);
>   
> -		/* For now copy the data in the mbuf,
> -		   worry about zero-copy later */
> -		if (odp_packet_copy_from_mem(pkt, 0, len, buf) != 0) {
> -			odp_packet_free(pkt);
> -			return -1;
> -		}
> +	/* For now copy the data in the mbuf,
> +	   worry about zero-copy later */
> +	if (odp_packet_copy_from_mem(pkt, 0, len, buf) != 0) {
> +		odp_packet_free(pkt);
> +		return -1;
> +	}
> +	pkt_hdr->input = pktio_entry->s.handle;
>   
> +	if (pktio_cls_enabled(pktio_entry))
> +		copy_packet_parser_metadata(&parsed_hdr, pkt_hdr);
> +	else
>   		packet_parse_l2(pkt_hdr);
>   
> -		pkt_hdr->input = pktio_entry->s.handle;
> +	packet_set_ts(pkt_hdr, ts);
>   
> -		packet_set_ts(pkt_hdr, ts);
> -
> -		*pkt_out = pkt;
> -	}
> +	*pkt_out = pkt;
>   
>   	return 1;
>   }
> diff --git a/platform/linux-generic/pktio/pktio_common.c b/platform/linux-generic/pktio/pktio_common.c
> index 8a01477..611bb45 100644
> --- a/platform/linux-generic/pktio/pktio_common.c
> +++ b/platform/linux-generic/pktio/pktio_common.c
> @@ -9,73 +9,6 @@
>   #include <odp_classification_internal.h>
>   #include <errno.h>
>   
> -/**
> - * Classify packet, copy it in a odp_packet_t and enqueue to the right queue
> - *
> - * pktio_entry	pktio where it arrived
> - * base		packet data
> - * buf_len	packet length
> - *
> - * Return values:
> - * 0 on success, packet is consumed
> - * -ENOENT CoS dropped the packet
> - * -EFAULT Bug
> - * -EINVAL Config error
> - * -ENOMEM Target CoS pool exhausted
> - *
> - * Note: *base is not released, only pkt if there is an error
> - *
> - */
> -int _odp_packet_cls_enq(pktio_entry_t *pktio_entry,
> -			const uint8_t *base, uint16_t buf_len, odp_time_t *ts)
> -{
> -	cos_t *cos;
> -	odp_packet_t pkt;
> -	odp_packet_hdr_t *pkt_hdr;
> -	odp_packet_hdr_t src_pkt_hdr;
> -	int ret;
> -	odp_pool_t pool;
> -
> -	packet_parse_reset(&src_pkt_hdr);
> -
> -	_odp_cls_parse(&src_pkt_hdr, base);
> -	cos = pktio_select_cos(pktio_entry, base, &src_pkt_hdr);
> -
> -	/* if No CoS found then drop the packet */
> -	if (cos == NULL)
> -		return -EINVAL;
> -
> -	if (cos->s.queue == NULL || cos->s.pool == NULL)
> -		return -EFAULT;
> -
> -	pool = cos->s.pool->s.pool_hdl;
> -
> -	pkt = odp_packet_alloc(pool, buf_len);
> -	if (odp_unlikely(pkt == ODP_PACKET_INVALID))
> -		return -ENOMEM;
> -	pkt_hdr = odp_packet_hdr(pkt);
> -
> -	copy_packet_parser_metadata(&src_pkt_hdr, pkt_hdr);
> -	pkt_hdr->input = pktio_entry->s.handle;
> -
> -	if (odp_packet_copy_from_mem(pkt, 0, buf_len, base) != 0) {
> -		odp_packet_free(pkt);
> -		return -EFAULT;
> -	}
> -
> -	packet_set_ts(pkt_hdr, ts);
> -
> -	/* Parse and set packet header data */
> -	odp_packet_pull_tail(pkt, odp_packet_len(pkt) - buf_len);
> -	ret = queue_enq(cos->s.queue, odp_buf_to_hdr((odp_buffer_t)pkt), 0);
> -	if (ret < 0) {
> -		odp_packet_free(pkt);
> -		return -EFAULT;
> -	}
> -
> -	return 0;
> -}
> -
>   int sock_stats_reset_fd(pktio_entry_t *pktio_entry, int fd)
>   {
>   	int err = 0;
> diff --git a/platform/linux-generic/pktio/socket.c b/platform/linux-generic/pktio/socket.c
> index c7df7c7..e07b6ad 100644
> --- a/platform/linux-generic/pktio/socket.c
> +++ b/platform/linux-generic/pktio/socket.c
> @@ -617,7 +617,6 @@ static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>   	struct mmsghdr msgvec[ODP_PACKET_SOCKET_MAX_BURST_RX];
>   	int nb_rx = 0;
>   	int recv_msgs;
> -	int ret;
>   	uint8_t **recv_cache;
>   	int i;
>   
> @@ -642,7 +641,6 @@ static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>   			iovecs[i].iov_len = PACKET_JUMBO_LEN;
>   			msgvec[i].msg_hdr.msg_iov = &iovecs[i];
>   		}
> -		/* number of successfully allocated pkt buffers */
>   		msgvec_len = i;
>   
>   		recv_msgs = recvmmsg(sockfd, msgvec, msgvec_len,
> @@ -652,6 +650,10 @@ static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>   			ts_val = odp_time_global();
>   
>   		for (i = 0; i < recv_msgs; i++) {
> +			odp_packet_hdr_t *pkt_hdr;
> +			odp_packet_t pkt;
> +			odp_pool_t pool = pkt_sock->pool;
> +			odp_packet_hdr_t parsed_hdr;
>   			void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
>   			struct ethhdr *eth_hdr = base;
>   			uint16_t pkt_len = msgvec[i].msg_len;
> @@ -661,10 +663,25 @@ static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>   							eth_hdr->h_source)))
>   				continue;
>   
> -			ret = _odp_packet_cls_enq(pktio_entry, base, pkt_len,
> -						  ts);
> -			if (ret && ret != -ENOENT)
> -				nb_rx = ret;
> +			if (cls_classify_packet(pktio_entry, base, pkt_len,
> +						&pool, &parsed_hdr))
> +				continue;
> +			pkt = packet_alloc(pool, pkt_len, 1);
> +			if (pkt == ODP_PACKET_INVALID)
> +				continue;
> +
> +			pkt_hdr = odp_packet_hdr(pkt);
> +
> +			if (odp_packet_copy_from_mem(pkt, 0, pkt_len,
> +						     base) != 0) {
> +				odp_packet_free(pkt);
> +				continue;
> +			}
> +			pkt_hdr->input = pktio_entry->s.handle;
> +			copy_packet_parser_metadata(&parsed_hdr, pkt_hdr);
> +			packet_set_ts(pkt_hdr, ts);
> +
> +			pkt_table[nb_rx++] = pkt;
>   		}
>   	} else {
>   		struct iovec iovecs[ODP_PACKET_SOCKET_MAX_BURST_RX]
> diff --git a/platform/linux-generic/pktio/socket_mmap.c b/platform/linux-generic/pktio/socket_mmap.c
> index ce05775..6d7ec59 100644
> --- a/platform/linux-generic/pktio/socket_mmap.c
> +++ b/platform/linux-generic/pktio/socket_mmap.c
> @@ -166,6 +166,10 @@ static inline unsigned pkt_mmap_v2_rx(pktio_entry_t *pktio_entry,
>   	frame_num = ring->frame_num;
>   
>   	while (i < len) {

In current function we have several 'continue' without incrementing i. 
I.e. i incremented in the end of cycle as well as nb_rx.
Can this function hang on waiting requested number on packets (len)? It 
has return len or less accepted packets. And I think
that increment of i has to be at the beginning, something like:
for (i =0, nb_rx =0; i < len; i++) {

pkt_table[nb_rx]); <-- index from nb_rx, not from i.

  if (ok)
     nb_rx++;

}



> +		odp_packet_hdr_t *hdr;
> +		odp_packet_hdr_t parsed_hdr;
> +		odp_pool_t pool = pkt_sock->pool;
> +
>   		if (!mmap_rx_kernel_ready(ring->rd[frame_num].iov_base))
>   			break;
>   
> @@ -194,35 +198,39 @@ static inline unsigned pkt_mmap_v2_rx(pktio_entry_t *pktio_entry,
>   						       &pkt_len);
>   
>   		if (pktio_cls_enabled(pktio_entry)) {
> -			ret = _odp_packet_cls_enq(pktio_entry, pkt_buf,
> -						  pkt_len, ts);
> -			if (ret && ret != -ENOENT)
> -				nb_rx = ret;
> -		} else {
> -			odp_packet_hdr_t *hdr;
> -
> -			pkt_table[i] = packet_alloc(pkt_sock->pool, pkt_len, 1);
> -			if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID)) {
> -				mmap_rx_user_ready(ppd.raw); /* drop */
> -				frame_num = next_frame_num;
> -				continue;
> -			}
> -			hdr = odp_packet_hdr(pkt_table[i]);
> -			ret = odp_packet_copy_from_mem(pkt_table[i], 0,
> -						       pkt_len, pkt_buf);
> -			if (ret != 0) {
> -				odp_packet_free(pkt_table[i]);
> +			if (cls_classify_packet(pktio_entry, pkt_buf, pkt_len,
> +						&pool, &parsed_hdr)) {
>   				mmap_rx_user_ready(ppd.raw); /* drop */
>   				frame_num = next_frame_num;
>   				continue;
>   			}
> +		}
> +
> +		pkt_table[i] = packet_alloc(pool, pkt_len, 1);
> +		if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID)) {
> +			mmap_rx_user_ready(ppd.raw); /* drop */
> +			frame_num = next_frame_num;
> +			continue;
> +		}
> +		hdr = odp_packet_hdr(pkt_table[i]);
> +		ret = odp_packet_copy_from_mem(pkt_table[i], 0,
> +					       pkt_len, pkt_buf);
> +		if (ret != 0) {
> +			odp_packet_free(pkt_table[i]);
> +			mmap_rx_user_ready(ppd.raw); /* drop */
> +			frame_num = next_frame_num;
> +			continue;
> +		}
> +		hdr->input = pktio_entry->s.handle;
>   
> +		if (pktio_cls_enabled(pktio_entry))
> +			copy_packet_parser_metadata(&parsed_hdr, hdr);
> +		else
>   			packet_parse_l2(hdr);
> -			packet_set_ts(hdr, ts);
> -			hdr->input = pktio_entry->s.handle;
>   
> -			nb_rx++;
> -		}
> +		packet_set_ts(hdr, ts);
> +
> +		nb_rx++;
>   
>   		mmap_rx_user_ready(ppd.raw);
>   		frame_num = next_frame_num;
Elo, Matias (Nokia - FI/Espoo) June 2, 2016, 6:55 a.m. UTC | #2
> > diff --git a/platform/linux-generic/pktio/socket_mmap.c b/platform/linux-

> generic/pktio/socket_mmap.c

> > index ce05775..6d7ec59 100644

> > --- a/platform/linux-generic/pktio/socket_mmap.c

> > +++ b/platform/linux-generic/pktio/socket_mmap.c

> > @@ -166,6 +166,10 @@ static inline unsigned pkt_mmap_v2_rx(pktio_entry_t

> *pktio_entry,

> >   	frame_num = ring->frame_num;

> >

> >   	while (i < len) {

> 

> In current function we have several 'continue' without incrementing i.

> I.e. i incremented in the end of cycle as well as nb_rx.

> Can this function hang on waiting requested number on packets (len)? It

> has return len or less accepted packets. And I think

> that increment of i has to be at the beginning, something like:

> for (i =0, nb_rx =0; i < len; i++) {

> 

> pkt_table[nb_rx]); <-- index from nb_rx, not from i.

> 

>   if (ok)

>      nb_rx++;

> 

> }

> 


Thanks Maxim, the function may certainly hang here. The problem was there already in the original function but I'll create V2 so it gets fixed at the same time.

-Matias
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_classification_internal.h b/platform/linux-generic/include/odp_classification_internal.h
index 3a88462..d6d6904 100644
--- a/platform/linux-generic/include/odp_classification_internal.h
+++ b/platform/linux-generic/include/odp_classification_internal.h
@@ -30,21 +30,6 @@  extern "C" {
 
 /**
 @internal
-Select a CoS for the given Packet based on pktio
-
-This function will call all the PMRs associated with a pktio for
-a given packet and will return the matched COS object.
-This function will check PMR, L2 and L3 QoS COS object associated
-with the PKTIO interface.
-
-Returns the default cos if the packet does not match any PMR
-Returns the error_cos if the packet has an error
-**/
-cos_t *pktio_select_cos(pktio_entry_t *pktio, const uint8_t *pkt_addr,
-			odp_packet_hdr_t *pkt_hdr);
-
-/**
-@internal
 match_qos_cos
 
 Select a CoS for the given Packet based on QoS values
@@ -60,10 +45,10 @@  Packet Classifier
 
 Start function for Packet Classifier
 This function calls Classifier module internal functions for a given packet and
-enqueues the packet to specific Queue based on PMR and CoS selected.
-The packet is allocated from the pool associated with the CoS
+selects destination queue and packet pool based on selected PMR and CoS.
 **/
-int _odp_packet_classifier(pktio_entry_t *entry, odp_packet_t pkt);
+int cls_classify_packet(pktio_entry_t *entry, const uint8_t *base, uint16_t len,
+			odp_pool_t *pool, odp_packet_hdr_t *pkt_hdr);
 
 /**
 Packet IO classifier init
diff --git a/platform/linux-generic/include/odp_packet_internal.h b/platform/linux-generic/include/odp_packet_internal.h
index 67ee34d..d5ace12 100644
--- a/platform/linux-generic/include/odp_packet_internal.h
+++ b/platform/linux-generic/include/odp_packet_internal.h
@@ -39,6 +39,7 @@  typedef union {
 	struct {
 		uint64_t parsed_l2:1; /**< L2 parsed */
 		uint64_t parsed_all:1;/**< Parsing complete */
+		uint64_t dst_queue:1; /**< Dst queue present */
 
 		uint64_t flow_hash:1; /**< Flow hash present */
 		uint64_t timestamp:1; /**< Timestamp present */
@@ -156,6 +157,8 @@  typedef struct {
 	uint32_t l3_len;         /**< Layer 3 length */
 	uint32_t l4_len;         /**< Layer 4 length */
 
+	odp_queue_t dst_queue;   /**< Classifier destination queue */
+
 	uint32_t flow_hash;      /**< Flow hash value */
 	odp_time_t timestamp;    /**< Timestamp value */
 
@@ -187,6 +190,8 @@  static inline void copy_packet_parser_metadata(odp_packet_hdr_t *src_hdr,
 
 	dst_hdr->l3_len         = src_hdr->l3_len;
 	dst_hdr->l4_len         = src_hdr->l4_len;
+
+	dst_hdr->dst_queue      = src_hdr->dst_queue;
 }
 
 static inline void *packet_map(odp_packet_hdr_t *pkt_hdr,
diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
index 139b1bd..53683b3 100644
--- a/platform/linux-generic/include/odp_packet_io_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_internal.h
@@ -207,9 +207,6 @@  typedef struct pktio_if_ops {
 				    const odp_pktout_queue_param_t *p);
 } pktio_if_ops_t;
 
-int _odp_packet_cls_enq(pktio_entry_t *pktio_entry, const uint8_t *base,
-			uint16_t buf_len, odp_time_t *ts);
-
 extern void *pktio_entry_ptr[];
 
 static inline int pktio_to_id(odp_pktio_t pktio)
diff --git a/platform/linux-generic/odp_classification.c b/platform/linux-generic/odp_classification.c
index a3fe545..7520bdc 100644
--- a/platform/linux-generic/odp_classification.c
+++ b/platform/linux-generic/odp_classification.c
@@ -748,66 +748,19 @@  int pktio_classifier_init(pktio_entry_t *entry)
 }
 
 /**
- * Classify packet and enqueue to the right queue
- *
- * entry		pktio where it arrived
- * pkt		packet handle
- *
- * Return values:
- * 0 on success, packet is consumed
- * -ENOENT CoS dropped the packet
- * -EFAULT Bug, packet is released
- * -EINVAL Config error, packet is NOT released
- * -ENOMEM Target CoS pool exhausted, packet is NOT released
- */
-
-int _odp_packet_classifier(pktio_entry_t *entry, odp_packet_t pkt)
-{
-	queue_entry_t *queue;
-	cos_t *cos;
-	odp_packet_hdr_t *pkt_hdr;
-	odp_packet_t new_pkt;
-	uint8_t *pkt_addr;
-
-	if (entry == NULL) {
-		odp_packet_free(pkt);
-		return -EFAULT;
-	}
-
-	pkt_hdr = odp_packet_hdr(pkt);
-	pkt_addr = odp_packet_data(pkt);
-
-	/* Matching PMR and selecting the CoS for the packet*/
-	cos = pktio_select_cos(entry, pkt_addr, pkt_hdr);
-	if (cos == NULL)
-		return -EINVAL;
-
-	if (cos->s.queue == NULL || cos->s.pool == NULL) {
-		odp_packet_free(pkt);
-		return -ENOENT;
-	}
-
-	if (odp_packet_pool(pkt) != cos->s.pool->s.pool_hdl) {
-		new_pkt = odp_packet_copy(pkt, cos->s.pool->s.pool_hdl);
-		if (new_pkt == ODP_PACKET_INVALID)
-			return -ENOMEM;
-		odp_packet_free(pkt);
-	} else {
-		new_pkt = pkt;
-	}
-
-	/* Enqueuing the Packet based on the CoS */
-	queue = cos->s.queue;
-	if (queue_enq(queue, odp_buf_to_hdr((odp_buffer_t)new_pkt), 0)) {
-		odp_packet_free(new_pkt);
-		return -EFAULT;
-	} else {
-		return 0;
-	}
-}
-
-cos_t *pktio_select_cos(pktio_entry_t *entry, const uint8_t *pkt_addr,
-			odp_packet_hdr_t *pkt_hdr)
+Select a CoS for the given Packet based on pktio
+
+This function will call all the PMRs associated with a pktio for
+a given packet and will return the matched COS object.
+This function will check PMR, L2 and L3 QoS COS object associated
+with the PKTIO interface.
+
+Returns the default cos if the packet does not match any PMR
+Returns the error_cos if the packet has an error
+**/
+static inline cos_t *cls_select_cos(pktio_entry_t *entry,
+				    const uint8_t *pkt_addr,
+				    odp_packet_hdr_t *pkt_hdr)
 {
 	pmr_t *pmr;
 	cos_t *cos;
@@ -841,6 +794,45 @@  cos_t *pktio_select_cos(pktio_entry_t *entry, const uint8_t *pkt_addr,
 	return cls->default_cos;
 }
 
+/**
+ * Classify packet
+ *
+ * @param pktio_entry	Ingress pktio
+ * @param base		Packet data
+ * @param len		Packet length
+ * @param pool[out]	Packet pool
+ * @param pkt_hdr[out]	Packet header
+ *
+ * @retval 0 on success
+ * @retval -EFAULT Bug
+ * @retval -EINVAL Config error
+ *
+ * @note *base is not released
+ */
+int cls_classify_packet(pktio_entry_t *entry, const uint8_t *base, uint16_t len,
+			odp_pool_t *pool, odp_packet_hdr_t *pkt_hdr)
+{
+	cos_t *cos;
+
+	packet_parse_reset(pkt_hdr);
+	pkt_hdr->frame_len = len;
+
+	_odp_parse_common(pkt_hdr, base);
+	cos = cls_select_cos(entry, base, pkt_hdr);
+
+	if (cos == NULL)
+		return -EINVAL;
+
+	if (cos->s.queue == NULL || cos->s.pool == NULL)
+		return -EFAULT;
+
+	*pool = cos->s.pool->s.pool_hdl;
+	pkt_hdr->input_flags.dst_queue = 1;
+	pkt_hdr->dst_queue = cos->s.queue->s.handle;
+
+	return 0;
+}
+
 cos_t *match_qos_l3_cos(pmr_l3_cos_t *l3_cos, const uint8_t *pkt_addr,
 			odp_packet_hdr_t *hdr)
 {
diff --git a/platform/linux-generic/odp_packet.c b/platform/linux-generic/odp_packet.c
index 5b11af6..2868736 100644
--- a/platform/linux-generic/odp_packet.c
+++ b/platform/linux-generic/odp_packet.c
@@ -1295,11 +1295,6 @@  parse_exit:
 	return pkt_hdr->error_flags.all != 0;
 }
 
-int _odp_cls_parse(odp_packet_hdr_t *pkt_hdr, const uint8_t *parseptr)
-{
-	return _odp_parse_common(pkt_hdr, parseptr);
-}
-
 /**
  * Simple packet parser
  */
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index 652709e..6de39b6 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -531,6 +531,40 @@  odp_pktio_t odp_pktio_lookup(const char *name)
 	return hdl;
 }
 
+static inline int pktin_recv_buf(odp_pktin_queue_t queue,
+				 odp_buffer_hdr_t *buffer_hdrs[], int num)
+{
+	odp_packet_t pkt;
+	odp_packet_t packets[num];
+	odp_packet_hdr_t *pkt_hdr;
+	odp_buffer_hdr_t *buf_hdr;
+	odp_buffer_t buf;
+	int i;
+	int ret;
+	int num_rx = 0;
+
+	ret = odp_pktin_recv(queue, packets, num);
+
+	for (i = 0; i < ret; i++) {
+		pkt = packets[i];
+		pkt_hdr = odp_packet_hdr(pkt);
+		buf = _odp_packet_to_buffer(pkt);
+		buf_hdr = odp_buf_to_hdr(buf);
+
+		if (pkt_hdr->input_flags.dst_queue) {
+			queue_entry_t *dst_queue;
+
+			dst_queue = queue_to_qentry(pkt_hdr->dst_queue);
+			ret = queue_enq(dst_queue, buf_hdr, 0);
+			if (ret < 0)
+				odp_packet_free(pkt);
+			continue;
+		}
+		buffer_hdrs[num_rx++] = buf_hdr;
+	}
+	return num_rx;
+}
+
 int pktout_enqueue(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr)
 {
 	odp_packet_t pkt = _odp_packet_from_buffer(buf_hdr->handle.handle);
@@ -579,25 +613,18 @@  int pktin_enqueue(queue_entry_t *qentry ODP_UNUSED,
 odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
 {
 	odp_buffer_hdr_t *buf_hdr;
-	odp_buffer_t buf;
-	odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
 	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
-	int pkts, i;
+	int pkts;
 
 	buf_hdr = queue_deq(qentry);
 	if (buf_hdr != NULL)
 		return buf_hdr;
 
-	pkts = odp_pktin_recv(qentry->s.pktin, pkt_tbl, QUEUE_MULTI_MAX);
+	pkts = pktin_recv_buf(qentry->s.pktin, hdr_tbl, QUEUE_MULTI_MAX);
 
 	if (pkts <= 0)
 		return NULL;
 
-	for (i = 0; i < pkts; i++) {
-		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
-		hdr_tbl[i] = odp_buf_to_hdr(buf);
-	}
-
 	if (pkts > 1)
 		queue_enq_multi(qentry, &hdr_tbl[1], pkts - 1, 0);
 	buf_hdr = hdr_tbl[0];
@@ -615,15 +642,12 @@  int pktin_enq_multi(queue_entry_t *qentry ODP_UNUSED,
 int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
 {
 	int nbr;
-	odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
 	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
-	odp_buffer_t buf;
 	int pkts, i, j;
 
 	nbr = queue_deq_multi(qentry, buf_hdr, num);
 	if (odp_unlikely(nbr > num))
-		ODP_ABORT("queue_deq_multi req: %d, returned %d\n",
-			num, nbr);
+		ODP_ABORT("queue_deq_multi req: %d, returned %d\n", num, nbr);
 
 	/** queue already has number of requsted buffers,
 	 *  do not do receive in that case.
@@ -631,20 +655,16 @@  int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
 	if (nbr == num)
 		return nbr;
 
-	pkts = odp_pktin_recv(qentry->s.pktin, pkt_tbl, QUEUE_MULTI_MAX);
+	pkts = pktin_recv_buf(qentry->s.pktin, hdr_tbl, QUEUE_MULTI_MAX);
 	if (pkts <= 0)
 		return nbr;
 
-	/* Fill in buf_hdr first */
-	for (i = 0; i < pkts && nbr < num; i++, nbr++) {
-		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
-		buf_hdr[nbr] = odp_buf_to_hdr(buf);
-	}
+	for (i = 0; i < pkts && nbr < num; i++, nbr++)
+		buf_hdr[nbr] = hdr_tbl[i];
+
 	/* Queue the rest for later */
-	for (j = 0; i < pkts; i++, j++) {
-		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
-		hdr_tbl[j] = odp_buf_to_hdr(buf);
-	}
+	for (j = 0; i < pkts; i++, j++)
+		hdr_tbl[j] = hdr_tbl[i];
 
 	if (j)
 		queue_enq_multi(qentry, hdr_tbl, j, 0);
@@ -653,10 +673,8 @@  int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
 
 int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
 {
-	odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
 	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
-	int num, i, idx;
-	odp_buffer_t buf;
+	int num, idx;
 	pktio_entry_t *entry;
 
 	entry = pktio_entry_by_index(pktio_index);
@@ -678,7 +696,7 @@  int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
 		odp_queue_t queue;
 		odp_pktin_queue_t pktin = entry->s.in_queue[index[idx]].pktin;
 
-		num = odp_pktin_recv(pktin, pkt_tbl, QUEUE_MULTI_MAX);
+		num = pktin_recv_buf(pktin, hdr_tbl, QUEUE_MULTI_MAX);
 
 		if (num == 0)
 			continue;
@@ -688,11 +706,6 @@  int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
 			return -1;
 		}
 
-		for (i = 0; i < num; i++) {
-			buf        = _odp_packet_to_buffer(pkt_tbl[i]);
-			hdr_tbl[i] = odp_buf_to_hdr(buf);
-		}
-
 		queue = entry->s.in_queue[index[idx]].queue;
 		qentry = queue_to_qentry(queue);
 		queue_enq_multi(qentry, hdr_tbl, num, 0);
diff --git a/platform/linux-generic/pktio/dpdk.c b/platform/linux-generic/pktio/dpdk.c
index 25dde7b..6d4e287 100644
--- a/platform/linux-generic/pktio/dpdk.c
+++ b/platform/linux-generic/pktio/dpdk.c
@@ -15,6 +15,7 @@ 
 #include <odp/api/cpumask.h>
 
 #include <odp_packet_io_internal.h>
+#include <odp_classification_internal.h>
 #include <odp_packet_dpdk.h>
 #include <odp_debug_internal.h>
 
@@ -706,6 +707,9 @@  static inline int mbuf_to_pkt(pktio_entry_t *pktio_entry,
 	int nb_pkts = 0;
 
 	for (i = 0; i < num; i++) {
+		odp_pool_t pool = pktio_entry->s.pkt_dpdk.pool;
+		odp_packet_hdr_t parsed_hdr;
+
 		mbuf = mbuf_table[i];
 		if (odp_unlikely(mbuf->nb_segs != 1)) {
 			ODP_ERR("Segmented buffers not supported\n");
@@ -718,50 +722,43 @@  static inline int mbuf_to_pkt(pktio_entry_t *pktio_entry,
 		pkt_len = rte_pktmbuf_pkt_len(mbuf);
 
 		if (pktio_cls_enabled(pktio_entry)) {
-			int ret;
-
-			ret = _odp_packet_cls_enq(pktio_entry,
-						  (const uint8_t *)buf,
-						  pkt_len, ts);
-			if (ret && ret != -ENOENT)
-				nb_pkts = ret;
-		} else {
-			pkt = packet_alloc(pktio_entry->s.pkt_dpdk.pool,
-					   pkt_len, 1);
-			if (pkt == ODP_PACKET_INVALID) {
-				ODP_ERR("packet_alloc failed\n");
+			if (cls_classify_packet(pktio_entry,
+						(const uint8_t *)buf,
+						pkt_len, &pool, &parsed_hdr))
 				goto fail;
-			}
+		}
+		pkt = packet_alloc(pool, pkt_len, 1);
+		if (pkt == ODP_PACKET_INVALID)
+			goto fail;
 
-			pkt_hdr = odp_packet_hdr(pkt);
+		pkt_hdr = odp_packet_hdr(pkt);
 
-			/* For now copy the data in the mbuf,
-			   worry about zero-copy later */
-			if (odp_packet_copy_from_mem(pkt, 0, pkt_len,
-						     buf) != 0) {
-				ODP_ERR("odp_packet_copy_from_mem failed\n");
-				odp_packet_free(pkt);
-				goto fail;
-			}
+		/* For now copy the data in the mbuf,
+		   worry about zero-copy later */
+		if (odp_packet_copy_from_mem(pkt, 0, pkt_len, buf) != 0) {
+			odp_packet_free(pkt);
+			goto fail;
+		}
+		pkt_hdr->input = pktio_entry->s.handle;
 
+		if (pktio_cls_enabled(pktio_entry))
+			copy_packet_parser_metadata(&parsed_hdr, pkt_hdr);
+		else
 			packet_parse_l2(pkt_hdr);
 
-			pkt_hdr->input = pktio_entry->s.handle;
+		if (mbuf->ol_flags & PKT_RX_RSS_HASH)
+			odp_packet_flow_hash_set(pkt, mbuf->hash.rss);
 
-			if (mbuf->ol_flags & PKT_RX_RSS_HASH)
-				odp_packet_flow_hash_set(pkt, mbuf->hash.rss);
+		packet_set_ts(pkt_hdr, ts);
 
-			packet_set_ts(pkt_hdr, ts);
+		pkt_table[nb_pkts++] = pkt;
 
-			pkt_table[nb_pkts++] = pkt;
-		}
 		rte_pktmbuf_free(mbuf);
 	}
 
 	return nb_pkts;
 
 fail:
-	ODP_ERR("Creating ODP packet failed\n");
 	for (j = i; j < num; j++)
 		rte_pktmbuf_free(mbuf_table[j]);
 
diff --git a/platform/linux-generic/pktio/loop.c b/platform/linux-generic/pktio/loop.c
index cdf5326..75f6a0a 100644
--- a/platform/linux-generic/pktio/loop.c
+++ b/platform/linux-generic/pktio/loop.c
@@ -55,9 +55,12 @@  static int loopback_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
 	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
 	queue_entry_t *qentry;
 	odp_packet_hdr_t *pkt_hdr;
+	odp_packet_hdr_t parsed_hdr;
 	odp_packet_t pkt;
 	odp_time_t ts_val;
 	odp_time_t *ts = NULL;
+	int num_rx = 0;
+	int failed = 0;
 
 	if (odp_unlikely(len > QUEUE_MULTI_MAX))
 		len = QUEUE_MULTI_MAX;
@@ -73,59 +76,58 @@  static int loopback_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
 		ts = &ts_val;
 	}
 
-	if (pktio_cls_enabled(pktio_entry)) {
-		int failed = 0, discarded = 0;
+	for (i = 0; i < nbr; i++) {
+		pkt = _odp_packet_from_buffer(odp_hdr_to_buf(hdr_tbl[i]));
 
-		for (i = 0; i < nbr; i++) {
+		if (pktio_cls_enabled(pktio_entry)) {
+			odp_packet_t new_pkt;
+			odp_pool_t new_pool;
+			uint8_t *pkt_addr;
 			int ret;
-			pkt = _odp_packet_from_buffer(odp_hdr_to_buf
-						      (hdr_tbl[i]));
-			pkt_hdr = odp_packet_hdr(pkt);
-			packet_parse_reset(pkt_hdr);
-			packet_parse_l2(pkt_hdr);
-			ret = _odp_packet_classifier(pktio_entry, pkt);
-			switch (ret) {
-			case 0:
-				packet_set_ts(pkt_hdr, ts);
-				pkt_hdr->input = pktio_entry->s.handle;
-				pktio_entry->s.stats.in_octets +=
-					odp_packet_len(pkt);
-				break;
-			case -ENOENT:
-				discarded++;
-				break;
-			case -EFAULT:
+
+			pkt_addr = odp_packet_data(pkt);
+			ret = cls_classify_packet(pktio_entry, pkt_addr,
+						  odp_packet_len(pkt),
+						  &new_pool, &parsed_hdr);
+			if (ret) {
 				failed++;
-				break;
-			default:
-				ret = queue_enq(qentry, hdr_tbl[i], 0);
+				odp_packet_free(pkt);
+				continue;
+			}
+			if (new_pool != odp_packet_pool(pkt)) {
+				new_pkt = odp_packet_copy(pkt, new_pool);
+
+				odp_packet_free(pkt);
+
+				if (new_pkt == ODP_PACKET_INVALID) {
+					failed++;
+					continue;
+				}
+				pkt = new_pkt;
 			}
 		}
-		pktio_entry->s.stats.in_errors += failed;
-		pktio_entry->s.stats.in_discards += discarded;
-		pktio_entry->s.stats.in_ucast_pkts += nbr - failed - discarded;
+		pkt_hdr = odp_packet_hdr(pkt);
 
-		odp_ticketlock_unlock(&pktio_entry->s.rxl);
+		pkt_hdr->input = pktio_entry->s.handle;
 
-		return -failed;
-	} else {
-		for (i = 0; i < nbr; ++i) {
-			pkts[i] = _odp_packet_from_buffer(odp_hdr_to_buf
-							  (hdr_tbl[i]));
-			pkt_hdr = odp_packet_hdr(pkts[i]);
-			packet_parse_reset(pkt_hdr);
+		if (pktio_cls_enabled(pktio_entry))
+			copy_packet_parser_metadata(&parsed_hdr, pkt_hdr);
+		else
 			packet_parse_l2(pkt_hdr);
-			packet_set_ts(pkt_hdr, ts);
-			pkt_hdr->input = pktio_entry->s.handle;
-			pktio_entry->s.stats.in_octets +=
-				odp_packet_len(pkts[i]);
-		}
-		pktio_entry->s.stats.in_ucast_pkts += nbr;
 
-		odp_ticketlock_unlock(&pktio_entry->s.rxl);
+		packet_set_ts(pkt_hdr, ts);
 
-		return nbr;
+		pktio_entry->s.stats.in_octets += odp_packet_len(pkt);
+
+		pkts[num_rx++] = pkt;
 	}
+
+	pktio_entry->s.stats.in_errors += failed;
+	pktio_entry->s.stats.in_ucast_pkts += num_rx - failed;
+
+	odp_ticketlock_unlock(&pktio_entry->s.rxl);
+
+	return num_rx;
 }
 
 static int loopback_send(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
diff --git a/platform/linux-generic/pktio/netmap.c b/platform/linux-generic/pktio/netmap.c
index 59cb276..4ea0d4a 100644
--- a/platform/linux-generic/pktio/netmap.c
+++ b/platform/linux-generic/pktio/netmap.c
@@ -593,7 +593,9 @@  static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
 				    uint16_t len, odp_time_t *ts)
 {
 	odp_packet_t pkt;
-	int ret;
+	odp_pool_t pool = pktio_entry->s.pkt_nm.pool;
+	odp_packet_hdr_t *pkt_hdr;
+	odp_packet_hdr_t parsed_hdr;
 
 	if (odp_unlikely(len > pktio_entry->s.pkt_nm.max_frame_len)) {
 		ODP_ERR("RX: frame too big %" PRIu16 " %zu!\n", len,
@@ -607,35 +609,32 @@  static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
 	}
 
 	if (pktio_cls_enabled(pktio_entry)) {
-		ret = _odp_packet_cls_enq(pktio_entry, (const uint8_t *)buf,
-					  len, ts);
-		if (ret && ret != -ENOENT)
-			return ret;
-		return 0;
-	} else {
-		odp_packet_hdr_t *pkt_hdr;
-
-		pkt = packet_alloc(pktio_entry->s.pkt_nm.pool, len, 1);
-		if (pkt == ODP_PACKET_INVALID)
+		if (cls_classify_packet(pktio_entry, (const uint8_t *)buf, len,
+					&pool, &parsed_hdr))
 			return -1;
+	}
+	pkt = packet_alloc(pool, len, 1);
+	if (pkt == ODP_PACKET_INVALID)
+		return -1;
 
-		pkt_hdr = odp_packet_hdr(pkt);
+	pkt_hdr = odp_packet_hdr(pkt);
 
-		/* For now copy the data in the mbuf,
-		   worry about zero-copy later */
-		if (odp_packet_copy_from_mem(pkt, 0, len, buf) != 0) {
-			odp_packet_free(pkt);
-			return -1;
-		}
+	/* For now copy the data in the mbuf,
+	   worry about zero-copy later */
+	if (odp_packet_copy_from_mem(pkt, 0, len, buf) != 0) {
+		odp_packet_free(pkt);
+		return -1;
+	}
+	pkt_hdr->input = pktio_entry->s.handle;
 
+	if (pktio_cls_enabled(pktio_entry))
+		copy_packet_parser_metadata(&parsed_hdr, pkt_hdr);
+	else
 		packet_parse_l2(pkt_hdr);
 
-		pkt_hdr->input = pktio_entry->s.handle;
+	packet_set_ts(pkt_hdr, ts);
 
-		packet_set_ts(pkt_hdr, ts);
-
-		*pkt_out = pkt;
-	}
+	*pkt_out = pkt;
 
 	return 1;
 }
diff --git a/platform/linux-generic/pktio/pktio_common.c b/platform/linux-generic/pktio/pktio_common.c
index 8a01477..611bb45 100644
--- a/platform/linux-generic/pktio/pktio_common.c
+++ b/platform/linux-generic/pktio/pktio_common.c
@@ -9,73 +9,6 @@ 
 #include <odp_classification_internal.h>
 #include <errno.h>
 
-/**
- * Classify packet, copy it in a odp_packet_t and enqueue to the right queue
- *
- * pktio_entry	pktio where it arrived
- * base		packet data
- * buf_len	packet length
- *
- * Return values:
- * 0 on success, packet is consumed
- * -ENOENT CoS dropped the packet
- * -EFAULT Bug
- * -EINVAL Config error
- * -ENOMEM Target CoS pool exhausted
- *
- * Note: *base is not released, only pkt if there is an error
- *
- */
-int _odp_packet_cls_enq(pktio_entry_t *pktio_entry,
-			const uint8_t *base, uint16_t buf_len, odp_time_t *ts)
-{
-	cos_t *cos;
-	odp_packet_t pkt;
-	odp_packet_hdr_t *pkt_hdr;
-	odp_packet_hdr_t src_pkt_hdr;
-	int ret;
-	odp_pool_t pool;
-
-	packet_parse_reset(&src_pkt_hdr);
-
-	_odp_cls_parse(&src_pkt_hdr, base);
-	cos = pktio_select_cos(pktio_entry, base, &src_pkt_hdr);
-
-	/* if No CoS found then drop the packet */
-	if (cos == NULL)
-		return -EINVAL;
-
-	if (cos->s.queue == NULL || cos->s.pool == NULL)
-		return -EFAULT;
-
-	pool = cos->s.pool->s.pool_hdl;
-
-	pkt = odp_packet_alloc(pool, buf_len);
-	if (odp_unlikely(pkt == ODP_PACKET_INVALID))
-		return -ENOMEM;
-	pkt_hdr = odp_packet_hdr(pkt);
-
-	copy_packet_parser_metadata(&src_pkt_hdr, pkt_hdr);
-	pkt_hdr->input = pktio_entry->s.handle;
-
-	if (odp_packet_copy_from_mem(pkt, 0, buf_len, base) != 0) {
-		odp_packet_free(pkt);
-		return -EFAULT;
-	}
-
-	packet_set_ts(pkt_hdr, ts);
-
-	/* Parse and set packet header data */
-	odp_packet_pull_tail(pkt, odp_packet_len(pkt) - buf_len);
-	ret = queue_enq(cos->s.queue, odp_buf_to_hdr((odp_buffer_t)pkt), 0);
-	if (ret < 0) {
-		odp_packet_free(pkt);
-		return -EFAULT;
-	}
-
-	return 0;
-}
-
 int sock_stats_reset_fd(pktio_entry_t *pktio_entry, int fd)
 {
 	int err = 0;
diff --git a/platform/linux-generic/pktio/socket.c b/platform/linux-generic/pktio/socket.c
index c7df7c7..e07b6ad 100644
--- a/platform/linux-generic/pktio/socket.c
+++ b/platform/linux-generic/pktio/socket.c
@@ -617,7 +617,6 @@  static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
 	struct mmsghdr msgvec[ODP_PACKET_SOCKET_MAX_BURST_RX];
 	int nb_rx = 0;
 	int recv_msgs;
-	int ret;
 	uint8_t **recv_cache;
 	int i;
 
@@ -642,7 +641,6 @@  static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
 			iovecs[i].iov_len = PACKET_JUMBO_LEN;
 			msgvec[i].msg_hdr.msg_iov = &iovecs[i];
 		}
-		/* number of successfully allocated pkt buffers */
 		msgvec_len = i;
 
 		recv_msgs = recvmmsg(sockfd, msgvec, msgvec_len,
@@ -652,6 +650,10 @@  static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
 			ts_val = odp_time_global();
 
 		for (i = 0; i < recv_msgs; i++) {
+			odp_packet_hdr_t *pkt_hdr;
+			odp_packet_t pkt;
+			odp_pool_t pool = pkt_sock->pool;
+			odp_packet_hdr_t parsed_hdr;
 			void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
 			struct ethhdr *eth_hdr = base;
 			uint16_t pkt_len = msgvec[i].msg_len;
@@ -661,10 +663,25 @@  static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
 							eth_hdr->h_source)))
 				continue;
 
-			ret = _odp_packet_cls_enq(pktio_entry, base, pkt_len,
-						  ts);
-			if (ret && ret != -ENOENT)
-				nb_rx = ret;
+			if (cls_classify_packet(pktio_entry, base, pkt_len,
+						&pool, &parsed_hdr))
+				continue;
+			pkt = packet_alloc(pool, pkt_len, 1);
+			if (pkt == ODP_PACKET_INVALID)
+				continue;
+
+			pkt_hdr = odp_packet_hdr(pkt);
+
+			if (odp_packet_copy_from_mem(pkt, 0, pkt_len,
+						     base) != 0) {
+				odp_packet_free(pkt);
+				continue;
+			}
+			pkt_hdr->input = pktio_entry->s.handle;
+			copy_packet_parser_metadata(&parsed_hdr, pkt_hdr);
+			packet_set_ts(pkt_hdr, ts);
+
+			pkt_table[nb_rx++] = pkt;
 		}
 	} else {
 		struct iovec iovecs[ODP_PACKET_SOCKET_MAX_BURST_RX]
diff --git a/platform/linux-generic/pktio/socket_mmap.c b/platform/linux-generic/pktio/socket_mmap.c
index ce05775..6d7ec59 100644
--- a/platform/linux-generic/pktio/socket_mmap.c
+++ b/platform/linux-generic/pktio/socket_mmap.c
@@ -166,6 +166,10 @@  static inline unsigned pkt_mmap_v2_rx(pktio_entry_t *pktio_entry,
 	frame_num = ring->frame_num;
 
 	while (i < len) {
+		odp_packet_hdr_t *hdr;
+		odp_packet_hdr_t parsed_hdr;
+		odp_pool_t pool = pkt_sock->pool;
+
 		if (!mmap_rx_kernel_ready(ring->rd[frame_num].iov_base))
 			break;
 
@@ -194,35 +198,39 @@  static inline unsigned pkt_mmap_v2_rx(pktio_entry_t *pktio_entry,
 						       &pkt_len);
 
 		if (pktio_cls_enabled(pktio_entry)) {
-			ret = _odp_packet_cls_enq(pktio_entry, pkt_buf,
-						  pkt_len, ts);
-			if (ret && ret != -ENOENT)
-				nb_rx = ret;
-		} else {
-			odp_packet_hdr_t *hdr;
-
-			pkt_table[i] = packet_alloc(pkt_sock->pool, pkt_len, 1);
-			if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID)) {
-				mmap_rx_user_ready(ppd.raw); /* drop */
-				frame_num = next_frame_num;
-				continue;
-			}
-			hdr = odp_packet_hdr(pkt_table[i]);
-			ret = odp_packet_copy_from_mem(pkt_table[i], 0,
-						       pkt_len, pkt_buf);
-			if (ret != 0) {
-				odp_packet_free(pkt_table[i]);
+			if (cls_classify_packet(pktio_entry, pkt_buf, pkt_len,
+						&pool, &parsed_hdr)) {
 				mmap_rx_user_ready(ppd.raw); /* drop */
 				frame_num = next_frame_num;
 				continue;
 			}
+		}
+
+		pkt_table[i] = packet_alloc(pool, pkt_len, 1);
+		if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID)) {
+			mmap_rx_user_ready(ppd.raw); /* drop */
+			frame_num = next_frame_num;
+			continue;
+		}
+		hdr = odp_packet_hdr(pkt_table[i]);
+		ret = odp_packet_copy_from_mem(pkt_table[i], 0,
+					       pkt_len, pkt_buf);
+		if (ret != 0) {
+			odp_packet_free(pkt_table[i]);
+			mmap_rx_user_ready(ppd.raw); /* drop */
+			frame_num = next_frame_num;
+			continue;
+		}
+		hdr->input = pktio_entry->s.handle;
 
+		if (pktio_cls_enabled(pktio_entry))
+			copy_packet_parser_metadata(&parsed_hdr, hdr);
+		else
 			packet_parse_l2(hdr);
-			packet_set_ts(hdr, ts);
-			hdr->input = pktio_entry->s.handle;
 
-			nb_rx++;
-		}
+		packet_set_ts(hdr, ts);
+
+		nb_rx++;
 
 		mmap_rx_user_ready(ppd.raw);
 		frame_num = next_frame_num;