Discussion:
[PATCH 3/3] xen-netback: reintroduce guest Rx stall detection
(too old to reply)
David Vrabel
2014-10-22 13:08:55 UTC
Permalink
If a frontend not receiving packets it is useful to detect this and
turn off the carrier so packets are dropped early instead of being
queued and drained when they expire.

A to-guest queue is stalled if it doesn't have enough free slots for a
an extended period of time (default 60 s).

If at least one queue is stalled, the carrier is turned off (in the
expectation that the other queues will soon stall as well). The
carrier is only turned on once all queues are ready.

When the frontend connects, all the queues start in the stalled state
and only become ready once the frontend queues enough Rx requests.

Signed-off-by: David Vrabel <***@citrix.com>
---
drivers/net/xen-netback/common.h | 5 +++
drivers/net/xen-netback/interface.c | 5 ++-
drivers/net/xen-netback/netback.c | 76 +++++++++++++++++++++++++++++++++++
drivers/net/xen-netback/xenbus.c | 1 +
4 files changed, 86 insertions(+), 1 deletion(-)

diff --git a/drivers/net/xen-netback/common.h b/drivers/net/xen-netback/common.h
index c264240..083ecc9 100644
--- a/drivers/net/xen-netback/common.h
+++ b/drivers/net/xen-netback/common.h
@@ -179,6 +179,8 @@ struct xenvif_queue { /* Per-queue data for xenvif */

unsigned int rx_queue_max;
unsigned int rx_queue_len;
+ unsigned long last_rx_time;
+ bool stalled;

struct gnttab_copy grant_copy_op[MAX_GRANT_COPY_OPS];

@@ -232,6 +234,9 @@ struct xenvif {
/* Queues */
struct xenvif_queue *queues;
unsigned int num_queues; /* active queues, resource allocated */
+ unsigned int stalled_queues;
+
+ spinlock_t lock;

#ifdef CONFIG_DEBUG_FS
struct dentry *xenvif_dbg_root;
diff --git a/drivers/net/xen-netback/interface.c b/drivers/net/xen-netback/interface.c
index a134d52..895fe84 100644
--- a/drivers/net/xen-netback/interface.c
+++ b/drivers/net/xen-netback/interface.c
@@ -419,6 +419,8 @@ struct xenvif *xenvif_alloc(struct device *parent, domid_t domid,
vif->queues = NULL;
vif->num_queues = 0;

+ spin_lock_init(&vif->lock);
+
dev->netdev_ops = &xenvif_netdev_ops;
dev->hw_features = NETIF_F_SG |
NETIF_F_IP_CSUM | NETIF_F_IPV6_CSUM |
@@ -505,7 +507,6 @@ void xenvif_carrier_on(struct xenvif *vif)
dev_set_mtu(vif->dev, ETH_DATA_LEN);
netdev_update_features(vif->dev);
set_bit(VIF_STATUS_CONNECTED, &vif->status);
- netif_carrier_on(vif->dev);
if (netif_running(vif->dev))
xenvif_up(vif);
rtnl_unlock();
@@ -565,6 +566,8 @@ int xenvif_connect(struct xenvif_queue *queue, unsigned long tx_ring_ref,
disable_irq(queue->rx_irq);
}

+ queue->stalled = true;
+
task = kthread_create(xenvif_kthread_guest_rx,
(void *)queue, "%s-guest-rx", queue->name);
if (IS_ERR(task)) {
diff --git a/drivers/net/xen-netback/netback.c b/drivers/net/xen-netback/netback.c
index 15893dc..25f4c06 100644
--- a/drivers/net/xen-netback/netback.c
+++ b/drivers/net/xen-netback/netback.c
@@ -62,6 +62,13 @@ unsigned int rx_drain_timeout_msecs = 10000;
module_param(rx_drain_timeout_msecs, uint, 0444);
unsigned int rx_drain_timeout_jiffies;

+/* The length of time before the frontend is considered unresponsive
+ * because it isn't providing Rx slots.
+ */
+static unsigned int rx_stall_timeout_msecs = 60000;
+module_param(rx_stall_timeout_msecs, uint, 0444);
+static unsigned int rx_stall_timeout_jiffies;
+
unsigned int xenvif_max_queues;
module_param_named(max_queues, xenvif_max_queues, uint, 0644);
MODULE_PARM_DESC(max_queues,
@@ -649,6 +656,8 @@ static void xenvif_rx_action(struct xenvif_queue *queue)
RING_IDX ring_slots_used;
int i;

+ queue->last_rx_time = jiffies;
+
/* We need a cheap worse case estimate for the number of
* slots we'll use.
*/
@@ -1972,10 +1981,67 @@ err:
return err;
}

+static void xenvif_queue_carrier_off(struct xenvif_queue *queue)
+{
+ struct xenvif *vif = queue->vif;
+
+ queue->stalled = true;
+
+ /* At least one queue has stalled? Disable the carrier. */
+ spin_lock(&vif->lock);
+ if (vif->stalled_queues++ == 0) {
+ netdev_info(vif->dev, "Guest Rx stalled");
+ netif_carrier_off(vif->dev);
+ }
+ spin_unlock(&vif->lock);
+}
+
+static void xenvif_queue_carrier_on(struct xenvif_queue *queue)
+{
+ struct xenvif *vif = queue->vif;
+
+ queue->last_rx_time = jiffies; /* Reset Rx stall detection. */
+ queue->stalled = false;
+
+ /* All queues are ready? Enable the carrier. */
+ spin_lock(&vif->lock);
+ if (--vif->stalled_queues == 0) {
+ netdev_info(vif->dev, "Guest Rx ready");
+ netif_carrier_on(vif->dev);
+ }
+ spin_unlock(&vif->lock);
+}
+
+static bool xenvif_rx_queue_stalled(struct xenvif_queue *queue)
+{
+ RING_IDX prod, cons;
+
+ prod = queue->rx.sring->req_prod;
+ cons = queue->rx.req_cons;
+
+ return !queue->stalled
+ && prod - cons < XEN_NETBK_RX_SLOTS_MAX
+ && time_after(jiffies,
+ queue->last_rx_time + rx_stall_timeout_jiffies);
+}
+
+static bool xenvif_rx_queue_ready(struct xenvif_queue *queue)
+{
+ RING_IDX prod, cons;
+
+ prod = queue->rx.sring->req_prod;
+ cons = queue->rx.req_cons;
+
+ return queue->stalled
+ && prod - cons >= XEN_NETBK_RX_SLOTS_MAX;
+}
+
static bool xenvif_have_rx_work(struct xenvif_queue *queue)
{
return (!skb_queue_empty(&queue->rx_queue)
&& xenvif_rx_ring_slots_available(queue, XEN_NETBK_RX_SLOTS_MAX))
+ || xenvif_rx_queue_stalled(queue)
+ || xenvif_rx_queue_ready(queue)
|| kthread_should_stop()
|| queue->vif->disabled;
}
@@ -2050,6 +2116,15 @@ int xenvif_kthread_guest_rx(void *data)
if (!skb_queue_empty(&queue->rx_queue))
xenvif_rx_action(queue);

+ /* If the guest hasn't provided any Rx slots for a
+ * while it's probably not responsive, drop the
+ * carrier so packets are dropped earlier.
+ */
+ if (xenvif_rx_queue_stalled(queue))
+ xenvif_queue_carrier_off(queue);
+ else if (xenvif_rx_queue_ready(queue))
+ xenvif_queue_carrier_on(queue);
+
/* Queued packets may have foreign pages from other
* domains. These cannot be queued indefinitely as
* this would starve guests of grant refs and transmit
@@ -2120,6 +2195,7 @@ static int __init netback_init(void)
goto failed_init;

rx_drain_timeout_jiffies = msecs_to_jiffies(rx_drain_timeout_msecs);
+ rx_stall_timeout_jiffies = msecs_to_jiffies(rx_stall_timeout_msecs);

#ifdef CONFIG_DEBUG_FS
xen_netback_dbg_root = debugfs_create_dir("xen-netback", NULL);
diff --git a/drivers/net/xen-netback/xenbus.c b/drivers/net/xen-netback/xenbus.c
index 96a754d..4e56a27 100644
--- a/drivers/net/xen-netback/xenbus.c
+++ b/drivers/net/xen-netback/xenbus.c
@@ -711,6 +711,7 @@ static void connect(struct backend_info *be)
be->vif->queues = vzalloc(requested_num_queues *
sizeof(struct xenvif_queue));
be->vif->num_queues = requested_num_queues;
+ be->vif->stalled_queues = requested_num_queues;

for (queue_index = 0; queue_index < requested_num_queues; ++queue_index) {
queue = &be->vif->queues[queue_index];
--
1.7.10.4
David Vrabel
2014-10-22 13:08:53 UTC
Permalink
Frontends that do not provide feature-rx-notify may stall because
netback depends on the notification from frontend to wake the guest Rx
thread (even if can_queue is false).

This could be fixed but feature-rx-notify was introduced in 2006 and I
am not aware of any frontends that do not implement this.

Signed-off-by: David Vrabel <***@citrix.com>
---
drivers/net/xen-netback/common.h | 5 -----
drivers/net/xen-netback/interface.c | 12 +-----------
drivers/net/xen-netback/xenbus.c | 13 ++++---------
3 files changed, 5 insertions(+), 25 deletions(-)

diff --git a/drivers/net/xen-netback/common.h b/drivers/net/xen-netback/common.h
index d4eb8d2..93ca77c 100644
--- a/drivers/net/xen-netback/common.h
+++ b/drivers/net/xen-netback/common.h
@@ -228,9 +228,6 @@ struct xenvif {
u8 ip_csum:1;
u8 ipv6_csum:1;

- /* Internal feature information. */
- u8 can_queue:1; /* can queue packets for receiver? */
-
/* Is this interface disabled? True when backend discovers
* frontend is rogue.
*/
@@ -272,8 +269,6 @@ void xenvif_xenbus_fini(void);

int xenvif_schedulable(struct xenvif *vif);

-int xenvif_must_stop_queue(struct xenvif_queue *queue);
-
int xenvif_queue_stopped(struct xenvif_queue *queue);
void xenvif_wake_queue(struct xenvif_queue *queue);

diff --git a/drivers/net/xen-netback/interface.c b/drivers/net/xen-netback/interface.c
index f379689..c6759b1 100644
--- a/drivers/net/xen-netback/interface.c
+++ b/drivers/net/xen-netback/interface.c
@@ -60,16 +60,6 @@ void xenvif_skb_zerocopy_complete(struct xenvif_queue *queue)
atomic_dec(&queue->inflight_packets);
}

-static inline void xenvif_stop_queue(struct xenvif_queue *queue)
-{
- struct net_device *dev = queue->vif->dev;
-
- if (!queue->vif->can_queue)
- return;
-
- netif_tx_stop_queue(netdev_get_tx_queue(dev, queue->id));
-}
-
int xenvif_schedulable(struct xenvif *vif)
{
return netif_running(vif->dev) &&
@@ -209,7 +199,7 @@ static int xenvif_start_xmit(struct sk_buff *skb, struct net_device *dev)
if (!xenvif_rx_ring_slots_available(queue, min_slots_needed)) {
queue->rx_stalled.function = xenvif_rx_stalled;
queue->rx_stalled.data = (unsigned long)queue;
- xenvif_stop_queue(queue);
+ netif_tx_stop_queue(netdev_get_tx_queue(dev, queue->id));
mod_timer(&queue->rx_stalled,
jiffies + rx_drain_timeout_jiffies);
}
diff --git a/drivers/net/xen-netback/xenbus.c b/drivers/net/xen-netback/xenbus.c
index 8079c31..9060857 100644
--- a/drivers/net/xen-netback/xenbus.c
+++ b/drivers/net/xen-netback/xenbus.c
@@ -873,15 +873,10 @@ static int read_xenbus_vif_flags(struct backend_info *be)
if (!rx_copy)
return -EOPNOTSUPP;

- if (vif->dev->tx_queue_len != 0) {
- if (xenbus_scanf(XBT_NIL, dev->otherend,
- "feature-rx-notify", "%d", &val) < 0)
- val = 0;
- if (val)
- vif->can_queue = 1;
- else
- /* Must be non-zero for pfifo_fast to work. */
- vif->dev->tx_queue_len = 1;
+ if (xenbus_scanf(XBT_NIL, dev->otherend,
+ "feature-rx-notify", "%d", &val) < 0 || val == 0) {
+ xenbus_dev_fatal(dev, -EINVAL, "feature-rx-notify is mandatory");
+ return -EINVAL;
}

if (xenbus_scanf(XBT_NIL, dev->otherend, "feature-sg",
--
1.7.10.4
Wei Liu
2014-10-23 11:16:29 UTC
Permalink
Post by David Vrabel
Frontends that do not provide feature-rx-notify may stall because
netback depends on the notification from frontend to wake the guest Rx
thread (even if can_queue is false).
This could be fixed but feature-rx-notify was introduced in 2006 and I
am not aware of any frontends that do not implement this.
Acked-by: Wei Liu <***@citrix.com>
Wei Liu
2014-10-23 11:32:06 UTC
Permalink
Post by David Vrabel
Frontends that do not provide feature-rx-notify may stall because
netback depends on the notification from frontend to wake the guest Rx
thread (even if can_queue is false).
This could be fixed but feature-rx-notify was introduced in 2006 and I
am not aware of any frontends that do not implement this.
While I can understand this patch by itself, can you elaborate a little
bit on how it affects later patches? Because what I'm thinking is that
this patch is not for stable while other two should go to stable.

Wei.
David Vrabel
2014-10-23 11:37:54 UTC
Permalink
Post by Wei Liu
Post by David Vrabel
Frontends that do not provide feature-rx-notify may stall because
netback depends on the notification from frontend to wake the guest Rx
thread (even if can_queue is false).
This could be fixed but feature-rx-notify was introduced in 2006 and I
am not aware of any frontends that do not implement this.
While I can understand this patch by itself, can you elaborate a little
bit on how it affects later patches? Because what I'm thinking is that
this patch is not for stable while other two should go to stable.
"The first patch is a prerequite. Removing support for frontends with
feature-rx-notify makes it easier to reason about the correctness of
netback since it no longer has to support this outdated and broken
mode."

The other patches do not meet the stable kernel requirements (they're
too long one thing).

David
Wei Liu
2014-10-23 11:44:53 UTC
Permalink
Post by David Vrabel
Post by Wei Liu
Post by David Vrabel
Frontends that do not provide feature-rx-notify may stall because
netback depends on the notification from frontend to wake the guest Rx
thread (even if can_queue is false).
This could be fixed but feature-rx-notify was introduced in 2006 and I
am not aware of any frontends that do not implement this.
While I can understand this patch by itself, can you elaborate a little
bit on how it affects later patches? Because what I'm thinking is that
this patch is not for stable while other two should go to stable.
"The first patch is a prerequite. Removing support for frontends with
feature-rx-notify makes it easier to reason about the correctness of
netback since it no longer has to support this outdated and broken
mode."
I saw that.

I think you should make it a little bit clearer.

The queue is not guaranteed to stop if we keep this feature.
Post by David Vrabel
The other patches do not meet the stable kernel requirements (they're
too long one thing).
Does length matter? I surely had written long patch for stable.

Wei.
Post by David Vrabel
David
David Vrabel
2014-10-23 11:52:29 UTC
Permalink
Post by Wei Liu
Post by David Vrabel
Post by Wei Liu
Post by David Vrabel
Frontends that do not provide feature-rx-notify may stall because
netback depends on the notification from frontend to wake the guest Rx
thread (even if can_queue is false).
This could be fixed but feature-rx-notify was introduced in 2006 and I
am not aware of any frontends that do not implement this.
While I can understand this patch by itself, can you elaborate a little
bit on how it affects later patches? Because what I'm thinking is that
this patch is not for stable while other two should go to stable.
"The first patch is a prerequite. Removing support for frontends with
feature-rx-notify makes it easier to reason about the correctness of
netback since it no longer has to support this outdated and broken
mode."
I saw that.
I think you should make it a little bit clearer.
I'm not sure how I can make this any clearer. Perhaps you should wander
over to my desk to discuss this in person?
Post by Wei Liu
The queue is not guaranteed to stop if we keep this feature.
Post by David Vrabel
The other patches do not meet the stable kernel requirements (they're
too long one thing).
Does length matter? I surely had written long patch for stable.
"- It cannot be bigger than 100 lines, with context."

David

David Vrabel
2014-10-22 13:08:54 UTC
Permalink
Netback needs to discard old to-guest skb's (guest Rx queue drain) and
it needs detect guest Rx stalls (to disable the carrier so packets are
discarded earlier), but the current implementation is very broken.

1. The check in hard_start_xmit of the slot availability did not
consider the number of packets that were already in the guest Rx
queue. This could allow the queue to grow without bound.

The guest stops consuming packets and the ring was allowed to fill
leaving S slot free. Netback queues a packet requiring more than S
slots (ensuring that the ring stays with S slots free). Netback
queue indefinately packets provided that then require S or fewer
slots.

2. The Rx stall detection is not triggered in this case since the
(host) Tx queue is not stopped.

3. If the Tx queue is stopped and a guest Rx interrupt occurs, netback
will consider this an Rx purge event which may result in it taking
the carrier down unnecessarily. It also considers a queue with
only 1 slot free as unstalled (even though the next packet might
not fit in this).

The internal guest Rx queue is limited by a byte length (to 512 Kib,
enough for half the ring). The (host) Tx queue is stopped and started
based on this limit. This sets an upper bound on the amount of memory
used by packets on the internal queue.

This allows the estimatation of the number of slots for an skb to be
removed (it wasn't a very good estimate anyway). Instead, the guest
Rx thread just waits for enough free slots for a maximum sized packet.

skbs queued on the internal queue have an 'expires' time (set to the
current time plus the drain timeout). The guest Rx thread will detect
when the skb at the head of the queue has expired and discard expired
skbs. This sets a clear upper bound on the length of time an skb can
be queued for. For a guest being destroyed the maximum time needed to
wait for all the packets it sent to be dropped is still the drain
timeout (10 s) since it will not be sending new packets.

Rx stall detection is reintroduced in a later commit.

Signed-off-by: David Vrabel <***@citrix.com>
---
drivers/net/xen-netback/common.h | 29 +++--
drivers/net/xen-netback/interface.c | 59 ++-------
drivers/net/xen-netback/netback.c | 243 ++++++++++++++++++-----------------
drivers/net/xen-netback/xenbus.c | 8 ++
4 files changed, 161 insertions(+), 178 deletions(-)

diff --git a/drivers/net/xen-netback/common.h b/drivers/net/xen-netback/common.h
index 93ca77c..c264240 100644
--- a/drivers/net/xen-netback/common.h
+++ b/drivers/net/xen-netback/common.h
@@ -176,10 +176,9 @@ struct xenvif_queue { /* Per-queue data for xenvif */
char rx_irq_name[IRQ_NAME_SIZE]; /* DEVNAME-qN-rx */
struct xen_netif_rx_back_ring rx;
struct sk_buff_head rx_queue;
- RING_IDX rx_last_skb_slots;
- unsigned long status;

- struct timer_list rx_stalled;
+ unsigned int rx_queue_max;
+ unsigned int rx_queue_len;

struct gnttab_copy grant_copy_op[MAX_GRANT_COPY_OPS];

@@ -199,18 +198,14 @@ struct xenvif_queue { /* Per-queue data for xenvif */
struct xenvif_stats stats;
};

+/* Maximum number of Rx slots a to-guest packet may use, including the
+ * slot needed for GSO meta-data.
+ */
+#define XEN_NETBK_RX_SLOTS_MAX (MAX_SKB_FRAGS + 1)
+
enum state_bit_shift {
/* This bit marks that the vif is connected */
VIF_STATUS_CONNECTED,
- /* This bit signals the RX thread that queuing was stopped (in
- * start_xmit), and either the timer fired or an RX interrupt came
- */
- QUEUE_STATUS_RX_PURGE_EVENT,
- /* This bit tells the interrupt handler that this queue was the reason
- * for the carrier off, so it should kick the thread. Only queues which
- * brought it down can turn on the carrier.
- */
- QUEUE_STATUS_RX_STALLED
};

struct xenvif {
@@ -246,6 +241,14 @@ struct xenvif {
struct net_device *dev;
};

+struct xenvif_rx_cb {
+ unsigned long expires;
+ int meta_slots_used;
+ bool full_coalesce;
+};
+
+#define XENVIF_RX_CB(skb) ((struct xenvif_rx_cb *)(skb)->cb)
+
static inline struct xenbus_device *xenvif_to_xenbus_device(struct xenvif *vif)
{
return to_xenbus_device(vif->dev->dev.parent);
@@ -291,6 +294,8 @@ void xenvif_kick_thread(struct xenvif_queue *queue);

int xenvif_dealloc_kthread(void *data);

+void xenvif_rx_queue_tail(struct xenvif_queue *queue, struct sk_buff *skb);
+
/* Determine whether the needed number of slots (req) are available,
* and set req_event if not.
*/
diff --git a/drivers/net/xen-netback/interface.c b/drivers/net/xen-netback/interface.c
index c6759b1..a134d52 100644
--- a/drivers/net/xen-netback/interface.c
+++ b/drivers/net/xen-netback/interface.c
@@ -43,6 +43,9 @@
#define XENVIF_QUEUE_LENGTH 32
#define XENVIF_NAPI_WEIGHT 64

+/* Number of bytes allowed on the internal guest Rx queue. */
+#define XENVIF_RX_QUEUE_BYTES (XEN_NETIF_RX_RING_SIZE/2 * PAGE_SIZE)
+
/* This function is used to set SKBTX_DEV_ZEROCOPY as well as
* increasing the inflight counter. We need to increase the inflight
* counter because core driver calls into xenvif_zerocopy_callback
@@ -63,7 +66,8 @@ void xenvif_skb_zerocopy_complete(struct xenvif_queue *queue)
int xenvif_schedulable(struct xenvif *vif)
{
return netif_running(vif->dev) &&
- test_bit(VIF_STATUS_CONNECTED, &vif->status);
+ test_bit(VIF_STATUS_CONNECTED, &vif->status) &&
+ !vif->disabled;
}

static irqreturn_t xenvif_tx_interrupt(int irq, void *dev_id)
@@ -104,16 +108,7 @@ int xenvif_poll(struct napi_struct *napi, int budget)
static irqreturn_t xenvif_rx_interrupt(int irq, void *dev_id)
{
struct xenvif_queue *queue = dev_id;
- struct netdev_queue *net_queue =
- netdev_get_tx_queue(queue->vif->dev, queue->id);

- /* QUEUE_STATUS_RX_PURGE_EVENT is only set if either QDisc was off OR
- * the carrier went down and this queue was previously blocked
- */
- if (unlikely(netif_tx_queue_stopped(net_queue) ||
- (!netif_carrier_ok(queue->vif->dev) &&
- test_bit(QUEUE_STATUS_RX_STALLED, &queue->status))))
- set_bit(QUEUE_STATUS_RX_PURGE_EVENT, &queue->status);
xenvif_kick_thread(queue);

return IRQ_HANDLED;
@@ -141,24 +136,13 @@ void xenvif_wake_queue(struct xenvif_queue *queue)
netif_tx_wake_queue(netdev_get_tx_queue(dev, id));
}

-/* Callback to wake the queue's thread and turn the carrier off on timeout */
-static void xenvif_rx_stalled(unsigned long data)
-{
- struct xenvif_queue *queue = (struct xenvif_queue *)data;
-
- if (xenvif_queue_stopped(queue)) {
- set_bit(QUEUE_STATUS_RX_PURGE_EVENT, &queue->status);
- xenvif_kick_thread(queue);
- }
-}
-
static int xenvif_start_xmit(struct sk_buff *skb, struct net_device *dev)
{
struct xenvif *vif = netdev_priv(dev);
struct xenvif_queue *queue = NULL;
unsigned int num_queues = vif->num_queues;
u16 index;
- int min_slots_needed;
+ struct xenvif_rx_cb *cb;

BUG_ON(skb->dev != dev);

@@ -181,30 +165,10 @@ static int xenvif_start_xmit(struct sk_buff *skb, struct net_device *dev)
!xenvif_schedulable(vif))
goto drop;

- /* At best we'll need one slot for the header and one for each
- * frag.
- */
- min_slots_needed = 1 + skb_shinfo(skb)->nr_frags;
+ cb = XENVIF_RX_CB(skb);
+ cb->expires = jiffies + rx_drain_timeout_jiffies;

- /* If the skb is GSO then we'll also need an extra slot for the
- * metadata.
- */
- if (skb_is_gso(skb))
- min_slots_needed++;
-
- /* If the skb can't possibly fit in the remaining slots
- * then turn off the queue to give the ring a chance to
- * drain.
- */
- if (!xenvif_rx_ring_slots_available(queue, min_slots_needed)) {
- queue->rx_stalled.function = xenvif_rx_stalled;
- queue->rx_stalled.data = (unsigned long)queue;
- netif_tx_stop_queue(netdev_get_tx_queue(dev, queue->id));
- mod_timer(&queue->rx_stalled,
- jiffies + rx_drain_timeout_jiffies);
- }
-
- skb_queue_tail(&queue->rx_queue, skb);
+ xenvif_rx_queue_tail(queue, skb);
xenvif_kick_thread(queue);

return NETDEV_TX_OK;
@@ -498,6 +462,8 @@ int xenvif_init_queue(struct xenvif_queue *queue)
init_timer(&queue->credit_timeout);
queue->credit_window_start = get_jiffies_64();

+ queue->rx_queue_max = XENVIF_RX_QUEUE_BYTES;
+
skb_queue_head_init(&queue->rx_queue);
skb_queue_head_init(&queue->tx_queue);

@@ -529,8 +495,6 @@ int xenvif_init_queue(struct xenvif_queue *queue)
queue->grant_tx_handle[i] = NETBACK_INVALID_HANDLE;
}

- init_timer(&queue->rx_stalled);
-
return 0;
}

@@ -664,7 +628,6 @@ void xenvif_disconnect(struct xenvif *vif)
netif_napi_del(&queue->napi);

if (queue->task) {
- del_timer_sync(&queue->rx_stalled);
kthread_stop(queue->task);
queue->task = NULL;
}
diff --git a/drivers/net/xen-netback/netback.c b/drivers/net/xen-netback/netback.c
index 08f6599..15893dc 100644
--- a/drivers/net/xen-netback/netback.c
+++ b/drivers/net/xen-netback/netback.c
@@ -55,8 +55,8 @@
bool separate_tx_rx_irq = 1;
module_param(separate_tx_rx_irq, bool, 0644);

-/* When guest ring is filled up, qdisc queues the packets for us, but we have
- * to timeout them, otherwise other guests' packets can get stuck there
+/* The time that packets can stay on the guest Rx internal queue
+ * before they are dropped.
*/
unsigned int rx_drain_timeout_msecs = 10000;
module_param(rx_drain_timeout_msecs, uint, 0444);
@@ -83,7 +83,6 @@ static void make_tx_response(struct xenvif_queue *queue,
s8 st);

static inline int tx_work_todo(struct xenvif_queue *queue);
-static inline int rx_work_todo(struct xenvif_queue *queue);

static struct xen_netif_rx_response *make_rx_response(struct xenvif_queue *queue,
u16 id,
@@ -163,6 +162,69 @@ bool xenvif_rx_ring_slots_available(struct xenvif_queue *queue, int needed)
return false;
}

+void xenvif_rx_queue_tail(struct xenvif_queue *queue, struct sk_buff *skb)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&queue->rx_queue.lock, flags);
+
+ __skb_queue_tail(&queue->rx_queue, skb);
+
+ queue->rx_queue_len += skb->len;
+ if (queue->rx_queue_len > queue->rx_queue_max)
+ netif_tx_stop_queue(netdev_get_tx_queue(queue->vif->dev, queue->id));
+
+ spin_unlock_irqrestore(&queue->rx_queue.lock, flags);
+}
+
+static struct sk_buff *xenvif_rx_dequeue(struct xenvif_queue *queue)
+{
+ struct sk_buff *skb;
+
+ spin_lock_irq(&queue->rx_queue.lock);
+
+ skb = __skb_dequeue(&queue->rx_queue);
+ if (skb)
+ queue->rx_queue_len -= skb->len;
+
+ spin_unlock_irq(&queue->rx_queue.lock);
+
+ return skb;
+}
+
+static void xenvif_rx_queue_maybe_wake(struct xenvif_queue *queue)
+{
+ spin_lock_irq(&queue->rx_queue.lock);
+
+ if (queue->rx_queue_len < queue->rx_queue_max)
+ netif_tx_wake_queue(netdev_get_tx_queue(queue->vif->dev, queue->id));
+
+ spin_unlock_irq(&queue->rx_queue.lock);
+}
+
+
+static void xenvif_rx_queue_purge(struct xenvif_queue *queue)
+{
+ struct sk_buff *skb;
+ while ((skb = xenvif_rx_dequeue(queue)) != NULL)
+ kfree_skb(skb);
+}
+
+static void xenvif_rx_queue_drop_expired(struct xenvif_queue *queue)
+{
+ struct sk_buff *skb;
+
+ for(;;) {
+ skb = skb_peek(&queue->rx_queue);
+ if (!skb)
+ break;
+ if (time_before(jiffies, XENVIF_RX_CB(skb)->expires))
+ break;
+ xenvif_rx_dequeue(queue);
+ kfree_skb(skb);
+ }
+}
+
/*
* Returns true if we should start a new receive buffer instead of
* adding 'size' bytes to a buffer which currently contains 'offset'
@@ -237,13 +299,6 @@ static struct xenvif_rx_meta *get_next_rx_buffer(struct xenvif_queue *queue,
return meta;
}

-struct xenvif_rx_cb {
- int meta_slots_used;
- bool full_coalesce;
-};
-
-#define XENVIF_RX_CB(skb) ((struct xenvif_rx_cb *)(skb)->cb)
-
/*
* Set up the grant operations for this fragment. If it's a flipping
* interface, we also set up the unmap request from here.
@@ -587,7 +642,8 @@ static void xenvif_rx_action(struct xenvif_queue *queue)

skb_queue_head_init(&rxq);

- while ((skb = skb_dequeue(&queue->rx_queue)) != NULL) {
+ while (xenvif_rx_ring_slots_available(queue, XEN_NETBK_RX_SLOTS_MAX)
+ && (skb = xenvif_rx_dequeue(queue)) != NULL) {
RING_IDX max_slots_needed;
RING_IDX old_req_cons;
RING_IDX ring_slots_used;
@@ -634,15 +690,6 @@ static void xenvif_rx_action(struct xenvif_queue *queue)
skb_shinfo(skb)->gso_type & SKB_GSO_TCPV6))
max_slots_needed++;

- /* If the skb may not fit then bail out now */
- if (!xenvif_rx_ring_slots_available(queue, max_slots_needed)) {
- skb_queue_head(&queue->rx_queue, skb);
- need_to_notify = true;
- queue->rx_last_skb_slots = max_slots_needed;
- break;
- } else
- queue->rx_last_skb_slots = 0;
-
old_req_cons = queue->rx.req_cons;
XENVIF_RX_CB(skb)->meta_slots_used = xenvif_gop_skb(skb, &npo, queue);
ring_slots_used = queue->rx.req_cons - old_req_cons;
@@ -1869,12 +1916,6 @@ void xenvif_idx_unmap(struct xenvif_queue *queue, u16 pending_idx)
}
}

-static inline int rx_work_todo(struct xenvif_queue *queue)
-{
- return (!skb_queue_empty(&queue->rx_queue) &&
- xenvif_rx_ring_slots_available(queue, queue->rx_last_skb_slots));
-}
-
static inline int tx_work_todo(struct xenvif_queue *queue)
{
if (likely(RING_HAS_UNCONSUMED_REQUESTS(&queue->tx)))
@@ -1931,92 +1972,64 @@ err:
return err;
}

-static void xenvif_start_queue(struct xenvif_queue *queue)
+static bool xenvif_have_rx_work(struct xenvif_queue *queue)
{
- if (xenvif_schedulable(queue->vif))
- xenvif_wake_queue(queue);
+ return (!skb_queue_empty(&queue->rx_queue)
+ && xenvif_rx_ring_slots_available(queue, XEN_NETBK_RX_SLOTS_MAX))
+ || kthread_should_stop()
+ || queue->vif->disabled;
}

-/* Only called from the queue's thread, it handles the situation when the guest
- * doesn't post enough requests on the receiving ring.
- * First xenvif_start_xmit disables QDisc and start a timer, and then either the
- * timer fires, or the guest send an interrupt after posting new request. If it
- * is the timer, the carrier is turned off here.
- * */
-static void xenvif_rx_purge_event(struct xenvif_queue *queue)
+static long xenvif_rx_queue_timeout(struct xenvif_queue *queue)
{
- /* Either the last unsuccesful skb or at least 1 slot should fit */
- int needed = queue->rx_last_skb_slots ?
- queue->rx_last_skb_slots : 1;
+ struct sk_buff *skb;
+ long timeout;

- /* It is assumed that if the guest post new slots after this, the RX
- * interrupt will set the QUEUE_STATUS_RX_PURGE_EVENT bit and wake up
- * the thread again
- */
- set_bit(QUEUE_STATUS_RX_STALLED, &queue->status);
- if (!xenvif_rx_ring_slots_available(queue, needed)) {
- rtnl_lock();
- if (netif_carrier_ok(queue->vif->dev)) {
- /* Timer fired and there are still no slots. Turn off
- * everything except the interrupts
- */
- netif_carrier_off(queue->vif->dev);
- skb_queue_purge(&queue->rx_queue);
- queue->rx_last_skb_slots = 0;
- if (net_ratelimit())
- netdev_err(queue->vif->dev, "Carrier off due to lack of guest response on queue %d\n", queue->id);
- } else {
- /* Probably an another queue already turned the carrier
- * off, make sure nothing is stucked in the internal
- * queue of this queue
- */
- skb_queue_purge(&queue->rx_queue);
- queue->rx_last_skb_slots = 0;
- }
- rtnl_unlock();
- } else if (!netif_carrier_ok(queue->vif->dev)) {
- unsigned int num_queues = queue->vif->num_queues;
- unsigned int i;
- /* The carrier was down, but an interrupt kicked
- * the thread again after new requests were
- * posted
- */
- clear_bit(QUEUE_STATUS_RX_STALLED,
- &queue->status);
- rtnl_lock();
- netif_carrier_on(queue->vif->dev);
- netif_tx_wake_all_queues(queue->vif->dev);
- rtnl_unlock();
+ skb = skb_peek(&queue->rx_queue);
+ if (!skb)
+ return MAX_SCHEDULE_TIMEOUT;

- for (i = 0; i < num_queues; i++) {
- struct xenvif_queue *temp = &queue->vif->queues[i];
+ timeout = XENVIF_RX_CB(skb)->expires - jiffies;
+ return timeout < 0 ? 0 : timeout;
+}

- xenvif_napi_schedule_or_enable_events(temp);
- }
- if (net_ratelimit())
- netdev_err(queue->vif->dev, "Carrier on again\n");
- } else {
- /* Queuing were stopped, but the guest posted
- * new requests and sent an interrupt
- */
- clear_bit(QUEUE_STATUS_RX_STALLED,
- &queue->status);
- del_timer_sync(&queue->rx_stalled);
- xenvif_start_queue(queue);
+/* Wait until the guest Rx thread has work.
+ *
+ * The timeout needs to be adjusted based on the current head of the
+ * queue (and not just the head at the beginning). In particular, if
+ * the queue is initially empty an infinite timeout is used and this
+ * needs to be reduced when a skb is queued.
+ *
+ * This cannot be done with wait_event_timeout() because it only
+ * calculates the timeout once.
+ */
+static void xenvif_wait_for_rx_work(struct xenvif_queue *queue)
+{
+ DEFINE_WAIT(wait);
+
+ if (xenvif_have_rx_work(queue))
+ return;
+
+ for (;;) {
+ long ret;
+
+ prepare_to_wait(&queue->wq, &wait, TASK_INTERRUPTIBLE);
+ if (xenvif_have_rx_work(queue))
+ break;
+ ret = schedule_timeout(xenvif_rx_queue_timeout(queue));
+ if (!ret)
+ break;
}
+ finish_wait(&queue->wq, &wait);
}

int xenvif_kthread_guest_rx(void *data)
{
struct xenvif_queue *queue = data;
- struct sk_buff *skb;
+ struct xenvif *vif = queue->vif;

- while (!kthread_should_stop()) {
- wait_event_interruptible(queue->wq,
- rx_work_todo(queue) ||
- queue->vif->disabled ||
- test_bit(QUEUE_STATUS_RX_PURGE_EVENT, &queue->status) ||
- kthread_should_stop());
+ for (;;) {
+ xenvif_wait_for_rx_work(queue);

if (kthread_should_stop())
break;
@@ -2028,35 +2041,29 @@ int xenvif_kthread_guest_rx(void *data)
* context so we defer it here, if this thread is
* associated with queue 0.
*/
- if (unlikely(queue->vif->disabled && queue->id == 0)) {
- xenvif_carrier_off(queue->vif);
- } else if (unlikely(queue->vif->disabled)) {
- /* kthread_stop() would be called upon this thread soon,
- * be a bit proactive
- */
- skb_queue_purge(&queue->rx_queue);
- queue->rx_last_skb_slots = 0;
- } else if (unlikely(test_and_clear_bit(QUEUE_STATUS_RX_PURGE_EVENT,
- &queue->status))) {
- xenvif_rx_purge_event(queue);
- } else if (!netif_carrier_ok(queue->vif->dev)) {
- /* Another queue stalled and turned the carrier off, so
- * purge the internal queue of queues which were not
- * blocked
- */
- skb_queue_purge(&queue->rx_queue);
- queue->rx_last_skb_slots = 0;
+ if (unlikely(vif->disabled && queue->id == 0)) {
+ xenvif_carrier_off(vif);
+ xenvif_rx_queue_purge(queue);
+ continue;
}

if (!skb_queue_empty(&queue->rx_queue))
xenvif_rx_action(queue);

+ /* Queued packets may have foreign pages from other
+ * domains. These cannot be queued indefinitely as
+ * this would starve guests of grant refs and transmit
+ * slots.
+ */
+ xenvif_rx_queue_drop_expired(queue);
+
+ xenvif_rx_queue_maybe_wake(queue);
+
cond_resched();
}

/* Bin any remaining skbs */
- while ((skb = skb_dequeue(&queue->rx_queue)) != NULL)
- dev_kfree_skb(skb);
+ xenvif_rx_queue_purge(queue);

return 0;
}
diff --git a/drivers/net/xen-netback/xenbus.c b/drivers/net/xen-netback/xenbus.c
index 9060857..96a754d 100644
--- a/drivers/net/xen-netback/xenbus.c
+++ b/drivers/net/xen-netback/xenbus.c
@@ -52,6 +52,7 @@ static int xenvif_read_io_ring(struct seq_file *m, void *v)
struct xenvif_queue *queue = m->private;
struct xen_netif_tx_back_ring *tx_ring = &queue->tx;
struct xen_netif_rx_back_ring *rx_ring = &queue->rx;
+ struct netdev_queue *dev_queue;

if (tx_ring->sring) {
struct xen_netif_tx_sring *sring = tx_ring->sring;
@@ -112,6 +113,13 @@ static int xenvif_read_io_ring(struct seq_file *m, void *v)
queue->credit_timeout.expires,
jiffies);

+ dev_queue = netdev_get_tx_queue(queue->vif->dev, queue->id);
+
+ seq_printf(m, "\nRx internal queue: len %u max %u pkts %u %s\n",
+ queue->rx_queue_len, queue->rx_queue_max,
+ skb_queue_len(&queue->rx_queue),
+ netif_tx_queue_stopped(dev_queue) ? "stopped" : "running");
+
return 0;
}
--
1.7.10.4
Wei Liu
2014-10-23 11:40:17 UTC
Permalink
Post by David Vrabel
Netback needs to discard old to-guest skb's (guest Rx queue drain) and
it needs detect guest Rx stalls (to disable the carrier so packets are
discarded earlier), but the current implementation is very broken.
1. The check in hard_start_xmit of the slot availability did not
consider the number of packets that were already in the guest Rx
queue. This could allow the queue to grow without bound.
The guest stops consuming packets and the ring was allowed to fill
leaving S slot free. Netback queues a packet requiring more than S
slots (ensuring that the ring stays with S slots free). Netback
queue indefinately packets provided that then require S or fewer
slots.
2. The Rx stall detection is not triggered in this case since the
(host) Tx queue is not stopped.
3. If the Tx queue is stopped and a guest Rx interrupt occurs, netback
will consider this an Rx purge event which may result in it taking
the carrier down unnecessarily. It also considers a queue with
only 1 slot free as unstalled (even though the next packet might
not fit in this).
The internal guest Rx queue is limited by a byte length (to 512 Kib,
enough for half the ring). The (host) Tx queue is stopped and started
based on this limit. This sets an upper bound on the amount of memory
used by packets on the internal queue.
This allows the estimatation of the number of slots for an skb to be
removed (it wasn't a very good estimate anyway). Instead, the guest
+1 for this.
Post by David Vrabel
Rx thread just waits for enough free slots for a maximum sized packet.
skbs queued on the internal queue have an 'expires' time (set to the
current time plus the drain timeout). The guest Rx thread will detect
when the skb at the head of the queue has expired and discard expired
skbs. This sets a clear upper bound on the length of time an skb can
be queued for. For a guest being destroyed the maximum time needed to
wait for all the packets it sent to be dropped is still the drain
timeout (10 s) since it will not be sending new packets.
Rx stall detection is reintroduced in a later commit.
Reviewed-by: Wei Liu <***@citrix.com>
Wei Liu
2014-10-23 11:49:52 UTC
Permalink
Post by David Vrabel
If a frontend not receiving packets it is useful to detect this and
turn off the carrier so packets are dropped early instead of being
queued and drained when they expire.
A to-guest queue is stalled if it doesn't have enough free slots for a
an extended period of time (default 60 s).
If at least one queue is stalled, the carrier is turned off (in the
expectation that the other queues will soon stall as well). The
carrier is only turned on once all queues are ready.
When the frontend connects, all the queues start in the stalled state
and only become ready once the frontend queues enough Rx requests.
Reviewed-by: Wei Liu <***@citrix.com>
Continue reading on narkive:
Loading...