@@ -120,11 +120,6 @@ static inline odp_buffer_t buf_from_buf_hdr(odp_buffer_hdr_t *hdr)
return (odp_buffer_t)hdr;
}
-static inline odp_event_t event_from_buf_hdr(odp_buffer_hdr_t *hdr)
-{
- return (odp_event_t)hdr;
-}
-
#ifdef __cplusplus
}
#endif
@@ -44,6 +44,7 @@ struct queue_entry_s {
queue_deq_fn_t dequeue;
queue_enq_multi_fn_t enqueue_multi;
queue_deq_multi_fn_t dequeue_multi;
+ queue_deq_multi_fn_t orig_dequeue_multi;
uint32_t index;
odp_queue_t handle;
@@ -46,15 +46,15 @@ typedef struct {
queue_term_global_fn_t term_global;
queue_init_local_fn_t init_local;
queue_term_local_fn_t term_local;
- queue_enq_fn_t enq;
- queue_enq_multi_fn_t enq_multi;
- queue_deq_fn_t deq;
- queue_deq_multi_fn_t deq_multi;
queue_get_pktout_fn_t get_pktout;
queue_set_pktout_fn_t set_pktout;
queue_get_pktin_fn_t get_pktin;
queue_set_pktin_fn_t set_pktin;
queue_set_enq_deq_fn_t set_enq_deq_fn;
+
+ /* Original queue dequeue multi function (before override). May be used
+ * by an overriding dequeue function. */
+ queue_deq_multi_fn_t orig_deq_multi;
} queue_fn_t;
extern const queue_fn_t *queue_fn;
@@ -41,6 +41,7 @@ struct queue_entry_s {
queue_deq_fn_t dequeue;
queue_enq_multi_fn_t enqueue_multi;
queue_deq_multi_fn_t dequeue_multi;
+ queue_deq_multi_fn_t orig_dequeue_multi;
uint32_t index;
odp_queue_t handle;
@@ -20,6 +20,7 @@
#include <odp/api/plat/packet_inlines.h>
#include <odp/api/plat/thread_inlines.h>
#include <odp_packet_internal.h>
+#include <odp/api/plat/queue_inlines.h>
/* Inlined API functions */
#include <odp/api/plat/event_inlines.h>
@@ -20,6 +20,7 @@
#include <odp/api/plat/packet_inlines.h>
#include <odp/api/plat/thread_inlines.h>
#include <odp_packet_internal.h>
+#include <odp/api/plat/queue_inlines.h>
/* Inlined API functions */
#include <odp/api/plat/event_inlines.h>
@@ -18,6 +18,7 @@
#include <odp_debug_internal.h>
#include <odp_packet_internal.h>
#include <odp_ipsec_internal.h>
+#include <odp/api/plat/queue_inlines.h>
#include <protocols/eth.h>
#include <protocols/ip.h>
@@ -17,6 +17,7 @@
/* Inlined API functions */
#include <odp/api/plat/event_inlines.h>
+#include <odp/api/plat/queue_inlines.h>
typedef struct {
/* common buffer header */
@@ -28,6 +28,7 @@
#include <odp/api/time.h>
#include <odp/api/plat/time_inlines.h>
#include <odp_pcapng.h>
+#include <odp/api/plat/queue_inlines.h>
#include <string.h>
#include <inttypes.h>
@@ -625,7 +626,8 @@ static inline int pktin_recv_buf(pktio_entry_t *entry, int pktin_index,
if (pkt_hdr->p.input_flags.dst_queue) {
int ret;
- ret = queue_fn->enq(pkt_hdr->dst_queue, buf_hdr);
+ ret = odp_queue_enq(pkt_hdr->dst_queue,
+ odp_packet_to_event(pkt));
if (ret < 0)
odp_packet_free(pkt);
continue;
@@ -675,8 +677,7 @@ static odp_buffer_hdr_t *pktin_dequeue(odp_queue_t queue)
int pktin_index = pktin_queue.index;
pktio_entry_t *entry = get_pktio_entry(pktio);
- buf_hdr = queue_fn->deq(queue);
- if (buf_hdr != NULL)
+ if (queue_fn->orig_deq_multi(queue, &buf_hdr, 1) == 1)
return buf_hdr;
pkts = pktin_recv_buf(entry, pktin_index, hdr_tbl, QUEUE_MULTI_MAX);
@@ -688,7 +689,8 @@ static odp_buffer_hdr_t *pktin_dequeue(odp_queue_t queue)
int num_enq;
int num = pkts - 1;
- num_enq = queue_fn->enq_multi(queue, &hdr_tbl[1], num);
+ num_enq = odp_queue_enq_multi(queue,
+ (odp_event_t *)&hdr_tbl[1], num);
if (odp_unlikely(num_enq < num)) {
if (odp_unlikely(num_enq < 0))
@@ -715,7 +717,7 @@ static int pktin_deq_multi(odp_queue_t queue, odp_buffer_hdr_t *buf_hdr[],
int pktin_index = pktin_queue.index;
pktio_entry_t *entry = get_pktio_entry(pktio);
- nbr = queue_fn->deq_multi(queue, buf_hdr, num);
+ nbr = queue_fn->orig_deq_multi(queue, buf_hdr, num);
if (odp_unlikely(nbr > num))
ODP_ABORT("queue_deq_multi req: %d, returned %d\n", num, nbr);
@@ -740,7 +742,7 @@ static int pktin_deq_multi(odp_queue_t queue, odp_buffer_hdr_t *buf_hdr[],
if (j) {
int num_enq;
- num_enq = queue_fn->enq_multi(queue, hdr_tbl, j);
+ num_enq = odp_queue_enq_multi(queue, (odp_event_t *)hdr_tbl, j);
if (odp_unlikely(num_enq < j)) {
if (odp_unlikely(num_enq < 0))
@@ -785,9 +787,14 @@ int sched_cb_pktin_poll_one(int pktio_index,
pkt = packets[i];
pkt_hdr = packet_hdr(pkt);
if (odp_unlikely(pkt_hdr->p.input_flags.dst_queue)) {
+ int num_enq;
+
queue = pkt_hdr->dst_queue;
buf_hdr = packet_to_buf_hdr(pkt);
- if (queue_fn->enq_multi(queue, &buf_hdr, 1) < 0) {
+ num_enq = odp_queue_enq_multi(queue,
+ (odp_event_t *)&buf_hdr,
+ 1);
+ if (num_enq < 0) {
/* Queue full? */
odp_packet_free(pkt);
__atomic_fetch_add(&entry->s.stats.in_discards,
@@ -851,7 +858,8 @@ int sched_cb_pktin_poll_old(int pktio_index, int num_queue, int index[])
}
queue = entry->s.in_queue[index[idx]].queue;
- num_enq = queue_fn->enq_multi(queue, hdr_tbl, num);
+ num_enq = odp_queue_enq_multi(queue,
+ (odp_event_t *)hdr_tbl, num);
if (odp_unlikely(num_enq < num)) {
if (odp_unlikely(num_enq < 0))
@@ -1371,41 +1379,6 @@ int odp_pktio_stats_reset(odp_pktio_t pktio)
return ret;
}
-static int abort_pktin_enqueue(odp_queue_t queue, odp_buffer_hdr_t *buf_hdr)
-{
- (void)queue;
- (void)buf_hdr;
- ODP_ABORT("attempted enqueue to a pktin queue");
- return -1;
-}
-
-static int abort_pktin_enq_multi(odp_queue_t queue,
- odp_buffer_hdr_t *buf_hdr[], int num)
-{
- (void)queue;
- (void)buf_hdr;
- (void)num;
- ODP_ABORT("attempted enqueue to a pktin queue");
- return 0;
-}
-
-static odp_buffer_hdr_t *abort_pktout_dequeue(odp_queue_t queue)
-{
- (void)queue;
- ODP_ABORT("attempted dequeue from a pktout queue");
- return NULL;
-}
-
-static int abort_pktout_deq_multi(odp_queue_t queue,
- odp_buffer_hdr_t *buf_hdr[], int num)
-{
- (void)queue;
- (void)buf_hdr;
- (void)num;
- ODP_ABORT("attempted dequeue from a pktout queue");
- return 0;
-}
-
int odp_pktin_queue_config(odp_pktio_t pktio,
const odp_pktin_queue_param_t *param)
{
@@ -1498,8 +1471,8 @@ int odp_pktin_queue_config(odp_pktio_t pktio,
if (mode == ODP_PKTIN_MODE_QUEUE) {
queue_fn->set_pktin(queue, pktio, i);
queue_fn->set_enq_deq_fn(queue,
- abort_pktin_enqueue,
- abort_pktin_enq_multi,
+ NULL,
+ NULL,
pktin_dequeue,
pktin_deq_multi);
}
@@ -1624,8 +1597,8 @@ int odp_pktout_queue_config(odp_pktio_t pktio,
queue_fn->set_enq_deq_fn(queue,
pktout_enqueue,
pktout_enq_multi,
- abort_pktout_dequeue,
- abort_pktout_deq_multi);
+ NULL,
+ NULL);
entry->s.out_queue[i].queue = queue;
}
@@ -305,9 +305,9 @@ static odp_queue_t queue_create(const char *name,
if (!queue->s.spsc &&
param->nonblocking == ODP_NONBLOCKING_LF) {
- queue_lf_func_t *lf_func;
+ queue_lf_func_t *lf_fn;
- lf_func = &queue_glb->queue_lf_func;
+ lf_fn = &queue_glb->queue_lf_func;
queue_lf = queue_lf_create(queue);
@@ -317,10 +317,11 @@ static odp_queue_t queue_create(const char *name,
}
queue->s.queue_lf = queue_lf;
- queue->s.enqueue = lf_func->enq;
- queue->s.enqueue_multi = lf_func->enq_multi;
- queue->s.dequeue = lf_func->deq;
- queue->s.dequeue_multi = lf_func->deq_multi;
+ queue->s.enqueue = lf_fn->enq;
+ queue->s.enqueue_multi = lf_fn->enq_multi;
+ queue->s.dequeue = lf_fn->deq;
+ queue->s.dequeue_multi = lf_fn->deq_multi;
+ queue->s.orig_dequeue_multi = lf_fn->deq_multi;
}
type = queue->s.type;
@@ -697,6 +698,8 @@ static int queue_init(queue_entry_t *queue, const char *name,
queue->s.enqueue_multi = queue_int_enq_multi;
queue->s.dequeue_multi = queue_int_deq_multi;
+ queue->s.orig_dequeue_multi = queue_int_deq_multi;
+
ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset],
queue_size);
}
@@ -883,6 +886,14 @@ static void queue_set_enq_deq_func(odp_queue_t handle,
qentry->s.dequeue_multi = deq_multi;
}
+static int queue_orig_multi(odp_queue_t handle,
+ odp_buffer_hdr_t **buf_hdr, int num)
+{
+ queue_entry_t *queue = qentry_from_handle(handle);
+
+ return queue->s.orig_dequeue_multi(handle, buf_hdr, num);
+}
+
/* API functions */
_odp_queue_api_fn_t queue_basic_api = {
.queue_create = queue_create,
@@ -910,13 +921,10 @@ queue_fn_t queue_basic_fn = {
.term_global = queue_term_global,
.init_local = queue_init_local,
.term_local = queue_term_local,
- .enq = queue_int_enq,
- .enq_multi = queue_int_enq_multi,
- .deq = queue_int_deq,
- .deq_multi = queue_int_deq_multi,
.get_pktout = queue_get_pktout,
.set_pktout = queue_set_pktout,
.get_pktin = queue_get_pktin,
.set_pktin = queue_set_pktin,
- .set_enq_deq_fn = queue_set_enq_deq_func
+ .set_enq_deq_fn = queue_set_enq_deq_func,
+ .orig_deq_multi = queue_orig_multi
};
@@ -128,6 +128,7 @@ static int queue_init(queue_entry_t *queue, const char *name,
queue->s.dequeue = _queue_deq;
queue->s.enqueue_multi = _queue_enq_multi;
queue->s.dequeue_multi = _queue_deq_multi;
+ queue->s.orig_dequeue_multi = _queue_deq_multi;
queue->s.pktin = PKTIN_INVALID;
sched_elem->node.next = NULL;
@@ -949,6 +950,13 @@ static void queue_set_enq_deq_func(odp_queue_t handle,
qentry_from_int(handle)->s.dequeue_multi = deq_multi;
}
+static int queue_orig_multi(odp_queue_t handle,
+ odp_buffer_hdr_t **buf_hdr, int num)
+{
+ return qentry_from_int(handle)->s.orig_dequeue_multi(handle,
+ buf_hdr, num);
+}
+
/* API functions */
_odp_queue_api_fn_t queue_scalable_api = {
.queue_create = queue_create,
@@ -976,13 +984,10 @@ queue_fn_t queue_scalable_fn = {
.term_global = queue_term_global,
.init_local = queue_init_local,
.term_local = queue_term_local,
- .enq = _queue_enq,
- .enq_multi = _queue_enq_multi,
- .deq = _queue_deq,
- .deq_multi = _queue_deq_multi,
.get_pktout = queue_get_pktout,
.set_pktout = queue_set_pktout,
.get_pktin = queue_get_pktin,
.set_pktin = queue_set_pktin,
- .set_enq_deq_fn = queue_set_enq_deq_func
+ .set_enq_deq_fn = queue_set_enq_deq_func,
+ .orig_deq_multi = queue_orig_multi
};
@@ -123,6 +123,7 @@ void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size)
queue->s.dequeue = queue_spsc_deq;
queue->s.enqueue_multi = queue_spsc_enq_multi;
queue->s.dequeue_multi = queue_spsc_deq_multi;
+ queue->s.orig_dequeue_multi = queue_spsc_deq_multi;
offset = queue->s.index * (uint64_t)queue_glb->config.max_queue_size;
@@ -28,6 +28,7 @@
#include <odp_timer_internal.h>
#include <odp_queue_basic_internal.h>
#include <odp_libconfig_internal.h>
+#include <odp/api/plat/queue_inlines.h>
/* Number of priority levels */
#define NUM_PRIO 8
@@ -669,7 +670,8 @@ static inline void ordered_stash_release(void)
buf_hdr = sched_local.ordered.stash[i].buf_hdr;
num = sched_local.ordered.stash[i].num;
- num_enq = queue_fn->enq_multi(queue, buf_hdr, num);
+ num_enq = odp_queue_enq_multi(queue,
+ (odp_event_t *)buf_hdr, num);
/* Drop packets that were not enqueued */
if (odp_unlikely(num_enq < num)) {
@@ -839,7 +841,7 @@ static inline int poll_pktin(uint32_t qi, int direct_recv,
q_int = qentry_from_index(qi);
- ret = queue_fn->enq_multi(q_int, b_hdr, num);
+ ret = odp_queue_enq_multi(q_int, (odp_event_t *)b_hdr, num);
/* Drop packets that were not enqueued */
if (odp_unlikely(ret < num)) {
@@ -26,6 +26,7 @@
#include <odp_config_internal.h>
#include <odp_timer_internal.h>
#include <odp_queue_basic_internal.h>
+#include <odp/api/plat/queue_inlines.h>
/* Number of priority levels */
#define NUM_SCHED_PRIO 8
@@ -1140,7 +1141,8 @@ static inline void ordered_stash_release(void)
buf_hdr = thread_local.ordered.stash[i].buf_hdr;
num = thread_local.ordered.stash[i].num;
- num_enq = queue_fn->enq_multi(queue, buf_hdr, num);
+ num_enq = odp_queue_enq_multi(queue,
+ (odp_event_t *)buf_hdr, num);
if (odp_unlikely(num_enq < num)) {
if (odp_unlikely(num_enq < 0))
@@ -56,6 +56,7 @@
#include <odp/api/plat/time_inlines.h>
#include <odp/api/timer.h>
#include <odp_timer_internal.h>
+#include <odp/api/plat/queue_inlines.h>
/* Inlined API functions */
#include <odp/api/plat/event_inlines.h>
@@ -18,6 +18,7 @@
#include <odp/api/hints.h>
#include <odp/api/plat/byteorder_inlines.h>
#include <odp_queue_if.h>
+#include <odp/api/plat/queue_inlines.h>
#include <protocols/eth.h>
#include <protocols/ip.h>
@@ -107,7 +108,7 @@ static int loopback_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
odp_ticketlock_lock(&pktio_entry->s.rxl);
queue = pkt_priv(pktio_entry)->loopq;
- nbr = queue_fn->deq_multi(queue, hdr_tbl, num);
+ nbr = odp_queue_deq_multi(queue, (odp_event_t *)hdr_tbl, num);
if (pktio_entry->s.config.pktin.bit.ts_all ||
pktio_entry->s.config.pktin.bit.ts_ptp) {
@@ -325,7 +326,7 @@ static int loopback_send(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
odp_ticketlock_lock(&pktio_entry->s.txl);
queue = pkt_priv(pktio_entry)->loopq;
- ret = queue_fn->enq_multi(queue, hdr_tbl, nb_tx);
+ ret = odp_queue_enq_multi(queue, (odp_event_t *)hdr_tbl, nb_tx);
if (ret > 0) {
pktio_entry->s.stats.out_ucast_pkts += ret;