diff mbox

[3/5] linux-generic: classification: implements odp_cls_cos_pool_set() api

Message ID 1447741013-7333-3-git-send-email-bala.manoharan@linaro.org
State New
Headers show

Commit Message

Balasubramanian Manoharan Nov. 17, 2015, 6:16 a.m. UTC
Adds support for configuring packet pool to a class-of-service.
linux-generic packet parser is enhanced to parse a packet directly from
a memory location rather than from odp_packet_t.

packet receive code is modified to run packet classifier directly from
the stream so that the packet can be allocated from the pool specified
by the CoS.

Signed-off-by: Balasubramanian Manoharan <bala.manoharan@linaro.org>
---
 platform/linux-generic/odp_classification.c |  50 ++++++++-
 platform/linux-generic/odp_packet.c         |  74 ++++++++-----
 platform/linux-generic/odp_packet_io.c      |  80 ++++----------
 platform/linux-generic/pktio/loop.c         |  29 ++++-
 platform/linux-generic/pktio/netmap.c       |  66 +++++++++--
 platform/linux-generic/pktio/socket.c       | 166 ++++++++++++++++++++++------
 platform/linux-generic/pktio/socket_mmap.c  |  95 ++++++++++++----
 7 files changed, 395 insertions(+), 165 deletions(-)

Comments

Stuart Haslam Nov. 19, 2015, 10:54 a.m. UTC | #1
On Tue, Nov 17, 2015 at 11:46:51AM +0530, Balasubramanian Manoharan wrote:
> Adds support for configuring packet pool to a class-of-service.
> linux-generic packet parser is enhanced to parse a packet directly from
> a memory location rather than from odp_packet_t.
> 
> packet receive code is modified to run packet classifier directly from
> the stream so that the packet can be allocated from the pool specified
> by the CoS.
> 
> Signed-off-by: Balasubramanian Manoharan <bala.manoharan@linaro.org>
> ---
>  platform/linux-generic/odp_classification.c |  50 ++++++++-
>  platform/linux-generic/odp_packet.c         |  74 ++++++++-----
>  platform/linux-generic/odp_packet_io.c      |  80 ++++----------
>  platform/linux-generic/pktio/loop.c         |  29 ++++-
>  platform/linux-generic/pktio/netmap.c       |  66 +++++++++--
>  platform/linux-generic/pktio/socket.c       | 166 ++++++++++++++++++++++------
>  platform/linux-generic/pktio/socket_mmap.c  |  95 ++++++++++++----
>  7 files changed, 395 insertions(+), 165 deletions(-)
> 
> diff --git a/platform/linux-generic/odp_classification.c b/platform/linux-generic/odp_classification.c
> index 1b6d593..a702867 100644
> --- a/platform/linux-generic/odp_classification.c
> +++ b/platform/linux-generic/odp_classification.c
> @@ -661,6 +661,42 @@ int odp_pktio_pmr_match_set_cos(odp_pmr_set_t pmr_set_id, odp_pktio_t src_pktio,
>  	return 0;
>  }
>  
> +int odp_cls_cos_pool_set(odp_cos_t cos_id, odp_pool_t pool_id)
> +{
> +	cos_t *cos;
> +	pool_entry_t *pool;
> +
> +	cos = get_cos_entry(cos_id);
> +	if (cos == NULL) {
> +		ODP_ERR("Invalid odp_cos_t handle");
> +		return -1;
> +	}
> +	pool = odp_pool_to_entry(pool_id);
> +	if (pool == NULL) {
> +		ODP_ERR("Invalid odp_pool_t handle");
> +		return -1;
> +	}
> +
> +	LOCK(&cos->s.lock);
> +	cos->s.pool = pool;
> +	cos->s.pool_id = pool_id;
> +	UNLOCK(&cos->s.lock);
> +	return 0;
> +}
> +
> +odp_pool_t odp_cls_cos_pool(odp_cos_t cos_id)
> +{
> +	cos_t *cos;
> +
> +	cos = get_cos_entry(cos_id);
> +	if (cos == NULL) {
> +		ODP_ERR("Invalid odp_cos_t handle");
> +		return ODP_POOL_INVALID;
> +	}
> +
> +	return cos->s.pool_id;
> +}
> +
>  int verify_pmr(pmr_t *pmr, uint8_t *pkt_addr, odp_packet_hdr_t *pkt_hdr)
>  {
>  	int pmr_failure = 0;
> @@ -824,15 +860,13 @@ int pktio_classifier_init(pktio_entry_t *entry)
>  	return 0;
>  }
>  
> -int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
> +int _odp_packet_classifier(pktio_entry_t *entry, odp_packet_t pkt)
>  {
> -	pktio_entry_t *entry;
>  	queue_entry_t *queue;
>  	cos_t *cos;
>  	odp_packet_hdr_t *pkt_hdr;
>  	uint8_t *pkt_addr;
>  
> -	entry = get_pktio_entry(pktio);
>  	if (entry == NULL)
>  		return -1;
>  
> @@ -849,6 +883,16 @@ int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
>  	return queue_enq(queue, odp_buf_to_hdr((odp_buffer_t)pkt), 0);
>  }
>  
> +int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
> +{
> +	pktio_entry_t *entry;
> +
> +	entry = get_pktio_entry(pktio);
> +	if (entry == NULL)
> +		return -1;
> +	return _odp_packet_classifier(entry, pkt);
> +}
> +
>  cos_t *pktio_select_cos(pktio_entry_t *entry, uint8_t *pkt_addr,
>  		       odp_packet_hdr_t *pkt_hdr)
>  {
> diff --git a/platform/linux-generic/odp_packet.c b/platform/linux-generic/odp_packet.c
> index 07c740c..80901c9 100644
> --- a/platform/linux-generic/odp_packet.c
> +++ b/platform/linux-generic/odp_packet.c
> @@ -32,10 +32,8 @@ static inline void packet_parse_disable(odp_packet_hdr_t *pkt_hdr)
>  	pkt_hdr->input_flags.parsed_all = 1;
>  }
>  
> -void packet_parse_reset(odp_packet_t pkt)
> +void packet_parse_reset(odp_packet_hdr_t *pkt_hdr)
>  {
> -	odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
> -
>  	/* Reset parser metadata before new parse */
>  	pkt_hdr->error_flags.all  = 0;
>  	pkt_hdr->input_flags.all  = 0;
> @@ -780,9 +778,9 @@ int _odp_packet_copy_to_packet(odp_packet_t srcpkt, uint32_t srcoffset,
>   * Parser helper function for IPv4
>   */
>  static inline uint8_t parse_ipv4(odp_packet_hdr_t *pkt_hdr,
> -				 uint8_t **parseptr, uint32_t *offset)
> +				 const uint8_t **parseptr, uint32_t *offset)
>  {
> -	odph_ipv4hdr_t *ipv4 = (odph_ipv4hdr_t *)*parseptr;
> +	const odph_ipv4hdr_t *ipv4 = (const odph_ipv4hdr_t *)*parseptr;
>  	uint8_t ver = ODPH_IPV4HDR_VER(ipv4->ver_ihl);
>  	uint8_t ihl = ODPH_IPV4HDR_IHL(ipv4->ver_ihl);
>  	uint16_t frag_offset;
> @@ -818,10 +816,10 @@ static inline uint8_t parse_ipv4(odp_packet_hdr_t *pkt_hdr,
>   * Parser helper function for IPv6
>   */
>  static inline uint8_t parse_ipv6(odp_packet_hdr_t *pkt_hdr,
> -				 uint8_t **parseptr, uint32_t *offset)
> +				 const uint8_t **parseptr, uint32_t *offset)
>  {
> -	odph_ipv6hdr_t *ipv6 = (odph_ipv6hdr_t *)*parseptr;
> -	odph_ipv6hdr_ext_t *ipv6ext;
> +	const odph_ipv6hdr_t *ipv6 = (const odph_ipv6hdr_t *)*parseptr;
> +	const odph_ipv6hdr_ext_t *ipv6ext;
>  
>  	pkt_hdr->l3_len = odp_be_to_cpu_16(ipv6->payload_len);
>  
> @@ -843,7 +841,7 @@ static inline uint8_t parse_ipv6(odp_packet_hdr_t *pkt_hdr,
>  		pkt_hdr->input_flags.ipopt = 1;
>  
>  		do  {
> -			ipv6ext    = (odph_ipv6hdr_ext_t *)*parseptr;
> +			ipv6ext    = (const odph_ipv6hdr_ext_t *)*parseptr;
>  			uint16_t extlen = 8 + ipv6ext->ext_len * 8;
>  
>  			*offset   += extlen;
> @@ -875,9 +873,9 @@ static inline uint8_t parse_ipv6(odp_packet_hdr_t *pkt_hdr,
>   * Parser helper function for TCP
>   */
>  static inline void parse_tcp(odp_packet_hdr_t *pkt_hdr,
> -			     uint8_t **parseptr, uint32_t *offset)
> +			     const uint8_t **parseptr, uint32_t *offset)
>  {
> -	odph_tcphdr_t *tcp = (odph_tcphdr_t *)*parseptr;
> +	const odph_tcphdr_t *tcp = (const odph_tcphdr_t *)*parseptr;
>  
>  	if (tcp->hl < sizeof(odph_tcphdr_t)/sizeof(uint32_t))
>  		pkt_hdr->error_flags.tcp_err = 1;
> @@ -895,9 +893,9 @@ static inline void parse_tcp(odp_packet_hdr_t *pkt_hdr,
>   * Parser helper function for UDP
>   */
>  static inline void parse_udp(odp_packet_hdr_t *pkt_hdr,
> -			     uint8_t **parseptr, uint32_t *offset)
> +			     const uint8_t **parseptr, uint32_t *offset)
>  {
> -	odph_udphdr_t *udp = (odph_udphdr_t *)*parseptr;
> +	const odph_udphdr_t *udp = (const odph_udphdr_t *)*parseptr;
>  	uint32_t udplen = odp_be_to_cpu_16(udp->length);
>  
>  	if (udplen < sizeof(odph_udphdr_t) ||
> @@ -932,46 +930,51 @@ void packet_parse_l2(odp_packet_hdr_t *pkt_hdr)
>  	pkt_hdr->input_flags.parsed_l2 = 1;
>  }
>  
> -/**
> - * Simple packet parser
> - */
> -int packet_parse_full(odp_packet_hdr_t *pkt_hdr)
> +int _odp_parse_common(odp_packet_hdr_t *pkt_hdr, const uint8_t *ptr)
>  {
> -	odph_ethhdr_t *eth;
> -	odph_vlanhdr_t *vlan;
> +	const odph_ethhdr_t *eth;
> +	const odph_vlanhdr_t *vlan;
>  	uint16_t ethtype;
> -	uint8_t *parseptr;
>  	uint32_t offset, seglen;
>  	uint8_t ip_proto = 0;
> +	const uint8_t *parseptr;
>  
> +	offset = sizeof(odph_ethhdr_t);
>  	if (packet_parse_l2_not_done(pkt_hdr))
>  		packet_parse_l2(pkt_hdr);
>  
> -	eth = (odph_ethhdr_t *)packet_map(pkt_hdr, 0, &seglen);
> -	offset = sizeof(odph_ethhdr_t);
> -	parseptr = (uint8_t *)&eth->type;
> -	ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
> +	if (ptr == NULL) {
> +		eth = (odph_ethhdr_t *)packet_map(pkt_hdr, 0, &seglen);
> +		parseptr = (const uint8_t *)&eth->type;
> +		ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
> +	} else {
> +		eth = (const odph_ethhdr_t *)ptr;
> +		parseptr = (const uint8_t *)&eth->type;
> +		ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
> +	}
> +
>  
>  	/* Parse the VLAN header(s), if present */
>  	if (ethtype == ODPH_ETHTYPE_VLAN_OUTER) {
>  		pkt_hdr->input_flags.vlan_qinq = 1;
>  		pkt_hdr->input_flags.vlan = 1;
> -		vlan = (odph_vlanhdr_t *)(void *)parseptr;
> +
> +		vlan = (const odph_vlanhdr_t *)parseptr;
>  		pkt_hdr->vlan_s_tag = ((ethtype << 16) |
>  				       odp_be_to_cpu_16(vlan->tci));
>  		offset += sizeof(odph_vlanhdr_t);
>  		parseptr += sizeof(odph_vlanhdr_t);
> -		ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
> +		ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
>  	}
>  
>  	if (ethtype == ODPH_ETHTYPE_VLAN) {
>  		pkt_hdr->input_flags.vlan = 1;
> -		vlan = (odph_vlanhdr_t *)(void *)parseptr;
> +		vlan = (const odph_vlanhdr_t *)parseptr;
>  		pkt_hdr->vlan_c_tag = ((ethtype << 16) |
>  				       odp_be_to_cpu_16(vlan->tci));
>  		offset += sizeof(odph_vlanhdr_t);
>  		parseptr += sizeof(odph_vlanhdr_t);
> -		ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
> +		ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
>  	}
>  
>  	/* Check for SNAP vs. DIX */
> @@ -983,7 +986,7 @@ int packet_parse_full(odp_packet_hdr_t *pkt_hdr)
>  		}
>  		offset   += 8;
>  		parseptr += 8;
> -		ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
> +		ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
>  	}
>  
>  	/* Consume Ethertype for Layer 3 parse */
> @@ -1061,3 +1064,16 @@ parse_exit:
>  	pkt_hdr->input_flags.parsed_all = 1;
>  	return pkt_hdr->error_flags.all != 0;
>  }
> +
> +int _odp_cls_parse(odp_packet_hdr_t *pkt_hdr, const uint8_t *parseptr)
> +{
> +	return _odp_parse_common(pkt_hdr, parseptr);
> +}
> +
> +/**
> + * Simple packet parser
> + */
> +int packet_parse_full(odp_packet_hdr_t *pkt_hdr)
> +{
> +	return _odp_parse_common(pkt_hdr, NULL);
> +}
> diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
> index b21fd0b..e1e330b 100644
> --- a/platform/linux-generic/odp_packet_io.c
> +++ b/platform/linux-generic/odp_packet_io.c
> @@ -196,6 +196,7 @@ static odp_pktio_t setup_pktio_entry(const char *dev, odp_pool_t pool,
>  		return ODP_PKTIO_INVALID;
>  
>  	memcpy(&pktio_entry->s.param, param, sizeof(odp_pktio_param_t));
> +	pktio_entry->s.id = id;
>  
>  	for (pktio_if = 0; pktio_if_ops[pktio_if]; ++pktio_if) {
>  		ret = pktio_if_ops[pktio_if]->open(id, pktio_entry, dev, pool);
> @@ -551,7 +552,7 @@ odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
>  	odp_buffer_t buf;
>  	odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
>  	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
> -	int pkts, i, j;
> +	int pkts, i;
>  	odp_pktio_t pktio;
>  
>  	buf_hdr = queue_deq(qentry);
> @@ -564,27 +565,13 @@ odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
>  	if (pkts <= 0)
>  		return NULL;
>  
> -	if (pktio_cls_enabled(get_pktio_entry(pktio))) {
> -		for (i = 0, j = 0; i < pkts; i++) {
> -			if (0 > packet_classifier(pktio, pkt_tbl[i])) {
> -				buf = _odp_packet_to_buffer(pkt_tbl[i]);
> -				hdr_tbl[j++] = odp_buf_to_hdr(buf);
> -			}
> -		}
> -	} else {
> -		for (i = 0; i < pkts; i++) {
> -			buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> -			hdr_tbl[i] = odp_buf_to_hdr(buf);
> -		}
> -
> -		j = pkts;
> +	for (i = 0; i < pkts; i++) {
> +		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> +		hdr_tbl[i] = odp_buf_to_hdr(buf);
>  	}
>  
> -	if (0 == j)
> -		return NULL;
> -
> -	if (j > 1)
> -		queue_enq_multi(qentry, &hdr_tbl[1], j - 1, 0);
> +	if (pkts > 1)
> +		queue_enq_multi(qentry, &hdr_tbl[1], pkts - 1, 0);
>  	buf_hdr = hdr_tbl[0];
>  	return buf_hdr;
>  }
> @@ -623,27 +610,15 @@ int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
>  	if (pkts <= 0)
>  		return nbr;
>  
> -	if (pktio_cls_enabled(get_pktio_entry(pktio))) {
> -		for (i = 0, j = 0; i < pkts; i++) {
> -			if (0 > packet_classifier(pktio, pkt_tbl[i])) {
> -				buf = _odp_packet_to_buffer(pkt_tbl[i]);
> -				if (nbr < num)
> -					buf_hdr[nbr++] = odp_buf_to_hdr(buf);
> -				else
> -					hdr_tbl[j++] = odp_buf_to_hdr(buf);
> -			}
> -		}
> -	} else {
> -		/* Fill in buf_hdr first */
> -		for (i = 0; i < pkts && nbr < num; i++, nbr++) {
> -			buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> -			buf_hdr[nbr] = odp_buf_to_hdr(buf);
> -		}
> -		/* Queue the rest for later */
> -		for (j = 0; i < pkts; i++, j++) {
> -			buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> -			hdr_tbl[j++] = odp_buf_to_hdr(buf);
> -		}
> +	/* Fill in buf_hdr first */
> +	for (i = 0; i < pkts && nbr < num; i++, nbr++) {
> +		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> +		buf_hdr[nbr] = odp_buf_to_hdr(buf);
> +	}
> +	/* Queue the rest for later */
> +	for (j = 0; i < pkts; i++, j++) {
> +		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> +		hdr_tbl[j++] = odp_buf_to_hdr(buf);
>  	}
>  
>  	if (j)
> @@ -655,7 +630,7 @@ int pktin_poll(pktio_entry_t *entry)
>  {
>  	odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
>  	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
> -	int num, num_enq, i;
> +	int num, i;
>  	odp_buffer_t buf;
>  	odp_pktio_t pktio;
>  
> @@ -680,26 +655,15 @@ int pktin_poll(pktio_entry_t *entry)
>  		return -1;
>  	}
>  
> -	if (pktio_cls_enabled(entry)) {
> -		for (i = 0, num_enq = 0; i < num; i++) {
> -			if (packet_classifier(pktio, pkt_tbl[i]) < 0) {
> -				buf = _odp_packet_to_buffer(pkt_tbl[i]);
> -				hdr_tbl[num_enq++] = odp_buf_to_hdr(buf);
> -			}
> -		}
> -	} else {
> -		for (i = 0; i < num; i++) {
> -			buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> -			hdr_tbl[i] = odp_buf_to_hdr(buf);
> -		}
> -
> -		num_enq = num;
> +	for (i = 0; i < num; i++) {
> +		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
> +		hdr_tbl[i] = odp_buf_to_hdr(buf);
>  	}
>  
> -	if (num_enq) {
> +	if (num) {
>  		queue_entry_t *qentry;
>  		qentry = queue_to_qentry(entry->s.inq_default);
> -		queue_enq_multi(qentry, hdr_tbl, num_enq, 0);
> +		queue_enq_multi(qentry, hdr_tbl, num, 0);
>  	}
>  
>  	return 0;
> diff --git a/platform/linux-generic/pktio/loop.c b/platform/linux-generic/pktio/loop.c
> index ce19add..47745ad 100644
> --- a/platform/linux-generic/pktio/loop.c
> +++ b/platform/linux-generic/pktio/loop.c
> @@ -12,6 +12,7 @@
>  #include <odp.h>
>  #include <odp_packet_internal.h>
>  #include <odp_packet_io_internal.h>
> +#include <odp_classification_internal.h>
>  #include <odp_debug_internal.h>
>  #include <odp/hints.h>
>  
> @@ -50,19 +51,35 @@ static int loopback_close(pktio_entry_t *pktio_entry)
>  static int loopback_recv(pktio_entry_t *pktio_entry, odp_packet_t pkts[],
>  			 unsigned len)
>  {
> -	int nbr, i;
> +	int nbr, i, j;
>  	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
>  	queue_entry_t *qentry;
>  	odp_packet_hdr_t *pkt_hdr;
> +	odp_packet_t pkt;
>  
> +	nbr = 0;
>  	qentry = queue_to_qentry(pktio_entry->s.pkt_loop.loopq);
>  	nbr = queue_deq_multi(qentry, hdr_tbl, len);
>  
> -	for (i = 0; i < nbr; ++i) {
> -		pkts[i] = _odp_packet_from_buffer(odp_hdr_to_buf(hdr_tbl[i]));
> -		pkt_hdr = odp_packet_hdr(pkts[i]);
> -		packet_parse_reset(pkts[i]);
> -		packet_parse_l2(pkt_hdr);
> +	if (pktio_cls_enabled(pktio_entry)) {
> +		for (i = 0, j = 0; i < nbr; i++) {
> +			pkt = _odp_packet_from_buffer(odp_hdr_to_buf
> +						      (hdr_tbl[i]));
> +			pkt_hdr = odp_packet_hdr(pkt);
> +			packet_parse_reset(pkt_hdr);
> +			packet_parse_l2(pkt_hdr);
> +			if (0 > _odp_packet_classifier(pktio_entry, pkt))
> +				pkts[j++] = pkt;
> +		}
> +	nbr = j;
> +	} else {
> +		for (i = 0; i < nbr; ++i) {
> +			pkts[i] = _odp_packet_from_buffer(odp_hdr_to_buf
> +							  (hdr_tbl[i]));
> +			pkt_hdr = odp_packet_hdr(pkts[i]);
> +			packet_parse_reset(pkt_hdr);
> +			packet_parse_l2(pkt_hdr);
> +		}
>  	}
>  
>  	return nbr;
> diff --git a/platform/linux-generic/pktio/netmap.c b/platform/linux-generic/pktio/netmap.c
> index d064323..74835ad 100644
> --- a/platform/linux-generic/pktio/netmap.c
> +++ b/platform/linux-generic/pktio/netmap.c
> @@ -20,6 +20,9 @@
>  #include <poll.h>
>  #include <linux/ethtool.h>
>  #include <linux/sockios.h>
> +#include <odp_classification_datamodel.h>
> +#include <odp_classification_inlines.h>
> +#include <odp_classification_internal.h>
>  
>  #define NETMAP_WITH_LIBS
>  #include <net/netmap_user.h>
> @@ -197,7 +200,9 @@ static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
>  				    uint16_t len)
>  {
>  	odp_packet_t pkt;
> -	odp_packet_hdr_t *pkt_hdr;
> +	pktio_entry_t *pktio_entry;
> +
> +	pktio_entry = args->pktio_entry;
>  
>  	if (odp_unlikely(len > pktio_entry->s.pkt_nm.max_frame_len)) {
>  		ODP_ERR("RX: frame too big %" PRIu16 " %zu!\n", len,
> @@ -210,19 +215,58 @@ static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
>  		return -1;
>  	}
>  
> -	pkt = packet_alloc(pktio_entry->s.pkt_nm.pool, len, 1);
> -	if (pkt == ODP_PACKET_INVALID)
> -		return -1;
> +	if (pktio_cls_enabled(pktio_entry)) {
> +		odp_packet_hdr_t pkt_hdr;
> +		cos_t *cos;
>  
> -	pkt_hdr = odp_packet_hdr(pkt);
> +		packet_parse_reset(&pkt_hdr);
> +		_odp_cls_parse(&pkt_hdr, buf);
> +		cos = pktio_select_cos(args->pktio_entry,

args?.. looks like a merge error

> +				       (uint8_t *)buf,
> +				       &pkt_hdr);
>  
> -	/* For now copy the data in the mbuf,
> -	   worry about zero-copy later */
> -	if (odp_packet_copydata_in(pkt, 0, len, buf) != 0) {
> -		odp_packet_free(pkt);
> -		return -1;
> +		if (cos == NULL || cos->s.queue == NULL)
> +			return -1;
> +
> +		pkt = odp_packet_alloc(cos->s.pool_id, len);
> +		if (pkt == ODP_PACKET_INVALID)
> +			return -1;
> +
> +		copy_packet_parser_metadata(&pkt_hdr, odp_packet_hdr(pkt));
> +
> +		/* For now copy the data in the mbuf,
> +		worry about zero-copy later */
> +		if (odp_packet_copydata_in(pkt, 0, len, buf) != 0) {
> +			odp_packet_free(pkt);
> +			return -1;
> +		}
> +
> +		odp_packet_hdr(pkt)->input = args->pktio_entry->s.id;
> +		ret = queue_enq(cos->s.queue,
> +				odp_buf_to_hdr((odp_buffer_t)pkt), 0);
> +		/*if the queue enq was not successful return
> +		the packet to packet recv function to be enqueued
> +		to the default queue*/
> +		if (ret == 0)
> +			return -1;
> +	} else {
> +		odp_packet_hdr_t *pkt_hdr;
> +
> +		pkt = packet_alloc(pktio_entry->s.pkt_nm->pool, len, 1);
> +		if (pkt == ODP_PACKET_INVALID)
> +			return -1;
> +
> +		pkt_hdr = odp_packet_hdr(pkt);
> +
> +		/* For now copy the data in the mbuf,
> +		   worry about zero-copy later */
> +		if (odp_packet_copydata_in(pkt, 0, len, buf) != 0) {
> +			odp_packet_free(pkt);
> +			return;

no return value

> +		}
> +
> +		packet_parse_l2(pkt_hdr);
>  	}
> -	packet_parse_l2(pkt_hdr);
>  
>  	*pkt_out = pkt;
>  	return 0;
> diff --git a/platform/linux-generic/pktio/socket.c b/platform/linux-generic/pktio/socket.c
> index 5f5e0ae..95b7ea3 100644
> --- a/platform/linux-generic/pktio/socket.c
> +++ b/platform/linux-generic/pktio/socket.c
> @@ -38,6 +38,9 @@
>  #include <odp_packet_io_internal.h>
>  #include <odp_align_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp_classification_datamodel.h>
> +#include <odp_classification_inlines.h>
> +#include <odp_classification_internal.h>
>  #include <odp/hints.h>
>  
>  #include <odp/helper/eth.h>
> @@ -194,6 +197,8 @@ static int sock_close(pktio_entry_t *pktio_entry)
>  		return -1;
>  	}
>  
> +	odp_shm_free(pkt_sock->shm);
> +
>  	return 0;
>  }
>  
> @@ -205,10 +210,13 @@ static int sock_setup_pkt(pktio_entry_t *pktio_entry, const char *netdev,
>  {
>  	int sockfd;
>  	int err;
> +	int i;
>  	unsigned int if_idx;
>  	struct ifreq ethreq;
>  	struct sockaddr_ll sa_ll;
> +	char shm_name[ODP_SHM_NAME_LEN];
>  	pkt_sock_t *pkt_sock = &pktio_entry->s.pkt_sock;
> +	uint8_t *addr;
>  
>  	/* Init pktio entry */
>  	memset(pkt_sock, 0, sizeof(*pkt_sock));
> @@ -218,6 +226,20 @@ static int sock_setup_pkt(pktio_entry_t *pktio_entry, const char *netdev,
>  	if (pool == ODP_POOL_INVALID)
>  		return -1;
>  	pkt_sock->pool = pool;
> +	snprintf(shm_name, ODP_SHM_NAME_LEN, "%s-%s", "pktio", netdev);
> +	shm_name[ODP_SHM_NAME_LEN - 1] = '\0';
> +
> +	pkt_sock->shm = odp_shm_reserve(shm_name, PACKET_JUMBO_LEN,
> +					  PACKET_JUMBO_LEN *
> +					  ODP_PACKET_SOCKET_MAX_BURST_RX, 0);
> +	if (pkt_sock->shm == ODP_SHM_INVALID)
> +		return -1;
> +
> +	addr = odp_shm_addr(pkt_sock->shm);
> +	for (i = 0; i < ODP_PACKET_SOCKET_MAX_BURST_RX; i++) {
> +		pkt_sock->cache_ptr[i] = addr;
> +		addr += PACKET_JUMBO_LEN;
> +	}
>  
>  	sockfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
>  	if (sockfd == -1) {
> @@ -253,7 +275,6 @@ static int sock_setup_pkt(pktio_entry_t *pktio_entry, const char *netdev,
>  		ODP_ERR("bind(to IF): %s\n", strerror(errno));
>  		goto error;
>  	}
> -
>  	return 0;
>  
>  error:
> @@ -311,58 +332,133 @@ static int sock_mmsg_recv(pktio_entry_t *pktio_entry,
>  	const int sockfd = pkt_sock->sockfd;
>  	int msgvec_len;
>  	struct mmsghdr msgvec[ODP_PACKET_SOCKET_MAX_BURST_RX];
> -	struct iovec iovecs[ODP_PACKET_SOCKET_MAX_BURST_RX][ODP_BUFFER_MAX_SEG];
>  	int nb_rx = 0;
>  	int recv_msgs;
> +	uint8_t **recv_cache;
>  	int i;
> +	int ret;
>  
>  	if (odp_unlikely(len > ODP_PACKET_SOCKET_MAX_BURST_RX))
>  		return -1;
>  
>  	memset(msgvec, 0, sizeof(msgvec));
> +	recv_cache = pkt_sock->cache_ptr;
>  
> -	for (i = 0; i < (int)len; i++) {
> -		pkt_table[i] = packet_alloc(pkt_sock->pool, 0 /*default*/, 1);
> -		if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
> -			break;
> +	if (pktio_cls_enabled(pktio_entry)) {
> +		struct iovec iovecs[ODP_PACKET_SOCKET_MAX_BURST_RX];
> +		odp_packet_hdr_t pkt_hdr;
>  
> -		msgvec[i].msg_hdr.msg_iovlen =
> -			_rx_pkt_to_iovec(pkt_table[i], iovecs[i]);
> +		for (i = 0; i < (int)len; i++) {
> +			msgvec[i].msg_hdr.msg_iovlen = 1;
> +			iovecs[i].iov_base = recv_cache[i];
> +			iovecs[i].iov_len = 1024;
>  
> -		msgvec[i].msg_hdr.msg_iov = iovecs[i];
> -	}
> -	msgvec_len = i; /* number of successfully allocated pkt buffers */
> +			msgvec[i].msg_hdr.msg_iov = &iovecs[i];
> +		}
> +		/* number of successfully allocated pkt buffers */
> +		msgvec_len = i;
> +
> +		recv_msgs = recvmmsg(sockfd, msgvec, msgvec_len,
> +				     MSG_DONTWAIT, NULL);
> +		for (i = 0; i < recv_msgs; i++) {
> +			void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
> +			struct ethhdr *eth_hdr = base;
> +
> +			/* Don't receive packets sent by ourselves */
> +			if (odp_unlikely(ethaddrs_equal(pkt_sock->if_mac,
> +							eth_hdr->h_source))) {
> +				continue;
> +			}
>  
> -	recv_msgs = recvmmsg(sockfd, msgvec, msgvec_len, MSG_DONTWAIT, NULL);
> +			packet_parse_reset(&pkt_hdr);
>  
> -	for (i = 0; i < recv_msgs; i++) {
> -		odp_packet_hdr_t *pkt_hdr;
> -		void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
> -		struct ethhdr *eth_hdr = base;
> +			_odp_cls_parse(&pkt_hdr, base);
> +			cos_t *cos = pktio_select_cos(pktio_entry,
> +						      (uint8_t *)base,
> +						      &pkt_hdr);
>  
> -		/* Don't receive packets sent by ourselves */
> -		if (odp_unlikely(ethaddrs_equal(pkt_sock->if_mac,
> -						eth_hdr->h_source))) {
> -			odp_packet_free(pkt_table[i]);
> -			continue;
> +			/* if No CoS found then drop the packet */
> +			if (cos == NULL || cos->s.queue == NULL)
> +				continue;
> +
> +			pkt_table[i] = odp_packet_alloc(cos->s.pool_id,
> +							 msgvec[i].msg_len);
> +			if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
> +				continue;
> +
> +			copy_packet_parser_metadata(&pkt_hdr,
> +						    odp_packet_hdr
> +						    (pkt_table[i]));
> +
> +			odp_packet_hdr(pkt_table[i])->input = pktio_entry->
> +							      s.id;
> +
> +			if (odp_packet_copydata_in(pkt_table[i], 0,
> +						   msgvec[i].msg_len, base)
> +						   != 0) {
> +				odp_packet_free(pkt_table[i]);
> +				continue;
> +			}
> +
> +			/* Parse and set packet header data */
> +			odp_packet_pull_tail(pkt_table[i],
> +					     odp_packet_len(pkt_table[i]) -
> +					     msgvec[i].msg_len);
> +			ret = queue_enq(cos->s.queue, odp_buf_to_hdr(
> +					(odp_buffer_t)pkt_table[i]), 0);
> +			if (ret < 0) {
> +				pkt_table[nb_rx] = pkt_table[i];
> +				nb_rx++;
> +			}
>  		}
> +	} else {
> +		struct iovec iovecs[ODP_PACKET_SOCKET_MAX_BURST_RX]
> +				   [ODP_BUFFER_MAX_SEG];
> +
> +		for (i = 0; i < (int)len; i++) {
> +			pkt_table[i] = packet_alloc(pkt_sock->pool,
> +						    0 /*default*/, 1);
> +			if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
> +				break;
>  
> -		pkt_hdr = odp_packet_hdr(pkt_table[i]);
> +			msgvec[i].msg_hdr.msg_iovlen =
> +				_rx_pkt_to_iovec(pkt_table[i], iovecs[i]);
>  
> -		odp_packet_pull_tail(pkt_table[i],
> -				     odp_packet_len(pkt_table[i]) -
> -				     msgvec[i].msg_len);
> +			msgvec[i].msg_hdr.msg_iov = iovecs[i];
> +		}
>  
> -		packet_parse_l2(pkt_hdr);
> +		/* number of successfully allocated pkt buffers */
> +		msgvec_len = i;
>  
> -		pkt_table[nb_rx] = pkt_table[i];
> -		nb_rx++;
> -	}
> +		recv_msgs = recvmmsg(sockfd, msgvec, msgvec_len,
> +				     MSG_DONTWAIT, NULL);
>  
> -	/* Free unused pkt buffers */
> -	for (; i < msgvec_len; i++)
> -		odp_packet_free(pkt_table[i]);
> +		for (i = 0; i < recv_msgs; i++) {
> +			void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
> +			struct ethhdr *eth_hdr = base;
> +			odp_packet_hdr_t *pkt_hdr;
>  
> +			/* Don't receive packets sent by ourselves */
> +			if (odp_unlikely(ethaddrs_equal(pkt_sock->if_mac,
> +							eth_hdr->h_source))) {
> +				odp_packet_free(pkt_table[i]);
> +				continue;
> +			}
> +			pkt_hdr = odp_packet_hdr(pkt_table[i]);
> +			/* Parse and set packet header data */
> +			odp_packet_pull_tail(pkt_table[i],
> +					     odp_packet_len(pkt_table[i]) -
> +					     msgvec[i].msg_len);
> +
> +			packet_parse_l2(pkt_hdr);
> +			pkt_table[nb_rx] = pkt_table[i];
> +			nb_rx++;
> +		}
> +
> +		/* Free unused pkt buffers */
> +		for (; i < msgvec_len; i++)
> +			odp_packet_free(pkt_table[i]);
> +	}
>  	return nb_rx;
>  }
>  
> @@ -377,7 +473,7 @@ static uint32_t _tx_pkt_to_iovec(odp_packet_t pkt,
>  		uint32_t seglen;
>  
>  		iovecs[iov_count].iov_base = odp_packet_offset(pkt, offset,
> -							       &seglen, NULL);
> +				&seglen, NULL);
>  		iovecs[iov_count].iov_len = seglen;
>  		iov_count++;
>  		offset += seglen;
> @@ -407,7 +503,7 @@ static int sock_mmsg_send(pktio_entry_t *pktio_entry,
>  	for (i = 0; i < len; i++) {
>  		msgvec[i].msg_hdr.msg_iov = iovecs[i];
>  		msgvec[i].msg_hdr.msg_iovlen = _tx_pkt_to_iovec(pkt_table[i],
> -								iovecs[i]);
> +				iovecs[i]);
>  	}
>  
>  	for (i = 0; i < len; ) {
> @@ -415,7 +511,7 @@ static int sock_mmsg_send(pktio_entry_t *pktio_entry,
>  		if (odp_unlikely(ret <= -1)) {
>  			if (i == 0 && SOCK_ERR_REPORT(errno)) {
>  				__odp_errno = errno;
> -				ODP_ERR("sendmmsg(): %s\n", strerror(errno));
> +			ODP_ERR("sendmmsg(): %s\n", strerror(errno));
>  				return -1;
>  			}
>  			break;
> diff --git a/platform/linux-generic/pktio/socket_mmap.c b/platform/linux-generic/pktio/socket_mmap.c
> index 2bdb72b..6ec1bfa 100644
> --- a/platform/linux-generic/pktio/socket_mmap.c
> +++ b/platform/linux-generic/pktio/socket_mmap.c
> @@ -28,6 +28,9 @@
>  #include <odp_packet_internal.h>
>  #include <odp_packet_io_internal.h>
>  #include <odp_debug_internal.h>
> +#include <odp_classification_datamodel.h>
> +#include <odp_classification_inlines.h>
> +#include <odp_classification_internal.h>
>  #include <odp/hints.h>
>  
>  #include <odp/helper/eth.h>
> @@ -108,9 +111,9 @@ static inline void mmap_tx_user_ready(struct tpacket2_hdr *hdr)
>  	__sync_synchronize();
>  }
>  
> -static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
> +static inline unsigned pkt_mmap_v2_rx(pktio_entry_t *pktio_entry,
> +				      pkt_sock_mmap_t *pkt_sock,
>  				      odp_packet_t pkt_table[], unsigned len,
> -				      odp_pool_t pool,
>  				      unsigned char if_mac[])
>  {
>  	union frame_map ppd;
> @@ -118,11 +121,14 @@ static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
>  	uint8_t *pkt_buf;
>  	int pkt_len;
>  	struct ethhdr *eth_hdr;
> -	odp_packet_hdr_t *pkt_hdr;
>  	unsigned i = 0;
> +	uint8_t nb_rx = 0;
> +	odp_packet_hdr_t pkt_hdr;
> +	odp_packet_t pkt;
> +	struct ring *ring;
> +	int ret;
>  
> -	(void)sock;
> -
> +	ring  = &pkt_sock->rx_ring;
>  	frame_num = ring->frame_num;
>  
>  	while (i < len) {

The block within this loop is now pretty long and indented, changing the
next line (not in shown in this diff) can tidy things up a bit:

while (i < len) {
	if (!mmap_rx_kernel_ready(ring->rd[frame_num].iov_base))
		break;

	..
}

> @@ -143,22 +149,67 @@ static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
>  				continue;
>  			}
>  
> -			pkt_table[i] = packet_alloc(pool, pkt_len, 1);
> -			if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
> -				break;
> -
> -			pkt_hdr = odp_packet_hdr(pkt_table[i]);
> -
> -			if (odp_packet_copydata_in(pkt_table[i], 0,
> -						   pkt_len, pkt_buf) != 0) {
> -				odp_packet_free(pkt_table[i]);
> -				break;
> +			if (pktio_cls_enabled(pktio_entry)) {
> +				/* Parse and set packet header data */
> +				packet_parse_reset(&pkt_hdr);
> +				_odp_cls_parse(&pkt_hdr, pkt_buf);
> +
> +				cos_t *cos = pktio_select_cos(pktio_entry,
> +							      pkt_buf,
> +							      &pkt_hdr);
> +				/* if No CoS found for then drop the packet */
> +				if (cos == NULL || cos->s.queue == NULL) {
> +					mmap_rx_user_ready(ppd.raw); /* drop */
> +					frame_num = next_frame_num;
> +					continue;
> +				}
> +
> +				pkt = odp_packet_alloc(cos->s.pool_id,
> +						       pkt_len);

This looks to be forcing a CoS to have a pool associated with it. I
thought that a CoS must have a queue, but that the pool was optional and
if not configured packets would be allocated from the pktio's pool.

> +				if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
> +					mmap_rx_user_ready(ppd.raw); /* drop */
> +					frame_num = next_frame_num;
> +					continue;
> +				}
> +
> +				copy_packet_parser_metadata(&pkt_hdr,
> +							    odp_packet_hdr
> +							    (pkt));
> +				odp_packet_hdr(pkt)->input = pktio_entry->s.id;
> +
> +				ret = odp_packet_copydata_in(pkt, 0,
> +							     pkt_len, pkt_buf);
> +				if (ret != 0) {
> +					odp_packet_free(pkt);
> +					break;
> +				}
> +				ret = queue_enq(cos->s.queue, odp_buf_to_hdr(
> +						(odp_buffer_t)pkt), 0);
> +				if (ret < 0) {
> +					pkt_table[nb_rx] = pkt;
> +					nb_rx++;
> +				}

I wonder if it's possible to put the above block into a common function
that can be reused by other pktios. Something that takes the pktio_entry,
buffer pointer and buffer length and returns a status indication of
whether the packet should be dropped, skipped (already classified and
delivered) or delivered to the default queue.

> +			} else {
> +				odp_packet_hdr_t *hdr;
> +
> +				pkt_table[i] = packet_alloc(pkt_sock->pool,
> +							    pkt_len, 1);
> +				if (odp_unlikely(pkt_table[i] ==
> +				    ODP_PACKET_INVALID))
> +					break;

When the classifier is enabled we now drop packets if an allocation
fails, I think we should do the same here.

> +				hdr = odp_packet_hdr(pkt_table[i]);
> +				ret = odp_packet_copydata_in(pkt_table[i], 0,
> +							     pkt_len, pkt_buf);
> +				if (ret != 0) {
> +					odp_packet_free(pkt_table[i]);
> +					break;
> +				}
> +
> +				packet_parse_l2(hdr);
> +				nb_rx++;
>  			}
>  
> -			packet_parse_l2(pkt_hdr);
> -
>  			mmap_rx_user_ready(ppd.raw);
> -
>  			frame_num = next_frame_num;
>  			i++;
>  		} else {
> @@ -167,8 +218,7 @@ static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
>  	}
>  
>  	ring->frame_num = frame_num;
> -
> -	return i;
> +	return nb_rx;
>  }
Balasubramanian Manoharan Nov. 19, 2015, 11:28 a.m. UTC | #2
Hi,

Pls find comments inline..

On 19 November 2015 at 16:24, Stuart Haslam <stuart.haslam@linaro.org> wrote:
> On Tue, Nov 17, 2015 at 11:46:51AM +0530, Balasubramanian Manoharan wrote:
>> Adds support for configuring packet pool to a class-of-service.
>> linux-generic packet parser is enhanced to parse a packet directly from
>> a memory location rather than from odp_packet_t.
>>
>> packet receive code is modified to run packet classifier directly from
>> the stream so that the packet can be allocated from the pool specified
>> by the CoS.
>>
>> Signed-off-by: Balasubramanian Manoharan <bala.manoharan@linaro.org>
>> ---
>>  platform/linux-generic/odp_classification.c |  50 ++++++++-
>>  platform/linux-generic/odp_packet.c         |  74 ++++++++-----
>>  platform/linux-generic/odp_packet_io.c      |  80 ++++----------
>>  platform/linux-generic/pktio/loop.c         |  29 ++++-
>>  platform/linux-generic/pktio/netmap.c       |  66 +++++++++--
>>  platform/linux-generic/pktio/socket.c       | 166 ++++++++++++++++++++++------
>>  platform/linux-generic/pktio/socket_mmap.c  |  95 ++++++++++++----
>>  7 files changed, 395 insertions(+), 165 deletions(-)
>>
>> diff --git a/platform/linux-generic/odp_classification.c b/platform/linux-generic/odp_classification.c
>> index 1b6d593..a702867 100644
>> --- a/platform/linux-generic/odp_classification.c
>> +++ b/platform/linux-generic/odp_classification.c
>> @@ -661,6 +661,42 @@ int odp_pktio_pmr_match_set_cos(odp_pmr_set_t pmr_set_id, odp_pktio_t src_pktio,
>>       return 0;
>>  }
>>
>> +int odp_cls_cos_pool_set(odp_cos_t cos_id, odp_pool_t pool_id)
>> +{
>> +     cos_t *cos;
>> +     pool_entry_t *pool;
>> +
>> +     cos = get_cos_entry(cos_id);
>> +     if (cos == NULL) {
>> +             ODP_ERR("Invalid odp_cos_t handle");
>> +             return -1;
>> +     }
>> +     pool = odp_pool_to_entry(pool_id);
>> +     if (pool == NULL) {
>> +             ODP_ERR("Invalid odp_pool_t handle");
>> +             return -1;
>> +     }
>> +
>> +     LOCK(&cos->s.lock);
>> +     cos->s.pool = pool;
>> +     cos->s.pool_id = pool_id;
>> +     UNLOCK(&cos->s.lock);
>> +     return 0;
>> +}
>> +
>> +odp_pool_t odp_cls_cos_pool(odp_cos_t cos_id)
>> +{
>> +     cos_t *cos;
>> +
>> +     cos = get_cos_entry(cos_id);
>> +     if (cos == NULL) {
>> +             ODP_ERR("Invalid odp_cos_t handle");
>> +             return ODP_POOL_INVALID;
>> +     }
>> +
>> +     return cos->s.pool_id;
>> +}
>> +
>>  int verify_pmr(pmr_t *pmr, uint8_t *pkt_addr, odp_packet_hdr_t *pkt_hdr)
>>  {
>>       int pmr_failure = 0;
>> @@ -824,15 +860,13 @@ int pktio_classifier_init(pktio_entry_t *entry)
>>       return 0;
>>  }
>>
>> -int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
>> +int _odp_packet_classifier(pktio_entry_t *entry, odp_packet_t pkt)
>>  {
>> -     pktio_entry_t *entry;
>>       queue_entry_t *queue;
>>       cos_t *cos;
>>       odp_packet_hdr_t *pkt_hdr;
>>       uint8_t *pkt_addr;
>>
>> -     entry = get_pktio_entry(pktio);
>>       if (entry == NULL)
>>               return -1;
>>
>> @@ -849,6 +883,16 @@ int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
>>       return queue_enq(queue, odp_buf_to_hdr((odp_buffer_t)pkt), 0);
>>  }
>>
>> +int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
>> +{
>> +     pktio_entry_t *entry;
>> +
>> +     entry = get_pktio_entry(pktio);
>> +     if (entry == NULL)
>> +             return -1;
>> +     return _odp_packet_classifier(entry, pkt);
>> +}
>> +
>>  cos_t *pktio_select_cos(pktio_entry_t *entry, uint8_t *pkt_addr,
>>                      odp_packet_hdr_t *pkt_hdr)
>>  {
>> diff --git a/platform/linux-generic/odp_packet.c b/platform/linux-generic/odp_packet.c
>> index 07c740c..80901c9 100644
>> --- a/platform/linux-generic/odp_packet.c
>> +++ b/platform/linux-generic/odp_packet.c
>> @@ -32,10 +32,8 @@ static inline void packet_parse_disable(odp_packet_hdr_t *pkt_hdr)
>>       pkt_hdr->input_flags.parsed_all = 1;
>>  }
>>
>> -void packet_parse_reset(odp_packet_t pkt)
>> +void packet_parse_reset(odp_packet_hdr_t *pkt_hdr)
>>  {
>> -     odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
>> -
>>       /* Reset parser metadata before new parse */
>>       pkt_hdr->error_flags.all  = 0;
>>       pkt_hdr->input_flags.all  = 0;
>> @@ -780,9 +778,9 @@ int _odp_packet_copy_to_packet(odp_packet_t srcpkt, uint32_t srcoffset,
>>   * Parser helper function for IPv4
>>   */
>>  static inline uint8_t parse_ipv4(odp_packet_hdr_t *pkt_hdr,
>> -                              uint8_t **parseptr, uint32_t *offset)
>> +                              const uint8_t **parseptr, uint32_t *offset)
>>  {
>> -     odph_ipv4hdr_t *ipv4 = (odph_ipv4hdr_t *)*parseptr;
>> +     const odph_ipv4hdr_t *ipv4 = (const odph_ipv4hdr_t *)*parseptr;
>>       uint8_t ver = ODPH_IPV4HDR_VER(ipv4->ver_ihl);
>>       uint8_t ihl = ODPH_IPV4HDR_IHL(ipv4->ver_ihl);
>>       uint16_t frag_offset;
>> @@ -818,10 +816,10 @@ static inline uint8_t parse_ipv4(odp_packet_hdr_t *pkt_hdr,
>>   * Parser helper function for IPv6
>>   */
>>  static inline uint8_t parse_ipv6(odp_packet_hdr_t *pkt_hdr,
>> -                              uint8_t **parseptr, uint32_t *offset)
>> +                              const uint8_t **parseptr, uint32_t *offset)
>>  {
>> -     odph_ipv6hdr_t *ipv6 = (odph_ipv6hdr_t *)*parseptr;
>> -     odph_ipv6hdr_ext_t *ipv6ext;
>> +     const odph_ipv6hdr_t *ipv6 = (const odph_ipv6hdr_t *)*parseptr;
>> +     const odph_ipv6hdr_ext_t *ipv6ext;
>>
>>       pkt_hdr->l3_len = odp_be_to_cpu_16(ipv6->payload_len);
>>
>> @@ -843,7 +841,7 @@ static inline uint8_t parse_ipv6(odp_packet_hdr_t *pkt_hdr,
>>               pkt_hdr->input_flags.ipopt = 1;
>>
>>               do  {
>> -                     ipv6ext    = (odph_ipv6hdr_ext_t *)*parseptr;
>> +                     ipv6ext    = (const odph_ipv6hdr_ext_t *)*parseptr;
>>                       uint16_t extlen = 8 + ipv6ext->ext_len * 8;
>>
>>                       *offset   += extlen;
>> @@ -875,9 +873,9 @@ static inline uint8_t parse_ipv6(odp_packet_hdr_t *pkt_hdr,
>>   * Parser helper function for TCP
>>   */
>>  static inline void parse_tcp(odp_packet_hdr_t *pkt_hdr,
>> -                          uint8_t **parseptr, uint32_t *offset)
>> +                          const uint8_t **parseptr, uint32_t *offset)
>>  {
>> -     odph_tcphdr_t *tcp = (odph_tcphdr_t *)*parseptr;
>> +     const odph_tcphdr_t *tcp = (const odph_tcphdr_t *)*parseptr;
>>
>>       if (tcp->hl < sizeof(odph_tcphdr_t)/sizeof(uint32_t))
>>               pkt_hdr->error_flags.tcp_err = 1;
>> @@ -895,9 +893,9 @@ static inline void parse_tcp(odp_packet_hdr_t *pkt_hdr,
>>   * Parser helper function for UDP
>>   */
>>  static inline void parse_udp(odp_packet_hdr_t *pkt_hdr,
>> -                          uint8_t **parseptr, uint32_t *offset)
>> +                          const uint8_t **parseptr, uint32_t *offset)
>>  {
>> -     odph_udphdr_t *udp = (odph_udphdr_t *)*parseptr;
>> +     const odph_udphdr_t *udp = (const odph_udphdr_t *)*parseptr;
>>       uint32_t udplen = odp_be_to_cpu_16(udp->length);
>>
>>       if (udplen < sizeof(odph_udphdr_t) ||
>> @@ -932,46 +930,51 @@ void packet_parse_l2(odp_packet_hdr_t *pkt_hdr)
>>       pkt_hdr->input_flags.parsed_l2 = 1;
>>  }
>>
>> -/**
>> - * Simple packet parser
>> - */
>> -int packet_parse_full(odp_packet_hdr_t *pkt_hdr)
>> +int _odp_parse_common(odp_packet_hdr_t *pkt_hdr, const uint8_t *ptr)
>>  {
>> -     odph_ethhdr_t *eth;
>> -     odph_vlanhdr_t *vlan;
>> +     const odph_ethhdr_t *eth;
>> +     const odph_vlanhdr_t *vlan;
>>       uint16_t ethtype;
>> -     uint8_t *parseptr;
>>       uint32_t offset, seglen;
>>       uint8_t ip_proto = 0;
>> +     const uint8_t *parseptr;
>>
>> +     offset = sizeof(odph_ethhdr_t);
>>       if (packet_parse_l2_not_done(pkt_hdr))
>>               packet_parse_l2(pkt_hdr);
>>
>> -     eth = (odph_ethhdr_t *)packet_map(pkt_hdr, 0, &seglen);
>> -     offset = sizeof(odph_ethhdr_t);
>> -     parseptr = (uint8_t *)&eth->type;
>> -     ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
>> +     if (ptr == NULL) {
>> +             eth = (odph_ethhdr_t *)packet_map(pkt_hdr, 0, &seglen);
>> +             parseptr = (const uint8_t *)&eth->type;
>> +             ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
>> +     } else {
>> +             eth = (const odph_ethhdr_t *)ptr;
>> +             parseptr = (const uint8_t *)&eth->type;
>> +             ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
>> +     }
>> +
>>
>>       /* Parse the VLAN header(s), if present */
>>       if (ethtype == ODPH_ETHTYPE_VLAN_OUTER) {
>>               pkt_hdr->input_flags.vlan_qinq = 1;
>>               pkt_hdr->input_flags.vlan = 1;
>> -             vlan = (odph_vlanhdr_t *)(void *)parseptr;
>> +
>> +             vlan = (const odph_vlanhdr_t *)parseptr;
>>               pkt_hdr->vlan_s_tag = ((ethtype << 16) |
>>                                      odp_be_to_cpu_16(vlan->tci));
>>               offset += sizeof(odph_vlanhdr_t);
>>               parseptr += sizeof(odph_vlanhdr_t);
>> -             ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
>> +             ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
>>       }
>>
>>       if (ethtype == ODPH_ETHTYPE_VLAN) {
>>               pkt_hdr->input_flags.vlan = 1;
>> -             vlan = (odph_vlanhdr_t *)(void *)parseptr;
>> +             vlan = (const odph_vlanhdr_t *)parseptr;
>>               pkt_hdr->vlan_c_tag = ((ethtype << 16) |
>>                                      odp_be_to_cpu_16(vlan->tci));
>>               offset += sizeof(odph_vlanhdr_t);
>>               parseptr += sizeof(odph_vlanhdr_t);
>> -             ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
>> +             ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
>>       }
>>
>>       /* Check for SNAP vs. DIX */
>> @@ -983,7 +986,7 @@ int packet_parse_full(odp_packet_hdr_t *pkt_hdr)
>>               }
>>               offset   += 8;
>>               parseptr += 8;
>> -             ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
>> +             ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
>>       }
>>
>>       /* Consume Ethertype for Layer 3 parse */
>> @@ -1061,3 +1064,16 @@ parse_exit:
>>       pkt_hdr->input_flags.parsed_all = 1;
>>       return pkt_hdr->error_flags.all != 0;
>>  }
>> +
>> +int _odp_cls_parse(odp_packet_hdr_t *pkt_hdr, const uint8_t *parseptr)
>> +{
>> +     return _odp_parse_common(pkt_hdr, parseptr);
>> +}
>> +
>> +/**
>> + * Simple packet parser
>> + */
>> +int packet_parse_full(odp_packet_hdr_t *pkt_hdr)
>> +{
>> +     return _odp_parse_common(pkt_hdr, NULL);
>> +}
>> diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
>> index b21fd0b..e1e330b 100644
>> --- a/platform/linux-generic/odp_packet_io.c
>> +++ b/platform/linux-generic/odp_packet_io.c
>> @@ -196,6 +196,7 @@ static odp_pktio_t setup_pktio_entry(const char *dev, odp_pool_t pool,
>>               return ODP_PKTIO_INVALID;
>>
>>       memcpy(&pktio_entry->s.param, param, sizeof(odp_pktio_param_t));
>> +     pktio_entry->s.id = id;
>>
>>       for (pktio_if = 0; pktio_if_ops[pktio_if]; ++pktio_if) {
>>               ret = pktio_if_ops[pktio_if]->open(id, pktio_entry, dev, pool);
>> @@ -551,7 +552,7 @@ odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
>>       odp_buffer_t buf;
>>       odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
>>       odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
>> -     int pkts, i, j;
>> +     int pkts, i;
>>       odp_pktio_t pktio;
>>
>>       buf_hdr = queue_deq(qentry);
>> @@ -564,27 +565,13 @@ odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
>>       if (pkts <= 0)
>>               return NULL;
>>
>> -     if (pktio_cls_enabled(get_pktio_entry(pktio))) {
>> -             for (i = 0, j = 0; i < pkts; i++) {
>> -                     if (0 > packet_classifier(pktio, pkt_tbl[i])) {
>> -                             buf = _odp_packet_to_buffer(pkt_tbl[i]);
>> -                             hdr_tbl[j++] = odp_buf_to_hdr(buf);
>> -                     }
>> -             }
>> -     } else {
>> -             for (i = 0; i < pkts; i++) {
>> -                     buf        = _odp_packet_to_buffer(pkt_tbl[i]);
>> -                     hdr_tbl[i] = odp_buf_to_hdr(buf);
>> -             }
>> -
>> -             j = pkts;
>> +     for (i = 0; i < pkts; i++) {
>> +             buf        = _odp_packet_to_buffer(pkt_tbl[i]);
>> +             hdr_tbl[i] = odp_buf_to_hdr(buf);
>>       }
>>
>> -     if (0 == j)
>> -             return NULL;
>> -
>> -     if (j > 1)
>> -             queue_enq_multi(qentry, &hdr_tbl[1], j - 1, 0);
>> +     if (pkts > 1)
>> +             queue_enq_multi(qentry, &hdr_tbl[1], pkts - 1, 0);
>>       buf_hdr = hdr_tbl[0];
>>       return buf_hdr;
>>  }
>> @@ -623,27 +610,15 @@ int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
>>       if (pkts <= 0)
>>               return nbr;
>>
>> -     if (pktio_cls_enabled(get_pktio_entry(pktio))) {
>> -             for (i = 0, j = 0; i < pkts; i++) {
>> -                     if (0 > packet_classifier(pktio, pkt_tbl[i])) {
>> -                             buf = _odp_packet_to_buffer(pkt_tbl[i]);
>> -                             if (nbr < num)
>> -                                     buf_hdr[nbr++] = odp_buf_to_hdr(buf);
>> -                             else
>> -                                     hdr_tbl[j++] = odp_buf_to_hdr(buf);
>> -                     }
>> -             }
>> -     } else {
>> -             /* Fill in buf_hdr first */
>> -             for (i = 0; i < pkts && nbr < num; i++, nbr++) {
>> -                     buf        = _odp_packet_to_buffer(pkt_tbl[i]);
>> -                     buf_hdr[nbr] = odp_buf_to_hdr(buf);
>> -             }
>> -             /* Queue the rest for later */
>> -             for (j = 0; i < pkts; i++, j++) {
>> -                     buf        = _odp_packet_to_buffer(pkt_tbl[i]);
>> -                     hdr_tbl[j++] = odp_buf_to_hdr(buf);
>> -             }
>> +     /* Fill in buf_hdr first */
>> +     for (i = 0; i < pkts && nbr < num; i++, nbr++) {
>> +             buf        = _odp_packet_to_buffer(pkt_tbl[i]);
>> +             buf_hdr[nbr] = odp_buf_to_hdr(buf);
>> +     }
>> +     /* Queue the rest for later */
>> +     for (j = 0; i < pkts; i++, j++) {
>> +             buf        = _odp_packet_to_buffer(pkt_tbl[i]);
>> +             hdr_tbl[j++] = odp_buf_to_hdr(buf);
>>       }
>>
>>       if (j)
>> @@ -655,7 +630,7 @@ int pktin_poll(pktio_entry_t *entry)
>>  {
>>       odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
>>       odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
>> -     int num, num_enq, i;
>> +     int num, i;
>>       odp_buffer_t buf;
>>       odp_pktio_t pktio;
>>
>> @@ -680,26 +655,15 @@ int pktin_poll(pktio_entry_t *entry)
>>               return -1;
>>       }
>>
>> -     if (pktio_cls_enabled(entry)) {
>> -             for (i = 0, num_enq = 0; i < num; i++) {
>> -                     if (packet_classifier(pktio, pkt_tbl[i]) < 0) {
>> -                             buf = _odp_packet_to_buffer(pkt_tbl[i]);
>> -                             hdr_tbl[num_enq++] = odp_buf_to_hdr(buf);
>> -                     }
>> -             }
>> -     } else {
>> -             for (i = 0; i < num; i++) {
>> -                     buf        = _odp_packet_to_buffer(pkt_tbl[i]);
>> -                     hdr_tbl[i] = odp_buf_to_hdr(buf);
>> -             }
>> -
>> -             num_enq = num;
>> +     for (i = 0; i < num; i++) {
>> +             buf        = _odp_packet_to_buffer(pkt_tbl[i]);
>> +             hdr_tbl[i] = odp_buf_to_hdr(buf);
>>       }
>>
>> -     if (num_enq) {
>> +     if (num) {
>>               queue_entry_t *qentry;
>>               qentry = queue_to_qentry(entry->s.inq_default);
>> -             queue_enq_multi(qentry, hdr_tbl, num_enq, 0);
>> +             queue_enq_multi(qentry, hdr_tbl, num, 0);
>>       }
>>
>>       return 0;
>> diff --git a/platform/linux-generic/pktio/loop.c b/platform/linux-generic/pktio/loop.c
>> index ce19add..47745ad 100644
>> --- a/platform/linux-generic/pktio/loop.c
>> +++ b/platform/linux-generic/pktio/loop.c
>> @@ -12,6 +12,7 @@
>>  #include <odp.h>
>>  #include <odp_packet_internal.h>
>>  #include <odp_packet_io_internal.h>
>> +#include <odp_classification_internal.h>
>>  #include <odp_debug_internal.h>
>>  #include <odp/hints.h>
>>
>> @@ -50,19 +51,35 @@ static int loopback_close(pktio_entry_t *pktio_entry)
>>  static int loopback_recv(pktio_entry_t *pktio_entry, odp_packet_t pkts[],
>>                        unsigned len)
>>  {
>> -     int nbr, i;
>> +     int nbr, i, j;
>>       odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
>>       queue_entry_t *qentry;
>>       odp_packet_hdr_t *pkt_hdr;
>> +     odp_packet_t pkt;
>>
>> +     nbr = 0;
>>       qentry = queue_to_qentry(pktio_entry->s.pkt_loop.loopq);
>>       nbr = queue_deq_multi(qentry, hdr_tbl, len);
>>
>> -     for (i = 0; i < nbr; ++i) {
>> -             pkts[i] = _odp_packet_from_buffer(odp_hdr_to_buf(hdr_tbl[i]));
>> -             pkt_hdr = odp_packet_hdr(pkts[i]);
>> -             packet_parse_reset(pkts[i]);
>> -             packet_parse_l2(pkt_hdr);
>> +     if (pktio_cls_enabled(pktio_entry)) {
>> +             for (i = 0, j = 0; i < nbr; i++) {
>> +                     pkt = _odp_packet_from_buffer(odp_hdr_to_buf
>> +                                                   (hdr_tbl[i]));
>> +                     pkt_hdr = odp_packet_hdr(pkt);
>> +                     packet_parse_reset(pkt_hdr);
>> +                     packet_parse_l2(pkt_hdr);
>> +                     if (0 > _odp_packet_classifier(pktio_entry, pkt))
>> +                             pkts[j++] = pkt;
>> +             }
>> +     nbr = j;
>> +     } else {
>> +             for (i = 0; i < nbr; ++i) {
>> +                     pkts[i] = _odp_packet_from_buffer(odp_hdr_to_buf
>> +                                                       (hdr_tbl[i]));
>> +                     pkt_hdr = odp_packet_hdr(pkts[i]);
>> +                     packet_parse_reset(pkt_hdr);
>> +                     packet_parse_l2(pkt_hdr);
>> +             }
>>       }
>>
>>       return nbr;
>> diff --git a/platform/linux-generic/pktio/netmap.c b/platform/linux-generic/pktio/netmap.c
>> index d064323..74835ad 100644
>> --- a/platform/linux-generic/pktio/netmap.c
>> +++ b/platform/linux-generic/pktio/netmap.c
>> @@ -20,6 +20,9 @@
>>  #include <poll.h>
>>  #include <linux/ethtool.h>
>>  #include <linux/sockios.h>
>> +#include <odp_classification_datamodel.h>
>> +#include <odp_classification_inlines.h>
>> +#include <odp_classification_internal.h>
>>
>>  #define NETMAP_WITH_LIBS
>>  #include <net/netmap_user.h>
>> @@ -197,7 +200,9 @@ static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
>>                                   uint16_t len)
>>  {
>>       odp_packet_t pkt;
>> -     odp_packet_hdr_t *pkt_hdr;
>> +     pktio_entry_t *pktio_entry;
>> +
>> +     pktio_entry = args->pktio_entry;
>>
>>       if (odp_unlikely(len > pktio_entry->s.pkt_nm.max_frame_len)) {
>>               ODP_ERR("RX: frame too big %" PRIu16 " %zu!\n", len,
>> @@ -210,19 +215,58 @@ static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
>>               return -1;
>>       }
>>
>> -     pkt = packet_alloc(pktio_entry->s.pkt_nm.pool, len, 1);
>> -     if (pkt == ODP_PACKET_INVALID)
>> -             return -1;
>> +     if (pktio_cls_enabled(pktio_entry)) {
>> +             odp_packet_hdr_t pkt_hdr;
>> +             cos_t *cos;
>>
>> -     pkt_hdr = odp_packet_hdr(pkt);
>> +             packet_parse_reset(&pkt_hdr);
>> +             _odp_cls_parse(&pkt_hdr, buf);
>> +             cos = pktio_select_cos(args->pktio_entry,
>
> args?.. looks like a merge error

Yes. This is a merge error. Will fix in next version.
>
>> +                                    (uint8_t *)buf,
>> +                                    &pkt_hdr);
>>
>> -     /* For now copy the data in the mbuf,
>> -        worry about zero-copy later */
>> -     if (odp_packet_copydata_in(pkt, 0, len, buf) != 0) {
>> -             odp_packet_free(pkt);
>> -             return -1;
>> +             if (cos == NULL || cos->s.queue == NULL)
>> +                     return -1;
>> +
>> +             pkt = odp_packet_alloc(cos->s.pool_id, len);
>> +             if (pkt == ODP_PACKET_INVALID)
>> +                     return -1;
>> +
>> +             copy_packet_parser_metadata(&pkt_hdr, odp_packet_hdr(pkt));
>> +
>> +             /* For now copy the data in the mbuf,
>> +             worry about zero-copy later */
>> +             if (odp_packet_copydata_in(pkt, 0, len, buf) != 0) {
>> +                     odp_packet_free(pkt);
>> +                     return -1;
>> +             }
>> +
>> +             odp_packet_hdr(pkt)->input = args->pktio_entry->s.id;
>> +             ret = queue_enq(cos->s.queue,
>> +                             odp_buf_to_hdr((odp_buffer_t)pkt), 0);
>> +             /*if the queue enq was not successful return
>> +             the packet to packet recv function to be enqueued
>> +             to the default queue*/
>> +             if (ret == 0)
>> +                     return -1;
>> +     } else {
>> +             odp_packet_hdr_t *pkt_hdr;
>> +
>> +             pkt = packet_alloc(pktio_entry->s.pkt_nm->pool, len, 1);
>> +             if (pkt == ODP_PACKET_INVALID)
>> +                     return -1;
>> +
>> +             pkt_hdr = odp_packet_hdr(pkt);
>> +
>> +             /* For now copy the data in the mbuf,
>> +                worry about zero-copy later */
>> +             if (odp_packet_copydata_in(pkt, 0, len, buf) != 0) {
>> +                     odp_packet_free(pkt);
>> +                     return;
>
> no return value

Yes. It should return error since packet copy failed.
>
>> +             }
>> +
>> +             packet_parse_l2(pkt_hdr);
>>       }
>> -     packet_parse_l2(pkt_hdr);
>>
>>       *pkt_out = pkt;
>>       return 0;
>> diff --git a/platform/linux-generic/pktio/socket.c b/platform/linux-generic/pktio/socket.c
>> index 5f5e0ae..95b7ea3 100644
>> --- a/platform/linux-generic/pktio/socket.c
>> +++ b/platform/linux-generic/pktio/socket.c
>> @@ -38,6 +38,9 @@
>>  #include <odp_packet_io_internal.h>
>>  #include <odp_align_internal.h>
>>  #include <odp_debug_internal.h>
>> +#include <odp_classification_datamodel.h>
>> +#include <odp_classification_inlines.h>
>> +#include <odp_classification_internal.h>
>>  #include <odp/hints.h>
>>
>>  #include <odp/helper/eth.h>
>> @@ -194,6 +197,8 @@ static int sock_close(pktio_entry_t *pktio_entry)
>>               return -1;
>>       }
>>
>> +     odp_shm_free(pkt_sock->shm);
>> +
>>       return 0;
>>  }
>>
>> @@ -205,10 +210,13 @@ static int sock_setup_pkt(pktio_entry_t *pktio_entry, const char *netdev,
>>  {
>>       int sockfd;
>>       int err;
>> +     int i;
>>       unsigned int if_idx;
>>       struct ifreq ethreq;
>>       struct sockaddr_ll sa_ll;
>> +     char shm_name[ODP_SHM_NAME_LEN];
>>       pkt_sock_t *pkt_sock = &pktio_entry->s.pkt_sock;
>> +     uint8_t *addr;
>>
>>       /* Init pktio entry */
>>       memset(pkt_sock, 0, sizeof(*pkt_sock));
>> @@ -218,6 +226,20 @@ static int sock_setup_pkt(pktio_entry_t *pktio_entry, const char *netdev,
>>       if (pool == ODP_POOL_INVALID)
>>               return -1;
>>       pkt_sock->pool = pool;
>> +     snprintf(shm_name, ODP_SHM_NAME_LEN, "%s-%s", "pktio", netdev);
>> +     shm_name[ODP_SHM_NAME_LEN - 1] = '\0';
>> +
>> +     pkt_sock->shm = odp_shm_reserve(shm_name, PACKET_JUMBO_LEN,
>> +                                       PACKET_JUMBO_LEN *
>> +                                       ODP_PACKET_SOCKET_MAX_BURST_RX, 0);
>> +     if (pkt_sock->shm == ODP_SHM_INVALID)
>> +             return -1;
>> +
>> +     addr = odp_shm_addr(pkt_sock->shm);
>> +     for (i = 0; i < ODP_PACKET_SOCKET_MAX_BURST_RX; i++) {
>> +             pkt_sock->cache_ptr[i] = addr;
>> +             addr += PACKET_JUMBO_LEN;
>> +     }
>>
>>       sockfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
>>       if (sockfd == -1) {
>> @@ -253,7 +275,6 @@ static int sock_setup_pkt(pktio_entry_t *pktio_entry, const char *netdev,
>>               ODP_ERR("bind(to IF): %s\n", strerror(errno));
>>               goto error;
>>       }
>> -
>>       return 0;
>>
>>  error:
>> @@ -311,58 +332,133 @@ static int sock_mmsg_recv(pktio_entry_t *pktio_entry,
>>       const int sockfd = pkt_sock->sockfd;
>>       int msgvec_len;
>>       struct mmsghdr msgvec[ODP_PACKET_SOCKET_MAX_BURST_RX];
>> -     struct iovec iovecs[ODP_PACKET_SOCKET_MAX_BURST_RX][ODP_BUFFER_MAX_SEG];
>>       int nb_rx = 0;
>>       int recv_msgs;
>> +     uint8_t **recv_cache;
>>       int i;
>> +     int ret;
>>
>>       if (odp_unlikely(len > ODP_PACKET_SOCKET_MAX_BURST_RX))
>>               return -1;
>>
>>       memset(msgvec, 0, sizeof(msgvec));
>> +     recv_cache = pkt_sock->cache_ptr;
>>
>> -     for (i = 0; i < (int)len; i++) {
>> -             pkt_table[i] = packet_alloc(pkt_sock->pool, 0 /*default*/, 1);
>> -             if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
>> -                     break;
>> +     if (pktio_cls_enabled(pktio_entry)) {
>> +             struct iovec iovecs[ODP_PACKET_SOCKET_MAX_BURST_RX];
>> +             odp_packet_hdr_t pkt_hdr;
>>
>> -             msgvec[i].msg_hdr.msg_iovlen =
>> -                     _rx_pkt_to_iovec(pkt_table[i], iovecs[i]);
>> +             for (i = 0; i < (int)len; i++) {
>> +                     msgvec[i].msg_hdr.msg_iovlen = 1;
>> +                     iovecs[i].iov_base = recv_cache[i];
>> +                     iovecs[i].iov_len = 1024;
>>
>> -             msgvec[i].msg_hdr.msg_iov = iovecs[i];
>> -     }
>> -     msgvec_len = i; /* number of successfully allocated pkt buffers */
>> +                     msgvec[i].msg_hdr.msg_iov = &iovecs[i];
>> +             }
>> +             /* number of successfully allocated pkt buffers */
>> +             msgvec_len = i;
>> +
>> +             recv_msgs = recvmmsg(sockfd, msgvec, msgvec_len,
>> +                                  MSG_DONTWAIT, NULL);
>> +             for (i = 0; i < recv_msgs; i++) {
>> +                     void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
>> +                     struct ethhdr *eth_hdr = base;
>> +
>> +                     /* Don't receive packets sent by ourselves */
>> +                     if (odp_unlikely(ethaddrs_equal(pkt_sock->if_mac,
>> +                                                     eth_hdr->h_source))) {
>> +                             continue;
>> +                     }
>>
>> -     recv_msgs = recvmmsg(sockfd, msgvec, msgvec_len, MSG_DONTWAIT, NULL);
>> +                     packet_parse_reset(&pkt_hdr);
>>
>> -     for (i = 0; i < recv_msgs; i++) {
>> -             odp_packet_hdr_t *pkt_hdr;
>> -             void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
>> -             struct ethhdr *eth_hdr = base;
>> +                     _odp_cls_parse(&pkt_hdr, base);
>> +                     cos_t *cos = pktio_select_cos(pktio_entry,
>> +                                                   (uint8_t *)base,
>> +                                                   &pkt_hdr);
>>
>> -             /* Don't receive packets sent by ourselves */
>> -             if (odp_unlikely(ethaddrs_equal(pkt_sock->if_mac,
>> -                                             eth_hdr->h_source))) {
>> -                     odp_packet_free(pkt_table[i]);
>> -                     continue;
>> +                     /* if No CoS found then drop the packet */
>> +                     if (cos == NULL || cos->s.queue == NULL)
>> +                             continue;
>> +
>> +                     pkt_table[i] = odp_packet_alloc(cos->s.pool_id,
>> +                                                      msgvec[i].msg_len);
>> +                     if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
>> +                             continue;
>> +
>> +                     copy_packet_parser_metadata(&pkt_hdr,
>> +                                                 odp_packet_hdr
>> +                                                 (pkt_table[i]));
>> +
>> +                     odp_packet_hdr(pkt_table[i])->input = pktio_entry->
>> +                                                           s.id;
>> +
>> +                     if (odp_packet_copydata_in(pkt_table[i], 0,
>> +                                                msgvec[i].msg_len, base)
>> +                                                != 0) {
>> +                             odp_packet_free(pkt_table[i]);
>> +                             continue;
>> +                     }
>> +
>> +                     /* Parse and set packet header data */
>> +                     odp_packet_pull_tail(pkt_table[i],
>> +                                          odp_packet_len(pkt_table[i]) -
>> +                                          msgvec[i].msg_len);
>> +                     ret = queue_enq(cos->s.queue, odp_buf_to_hdr(
>> +                                     (odp_buffer_t)pkt_table[i]), 0);
>> +                     if (ret < 0) {
>> +                             pkt_table[nb_rx] = pkt_table[i];
>> +                             nb_rx++;
>> +                     }
>>               }
>> +     } else {
>> +             struct iovec iovecs[ODP_PACKET_SOCKET_MAX_BURST_RX]
>> +                                [ODP_BUFFER_MAX_SEG];
>> +
>> +             for (i = 0; i < (int)len; i++) {
>> +                     pkt_table[i] = packet_alloc(pkt_sock->pool,
>> +                                                 0 /*default*/, 1);
>> +                     if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
>> +                             break;
>>
>> -             pkt_hdr = odp_packet_hdr(pkt_table[i]);
>> +                     msgvec[i].msg_hdr.msg_iovlen =
>> +                             _rx_pkt_to_iovec(pkt_table[i], iovecs[i]);
>>
>> -             odp_packet_pull_tail(pkt_table[i],
>> -                                  odp_packet_len(pkt_table[i]) -
>> -                                  msgvec[i].msg_len);
>> +                     msgvec[i].msg_hdr.msg_iov = iovecs[i];
>> +             }
>>
>> -             packet_parse_l2(pkt_hdr);
>> +             /* number of successfully allocated pkt buffers */
>> +             msgvec_len = i;
>>
>> -             pkt_table[nb_rx] = pkt_table[i];
>> -             nb_rx++;
>> -     }
>> +             recv_msgs = recvmmsg(sockfd, msgvec, msgvec_len,
>> +                                  MSG_DONTWAIT, NULL);
>>
>> -     /* Free unused pkt buffers */
>> -     for (; i < msgvec_len; i++)
>> -             odp_packet_free(pkt_table[i]);
>> +             for (i = 0; i < recv_msgs; i++) {
>> +                     void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
>> +                     struct ethhdr *eth_hdr = base;
>> +                     odp_packet_hdr_t *pkt_hdr;
>>
>> +                     /* Don't receive packets sent by ourselves */
>> +                     if (odp_unlikely(ethaddrs_equal(pkt_sock->if_mac,
>> +                                                     eth_hdr->h_source))) {
>> +                             odp_packet_free(pkt_table[i]);
>> +                             continue;
>> +                     }
>> +                     pkt_hdr = odp_packet_hdr(pkt_table[i]);
>> +                     /* Parse and set packet header data */
>> +                     odp_packet_pull_tail(pkt_table[i],
>> +                                          odp_packet_len(pkt_table[i]) -
>> +                                          msgvec[i].msg_len);
>> +
>> +                     packet_parse_l2(pkt_hdr);
>> +                     pkt_table[nb_rx] = pkt_table[i];
>> +                     nb_rx++;
>> +             }
>> +
>> +             /* Free unused pkt buffers */
>> +             for (; i < msgvec_len; i++)
>> +                     odp_packet_free(pkt_table[i]);
>> +     }
>>       return nb_rx;
>>  }
>>
>> @@ -377,7 +473,7 @@ static uint32_t _tx_pkt_to_iovec(odp_packet_t pkt,
>>               uint32_t seglen;
>>
>>               iovecs[iov_count].iov_base = odp_packet_offset(pkt, offset,
>> -                                                            &seglen, NULL);
>> +                             &seglen, NULL);
>>               iovecs[iov_count].iov_len = seglen;
>>               iov_count++;
>>               offset += seglen;
>> @@ -407,7 +503,7 @@ static int sock_mmsg_send(pktio_entry_t *pktio_entry,
>>       for (i = 0; i < len; i++) {
>>               msgvec[i].msg_hdr.msg_iov = iovecs[i];
>>               msgvec[i].msg_hdr.msg_iovlen = _tx_pkt_to_iovec(pkt_table[i],
>> -                                                             iovecs[i]);
>> +                             iovecs[i]);
>>       }
>>
>>       for (i = 0; i < len; ) {
>> @@ -415,7 +511,7 @@ static int sock_mmsg_send(pktio_entry_t *pktio_entry,
>>               if (odp_unlikely(ret <= -1)) {
>>                       if (i == 0 && SOCK_ERR_REPORT(errno)) {
>>                               __odp_errno = errno;
>> -                             ODP_ERR("sendmmsg(): %s\n", strerror(errno));
>> +                     ODP_ERR("sendmmsg(): %s\n", strerror(errno));
>>                               return -1;
>>                       }
>>                       break;
>> diff --git a/platform/linux-generic/pktio/socket_mmap.c b/platform/linux-generic/pktio/socket_mmap.c
>> index 2bdb72b..6ec1bfa 100644
>> --- a/platform/linux-generic/pktio/socket_mmap.c
>> +++ b/platform/linux-generic/pktio/socket_mmap.c
>> @@ -28,6 +28,9 @@
>>  #include <odp_packet_internal.h>
>>  #include <odp_packet_io_internal.h>
>>  #include <odp_debug_internal.h>
>> +#include <odp_classification_datamodel.h>
>> +#include <odp_classification_inlines.h>
>> +#include <odp_classification_internal.h>
>>  #include <odp/hints.h>
>>
>>  #include <odp/helper/eth.h>
>> @@ -108,9 +111,9 @@ static inline void mmap_tx_user_ready(struct tpacket2_hdr *hdr)
>>       __sync_synchronize();
>>  }
>>
>> -static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
>> +static inline unsigned pkt_mmap_v2_rx(pktio_entry_t *pktio_entry,
>> +                                   pkt_sock_mmap_t *pkt_sock,
>>                                     odp_packet_t pkt_table[], unsigned len,
>> -                                   odp_pool_t pool,
>>                                     unsigned char if_mac[])
>>  {
>>       union frame_map ppd;
>> @@ -118,11 +121,14 @@ static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
>>       uint8_t *pkt_buf;
>>       int pkt_len;
>>       struct ethhdr *eth_hdr;
>> -     odp_packet_hdr_t *pkt_hdr;
>>       unsigned i = 0;
>> +     uint8_t nb_rx = 0;
>> +     odp_packet_hdr_t pkt_hdr;
>> +     odp_packet_t pkt;
>> +     struct ring *ring;
>> +     int ret;
>>
>> -     (void)sock;
>> -
>> +     ring  = &pkt_sock->rx_ring;
>>       frame_num = ring->frame_num;
>>
>>       while (i < len) {
>
> The block within this loop is now pretty long and indented, changing the
> next line (not in shown in this diff) can tidy things up a bit:
>
> while (i < len) {
>         if (!mmap_rx_kernel_ready(ring->rd[frame_num].iov_base))
>                 break;
>
>         ..
> }
>

Agreed.
>> @@ -143,22 +149,67 @@ static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
>>                               continue;
>>                       }
>>
>> -                     pkt_table[i] = packet_alloc(pool, pkt_len, 1);
>> -                     if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
>> -                             break;
>> -
>> -                     pkt_hdr = odp_packet_hdr(pkt_table[i]);
>> -
>> -                     if (odp_packet_copydata_in(pkt_table[i], 0,
>> -                                                pkt_len, pkt_buf) != 0) {
>> -                             odp_packet_free(pkt_table[i]);
>> -                             break;
>> +                     if (pktio_cls_enabled(pktio_entry)) {
>> +                             /* Parse and set packet header data */
>> +                             packet_parse_reset(&pkt_hdr);
>> +                             _odp_cls_parse(&pkt_hdr, pkt_buf);
>> +
>> +                             cos_t *cos = pktio_select_cos(pktio_entry,
>> +                                                           pkt_buf,
>> +                                                           &pkt_hdr);
>> +                             /* if No CoS found for then drop the packet */
>> +                             if (cos == NULL || cos->s.queue == NULL) {
>> +                                     mmap_rx_user_ready(ppd.raw); /* drop */
>> +                                     frame_num = next_frame_num;
>> +                                     continue;
>> +                             }
>> +
>> +                             pkt = odp_packet_alloc(cos->s.pool_id,
>> +                                                    pkt_len);
>
> This looks to be forcing a CoS to have a pool associated with it. I
> thought that a CoS must have a queue, but that the pool was optional and
> if not configured packets would be allocated from the pktio's pool.

IMO, the CoS should have a dedicated pool to allocate packets arriving
at this CoS.
This is required so that the applications can basically allow packets
belonging to a particular CoS to be dropped when that particular flow
is in excess without affecting packets from other flows.

At the same time the application can reuse the same pool for pktio
default and to the CoS to handle the above defined behaviour.
The packets associated with a CoS will be dropped if either the pool
or the queue is NULL.
>
>> +                             if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
>> +                                     mmap_rx_user_ready(ppd.raw); /* drop */
>> +                                     frame_num = next_frame_num;
>> +                                     continue;
>> +                             }
>> +
>> +                             copy_packet_parser_metadata(&pkt_hdr,
>> +                                                         odp_packet_hdr
>> +                                                         (pkt));
>> +                             odp_packet_hdr(pkt)->input = pktio_entry->s.id;
>> +
>> +                             ret = odp_packet_copydata_in(pkt, 0,
>> +                                                          pkt_len, pkt_buf);
>> +                             if (ret != 0) {
>> +                                     odp_packet_free(pkt);
>> +                                     break;
>> +                             }
>> +                             ret = queue_enq(cos->s.queue, odp_buf_to_hdr(
>> +                                             (odp_buffer_t)pkt), 0);
>> +                             if (ret < 0) {
>> +                                     pkt_table[nb_rx] = pkt;
>> +                                     nb_rx++;
>> +                             }
>
> I wonder if it's possible to put the above block into a common function
> that can be reused by other pktios. Something that takes the pktio_entry,
> buffer pointer and buffer length and returns a status indication of
> whether the packet should be dropped, skipped (already classified and
> delivered) or delivered to the default queue.

Yes. This can be done. something like a common file which will be used
by different pktio types.
>
>> +                     } else {
>> +                             odp_packet_hdr_t *hdr;
>> +
>> +                             pkt_table[i] = packet_alloc(pkt_sock->pool,
>> +                                                         pkt_len, 1);
>> +                             if (odp_unlikely(pkt_table[i] ==
>> +                                 ODP_PACKET_INVALID))
>> +                                     break;
>
> When the classifier is enabled we now drop packets if an allocation
> fails, I think we should do the same here.

Yes. I agree. This was the existing behaviour in pktio but I will
change the same.
>
>> +                             hdr = odp_packet_hdr(pkt_table[i]);
>> +                             ret = odp_packet_copydata_in(pkt_table[i], 0,
>> +                                                          pkt_len, pkt_buf);
>> +                             if (ret != 0) {
>> +                                     odp_packet_free(pkt_table[i]);
>> +                                     break;
>> +                             }
>> +
>> +                             packet_parse_l2(hdr);
>> +                             nb_rx++;
>>                       }
>>
>> -                     packet_parse_l2(pkt_hdr);
>> -
>>                       mmap_rx_user_ready(ppd.raw);
>> -
>>                       frame_num = next_frame_num;
>>                       i++;
>>               } else {
>> @@ -167,8 +218,7 @@ static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
>>       }
>>
>>       ring->frame_num = frame_num;
>> -
>> -     return i;
>> +     return nb_rx;
>>  }

Regards,
Bala
diff mbox

Patch

diff --git a/platform/linux-generic/odp_classification.c b/platform/linux-generic/odp_classification.c
index 1b6d593..a702867 100644
--- a/platform/linux-generic/odp_classification.c
+++ b/platform/linux-generic/odp_classification.c
@@ -661,6 +661,42 @@  int odp_pktio_pmr_match_set_cos(odp_pmr_set_t pmr_set_id, odp_pktio_t src_pktio,
 	return 0;
 }
 
+int odp_cls_cos_pool_set(odp_cos_t cos_id, odp_pool_t pool_id)
+{
+	cos_t *cos;
+	pool_entry_t *pool;
+
+	cos = get_cos_entry(cos_id);
+	if (cos == NULL) {
+		ODP_ERR("Invalid odp_cos_t handle");
+		return -1;
+	}
+	pool = odp_pool_to_entry(pool_id);
+	if (pool == NULL) {
+		ODP_ERR("Invalid odp_pool_t handle");
+		return -1;
+	}
+
+	LOCK(&cos->s.lock);
+	cos->s.pool = pool;
+	cos->s.pool_id = pool_id;
+	UNLOCK(&cos->s.lock);
+	return 0;
+}
+
+odp_pool_t odp_cls_cos_pool(odp_cos_t cos_id)
+{
+	cos_t *cos;
+
+	cos = get_cos_entry(cos_id);
+	if (cos == NULL) {
+		ODP_ERR("Invalid odp_cos_t handle");
+		return ODP_POOL_INVALID;
+	}
+
+	return cos->s.pool_id;
+}
+
 int verify_pmr(pmr_t *pmr, uint8_t *pkt_addr, odp_packet_hdr_t *pkt_hdr)
 {
 	int pmr_failure = 0;
@@ -824,15 +860,13 @@  int pktio_classifier_init(pktio_entry_t *entry)
 	return 0;
 }
 
-int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
+int _odp_packet_classifier(pktio_entry_t *entry, odp_packet_t pkt)
 {
-	pktio_entry_t *entry;
 	queue_entry_t *queue;
 	cos_t *cos;
 	odp_packet_hdr_t *pkt_hdr;
 	uint8_t *pkt_addr;
 
-	entry = get_pktio_entry(pktio);
 	if (entry == NULL)
 		return -1;
 
@@ -849,6 +883,16 @@  int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
 	return queue_enq(queue, odp_buf_to_hdr((odp_buffer_t)pkt), 0);
 }
 
+int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
+{
+	pktio_entry_t *entry;
+
+	entry = get_pktio_entry(pktio);
+	if (entry == NULL)
+		return -1;
+	return _odp_packet_classifier(entry, pkt);
+}
+
 cos_t *pktio_select_cos(pktio_entry_t *entry, uint8_t *pkt_addr,
 		       odp_packet_hdr_t *pkt_hdr)
 {
diff --git a/platform/linux-generic/odp_packet.c b/platform/linux-generic/odp_packet.c
index 07c740c..80901c9 100644
--- a/platform/linux-generic/odp_packet.c
+++ b/platform/linux-generic/odp_packet.c
@@ -32,10 +32,8 @@  static inline void packet_parse_disable(odp_packet_hdr_t *pkt_hdr)
 	pkt_hdr->input_flags.parsed_all = 1;
 }
 
-void packet_parse_reset(odp_packet_t pkt)
+void packet_parse_reset(odp_packet_hdr_t *pkt_hdr)
 {
-	odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
-
 	/* Reset parser metadata before new parse */
 	pkt_hdr->error_flags.all  = 0;
 	pkt_hdr->input_flags.all  = 0;
@@ -780,9 +778,9 @@  int _odp_packet_copy_to_packet(odp_packet_t srcpkt, uint32_t srcoffset,
  * Parser helper function for IPv4
  */
 static inline uint8_t parse_ipv4(odp_packet_hdr_t *pkt_hdr,
-				 uint8_t **parseptr, uint32_t *offset)
+				 const uint8_t **parseptr, uint32_t *offset)
 {
-	odph_ipv4hdr_t *ipv4 = (odph_ipv4hdr_t *)*parseptr;
+	const odph_ipv4hdr_t *ipv4 = (const odph_ipv4hdr_t *)*parseptr;
 	uint8_t ver = ODPH_IPV4HDR_VER(ipv4->ver_ihl);
 	uint8_t ihl = ODPH_IPV4HDR_IHL(ipv4->ver_ihl);
 	uint16_t frag_offset;
@@ -818,10 +816,10 @@  static inline uint8_t parse_ipv4(odp_packet_hdr_t *pkt_hdr,
  * Parser helper function for IPv6
  */
 static inline uint8_t parse_ipv6(odp_packet_hdr_t *pkt_hdr,
-				 uint8_t **parseptr, uint32_t *offset)
+				 const uint8_t **parseptr, uint32_t *offset)
 {
-	odph_ipv6hdr_t *ipv6 = (odph_ipv6hdr_t *)*parseptr;
-	odph_ipv6hdr_ext_t *ipv6ext;
+	const odph_ipv6hdr_t *ipv6 = (const odph_ipv6hdr_t *)*parseptr;
+	const odph_ipv6hdr_ext_t *ipv6ext;
 
 	pkt_hdr->l3_len = odp_be_to_cpu_16(ipv6->payload_len);
 
@@ -843,7 +841,7 @@  static inline uint8_t parse_ipv6(odp_packet_hdr_t *pkt_hdr,
 		pkt_hdr->input_flags.ipopt = 1;
 
 		do  {
-			ipv6ext    = (odph_ipv6hdr_ext_t *)*parseptr;
+			ipv6ext    = (const odph_ipv6hdr_ext_t *)*parseptr;
 			uint16_t extlen = 8 + ipv6ext->ext_len * 8;
 
 			*offset   += extlen;
@@ -875,9 +873,9 @@  static inline uint8_t parse_ipv6(odp_packet_hdr_t *pkt_hdr,
  * Parser helper function for TCP
  */
 static inline void parse_tcp(odp_packet_hdr_t *pkt_hdr,
-			     uint8_t **parseptr, uint32_t *offset)
+			     const uint8_t **parseptr, uint32_t *offset)
 {
-	odph_tcphdr_t *tcp = (odph_tcphdr_t *)*parseptr;
+	const odph_tcphdr_t *tcp = (const odph_tcphdr_t *)*parseptr;
 
 	if (tcp->hl < sizeof(odph_tcphdr_t)/sizeof(uint32_t))
 		pkt_hdr->error_flags.tcp_err = 1;
@@ -895,9 +893,9 @@  static inline void parse_tcp(odp_packet_hdr_t *pkt_hdr,
  * Parser helper function for UDP
  */
 static inline void parse_udp(odp_packet_hdr_t *pkt_hdr,
-			     uint8_t **parseptr, uint32_t *offset)
+			     const uint8_t **parseptr, uint32_t *offset)
 {
-	odph_udphdr_t *udp = (odph_udphdr_t *)*parseptr;
+	const odph_udphdr_t *udp = (const odph_udphdr_t *)*parseptr;
 	uint32_t udplen = odp_be_to_cpu_16(udp->length);
 
 	if (udplen < sizeof(odph_udphdr_t) ||
@@ -932,46 +930,51 @@  void packet_parse_l2(odp_packet_hdr_t *pkt_hdr)
 	pkt_hdr->input_flags.parsed_l2 = 1;
 }
 
-/**
- * Simple packet parser
- */
-int packet_parse_full(odp_packet_hdr_t *pkt_hdr)
+int _odp_parse_common(odp_packet_hdr_t *pkt_hdr, const uint8_t *ptr)
 {
-	odph_ethhdr_t *eth;
-	odph_vlanhdr_t *vlan;
+	const odph_ethhdr_t *eth;
+	const odph_vlanhdr_t *vlan;
 	uint16_t ethtype;
-	uint8_t *parseptr;
 	uint32_t offset, seglen;
 	uint8_t ip_proto = 0;
+	const uint8_t *parseptr;
 
+	offset = sizeof(odph_ethhdr_t);
 	if (packet_parse_l2_not_done(pkt_hdr))
 		packet_parse_l2(pkt_hdr);
 
-	eth = (odph_ethhdr_t *)packet_map(pkt_hdr, 0, &seglen);
-	offset = sizeof(odph_ethhdr_t);
-	parseptr = (uint8_t *)&eth->type;
-	ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
+	if (ptr == NULL) {
+		eth = (odph_ethhdr_t *)packet_map(pkt_hdr, 0, &seglen);
+		parseptr = (const uint8_t *)&eth->type;
+		ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
+	} else {
+		eth = (const odph_ethhdr_t *)ptr;
+		parseptr = (const uint8_t *)&eth->type;
+		ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
+	}
+
 
 	/* Parse the VLAN header(s), if present */
 	if (ethtype == ODPH_ETHTYPE_VLAN_OUTER) {
 		pkt_hdr->input_flags.vlan_qinq = 1;
 		pkt_hdr->input_flags.vlan = 1;
-		vlan = (odph_vlanhdr_t *)(void *)parseptr;
+
+		vlan = (const odph_vlanhdr_t *)parseptr;
 		pkt_hdr->vlan_s_tag = ((ethtype << 16) |
 				       odp_be_to_cpu_16(vlan->tci));
 		offset += sizeof(odph_vlanhdr_t);
 		parseptr += sizeof(odph_vlanhdr_t);
-		ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
+		ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
 	}
 
 	if (ethtype == ODPH_ETHTYPE_VLAN) {
 		pkt_hdr->input_flags.vlan = 1;
-		vlan = (odph_vlanhdr_t *)(void *)parseptr;
+		vlan = (const odph_vlanhdr_t *)parseptr;
 		pkt_hdr->vlan_c_tag = ((ethtype << 16) |
 				       odp_be_to_cpu_16(vlan->tci));
 		offset += sizeof(odph_vlanhdr_t);
 		parseptr += sizeof(odph_vlanhdr_t);
-		ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
+		ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
 	}
 
 	/* Check for SNAP vs. DIX */
@@ -983,7 +986,7 @@  int packet_parse_full(odp_packet_hdr_t *pkt_hdr)
 		}
 		offset   += 8;
 		parseptr += 8;
-		ethtype = odp_be_to_cpu_16(*((uint16_t *)(void *)parseptr));
+		ethtype = odp_be_to_cpu_16(*((const uint16_t *)parseptr));
 	}
 
 	/* Consume Ethertype for Layer 3 parse */
@@ -1061,3 +1064,16 @@  parse_exit:
 	pkt_hdr->input_flags.parsed_all = 1;
 	return pkt_hdr->error_flags.all != 0;
 }
+
+int _odp_cls_parse(odp_packet_hdr_t *pkt_hdr, const uint8_t *parseptr)
+{
+	return _odp_parse_common(pkt_hdr, parseptr);
+}
+
+/**
+ * Simple packet parser
+ */
+int packet_parse_full(odp_packet_hdr_t *pkt_hdr)
+{
+	return _odp_parse_common(pkt_hdr, NULL);
+}
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index b21fd0b..e1e330b 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -196,6 +196,7 @@  static odp_pktio_t setup_pktio_entry(const char *dev, odp_pool_t pool,
 		return ODP_PKTIO_INVALID;
 
 	memcpy(&pktio_entry->s.param, param, sizeof(odp_pktio_param_t));
+	pktio_entry->s.id = id;
 
 	for (pktio_if = 0; pktio_if_ops[pktio_if]; ++pktio_if) {
 		ret = pktio_if_ops[pktio_if]->open(id, pktio_entry, dev, pool);
@@ -551,7 +552,7 @@  odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
 	odp_buffer_t buf;
 	odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
 	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
-	int pkts, i, j;
+	int pkts, i;
 	odp_pktio_t pktio;
 
 	buf_hdr = queue_deq(qentry);
@@ -564,27 +565,13 @@  odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
 	if (pkts <= 0)
 		return NULL;
 
-	if (pktio_cls_enabled(get_pktio_entry(pktio))) {
-		for (i = 0, j = 0; i < pkts; i++) {
-			if (0 > packet_classifier(pktio, pkt_tbl[i])) {
-				buf = _odp_packet_to_buffer(pkt_tbl[i]);
-				hdr_tbl[j++] = odp_buf_to_hdr(buf);
-			}
-		}
-	} else {
-		for (i = 0; i < pkts; i++) {
-			buf        = _odp_packet_to_buffer(pkt_tbl[i]);
-			hdr_tbl[i] = odp_buf_to_hdr(buf);
-		}
-
-		j = pkts;
+	for (i = 0; i < pkts; i++) {
+		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
+		hdr_tbl[i] = odp_buf_to_hdr(buf);
 	}
 
-	if (0 == j)
-		return NULL;
-
-	if (j > 1)
-		queue_enq_multi(qentry, &hdr_tbl[1], j - 1, 0);
+	if (pkts > 1)
+		queue_enq_multi(qentry, &hdr_tbl[1], pkts - 1, 0);
 	buf_hdr = hdr_tbl[0];
 	return buf_hdr;
 }
@@ -623,27 +610,15 @@  int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
 	if (pkts <= 0)
 		return nbr;
 
-	if (pktio_cls_enabled(get_pktio_entry(pktio))) {
-		for (i = 0, j = 0; i < pkts; i++) {
-			if (0 > packet_classifier(pktio, pkt_tbl[i])) {
-				buf = _odp_packet_to_buffer(pkt_tbl[i]);
-				if (nbr < num)
-					buf_hdr[nbr++] = odp_buf_to_hdr(buf);
-				else
-					hdr_tbl[j++] = odp_buf_to_hdr(buf);
-			}
-		}
-	} else {
-		/* Fill in buf_hdr first */
-		for (i = 0; i < pkts && nbr < num; i++, nbr++) {
-			buf        = _odp_packet_to_buffer(pkt_tbl[i]);
-			buf_hdr[nbr] = odp_buf_to_hdr(buf);
-		}
-		/* Queue the rest for later */
-		for (j = 0; i < pkts; i++, j++) {
-			buf        = _odp_packet_to_buffer(pkt_tbl[i]);
-			hdr_tbl[j++] = odp_buf_to_hdr(buf);
-		}
+	/* Fill in buf_hdr first */
+	for (i = 0; i < pkts && nbr < num; i++, nbr++) {
+		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
+		buf_hdr[nbr] = odp_buf_to_hdr(buf);
+	}
+	/* Queue the rest for later */
+	for (j = 0; i < pkts; i++, j++) {
+		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
+		hdr_tbl[j++] = odp_buf_to_hdr(buf);
 	}
 
 	if (j)
@@ -655,7 +630,7 @@  int pktin_poll(pktio_entry_t *entry)
 {
 	odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
 	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
-	int num, num_enq, i;
+	int num, i;
 	odp_buffer_t buf;
 	odp_pktio_t pktio;
 
@@ -680,26 +655,15 @@  int pktin_poll(pktio_entry_t *entry)
 		return -1;
 	}
 
-	if (pktio_cls_enabled(entry)) {
-		for (i = 0, num_enq = 0; i < num; i++) {
-			if (packet_classifier(pktio, pkt_tbl[i]) < 0) {
-				buf = _odp_packet_to_buffer(pkt_tbl[i]);
-				hdr_tbl[num_enq++] = odp_buf_to_hdr(buf);
-			}
-		}
-	} else {
-		for (i = 0; i < num; i++) {
-			buf        = _odp_packet_to_buffer(pkt_tbl[i]);
-			hdr_tbl[i] = odp_buf_to_hdr(buf);
-		}
-
-		num_enq = num;
+	for (i = 0; i < num; i++) {
+		buf        = _odp_packet_to_buffer(pkt_tbl[i]);
+		hdr_tbl[i] = odp_buf_to_hdr(buf);
 	}
 
-	if (num_enq) {
+	if (num) {
 		queue_entry_t *qentry;
 		qentry = queue_to_qentry(entry->s.inq_default);
-		queue_enq_multi(qentry, hdr_tbl, num_enq, 0);
+		queue_enq_multi(qentry, hdr_tbl, num, 0);
 	}
 
 	return 0;
diff --git a/platform/linux-generic/pktio/loop.c b/platform/linux-generic/pktio/loop.c
index ce19add..47745ad 100644
--- a/platform/linux-generic/pktio/loop.c
+++ b/platform/linux-generic/pktio/loop.c
@@ -12,6 +12,7 @@ 
 #include <odp.h>
 #include <odp_packet_internal.h>
 #include <odp_packet_io_internal.h>
+#include <odp_classification_internal.h>
 #include <odp_debug_internal.h>
 #include <odp/hints.h>
 
@@ -50,19 +51,35 @@  static int loopback_close(pktio_entry_t *pktio_entry)
 static int loopback_recv(pktio_entry_t *pktio_entry, odp_packet_t pkts[],
 			 unsigned len)
 {
-	int nbr, i;
+	int nbr, i, j;
 	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
 	queue_entry_t *qentry;
 	odp_packet_hdr_t *pkt_hdr;
+	odp_packet_t pkt;
 
+	nbr = 0;
 	qentry = queue_to_qentry(pktio_entry->s.pkt_loop.loopq);
 	nbr = queue_deq_multi(qentry, hdr_tbl, len);
 
-	for (i = 0; i < nbr; ++i) {
-		pkts[i] = _odp_packet_from_buffer(odp_hdr_to_buf(hdr_tbl[i]));
-		pkt_hdr = odp_packet_hdr(pkts[i]);
-		packet_parse_reset(pkts[i]);
-		packet_parse_l2(pkt_hdr);
+	if (pktio_cls_enabled(pktio_entry)) {
+		for (i = 0, j = 0; i < nbr; i++) {
+			pkt = _odp_packet_from_buffer(odp_hdr_to_buf
+						      (hdr_tbl[i]));
+			pkt_hdr = odp_packet_hdr(pkt);
+			packet_parse_reset(pkt_hdr);
+			packet_parse_l2(pkt_hdr);
+			if (0 > _odp_packet_classifier(pktio_entry, pkt))
+				pkts[j++] = pkt;
+		}
+	nbr = j;
+	} else {
+		for (i = 0; i < nbr; ++i) {
+			pkts[i] = _odp_packet_from_buffer(odp_hdr_to_buf
+							  (hdr_tbl[i]));
+			pkt_hdr = odp_packet_hdr(pkts[i]);
+			packet_parse_reset(pkt_hdr);
+			packet_parse_l2(pkt_hdr);
+		}
 	}
 
 	return nbr;
diff --git a/platform/linux-generic/pktio/netmap.c b/platform/linux-generic/pktio/netmap.c
index d064323..74835ad 100644
--- a/platform/linux-generic/pktio/netmap.c
+++ b/platform/linux-generic/pktio/netmap.c
@@ -20,6 +20,9 @@ 
 #include <poll.h>
 #include <linux/ethtool.h>
 #include <linux/sockios.h>
+#include <odp_classification_datamodel.h>
+#include <odp_classification_inlines.h>
+#include <odp_classification_internal.h>
 
 #define NETMAP_WITH_LIBS
 #include <net/netmap_user.h>
@@ -197,7 +200,9 @@  static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
 				    uint16_t len)
 {
 	odp_packet_t pkt;
-	odp_packet_hdr_t *pkt_hdr;
+	pktio_entry_t *pktio_entry;
+
+	pktio_entry = args->pktio_entry;
 
 	if (odp_unlikely(len > pktio_entry->s.pkt_nm.max_frame_len)) {
 		ODP_ERR("RX: frame too big %" PRIu16 " %zu!\n", len,
@@ -210,19 +215,58 @@  static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
 		return -1;
 	}
 
-	pkt = packet_alloc(pktio_entry->s.pkt_nm.pool, len, 1);
-	if (pkt == ODP_PACKET_INVALID)
-		return -1;
+	if (pktio_cls_enabled(pktio_entry)) {
+		odp_packet_hdr_t pkt_hdr;
+		cos_t *cos;
 
-	pkt_hdr = odp_packet_hdr(pkt);
+		packet_parse_reset(&pkt_hdr);
+		_odp_cls_parse(&pkt_hdr, buf);
+		cos = pktio_select_cos(args->pktio_entry,
+				       (uint8_t *)buf,
+				       &pkt_hdr);
 
-	/* For now copy the data in the mbuf,
-	   worry about zero-copy later */
-	if (odp_packet_copydata_in(pkt, 0, len, buf) != 0) {
-		odp_packet_free(pkt);
-		return -1;
+		if (cos == NULL || cos->s.queue == NULL)
+			return -1;
+
+		pkt = odp_packet_alloc(cos->s.pool_id, len);
+		if (pkt == ODP_PACKET_INVALID)
+			return -1;
+
+		copy_packet_parser_metadata(&pkt_hdr, odp_packet_hdr(pkt));
+
+		/* For now copy the data in the mbuf,
+		worry about zero-copy later */
+		if (odp_packet_copydata_in(pkt, 0, len, buf) != 0) {
+			odp_packet_free(pkt);
+			return -1;
+		}
+
+		odp_packet_hdr(pkt)->input = args->pktio_entry->s.id;
+		ret = queue_enq(cos->s.queue,
+				odp_buf_to_hdr((odp_buffer_t)pkt), 0);
+		/*if the queue enq was not successful return
+		the packet to packet recv function to be enqueued
+		to the default queue*/
+		if (ret == 0)
+			return -1;
+	} else {
+		odp_packet_hdr_t *pkt_hdr;
+
+		pkt = packet_alloc(pktio_entry->s.pkt_nm->pool, len, 1);
+		if (pkt == ODP_PACKET_INVALID)
+			return -1;
+
+		pkt_hdr = odp_packet_hdr(pkt);
+
+		/* For now copy the data in the mbuf,
+		   worry about zero-copy later */
+		if (odp_packet_copydata_in(pkt, 0, len, buf) != 0) {
+			odp_packet_free(pkt);
+			return;
+		}
+
+		packet_parse_l2(pkt_hdr);
 	}
-	packet_parse_l2(pkt_hdr);
 
 	*pkt_out = pkt;
 	return 0;
diff --git a/platform/linux-generic/pktio/socket.c b/platform/linux-generic/pktio/socket.c
index 5f5e0ae..95b7ea3 100644
--- a/platform/linux-generic/pktio/socket.c
+++ b/platform/linux-generic/pktio/socket.c
@@ -38,6 +38,9 @@ 
 #include <odp_packet_io_internal.h>
 #include <odp_align_internal.h>
 #include <odp_debug_internal.h>
+#include <odp_classification_datamodel.h>
+#include <odp_classification_inlines.h>
+#include <odp_classification_internal.h>
 #include <odp/hints.h>
 
 #include <odp/helper/eth.h>
@@ -194,6 +197,8 @@  static int sock_close(pktio_entry_t *pktio_entry)
 		return -1;
 	}
 
+	odp_shm_free(pkt_sock->shm);
+
 	return 0;
 }
 
@@ -205,10 +210,13 @@  static int sock_setup_pkt(pktio_entry_t *pktio_entry, const char *netdev,
 {
 	int sockfd;
 	int err;
+	int i;
 	unsigned int if_idx;
 	struct ifreq ethreq;
 	struct sockaddr_ll sa_ll;
+	char shm_name[ODP_SHM_NAME_LEN];
 	pkt_sock_t *pkt_sock = &pktio_entry->s.pkt_sock;
+	uint8_t *addr;
 
 	/* Init pktio entry */
 	memset(pkt_sock, 0, sizeof(*pkt_sock));
@@ -218,6 +226,20 @@  static int sock_setup_pkt(pktio_entry_t *pktio_entry, const char *netdev,
 	if (pool == ODP_POOL_INVALID)
 		return -1;
 	pkt_sock->pool = pool;
+	snprintf(shm_name, ODP_SHM_NAME_LEN, "%s-%s", "pktio", netdev);
+	shm_name[ODP_SHM_NAME_LEN - 1] = '\0';
+
+	pkt_sock->shm = odp_shm_reserve(shm_name, PACKET_JUMBO_LEN,
+					  PACKET_JUMBO_LEN *
+					  ODP_PACKET_SOCKET_MAX_BURST_RX, 0);
+	if (pkt_sock->shm == ODP_SHM_INVALID)
+		return -1;
+
+	addr = odp_shm_addr(pkt_sock->shm);
+	for (i = 0; i < ODP_PACKET_SOCKET_MAX_BURST_RX; i++) {
+		pkt_sock->cache_ptr[i] = addr;
+		addr += PACKET_JUMBO_LEN;
+	}
 
 	sockfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
 	if (sockfd == -1) {
@@ -253,7 +275,6 @@  static int sock_setup_pkt(pktio_entry_t *pktio_entry, const char *netdev,
 		ODP_ERR("bind(to IF): %s\n", strerror(errno));
 		goto error;
 	}
-
 	return 0;
 
 error:
@@ -311,58 +332,133 @@  static int sock_mmsg_recv(pktio_entry_t *pktio_entry,
 	const int sockfd = pkt_sock->sockfd;
 	int msgvec_len;
 	struct mmsghdr msgvec[ODP_PACKET_SOCKET_MAX_BURST_RX];
-	struct iovec iovecs[ODP_PACKET_SOCKET_MAX_BURST_RX][ODP_BUFFER_MAX_SEG];
 	int nb_rx = 0;
 	int recv_msgs;
+	uint8_t **recv_cache;
 	int i;
+	int ret;
 
 	if (odp_unlikely(len > ODP_PACKET_SOCKET_MAX_BURST_RX))
 		return -1;
 
 	memset(msgvec, 0, sizeof(msgvec));
+	recv_cache = pkt_sock->cache_ptr;
 
-	for (i = 0; i < (int)len; i++) {
-		pkt_table[i] = packet_alloc(pkt_sock->pool, 0 /*default*/, 1);
-		if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
-			break;
+	if (pktio_cls_enabled(pktio_entry)) {
+		struct iovec iovecs[ODP_PACKET_SOCKET_MAX_BURST_RX];
+		odp_packet_hdr_t pkt_hdr;
 
-		msgvec[i].msg_hdr.msg_iovlen =
-			_rx_pkt_to_iovec(pkt_table[i], iovecs[i]);
+		for (i = 0; i < (int)len; i++) {
+			msgvec[i].msg_hdr.msg_iovlen = 1;
+			iovecs[i].iov_base = recv_cache[i];
+			iovecs[i].iov_len = 1024;
 
-		msgvec[i].msg_hdr.msg_iov = iovecs[i];
-	}
-	msgvec_len = i; /* number of successfully allocated pkt buffers */
+			msgvec[i].msg_hdr.msg_iov = &iovecs[i];
+		}
+		/* number of successfully allocated pkt buffers */
+		msgvec_len = i;
+
+		recv_msgs = recvmmsg(sockfd, msgvec, msgvec_len,
+				     MSG_DONTWAIT, NULL);
+		for (i = 0; i < recv_msgs; i++) {
+			void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
+			struct ethhdr *eth_hdr = base;
+
+			/* Don't receive packets sent by ourselves */
+			if (odp_unlikely(ethaddrs_equal(pkt_sock->if_mac,
+							eth_hdr->h_source))) {
+				continue;
+			}
 
-	recv_msgs = recvmmsg(sockfd, msgvec, msgvec_len, MSG_DONTWAIT, NULL);
+			packet_parse_reset(&pkt_hdr);
 
-	for (i = 0; i < recv_msgs; i++) {
-		odp_packet_hdr_t *pkt_hdr;
-		void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
-		struct ethhdr *eth_hdr = base;
+			_odp_cls_parse(&pkt_hdr, base);
+			cos_t *cos = pktio_select_cos(pktio_entry,
+						      (uint8_t *)base,
+						      &pkt_hdr);
 
-		/* Don't receive packets sent by ourselves */
-		if (odp_unlikely(ethaddrs_equal(pkt_sock->if_mac,
-						eth_hdr->h_source))) {
-			odp_packet_free(pkt_table[i]);
-			continue;
+			/* if No CoS found then drop the packet */
+			if (cos == NULL || cos->s.queue == NULL)
+				continue;
+
+			pkt_table[i] = odp_packet_alloc(cos->s.pool_id,
+							 msgvec[i].msg_len);
+			if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
+				continue;
+
+			copy_packet_parser_metadata(&pkt_hdr,
+						    odp_packet_hdr
+						    (pkt_table[i]));
+
+			odp_packet_hdr(pkt_table[i])->input = pktio_entry->
+							      s.id;
+
+			if (odp_packet_copydata_in(pkt_table[i], 0,
+						   msgvec[i].msg_len, base)
+						   != 0) {
+				odp_packet_free(pkt_table[i]);
+				continue;
+			}
+
+			/* Parse and set packet header data */
+			odp_packet_pull_tail(pkt_table[i],
+					     odp_packet_len(pkt_table[i]) -
+					     msgvec[i].msg_len);
+			ret = queue_enq(cos->s.queue, odp_buf_to_hdr(
+					(odp_buffer_t)pkt_table[i]), 0);
+			if (ret < 0) {
+				pkt_table[nb_rx] = pkt_table[i];
+				nb_rx++;
+			}
 		}
+	} else {
+		struct iovec iovecs[ODP_PACKET_SOCKET_MAX_BURST_RX]
+				   [ODP_BUFFER_MAX_SEG];
+
+		for (i = 0; i < (int)len; i++) {
+			pkt_table[i] = packet_alloc(pkt_sock->pool,
+						    0 /*default*/, 1);
+			if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
+				break;
 
-		pkt_hdr = odp_packet_hdr(pkt_table[i]);
+			msgvec[i].msg_hdr.msg_iovlen =
+				_rx_pkt_to_iovec(pkt_table[i], iovecs[i]);
 
-		odp_packet_pull_tail(pkt_table[i],
-				     odp_packet_len(pkt_table[i]) -
-				     msgvec[i].msg_len);
+			msgvec[i].msg_hdr.msg_iov = iovecs[i];
+		}
 
-		packet_parse_l2(pkt_hdr);
+		/* number of successfully allocated pkt buffers */
+		msgvec_len = i;
 
-		pkt_table[nb_rx] = pkt_table[i];
-		nb_rx++;
-	}
+		recv_msgs = recvmmsg(sockfd, msgvec, msgvec_len,
+				     MSG_DONTWAIT, NULL);
 
-	/* Free unused pkt buffers */
-	for (; i < msgvec_len; i++)
-		odp_packet_free(pkt_table[i]);
+		for (i = 0; i < recv_msgs; i++) {
+			void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
+			struct ethhdr *eth_hdr = base;
+			odp_packet_hdr_t *pkt_hdr;
 
+			/* Don't receive packets sent by ourselves */
+			if (odp_unlikely(ethaddrs_equal(pkt_sock->if_mac,
+							eth_hdr->h_source))) {
+				odp_packet_free(pkt_table[i]);
+				continue;
+			}
+			pkt_hdr = odp_packet_hdr(pkt_table[i]);
+			/* Parse and set packet header data */
+			odp_packet_pull_tail(pkt_table[i],
+					     odp_packet_len(pkt_table[i]) -
+					     msgvec[i].msg_len);
+
+			packet_parse_l2(pkt_hdr);
+			pkt_table[nb_rx] = pkt_table[i];
+			nb_rx++;
+		}
+
+		/* Free unused pkt buffers */
+		for (; i < msgvec_len; i++)
+			odp_packet_free(pkt_table[i]);
+	}
 	return nb_rx;
 }
 
@@ -377,7 +473,7 @@  static uint32_t _tx_pkt_to_iovec(odp_packet_t pkt,
 		uint32_t seglen;
 
 		iovecs[iov_count].iov_base = odp_packet_offset(pkt, offset,
-							       &seglen, NULL);
+				&seglen, NULL);
 		iovecs[iov_count].iov_len = seglen;
 		iov_count++;
 		offset += seglen;
@@ -407,7 +503,7 @@  static int sock_mmsg_send(pktio_entry_t *pktio_entry,
 	for (i = 0; i < len; i++) {
 		msgvec[i].msg_hdr.msg_iov = iovecs[i];
 		msgvec[i].msg_hdr.msg_iovlen = _tx_pkt_to_iovec(pkt_table[i],
-								iovecs[i]);
+				iovecs[i]);
 	}
 
 	for (i = 0; i < len; ) {
@@ -415,7 +511,7 @@  static int sock_mmsg_send(pktio_entry_t *pktio_entry,
 		if (odp_unlikely(ret <= -1)) {
 			if (i == 0 && SOCK_ERR_REPORT(errno)) {
 				__odp_errno = errno;
-				ODP_ERR("sendmmsg(): %s\n", strerror(errno));
+			ODP_ERR("sendmmsg(): %s\n", strerror(errno));
 				return -1;
 			}
 			break;
diff --git a/platform/linux-generic/pktio/socket_mmap.c b/platform/linux-generic/pktio/socket_mmap.c
index 2bdb72b..6ec1bfa 100644
--- a/platform/linux-generic/pktio/socket_mmap.c
+++ b/platform/linux-generic/pktio/socket_mmap.c
@@ -28,6 +28,9 @@ 
 #include <odp_packet_internal.h>
 #include <odp_packet_io_internal.h>
 #include <odp_debug_internal.h>
+#include <odp_classification_datamodel.h>
+#include <odp_classification_inlines.h>
+#include <odp_classification_internal.h>
 #include <odp/hints.h>
 
 #include <odp/helper/eth.h>
@@ -108,9 +111,9 @@  static inline void mmap_tx_user_ready(struct tpacket2_hdr *hdr)
 	__sync_synchronize();
 }
 
-static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
+static inline unsigned pkt_mmap_v2_rx(pktio_entry_t *pktio_entry,
+				      pkt_sock_mmap_t *pkt_sock,
 				      odp_packet_t pkt_table[], unsigned len,
-				      odp_pool_t pool,
 				      unsigned char if_mac[])
 {
 	union frame_map ppd;
@@ -118,11 +121,14 @@  static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
 	uint8_t *pkt_buf;
 	int pkt_len;
 	struct ethhdr *eth_hdr;
-	odp_packet_hdr_t *pkt_hdr;
 	unsigned i = 0;
+	uint8_t nb_rx = 0;
+	odp_packet_hdr_t pkt_hdr;
+	odp_packet_t pkt;
+	struct ring *ring;
+	int ret;
 
-	(void)sock;
-
+	ring  = &pkt_sock->rx_ring;
 	frame_num = ring->frame_num;
 
 	while (i < len) {
@@ -143,22 +149,67 @@  static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
 				continue;
 			}
 
-			pkt_table[i] = packet_alloc(pool, pkt_len, 1);
-			if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
-				break;
-
-			pkt_hdr = odp_packet_hdr(pkt_table[i]);
-
-			if (odp_packet_copydata_in(pkt_table[i], 0,
-						   pkt_len, pkt_buf) != 0) {
-				odp_packet_free(pkt_table[i]);
-				break;
+			if (pktio_cls_enabled(pktio_entry)) {
+				/* Parse and set packet header data */
+				packet_parse_reset(&pkt_hdr);
+				_odp_cls_parse(&pkt_hdr, pkt_buf);
+
+				cos_t *cos = pktio_select_cos(pktio_entry,
+							      pkt_buf,
+							      &pkt_hdr);
+				/* if No CoS found for then drop the packet */
+				if (cos == NULL || cos->s.queue == NULL) {
+					mmap_rx_user_ready(ppd.raw); /* drop */
+					frame_num = next_frame_num;
+					continue;
+				}
+
+				pkt = odp_packet_alloc(cos->s.pool_id,
+						       pkt_len);
+				if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
+					mmap_rx_user_ready(ppd.raw); /* drop */
+					frame_num = next_frame_num;
+					continue;
+				}
+
+				copy_packet_parser_metadata(&pkt_hdr,
+							    odp_packet_hdr
+							    (pkt));
+				odp_packet_hdr(pkt)->input = pktio_entry->s.id;
+
+				ret = odp_packet_copydata_in(pkt, 0,
+							     pkt_len, pkt_buf);
+				if (ret != 0) {
+					odp_packet_free(pkt);
+					break;
+				}
+				ret = queue_enq(cos->s.queue, odp_buf_to_hdr(
+						(odp_buffer_t)pkt), 0);
+				if (ret < 0) {
+					pkt_table[nb_rx] = pkt;
+					nb_rx++;
+				}
+			} else {
+				odp_packet_hdr_t *hdr;
+
+				pkt_table[i] = packet_alloc(pkt_sock->pool,
+							    pkt_len, 1);
+				if (odp_unlikely(pkt_table[i] ==
+				    ODP_PACKET_INVALID))
+					break;
+				hdr = odp_packet_hdr(pkt_table[i]);
+				ret = odp_packet_copydata_in(pkt_table[i], 0,
+							     pkt_len, pkt_buf);
+				if (ret != 0) {
+					odp_packet_free(pkt_table[i]);
+					break;
+				}
+
+				packet_parse_l2(hdr);
+				nb_rx++;
 			}
 
-			packet_parse_l2(pkt_hdr);
-
 			mmap_rx_user_ready(ppd.raw);
-
 			frame_num = next_frame_num;
 			i++;
 		} else {
@@ -167,8 +218,7 @@  static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
 	}
 
 	ring->frame_num = frame_num;
-
-	return i;
+	return nb_rx;
 }
 
 static inline unsigned pkt_mmap_v2_tx(int sock, struct ring *ring,
@@ -502,9 +552,8 @@  static int sock_mmap_recv(pktio_entry_t *pktio_entry,
 {
 	pkt_sock_mmap_t *const pkt_sock = &pktio_entry->s.pkt_sock_mmap;
 
-	return pkt_mmap_v2_rx(pkt_sock->rx_ring.sock, &pkt_sock->rx_ring,
-			      pkt_table, len, pkt_sock->pool,
-			      pkt_sock->if_mac);
+	return pkt_mmap_v2_rx(pktio_entry, pkt_sock,
+			      pkt_table, len, pkt_sock->if_mac);
 }
 
 static int sock_mmap_send(pktio_entry_t *pktio_entry,