@@ -28,6 +28,11 @@ static inline odp_buffer_t buf_from_buf_hdr(odp_buffer_hdr_t *hdr)
return (odp_buffer_t)hdr;
}
+static inline odp_event_t event_from_buf_hdr(odp_buffer_hdr_t *hdr)
+{
+ return (odp_event_t)hdr;
+}
+
#ifdef __cplusplus
}
#endif
@@ -30,6 +30,7 @@ extern "C" {
#include <odp/api/ticketlock.h>
#include <odp_config_internal.h>
#include <odp_ring_st_internal.h>
+#include <odp_queue_lf.h>
#define QUEUE_STATUS_FREE 0
#define QUEUE_STATUS_DESTROYED 1
@@ -62,6 +63,27 @@ union queue_entry_u {
uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct queue_entry_s))];
};
+typedef struct ODP_ALIGNED_CACHE {
+ /* Storage space for ring data */
+ uint32_t data[CONFIG_QUEUE_SIZE];
+} queue_ring_data_t;
+
+typedef struct queue_global_t {
+ queue_entry_t queue[ODP_CONFIG_QUEUES];
+ queue_ring_data_t ring_data[ODP_CONFIG_QUEUES];
+ uint32_t queue_lf_num;
+ uint32_t queue_lf_size;
+ queue_lf_func_t queue_lf_func;
+
+} queue_global_t;
+
+extern queue_global_t *queue_glb;
+
+static inline queue_t queue_index_to_qint(uint32_t queue_id)
+{
+ return (queue_t)&queue_glb->queue[queue_id];
+}
+
static inline uint32_t queue_to_index(odp_queue_t handle)
{
return _odp_typeval(handle) - 1;
@@ -12,7 +12,6 @@ extern "C" {
#endif
#include <odp_queue_if.h>
-#include <odp_queue_internal.h>
/* Lock-free queue functions */
typedef struct {
@@ -77,7 +77,9 @@ typedef struct schedule_fn_t {
extern const schedule_fn_t *sched_fn;
/* Interface for the scheduler */
-int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[]);
+int sched_cb_pktin_poll(int pktio_index, int pktin_index,
+ odp_buffer_hdr_t *hdr_tbl[], int num);
+int sched_cb_pktin_poll_old(int pktio_index, int num_queue, int index[]);
int sched_cb_pktin_poll_one(int pktio_index, int rx_queue, odp_event_t evts[]);
void sched_cb_pktio_stop_finalize(int pktio_index);
odp_queue_t sched_cb_queue_handle(uint32_t queue_index);
@@ -559,7 +559,7 @@ odp_pktio_t odp_pktio_lookup(const char *name)
return hdl;
}
-static inline int pktin_recv_buf(odp_pktin_queue_t queue,
+static inline int pktin_recv_buf(pktio_entry_t *entry, int pktin_index,
odp_buffer_hdr_t *buffer_hdrs[], int num)
{
odp_packet_t pkt;
@@ -570,7 +570,7 @@ static inline int pktin_recv_buf(odp_pktin_queue_t queue,
int pkts;
int num_rx = 0;
- pkts = odp_pktin_recv(queue, packets, num);
+ pkts = entry->s.ops->recv(entry, pktin_index, packets, num);
for (i = 0; i < pkts; i++) {
pkt = packets[i];
@@ -624,13 +624,16 @@ static odp_buffer_hdr_t *pktin_dequeue(queue_t q_int)
odp_buffer_hdr_t *buf_hdr;
odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
int pkts;
+ odp_pktin_queue_t pktin_queue = queue_fn->get_pktin(q_int);
+ odp_pktio_t pktio = pktin_queue.pktio;
+ int pktin_index = pktin_queue.index;
+ pktio_entry_t *entry = get_pktio_entry(pktio);
buf_hdr = queue_fn->deq(q_int);
if (buf_hdr != NULL)
return buf_hdr;
- pkts = pktin_recv_buf(queue_fn->get_pktin(q_int),
- hdr_tbl, QUEUE_MULTI_MAX);
+ pkts = pktin_recv_buf(entry, pktin_index, hdr_tbl, QUEUE_MULTI_MAX);
if (pkts <= 0)
return NULL;
@@ -646,6 +649,10 @@ static int pktin_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[], int num)
int nbr;
odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
int pkts, i, j;
+ odp_pktin_queue_t pktin_queue = queue_fn->get_pktin(q_int);
+ odp_pktio_t pktio = pktin_queue.pktio;
+ int pktin_index = pktin_queue.index;
+ pktio_entry_t *entry = get_pktio_entry(pktio);
nbr = queue_fn->deq_multi(q_int, buf_hdr, num);
if (odp_unlikely(nbr > num))
@@ -657,8 +664,8 @@ static int pktin_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[], int num)
if (nbr == num)
return nbr;
- pkts = pktin_recv_buf(queue_fn->get_pktin(q_int),
- hdr_tbl, QUEUE_MULTI_MAX);
+ pkts = pktin_recv_buf(entry, pktin_index, hdr_tbl, QUEUE_MULTI_MAX);
+
if (pkts <= 0)
return nbr;
@@ -720,12 +727,29 @@ int sched_cb_pktin_poll_one(int pktio_index,
return num_rx;
}
-int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
+int sched_cb_pktin_poll(int pktio_index, int pktin_index,
+ odp_buffer_hdr_t *hdr_tbl[], int num)
+{
+ pktio_entry_t *entry = pktio_entry_by_index(pktio_index);
+ int state = entry->s.state;
+
+ if (odp_unlikely(state != PKTIO_STATE_STARTED)) {
+ if (state < PKTIO_STATE_ACTIVE ||
+ state == PKTIO_STATE_STOP_PENDING)
+ return -1;
+
+ ODP_DBG("interface not started\n");
+ return 0;
+ }
+
+ return pktin_recv_buf(entry, pktin_index, hdr_tbl, num);
+}
+
+int sched_cb_pktin_poll_old(int pktio_index, int num_queue, int index[])
{
odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
int num, idx;
- pktio_entry_t *entry;
- entry = pktio_entry_by_index(pktio_index);
+ pktio_entry_t *entry = pktio_entry_by_index(pktio_index);
int state = entry->s.state;
if (odp_unlikely(state != PKTIO_STATE_STARTED)) {
@@ -739,9 +763,9 @@ int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
for (idx = 0; idx < num_queue; idx++) {
queue_t q_int;
- odp_pktin_queue_t pktin = entry->s.in_queue[index[idx]].pktin;
- num = pktin_recv_buf(pktin, hdr_tbl, QUEUE_MULTI_MAX);
+ num = pktin_recv_buf(entry, index[idx], hdr_tbl,
+ QUEUE_MULTI_MAX);
if (num == 0)
continue;
@@ -8,7 +8,6 @@
#include <odp/api/queue.h>
#include <odp_queue_internal.h>
-#include <odp_queue_lf.h>
#include <odp_queue_if.h>
#include <odp/api/std_types.h>
#include <odp/api/align.h>
@@ -40,21 +39,7 @@
static int queue_init(queue_entry_t *queue, const char *name,
const odp_queue_param_t *param);
-typedef struct ODP_ALIGNED_CACHE {
- /* Storage space for ring data */
- uint32_t data[CONFIG_QUEUE_SIZE];
-} ring_data_t;
-
-typedef struct queue_global_t {
- queue_entry_t queue[ODP_CONFIG_QUEUES];
- ring_data_t ring_data[ODP_CONFIG_QUEUES];
- uint32_t queue_lf_num;
- uint32_t queue_lf_size;
- queue_lf_func_t queue_lf_func;
-
-} queue_global_t;
-
-static queue_global_t *queue_glb;
+queue_global_t *queue_glb;
static inline queue_entry_t *get_qentry(uint32_t queue_id)
{
@@ -7,7 +7,7 @@
#include <odp/api/queue.h>
#include <odp/api/atomic.h>
#include <odp/api/shared_memory.h>
-#include <odp_queue_lf.h>
+#include <odp_queue_internal.h>
#include <string.h>
#include <stdio.h>
@@ -26,6 +26,7 @@
#include <odp_ring_internal.h>
#include <odp_timer_internal.h>
#include <odp_queue_internal.h>
+#include <odp_buffer_inlines.h>
/* Number of priority levels */
#define NUM_PRIO 8
@@ -699,14 +700,20 @@ static inline int queue_is_pktin(uint32_t queue_index)
return sched->queue[queue_index].poll_pktin;
}
-static inline int poll_pktin(uint32_t qi)
+static inline int poll_pktin(uint32_t qi, int atomic)
{
- int pktio_index, pktin_index, num, num_pktin;
+ odp_buffer_hdr_t *b_hdr[MAX_DEQ];
+ int pktio_index, pktin_index, num, num_pktin, i;
+ int ret;
+ queue_t qint;
pktio_index = sched->queue[qi].pktio_index;
pktin_index = sched->queue[qi].pktin_index;
- num = sched_cb_pktin_poll(pktio_index, 1, &pktin_index);
+ num = sched_cb_pktin_poll(pktio_index, pktin_index, b_hdr, MAX_DEQ);
+
+ if (num == 0)
+ return 0;
/* Pktio stopped or closed. Call stop_finalize when we have stopped
* polling all pktin queues of the pktio. */
@@ -720,9 +727,32 @@ static inline int poll_pktin(uint32_t qi)
if (num_pktin == 0)
sched_cb_pktio_stop_finalize(pktio_index);
+
+ return num;
}
- return num;
+ if (atomic) {
+ for (i = 0; i < num; i++)
+ sched_local.ev_stash[i] = event_from_buf_hdr(b_hdr[i]);
+
+ return num;
+ }
+
+ qint = queue_index_to_qint(qi);
+
+ ret = queue_fn->enq_multi(qint, b_hdr, num);
+
+ /* Drop packets that were not enqueued */
+ if (odp_unlikely(ret < num)) {
+ int num_enq = ret;
+
+ if (odp_unlikely(ret < 0))
+ num_enq = 0;
+
+ buffer_free_multi(&b_hdr[num_enq], num - num_enq);
+ }
+
+ return ret;
}
static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
@@ -805,17 +835,26 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
* priorities. Stop scheduling queue when pktio
* has been stopped. */
if (pktin) {
- int num_pkt = poll_pktin(qi);
+ int atomic = queue_is_atomic(qi);
+ int num_pkt = poll_pktin(qi, atomic);
- if (odp_likely(num_pkt >= 0)) {
+ if (odp_unlikely(num_pkt < 0))
+ continue;
+
+ if (num_pkt == 0 || !atomic) {
ring_enq(ring, RING_MASK, qi);
break;
}
- }
- /* Remove empty queue from scheduling. Continue
- * scheduling the same priority queue. */
- continue;
+ /* Process packets from an atomic queue
+ * right away */
+ num = num_pkt;
+ } else {
+ /* Remove empty queue from scheduling.
+ * Continue scheduling the same priority
+ * queue. */
+ continue;
+ }
}
handle = queue_from_index(qi);
@@ -674,9 +674,9 @@ static inline void pktio_poll_input(void)
cmd = &sched->pktio_poll.commands[index];
/* Poll packet input */
- if (odp_unlikely(sched_cb_pktin_poll(cmd->pktio,
- cmd->count,
- cmd->pktin))) {
+ if (odp_unlikely(sched_cb_pktin_poll_old(cmd->pktio,
+ cmd->count,
+ cmd->pktin))) {
/* Pktio stopped or closed. Remove poll
* command and call stop_finalize when all
* commands of the pktio has been removed.
@@ -524,8 +524,9 @@ static int schedule_multi(odp_queue_t *from, uint64_t wait,
cmd = sched_cmd();
if (cmd && cmd->s.type == CMD_PKTIO) {
- if (sched_cb_pktin_poll(cmd->s.index, cmd->s.num_pktin,
- cmd->s.pktin_idx)) {
+ if (sched_cb_pktin_poll_old(cmd->s.index,
+ cmd->s.num_pktin,
+ cmd->s.pktin_idx)) {
/* Pktio stopped or closed. */
sched_cb_pktio_stop_finalize(cmd->s.index);
} else {