From patchwork Sat Sep 12 04:10:20 2015 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Bill Fischofer X-Patchwork-Id: 53510 Return-Path: X-Original-To: linaro@patches.linaro.org Delivered-To: linaro@patches.linaro.org Received: from mail-lb0-f198.google.com (mail-lb0-f198.google.com [209.85.217.198]) by patches.linaro.org (Postfix) with ESMTPS id B148122B26 for ; Sat, 12 Sep 2015 04:13:37 +0000 (UTC) Received: by lbcao8 with SMTP id ao8sf30547047lbc.1 for ; Fri, 11 Sep 2015 21:13:36 -0700 (PDT) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:delivered-to:delivered-to:from:to:date :message-id:in-reply-to:references:subject:precedence:list-id :list-unsubscribe:list-archive:list-post:list-help:list-subscribe :mime-version:content-type:content-transfer-encoding:errors-to :sender:x-original-sender:x-original-authentication-results :mailing-list; bh=ULxY90kIVtcDCgfaY1qu34eFC0OKxkflGVXpIhyo9MY=; b=I4hK9KsgdIjKiBjunZyOxe+G3WDF8TqM5/fbZl5gAzhCF4U6dzvrbmM7hMP+HFsH7O z0ngPMb9FV/eiQaKhlWBn8U3L9T8shHE3TQOn5XEjlVbOsg2JqsMFZ6QpklECBrvycxL eJVG72EXu0eR+vZkm2iCYmuXXA25SP9Gy8F9GGQrint6775ewZs70Aax7iwh7dlqD+/p hb+0+AJ4kSstrLfwSNz5C7hg21sq+bMgxY/c/jPDVDPGAh85pXAL6pIR5XQd5MRaT3Wg V+exx1jZ2P85EBeOEjPkFTaJHD+p2LBbPOafHrnU0AWPkW/UE/EnLlKjMSkqWJBVnQIc Ebjw== X-Gm-Message-State: ALoCoQkEes14C7rjhwq37g7GmGDy2MV4zZaxpiK492o5Mcqm1DtTswKCc6+GLV8jA67rkSdcr1F0 X-Received: by 10.180.100.71 with SMTP id ew7mr426723wib.0.1442031216270; Fri, 11 Sep 2015 21:13:36 -0700 (PDT) X-BeenThere: patchwork-forward@linaro.org Received: by 10.152.6.37 with SMTP id x5ls436174lax.50.gmail; Fri, 11 Sep 2015 21:13:36 -0700 (PDT) X-Received: by 10.152.204.9 with SMTP id ku9mr2025051lac.51.1442031216082; Fri, 11 Sep 2015 21:13:36 -0700 (PDT) Received: from mail-lb0-f182.google.com (mail-lb0-f182.google.com. [209.85.217.182]) by mx.google.com with ESMTPS id dw9si565051lbc.71.2015.09.11.21.13.35 for (version=TLSv1.2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Fri, 11 Sep 2015 21:13:35 -0700 (PDT) Received-SPF: pass (google.com: domain of patch+caf_=patchwork-forward=linaro.org@linaro.org designates 209.85.217.182 as permitted sender) client-ip=209.85.217.182; Received: by lbbmp1 with SMTP id mp1so48289425lbb.1 for ; Fri, 11 Sep 2015 21:13:35 -0700 (PDT) X-Received: by 10.112.166.106 with SMTP id zf10mr2060693lbb.36.1442031215604; Fri, 11 Sep 2015 21:13:35 -0700 (PDT) X-Forwarded-To: patchwork-forward@linaro.org X-Forwarded-For: patch@linaro.org patchwork-forward@linaro.org Delivered-To: patch@linaro.org Received: by 10.112.59.35 with SMTP id w3csp28116lbq; Fri, 11 Sep 2015 21:13:34 -0700 (PDT) X-Received: by 10.140.38.35 with SMTP id s32mr3693007qgs.10.1442031214370; Fri, 11 Sep 2015 21:13:34 -0700 (PDT) Received: from lists.linaro.org (lists.linaro.org. [54.225.227.206]) by mx.google.com with ESMTP id 16si2947489qgx.108.2015.09.11.21.13.04; Fri, 11 Sep 2015 21:13:34 -0700 (PDT) Received-SPF: pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.225.227.206 as permitted sender) client-ip=54.225.227.206; Received: by lists.linaro.org (Postfix, from userid 109) id 2D2AE62C13; Sat, 12 Sep 2015 04:13:04 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on ip-10-142-244-252 X-Spam-Level: X-Spam-Status: No, score=-2.6 required=5.0 tests=BAYES_00, RCVD_IN_DNSWL_LOW, RCVD_IN_MSPIKE_H3, RCVD_IN_MSPIKE_WL, URIBL_BLOCKED autolearn=disabled version=3.4.0 Received: from [127.0.0.1] (localhost [127.0.0.1]) by lists.linaro.org (Postfix) with ESMTP id 3094E62C3B; Sat, 12 Sep 2015 04:10:47 +0000 (UTC) X-Original-To: lng-odp@lists.linaro.org Delivered-To: lng-odp@lists.linaro.org Received: by lists.linaro.org (Postfix, from userid 109) id 847A462C29; Sat, 12 Sep 2015 04:10:40 +0000 (UTC) Received: from mail-oi0-f42.google.com (mail-oi0-f42.google.com [209.85.218.42]) by lists.linaro.org (Postfix) with ESMTPS id 86A0D62C29 for ; Sat, 12 Sep 2015 04:10:30 +0000 (UTC) Received: by oiev17 with SMTP id v17so53052924oie.1 for ; Fri, 11 Sep 2015 21:10:30 -0700 (PDT) X-Received: by 10.202.209.71 with SMTP id i68mr1799553oig.7.1442031029951; Fri, 11 Sep 2015 21:10:29 -0700 (PDT) Received: from Ubuntu15.localdomain (cpe-24-28-70-239.austin.res.rr.com. [24.28.70.239]) by smtp.gmail.com with ESMTPSA id m133sm1745821oif.8.2015.09.11.21.10.29 (version=TLSv1.2 cipher=ECDHE-RSA-AES128-SHA bits=128/128); Fri, 11 Sep 2015 21:10:29 -0700 (PDT) From: Bill Fischofer To: lng-odp@lists.linaro.org Date: Fri, 11 Sep 2015 23:10:20 -0500 Message-Id: <1442031020-1907-6-git-send-email-bill.fischofer@linaro.org> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1442031020-1907-1-git-send-email-bill.fischofer@linaro.org> References: <1442031020-1907-1-git-send-email-bill.fischofer@linaro.org> X-Topics: patch Subject: [lng-odp] [API-NEXT PATCHv2 5/5] api: schedule: revise definition of ordered locks X-BeenThere: lng-odp@lists.linaro.org X-Mailman-Version: 2.1.16 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: , List-Help: , List-Subscribe: , MIME-Version: 1.0 Errors-To: lng-odp-bounces@lists.linaro.org Sender: "lng-odp" X-Removed-Original-Auth: Dkim didn't pass. X-Original-Sender: bill.fischofer@linaro.org X-Original-Authentication-Results: mx.google.com; spf=pass (google.com: domain of patch+caf_=patchwork-forward=linaro.org@linaro.org designates 209.85.217.182 as permitted sender) smtp.mailfrom=patch+caf_=patchwork-forward=linaro.org@linaro.org Mailing-list: list patchwork-forward@linaro.org; contact patchwork-forward+owners@linaro.org X-Google-Group-Id: 836684582541 Revise the definition of odp_schedule_order_lock() and odp_schedule_order_unlock(). These APIs now take as an argument the index value of the ordered lock within the current context to be locked or unlocked. Because the API is changed, this patch also updates the linux-generic implementation of these APIs as well as the scheduler validation test that uses them. Signed-off-by: Bill Fischofer --- include/odp/api/schedule.h | 44 ++++++++++----- .../include/odp/plat/schedule_types.h | 2 - .../linux-generic/include/odp_buffer_internal.h | 2 +- .../linux-generic/include/odp_queue_internal.h | 19 ++++--- platform/linux-generic/odp_queue.c | 66 ++++++++++++++-------- platform/linux-generic/odp_schedule.c | 15 +++-- test/validation/scheduler/scheduler.c | 61 +++++++++++++++----- 7 files changed, 141 insertions(+), 68 deletions(-) diff --git a/include/odp/api/schedule.h b/include/odp/api/schedule.h index 11f85ad..4e278b7 100644 --- a/include/odp/api/schedule.h +++ b/include/odp/api/schedule.h @@ -31,11 +31,6 @@ extern "C" { */ /** - * @typedef odp_schedule_order_lock_t - * Scheduler ordered context lock - */ - -/** * @def ODP_SCHED_WAIT * Wait infinitely */ @@ -299,15 +294,31 @@ int odp_schedule_group_thrmask(odp_schedule_group_t group, /** * Acquire ordered context lock * - * This call is valid only when holding an ordered synchronization context. The - * lock is used to protect a critical section that is executed within an - * ordered context. Threads enter the critical section in the order determined - * by the context (source queue). Lock ordering is automatically skipped for - * threads that release the context instead of calling the lock. - * - * @param lock Ordered context lock + * This call is valid only when holding an ordered synchronization context. + * Ordered locks are used to protect critical sections that are executed + * within an ordered context. Threads enter the critical section in the order + * determined by the context (source queue). Lock ordering is automatically + * skipped for threads that release the context instead of using the lock. + * + * The number of ordered locks available is set by the lock_count parameter of + * the schedule parameters passed to odp_queue_create(), which must be less + * than or equal to the ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE configuration + * option. If this routine is called outside of an ordered context or with a + * lock_index that exceeds the number of available ordered locks in this + * context results are undefined. The number of ordered locks associated with + * a given ordered queue may be queried by the odp_queue_lock_count() API. + * + * Each ordered lock may be used only once per ordered context. If events + * are to be processed with multiple ordered critical sections, each should + * be protected by its own ordered lock. This promotes maximum parallelism by + * allowing order to maintained on a more granular basis. If an ordered lock + * is used multiple times in the same ordered context results are undefined. + * + * @param lock_index Index of the ordered lock in the current context to be + * acquired. Must be in the range 0..odp_queue_lock_count() + * - 1 */ -void odp_schedule_order_lock(odp_schedule_order_lock_t *lock); +void odp_schedule_order_lock(uint32_t lock_index); /** * Release ordered context lock @@ -315,9 +326,12 @@ void odp_schedule_order_lock(odp_schedule_order_lock_t *lock); * This call is valid only when holding an ordered synchronization context. * Release a previously locked ordered context lock. * - * @param lock Ordered context lock + * @param lock_index Index of the ordered lock in the current context to be + * released. Results are undefined if the caller does not + * hold this lock. Must be in the range + * 0..odp_queue_lock_count() - 1 */ -void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock); +void odp_schedule_order_unlock(uint32_t lock_index); /** * @} diff --git a/platform/linux-generic/include/odp/plat/schedule_types.h b/platform/linux-generic/include/odp/plat/schedule_types.h index c48b652..2189cbb 100644 --- a/platform/linux-generic/include/odp/plat/schedule_types.h +++ b/platform/linux-generic/include/odp/plat/schedule_types.h @@ -51,8 +51,6 @@ typedef int odp_schedule_group_t; #define ODP_SCHED_GROUP_NAME_LEN 32 -typedef int odp_schedule_order_lock_t; - /** * @} */ diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index 4cacca1..f063c4c 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -137,7 +137,7 @@ struct odp_buffer_hdr_t { queue_entry_t *origin_qe; /* ordered queue origin */ union { queue_entry_t *target_qe; /* ordered queue target */ - uint64_t sync; /* for ordered synchronization */ + uint64_t sync[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; }; }; diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 19a0f07..cc7120d 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -83,8 +83,8 @@ struct queue_entry_s { uint64_t order_out; odp_buffer_hdr_t *reorder_head; odp_buffer_hdr_t *reorder_tail; - odp_atomic_u64_t sync_in; - odp_atomic_u64_t sync_out; + odp_atomic_u64_t sync_in[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; + odp_atomic_u64_t sync_out[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; }; union queue_entry_u { @@ -123,7 +123,7 @@ int queue_sched_atomic(odp_queue_t handle); int release_order(queue_entry_t *origin_qe, uint64_t order, odp_pool_t pool, int enq_called); void get_sched_order(queue_entry_t **origin_qe, uint64_t *order); -void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync); +void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync, uint32_t ndx); void sched_enq_called(void); void sched_order_resolved(odp_buffer_hdr_t *buf_hdr); @@ -194,12 +194,17 @@ static inline void reorder_enq(queue_entry_t *queue, static inline void order_release(queue_entry_t *origin_qe, int count) { - uint64_t sync = odp_atomic_load_u64(&origin_qe->s.sync_out); + uint64_t sync; + uint32_t i; origin_qe->s.order_out += count; - if (sync < origin_qe->s.order_out) - odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, - origin_qe->s.order_out - sync); + + for (i = 0; i < origin_qe->s.param.sched.lock_count; i++) { + sync = odp_atomic_load_u64(&origin_qe->s.sync_out[i]); + if (sync < origin_qe->s.order_out) + odp_atomic_fetch_add_u64(&origin_qe->s.sync_out[i], + origin_qe->s.order_out - sync); + } } static inline int reorder_deq(queue_entry_t *queue, diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index ac933da..383cf87 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -53,14 +53,17 @@ queue_entry_t *get_qentry(uint32_t queue_id) return &queue_tbl->queue[queue_id]; } -static void queue_init(queue_entry_t *queue, const char *name, - odp_queue_type_t type, odp_queue_param_t *param) +static int queue_init(queue_entry_t *queue, const char *name, + odp_queue_type_t type, odp_queue_param_t *param) { strncpy(queue->s.name, name, ODP_QUEUE_NAME_LEN - 1); queue->s.type = type; if (param) { memcpy(&queue->s.param, param, sizeof(odp_queue_param_t)); + if (queue->s.param.sched.lock_count > + ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE) + return -1; } else { /* Defaults */ memset(&queue->s.param, 0, sizeof(odp_queue_param_t)); @@ -98,12 +101,13 @@ static void queue_init(queue_entry_t *queue, const char *name, queue->s.pri_queue = ODP_QUEUE_INVALID; queue->s.cmd_ev = ODP_EVENT_INVALID; + return 0; } int odp_queue_init_global(void) { - uint32_t i; + uint32_t i, j; odp_shm_t shm; ODP_DBG("Queue init ... "); @@ -123,8 +127,10 @@ int odp_queue_init_global(void) /* init locks */ queue_entry_t *queue = get_qentry(i); LOCK_INIT(&queue->s.lock); - odp_atomic_init_u64(&queue->s.sync_in, 0); - odp_atomic_init_u64(&queue->s.sync_out, 0); + for (j = 0; j < ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE; j++) { + odp_atomic_init_u64(&queue->s.sync_in[j], 0); + odp_atomic_init_u64(&queue->s.sync_out[j], 0); + } queue->s.handle = queue_from_id(i); } @@ -201,6 +207,14 @@ odp_schedule_group_t odp_queue_sched_group(odp_queue_t handle) return queue->s.param.sched.group; } +int odp_queue_lock_count(odp_queue_t handle) +{ + queue_entry_t *queue = queue_to_qentry(handle); + + return queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED ? + (int)queue->s.param.sched.lock_count : -1; +} + odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type, odp_queue_param_t *param) { @@ -216,7 +230,10 @@ odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type, LOCK(&queue->s.lock); if (queue->s.status == QUEUE_STATUS_FREE) { - queue_init(queue, name, type, param); + if (queue_init(queue, name, type, param)) { + UNLOCK(&queue->s.lock); + return handle; + } if (type == ODP_QUEUE_TYPE_SCHED || type == ODP_QUEUE_TYPE_PKTIN) @@ -577,6 +594,7 @@ int queue_enq_internal(odp_buffer_hdr_t *buf_hdr) odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) { odp_buffer_hdr_t *buf_hdr; + uint32_t i; LOCK(&queue->s.lock); @@ -600,7 +618,10 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) if (queue_is_ordered(queue)) { buf_hdr->origin_qe = queue; buf_hdr->order = queue->s.order_in++; - buf_hdr->sync = odp_atomic_fetch_inc_u64(&queue->s.sync_in); + for (i = 0; i < queue->s.param.sched.lock_count; i++) { + buf_hdr->sync[i] = + odp_atomic_fetch_inc_u64(&queue->s.sync_in[i]); + } buf_hdr->flags.sustain = 0; } else { buf_hdr->origin_qe = NULL; @@ -621,6 +642,7 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { odp_buffer_hdr_t *hdr; int i; + uint32_t j; LOCK(&queue->s.lock); if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { @@ -648,8 +670,11 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) if (queue_is_ordered(queue)) { buf_hdr[i]->origin_qe = queue; buf_hdr[i]->order = queue->s.order_in++; - buf_hdr[i]->sync = - odp_atomic_fetch_inc_u64(&queue->s.sync_in); + for (j = 0; j < queue->s.param.sched.lock_count; j++) { + buf_hdr[i]->sync[j] = + odp_atomic_fetch_inc_u64 + (&queue->s.sync_in[j]); + } buf_hdr[i]->flags.sustain = 0; } else { buf_hdr[i]->origin_qe = NULL; @@ -966,39 +991,32 @@ int release_order(queue_entry_t *origin_qe, uint64_t order, return 0; } -/* This routine is a no-op in linux-generic */ -int odp_schedule_order_lock_init(odp_schedule_order_lock_t *lock ODP_UNUSED, - odp_queue_t queue ODP_UNUSED) -{ - return 0; -} - -void odp_schedule_order_lock(odp_schedule_order_lock_t *lock ODP_UNUSED) +void odp_schedule_order_lock(uint32_t lock_index) { queue_entry_t *origin_qe; uint64_t *sync; - get_sched_sync(&origin_qe, &sync); - if (!origin_qe) + get_sched_sync(&origin_qe, &sync, lock_index); + if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count) return; /* Wait until we are in order. Note that sync_out will be incremented * both by unlocks as well as order resolution, so we're OK if only * some events in the ordered flow need to lock. */ - while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out)) + while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index])) odp_spin(); } -void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock ODP_UNUSED) +void odp_schedule_order_unlock(uint32_t lock_index) { queue_entry_t *origin_qe; uint64_t *sync; - get_sched_sync(&origin_qe, &sync); - if (!origin_qe) + get_sched_sync(&origin_qe, &sync, lock_index); + if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count) return; /* Release the ordered lock */ - odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out); + odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out[lock_index]); } diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index c6619e5..a08d995 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -86,7 +86,7 @@ typedef struct { queue_entry_t *qe; queue_entry_t *origin_qe; uint64_t order; - uint64_t sync; + uint64_t sync[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; odp_pool_t pool; int enq_called; int num; @@ -440,6 +440,7 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[], int i, j; int thr; int ret; + uint32_t k; if (sched_local.num) { ret = copy_events(out_ev, max_num); @@ -551,8 +552,12 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[], sched_local.origin_qe = qe; sched_local.order = sched_local.buf_hdr[0]->order; - sched_local.sync = - sched_local.buf_hdr[0]->sync; + for (k = 0; + k < qe->s.param.sched.lock_count; + k++) { + sched_local.sync[k] = + sched_local.buf_hdr[0]->sync[k]; + } sched_local.enq_called = 0; } else if (queue_is_atomic(qe)) { /* Hold queue during atomic access */ @@ -802,10 +807,10 @@ void get_sched_order(queue_entry_t **origin_qe, uint64_t *order) *order = sched_local.order; } -void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync) +void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync, uint32_t ndx) { *origin_qe = sched_local.origin_qe; - *sync = &sched_local.sync; + *sync = &sched_local.sync[ndx]; } void sched_order_resolved(odp_buffer_hdr_t *buf_hdr) diff --git a/test/validation/scheduler/scheduler.c b/test/validation/scheduler/scheduler.c index 8cde985..a28d60c 100644 --- a/test/validation/scheduler/scheduler.c +++ b/test/validation/scheduler/scheduler.c @@ -62,13 +62,13 @@ typedef struct { typedef struct { uint64_t sequence; + uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; } buf_contents; typedef struct { odp_buffer_t ctx_handle; uint64_t sequence; - uint64_t lock_sequence; - odp_schedule_order_lock_t order_lock; + uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; } queue_context; odp_pool_t pool; @@ -419,14 +419,20 @@ static void *schedule_common_(void *arg) continue; if (sync == ODP_SCHED_SYNC_ORDERED) { + uint32_t ndx; + uint32_t ndx_max = odp_queue_lock_count(from); + qctx = odp_queue_context(from); bctx = odp_buffer_addr( odp_buffer_from_event(events[0])); - odp_schedule_order_lock(&qctx->order_lock); - CU_ASSERT(bctx->sequence == - qctx->lock_sequence); - qctx->lock_sequence += num; - odp_schedule_order_unlock(&qctx->order_lock); + + for (ndx = 0; ndx < ndx_max; ndx++) { + odp_schedule_order_lock(ndx); + CU_ASSERT(bctx->sequence == + qctx->lock_sequence[ndx]); + qctx->lock_sequence[ndx] += num; + odp_schedule_order_unlock(ndx); + } } for (j = 0; j < num; j++) @@ -438,13 +444,19 @@ static void *schedule_common_(void *arg) continue; num = 1; if (sync == ODP_SCHED_SYNC_ORDERED) { + uint32_t ndx; + uint32_t ndx_max = odp_queue_lock_count(from); + qctx = odp_queue_context(from); bctx = odp_buffer_addr(buf); - odp_schedule_order_lock(&qctx->order_lock); - CU_ASSERT(bctx->sequence == - qctx->lock_sequence); - qctx->lock_sequence += num; - odp_schedule_order_unlock(&qctx->order_lock); + + for (ndx = 0; ndx < ndx_max; ndx++) { + odp_schedule_order_lock(ndx); + CU_ASSERT(bctx->sequence == + qctx->lock_sequence[ndx]); + qctx->lock_sequence[ndx] += num; + odp_schedule_order_unlock(ndx); + } } odp_buffer_free(buf); } @@ -570,8 +582,12 @@ static void reset_queues(thread_args_t *args) for (k = 0; k < args->num_bufs; k++) { queue_context *qctx = odp_queue_context(queue); + uint32_t ndx; + uint32_t ndx_max = + odp_queue_lock_count(queue); qctx->sequence = 0; - qctx->lock_sequence = 0; + for (ndx = 0; ndx < ndx_max; ndx++) + qctx->lock_sequence[ndx] = 0; } } } @@ -902,6 +918,7 @@ static int create_queues(void) odp_pool_param_t params; odp_buffer_t queue_ctx_buf; queue_context *qctx; + uint32_t ndx; prios = odp_schedule_num_prio(); odp_pool_param_init(¶ms); @@ -946,12 +963,23 @@ static int create_queues(void) snprintf(name, sizeof(name), "sched_%d_%d_o", i, j); p.sched.sync = ODP_SCHED_SYNC_ORDERED; + p.sched.lock_count = + ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE; q = odp_queue_create(name, ODP_QUEUE_TYPE_SCHED, &p); if (q == ODP_QUEUE_INVALID) { printf("Schedule queue create failed.\n"); return -1; } + if (odp_queue_lock_count(q) != + ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE) { + printf("Queue %" PRIu64 " created with " + "%d locks instead of expected %d\n", + odp_queue_to_u64(q), + odp_queue_lock_count(q), + ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE); + return -1; + } queue_ctx_buf = odp_buffer_alloc(queue_ctx_pool); @@ -963,7 +991,12 @@ static int create_queues(void) qctx = odp_buffer_addr(queue_ctx_buf); qctx->ctx_handle = queue_ctx_buf; qctx->sequence = 0; - qctx->lock_sequence = 0; + + for (ndx = 0; + ndx < ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE; + ndx++) { + qctx->lock_sequence[ndx] = 0; + } rc = odp_queue_context_set(q, qctx);