From patchwork Tue Oct 21 11:38:11 2014
X-Patchwork-Submitter: Maxim Uvarov <maxim.uvarov@linaro.org>
X-Patchwork-Id: 39109
From: Maxim Uvarov <maxim.uvarov@linaro.org>
To: lng-odp@lists.linaro.org
Date: Tue, 21 Oct 2014 15:38:11 +0400
Message-Id: <1413891491-16717-1-git-send-email-maxim.uvarov@linaro.org>
X-Mailer: git-send-email 1.8.5.1.163.gd7aced9
Subject: [lng-odp] [PATCH] ipc linux-generic implementation based on pktio

Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
---
Hello,

Please find here the IPC implementation for linux-generic. While discussing the earlier IPC patches we reached the following decisions:
1. IPC should be done at the PKTIO level, not at the QUEUE level.
2. The initial implementation may copy packets, but copying is better avoided.
3. IPC should connect 2 separate processes. The difference from the first implementation is that fork() happens before odp_init_global(), right after entering main().
4. For linux-generic, IPC should rely on some portable mechanism, like shared memory.

In the current patch IPC is implemented over shared memory in the following way:
1. The first shared memory block between the processes holds the shared packet pool, i.e. the place where packet data is actually stored.
2. The second and third shared memory blocks between the processes carry the messages for produced and consumed packets. A HW implementation would not need these second and third shm chunks.

Please review how the current implementation corresponds to what we discussed. As further work we could add timers for remote IPC buffers, to take some action if the remote app does not handle them, and do full zero-copy by saving the remote odp_packet_t value in the packet metadata or in an additional field of the ODP packet header. A condensed sketch of the intended two-process usage follows below.

Best regards,
Maxim.
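For reviewers, the sketch below (not part of the patch) condenses how the two processes are intended to cooperate. It only rearranges calls that appear in example/ipc/odp_pktio.c further down; the function names producer_side()/consumer_side() are illustrative, and the constants come from the example:

	#include <odp.h>

	#define SHM_PKT_POOL_SIZE     (512*2048)
	#define SHM_PKT_POOL_BUF_SIZE 1856

	/* Producer process: packets go out through the IPC pktio. */
	static void producer_side(void)
	{
		/* Packet pool in memory shared with the other process. */
		odp_shm_t shm = odp_shm_reserve("shm_packet_pool",
						SHM_PKT_POOL_SIZE,
						ODP_CACHE_LINE_SIZE,
						ODP_SHM_PROC);
		odp_buffer_pool_t pool =
			odp_buffer_pool_create("packet_pool",
					       odp_shm_addr(shm),
					       SHM_PKT_POOL_SIZE,
					       SHM_PKT_POOL_BUF_SIZE,
					       ODP_CACHE_LINE_SIZE,
					       ODP_BUFFER_TYPE_PACKET);
		(void)pool;

		/* pool == 0 selects the IPC pktio type; this also creates
		 * the "ipc_pktio_r"/"ipc_pktio_p" message rings in
		 * /dev/shm. */
		odp_pktio_t ipc_pktio = odp_pktio_open("ipc_pktio", 0);

		/* ... odp_pktio_send(ipc_pktio, pkt_tbl, pkts); ... */
		(void)ipc_pktio;
	}

	/* Consumer process: attaches to the producer's pool and rings. */
	static void consumer_side(odp_buffer_pool_t local_pool)
	{
		/* Map the remote pool without creating it. */
		odp_shm_t shm = odp_shm_reserve("shm_packet_pool",
						SHM_PKT_POOL_SIZE,
						ODP_CACHE_LINE_SIZE,
						ODP_SHM_PROC_NOCREAT);
		void *remote_pool_base = odp_shm_addr(shm);

		odp_pktio_t pktio = odp_pktio_lookup("ipc_pktio", local_pool,
						     remote_pool_base);

		/* ... odp_pktio_recv(pktio, pkt_tbl, MAX_PKT_BURST); ... */
		(void)pktio;
	}
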
 example/Makefile.am | 2 +-
 example/ipc/Makefile.am | 6 +
 example/ipc/odp_pktio.c | 765 +++++++++++++++++++++
 helper/include/odph_ring.h | 3 +
 platform/linux-generic/include/api/odp_packet_io.h | 22 +
 .../linux-generic/include/api/odp_shared_memory.h | 11 +
 .../include/odp_buffer_pool_internal.h | 7 +-
 .../linux-generic/include/odp_packet_io_internal.h | 7 +
 platform/linux-generic/odp_buffer_pool.c | 9 -
 platform/linux-generic/odp_init.c | 6 +
 platform/linux-generic/odp_packet_io.c | 229 ++++++
 platform/linux-generic/odp_ring.c | 9 +-
 platform/linux-generic/odp_shared_memory.c | 35 +-
 13 files changed, 1097 insertions(+), 14 deletions(-)
 create mode 100644 example/ipc/Makefile.am
 create mode 100644 example/ipc/odp_pktio.c

diff --git a/example/Makefile.am b/example/Makefile.am
index b2a22a3..7911069 100644
--- a/example/Makefile.am
+++ b/example/Makefile.am
@@ -1 +1 @@
-SUBDIRS = generator ipsec l2fwd odp_example packet timer
+SUBDIRS = generator ipsec l2fwd odp_example packet timer ipc
diff --git a/example/ipc/Makefile.am b/example/ipc/Makefile.am
new file mode 100644
index 0000000..603a1ab
--- /dev/null
+++ b/example/ipc/Makefile.am
@@ -0,0 +1,6 @@
+include $(top_srcdir)/example/Makefile.inc
+
+bin_PROGRAMS = odp_pktio
+odp_pktio_LDFLAGS = $(AM_LDFLAGS) -static
+
+dist_odp_pktio_SOURCES = odp_pktio.c
diff --git a/example/ipc/odp_pktio.c b/example/ipc/odp_pktio.c
new file mode 100644
index 0000000..1eb4a95
--- /dev/null
+++ b/example/ipc/odp_pktio.c
@@ -0,0 +1,765 @@
+/* Copyright (c) 2013, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+/**
+ * @file
+ *
+ * @example odp_pktio.c  ODP basic packet IO loopback test application
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <getopt.h>
+#include <unistd.h>
+
+#include <odp.h>
+#include <odph_linux.h>
+#include <odph_packet.h>
+#include <odph_eth.h>
+#include <odph_ip.h>
+
+/** @def MAX_WORKERS
+ * @brief Maximum number of worker threads
+ */
+#define MAX_WORKERS 32
+
+/** @def SHM_PKT_POOL_SIZE
+ * @brief Size of the shared memory block
+ */
+#define SHM_PKT_POOL_SIZE (512*2048)
+
+/** @def SHM_PKT_POOL_BUF_SIZE
+ * @brief Buffer size of the packet pool buffer
+ */
+#define SHM_PKT_POOL_BUF_SIZE 1856
+
+/** @def MAX_PKT_BURST
+ * @brief Maximum number of packets per burst
+ */
+#define MAX_PKT_BURST 16
+
+/** @def APPL_MODE_PKT_BURST
+ * @brief The application will handle packets in bursts
+ */
+#define APPL_MODE_PKT_BURST 0
+
+/** @def APPL_MODE_PKT_QUEUE
+ * @brief The application will handle packets in queues
+ */
+#define APPL_MODE_PKT_QUEUE 1
+
+/** @def PRINT_APPL_MODE(x)
+ * @brief Macro to print the current status of how the application handles
+ * packets.
+ */
+#define PRINT_APPL_MODE(x) printf("%s(%i)\n", #x, (x))
+
+/** Get rid of path in filename - only for unix-type paths using '/' */
+#define NO_PATH(file_name) (strrchr((file_name), '/') ? \
+			    strrchr((file_name), '/') + 1 : (file_name))
+/**
+ * Parsed command line application arguments
+ */
+typedef struct {
+	int core_count;
+	int if_count;		/**< Number of interfaces to be used */
+	char **if_names;	/**< Array of pointers to interface names */
+	int mode;		/**< Packet IO mode */
+	odp_buffer_pool_t pool;	/**< Buffer pool for packet IO */
+} appl_args_t;
+
+/**
+ * Thread specific arguments
+ */
+typedef struct {
+	char *pktio_dev;	/**< Interface name to use */
+	odp_buffer_pool_t pool;	/**< Buffer pool for packet IO */
+	int mode;		/**< Thread mode */
+} thread_args_t;
+
+/**
+ * Grouping of both parsed CL args and thread specific args - alloc together
+ */
+typedef struct {
+	/** Application (parsed) arguments */
+	appl_args_t appl;
+	/** Thread specific arguments */
+	thread_args_t thread[MAX_WORKERS];
+} args_t;
+
+/** Global pointer to args */
+static args_t *args;
+
+/* helper funcs */
+static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len);
+static void swap_pkt_addrs(odp_packet_t pkt_tbl[], unsigned len);
+static void parse_args(int argc, char *argv[], appl_args_t *appl_args);
+static void print_info(char *progname, appl_args_t *appl_args);
+static void usage(char *progname);
+
+/**
+ * Packet IO loopback worker thread using ODP queues
+ *
+ * @param arg  thread arguments of type 'thread_args_t *'
+ */
+static void *pktio_queue_thread(void *arg)
+{
+	int thr;
+	odp_buffer_pool_t pkt_pool;
+	odp_pktio_t pktio;
+	odp_pktio_t ipc_pktio;
+	thread_args_t *thr_args;
+	odp_queue_t ipcq;
+	odp_queue_t inq_def;
+	char inq_name[ODP_QUEUE_NAME_LEN];
+	odp_queue_param_t qparam;
+	odp_packet_t pkt;
+	odp_buffer_t buf;
+	int ret;
+	unsigned long pkt_cnt = 0;
+	unsigned long err_cnt = 0;
+
+	thr = odp_thread_id();
+	thr_args = arg;
+
+	printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr,
+	       thr_args->pktio_dev);
+
+	/* Lookup the packet pool */
+	pkt_pool = odp_buffer_pool_lookup("packet_pool");
+	if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) {
+		ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr);
+		return NULL;
+	}
+
+	/* Open a packet IO instance for this thread */
+	pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool);
+	if (pktio == ODP_PKTIO_INVALID) {
+		ODP_ERR(" [%02i] Error: pktio create failed\n", thr);
+		return NULL;
+	}
+
+	/*
+	 * Create and set the default INPUT queue associated with the 'pktio'
+	 * resource
+	 */
+	qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT;
+	qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC;
+	qparam.sched.group = ODP_SCHED_GROUP_DEFAULT;
+	snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio);
+	inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0';
+
+	inq_def = odp_queue_create(inq_name, ODP_QUEUE_TYPE_PKTIN, &qparam);
+	if (inq_def == ODP_QUEUE_INVALID) {
+		ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr);
+		return NULL;
+	}
+
+	ret = odp_pktio_inq_setdef(pktio, inq_def);
+	if (ret != 0) {
+		ODP_ERR(" [%02i] Error: default input-Q setup\n", thr);
+		return NULL;
+	}
+
+	/* IPC pktio */
+	ipc_pktio = odp_pktio_open("ipc_pktio", 0);
+	if (ipc_pktio == ODP_PKTIO_INVALID) {
+		ODP_ERR(" [%02i] Error: ipc pktio create failed.\n", thr);
+		return NULL;
+	}
+
+	/* IPC queue */
+	snprintf(inq_name, sizeof(inq_name), "%i-pktio_ipcq", (int)pktio);
+	inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0';
+	ipcq = odp_queue_create(inq_name, ODP_QUEUE_TYPE_PKTOUT, NULL);
+	if (ipcq == ODP_QUEUE_INVALID) {
+		ODP_ERR(" [%02i] Error: ipc queue creation failed\n", thr);
+		return NULL;
+	}
+
+	/* Bind IPC pktio to IPC queue */
+	ret = odp_pktio_outq_setdef(ipc_pktio, ipcq);
+	if (ret != 0) {
+		ODP_ERR(" [%02i] Error: output-Q setup for ipcq\n", thr);
+		return NULL;
+	}
+
+	printf(" [%02i] created pktio:%02i, queue mode (ATOMIC queues)\n"
+	       "         default pktio%02i-INPUT queue:%u\n",
+	       thr, pktio, pktio, inq_def);
+
+	/* Loop packets */
+	for (;;) {
+		/* Use schedule to get buf from any input queue */
+		buf = odp_schedule(NULL, ODP_SCHED_WAIT);
+
+		pkt = odp_packet_from_buffer(buf);
+
+		/* Drop packets with errors */
+		if (odp_unlikely(drop_err_pkts(&pkt, 1) == 0)) {
+			ODP_ERR("Drop frame - err_cnt:%lu\n", ++err_cnt);
+			continue;
+		}
+
+		/* Swap Eth MACs and possibly IP-addrs before sending back */
+		swap_pkt_addrs(&pkt, 1);
+
+		/* Enqueue the packet for output */
+		odp_queue_enq(ipcq, buf);
+
+		/* Print packet counts every once in a while */
+		if (odp_unlikely(pkt_cnt++ % 100000 == 0)) {
+			printf(" [%02i] pkt_cnt:%lu\n", thr, pkt_cnt);
+			fflush(NULL);
+		}
+	}
+
+	ODP_ABORT("unreachable code");
+}
+
+/**
+ * Packet IO loopback worker thread using bursts from/to IO resources
+ *
+ * @param arg  thread arguments of type 'thread_args_t *'
+ */
+static void *pktio_ifburst_thread(void *arg)
+{
+	int thr;
+	odp_buffer_pool_t pkt_pool;
+	odp_pktio_t pktio;
+	odp_pktio_t ipc_pktio;
+	thread_args_t *thr_args;
+	int pkts, pkts_ok;
+	odp_packet_t pkt_tbl[MAX_PKT_BURST];
+	unsigned long pkt_cnt = 0;
+	unsigned long err_cnt = 0;
+	unsigned long tmp = 0;
+
+	thr = odp_thread_id();
+	thr_args = arg;
+
+	printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr,
+	       thr_args->pktio_dev);
+
+	/* Lookup the packet pool */
+	pkt_pool = odp_buffer_pool_lookup("packet_pool");
+	if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) {
+		ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr);
+		return NULL;
+	}
+
+	/* Open a packet IO instance for this thread */
+	pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool);
+	if (pktio == ODP_PKTIO_INVALID) {
+		ODP_ERR(" [%02i] Error: pktio create failed.\n", thr);
+		return NULL;
+	}
+
+	printf(" [%02i] created pktio:%02i, burst mode\n",
+	       thr, pktio);
+
+	printf("pid: %d, create IPC pktio\n", getpid());
+
+	ipc_pktio = odp_pktio_open("ipc_pktio", 0);
+	if (ipc_pktio == ODP_PKTIO_INVALID) {
+		ODP_ERR(" [%02i] Error: ipc pktio create failed.\n", thr);
+		return NULL;
+	}
+
+	/* Loop packets */
+	for (;;) {
+		pkts = odp_pktio_recv(pktio, pkt_tbl, MAX_PKT_BURST);
+		if (pkts > 0) {
+			/* Drop packets with errors */
+			pkts_ok = drop_err_pkts(pkt_tbl, pkts);
+			if (pkts_ok > 0) {
+				/* Swap Eth MACs and IP-addrs */
+				swap_pkt_addrs(pkt_tbl, pkts_ok);
+				odp_pktio_send(ipc_pktio, pkt_tbl, pkts_ok);
+			}
+
+			if (odp_unlikely(pkts_ok != pkts))
+				ODP_ERR("Dropped frames:%u - err_cnt:%lu\n",
+					pkts-pkts_ok, ++err_cnt);
+
+			/* Print packet counts every once in a while */
+			tmp += pkts_ok;
+			if (odp_unlikely((tmp >= 100000) || /* OR first print:*/
+			    ((pkt_cnt == 0) && ((tmp-1) < MAX_PKT_BURST)))) {
+				pkt_cnt += tmp;
+				printf(" [%02i] pkt_cnt:%lu\n", thr, pkt_cnt);
+				fflush(NULL);
+				tmp = 0;
+			}
+		}
+	}
+
+/* unreachable */
+}
+
+
+static int ipc_second_process(void)
+{
+	odp_pktio_t pktio;
+	odp_packet_t pkt_tbl[MAX_PKT_BURST];
+	odp_shm_t shm;
+	odp_buffer_pool_t pool;
+	void *pool_base;
+	int i;
+	int pkts;
+
+	/* Create a packet pool visible only to this second process. We will
+	 * copy packets into that pool from IPC shared memory.
+	 */
+	shm = odp_shm_reserve("local_packet_pool",
+			      SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE, 0);
+
+	pool_base = odp_shm_addr(shm);
+	if (pool_base == NULL) {
+		ODP_ERR("Error: packet pool mem alloc failed.\n");
+		exit(EXIT_FAILURE);
+	}
+
+	pool = odp_buffer_pool_create("ipc_packet_pool", pool_base,
+				      SHM_PKT_POOL_SIZE,
+				      SHM_PKT_POOL_BUF_SIZE,
+				      ODP_CACHE_LINE_SIZE,
+				      ODP_BUFFER_TYPE_PACKET);
+	if (pool == ODP_BUFFER_POOL_INVALID) {
+		ODP_ERR("Error: packet pool create failed.\n");
+		exit(EXIT_FAILURE);
+	}
+
+	sleep(3);
+	pool_base = NULL;
+	/* Find remote shared pool */
+	while (1) {
+		shm = odp_shm_reserve("shm_packet_pool",
+				      SHM_PKT_POOL_SIZE,
+				      ODP_CACHE_LINE_SIZE,
+				      ODP_SHM_PROC_NOCREAT);
+		pool_base = odp_shm_addr(shm);
+		if (pool_base != NULL) {
+			break;
+		} else {
+			ODP_DBG("looking for shm_packet_pool\n");
+			sleep(1);
+		}
+	}
+
+	/* Look up the packet I/O in IPC shared memory
+	 * and link it to the local pool. */
+	while (1) {
+		pktio = odp_pktio_lookup("ipc_pktio", pool, pool_base);
+		if (pktio == ODP_PKTIO_INVALID) {
+			sleep(1);
+			printf("pid %d: looking for ipc_pktio\n", getpid());
+			continue;
+		}
+		break;
+	}
+
+	for (;;) {
+		pkts = odp_pktio_recv(pktio, pkt_tbl, MAX_PKT_BURST);
+		if (pkts > 0) {
+			for (i = 0; i < pkts; i++) {
+				ODP_DBG("pid %d, got packet %d, size %ld\n",
+					getpid(), pkt_tbl[i],
+					odp_packet_get_len(pkt_tbl[i]));
+				odp_buffer_free(pkt_tbl[i]);
+			}
+		} else {
+			/* No need to load the cpu in an example app. */
+			sleep(1);
+		}
+	}
+
+	ODP_ABORT("Unexpected close.");
+	return 0;
+}
+
+
+/**
+ * ODP packet example main function
+ */
+int main(int argc, char *argv[])
+{
+	odph_linux_pthread_t thread_tbl[MAX_WORKERS];
+	odp_buffer_pool_t pool;
+	int num_workers;
+	void *pool_base;
+	int i;
+	int first_core;
+	int core_count;
+	odp_shm_t shm;
+	int f;
+
+
+	f = fork();
+	if (f) {
+		printf("Process one pid: %d\n", getpid());
+		/* Init ODP before calling anything else */
+		if (odp_init_global()) {
+			ODP_ERR("Error: ODP global init failed.\n");
+			exit(EXIT_FAILURE);
+		}
+
+		/* Init this thread */
+		if (odp_init_local()) {
+			ODP_ERR("Error: ODP local init failed.\n");
+			exit(EXIT_FAILURE);
+		}
+
+		ipc_second_process();
+	} else {
+		printf("Process two pid: %d\n", getpid());
+	}
+
+
+	/* Init ODP before calling anything else */
+	if (odp_init_global()) {
+		ODP_ERR("Error: ODP global init failed.\n");
+		exit(EXIT_FAILURE);
+	}
+
+	/* Init this thread */
+	if (odp_init_local()) {
+		ODP_ERR("Error: ODP local init failed.\n");
+		exit(EXIT_FAILURE);
+	}
+
+	/* At an early stage fork into 2 separate processes */
+	/* Reserve memory for args from shared mem */
+	shm = odp_shm_reserve("shm_args", sizeof(args_t),
+			      ODP_CACHE_LINE_SIZE, 0);
+	args = odp_shm_addr(shm);
+
+	if (args == NULL) {
+		ODP_ERR("Error: shared mem alloc failed.\n");
+		exit(EXIT_FAILURE);
+	}
+	memset(args, 0, sizeof(*args));
+
+	/* Parse and store the application arguments */
+	parse_args(argc, argv, &args->appl);
+
+	/* Print both system and application information */
+	print_info(NO_PATH(argv[0]), &args->appl);
+
+	core_count = odp_sys_core_count();
+	num_workers = core_count;
+
+	if (args->appl.core_count)
+		num_workers = args->appl.core_count;
+
+	if (num_workers > MAX_WORKERS)
+		num_workers = MAX_WORKERS;
+
+	printf("Num worker threads: %i\n", num_workers);
+
+	/*
+	 * By default core #0 runs Linux kernel background tasks.
+	 * Start mapping threads from core #1
+	 */
+	first_core = 1;
+	if (core_count == 1)
+		first_core = 0;
+
+	printf("First core: %i\n\n", first_core);
+
+	/* Create packet pool in shared memory */
+	shm = odp_shm_reserve("shm_packet_pool",
+			      SHM_PKT_POOL_SIZE,
+			      ODP_CACHE_LINE_SIZE,
+			      ODP_SHM_PROC);
+	pool_base = odp_shm_addr(shm);
+
+	if (pool_base == NULL) {
+		ODP_ERR("Error: packet pool mem alloc failed.\n");
+		exit(EXIT_FAILURE);
+	}
+
+	pool = odp_buffer_pool_create("packet_pool", pool_base,
+				      SHM_PKT_POOL_SIZE,
+				      SHM_PKT_POOL_BUF_SIZE,
+				      ODP_CACHE_LINE_SIZE,
+				      ODP_BUFFER_TYPE_PACKET);
+	if (pool == ODP_BUFFER_POOL_INVALID) {
+		ODP_ERR("Error: packet pool create failed.\n");
+		exit(EXIT_FAILURE);
+	}
+	odp_buffer_pool_print(pool);
+
+	/* Create and init worker threads */
+	memset(thread_tbl, 0, sizeof(thread_tbl));
+	for (i = 0; i < num_workers; ++i) {
+		void *(*thr_run_func) (void *);
+		int core;
+		int if_idx;
+
+		core = (first_core + i) % core_count;
+
+		if_idx = i % args->appl.if_count;
+
+		args->thread[i].pktio_dev = args->appl.if_names[if_idx];
+		args->thread[i].pool = pool;
+		args->thread[i].mode = args->appl.mode;
+
+		if (args->appl.mode == APPL_MODE_PKT_BURST)
+			thr_run_func = pktio_ifburst_thread;
+		else /* APPL_MODE_PKT_QUEUE */
+			thr_run_func = pktio_queue_thread;
+		/*
+		 * Create threads one-by-one instead of all-at-once,
+		 * because each thread might get different arguments.
+		 * Calls odp_thread_create(cpu) for each thread
+		 */
+		odph_linux_pthread_create(&thread_tbl[i], 1, core, thr_run_func,
+					  &args->thread[i]);
+	}
+
+	/* Master thread waits for other threads to exit */
+	odph_linux_pthread_join(thread_tbl, num_workers);
+
+	printf("Exit\n\n");
+
+	return 0;
+}
+
+/**
+ * Drop packets which input parsing marked as containing errors.
+ *
+ * Frees packets with errors and modifies pkt_tbl[] to contain only packets
+ * with no detected errors.
+ *
+ * @param pkt_tbl  Array of packets
+ * @param len      Length of pkt_tbl[]
+ *
+ * @return Number of packets with no detected error
+ */
+static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len)
+{
+	odp_packet_t pkt;
+	unsigned pkt_cnt = len;
+	unsigned i, j;
+
+	for (i = 0, j = 0; i < len; ++i) {
+		pkt = pkt_tbl[i];
+
+		if (odp_unlikely(odp_packet_error(pkt))) {
+			odph_packet_free(pkt); /* Drop */
+			pkt_cnt--;
+		} else if (odp_unlikely(i != j++)) {
+			pkt_tbl[j-1] = pkt;
+		}
+	}
+
+	return pkt_cnt;
+}
+
+/**
+ * Swap eth src<->dst and IP src<->dst addresses
+ *
+ * @param pkt_tbl  Array of packets
+ * @param len      Length of pkt_tbl[]
+ */
+
+static void swap_pkt_addrs(odp_packet_t pkt_tbl[], unsigned len)
+{
+	odp_packet_t pkt;
+	odph_ethhdr_t *eth;
+	odph_ethaddr_t tmp_addr;
+	odph_ipv4hdr_t *ip;
+	uint32be_t ip_tmp_addr; /* tmp ip addr */
+	unsigned i;
+
+	for (i = 0; i < len; ++i) {
+		pkt = pkt_tbl[i];
+		if (odp_packet_inflag_eth(pkt)) {
+			eth = (odph_ethhdr_t *)odp_packet_l2(pkt);
+
+			tmp_addr = eth->dst;
+			eth->dst = eth->src;
+			eth->src = tmp_addr;
+
+			if (odp_packet_inflag_ipv4(pkt)) {
+				/* IPv4 */
+				ip = (odph_ipv4hdr_t *)odp_packet_l3(pkt);
+
+				ip_tmp_addr = ip->src_addr;
+				ip->src_addr = ip->dst_addr;
+				ip->dst_addr = ip_tmp_addr;
+			}
+		}
+	}
+}
+
+/**
+ * Parse and store the command line arguments
+ *
+ * @param argc       argument count
+ * @param argv[]     argument vector
+ * @param appl_args  Store application arguments here
+ */
+static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
+{
+	int opt;
+	int long_index;
+	char *names, *str, *token, *save;
+	size_t len;
+	int i;
+	static struct option longopts[] = {
+		{"count", required_argument, NULL, 'c'},
+		{"interface", required_argument, NULL, 'i'},	/* return 'i' */
+		{"mode", required_argument, NULL, 'm'},		/* return 'm' */
+		{"help", no_argument, NULL, 'h'},		/* return 'h' */
+		{NULL, 0, NULL, 0}
+	};
+
+	appl_args->mode = -1; /* Invalid, must be changed by parsing */
+
+	while (1) {
+		opt = getopt_long(argc, argv, "+c:i:m:h",
+				  longopts, &long_index);
+
+		if (opt == -1)
+			break;	/* No more options */
+
+		switch (opt) {
+		case 'c':
+			appl_args->core_count = atoi(optarg);
+			break;
+			/* parse packet-io interface names */
+		case 'i':
+			len = strlen(optarg);
+			if (len == 0) {
+				usage(argv[0]);
+				exit(EXIT_FAILURE);
+			}
+			len += 1;	/* add room for '\0' */
+
+			names = malloc(len);
+			if (names == NULL) {
+				usage(argv[0]);
+				exit(EXIT_FAILURE);
+			}
+
+			/* count the number of tokens separated by ',' */
+			strcpy(names, optarg);
+			for (str = names, i = 0;; str = NULL, i++) {
+				token = strtok_r(str, ",", &save);
+				if (token == NULL)
+					break;
+			}
+			appl_args->if_count = i;
+
+			if (appl_args->if_count == 0) {
+				usage(argv[0]);
+				exit(EXIT_FAILURE);
+			}
+
+			/* allocate storage for the if names */
+			appl_args->if_names =
+			    calloc(appl_args->if_count, sizeof(char *));
+
+			/* store the if names (reset names string) */
+			strcpy(names, optarg);
+			for (str = names, i = 0;; str = NULL, i++) {
+				token = strtok_r(str, ",", &save);
+				if (token == NULL)
+					break;
+				appl_args->if_names[i] = token;
+			}
+			break;
+
+		case 'm':
+			i = atoi(optarg);
+			if (i == 0)
+				appl_args->mode = APPL_MODE_PKT_BURST;
+			else
+				appl_args->mode = APPL_MODE_PKT_QUEUE;
+			break;
+
+		case 'h':
+			usage(argv[0]);
+			exit(EXIT_SUCCESS);
+			break;
+
+		default:
+			break;
+		}
+	}
+
+	if (appl_args->if_count == 0 || appl_args->mode == -1) {
+		usage(argv[0]);
+		exit(EXIT_FAILURE);
+	}
+
+	optind = 1;	/* reset 'extern optind' from the getopt lib */
+}
+
+/**
+ * Print system and application info
+ */
+static void print_info(char *progname, appl_args_t *appl_args)
+{
+	int i;
+
+	printf("\n"
+	       "ODP system info\n"
+	       "---------------\n"
+	       "ODP API version: %s\n"
+	       "CPU model:       %s\n"
+	       "CPU freq (hz):   %"PRIu64"\n"
+	       "Cache line size: %i\n"
+	       "Core count:      %i\n"
+	       "\n",
+	       odp_version_api_str(), odp_sys_cpu_model_str(), odp_sys_cpu_hz(),
+	       odp_sys_cache_line_size(), odp_sys_core_count());
+
+	printf("Running ODP appl: \"%s\"\n"
+	       "-----------------\n"
+	       "IF-count:  %i\n"
+	       "Using IFs:",
+	       progname, appl_args->if_count);
+	for (i = 0; i < appl_args->if_count; ++i)
+		printf(" %s", appl_args->if_names[i]);
+	printf("\n"
+	       "Mode: ");
+	if (appl_args->mode == APPL_MODE_PKT_BURST)
+		PRINT_APPL_MODE(APPL_MODE_PKT_BURST);
+	else
+		PRINT_APPL_MODE(APPL_MODE_PKT_QUEUE);
+	printf("\n\n");
+	fflush(NULL);
+}
+
+/**
+ * Print usage information
+ */
+static void usage(char *progname)
+{
+	printf("\n"
+	       "Usage: %s OPTIONS\n"
+	       "  E.g. %s -i eth1,eth2,eth3 -m 0\n"
+	       "\n"
+	       "OpenDataPlane example application.\n"
+	       "\n"
+	       "Mandatory OPTIONS:\n"
+	       "  -i, --interface  Eth interfaces (comma-separated, no spaces)\n"
+	       "  -m, --mode       0: Burst send&receive packets (no queues)\n"
+	       "                   1: Send&receive packets through ODP queues.\n"
+	       "\n"
+	       "Optional OPTIONS\n"
+	       "  -c, --count  Core count.\n"
+	       "  -h, --help   Display help and exit.\n"
+	       " environment variables: ODP_PKTIO_DISABLE_SOCKET_MMAP\n"
+	       "                        ODP_PKTIO_DISABLE_SOCKET_MMSG\n"
+	       "                        ODP_PKTIO_DISABLE_SOCKET_BASIC\n"
+	       " can be used for advanced pkt I/O selection for linux-generic\n"
+	       "\n", NO_PATH(progname), NO_PATH(progname)
+	    );
+}
diff --git a/helper/include/odph_ring.h b/helper/include/odph_ring.h
index 76c1db8..6240646 100644
--- a/helper/include/odph_ring.h
+++ b/helper/include/odph_ring.h
@@ -100,6 +100,7 @@ extern "C" {
 #include
 #include
 #include
+#include
 #include
 #include
@@ -158,6 +159,8 @@ typedef struct odph_ring {

 #define ODPH_RING_F_SP_ENQ 0x0001 /* The default enqueue is "single-producer".*/
 #define ODPH_RING_F_SC_DEQ 0x0002 /* The default dequeue is "single-consumer".*/
+#define ODPH_RING_SHM_PROC 0x0004 /* If set - ring is visible from different
+				     processes. Default is thread visible.
+				   */
 #define ODPH_RING_QUOT_EXCEED (1 << 31) /* Quota exceed for burst ops */
 #define ODPH_RING_SZ_MASK (unsigned)(0x0fffffff) /* Ring size mask */
diff --git a/platform/linux-generic/include/api/odp_packet_io.h b/platform/linux-generic/include/api/odp_packet_io.h
index 29fd105..156dd9d 100644
--- a/platform/linux-generic/include/api/odp_packet_io.h
+++ b/platform/linux-generic/include/api/odp_packet_io.h
@@ -30,6 +30,19 @@ typedef uint32_t odp_pktio_t;
 #define ODP_PKTIO_INVALID 0

 /**
+ * Look up an already existing ODP packet IO instance
+ *
+ * @param dev            Packet IO device
+ * @param pool           Pool to use for packet IO in the current process
+ * @param ext_pool_base  Mapped base address of the pool shared between processes
+ *
+ * @return ODP packet IO handle or ODP_PKTIO_INVALID on error
+ */
+odp_pktio_t odp_pktio_lookup(const char *dev, odp_buffer_pool_t pool,
+			     void *ext_pool_base);
+
+
+/**
  * Open an ODP packet IO instance
  *
  * @param dev    Packet IO device
@@ -98,6 +111,15 @@ odp_queue_t odp_pktio_inq_getdef(odp_pktio_t id);
 int odp_pktio_inq_remdef(odp_pktio_t id);

 /**
+ * Set the default output queue to be associated with a pktio handle
+ *
+ * @param id     ODP packet IO handle
+ * @param queue  Default output queue to set
+ * @return 0 on success or -1 on error
+ */
+int odp_pktio_outq_setdef(odp_pktio_t id, odp_queue_t queue);
+
+/**
  * Query default output queue
  *
  * @param id ODP packet IO handle
diff --git a/platform/linux-generic/include/api/odp_shared_memory.h b/platform/linux-generic/include/api/odp_shared_memory.h
index 7ad29c3..46b6e18 100644
--- a/platform/linux-generic/include/api/odp_shared_memory.h
+++ b/platform/linux-generic/include/api/odp_shared_memory.h
@@ -31,6 +31,7 @@ extern "C" {
 /* Share level */
 #define ODP_SHM_SW_ONLY 0x1 /**< Application SW only, no HW access */
 #define ODP_SHM_PROC    0x2 /**< Share with external processes */
+#define ODP_SHM_PROC_NOCREAT 0x4 /**< Attach to an existing share, do not create it */

 /**
  * ODP shared memory block
@@ -98,6 +99,16 @@ int odp_shm_info(odp_shm_t shm, odp_shm_info_t *info);


 /**
+ * Look up a shared memory object.
+ *
+ * @param name  name of shm object
+ *
+ * @return 0 on success, otherwise non-zero
+ */
+
+int odp_shm_lookup_ipc(const char *name);
+
+/**
  * Print all shared memory blocks
  */
 void odp_shm_print_all(void);
diff --git a/platform/linux-generic/include/odp_buffer_pool_internal.h b/platform/linux-generic/include/odp_buffer_pool_internal.h
index e0210bd..d5f219d 100644
--- a/platform/linux-generic/include/odp_buffer_pool_internal.h
+++ b/platform/linux-generic/include/odp_buffer_pool_internal.h
@@ -73,6 +73,12 @@ static inline void *get_pool_entry(uint32_t pool_id)
 	return pool_entry_ptr[pool_id];
 }

+typedef union {
+	struct pool_entry_s s;
+
+	uint8_t pad[ODP_CACHE_LINE_SIZE_ROUNDUP(sizeof(struct pool_entry_s))];
+
+} pool_entry_t;

 static inline odp_buffer_hdr_t *odp_buf_to_hdr(odp_buffer_t buf)
 {
@@ -103,7 +109,6 @@ static inline odp_buffer_hdr_t *odp_buf_to_hdr(odp_buffer_t buf)
 #endif

 	hdr = (odp_buffer_hdr_t *)(pool->buf_base + index * pool->buf_size);
-
 	return hdr;
 }
diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
index 23633ed..1403e1e 100644
--- a/platform/linux-generic/include/odp_packet_io_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_internal.h
@@ -20,6 +20,7 @@ extern "C" {

 #include <odp_spinlock.h>
 #include <odp_packet_socket.h>
+#include <odph_ring.h>

 /**
  * Packet IO types
@@ -28,6 +29,7 @@ typedef enum {
 	ODP_PKTIO_TYPE_SOCKET_BASIC = 0x1,
 	ODP_PKTIO_TYPE_SOCKET_MMSG,
 	ODP_PKTIO_TYPE_SOCKET_MMAP,
+	ODP_PKTIO_TYPE_IPC,
 } odp_pktio_type_t;

 struct pktio_entry {
@@ -38,6 +40,11 @@ struct pktio_entry {
 	odp_pktio_type_t type;		/**< pktio type */
 	pkt_sock_t pkt_sock;		/**< using socket API for IO */
 	pkt_sock_mmap_t pkt_sock_mmap;	/**< using socket mmap API for IO */
+	odph_ring_t *ipc_r;	/**< ODP ring for IPC msg packets
+				     indexes transmitted to shared memory */
+	odph_ring_t *ipc_p;	/**< ODP ring for IPC msg packets
+				     indexes already processed by remote process */
+	void *ipc_pool_base;	/**< IPC remote pool base addr */
 };

 typedef union {
diff --git a/platform/linux-generic/odp_buffer_pool.c b/platform/linux-generic/odp_buffer_pool.c
index a48d7d6..1024e4c 100644
--- a/platform/linux-generic/odp_buffer_pool.c
+++ b/platform/linux-generic/odp_buffer_pool.c
@@ -55,15 +55,6 @@ typedef struct {
 	uint8_t buf_data[]; /* start of buffer data area */
 } odp_any_buffer_hdr_t;

-
-typedef union pool_entry_u {
-	struct pool_entry_s s;
-
-	uint8_t pad[ODP_CACHE_LINE_SIZE_ROUNDUP(sizeof(struct pool_entry_s))];
-
-} pool_entry_t;
-
-
 typedef struct pool_table_t {
 	pool_entry_t pool[ODP_CONFIG_BUFFER_POOLS];
diff --git a/platform/linux-generic/odp_init.c b/platform/linux-generic/odp_init.c
index 55fa53a..fa41ce3 100644
--- a/platform/linux-generic/odp_init.c
+++ b/platform/linux-generic/odp_init.c
@@ -8,6 +8,7 @@

 #include
 #include
+#include <odph_ring.h>

 int odp_init_global(void)
 {
@@ -53,6 +54,11 @@ int odp_init_global(void)
 		return -1;
 	}

+	/* For linux-generic the IPC queue is implemented totally in
+	 * software using odp_ring.
+	 */
+	odph_ring_tailq_init();
+
 	return 0;
 }
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index 0c30f0f..d85110f 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -20,6 +20,14 @@
 #include
 #include
+#include
+#include
+
+/* IPC packet I/O over odph_ring */
+#include <odph_ring.h>
+
+#define PKTIO_IPC_ENTRIES 4096 /**< number of odp buffers in
+				    odp ring queue */

 typedef struct {
 	pktio_entry_t entries[ODP_CONFIG_PKTIO_ENTRIES];
@@ -149,6 +157,73 @@ static int free_pktio_entry(odp_pktio_t id)
 	return 0;
 }

+odp_pktio_t odp_pktio_lookup(const char *dev, odp_buffer_pool_t pool,
+			     void *ext_pool_base)
+{
+	odp_pktio_t id;
+	pktio_entry_t *pktio_entry;
+
+	id = alloc_lock_pktio_entry();
+	if (id == ODP_PKTIO_INVALID) {
+		ODP_ERR("No resources available.\n");
+		return ODP_PKTIO_INVALID;
+	}
+	/* if successful, alloc_pktio_entry() returns with the entry locked */
+
+	pktio_entry = get_entry(id);
+
+	if (odp_shm_lookup_ipc(dev) == 0) {
+		printf("pid %d odp_shm_lookup_ipc found shared object\n",
+		       getpid());
+		size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
+				   sizeof(odph_ring_t);
+
+		char ipc_shm_name[ODPH_RING_NAMESIZE];
+
+		memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
+		memcpy(ipc_shm_name, dev, strlen(dev));
+		memcpy(ipc_shm_name + strlen(dev), "_r", 2);
+
+		/* allocate shared memory for buffers needed to be produced */
+		odp_shm_t shm = odp_shm_reserve(ipc_shm_name, ring_size,
+						ODP_CACHE_LINE_SIZE,
+						ODP_SHM_PROC_NOCREAT);
+
+		pktio_entry->s.ipc_r = odp_shm_addr(shm);
+		if (!pktio_entry->s.ipc_r) {
+			ODP_DBG("pid %d unable to find ipc ring %s\n",
+				getpid(), dev);
+			goto error;
+		}
+
+		memcpy(ipc_shm_name + strlen(dev), "_p", 2);
+		/* allocate shared memory for buffers to be cleaned up after
+		 * they were produced by the other process. */
+		shm = odp_shm_reserve(ipc_shm_name, ring_size,
+				      ODP_CACHE_LINE_SIZE,
+				      ODP_SHM_PROC_NOCREAT);
+
+		pktio_entry->s.ipc_p = odp_shm_addr(shm);
+		if (!pktio_entry->s.ipc_p) {
+			ODP_DBG("pid %d unable to find ipc ring %s\n",
+				getpid(), dev);
+			goto error;
+		}
+	} else {
+		ODP_DBG("pid %d unable to find ipc object %s\n",
+			getpid(), dev);
+		goto error;
+	}
+
+	pktio_entry->s.type = ODP_PKTIO_TYPE_IPC;
+	pktio_entry->s.pkt_sock.pool = pool;
+	pktio_entry->s.ipc_pool_base = ext_pool_base;
+	unlock_entry(pktio_entry);
+	return id;
+error:
+	unlock_entry(pktio_entry);
+	free_pktio_entry(id);
+	return ODP_PKTIO_INVALID;
+}
+
 odp_pktio_t odp_pktio_open(const char *dev, odp_buffer_pool_t pool)
 {
 	odp_pktio_t id;
@@ -165,6 +240,42 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_buffer_pool_t pool)

 	pktio_entry = get_entry(id);

+	/* If pool is 0 we assume the queue is IPC, i.e. software packet I/O
+	 * communicating with a different process.
+	 */
+	if (pool == 0) {
+		char ipc_shm_name[ODPH_RING_NAMESIZE];
+
+		/* generate a shm name like ipc_pktio_r for the
+		 * to-be-processed packets ring */
+		memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
+		memcpy(ipc_shm_name, dev, strlen(dev));
+		memcpy(ipc_shm_name + strlen(dev), "_r", 2);
+
+		pktio_entry->s.ipc_r = odph_ring_create(ipc_shm_name,
+							PKTIO_IPC_ENTRIES,
+							ODPH_RING_SHM_PROC);
+		if (!pktio_entry->s.ipc_r) {
+			ODP_DBG("pid %d unable to create ipc ring %s\n",
+				getpid(), dev);
+			goto invalid;
+		}
+		ODP_DBG("Created IPC ring: %s\n", ipc_shm_name);
+
+		/* generate a shm name like ipc_pktio_p for the
+		 * already-processed packets ring */
+		memcpy(ipc_shm_name + strlen(dev), "_p", 2);
+
+		pktio_entry->s.ipc_p = odph_ring_create(ipc_shm_name,
+							PKTIO_IPC_ENTRIES,
+							ODPH_RING_SHM_PROC);
+		if (!pktio_entry->s.ipc_p) {
+			ODP_DBG("pid %d unable to create ipc ring %s\n",
+				getpid(), dev);
+			goto invalid;
+		}
+
+		ODP_DBG("Created IPC ring: %s\n", ipc_shm_name);
+		pktio_entry->s.type = ODP_PKTIO_TYPE_IPC;
+		goto done;
+	}
+
 	ODP_DBG("ODP_PKTIO_USE_FANOUT: %d\n", fanout);

 	if (getenv("ODP_PKTIO_DISABLE_SOCKET_MMAP") == NULL) {
 		pktio_entry->s.type = ODP_PKTIO_TYPE_SOCKET_MMAP;
@@ -197,6 +308,7 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_buffer_pool_t pool)
 		close_pkt_sock(&pktio_entry->s.pkt_sock);
 	}

+invalid:
 	unlock_entry(pktio_entry);
 	free_pktio_entry(id);
 	ODP_ERR("Unable to init any I/O type.\n");
@@ -272,6 +384,68 @@ int odp_pktio_recv(odp_pktio_t id, odp_packet_t pkt_table[], unsigned len)
 		pkts = recv_pkt_sock_mmap(&pktio_entry->s.pkt_sock_mmap,
 					  pkt_table, len);
 		break;
+	case ODP_PKTIO_TYPE_IPC:
+		pkts = 0;
+		int ret;
+		odph_ring_t *r = pktio_entry->s.ipc_r;
+		odph_ring_t *r_p = pktio_entry->s.ipc_p;
+		odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
+		void **ipcbufs_p = (void *)&remote_pkts;
+		unsigned ring_len = odph_ring_count(r);
+		int idx;
+
+		pkts = len;
+		if (len > ring_len)
+			pkts = ring_len;
+
+		ret = odph_ring_mc_dequeue_bulk(r, ipcbufs_p, pkts);
+		if (ret == 0) {
+			for (i = 0; i < pkts; i++) {
+				/* The remote packet handle encodes pool and
+				 * index; we need only the index. */
+				odp_buffer_bits_t handle;
+				handle.u32 = remote_pkts[i];
+				idx = handle.index;
+
+				/* Link to the packet data. From here we have
+				 * zero-copy between the processes: phdr points
+				 * to the packet buffer in shared memory, and
+				 * 2048 is the buffer size provided to
+				 * pool_create in the remote process. */
+				odp_packet_hdr_t *phdr = (odp_packet_hdr_t *)
+					((char *)pktio_entry->s.ipc_pool_base +
+					 (idx * 2048));
+
+				/* Allocate a new packet. */
+				odp_buffer_pool_t pool =
+					pktio_entry->s.pkt_sock.pool;
+				odp_packet_t pkt = odp_buffer_alloc(pool);
+				if (odp_unlikely(pkt == ODP_PACKET_INVALID))
+					ODP_ABORT("unable to allocate memory for pool");
+
+				/* Copy packet data.
+				 */
+				uint8_t *pkt_buf = odp_packet_addr(pkt);
+				uint8_t *l2_hdr = pkt_buf +
+					pktio_entry->s.pkt_sock.frame_offset;
+				memcpy(l2_hdr, phdr->buf_data, phdr->frame_len);
+
+				/* Copy the packet's parsed L2, L3, L4 offsets
+				 * and size */
+				odp_packet_hdr(pkt)->l2_offset =
+					phdr->l2_offset;
+				odp_packet_hdr(pkt)->l3_offset =
+					phdr->l3_offset;
+				odp_packet_hdr(pkt)->l4_offset =
+					phdr->l4_offset;
+				odp_packet_hdr(pkt)->frame_len =
+					phdr->frame_len;
+				odp_packet_hdr(pkt)->user_ctx =
+					phdr->user_ctx;
+
+				pkt_table[i] = pkt;
+			}
+
+			/* Now tell the other process that we no longer need
+			 * those buffers. */
+			ret = odph_ring_mp_enqueue_bulk(r_p, ipcbufs_p, pkts);
+			if (ret != 0)
+				ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
+		} else {
+			pkts = -1;
+			ODP_DBG("dequeued no packets\n");
+		}
+		break;
 	default:
 		pkts = -1;
 		break;
@@ -309,6 +483,38 @@ int odp_pktio_send(odp_pktio_t id, odp_packet_t pkt_table[], unsigned len)
 		pkts = send_pkt_sock_mmap(&pktio_entry->s.pkt_sock_mmap,
 					  pkt_table, len);
 		break;
+	case ODP_PKTIO_TYPE_IPC:
+		pkts = len;
+		odph_ring_t *r = pktio_entry->s.ipc_r;
+		void **rbuf_p;
+		int ret;
+		unsigned i;
+
+		/* Free already processed packets, if any */
+		{
+			odph_ring_t *r_p = pktio_entry->s.ipc_p;
+			unsigned complete_packets = odph_ring_count(r_p);
+			odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
+			if (complete_packets > 0) {
+				rbuf_p = (void *)&r_p_pkts;
+				ret = odph_ring_mc_dequeue_bulk(r_p, rbuf_p,
+							complete_packets);
+				if (ret == 0) {
+					for (i = 0; i < complete_packets; i++)
+						odp_buffer_free(r_p_pkts[i]);
+				}
+			}
+		}
+
+		/* Put the packets into the ring to be processed by the
+		 * other process. */
+		for (i = 0; i < len; i++) {
+			odp_packet_t pkt = pkt_table[i];
+			rbuf_p = (void *)&pkt;
+			ret = odph_ring_mp_enqueue_bulk(r, rbuf_p, 1);
+			if (ret != 0)
+				ODP_ERR("odp_ring_mp_enqueue_bulk fail\n");
+		}
+		break;
 	default:
 		pkts = -1;
 	}
@@ -357,6 +563,29 @@ odp_queue_t odp_pktio_inq_getdef(odp_pktio_t id)
 	return pktio_entry->s.inq_default;
 }

+int odp_pktio_outq_setdef(odp_pktio_t id, odp_queue_t queue)
+{
+	pktio_entry_t *pktio_entry = get_entry(id);
+	queue_entry_t *qentry = queue_to_qentry(queue);
+
+	if (pktio_entry == NULL || qentry == NULL)
+		return -1;
+
+	if (qentry->s.type != ODP_QUEUE_TYPE_PKTOUT)
+		return -1;
+
+	lock_entry(pktio_entry);
+	pktio_entry->s.outq_default = queue;
+	unlock_entry(pktio_entry);
+
+	queue_lock(qentry);
+	qentry->s.pktout = id;
+	qentry->s.status = QUEUE_STATUS_SCHED;
+	queue_unlock(qentry);
+
+	return 0;
+}
+
 odp_queue_t odp_pktio_outq_getdef(odp_pktio_t id)
 {
 	pktio_entry_t *pktio_entry = get_entry(id);
diff --git a/platform/linux-generic/odp_ring.c b/platform/linux-generic/odp_ring.c
index 632aa66..7f6eaad 100644
--- a/platform/linux-generic/odp_ring.c
+++ b/platform/linux-generic/odp_ring.c
@@ -158,8 +158,14 @@ odph_ring_create(const char *name, unsigned count, unsigned flags)
 	char ring_name[ODPH_RING_NAMESIZE];
 	odph_ring_t *r;
 	size_t ring_size;
+	uint32_t shm_flag;
 	odp_shm_t shm;

+	if (flags & ODPH_RING_SHM_PROC)
+		shm_flag = ODP_SHM_PROC;
+	else
+		shm_flag = 0;
+
 	/* count must be a power of 2 */
 	if (!ODP_VAL_IS_POWER_2(count) || (count > ODPH_RING_SZ_MASK)) {
 		ODP_ERR("Requested size is invalid, must be power of 2, and do not exceed the size limit %u\n",
@@ -172,7 +178,8 @@ odph_ring_create(const char *name, unsigned count, unsigned flags)
 	odp_rwlock_write_lock(&qlock);

 	/* reserve a memory zone for this ring.*/
-	shm = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, 0);
+	shm = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE,
+			      shm_flag);

 	r = odp_shm_addr(shm);
diff --git a/platform/linux-generic/odp_shared_memory.c b/platform/linux-generic/odp_shared_memory.c
index 1898a34..dd7ea22 100644
--- a/platform/linux-generic/odp_shared_memory.c
+++ b/platform/linux-generic/odp_shared_memory.c
@@ -20,6 +20,7 @@

 #include

+#include <odph_ring.h>

 #define ODP_SHM_NUM_BLOCKS 32

@@ -122,14 +123,17 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
 	int fd = -1;
 	int map_flag = MAP_SHARED;
 	/* If already exists: O_EXCL: error, O_TRUNC: truncate to zero */
-	int oflag = O_RDWR | O_CREAT | O_TRUNC;
+	int oflag = O_RDWR;
 	uint64_t alloc_size = size + align;
 	uint64_t page_sz, huge_sz;

 	huge_sz = odp_sys_huge_page_size();
 	page_sz = odp_sys_page_size();

-	if (flags & ODP_SHM_PROC) {
+	if (flags & ODP_SHM_PROC)
+		oflag |= O_CREAT | O_TRUNC;
+
+	if (flags & (ODP_SHM_PROC | ODP_SHM_PROC_NOCREAT)) {
 		/* Creates a file to /dev/shm */
 		fd = shm_open(name, oflag,
 			      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
@@ -273,6 +277,33 @@ int odp_shm_info(odp_shm_t shm, odp_shm_info_t *info)
 	return 0;
 }

+int odp_shm_lookup_ipc(const char *name)
+{
+	int shm;
+	char ipc_shm_name[ODPH_RING_NAMESIZE];
+
+	memset(ipc_shm_name, 0, ODPH_RING_NAMESIZE);
+	memcpy(ipc_shm_name, name, strlen(name));
+	memcpy(ipc_shm_name + strlen(name), "_r", 2);
+
+	shm = shm_open(ipc_shm_name, O_RDWR, S_IRUSR | S_IWUSR);
+	if (shm == -1) {
+		ODP_DBG("IPC shm_open for %s not found\n", ipc_shm_name);
+		return -1;
+	}
+	close(shm);
+
+	memcpy(ipc_shm_name + strlen(name), "_p", 2);
+
+	shm = shm_open(ipc_shm_name, O_RDWR, S_IRUSR | S_IWUSR);
+	if (shm == -1) {
+		ODP_DBG("IPC shm_open for %s not found\n", ipc_shm_name);
+		return -1;
+	}
+	close(shm);
+
+	return 0;
+}

 void odp_shm_print_all(void)
 {
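
A closing note for reviewers: the /dev/shm naming convention that odp_shm_lookup_ipc() checks above can also be probed outside ODP. A minimal standalone sketch, assuming only the "<dev>_r"/"<dev>_p" suffix convention from this patch (the helper name is illustrative; link with -lrt on older glibc):

	#include <fcntl.h>
	#include <stdio.h>
	#include <sys/mman.h>
	#include <sys/stat.h>
	#include <unistd.h>

	/* Return 0 when both IPC rings for "dev" exist in /dev/shm,
	 * -1 otherwise; mirrors what odp_shm_lookup_ipc() checks. */
	static int ipc_rings_ready(const char *dev)
	{
		static const char * const suffix[] = {"_r", "_p"};
		char name[64];
		unsigned i;

		for (i = 0; i < 2; i++) {
			int fd;

			snprintf(name, sizeof(name), "%s%s", dev, suffix[i]);
			fd = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
			if (fd == -1)
				return -1; /* ring not created yet */
			close(fd);
		}
		return 0;
	}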