From patchwork Fri Dec 9 19:50:05 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Maxim Uvarov X-Patchwork-Id: 87555 Delivered-To: patch@linaro.org Received: by 10.140.20.101 with SMTP id 92csp486003qgi; Fri, 9 Dec 2016 11:53:04 -0800 (PST) X-Received: by 10.55.41.209 with SMTP id p78mr78525227qkp.282.1481313184735; Fri, 09 Dec 2016 11:53:04 -0800 (PST) Return-Path: Received: from lists.linaro.org (lists.linaro.org. [54.225.227.206]) by mx.google.com with ESMTP id o18si20843903qke.325.2016.12.09.11.53.04; Fri, 09 Dec 2016 11:53:04 -0800 (PST) Received-SPF: pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.225.227.206 as permitted sender) client-ip=54.225.227.206; Authentication-Results: mx.google.com; spf=pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.225.227.206 as permitted sender) smtp.mailfrom=lng-odp-bounces@lists.linaro.org; dmarc=pass (p=NONE dis=NONE) header.from=linaro.org Received: by lists.linaro.org (Postfix, from userid 109) id 5F1E462C49; Fri, 9 Dec 2016 19:53:04 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on ip-10-142-244-252 X-Spam-Level: X-Spam-Status: No, score=-1.9 required=5.0 tests=BAYES_00, RCVD_IN_DNSWL_NONE, RCVD_IN_MSPIKE_H3, RCVD_IN_MSPIKE_WL autolearn=disabled version=3.4.0 Received: from [127.0.0.1] (localhost [127.0.0.1]) by lists.linaro.org (Postfix) with ESMTP id 320C062C40; Fri, 9 Dec 2016 19:51:12 +0000 (UTC) X-Original-To: lng-odp@lists.linaro.org Delivered-To: lng-odp@lists.linaro.org Received: by lists.linaro.org (Postfix, from userid 109) id D1C9862B7D; Fri, 9 Dec 2016 19:51:02 +0000 (UTC) Received: from mail-lf0-f51.google.com (mail-lf0-f51.google.com [209.85.215.51]) by lists.linaro.org (Postfix) with ESMTPS id E96D660E71 for ; Fri, 9 Dec 2016 19:50:29 +0000 (UTC) Received: by mail-lf0-f51.google.com with SMTP id c13so7186861lfg.0 for ; Fri, 09 Dec 2016 11:50:29 -0800 (PST) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=nbvUXmIuFxsGzvnRE7io1/h2iEwE4ktShpQsh4jggnY=; b=YMaRmuLC6gjWTmTLOBowEkgbp9CXA542ny7DwOrHBWCn5WYFly9BRCutKQjC1ureV4 kLAT7I2EbqpCk+rzsy8UevRLDUUlXDn83ms/vrMLBby2ATz7oPMisRA3mTb13uSR9DqV lzatklEZ3HZir8S8QIAioJaJiAKxDuczSEm1b5bwlY/CBsYtUeHxb7b3pVdTJdi6v//F 7Uph18Ep3u9O5hJRdznaNWMf/AyLiE0vsBPnHh2KDGhBt1ooC2GWnkHHiHvCP2MkRgsS M9/YzqexpIXVinNfqPf7ahPzaprtUWFre/FwrOhqT7OrHWy38nBM2RUCFdo4u9tI8U9G 48JA== X-Gm-Message-State: AKaTC02N6di4b2754xdJvTDyH2p5aT2gfVoiSuPVrgnmX5A8ltCu50ZTEw85W2NijMND/yrQvAE= X-Received: by 10.25.72.74 with SMTP id v71mr28450876lfa.130.1481313028347; Fri, 09 Dec 2016 11:50:28 -0800 (PST) Received: from localhost.localdomain (ppp91-77-165-188.pppoe.mtu-net.ru. [91.77.165.188]) by smtp.gmail.com with ESMTPSA id l137sm1065776lfb.7.2016.12.09.11.50.27 (version=TLS1_2 cipher=ECDHE-RSA-AES128-SHA bits=128/128); Fri, 09 Dec 2016 11:50:27 -0800 (PST) From: Maxim Uvarov To: lng-odp@lists.linaro.org Date: Fri, 9 Dec 2016 22:50:05 +0300 Message-Id: <1481313005-9281-5-git-send-email-maxim.uvarov@linaro.org> X-Mailer: git-send-email 2.7.1.250.gff4ea60 In-Reply-To: <1481313005-9281-1-git-send-email-maxim.uvarov@linaro.org> References: <1481313005-9281-1-git-send-email-maxim.uvarov@linaro.org> Subject: [lng-odp] [API-NEXT PATCH 4/4] linux-gen: pktio ipc: make it work again X-BeenThere: lng-odp@lists.linaro.org X-Mailman-Version: 2.1.16 Precedence: list List-Id: "The OpenDataPlane \(ODP\) List" List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: lng-odp-bounces@lists.linaro.org Sender: "lng-odp" Signed-off-by: Maxim Uvarov --- .../linux-generic/include/odp_buffer_internal.h | 10 +- platform/linux-generic/include/odp_internal.h | 1 - .../include/odp_packet_io_ipc_internal.h | 27 +- platform/linux-generic/odp_init.c | 5 +- platform/linux-generic/pktio/ipc.c | 488 +++++++++------------ 5 files changed, 236 insertions(+), 295 deletions(-) -- 2.7.1.250.gff4ea60 diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index 4e75908..d0eb597 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -64,6 +64,11 @@ struct odp_buffer_hdr_t { struct { void *hdr; uint8_t *data; +#ifdef _ODP_PKTIO_IPC + /* ipc mapped process can not walk over pointers, + * offset has to be used */ + uint64_t ipc_data_offset; +#endif uint32_t len; } seg[CONFIG_PACKET_MAX_SEGS]; @@ -101,11 +106,6 @@ struct odp_buffer_hdr_t { queue_entry_t *target_qe; /* ordered queue target */ uint64_t sync[SCHEDULE_ORDERED_LOCKS_PER_QUEUE]; }; -#ifdef _ODP_PKTIO_IPC - /* ipc mapped process can not walk over pointers, - * offset has to be used */ - uint64_t ipc_addr_offset[ODP_CONFIG_PACKET_MAX_SEGS]; -#endif /* Data or next header */ uint8_t data[0]; diff --git a/platform/linux-generic/include/odp_internal.h b/platform/linux-generic/include/odp_internal.h index 6a80d9d..b313b1f 100644 --- a/platform/linux-generic/include/odp_internal.h +++ b/platform/linux-generic/include/odp_internal.h @@ -50,7 +50,6 @@ struct odp_global_data_s { odp_cpumask_t control_cpus; odp_cpumask_t worker_cpus; int num_cpus_installed; - int ipc_ns; }; enum init_stage { diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h index 851114d..7cd2948 100644 --- a/platform/linux-generic/include/odp_packet_io_ipc_internal.h +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h @@ -26,22 +26,31 @@ */ struct pktio_info { struct { - /* number of buffer in remote pool */ - int shm_pool_bufs_num; - /* size of remote pool */ - size_t shm_pkt_pool_size; + /* number of buffer*/ + int num; /* size of packet/segment in remote pool */ - uint32_t shm_pkt_size; + uint32_t block_size; /* offset from shared memory block start - * to pool_mdata_addr (odp-linux pool specific) */ - size_t mdata_offset; + * to pool *base_addr in remote process. + * (odp-linux pool specific) */ + size_t base_addr_offset; char pool_name[ODP_POOL_NAME_LEN]; + /* 1 if master finished creation of all shared objects */ + int init_done; } master; struct { /* offset from shared memory block start - * to pool_mdata_addr in remote process. + * to pool *base_addr in remote process. * (odp-linux pool specific) */ - size_t mdata_offset; + size_t base_addr_offset; + void *base_addr; + uint32_t block_size; char pool_name[ODP_POOL_NAME_LEN]; + /* pid of the slave process written to shm and + * used by master to look up memory created by + * slave + */ + int pid; + int init_done; } slave; } ODP_PACKED; diff --git a/platform/linux-generic/odp_init.c b/platform/linux-generic/odp_init.c index d40a83c..1b0d8f8 100644 --- a/platform/linux-generic/odp_init.c +++ b/platform/linux-generic/odp_init.c @@ -67,7 +67,7 @@ static int cleanup_files(const char *dirpath, int odp_pid) int odp_init_global(odp_instance_t *instance, const odp_init_t *params, - const odp_platform_init_t *platform_params) + const odp_platform_init_t *platform_params ODP_UNUSED) { char *hpdir; @@ -75,9 +75,6 @@ int odp_init_global(odp_instance_t *instance, odp_global_data.main_pid = getpid(); cleanup_files(_ODP_TMPDIR, odp_global_data.main_pid); - if (platform_params) - odp_global_data.ipc_ns = platform_params->ipc_ns; - enum init_stage stage = NO_INIT; odp_global_data.log_fn = odp_override_log; odp_global_data.abort_fn = odp_override_abort; diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c index 0e99c6e..5e5e3b2 100644 --- a/platform/linux-generic/pktio/ipc.c +++ b/platform/linux-generic/pktio/ipc.c @@ -3,150 +3,84 @@ * * SPDX-License-Identifier: BSD-3-Clause */ -#ifdef _ODP_PKTIO_IPC #include #include #include #include #include +#include <_ishm_internal.h> #include #include #include +#define IPC_ODP_DEBUG_PRINT 0 + +#define IPC_ODP_DBG(fmt, ...) \ + do { \ + if (IPC_ODP_DEBUG_PRINT == 1) \ + ODP_DBG(fmt, ##__VA_ARGS__);\ + } while (0) + /* MAC address for the "ipc" interface */ static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12}; -static void *_ipc_map_remote_pool(const char *name, size_t size); +static void *_ipc_map_remote_pool(const char *name, int pid); static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl) { - pool_entry_t *pool; - uint32_t pool_id; + pool_t *pool; odp_shm_t shm; odp_shm_info_t info; - pool_id = pool_handle_to_index(pool_hdl); - pool = get_pool_entry(pool_id); - shm = pool->s.pool_shm; + pool = pool_entry_from_hdl(pool_hdl); + shm = pool->shm; odp_shm_info(shm, &info); return info.name; } -/** -* Look up for shared memory object. -* -* @param name name of shm object -* -* @return 0 on success, otherwise non-zero -*/ -static int _ipc_shm_lookup(const char *name) -{ - int shm; - char shm_devname[SHM_DEVNAME_MAXLEN]; - - if (!odp_global_data.ipc_ns) - ODP_ABORT("ipc_ns not set\n"); - - snprintf(shm_devname, SHM_DEVNAME_MAXLEN, - SHM_DEVNAME_FORMAT, - odp_global_data.ipc_ns, name); - - shm = shm_open(shm_devname, O_RDWR, S_IRUSR | S_IWUSR); - if (shm == -1) { - if (errno == ENOENT) { - ODP_DBG("no file %s\n", shm_devname); - return -1; - } - ODP_ABORT("shm_open for %s err %s\n", - shm_devname, strerror(errno)); - } - close(shm); - return 0; -} - -static int _ipc_map_pktio_info(pktio_entry_t *pktio_entry, - const char *dev, - int *slave) -{ - struct pktio_info *pinfo; - char name[ODP_POOL_NAME_LEN + sizeof("_info")]; - uint32_t flags; - odp_shm_t shm; - - /* Create info about remote pktio */ - snprintf(name, sizeof(name), "%s_info", dev); - - flags = ODP_SHM_PROC | _ODP_SHM_O_EXCL; - - shm = odp_shm_reserve(name, sizeof(struct pktio_info), - ODP_CACHE_LINE_SIZE, - flags); - if (ODP_SHM_INVALID != shm) { - pinfo = odp_shm_addr(shm); - pinfo->master.pool_name[0] = 0; - *slave = 0; - } else { - flags = _ODP_SHM_PROC_NOCREAT | _ODP_SHM_O_EXCL; - shm = odp_shm_reserve(name, sizeof(struct pktio_info), - ODP_CACHE_LINE_SIZE, - flags); - if (ODP_SHM_INVALID == shm) - ODP_ABORT("can not connect to shm\n"); - - pinfo = odp_shm_addr(shm); - *slave = 1; - } - - pktio_entry->s.ipc.pinfo = pinfo; - pktio_entry->s.ipc.pinfo_shm = shm; - - return 0; -} - static int _ipc_master_start(pktio_entry_t *pktio_entry) { struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo; - int ret; void *ipc_pool_base; - if (pinfo->slave.mdata_offset == 0) + if (pinfo->slave.init_done == 0) return -1; - ret = _ipc_shm_lookup(pinfo->slave.pool_name); - if (ret) { - ODP_DBG("no pool file %s\n", pinfo->slave.pool_name); - return -1; - } - ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name, - pinfo->master.shm_pkt_pool_size); + pinfo->slave.pid); + if (ipc_pool_base == NULL) { + ODP_DBG("no pool file %s for pid %d\n", + pinfo->slave.pool_name, pinfo->slave.pid); + return -1; + } + + pktio_entry->s.ipc.pool_base = ipc_pool_base; pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base + - pinfo->slave.mdata_offset; + pinfo->slave.base_addr_offset; odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1); - ODP_DBG("%s started.\n", pktio_entry->s.name); + IPC_ODP_DBG("%s started.\n", pktio_entry->s.name); return 0; } static int _ipc_init_master(pktio_entry_t *pktio_entry, const char *dev, - odp_pool_t pool) + odp_pool_t pool_hdl) { char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")]; - pool_entry_t *pool_entry; - uint32_t pool_id; + pool_t *pool; struct pktio_info *pinfo; const char *pool_name; - pool_id = pool_handle_to_index(pool); - pool_entry = get_pool_entry(pool_id); + pool = pool_entry_from_hdl(pool_hdl); + (void)pool; if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_m_prod"))) { - ODP_DBG("too big ipc name\n"); + ODP_ERR("too big ipc name\n"); return -1; } @@ -158,7 +92,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, PKTIO_IPC_ENTRIES, _RING_SHM_PROC | _RING_NO_LIST); if (!pktio_entry->s.ipc.tx.send) { - ODP_DBG("pid %d unable to create ipc ring %s name\n", + ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); return -1; } @@ -174,7 +108,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, PKTIO_IPC_ENTRIES, _RING_SHM_PROC | _RING_NO_LIST); if (!pktio_entry->s.ipc.tx.free) { - ODP_DBG("pid %d unable to create ipc ring %s name\n", + ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); goto free_m_prod; } @@ -187,7 +121,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, PKTIO_IPC_ENTRIES, _RING_SHM_PROC | _RING_NO_LIST); if (!pktio_entry->s.ipc.rx.recv) { - ODP_DBG("pid %d unable to create ipc ring %s name\n", + ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); goto free_m_cons; } @@ -200,7 +134,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, PKTIO_IPC_ENTRIES, _RING_SHM_PROC | _RING_NO_LIST); if (!pktio_entry->s.ipc.rx.free) { - ODP_DBG("pid %d unable to create ipc ring %s name\n", + ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); goto free_s_prod; } @@ -210,24 +144,23 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, /* Set up pool name for remote info */ pinfo = pktio_entry->s.ipc.pinfo; - pool_name = _ipc_odp_buffer_pool_shm_name(pool); + pool_name = _ipc_odp_buffer_pool_shm_name(pool_hdl); if (strlen(pool_name) > ODP_POOL_NAME_LEN) { - ODP_DBG("pid %d ipc pool name %s is too big %d\n", + ODP_ERR("pid %d ipc pool name %s is too big %d\n", getpid(), pool_name, strlen(pool_name)); goto free_s_prod; } memcpy(pinfo->master.pool_name, pool_name, strlen(pool_name)); - pinfo->master.shm_pkt_pool_size = pool_entry->s.pool_size; - pinfo->master.shm_pool_bufs_num = pool_entry->s.buf_num; - pinfo->master.shm_pkt_size = pool_entry->s.seg_size; - pinfo->master.mdata_offset = pool_entry->s.pool_mdata_addr - - pool_entry->s.pool_base_addr; - pinfo->slave.mdata_offset = 0; + pinfo->slave.base_addr_offset = 0; + pinfo->slave.base_addr = 0; + pinfo->slave.pid = 0; + pinfo->slave.init_done = 0; - pktio_entry->s.ipc.pool = pool; + pktio_entry->s.ipc.pool = pool_hdl; ODP_DBG("Pre init... DONE.\n"); + pinfo->master.init_done = 1; _ipc_master_start(pktio_entry); @@ -246,55 +179,45 @@ free_m_prod: } static void _ipc_export_pool(struct pktio_info *pinfo, - odp_pool_t pool) + odp_pool_t pool_hdl) { - pool_entry_t *pool_entry; - - pool_entry = odp_pool_to_entry(pool); - if (pool_entry->s.blk_size != pinfo->master.shm_pkt_size) - ODP_ABORT("pktio for same name should have the same pool size\n"); - if (pool_entry->s.buf_num != (unsigned)pinfo->master.shm_pool_bufs_num) - ODP_ABORT("pktio for same name should have the same pool size\n"); + pool_t *pool = pool_entry_from_hdl(pool_hdl); snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s", - pool_entry->s.name); - pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr - - pool_entry->s.pool_base_addr; + _ipc_odp_buffer_pool_shm_name(pool_hdl)); + pinfo->slave.pid = odp_global_data.main_pid; + pinfo->slave.block_size = pool->block_size; + pinfo->slave.base_addr = pool->base_addr; } -static void *_ipc_map_remote_pool(const char *name, size_t size) +static void *_ipc_map_remote_pool(const char *name, int pid) { odp_shm_t shm; void *addr; + char rname[ODP_SHM_NAME_LEN]; - ODP_DBG("Mapping remote pool %s, size %ld\n", name, size); - shm = odp_shm_reserve(name, - size, - ODP_CACHE_LINE_SIZE, - _ODP_SHM_PROC_NOCREAT); - if (shm == ODP_SHM_INVALID) - ODP_ABORT("unable map %s\n", name); + snprintf(rname, ODP_SHM_NAME_LEN, "remote-%s", name); + shm = odp_shm_import(name, pid, rname); + if (shm == ODP_SHM_INVALID) { + ODP_ERR("unable map %s\n", name); + return NULL; + } addr = odp_shm_addr(shm); - ODP_DBG("MAP master: %p - %p size %ld, pool %s\n", - addr, (char *)addr + size, size, name); + + IPC_ODP_DBG("Mapped remote pool %s to local %s\n", name, rname); return addr; } -static void *_ipc_shm_map(char *name, size_t size) +static void *_ipc_shm_map(char *name, int pid) { odp_shm_t shm; - int ret; - ret = _ipc_shm_lookup(name); - if (ret == -1) + shm = odp_shm_import(name, pid, name); + if (ODP_SHM_INVALID == shm) { + ODP_ERR("unable to map: %s\n", name); return NULL; - - shm = odp_shm_reserve(name, size, - ODP_CACHE_LINE_SIZE, - _ODP_SHM_PROC_NOCREAT); - if (ODP_SHM_INVALID == shm) - ODP_ABORT("unable to map: %s\n", name); + } return odp_shm_addr(shm); } @@ -313,15 +236,22 @@ static int _ipc_init_slave(const char *dev, static int _ipc_slave_start(pktio_entry_t *pktio_entry) { char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")]; - size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) + - sizeof(_ring_t); struct pktio_info *pinfo; void *ipc_pool_base; odp_shm_t shm; - const char *dev = pktio_entry->s.name; + char tail[ODP_POOL_NAME_LEN]; + char dev[ODP_POOL_NAME_LEN]; + int pid; + + if (sscanf(pktio_entry->s.name, "ipc:%d:%s", &pid, tail) != 2) { + ODP_ERR("wrong pktio name\n"); + return -1; + } + + sprintf(dev, "ipc:%s", tail); snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev); - pktio_entry->s.ipc.rx.recv = _ipc_shm_map(ipc_shm_name, ring_size); + pktio_entry->s.ipc.rx.recv = _ipc_shm_map(ipc_shm_name, pid); if (!pktio_entry->s.ipc.rx.recv) { ODP_DBG("pid %d unable to find ipc ring %s name\n", getpid(), dev); @@ -333,9 +263,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) _ring_free_count(pktio_entry->s.ipc.rx.recv)); snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev); - pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, ring_size); + pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, pid); if (!pktio_entry->s.ipc.rx.free) { - ODP_DBG("pid %d unable to find ipc ring %s name\n", + ODP_ERR("pid %d unable to find ipc ring %s name\n", getpid(), dev); goto free_m_prod; } @@ -344,9 +274,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) _ring_free_count(pktio_entry->s.ipc.rx.free)); snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev); - pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, ring_size); + pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, pid); if (!pktio_entry->s.ipc.tx.send) { - ODP_DBG("pid %d unable to find ipc ring %s name\n", + ODP_ERR("pid %d unable to find ipc ring %s name\n", getpid(), dev); goto free_m_cons; } @@ -355,9 +285,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) _ring_free_count(pktio_entry->s.ipc.tx.send)); snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev); - pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, ring_size); + pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, pid); if (!pktio_entry->s.ipc.tx.free) { - ODP_DBG("pid %d unable to find ipc ring %s name\n", + ODP_ERR("pid %d unable to find ipc ring %s name\n", getpid(), dev); goto free_s_prod; } @@ -368,14 +298,15 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) /* Get info about remote pool */ pinfo = pktio_entry->s.ipc.pinfo; ipc_pool_base = _ipc_map_remote_pool(pinfo->master.pool_name, - pinfo->master.shm_pkt_pool_size); + pid); pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base + - pinfo->master.mdata_offset; - pktio_entry->s.ipc.pkt_size = pinfo->master.shm_pkt_size; + pinfo->master.base_addr_offset; + pktio_entry->s.ipc.pkt_size = pinfo->master.block_size; _ipc_export_pool(pinfo, pktio_entry->s.ipc.pool); odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1); + pinfo->slave.init_done = 1; ODP_DBG("%s started.\n", pktio_entry->s.name); return 0; @@ -401,7 +332,11 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED, odp_pool_t pool) { int ret = -1; - int slave; + int pid ODP_UNUSED; + struct pktio_info *pinfo; + char name[ODP_POOL_NAME_LEN + sizeof("_info")]; + char tail[ODP_POOL_NAME_LEN]; + odp_shm_t shm; ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == _RING_NAMESIZE, "mismatch pool and ring name arrays"); @@ -411,65 +346,59 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED, odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0); - _ipc_map_pktio_info(pktio_entry, dev, &slave); - pktio_entry->s.ipc.type = (slave == 0) ? PKTIO_TYPE_IPC_MASTER : - PKTIO_TYPE_IPC_SLAVE; + /* Shared info about remote pktio */ + if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2) { + pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_SLAVE; - if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) { + snprintf(name, sizeof(name), "ipc:%s_info", tail); + IPC_ODP_DBG("lookup for name %s for pid %d\n", name, pid); + shm = odp_shm_import(name, pid, name); + if (ODP_SHM_INVALID == shm) + return -1; + pinfo = odp_shm_addr(shm); + + if (!pinfo->master.init_done) { + odp_shm_free(shm); + return -1; + } + pktio_entry->s.ipc.pinfo = pinfo; + pktio_entry->s.ipc.pinfo_shm = shm; + ODP_DBG("process %d is slave\n", getpid()); + ret = _ipc_init_slave(name, pktio_entry, pool); + } else { + pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_MASTER; + snprintf(name, sizeof(name), "%s_info", dev); + shm = odp_shm_reserve(name, sizeof(struct pktio_info), + ODP_CACHE_LINE_SIZE, + _ODP_ISHM_EXPORT | _ODP_ISHM_LOCK); + if (ODP_SHM_INVALID == shm) { + ODP_ERR("can not create shm %s\n", name); + return -1; + } + + pinfo = odp_shm_addr(shm); + pinfo->master.init_done = 0; + pinfo->master.pool_name[0] = 0; + pktio_entry->s.ipc.pinfo = pinfo; + pktio_entry->s.ipc.pinfo_shm = shm; ODP_DBG("process %d is master\n", getpid()); ret = _ipc_init_master(pktio_entry, dev, pool); - } else { - ODP_DBG("process %d is slave\n", getpid()); - ret = _ipc_init_slave(dev, pktio_entry, pool); } return ret; } -static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf, - uint32_t offset, - uint32_t *seglen, - uint32_t limit) +static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, _ring_t *r) { - int seg_index = offset / buf->segsize; - int seg_offset = offset % buf->segsize; -#ifdef _ODP_PKTIO_IPC - void *addr = (char *)buf - buf->ipc_addr_offset[seg_index]; -#else - /** buf_hdr.ipc_addr_offset defined only when ipc is - * enabled. */ - void *addr = NULL; - - (void)seg_index; -#endif - if (seglen) { - uint32_t buf_left = limit - offset; - *seglen = seg_offset + buf_left <= buf->segsize ? - buf_left : buf->segsize - seg_offset; - } - - return (void *)(seg_offset + (uint8_t *)addr); -} - -static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr, - uint32_t offset, uint32_t *seglen) -{ - if (offset > pkt_hdr->frame_len) - return NULL; - - return _ipc_buffer_map(&pkt_hdr->buf_hdr, - pkt_hdr->headroom + offset, seglen, - pkt_hdr->headroom + pkt_hdr->frame_len); -} - -static void _ipc_free_ring_packets(_ring_t *r) -{ - odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES]; + uint64_t offsets[PKTIO_IPC_ENTRIES]; int ret; void **rbuf_p; int i; - rbuf_p = (void *)&r_p_pkts; + if (!r) + return; + + rbuf_p = (void *)&offsets; while (1) { ret = _ring_mc_dequeue_burst(r, rbuf_p, @@ -477,8 +406,13 @@ static void _ipc_free_ring_packets(_ring_t *r) if (0 == ret) break; for (i = 0; i < ret; i++) { - if (r_p_pkts[i] != ODP_PACKET_INVALID) - odp_packet_free(r_p_pkts[i]); + odp_packet_hdr_t *phdr; + odp_packet_t pkt; + void *mbase = pktio_entry->s.ipc.pool_mdata_base; + + phdr = (void *)((uint8_t *)mbase + offsets[i]); + pkt = (odp_packet_t)phdr->buf_hdr.handle.handle; + odp_packet_free(pkt); } } } @@ -490,22 +424,23 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, int i; _ring_t *r; _ring_t *r_p; + uint64_t offsets[PKTIO_IPC_ENTRIES]; + void **ipcbufs_p = (void *)&offsets; + uint32_t ready; + int pkts_ring; - odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES]; - void **ipcbufs_p = (void *)&remote_pkts; - uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready); - + ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready); if (odp_unlikely(!ready)) { - ODP_DBG("start pktio is missing before usage?\n"); + IPC_ODP_DBG("start pktio is missing before usage?\n"); return -1; } - _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free); + _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free); r = pktio_entry->s.ipc.rx.recv; pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len); if (odp_unlikely(pkts < 0)) - ODP_ABORT("error to dequeue no packets\n"); + ODP_ABORT("internal error dequeue\n"); /* fast path */ if (odp_likely(0 == pkts)) @@ -514,36 +449,21 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, for (i = 0; i < pkts; i++) { odp_pool_t pool; odp_packet_t pkt; - odp_packet_hdr_t phdr; - void *ptr; - odp_buffer_bits_t handle; - int idx; /* Remote packet has coded pool and index. - * We need only index.*/ + odp_packet_hdr_t *phdr; void *pkt_data; - void *remote_pkt_data; + uint64_t data_pool_off; + void *rmt_data_ptr; - if (remote_pkts[i] == ODP_PACKET_INVALID) - continue; + phdr = (void *)((uint8_t *)pktio_entry->s.ipc.pool_mdata_base + + offsets[i]); - handle.handle = _odp_packet_to_buffer(remote_pkts[i]); - idx = handle.index; - - /* Link to packed data. To this line we have Zero-Copy between - * processes, to simplify use packet copy in that version which - * can be removed later with more advance buffer management - * (ref counters). - */ - /* reverse odp_buf_to_hdr() */ - ptr = (char *)pktio_entry->s.ipc.pool_mdata_base + - (idx * ODP_CACHE_LINE_SIZE); - memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t)); - - /* Allocate new packet. Select*/ pool = pktio_entry->s.ipc.pool; if (odp_unlikely(pool == ODP_POOL_INVALID)) ODP_ABORT("invalid pool"); - pkt = odp_packet_alloc(pool, phdr.frame_len); + data_pool_off = phdr->buf_hdr.seg[0].ipc_data_offset; + + pkt = odp_packet_alloc(pool, phdr->frame_len); if (odp_unlikely(pkt == ODP_PACKET_INVALID)) { /* Original pool might be smaller then * PKTIO_IPC_ENTRIES. If packet can not be @@ -562,30 +482,40 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, (PKTIO_TYPE_IPC_SLAVE == pktio_entry->s.ipc.type)); - remote_pkt_data = _ipc_packet_map(ptr, 0, NULL); - if (odp_unlikely(!remote_pkt_data)) - ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n", - (PKTIO_TYPE_IPC_SLAVE == - pktio_entry->s.ipc.type)); - /* Copy packet data from shared pool to local pool. */ - memcpy(pkt_data, remote_pkt_data, phdr.frame_len); + rmt_data_ptr = (uint8_t *)pktio_entry->s.ipc.pool_mdata_base + + data_pool_off; + memcpy(pkt_data, rmt_data_ptr, phdr->frame_len); /* Copy packets L2, L3 parsed offsets and size */ - copy_packet_cls_metadata(&phdr, odp_packet_hdr(pkt)); + copy_packet_cls_metadata(phdr, odp_packet_hdr(pkt)); + + odp_packet_hdr(pkt)->frame_len = phdr->frame_len; + odp_packet_hdr(pkt)->headroom = phdr->headroom; + odp_packet_hdr(pkt)->tailroom = phdr->tailroom; + + /* Take classification fields */ + odp_packet_hdr(pkt)->p = phdr->p; - odp_packet_hdr(pkt)->frame_len = phdr.frame_len; - odp_packet_hdr(pkt)->headroom = phdr.headroom; - odp_packet_hdr(pkt)->tailroom = phdr.tailroom; - odp_packet_hdr(pkt)->input = pktio_entry->s.handle; pkt_table[i] = pkt; } /* Now tell other process that we no longer need that buffers.*/ r_p = pktio_entry->s.ipc.rx.free; - pkts = _ring_mp_enqueue_burst(r_p, ipcbufs_p, i); + +repeat: + pkts_ring = _ring_mp_enqueue_burst(r_p, ipcbufs_p, pkts); if (odp_unlikely(pkts < 0)) ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n"); + if (odp_unlikely(pkts != pkts_ring)) { + IPC_ODP_DBG("odp_ring_full: %d, odp_ring_count %d," + " _ring_free_count %d\n", + _ring_full(r_p), _ring_count(r_p), + _ring_free_count(r_p)); + ipcbufs_p = (void *)&offsets[pkts_ring - 1]; + pkts = pkts - pkts_ring; + goto repeat; + } return pkts; } @@ -614,26 +544,23 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry, uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready); odp_packet_t pkt_table_mapped[len]; /**< Ready to send packet has to be * in memory mapped pool. */ + uint64_t offsets[len]; if (odp_unlikely(!ready)) return 0; - _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free); + _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free); - /* Prepare packets: calculate offset from address. */ + /* Copy packets to shm shared pool if they are in different */ for (i = 0; i < len; i++) { - int j; odp_packet_t pkt = pkt_table[i]; - odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt); + pool_t *ipc_pool = pool_entry_from_hdl(pktio_entry->s.ipc.pool); odp_buffer_bits_t handle; - uint32_t cur_mapped_pool_id = - pool_handle_to_index(pktio_entry->s.ipc.pool); - uint32_t pool_id; + uint32_t pkt_pool_id; - /* do copy if packet was allocated from not mapped pool */ handle.handle = _odp_packet_to_buffer(pkt); - pool_id = handle.pool_id; - if (pool_id != cur_mapped_pool_id) { + pkt_pool_id = handle.pool_id; + if (pkt_pool_id != ipc_pool->pool_idx) { odp_packet_t newpkt; newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool); @@ -645,24 +572,30 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry, } else { pkt_table_mapped[i] = pkt; } + } - /* buf_hdr.addr can not be used directly in remote process, - * convert it to offset - */ - for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) { -#ifdef _ODP_PKTIO_IPC - pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr - - (char *)pkt_hdr->buf_hdr.addr[j]; -#else - /** buf_hdr.ipc_addr_offset defined only when ipc is - * enabled. */ - (void)pkt_hdr; -#endif - } + /* Set offset to phdr for outgoing packets */ + for (i = 0; i < len; i++) { + uint64_t data_pool_off; + odp_packet_t pkt = pkt_table_mapped[i]; + odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt); + odp_pool_t pool_hdl = odp_packet_pool(pkt); + pool_t *pool = pool_entry_from_hdl(pool_hdl); + + offsets[i] = (uint8_t *)pkt_hdr - + (uint8_t *)odp_shm_addr(pool->shm); + data_pool_off = (uint8_t *)pkt_hdr->buf_hdr.seg[0].data - + (uint8_t *)odp_shm_addr(pool->shm); + pkt_hdr->buf_hdr.seg[0].ipc_data_offset = data_pool_off; + IPC_ODP_DBG("%d/%d send packet %llx, pool %llx," + "phdr = %p, offset %x\n", + i, len, + odp_packet_to_u64(pkt), odp_pool_to_u64(pool_hdl), + pkt_hdr, pkt_hdr->buf_hdr.seg[0].ipc_data_offset); } /* Put packets to ring to be processed by other process. */ - rbuf_p = (void *)&pkt_table_mapped[0]; + rbuf_p = (void *)&offsets[0]; r = pktio_entry->s.ipc.tx.send; ret = _ring_mp_enqueue_burst(r, rbuf_p, len); if (odp_unlikely(ret < 0)) { @@ -673,6 +606,7 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry, ODP_ERR("odp_ring_full: %d, odp_ring_count %d, _ring_free_count %d\n", _ring_full(r), _ring_count(r), _ring_free_count(r)); + ODP_ABORT("Unexpected!\n"); } return ret; @@ -722,22 +656,25 @@ static int ipc_start(pktio_entry_t *pktio_entry) static int ipc_stop(pktio_entry_t *pktio_entry) { - unsigned tx_send, tx_free; + unsigned tx_send = 0, tx_free = 0; odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 0); - _ipc_free_ring_packets(pktio_entry->s.ipc.tx.send); + if (pktio_entry->s.ipc.tx.send) + _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.send); /* other process can transfer packets from one ring to * other, use delay here to free that packets. */ sleep(1); - _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free); + if (pktio_entry->s.ipc.tx.free) + _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free); - tx_send = _ring_count(pktio_entry->s.ipc.tx.send); - tx_free = _ring_count(pktio_entry->s.ipc.tx.free); + if (pktio_entry->s.ipc.tx.send) + tx_send = _ring_count(pktio_entry->s.ipc.tx.send); + if (pktio_entry->s.ipc.tx.free) + tx_free = _ring_count(pktio_entry->s.ipc.tx.free); if (tx_send | tx_free) { ODP_DBG("IPC rings: tx send %d tx free %d\n", - _ring_free_count(pktio_entry->s.ipc.tx.send), - _ring_free_count(pktio_entry->s.ipc.tx.free)); + tx_send, tx_free); } return 0; @@ -795,4 +732,3 @@ const pktio_if_ops_t ipc_pktio_ops = { .pktin_ts_from_ns = NULL, .config = NULL }; -#endif