From patchwork Tue Sep 20 19:53:30 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Ola Liljedahl X-Patchwork-Id: 76624 Delivered-To: patch@linaro.org Received: by 10.140.106.72 with SMTP id d66csp1659901qgf; Tue, 20 Sep 2016 12:54:47 -0700 (PDT) X-Received: by 10.200.44.189 with SMTP id 58mr37595167qtw.109.1474401287662; Tue, 20 Sep 2016 12:54:47 -0700 (PDT) Return-Path: Received: from lists.linaro.org (lists.linaro.org. [54.225.227.206]) by mx.google.com with ESMTP id f126si25358182qke.149.2016.09.20.12.54.46; Tue, 20 Sep 2016 12:54:47 -0700 (PDT) Received-SPF: pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.225.227.206 as permitted sender) client-ip=54.225.227.206; Authentication-Results: mx.google.com; spf=pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.225.227.206 as permitted sender) smtp.mailfrom=lng-odp-bounces@lists.linaro.org Received: by lists.linaro.org (Postfix, from userid 109) id 9014F61628; Tue, 20 Sep 2016 19:54:46 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on ip-10-142-244-252 X-Spam-Level: X-Spam-Status: No, score=-4.2 required=5.0 tests=BAD_ENC_HEADER,BAYES_00, RCVD_IN_DNSWL_MED autolearn=disabled version=3.4.0 Received: from [127.0.0.1] (localhost [127.0.0.1]) by lists.linaro.org (Postfix) with ESMTP id D86B86160D; Tue, 20 Sep 2016 19:54:11 +0000 (UTC) X-Original-To: lng-odp@lists.linaro.org Delivered-To: lng-odp@lists.linaro.org Received: by lists.linaro.org (Postfix, from userid 109) id 36FBC6160F; Tue, 20 Sep 2016 19:54:00 +0000 (UTC) Received: from eu-smtp-delivery-143.mimecast.com (eu-smtp-delivery-143.mimecast.com [207.82.80.143]) by lists.linaro.org (Postfix) with ESMTP id DCEA561607 for ; Tue, 20 Sep 2016 19:53:54 +0000 (UTC) Received: from EUR01-HE1-obe.outbound.protection.outlook.com (mail-he1eur01lp0212.outbound.protection.outlook.com [213.199.154.212]) (Using TLS) by eu-smtp-1.mimecast.com with ESMTP id uk-mta-54-7_ety_HEOsms7bHQdvDvQA-1; Tue, 20 Sep 2016 20:53:53 +0100 Received: from localhost.localdomain (155.4.131.235) by DB5PR0801MB1589.eurprd08.prod.outlook.com (10.167.230.13) with Microsoft SMTP Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384_P384) id 15.1.629.8; Tue, 20 Sep 2016 19:53:50 +0000 From: To: Date: Tue, 20 Sep 2016 21:53:30 +0200 Message-ID: <1474401210-18010-1-git-send-email-ola.liljedahl@arm.com> X-Mailer: git-send-email 2.7.4 MIME-Version: 1.0 X-Originating-IP: [155.4.131.235] X-ClientProxiedBy: DBXPR04CA0026.eurprd04.prod.outlook.com (10.141.8.154) To DB5PR0801MB1589.eurprd08.prod.outlook.com (10.167.230.13) X-MS-Office365-Filtering-Correlation-Id: da13e7ae-4eef-4b6d-352d-08d3e18fd7fd X-Microsoft-Exchange-Diagnostics: 1; DB5PR0801MB1589; 2:HUoZYMIg6+hi1+AolQuCTmrJYGkiRugQR6nmVS/QtKp1I8CTCxsApNx/eRPPahOavM68OTzj6/f6TIHout8w7pVDE70ykk4jRIjcg/yAwioeK966+doHnd2EqL8AwGw8Gpi7HonucREYrg/gxVMqZc6fKgz0RnoFIQVYF51EItaVimYCVf5pc/dMiuqoyO53; 3:czBoC5NO5iFDBhtdGoFtPC7jKK8m1FHvGnzrVrJPIDAY2hKc9HTbckv2qigD5nOZZ3Dg5nruwcXwih2IXT9OTg9youC8nLa6mzjFfGaIMhMZaNH+GoABoGmzxCH0eDOe X-Microsoft-Antispam: UriScan:;BCL:0;PCL:0;RULEID:;SRVR:DB5PR0801MB1589; X-Microsoft-Exchange-Diagnostics: 1; DB5PR0801MB1589; 25:N7slHBOv4xArLk9QYt1DdDD1kk8QCFMqE9E/57YHxyLnYGXghTzrZKoBhXV/bdswUv/gstWVydeQdyykSOLKrRa6XUvf8QlwmIky5zYmj0Ca0oGdpEvczv7H52Qf8IKL3xaCXeQEsr1GYT2z9mGBpjKKeVt1UOZKPOm4lO5Gyn+QukNpSJj7V+zpmap+KS0X6EWrGHJZq5nTJapq/OZ+Xj45B/mdgXx+JN6ZOjjyuSzcQZ7YTH7VuY7eMdsLKKeAX+4wyDI4GPn7QJh0U2cNuvj55c929Va9oRm5DhVkAS1K5bv3icdihrLJG0gKSq168j7rzqrohqN4BL34y59BcY2veoVmfjXb5qy4wkW7PJ7+82qizXr1BDfDhC3aC7biUcRl3bJsHN9LCvmQlB+GFLVorjANJzrj4RMb+2R4cw3NsvCvDMIyDd0GsQWDvON6sqXkm24IbAxlHAjf9g10Itw/BcVRQAaUz8/VUsCX3ac6t0DzGkrBluFvRQc00/5PCUTFCMFn51cFqQsIQoAonZ+XuX8sTvA/jbUncxE5lFpzZ9VKr0KAJpHtuJ5IvwDBwlnV2FKnmhgZugbXIPpuyasj6VrTIDm/nszxCyziUupQuN8wXwdc70eCswOCdeHvlpfdipJIEwiBgkByjFSFRShppNkPVZKswR0hi6QMt1mm8SgPNGnfU0mIqIVwXr/Tme9/oDfgwO+YSpGi3q6WziGkbA61kdbMEMZ8KENHI9g= X-Microsoft-Exchange-Diagnostics: 1; DB5PR0801MB1589; 31:E6Oe3wW7aHllg/OwQ4Ztcccx9ZWzzm0bNmpp75vPOWbgadmJce5XoHwq1iSXpKiFX8QnLnkzq63jCEWvh8XToVMmU2DQ8HE79Vy/oa54VuEAaDZ33m29jouMBL78WhuJ4PRoinq3t32L0/T/woDImOfkh0PHnvjuz2jpNH22yD7gB1lx/t50+5uGhEU3FBQWGAHQFul7Cv6LntOugDA8r92LVXT5zr/B6sI+w9xI2ww=; 20:P8THZ+oKUQr8WyCl4mGPcxoOQylOYNhlKHDkSH6TyvzCmn0g8WTJWIQbn5X4aAWACKM1SdCUBq6VlWOuPKMnXmtAjvkj0gYN+jN4rS8vPPxU5oPzda/Ow47pl4xV3rEHkmpIvhbJe0+K+yrDxziwuZr7UFxvlj/EaUInZDal+Po=; 4:JR5Kxckz8K5GonVVrEsNU4DxJ7YU8ilpq2hCHHQStzgm3V2zmvYmWgDaAMocLeTQce3FAlJ9L1b6ZL+lVZn0VW8MPXxpLYoYo6hxL39lWMlIXfWprppmPVmKvhTsKzYw7qG03wYvLxD2Dc9NoSzKaD69IhbiFH3Sq9mRnnuqHKpPY9pZZfqnYn+Oc5WZAdNo/lL35uOFt5oT/Yub8wepA0c5g2zlOrz9Ix4kerU7n2k/6WOqrMuWbkzsA/HNvg+lD/0PXRsWOP0HQq2hEpOPik0fe79W481EfJR2Dfdvx7M7bicmmYK95z3aR/EfSMTW+2xZCXwCyxUCP3R/O/jRJfBMH2Q3g9UKu7i3dVswysfss/24JKw27Mjl4j4/CdtUTN37IAVyGho9AE4B6nXZPssBFmV9ozf5PWmQDue9OPPZAffmuLGsu2kggv0Z0gqreQc3GPqMlCHt7lKTAzuUeHj7uLQbvTGUEZRoHk4rb/B1c4XukU0DtuO/wii0kvqwtYTdFzEPx+eZqWDfWzlylA== NoDisclaimer: True X-Microsoft-Antispam-PRVS: X-Exchange-Antispam-Report-Test: UriScan:(180628864354917)(131327999870524)(21532816269658); X-Exchange-Antispam-Report-CFA-Test: BCL:0; PCL:0; RULEID:(6040176)(601004)(2401047)(8121501046)(5005006)(10201501046)(3002001)(6055026); SRVR:DB5PR0801MB1589; BCL:0; PCL:0; RULEID:; SRVR:DB5PR0801MB1589; X-Forefront-PRVS: 0071BFA85B X-Forefront-Antispam-Report: SFV:NSPM; SFS:(10009020)(4630300001)(6069001)(6009001)(7916002)(199003)(189002)(51234002)(7736002)(19580405001)(19580395003)(81156014)(50226002)(110136003)(4326007)(3846002)(8676002)(305945005)(5003940100001)(6116002)(586003)(450100001)(2906002)(7846002)(50466002)(5660300001)(66066001)(47776003)(97736004)(81166006)(2876002)(189998001)(77096005)(68736007)(48376002)(42186005)(106356001)(92566002)(36756003)(105586002)(101416001)(86152002)(2351001)(229853001)(33646002)(86362001)(50986999)(3714002)(579004)(569005); DIR:OUT; SFP:1101; SCL:1; SRVR:DB5PR0801MB1589; H:localhost.localdomain; FPR:; SPF:None; PTR:InfoNoRecords; A:1; MX:1; LANG:en; X-Microsoft-Exchange-Diagnostics: =?us-ascii?Q?1; DB5PR0801MB1589; 23:attilqoO47Z9d4apdCVELjyl8eX5FqAb+LBVscM?= =?us-ascii?Q?cuARdCS5eTIVikcHZQDwiiOOeriFGtIhq9XnQRlg8CvVYDOl8XxymjYrAbNx?= =?us-ascii?Q?x36jb/N0RKzpAETjCAmFjxnlgmlIucsdDJ25CBpA7MYpSIyfm+RCOv/MpfbS?= =?us-ascii?Q?iUfZ1GtReeV//xOJu8GhE8d5gpyJ9oWNgr8a61fLAwbjvRwv1s5lgfLKf/s/?= =?us-ascii?Q?q4ivTlKu24Ommx3KiMPOcD1BL7F2A9+IfTcp9mfwUOWepDpnc8odYYzCz4TP?= =?us-ascii?Q?S1PRVwAEgf4jExaqctS2/h/IwZX+G4kpBM4o6zopEe+aNjEKbY5TR7diY8J1?= =?us-ascii?Q?599UqYMVcJvPJXhoiiaPzese8jnU9Tqi86AW3n7whaMgsko5SJc1a3Fio+sr?= =?us-ascii?Q?sUeW90QbE4APq9NIzPK3LIsSXQ4ym5ffFBuKTDNBGcUWFKFTE+3RZgnqh9iF?= =?us-ascii?Q?W3pzoKyQ3QyobTt/apl/gDXYy3aMXVCk1qtbX3kvs72d3YccBEDdG16MCZg2?= =?us-ascii?Q?6jYWgGDD1F7ZDze1QFfbw3sJfFssdaCwYaUE07w3G6YRkKNYJyvpmNwtWRLX?= =?us-ascii?Q?F5zhiobZ8Ogd5ZGMJfaOvzOc+J2suIzkcrx3HwZFwL8nAet/DpFliCED03Go?= =?us-ascii?Q?kcKemOfNN7qrULJ1YjCmf/V3vAXJtEI21G0VI1ywE/DpNQHetpyynNdQgFr9?= =?us-ascii?Q?DPVdh/3k794QbV7iEPQyROrNfpXmyaT/CcCEPNtsvG5qeh4Jlt2kJ9UkeCoa?= =?us-ascii?Q?hvT2Z517kfAHgQwElEPetNW+1QiixSzy7LKGq9kByW4a7GUNsrsEDbAVnzYb?= =?us-ascii?Q?wGkUT6JT0GhKZXSwrBiTD42pDK/PrIz6OB7apf0fXQRU+VhQ7iqQsP+/+z+L?= =?us-ascii?Q?f72nLcFc2RL8h31eMNlJcGgSgS/hs8wpzIe9d3+b+Sry28VZlkd5BQSu8DXd?= =?us-ascii?Q?ZjWV5q+PfkTDW3uPS6rKCbcijoMWwGYMBCO1emkOnm/n5ONznEvPXHeryJhX?= =?us-ascii?Q?amdyjkbjSght87A+GJSKgdzXTCDiFAsiTJGUhzCD1sbJ88dGOBZVlLug/4Iy?= =?us-ascii?Q?oa+WiwJiutm3LEhXJbI/gMMkJJHAv/w9GMgV7pe6m+LM6K700H9GfXNVTwDB?= =?us-ascii?Q?6x9nIX4m0jjH1xtR1ZTP0mmgwvMv0DLDtWZ093nerBCVf7SOdWxADzkP87KJ?= =?us-ascii?Q?n+OqY/Us31kbUomiFgYpjYMXLgx259xrLZUpu?= X-Microsoft-Exchange-Diagnostics: 1; DB5PR0801MB1589; 6:tq9RXujcGvAJEG4lR+IJy2cnS/rUoNBetcR+nENGkqubfcPJ3tsaneJ2PeTDGy4xhplNDpq0r+vCXDpTmpVLyGtC8s3wDTWwJa3jVhT3JKlJ7ymM6iWT1rovBqDfzjPg6wylGKy8erXaX2g11Nox/YseQKgu1s9Ehxdu15lO4BM0RCnQ5DreBoaW0WrRvQqoz4S0uxUoeaPGDRmG/TYYtRKk4r8W2jwCLJXNbjrQjj1hqOrHu0VEtnz8oGuai4B2dJzHS0xRCbP+POG2SC/4AqvhVs5DXU/j4aP3fjsssCVzgLNUWRSQZNdfm7YYRmtk9q/3IBMlVOZM+pDfAiU5ig==; 5:GhN45T02LTqmpBayUMFQg7Y2N161V5xfhkjgwQ9ZiZGU5hvP6glb2CXnfN3TlKbGb0NT4akGwqXTJfvmnHlyVEfxPgAoWNehgRA9pRKLsJL2iV68PNPFbXekK/6szysyRXFia/v9mqzH/c3zcpQgZg==; 24:rZ/H4h7NeIRALSlzWUnEaEPOkZni9b0YmBUecwk5CVtZHLBItHAt9yO8NZ6BYG1oeY8Gw2jil2Ag/AlxS0137G5Tn1wXCetsnX4LbuIE07U=; 7:h4NhSgMHCp1fusY/9j3PT01nsZPj0a6so4CJG80NEaR0QrBJh3tBNBhJd7RvBwd3XK9RRWzNkXuaN69jNZCl60m9mEv5kgV0xFli3Og5NERn/0a7Mi8DH3/fVUjWiBBN7tXOved+t/5z1RhfXo5LXINbpeDMOyHeg95JV0bdumDKWEZMnKm2ugWBAFG9iJ5uLn2+cTdnMlq9fYE+Jgy3Tgcsw6/0Mmzrrp+J1IG+lJ/LxqDa5dkCV1jx1bLr25K6NWGvELzbVfzI3og5Jh9C6FwnlKoAYyM7hXp5wYxuwP0AayKPmnOw8UUYepy/bR6G SpamDiagnosticOutput: 1:99 SpamDiagnosticMetadata: NSPM X-Microsoft-Exchange-Diagnostics: 1; DB5PR0801MB1589; 20:H/hWtR9u3DgwP5aw56yLYipOGSUyV0vBAkXMulKcBVYhXzWMiXQ0gRaAQb9ff0eEzgKQZNxxGBGuIg/2o2VzAvpsaccQYEhDr2ezYhpE3d6Vv/tSCR+Qx/UqRYmA0uspe+9TWTrug/W8WcpVnK/XLZb5CV4fCCf69Wns1O6BAho= X-OriginatorOrg: arm.com X-MS-Exchange-CrossTenant-OriginalArrivalTime: 20 Sep 2016 19:53:50.6588 (UTC) X-MS-Exchange-CrossTenant-FromEntityHeader: Hosted X-MS-Exchange-Transport-CrossTenantHeadersStamped: DB5PR0801MB1589 X-MC-Unique: 7_ety_HEOsms7bHQdvDvQA-1 Cc: Ola Liljedahl , nd@arm.com Subject: [lng-odp] [RFC] A prototype of a SW scheduler for ODP X-BeenThere: lng-odp@lists.linaro.org X-Mailman-Version: 2.1.16 Precedence: list List-Id: "The OpenDataPlane \(ODP\) List" List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: lng-odp-bounces@lists.linaro.org Sender: "lng-odp" From: Ola Liljedahl A Monkeys Can Code Production <*- Locks are for Lamers -*> A high performance SW scheduler for ODP A queue and scheduler design attempting to use lock-free and lock-less synchronisation where possible and to minimise ordering and synchronisation between threads. Optimised for ARM (specifically Cortex-A53) targets. Builds and runs on x86 (-64) but no attempt to optimise performance here. Simple performance benchmark, pushing 2048 events through 20 queues (which takes a few milliseconds). Avg cycles for single-event enqueue/schedule operations on Cortex-A53@1.5GHz CPU's atomic parallel ordered 1 183 222 388 2 254 282 450 3 269 333 489 A presentation and discussion is scheduled for the ODP Design Sprint at Linaro Connect Las Vegas. Signed-off-by: Ola Liljedahl --- LICENSE | 28 + Makefile | 164 +++++ llqueue.c | 363 +++++++++++ llsc.c | 254 ++++++++ scheduler.c | 2042 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 2851 insertions(+) create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 llqueue.c create mode 100644 llsc.c create mode 100644 scheduler.c -- 2.1.4 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..15fdb21 --- /dev/null +++ b/LICENSE @@ -0,0 +1,28 @@ +Copyright (c) 2016, ARM Limited. All rights reserved. + +SPDX-License-Identifier: BSD-3-Clause + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +Redistributions in binary form must reproduce the above copyright notice, this +list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +Neither the name of ARM Limited nor the names of its contributors may be +used to endorse or promote products derived from this software without specific +prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ac7cd6b --- /dev/null +++ b/Makefile @@ -0,0 +1,164 @@ +############################################################################### +# Copyright (c) 2016, ARM Limited. All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +################################################################################ + +############################################################################### +# Project specific definitions +################################################################################ + +#Name of directory and also Dropbox source tar file +DIRNAME = scheduler +#List of executable files to build +TARGETS = scheduler +#List object files for each target +OBJECTS_scheduler = scheduler.o + +#Customizable compiler and linker flags +GCCTARGET = +CCFLAGS += -mcx16#Required for CMPXCHG16 on x86 +#GCCTARGET = aarch64-linux-gnu +#CCFLAGS += -mcpu=cortex-a53 +DEFINE += -DNDEBUG#disable assertions +CCFLAGS += -std=c99 +CCFLAGS += -g -ggdb -Wall +CCFLAGS += -O2 -fno-stack-check -fno-stack-protector +LDFLAGS += -g -ggdb -pthread +LIBS = -lrt + +#Where to find the source files +VPATH += . + +#Default to non-verbose mode (echo command lines) +VERB = @ + +#Location of object and other derived/temporary files +OBJDIR = obj#Must not be . + +############################################################################### +# Make actions (phony targets) +################################################################################ + +.PHONY : default all clean tags etags + +default: + @echo "Make targets:" + @echo "all build all targets ($(TARGETS))" + @echo "clean remove derived files" + @echo "tags generate vi tags file" + @echo "etags generate emacs tags file" + +all : $(TARGETS) + +#Make sure we don't remove current directory with all source files +ifeq ($(OBJDIR),.) +$(error invalid OBJDIR=$(OBJDIR)) +endif +ifeq ($(TARGETS),.) +$(error invalid TARGETS=$(TARGETS)) +endif +clean: + @echo "--- Removing derived files" + $(VERB)-rm -rf $(OBJDIR) $(TARGETS) tags TAGS perf.data perf.data.old + +tags : + $(VERB)ctags -R . + +etags : + $(VERB)ctags -e -R . + +################################################################################ +# Setup tool commands and flags +################################################################################ + +#Defaults to be overriden by compiler makefragment +CCOUT = -o $@ +ASOUT = -o $@ +LDOUT = -o $@ + +ifneq ($(GCCTARGET),) +#Some experimental cross compiling support +#GCCLIB = $(GCCROOT)/lib/gcc/$(GCCTARGET)/4.7.3 +GCCROOT = /opt/gcc-linaro-5.3-2016.02-x86_64_aarch64-linux-gnu +GCCSETUP = PATH=$(GCCROOT)/bin:$(GCCROOT)/$(GCCTARGET)/bin:/bin:/usr/bin +CC = $(GCCSETUP) $(GCCROOT)/bin/$(GCCTARGET)-gcc +CXX = $(GCCSETUP) $(GCCROOT)/bin/$(GCCTARGET)-g++ +LD = $(GCCSETUP) $(GCCROOT)/bin/$(GCCTARGET)-g++ +else +#Native compilation +ifeq ($(CLANG),yes) +CC = clang +CXX = clang++ +AS = as +LD = clang++ +else +CC = gcc +CXX = g++ +AS = as +LD = g++ +endif +endif +#GROUPSTART = -Wl,--start-group +#GROUPEND = -Wl,--end-group +BIN2C = bin2c + +#Important compilation flags +CCFLAGS += -c -MMD -MP + +################################################################################ +# Post-process some variables and definitions, generate dependencies +################################################################################ + +CCFLAGS += $(DEFINE) $(INCLUDE) +#Generate list of all object files (for all targets) +override OBJECTS := $(addprefix $(OBJDIR)/,$(foreach var,$(TARGETS),$(OBJECTS_$(var)))) +#Generate target:objects dependencies for all targets +$(foreach target,$(TARGETS),$(eval $(target) : $$(addprefix $$(OBJDIR)/,$$(OBJECTS_$(target))))) +#Special dependency for object files on object directory +$(OBJECTS) : | $(OBJDIR) + +################################################################################ +# Build recipes +################################################################################ + +$(OBJDIR) : + $(VERB)mkdir -p $(OBJDIR) + +#Keep intermediate pcap C-files +.PRECIOUS : $(OBJDIR)/%_pcap.c + +$(OBJDIR)/%_pcap.o : $(OBJDIR)/%_pcap.c + @echo "--- Compiling $<" + $(VERB)$(CC) $(CCFLAGS) $(CCOUT) $< + +$(OBJDIR)/%_pcap.c : %.pcap + @echo "--- Generating $@" + $(VERB)$(BIN2C) -n $(notdir $(basename $@)) -o $@ $< + +$(OBJDIR)/%.o : %.cc + @echo "--- Compiling $<" + $(VERB)$(CXX) $(CXXFLAGS) $(CCFLAGS) $(CCFLAGS_$(basename $<)) $(CCOUT) $< + +$(OBJDIR)/%.o : %.c + @echo "--- Compiling $<" + $(VERB)$(CC) $(CCFLAGS) $(CCFLAGS_$(basename $<)) $(CCOUT) $< + +$(OBJDIR)/%.o : %.s + @echo "--- Compiling $<" + $(VERB)$(AS) $(ASFLAGS) $(ASONLYFLAGS) $(ASOUT) $< + +$(OBJDIR)/%.o : %.S + @echo "--- Compiling $<" + $(VERB)$(CC) $(CCFLAGS) $(addprefix $(ASPREFIX),$(ASFLAGS)) $(CCOUT) $< + +$(TARGETS) : + @echo "--- Linking $@ from $(OBJECTS_$@) $(LIBS)" + $(VERB)$(LD) $(LDFLAGS) $(LDOUT) $(addprefix $(OBJDIR)/,$(OBJECTS_$@)) $(GROUPSTART) $(LIBS) $(GROUPEND) $(LDMAP) + +################################################################################ +# Include generated dependencies +################################################################################ + +-include $(patsubst %.o,%.d,$(OBJECTS)) +# DO NOT DELETE diff --git a/llqueue.c b/llqueue.c new file mode 100644 index 0000000..1fecab7 --- /dev/null +++ b/llqueue.c @@ -0,0 +1,363 @@ +//Copyright (c) 2016, ARM Limited. All rights reserved. +// +//SPDX-License-Identifier: BSD-3-Clause + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include + +#undef likely +#undef unlikely +#if defined __GNUC__ +#define likely(x) __builtin_expect(!!(x), 1) +#define unlikely(x) __builtin_expect(!!(x), 0) +#else +#define likely(x) (x) +#define unlikely(x) (x) +#endif + +/****************************************************************************** + * Linked list queues + *****************************************************************************/ + +struct llnode +{ + struct llnode *next; + uint32_t tag;//For consistency checks +}; + +union llht +{ + struct + { + struct llnode *head, *tail; + } st; + dintptr_t ui; +}; + +struct llqueue +{ + union llht u; +//x86-64 seems faster using spin lock instead of CMPXCHG16 + pthread_spinlock_t lock; +}; + +#define SENTINEL ((void *)~(uintptr_t)0) + +//static void llq_enqueue(struct llqueue *llq, struct llnode *node, uint32_t *numfailed) __attribute__((noinline)); +static inline void llq_enqueue(struct llqueue *llq, struct llnode *node, uint32_t *numfailed) +{ + union llht old; + assert(node->next == NULL); + node->next = SENTINEL; +#ifdef USE_LLSC +retry: //Failed SC requires new LL + old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED); +#else + __atomic_load(&llq->u, &old, __ATOMIC_RELAXED); +retry: //Failed CAS returns existing value + (void)0;//Need statement after label +#endif + union llht neu; + neu.st.head = old.st.head == NULL ? node : old.st.head; + neu.st.tail = node; +#ifdef USE_LLSC + if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELEASE))) +#else + if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu, + /*weak=*/false, + __ATOMIC_RELEASE, + __ATOMIC_RELAXED))) +#endif + { + //Failed + doze(); + if (numfailed != NULL) + (*numfailed)++; + goto retry; + } + if (old.st.tail != NULL) + { + //List was not empty + assert(old.st.tail->next == SENTINEL); + old.st.tail->next = node; + } +} + +//static void llq_enqueue_l(struct llqueue *llq, struct llnode *node, uint32_t *numfailed) __attribute__((noinline)); +static inline void llq_enqueue_l(struct llqueue *llq, struct llnode *node, uint32_t *numfailed) +{ + assert(node->next == NULL); + node->next = SENTINEL; + pthread_spin_lock(&llq->lock); + if(llq->u.st.head == NULL) + { + llq->u.st.head = llq->u.st.tail = node; + } + else + { + llq->u.st.tail->next = node; + llq->u.st.tail = node; + } + pthread_spin_unlock(&llq->lock); +} + +//static struct llnode *llq_dequeue(struct llqueue *llq, uint32_t *numfailed) __attribute__((noinline)); +static inline struct llnode *llq_dequeue(struct llqueue *llq, uint32_t *numfailed) +{ + struct llnode *head; + + //llq_dequeue() may be used in a busy-waiting fashion + //Read head using plain load to avoid disturbing remote LL/SC + if ((head = __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED)) == NULL) + { + return NULL; + } + //Read head->next before LL to minimize cache miss latency in LL/SC below + (void)__atomic_load_n(&head->next, __ATOMIC_RELAXED); + + union llht old; +#ifdef USE_LLSC +retry: //Failed SC requires new LL + old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED); +#else + __atomic_load(&llq->u, &old, __ATOMIC_RELAXED); +retry: //Failed CAS returns existing value +#endif + if (unlikely(old.st.head == NULL)) //Empty list + { + clrex(); + return NULL; + } + else if (unlikely(old.st.head == old.st.tail))//Single-element in list + { + union llht neu; + neu.st.head = NULL; + neu.st.tail = NULL; +#ifdef USE_LLSC + if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED))) +#else + if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu, + /*weak=*/false, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED))) +#endif + { + //Failed + doze(); + if (numfailed != NULL) + (*numfailed)++; + goto retry; + } + assert(old.st.head->next == SENTINEL); + } + else//Multi-element list, dequeue head + { + struct llnode *next = __atomic_load_n(&old.st.head->next, + __ATOMIC_RELAXED); + //Check if llq_enqueue() has yet written true next pointer + if (unlikely(next == SENTINEL)) + { + //Sorry, can't continue + clrex(); + doze(); + if (numfailed != NULL) + (*numfailed)++; + goto retry; + } + union llht neu; + neu.st.head = next; + neu.st.tail = old.st.tail; +#ifdef USE_LLSC + if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED))) +#else + if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu, + /*weak=*/false, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED))) +#endif + { + //Failed + doze(); + if (numfailed != NULL) + (*numfailed)++; + goto retry; + } + assert(old.st.head->next != SENTINEL); + } + old.st.head->next = NULL; + return old.st.head; +} + +//static struct llnode *llq_dequeue_l(struct llqueue *llq, uint32_t *numfailed) __attribute__((noinline)); +static inline struct llnode *llq_dequeue_l(struct llqueue *llq, uint32_t *numfailed) +{ + struct llnode *head; + if ((head = __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED)) == NULL) + { + return NULL; + } + + struct llnode *node = NULL; + pthread_spin_lock(&llq->lock); + if (llq->u.st.head != NULL) + { + node = llq->u.st.head; + if (llq->u.st.head == llq->u.st.tail) + { + assert(node->next == SENTINEL); + llq->u.st.head = llq->u.st.tail = NULL; + } + else + { + assert(node->next != SENTINEL); + llq->u.st.head = node->next; + } + node->next = NULL; + } + pthread_spin_unlock(&llq->lock); + return node; +} + +static struct llnode *llq_dequeue_cond(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed) __attribute__((always_inline)); +static inline struct llnode *llq_dequeue_cond(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed) +{ + union llht old; +#ifdef USE_LLSC +retry: //Failed SC requires new LL + old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED); +#else + __atomic_load(&llq->u, &old, __ATOMIC_RELAXED); +retry: //Failed CAS returns existing value +#endif + if (unlikely(old.st.head == NULL || old.st.head != exp)) //Empty list or wrong head + { + clrex(); + return NULL; + } + else if (unlikely(old.st.head == old.st.tail))//Single-element in list + { + union llht neu; + neu.st.head = NULL; + neu.st.tail = NULL; +#ifdef USE_LLSC + if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED))) +#else + if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu, + /*weak=*/false, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED))) +#endif + { + //Failed + doze(); + if (numfailed != NULL) + (*numfailed)++; + goto retry; + } + assert(old.st.head->next == SENTINEL); + } + else//Multi-element list, dequeue head + { + struct llnode *next = __atomic_load_n(&old.st.head->next, + __ATOMIC_RELAXED); + //Check if llq_enqueue() has yet written true next pointer + if (unlikely(next == SENTINEL)) + { + //Sorry, can't continue + clrex(); + doze(); + if (numfailed != NULL) + (*numfailed)++; + goto retry; + } + union llht neu; + neu.st.head = next; + neu.st.tail = old.st.tail; +#ifdef USE_LLSC + if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED))) +#else + if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu, + /*weak=*/false, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED))) +#endif + { + //Failed + doze(); + if (numfailed != NULL) + (*numfailed)++; + goto retry; + } + assert(old.st.head->next != SENTINEL); + } + old.st.head->next = NULL; + return old.st.head; +} + +//static struct llnode *llq_dequeue_cond_l(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed) __attribute__((noinline)); +static inline struct llnode *llq_dequeue_cond_l(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed) +{ + struct llnode *node = NULL; + pthread_spin_lock(&llq->lock); + if (likely(llq->u.st.head != NULL && llq->u.st.head == exp)) + { + node = llq->u.st.head; + if (llq->u.st.head == llq->u.st.tail) + { + assert(node->next == SENTINEL); + llq->u.st.head = llq->u.st.tail = NULL; + } + else + { + assert(node->next != SENTINEL); + llq->u.st.head = node->next; + } + node->next = NULL; + } + pthread_spin_unlock(&llq->lock); + return node; +} + +static inline struct llnode *llq_head(struct llqueue *llq) +{ + return llq->u.st.head; +} + +static inline uint32_t llq_assert(struct llqueue *llq) +{ + uint32_t nelems = 0; + struct llnode *node = llq->u.st.head; + if (node != NULL) + { + uint32_t tag = node->tag + 1; + node->tag = tag; + nelems++; + //Find last element in list + while (node->next != SENTINEL) + { + node = node->next; + assert(node->tag != tag); + node->tag = tag; + nelems++; + } + //Tail must point to last element + assert(llq->u.st.tail == node); + } + else//No elements in list + { + assert(llq->u.st.tail == NULL); + } + return nelems; +} + +static void llqueue_init(struct llqueue *llq) +{ + llq->u.st.head = NULL; + llq->u.st.tail = NULL; + pthread_spin_init(&llq->lock, PTHREAD_PROCESS_PRIVATE); +} diff --git a/llsc.c b/llsc.c new file mode 100644 index 0000000..b09d122 --- /dev/null +++ b/llsc.c @@ -0,0 +1,254 @@ +//Copyright (c) 2016, ARM Limited. All rights reserved. +// +//SPDX-License-Identifier: BSD-3-Clause + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include + +#undef likely +#undef unlikely +#if defined __GNUC__ +#define likely(x) __builtin_expect(!!(x), 1) +#define unlikely(x) __builtin_expect(!!(x), 0) +#else +#define likely(x) (x) +#define unlikely(x) (x) +#endif + +#define ALIGNED(x) __attribute__((__aligned__(x))) +#define CACHE_LINE 64 + +/****************************************************************************** + * LL/SC primitives + *****************************************************************************/ + +#if defined __ARM_ARCH && __ARM_ARCH == 7 +static inline void dmb() +{ + __asm __volatile("dmb" : : : "memory"); +} + +static inline uint32_t ll(uint32_t *var, int mm) +{ + uint32_t old; + __asm __volatile("ldrex %0, [%1]" + : "=&r" (old) + : "r" (var) + : ); + //Barrier after an acquiring load + if (mm == __ATOMIC_ACQUIRE) + dmb(); + return old; +} +#define ll32(a, b) ll((a), (b)) + +//Return 0 on success, 1 on failure +static inline uint32_t sc(uint32_t *var, uint32_t neu, int mm) +{ + uint32_t ret; + //Barrier before a releasing store + if (mm == __ATOMIC_RELEASE) + dmb(); + __asm __volatile("strex %0, %1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : ); + return ret; +} +#define sc32(a, b, c) sc((a), (b), (c)) + +static inline uint64_t lld(uint64_t *var, int mm) +{ + uint64_t old; + __asm __volatile("ldrexd %0, %H0, [%1]" + : "=&r" (old) + : "r" (var) + : ); + //Barrier after an acquiring load + if (mm == __ATOMIC_ACQUIRE) + dmb(); + return old; +} +#define ll64(a, b) lld((a), (b)) + +//Return 0 on success, 1 on failure +static inline uint32_t scd(uint64_t *var, uint64_t neu, int mm) +{ + uint32_t ret; + //Barrier before a releasing store + if (mm == __ATOMIC_RELEASE) + dmb(); + __asm __volatile("strexd %0, %1, %H1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : ); + return ret; +} +#define sc64(a, b, c) scd((a), (b), (c)) + +#endif + +#if defined __ARM_ARCH && __ARM_ARCH == 8 +static inline uint32_t ll32(uint32_t *var, int mm) +{ + uint32_t old; + if (mm == __ATOMIC_ACQUIRE) + __asm __volatile("ldaxr %w0, [%1]" + : "=&r" (old) + : "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm __volatile("ldxr %w0, [%1]" + : "=&r" (old) + : "r" (var) + : ); + else + abort(); + return old; +} + +//Return 0 on success, 1 on failure +static inline uint32_t sc32(uint32_t *var, uint32_t neu, int mm) +{ + uint32_t ret; + if (mm == __ATOMIC_RELEASE) + __asm __volatile("stlxr %w0, %w1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm __volatile("stxr %w0, %w1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : ); + else + abort(); + return ret; +} + +static inline uint64_t ll(uint64_t *var, int mm) +{ + uint64_t old; + if (mm == __ATOMIC_ACQUIRE) + __asm __volatile("ldaxr %0, [%1]" + : "=&r" (old) + : "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm __volatile("ldxr %0, [%1]" + : "=&r" (old) + : "r" (var) + : ); + else + abort(); + return old; +} +#define ll64(a, b) ll((a), (b)) + +//Return 0 on success, 1 on failure +static inline uint32_t sc(uint64_t *var, uint64_t neu, int mm) +{ + uint32_t ret; + if (mm == __ATOMIC_RELEASE) + __asm __volatile("stlxr %w0, %1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm __volatile("stxr %w0, %1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : ); + else + abort(); + return ret; +} +#define sc64(a, b, c) sc((a), (b), (c)) + +static inline __int128 lld(__int128 *var, int mm) +{ + __int128 old; + if (mm == __ATOMIC_ACQUIRE) + __asm __volatile("ldaxp %0, %H0, [%1]" + : "=&r" (old) + : "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm __volatile("ldxp %0, %H0, [%1]" + : "=&r" (old) + : "r" (var) + : ); + else + abort(); + return old; +} + +//Return 0 on success, 1 on failure +static inline uint32_t scd(__int128 *var, __int128 neu, int mm) +{ + uint32_t ret; + if (mm == __ATOMIC_RELEASE) + __asm __volatile("stlxp %w0, %1, %H1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm __volatile("stxp %w0, %1, %H1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : ); + else + abort(); + return ret; +} +#endif + +//Clear exclusive monitor, used when LL is not followed by SC +static inline void clrex(void) +{ +#if defined __ARM_ARCH + __asm __volatile("clrex" : : : ); +#endif +} + +static inline void sevl(void) +{ +#if defined __ARM_ARCH + __asm __volatile("sevl" : : : ); +#endif +} + +static inline void wfe(void) +{ +#if defined __ARM_ARCH + __asm __volatile("wfe" : : : ); +#endif +} + +static inline void doze(void) +{ +#if defined __ARM_ARCH + //YIELD hints the CPU to switch to another thread if available + //but otherwise executes as a NOP +// __asm __volatile("yield" : : : "memory"); + //ISB flushes the pipeline, then restarts. This is guaranteed to stall + //the CPU a number of cycles + __asm __volatile("isb" : : : "memory"); +#else + //Assume x86 + __asm __volatile("pause" : : : "memory"); +#endif +} + +//The scalar equivalent of a double pointer +#if __SIZEOF_PTRDIFF_T__ == 4 +typedef uint64_t dintptr_t; +#endif +#if __SIZEOF_PTRDIFF_T__ == 8 +typedef __int128 dintptr_t; +#endif diff --git a/scheduler.c b/scheduler.c new file mode 100644 index 0000000..7faee05 --- /dev/null +++ b/scheduler.c @@ -0,0 +1,2042 @@ +//Copyright (c) 2016, ARM Limited. All rights reserved. +// +//SPDX-License-Identifier: BSD-3-Clause + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +//#define LOG + +#ifdef __ARM_ARCH +#define USE_LLSC +#endif + +#if defined __GNUC__ +#define likely(x) __builtin_expect(!!(x), 1) +#define unlikely(x) __builtin_expect(!!(x), 0) +#else +#define likely(x) (x) +#define unlikely(x) (x) +#endif + +//Function to set breakpoint on +void bp(void) __attribute((noinline)); +void bp(void) +{ +} + + +#define MIN(a, b) ((a) < (b) ? (a) : (b)) + +//Enable for Cortex-A57! +#if 0 +//Implement store-release (STLR) using DMB; STR (store-relaxed). +//This alternative is interesting to test since it has proven more +//performant in some cases on A57. +//We implement this using a macro since it is used with different types of +//parameters. +#define far_atomic_store(_ptr, _val, _mo) \ +do \ +{ \ + if ((_mo) == __ATOMIC_RELEASE) \ + { \ + __asm __volatile("dmb ishst" ::: "memory"); \ + __atomic_store_n((_ptr), (_val), __ATOMIC_RELAXED); \ + } \ + else \ + __atomic_store_n((_ptr), (_val), (_mo)); \ +} \ +while (0) +#else +#define far_atomic_store(_ptr, _val, _mo) \ + __atomic_store_n((_ptr), (_val), (_mo)) +#endif + +//Possibly, store-release a ticket after CAS can use store-relaxed +//Possibly, this has less overhead for the issuing thread +#define __ATOMIC_RELEASE_AFTER_CAS __ATOMIC_RELEASE + +#define CAS_WEAK false + +static bool VERBOSE = false; + +static inline bool is_power_of_two(uint32_t n) +{ + return n != 0 && (n & (n - 1)) == 0; +} + +//Thread priority and scheduling +#define PRIO 1 +#define SCHED SCHED_FIFO +//#define SCHED SCHED_OTHER + +#define ALIGNED(x) __attribute__((__aligned__(x))) +#define CACHE_LINE 64 + +/****************************************************************************** + * Linked list queue and its LL/SC support + *****************************************************************************/ + +#include "llsc.c" +#include "llqueue.c" + +/****************************************************************************** + * Type and forward declarations + *****************************************************************************/ + +//Max 64 threads +typedef uint64_t odp_thrmask_t; +#define ODP_THRMASK_ALL ((uint64_t)~0ULL) + +typedef union +{ + struct + { + struct llqueue llq; + uint32_t prio; + };//Anonymous struct, access members directly + char dummy[CACHE_LINE];//Required so that sched_queue is size of alignment +} sched_queue ALIGNED(CACHE_LINE); + +struct odp_event_s; +typedef struct odp_event_s *odp_event_t; +#define ODP_EVENT_INVALID ((odp_event_t)NULL) + +struct sched_obj;//Scheduler objects are the elements of the scheduler queues +typedef struct sched_obj *odp_queue_t;//ODP queues are scheduler objects +#define ODP_QUEUE_INVALID ((odp_queue_t)NULL) + +struct sched_group; +typedef uint64_t sched_group_mask_t; +#define MAX_SCHED_GROUP (sizeof(sched_group_mask_t) * CHAR_BIT) //E.g. 64 +typedef uint32_t odp_schedule_group_t;//1..MAX_SCHED_GROUP +#define ODP_SCHED_GROUP_INVALID 0 + +static sched_queue *schedq_from_sched_group(odp_schedule_group_t grp, + uint32_t prio); + +#define NUM_PRIO 4 //High, medium, low and below priorities +#define PRIO_MED (NUM_PRIO / 2) + +static int odp_queue_enq(odp_queue_t q, const odp_event_t ev[], int num); + +/******************************************************************************* + * Per thread state + ******************************************************************************/ + +struct reorder_context; +struct reorder_window; +struct odp_event_s; + +static inline bool rwin_reserve(struct reorder_window *rwin, uint32_t *sn); +static void rwin_insert(struct reorder_window *rwin, + struct reorder_context *rctx, + uint32_t sn, + void (*callback)(const struct reorder_context *)); +static struct odp_event_s *event_next_get(struct odp_event_s *evt); +static odp_queue_t event_queue_get(struct odp_event_s *evt); +static uint32_t event_number_get(struct odp_event_s *evt); + +struct reorder_context +{ + struct odp_event_s *head, *tail;//Linked list of deferred events + struct reorder_window *rwin;//Reorder window for source queue (or whatever) + uint32_t *rvec_free;//Pointer to TS->rvec_free + uint32_t sn;//Our slot in the reorder window + uint16_t idx;//Our index in thread_state rvec array + uint16_t olock_flags; +} ALIGNED(CACHE_LINE); + +static inline void rctx_init(struct reorder_context *rctx, uint32_t *rvec_free, uint16_t idx, struct reorder_window *rwin) +{ + rctx->head = rctx->tail = NULL; + rctx->rwin = rwin; + rctx->rvec_free = rvec_free; + rctx->sn = 0; + rctx->idx = idx; + rctx->olock_flags = 0; + //Clear free bit + assert((*rctx->rvec_free & (1U << rctx->idx)) != 0); + __atomic_fetch_and(rctx->rvec_free, ~(1U << rctx->idx), __ATOMIC_RELAXED); +} + +static inline void rctx_free(const struct reorder_context *rctx) +{ + assert(rctx->rwin != NULL); + //Set free bit + assert((*rctx->rvec_free & (1U << rctx->idx)) == 0); + //Relaxed order is OK since we haven't written to the reorder_context + __atomic_fetch_or(rctx->rvec_free, 1U << rctx->idx, __ATOMIC_RELAXED); +} + +static void olock_release(const struct reorder_context *rctx); + +//rctx_retire may be called by any thread +static void rctx_retire(const struct reorder_context *rctx) +{ + struct odp_event_s *evt = rctx->head; + while (likely(evt != NULL)) + { + struct odp_event_s *next = event_next_get(evt); + //Prefetch next event + __builtin_prefetch(next, 0, 0); + int rc = odp_queue_enq(event_queue_get(evt), &evt, 1); + if (unlikely(rc != 1)) + { + fprintf(stderr, "rctx_retire: failed to enqueue event %p/%u on queue %p\n", evt, event_number_get(evt), event_queue_get(evt)); + fflush(NULL); abort(); + } + evt = next; + } + olock_release(rctx); + rctx_free(rctx); +} + +static inline void rctx_release(struct reorder_context *rctx) +{ + assert((*rctx->rvec_free & (1U << rctx->idx)) == 0); + //Insert reorder context into reorder window, potentially calling the + //rctx_retire function for all pending reorder_contexts + rwin_insert(rctx->rwin, rctx, rctx->sn, rctx_retire); +} + +#define TS_RVEC_SIZE 16 + +struct thread_state +{ + struct sched_obj *atomq;//Atomic queue currently being processed or NULL + struct reorder_context *rctx;//Current reorder context or NULL + bool pause; + bool out_of_order; + uint32_t tidx;//Thread index + uint32_t ticket;//Ticket for atomic queue or TICKET_INVALID + uint32_t rvec_free;//Bitset of free entries in rvec + uint16_t num_schedq; + uint16_t sg_sem;//Set when sg_wanted is modified by other thread + sched_group_mask_t sg_actual[NUM_PRIO];//Current sched_group membership + sched_group_mask_t sg_wanted[NUM_PRIO];//Future sched_group membership +#define SCHEDQ_PER_THREAD (MAX_SCHED_GROUP * NUM_PRIO) + sched_queue *schedq_list[SCHEDQ_PER_THREAD]; + struct reorder_context rvec[TS_RVEC_SIZE]; +} ALIGNED(CACHE_LINE); + +#define MAXTHREADS 32 + +static struct thread_state thread_state[MAXTHREADS]; +static uint32_t NUMTHREADS = 2; +static __thread struct thread_state *TS; + +static void thread_state_init(int tidx) +{ + struct thread_state *ts = &thread_state[tidx]; + ts->atomq = ODP_QUEUE_INVALID; + ts->rctx = NULL; + ts->pause = false; + ts->out_of_order = false; + ts->tidx = tidx; + ts->rvec_free = 0; + assert(TS_RVEC_SIZE <= sizeof(ts->rvec_free) * CHAR_BIT); + ts->rvec_free = (1ULL << TS_RVEC_SIZE) - 1; + ts->num_schedq = 0; + ts->sg_sem = 1;//Start with sched group semaphore changed + memset(ts->sg_actual, 0, sizeof ts->sg_actual); + //clear ts->sg_wanted;//This might already have been set + TS = ts; +} + +static void insert_schedq_in_list(struct thread_state *ts, + sched_queue *schedq) +{ + //Find slot for schedq + for (uint32_t i = 0; i < ts->num_schedq; i++) + { + //Higher value is higher priority and closer to start of list + if (schedq->prio >= ts->schedq_list[i]->prio) + { + //This is the slot! + sched_queue *tmp = ts->schedq_list[i]; + ts->schedq_list[i] = schedq; + schedq = tmp; + //Continue the insertion procedure with the new schedq + } + } + //Insert schedq at end of list + if (ts->num_schedq == SCHEDQ_PER_THREAD) + { + fprintf(stderr, "insert_schedq_in_list: too many schedq's\n"); + abort(); + } + ts->schedq_list[ts->num_schedq++] = schedq; +} + +static void remove_schedq_from_list(struct thread_state *ts, + sched_queue *schedq) +{ + //Find schedq + for (uint32_t i = 0; i < ts->num_schedq; i++) + { + if (ts->schedq_list[i] == schedq) + { + //Move remaining schedq's + for (uint32_t j = i + 1; j < ts->num_schedq; j++) + { + ts->schedq_list[j - 1] = ts->schedq_list[j]; + } + ts->num_schedq--; + return; + } + } + //schedq not found, internal error + fprintf(stderr, "remove_schedq_from_list: schedq not found\n"); + abort(); +} + +/****************************************************************************** + * Scheduler queues + *****************************************************************************/ + +typedef enum +{ + pktio, parallel_q, ordered_q, atomic_q +} sched_obj_type; + +static inline void schedq_init(sched_queue *schedq, uint32_t prio) +{ + llqueue_init(&schedq->llq); + schedq->prio = prio; +} + +static inline struct sched_obj *schedq_peek(sched_queue *schedq) +{ + return (struct sched_obj *)llq_head(&schedq->llq); +} + +static bool schedq_cond_pop(sched_queue *schedq, struct sched_obj *obj) __attribute__((always_inline)); +static inline bool schedq_cond_pop(sched_queue *schedq, struct sched_obj *obj) +{ + return llq_dequeue_cond(&schedq->llq, (struct llnode *)obj, NULL) == + (struct llnode *)obj; +} + +static void schedq_push(sched_queue *schedq, + struct sched_obj *obj) +{ + llq_enqueue(&schedq->llq, (struct llnode *)obj, NULL); +} + +/****************************************************************************** + * ODP event + *****************************************************************************/ + +struct odp_event_s +{ + struct odp_event_s *next;//Next pointer for linked list + odp_queue_t queue;//Queue this event is destined for + //Below are fields used by the application + unsigned fromqidx; + unsigned number; +}; + +static odp_event_t odp_event_alloc(void) +{ + struct odp_event_s *evt = aligned_alloc(CACHE_LINE, + sizeof(struct odp_event_s)); + if (unlikely(evt == NULL)) + return ODP_EVENT_INVALID; + return evt; +} + +static inline struct odp_event_s *event_next_get(struct odp_event_s *evt) +{ + return evt->next; +} + +static inline void event_next_set(struct odp_event_s *evt, struct odp_event_s *nxt) +{ + evt->next = nxt; +} + +static inline odp_queue_t event_queue_get(struct odp_event_s *evt) +{ + return evt->queue; +} + +static inline void event_queue_set(struct odp_event_s *evt, odp_queue_t q) +{ + evt->queue = q; +} + +static inline uint32_t event_number_get(struct odp_event_s *evt) +{ + return evt->number; +} + +/****************************************************************************** + * Reorder window + *****************************************************************************/ + +struct hc +{ + uint32_t head;//First missing context + uint32_t chgi;//Change indicator +} ALIGNED(sizeof(uint64_t)); + +#define RWIN_SIZE 32 //Should be at least one per CPU + +#define NUM_OLOCKS 2 + +struct reorder_window +{ + struct hc hc;//head and chgi + uint32_t winmask; + uint32_t tail; + uint32_t turn; + uint16_t lock_count ALIGNED(CACHE_LINE);//Force new cache line + uint32_t olock[NUM_OLOCKS]; + struct reorder_context *ring[RWIN_SIZE] ALIGNED(CACHE_LINE);//Force new cache line +}; + +static inline void olock_unlock(struct thread_state *ts, const struct reorder_context *rctx, struct reorder_window *rwin, unsigned lock_index) +{ + if ((rctx->olock_flags & (1U << lock_index)) == 0) + { + //Lock not used +#ifdef LOG +if (VERBOSE) printf("%u: release %p->olock[%u]=%u\n", TS->tidx, rwin, lock_index, rctx->sn + 1); +#endif + //Use relaxed ordering, we are not releasing any updates + far_atomic_store(&rwin->olock[lock_index], + rctx->sn + 1, + __ATOMIC_RELAXED); + } +} + +static void olock_release(const struct reorder_context *rctx) +{ + struct reorder_window *rwin = rctx->rwin; +#ifdef LOG +if (VERBOSE) printf("%u: release sn=%u %p->olock[0]=%u olock_flags=%x\n", TS->tidx, rctx->sn, rwin, rwin->olock[0], rctx->olock_flags); +#endif + if (unlikely(rwin->lock_count != 0)) + { + olock_unlock(TS, rctx, rwin, 0); + if (rwin->lock_count != 1) + { + olock_unlock(TS, rctx, rwin, 1); + } + } + assert(NUM_OLOCKS == 2); +} + +static struct reorder_window *rwin_alloc(unsigned lock_count) +{ + assert(is_power_of_two(RWIN_SIZE)); + struct reorder_window *rwin = aligned_alloc(CACHE_LINE, sizeof(struct reorder_window)); + if (rwin != NULL) + { + assert(offsetof(struct reorder_window, hc) == 0); + assert(offsetof(struct reorder_window, lock_count) == CACHE_LINE); + assert(offsetof(struct reorder_window, ring) == 2 * CACHE_LINE); + rwin->hc.head = 0; + rwin->hc.chgi = 0; + rwin->winmask = RWIN_SIZE - 1; + rwin->tail = 0; + rwin->turn = 0; + rwin->lock_count = (uint16_t)lock_count; + memset(rwin->olock, 0, sizeof rwin->olock); + for (uint32_t i = 0; i < RWIN_SIZE; i++) + rwin->ring[i] = NULL; + } + return rwin; +} + +static inline bool rwin_reserve(struct reorder_window *rwin, uint32_t *sn) +{ + uint32_t head, oldt, newt; + //Read head and tail separately +#ifndef USE_LLSC + oldt = __atomic_load_n(&rwin->tail, __ATOMIC_RELAXED); +#endif + do + { + head = __atomic_load_n(&rwin->hc.head, __ATOMIC_RELAXED); +#ifdef USE_LLSC + oldt = ll32(&rwin->tail, __ATOMIC_RELAXED); +#endif + if (unlikely(oldt - head >= rwin->winmask)) + { + return false; + } + newt = oldt + 1; + } +#ifdef USE_LLSC + while (unlikely(sc32(&rwin->tail, newt, __ATOMIC_RELAXED))); +#else + while (!__atomic_compare_exchange(&rwin->tail, + &oldt, + &newt, + CAS_WEAK, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); +#endif + *sn = oldt; + return true; +} + +static void rwin_insert(struct reorder_window *rwin, + struct reorder_context *rctx, + uint32_t sn, + void (*callback)(const struct reorder_context *)) +{ + struct hc old; + __atomic_load(&rwin->hc, &old, __ATOMIC_ACQUIRE); + uint32_t winmask = rwin->winmask; + if (old.head != sn) + { + //We are out-of-order + //Store context in reorder window, releasing its content + assert(rwin->ring[sn & winmask] == NULL); + __atomic_store_n(&rwin->ring[sn & winmask], rctx, __ATOMIC_RELEASE); + rctx = NULL; + + do + { + struct hc new; + new.head = old.head; + new.chgi = old.chgi + 1;//Changed value + //Update head&chgi, fail if any has changed + if (__atomic_compare_exchange(&rwin->hc, + &old,//Updated on failure + &new, + CAS_WEAK, + __ATOMIC_RELEASE,//Release our ring update + __ATOMIC_ACQUIRE)) + { + //CAS succeeded => head same (we are not in-order), chgi updated + return; + } + //CAS failed => head and/or chgi changed + //We might not be out-of-order anymore + } + while (old.head != sn); + //old.head == sn => we are now in-order! + } + + assert(old.head == sn); + //We are in-order so our responsibility to retire contexts + struct hc new; + new.head = old.head; + new.chgi = old.chgi + 1;//Changed value + + //Retire our in-order context (if we still have it) + if (rctx != NULL) + { + callback(rctx); + new.head++; + } + + //Retire in-order contexts in the ring + //The first context might actually be ours (if we were originally + //out-of-order) + do + { + for (;;) + { + rctx = __atomic_load_n(&rwin->ring[new.head & winmask], + __ATOMIC_ACQUIRE); + if (rctx == NULL) + break; + //We are the only thread that are in-order (until head updated) + //so don't have to use atomic load-and-clear (exchange) + rwin->ring[new.head & winmask] = NULL; + callback(rctx); + new.head++; + } + } + //Update head&chgi, fail if chgi has changed (head cannot change) + while (!__atomic_compare_exchange(&rwin->hc, + &old,//Updated on failure + &new, + /*weak=*/false, + __ATOMIC_RELEASE,//Release our ring updates + __ATOMIC_ACQUIRE)); +} + +/****************************************************************************** + * sched_obj aka ODP queue + *****************************************************************************/ + +//Number of events that can be stored in a queue +#define RING_SIZE 2048 + +typedef uint32_t ringidx_t; +struct ringstate +{ + ringidx_t read; + ringidx_t write; +} ALIGNED(8); +#define RINGSIZE_MAX (1U << 31) + +struct sharedstate +{ + uint32_t numevts; + uint16_t cur_ticket; + uint16_t nxt_ticket; +} ALIGNED(sizeof(uint32_t) * 2); +#define TICKET_INVALID (uint32_t)(~0U) + +struct ring +{ + struct ringstate prod; + struct ringstate cons; + struct sharedstate shared; + uint32_t mask; + odp_event_t ring[RING_SIZE] ALIGNED(CACHE_LINE); +}; + +struct sched_obj//May actually be an ODP queue +{ + struct llnode node; + sched_queue *schedq;//Which schedq we belong to + sched_obj_type type; + void *user_ctx; + struct reorder_window *rwin; + struct ring queue; +} ALIGNED(CACHE_LINE); + +static inline struct reorder_window *queue_rwin_get(const odp_queue_t q) +{ + return q->rwin; +} + +static inline bool queue_is_empty(const odp_queue_t q) +{ + return q->queue.cons.read == q->queue.cons.write; +} + +static inline ringidx_t ringstate_num_used(struct ringstate rs) +{ + return (ringidx_t)(rs.write - rs.read); +} + +static inline ringidx_t ringstate_num_free(struct ringstate rs) +{ + return RING_SIZE - (ringidx_t)(rs.write - rs.read); +} + +static odp_queue_t _odp_queue_create(uint32_t prio, + sched_obj_type sync, + odp_schedule_group_t group, + unsigned lock_count, + void *user_ctx) +{ + if (lock_count > (sync == ordered_q ? NUM_OLOCKS : 0)) + return NULL; + odp_queue_t q = aligned_alloc(CACHE_LINE, sizeof(struct sched_obj)); + if (q == NULL) + perror("aligned_alloc"), exit(EXIT_FAILURE); + q->schedq = schedq_from_sched_group(group, prio); + q->type = sync; + q->user_ctx = user_ctx; + assert(is_power_of_two(RING_SIZE)); + q->queue.prod.read = 0; + q->queue.prod.write = 0; + q->queue.cons.read = 0; + q->queue.cons.write = 0; + q->queue.shared.numevts = 0; + q->queue.shared.cur_ticket = 0; + q->queue.shared.nxt_ticket = 0; + q->queue.mask = RING_SIZE - 1; + for (uint32_t i = 0; i < RING_SIZE; i++) + { + q->queue.ring[i] = ODP_EVENT_INVALID; + } + q->rwin = NULL; + if (sync == ordered_q) + { + q->rwin = rwin_alloc(lock_count); + if (q->rwin == NULL) + perror("rwin_alloc"), exit(EXIT_FAILURE); + } + assert(queue_is_empty(q)); + return q; +} + +static const char *qtype2str(odp_queue_t q) +{ + switch (q->type) + { + case pktio : + return "pktio"; + case parallel_q : + return "parallel"; + case ordered_q : + return "ordered"; + case atomic_q : + return "atomic"; + } + return "?"; +} + +static int odp_queue_enq(odp_queue_t q, const odp_event_t ev[], int num) +{ + struct thread_state *ts = TS; + if (unlikely(ts->out_of_order))//unlikely() improves performance for atomic and parallel queues but degrades it for ordered queues + { + int i = 0; + struct reorder_context *rctx = ts->rctx; + assert(ts->rctx != NULL); + while (i < num) + { +#ifdef LOG +if (VERBOSE) printf("%u: Deferring enqueue event %p/%u on queue %p\n", TS->tidx, ev[i], ev[i]->number, q); +#endif + event_queue_set(ev[i], q); + if (rctx->head == NULL) + { + rctx->head = ev[i]; + rctx->tail = ev[i]; + } + else + { + event_next_set(rctx->tail, ev[i]); + rctx->tail = ev[i]; + } + i++; + } + event_next_set(ev[i - 1], NULL); + rctx->tail = ev[i - 1]; + return i; + } + + struct ringstate old; + ringidx_t new_write; + uint32_t actual; + + //Load producer ring state (read & write index) +#ifdef NDEBUG + //No debug => no assert => relaxed ordering OK +#define ATOMIC_READ_ON_ASSERT __ATOMIC_RELAXED +#else + //Debug => assert reads from the ring => needs acquire ordering +#define ATOMIC_READ_ON_ASSERT __ATOMIC_ACQUIRE +#endif + +#ifndef USE_LLSC + old.write = __atomic_load_n(&q->queue.prod.write, __ATOMIC_RELAXED); +#endif + do + { +#ifdef USE_LLSC + old.write = ll32(&q->queue.prod.write, ATOMIC_READ_ON_ASSERT); +#endif + old.read = __atomic_load_n(&q->queue.prod.read, __ATOMIC_RELAXED); + + actual = MIN(num, ringstate_num_free(old)); + if (unlikely(actual == 0)) + { + return 0; + } + + new_write = old.write + actual; + } +#ifdef USE_LLSC + while (unlikely(sc32(&q->queue.prod.write, new_write, __ATOMIC_RELAXED))); +#else + while (!__atomic_compare_exchange_n(&q->queue.prod.write, + &old.write,//Updated on failure + new_write, + CAS_WEAK, + ATOMIC_READ_ON_ASSERT, + __ATOMIC_RELAXED)); +#endif + + //Store our event(s) in the ring + uint32_t index = old.write & q->queue.mask; + for (uint32_t i = 0; i < actual; i++) + { + //The following assert reads from the ring, needs acquire ordering above + assert(ev[i] != ODP_EVENT_INVALID); + assert(q->queue.ring[index] == ODP_EVENT_INVALID); +#ifdef LOG +if (VERBOSE) printf("%u: Enqueue event %p/%u on queue %p (%u used)\n", TS->tidx, ev[i], ev[i]->number, q, (uint32_t)(new_write - old.read)); +#endif + q->queue.ring[index] = ev[i]; + index = (index + 1) & q->queue.mask; + } + + //Wait for our turn to signal consumers + while (__atomic_load_n(&q->queue.cons.write, __ATOMIC_RELAXED) != old.write) + { + doze(); + } + + //Update the event counter, optionally take a ticket + union + { + struct sharedstate ss; + uint64_t ui; + } oss, nss; + uint32_t ticket; +#ifndef USE_LLSC + __atomic_load(&q->queue.shared, &oss, __ATOMIC_RELAXED); +#endif + do + { + ticket = TICKET_INVALID; +#ifdef USE_LLSC + oss.ui = ll64((uint64_t *)&q->queue.shared, __ATOMIC_RELAXED); +#endif + nss = oss; + nss.ss.numevts += actual; + if (oss.ss.numevts == 0)//Empty -> non-empty transition + { + if (q->type != atomic_q || oss.ss.cur_ticket == oss.ss.nxt_ticket) + { + //Atomic queue: only take ticket if one is immediately available + //Otherwise ticket already taken => queue processed by some thread + { + ticket = nss.ss.nxt_ticket++; + } + //Parallel or ordered queue + //Always take ticket + } + } + //Else queue already was non-empty + } + //Attempt to update numevts counter and optionally take ticket +#ifdef USE_LLSC + while (sc64((uint64_t *)&q->queue.shared, nss.ui, __ATOMIC_RELAXED)); +#else + while (!__atomic_compare_exchange(&q->queue.shared, + &oss, + &nss, + CAS_WEAK, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); +#endif + + //Signal consumers that events are available (release events) + //Enable other producers to continue + far_atomic_store(&q->queue.cons.write, new_write, __ATOMIC_RELEASE); + + if (ticket != TICKET_INVALID) + { + assert(oss.ss.numevts == 0); + //Wait for our turn to update schedq + while (__atomic_load_n(&q->queue.shared.cur_ticket, __ATOMIC_ACQUIRE) != + ticket) + { + doze(); + } + + //Enqueue at end of scheduler queue + schedq_push(q->schedq, q); +#ifdef LOG +if (VERBOSE) printf("%u: Push queue %p on schedq %p\n", TS->tidx, q, q->schedq); +#endif + far_atomic_store(&q->queue.shared.cur_ticket, + ticket + 1, + __ATOMIC_RELEASE_AFTER_CAS); + } + //Else queue was not empty or atomic queue already busy + + return actual; +} + +//We want _odp_queue_deq() to be inlined so that unexecuted paths caused by +//threadsafe and atomic parameters are removed +static int _odp_queue_deq(odp_queue_t q, odp_event_t ev[], int num, bool threadsafe, bool atomic) __attribute__((always_inline)); +static int _odp_queue_deq(odp_queue_t q, + odp_event_t ev[], + int num, + bool threadsafe, + bool atomic) +{ + uint32_t actual; + struct ringstate old; + ringidx_t new_read; + + //Load consumer ring state (read & write index) + if (!threadsafe) + { + old.read = __atomic_load_n(&q->queue.cons.read, __ATOMIC_ACQUIRE); + old.write = __atomic_load_n(&q->queue.cons.write, __ATOMIC_RELAXED); + actual = MIN(num, ringstate_num_used(old)); + if (unlikely(actual == 0)) + { + return 0; + } + new_read = old.read + actual; + q->queue.cons.read = new_read; + } + else + { +#ifndef USE_LLSC + old.read = __atomic_load_n(&q->queue.cons.read, __ATOMIC_RELAXED); +#endif + do + { +#ifdef USE_LLSC + old.read = ll32(&q->queue.cons.read, __ATOMIC_ACQUIRE); +#endif + old.write = __atomic_load_n(&q->queue.cons.write, __ATOMIC_RELAXED); + + actual = MIN(num, ringstate_num_used(old)); + if (unlikely(actual == 0)) + { + return 0; + } + + //Prefetch queue context for use by application + //__builtin_prefetch(q->user_ctx, 0, 0); + + //Attempt to free ring slot(s) + new_read = old.read + actual; + } +#ifdef USE_LLSC + while (unlikely(sc32(&q->queue.cons.read, new_read, __ATOMIC_RELAXED))); +#else + while (!__atomic_compare_exchange_n(&q->queue.cons.read, + &old.read,//Updated on failure + new_read, + CAS_WEAK, + __ATOMIC_ACQUIRE, + __ATOMIC_ACQUIRE)); +#endif + } + + uint32_t index = old.read & q->queue.mask; + uint32_t i; + for (i = 0; i < actual; i++) + { + //TODO Prefetch event data + ev[i] = q->queue.ring[index]; + assert(ev[i] != ODP_EVENT_INVALID); +#ifndef NDEBUG + q->queue.ring[index] = ODP_EVENT_INVALID; +#endif + index = (index + 1) & q->queue.mask; +#ifdef LOG +if (VERBOSE) printf("%u: Dequeue event %p/%u from queue %p (%u used)\n", TS->tidx, ev[i], ev[i]->number, q, (uint32_t)(old.write - new_read)); +#endif + } + + if (!threadsafe) + { + //Wait for our turn to signal producers + while (__atomic_load_n(&q->queue.prod.read, __ATOMIC_RELAXED) != + old.read) + { + doze(); + } + } + + if (atomic) + { + (void)__atomic_fetch_sub(&q->queue.shared.numevts, + actual, + __ATOMIC_RELAXED); + + //Signal producers that empty slots are available (release ring slots) + //Enable other consumers to continue + far_atomic_store(&q->queue.prod.read, new_read, __ATOMIC_RELEASE); + } + else + { + union + { + struct sharedstate ss; + uint64_t ui; + } oss, nss; + uint32_t ticket = TICKET_INVALID; +#ifndef USE_LLSC + __atomic_load(&q->queue.shared, &oss, __ATOMIC_RELAXED); +#endif + do + { +#ifdef USE_LLSC + oss.ui = ll64((uint64_t *)&q->queue.shared, __ATOMIC_RELAXED); +#endif + nss = oss; + nss.ss.numevts -= actual; + if (nss.ss.numevts == 0) + { + //If we emptied parallel/ordered queue, we need a ticket for a + //later pop + ticket = nss.ss.nxt_ticket++; + } + } + //Attempt update numevts and optionally take ticket +#ifdef USE_LLSC + while (sc64((uint64_t *)&q->queue.shared, nss.ui, __ATOMIC_RELAXED)); +#else + while (!__atomic_compare_exchange(&q->queue.shared, + &oss,//Updated on failure + &nss, + CAS_WEAK, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); +#endif + + //Signal producers that empty slots are available (release ring slots) + //Enable other consumers to continue + far_atomic_store(&q->queue.prod.read, new_read, __ATOMIC_RELEASE); + + if (nss.ss.numevts == 0) + { + assert(q->type != atomic_q); + //Wait for our turn update schedq + while (__atomic_load_n(&q->queue.shared.cur_ticket, + __ATOMIC_ACQUIRE) != ticket) + { + doze(); + } + + bool b = schedq_cond_pop(q->schedq, q); + (void)b; +#ifdef LOG + if (VERBOSE) printf("%u: Pop queue %p from schedq %p %s\n", TS->tidx, q, q->schedq, b ? "success" : "failure"); +#endif + far_atomic_store(&q->queue.shared.cur_ticket, + ticket + 1, + __ATOMIC_RELEASE_AFTER_CAS); + } + } + + return actual; +} + +/****************************************************************************** + * Behold, the scheduler! + *****************************************************************************/ + +static inline void _odp_schedule_release_ordered(struct thread_state *ts) +{ + if (ts->rctx != NULL) + { +#ifdef LOG +if (VERBOSE) printf("%u: Release rctx %p\n", ts->tidx, ts->rctx); +#endif + ts->out_of_order = false; + rctx_release(ts->rctx); + ts->rctx = NULL; + } +} + +void odp_schedule_release_ordered(void) +{ + struct thread_state *ts = TS; + if (unlikely(ts->rctx == NULL)) + { + fprintf(stderr, "odp_schedule_release_ordered: unexpected call\n"); + fflush(NULL); abort(); + } + _odp_schedule_release_ordered(ts); +} + +static inline void _odp_schedule_release_atomic(struct thread_state *ts) +{ + struct sched_obj *q = ts->atomq; + bool pushed = false; + struct sharedstate oss, nss; + assert(ts->atomq != ODP_QUEUE_INVALID); + assert(ts->ticket != TICKET_INVALID); + //Only we have this queue, only we can dequeue but others can enqueue so + //numevts can increase but not decrease + __atomic_load(&q->queue.shared, &oss, __ATOMIC_ACQUIRE); + do + { + assert(oss.cur_ticket == ts->ticket); + if (oss.numevts != 0 && !pushed) + { + schedq_push(q->schedq, q); +#ifdef LOG +if (VERBOSE) printf("%u: Push queue %p on schedq %p\n", TS->tidx, q, q->schedq); +#endif + pushed = true;//Only push once + } + nss = oss; + //Release ticket + nss.cur_ticket = ts->ticket + 1; + } + //Attempt to release ticket expecting our view of numevts to be correct + while (!__atomic_compare_exchange(&q->queue.shared, + &oss, + &nss, + CAS_WEAK, + __ATOMIC_RELEASE, + __ATOMIC_ACQUIRE)); + //CAS succeed => if (numevts != 0) then queue pushed to schedq + ts->atomq = ODP_QUEUE_INVALID; + ts->ticket = TICKET_INVALID; +} + +void odp_schedule_release_atomic(void) +{ + struct thread_state *ts = TS; + if (unlikely(ts->atomq == ODP_QUEUE_INVALID || + ts->ticket == TICKET_INVALID)) + { + fprintf(stderr, "odp_schedule_release_atomic: unexpected call\n"); + fflush(NULL); abort(); + } + _odp_schedule_release_atomic(ts); +} + +static void update_sg_membership(struct thread_state *ts); + +static int odp_schedule_multi(odp_queue_t *from, uint64_t wait, + odp_event_t ev[], int num) __attribute__((noinline)); +static int odp_schedule_multi(odp_queue_t *from, uint64_t wait, + odp_event_t ev[], int num) +{ + (void)wait;//TODO implement timeout + //Get pointer to our per-thread state + struct thread_state *ts = TS; + if (unlikely(ts->pause)) + { + return 0; + } + odp_queue_t atomq = ts->atomq; + //Check if we are currently processing an atomic queue + if (atomq != ODP_QUEUE_INVALID) + { + //Yes, continue to process this queue (optimise for throughput) + int ret; + assert(ts->ticket != TICKET_INVALID); +dequeue_atomic: //No side effects before this label! + //Atomic queues can be dequeued without lock since this thread has the + //only reference to the atomic queue being processed + //We are the only thread that can dequeue but other threads can enqueue + if (likely((ret = _odp_queue_deq(atomq, + ev, + num, + /*threadsafe=*/false, + /*atomic=*/true)) != 0)) + { + *from = atomq; + //This thread must continue to "own" this atomic queue until all + //events processed and the thread re-invokes the scheduler + return ret; + } + //Atomic queue was empty, release it + _odp_schedule_release_atomic(ts); + } + //No atomic queue processing + //else + { + //Release any previous reorder context + _odp_schedule_release_ordered(ts); + } + + if (unlikely(__atomic_load_n(&ts->sg_sem, __ATOMIC_RELAXED) != 0)) + { + (void)__atomic_load_n(&ts->sg_sem, __ATOMIC_ACQUIRE); + __atomic_store_n(&ts->sg_sem, 0, __ATOMIC_RELAXED);//FIXME? + update_sg_membership(ts); + } + + //Iterate through our list of scheduler queues which are sorted with + //higher priority first + for (uint32_t i = 0; i < ts->num_schedq; i++) + { + //__builtin_prefetch(ts->schedq_list[i + 1], 0, 0); + sched_queue *schedq = ts->schedq_list[i]; + struct sched_obj *obj; +restart_same: + //Peek at the head of the scheduler queue + obj = schedq_peek(schedq); + if (likely(obj == NULL)) + { + //schedq empty + continue;//Look at next schedq + } + if (obj->type == atomic_q) + { + //Dequeue object only if it is still at head of schedq + bool b = schedq_cond_pop(schedq, obj); +#ifdef LOG +if (VERBOSE) printf("%u: Pop atomic queue %p from schedq %p %s\n", ts->tidx, obj, obj->schedq, b ? "success" : "failure"); +#endif + if (unlikely(!b)) + { + //atomq not at head of schedq anymore, some other thread + //stole it + goto restart_same;//Restart at the same schedq + } + ts->atomq = atomq = obj; + //Dequeued atomic queue from the schedq, only we can process it + ts->ticket = __atomic_fetch_add(&atomq->queue.shared.nxt_ticket, 1, __ATOMIC_RELAXED); + while (__atomic_load_n(&atomq->queue.shared.cur_ticket, __ATOMIC_RELAXED) != ts->ticket) + { + doze(); + } + goto dequeue_atomic; + } + else if (obj->type == ordered_q) + { + odp_queue_t ordq = obj; + assert(queue_rwin_get(ordq) != NULL); + //The scheduler object (probably an ordered queue) has a + //reorder window so requires order restoration + //We must use a reorder context to collect all outgoing events + //Find and initialise an unused reorder context + uint32_t i = __atomic_load_n(&ts->rvec_free, __ATOMIC_RELAXED); + if (unlikely(i == 0)) + { + //No free reorder contexts for this thread +#ifdef LOG +if (VERBOSE) printf("%u: Out of reorder contexts, queue ignored\n", ts->tidx); +#endif + continue;//Look at next schedq, hope we find non-ordered queue + } + //Get first bit set (starting from 0) + i = __builtin_ffs(i) - 1; + struct reorder_context *rctx = ts->rctx = &ts->rvec[i]; + rctx_init(rctx, &ts->rvec_free, i, queue_rwin_get(ordq)); +#ifdef LOG +if (VERBOSE) printf("%u: Using rctx %p\n", ts->tidx, rctx); +#endif + //rwin_reserve and odp_queue_deq must be atomic or we will + //have a potential race condition + //Allocate a slot in the reorder window + if (unlikely(!rwin_reserve(rctx->rwin, &rctx->sn))) + { + //Reorder window full +#ifdef LOG +if (VERBOSE) printf("%u: Reorder window full, queue ignored\n", ts->tidx); +#endif +//bp(); + rctx_free(rctx); + ts->rctx = NULL; + ts->out_of_order = false; + continue;//Look at next schedq, find other queue + } + //Are we in-order or out-of-order? + ts->out_of_order = rctx->sn != rctx->rwin->hc.head; +#ifdef LOG +if (VERBOSE) printf("%u: Reserved pos %u in rwin %p\n", ts->tidx, rctx->sn, rctx->rwin); +#endif + //Wait for our turn to dequeue + while (__atomic_load_n(&rctx->rwin->turn, __ATOMIC_RELAXED) != rctx->sn) + { + doze(); + } + int ret = _odp_queue_deq(ordq, + ev, + num, + /*threadsafe=*/false, + /*atomic=*/false); + //Someone else's turn + far_atomic_store(&rctx->rwin->turn, + rctx->sn + 1, + __ATOMIC_RELEASE_AFTER_CAS); + if (likely(ret != 0)) + { + *from = ordq; + return ret; + } +#ifdef LOG +if (VERBOSE) printf("%u: Queue %p seems empty, ignoring\n", ts->tidx, ordq); +//Queue will either become non-empty or will be removed by thread which made it empty +if (VERBOSE) printf("%u: Release unused rctx %p\n", ts->tidx, ts->rctx); +#endif + ts->out_of_order = false; + rctx_release(ts->rctx); +#ifdef LOG +if (VERBOSE) printf("%u: Release unused rctx %p rwin %p\n", ts->tidx, ts->rctx, ts->rctx->rwin); +#endif + ts->rctx = NULL; + continue;//Look at next schedq + } + else if (obj->type == parallel_q) + { + odp_queue_t pq = obj; + int ret = _odp_queue_deq(pq, + ev, + num, + /*threadsafe=*/true, + /*atomic=*/false); + if (likely(ret != 0)) + { + *from = pq; + return ret; + } +#ifdef LOG +if (VERBOSE) printf("%u: Queue %p seems empty, ignoring\n", ts->tidx, pq); +//Queue will either become non-empty or will be removed by thread which made it empty +#endif + continue;//Look at next schedq + } + else if (obj->type == pktio) + { + } + } + return 0; +} + +static odp_event_t odp_schedule(odp_queue_t *from, uint64_t wait) +{ + odp_event_t evt; + if (likely(odp_schedule_multi(from, wait, &evt, 1) == 1)) + { + return evt; + } + return ODP_EVENT_INVALID; +} + +int odp_schedule_num_prio(void) +{ + return NUM_PRIO; +} + +void odp_schedule_pause(void) +{ + struct thread_state *ts = TS; + ts->pause = true; +} + +void odp_schedule_resume(void) +{ + struct thread_state *ts = TS; + ts->pause = false; +} + +void odp_schedule_prefetch(int num) +{ + (void)num; + //No-op for the SW scheduler which is only driven by the application + //threads themselves +} + +/****************************************************************************** + * Scheduler groups + *****************************************************************************/ + +struct sched_group +{ + odp_thrmask_t thr_actual[NUM_PRIO];//Threads currently associated with the sched group + odp_thrmask_t thr_wanted; + uint32_t xcount[NUM_PRIO];//Used to spread queues over schedq's + uint32_t xfactor;//Number of schedq's per prio + sched_queue schedq[1];//NUMPRIO * xfactor +}; + +static sched_group_mask_t sg_used; +static struct sched_group *sg_vec[MAX_SCHED_GROUP]; + +static sched_queue *schedq_from_sched_group(odp_schedule_group_t grp, + uint32_t prio) +{ + assert(grp > 0 && grp <= MAX_SCHED_GROUP); + assert((sg_used & (1ULL << (grp - 1))) != 0); + assert(prio >= 0 && prio < NUM_PRIO); + uint32_t sgi = grp - 1; + struct sched_group *sg = sg_vec[sgi]; + //Use xcount to spread queues over the xfactor schedq's per priority + uint32_t x = __atomic_fetch_add(&sg->xcount[prio], 1, __ATOMIC_RELAXED); + if (x == 0) + { + //First ODP queue for this priority + //Notify all threads in sg->thr_wanted that they should join + sched_group_mask_t thrds = sg->thr_wanted; + while (thrds != 0) + { + uint32_t thr = __builtin_ffsl(thrds) - 1; + thrds &= ~(1ULL << thr); + //Notify the thread about membership in this group/priority + (void)__atomic_fetch_or(&thread_state[thr].sg_wanted[prio], + 1ULL << sgi, + __ATOMIC_RELEASE); + __atomic_store_n(&thread_state[thr].sg_sem, 1, __ATOMIC_RELEASE); + } + } + return &sg->schedq[prio * sg->xfactor + x % sg->xfactor]; +} + +static void update_sg_membership(struct thread_state *ts) +{ + for (uint32_t p = 0; p < NUM_PRIO; p++) + { + sched_group_mask_t sg_wanted = __atomic_load_n(&ts->sg_wanted[p], + __ATOMIC_ACQUIRE); + if (ts->sg_actual[p] != sg_wanted) + { + //Our sched_group membership has changed + sched_group_mask_t added = sg_wanted & ~ts->sg_actual[p]; + while (added != 0) + { + uint32_t sgi = __builtin_ffsl(added) - 1; + struct sched_group *sg = sg_vec[sgi]; + for (uint32_t x = 0; x < sg->xfactor; x++) + { + //Include our thread index to shift (rotate) the order of + //schedq's + insert_schedq_in_list(ts, + &sg->schedq[p * sg->xfactor + + (x + ts->tidx) % sg->xfactor]); + } + (void)__atomic_fetch_or(&sg->thr_actual[p], + 1ULL << ts->tidx, + __ATOMIC_RELAXED); + added &= ~(1ULL << sgi); + } + sched_group_mask_t removed = ~sg_wanted & ts->sg_actual[p]; + while (removed != 0) + { + uint32_t sgi = __builtin_ffsl(removed) - 1; + struct sched_group *sg = sg_vec[sgi]; + for (uint32_t x = 0; x < sg->xfactor; x++) + { + remove_schedq_from_list(ts, + &sg->schedq[p * sg->xfactor + x]); + } + (void)__atomic_fetch_and(&sg->thr_actual[p], + ~(1ULL << ts->tidx), + __ATOMIC_RELAXED); + removed &= ~(1ULL << sgi); + } + ts->sg_actual[p] = sg_wanted; + } + } +} + +int odp_schedule_group_join(odp_schedule_group_t group, + const odp_thrmask_t *mask); + +odp_schedule_group_t odp_schedule_group_create(const char *name, + const odp_thrmask_t *mask) +{ + uint32_t sgi; + sched_group_mask_t used = __atomic_load_n(&sg_used, __ATOMIC_ACQUIRE); + do + { + if (~used == 0) + return -1;//All sched_groups in use + sgi = __builtin_ffsl(~used) - 1; + if (sgi >= MAX_SCHED_GROUP) + return -1;//All sched_groups in use + } while (!__atomic_compare_exchange_n(&sg_used, + &used, + used | (1ULL << sgi), + CAS_WEAK, + __ATOMIC_ACQUIRE, + __ATOMIC_ACQUIRE)); + //Compute xfactor (spread factor) from the number of threads present in the + //thread mask + //Preferable this would be an explicit parameter + uint32_t xfactor = __builtin_popcountll(*mask); + if (xfactor < 1) + { + xfactor = 1; + } + struct sched_group *sg = aligned_alloc(CACHE_LINE, + sizeof(struct sched_group) + + (NUM_PRIO * xfactor - 1) * + sizeof(sched_queue)); + if (sg == NULL) + { + return -1; + } + sg_vec[sgi] = sg; + memset(sg->thr_actual, 0, sizeof sg->thr_actual); + sg->thr_wanted = 0; + sg->xfactor = xfactor; + for (uint32_t p = 0; p < NUM_PRIO; p++) + { + sg->xcount[p] = 0; + for (uint32_t x = 0; x < xfactor; x++) + { + schedq_init(&sg->schedq[p * xfactor + x], p); + } + } + if (__builtin_popcountll(*mask) != 0) + { + odp_schedule_group_join(sgi + 1, mask); + } + return sgi + 1; +} + +int odp_schedule_group_join(odp_schedule_group_t group, + const odp_thrmask_t *mask) +{ + if (group < 1 && group > MAX_SCHED_GROUP) + return -1; + uint32_t sgi = group - 1; + if ((sg_used & (1ULL << sgi)) == 0) + return -1; + struct sched_group *sg = sg_vec[sgi]; + odp_thrmask_t toadd = *mask; + //Add threads to scheduler group wanted thread mask + (void)__atomic_fetch_or(&sg->thr_wanted, toadd, __ATOMIC_RELAXED); + //Notify relevant threads about the change + while (toadd != 0) + { + uint32_t thr = __builtin_ffsl(toadd) - 1; + toadd &= ~(1ULL << thr); + for (uint32_t p = 0; p < NUM_PRIO; p++) + { + if (sg->xcount[p] != 0) + { + //This priority level has ODP queues + //Notify the thread about membership in this group/priority + (void)__atomic_fetch_or(&thread_state[thr].sg_wanted[p], + 1ULL << sgi, + __ATOMIC_RELEASE); + __atomic_store_n(&thread_state[thr].sg_sem, + 1, + __ATOMIC_RELEASE); + } + } + } + return 0; +} + +int odp_schedule_group_leave(odp_schedule_group_t group, + const odp_thrmask_t *mask) +{ + if (group < 1 && group > MAX_SCHED_GROUP) + return -1; + uint32_t sgi = group - 1; + if ((sg_used & (1ULL << sgi)) == 0) + return -1; + struct sched_group *sg = sg_vec[sgi]; + odp_thrmask_t torem = *mask; + //Remove threads from scheduler group wanted thread mask + (void)__atomic_fetch_and(&sg->thr_wanted, ~torem, __ATOMIC_RELAXED); + //Notify relevant threads about the change + while (torem != 0) + { + uint32_t thr = __builtin_ffsl(torem) - 1; + torem &= ~(1ULL << thr); + for (uint32_t p = 0; p < NUM_PRIO; p++) + { + if (sg->xcount[p] != 0) + { + //Clear bit which specifies membership in this sched_group/prio + (void)__atomic_fetch_and(&thread_state[thr].sg_wanted[p], + ~(1ULL << sgi), + __ATOMIC_RELEASE); + __atomic_store_n(&thread_state[thr].sg_sem, + 1, + __ATOMIC_RELEASE); + } + } + } + return 0; +} + +int odp_schedule_group_thrmask(odp_schedule_group_t group, + odp_thrmask_t *thrmask) +{ + if (group < 1 && group > MAX_SCHED_GROUP) + return -1; + uint32_t sgi = group - 1; + if ((sg_used & (1ULL << sgi)) == 0) + return -1; + struct sched_group *sg = sg_vec[sgi]; + *thrmask = sg->thr_wanted; + return 0; +} + +/****************************************************************************** + * Ordered locks + *****************************************************************************/ + +void odp_schedule_order_lock(unsigned lock_index) +{ + struct thread_state *ts = TS; +#ifndef NDEBUG + if (unlikely(ts->rctx == NULL)) + { + fprintf(stderr, "odp_schedule_order_lock: unexpected call\n"); + abort(); + } +#endif + struct reorder_context *rctx = ts->rctx; + struct reorder_window *rwin = rctx->rwin; +#ifndef NDEBUG + if (unlikely(lock_index >= rwin->lock_count)) + { + fprintf(stderr, "odp_schedule_order_lock: invalid lock index %u\n", + lock_index); + abort(); + } +#endif +#ifdef LOG +if (VERBOSE) printf("%u: lock acquire sn=%u %p->olock[0]=%u\n", TS->tidx, rctx->sn, rwin, rwin->olock[0]); +#endif + while (__atomic_load_n(&rwin->olock[lock_index], __ATOMIC_ACQUIRE) != + rctx->sn) + { + doze(); + } +#ifdef LOG +if (VERBOSE) printf("%u: lock taken sn=%u %p->olock[0]=%u\n", TS->tidx, rctx->sn, rwin, rwin->olock[0]); +#endif +} + +void odp_schedule_order_unlock(unsigned lock_index) +{ + struct thread_state *ts = TS; +#ifndef NDEBUG + if (unlikely(ts->rctx == NULL)) + { + fprintf(stderr, "odp_schedule_order_unlock: unexpected call\n"); + abort(); + } +#endif + struct reorder_context *rctx = ts->rctx; + struct reorder_window *rwin = rctx->rwin; +#ifndef NDEBUG + if (unlikely(lock_index >= rwin->lock_count)) + { + fprintf(stderr, "odp_schedule_order_unlock: invalid lock index %u\n", + lock_index); + abort(); + } + if (unlikely(rwin->olock[lock_index] != rctx->sn)) + { + fprintf(stderr, "odp_schedule_order_unlock: mismatched call\n"); + } +#endif +#ifdef LOG +if (VERBOSE) printf("%u: lock released %p->olock[0]=%u\n", TS->tidx, rwin, rctx->sn + 1); +#endif + __atomic_store_n(&rwin->olock[lock_index], rctx->sn + 1, __ATOMIC_RELEASE); + rctx->olock_flags |= 1U << lock_index; +} + +/****************************************************************************** + * + *****************************************************************************/ + +static pthread_t tid[MAXTHREADS]; +static unsigned long CPUFREQ; +static bool AFFINITY = false; +static bool PARALLEL = false; +static bool ORDERED = false; +static pthread_barrier_t BAR; +#define MAXQUEUES 256 +static uint32_t NUMQUEUES = 20; +static odp_queue_t ODPQ[MAXQUEUES]; +#define MAXEVENTS 100000 +static uint32_t NUMEVENTS = 2048; +static odp_event_t EVENTS[MAXEVENTS]; +static uint32_t NUMCOMPLETED ALIGNED(CACHE_LINE); + +static void *entrypoint(void *arg) +{ + unsigned tidx = (unsigned)(long)arg; + thread_state_init(tidx); + + if (pthread_barrier_wait(&BAR) < PTHREAD_BARRIER_SERIAL_THREAD) + { + perror("pthread_barrier_wait"), abort(); + } + + if (tidx == 0) + { + //Enqueue events from events array into queue 0 + for (unsigned i = 0; i < NUMEVENTS; i++) + { + odp_event_t evt = EVENTS[i]; + evt->fromqidx = 0; + unsigned j; + for (j = 0; j < 100000; j++) + { + int rc = odp_queue_enq(ODPQ[0], &evt, 1); + if (rc == 1) + break; + doze(); + fprintf(stderr, "i=%u, read=%u, write=%u\n", i, ODPQ[0]->queue.prod.read, ODPQ[0]->queue.prod.write); + fflush(NULL); abort(); + } + if (j == 100000) + fprintf(stderr, "Failed initial enqueue\n"), fflush(NULL), abort(); + } + } + + //Move events from queue N to queue N+1 + uint32_t fails = 0; + while (__atomic_load_n(&NUMCOMPLETED, __ATOMIC_RELAXED) != NUMEVENTS) + { + odp_queue_t q; + odp_event_t evt = odp_schedule(&q, 0); + if (evt != ODP_EVENT_INVALID) + { + evt->fromqidx++; + if (evt->fromqidx < NUMQUEUES) + { + int rc = odp_queue_enq(ODPQ[evt->fromqidx], &evt, 1); + if (rc != 1) + { + fprintf(stderr, "Queue full\n"); + fflush(NULL); abort(); + } + } + else//Event has passed through all queues + { + if (ORDERED) + { + odp_schedule_order_lock(0); + } + + uint32_t expected = __atomic_fetch_add(&NUMCOMPLETED, + 1, __ATOMIC_RELAXED); +#ifdef LOG + if (VERBOSE) printf("%u: Event %u completed\n", TS->tidx, evt->number); +#endif + if (!PARALLEL && evt->number != expected) + { + //Ordered or atomic queues + fprintf(stderr, "%u: Event %u wrong order, expected %u\n", + TS->tidx, evt->number, expected); + } + //Else parallel queues, order not preserved + if (ORDERED) + { + odp_schedule_order_unlock(0); + } + } + fails = 0; + } + else + { + doze(); doze(); doze(); doze(); + doze(); doze(); doze(); doze(); + doze(); doze(); doze(); doze(); + doze(); doze(); doze(); doze(); + doze(); doze(); doze(); doze(); + doze(); doze(); doze(); doze(); + if (++fails == 10000000) + { + fprintf(stderr, "%u: Deadlock suspected\n", TS->tidx); + fflush(NULL); + bp();//break; + } + } + } + +#ifdef LOG + if (VERBOSE) + { + printf("NUMCOMPLETED %u\n", NUMCOMPLETED); + static int THREADEXIT = 0; + if (__atomic_fetch_add(&THREADEXIT, 1, __ATOMIC_ACQUIRE) == 0) + { + for (int i = 0; i < NUMQUEUES; i++) + { + printf("queue %p: numevts %u, cur_ticket %u, nxt_ticket %u\n", ODPQ[i], ODPQ[i]->queue.shared.numevts, ODPQ[i]->queue.shared.cur_ticket, ODPQ[i]->queue.shared.nxt_ticket); + struct ringstate rs; + rs = ODPQ[i]->queue.cons; + if (ringstate_num_used(rs) != 0) + { + printf("queue %p.cons has %u elements\n", ODPQ[i], ringstate_num_used(rs)); + } + rs = ODPQ[i]->queue.prod; + if (ringstate_num_used(rs) != 0) + { + printf("queue %p.prod has %u elements\n", ODPQ[i], ringstate_num_used(rs)); + } + } + } + } +#endif + + return NULL; +} + +static void +initialize_attr(pthread_attr_t *attr, int sched, int prio, unsigned cpu, const char *name) +{ + int err; + if (pthread_attr_init(attr) != 0) + { + perror("pthread_attr_init"), abort(); + } + if (AFFINITY) + { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(cpu + 1, &cpuset); + if (pthread_attr_setaffinity_np(attr, sizeof cpuset, &cpuset)) + { + perror("pthread_attr_setaffinity_np"), abort(); + } + } + if (pthread_attr_setschedpolicy(attr, sched)) + { + perror("pthread_attr_setschedpolicy"), abort(); + } + //Get scheduling policy from attr + if (pthread_attr_setinheritsched(attr, PTHREAD_EXPLICIT_SCHED)) + { + perror("pthread_attr_setinheritsched"), abort(); + } + struct sched_param schedp; + if (sched == SCHED_FIFO || sched == SCHED_RR) + { + memset(&schedp, 0, sizeof schedp); + schedp.sched_priority = prio; + if ((err = pthread_attr_setschedparam(attr, &schedp)) != 0) + { + errno = err; + perror("pthread_attr_setschedparam"), abort(); + } + } +} + +static void create_threads(void) +{ + unsigned thr; + void *(*ep)(void *) = entrypoint; + for (thr = 0; thr < NUMTHREADS; thr++) + { + int err; + pthread_attr_t pt_attr; + initialize_attr(&pt_attr, SCHED, PRIO, /*cpu=*/thr, "task"); + if ((err = pthread_create(&tid[thr], &pt_attr, ep, /*arg=*/(void*)(long)thr)) != 0) + { + if (err == EPERM) + { + //Work-around for some platforms that do not support/allow + //SCHED_FIFO/SCHED_RR + initialize_attr(&pt_attr, SCHED_OTHER, PRIO, /*cpu=*/thr, "task"); + err = pthread_create(&tid[thr], &pt_attr, ep, /*arg=*/(void*)(long)thr); + } + if (err != 0) + { + errno = err; + perror("pthread_create"); + exit(20); + } + } + } +} + +#if 0 +static unsigned permille(uint32_t rel, uint32_t tot) +{ + return (unsigned)(1000ULL * rel / tot); +} +#endif + +int main(int argc, char *argv[]) +{ + unsigned thr; + int c; + + while ((c = getopt(argc, argv, "ae:f:opq:t:v")) != -1) + { + switch (c) + { + case 'a' : + AFFINITY = true; + break; + case 'e' : + { + int numevents = atoi(optarg); + if (numevents < 1 || numevents > MAXEVENTS) + { + fprintf(stderr, "Invalid number of events %d\n", numevents); + exit(EXIT_FAILURE); + } + NUMEVENTS = (unsigned)numevents; + break; + } + case 'f' : + { + CPUFREQ = atol(optarg); + break; + } + case 'o' : + ORDERED = true; + break; + case 'p' : + PARALLEL = true; + break; + case 'q' : + { + int numqueues = atoi(optarg); + if (numqueues < 1 || numqueues > MAXQUEUES) + { + fprintf(stderr, "Invalid number of queues %d\n", numqueues); + exit(EXIT_FAILURE); + } + NUMQUEUES = (unsigned)numqueues; + break; + } + case 't' : + { + int numthreads = atoi(optarg); + if (numthreads < 1 || numthreads > MAXTHREADS) + { + fprintf(stderr, "Invalid number of threads %d\n", numthreads); + exit(EXIT_FAILURE); + } + NUMTHREADS = (unsigned)numthreads; + break; + } + default : +usage : + fprintf(stderr, "Usage: scheduler \n" + "-a Make threads CPU affine\n" + "-e Number of events\n" + "-f CPU frequency in kHz\n" + "-o Use ordered queues\n" + "-p Use parallel queues\n" + "-q Number of queues\n" + "-t Number of threads\n" + "-v Verbose\n" + ); + exit(EXIT_FAILURE); + case 'v' : + VERBOSE = true; + break; + } + } + if (optind > argc || (PARALLEL && ORDERED)) + { + goto usage; + } + + printf("%u events, %u %s queue%s, %u thread%s\n", + NUMEVENTS, + NUMQUEUES, + PARALLEL ? "parallel" : ORDERED ? "ordered" : "atomic", + NUMQUEUES != 1 ? "s" : "", + NUMTHREADS, + NUMTHREADS != 1 ? "s" : ""); + + if (pthread_barrier_init(&BAR, NULL, NUMTHREADS + 1) != 0) + { + perror("pthread_barrier_init"), abort(); + } + + //Create scheduler group with thread mask = all threads (0..NUMTHREADS-1) + //so the scheduler knows how many schedq's are needed for best spread + odp_thrmask_t all = (1ULL << NUMTHREADS) - 1; + odp_schedule_group_t grp_all = odp_schedule_group_create("ALL", &all); + + //Create all our ODP queues + for (unsigned i = 0; i < NUMQUEUES; i++) + { + //The last queue is atomic so that we can safely test ordering of events + odp_queue_t q = _odp_queue_create(/*prio=*/PRIO_MED, + /*sync=*/PARALLEL ? parallel_q : + ORDERED ? ordered_q : + atomic_q, + /*group=*/grp_all, + /*lock_count=*/ORDERED && (i == NUMQUEUES - 1), + /*user_ctx=*/NULL); + if (q == ODP_QUEUE_INVALID) + perror("_odp_queue_create"), abort(); +if (VERBOSE) printf("ODPQ[%u]=%p, type=%s, schedq %p\n", i, q, qtype2str(q), q->schedq); + ODPQ[i] = q; + } + + for (unsigned i = 0; i < NUMEVENTS; i++) + { + odp_event_t evt = odp_event_alloc(); + if (evt == ODP_EVENT_INVALID) + abort(); + evt->number = i; + EVENTS[i] = evt; + } + NUMCOMPLETED = 0; + + //Create threads + create_threads(); + + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + uint64_t start = ts.tv_sec * 1000000000ULL + ts.tv_nsec; + + //Release threads by joining the barrier + pthread_barrier_wait(&BAR); + + //Wait for threads to terminate + for (thr = 0; thr < NUMTHREADS; thr++) + { + pthread_join(tid[thr], NULL); + } + + clock_gettime(CLOCK_MONOTONIC, &ts); + if (AFFINITY && CPUFREQ == 0) + { + unsigned long cpufreq[MAXTHREADS]; + for (thr = 0; thr < NUMTHREADS; thr++) + { + char s[200]; + cpufreq[thr] = 0; + sprintf(s, "/sys/devices/system/cpu/cpu%u/cpufreq/cpuinfo_cur_freq", thr + 1); + int fd = open(s, O_RDONLY); + if (fd != -1) + { + char buf[40]; + int l = read(fd, buf, sizeof buf); + if (l > 0) + { + cpufreq[thr] = atol(buf); + } + close(fd); + } + } + CPUFREQ = 0; + for (thr = 0; thr < NUMTHREADS; thr++) + { + printf("Thread %u current CPU frequency %lukHz\n", thr, cpufreq[thr]); + CPUFREQ += cpufreq[thr] / NUMTHREADS; + } + printf("Average CPU frequency %lukHz\n", CPUFREQ); + } + uint64_t numops = NUMEVENTS * NUMQUEUES; + uint64_t elapsed = ts.tv_sec * 1000000000ULL + ts.tv_nsec - start; + printf("%llu.%03llu seconds, ", elapsed / 1000000000LLU, (elapsed % 1000000000LLU) / 1000000LLU); + if (elapsed / 1000000 != 0) + { + printf("%"PRIu32" ops/second", (uint32_t)((numops / (elapsed / 1000000)) * 1000)); + } + printf("\n"); + printf("%"PRIu32" nanoseconds/update\n", (uint32_t)(elapsed / numops)); + if (CPUFREQ != 0) + { + uint64_t cycles = NUMTHREADS * elapsed * CPUFREQ / 1000000ULL; + printf("%"PRIu32" cycles/update\n", (uint32_t)(cycles / numops)); + } + +if (VERBOSE) +{ + for (uint32_t thr = 0; thr < NUMTHREADS; thr++) + { + for (uint32_t j = 0; j < thread_state[thr].num_schedq; j++) + { + sched_queue *schedq = thread_state[thr].schedq_list[j]; + printf("%u: schedq[%u]=%p (prio=%u)\n", thr, j, schedq, schedq->prio); + } + } + uint32_t numpushpop = 0; + for (uint32_t i = 0; i < NUMQUEUES; i++) + { + numpushpop += ODPQ[i]->queue.shared.nxt_ticket; + } + printf("%u push/pop operations\n", numpushpop); +} + + return 0; +}