diff mbox

[API-NEXT,PATCHv2,3/6] linux-generic: queue: add ordered_queue_enq() routine - part 1

Message ID 1447015330-24420-4-git-send-email-bill.fischofer@linaro.org
State Superseded
Headers show

Commit Message

Bill Fischofer Nov. 8, 2015, 8:42 p.m. UTC
Add the new ordered_queue_enq() internal routine. This is done in two
parts to make the diffs easier to follow. Part 1 adds the new routine
while Part 2 replaces queue_enq() to use it.

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_queue_internal.h     |   2 +
 platform/linux-generic/odp_queue.c                 | 118 +++++++++++++++++++++
 2 files changed, 120 insertions(+)

Comments

Bill Fischofer Nov. 9, 2015, 10:05 a.m. UTC | #1
It is necessary, and illustrates one of the complexities of ordered
queuing.  Consider the following sequence with QOrigin having current order
4:

Thread A (Order 4)                Thread B (Order 5)
                                                 enq (Q, b1)

   enq(Q, a1)
   release_order()
                                                  enq(Q b2)

Thread B enqueues first but since the QOrigin is at order 4 it's element
goes onto QOrigin's reorder queue.  Thread A now enqueues and resolves
order 4.  Thread B now tries to enqueue another element onto Q and this
time it has the current order, however element b2 needs to follow element
b1, which is on the reorder queue.  The reorder_enq() routine sorts
elements into the reorder queue so that b2 correctly follows any other
elements sharing the same order before they are all moved to the target
queue.

On Mon, Nov 9, 2015 at 3:54 AM, Nicolas Morey-Chaisemartin <nmorey@kalray.eu
> wrote:


>

>

> On 11/08/2015 09:42 PM, Bill Fischofer wrote:

> > Add the new ordered_queue_enq() internal routine. This is done in two

> > parts to make the diffs easier to follow. Part 1 adds the new routine

> > while Part 2 replaces queue_enq() to use it.

> >

> > Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>

> > ---

> >  .../linux-generic/include/odp_queue_internal.h     |   2 +

> >  platform/linux-generic/odp_queue.c                 | 118

> +++++++++++++++++++++

> >  2 files changed, 120 insertions(+)

> >

> > diff --git a/platform/linux-generic/include/odp_queue_internal.h

> b/platform/linux-generic/include/odp_queue_internal.h

> > index 32e3288..1bd365b 100644

> > --- a/platform/linux-generic/include/odp_queue_internal.h

> > +++ b/platform/linux-generic/include/odp_queue_internal.h

> > @@ -96,6 +96,8 @@ union queue_entry_u {

> >  queue_entry_t *get_qentry(uint32_t queue_id);

> >

> >  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int

> sustain);

> > +int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,

> > +                   int systain, queue_entry_t *origin_qe, uint64_t

> order);

> >  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);

> >

> >  int queue_enq_internal(odp_buffer_hdr_t *buf_hdr);

> > diff --git a/platform/linux-generic/odp_queue.c

> b/platform/linux-generic/odp_queue.c

> > index bcc8190..a545927 100644

> > --- a/platform/linux-generic/odp_queue.c

> > +++ b/platform/linux-generic/odp_queue.c

> > @@ -529,6 +529,124 @@ int queue_enq(queue_entry_t *queue,

> odp_buffer_hdr_t *buf_hdr, int sustain)

> >       return 0;

> >  }

> >

> > +int ordered_queue_enq(queue_entry_t *queue,

> > +                   odp_buffer_hdr_t *buf_hdr,

> > +                   int sustain,

> > +                   queue_entry_t *origin_qe,

> > +                   uint64_t order)

> > +{

> > +     odp_buffer_hdr_t *reorder_buf;

> > +     odp_buffer_hdr_t *next_buf;

> > +     odp_buffer_hdr_t *reorder_prev;

> > +     odp_buffer_hdr_t *placeholder_buf = NULL;

> > +     int               release_count, placeholder_count;

> > +     int               sched = 0;

> > +

> > +     /* Need two locks for enq operations from ordered queues */

> > +     get_qe_locks(origin_qe, queue);

> > +

> > +     if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY ||

> > +                      queue->s.status < QUEUE_STATUS_READY)) {

> > +             free_qe_locks(queue, origin_qe);

> > +             ODP_ERR("Bad queue status\n");

> > +             ODP_ERR("queue = %s, origin q = %s, buf = %p\n",

> > +                     queue->s.name, origin_qe->s.name, buf_hdr);

> > +             return -1;

> > +     }

> > +

> > +     /* Remember that enq was called for this order */

> > +     sched_enq_called();

> > +

> > +     /* We can only complete this enq if we're in order */

> > +     if (order > origin_qe->s.order_out) {

> > +             reorder_enq(queue, order, origin_qe, buf_hdr, sustain);

> > +

> > +             /* This enq can't complete until order is restored, so

> > +              * we're done here.

> > +              */

> > +             free_qe_locks(queue, origin_qe);

> > +             return 0;

> > +     }

> > +

> > +     /* Resolve order if requested */

> > +     if (!sustain) {

> > +             order_release(origin_qe, 1);

> > +             sched_order_resolved(buf_hdr);

> > +     }

> > +

> > +     /* Update queue status */

> > +     if (queue->s.status == QUEUE_STATUS_NOTSCHED) {

> > +             queue->s.status = QUEUE_STATUS_SCHED;

> > +             sched = 1;

> > +     }

> > +

> > +     /* We're in order, however the reorder queue may have other buffers

> > +      * sharing this order on it and this buffer must not be enqueued

> ahead

> > +      * of them. If the reorder queue is empty we can short-cut and

> > +      * simply add to the target queue directly.

> > +      */

> > +

> > +     if (!origin_qe->s.reorder_head) {

> > +             queue_add_chain(queue, buf_hdr);

> > +             free_qe_locks(queue, origin_qe);

> > +

> > +             /* Add queue to scheduling */

> > +             if (sched && schedule_queue(queue))

> > +                     ODP_ABORT("schedule_queue failed\n");

> > +             return 0;

> > +     }

> > +

> > +     /* The reorder_queue is non-empty, so sort this buffer into it.

> Note

> > +      * that we force the sustain bit on here because we'll be removing

> > +      * this immediately and we already accounted for this order

> earlier.

> > +      */

> > +     reorder_enq(queue, order, origin_qe, buf_hdr, 1);

> > +

> Do we really need this call?

> When we reach this point in the code, buf_hdr is next inline to be queued,

> and what we want is pull out of the reorder queue the buffer that might be

> right after this one.

> In this case we push buf_hdr to the reorder queue just to pull it back

> afterward. It works but I'm not sure this is completely necessary.

> > +     /* Pick up this element, and all others resolved by this enq,

> > +      * and add them to the target queue.

> > +      */

> > +     reorder_deq(queue, origin_qe, &reorder_buf, &reorder_prev,

> > +                 &placeholder_buf, &release_count, &placeholder_count);

> > +

> > +     /* Move the list from the reorder queue to the target queue */

> > +     if (queue->s.head)

> > +             queue->s.tail->next = origin_qe->s.reorder_head;

> > +     else

> > +             queue->s.head       = origin_qe->s.reorder_head;

> > +     queue->s.tail               = reorder_prev;

> > +     origin_qe->s.reorder_head   = reorder_prev->next;

> > +     reorder_prev->next          = NULL;

> > +

> > +     /* Reflect resolved orders in the output sequence */

> > +     order_release(origin_qe, release_count + placeholder_count);

> > +

> > +     /* Now handle any resolved orders for events destined for other

> > +      * queues, appending placeholder bufs as needed.

> > +      */

> > +     if (origin_qe != queue)

> > +             UNLOCK(&queue->s.lock);

> > +

> > +     /* Add queue to scheduling */

> > +     if (sched && schedule_queue(queue))

> > +             ODP_ABORT("schedule_queue failed\n");

> > +

> > +     reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,

> > +                      1, 0);

> > +     UNLOCK(&origin_qe->s.lock);

> > +

> > +     if (reorder_buf)

> > +             queue_enq_internal(reorder_buf);

> > +

> > +     /* Free all placeholder bufs that are now released */

> > +     while (placeholder_buf) {

> > +             next_buf = placeholder_buf->next;

> > +             odp_buffer_free(placeholder_buf->handle.handle);

> > +             placeholder_buf = next_buf;

> > +     }

> > +

> > +     return 0;

> > +}

> > +

> >  int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],

> >                   int num, int sustain)

> >  {

>

>
Bill Fischofer Nov. 9, 2015, 10:15 a.m. UTC | #2
There's a surprising amount of subtlety involved in ordered queues, which
is one of the reasons they're such a powerful primitive to have.

On Mon, Nov 9, 2015 at 4:10 AM, Nicolas Morey-Chaisemartin <nmorey@kalray.eu
> wrote:


> You're right. In my mind order was linked to the buffer,not the thread

> meaning A would have enq b1 too.

>

>

> On 11/09/2015 11:05 AM, Bill Fischofer wrote:

>

> It is necessary, and illustrates one of the complexities of ordered

> queuing.  Consider the following sequence with QOrigin having current order

> 4:

>

> Thread A (Order 4)                Thread B (Order 5)

>                                                  enq (Q, b1)

>

>    enq(Q, a1)

>    release_order()

>                                                   enq(Q b2)

>

> Thread B enqueues first but since the QOrigin is at order 4 it's element

> goes onto QOrigin's reorder queue.  Thread A now enqueues and resolves

> order 4.  Thread B now tries to enqueue another element onto Q and this

> time it has the current order, however element b2 needs to follow element

> b1, which is on the reorder queue.  The reorder_enq() routine sorts

> elements into the reorder queue so that b2 correctly follows any other

> elements sharing the same order before they are all moved to the target

> queue.

>

> On Mon, Nov 9, 2015 at 3:54 AM, Nicolas Morey-Chaisemartin <

> <nmorey@kalray.eu>nmorey@kalray.eu> wrote:

>

>>

>>

>> On 11/08/2015 09:42 PM, Bill Fischofer wrote:

>> > Add the new ordered_queue_enq() internal routine. This is done in two

>> > parts to make the diffs easier to follow. Part 1 adds the new routine

>> > while Part 2 replaces queue_enq() to use it.

>> >

>> > Signed-off-by: Bill Fischofer < <bill.fischofer@linaro.org>

>> bill.fischofer@linaro.org>

>> > ---

>> >  .../linux-generic/include/odp_queue_internal.h     |   2 +

>> >  platform/linux-generic/odp_queue.c                 | 118

>> +++++++++++++++++++++

>> >  2 files changed, 120 insertions(+)

>> >

>> > diff --git a/platform/linux-generic/include/odp_queue_internal.h

>> b/platform/linux-generic/include/odp_queue_internal.h

>> > index 32e3288..1bd365b 100644

>> > --- a/platform/linux-generic/include/odp_queue_internal.h

>> > +++ b/platform/linux-generic/include/odp_queue_internal.h

>> > @@ -96,6 +96,8 @@ union queue_entry_u {

>> >  queue_entry_t *get_qentry(uint32_t queue_id);

>> >

>> >  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int

>> sustain);

>> > +int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,

>> > +                   int systain, queue_entry_t *origin_qe, uint64_t

>> order);

>> >  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);

>> >

>> >  int queue_enq_internal(odp_buffer_hdr_t *buf_hdr);

>> > diff --git a/platform/linux-generic/odp_queue.c

>> b/platform/linux-generic/odp_queue.c

>> > index bcc8190..a545927 100644

>> > --- a/platform/linux-generic/odp_queue.c

>> > +++ b/platform/linux-generic/odp_queue.c

>> > @@ -529,6 +529,124 @@ int queue_enq(queue_entry_t *queue,

>> odp_buffer_hdr_t *buf_hdr, int sustain)

>> >       return 0;

>> >  }

>> >

>> > +int ordered_queue_enq(queue_entry_t *queue,

>> > +                   odp_buffer_hdr_t *buf_hdr,

>> > +                   int sustain,

>> > +                   queue_entry_t *origin_qe,

>> > +                   uint64_t order)

>> > +{

>> > +     odp_buffer_hdr_t *reorder_buf;

>> > +     odp_buffer_hdr_t *next_buf;

>> > +     odp_buffer_hdr_t *reorder_prev;

>> > +     odp_buffer_hdr_t *placeholder_buf = NULL;

>> > +     int               release_count, placeholder_count;

>> > +     int               sched = 0;

>> > +

>> > +     /* Need two locks for enq operations from ordered queues */

>> > +     get_qe_locks(origin_qe, queue);

>> > +

>> > +     if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY ||

>> > +                      queue->s.status < QUEUE_STATUS_READY)) {

>> > +             free_qe_locks(queue, origin_qe);

>> > +             ODP_ERR("Bad queue status\n");

>> > +             ODP_ERR("queue = %s, origin q = %s, buf = %p\n",

>> > +                     queue->s.name, origin_qe->s.name, buf_hdr);

>> > +             return -1;

>> > +     }

>> > +

>> > +     /* Remember that enq was called for this order */

>> > +     sched_enq_called();

>> > +

>> > +     /* We can only complete this enq if we're in order */

>> > +     if (order > origin_qe->s.order_out) {

>> > +             reorder_enq(queue, order, origin_qe, buf_hdr, sustain);

>> > +

>> > +             /* This enq can't complete until order is restored, so

>> > +              * we're done here.

>> > +              */

>> > +             free_qe_locks(queue, origin_qe);

>> > +             return 0;

>> > +     }

>> > +

>> > +     /* Resolve order if requested */

>> > +     if (!sustain) {

>> > +             order_release(origin_qe, 1);

>> > +             sched_order_resolved(buf_hdr);

>> > +     }

>> > +

>> > +     /* Update queue status */

>> > +     if (queue->s.status == QUEUE_STATUS_NOTSCHED) {

>> > +             queue->s.status = QUEUE_STATUS_SCHED;

>> > +             sched = 1;

>> > +     }

>> > +

>> > +     /* We're in order, however the reorder queue may have other

>> buffers

>> > +      * sharing this order on it and this buffer must not be enqueued

>> ahead

>> > +      * of them. If the reorder queue is empty we can short-cut and

>> > +      * simply add to the target queue directly.

>> > +      */

>> > +

>> > +     if (!origin_qe->s.reorder_head) {

>> > +             queue_add_chain(queue, buf_hdr);

>> > +             free_qe_locks(queue, origin_qe);

>> > +

>> > +             /* Add queue to scheduling */

>> > +             if (sched && schedule_queue(queue))

>> > +                     ODP_ABORT("schedule_queue failed\n");

>> > +             return 0;

>> > +     }

>> > +

>> > +     /* The reorder_queue is non-empty, so sort this buffer into it.

>> Note

>> > +      * that we force the sustain bit on here because we'll be removing

>> > +      * this immediately and we already accounted for this order

>> earlier.

>> > +      */

>> > +     reorder_enq(queue, order, origin_qe, buf_hdr, 1);

>> > +

>> Do we really need this call?

>> When we reach this point in the code, buf_hdr is next inline to be

>> queued, and what we want is pull out of the reorder queue the buffer that

>> might be right after this one.

>> In this case we push buf_hdr to the reorder queue just to pull it back

>> afterward. It works but I'm not sure this is completely necessary.

>> > +     /* Pick up this element, and all others resolved by this enq,

>> > +      * and add them to the target queue.

>> > +      */

>> > +     reorder_deq(queue, origin_qe, &reorder_buf, &reorder_prev,

>> > +                 &placeholder_buf, &release_count, &placeholder_count);

>> > +

>> > +     /* Move the list from the reorder queue to the target queue */

>> > +     if (queue->s.head)

>> > +             queue->s.tail->next = origin_qe->s.reorder_head;

>> > +     else

>> > +             queue->s.head       = origin_qe->s.reorder_head;

>> > +     queue->s.tail               = reorder_prev;

>> > +     origin_qe->s.reorder_head   = reorder_prev->next;

>> > +     reorder_prev->next          = NULL;

>> > +

>> > +     /* Reflect resolved orders in the output sequence */

>> > +     order_release(origin_qe, release_count + placeholder_count);

>> > +

>> > +     /* Now handle any resolved orders for events destined for other

>> > +      * queues, appending placeholder bufs as needed.

>> > +      */

>> > +     if (origin_qe != queue)

>> > +             UNLOCK(&queue->s.lock);

>> > +

>> > +     /* Add queue to scheduling */

>> > +     if (sched && schedule_queue(queue))

>> > +             ODP_ABORT("schedule_queue failed\n");

>> > +

>> > +     reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,

>> > +                      1, 0);

>> > +     UNLOCK(&origin_qe->s.lock);

>> > +

>> > +     if (reorder_buf)

>> > +             queue_enq_internal(reorder_buf);

>> > +

>> > +     /* Free all placeholder bufs that are now released */

>> > +     while (placeholder_buf) {

>> > +             next_buf = placeholder_buf->next;

>> > +             odp_buffer_free(placeholder_buf->handle.handle);

>> > +             placeholder_buf = next_buf;

>> > +     }

>> > +

>> > +     return 0;

>> > +}

>> > +

>> >  int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],

>> >                   int num, int sustain)

>> >  {

>>

>>

>

>
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 32e3288..1bd365b 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -96,6 +96,8 @@  union queue_entry_u {
 queue_entry_t *get_qentry(uint32_t queue_id);
 
 int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain);
+int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
+		      int systain, queue_entry_t *origin_qe, uint64_t order);
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
 
 int queue_enq_internal(odp_buffer_hdr_t *buf_hdr);
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index bcc8190..a545927 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -529,6 +529,124 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 	return 0;
 }
 
+int ordered_queue_enq(queue_entry_t *queue,
+		      odp_buffer_hdr_t *buf_hdr,
+		      int sustain,
+		      queue_entry_t *origin_qe,
+		      uint64_t order)
+{
+	odp_buffer_hdr_t *reorder_buf;
+	odp_buffer_hdr_t *next_buf;
+	odp_buffer_hdr_t *reorder_prev;
+	odp_buffer_hdr_t *placeholder_buf = NULL;
+	int               release_count, placeholder_count;
+	int               sched = 0;
+
+	/* Need two locks for enq operations from ordered queues */
+	get_qe_locks(origin_qe, queue);
+
+	if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY ||
+			 queue->s.status < QUEUE_STATUS_READY)) {
+		free_qe_locks(queue, origin_qe);
+		ODP_ERR("Bad queue status\n");
+		ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
+			queue->s.name, origin_qe->s.name, buf_hdr);
+		return -1;
+	}
+
+	/* Remember that enq was called for this order */
+	sched_enq_called();
+
+	/* We can only complete this enq if we're in order */
+	if (order > origin_qe->s.order_out) {
+		reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
+
+		/* This enq can't complete until order is restored, so
+		 * we're done here.
+		 */
+		free_qe_locks(queue, origin_qe);
+		return 0;
+	}
+
+	/* Resolve order if requested */
+	if (!sustain) {
+		order_release(origin_qe, 1);
+		sched_order_resolved(buf_hdr);
+	}
+
+	/* Update queue status */
+	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
+		queue->s.status = QUEUE_STATUS_SCHED;
+		sched = 1;
+	}
+
+	/* We're in order, however the reorder queue may have other buffers
+	 * sharing this order on it and this buffer must not be enqueued ahead
+	 * of them. If the reorder queue is empty we can short-cut and
+	 * simply add to the target queue directly.
+	 */
+
+	if (!origin_qe->s.reorder_head) {
+		queue_add_chain(queue, buf_hdr);
+		free_qe_locks(queue, origin_qe);
+
+		/* Add queue to scheduling */
+		if (sched && schedule_queue(queue))
+			ODP_ABORT("schedule_queue failed\n");
+		return 0;
+	}
+
+	/* The reorder_queue is non-empty, so sort this buffer into it.  Note
+	 * that we force the sustain bit on here because we'll be removing
+	 * this immediately and we already accounted for this order earlier.
+	 */
+	reorder_enq(queue, order, origin_qe, buf_hdr, 1);
+
+	/* Pick up this element, and all others resolved by this enq,
+	 * and add them to the target queue.
+	 */
+	reorder_deq(queue, origin_qe, &reorder_buf, &reorder_prev,
+		    &placeholder_buf, &release_count, &placeholder_count);
+
+	/* Move the list from the reorder queue to the target queue */
+	if (queue->s.head)
+		queue->s.tail->next = origin_qe->s.reorder_head;
+	else
+		queue->s.head       = origin_qe->s.reorder_head;
+	queue->s.tail               = reorder_prev;
+	origin_qe->s.reorder_head   = reorder_prev->next;
+	reorder_prev->next          = NULL;
+
+	/* Reflect resolved orders in the output sequence */
+	order_release(origin_qe, release_count + placeholder_count);
+
+	/* Now handle any resolved orders for events destined for other
+	 * queues, appending placeholder bufs as needed.
+	 */
+	if (origin_qe != queue)
+		UNLOCK(&queue->s.lock);
+
+	/* Add queue to scheduling */
+	if (sched && schedule_queue(queue))
+		ODP_ABORT("schedule_queue failed\n");
+
+	reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
+			 1, 0);
+	UNLOCK(&origin_qe->s.lock);
+
+	if (reorder_buf)
+		queue_enq_internal(reorder_buf);
+
+	/* Free all placeholder bufs that are now released */
+	while (placeholder_buf) {
+		next_buf = placeholder_buf->next;
+		odp_buffer_free(placeholder_buf->handle.handle);
+		placeholder_buf = next_buf;
+	}
+
+	return 0;
+}
+
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		    int num, int sustain)
 {