diff mbox series

[API-NEXT,3/5] queue: adding a notification queue type

Message ID 1484670521-28503-4-git-send-email-sergei.trofimov@arm.com
State New
Headers show
Series power management api | expand

Commit Message

Sergei Trofimov Jan. 17, 2017, 4:28 p.m. UTC
This adds ODP_QUEUE_TYPE_NOTIF along side the existing plain and sched
queues. Notification queues are similar to plain queues, in that they
are dequeued manually. However it is also possible to wait on dequeue
(via the new odp_queue_deq_wait) method, which will block on an empty
queue in such a way that (if supported by the platform) the execution
will yield allowing the CPU to either run another thread or idle.

Signed-off-by: Sergei Trofimov <sergei.trofimov@arm.com>

---
 include/odp/api/spec/queue.h                       | 23 +++++++++-
 .../linux-generic/include/odp_packet_io_queue.h    |  4 +-
 .../linux-generic/include/odp_queue_internal.h     |  8 +++-
 platform/linux-generic/odp_packet_io.c             |  6 +--
 platform/linux-generic/odp_queue.c                 | 49 ++++++++++++++++++++--
 5 files changed, 79 insertions(+), 11 deletions(-)

-- 
1.9.1
diff mbox series

Patch

diff --git a/include/odp/api/spec/queue.h b/include/odp/api/spec/queue.h
index ed9284f..5af676b 100644
--- a/include/odp/api/spec/queue.h
+++ b/include/odp/api/spec/queue.h
@@ -62,7 +62,15 @@  typedef enum odp_queue_type_t {
 	  * Scheduled queues are connected to the scheduler. Application must
 	  * not dequeue events directly from these queues but use the scheduler
 	  * instead. */
-	ODP_QUEUE_TYPE_SCHED
+	ODP_QUEUE_TYPE_SCHED,
+
+	/** Notification queue
+	 *
+	 * Behave similar to plain queues, however, threads can execute a waiting
+	 * dequeue, causing them to block if the queue is empty.
+	 *
+	 */
+	ODP_QUEUE_TYPE_NOTIF
 } odp_queue_type_t;
 
 /**
@@ -309,6 +317,19 @@  odp_event_t odp_queue_deq(odp_queue_t queue);
 int odp_queue_deq_multi(odp_queue_t queue, odp_event_t events[], int num);
 
 /**
+ * Queue dequeue
+ *
+ * Dequeues next event from head of the queue, blocking if the queue
+ * is empty. Must be used only with ODP_QUEUE_TYPE_NOTIF type queues.
+ *
+ * @param queue   Queue handle
+ *
+ * @return Event handle
+ * @retval ODP_EVENT_INVALID on failure (e.g. not a notification queue)
+ */
+odp_event_t odp_queue_deq_wait(odp_queue_t handle);
+
+/**
  * Queue type
  *
  * @param queue   Queue handle
diff --git a/platform/linux-generic/include/odp_packet_io_queue.h b/platform/linux-generic/include/odp_packet_io_queue.h
index d1d4b22..a24adfc 100644
--- a/platform/linux-generic/include/odp_packet_io_queue.h
+++ b/platform/linux-generic/include/odp_packet_io_queue.h
@@ -29,14 +29,14 @@  ODP_STATIC_ASSERT(ODP_PKTIN_QUEUE_MAX_BURST >= QUEUE_MULTI_MAX,
 		  "ODP_PKTIN_DEQ_MULTI_MAX_ERROR");
 
 int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
-odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue);
+odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue, int wait);
 
 int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
 int pktin_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
 
 
 int pktout_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
-odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *queue);
+odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *queue, int wait);
 
 int pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		     int num);
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index a10628e..fb78124 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -26,9 +26,12 @@  extern "C" {
 #include <odp/api/packet_io.h>
 #include <odp/api/align.h>
 #include <odp/api/hints.h>
+
 #include <odp/api/ticketlock.h>
 #include <odp_config_internal.h>
 
+#include <semaphore.h>
+
 #define QUEUE_MULTI_MAX CONFIG_BURST_SIZE
 
 #define QUEUE_STATUS_FREE         0
@@ -42,7 +45,7 @@  extern "C" {
 union queue_entry_u;
 
 typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *);
-typedef	odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *);
+typedef	odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *, int);
 
 typedef int (*enq_multi_func_t)(union queue_entry_u *,
 				odp_buffer_hdr_t **, int);
@@ -77,6 +80,7 @@  struct queue_entry_s {
 	char              name[ODP_QUEUE_NAME_LEN];
 
 	int          	  depth;
+	sem_t   	  notif_sem;
 };
 
 union queue_entry_u {
@@ -88,7 +92,7 @@  union queue_entry_u {
 queue_entry_t *get_qentry(uint32_t queue_id);
 
 int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
-odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
+odp_buffer_hdr_t *queue_deq(queue_entry_t *queue, int wait);
 
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
 int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index 98460a5..8063806 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -590,7 +590,7 @@  int pktout_enqueue(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr)
 	return (nbr == len ? 0 : -1);
 }
 
-odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *qentry ODP_UNUSED)
+odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *qentry ODP_UNUSED, int wait ODP_UNUSED)
 {
 	ODP_ABORT("attempted dequeue from a pktout queue");
 	return NULL;
@@ -625,13 +625,13 @@  int pktin_enqueue(queue_entry_t *qentry ODP_UNUSED,
 	return -1;
 }
 
-odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
+odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry, int wait ODP_UNUSED)
 {
 	odp_buffer_hdr_t *buf_hdr;
 	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
 	int pkts;
 
-	buf_hdr = queue_deq(qentry);
+	buf_hdr = queue_deq(qentry, 0);
 	if (buf_hdr != NULL)
 		return buf_hdr;
 
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index d364801..82fe794 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -34,6 +34,7 @@ 
 #include <string.h>
 #include <inttypes.h>
 #include <unistd.h>
+#include <semaphore.h>
 
 typedef struct queue_table_t {
 	queue_entry_t  queue[ODP_CONFIG_QUEUES];
@@ -88,6 +89,10 @@  static int queue_init(queue_entry_t *queue, const char *name,
 						    0);
 		}
 	}
+
+	if (param->type == ODP_QUEUE_TYPE_NOTIF)
+		sem_init(&queue->s.notif_sem, 0, 0);
+
 	queue->s.type = queue->s.param.type;
 
 	queue->s.enqueue = queue_enq;
@@ -452,8 +457,14 @@  static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		queue->s.status = QUEUE_STATUS_SCHED;
 		sched = 1; /* retval: schedule queue */
 	}
+
 	UNLOCK(&queue->s.lock);
 
+	if (queue->s.type == ODP_QUEUE_TYPE_NOTIF) {
+		for (i = 0; i < num; i++)
+			sem_post(&queue->s.notif_sem);
+	}
+
 	/* Add queue to scheduling */
 	if (sched && sched_fn->sched_queue(queue->s.index))
 		ODP_ABORT("schedule_queue failed\n");
@@ -571,6 +582,12 @@  static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		queue->s.depth -= i;
 	}
 
+	if (queue->s.type == ODP_QUEUE_TYPE_NOTIF) {
+		for (j = 0; j < i; j++) {
+			sem_trywait(&queue->s.notif_sem);
+		}
+	}
+
 	/* Queue is empty */
 	if (hdr == NULL)
 		queue->s.tail = NULL;
@@ -588,11 +605,20 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 	return deq_multi(queue, buf_hdr, num);
 }
 
-odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
+odp_buffer_hdr_t *queue_deq(queue_entry_t *queue, int wait)
 {
 	odp_buffer_hdr_t *buf_hdr = NULL;
 	int ret;
 
+	if (queue->s.type == ODP_QUEUE_TYPE_NOTIF) {
+		if (wait) {
+			sem_wait(&queue->s.notif_sem);
+		} else {
+			if (sem_trywait(&queue->s.notif_sem))
+				return NULL;
+		}
+	}
+
 	ret = deq_multi(queue, &buf_hdr, 1);
 
 	if (ret == 1)
@@ -620,14 +646,31 @@  int odp_queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num)
 	return ret;
 }
 
-
 odp_event_t odp_queue_deq(odp_queue_t handle)
 {
 	queue_entry_t *queue;
 	odp_buffer_hdr_t *buf_hdr;
 
 	queue   = queue_to_qentry(handle);
-	buf_hdr = queue->s.dequeue(queue);
+	buf_hdr = queue->s.dequeue(queue, 0);
+
+	if (buf_hdr)
+		return odp_buffer_to_event(buf_hdr->handle.handle);
+
+	return ODP_EVENT_INVALID;
+}
+
+odp_event_t odp_queue_deq_wait(odp_queue_t handle)
+{
+	queue_entry_t *queue;
+	odp_buffer_hdr_t *buf_hdr;
+
+	queue   = queue_to_qentry(handle);
+
+	if (queue->s.type != ODP_QUEUE_TYPE_NOTIF)
+		return ODP_EVENT_INVALID;
+
+	buf_hdr = queue->s.dequeue(queue, 1);
 
 	if (buf_hdr)
 		return odp_buffer_to_event(buf_hdr->handle.handle);