From patchwork Wed Feb 28 12:00:14 2018
X-Patchwork-Submitter: Github ODP bot
X-Patchwork-Id: 129972
Delivered-To: patch@linaro.org
From: Github ODP bot
To: lng-odp@lists.linaro.org
Date: Wed, 28 Feb 2018 15:00:14 +0300
Message-Id: <1519819218-27901-7-git-send-email-odpbot@yandex.ru>
X-Mailer: git-send-email 2.7.4
In-Reply-To: <1519819218-27901-1-git-send-email-odpbot@yandex.ru>
References: <1519819218-27901-1-git-send-email-odpbot@yandex.ru>
Github-pr-num: 492
Subject: [lng-odp] [PATCH v3 6/10] linux-gen: queue: ring based queue implementation
List-Id: "The OpenDataPlane (ODP) List"

From: Petri Savolainen

Change from a linked list of bursts to a ring implementation. Queues now
have a maximum size, but the code is simpler and performance is a bit
better. This step also prepares for a potential future step of
implementing queues on a lockless ring.

Signed-off-by: Petri Savolainen
---
/** Email created from pull request 492 (psavol:master-sched-optim)
 ** https://github.com/Linaro/odp/pull/492
 ** Patch: https://github.com/Linaro/odp/pull/492.patch
 ** Base sha: f5e12df388352b27f09787028a0040afb28564f4
 ** Merge commit sha: 56e6340663c8679516a24dc81df13a53488b86b8
 **/
 .../linux-generic/include/odp_buffer_internal.h    |  34 ++--
 platform/linux-generic/include/odp_pool_internal.h |  28 ++++
 .../linux-generic/include/odp_queue_internal.h     |   5 +-
 platform/linux-generic/odp_packet.c                |   3 +-
 platform/linux-generic/odp_pool.c                  |  22 +--
 platform/linux-generic/odp_queue_basic.c           | 170 +++++++++------
 6 files changed, 130 insertions(+), 132 deletions(-)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index bd90ee156..e50dd604a 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -33,19 +33,31 @@ extern "C" {
 #include
 #include
-#define BUFFER_BURST_SIZE CONFIG_BURST_SIZE
-
 typedef struct seg_entry_t {
	void     *hdr;
	uint8_t  *data;
	uint32_t  len;
 } seg_entry_t;

+typedef union buffer_index_t {
+	uint32_t u32;
+
+	struct {
+		uint32_t pool   :8;
+		uint32_t buffer :24;
+	};
+} buffer_index_t;
+
+/* Check that pool index fits into the bit field */
+ODP_STATIC_ASSERT(ODP_CONFIG_POOLS <= (0xFF + 1), "TOO_MANY_POOLS");
+
+/* Check that buffer index fits into the bit field */
+ODP_STATIC_ASSERT(CONFIG_POOL_MAX_NUM <= (0xFFFFFF + 1), "TOO_LARGE_POOL");
+
 /* Common buffer header */
 struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
-
-	/* Buffer index in the pool */
-	uint32_t index;
+	/* Combined pool and buffer index */
+	buffer_index_t index;

	/* Total segment count */
	uint16_t segcount;
@@ -73,16 +85,6 @@ struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
	/* Segments */
	seg_entry_t seg[CONFIG_PACKET_SEGS_PER_HDR];

-	/* Burst counts */
-	uint8_t burst_num;
-	uint8_t burst_first;
-
-	/* Next buf in a list */
-	struct odp_buffer_hdr_t *next;
-
-	/* Burst table */
-	struct odp_buffer_hdr_t *burst[BUFFER_BURST_SIZE];
-
	/* --- Mostly read only data --- */
	const void *user_ptr;
@@ -109,8 +111,6 @@ struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
 ODP_STATIC_ASSERT(CONFIG_PACKET_SEGS_PER_HDR < 256,
		  "CONFIG_PACKET_SEGS_PER_HDR_TOO_LARGE");

-ODP_STATIC_ASSERT(BUFFER_BURST_SIZE < 256, "BUFFER_BURST_SIZE_TOO_LARGE");
-
 #ifdef __cplusplus
 }
 #endif
diff --git a/platform/linux-generic/include/odp_pool_internal.h b/platform/linux-generic/include/odp_pool_internal.h
index faa019f48..e3de2b65d 100644
--- a/platform/linux-generic/include/odp_pool_internal.h
+++ b/platform/linux-generic/include/odp_pool_internal.h
@@ -104,6 +104,34 @@ static inline odp_buffer_hdr_t *buf_hdl_to_hdr(odp_buffer_t buf)
	return (odp_buffer_hdr_t *)(uintptr_t)buf;
 }

+static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
+						   uint32_t buffer_idx)
+{
+	uint64_t block_offset;
+	odp_buffer_hdr_t *buf_hdr;
+
+	block_offset = buffer_idx * (uint64_t)pool->block_size;
+
+	/* clang requires cast to uintptr_t */
+	buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
+
+	return buf_hdr;
+}
+
+static inline odp_buffer_hdr_t *buf_hdr_from_index_u32(uint32_t u32)
+{
+	buffer_index_t index;
+	uint32_t pool_idx, buffer_idx;
+	pool_t *pool;
+
+	index.u32  = u32;
+	pool_idx   = index.pool;
+	buffer_idx = index.buffer;
+	pool       = pool_entry(pool_idx);
+
+	return buf_hdr_from_index(pool, buffer_idx);
+}
+
 int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int num);
 void buffer_free_multi(odp_buffer_hdr_t *buf_hdr[], int num_free);
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 8215a8180..0540bf72d 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -29,6 +29,7 @@ extern "C" {
 #include
 #include
 #include
+#include

 #define QUEUE_STATUS_FREE         0
 #define QUEUE_STATUS_DESTROYED    1
@@ -38,9 +39,7 @@ extern "C" {
 struct queue_entry_s {
	odp_ticketlock_t ODP_ALIGNED_CACHE lock;
-
-	odp_buffer_hdr_t *head;
-	odp_buffer_hdr_t *tail;
+	ring_st_t ring_st;
	int status;

	queue_enq_fn_t ODP_ALIGNED_CACHE enqueue;
diff --git a/platform/linux-generic/odp_packet.c b/platform/linux-generic/odp_packet.c
index 22df7d670..e24aa0c7a 100644
--- a/platform/linux-generic/odp_packet.c
+++ b/platform/linux-generic/odp_packet.c
@@ -1819,7 +1819,8 @@ void odp_packet_print_data(odp_packet_t pkt, uint32_t offset,
	len += snprintf(&str[len], n - len,
			"  pool index    %" PRIu32 "\n", pool->pool_idx);
	len += snprintf(&str[len], n - len,
-			"  buf index     %" PRIu32 "\n", hdr->buf_hdr.index);
+			"  buf index     %" PRIu32 "\n",
+			hdr->buf_hdr.index.buffer);
	len += snprintf(&str[len], n - len,
			"  segcount      %" PRIu16 "\n", hdr->buf_hdr.segcount);
	len += snprintf(&str[len], n - len,
diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
index 687edb8f0..998fc649e 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -80,20 +80,6 @@ static inline pool_t *pool_from_buf(odp_buffer_t buf)
	return buf_hdr->pool_ptr;
 }

-static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
-						   uint32_t buffer_idx)
-{
-	uint64_t block_offset;
-	odp_buffer_hdr_t *buf_hdr;
-
-	block_offset = buffer_idx * (uint64_t)pool->block_size;
-
-	/* clang requires cast to uintptr_t */
-	buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
-
-	return buf_hdr;
-}
-
 int odp_pool_init_global(void)
 {
	uint32_t i;
@@ -296,7 +282,9 @@ static void init_buffers(pool_t *pool)
		memset(buf_hdr, 0, (uintptr_t)data - (uintptr_t)buf_hdr);

		/* Initialize buffer metadata */
-		buf_hdr->index = i;
+		buf_hdr->index.u32 = 0;
+		buf_hdr->index.pool = pool->pool_idx;
+		buf_hdr->index.buffer = i;
		buf_hdr->type = type;
		buf_hdr->event_type = type;
		buf_hdr->pool_ptr = pool;
@@ -785,7 +773,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
		ring = &pool->ring->hdr;
		mask = pool->ring_mask;
		for (i = 0; i < num; i++)
-			buf_index[i] = buf_hdr[i]->index;
+			buf_index[i] = buf_hdr[i]->index.buffer;

		ring_enq_multi(ring, mask, buf_index, num);
@@ -825,7 +813,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
	}

	for (i = 0; i < num; i++)
-		cache->buf_index[cache_num + i] = buf_hdr[i]->index;
+		cache->buf_index[cache_num + i] = buf_hdr[i]->index.buffer;

	cache->num = cache_num + num;
 }
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c
index 2801b220f..9e99994da 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -40,9 +40,14 @@
 static int queue_init(queue_entry_t *queue, const char *name,
		      const odp_queue_param_t *param);

+typedef struct ODP_ALIGNED_CACHE {
+	/* Storage space for ring data */
+	uint32_t data[CONFIG_QUEUE_SIZE];
+} ring_data_t;
+
 typedef struct queue_global_t {
	queue_entry_t   queue[ODP_CONFIG_QUEUES];
-
+	ring_data_t     ring_data[ODP_CONFIG_QUEUES];
	uint32_t        queue_lf_num;
	uint32_t        queue_lf_size;
	queue_lf_func_t queue_lf_func;
@@ -154,9 +159,11 @@ static int queue_capability(odp_queue_capability_t *capa)
	capa->max_sched_groups        = sched_fn->num_grps();
	capa->sched_prios             = odp_schedule_num_prio();
	capa->plain.max_num           = capa->max_queues;
-	capa->sched.max_num           = capa->max_queues;
+	capa->plain.max_size          = CONFIG_QUEUE_SIZE;
	capa->plain.lockfree.max_num  = queue_glb->queue_lf_num;
	capa->plain.lockfree.max_size = queue_glb->queue_lf_size;
+	capa->sched.max_num           = capa->max_queues;
+	capa->sched.max_size          = CONFIG_QUEUE_SIZE;

	return 0;
 }
@@ -204,6 +211,20 @@ static odp_queue_t queue_create(const char *name,
		param = &default_param;
	}

+	if (param->nonblocking == ODP_BLOCKING) {
+		if (param->size > CONFIG_QUEUE_SIZE)
+			return ODP_QUEUE_INVALID;
+	} else if (param->nonblocking == ODP_NONBLOCKING_LF) {
+		/* Only plain type lock-free queues supported */
+		if (param->type != ODP_QUEUE_TYPE_PLAIN)
+			return ODP_QUEUE_INVALID;
+		if (param->size > queue_glb->queue_lf_size)
+			return ODP_QUEUE_INVALID;
+	} else {
+		/* Wait-free queues not supported */
+		return ODP_QUEUE_INVALID;
+	}
+
	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
		queue = &queue_glb->queue[i];
@@ -297,7 +318,7 @@ static int queue_destroy(odp_queue_t handle)
		ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name);
		return -1;
	}
-	if (queue->s.head != NULL) {
+	if (ring_st_is_empty(&queue->s.ring_st) == 0) {
		UNLOCK(&queue->s.lock);
		ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
		return -1;
@@ -364,81 +385,71 @@ static odp_queue_t queue_lookup(const char *name)
	return ODP_QUEUE_INVALID;
 }

+static inline void buffer_index_from_buf(uint32_t buffer_index[],
+					 odp_buffer_hdr_t *buf_hdr[], int num)
+{
+	int i;
+
+	for (i = 0; i < num; i++)
+		buffer_index[i] = buf_hdr[i]->index.u32;
+}
+
+static inline void buffer_index_to_buf(odp_buffer_hdr_t *buf_hdr[],
+				       uint32_t buffer_index[], int num)
+{
+	int i;
+
+	for (i = 0; i < num; i++) {
+		buf_hdr[i] = buf_hdr_from_index_u32(buffer_index[i]);
+		odp_prefetch(buf_hdr[i]);
+	}
+}
+
 static inline int enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
			    int num)
 {
	int sched = 0;
-	int i, ret;
+	int ret;
	queue_entry_t *queue;
-	odp_buffer_hdr_t *hdr, *tail, *next_hdr;
+	int num_enq;
+	ring_st_t *ring_st;
+	uint32_t buf_idx[num];

	queue = qentry_from_int(q_int);
+	ring_st = &queue->s.ring_st;
+
	if (sched_fn->ord_enq_multi(q_int, (void **)buf_hdr, num, &ret))
		return ret;

-	/* Optimize the common case of single enqueue */
-	if (num == 1) {
-		tail = buf_hdr[0];
-		hdr  = tail;
-		hdr->burst_num = 0;
-		hdr->next = NULL;
-	} else {
-		int next;
-
-		/* Start from the last buffer header */
-		tail = buf_hdr[num - 1];
-		hdr  = tail;
-		hdr->next = NULL;
-		next = num - 2;
-
-		while (1) {
-			/* Build a burst. The buffer header carrying
-			 * a burst is the last buffer of the burst. */
-			for (i = 0; next >= 0 && i < BUFFER_BURST_SIZE;
-			     i++, next--)
-				hdr->burst[BUFFER_BURST_SIZE - 1 - i] =
-					buf_hdr[next];
-
-			hdr->burst_num   = i;
-			hdr->burst_first = BUFFER_BURST_SIZE - i;
-
-			if (odp_likely(next < 0))
-				break;
-
-			/* Get another header and link it */
-			next_hdr  = hdr;
-			hdr       = buf_hdr[next];
-			hdr->next = next_hdr;
-			next--;
-		}
-	}
+	buffer_index_from_buf(buf_idx, buf_hdr, num);

	LOCK(&queue->s.lock);
+
	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
		UNLOCK(&queue->s.lock);
		ODP_ERR("Bad queue status\n");
		return -1;
	}

-	/* Empty queue */
-	if (queue->s.head == NULL)
-		queue->s.head = hdr;
-	else
-		queue->s.tail->next = hdr;
+	num_enq = ring_st_enq_multi(ring_st, buf_idx, num);

-	queue->s.tail = tail;
+	if (odp_unlikely(num_enq == 0)) {
+		UNLOCK(&queue->s.lock);
+		return 0;
+	}

	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
		queue->s.status = QUEUE_STATUS_SCHED;
-		sched = 1; /* retval: schedule queue */
+		sched = 1;
	}
+
	UNLOCK(&queue->s.lock);

	/* Add queue to scheduling */
	if (sched && sched_fn->sched_queue(queue->s.index))
		ODP_ABORT("schedule_queue failed\n");

-	return num; /* All events enqueued */
+	return num_enq;
 }

 static int queue_int_enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -484,12 +495,15 @@ static int queue_enq(odp_queue_t handle, odp_event_t ev)
 static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
			    int num)
 {
-	odp_buffer_hdr_t *hdr, *next;
-	int i, j;
-	int updated = 0;
	int status_sync = sched_fn->status_sync;
+	int num_deq;
+	ring_st_t *ring_st;
+	uint32_t buf_idx[num];
+
+	ring_st = &queue->s.ring_st;

	LOCK(&queue->s.lock);
+
	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
		/* Bad queue, or queue has been destroyed.
		 * Scheduler finalizes queue destroy after this. */
@@ -497,9 +511,9 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
		return -1;
	}

-	hdr = queue->s.head;
+	num_deq = ring_st_deq_multi(ring_st, buf_idx, num);

-	if (hdr == NULL) {
+	if (num_deq == 0) {
		/* Already empty queue */
		if (queue->s.status == QUEUE_STATUS_SCHED) {
			queue->s.status = QUEUE_STATUS_NOTSCHED;
@@ -509,51 +523,18 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
		}

		UNLOCK(&queue->s.lock);

-		return 0;
-	}
-
-	for (i = 0; i < num && hdr; ) {
-		int burst_num = hdr->burst_num;
-		int first     = hdr->burst_first;
-
-		/* First, get bursted buffers */
-		for (j = 0; j < burst_num && i < num; j++, i++) {
-			buf_hdr[i] = hdr->burst[first + j];
-			odp_prefetch(buf_hdr[i]);
-		}
-
-		if (burst_num) {
-			hdr->burst_num   = burst_num - j;
-			hdr->burst_first = first + j;
-		}
-
-		if (i == num)
-			break;
-
-		/* When burst is empty, consume the current buffer header and
-		 * move to the next header */
-		buf_hdr[i] = hdr;
-		next       = hdr->next;
-		hdr->next  = NULL;
-		hdr        = next;
-		updated++;
-		i++;
+		return 0;
	}

-	/* Write head only if updated */
-	if (updated)
-		queue->s.head = hdr;
-
-	/* Queue is empty */
-	if (hdr == NULL)
-		queue->s.tail = NULL;
-
	if (status_sync && queue->s.type == ODP_QUEUE_TYPE_SCHED)
		sched_fn->save_context(queue->s.index);

	UNLOCK(&queue->s.lock);

-	return i;
+	buffer_index_to_buf(buf_hdr, buf_idx, num_deq);
+
+	return num_deq;
 }

 static int queue_int_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -622,8 +603,9 @@ static int queue_init(queue_entry_t *queue, const char *name,
	queue->s.pktin = PKTIN_INVALID;
	queue->s.pktout = PKTOUT_INVALID;

-	queue->s.head = NULL;
-	queue->s.tail = NULL;
+	ring_st_init(&queue->s.ring_st,
+		     queue_glb->ring_data[queue->s.index].data,
+		     CONFIG_QUEUE_SIZE);

	return 0;
 }
@@ -699,7 +681,7 @@ int sched_cb_queue_empty(uint32_t queue_index)
		return -1;
	}

-	if (queue->s.head == NULL) {
+	if (ring_st_is_empty(&queue->s.ring_st)) {
		/* Already empty queue. Update status. */
		if (queue->s.status == QUEUE_STATUS_SCHED)
			queue->s.status = QUEUE_STATUS_NOTSCHED;
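
The patch calls `ring_st_init()`, `ring_st_enq_multi()`, `ring_st_deq_multi()` and `ring_st_is_empty()` but does not include their definitions. As a minimal sketch of what such a single-threaded ring looks like (this is an illustration inferred from the call sites, not the actual ODP `ring_st` implementation; semantics such as all-or-nothing enqueue are assumptions):

```c
/* Sketch of a ring_st-style single-threaded ring of uint32_t buffer
 * indices. Head/tail are free-running counters; with a power-of-two
 * size, (tail - head) in unsigned arithmetic is always the fill level,
 * even across wrap-around. Names mirror the patch's call sites. */
#include <assert.h>
#include <stdint.h>

typedef struct {
	uint32_t head;   /* next slot to dequeue from */
	uint32_t tail;   /* next free slot to enqueue into */
	uint32_t mask;   /* size - 1; size must be a power of two */
	uint32_t *data;  /* caller-provided storage (cf. ring_data_t) */
} ring_st_t;

static void ring_st_init(ring_st_t *r, uint32_t *data, uint32_t size)
{
	r->head = 0;
	r->tail = 0;
	r->mask = size - 1;
	r->data = data;
}

static int ring_st_is_empty(const ring_st_t *r)
{
	return r->head == r->tail;
}

/* Enqueue num indices; sketched as all-or-nothing, returning 0 when
 * there is not enough room (matching the num_enq == 0 check above). */
static uint32_t ring_st_enq_multi(ring_st_t *r, const uint32_t idx[],
				  uint32_t num)
{
	uint32_t i;
	uint32_t free_slots = r->mask + 1 - (r->tail - r->head);

	if (free_slots < num)
		return 0;

	for (i = 0; i < num; i++)
		r->data[(r->tail + i) & r->mask] = idx[i];

	r->tail += num;
	return num;
}

/* Dequeue up to num indices; returns how many were actually dequeued. */
static uint32_t ring_st_deq_multi(ring_st_t *r, uint32_t idx[], uint32_t num)
{
	uint32_t i;
	uint32_t used = r->tail - r->head;

	if (num > used)
		num = used;

	for (i = 0; i < num; i++)
		idx[i] = r->data[(r->head + i) & r->mask];

	r->head += num;
	return num;
}
```

Because the ring only stores 32-bit indices (the packed `buffer_index_t` above) rather than linked buffer headers, enqueue and dequeue never touch the buffer headers inside the queue lock, which is where the performance gain and the path toward a lockless ring come from.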