Message ID | 20201006132905.46205-2-honnappa.nagarahalli@arm.com |
---|---|
State | New |
Headers | show |
Series | lib/ring: add scatter gather APIs | expand |
Hi Honnappa, From a quick walkthrough, I have some questions/comments, please see below. On Tue, Oct 06, 2020 at 08:29:05AM -0500, Honnappa Nagarahalli wrote: > Add scatter gather APIs to avoid intermediate memcpy. Use cases > that involve copying large amount of data to/from the ring > can benefit from these APIs. > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > --- > lib/librte_ring/meson.build | 3 +- > lib/librte_ring/rte_ring_elem.h | 1 + > lib/librte_ring/rte_ring_peek_sg.h | 552 +++++++++++++++++++++++++++++ > 3 files changed, 555 insertions(+), 1 deletion(-) > create mode 100644 lib/librte_ring/rte_ring_peek_sg.h > > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build > index 31c0b4649..377694713 100644 > --- a/lib/librte_ring/meson.build > +++ b/lib/librte_ring/meson.build > @@ -12,4 +12,5 @@ headers = files('rte_ring.h', > 'rte_ring_peek.h', > 'rte_ring_peek_c11_mem.h', > 'rte_ring_rts.h', > - 'rte_ring_rts_c11_mem.h') > + 'rte_ring_rts_c11_mem.h', > + 'rte_ring_peek_sg.h') > diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h > index 938b398fc..7d3933f15 100644 > --- a/lib/librte_ring/rte_ring_elem.h > +++ b/lib/librte_ring/rte_ring_elem.h > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, > > #ifdef ALLOW_EXPERIMENTAL_API > #include <rte_ring_peek.h> > +#include <rte_ring_peek_sg.h> > #endif > > #include <rte_ring.h> > diff --git a/lib/librte_ring/rte_ring_peek_sg.h b/lib/librte_ring/rte_ring_peek_sg.h > new file mode 100644 > index 000000000..97d5764a6 > --- /dev/null > +++ b/lib/librte_ring/rte_ring_peek_sg.h > @@ -0,0 +1,552 @@ > +/* SPDX-License-Identifier: BSD-3-Clause > + * > + * Copyright (c) 2020 Arm > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org > + * All rights reserved. > + * Derived from FreeBSD's bufring.h > + * Used as BSD-3 Licensed with permission from Kip Macy. > + */ > + > +#ifndef _RTE_RING_PEEK_SG_H_ > +#define _RTE_RING_PEEK_SG_H_ > + > +/** > + * @file > + * @b EXPERIMENTAL: this API may change without prior notice > + * It is not recommended to include this file directly. > + * Please include <rte_ring_elem.h> instead. > + * > + * Ring Peek Scatter Gather APIs I am not fully convinced by the API name. To me, "scatter/gather" is associated to iovecs, like for instance in [1]. The wikipedia definition [2] may be closer though. [1] https://www.gnu.org/software/libc/manual/html_node/Scatter_002dGather.html [2] https://en.wikipedia.org/wiki/Gather-scatter_(vector_addressing) What about "zero-copy"? Also, the "peek" term looks also a bit confusing to me, but it existed before your patch. I understand it for dequeue, but not for enqueue. Or, what about replacing the existing experimental peek API by this one? They look quite similar to me. > + * Introduction of rte_ring with scatter gather serialized producer/consumer > + * (HTS sync mode) makes it possible to split public enqueue/dequeue API > + * into 3 phases: > + * - enqueue/dequeue start > + * - copy data to/from the ring > + * - enqueue/dequeue finish > + * Along with the advantages of the peek APIs, these APIs provide the ability > + * to avoid copying of the data to temporary area. > + * > + * Note that right now this new API is available only for two sync modes: > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). > + * It is a user responsibility to create/init ring with appropriate sync > + * modes selected. > + * > + * Example usage: > + * // read 1 elem from the ring: Comment should be "prepare enqueuing 32 objects" > + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); > + * if (n != 0) { > + * //Copy objects in the ring > + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); > + * if (n != sgd->n1) > + * //Second memcpy because of wrapround > + * n2 = n - sgd->n1; > + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); Missing { } > + * rte_ring_dequeue_sg_finish(ring, n); Should be enqueue > + * } > + * > + * Note that between _start_ and _finish_ none other thread can proceed > + * with enqueue(/dequeue) operation till _finish_ completes. > + */ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#include <rte_ring_peek_c11_mem.h> > + > +/* Rock that needs to be passed between reserve and commit APIs */ > +struct rte_ring_sg_data { > + /* Pointer to the first space in the ring */ > + void **ptr1; > + /* Pointer to the second space in the ring if there is wrap-around */ > + void **ptr2; > + /* Number of elements in the first pointer. If this is equal to > + * the number of elements requested, then ptr2 is NULL. > + * Otherwise, subtracting n1 from number of elements requested > + * will give the number of elements available at ptr2. > + */ > + unsigned int n1; > +}; Would it be possible to simply return the offset instead of this structure? The wrap could be managed by a __rte_ring_enqueue_elems() function. I mean something like this: uint32_t start; n = rte_ring_enqueue_sg_bulk_start(ring, 32, &start, NULL); if (n != 0) { /* Copy objects in the ring. */ __rte_ring_enqueue_elems(ring, start, obj, sizeof(uintptr_t), n); rte_ring_enqueue_sg_finish(ring, n); } It would require to slightly change __rte_ring_enqueue_elems() to support to be called with prod_head >= size, and wrap in that case. Regards, Olivier
<snip> > > Hi Honnappa, > > From a quick walkthrough, I have some questions/comments, please see > below. Hi Olivier, appreciate your input. > > On Tue, Oct 06, 2020 at 08:29:05AM -0500, Honnappa Nagarahalli wrote: > > Add scatter gather APIs to avoid intermediate memcpy. Use cases that > > involve copying large amount of data to/from the ring can benefit from > > these APIs. > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > > --- > > lib/librte_ring/meson.build | 3 +- > > lib/librte_ring/rte_ring_elem.h | 1 + > > lib/librte_ring/rte_ring_peek_sg.h | 552 > > +++++++++++++++++++++++++++++ > > 3 files changed, 555 insertions(+), 1 deletion(-) create mode 100644 > > lib/librte_ring/rte_ring_peek_sg.h > > > > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build > > index 31c0b4649..377694713 100644 > > --- a/lib/librte_ring/meson.build > > +++ b/lib/librte_ring/meson.build > > @@ -12,4 +12,5 @@ headers = files('rte_ring.h', > > 'rte_ring_peek.h', > > 'rte_ring_peek_c11_mem.h', > > 'rte_ring_rts.h', > > - 'rte_ring_rts_c11_mem.h') > > + 'rte_ring_rts_c11_mem.h', > > + 'rte_ring_peek_sg.h') > > diff --git a/lib/librte_ring/rte_ring_elem.h > > b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 100644 > > --- a/lib/librte_ring/rte_ring_elem.h > > +++ b/lib/librte_ring/rte_ring_elem.h > > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, > > void *obj_table, > > > > #ifdef ALLOW_EXPERIMENTAL_API > > #include <rte_ring_peek.h> > > +#include <rte_ring_peek_sg.h> > > #endif > > > > #include <rte_ring.h> > > diff --git a/lib/librte_ring/rte_ring_peek_sg.h > > b/lib/librte_ring/rte_ring_peek_sg.h > > new file mode 100644 > > index 000000000..97d5764a6 > > --- /dev/null > > +++ b/lib/librte_ring/rte_ring_peek_sg.h > > @@ -0,0 +1,552 @@ > > +/* SPDX-License-Identifier: BSD-3-Clause > > + * > > + * Copyright (c) 2020 Arm > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org > > + * All rights reserved. > > + * Derived from FreeBSD's bufring.h > > + * Used as BSD-3 Licensed with permission from Kip Macy. > > + */ > > + > > +#ifndef _RTE_RING_PEEK_SG_H_ > > +#define _RTE_RING_PEEK_SG_H_ > > + > > +/** > > + * @file > > + * @b EXPERIMENTAL: this API may change without prior notice > > + * It is not recommended to include this file directly. > > + * Please include <rte_ring_elem.h> instead. > > + * > > + * Ring Peek Scatter Gather APIs > > I am not fully convinced by the API name. To me, "scatter/gather" is > associated to iovecs, like for instance in [1]. The wikipedia definition [2] may > be closer though. > > [1] > https://www.gnu.org/software/libc/manual/html_node/Scatter_002dGathe > r.html > [2] https://en.wikipedia.org/wiki/Gather-scatter_(vector_addressing) The way I understand scatter-gather is, the data to be sent to something (like a device) is coming from multiple sources. It would require copying to put the data together to be contiguous. If the device supports scatter-gather, such copying is not required. The device can collect data from multiple locations and make it contiguous. In the case I was looking at, one part of the data was coming from the user of the API and another was generated by the API itself. If these two pieces of information have to be enqueued as a single object on the ring, I had to create an intermediate copy. But by exposing the ring memory to the user, the intermediate copy is avoided. Hence I called it scatter-gather. > > What about "zero-copy"? I think no-copy (nc for short) or user-copy (uc for short) would convey the meaning better. These would indicate that the rte_ring APIs are not copying the objects and it is left to the user to do the actual copy. > > Also, the "peek" term looks also a bit confusing to me, but it existed before > your patch. I understand it for dequeue, but not for enqueue. I kept 'peek' there because the API still offers the 'peek' API capabilities. I am also not sure on what 'peek' means for enqueue API. The enqueue 'peek' API was provided to be symmetric with dequeue peek API. > > Or, what about replacing the existing experimental peek API by this one? > They look quite similar to me. I agree, scatter gather APIs provide the peek capability and the no-copy benefits. Konstantin, any opinions here? > > > + * Introduction of rte_ring with scatter gather serialized > > + producer/consumer > > + * (HTS sync mode) makes it possible to split public enqueue/dequeue > > + API > > + * into 3 phases: > > + * - enqueue/dequeue start > > + * - copy data to/from the ring > > + * - enqueue/dequeue finish > > + * Along with the advantages of the peek APIs, these APIs provide the > > + ability > > + * to avoid copying of the data to temporary area. > > + * > > + * Note that right now this new API is available only for two sync modes: > > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > > + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). > > + * It is a user responsibility to create/init ring with appropriate > > + sync > > + * modes selected. > > + * > > + * Example usage: > > + * // read 1 elem from the ring: > > Comment should be "prepare enqueuing 32 objects" > > > + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); > > + * if (n != 0) { > > + * //Copy objects in the ring > > + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); > > + * if (n != sgd->n1) > > + * //Second memcpy because of wrapround > > + * n2 = n - sgd->n1; > > + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); > > Missing { } > > > + * rte_ring_dequeue_sg_finish(ring, n); > > Should be enqueue > Thanks, will correct them. > > + * } > > + * > > + * Note that between _start_ and _finish_ none other thread can > > + proceed > > + * with enqueue(/dequeue) operation till _finish_ completes. > > + */ > > + > > +#ifdef __cplusplus > > +extern "C" { > > +#endif > > + > > +#include <rte_ring_peek_c11_mem.h> > > + > > +/* Rock that needs to be passed between reserve and commit APIs */ > > +struct rte_ring_sg_data { > > + /* Pointer to the first space in the ring */ > > + void **ptr1; > > + /* Pointer to the second space in the ring if there is wrap-around */ > > + void **ptr2; > > + /* Number of elements in the first pointer. If this is equal to > > + * the number of elements requested, then ptr2 is NULL. > > + * Otherwise, subtracting n1 from number of elements requested > > + * will give the number of elements available at ptr2. > > + */ > > + unsigned int n1; > > +}; > > Would it be possible to simply return the offset instead of this structure? > The wrap could be managed by a __rte_ring_enqueue_elems() function. Trying to use __rte_ring_enqueue_elems() will force temporary copy. See below. > > I mean something like this: > > uint32_t start; > n = rte_ring_enqueue_sg_bulk_start(ring, 32, &start, NULL); > if (n != 0) { > /* Copy objects in the ring. */ > __rte_ring_enqueue_elems(ring, start, obj, sizeof(uintptr_t), > n); For ex: 'obj' here is temporary copy. > rte_ring_enqueue_sg_finish(ring, n); > } > > It would require to slightly change __rte_ring_enqueue_elems() to support > to be called with prod_head >= size, and wrap in that case. > The alternate solution I can think of requires 3 things 1) the base address of the ring 2) Index to where to copy 3) the mask. With these 3 things one could write the code like below: for (i = 0; i < n; i++) { ring_addr[(index + i) & mask] = obj[i]; // ANDing with mask will take care of wrap-around. } However, I think this does not allow for passing the address in the ring to another function/API to copy the data (It is possible, but the user has to calculate the actual address, worry about the wrap-around, second pointer etc). The current approach hides some details and provides flexibility to the application to use the pointer the way it wants. > > Regards, > Olivier
<snip> > > > > > Hi Honnappa, > > > > From a quick walkthrough, I have some questions/comments, please see > > below. > Hi Olivier, appreciate your input. > > > > > On Tue, Oct 06, 2020 at 08:29:05AM -0500, Honnappa Nagarahalli wrote: > > > Add scatter gather APIs to avoid intermediate memcpy. Use cases that > > > involve copying large amount of data to/from the ring can benefit > > > from these APIs. > > > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > > > --- > > > lib/librte_ring/meson.build | 3 +- > > > lib/librte_ring/rte_ring_elem.h | 1 + > > > lib/librte_ring/rte_ring_peek_sg.h | 552 > > > +++++++++++++++++++++++++++++ > > > 3 files changed, 555 insertions(+), 1 deletion(-) create mode > > > 100644 lib/librte_ring/rte_ring_peek_sg.h > > > > > > diff --git a/lib/librte_ring/meson.build > > > b/lib/librte_ring/meson.build index 31c0b4649..377694713 100644 > > > --- a/lib/librte_ring/meson.build > > > +++ b/lib/librte_ring/meson.build > > > @@ -12,4 +12,5 @@ headers = files('rte_ring.h', > > > 'rte_ring_peek.h', > > > 'rte_ring_peek_c11_mem.h', > > > 'rte_ring_rts.h', > > > - 'rte_ring_rts_c11_mem.h') > > > + 'rte_ring_rts_c11_mem.h', > > > + 'rte_ring_peek_sg.h') > > > diff --git a/lib/librte_ring/rte_ring_elem.h > > > b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 100644 > > > --- a/lib/librte_ring/rte_ring_elem.h > > > +++ b/lib/librte_ring/rte_ring_elem.h > > > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring > > > *r, void *obj_table, > > > > > > #ifdef ALLOW_EXPERIMENTAL_API > > > #include <rte_ring_peek.h> > > > +#include <rte_ring_peek_sg.h> > > > #endif > > > > > > #include <rte_ring.h> > > > diff --git a/lib/librte_ring/rte_ring_peek_sg.h > > > b/lib/librte_ring/rte_ring_peek_sg.h > > > new file mode 100644 > > > index 000000000..97d5764a6 > > > --- /dev/null > > > +++ b/lib/librte_ring/rte_ring_peek_sg.h > > > @@ -0,0 +1,552 @@ > > > +/* SPDX-License-Identifier: BSD-3-Clause > > > + * > > > + * Copyright (c) 2020 Arm > > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org > > > + * All rights reserved. > > > + * Derived from FreeBSD's bufring.h > > > + * Used as BSD-3 Licensed with permission from Kip Macy. > > > + */ > > > + > > > +#ifndef _RTE_RING_PEEK_SG_H_ > > > +#define _RTE_RING_PEEK_SG_H_ > > > + > > > +/** > > > + * @file > > > + * @b EXPERIMENTAL: this API may change without prior notice > > > + * It is not recommended to include this file directly. > > > + * Please include <rte_ring_elem.h> instead. > > > + * > > > + * Ring Peek Scatter Gather APIs > > > > I am not fully convinced by the API name. To me, "scatter/gather" is > > associated to iovecs, like for instance in [1]. The wikipedia > > definition [2] may be closer though. > > > > [1] > > > https://www.gnu.org/software/libc/manual/html_node/Scatter_002dGathe > > r.html > > [2] https://en.wikipedia.org/wiki/Gather-scatter_(vector_addressing) > The way I understand scatter-gather is, the data to be sent to something (like > a device) is coming from multiple sources. It would require copying to put the > data together to be contiguous. If the device supports scatter-gather, such > copying is not required. The device can collect data from multiple locations > and make it contiguous. > > In the case I was looking at, one part of the data was coming from the user of > the API and another was generated by the API itself. If these two pieces of > information have to be enqueued as a single object on the ring, I had to > create an intermediate copy. But by exposing the ring memory to the user, > the intermediate copy is avoided. Hence I called it scatter-gather. > > > > > What about "zero-copy"? > I think no-copy (nc for short) or user-copy (uc for short) would convey the > meaning better. These would indicate that the rte_ring APIs are not copying > the objects and it is left to the user to do the actual copy. > > > > > Also, the "peek" term looks also a bit confusing to me, but it existed > > before your patch. I understand it for dequeue, but not for enqueue. > I kept 'peek' there because the API still offers the 'peek' API capabilities. I am > also not sure on what 'peek' means for enqueue API. The enqueue 'peek' > API was provided to be symmetric with dequeue peek API. > > > > > Or, what about replacing the existing experimental peek API by this one? > > They look quite similar to me. > I agree, scatter gather APIs provide the peek capability and the no-copy > benefits. > Konstantin, any opinions here? > > > > > > + * Introduction of rte_ring with scatter gather serialized > > > + producer/consumer > > > + * (HTS sync mode) makes it possible to split public > > > + enqueue/dequeue API > > > + * into 3 phases: > > > + * - enqueue/dequeue start > > > + * - copy data to/from the ring > > > + * - enqueue/dequeue finish > > > + * Along with the advantages of the peek APIs, these APIs provide > > > + the ability > > > + * to avoid copying of the data to temporary area. > > > + * > > > + * Note that right now this new API is available only for two sync modes: > > > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > > > + * 2) Serialized Producer/Serialized Consumer > (RTE_RING_SYNC_MT_HTS). > > > + * It is a user responsibility to create/init ring with appropriate > > > + sync > > > + * modes selected. > > > + * > > > + * Example usage: > > > + * // read 1 elem from the ring: > > > > Comment should be "prepare enqueuing 32 objects" > > > > > + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); > > > + * if (n != 0) { > > > + * //Copy objects in the ring > > > + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); > > > + * if (n != sgd->n1) > > > + * //Second memcpy because of wrapround > > > + * n2 = n - sgd->n1; > > > + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); > > > > Missing { } > > > > > + * rte_ring_dequeue_sg_finish(ring, n); > > > > Should be enqueue > > > Thanks, will correct them. > > > > + * } > > > + * > > > + * Note that between _start_ and _finish_ none other thread can > > > + proceed > > > + * with enqueue(/dequeue) operation till _finish_ completes. > > > + */ > > > + > > > +#ifdef __cplusplus > > > +extern "C" { > > > +#endif > > > + > > > +#include <rte_ring_peek_c11_mem.h> > > > + > > > +/* Rock that needs to be passed between reserve and commit APIs */ > > > +struct rte_ring_sg_data { > > > + /* Pointer to the first space in the ring */ > > > + void **ptr1; > > > + /* Pointer to the second space in the ring if there is wrap-around */ > > > + void **ptr2; > > > + /* Number of elements in the first pointer. If this is equal to > > > + * the number of elements requested, then ptr2 is NULL. > > > + * Otherwise, subtracting n1 from number of elements requested > > > + * will give the number of elements available at ptr2. > > > + */ > > > + unsigned int n1; > > > +}; > > > > Would it be possible to simply return the offset instead of this structure? > > The wrap could be managed by a __rte_ring_enqueue_elems() function. > Trying to use __rte_ring_enqueue_elems() will force temporary copy. See > below. > > > > > I mean something like this: > > > > uint32_t start; > > n = rte_ring_enqueue_sg_bulk_start(ring, 32, &start, NULL); > > if (n != 0) { > > /* Copy objects in the ring. */ > > __rte_ring_enqueue_elems(ring, start, obj, sizeof(uintptr_t), > n); > For ex: 'obj' here is temporary copy. The example I provided in the comment at the top of the file is not good. I will replace the 'memcpy' with calling another API that copies the data to the ring directly. That should show the clear benefit. > > > rte_ring_enqueue_sg_finish(ring, n); > > } > > > > It would require to slightly change __rte_ring_enqueue_elems() to > > support to be called with prod_head >= size, and wrap in that case. > > > The alternate solution I can think of requires 3 things 1) the base address of > the ring 2) Index to where to copy 3) the mask. With these 3 things one could > write the code like below: > for (i = 0; i < n; i++) { > ring_addr[(index + i) & mask] = obj[i]; // ANDing with mask will take > care of wrap-around. > } > > However, I think this does not allow for passing the address in the ring to > another function/API to copy the data (It is possible, but the user has to > calculate the actual address, worry about the wrap-around, second pointer > etc). > > The current approach hides some details and provides flexibility to the > application to use the pointer the way it wants. > > > > > Regards, > > Olivier
On Thu, Oct 08, 2020 at 08:44:13PM +0000, Honnappa Nagarahalli wrote: > <snip> > > > > > Hi Honnappa, > > > > From a quick walkthrough, I have some questions/comments, please see > > below. > Hi Olivier, appreciate your input. > > > > > On Tue, Oct 06, 2020 at 08:29:05AM -0500, Honnappa Nagarahalli wrote: > > > Add scatter gather APIs to avoid intermediate memcpy. Use cases that > > > involve copying large amount of data to/from the ring can benefit from > > > these APIs. > > > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > > > --- > > > lib/librte_ring/meson.build | 3 +- > > > lib/librte_ring/rte_ring_elem.h | 1 + > > > lib/librte_ring/rte_ring_peek_sg.h | 552 > > > +++++++++++++++++++++++++++++ > > > 3 files changed, 555 insertions(+), 1 deletion(-) create mode 100644 > > > lib/librte_ring/rte_ring_peek_sg.h > > > > > > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build > > > index 31c0b4649..377694713 100644 > > > --- a/lib/librte_ring/meson.build > > > +++ b/lib/librte_ring/meson.build > > > @@ -12,4 +12,5 @@ headers = files('rte_ring.h', > > > 'rte_ring_peek.h', > > > 'rte_ring_peek_c11_mem.h', > > > 'rte_ring_rts.h', > > > - 'rte_ring_rts_c11_mem.h') > > > + 'rte_ring_rts_c11_mem.h', > > > + 'rte_ring_peek_sg.h') > > > diff --git a/lib/librte_ring/rte_ring_elem.h > > > b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 100644 > > > --- a/lib/librte_ring/rte_ring_elem.h > > > +++ b/lib/librte_ring/rte_ring_elem.h > > > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, > > > void *obj_table, > > > > > > #ifdef ALLOW_EXPERIMENTAL_API > > > #include <rte_ring_peek.h> > > > +#include <rte_ring_peek_sg.h> > > > #endif > > > > > > #include <rte_ring.h> > > > diff --git a/lib/librte_ring/rte_ring_peek_sg.h > > > b/lib/librte_ring/rte_ring_peek_sg.h > > > new file mode 100644 > > > index 000000000..97d5764a6 > > > --- /dev/null > > > +++ b/lib/librte_ring/rte_ring_peek_sg.h > > > @@ -0,0 +1,552 @@ > > > +/* SPDX-License-Identifier: BSD-3-Clause > > > + * > > > + * Copyright (c) 2020 Arm > > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org > > > + * All rights reserved. > > > + * Derived from FreeBSD's bufring.h > > > + * Used as BSD-3 Licensed with permission from Kip Macy. > > > + */ > > > + > > > +#ifndef _RTE_RING_PEEK_SG_H_ > > > +#define _RTE_RING_PEEK_SG_H_ > > > + > > > +/** > > > + * @file > > > + * @b EXPERIMENTAL: this API may change without prior notice > > > + * It is not recommended to include this file directly. > > > + * Please include <rte_ring_elem.h> instead. > > > + * > > > + * Ring Peek Scatter Gather APIs > > > > I am not fully convinced by the API name. To me, "scatter/gather" is > > associated to iovecs, like for instance in [1]. The wikipedia definition [2] may > > be closer though. > > > > [1] > > https://www.gnu.org/software/libc/manual/html_node/Scatter_002dGathe > > r.html > > [2] https://en.wikipedia.org/wiki/Gather-scatter_(vector_addressing) > The way I understand scatter-gather is, the data to be sent to something (like a device) is coming from multiple sources. It would require copying to put the data together to be contiguous. If the device supports scatter-gather, such copying is not required. The device can collect data from multiple locations and make it contiguous. > > In the case I was looking at, one part of the data was coming from the user of the API and another was generated by the API itself. If these two pieces of information have to be enqueued as a single object on the ring, I had to create an intermediate copy. But by exposing the ring memory to the user, the intermediate copy is avoided. Hence I called it scatter-gather. > > > > > What about "zero-copy"? > I think no-copy (nc for short) or user-copy (uc for short) would convey the meaning better. These would indicate that the rte_ring APIs are not copying the objects and it is left to the user to do the actual copy. > > > > > Also, the "peek" term looks also a bit confusing to me, but it existed before > > your patch. I understand it for dequeue, but not for enqueue. > I kept 'peek' there because the API still offers the 'peek' API capabilities. I am also not sure on what 'peek' means for enqueue API. The enqueue 'peek' API was provided to be symmetric with dequeue peek API. > > > > > Or, what about replacing the existing experimental peek API by this one? > > They look quite similar to me. > I agree, scatter gather APIs provide the peek capability and the no-copy benefits. > Konstantin, any opinions here? > > > > > > + * Introduction of rte_ring with scatter gather serialized > > > + producer/consumer > > > + * (HTS sync mode) makes it possible to split public enqueue/dequeue > > > + API > > > + * into 3 phases: > > > + * - enqueue/dequeue start > > > + * - copy data to/from the ring > > > + * - enqueue/dequeue finish > > > + * Along with the advantages of the peek APIs, these APIs provide the > > > + ability > > > + * to avoid copying of the data to temporary area. > > > + * > > > + * Note that right now this new API is available only for two sync modes: > > > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > > > + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). > > > + * It is a user responsibility to create/init ring with appropriate > > > + sync > > > + * modes selected. > > > + * > > > + * Example usage: > > > + * // read 1 elem from the ring: > > > > Comment should be "prepare enqueuing 32 objects" > > > > > + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); > > > + * if (n != 0) { > > > + * //Copy objects in the ring > > > + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); > > > + * if (n != sgd->n1) > > > + * //Second memcpy because of wrapround > > > + * n2 = n - sgd->n1; > > > + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); > > > > Missing { } > > > > > + * rte_ring_dequeue_sg_finish(ring, n); > > > > Should be enqueue > > > Thanks, will correct them. > > > > + * } > > > + * > > > + * Note that between _start_ and _finish_ none other thread can > > > + proceed > > > + * with enqueue(/dequeue) operation till _finish_ completes. > > > + */ > > > + > > > +#ifdef __cplusplus > > > +extern "C" { > > > +#endif > > > + > > > +#include <rte_ring_peek_c11_mem.h> > > > + > > > +/* Rock that needs to be passed between reserve and commit APIs */ > > > +struct rte_ring_sg_data { > > > + /* Pointer to the first space in the ring */ > > > + void **ptr1; > > > + /* Pointer to the second space in the ring if there is wrap-around */ > > > + void **ptr2; > > > + /* Number of elements in the first pointer. If this is equal to > > > + * the number of elements requested, then ptr2 is NULL. > > > + * Otherwise, subtracting n1 from number of elements requested > > > + * will give the number of elements available at ptr2. > > > + */ > > > + unsigned int n1; > > > +}; > > > > Would it be possible to simply return the offset instead of this structure? > > The wrap could be managed by a __rte_ring_enqueue_elems() function. > Trying to use __rte_ring_enqueue_elems() will force temporary copy. See below. > > > > > I mean something like this: > > > > uint32_t start; > > n = rte_ring_enqueue_sg_bulk_start(ring, 32, &start, NULL); > > if (n != 0) { > > /* Copy objects in the ring. */ > > __rte_ring_enqueue_elems(ring, start, obj, sizeof(uintptr_t), > > n); > For ex: 'obj' here is temporary copy. > > > rte_ring_enqueue_sg_finish(ring, n); > > } > > > > It would require to slightly change __rte_ring_enqueue_elems() to support > > to be called with prod_head >= size, and wrap in that case. > > > The alternate solution I can think of requires 3 things 1) the base address of the ring 2) Index to where to copy 3) the mask. With these 3 things one could write the code like below: > for (i = 0; i < n; i++) { > ring_addr[(index + i) & mask] = obj[i]; // ANDing with mask will take care of wrap-around. > } > > However, I think this does not allow for passing the address in the ring to another function/API to copy the data (It is possible, but the user has to calculate the actual address, worry about the wrap-around, second pointer etc). > > The current approach hides some details and provides flexibility to the application to use the pointer the way it wants. I agree that doing the access + masking manually looks too complex. However I'm not sure to get why using __rte_ring_enqueue_elems() would result in an additional copy. I suppose the object that you want to enqueue is already stored somewhere? For instance, let's say you have 10 objects to enqueue, located at different places: void *obj_0_to_3 = <place where objects 0 to 3 are stored>; void *obj_4_to_7 = ...; void *obj_8_to_9 = ...; uint32_t start; n = rte_ring_enqueue_sg_bulk_start(ring, 10, &start, NULL); if (n != 0) { __rte_ring_enqueue_elems(ring, start, obj_0_to_3, sizeof(uintptr_t), 4); __rte_ring_enqueue_elems(ring, start + 4, obj_4_to_7, sizeof(uintptr_t), 4); __rte_ring_enqueue_elems(ring, start + 8, obj_8_to_9, sizeof(uintptr_t), 2); rte_ring_enqueue_sg_finish(ring, 10); } Thanks, Olivier
Hi lads, > On Thu, Oct 08, 2020 at 08:44:13PM +0000, Honnappa Nagarahalli wrote: > > <snip> > > > > > > > > Hi Honnappa, > > > > > > From a quick walkthrough, I have some questions/comments, please see > > > below. > > Hi Olivier, appreciate your input. > > > > > > > > On Tue, Oct 06, 2020 at 08:29:05AM -0500, Honnappa Nagarahalli wrote: > > > > Add scatter gather APIs to avoid intermediate memcpy. Use cases that > > > > involve copying large amount of data to/from the ring can benefit from > > > > these APIs. > > > > > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > > > > --- > > > > lib/librte_ring/meson.build | 3 +- > > > > lib/librte_ring/rte_ring_elem.h | 1 + > > > > lib/librte_ring/rte_ring_peek_sg.h | 552 > > > > +++++++++++++++++++++++++++++ > > > > 3 files changed, 555 insertions(+), 1 deletion(-) create mode 100644 > > > > lib/librte_ring/rte_ring_peek_sg.h > > > > > > > > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build > > > > index 31c0b4649..377694713 100644 > > > > --- a/lib/librte_ring/meson.build > > > > +++ b/lib/librte_ring/meson.build > > > > @@ -12,4 +12,5 @@ headers = files('rte_ring.h', > > > > 'rte_ring_peek.h', > > > > 'rte_ring_peek_c11_mem.h', > > > > 'rte_ring_rts.h', > > > > - 'rte_ring_rts_c11_mem.h') > > > > + 'rte_ring_rts_c11_mem.h', > > > > + 'rte_ring_peek_sg.h') > > > > diff --git a/lib/librte_ring/rte_ring_elem.h > > > > b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 100644 > > > > --- a/lib/librte_ring/rte_ring_elem.h > > > > +++ b/lib/librte_ring/rte_ring_elem.h > > > > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, > > > > void *obj_table, > > > > > > > > #ifdef ALLOW_EXPERIMENTAL_API > > > > #include <rte_ring_peek.h> > > > > +#include <rte_ring_peek_sg.h> > > > > #endif > > > > > > > > #include <rte_ring.h> > > > > diff --git a/lib/librte_ring/rte_ring_peek_sg.h > > > > b/lib/librte_ring/rte_ring_peek_sg.h > > > > new file mode 100644 > > > > index 000000000..97d5764a6 > > > > --- /dev/null > > > > +++ b/lib/librte_ring/rte_ring_peek_sg.h > > > > @@ -0,0 +1,552 @@ > > > > +/* SPDX-License-Identifier: BSD-3-Clause > > > > + * > > > > + * Copyright (c) 2020 Arm > > > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org > > > > + * All rights reserved. > > > > + * Derived from FreeBSD's bufring.h > > > > + * Used as BSD-3 Licensed with permission from Kip Macy. > > > > + */ > > > > + > > > > +#ifndef _RTE_RING_PEEK_SG_H_ > > > > +#define _RTE_RING_PEEK_SG_H_ > > > > + > > > > +/** > > > > + * @file > > > > + * @b EXPERIMENTAL: this API may change without prior notice > > > > + * It is not recommended to include this file directly. > > > > + * Please include <rte_ring_elem.h> instead. > > > > + * > > > > + * Ring Peek Scatter Gather APIs > > > > > > I am not fully convinced by the API name. To me, "scatter/gather" is > > > associated to iovecs, like for instance in [1]. The wikipedia definition [2] may > > > be closer though. > > > > > > [1] > > > https://www.gnu.org/software/libc/manual/html_node/Scatter_002dGathe > > > r.html > > > [2] https://en.wikipedia.org/wiki/Gather-scatter_(vector_addressing) > > The way I understand scatter-gather is, the data to be sent to something (like a device) is coming from multiple sources. It would require > copying to put the data together to be contiguous. If the device supports scatter-gather, such copying is not required. The device can > collect data from multiple locations and make it contiguous. > > > > In the case I was looking at, one part of the data was coming from the user of the API and another was generated by the API itself. If > these two pieces of information have to be enqueued as a single object on the ring, I had to create an intermediate copy. But by exposing > the ring memory to the user, the intermediate copy is avoided. Hence I called it scatter-gather. > > > > > > > > What about "zero-copy"? > > I think no-copy (nc for short) or user-copy (uc for short) would convey the meaning better. These would indicate that the rte_ring APIs are > not copying the objects and it is left to the user to do the actual copy. > > > > > > > > Also, the "peek" term looks also a bit confusing to me, but it existed before > > > your patch. I understand it for dequeue, but not for enqueue. > > I kept 'peek' there because the API still offers the 'peek' API capabilities. I am also not sure on what 'peek' means for enqueue API. The > enqueue 'peek' API was provided to be symmetric with dequeue peek API. > > > > > > > > Or, what about replacing the existing experimental peek API by this one? > > > They look quite similar to me. > > I agree, scatter gather APIs provide the peek capability and the no-copy benefits. > > Konstantin, any opinions here? Sorry, didn't have time yet, to look at this RFC properly. Will try to do it next week, as I understand that's for 21.02 anyway? > > > > > > > + * Introduction of rte_ring with scatter gather serialized > > > > + producer/consumer > > > > + * (HTS sync mode) makes it possible to split public enqueue/dequeue > > > > + API > > > > + * into 3 phases: > > > > + * - enqueue/dequeue start > > > > + * - copy data to/from the ring > > > > + * - enqueue/dequeue finish > > > > + * Along with the advantages of the peek APIs, these APIs provide the > > > > + ability > > > > + * to avoid copying of the data to temporary area. > > > > + * > > > > + * Note that right now this new API is available only for two sync modes: > > > > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > > > > + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). > > > > + * It is a user responsibility to create/init ring with appropriate > > > > + sync > > > > + * modes selected. > > > > + * > > > > + * Example usage: > > > > + * // read 1 elem from the ring: > > > > > > Comment should be "prepare enqueuing 32 objects" > > > > > > > + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); > > > > + * if (n != 0) { > > > > + * //Copy objects in the ring > > > > + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); > > > > + * if (n != sgd->n1) > > > > + * //Second memcpy because of wrapround > > > > + * n2 = n - sgd->n1; > > > > + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); > > > > > > Missing { } > > > > > > > + * rte_ring_dequeue_sg_finish(ring, n); > > > > > > Should be enqueue > > > > > Thanks, will correct them. > > > > > > + * } > > > > + * > > > > + * Note that between _start_ and _finish_ none other thread can > > > > + proceed > > > > + * with enqueue(/dequeue) operation till _finish_ completes. > > > > + */ > > > > + > > > > +#ifdef __cplusplus > > > > +extern "C" { > > > > +#endif > > > > + > > > > +#include <rte_ring_peek_c11_mem.h> > > > > + > > > > +/* Rock that needs to be passed between reserve and commit APIs */ > > > > +struct rte_ring_sg_data { > > > > + /* Pointer to the first space in the ring */ > > > > + void **ptr1; > > > > + /* Pointer to the second space in the ring if there is wrap-around */ > > > > + void **ptr2; > > > > + /* Number of elements in the first pointer. If this is equal to > > > > + * the number of elements requested, then ptr2 is NULL. > > > > + * Otherwise, subtracting n1 from number of elements requested > > > > + * will give the number of elements available at ptr2. > > > > + */ > > > > + unsigned int n1; > > > > +}; > > > > > > Would it be possible to simply return the offset instead of this structure? > > > The wrap could be managed by a __rte_ring_enqueue_elems() function. > > Trying to use __rte_ring_enqueue_elems() will force temporary copy. See below. > > > > > > > > I mean something like this: > > > > > > uint32_t start; > > > n = rte_ring_enqueue_sg_bulk_start(ring, 32, &start, NULL); > > > if (n != 0) { > > > /* Copy objects in the ring. */ > > > __rte_ring_enqueue_elems(ring, start, obj, sizeof(uintptr_t), > > > n); > > For ex: 'obj' here is temporary copy. > > > > > rte_ring_enqueue_sg_finish(ring, n); > > > } > > > > > > It would require to slightly change __rte_ring_enqueue_elems() to support > > > to be called with prod_head >= size, and wrap in that case. > > > > > The alternate solution I can think of requires 3 things 1) the base address of the ring 2) Index to where to copy 3) the mask. With these 3 > things one could write the code like below: > > for (i = 0; i < n; i++) { > > ring_addr[(index + i) & mask] = obj[i]; // ANDing with mask will take care of wrap-around. > > } > > > > However, I think this does not allow for passing the address in the ring to another function/API to copy the data (It is possible, but the user > has to calculate the actual address, worry about the wrap-around, second pointer etc). > > > > The current approach hides some details and provides flexibility to the application to use the pointer the way it wants. > > I agree that doing the access + masking manually looks too complex. > > However I'm not sure to get why using __rte_ring_enqueue_elems() would > result in an additional copy. I suppose the object that you want to > enqueue is already stored somewhere? > > For instance, let's say you have 10 objects to enqueue, located at > different places: > > void *obj_0_to_3 = <place where objects 0 to 3 are stored>; > void *obj_4_to_7 = ...; > void *obj_8_to_9 = ...; > uint32_t start; > n = rte_ring_enqueue_sg_bulk_start(ring, 10, &start, NULL); > if (n != 0) { > __rte_ring_enqueue_elems(ring, start, obj_0_to_3, > sizeof(uintptr_t), 4); > __rte_ring_enqueue_elems(ring, start + 4, obj_4_to_7, > sizeof(uintptr_t), 4); > __rte_ring_enqueue_elems(ring, start + 8, obj_8_to_9, > sizeof(uintptr_t), 2); > rte_ring_enqueue_sg_finish(ring, 10); > } > As I understand, It is not about different objects stored in different places, it is about: a) object is relatively big (16B+ ?) b) You compose objects from values stored in few different places. Let say you have: struct elem_obj {uint64_t a; uint32_t b, c;}; And then you'd like to copy 'a' value from one location, 'b' from second, and 'c' from third one. Konstantin
<snip> > > > > Hi Honnappa, > > > > > > > > From a quick walkthrough, I have some questions/comments, please > > > > see below. > > > Hi Olivier, appreciate your input. > > > > > > > > > > > On Tue, Oct 06, 2020 at 08:29:05AM -0500, Honnappa Nagarahalli wrote: > > > > > Add scatter gather APIs to avoid intermediate memcpy. Use cases > > > > > that involve copying large amount of data to/from the ring can > > > > > benefit from these APIs. > > > > > > > > > > Signed-off-by: Honnappa Nagarahalli > > > > > <honnappa.nagarahalli@arm.com> > > > > > --- > > > > > lib/librte_ring/meson.build | 3 +- > > > > > lib/librte_ring/rte_ring_elem.h | 1 + > > > > > lib/librte_ring/rte_ring_peek_sg.h | 552 > > > > > +++++++++++++++++++++++++++++ > > > > > 3 files changed, 555 insertions(+), 1 deletion(-) create mode > > > > > 100644 lib/librte_ring/rte_ring_peek_sg.h > > > > > > > > > > diff --git a/lib/librte_ring/meson.build > > > > > b/lib/librte_ring/meson.build index 31c0b4649..377694713 100644 > > > > > --- a/lib/librte_ring/meson.build > > > > > +++ b/lib/librte_ring/meson.build > > > > > @@ -12,4 +12,5 @@ headers = files('rte_ring.h', > > > > > 'rte_ring_peek.h', > > > > > 'rte_ring_peek_c11_mem.h', > > > > > 'rte_ring_rts.h', > > > > > - 'rte_ring_rts_c11_mem.h') > > > > > + 'rte_ring_rts_c11_mem.h', > > > > > + 'rte_ring_peek_sg.h') > > > > > diff --git a/lib/librte_ring/rte_ring_elem.h > > > > > b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 > > > > > 100644 > > > > > --- a/lib/librte_ring/rte_ring_elem.h > > > > > +++ b/lib/librte_ring/rte_ring_elem.h > > > > > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct > > > > > rte_ring *r, void *obj_table, > > > > > > > > > > #ifdef ALLOW_EXPERIMENTAL_API > > > > > #include <rte_ring_peek.h> > > > > > +#include <rte_ring_peek_sg.h> > > > > > #endif > > > > > > > > > > #include <rte_ring.h> > > > > > diff --git a/lib/librte_ring/rte_ring_peek_sg.h > > > > > b/lib/librte_ring/rte_ring_peek_sg.h > > > > > new file mode 100644 > > > > > index 000000000..97d5764a6 > > > > > --- /dev/null > > > > > +++ b/lib/librte_ring/rte_ring_peek_sg.h > > > > > @@ -0,0 +1,552 @@ > > > > > +/* SPDX-License-Identifier: BSD-3-Clause > > > > > + * > > > > > + * Copyright (c) 2020 Arm > > > > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org > > > > > + * All rights reserved. > > > > > + * Derived from FreeBSD's bufring.h > > > > > + * Used as BSD-3 Licensed with permission from Kip Macy. > > > > > + */ > > > > > + > > > > > +#ifndef _RTE_RING_PEEK_SG_H_ > > > > > +#define _RTE_RING_PEEK_SG_H_ > > > > > + > > > > > +/** > > > > > + * @file > > > > > + * @b EXPERIMENTAL: this API may change without prior notice > > > > > + * It is not recommended to include this file directly. > > > > > + * Please include <rte_ring_elem.h> instead. > > > > > + * > > > > > + * Ring Peek Scatter Gather APIs > > > > > > > > I am not fully convinced by the API name. To me, "scatter/gather" > > > > is associated to iovecs, like for instance in [1]. The wikipedia > > > > definition [2] may be closer though. > > > > > > > > [1] > > > > > https://www.gnu.org/software/libc/manual/html_node/Scatter_002dGat > > > > he > > > > r.html > > > > [2] > > > > https://en.wikipedia.org/wiki/Gather-scatter_(vector_addressing) > > > The way I understand scatter-gather is, the data to be sent to > > > something (like a device) is coming from multiple sources. It would > > > require > > copying to put the data together to be contiguous. If the device > > supports scatter-gather, such copying is not required. The device can > collect data from multiple locations and make it contiguous. > > > > > > In the case I was looking at, one part of the data was coming from > > > the user of the API and another was generated by the API itself. If > > these two pieces of information have to be enqueued as a single object > > on the ring, I had to create an intermediate copy. But by exposing the ring > memory to the user, the intermediate copy is avoided. Hence I called it > scatter-gather. > > > > > > > > > > > What about "zero-copy"? > > > I think no-copy (nc for short) or user-copy (uc for short) would > > > convey the meaning better. These would indicate that the rte_ring > > > APIs are > > not copying the objects and it is left to the user to do the actual copy. > > > > > > > > > > > Also, the "peek" term looks also a bit confusing to me, but it > > > > existed before your patch. I understand it for dequeue, but not for > enqueue. > > > I kept 'peek' there because the API still offers the 'peek' API > > > capabilities. I am also not sure on what 'peek' means for enqueue > > > API. The > > enqueue 'peek' API was provided to be symmetric with dequeue peek API. > > > > > > > > > > > Or, what about replacing the existing experimental peek API by this one? > > > > They look quite similar to me. > > > I agree, scatter gather APIs provide the peek capability and the no-copy > benefits. > > > Konstantin, any opinions here? > > Sorry, didn't have time yet, to look at this RFC properly. > Will try to do it next week, as I understand that's for 21.02 anyway? This is committed for 20.11. We should be able to get into RC2. > > > > > > > > > > + * Introduction of rte_ring with scatter gather serialized > > > > > + producer/consumer > > > > > + * (HTS sync mode) makes it possible to split public > > > > > + enqueue/dequeue API > > > > > + * into 3 phases: > > > > > + * - enqueue/dequeue start > > > > > + * - copy data to/from the ring > > > > > + * - enqueue/dequeue finish > > > > > + * Along with the advantages of the peek APIs, these APIs > > > > > + provide the ability > > > > > + * to avoid copying of the data to temporary area. > > > > > + * > > > > > + * Note that right now this new API is available only for two sync > modes: > > > > > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > > > > > + * 2) Serialized Producer/Serialized Consumer > (RTE_RING_SYNC_MT_HTS). > > > > > + * It is a user responsibility to create/init ring with > > > > > + appropriate sync > > > > > + * modes selected. > > > > > + * > > > > > + * Example usage: > > > > > + * // read 1 elem from the ring: > > > > > > > > Comment should be "prepare enqueuing 32 objects" > > > > > > > > > + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); > > > > > + * if (n != 0) { > > > > > + * //Copy objects in the ring > > > > > + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); > > > > > + * if (n != sgd->n1) > > > > > + * //Second memcpy because of wrapround > > > > > + * n2 = n - sgd->n1; > > > > > + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); > > > > > > > > Missing { } > > > > > > > > > + * rte_ring_dequeue_sg_finish(ring, n); > > > > > > > > Should be enqueue > > > > > > > Thanks, will correct them. > > > > > > > > + * } > > > > > + * > > > > > + * Note that between _start_ and _finish_ none other thread can > > > > > + proceed > > > > > + * with enqueue(/dequeue) operation till _finish_ completes. > > > > > + */ > > > > > + > > > > > +#ifdef __cplusplus > > > > > +extern "C" { > > > > > +#endif > > > > > + > > > > > +#include <rte_ring_peek_c11_mem.h> > > > > > + > > > > > +/* Rock that needs to be passed between reserve and commit APIs > > > > > +*/ struct rte_ring_sg_data { > > > > > + /* Pointer to the first space in the ring */ > > > > > + void **ptr1; > > > > > + /* Pointer to the second space in the ring if there is wrap- > around */ > > > > > + void **ptr2; > > > > > + /* Number of elements in the first pointer. If this is equal to > > > > > + * the number of elements requested, then ptr2 is NULL. > > > > > + * Otherwise, subtracting n1 from number of elements > requested > > > > > + * will give the number of elements available at ptr2. > > > > > + */ > > > > > + unsigned int n1; > > > > > +}; > > > > > > > > Would it be possible to simply return the offset instead of this structure? > > > > The wrap could be managed by a __rte_ring_enqueue_elems() > function. > > > Trying to use __rte_ring_enqueue_elems() will force temporary copy. > See below. > > > > > > > > > > > I mean something like this: > > > > > > > > uint32_t start; > > > > n = rte_ring_enqueue_sg_bulk_start(ring, 32, &start, NULL); > > > > if (n != 0) { > > > > /* Copy objects in the ring. */ > > > > __rte_ring_enqueue_elems(ring, start, obj, sizeof(uintptr_t), > > > > n); > > > For ex: 'obj' here is temporary copy. > > > > > > > rte_ring_enqueue_sg_finish(ring, n); > > > > } > > > > > > > > It would require to slightly change __rte_ring_enqueue_elems() to > > > > support to be called with prod_head >= size, and wrap in that case. > > > > > > > The alternate solution I can think of requires 3 things 1) the base > > > address of the ring 2) Index to where to copy 3) the mask. With > > > these 3 > > things one could write the code like below: > > > for (i = 0; i < n; i++) { > > > ring_addr[(index + i) & mask] = obj[i]; // ANDing with mask will take > care of wrap-around. > > > } > > > > > > However, I think this does not allow for passing the address in the > > > ring to another function/API to copy the data (It is possible, but > > > the user > > has to calculate the actual address, worry about the wrap-around, second > pointer etc). > > > > > > The current approach hides some details and provides flexibility to the > application to use the pointer the way it wants. > > > > I agree that doing the access + masking manually looks too complex. > > > > However I'm not sure to get why using __rte_ring_enqueue_elems() > would > > result in an additional copy. I suppose the object that you want to > > enqueue is already stored somewhere? I think this is the key. The object is not stored any where (yet), it is getting generated. When it is generated, it should get stored directly into the ring. I have provided some examples below. > > > > For instance, let's say you have 10 objects to enqueue, located at > > different places: > > > > void *obj_0_to_3 = <place where objects 0 to 3 are stored>; > > void *obj_4_to_7 = ...; > > void *obj_8_to_9 = ...; > > uint32_t start; > > n = rte_ring_enqueue_sg_bulk_start(ring, 10, &start, NULL); > > if (n != 0) { > > __rte_ring_enqueue_elems(ring, start, obj_0_to_3, > > sizeof(uintptr_t), 4); > > __rte_ring_enqueue_elems(ring, start + 4, obj_4_to_7, > > sizeof(uintptr_t), 4); > > __rte_ring_enqueue_elems(ring, start + 8, obj_8_to_9, > > sizeof(uintptr_t), 2); > > rte_ring_enqueue_sg_finish(ring, 10); > > } > > > > > As I understand, It is not about different objects stored in different places, it > is about: > a) object is relatively big (16B+ ?) > b) You compose objects from values stored in few different places. > > Let say you have: > struct elem_obj {uint64_t a; uint32_t b, c;}; > > And then you'd like to copy 'a' value from one location, 'b' from second, and > 'c' from third one. > > Konstantin > I think there are multiple use cases. Some I have in mind are: 1) Code without this patch: struct rte_mbuf *pkts_burst[32]; /* Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS */ /* Pkt I/O core polls packets from the NIC, pkts_burst is the temporary store */ nb_rx = rte_eth_rx_burst(portid, queueid, pkts_burst, 32); /* Provide packets to the packet processing cores */ rte_ring_enqueue_burst(ring, pkts_burst, 32, &free_space); Code with the patch: /* Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS */ /* Reserve space on the ring */ n = rte_ring_enqueue_sg_burst_start(ring, 32, &sgd, NULL); /* Pkt I/O core polls packets from the NIC */ if (n == 32) nb_rx = rte_eth_rx_burst(portid, queueid, sgd->ptr1, 32); else nb_rx = rte_eth_rx_burst(portid, queueid, sgd->ptr1, sgd->n1); /* Provide packets to the packet processing cores */ /* Temporary storage 'pkts_burst' is not required */ rte_ring_enqueue_sg_finish(ring, nb_rx); 2) This is same/similar to what Konstantin mentioned Code without this patch: struct elem_obj {uint64_t a; uint32_t b, c;}; struct elem_obj obj; /* Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS */ obj.a = rte_get_a(); obj.b = rte_get_b(); obj.c = rte_get_c(); /* obj is the temporary storage and results in memcpy in the following call */ rte_ring_enqueue_elem(ring, sizeof(struct elem_obj), 1, &obj, NULL); Code with the patch: struct elem_obj *obj; /* Reserve space on the ring */ n = rte_ring_enqueue_sg_bulk_elem_start(ring, sizeof(elem_obj), 1, &sgd, NULL); obj = (struct elem_obj *)sgd->ptr1; obj.a = rte_get_a(); obj.b = rte_get_b(); obj.c = rte_get_c(); /* obj is not a temporary storage */ rte_ring_enqueue_sg_elem_finish(ring, n);
> Add scatter gather APIs to avoid intermediate memcpy. Use cases > that involve copying large amount of data to/from the ring > can benefit from these APIs. > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > --- > lib/librte_ring/meson.build | 3 +- > lib/librte_ring/rte_ring_elem.h | 1 + > lib/librte_ring/rte_ring_peek_sg.h | 552 +++++++++++++++++++++++++++++ > 3 files changed, 555 insertions(+), 1 deletion(-) > create mode 100644 lib/librte_ring/rte_ring_peek_sg.h As a generic one - need to update ring UT both func and perf to test/measure this new API. > > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build > index 31c0b4649..377694713 100644 > --- a/lib/librte_ring/meson.build > +++ b/lib/librte_ring/meson.build > @@ -12,4 +12,5 @@ headers = files('rte_ring.h', > 'rte_ring_peek.h', > 'rte_ring_peek_c11_mem.h', > 'rte_ring_rts.h', > - 'rte_ring_rts_c11_mem.h') > + 'rte_ring_rts_c11_mem.h', > + 'rte_ring_peek_sg.h') > diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h > index 938b398fc..7d3933f15 100644 > --- a/lib/librte_ring/rte_ring_elem.h > +++ b/lib/librte_ring/rte_ring_elem.h > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, > > #ifdef ALLOW_EXPERIMENTAL_API > #include <rte_ring_peek.h> > +#include <rte_ring_peek_sg.h> > #endif > > #include <rte_ring.h> > diff --git a/lib/librte_ring/rte_ring_peek_sg.h b/lib/librte_ring/rte_ring_peek_sg.h > new file mode 100644 > index 000000000..97d5764a6 > --- /dev/null > +++ b/lib/librte_ring/rte_ring_peek_sg.h > @@ -0,0 +1,552 @@ > +/* SPDX-License-Identifier: BSD-3-Clause > + * > + * Copyright (c) 2020 Arm > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org > + * All rights reserved. > + * Derived from FreeBSD's bufring.h > + * Used as BSD-3 Licensed with permission from Kip Macy. > + */ > + > +#ifndef _RTE_RING_PEEK_SG_H_ > +#define _RTE_RING_PEEK_SG_H_ > + > +/** > + * @file > + * @b EXPERIMENTAL: this API may change without prior notice > + * It is not recommended to include this file directly. > + * Please include <rte_ring_elem.h> instead. > + * > + * Ring Peek Scatter Gather APIs > + * Introduction of rte_ring with scatter gather serialized producer/consumer > + * (HTS sync mode) makes it possible to split public enqueue/dequeue API > + * into 3 phases: > + * - enqueue/dequeue start > + * - copy data to/from the ring > + * - enqueue/dequeue finish > + * Along with the advantages of the peek APIs, these APIs provide the ability > + * to avoid copying of the data to temporary area. > + * > + * Note that right now this new API is available only for two sync modes: > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). > + * It is a user responsibility to create/init ring with appropriate sync > + * modes selected. > + * > + * Example usage: > + * // read 1 elem from the ring: > + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); > + * if (n != 0) { > + * //Copy objects in the ring > + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); > + * if (n != sgd->n1) > + * //Second memcpy because of wrapround > + * n2 = n - sgd->n1; > + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); > + * rte_ring_dequeue_sg_finish(ring, n); It is not clear from the example above why do you need SG(ZC) API. Existing peek API would be able to handle such situation (just copy will be done internally). Probably better to use examples you provided in your last reply to Olivier. > + * } > + * > + * Note that between _start_ and _finish_ none other thread can proceed > + * with enqueue(/dequeue) operation till _finish_ completes. > + */ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#include <rte_ring_peek_c11_mem.h> > + > +/* Rock that needs to be passed between reserve and commit APIs */ > +struct rte_ring_sg_data { > + /* Pointer to the first space in the ring */ > + void **ptr1; > + /* Pointer to the second space in the ring if there is wrap-around */ > + void **ptr2; > + /* Number of elements in the first pointer. If this is equal to > + * the number of elements requested, then ptr2 is NULL. > + * Otherwise, subtracting n1 from number of elements requested > + * will give the number of elements available at ptr2. > + */ > + unsigned int n1; > +}; I wonder what is the primary goal of that API? The reason I am asking: from what I understand with this patch ZC API will work only for ST and HTS modes (same as peek API). Though, I think it is possible to make it work for any sync model, by changing API a bit: instead of returning sg_data to the user, force him to provide function to read/write elems from/to the ring. Just a schematic one, to illustrate the idea: typedef void (*write_ring_func_t)(void *elem, /*pointer to first elem to update inside the ring*/ uint32_t num, /* number of elems to update */ uint32_t esize, void *udata /* caller provide data */); rte_ring_enqueue_zc_bulk_elem(struct rte_ring *r, unsigned int esize, unsigned int n, unsigned int *free_space, write_ring_func_t wf, void *udata) { struct rte_ring_sg_data sgd; ..... n = move_head_tail(r, ...); /* get sgd data based on n */ get_elem_addr(r, ..., &sgd); /* call user defined function to fill reserved elems */ wf(sgd.p1, sgd.n1, esize, udata); if (n != n1) wf(sgd.p2, sgd.n2, esize, udata); .... return n; } If we want ZC peek API also - some extra work need to be done with introducing return value for write_ring_func() and checking it properly, but I don't see any big problems here too. That way ZC API can support all sync models, plus we don't need to expose sg_data to the user directly. Also, in future, we probably can de-duplicate the code by making our non-ZC API to use that one internally (pass ring_enqueue_elems()/ob_table as a parameters). > + > +static __rte_always_inline void > +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head, > + uint32_t num, void **dst1, uint32_t *n1, void **dst2) > +{ > + uint32_t idx = head & r->mask; > + uint64_t *ring = (uint64_t *)&r[1]; > + > + *dst1 = ring + idx; > + *n1 = num; > + > + if (idx + num > r->size) { > + *n1 = num - (r->size - idx - 1); > + *dst2 = ring; > + } > +} > + > +static __rte_always_inline void > +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head, > + uint32_t num, void **dst1, uint32_t *n1, void **dst2) > +{ > + uint32_t idx = head & r->mask; > + rte_int128_t *ring = (rte_int128_t *)&r[1]; > + > + *dst1 = ring + idx; > + *n1 = num; > + > + if (idx + num > r->size) { > + *n1 = num - (r->size - idx - 1); > + *dst2 = ring; > + } > +} > + > +static __rte_always_inline void > +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head, > + uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2) > +{ > + if (esize == 8) > + __rte_ring_get_elem_addr_64(r, head, > + num, dst1, n1, dst2); > + else if (esize == 16) > + __rte_ring_get_elem_addr_128(r, head, > + num, dst1, n1, dst2); I don't think we need that special handling for 8/16B sizes. In all functions esize is an input parameter. If user will specify is as a constant - compiler will be able to convert multiply to shift and add ops. > + else { > + uint32_t idx, scale, nr_idx; > + uint32_t *ring = (uint32_t *)&r[1]; > + > + /* Normalize to uint32_t */ > + scale = esize / sizeof(uint32_t); > + idx = head & r->mask; > + nr_idx = idx * scale; > + > + *dst1 = ring + nr_idx; > + *n1 = num; > + > + if (idx + num > r->size) { > + *n1 = num - (r->size - idx - 1); > + *dst2 = ring; > + } > + } > +} > + > +/** > + * @internal This function moves prod head value. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_enqueue_sg_elem_start(struct rte_ring *r, unsigned int esize, > + uint32_t n, enum rte_ring_queue_behavior behavior, > + struct rte_ring_sg_data *sgd, unsigned int *free_space) > +{ > + uint32_t free, head, next; > + > + switch (r->prod.sync_type) { > + case RTE_RING_SYNC_ST: > + n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n, > + behavior, &head, &next, &free); > + __rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd->ptr1, > + &sgd->n1, (void **)&sgd->ptr2); > + break; > + case RTE_RING_SYNC_MT_HTS: > + n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free); > + __rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd->ptr1, > + &sgd->n1, (void **)&sgd->ptr2); > + break; > + case RTE_RING_SYNC_MT: > + case RTE_RING_SYNC_MT_RTS: > + default: > + /* unsupported mode, shouldn't be here */ > + RTE_ASSERT(0); > + n = 0; > + free = 0; > + } > + > + if (free_space != NULL) > + *free_space = free - n; > + return n; > +} > + > +/** > + * Start to enqueue several objects on the ring. > + * Note that no actual objects are put in the queue by this function, > + * it just reserves space for the user on the ring. > + * User has to copy objects into the queue using the returned pointers. > + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete the > + * enqueue operation. > + * > + * @param r > + * A pointer to the ring structure. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * @param n > + * The number of objects to add in the ring. > + * @param sgd > + * The scatter-gather data containing pointers for copying data. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * reservation operation has finished. > + * @return > + * The number of objects that can be enqueued, either 0 or n > + */ > +__rte_experimental > +static __rte_always_inline unsigned int > +rte_ring_enqueue_sg_bulk_elem_start(struct rte_ring *r, unsigned int esize, > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *free_space) > +{ > + return __rte_ring_do_enqueue_sg_elem_start(r, esize, n, > + RTE_RING_QUEUE_FIXED, sgd, free_space); > +} > + > +/** > + * Start to enqueue several pointers to objects on the ring. > + * Note that no actual pointers are put in the queue by this function, > + * it just reserves space for the user on the ring. > + * User has to copy pointers to objects into the queue using the > + * returned pointers. > + * User should call rte_ring_enqueue_sg_bulk_finish to complete the > + * enqueue operation. > + * > + * @param r > + * A pointer to the ring structure. > + * @param n > + * The number of objects to add in the ring. > + * @param sgd > + * The scatter-gather data containing pointers for copying data. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * reservation operation has finished. > + * @return > + * The number of objects that can be enqueued, either 0 or n > + */ > +__rte_experimental > +static __rte_always_inline unsigned int > +rte_ring_enqueue_sg_bulk_start(struct rte_ring *r, unsigned int n, > + struct rte_ring_sg_data *sgd, unsigned int *free_space) > +{ > + return rte_ring_enqueue_sg_bulk_elem_start(r, sizeof(uintptr_t), n, > + sgd, free_space); > +} > +/** > + * Start to enqueue several objects on the ring. > + * Note that no actual objects are put in the queue by this function, > + * it just reserves space for the user on the ring. > + * User has to copy objects into the queue using the returned pointers. > + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete the > + * enqueue operation. > + * > + * @param r > + * A pointer to the ring structure. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * @param n > + * The number of objects to add in the ring. > + * @param sgd > + * The scatter-gather data containing pointers for copying data. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * reservation operation has finished. > + * @return > + * The number of objects that can be enqueued, either 0 or n > + */ > +__rte_experimental > +static __rte_always_inline unsigned int > +rte_ring_enqueue_sg_burst_elem_start(struct rte_ring *r, unsigned int esize, > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *free_space) > +{ > + return __rte_ring_do_enqueue_sg_elem_start(r, esize, n, > + RTE_RING_QUEUE_VARIABLE, sgd, free_space); > +} > + > +/** > + * Start to enqueue several pointers to objects on the ring. > + * Note that no actual pointers are put in the queue by this function, > + * it just reserves space for the user on the ring. > + * User has to copy pointers to objects into the queue using the > + * returned pointers. > + * User should call rte_ring_enqueue_sg_bulk_finish to complete the > + * enqueue operation. > + * > + * @param r > + * A pointer to the ring structure. > + * @param n > + * The number of objects to add in the ring. > + * @param sgd > + * The scatter-gather data containing pointers for copying data. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * reservation operation has finished. > + * @return > + * The number of objects that can be enqueued, either 0 or n > + */ > +__rte_experimental > +static __rte_always_inline unsigned int > +rte_ring_enqueue_sg_burst_start(struct rte_ring *r, unsigned int n, > + struct rte_ring_sg_data *sgd, unsigned int *free_space) > +{ > + return rte_ring_enqueue_sg_burst_elem_start(r, sizeof(uintptr_t), n, > + sgd, free_space); > +} > + > +/** > + * Complete enqueuing several objects on the ring. > + * Note that number of objects to enqueue should not exceed previous > + * enqueue_start return value. > + * > + * @param r > + * A pointer to the ring structure. > + * @param n > + * The number of objects to add to the ring. > + */ > +__rte_experimental > +static __rte_always_inline void > +rte_ring_enqueue_sg_elem_finish(struct rte_ring *r, unsigned int n) > +{ > + uint32_t tail; > + > + switch (r->prod.sync_type) { > + case RTE_RING_SYNC_ST: > + n = __rte_ring_st_get_tail(&r->prod, &tail, n); > + __rte_ring_st_set_head_tail(&r->prod, tail, n, 1); > + break; > + case RTE_RING_SYNC_MT_HTS: > + n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n); > + __rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1); > + break; > + case RTE_RING_SYNC_MT: > + case RTE_RING_SYNC_MT_RTS: > + default: > + /* unsupported mode, shouldn't be here */ > + RTE_ASSERT(0); > + } > +} > + > +/** > + * Complete enqueuing several pointers to objects on the ring. > + * Note that number of objects to enqueue should not exceed previous > + * enqueue_start return value. > + * > + * @param r > + * A pointer to the ring structure. > + * @param n > + * The number of pointers to objects to add to the ring. > + */ > +__rte_experimental > +static __rte_always_inline void > +rte_ring_enqueue_sg_finish(struct rte_ring *r, unsigned int n) > +{ > + rte_ring_enqueue_sg_elem_finish(r, n); > +} > + > +/** > + * @internal This function moves cons head value and copies up to *n* > + * objects from the ring to the user provided obj_table. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_dequeue_sg_elem_start(struct rte_ring *r, > + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, > + struct rte_ring_sg_data *sgd, unsigned int *available) > +{ > + uint32_t avail, head, next; > + > + switch (r->cons.sync_type) { > + case RTE_RING_SYNC_ST: > + n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n, > + behavior, &head, &next, &avail); > + __rte_ring_get_elem_addr(r, head, esize, n, > + sgd->ptr1, &sgd->n1, sgd->ptr2); > + break; > + case RTE_RING_SYNC_MT_HTS: > + n = __rte_ring_hts_move_cons_head(r, n, behavior, > + &head, &avail); > + __rte_ring_get_elem_addr(r, head, esize, n, > + sgd->ptr1, &sgd->n1, sgd->ptr2); > + break; > + case RTE_RING_SYNC_MT: > + case RTE_RING_SYNC_MT_RTS: > + default: > + /* unsupported mode, shouldn't be here */ > + RTE_ASSERT(0); > + n = 0; > + avail = 0; > + } > + > + if (available != NULL) > + *available = avail - n; > + return n; > +} > + > +/** > + * Start to dequeue several objects from the ring. > + * Note that no actual objects are copied from the queue by this function. > + * User has to copy objects from the queue using the returned pointers. > + * User should call rte_ring_dequeue_sg_bulk_elem_finish to complete the > + * dequeue operation. > + * > + * @param r > + * A pointer to the ring structure. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * @param n > + * The number of objects to remove from the ring. > + * @param sgd > + * The scatter-gather data containing pointers for copying data. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * The number of objects that can be dequeued, either 0 or n > + */ > +__rte_experimental > +static __rte_always_inline unsigned int > +rte_ring_dequeue_sg_bulk_elem_start(struct rte_ring *r, unsigned int esize, > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *available) > +{ > + return __rte_ring_do_dequeue_sg_elem_start(r, esize, n, > + RTE_RING_QUEUE_FIXED, sgd, available); > +} > + > +/** > + * Start to dequeue several pointers to objects from the ring. > + * Note that no actual pointers are removed from the queue by this function. > + * User has to copy pointers to objects from the queue using the > + * returned pointers. > + * User should call rte_ring_dequeue_sg_bulk_finish to complete the > + * dequeue operation. > + * > + * @param r > + * A pointer to the ring structure. > + * @param n > + * The number of objects to remove from the ring. > + * @param sgd > + * The scatter-gather data containing pointers for copying data. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * The number of objects that can be dequeued, either 0 or n > + */ > +__rte_experimental > +static __rte_always_inline unsigned int > +rte_ring_dequeue_sg_bulk_start(struct rte_ring *r, unsigned int n, > + struct rte_ring_sg_data *sgd, unsigned int *available) > +{ > + return rte_ring_dequeue_sg_bulk_elem_start(r, sizeof(uintptr_t), > + n, sgd, available); > +} > + > +/** > + * Start to dequeue several objects from the ring. > + * Note that no actual objects are copied from the queue by this function. > + * User has to copy objects from the queue using the returned pointers. > + * User should call rte_ring_dequeue_sg_burst_elem_finish to complete the > + * dequeue operation. > + * > + * @param r > + * A pointer to the ring structure. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * This must be the same value used while creating the ring. Otherwise > + * the results are undefined. > + * @param n > + * The number of objects to dequeue from the ring. > + * @param sgd > + * The scatter-gather data containing pointers for copying data. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * The number of objects that can be dequeued, either 0 or n > + */ > +__rte_experimental > +static __rte_always_inline unsigned int > +rte_ring_dequeue_sg_burst_elem_start(struct rte_ring *r, unsigned int esize, > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *available) > +{ > + return __rte_ring_do_dequeue_sg_elem_start(r, esize, n, > + RTE_RING_QUEUE_VARIABLE, sgd, available); > +} > + > +/** > + * Start to dequeue several pointers to objects from the ring. > + * Note that no actual pointers are removed from the queue by this function. > + * User has to copy pointers to objects from the queue using the > + * returned pointers. > + * User should call rte_ring_dequeue_sg_burst_finish to complete the > + * dequeue operation. > + * > + * @param r > + * A pointer to the ring structure. > + * @param n > + * The number of objects to remove from the ring. > + * @param sgd > + * The scatter-gather data containing pointers for copying data. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * The number of objects that can be dequeued, either 0 or n > + */ > +__rte_experimental > +static __rte_always_inline unsigned int > +rte_ring_dequeue_sg_burst_start(struct rte_ring *r, unsigned int n, > + struct rte_ring_sg_data *sgd, unsigned int *available) > +{ > + return rte_ring_dequeue_sg_burst_elem_start(r, sizeof(uintptr_t), n, > + sgd, available); > +} > + > +/** > + * Complete dequeuing several objects from the ring. > + * Note that number of objects to dequeued should not exceed previous > + * dequeue_start return value. > + * > + * @param r > + * A pointer to the ring structure. > + * @param n > + * The number of objects to remove from the ring. > + */ > +__rte_experimental > +static __rte_always_inline void > +rte_ring_dequeue_sg_elem_finish(struct rte_ring *r, unsigned int n) > +{ > + uint32_t tail; > + > + switch (r->cons.sync_type) { > + case RTE_RING_SYNC_ST: > + n = __rte_ring_st_get_tail(&r->cons, &tail, n); > + __rte_ring_st_set_head_tail(&r->cons, tail, n, 0); > + break; > + case RTE_RING_SYNC_MT_HTS: > + n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n); > + __rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0); > + break; > + case RTE_RING_SYNC_MT: > + case RTE_RING_SYNC_MT_RTS: > + default: > + /* unsupported mode, shouldn't be here */ > + RTE_ASSERT(0); > + } > +} > + > +/** > + * Complete dequeuing several objects from the ring. > + * Note that number of objects to dequeued should not exceed previous > + * dequeue_start return value. > + * > + * @param r > + * A pointer to the ring structure. > + * @param n > + * The number of objects to remove from the ring. > + */ > +__rte_experimental > +static __rte_always_inline void > +rte_ring_dequeue_sg_finish(struct rte_ring *r, unsigned int n) > +{ > + rte_ring_dequeue_elem_finish(r, n); > +} > + > +#ifdef __cplusplus > +} > +#endif > + > +#endif /* _RTE_RING_PEEK_SG_H_ */ > -- > 2.17.1
> <snip> > > > > > > Hi Honnappa, > > > > > > > > > > From a quick walkthrough, I have some questions/comments, please > > > > > see below. > > > > Hi Olivier, appreciate your input. > > > > > > > > > > > > > > On Tue, Oct 06, 2020 at 08:29:05AM -0500, Honnappa Nagarahalli wrote: > > > > > > Add scatter gather APIs to avoid intermediate memcpy. Use cases > > > > > > that involve copying large amount of data to/from the ring can > > > > > > benefit from these APIs. > > > > > > > > > > > > Signed-off-by: Honnappa Nagarahalli > > > > > > <honnappa.nagarahalli@arm.com> > > > > > > --- > > > > > > lib/librte_ring/meson.build | 3 +- > > > > > > lib/librte_ring/rte_ring_elem.h | 1 + > > > > > > lib/librte_ring/rte_ring_peek_sg.h | 552 > > > > > > +++++++++++++++++++++++++++++ > > > > > > 3 files changed, 555 insertions(+), 1 deletion(-) create mode > > > > > > 100644 lib/librte_ring/rte_ring_peek_sg.h > > > > > > > > > > > > diff --git a/lib/librte_ring/meson.build > > > > > > b/lib/librte_ring/meson.build index 31c0b4649..377694713 100644 > > > > > > --- a/lib/librte_ring/meson.build > > > > > > +++ b/lib/librte_ring/meson.build > > > > > > @@ -12,4 +12,5 @@ headers = files('rte_ring.h', > > > > > > 'rte_ring_peek.h', > > > > > > 'rte_ring_peek_c11_mem.h', > > > > > > 'rte_ring_rts.h', > > > > > > - 'rte_ring_rts_c11_mem.h') > > > > > > + 'rte_ring_rts_c11_mem.h', > > > > > > + 'rte_ring_peek_sg.h') > > > > > > diff --git a/lib/librte_ring/rte_ring_elem.h > > > > > > b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 > > > > > > 100644 > > > > > > --- a/lib/librte_ring/rte_ring_elem.h > > > > > > +++ b/lib/librte_ring/rte_ring_elem.h > > > > > > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct > > > > > > rte_ring *r, void *obj_table, > > > > > > > > > > > > #ifdef ALLOW_EXPERIMENTAL_API > > > > > > #include <rte_ring_peek.h> > > > > > > +#include <rte_ring_peek_sg.h> > > > > > > #endif > > > > > > > > > > > > #include <rte_ring.h> > > > > > > diff --git a/lib/librte_ring/rte_ring_peek_sg.h > > > > > > b/lib/librte_ring/rte_ring_peek_sg.h > > > > > > new file mode 100644 > > > > > > index 000000000..97d5764a6 > > > > > > --- /dev/null > > > > > > +++ b/lib/librte_ring/rte_ring_peek_sg.h > > > > > > @@ -0,0 +1,552 @@ > > > > > > +/* SPDX-License-Identifier: BSD-3-Clause > > > > > > + * > > > > > > + * Copyright (c) 2020 Arm > > > > > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org > > > > > > + * All rights reserved. > > > > > > + * Derived from FreeBSD's bufring.h > > > > > > + * Used as BSD-3 Licensed with permission from Kip Macy. > > > > > > + */ > > > > > > + > > > > > > +#ifndef _RTE_RING_PEEK_SG_H_ > > > > > > +#define _RTE_RING_PEEK_SG_H_ > > > > > > + > > > > > > +/** > > > > > > + * @file > > > > > > + * @b EXPERIMENTAL: this API may change without prior notice > > > > > > + * It is not recommended to include this file directly. > > > > > > + * Please include <rte_ring_elem.h> instead. > > > > > > + * > > > > > > + * Ring Peek Scatter Gather APIs > > > > > > > > > > I am not fully convinced by the API name. To me, "scatter/gather" > > > > > is associated to iovecs, like for instance in [1]. The wikipedia > > > > > definition [2] may be closer though. > > > > > > > > > > [1] > > > > > > > https://www.gnu.org/software/libc/manual/html_node/Scatter_002dGat > > > > > he > > > > > r.html > > > > > [2] > > > > > https://en.wikipedia.org/wiki/Gather-scatter_(vector_addressing) > > > > The way I understand scatter-gather is, the data to be sent to > > > > something (like a device) is coming from multiple sources. It would > > > > require > > > copying to put the data together to be contiguous. If the device > > > supports scatter-gather, such copying is not required. The device can > > collect data from multiple locations and make it contiguous. > > > > > > > > In the case I was looking at, one part of the data was coming from > > > > the user of the API and another was generated by the API itself. If > > > these two pieces of information have to be enqueued as a single object > > > on the ring, I had to create an intermediate copy. But by exposing the ring > > memory to the user, the intermediate copy is avoided. Hence I called it > > scatter-gather. > > > > > > > > > > > > > > What about "zero-copy"? > > > > I think no-copy (nc for short) or user-copy (uc for short) would > > > > convey the meaning better. These would indicate that the rte_ring > > > > APIs are > > > not copying the objects and it is left to the user to do the actual copy. +1 for _ZC_ in naming. _NC_ is probably ok too, but sounds really strange to me. > > > > > > > > > > > > > > Also, the "peek" term looks also a bit confusing to me, but it > > > > > existed before your patch. I understand it for dequeue, but not for > > enqueue. > > > > I kept 'peek' there because the API still offers the 'peek' API > > > > capabilities. I am also not sure on what 'peek' means for enqueue > > > > API. The > > > enqueue 'peek' API was provided to be symmetric with dequeue peek API. > > > > > > > > > > > > > > Or, what about replacing the existing experimental peek API by this one? > > > > > They look quite similar to me. > > > > I agree, scatter gather APIs provide the peek capability and the no-copy > > benefits. > > > > Konstantin, any opinions here? I am still not very comfortable with API that allows users to access elems locations directly. I do understand that it could be beneficial in some special cases (you provided some good examples below), so I don't object to have it as addon. But I still think it shouldn't be the _only_ API. > > > > Sorry, didn't have time yet, to look at this RFC properly. > > Will try to do it next week, as I understand that's for 21.02 anyway? > This is committed for 20.11. We should be able to get into RC2. Sounds really tight..., but ok, let's see how it goes. > > > > > > > > > > > > > + * Introduction of rte_ring with scatter gather serialized > > > > > > + producer/consumer > > > > > > + * (HTS sync mode) makes it possible to split public > > > > > > + enqueue/dequeue API > > > > > > + * into 3 phases: > > > > > > + * - enqueue/dequeue start > > > > > > + * - copy data to/from the ring > > > > > > + * - enqueue/dequeue finish > > > > > > + * Along with the advantages of the peek APIs, these APIs > > > > > > + provide the ability > > > > > > + * to avoid copying of the data to temporary area. > > > > > > + * > > > > > > + * Note that right now this new API is available only for two sync > > modes: > > > > > > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > > > > > > + * 2) Serialized Producer/Serialized Consumer > > (RTE_RING_SYNC_MT_HTS). > > > > > > + * It is a user responsibility to create/init ring with > > > > > > + appropriate sync > > > > > > + * modes selected. > > > > > > + * > > > > > > + * Example usage: > > > > > > + * // read 1 elem from the ring: > > > > > > > > > > Comment should be "prepare enqueuing 32 objects" > > > > > > > > > > > + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); > > > > > > + * if (n != 0) { > > > > > > + * //Copy objects in the ring > > > > > > + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); > > > > > > + * if (n != sgd->n1) > > > > > > + * //Second memcpy because of wrapround > > > > > > + * n2 = n - sgd->n1; > > > > > > + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); > > > > > > > > > > Missing { } > > > > > > > > > > > + * rte_ring_dequeue_sg_finish(ring, n); > > > > > > > > > > Should be enqueue > > > > > > > > > Thanks, will correct them. > > > > > > > > > > + * } > > > > > > + * > > > > > > + * Note that between _start_ and _finish_ none other thread can > > > > > > + proceed > > > > > > + * with enqueue(/dequeue) operation till _finish_ completes. > > > > > > + */ > > > > > > + > > > > > > +#ifdef __cplusplus > > > > > > +extern "C" { > > > > > > +#endif > > > > > > + > > > > > > +#include <rte_ring_peek_c11_mem.h> > > > > > > + > > > > > > +/* Rock that needs to be passed between reserve and commit APIs > > > > > > +*/ struct rte_ring_sg_data { > > > > > > + /* Pointer to the first space in the ring */ > > > > > > + void **ptr1; > > > > > > + /* Pointer to the second space in the ring if there is wrap- > > around */ > > > > > > + void **ptr2; > > > > > > + /* Number of elements in the first pointer. If this is equal to > > > > > > + * the number of elements requested, then ptr2 is NULL. > > > > > > + * Otherwise, subtracting n1 from number of elements > > requested > > > > > > + * will give the number of elements available at ptr2. > > > > > > + */ > > > > > > + unsigned int n1; > > > > > > +}; > > > > > > > > > > Would it be possible to simply return the offset instead of this structure? > > > > > The wrap could be managed by a __rte_ring_enqueue_elems() > > function. > > > > Trying to use __rte_ring_enqueue_elems() will force temporary copy. > > See below. > > > > > > > > > > > > > > I mean something like this: > > > > > > > > > > uint32_t start; > > > > > n = rte_ring_enqueue_sg_bulk_start(ring, 32, &start, NULL); > > > > > if (n != 0) { > > > > > /* Copy objects in the ring. */ > > > > > __rte_ring_enqueue_elems(ring, start, obj, sizeof(uintptr_t), > > > > > n); > > > > For ex: 'obj' here is temporary copy. > > > > > > > > > rte_ring_enqueue_sg_finish(ring, n); > > > > > } > > > > > > > > > > It would require to slightly change __rte_ring_enqueue_elems() to > > > > > support to be called with prod_head >= size, and wrap in that case. > > > > > > > > > The alternate solution I can think of requires 3 things 1) the base > > > > address of the ring 2) Index to where to copy 3) the mask. With > > > > these 3 > > > things one could write the code like below: > > > > for (i = 0; i < n; i++) { > > > > ring_addr[(index + i) & mask] = obj[i]; // ANDing with mask will take > > care of wrap-around. > > > > } > > > > > > > > However, I think this does not allow for passing the address in the > > > > ring to another function/API to copy the data (It is possible, but > > > > the user > > > has to calculate the actual address, worry about the wrap-around, second > > pointer etc). > > > > > > > > The current approach hides some details and provides flexibility to the > > application to use the pointer the way it wants. > > > > > > I agree that doing the access + masking manually looks too complex. > > > > > > However I'm not sure to get why using __rte_ring_enqueue_elems() > > would > > > result in an additional copy. I suppose the object that you want to > > > enqueue is already stored somewhere? > I think this is the key. The object is not stored any where (yet), it is getting generated. When it is generated, it should get stored directly into > the ring. I have provided some examples below. > > > > > > > For instance, let's say you have 10 objects to enqueue, located at > > > different places: > > > > > > void *obj_0_to_3 = <place where objects 0 to 3 are stored>; > > > void *obj_4_to_7 = ...; > > > void *obj_8_to_9 = ...; > > > uint32_t start; > > > n = rte_ring_enqueue_sg_bulk_start(ring, 10, &start, NULL); > > > if (n != 0) { > > > __rte_ring_enqueue_elems(ring, start, obj_0_to_3, > > > sizeof(uintptr_t), 4); > > > __rte_ring_enqueue_elems(ring, start + 4, obj_4_to_7, > > > sizeof(uintptr_t), 4); > > > __rte_ring_enqueue_elems(ring, start + 8, obj_8_to_9, > > > sizeof(uintptr_t), 2); > > > rte_ring_enqueue_sg_finish(ring, 10); > > > } > > > > > > > > > As I understand, It is not about different objects stored in different places, it > > is about: > > a) object is relatively big (16B+ ?) > > b) You compose objects from values stored in few different places. > > > > Let say you have: > > struct elem_obj {uint64_t a; uint32_t b, c;}; > > > > And then you'd like to copy 'a' value from one location, 'b' from second, and > > 'c' from third one. > > > > Konstantin > > > I think there are multiple use cases. Some I have in mind are: > > 1) > Code without this patch: > > struct rte_mbuf *pkts_burst[32]; > > /* Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS */ > > /* Pkt I/O core polls packets from the NIC, pkts_burst is the temporary store */ > nb_rx = rte_eth_rx_burst(portid, queueid, pkts_burst, 32); > /* Provide packets to the packet processing cores */ > rte_ring_enqueue_burst(ring, pkts_burst, 32, &free_space); > > Code with the patch: > > /* Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS */ > > /* Reserve space on the ring */ > n = rte_ring_enqueue_sg_burst_start(ring, 32, &sgd, NULL); > /* Pkt I/O core polls packets from the NIC */ > if (n == 32) > nb_rx = rte_eth_rx_burst(portid, queueid, sgd->ptr1, 32); > else > nb_rx = rte_eth_rx_burst(portid, queueid, sgd->ptr1, sgd->n1); > /* Provide packets to the packet processing cores */ > /* Temporary storage 'pkts_burst' is not required */ > rte_ring_enqueue_sg_finish(ring, nb_rx); > > > 2) This is same/similar to what Konstantin mentioned > > Code without this patch: > > struct elem_obj {uint64_t a; uint32_t b, c;}; > struct elem_obj obj; > > /* Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS */ > > obj.a = rte_get_a(); > obj.b = rte_get_b(); > obj.c = rte_get_c(); > /* obj is the temporary storage and results in memcpy in the following call */ > rte_ring_enqueue_elem(ring, sizeof(struct elem_obj), 1, &obj, NULL); > > Code with the patch: > > struct elem_obj *obj; > /* Reserve space on the ring */ > n = rte_ring_enqueue_sg_bulk_elem_start(ring, sizeof(elem_obj), 1, &sgd, NULL); > > obj = (struct elem_obj *)sgd->ptr1; > obj.a = rte_get_a(); > obj.b = rte_get_b(); > obj.c = rte_get_c(); > /* obj is not a temporary storage */ > rte_ring_enqueue_sg_elem_finish(ring, n);
Hi Konstantin, Appreciate your feedback. <snip> > > > > Add scatter gather APIs to avoid intermediate memcpy. Use cases that > > involve copying large amount of data to/from the ring can benefit from > > these APIs. > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > > --- > > lib/librte_ring/meson.build | 3 +- > > lib/librte_ring/rte_ring_elem.h | 1 + > > lib/librte_ring/rte_ring_peek_sg.h | 552 > > +++++++++++++++++++++++++++++ > > 3 files changed, 555 insertions(+), 1 deletion(-) create mode 100644 > > lib/librte_ring/rte_ring_peek_sg.h > > As a generic one - need to update ring UT both func and perf to > test/measure this new API. Yes, will add. > > > > > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build > > index 31c0b4649..377694713 100644 > > --- a/lib/librte_ring/meson.build > > +++ b/lib/librte_ring/meson.build > > @@ -12,4 +12,5 @@ headers = files('rte_ring.h', > > 'rte_ring_peek.h', > > 'rte_ring_peek_c11_mem.h', > > 'rte_ring_rts.h', > > - 'rte_ring_rts_c11_mem.h') > > + 'rte_ring_rts_c11_mem.h', > > + 'rte_ring_peek_sg.h') > > diff --git a/lib/librte_ring/rte_ring_elem.h > > b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 100644 > > --- a/lib/librte_ring/rte_ring_elem.h > > +++ b/lib/librte_ring/rte_ring_elem.h > > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, > > void *obj_table, > > > > #ifdef ALLOW_EXPERIMENTAL_API > > #include <rte_ring_peek.h> > > +#include <rte_ring_peek_sg.h> > > #endif > > > > #include <rte_ring.h> > > diff --git a/lib/librte_ring/rte_ring_peek_sg.h > > b/lib/librte_ring/rte_ring_peek_sg.h > > new file mode 100644 > > index 000000000..97d5764a6 > > --- /dev/null > > +++ b/lib/librte_ring/rte_ring_peek_sg.h > > @@ -0,0 +1,552 @@ > > +/* SPDX-License-Identifier: BSD-3-Clause > > + * > > + * Copyright (c) 2020 Arm > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org > > + * All rights reserved. > > + * Derived from FreeBSD's bufring.h > > + * Used as BSD-3 Licensed with permission from Kip Macy. > > + */ > > + > > +#ifndef _RTE_RING_PEEK_SG_H_ > > +#define _RTE_RING_PEEK_SG_H_ > > + > > +/** > > + * @file > > + * @b EXPERIMENTAL: this API may change without prior notice > > + * It is not recommended to include this file directly. > > + * Please include <rte_ring_elem.h> instead. > > + * > > + * Ring Peek Scatter Gather APIs > > + * Introduction of rte_ring with scatter gather serialized > > +producer/consumer > > + * (HTS sync mode) makes it possible to split public enqueue/dequeue > > +API > > + * into 3 phases: > > + * - enqueue/dequeue start > > + * - copy data to/from the ring > > + * - enqueue/dequeue finish > > + * Along with the advantages of the peek APIs, these APIs provide the > > +ability > > + * to avoid copying of the data to temporary area. > > + * > > + * Note that right now this new API is available only for two sync modes: > > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > > + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). > > + * It is a user responsibility to create/init ring with appropriate > > +sync > > + * modes selected. > > + * > > + * Example usage: > > + * // read 1 elem from the ring: > > + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); > > + * if (n != 0) { > > + * //Copy objects in the ring > > + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); > > + * if (n != sgd->n1) > > + * //Second memcpy because of wrapround > > + * n2 = n - sgd->n1; > > + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); > > + * rte_ring_dequeue_sg_finish(ring, n); > > It is not clear from the example above why do you need SG(ZC) API. > Existing peek API would be able to handle such situation (just copy will be > done internally). Probably better to use examples you provided in your last > reply to Olivier. Agree, not a good example, will change it. > > > + * } > > + * > > + * Note that between _start_ and _finish_ none other thread can > > + proceed > > + * with enqueue(/dequeue) operation till _finish_ completes. > > + */ > > + > > +#ifdef __cplusplus > > +extern "C" { > > +#endif > > + > > +#include <rte_ring_peek_c11_mem.h> > > + > > +/* Rock that needs to be passed between reserve and commit APIs */ > > +struct rte_ring_sg_data { > > + /* Pointer to the first space in the ring */ > > + void **ptr1; > > + /* Pointer to the second space in the ring if there is wrap-around */ > > + void **ptr2; > > + /* Number of elements in the first pointer. If this is equal to > > + * the number of elements requested, then ptr2 is NULL. > > + * Otherwise, subtracting n1 from number of elements requested > > + * will give the number of elements available at ptr2. > > + */ > > + unsigned int n1; > > +}; > > I wonder what is the primary goal of that API? > The reason I am asking: from what I understand with this patch ZC API will > work only for ST and HTS modes (same as peek API). > Though, I think it is possible to make it work for any sync model, by changing Agree, the functionality can be extended to other modes as well. I added these 2 modes as I found the use cases for these. > API a bit: instead of returning sg_data to the user, force him to provide > function to read/write elems from/to the ring. > Just a schematic one, to illustrate the idea: > > typedef void (*write_ring_func_t)(void *elem, /*pointer to first elem to > update inside the ring*/ > uint32_t num, /* number of elems to update > */ > uint32_t esize, > void *udata /* caller provide data */); > > rte_ring_enqueue_zc_bulk_elem(struct rte_ring *r, unsigned int esize, > unsigned int n, unsigned int *free_space, write_ring_func_t wf, void > *udata) { > struct rte_ring_sg_data sgd; > ..... > n = move_head_tail(r, ...); > > /* get sgd data based on n */ > get_elem_addr(r, ..., &sgd); > > /* call user defined function to fill reserved elems */ > wf(sgd.p1, sgd.n1, esize, udata); > if (n != n1) > wf(sgd.p2, sgd.n2, esize, udata); > > .... > return n; > } > I think the call back function makes it difficult to use the API. The call back function would be a wrapper around another function or API which will have its own arguments. Now all those parameters have to passed using the 'udata'. For ex: in the 2nd example that I provided earlier, the user has to create a wrapper around 'rte_eth_rx_burst' API and then provide the parameters to 'rte_eth_rx_burst' through 'udata'. 'udata' would need a structure definition as well. > If we want ZC peek API also - some extra work need to be done with > introducing return value for write_ring_func() and checking it properly, but I > don't see any big problems here too. > That way ZC API can support all sync models, plus we don't need to expose > sg_data to the user directly. Other modes can be supported with the method used in this patch as well. If you see a need, I can add them. IMO, only issue with exposing sg_data is ABI compatibility in the future. I think, we can align the 'struct rte_ring_sg_data' to cache line boundary and it should provide ability to extend it in the future without affecting the ABI compatibility. > Also, in future, we probably can de-duplicate the code by making our non-ZC > API to use that one internally (pass ring_enqueue_elems()/ob_table as a > parameters). > > > + > > +static __rte_always_inline void > > +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head, > > + uint32_t num, void **dst1, uint32_t *n1, void **dst2) { > > + uint32_t idx = head & r->mask; > > + uint64_t *ring = (uint64_t *)&r[1]; > > + > > + *dst1 = ring + idx; > > + *n1 = num; > > + > > + if (idx + num > r->size) { > > + *n1 = num - (r->size - idx - 1); > > + *dst2 = ring; > > + } > > +} > > + > > +static __rte_always_inline void > > +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head, > > + uint32_t num, void **dst1, uint32_t *n1, void **dst2) { > > + uint32_t idx = head & r->mask; > > + rte_int128_t *ring = (rte_int128_t *)&r[1]; > > + > > + *dst1 = ring + idx; > > + *n1 = num; > > + > > + if (idx + num > r->size) { > > + *n1 = num - (r->size - idx - 1); > > + *dst2 = ring; > > + } > > +} > > + > > +static __rte_always_inline void > > +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head, > > + uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void > > +**dst2) { > > + if (esize == 8) > > + __rte_ring_get_elem_addr_64(r, head, > > + num, dst1, n1, dst2); > > + else if (esize == 16) > > + __rte_ring_get_elem_addr_128(r, head, > > + num, dst1, n1, dst2); > > > I don't think we need that special handling for 8/16B sizes. > In all functions esize is an input parameter. > If user will specify is as a constant - compiler will be able to convert multiply > to shift and add ops. Ok, I will check this out. > > > + else { > > + uint32_t idx, scale, nr_idx; > > + uint32_t *ring = (uint32_t *)&r[1]; > > + > > + /* Normalize to uint32_t */ > > + scale = esize / sizeof(uint32_t); > > + idx = head & r->mask; > > + nr_idx = idx * scale; > > + > > + *dst1 = ring + nr_idx; > > + *n1 = num; > > + > > + if (idx + num > r->size) { > > + *n1 = num - (r->size - idx - 1); > > + *dst2 = ring; > > + } > > + } > > +} > > + > > +/** > > + * @internal This function moves prod head value. > > + */ > > +static __rte_always_inline unsigned int > > +__rte_ring_do_enqueue_sg_elem_start(struct rte_ring *r, unsigned int > esize, > > + uint32_t n, enum rte_ring_queue_behavior behavior, > > + struct rte_ring_sg_data *sgd, unsigned int *free_space) { > > + uint32_t free, head, next; > > + > > + switch (r->prod.sync_type) { > > + case RTE_RING_SYNC_ST: > > + n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n, > > + behavior, &head, &next, &free); > > + __rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd- > >ptr1, > > + &sgd->n1, (void **)&sgd->ptr2); > > + break; > > + case RTE_RING_SYNC_MT_HTS: > > + n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, > &free); > > + __rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd- > >ptr1, > > + &sgd->n1, (void **)&sgd->ptr2); > > + break; > > + case RTE_RING_SYNC_MT: > > + case RTE_RING_SYNC_MT_RTS: > > + default: > > + /* unsupported mode, shouldn't be here */ > > + RTE_ASSERT(0); > > + n = 0; > > + free = 0; > > + } > > + > > + if (free_space != NULL) > > + *free_space = free - n; > > + return n; > > +} > > + > > +/** > > + * Start to enqueue several objects on the ring. > > + * Note that no actual objects are put in the queue by this function, > > + * it just reserves space for the user on the ring. > > + * User has to copy objects into the queue using the returned pointers. > > + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete > > +the > > + * enqueue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param esize > > + * The size of ring element, in bytes. It must be a multiple of 4. > > + * @param n > > + * The number of objects to add in the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * reservation operation has finished. > > + * @return > > + * The number of objects that can be enqueued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_enqueue_sg_bulk_elem_start(struct rte_ring *r, unsigned int > esize, > > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int > > +*free_space) { > > + return __rte_ring_do_enqueue_sg_elem_start(r, esize, n, > > + RTE_RING_QUEUE_FIXED, sgd, free_space); } > > + > > +/** > > + * Start to enqueue several pointers to objects on the ring. > > + * Note that no actual pointers are put in the queue by this > > +function, > > + * it just reserves space for the user on the ring. > > + * User has to copy pointers to objects into the queue using the > > + * returned pointers. > > + * User should call rte_ring_enqueue_sg_bulk_finish to complete the > > + * enqueue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to add in the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * reservation operation has finished. > > + * @return > > + * The number of objects that can be enqueued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_enqueue_sg_bulk_start(struct rte_ring *r, unsigned int n, > > + struct rte_ring_sg_data *sgd, unsigned int *free_space) { > > + return rte_ring_enqueue_sg_bulk_elem_start(r, sizeof(uintptr_t), n, > > + sgd, free_space); > > +} > > +/** > > + * Start to enqueue several objects on the ring. > > + * Note that no actual objects are put in the queue by this function, > > + * it just reserves space for the user on the ring. > > + * User has to copy objects into the queue using the returned pointers. > > + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete > > +the > > + * enqueue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param esize > > + * The size of ring element, in bytes. It must be a multiple of 4. > > + * @param n > > + * The number of objects to add in the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * reservation operation has finished. > > + * @return > > + * The number of objects that can be enqueued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_enqueue_sg_burst_elem_start(struct rte_ring *r, unsigned int > esize, > > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int > > +*free_space) { > > + return __rte_ring_do_enqueue_sg_elem_start(r, esize, n, > > + RTE_RING_QUEUE_VARIABLE, sgd, free_space); } > > + > > +/** > > + * Start to enqueue several pointers to objects on the ring. > > + * Note that no actual pointers are put in the queue by this > > +function, > > + * it just reserves space for the user on the ring. > > + * User has to copy pointers to objects into the queue using the > > + * returned pointers. > > + * User should call rte_ring_enqueue_sg_bulk_finish to complete the > > + * enqueue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to add in the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param free_space > > + * if non-NULL, returns the amount of space in the ring after the > > + * reservation operation has finished. > > + * @return > > + * The number of objects that can be enqueued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_enqueue_sg_burst_start(struct rte_ring *r, unsigned int n, > > + struct rte_ring_sg_data *sgd, unsigned int *free_space) { > > + return rte_ring_enqueue_sg_burst_elem_start(r, sizeof(uintptr_t), > n, > > + sgd, free_space); > > +} > > + > > +/** > > + * Complete enqueuing several objects on the ring. > > + * Note that number of objects to enqueue should not exceed previous > > + * enqueue_start return value. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to add to the ring. > > + */ > > +__rte_experimental > > +static __rte_always_inline void > > +rte_ring_enqueue_sg_elem_finish(struct rte_ring *r, unsigned int n) { > > + uint32_t tail; > > + > > + switch (r->prod.sync_type) { > > + case RTE_RING_SYNC_ST: > > + n = __rte_ring_st_get_tail(&r->prod, &tail, n); > > + __rte_ring_st_set_head_tail(&r->prod, tail, n, 1); > > + break; > > + case RTE_RING_SYNC_MT_HTS: > > + n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n); > > + __rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1); > > + break; > > + case RTE_RING_SYNC_MT: > > + case RTE_RING_SYNC_MT_RTS: > > + default: > > + /* unsupported mode, shouldn't be here */ > > + RTE_ASSERT(0); > > + } > > +} > > + > > +/** > > + * Complete enqueuing several pointers to objects on the ring. > > + * Note that number of objects to enqueue should not exceed previous > > + * enqueue_start return value. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of pointers to objects to add to the ring. > > + */ > > +__rte_experimental > > +static __rte_always_inline void > > +rte_ring_enqueue_sg_finish(struct rte_ring *r, unsigned int n) { > > + rte_ring_enqueue_sg_elem_finish(r, n); } > > + > > +/** > > + * @internal This function moves cons head value and copies up to *n* > > + * objects from the ring to the user provided obj_table. > > + */ > > +static __rte_always_inline unsigned int > > +__rte_ring_do_dequeue_sg_elem_start(struct rte_ring *r, > > + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, > > + struct rte_ring_sg_data *sgd, unsigned int *available) { > > + uint32_t avail, head, next; > > + > > + switch (r->cons.sync_type) { > > + case RTE_RING_SYNC_ST: > > + n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n, > > + behavior, &head, &next, &avail); > > + __rte_ring_get_elem_addr(r, head, esize, n, > > + sgd->ptr1, &sgd->n1, sgd->ptr2); > > + break; > > + case RTE_RING_SYNC_MT_HTS: > > + n = __rte_ring_hts_move_cons_head(r, n, behavior, > > + &head, &avail); > > + __rte_ring_get_elem_addr(r, head, esize, n, > > + sgd->ptr1, &sgd->n1, sgd->ptr2); > > + break; > > + case RTE_RING_SYNC_MT: > > + case RTE_RING_SYNC_MT_RTS: > > + default: > > + /* unsupported mode, shouldn't be here */ > > + RTE_ASSERT(0); > > + n = 0; > > + avail = 0; > > + } > > + > > + if (available != NULL) > > + *available = avail - n; > > + return n; > > +} > > + > > +/** > > + * Start to dequeue several objects from the ring. > > + * Note that no actual objects are copied from the queue by this function. > > + * User has to copy objects from the queue using the returned pointers. > > + * User should call rte_ring_dequeue_sg_bulk_elem_finish to complete > > +the > > + * dequeue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param esize > > + * The size of ring element, in bytes. It must be a multiple of 4. > > + * @param n > > + * The number of objects to remove from the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after the > > + * dequeue has finished. > > + * @return > > + * The number of objects that can be dequeued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_dequeue_sg_bulk_elem_start(struct rte_ring *r, unsigned int > esize, > > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int > > +*available) { > > + return __rte_ring_do_dequeue_sg_elem_start(r, esize, n, > > + RTE_RING_QUEUE_FIXED, sgd, available); } > > + > > +/** > > + * Start to dequeue several pointers to objects from the ring. > > + * Note that no actual pointers are removed from the queue by this > function. > > + * User has to copy pointers to objects from the queue using the > > + * returned pointers. > > + * User should call rte_ring_dequeue_sg_bulk_finish to complete the > > + * dequeue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to remove from the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after the > > + * dequeue has finished. > > + * @return > > + * The number of objects that can be dequeued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_dequeue_sg_bulk_start(struct rte_ring *r, unsigned int n, > > + struct rte_ring_sg_data *sgd, unsigned int *available) { > > + return rte_ring_dequeue_sg_bulk_elem_start(r, sizeof(uintptr_t), > > + n, sgd, available); > > +} > > + > > +/** > > + * Start to dequeue several objects from the ring. > > + * Note that no actual objects are copied from the queue by this function. > > + * User has to copy objects from the queue using the returned pointers. > > + * User should call rte_ring_dequeue_sg_burst_elem_finish to complete > > +the > > + * dequeue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param esize > > + * The size of ring element, in bytes. It must be a multiple of 4. > > + * This must be the same value used while creating the ring. Otherwise > > + * the results are undefined. > > + * @param n > > + * The number of objects to dequeue from the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after the > > + * dequeue has finished. > > + * @return > > + * The number of objects that can be dequeued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_dequeue_sg_burst_elem_start(struct rte_ring *r, unsigned int > esize, > > + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int > > +*available) { > > + return __rte_ring_do_dequeue_sg_elem_start(r, esize, n, > > + RTE_RING_QUEUE_VARIABLE, sgd, available); } > > + > > +/** > > + * Start to dequeue several pointers to objects from the ring. > > + * Note that no actual pointers are removed from the queue by this > function. > > + * User has to copy pointers to objects from the queue using the > > + * returned pointers. > > + * User should call rte_ring_dequeue_sg_burst_finish to complete the > > + * dequeue operation. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to remove from the ring. > > + * @param sgd > > + * The scatter-gather data containing pointers for copying data. > > + * @param available > > + * If non-NULL, returns the number of remaining ring entries after the > > + * dequeue has finished. > > + * @return > > + * The number of objects that can be dequeued, either 0 or n > > + */ > > +__rte_experimental > > +static __rte_always_inline unsigned int > > +rte_ring_dequeue_sg_burst_start(struct rte_ring *r, unsigned int n, > > + struct rte_ring_sg_data *sgd, unsigned int *available) { > > + return rte_ring_dequeue_sg_burst_elem_start(r, sizeof(uintptr_t), > n, > > + sgd, available); > > +} > > + > > +/** > > + * Complete dequeuing several objects from the ring. > > + * Note that number of objects to dequeued should not exceed previous > > + * dequeue_start return value. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to remove from the ring. > > + */ > > +__rte_experimental > > +static __rte_always_inline void > > +rte_ring_dequeue_sg_elem_finish(struct rte_ring *r, unsigned int n) { > > + uint32_t tail; > > + > > + switch (r->cons.sync_type) { > > + case RTE_RING_SYNC_ST: > > + n = __rte_ring_st_get_tail(&r->cons, &tail, n); > > + __rte_ring_st_set_head_tail(&r->cons, tail, n, 0); > > + break; > > + case RTE_RING_SYNC_MT_HTS: > > + n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n); > > + __rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0); > > + break; > > + case RTE_RING_SYNC_MT: > > + case RTE_RING_SYNC_MT_RTS: > > + default: > > + /* unsupported mode, shouldn't be here */ > > + RTE_ASSERT(0); > > + } > > +} > > + > > +/** > > + * Complete dequeuing several objects from the ring. > > + * Note that number of objects to dequeued should not exceed previous > > + * dequeue_start return value. > > + * > > + * @param r > > + * A pointer to the ring structure. > > + * @param n > > + * The number of objects to remove from the ring. > > + */ > > +__rte_experimental > > +static __rte_always_inline void > > +rte_ring_dequeue_sg_finish(struct rte_ring *r, unsigned int n) { > > + rte_ring_dequeue_elem_finish(r, n); > > +} > > + > > +#ifdef __cplusplus > > +} > > +#endif > > + > > +#endif /* _RTE_RING_PEEK_SG_H_ */ > > -- > > 2.17.1
Hi Honnappa, > Hi Konstantin, > Appreciate your feedback. > > <snip> > > > > > > > > Add scatter gather APIs to avoid intermediate memcpy. Use cases that > > > involve copying large amount of data to/from the ring can benefit from > > > these APIs. > > > > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > > > --- > > > lib/librte_ring/meson.build | 3 +- > > > lib/librte_ring/rte_ring_elem.h | 1 + > > > lib/librte_ring/rte_ring_peek_sg.h | 552 > > > +++++++++++++++++++++++++++++ > > > 3 files changed, 555 insertions(+), 1 deletion(-) create mode 100644 > > > lib/librte_ring/rte_ring_peek_sg.h > > > > As a generic one - need to update ring UT both func and perf to > > test/measure this new API. > Yes, will add. > > > > > > > > > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build > > > index 31c0b4649..377694713 100644 > > > --- a/lib/librte_ring/meson.build > > > +++ b/lib/librte_ring/meson.build > > > @@ -12,4 +12,5 @@ headers = files('rte_ring.h', > > > 'rte_ring_peek.h', > > > 'rte_ring_peek_c11_mem.h', > > > 'rte_ring_rts.h', > > > - 'rte_ring_rts_c11_mem.h') > > > + 'rte_ring_rts_c11_mem.h', > > > + 'rte_ring_peek_sg.h') > > > diff --git a/lib/librte_ring/rte_ring_elem.h > > > b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 100644 > > > --- a/lib/librte_ring/rte_ring_elem.h > > > +++ b/lib/librte_ring/rte_ring_elem.h > > > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, > > > void *obj_table, > > > > > > #ifdef ALLOW_EXPERIMENTAL_API > > > #include <rte_ring_peek.h> > > > +#include <rte_ring_peek_sg.h> > > > #endif > > > > > > #include <rte_ring.h> > > > diff --git a/lib/librte_ring/rte_ring_peek_sg.h > > > b/lib/librte_ring/rte_ring_peek_sg.h > > > new file mode 100644 > > > index 000000000..97d5764a6 > > > --- /dev/null > > > +++ b/lib/librte_ring/rte_ring_peek_sg.h > > > @@ -0,0 +1,552 @@ > > > +/* SPDX-License-Identifier: BSD-3-Clause > > > + * > > > + * Copyright (c) 2020 Arm > > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org > > > + * All rights reserved. > > > + * Derived from FreeBSD's bufring.h > > > + * Used as BSD-3 Licensed with permission from Kip Macy. > > > + */ > > > + > > > +#ifndef _RTE_RING_PEEK_SG_H_ > > > +#define _RTE_RING_PEEK_SG_H_ > > > + > > > +/** > > > + * @file > > > + * @b EXPERIMENTAL: this API may change without prior notice > > > + * It is not recommended to include this file directly. > > > + * Please include <rte_ring_elem.h> instead. > > > + * > > > + * Ring Peek Scatter Gather APIs > > > + * Introduction of rte_ring with scatter gather serialized > > > +producer/consumer > > > + * (HTS sync mode) makes it possible to split public enqueue/dequeue > > > +API > > > + * into 3 phases: > > > + * - enqueue/dequeue start > > > + * - copy data to/from the ring > > > + * - enqueue/dequeue finish > > > + * Along with the advantages of the peek APIs, these APIs provide the > > > +ability > > > + * to avoid copying of the data to temporary area. > > > + * > > > + * Note that right now this new API is available only for two sync modes: > > > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) > > > + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). > > > + * It is a user responsibility to create/init ring with appropriate > > > +sync > > > + * modes selected. > > > + * > > > + * Example usage: > > > + * // read 1 elem from the ring: > > > + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); > > > + * if (n != 0) { > > > + * //Copy objects in the ring > > > + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); > > > + * if (n != sgd->n1) > > > + * //Second memcpy because of wrapround > > > + * n2 = n - sgd->n1; > > > + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); > > > + * rte_ring_dequeue_sg_finish(ring, n); > > > > It is not clear from the example above why do you need SG(ZC) API. > > Existing peek API would be able to handle such situation (just copy will be > > done internally). Probably better to use examples you provided in your last > > reply to Olivier. > Agree, not a good example, will change it. > > > > > > + * } > > > + * > > > + * Note that between _start_ and _finish_ none other thread can > > > + proceed > > > + * with enqueue(/dequeue) operation till _finish_ completes. > > > + */ > > > + > > > +#ifdef __cplusplus > > > +extern "C" { > > > +#endif > > > + > > > +#include <rte_ring_peek_c11_mem.h> > > > + > > > +/* Rock that needs to be passed between reserve and commit APIs */ > > > +struct rte_ring_sg_data { > > > + /* Pointer to the first space in the ring */ > > > + void **ptr1; > > > + /* Pointer to the second space in the ring if there is wrap-around */ > > > + void **ptr2; > > > + /* Number of elements in the first pointer. If this is equal to > > > + * the number of elements requested, then ptr2 is NULL. > > > + * Otherwise, subtracting n1 from number of elements requested > > > + * will give the number of elements available at ptr2. > > > + */ > > > + unsigned int n1; > > > +}; > > > > I wonder what is the primary goal of that API? > > The reason I am asking: from what I understand with this patch ZC API will > > work only for ST and HTS modes (same as peek API). > > Though, I think it is possible to make it work for any sync model, by changing > Agree, the functionality can be extended to other modes as well. I added these 2 modes as I found the use cases for these. > > > API a bit: instead of returning sg_data to the user, force him to provide > > function to read/write elems from/to the ring. > > Just a schematic one, to illustrate the idea: > > > > typedef void (*write_ring_func_t)(void *elem, /*pointer to first elem to > > update inside the ring*/ > > uint32_t num, /* number of elems to update > > */ > > uint32_t esize, > > void *udata /* caller provide data */); > > > > rte_ring_enqueue_zc_bulk_elem(struct rte_ring *r, unsigned int esize, > > unsigned int n, unsigned int *free_space, write_ring_func_t wf, void > > *udata) { > > struct rte_ring_sg_data sgd; > > ..... > > n = move_head_tail(r, ...); > > > > /* get sgd data based on n */ > > get_elem_addr(r, ..., &sgd); > > > > /* call user defined function to fill reserved elems */ > > wf(sgd.p1, sgd.n1, esize, udata); > > if (n != n1) > > wf(sgd.p2, sgd.n2, esize, udata); > > > > .... > > return n; > > } > > > I think the call back function makes it difficult to use the API. The call back function would be a wrapper around another function or API > which will have its own arguments. Now all those parameters have to passed using the 'udata'. For ex: in the 2nd example that I provided > earlier, the user has to create a wrapper around 'rte_eth_rx_burst' API and then provide the parameters to 'rte_eth_rx_burst' through > 'udata'. 'udata' would need a structure definition as well. Yes, it would, though I don't see much problems with that. Let say for eth_rx_burst(), user will need something like struct {uint16_t p, q;} udata = {.p = port_id, .q=queue_id,}; > > > If we want ZC peek API also - some extra work need to be done with > > introducing return value for write_ring_func() and checking it properly, but I > > don't see any big problems here too. > > That way ZC API can support all sync models, plus we don't need to expose > > sg_data to the user directly. > Other modes can be supported with the method used in this patch as well. You mean via exposing to the user tail value (in sg_data or so)? I am still a bit nervous about doing that. > If you see a need, I can add them. Not, really, I just thought callbacks will be a good idea here... > IMO, only issue with exposing sg_data is ABI compatibility in the future. I think, we can align the 'struct rte_ring_sg_data' to cache line > boundary and it should provide ability to extend it in the future without affecting the ABI compatibility. As I understand sg_data is experimental struct (as the rest of API in that file). So breaking it shouldn't be a problem for a while. I suppose to summarize things - as I understand you think callback approach is not a good choice. From other hand, I am not really happy with idea to expose tail values updates to the user. Then I suggest we can just go ahead with that patch as it is: sg_data approach, _ZC_ peek API only. > > > Also, in future, we probably can de-duplicate the code by making our non-ZC > > API to use that one internally (pass ring_enqueue_elems()/ob_table as a > > parameters). > > > > > +
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index 31c0b4649..377694713 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -12,4 +12,5 @@ headers = files('rte_ring.h', 'rte_ring_peek.h', 'rte_ring_peek_c11_mem.h', 'rte_ring_rts.h', - 'rte_ring_rts_c11_mem.h') + 'rte_ring_rts_c11_mem.h', + 'rte_ring_peek_sg.h') diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, #ifdef ALLOW_EXPERIMENTAL_API #include <rte_ring_peek.h> +#include <rte_ring_peek_sg.h> #endif #include <rte_ring.h> diff --git a/lib/librte_ring/rte_ring_peek_sg.h b/lib/librte_ring/rte_ring_peek_sg.h new file mode 100644 index 000000000..97d5764a6 --- /dev/null +++ b/lib/librte_ring/rte_ring_peek_sg.h @@ -0,0 +1,552 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2020 Arm + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_PEEK_SG_H_ +#define _RTE_RING_PEEK_SG_H_ + +/** + * @file + * @b EXPERIMENTAL: this API may change without prior notice + * It is not recommended to include this file directly. + * Please include <rte_ring_elem.h> instead. + * + * Ring Peek Scatter Gather APIs + * Introduction of rte_ring with scatter gather serialized producer/consumer + * (HTS sync mode) makes it possible to split public enqueue/dequeue API + * into 3 phases: + * - enqueue/dequeue start + * - copy data to/from the ring + * - enqueue/dequeue finish + * Along with the advantages of the peek APIs, these APIs provide the ability + * to avoid copying of the data to temporary area. + * + * Note that right now this new API is available only for two sync modes: + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). + * It is a user responsibility to create/init ring with appropriate sync + * modes selected. + * + * Example usage: + * // read 1 elem from the ring: + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL); + * if (n != 0) { + * //Copy objects in the ring + * memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t)); + * if (n != sgd->n1) + * //Second memcpy because of wrapround + * n2 = n - sgd->n1; + * memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t)); + * rte_ring_dequeue_sg_finish(ring, n); + * } + * + * Note that between _start_ and _finish_ none other thread can proceed + * with enqueue(/dequeue) operation till _finish_ completes. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <rte_ring_peek_c11_mem.h> + +/* Rock that needs to be passed between reserve and commit APIs */ +struct rte_ring_sg_data { + /* Pointer to the first space in the ring */ + void **ptr1; + /* Pointer to the second space in the ring if there is wrap-around */ + void **ptr2; + /* Number of elements in the first pointer. If this is equal to + * the number of elements requested, then ptr2 is NULL. + * Otherwise, subtracting n1 from number of elements requested + * will give the number of elements available at ptr2. + */ + unsigned int n1; +}; + +static __rte_always_inline void +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head, + uint32_t num, void **dst1, uint32_t *n1, void **dst2) +{ + uint32_t idx = head & r->mask; + uint64_t *ring = (uint64_t *)&r[1]; + + *dst1 = ring + idx; + *n1 = num; + + if (idx + num > r->size) { + *n1 = num - (r->size - idx - 1); + *dst2 = ring; + } +} + +static __rte_always_inline void +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head, + uint32_t num, void **dst1, uint32_t *n1, void **dst2) +{ + uint32_t idx = head & r->mask; + rte_int128_t *ring = (rte_int128_t *)&r[1]; + + *dst1 = ring + idx; + *n1 = num; + + if (idx + num > r->size) { + *n1 = num - (r->size - idx - 1); + *dst2 = ring; + } +} + +static __rte_always_inline void +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head, + uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2) +{ + if (esize == 8) + __rte_ring_get_elem_addr_64(r, head, + num, dst1, n1, dst2); + else if (esize == 16) + __rte_ring_get_elem_addr_128(r, head, + num, dst1, n1, dst2); + else { + uint32_t idx, scale, nr_idx; + uint32_t *ring = (uint32_t *)&r[1]; + + /* Normalize to uint32_t */ + scale = esize / sizeof(uint32_t); + idx = head & r->mask; + nr_idx = idx * scale; + + *dst1 = ring + nr_idx; + *n1 = num; + + if (idx + num > r->size) { + *n1 = num - (r->size - idx - 1); + *dst2 = ring; + } + } +} + +/** + * @internal This function moves prod head value. + */ +static __rte_always_inline unsigned int +__rte_ring_do_enqueue_sg_elem_start(struct rte_ring *r, unsigned int esize, + uint32_t n, enum rte_ring_queue_behavior behavior, + struct rte_ring_sg_data *sgd, unsigned int *free_space) +{ + uint32_t free, head, next; + + switch (r->prod.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n, + behavior, &head, &next, &free); + __rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd->ptr1, + &sgd->n1, (void **)&sgd->ptr2); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free); + __rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd->ptr1, + &sgd->n1, (void **)&sgd->ptr2); + break; + case RTE_RING_SYNC_MT: + case RTE_RING_SYNC_MT_RTS: + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + n = 0; + free = 0; + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * Start to enqueue several objects on the ring. + * Note that no actual objects are put in the queue by this function, + * it just reserves space for the user on the ring. + * User has to copy objects into the queue using the returned pointers. + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete the + * enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * @param n + * The number of objects to add in the ring. + * @param sgd + * The scatter-gather data containing pointers for copying data. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * reservation operation has finished. + * @return + * The number of objects that can be enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_sg_bulk_elem_start(struct rte_ring *r, unsigned int esize, + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_sg_elem_start(r, esize, n, + RTE_RING_QUEUE_FIXED, sgd, free_space); +} + +/** + * Start to enqueue several pointers to objects on the ring. + * Note that no actual pointers are put in the queue by this function, + * it just reserves space for the user on the ring. + * User has to copy pointers to objects into the queue using the + * returned pointers. + * User should call rte_ring_enqueue_sg_bulk_finish to complete the + * enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to add in the ring. + * @param sgd + * The scatter-gather data containing pointers for copying data. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * reservation operation has finished. + * @return + * The number of objects that can be enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_sg_bulk_start(struct rte_ring *r, unsigned int n, + struct rte_ring_sg_data *sgd, unsigned int *free_space) +{ + return rte_ring_enqueue_sg_bulk_elem_start(r, sizeof(uintptr_t), n, + sgd, free_space); +} +/** + * Start to enqueue several objects on the ring. + * Note that no actual objects are put in the queue by this function, + * it just reserves space for the user on the ring. + * User has to copy objects into the queue using the returned pointers. + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete the + * enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * @param n + * The number of objects to add in the ring. + * @param sgd + * The scatter-gather data containing pointers for copying data. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * reservation operation has finished. + * @return + * The number of objects that can be enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_sg_burst_elem_start(struct rte_ring *r, unsigned int esize, + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_sg_elem_start(r, esize, n, + RTE_RING_QUEUE_VARIABLE, sgd, free_space); +} + +/** + * Start to enqueue several pointers to objects on the ring. + * Note that no actual pointers are put in the queue by this function, + * it just reserves space for the user on the ring. + * User has to copy pointers to objects into the queue using the + * returned pointers. + * User should call rte_ring_enqueue_sg_bulk_finish to complete the + * enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to add in the ring. + * @param sgd + * The scatter-gather data containing pointers for copying data. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * reservation operation has finished. + * @return + * The number of objects that can be enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_sg_burst_start(struct rte_ring *r, unsigned int n, + struct rte_ring_sg_data *sgd, unsigned int *free_space) +{ + return rte_ring_enqueue_sg_burst_elem_start(r, sizeof(uintptr_t), n, + sgd, free_space); +} + +/** + * Complete enqueuing several objects on the ring. + * Note that number of objects to enqueue should not exceed previous + * enqueue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to add to the ring. + */ +__rte_experimental +static __rte_always_inline void +rte_ring_enqueue_sg_elem_finish(struct rte_ring *r, unsigned int n) +{ + uint32_t tail; + + switch (r->prod.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_st_get_tail(&r->prod, &tail, n); + __rte_ring_st_set_head_tail(&r->prod, tail, n, 1); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n); + __rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1); + break; + case RTE_RING_SYNC_MT: + case RTE_RING_SYNC_MT_RTS: + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + } +} + +/** + * Complete enqueuing several pointers to objects on the ring. + * Note that number of objects to enqueue should not exceed previous + * enqueue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of pointers to objects to add to the ring. + */ +__rte_experimental +static __rte_always_inline void +rte_ring_enqueue_sg_finish(struct rte_ring *r, unsigned int n) +{ + rte_ring_enqueue_sg_elem_finish(r, n); +} + +/** + * @internal This function moves cons head value and copies up to *n* + * objects from the ring to the user provided obj_table. + */ +static __rte_always_inline unsigned int +__rte_ring_do_dequeue_sg_elem_start(struct rte_ring *r, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + struct rte_ring_sg_data *sgd, unsigned int *available) +{ + uint32_t avail, head, next; + + switch (r->cons.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n, + behavior, &head, &next, &avail); + __rte_ring_get_elem_addr(r, head, esize, n, + sgd->ptr1, &sgd->n1, sgd->ptr2); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_move_cons_head(r, n, behavior, + &head, &avail); + __rte_ring_get_elem_addr(r, head, esize, n, + sgd->ptr1, &sgd->n1, sgd->ptr2); + break; + case RTE_RING_SYNC_MT: + case RTE_RING_SYNC_MT_RTS: + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + n = 0; + avail = 0; + } + + if (available != NULL) + *available = avail - n; + return n; +} + +/** + * Start to dequeue several objects from the ring. + * Note that no actual objects are copied from the queue by this function. + * User has to copy objects from the queue using the returned pointers. + * User should call rte_ring_dequeue_sg_bulk_elem_finish to complete the + * dequeue operation. + * + * @param r + * A pointer to the ring structure. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * @param n + * The number of objects to remove from the ring. + * @param sgd + * The scatter-gather data containing pointers for copying data. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects that can be dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_sg_bulk_elem_start(struct rte_ring *r, unsigned int esize, + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *available) +{ + return __rte_ring_do_dequeue_sg_elem_start(r, esize, n, + RTE_RING_QUEUE_FIXED, sgd, available); +} + +/** + * Start to dequeue several pointers to objects from the ring. + * Note that no actual pointers are removed from the queue by this function. + * User has to copy pointers to objects from the queue using the + * returned pointers. + * User should call rte_ring_dequeue_sg_bulk_finish to complete the + * dequeue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to remove from the ring. + * @param sgd + * The scatter-gather data containing pointers for copying data. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects that can be dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_sg_bulk_start(struct rte_ring *r, unsigned int n, + struct rte_ring_sg_data *sgd, unsigned int *available) +{ + return rte_ring_dequeue_sg_bulk_elem_start(r, sizeof(uintptr_t), + n, sgd, available); +} + +/** + * Start to dequeue several objects from the ring. + * Note that no actual objects are copied from the queue by this function. + * User has to copy objects from the queue using the returned pointers. + * User should call rte_ring_dequeue_sg_burst_elem_finish to complete the + * dequeue operation. + * + * @param r + * A pointer to the ring structure. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring. + * @param sgd + * The scatter-gather data containing pointers for copying data. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects that can be dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_sg_burst_elem_start(struct rte_ring *r, unsigned int esize, + unsigned int n, struct rte_ring_sg_data *sgd, unsigned int *available) +{ + return __rte_ring_do_dequeue_sg_elem_start(r, esize, n, + RTE_RING_QUEUE_VARIABLE, sgd, available); +} + +/** + * Start to dequeue several pointers to objects from the ring. + * Note that no actual pointers are removed from the queue by this function. + * User has to copy pointers to objects from the queue using the + * returned pointers. + * User should call rte_ring_dequeue_sg_burst_finish to complete the + * dequeue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to remove from the ring. + * @param sgd + * The scatter-gather data containing pointers for copying data. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects that can be dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_sg_burst_start(struct rte_ring *r, unsigned int n, + struct rte_ring_sg_data *sgd, unsigned int *available) +{ + return rte_ring_dequeue_sg_burst_elem_start(r, sizeof(uintptr_t), n, + sgd, available); +} + +/** + * Complete dequeuing several objects from the ring. + * Note that number of objects to dequeued should not exceed previous + * dequeue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to remove from the ring. + */ +__rte_experimental +static __rte_always_inline void +rte_ring_dequeue_sg_elem_finish(struct rte_ring *r, unsigned int n) +{ + uint32_t tail; + + switch (r->cons.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_st_get_tail(&r->cons, &tail, n); + __rte_ring_st_set_head_tail(&r->cons, tail, n, 0); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n); + __rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0); + break; + case RTE_RING_SYNC_MT: + case RTE_RING_SYNC_MT_RTS: + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + } +} + +/** + * Complete dequeuing several objects from the ring. + * Note that number of objects to dequeued should not exceed previous + * dequeue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to remove from the ring. + */ +__rte_experimental +static __rte_always_inline void +rte_ring_dequeue_sg_finish(struct rte_ring *r, unsigned int n) +{ + rte_ring_dequeue_elem_finish(r, n); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_PEEK_SG_H_ */
Add scatter gather APIs to avoid intermediate memcpy. Use cases that involve copying large amount of data to/from the ring can benefit from these APIs. Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> --- lib/librte_ring/meson.build | 3 +- lib/librte_ring/rte_ring_elem.h | 1 + lib/librte_ring/rte_ring_peek_sg.h | 552 +++++++++++++++++++++++++++++ 3 files changed, 555 insertions(+), 1 deletion(-) create mode 100644 lib/librte_ring/rte_ring_peek_sg.h -- 2.17.1