Browse Source

+ initial edition of meta-queue for RIB updates processing (bug #431)

Denis Ovsienko 12 years ago
parent
commit
e96f92034d
8 changed files with 173 additions and 58 deletions
  1. 7 0
      lib/ChangeLog
  2. 2 14
      lib/workqueue.c
  3. 0 3
      lib/workqueue.h
  4. 15 0
      zebra/ChangeLog
  5. 0 13
      zebra/connected.c
  6. 15 1
      zebra/rib.h
  7. 133 27
      zebra/zebra_rib.c
  8. 1 0
      zebra/zserv.h

+ 7 - 0
lib/ChangeLog

@@ -1,3 +1,10 @@
+2008-06-02 Denis Ovsienko
+
+	* workqueue.[ch]: completely drop WQ_AIM_HEAD flag and
+	  work_queue_aim_head() function, they aren't needed any more
+	  with the new meta queue structure; (work_queue_run) don't
+	  increment the counter on work item requeueing
+
 2008-02-28 Paul Jakma <paul.jakma@sun.com>
 
 	* log.c: (mes_lookup) Sowmini Varadhan diagnosed a problem where

+ 2 - 14
lib/workqueue.c

@@ -67,7 +67,6 @@ work_queue_new (struct thread_master *m, const char *queue_name)
   new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
   new->master = m;
   SET_FLAG (new->flags, WQ_UNPLUGGED);
-  UNSET_FLAG (new->flags, WQ_AIM_HEAD);
   
   if ( (new->items = list_new ()) == NULL)
     {
@@ -131,10 +130,7 @@ work_queue_add (struct work_queue *wq, void *data)
     }
   
   item->data = data;
-  if (CHECK_FLAG (wq->flags, WQ_AIM_HEAD))
-    listnode_add_after (wq->items, NULL, item);
-  else
-    listnode_add (wq->items, item);
+  listnode_add (wq->items, item);
   
   work_queue_schedule (wq, wq->spec.hold);
   
@@ -231,15 +227,6 @@ work_queue_unplug (struct work_queue *wq)
   work_queue_schedule (wq, wq->spec.hold);
 }
 
-void
-work_queue_aim_head (struct work_queue *wq, const unsigned aim_head)
-{
-  if (aim_head)
-    SET_FLAG (wq->flags, WQ_AIM_HEAD);
-  else
-    UNSET_FLAG (wq->flags, WQ_AIM_HEAD);
-}
-
 /* timer thread to process a work queue
  * will reschedule itself if required,
  * otherwise work_queue_item_add 
@@ -317,6 +304,7 @@ work_queue_run (struct thread *thread)
 	}
       case WQ_REQUEUE:
 	{
+	  item->ran--;
 	  work_queue_item_requeue (wq, node);
 	  break;
 	}

+ 0 - 3
lib/workqueue.h

@@ -48,7 +48,6 @@ struct work_queue_item
 };
 
 #define WQ_UNPLUGGED	(1 << 0) /* available for draining */
-#define WQ_AIM_HEAD	(1 << 1) /* add new items before list head, not after tail */
 
 struct work_queue
 {
@@ -119,8 +118,6 @@ extern void work_queue_add (struct work_queue *, void *);
 extern void work_queue_plug (struct work_queue *wq);
 /* unplug the queue, allow it to be drained again */
 extern void work_queue_unplug (struct work_queue *wq);
-/* control the value for WQ_AIM_HEAD flag */
-extern void work_queue_aim_head (struct work_queue *wq, const unsigned);
 
 /* Helpers, exported for thread.c and command.c */
 extern int work_queue_run (struct thread *);

+ 15 - 0
zebra/ChangeLog

@@ -1,3 +1,18 @@
+2008-06-02 Denis Ovsienko
+
+	* connected.c: (connected_up_ipv4, connected_down_ipv4,
+	  connected_up_ipv6, connected_down_ipv6): don't call
+	  work_queue_aim_head()
+	* rib.h: adjust RIB_ROUTE_QUEUED macro for meta_queue,
+	  declare meta_queue structure
+	* zebra_rib.c: (process_subq, meta_queue_process, rib_meta_queue_add,
+	  meta_queue_new) new functions; (rib_queue_add) don't try checking
+	  RIB_ROUTE_QUEUED flag, rib_meta_queue_add() does it better, take care
+	  of the meta queue instead; (rib_queue_init) initialize the meta queue
+	  as well; (rib_lookup_and_pushup) don't call work_queue_aim_head();
+	  (rib_process) only do actual processing, don't do deallocation;
+	* zserv.h: include meta_queue field into zebra_t structure
+
 2008-05-29 Stephen Hemminger <stephen.hemminger@vyatta.com>
 
 	* rt_netlink.c: (netlink_install_filter) BPF filter to catch and

+ 0 - 13
zebra/connected.c

@@ -188,15 +188,8 @@ connected_up_ipv4 (struct interface *ifp, struct connected *ifc)
   if (prefix_ipv4_any (&p))
     return;
 
-  /* Always push arriving/departing connected routes into the head of
-   * the working queue to make possible proper validation of the rest
-   * of the RIB queue (which will contain the whole RIB after the first
-   * call to rib_update()).
-   */
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_add_ipv4 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, NULL, ifp->ifindex,
 	RT_TABLE_MAIN, ifp->metric, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }
@@ -302,9 +295,7 @@ connected_down_ipv4 (struct interface *ifp, struct connected *ifc)
     return;
 
   /* Same logic as for connected_up_ipv4(): push the changes into the head. */
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_delete_ipv4 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, ifp->ifindex, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }
@@ -349,10 +340,8 @@ connected_up_ipv6 (struct interface *ifp, struct connected *ifc)
     return;
 #endif
 
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_add_ipv6 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, ifp->ifindex, 0,
                 ifp->metric, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }
@@ -426,9 +415,7 @@ connected_down_ipv6 (struct interface *ifp, struct connected *ifc)
   if (IN6_IS_ADDR_UNSPECIFIED (&p.prefix))
     return;
 
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_delete_ipv6 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, ifp->ifindex, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }

+ 15 - 1
zebra/rib.h

@@ -40,7 +40,7 @@ struct rib
 {
   /* Status Flags for the *route_node*, but kept in the head RIB.. */
   u_char rn_status;
-#define RIB_ROUTE_QUEUED	(1 << 0)
+#define RIB_ROUTE_QUEUED(x)	(1 << (x))
 
   /* Link list. */
   struct rib *next;
@@ -83,6 +83,20 @@ struct rib
   u_char nexthop_fib_num;
 };
 
+/* meta-queue structure:
+ * sub-queue 0: connected, kernel
+ * sub-queue 1: static
+ * sub-queue 2: RIP, RIPng, OSPF, OSPF6, IS-IS
+ * sub-queue 3: iBGP, eBGP
+ * sub-queue 4: any other origin (if any)
+ */
+#define MQ_SIZE 5
+struct meta_queue
+{
+  struct list *subq[MQ_SIZE];
+  u_int32_t size; /* sum of lengths of all subqueues */
+};
+
 /* Static route information. */
 struct static_ipv4
 {

+ 133 - 27
zebra/zebra_rib.c

@@ -981,15 +981,14 @@ rib_uninstall (struct route_node *rn, struct rib *rib)
 static void rib_unlink (struct route_node *, struct rib *);
 
 /* Core function for processing routing information base. */
-static wq_item_status
-rib_process (struct work_queue *wq, void *data)
+static void
+rib_process (struct route_node *rn)
 {
   struct rib *rib;
   struct rib *next;
   struct rib *fib = NULL;
   struct rib *select = NULL;
   struct rib *del = NULL;
-  struct route_node *rn = data;
   int installed = 0;
   struct nexthop *nexthop = NULL;
   char buf[INET6_ADDRSTRLEN];
@@ -1177,10 +1176,95 @@ rib_process (struct work_queue *wq, void *data)
 end:
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_debug ("%s: %s/%d: rn %p dequeued", __func__, buf, rn->p.prefixlen, rn);
-  if (rn->info)
-    UNSET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED);  
-  route_unlock_node (rn); /* rib queue lock */
-  return WQ_SUCCESS;
+}
+
+/* Take a list of route_node structs and return 1 if a record was picked from
+ * it and processed by rib_process(). Don't process more than one RN record;
+ * operate only on the specified sub-queue.
+ */
+unsigned int
+process_subq (struct list * subq, u_char qindex)
+{
+  struct listnode *lnode;
+  struct route_node *rnode;
+  if (!(lnode = listhead (subq)))
+    return 0;
+  rnode = listgetdata (lnode);
+  rib_process (rnode);
+  if (rnode->info) /* The first RIB record holds the flags bitmask. */
+    UNSET_FLAG (((struct rib *)rnode->info)->rn_status, RIB_ROUTE_QUEUED(qindex));
+  route_unlock_node (rnode);
+  list_delete_node (subq, lnode);
+  return 1;
+}
+
+/* Dispatch the meta queue by picking, processing and unlocking the next RN from
+ * a non-empty sub-queue with the lowest priority. The first argument is always
+ * zebra->ribq (hence the name "dummy"); data points to the meta queue structure.
+ */
+static wq_item_status
+meta_queue_process (struct work_queue *dummy, void *data)
+{
+  struct meta_queue * mq = data;
+  u_char i;
+  for (i = 0; i < MQ_SIZE; i++)
+    if (process_subq (mq->subq[i], i))
+    {
+      mq->size--;
+      break;
+    }
+  return mq->size ? WQ_REQUEUE : WQ_SUCCESS;
+}
+
+/* Look into the RN and queue it into one or more priority queues, increasing the size
+ * for each data push done.
+ */
+void rib_meta_queue_add (struct meta_queue *mq, struct route_node *rn)
+{
+  u_char qindex;
+  struct rib *rib;
+  char buf[INET6_ADDRSTRLEN];
+  if (IS_ZEBRA_DEBUG_RIB_Q)
+    inet_ntop (rn->p.family, &rn->p.u.prefix, buf, INET6_ADDRSTRLEN);
+  for (rib = rn->info; rib; rib = rib->next)
+  {
+    switch (rib->type)
+    {
+      case ZEBRA_ROUTE_KERNEL:
+      case ZEBRA_ROUTE_CONNECT:
+        qindex = 0;
+        break;
+      case ZEBRA_ROUTE_STATIC:
+        qindex = 1;
+        break;
+      case ZEBRA_ROUTE_RIP:
+      case ZEBRA_ROUTE_RIPNG:
+      case ZEBRA_ROUTE_OSPF:
+      case ZEBRA_ROUTE_OSPF6:
+      case ZEBRA_ROUTE_ISIS:
+        qindex = 2;
+        break;
+      case ZEBRA_ROUTE_BGP:
+        qindex = 3;
+        break;
+      default:
+        qindex = 4;
+        break;
+    }
+    /* Invariant: at this point we always have rn->info set. */
+    if (CHECK_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED(qindex)))
+    {
+      if (IS_ZEBRA_DEBUG_RIB_Q)
+        zlog_debug ("%s: %s/%d: rn %p is already queued in sub-queue %u", __func__, buf, rn->p.prefixlen, rn, qindex);
+      continue;
+    }
+    SET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED(qindex));
+    listnode_add (mq->subq[qindex], rn);
+    route_lock_node (rn);
+    mq->size++;
+    if (IS_ZEBRA_DEBUG_RIB_Q)
+      zlog_debug ("%s: %s/%d: queued rn %p into sub-queue %u", __func__, buf, rn->p.prefixlen, rn, qindex);
+  }
 }
 
 /* Add route_node to work queue and schedule processing */
@@ -1202,17 +1286,6 @@ rib_queue_add (struct zebra_t *zebra, struct route_node *rn)
       return;
     }
 
-  /* Route-table node already queued, so nothing to do */
-  if (CHECK_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED))
-    {
-      if (IS_ZEBRA_DEBUG_RIB_Q)
-        zlog_debug ("%s: %s/%d: rn %p already queued", __func__, buf,
-          rn->p.prefixlen, rn);
-      return;
-    }
-
-  route_lock_node (rn); /* rib queue lock */
-
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_info ("%s: %s/%d: work queue added", __func__, buf, rn->p.prefixlen);
 
@@ -1221,13 +1294,21 @@ rib_queue_add (struct zebra_t *zebra, struct route_node *rn)
   if (zebra->ribq == NULL)
     {
       zlog_err ("%s: work_queue does not exist!", __func__);
-      route_unlock_node (rn);
       return;
     }
-  
-  work_queue_add (zebra->ribq, rn);
 
-  SET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED);
+  /* The RIB queue should normally be either empty or hold exactly one
+   * work_queue_item element. In the latter case this element would hold a pointer
+   * to the meta queue structure, which must be used to actually queue the route
+   * nodes to process. So create the MQ holder, if necessary, then push the work
+   * into it in any case. These semantics were introduced after the 0.99.9 release.
+   */
+
+  /* Should I invent work_queue_empty() and use it, or is it OK to do as follows? */
+  if (!zebra->ribq->items->count)
+    work_queue_add (zebra->ribq, zebra->mq);
+
+  rib_meta_queue_add (zebra->mq, rn);
 
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_debug ("%s: %s/%d: rn %p queued", __func__, buf, rn->p.prefixlen, rn);
@@ -1235,6 +1316,30 @@ rib_queue_add (struct zebra_t *zebra, struct route_node *rn)
   return;
 }
 
+/* Create a new meta queue. A destructor function doesn't seem to be necessary here. */
+struct meta_queue *
+meta_queue_new ()
+{
+  struct meta_queue *new;
+  unsigned i, failed = 0;
+
+  if ((new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct meta_queue))) == NULL)
+    return NULL;
+  for (i = 0; i < MQ_SIZE; i++)
+    if ((new->subq[i] = list_new ()) == NULL)
+      failed = 1;
+  if (failed)
+  {
+    for (i = 0; i < MQ_SIZE; i++)
+      if (new->subq[i])
+        list_delete (new->subq[i]);
+    XFREE (MTYPE_WORK_QUEUE, new);
+    return NULL;
+  }
+  new->size = 0;
+  return new;
+}
+
 /* initialise zebra rib work queue */
 static void
 rib_queue_init (struct zebra_t *zebra)
@@ -1249,12 +1354,17 @@ rib_queue_init (struct zebra_t *zebra)
     }
 
   /* fill in the work queue spec */
-  zebra->ribq->spec.workfunc = &rib_process;
+  zebra->ribq->spec.workfunc = &meta_queue_process;
   zebra->ribq->spec.errorfunc = NULL;
   /* XXX: TODO: These should be runtime configurable via vty */
   zebra->ribq->spec.max_retries = 3;
   zebra->ribq->spec.hold = rib_process_hold_time;
   
+  if (!(zebra->mq = meta_queue_new ()))
+  {
+    zlog_err ("%s: could not initialise meta queue!", __func__);
+    return;
+  }
   return;
 }
 
@@ -1663,11 +1773,7 @@ void rib_lookup_and_pushup (struct prefix_ipv4 * p)
     }
   }
   if (changed)
-  {
-    work_queue_aim_head (zebrad.ribq, 1);
     rib_queue_add (&zebrad, rn);
-    work_queue_aim_head (zebrad.ribq, 0);
-  }
 }
 
 int

+ 1 - 0
zebra/zserv.h

@@ -80,6 +80,7 @@ struct zebra_t
 
   /* rib work queue */
   struct work_queue *ribq;
+  struct meta_queue *mq;
 };
 
 /* Count prefix size from mask length */