Browse Source

2005-02-28 Andrew J. Schorr <ajschorr@alumni.princeton.edu>

	* zserv.c: Must include network.h and buffer.h for non-blocking I/O.
	  Remove global message_queue and t_write (need separate buffering for
	  each client).
	  (zebra_server_dequeue,zebra_server_enqueue) Remove functions
	  related to old buggy buffering code.
	  (zserv_delayed_close) New thread callback function to delete a client.
	  (zserv_flush_data) New thread callback function to flush buffered
	  data to client.
	  (zebra_server_send_message) Rewritten to use buffer_write (so
	  buffering of writes and non-blocking I/O work properly).
	  (zsend_interface_add,zsend_interface_delete,zsend_interface_address,
	  zsend_interface_update) Return 0 instead of -1 if !client->ifinfo
	  (this is not really an error).  Return value from
	  zebra_server_send_message.
	  (zsend_route_multipath,zsend_ipv4_nexthop_lookup,
	  zsend_ipv4_import_lookup) Return value from zebra_server_send_message.
	  (zsend_ipv6_nexthop_lookup) Fix scope to static, and return value
	  from zebra_server_send_message.
	  (zsend_router_id_update) Must use zebra_server_send_message instead
	  of deprecated writen function.  Return 0 instead of -1 if this client
	  is not subscribed to router-id updates (since this is not really
	  an error).
	  (zread_interface_add) Change type to static int.  If
	  zsend_interface_add fails or zsend_interface_address fails, return -1
	  immediately (since the client has had an I/O error).
	  (zread_interface_delete,zread_ipv4_add,zread_ipv4_delete,
	  zread_ipv6_add,zread_ipv6_delete,zread_router_id_delete) Return 0
	  to indicate success.
	  (zread_ipv4_nexthop_lookup) Return value from
	  zsend_ipv4_nexthop_lookup.
	  (zread_ipv4_import_lookup) Return value from zsend_ipv4_import_lookup.
	  (zebra_read_ipv6) Remove unused function.
	  (zread_ipv6_nexthop_lookup) Return value from
	  zsend_ipv6_nexthop_lookup.
	  (zread_router_id_add) Return value from zsend_router_id_update.
	  (zebra_client_close) Call buffer_free(client->wb) and
	  thread_cancel(client->t_suicide).
	  (zebra_client_create) Allocate client->wb using buffer_new.
	  (zebra_client_read) Support non-blocking I/O by using stream_read_try.
	  Use ZEBRA_HEADER_SIZE instead of 3.
	  (zebra_accept) Fix bug: reset accept thread at top.  Make client
	  socket non-blocking using the set_nonblocking function.
	  (config_write_forwarding) Fix scope to static.
	  (zebra_init) Remove initialization code for old buggy write buffering.
	* zserv.h: Add 2 new fields to struct zserv: struct buffer *wb
	  (to enable buffered writes with non-blocking I/), and
	  struct thread *t_suicide to support delayed close on I/O
	  errors.
	* router-id.h: Remove prototypes for zread_router_id_add and
	  zread_router_id_delete (their scope should be static to zserv.c).
ajs 15 years ago
parent
commit
719e97414a
4 changed files with 215 additions and 201 deletions
  1. 53 0
      zebra/ChangeLog
  2. 0 3
      zebra/router-id.h
  3. 156 198
      zebra/zserv.c
  4. 6 0
      zebra/zserv.h

+ 53 - 0
zebra/ChangeLog

@@ -1,3 +1,56 @@
+2005-02-28 Andrew J. Schorr <ajschorr@alumni.princeton.edu>
+
+	* zserv.c: Must include network.h and buffer.h for non-blocking I/O.
+	  Remove global message_queue and t_write (need separate buffering for
+	  each client).
+	  (zebra_server_dequeue,zebra_server_enqueue) Remove functions
+	  related to old buggy buffering code.
+	  (zserv_delayed_close) New thread callback function to delete a client.
+	  (zserv_flush_data) New thread callback function to flush buffered
+	  data to client.
+	  (zebra_server_send_message) Rewritten to use buffer_write (so
+	  buffering of writes and non-blocking I/O work properly).
+	  (zsend_interface_add,zsend_interface_delete,zsend_interface_address,
+	  zsend_interface_update) Return 0 instead of -1 if !client->ifinfo
+	  (this is not really an error).  Return value from
+	  zebra_server_send_message.
+	  (zsend_route_multipath,zsend_ipv4_nexthop_lookup,
+	  zsend_ipv4_import_lookup) Return value from zebra_server_send_message.
+	  (zsend_ipv6_nexthop_lookup) Fix scope to static, and return value
+	  from zebra_server_send_message.
+	  (zsend_router_id_update) Must use zebra_server_send_message instead
+	  of deprecated writen function.  Return 0 instead of -1 if this client
+	  is not subscribed to router-id updates (since this is not really
+	  an error).
+	  (zread_interface_add) Change type to static int.  If
+	  zsend_interface_add fails or zsend_interface_address fails, return -1
+	  immediately (since the client has had an I/O error).
+	  (zread_interface_delete,zread_ipv4_add,zread_ipv4_delete,
+	  zread_ipv6_add,zread_ipv6_delete,zread_router_id_delete) Return 0
+	  to indicate success.
+	  (zread_ipv4_nexthop_lookup) Return value from
+	  zsend_ipv4_nexthop_lookup.
+	  (zread_ipv4_import_lookup) Return value from zsend_ipv4_import_lookup.
+	  (zebra_read_ipv6) Remove unused function.
+	  (zread_ipv6_nexthop_lookup) Return value from
+	  zsend_ipv6_nexthop_lookup.
+	  (zread_router_id_add) Return value from zsend_router_id_update.
+	  (zebra_client_close) Call buffer_free(client->wb) and
+	  thread_cancel(client->t_suicide).
+	  (zebra_client_create) Allocate client->wb using buffer_new.
+	  (zebra_client_read) Support non-blocking I/O by using stream_read_try.
+	  Use ZEBRA_HEADER_SIZE instead of 3.
+	  (zebra_accept) Fix bug: reset accept thread at top.  Make client
+	  socket non-blocking using the set_nonblocking function.
+	  (config_write_forwarding) Fix scope to static.
+	  (zebra_init) Remove initialization code for old buggy write buffering.
+	* zserv.h: Add 2 new fields to struct zserv: struct buffer *wb
+	  (to enable buffered writes with non-blocking I/), and 
+	  struct thread *t_suicide to support delayed close on I/O
+	  errors.
+	* router-id.h: Remove prototypes for zread_router_id_add and
+	  zread_router_id_delete (their scope should be static to zserv.c).
+
 2005-02-24 Andrew J. Schorr <ajschorr@alumni.princeton.edu>
 
 	* redistribute.c: (zebra_check_addr,is_default,

+ 0 - 3
zebra/router-id.h

@@ -37,7 +37,4 @@ extern void router_id_init(void);
 extern void router_id_write(struct vty *);
 extern void router_id_get(struct prefix *);
 
-extern void zread_router_id_add(struct zserv *, u_short);
-extern void zread_router_id_delete(struct zserv *, u_short);
-
 #endif

+ 156 - 198
zebra/zserv.c

@@ -34,6 +34,8 @@
 #include "log.h"
 #include "zclient.h"
 #include "privs.h"
+#include "network.h"
+#include "buffer.h"
 
 #include "zebra/zserv.h"
 #include "zebra/router-id.h"
@@ -77,102 +79,74 @@ static const char *zebra_command_str [] =
   "ZEBRA_ROUTER_ID_UPDATE"
 };
 
-struct zebra_message_queue
-{
-  struct nsm_message_queue *next;
-  struct nsm_message_queue *prev;
 
-  u_char *buf;
-  u_int16_t length;
-  u_int16_t written;
-};
+static void zebra_client_close (struct zserv *client);
 
-struct thread *t_write;
-struct fifo message_queue;
-
-int
-zebra_server_dequeue (struct thread *t)
+static int
+zserv_delayed_close(struct thread *thread)
 {
-  int sock;
-  int nbytes;
-  struct zebra_message_queue *queue;
-
-  sock = THREAD_FD (t);
-  t_write = NULL;
-
-  queue = (struct zebra_message_queue *) FIFO_HEAD (&message_queue);
-  if (queue)
-    {
-      nbytes = write (sock, queue->buf + queue->written,
-		      queue->length - queue->written);
-
-      if (nbytes <= 0)
-        {
-          if (errno != EAGAIN)
-	    return -1;
-        }
-      else if (nbytes != (queue->length - queue->written))
-	{
-	  queue->written += nbytes;
-	}
-      else
-        {
-          FIFO_DEL (queue);
-          XFREE (MTYPE_TMP, queue->buf);
-          XFREE (MTYPE_TMP, queue);
-        }
-    }
-
-  if (FIFO_TOP (&message_queue))
-    THREAD_WRITE_ON (zebrad.master, t_write, zebra_server_dequeue, 
-                     NULL, sock);
+  struct zserv *client = THREAD_ARG(thread);
 
+  client->t_suicide = NULL;
+  zebra_client_close(client);
   return 0;
 }
 
-/* Enqueu message.  */
-void
-zebra_server_enqueue (int sock, u_char *buf, unsigned long length,
-		      unsigned long written)
-{
-  struct zebra_message_queue *queue;
-
-  queue = XCALLOC (MTYPE_TMP, sizeof (struct zebra_message_queue));
-  queue->buf = XMALLOC (MTYPE_TMP, length);
-  memcpy (queue->buf, buf, length);
-  queue->length = length;
-  queue->written = written;
-
-  FIFO_ADD (&message_queue, queue);
-
-  THREAD_WRITE_ON (zebrad.master, t_write, zebra_server_dequeue, NULL, sock);
-}
-
-int
-zebra_server_send_message (int sock, u_char *buf, unsigned long length)
+static int
+zserv_flush_data(struct thread *thread)
 {
-  int nbytes;
+  struct zserv *client = THREAD_ARG(thread);
 
-  if (FIFO_TOP (&message_queue))
+  client->t_write = NULL;
+  if (client->t_suicide)
     {
-      zebra_server_enqueue (sock, buf, length, 0);
-      return 0;
+      zebra_client_close(client);
+      return -1;
     }
-
-  /* Send message.  */
-  nbytes = write (sock, buf, length);
-
-  if (nbytes <= 0)
+  switch (buffer_flush_available(client->wb, client->sock))
     {
-      if (errno == EAGAIN)
-        zebra_server_enqueue (sock, buf, length, 0);
-      else
-	return -1;
+    case BUFFER_ERROR:
+      zlog_warn("%s: buffer_flush_available failed on zserv client fd %d, "
+      		"closing", __func__, client->sock);
+      zebra_client_close(client);
+      break;
+    case BUFFER_PENDING:
+      client->t_write = thread_add_write(zebrad.master, zserv_flush_data,
+      					 client, client->sock);
+      break;
+    case BUFFER_EMPTY:
+      break;
     }
-  /* It's clear that nbytes is positive at this point. */
-  else if ((unsigned) nbytes != length)
-    zebra_server_enqueue (sock, buf, length, nbytes);
+  return 0;
+}
 
+static int
+zebra_server_send_message(struct zserv *client)
+{
+  if (client->t_suicide)
+    return -1;
+  switch (buffer_write(client->wb, client->sock, STREAM_DATA(client->obuf),
+		       stream_get_endp(client->obuf)))
+    {
+    case BUFFER_ERROR:
+      zlog_warn("%s: buffer_write failed to zserv client fd %d, closing",
+      		 __func__, client->sock);
+      /* Schedule a delayed close since many of the functions that call this
+         one do not check the return code.  They do not allow for the
+	 possibility that an I/O error may have caused the client to be
+	 deleted. */
+      client->t_suicide = thread_add_event(zebrad.master, zserv_delayed_close,
+					   client, 0);
+      return -1;
+      break;
+    case BUFFER_EMPTY:
+      THREAD_OFF(client->t_write);
+      break;
+    case BUFFER_PENDING:
+      THREAD_WRITE_ON(zebrad.master, client->t_write,
+		      zserv_flush_data, client, client->sock);
+      break;
+    }
   return 0;
 }
 
@@ -194,7 +168,7 @@ zsend_interface_add (struct zserv *client, struct interface *ifp)
 
   /* Check this client need interface information. */
   if (! client->ifinfo)
-    return -1;
+    return 0;
 
   s = client->obuf;
   stream_reset (s);
@@ -225,9 +199,7 @@ zsend_interface_add (struct zserv *client, struct interface *ifp)
   /* Write packet size. */
   stream_putw_at (s, 0, stream_get_endp (s));
 
-  zebra_server_send_message (client->sock, s->data, stream_get_endp (s));
-
-  return 0;
+  return zebra_server_send_message(client);
 }
 
 /* Interface deletion from zebra daemon. */
@@ -244,7 +216,7 @@ zsend_interface_delete (struct zserv *client, struct interface *ifp)
 
   /* Check this client need interface information. */
   if (! client->ifinfo)
-    return -1;
+    return 0;
 
   s = client->obuf;
   stream_reset (s);
@@ -266,9 +238,7 @@ zsend_interface_delete (struct zserv *client, struct interface *ifp)
   /* Write packet length. */
   stream_putw_at (s, 0, stream_get_endp (s));
 
-  zebra_server_send_message (client->sock, s->data, stream_get_endp (s));
-
-  return 0;
+  return zebra_server_send_message (client);
 }
 #endif /* (defined(RTM_IFANNOUNCE) || defined(HAVE_LINUX_RTNETLINK_H)) */
 
@@ -320,7 +290,7 @@ zsend_interface_address (int cmd, struct zserv *client,
 
   /* Check this client need interface information. */
   if (! client->ifinfo)
-    return -1;
+    return 0;
 
   s = client->obuf;
   stream_reset (s);
@@ -357,9 +327,7 @@ zsend_interface_address (int cmd, struct zserv *client,
   /* Write packet size. */
   stream_putw_at (s, 0, stream_get_endp (s));
 
-  zebra_server_send_message (client->sock, s->data, stream_get_endp (s));
-
-  return 0;
+  return zebra_server_send_message(client);
 }
 
 /*
@@ -379,7 +347,7 @@ zsend_interface_update (int cmd, struct zserv *client, struct interface *ifp)
 
   /* Check this client need interface information. */
   if (! client->ifinfo)
-    return -1;
+    return 0;
 
   s = client->obuf;
   stream_reset (s);
@@ -403,9 +371,7 @@ zsend_interface_update (int cmd, struct zserv *client, struct interface *ifp)
   /* Write packet size. */
   stream_putw_at (s, 0, stream_get_endp (s));
 
-  zebra_server_send_message (client->sock, s->data, stream_get_endp (s));
-
-  return 0;
+  return zebra_server_send_message(client);
 }
 
 /*
@@ -531,13 +497,11 @@ zsend_route_multipath (int cmd, struct zserv *client, struct prefix *p,
   /* Write packet size. */
   stream_putw_at (s, 0, stream_get_endp (s));
 
-  zebra_server_send_message (client->sock, s->data, stream_get_endp (s));
-
-  return 0;
+  return zebra_server_send_message(client);
 }
 
 #ifdef HAVE_IPV6
-int
+static int
 zsend_ipv6_nexthop_lookup (struct zserv *client, struct in6_addr *addr)
 {
   struct stream *s;
@@ -598,9 +562,7 @@ zsend_ipv6_nexthop_lookup (struct zserv *client, struct in6_addr *addr)
 
   stream_putw_at (s, 0, stream_get_endp (s));
   
-  zebra_server_send_message (client->sock, s->data, stream_get_endp (s));
-
-  return 0;
+  return zebra_server_send_message(client);
 }
 #endif /* HAVE_IPV6 */
 
@@ -660,9 +622,7 @@ zsend_ipv4_nexthop_lookup (struct zserv *client, struct in_addr addr)
 
   stream_putw_at (s, 0, stream_get_endp (s));
   
-  zebra_server_send_message (client->sock, s->data, stream_get_endp (s));
-
-  return 0;
+  return zebra_server_send_message(client);
 }
 
 static int
@@ -721,9 +681,7 @@ zsend_ipv4_import_lookup (struct zserv *client, struct prefix_ipv4 *p)
 
   stream_putw_at (s, 0, stream_get_endp (s));
   
-  zebra_server_send_message (client->sock, s->data, stream_get_endp (s));
-
-  return 0;
+  return zebra_server_send_message(client);
 }
 
 /* Router-id is updated. Send ZEBRA_ROUTER_ID_ADD to client. */
@@ -735,7 +693,7 @@ zsend_router_id_update (struct zserv *client, struct prefix *p)
 
   /* Check this client need interface information. */
   if (!client->ridinfo)
-    return -1;
+    return 0;
 
   s = client->obuf;
   stream_reset (s);
@@ -755,12 +713,12 @@ zsend_router_id_update (struct zserv *client, struct prefix *p)
   /* Write packet size. */
   stream_putw_at (s, 0, stream_get_endp (s));
 
-  return writen (client->sock, s->data, stream_get_endp (s));
+  return zebra_server_send_message(client);
 }
 
 /* Register zebra server interface information.  Send current all
    interface and address information. */
-static void
+static int
 zread_interface_add (struct zserv *client, u_short length)
 {
   struct listnode *ifnode;
@@ -779,23 +737,27 @@ zread_interface_add (struct zserv *client, u_short length)
       if (! CHECK_FLAG (ifp->status, ZEBRA_INTERFACE_ACTIVE))
 	continue;
 
-      zsend_interface_add (client, ifp);
+      if (zsend_interface_add (client, ifp) < 0)
+        return -1;
 
       for (cnode = listhead (ifp->connected); cnode; nextnode (cnode))
 	{
 	  c = getdata (cnode);
-	  if (CHECK_FLAG (c->conf, ZEBRA_IFC_REAL))
-	    zsend_interface_address (ZEBRA_INTERFACE_ADDRESS_ADD, client, 
-				     ifp, c);
+	  if (CHECK_FLAG (c->conf, ZEBRA_IFC_REAL) &&
+	      (zsend_interface_address (ZEBRA_INTERFACE_ADDRESS_ADD, client, 
+				        ifp, c) < 0))
+	    return -1;
 	}
     }
+  return 0;
 }
 
 /* Unregister zebra server interface information. */
-static void
+static int
 zread_interface_delete (struct zserv *client, u_short length)
 {
   client->ifinfo = 0;
+  return 0;
 }
 
 /* This function support multiple nexthop. */
@@ -803,7 +765,7 @@ zread_interface_delete (struct zserv *client, u_short length)
  * Parse the ZEBRA_IPV4_ROUTE_ADD sent from client. Update rib and
  * add kernel route. 
  */
-static void
+static int
 zread_ipv4_add (struct zserv *client, u_short length)
 {
   int i;
@@ -878,10 +840,11 @@ zread_ipv4_add (struct zserv *client, u_short length)
     rib->metric = stream_getl (s);
     
   rib_add_ipv4_multipath (&p, rib);
+  return 0;
 }
 
 /* Zebra server IPv4 prefix delete function. */
-static void
+static int
 zread_ipv4_delete (struct zserv *client, u_short length)
 {
   int i;
@@ -951,20 +914,21 @@ zread_ipv4_delete (struct zserv *client, u_short length)
     
   rib_delete_ipv4 (api.type, api.flags, &p, &nexthop, ifindex,
 		   client->rtm_table);
+  return 0;
 }
 
 /* Nexthop lookup for IPv4. */
-static void
+static int
 zread_ipv4_nexthop_lookup (struct zserv *client, u_short length)
 {
   struct in_addr addr;
 
   addr.s_addr = stream_get_ipv4 (client->ibuf);
-  zsend_ipv4_nexthop_lookup (client, addr);
+  return zsend_ipv4_nexthop_lookup (client, addr);
 }
 
 /* Nexthop lookup for IPv4. */
-static void
+static int
 zread_ipv4_import_lookup (struct zserv *client, u_short length)
 {
   struct prefix_ipv4 p;
@@ -973,12 +937,12 @@ zread_ipv4_import_lookup (struct zserv *client, u_short length)
   p.prefixlen = stream_getc (client->ibuf);
   p.prefix.s_addr = stream_get_ipv4 (client->ibuf);
 
-  zsend_ipv4_import_lookup (client, &p);
+  return zsend_ipv4_import_lookup (client, &p);
 }
 
 #ifdef HAVE_IPV6
 /* Zebra server IPv6 prefix add function. */
-static void
+static int
 zread_ipv6_add (struct zserv *client, u_short length)
 {
   int i;
@@ -1039,10 +1003,11 @@ zread_ipv6_add (struct zserv *client, u_short length)
     rib_add_ipv6 (api.type, api.flags, &p, NULL, ifindex, 0);
   else
     rib_add_ipv6 (api.type, api.flags, &p, &nexthop, ifindex, 0);
+  return 0;
 }
 
 /* Zebra server IPv6 prefix delete function. */
-static void
+static int
 zread_ipv6_delete (struct zserv *client, u_short length)
 {
   int i;
@@ -1102,46 +1067,10 @@ zread_ipv6_delete (struct zserv *client, u_short length)
     rib_delete_ipv6 (api.type, api.flags, &p, NULL, ifindex, 0);
   else
     rib_delete_ipv6 (api.type, api.flags, &p, &nexthop, ifindex, 0);
+  return 0;
 }
 
-void
-zebra_read_ipv6 (int command, struct zserv *client, u_short length)
-{
-  u_char type;
-  u_char flags;
-  struct in6_addr nexthop, *gate;
-  unsigned int ifindex;
-
-  type = stream_getc (client->ibuf);
-  flags = stream_getc (client->ibuf);
-  stream_get (&nexthop, client->ibuf, sizeof (struct in6_addr));
-  
-  while (STREAM_READABLE (client->ibuf))
-    {
-      int size;
-      struct prefix_ipv6 p;
-      
-      ifindex = stream_getl (client->ibuf);
-
-      memset (&p, 0, sizeof (struct prefix_ipv6));
-      p.family = AF_INET6;
-      p.prefixlen = stream_getc (client->ibuf);
-      size = PSIZE(p.prefixlen);
-      stream_get (&p.prefix, client->ibuf, size);
-
-      if (IN6_IS_ADDR_UNSPECIFIED (&nexthop))
-        gate = NULL;
-      else
-        gate = &nexthop;
-
-      if (command == ZEBRA_IPV6_ROUTE_ADD)
-        rib_add_ipv6 (type, flags, &p, gate, ifindex, 0);
-      else
-        rib_delete_ipv6 (type, flags, &p, gate, ifindex, 0);
-    }
-}
-
-static void
+static int
 zread_ipv6_nexthop_lookup (struct zserv *client, u_short length)
 {
   struct in6_addr addr;
@@ -1150,12 +1079,12 @@ zread_ipv6_nexthop_lookup (struct zserv *client, u_short length)
   stream_get (&addr, client->ibuf, 16);
   printf ("DEBUG %s\n", inet_ntop (AF_INET6, &addr, buf, BUFSIZ));
 
-  zsend_ipv6_nexthop_lookup (client, &addr);
+  return zsend_ipv6_nexthop_lookup (client, &addr);
 }
 #endif /* HAVE_IPV6 */
 
 /* Register zebra server router-id information.  Send current router-id */
-void
+static int
 zread_router_id_add (struct zserv *client, u_short length)
 {
   struct prefix p;
@@ -1165,14 +1094,15 @@ zread_router_id_add (struct zserv *client, u_short length)
 
   router_id_get (&p);
 
-  zsend_router_id_update (client,&p);
+  return zsend_router_id_update (client,&p);
 }
 
 /* Unregister zebra server router-id information. */
-void
+static int
 zread_router_id_delete (struct zserv *client, u_short length)
 {
   client->ridinfo = 0;
+  return 0;
 }
 
 /* Close zebra client. */
@@ -1191,12 +1121,16 @@ zebra_client_close (struct zserv *client)
     stream_free (client->ibuf);
   if (client->obuf)
     stream_free (client->obuf);
+  if (client->wb)
+    buffer_free(client->wb);
 
   /* Release threads. */
   if (client->t_read)
     thread_cancel (client->t_read);
   if (client->t_write)
     thread_cancel (client->t_write);
+  if (client->t_suicide)
+    thread_cancel (client->t_suicide);
 
   /* Free client structure. */
   listnode_delete (zebrad.client_list, client);
@@ -1215,6 +1149,7 @@ zebra_client_create (int sock)
   client->sock = sock;
   client->ibuf = stream_new (ZEBRA_MAX_PACKET_SIZ);
   client->obuf = stream_new (ZEBRA_MAX_PACKET_SIZ);
+  client->wb = buffer_new(0);
 
   /* Set table number. */
   client->rtm_table = zebrad.rtm_table_default;
@@ -1241,41 +1176,68 @@ zebra_client_read (struct thread *thread)
   client = THREAD_ARG (thread);
   client->t_read = NULL;
 
-  /* Read length and command. */
-  nbyte = stream_read (client->ibuf, sock, 3);
-  if (nbyte <= 0) 
+  if (client->t_suicide)
     {
-      if (IS_ZEBRA_DEBUG_EVENT)
-	zlog_debug ("connection closed socket [%d]", sock);
-      zebra_client_close (client);
+      zebra_client_close(client);
       return -1;
     }
+
+  /* Read length and command (if we don't have it already). */
+  if (stream_get_endp(client->ibuf) < ZEBRA_HEADER_SIZE)
+    {
+      if (((nbyte = stream_read_try (client->ibuf, sock,
+				     ZEBRA_HEADER_SIZE)) == 0) ||
+	  (nbyte == -1))
+	{
+	  if (IS_ZEBRA_DEBUG_EVENT)
+	    zlog_debug ("connection closed socket [%d]", sock);
+	  zebra_client_close (client);
+	  return -1;
+	}
+      if (stream_get_endp(client->ibuf) < ZEBRA_HEADER_SIZE)
+	{
+	  /* Try again later. */
+	  zebra_event (ZEBRA_READ, sock, client);
+	  return 0;
+	}
+    }
+
+  /* Reset to read from the beginning of the incoming packet. */
+  stream_set_getp(client->ibuf, 0);
+
   length = stream_getw (client->ibuf);
   command = stream_getc (client->ibuf);
 
-  if (length < 3) 
+  if (length < ZEBRA_HEADER_SIZE) 
     {
       if (IS_ZEBRA_DEBUG_EVENT)
-	zlog_debug ("length %d is less than 3 ", length);
+	zlog_debug ("length %d is less than %d ", length, ZEBRA_HEADER_SIZE);
       zebra_client_close (client);
       return -1;
     }
 
-  length -= 3;
-
   /* Read rest of data. */
-  if (length)
+  if (stream_get_endp(client->ibuf) < length)
     {
-      nbyte = stream_read (client->ibuf, sock, length);
-      if (nbyte <= 0) 
+      nbyte = stream_read_try (client->ibuf, sock,
+			       length-stream_get_endp(client->ibuf));
+      if ((nbyte == 0) || (nbyte == -1))
 	{
 	  if (IS_ZEBRA_DEBUG_EVENT)
 	    zlog_debug ("connection closed [%d] when reading zebra data", sock);
 	  zebra_client_close (client);
 	  return -1;
 	}
+      if (stream_get_endp(client->ibuf) < length)
+        {
+	  /* Try again later. */
+	  zebra_event (ZEBRA_READ, sock, client);
+	  return 0;
+	}
     }
 
+  length -= ZEBRA_HEADER_SIZE;
+
   /* Debug packet information. */
   if (IS_ZEBRA_DEBUG_EVENT)
     zlog_debug ("zebra message comes from socket [%d]", sock);
@@ -1340,9 +1302,15 @@ zebra_client_read (struct thread *thread)
       break;
     }
 
+  if (client->t_suicide)
+    {
+      /* No need to wait for thread callback, just kill immediately. */
+      zebra_client_close(client);
+      return -1;
+    }
+
   stream_reset (client->ibuf);
   zebra_event (ZEBRA_READ, sock, client);
-
   return 0;
 }
 
@@ -1351,7 +1319,6 @@ zebra_client_read (struct thread *thread)
 static int
 zebra_accept (struct thread *thread)
 {
-  int val;
   int accept_sock;
   int client_sock;
   struct sockaddr_in client;
@@ -1359,6 +1326,9 @@ zebra_accept (struct thread *thread)
 
   accept_sock = THREAD_FD (thread);
 
+  /* Reregister myself. */
+  zebra_event (ZEBRA_SERV, accept_sock, NULL);
+
   len = sizeof (struct sockaddr_in);
   client_sock = accept (accept_sock, (struct sockaddr *) &client, &len);
 
@@ -1369,20 +1339,11 @@ zebra_accept (struct thread *thread)
     }
 
   /* Make client socket non-blocking.  */
-  /* XXX: We dont requeue failed writes, so this leads to inconsistencies.
-   * for now socket must remain blocking, regardless of risk of deadlocks.
-   */
-  /*
-  val = fcntl (client_sock, F_GETFL, 0);
-  fcntl (client_sock, F_SETFL, (val | O_NONBLOCK));
-  */
+  set_nonblocking(client_sock);
   
   /* Create new zebra client. */
   zebra_client_create (client_sock);
 
-  /* Register myself. */
-  zebra_event (ZEBRA_SERV, accept_sock, NULL);
-
   return 0;
 }
 
@@ -1718,7 +1679,7 @@ DEFUN (no_ipv6_forwarding,
 #endif /* HAVE_IPV6 */
 
 /* IPForwarding configuration write function. */
-int
+static int
 config_write_forwarding (struct vty *vty)
 {
   /* FIXME: Find better place for that. */
@@ -1779,7 +1740,4 @@ zebra_init ()
   install_element (CONFIG_NODE, &ipv6_forwarding_cmd);
   install_element (CONFIG_NODE, &no_ipv6_forwarding_cmd);
 #endif /* HAVE_IPV6 */
-
-  FIFO_INIT(&message_queue);
-  t_write = NULL;
 }

+ 6 - 0
zebra/zserv.h

@@ -41,10 +41,16 @@ struct zserv
   struct stream *ibuf;
   struct stream *obuf;
 
+  /* Buffer of data waiting to be written to client. */
+  struct buffer *wb;
+
   /* Threads for read/write. */
   struct thread *t_read;
   struct thread *t_write;
 
+  /* Thread for delayed close. */
+  struct thread *t_suicide;
+
   /* default routing table this client munges */
   int rtm_table;