@@ -64,6 +64,11 @@ struct odp_buffer_hdr_t {
struct {
void *hdr;
uint8_t *data;
+#ifdef _ODP_PKTIO_IPC
+ /* ipc mapped process can not walk over pointers,
+ * offset has to be used */
+ uint64_t ipc_data_offset;
+#endif
uint32_t len;
} seg[CONFIG_PACKET_MAX_SEGS];
@@ -101,11 +106,6 @@ struct odp_buffer_hdr_t {
queue_entry_t *target_qe; /* ordered queue target */
uint64_t sync[SCHEDULE_ORDERED_LOCKS_PER_QUEUE];
};
-#ifdef _ODP_PKTIO_IPC
- /* ipc mapped process can not walk over pointers,
- * offset has to be used */
- uint64_t ipc_addr_offset[ODP_CONFIG_PACKET_MAX_SEGS];
-#endif
/* Data or next header */
uint8_t data[0];
@@ -50,7 +50,6 @@ struct odp_global_data_s {
odp_cpumask_t control_cpus;
odp_cpumask_t worker_cpus;
int num_cpus_installed;
- int ipc_ns;
};
enum init_stage {
@@ -26,22 +26,31 @@
*/
struct pktio_info {
struct {
- /* number of buffer in remote pool */
- int shm_pool_bufs_num;
- /* size of remote pool */
- size_t shm_pkt_pool_size;
+ /* number of buffer*/
+ int num;
/* size of packet/segment in remote pool */
- uint32_t shm_pkt_size;
+ uint32_t block_size;
/* offset from shared memory block start
- * to pool_mdata_addr (odp-linux pool specific) */
- size_t mdata_offset;
+ * to pool *base_addr in remote process.
+ * (odp-linux pool specific) */
+ size_t base_addr_offset;
char pool_name[ODP_POOL_NAME_LEN];
+ /* 1 if master finished creation of all shared objects */
+ int init_done;
} master;
struct {
/* offset from shared memory block start
- * to pool_mdata_addr in remote process.
+ * to pool *base_addr in remote process.
* (odp-linux pool specific) */
- size_t mdata_offset;
+ size_t base_addr_offset;
+ void *base_addr;
+ uint32_t block_size;
char pool_name[ODP_POOL_NAME_LEN];
+ /* pid of the slave process written to shm and
+ * used by master to look up memory created by
+ * slave
+ */
+ int pid;
+ int init_done;
} slave;
} ODP_PACKED;
@@ -67,7 +67,7 @@ static int cleanup_files(const char *dirpath, int odp_pid)
int odp_init_global(odp_instance_t *instance,
const odp_init_t *params,
- const odp_platform_init_t *platform_params)
+ const odp_platform_init_t *platform_params ODP_UNUSED)
{
char *hpdir;
@@ -75,9 +75,6 @@ int odp_init_global(odp_instance_t *instance,
odp_global_data.main_pid = getpid();
cleanup_files(_ODP_TMPDIR, odp_global_data.main_pid);
- if (platform_params)
- odp_global_data.ipc_ns = platform_params->ipc_ns;
-
enum init_stage stage = NO_INIT;
odp_global_data.log_fn = odp_override_log;
odp_global_data.abort_fn = odp_override_abort;
@@ -3,150 +3,84 @@
*
* SPDX-License-Identifier: BSD-3-Clause
*/
-#ifdef _ODP_PKTIO_IPC
#include <odp_packet_io_ipc_internal.h>
#include <odp_debug_internal.h>
#include <odp_packet_io_internal.h>
#include <odp/api/system_info.h>
#include <odp_shm_internal.h>
+#include <_ishm_internal.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
+#define IPC_ODP_DEBUG_PRINT 0
+
+#define IPC_ODP_DBG(fmt, ...) \
+ do { \
+ if (IPC_ODP_DEBUG_PRINT == 1) \
+ ODP_DBG(fmt, ##__VA_ARGS__);\
+ } while (0)
+
/* MAC address for the "ipc" interface */
static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12};
-static void *_ipc_map_remote_pool(const char *name, size_t size);
+static void *_ipc_map_remote_pool(const char *name, int pid);
static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
{
- pool_entry_t *pool;
- uint32_t pool_id;
+ pool_t *pool;
odp_shm_t shm;
odp_shm_info_t info;
- pool_id = pool_handle_to_index(pool_hdl);
- pool = get_pool_entry(pool_id);
- shm = pool->s.pool_shm;
+ pool = pool_entry_from_hdl(pool_hdl);
+ shm = pool->shm;
odp_shm_info(shm, &info);
return info.name;
}
-/**
-* Look up for shared memory object.
-*
-* @param name name of shm object
-*
-* @return 0 on success, otherwise non-zero
-*/
-static int _ipc_shm_lookup(const char *name)
-{
- int shm;
- char shm_devname[SHM_DEVNAME_MAXLEN];
-
- if (!odp_global_data.ipc_ns)
- ODP_ABORT("ipc_ns not set\n");
-
- snprintf(shm_devname, SHM_DEVNAME_MAXLEN,
- SHM_DEVNAME_FORMAT,
- odp_global_data.ipc_ns, name);
-
- shm = shm_open(shm_devname, O_RDWR, S_IRUSR | S_IWUSR);
- if (shm == -1) {
- if (errno == ENOENT) {
- ODP_DBG("no file %s\n", shm_devname);
- return -1;
- }
- ODP_ABORT("shm_open for %s err %s\n",
- shm_devname, strerror(errno));
- }
- close(shm);
- return 0;
-}
-
-static int _ipc_map_pktio_info(pktio_entry_t *pktio_entry,
- const char *dev,
- int *slave)
-{
- struct pktio_info *pinfo;
- char name[ODP_POOL_NAME_LEN + sizeof("_info")];
- uint32_t flags;
- odp_shm_t shm;
-
- /* Create info about remote pktio */
- snprintf(name, sizeof(name), "%s_info", dev);
-
- flags = ODP_SHM_PROC | _ODP_SHM_O_EXCL;
-
- shm = odp_shm_reserve(name, sizeof(struct pktio_info),
- ODP_CACHE_LINE_SIZE,
- flags);
- if (ODP_SHM_INVALID != shm) {
- pinfo = odp_shm_addr(shm);
- pinfo->master.pool_name[0] = 0;
- *slave = 0;
- } else {
- flags = _ODP_SHM_PROC_NOCREAT | _ODP_SHM_O_EXCL;
- shm = odp_shm_reserve(name, sizeof(struct pktio_info),
- ODP_CACHE_LINE_SIZE,
- flags);
- if (ODP_SHM_INVALID == shm)
- ODP_ABORT("can not connect to shm\n");
-
- pinfo = odp_shm_addr(shm);
- *slave = 1;
- }
-
- pktio_entry->s.ipc.pinfo = pinfo;
- pktio_entry->s.ipc.pinfo_shm = shm;
-
- return 0;
-}
-
static int _ipc_master_start(pktio_entry_t *pktio_entry)
{
struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo;
- int ret;
void *ipc_pool_base;
- if (pinfo->slave.mdata_offset == 0)
+ if (pinfo->slave.init_done == 0)
return -1;
- ret = _ipc_shm_lookup(pinfo->slave.pool_name);
- if (ret) {
- ODP_DBG("no pool file %s\n", pinfo->slave.pool_name);
- return -1;
- }
-
ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
- pinfo->master.shm_pkt_pool_size);
+ pinfo->slave.pid);
+ if (ipc_pool_base == NULL) {
+ ODP_DBG("no pool file %s for pid %d\n",
+ pinfo->slave.pool_name, pinfo->slave.pid);
+ return -1;
+ }
+
+ pktio_entry->s.ipc.pool_base = ipc_pool_base;
pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
- pinfo->slave.mdata_offset;
+ pinfo->slave.base_addr_offset;
odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
- ODP_DBG("%s started.\n", pktio_entry->s.name);
+ IPC_ODP_DBG("%s started.\n", pktio_entry->s.name);
return 0;
}
static int _ipc_init_master(pktio_entry_t *pktio_entry,
const char *dev,
- odp_pool_t pool)
+ odp_pool_t pool_hdl)
{
char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")];
- pool_entry_t *pool_entry;
- uint32_t pool_id;
+ pool_t *pool;
struct pktio_info *pinfo;
const char *pool_name;
- pool_id = pool_handle_to_index(pool);
- pool_entry = get_pool_entry(pool_id);
+ pool = pool_entry_from_hdl(pool_hdl);
+ (void)pool;
if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_m_prod"))) {
- ODP_DBG("too big ipc name\n");
+ ODP_ERR("too big ipc name\n");
return -1;
}
@@ -158,7 +92,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,
PKTIO_IPC_ENTRIES,
_RING_SHM_PROC | _RING_NO_LIST);
if (!pktio_entry->s.ipc.tx.send) {
- ODP_DBG("pid %d unable to create ipc ring %s name\n",
+ ODP_ERR("pid %d unable to create ipc ring %s name\n",
getpid(), ipc_shm_name);
return -1;
}
@@ -174,7 +108,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,
PKTIO_IPC_ENTRIES,
_RING_SHM_PROC | _RING_NO_LIST);
if (!pktio_entry->s.ipc.tx.free) {
- ODP_DBG("pid %d unable to create ipc ring %s name\n",
+ ODP_ERR("pid %d unable to create ipc ring %s name\n",
getpid(), ipc_shm_name);
goto free_m_prod;
}
@@ -187,7 +121,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,
PKTIO_IPC_ENTRIES,
_RING_SHM_PROC | _RING_NO_LIST);
if (!pktio_entry->s.ipc.rx.recv) {
- ODP_DBG("pid %d unable to create ipc ring %s name\n",
+ ODP_ERR("pid %d unable to create ipc ring %s name\n",
getpid(), ipc_shm_name);
goto free_m_cons;
}
@@ -200,7 +134,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,
PKTIO_IPC_ENTRIES,
_RING_SHM_PROC | _RING_NO_LIST);
if (!pktio_entry->s.ipc.rx.free) {
- ODP_DBG("pid %d unable to create ipc ring %s name\n",
+ ODP_ERR("pid %d unable to create ipc ring %s name\n",
getpid(), ipc_shm_name);
goto free_s_prod;
}
@@ -210,24 +144,23 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,
/* Set up pool name for remote info */
pinfo = pktio_entry->s.ipc.pinfo;
- pool_name = _ipc_odp_buffer_pool_shm_name(pool);
+ pool_name = _ipc_odp_buffer_pool_shm_name(pool_hdl);
if (strlen(pool_name) > ODP_POOL_NAME_LEN) {
- ODP_DBG("pid %d ipc pool name %s is too big %d\n",
+ ODP_ERR("pid %d ipc pool name %s is too big %d\n",
getpid(), pool_name, strlen(pool_name));
goto free_s_prod;
}
memcpy(pinfo->master.pool_name, pool_name, strlen(pool_name));
- pinfo->master.shm_pkt_pool_size = pool_entry->s.pool_size;
- pinfo->master.shm_pool_bufs_num = pool_entry->s.buf_num;
- pinfo->master.shm_pkt_size = pool_entry->s.seg_size;
- pinfo->master.mdata_offset = pool_entry->s.pool_mdata_addr -
- pool_entry->s.pool_base_addr;
- pinfo->slave.mdata_offset = 0;
+ pinfo->slave.base_addr_offset = 0;
+ pinfo->slave.base_addr = 0;
+ pinfo->slave.pid = 0;
+ pinfo->slave.init_done = 0;
- pktio_entry->s.ipc.pool = pool;
+ pktio_entry->s.ipc.pool = pool_hdl;
ODP_DBG("Pre init... DONE.\n");
+ pinfo->master.init_done = 1;
_ipc_master_start(pktio_entry);
@@ -246,55 +179,45 @@ free_m_prod:
}
static void _ipc_export_pool(struct pktio_info *pinfo,
- odp_pool_t pool)
+ odp_pool_t pool_hdl)
{
- pool_entry_t *pool_entry;
-
- pool_entry = odp_pool_to_entry(pool);
- if (pool_entry->s.blk_size != pinfo->master.shm_pkt_size)
- ODP_ABORT("pktio for same name should have the same pool size\n");
- if (pool_entry->s.buf_num != (unsigned)pinfo->master.shm_pool_bufs_num)
- ODP_ABORT("pktio for same name should have the same pool size\n");
+ pool_t *pool = pool_entry_from_hdl(pool_hdl);
snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s",
- pool_entry->s.name);
- pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
- pool_entry->s.pool_base_addr;
+ _ipc_odp_buffer_pool_shm_name(pool_hdl));
+ pinfo->slave.pid = odp_global_data.main_pid;
+ pinfo->slave.block_size = pool->block_size;
+ pinfo->slave.base_addr = pool->base_addr;
}
-static void *_ipc_map_remote_pool(const char *name, size_t size)
+static void *_ipc_map_remote_pool(const char *name, int pid)
{
odp_shm_t shm;
void *addr;
+ char rname[ODP_SHM_NAME_LEN];
- ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
- shm = odp_shm_reserve(name,
- size,
- ODP_CACHE_LINE_SIZE,
- _ODP_SHM_PROC_NOCREAT);
- if (shm == ODP_SHM_INVALID)
- ODP_ABORT("unable map %s\n", name);
+ snprintf(rname, ODP_SHM_NAME_LEN, "remote-%s", name);
+ shm = odp_shm_import(name, pid, rname);
+ if (shm == ODP_SHM_INVALID) {
+ ODP_ERR("unable map %s\n", name);
+ return NULL;
+ }
addr = odp_shm_addr(shm);
- ODP_DBG("MAP master: %p - %p size %ld, pool %s\n",
- addr, (char *)addr + size, size, name);
+
+ IPC_ODP_DBG("Mapped remote pool %s to local %s\n", name, rname);
return addr;
}
-static void *_ipc_shm_map(char *name, size_t size)
+static void *_ipc_shm_map(char *name, int pid)
{
odp_shm_t shm;
- int ret;
- ret = _ipc_shm_lookup(name);
- if (ret == -1)
+ shm = odp_shm_import(name, pid, name);
+ if (ODP_SHM_INVALID == shm) {
+ ODP_ERR("unable to map: %s\n", name);
return NULL;
-
- shm = odp_shm_reserve(name, size,
- ODP_CACHE_LINE_SIZE,
- _ODP_SHM_PROC_NOCREAT);
- if (ODP_SHM_INVALID == shm)
- ODP_ABORT("unable to map: %s\n", name);
+ }
return odp_shm_addr(shm);
}
@@ -313,15 +236,22 @@ static int _ipc_init_slave(const char *dev,
static int _ipc_slave_start(pktio_entry_t *pktio_entry)
{
char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
- size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
- sizeof(_ring_t);
struct pktio_info *pinfo;
void *ipc_pool_base;
odp_shm_t shm;
- const char *dev = pktio_entry->s.name;
+ char tail[ODP_POOL_NAME_LEN];
+ char dev[ODP_POOL_NAME_LEN];
+ int pid;
+
+ if (sscanf(pktio_entry->s.name, "ipc:%d:%s", &pid, tail) != 2) {
+ ODP_ERR("wrong pktio name\n");
+ return -1;
+ }
+
+ sprintf(dev, "ipc:%s", tail);
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
- pktio_entry->s.ipc.rx.recv = _ipc_shm_map(ipc_shm_name, ring_size);
+ pktio_entry->s.ipc.rx.recv = _ipc_shm_map(ipc_shm_name, pid);
if (!pktio_entry->s.ipc.rx.recv) {
ODP_DBG("pid %d unable to find ipc ring %s name\n",
getpid(), dev);
@@ -333,9 +263,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)
_ring_free_count(pktio_entry->s.ipc.rx.recv));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
- pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, ring_size);
+ pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, pid);
if (!pktio_entry->s.ipc.rx.free) {
- ODP_DBG("pid %d unable to find ipc ring %s name\n",
+ ODP_ERR("pid %d unable to find ipc ring %s name\n",
getpid(), dev);
goto free_m_prod;
}
@@ -344,9 +274,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)
_ring_free_count(pktio_entry->s.ipc.rx.free));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
- pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, ring_size);
+ pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, pid);
if (!pktio_entry->s.ipc.tx.send) {
- ODP_DBG("pid %d unable to find ipc ring %s name\n",
+ ODP_ERR("pid %d unable to find ipc ring %s name\n",
getpid(), dev);
goto free_m_cons;
}
@@ -355,9 +285,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)
_ring_free_count(pktio_entry->s.ipc.tx.send));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
- pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, ring_size);
+ pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, pid);
if (!pktio_entry->s.ipc.tx.free) {
- ODP_DBG("pid %d unable to find ipc ring %s name\n",
+ ODP_ERR("pid %d unable to find ipc ring %s name\n",
getpid(), dev);
goto free_s_prod;
}
@@ -368,14 +298,15 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)
/* Get info about remote pool */
pinfo = pktio_entry->s.ipc.pinfo;
ipc_pool_base = _ipc_map_remote_pool(pinfo->master.pool_name,
- pinfo->master.shm_pkt_pool_size);
+ pid);
pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
- pinfo->master.mdata_offset;
- pktio_entry->s.ipc.pkt_size = pinfo->master.shm_pkt_size;
+ pinfo->master.base_addr_offset;
+ pktio_entry->s.ipc.pkt_size = pinfo->master.block_size;
_ipc_export_pool(pinfo, pktio_entry->s.ipc.pool);
odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
+ pinfo->slave.init_done = 1;
ODP_DBG("%s started.\n", pktio_entry->s.name);
return 0;
@@ -401,7 +332,11 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
odp_pool_t pool)
{
int ret = -1;
- int slave;
+ int pid ODP_UNUSED;
+ struct pktio_info *pinfo;
+ char name[ODP_POOL_NAME_LEN + sizeof("_info")];
+ char tail[ODP_POOL_NAME_LEN];
+ odp_shm_t shm;
ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == _RING_NAMESIZE,
"mismatch pool and ring name arrays");
@@ -411,65 +346,59 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0);
- _ipc_map_pktio_info(pktio_entry, dev, &slave);
- pktio_entry->s.ipc.type = (slave == 0) ? PKTIO_TYPE_IPC_MASTER :
- PKTIO_TYPE_IPC_SLAVE;
+ /* Shared info about remote pktio */
+ if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2) {
+ pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_SLAVE;
- if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {
+ snprintf(name, sizeof(name), "ipc:%s_info", tail);
+ IPC_ODP_DBG("lookup for name %s for pid %d\n", name, pid);
+ shm = odp_shm_import(name, pid, name);
+ if (ODP_SHM_INVALID == shm)
+ return -1;
+ pinfo = odp_shm_addr(shm);
+
+ if (!pinfo->master.init_done) {
+ odp_shm_free(shm);
+ return -1;
+ }
+ pktio_entry->s.ipc.pinfo = pinfo;
+ pktio_entry->s.ipc.pinfo_shm = shm;
+ ODP_DBG("process %d is slave\n", getpid());
+ ret = _ipc_init_slave(name, pktio_entry, pool);
+ } else {
+ pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_MASTER;
+ snprintf(name, sizeof(name), "%s_info", dev);
+ shm = odp_shm_reserve(name, sizeof(struct pktio_info),
+ ODP_CACHE_LINE_SIZE,
+ _ODP_ISHM_EXPORT | _ODP_ISHM_LOCK);
+ if (ODP_SHM_INVALID == shm) {
+ ODP_ERR("can not create shm %s\n", name);
+ return -1;
+ }
+
+ pinfo = odp_shm_addr(shm);
+ pinfo->master.init_done = 0;
+ pinfo->master.pool_name[0] = 0;
+ pktio_entry->s.ipc.pinfo = pinfo;
+ pktio_entry->s.ipc.pinfo_shm = shm;
ODP_DBG("process %d is master\n", getpid());
ret = _ipc_init_master(pktio_entry, dev, pool);
- } else {
- ODP_DBG("process %d is slave\n", getpid());
- ret = _ipc_init_slave(dev, pktio_entry, pool);
}
return ret;
}
-static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
- uint32_t offset,
- uint32_t *seglen,
- uint32_t limit)
+static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, _ring_t *r)
{
- int seg_index = offset / buf->segsize;
- int seg_offset = offset % buf->segsize;
-#ifdef _ODP_PKTIO_IPC
- void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
-#else
- /** buf_hdr.ipc_addr_offset defined only when ipc is
- * enabled. */
- void *addr = NULL;
-
- (void)seg_index;
-#endif
- if (seglen) {
- uint32_t buf_left = limit - offset;
- *seglen = seg_offset + buf_left <= buf->segsize ?
- buf_left : buf->segsize - seg_offset;
- }
-
- return (void *)(seg_offset + (uint8_t *)addr);
-}
-
-static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
- uint32_t offset, uint32_t *seglen)
-{
- if (offset > pkt_hdr->frame_len)
- return NULL;
-
- return _ipc_buffer_map(&pkt_hdr->buf_hdr,
- pkt_hdr->headroom + offset, seglen,
- pkt_hdr->headroom + pkt_hdr->frame_len);
-}
-
-static void _ipc_free_ring_packets(_ring_t *r)
-{
- odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
+ uint64_t offsets[PKTIO_IPC_ENTRIES];
int ret;
void **rbuf_p;
int i;
- rbuf_p = (void *)&r_p_pkts;
+ if (!r)
+ return;
+
+ rbuf_p = (void *)&offsets;
while (1) {
ret = _ring_mc_dequeue_burst(r, rbuf_p,
@@ -477,8 +406,13 @@ static void _ipc_free_ring_packets(_ring_t *r)
if (0 == ret)
break;
for (i = 0; i < ret; i++) {
- if (r_p_pkts[i] != ODP_PACKET_INVALID)
- odp_packet_free(r_p_pkts[i]);
+ odp_packet_hdr_t *phdr;
+ odp_packet_t pkt;
+ void *mbase = pktio_entry->s.ipc.pool_mdata_base;
+
+ phdr = (void *)((uint8_t *)mbase + offsets[i]);
+ pkt = (odp_packet_t)phdr->buf_hdr.handle.handle;
+ odp_packet_free(pkt);
}
}
}
@@ -490,22 +424,23 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
int i;
_ring_t *r;
_ring_t *r_p;
+ uint64_t offsets[PKTIO_IPC_ENTRIES];
+ void **ipcbufs_p = (void *)&offsets;
+ uint32_t ready;
+ int pkts_ring;
- odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
- void **ipcbufs_p = (void *)&remote_pkts;
- uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
-
+ ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
if (odp_unlikely(!ready)) {
- ODP_DBG("start pktio is missing before usage?\n");
+ IPC_ODP_DBG("start pktio is missing before usage?\n");
return -1;
}
- _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+ _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
r = pktio_entry->s.ipc.rx.recv;
pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len);
if (odp_unlikely(pkts < 0))
- ODP_ABORT("error to dequeue no packets\n");
+ ODP_ABORT("internal error dequeue\n");
/* fast path */
if (odp_likely(0 == pkts))
@@ -514,36 +449,21 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
for (i = 0; i < pkts; i++) {
odp_pool_t pool;
odp_packet_t pkt;
- odp_packet_hdr_t phdr;
- void *ptr;
- odp_buffer_bits_t handle;
- int idx; /* Remote packet has coded pool and index.
- * We need only index.*/
+ odp_packet_hdr_t *phdr;
void *pkt_data;
- void *remote_pkt_data;
+ uint64_t data_pool_off;
+ void *rmt_data_ptr;
- if (remote_pkts[i] == ODP_PACKET_INVALID)
- continue;
+ phdr = (void *)((uint8_t *)pktio_entry->s.ipc.pool_mdata_base +
+ offsets[i]);
- handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
- idx = handle.index;
-
- /* Link to packed data. To this line we have Zero-Copy between
- * processes, to simplify use packet copy in that version which
- * can be removed later with more advance buffer management
- * (ref counters).
- */
- /* reverse odp_buf_to_hdr() */
- ptr = (char *)pktio_entry->s.ipc.pool_mdata_base +
- (idx * ODP_CACHE_LINE_SIZE);
- memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t));
-
- /* Allocate new packet. Select*/
pool = pktio_entry->s.ipc.pool;
if (odp_unlikely(pool == ODP_POOL_INVALID))
ODP_ABORT("invalid pool");
- pkt = odp_packet_alloc(pool, phdr.frame_len);
+ data_pool_off = phdr->buf_hdr.seg[0].ipc_data_offset;
+
+ pkt = odp_packet_alloc(pool, phdr->frame_len);
if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
/* Original pool might be smaller then
* PKTIO_IPC_ENTRIES. If packet can not be
@@ -562,30 +482,40 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
(PKTIO_TYPE_IPC_SLAVE ==
pktio_entry->s.ipc.type));
- remote_pkt_data = _ipc_packet_map(ptr, 0, NULL);
- if (odp_unlikely(!remote_pkt_data))
- ODP_ABORT("unable to map remote_pkt_data, ipc_slave %d\n",
- (PKTIO_TYPE_IPC_SLAVE ==
- pktio_entry->s.ipc.type));
-
/* Copy packet data from shared pool to local pool. */
- memcpy(pkt_data, remote_pkt_data, phdr.frame_len);
+ rmt_data_ptr = (uint8_t *)pktio_entry->s.ipc.pool_mdata_base +
+ data_pool_off;
+ memcpy(pkt_data, rmt_data_ptr, phdr->frame_len);
/* Copy packets L2, L3 parsed offsets and size */
- copy_packet_cls_metadata(&phdr, odp_packet_hdr(pkt));
+ copy_packet_cls_metadata(phdr, odp_packet_hdr(pkt));
+
+ odp_packet_hdr(pkt)->frame_len = phdr->frame_len;
+ odp_packet_hdr(pkt)->headroom = phdr->headroom;
+ odp_packet_hdr(pkt)->tailroom = phdr->tailroom;
+
+ /* Take classification fields */
+ odp_packet_hdr(pkt)->p = phdr->p;
- odp_packet_hdr(pkt)->frame_len = phdr.frame_len;
- odp_packet_hdr(pkt)->headroom = phdr.headroom;
- odp_packet_hdr(pkt)->tailroom = phdr.tailroom;
- odp_packet_hdr(pkt)->input = pktio_entry->s.handle;
pkt_table[i] = pkt;
}
/* Now tell other process that we no longer need that buffers.*/
r_p = pktio_entry->s.ipc.rx.free;
- pkts = _ring_mp_enqueue_burst(r_p, ipcbufs_p, i);
+
+repeat:
+ pkts_ring = _ring_mp_enqueue_burst(r_p, ipcbufs_p, pkts);
if (odp_unlikely(pkts < 0))
ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
+ if (odp_unlikely(pkts != pkts_ring)) {
+ IPC_ODP_DBG("odp_ring_full: %d, odp_ring_count %d,"
+ " _ring_free_count %d\n",
+ _ring_full(r_p), _ring_count(r_p),
+ _ring_free_count(r_p));
+ ipcbufs_p = (void *)&offsets[pkts_ring - 1];
+ pkts = pkts - pkts_ring;
+ goto repeat;
+ }
return pkts;
}
@@ -614,26 +544,23 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
odp_packet_t pkt_table_mapped[len]; /**< Ready to send packet has to be
* in memory mapped pool. */
+ uint64_t offsets[len];
if (odp_unlikely(!ready))
return 0;
- _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+ _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
- /* Prepare packets: calculate offset from address. */
+ /* Copy packets to shm shared pool if they are in different */
for (i = 0; i < len; i++) {
- int j;
odp_packet_t pkt = pkt_table[i];
- odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
+ pool_t *ipc_pool = pool_entry_from_hdl(pktio_entry->s.ipc.pool);
odp_buffer_bits_t handle;
- uint32_t cur_mapped_pool_id =
- pool_handle_to_index(pktio_entry->s.ipc.pool);
- uint32_t pool_id;
+ uint32_t pkt_pool_id;
- /* do copy if packet was allocated from not mapped pool */
handle.handle = _odp_packet_to_buffer(pkt);
- pool_id = handle.pool_id;
- if (pool_id != cur_mapped_pool_id) {
+ pkt_pool_id = handle.pool_id;
+ if (pkt_pool_id != ipc_pool->pool_idx) {
odp_packet_t newpkt;
newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool);
@@ -645,24 +572,30 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
} else {
pkt_table_mapped[i] = pkt;
}
+ }
- /* buf_hdr.addr can not be used directly in remote process,
- * convert it to offset
- */
- for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) {
-#ifdef _ODP_PKTIO_IPC
- pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
- (char *)pkt_hdr->buf_hdr.addr[j];
-#else
- /** buf_hdr.ipc_addr_offset defined only when ipc is
- * enabled. */
- (void)pkt_hdr;
-#endif
- }
+ /* Set offset to phdr for outgoing packets */
+ for (i = 0; i < len; i++) {
+ uint64_t data_pool_off;
+ odp_packet_t pkt = pkt_table_mapped[i];
+ odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
+ odp_pool_t pool_hdl = odp_packet_pool(pkt);
+ pool_t *pool = pool_entry_from_hdl(pool_hdl);
+
+ offsets[i] = (uint8_t *)pkt_hdr -
+ (uint8_t *)odp_shm_addr(pool->shm);
+ data_pool_off = (uint8_t *)pkt_hdr->buf_hdr.seg[0].data -
+ (uint8_t *)odp_shm_addr(pool->shm);
+ pkt_hdr->buf_hdr.seg[0].ipc_data_offset = data_pool_off;
+ IPC_ODP_DBG("%d/%d send packet %llx, pool %llx,"
+ "phdr = %p, offset %x\n",
+ i, len,
+ odp_packet_to_u64(pkt), odp_pool_to_u64(pool_hdl),
+ pkt_hdr, pkt_hdr->buf_hdr.seg[0].ipc_data_offset);
}
/* Put packets to ring to be processed by other process. */
- rbuf_p = (void *)&pkt_table_mapped[0];
+ rbuf_p = (void *)&offsets[0];
r = pktio_entry->s.ipc.tx.send;
ret = _ring_mp_enqueue_burst(r, rbuf_p, len);
if (odp_unlikely(ret < 0)) {
@@ -673,6 +606,7 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
ODP_ERR("odp_ring_full: %d, odp_ring_count %d, _ring_free_count %d\n",
_ring_full(r), _ring_count(r),
_ring_free_count(r));
+ ODP_ABORT("Unexpected!\n");
}
return ret;
@@ -722,22 +656,25 @@ static int ipc_start(pktio_entry_t *pktio_entry)
static int ipc_stop(pktio_entry_t *pktio_entry)
{
- unsigned tx_send, tx_free;
+ unsigned tx_send = 0, tx_free = 0;
odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 0);
- _ipc_free_ring_packets(pktio_entry->s.ipc.tx.send);
+ if (pktio_entry->s.ipc.tx.send)
+ _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.send);
/* other process can transfer packets from one ring to
* other, use delay here to free that packets. */
sleep(1);
- _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+ if (pktio_entry->s.ipc.tx.free)
+ _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
- tx_send = _ring_count(pktio_entry->s.ipc.tx.send);
- tx_free = _ring_count(pktio_entry->s.ipc.tx.free);
+ if (pktio_entry->s.ipc.tx.send)
+ tx_send = _ring_count(pktio_entry->s.ipc.tx.send);
+ if (pktio_entry->s.ipc.tx.free)
+ tx_free = _ring_count(pktio_entry->s.ipc.tx.free);
if (tx_send | tx_free) {
ODP_DBG("IPC rings: tx send %d tx free %d\n",
- _ring_free_count(pktio_entry->s.ipc.tx.send),
- _ring_free_count(pktio_entry->s.ipc.tx.free));
+ tx_send, tx_free);
}
return 0;
@@ -795,4 +732,3 @@ const pktio_if_ops_t ipc_pktio_ops = {
.pktin_ts_from_ns = NULL,
.config = NULL
};
-#endif
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org> --- .../linux-generic/include/odp_buffer_internal.h | 10 +- platform/linux-generic/include/odp_internal.h | 1 - .../include/odp_packet_io_ipc_internal.h | 27 +- platform/linux-generic/odp_init.c | 5 +- platform/linux-generic/pktio/ipc.c | 488 +++++++++------------ 5 files changed, 236 insertions(+), 295 deletions(-) -- 2.7.1.250.gff4ea60