diff mbox series

[3/3] linux-generic: classification: implement packet hashing in classifier

Message ID 1493803502-4500-3-git-send-email-bala.manoharan@linaro.org
State New
Headers show
Series [1/3] api: classification: add support for packet hashing in classification | expand

Commit Message

Balasubramanian Manoharan May 3, 2017, 9:25 a.m. UTC
Implement packet hashing feature to distribute incoming packet to multiple
queues linked with CoS.

Signed-off-by: Balasubramanian Manoharan <bala.manoharan@linaro.org>

---
 .../include/odp_classification_datamodel.h         |  18 +++
 .../include/odp_classification_internal.h          |   4 +
 platform/linux-generic/odp_classification.c        | 161 +++++++++++++++++++--
 3 files changed, 168 insertions(+), 15 deletions(-)

-- 
1.9.1

Comments

Peltonen, Janne (Nokia - FI/Espoo) May 3, 2017, 12:45 p.m. UTC | #1
Hi,

> @@ -846,11 +897,91 @@ int cls_classify_packet(pktio_entry_t *entry, const uint8_t *base,

> 

>  	*pool = cos->s.pool;

>  	pkt_hdr->p.input_flags.dst_queue = 1;

> -	pkt_hdr->dst_queue = cos->s.queue->s.handle;

> 

> +	if (!cos->s.queue_group) {

> +		pkt_hdr->dst_queue = cos->s.queue->s.handle;

> +		return 0;

> +	}

> +

> +	hash = packet_rss_hash(pkt_hdr, cos->s.hash_proto, base);

> +	hash = hash & 0x1F;

> +	hash = hash & cos->s.num_queue;


This works only if cos->s.num_queue is one less than a power of two,
which it does not seem to be in general.

> +	tbl_index = (cos->s.index * ODP_COS_QUEUE_MAX) + hash;

> +

> +	if (!queue_grp_tbl->s.queue[tbl_index]) {

> +		LOCK(&cos->s.lock);

> +		if (!queue_grp_tbl->s.queue[tbl_index])

> +			UNLOCK(&cos->s.lock);


You probably did not mean that; presumably the intent was to skip the
rest (and unlock) if the queue now exists.

But I think it may be bad to have queue creation as a side effect of
packet processing since the creation (including possible registration
to the scheduler) can be somewhat slow and cause uncontrolled latency
to packet handling.

Another issue is that there is no practical way of getting the queue
handle in case one wants to use the queue directly instead of through
the scheduler. And even if there were, one would need to get to know
about new queues right when they get created.

Where does the packet get delivered if queue creation here fails?
How would one know that such a problem occurred?

	Janne

> +

> +		sprintf(queue_name, "%s%d", "queue_group", tbl_index);

> +		queue = odp_queue_create(queue_name, &cos->s.queue_param);

> +		queue_grp_tbl->s.queue[tbl_index] = queue;

> +		UNLOCK(&cos->s.lock);

> +	}

> +

> +	pkt_hdr->dst_queue = queue_grp_tbl->s.queue[tbl_index];

>  	return 0;

>  }

>
Balasubramanian Manoharan May 4, 2017, 1:22 p.m. UTC | #2
Regards,
Bala


On 3 May 2017 at 18:15, Peltonen, Janne (Nokia - FI/Espoo)
<janne.peltonen@nokia.com> wrote:
> Hi,

>

>> @@ -846,11 +897,91 @@ int cls_classify_packet(pktio_entry_t *entry, const uint8_t *base,

>>

>>       *pool = cos->s.pool;

>>       pkt_hdr->p.input_flags.dst_queue = 1;

>> -     pkt_hdr->dst_queue = cos->s.queue->s.handle;

>>

>> +     if (!cos->s.queue_group) {

>> +             pkt_hdr->dst_queue = cos->s.queue->s.handle;

>> +             return 0;

>> +     }

>> +

>> +     hash = packet_rss_hash(pkt_hdr, cos->s.hash_proto, base);

>> +     hash = hash & 0x1F;

>> +     hash = hash & cos->s.num_queue;

>

> This works only if cos->s.num_queue is one less than a power of two,

> which it does not seem to be in general.


Yes.

>

>> +     tbl_index = (cos->s.index * ODP_COS_QUEUE_MAX) + hash;

>> +

>> +     if (!queue_grp_tbl->s.queue[tbl_index]) {

>> +             LOCK(&cos->s.lock);

>> +             if (!queue_grp_tbl->s.queue[tbl_index])

>> +                     UNLOCK(&cos->s.lock);

>

> You probably did not mean that but to skip the rest and unlock

> if the queue now exists.

>

> But I think it may be bad to have queue creation as a side effect of

> packet processing since the creation (including possible registration

> to the scheduler) can be somewhat slow and cause uncontrolled latency

> to packet handling.


This is implementation dependent; we could implement it so that all the
queues are created upfront rather than when packets arrive for the
flow.

>

> Another issue is that there is no practical way of getting the queue

> handle in case one wants to use the queue directly instead of through

> the scheduler. And even if there were, one would need to get to know

> about new queues right when they get created.


I have proposed the odp_queue_hash_result() API to get the queue for a
given flow. But based on the use case you have described here, we
could add an additional API to retrieve the queue handles of all the
possible queues from the hash configuration.

>

> Where does the packet get delivered if queue creation here fails?

> How would one know that such a problem occurred?

>

>         Janne

>

>> +

>> +             sprintf(queue_name, "%s%d", "queue_group", tbl_index);

>> +             queue = odp_queue_create(queue_name, &cos->s.queue_param);

>> +             queue_grp_tbl->s.queue[tbl_index] = queue;

>> +             UNLOCK(&cos->s.lock);

>> +     }

>> +

>> +     pkt_hdr->dst_queue = queue_grp_tbl->s.queue[tbl_index];

>>       return 0;

>>  }

>>

>
diff mbox series

Patch

diff --git a/platform/linux-generic/include/odp_classification_datamodel.h b/platform/linux-generic/include/odp_classification_datamodel.h
index 9df756b..b44a547 100644
--- a/platform/linux-generic/include/odp_classification_datamodel.h
+++ b/platform/linux-generic/include/odp_classification_datamodel.h
@@ -46,6 +46,10 @@  extern "C" {
 /* Max PMR Term bits */
 #define ODP_PMR_TERM_BYTES_MAX		16
 
+#define ODP_COS_QUEUE_MAX		32
+
+#define ODP_CLS_QUEUE_GROUP_MAX		(ODP_COS_MAX_ENTRY * ODP_COS_QUEUE_MAX)
+
 /**
 Packet Matching Rule Term Value
 
@@ -94,7 +98,12 @@  struct cos_s {
 	size_t headroom;		/* Headroom for this CoS */
 	odp_spinlock_t lock;		/* cos lock */
 	odp_atomic_u32_t num_rule;	/* num of PMRs attached with this CoS */
+	bool queue_group;
+	odp_hash_proto_t hash_proto;
+	uint32_t num_queue;
+	odp_queue_param_t queue_param;
 	char name[ODP_COS_NAME_LEN];	/* name */
+	uint8_t index;
 };
 
 typedef union cos_u {
@@ -122,6 +131,15 @@  typedef union pmr_u {
 	uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct pmr_s))];
 } pmr_t;
 
+typedef struct _cls_queue_grp_tbl_s {
+	odp_queue_t queue[ODP_CLS_QUEUE_GROUP_MAX];
+} _cls_queue_grp_tbl_s;
+
+typedef union _cls_queue_grp_tbl_t {
+	_cls_queue_grp_tbl_s s;
+	uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(_cls_queue_grp_tbl_s))];
+} _cls_queue_grp_tbl_t;
+
 /**
 L2 QoS and CoS Map
 
diff --git a/platform/linux-generic/include/odp_classification_internal.h b/platform/linux-generic/include/odp_classification_internal.h
index 78eaac9..f2ac608 100644
--- a/platform/linux-generic/include/odp_classification_internal.h
+++ b/platform/linux-generic/include/odp_classification_internal.h
@@ -138,6 +138,10 @@  Otherwise.
 **/
 int verify_pmr(pmr_t *pmr, const uint8_t *pkt_addr, odp_packet_hdr_t *pkt_hdr);
 
+uint32_t packet_rss_hash(odp_packet_hdr_t *pkt_hdr,
+			 odp_hash_proto_t hash_proto,
+			 const uint8_t *base);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/platform/linux-generic/odp_classification.c b/platform/linux-generic/odp_classification.c
index 7ebc47d..9028482 100644
--- a/platform/linux-generic/odp_classification.c
+++ b/platform/linux-generic/odp_classification.c
@@ -17,6 +17,7 @@ 
 #include <odp_classification_inlines.h>
 #include <odp_classification_internal.h>
 #include <odp/api/shared_memory.h>
+#include <protocols/thash.h>
 #include <protocols/eth.h>
 #include <protocols/ip.h>
 #include <string.h>
@@ -30,6 +31,16 @@ 
 
 static cos_tbl_t *cos_tbl;
 static pmr_tbl_t	*pmr_tbl;
+static _cls_queue_grp_tbl_t *queue_grp_tbl;
+
+const uint8_t default_rss[] = {
+	0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
+	0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
+	0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
+	0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
+	0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
+};
+
 
 cos_t *get_cos_entry_internal(odp_cos_t cos_id)
 {
@@ -45,6 +56,7 @@  int odp_classification_init_global(void)
 {
 	odp_shm_t cos_shm;
 	odp_shm_t pmr_shm;
+	odp_shm_t queue_grp_shm;
 	int i;
 
 	cos_shm = odp_shm_reserve("shm_odp_cos_tbl",
@@ -89,8 +101,21 @@  int odp_classification_init_global(void)
 		LOCK_INIT(&pmr->s.lock);
 	}
 
+	queue_grp_shm = odp_shm_reserve("shm_odp_cls_queue_grp_tbl",
+					sizeof(_cls_queue_grp_tbl_t),
+					sizeof(queue_entry_t *), 0);
+
+	if (queue_grp_shm == ODP_SHM_INVALID) {
+		ODP_ERR("shm allocation failed for queue_grp_tbl");
+		/* Reserve failed, so there is no queue group shm to free;
+		 * release only the earlier reservations */
+		goto error_pmr;
+	}
+
+	queue_grp_tbl = odp_shm_addr(queue_grp_shm);
+	memset(queue_grp_tbl, 0, sizeof(_cls_queue_grp_tbl_t));
+
 	return 0;
 
 error_pmr:
 	odp_shm_free(pmr_shm);
 error_cos:
@@ -124,6 +150,11 @@  void odp_cls_cos_param_init(odp_cls_cos_param_t *param)
 	param->queue = ODP_QUEUE_INVALID;
 	param->pool = ODP_POOL_INVALID;
 	param->drop_policy = ODP_COS_DROP_NEVER;
+	param->num_queue = 1;
+	param->deq_mode = ODP_QUEUE_OP_MT;
+	param->sched_param.prio = ODP_SCHED_PRIO_DEFAULT;
+	param->sched_param.sync = ODP_SCHED_SYNC_PARALLEL;
+	param->sched_param.group = ODP_SCHED_GROUP_ALL;
 }
 
 void odp_cls_pmr_param_init(odp_pmr_param_t *param)
@@ -159,19 +190,23 @@  odp_cos_t odp_cls_cos_create(const char *name, odp_cls_cos_param_t *param)
 	int i, j;
 	queue_entry_t *queue;
 	odp_cls_drop_t drop_policy;
+	cos_t *cos;
 
 	/* Packets are dropped if Queue or Pool is invalid*/
 	if (param->queue == ODP_QUEUE_INVALID)
 		queue = NULL;
 	else
 		queue = queue_to_qentry(param->queue);
+	if (param->num_queue > ODP_COS_QUEUE_MAX)
+		return ODP_COS_INVALID;
 
 	drop_policy = param->drop_policy;
 
 	for (i = 0; i < ODP_COS_MAX_ENTRY; i++) {
-		LOCK(&cos_tbl->cos_entry[i].s.lock);
-		if (0 == cos_tbl->cos_entry[i].s.valid) {
-			char *cos_name = cos_tbl->cos_entry[i].s.name;
+		cos = &cos_tbl->cos_entry[i];
+		LOCK(&cos->s.lock);
+		if (0 == cos->s.valid) {
+			char *cos_name = cos->s.name;
 
 			if (name == NULL) {
 				cos_name[0] = 0;
@@ -180,20 +215,34 @@  odp_cos_t odp_cls_cos_create(const char *name, odp_cls_cos_param_t *param)
 				cos_name[ODP_COS_NAME_LEN - 1] = 0;
 			}
 			for (j = 0; j < ODP_PMR_PER_COS_MAX; j++) {
-				cos_tbl->cos_entry[i].s.pmr[j] = NULL;
-				cos_tbl->cos_entry[i].s.linked_cos[j] = NULL;
+				cos->s.pmr[j] = NULL;
+				cos->s.linked_cos[j] = NULL;
 			}
-			cos_tbl->cos_entry[i].s.queue = queue;
-			cos_tbl->cos_entry[i].s.pool = param->pool;
-			cos_tbl->cos_entry[i].s.headroom = 0;
-			cos_tbl->cos_entry[i].s.valid = 1;
-			cos_tbl->cos_entry[i].s.drop_policy = drop_policy;
-			odp_atomic_init_u32(&cos_tbl->cos_entry[i]
-					    .s.num_rule, 0);
-			UNLOCK(&cos_tbl->cos_entry[i].s.lock);
+
+			/* Reset explicitly: this table slot may be reused
+			 * after a destroy and could hold a stale 'true' */
+			cos->s.queue_group = false;
+			if (param->num_queue > 1) {
+				odp_queue_param_init(&cos->s.queue_param);
+				cos->s.queue_group = true;
+				cos->s.hash_proto = param->hash_proto;
+				cos->s.num_queue = param->num_queue;
+				cos->s.queue_param.type =
+					param->queue_type;
+				cos->s.queue_param.sched =
+					param->sched_param;
+			}
+
+			cos->s.queue = queue;
+			cos->s.pool = param->pool;
+			cos->s.headroom = 0;
+			cos->s.valid = 1;
+			cos->s.drop_policy = drop_policy;
+			odp_atomic_init_u32(&cos->s.num_rule, 0);
+			cos->s.index = i;
+			UNLOCK(&cos->s.lock);
 			return _odp_cast_scalar(odp_cos_t, i);
 		}
-		UNLOCK(&cos_tbl->cos_entry[i].s.lock);
+		UNLOCK(&cos->s.lock);
 	}
 
 	ODP_ERR("ODP_COS_MAX_ENTRY reached");
@@ -830,6 +877,10 @@  int cls_classify_packet(pktio_entry_t *entry, const uint8_t *base,
 			odp_packet_hdr_t *pkt_hdr)
 {
 	cos_t *cos;
+	uint32_t tbl_index;
+	uint32_t hash;
+	odp_queue_t queue;
+	char queue_name[ODP_QUEUE_NAME_LEN];
 
 	packet_parse_reset(pkt_hdr);
 	packet_set_len(pkt_hdr, pkt_len);
@@ -846,11 +897,104 @@  int cls_classify_packet(pktio_entry_t *entry, const uint8_t *base,
 
 	*pool = cos->s.pool;
 	pkt_hdr->p.input_flags.dst_queue = 1;
-	pkt_hdr->dst_queue = cos->s.queue->s.handle;
 
+	/* CoS without a queue group: deliver to the single CoS queue */
+	if (!cos->s.queue_group) {
+		pkt_hdr->dst_queue = cos->s.queue->s.handle;
+		return 0;
+	}
+
+	/* Map the packet flow hash onto one of the num_queue queues of this
+	 * CoS. Plain modulo is used so any num_queue value works, not only
+	 * values that are one less than a power of two. */
+	hash = packet_rss_hash(pkt_hdr, cos->s.hash_proto, base);
+	hash = hash % cos->s.num_queue;
+	tbl_index = (cos->s.index * ODP_COS_QUEUE_MAX) + hash;
+
+	/* Queues are created lazily; re-check under the lock so only one
+	 * thread creates the queue and the fast path stays lock-free once
+	 * the queue exists (table is zero-initialized at init). */
+	if (!queue_grp_tbl->s.queue[tbl_index]) {
+		LOCK(&cos->s.lock);
+		if (!queue_grp_tbl->s.queue[tbl_index]) {
+			snprintf(queue_name, sizeof(queue_name),
+				 "queue_group%u", tbl_index);
+			queue = odp_queue_create(queue_name,
+						 &cos->s.queue_param);
+			if (queue == ODP_QUEUE_INVALID) {
+				/* Creation failed: fall back to the default
+				 * CoS queue so the packet is not lost */
+				UNLOCK(&cos->s.lock);
+				pkt_hdr->dst_queue = cos->s.queue->s.handle;
+				return 0;
+			}
+			queue_grp_tbl->s.queue[tbl_index] = queue;
+		}
+		UNLOCK(&cos->s.lock);
+	}
+
+	pkt_hdr->dst_queue = queue_grp_tbl->s.queue[tbl_index];
 	return 0;
 }
 
+uint32_t packet_rss_hash(odp_packet_hdr_t *pkt_hdr,
+			 odp_hash_proto_t hash_proto,
+			 const uint8_t *base)
+{
+	thash_tuple_t tuple;
+	const _odp_ipv4hdr_t *ipv4;
+	const _odp_udphdr_t *udp;
+	const _odp_tcphdr_t *tcp;
+	uint32_t hash;
+
+	hash = 0;
+	switch (hash_proto) {
+	case  ODP_HASH_PROTO_IPV4_UDP: /** IPV4 + UDP */
+		/* Add byte offset before the cast; casting first scales it */
+		ipv4 = (const _odp_ipv4hdr_t *)(base + pkt_hdr->p.l3_offset);
+		udp = (const _odp_udphdr_t *)(base + pkt_hdr->p.l4_offset);
+		tuple.v4.src_addr = ipv4->src_addr;
+		tuple.v4.dst_addr = ipv4->dst_addr;
+		tuple.v4.sport = udp->src_port;
+		tuple.v4.dport = udp->dst_port;
+		hash = thash_softrss((uint32_t *)&tuple,
+				     ODP_THASH_V4_L4_LEN, default_rss);
+	break;
+
+	case ODP_HASH_PROTO_IPV4_TCP: /** IPV4 + TCP */
+		ipv4 = (const _odp_ipv4hdr_t *)(base + pkt_hdr->p.l3_offset);
+		tcp = (const _odp_tcphdr_t *)(base + pkt_hdr->p.l4_offset);
+		tuple.v4.src_addr = ipv4->src_addr;
+		tuple.v4.dst_addr = ipv4->dst_addr;
+		tuple.v4.sport = tcp->src_port;
+		tuple.v4.dport = tcp->dst_port;
+		hash = thash_softrss((uint32_t *)&tuple,
+				     ODP_THASH_V4_L4_LEN, default_rss);
+
+	break;
+	case ODP_HASH_PROTO_IPV4: /** IPV4 */
+		ipv4 = (const _odp_ipv4hdr_t *)(base + pkt_hdr->p.l3_offset);
+		tuple.v4.src_addr = ipv4->src_addr;
+		tuple.v4.dst_addr = ipv4->dst_addr;
+		hash = thash_softrss((uint32_t *)&tuple,
+				     ODP_THASH_V4_L3_LEN, default_rss);
+
+	break;
+
+	case ODP_HASH_PROTO_IPV6_UDP: /** IPV6 + UDP: not implemented (hash 0) */
+	break;
+
+	case ODP_HASH_PROTO_IPV6_TCP: /** IPV6 + TCP: not implemented (hash 0) */
+	break;
+
+	case ODP_HASH_PROTO_IPV6: /** IPV6: not implemented (hash 0) */
+	break;
+
+	default:
+	break;
+	}
+
+	return hash;
+}
+
 cos_t *match_qos_l3_cos(pmr_l3_cos_t *l3_cos, const uint8_t *pkt_addr,
 			odp_packet_hdr_t *hdr)
 {