From patchwork Sun Sep 6 02:50:24 2015 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Bill Fischofer X-Patchwork-Id: 53161 Return-Path: X-Original-To: linaro@patches.linaro.org Delivered-To: linaro@patches.linaro.org Received: from mail-wi0-f198.google.com (mail-wi0-f198.google.com [209.85.212.198]) by patches.linaro.org (Postfix) with ESMTPS id A340620A78 for ; Sun, 6 Sep 2015 02:53:17 +0000 (UTC) Received: by wicuu12 with SMTP id uu12sf16791256wic.2 for ; Sat, 05 Sep 2015 19:53:17 -0700 (PDT) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:delivered-to:delivered-to:from:to:date :message-id:in-reply-to:references:subject:precedence:list-id :list-unsubscribe:list-archive:list-post:list-help:list-subscribe :mime-version:content-type:content-transfer-encoding:errors-to :sender:x-original-sender:x-original-authentication-results :mailing-list; bh=k7Z34gtq3fclB4rAXBvvLG9HLjKp/D0jS49/NuUFleU=; b=aQxLdwPJWEZA3eSBE6wvZpbuqHrOm5Ma8Zk8xtNC2wdRY29LEeh+3IWvhJxKZPfEPE Zfc7mdtg3PjbxOVK9wxguPubD1ef2jyEOHEV6ii7EPE4A1EPTONqj5CCmueIGwnoBUke Pwp6LSV1fQO4JH29JzNUv32w90jb+33SeJAbJo8QCxKVQYQhNzQAHlez4jtrgPdB4lAX fRS4GBnj/Mq+tpH8dLRkyl1q3qwJPYAFG1mAQsLAjD9xUHOzkPOqCwME9s3qi8GoTki9 qxgNnq/afO1G9z+DaAqH+YmqAOjlOZT7X180qO5JlqBqOVx0LnlQQbW0slUqpS0g06Bl ec+Q== X-Gm-Message-State: ALoCoQmZnPZbOplnAuFIsUf2bv7goknJkfcyw4Bf8AoHA6ddWv/3of6pi6rGqQj01RCmgBHO9pyI X-Received: by 10.180.12.205 with SMTP id a13mr3170463wic.4.1441507996990; Sat, 05 Sep 2015 19:53:16 -0700 (PDT) X-BeenThere: patchwork-forward@linaro.org Received: by 10.152.3.194 with SMTP id e2ls370138lae.49.gmail; Sat, 05 Sep 2015 19:53:16 -0700 (PDT) X-Received: by 10.112.156.137 with SMTP id we9mr10964943lbb.110.1441507996807; Sat, 05 Sep 2015 19:53:16 -0700 (PDT) Received: from mail-la0-f51.google.com (mail-la0-f51.google.com. [209.85.215.51]) by mx.google.com with ESMTPS id an5si4995341lac.166.2015.09.05.19.53.16 for (version=TLSv1.2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Sat, 05 Sep 2015 19:53:16 -0700 (PDT) Received-SPF: pass (google.com: domain of patch+caf_=patchwork-forward=linaro.org@linaro.org designates 209.85.215.51 as permitted sender) client-ip=209.85.215.51; Received: by laeb10 with SMTP id b10so34215825lae.1 for ; Sat, 05 Sep 2015 19:53:16 -0700 (PDT) X-Received: by 10.152.22.133 with SMTP id d5mr10969702laf.112.1441507996687; Sat, 05 Sep 2015 19:53:16 -0700 (PDT) X-Forwarded-To: patchwork-forward@linaro.org X-Forwarded-For: patch@linaro.org patchwork-forward@linaro.org Delivered-To: patch@linaro.org Received: by 10.112.164.42 with SMTP id yn10csp772384lbb; Sat, 5 Sep 2015 19:53:15 -0700 (PDT) X-Received: by 10.55.27.27 with SMTP id b27mr17695023qkb.4.1441507995447; Sat, 05 Sep 2015 19:53:15 -0700 (PDT) Received: from lists.linaro.org (lists.linaro.org. [54.225.227.206]) by mx.google.com with ESMTP id j17si9046577qhc.91.2015.09.05.19.53.13; Sat, 05 Sep 2015 19:53:15 -0700 (PDT) Received-SPF: pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.225.227.206 as permitted sender) client-ip=54.225.227.206; Received: by lists.linaro.org (Postfix, from userid 109) id A9DA3610AE; Sun, 6 Sep 2015 02:53:13 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on ip-10-142-244-252 X-Spam-Level: X-Spam-Status: No, score=-2.6 required=5.0 tests=BAYES_00, RCVD_IN_DNSWL_LOW, RCVD_IN_MSPIKE_H3, RCVD_IN_MSPIKE_WL, URIBL_BLOCKED autolearn=disabled version=3.4.0 Received: from [127.0.0.1] (localhost [127.0.0.1]) by lists.linaro.org (Postfix) with ESMTP id 347F461D0F; Sun, 6 Sep 2015 02:50:50 +0000 (UTC) X-Original-To: lng-odp@lists.linaro.org Delivered-To: lng-odp@lists.linaro.org Received: by lists.linaro.org (Postfix, from userid 109) id 0C15961CC7; Sun, 6 Sep 2015 02:50:38 +0000 (UTC) Received: from mail-oi0-f42.google.com (mail-oi0-f42.google.com [209.85.218.42]) by lists.linaro.org (Postfix) with ESMTPS id D3526618C9 for ; Sun, 6 Sep 2015 02:50:33 +0000 (UTC) Received: by oiww128 with SMTP id w128so29774333oiw.2 for ; Sat, 05 Sep 2015 19:50:33 -0700 (PDT) X-Received: by 10.202.73.199 with SMTP id w190mr2160895oia.31.1441507833302; Sat, 05 Sep 2015 19:50:33 -0700 (PDT) Received: from Ubuntu15.localdomain (cpe-24-28-70-239.austin.res.rr.com. [24.28.70.239]) by smtp.gmail.com with ESMTPSA id k187sm4558263oia.18.2015.09.05.19.50.32 (version=TLS1_2 cipher=ECDHE-RSA-AES128-SHA bits=128/128); Sat, 05 Sep 2015 19:50:32 -0700 (PDT) From: Bill Fischofer To: lng-odp@lists.linaro.org Date: Sat, 5 Sep 2015 21:50:24 -0500 Message-Id: <1441507824-16437-6-git-send-email-bill.fischofer@linaro.org> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1441507824-16437-1-git-send-email-bill.fischofer@linaro.org> References: <1441507824-16437-1-git-send-email-bill.fischofer@linaro.org> X-Topics: patch Subject: [lng-odp] [API-NEXT PATCH 5/5] api: schedule: revise definition of ordered locks X-BeenThere: lng-odp@lists.linaro.org X-Mailman-Version: 2.1.16 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: , List-Help: , List-Subscribe: , MIME-Version: 1.0 Errors-To: lng-odp-bounces@lists.linaro.org Sender: "lng-odp" X-Removed-Original-Auth: Dkim didn't pass. X-Original-Sender: bill.fischofer@linaro.org X-Original-Authentication-Results: mx.google.com; spf=pass (google.com: domain of patch+caf_=patchwork-forward=linaro.org@linaro.org designates 209.85.215.51 as permitted sender) smtp.mailfrom=patch+caf_=patchwork-forward=linaro.org@linaro.org Mailing-list: list patchwork-forward@linaro.org; contact patchwork-forward+owners@linaro.org X-Google-Group-Id: 836684582541 Revise the definition of odp_schedule_order_lock() and odp_schedule_order_unlock(). These APIs now take as an argument the index value of the ordered lock within the current context to be locked or unlocked. Because the API is changed, this patch also updates the linux-generic implementation of these APIs as well as the scheduler validation test that uses them. Signed-off-by: Bill Fischofer --- include/odp/api/schedule.h | 44 ++++++++++----- .../include/odp/plat/schedule_types.h | 2 - .../linux-generic/include/odp_buffer_internal.h | 2 +- .../linux-generic/include/odp_queue_internal.h | 19 ++++--- platform/linux-generic/odp_queue.c | 66 ++++++++++++++-------- platform/linux-generic/odp_schedule.c | 15 +++-- test/validation/scheduler/scheduler.c | 61 +++++++++++++++----- 7 files changed, 141 insertions(+), 68 deletions(-) diff --git a/include/odp/api/schedule.h b/include/odp/api/schedule.h index 11f85ad..679d5b8 100644 --- a/include/odp/api/schedule.h +++ b/include/odp/api/schedule.h @@ -31,11 +31,6 @@ extern "C" { */ /** - * @typedef odp_schedule_order_lock_t - * Scheduler ordered context lock - */ - -/** * @def ODP_SCHED_WAIT * Wait infinitely */ @@ -299,15 +294,31 @@ int odp_schedule_group_thrmask(odp_schedule_group_t group, /** * Acquire ordered context lock * - * This call is valid only when holding an ordered synchronization context. The - * lock is used to protect a critical section that is executed within an - * ordered context. Threads enter the critical section in the order determined - * by the context (source queue). Lock ordering is automatically skipped for - * threads that release the context instead of calling the lock. - * - * @param lock Ordered context lock + * This call is valid only when holding an ordered synchronization context. + * Ordered locks are used to protect critical sections that are executed + * within an ordered context. Threads enter the critical section in the order + * determined by the context (source queue). Lock ordering is automatically + * skipped for threads that release the context instead of using the lock. + * + * The number of ordered locks available is set by the lock_count parameter of + * the schedule parameters passed to odp_queue_create(), which must be less + * than or equal to the ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE configuration + * option. If this routine is called outside of an ordered context or with a + * lock_index that exceeds the number of available ordered locks in this + * context results are undefined. The number of ordered locks associated with + * a given ordered queue may be queried by the odp_queue_lock_count() API. + * + * Each ordered lock may be used only once per ordered context. If events + * are to be processed with multiple ordered critical sections, each should + * be protected by its own ordered lock. This promotes maximum parallelism by + * allowing order to maintained on a more granular basis. If an ordered lock + * is used multiple times in the same event results are undefined. + * + * @param lock_index Index of the ordered lock in the current context to be + * acquired. Must be in the range 0..odp_queue_lock_count() + * - 1 */ -void odp_schedule_order_lock(odp_schedule_order_lock_t *lock); +void odp_schedule_order_lock(uint32_t lock_index); /** * Release ordered context lock @@ -315,9 +326,12 @@ void odp_schedule_order_lock(odp_schedule_order_lock_t *lock); * This call is valid only when holding an ordered synchronization context. * Release a previously locked ordered context lock. * - * @param lock Ordered context lock + * @param lock_index Index of the ordered lock in the current context to be + * released. Results are undefined if the caller does not + * hold this lock. Must be in the range + * 0..odp_queue_lock_count() - 1 */ -void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock); +void odp_schedule_order_unlock(uint32_t lock_index); /** * @} diff --git a/platform/linux-generic/include/odp/plat/schedule_types.h b/platform/linux-generic/include/odp/plat/schedule_types.h index c48b652..2189cbb 100644 --- a/platform/linux-generic/include/odp/plat/schedule_types.h +++ b/platform/linux-generic/include/odp/plat/schedule_types.h @@ -51,8 +51,6 @@ typedef int odp_schedule_group_t; #define ODP_SCHED_GROUP_NAME_LEN 32 -typedef int odp_schedule_order_lock_t; - /** * @} */ diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index 4cacca1..f063c4c 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -137,7 +137,7 @@ struct odp_buffer_hdr_t { queue_entry_t *origin_qe; /* ordered queue origin */ union { queue_entry_t *target_qe; /* ordered queue target */ - uint64_t sync; /* for ordered synchronization */ + uint64_t sync[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; }; }; diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 19a0f07..cc7120d 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -83,8 +83,8 @@ struct queue_entry_s { uint64_t order_out; odp_buffer_hdr_t *reorder_head; odp_buffer_hdr_t *reorder_tail; - odp_atomic_u64_t sync_in; - odp_atomic_u64_t sync_out; + odp_atomic_u64_t sync_in[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; + odp_atomic_u64_t sync_out[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; }; union queue_entry_u { @@ -123,7 +123,7 @@ int queue_sched_atomic(odp_queue_t handle); int release_order(queue_entry_t *origin_qe, uint64_t order, odp_pool_t pool, int enq_called); void get_sched_order(queue_entry_t **origin_qe, uint64_t *order); -void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync); +void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync, uint32_t ndx); void sched_enq_called(void); void sched_order_resolved(odp_buffer_hdr_t *buf_hdr); @@ -194,12 +194,17 @@ static inline void reorder_enq(queue_entry_t *queue, static inline void order_release(queue_entry_t *origin_qe, int count) { - uint64_t sync = odp_atomic_load_u64(&origin_qe->s.sync_out); + uint64_t sync; + uint32_t i; origin_qe->s.order_out += count; - if (sync < origin_qe->s.order_out) - odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, - origin_qe->s.order_out - sync); + + for (i = 0; i < origin_qe->s.param.sched.lock_count; i++) { + sync = odp_atomic_load_u64(&origin_qe->s.sync_out[i]); + if (sync < origin_qe->s.order_out) + odp_atomic_fetch_add_u64(&origin_qe->s.sync_out[i], + origin_qe->s.order_out - sync); + } } static inline int reorder_deq(queue_entry_t *queue, diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index ac933da..ec5a17a 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -53,14 +53,17 @@ queue_entry_t *get_qentry(uint32_t queue_id) return &queue_tbl->queue[queue_id]; } -static void queue_init(queue_entry_t *queue, const char *name, - odp_queue_type_t type, odp_queue_param_t *param) +static int queue_init(queue_entry_t *queue, const char *name, + odp_queue_type_t type, odp_queue_param_t *param) { strncpy(queue->s.name, name, ODP_QUEUE_NAME_LEN - 1); queue->s.type = type; if (param) { memcpy(&queue->s.param, param, sizeof(odp_queue_param_t)); + if (queue->s.param.sched.lock_count > + ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE) + return -1; } else { /* Defaults */ memset(&queue->s.param, 0, sizeof(odp_queue_param_t)); @@ -98,12 +101,13 @@ static void queue_init(queue_entry_t *queue, const char *name, queue->s.pri_queue = ODP_QUEUE_INVALID; queue->s.cmd_ev = ODP_EVENT_INVALID; + return 0; } int odp_queue_init_global(void) { - uint32_t i; + uint32_t i, j; odp_shm_t shm; ODP_DBG("Queue init ... "); @@ -123,8 +127,10 @@ int odp_queue_init_global(void) /* init locks */ queue_entry_t *queue = get_qentry(i); LOCK_INIT(&queue->s.lock); - odp_atomic_init_u64(&queue->s.sync_in, 0); - odp_atomic_init_u64(&queue->s.sync_out, 0); + for (j = 0; j < ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE; j++) { + odp_atomic_init_u64(&queue->s.sync_in[j], 0); + odp_atomic_init_u64(&queue->s.sync_out[j], 0); + } queue->s.handle = queue_from_id(i); } @@ -201,6 +207,14 @@ odp_schedule_group_t odp_queue_sched_group(odp_queue_t handle) return queue->s.param.sched.group; } +uint32_t odp_queue_lock_count(odp_queue_t handle) +{ + queue_entry_t *queue = queue_to_qentry(handle); + + return queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED ? + queue->s.param.sched.lock_count : 0; +} + odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type, odp_queue_param_t *param) { @@ -216,7 +230,10 @@ odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type, LOCK(&queue->s.lock); if (queue->s.status == QUEUE_STATUS_FREE) { - queue_init(queue, name, type, param); + if (queue_init(queue, name, type, param)) { + UNLOCK(&queue->s.lock); + return handle; + } if (type == ODP_QUEUE_TYPE_SCHED || type == ODP_QUEUE_TYPE_PKTIN) @@ -577,6 +594,7 @@ int queue_enq_internal(odp_buffer_hdr_t *buf_hdr) odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) { odp_buffer_hdr_t *buf_hdr; + uint32_t i; LOCK(&queue->s.lock); @@ -600,7 +618,10 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) if (queue_is_ordered(queue)) { buf_hdr->origin_qe = queue; buf_hdr->order = queue->s.order_in++; - buf_hdr->sync = odp_atomic_fetch_inc_u64(&queue->s.sync_in); + for (i = 0; i < queue->s.param.sched.lock_count; i++) { + buf_hdr->sync[i] = + odp_atomic_fetch_inc_u64(&queue->s.sync_in[i]); + } buf_hdr->flags.sustain = 0; } else { buf_hdr->origin_qe = NULL; @@ -621,6 +642,7 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { odp_buffer_hdr_t *hdr; int i; + uint32_t j; LOCK(&queue->s.lock); if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { @@ -648,8 +670,11 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) if (queue_is_ordered(queue)) { buf_hdr[i]->origin_qe = queue; buf_hdr[i]->order = queue->s.order_in++; - buf_hdr[i]->sync = - odp_atomic_fetch_inc_u64(&queue->s.sync_in); + for (j = 0; j < queue->s.param.sched.lock_count; j++) { + buf_hdr[i]->sync[j] = + odp_atomic_fetch_inc_u64 + (&queue->s.sync_in[j]); + } buf_hdr[i]->flags.sustain = 0; } else { buf_hdr[i]->origin_qe = NULL; @@ -966,39 +991,32 @@ int release_order(queue_entry_t *origin_qe, uint64_t order, return 0; } -/* This routine is a no-op in linux-generic */ -int odp_schedule_order_lock_init(odp_schedule_order_lock_t *lock ODP_UNUSED, - odp_queue_t queue ODP_UNUSED) -{ - return 0; -} - -void odp_schedule_order_lock(odp_schedule_order_lock_t *lock ODP_UNUSED) +void odp_schedule_order_lock(uint32_t lock_index) { queue_entry_t *origin_qe; uint64_t *sync; - get_sched_sync(&origin_qe, &sync); - if (!origin_qe) + get_sched_sync(&origin_qe, &sync, lock_index); + if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count) return; /* Wait until we are in order. Note that sync_out will be incremented * both by unlocks as well as order resolution, so we're OK if only * some events in the ordered flow need to lock. */ - while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out)) + while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index])) odp_spin(); } -void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock ODP_UNUSED) +void odp_schedule_order_unlock(uint32_t lock_index) { queue_entry_t *origin_qe; uint64_t *sync; - get_sched_sync(&origin_qe, &sync); - if (!origin_qe) + get_sched_sync(&origin_qe, &sync, lock_index); + if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count) return; /* Release the ordered lock */ - odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out); + odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out[lock_index]); } diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index c6619e5..a08d995 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -86,7 +86,7 @@ typedef struct { queue_entry_t *qe; queue_entry_t *origin_qe; uint64_t order; - uint64_t sync; + uint64_t sync[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; odp_pool_t pool; int enq_called; int num; @@ -440,6 +440,7 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[], int i, j; int thr; int ret; + uint32_t k; if (sched_local.num) { ret = copy_events(out_ev, max_num); @@ -551,8 +552,12 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[], sched_local.origin_qe = qe; sched_local.order = sched_local.buf_hdr[0]->order; - sched_local.sync = - sched_local.buf_hdr[0]->sync; + for (k = 0; + k < qe->s.param.sched.lock_count; + k++) { + sched_local.sync[k] = + sched_local.buf_hdr[0]->sync[k]; + } sched_local.enq_called = 0; } else if (queue_is_atomic(qe)) { /* Hold queue during atomic access */ @@ -802,10 +807,10 @@ void get_sched_order(queue_entry_t **origin_qe, uint64_t *order) *order = sched_local.order; } -void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync) +void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync, uint32_t ndx) { *origin_qe = sched_local.origin_qe; - *sync = &sched_local.sync; + *sync = &sched_local.sync[ndx]; } void sched_order_resolved(odp_buffer_hdr_t *buf_hdr) diff --git a/test/validation/scheduler/scheduler.c b/test/validation/scheduler/scheduler.c index 8cde985..a28d60c 100644 --- a/test/validation/scheduler/scheduler.c +++ b/test/validation/scheduler/scheduler.c @@ -62,13 +62,13 @@ typedef struct { typedef struct { uint64_t sequence; + uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; } buf_contents; typedef struct { odp_buffer_t ctx_handle; uint64_t sequence; - uint64_t lock_sequence; - odp_schedule_order_lock_t order_lock; + uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE]; } queue_context; odp_pool_t pool; @@ -419,14 +419,20 @@ static void *schedule_common_(void *arg) continue; if (sync == ODP_SCHED_SYNC_ORDERED) { + uint32_t ndx; + uint32_t ndx_max = odp_queue_lock_count(from); + qctx = odp_queue_context(from); bctx = odp_buffer_addr( odp_buffer_from_event(events[0])); - odp_schedule_order_lock(&qctx->order_lock); - CU_ASSERT(bctx->sequence == - qctx->lock_sequence); - qctx->lock_sequence += num; - odp_schedule_order_unlock(&qctx->order_lock); + + for (ndx = 0; ndx < ndx_max; ndx++) { + odp_schedule_order_lock(ndx); + CU_ASSERT(bctx->sequence == + qctx->lock_sequence[ndx]); + qctx->lock_sequence[ndx] += num; + odp_schedule_order_unlock(ndx); + } } for (j = 0; j < num; j++) @@ -438,13 +444,19 @@ static void *schedule_common_(void *arg) continue; num = 1; if (sync == ODP_SCHED_SYNC_ORDERED) { + uint32_t ndx; + uint32_t ndx_max = odp_queue_lock_count(from); + qctx = odp_queue_context(from); bctx = odp_buffer_addr(buf); - odp_schedule_order_lock(&qctx->order_lock); - CU_ASSERT(bctx->sequence == - qctx->lock_sequence); - qctx->lock_sequence += num; - odp_schedule_order_unlock(&qctx->order_lock); + + for (ndx = 0; ndx < ndx_max; ndx++) { + odp_schedule_order_lock(ndx); + CU_ASSERT(bctx->sequence == + qctx->lock_sequence[ndx]); + qctx->lock_sequence[ndx] += num; + odp_schedule_order_unlock(ndx); + } } odp_buffer_free(buf); } @@ -570,8 +582,12 @@ static void reset_queues(thread_args_t *args) for (k = 0; k < args->num_bufs; k++) { queue_context *qctx = odp_queue_context(queue); + uint32_t ndx; + uint32_t ndx_max = + odp_queue_lock_count(queue); qctx->sequence = 0; - qctx->lock_sequence = 0; + for (ndx = 0; ndx < ndx_max; ndx++) + qctx->lock_sequence[ndx] = 0; } } } @@ -902,6 +918,7 @@ static int create_queues(void) odp_pool_param_t params; odp_buffer_t queue_ctx_buf; queue_context *qctx; + uint32_t ndx; prios = odp_schedule_num_prio(); odp_pool_param_init(¶ms); @@ -946,12 +963,23 @@ static int create_queues(void) snprintf(name, sizeof(name), "sched_%d_%d_o", i, j); p.sched.sync = ODP_SCHED_SYNC_ORDERED; + p.sched.lock_count = + ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE; q = odp_queue_create(name, ODP_QUEUE_TYPE_SCHED, &p); if (q == ODP_QUEUE_INVALID) { printf("Schedule queue create failed.\n"); return -1; } + if (odp_queue_lock_count(q) != + ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE) { + printf("Queue %" PRIu64 " created with " + "%d locks instead of expected %d\n", + odp_queue_to_u64(q), + odp_queue_lock_count(q), + ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE); + return -1; + } queue_ctx_buf = odp_buffer_alloc(queue_ctx_pool); @@ -963,7 +991,12 @@ static int create_queues(void) qctx = odp_buffer_addr(queue_ctx_buf); qctx->ctx_handle = queue_ctx_buf; qctx->sequence = 0; - qctx->lock_sequence = 0; + + for (ndx = 0; + ndx < ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE; + ndx++) { + qctx->lock_sequence[ndx] = 0; + } rc = odp_queue_context_set(q, qctx);