zebra_fpm.c 32 KB


  1. /*
  2. * Main implementation file for interface to Forwarding Plane Manager.
  3. *
  4. * Copyright (C) 2012 by Open Source Routing.
  5. * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
  6. *
  7. * This file is part of GNU Zebra.
  8. *
  9. * GNU Zebra is free software; you can redistribute it and/or modify it
  10. * under the terms of the GNU General Public License as published by the
  11. * Free Software Foundation; either version 2, or (at your option) any
  12. * later version.
  13. *
  14. * GNU Zebra is distributed in the hope that it will be useful, but
  15. * WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  17. * General Public License for more details.
  18. *
  19. * You should have received a copy of the GNU General Public License
  20. * along with GNU Zebra; see the file COPYING. If not, write to the Free
  21. * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
  22. * 02111-1307, USA.
  23. */
  24. #include <zebra.h>
  25. #include "log.h"
  26. #include "stream.h"
  27. #include "thread.h"
  28. #include "network.h"
  29. #include "command.h"
  30. #include "zebra/rib.h"
  31. #include "fpm/fpm.h"
  32. #include "zebra_fpm.h"
  33. #include "zebra_fpm_private.h"
  34. /*
  35. * Interval at which we attempt to connect to the FPM.
  36. */
  37. #define ZFPM_CONNECT_RETRY_IVL 5
  38. /*
  39. * Sizes of outgoing and incoming stream buffers for writing/reading
  40. * FPM messages.
  41. */
  42. #define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
  43. #define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)
  44. /*
  45. * The maximum number of times the FPM socket write callback can call
  46. * 'write' before it yields.
  47. */
  48. #define ZFPM_MAX_WRITES_PER_RUN 10
  49. /*
  50. * Interval over which we collect statistics.
  51. */
  52. #define ZFPM_STATS_IVL_SECS 10
  53. /*
  54. * Structure that holds state for iterating over all route_node
  55. * structures that are candidates for being communicated to the FPM.
  56. */
  57. typedef struct zfpm_rnodes_iter_t_
  58. {
  59. rib_tables_iter_t tables_iter;
  60. route_table_iter_t iter;
  61. } zfpm_rnodes_iter_t;
  62. /*
  63. * Statistics.
  64. */
  65. typedef struct zfpm_stats_t_ {
  66. unsigned long connect_calls;
  67. unsigned long connect_no_sock;
  68. unsigned long read_cb_calls;
  69. unsigned long write_cb_calls;
  70. unsigned long write_calls;
  71. unsigned long partial_writes;
  72. unsigned long max_writes_hit;
  73. unsigned long t_write_yields;
  74. unsigned long nop_deletes_skipped;
  75. unsigned long route_adds;
  76. unsigned long route_dels;
  77. unsigned long updates_triggered;
  78. unsigned long redundant_triggers;
  79. unsigned long non_fpm_table_triggers;
  80. unsigned long dests_del_after_update;
  81. unsigned long t_conn_down_starts;
  82. unsigned long t_conn_down_dests_processed;
  83. unsigned long t_conn_down_yields;
  84. unsigned long t_conn_down_finishes;
  85. unsigned long t_conn_up_starts;
  86. unsigned long t_conn_up_dests_processed;
  87. unsigned long t_conn_up_yields;
  88. unsigned long t_conn_up_aborts;
  89. unsigned long t_conn_up_finishes;
  90. } zfpm_stats_t;
  91. /*
  92. * States for the FPM state machine.
  93. */
  94. typedef enum {
  95. /*
  96. * In this state we are not yet ready to connect to the FPM. This
  97. * can happen when this module is disabled, or if we're cleaning up
  98. * after a connection has gone down.
  99. */
  100. ZFPM_STATE_IDLE,
  101. /*
  102. * Ready to talk to the FPM and periodically trying to connect to
  103. * it.
  104. */
  105. ZFPM_STATE_ACTIVE,
  106. /*
  107. * In the middle of bringing up a TCP connection. Specifically,
  108. * waiting for a connect() call to complete asynchronously.
  109. */
  110. ZFPM_STATE_CONNECTING,
  111. /*
  112. * TCP connection to the FPM is up.
  113. */
  114. ZFPM_STATE_ESTABLISHED
  115. } zfpm_state_t;
  116. /*
  117. * Globals.
  118. */
  119. typedef struct zfpm_glob_t_
  120. {
  121. /*
  122. * True if the FPM module has been enabled.
  123. */
  124. int enabled;
  125. struct thread_master *master;
  126. zfpm_state_t state;
  127. /*
  128. * Port on which the FPM is running.
  129. */
  130. int fpm_port;
  131. /*
  132. * List of rib_dest_t structures to be processed
  133. */
  134. TAILQ_HEAD (zfpm_dest_q, rib_dest_t_) dest_q;
  135. /*
  136. * Stream socket to the FPM.
  137. */
  138. int sock;
  139. /*
  140. * Buffers for messages to/from the FPM.
  141. */
  142. struct stream *obuf;
  143. struct stream *ibuf;
  144. /*
  145. * Threads for I/O.
  146. */
  147. struct thread *t_connect;
  148. struct thread *t_write;
  149. struct thread *t_read;
  150. /*
  151. * Thread to clean up after the TCP connection to the FPM goes down
  152. * and the state that belongs to it.
  153. */
  154. struct thread *t_conn_down;
  155. struct {
  156. zfpm_rnodes_iter_t iter;
  157. } t_conn_down_state;
  158. /*
  159. * Thread to take actions once the TCP conn to the FPM comes up, and
  160. * the state that belongs to it.
  161. */
  162. struct thread *t_conn_up;
  163. struct {
  164. zfpm_rnodes_iter_t iter;
  165. } t_conn_up_state;
  166. unsigned long connect_calls;
  167. time_t last_connect_call_time;
  168. /*
  169. * Stats from the start of the current statistics interval up to
  170. * now. These are the counters we typically update in the code.
  171. */
  172. zfpm_stats_t stats;
  173. /*
  174. * Statistics that were gathered in the last collection interval.
  175. */
  176. zfpm_stats_t last_ivl_stats;
  177. /*
  178. * Cumulative stats from the last clear to the start of the current
  179. * statistics interval.
  180. */
  181. zfpm_stats_t cumulative_stats;
  182. /*
  183. * Stats interval timer.
  184. */
  185. struct thread *t_stats;
  186. /*
  187. * If non-zero, the last time when statistics were cleared.
  188. */
  189. time_t last_stats_clear_time;
  190. } zfpm_glob_t;
  191. static zfpm_glob_t zfpm_glob_space;
  192. static zfpm_glob_t *zfpm_g = &zfpm_glob_space;
  193. static int zfpm_read_cb (struct thread *thread);
  194. static int zfpm_write_cb (struct thread *thread);
  195. static void zfpm_set_state (zfpm_state_t state, const char *reason);
  196. static void zfpm_start_connect_timer (const char *reason);
  197. static void zfpm_start_stats_timer (void);
  198. /*
  199. * zfpm_thread_should_yield
  200. */
  201. static inline int
  202. zfpm_thread_should_yield (struct thread *t)
  203. {
  204. return thread_should_yield (t);
  205. }
  206. /*
  207. * zfpm_state_to_str
  208. */
  209. static const char *
  210. zfpm_state_to_str (zfpm_state_t state)
  211. {
  212. switch (state)
  213. {
  214. case ZFPM_STATE_IDLE:
  215. return "idle";
  216. case ZFPM_STATE_ACTIVE:
  217. return "active";
  218. case ZFPM_STATE_CONNECTING:
  219. return "connecting";
  220. case ZFPM_STATE_ESTABLISHED:
  221. return "established";
  222. default:
  223. return "unknown";
  224. }
  225. }
  226. /*
  227. * zfpm_get_time
  228. */
  229. static time_t
  230. zfpm_get_time (void)
  231. {
  232. struct timeval tv;
  233. if (quagga_gettime (QUAGGA_CLK_MONOTONIC, &tv) < 0)
  234. zlog_warn ("FPM: quagga_gettime failed!!");
  235. return tv.tv_sec;
  236. }
  237. /*
  238. * zfpm_get_elapsed_time
  239. *
  240. * Returns the time elapsed (in seconds) since the given time.
  241. */
  242. static time_t
  243. zfpm_get_elapsed_time (time_t reference)
  244. {
  245. time_t now;
  246. now = zfpm_get_time ();
  247. if (now < reference)
  248. {
  249. assert (0);
  250. return 0;
  251. }
  252. return now - reference;
  253. }
  254. /*
  255. * zfpm_is_table_for_fpm
  256. *
  257. * Returns TRUE if the the given table is to be communicated to the
  258. * FPM.
  259. */
  260. static inline int
  261. zfpm_is_table_for_fpm (struct route_table *table)
  262. {
  263. rib_table_info_t *info;
  264. info = rib_table_info (table);
  265. /*
  266. * We only send the unicast tables in the main instance to the FPM
  267. * at this point.
  268. */
  269. if (info->zvrf->vrf_id != 0)
  270. return 0;
  271. if (info->safi != SAFI_UNICAST)
  272. return 0;
  273. return 1;
  274. }
  275. /*
  276. * zfpm_rnodes_iter_init
  277. */
  278. static inline void
  279. zfpm_rnodes_iter_init (zfpm_rnodes_iter_t *iter)
  280. {
  281. memset (iter, 0, sizeof (*iter));
  282. rib_tables_iter_init (&iter->tables_iter);
  283. /*
  284. * This is a hack, but it makes implementing 'next' easier by
  285. * ensuring that route_table_iter_next() will return NULL the first
  286. * time we call it.
  287. */
  288. route_table_iter_init (&iter->iter, NULL);
  289. route_table_iter_cleanup (&iter->iter);
  290. }
  291. /*
  292. * zfpm_rnodes_iter_next
  293. */
  294. static inline struct route_node *
  295. zfpm_rnodes_iter_next (zfpm_rnodes_iter_t *iter)
  296. {
  297. struct route_node *rn;
  298. struct route_table *table;
  299. while (1)
  300. {
  301. rn = route_table_iter_next (&iter->iter);
  302. if (rn)
  303. return rn;
  304. /*
  305. * We've made our way through this table, go to the next one.
  306. */
  307. route_table_iter_cleanup (&iter->iter);
  308. while ((table = rib_tables_iter_next (&iter->tables_iter)))
  309. {
  310. if (zfpm_is_table_for_fpm (table))
  311. break;
  312. }
  313. if (!table)
  314. return NULL;
  315. route_table_iter_init (&iter->iter, table);
  316. }
  317. return NULL;
  318. }
  319. /*
  320. * zfpm_rnodes_iter_pause
  321. */
  322. static inline void
  323. zfpm_rnodes_iter_pause (zfpm_rnodes_iter_t *iter)
  324. {
  325. route_table_iter_pause (&iter->iter);
  326. }
  327. /*
  328. * zfpm_rnodes_iter_cleanup
  329. */
  330. static inline void
  331. zfpm_rnodes_iter_cleanup (zfpm_rnodes_iter_t *iter)
  332. {
  333. route_table_iter_cleanup (&iter->iter);
  334. rib_tables_iter_cleanup (&iter->tables_iter);
  335. }
  336. /*
  337. * zfpm_stats_init
  338. *
  339. * Initialize a statistics block.
  340. */
  341. static inline void
  342. zfpm_stats_init (zfpm_stats_t *stats)
  343. {
  344. memset (stats, 0, sizeof (*stats));
  345. }
  346. /*
  347. * zfpm_stats_reset
  348. */
  349. static inline void
  350. zfpm_stats_reset (zfpm_stats_t *stats)
  351. {
  352. zfpm_stats_init (stats);
  353. }
  354. /*
  355. * zfpm_stats_copy
  356. */
  357. static inline void
  358. zfpm_stats_copy (const zfpm_stats_t *src, zfpm_stats_t *dest)
  359. {
  360. memcpy (dest, src, sizeof (*dest));
  361. }
  362. /*
  363. * zfpm_stats_compose
  364. *
  365. * Total up the statistics in two stats structures ('s1 and 's2') and
  366. * return the result in the third argument, 'result'. Note that the
  367. * pointer 'result' may be the same as 's1' or 's2'.
  368. *
  369. * For simplicity, the implementation below assumes that the stats
  370. * structure is composed entirely of counters. This can easily be
  371. * changed when necessary.
  372. */
  373. static void
  374. zfpm_stats_compose (const zfpm_stats_t *s1, const zfpm_stats_t *s2,
  375. zfpm_stats_t *result)
  376. {
  377. const unsigned long *p1, *p2;
  378. unsigned long *result_p;
  379. int i, num_counters;
  380. p1 = (const unsigned long *) s1;
  381. p2 = (const unsigned long *) s2;
  382. result_p = (unsigned long *) result;
  383. num_counters = (sizeof (zfpm_stats_t) / sizeof (unsigned long));
  384. for (i = 0; i < num_counters; i++)
  385. {
  386. result_p[i] = p1[i] + p2[i];
  387. }
  388. }
  389. /*
  390. * zfpm_read_on
  391. */
  392. static inline void
  393. zfpm_read_on (void)
  394. {
  395. assert (!zfpm_g->t_read);
  396. assert (zfpm_g->sock >= 0);
  397. THREAD_READ_ON (zfpm_g->master, zfpm_g->t_read, zfpm_read_cb, 0,
  398. zfpm_g->sock);
  399. }
  400. /*
  401. * zfpm_write_on
  402. */
  403. static inline void
  404. zfpm_write_on (void)
  405. {
  406. assert (!zfpm_g->t_write);
  407. assert (zfpm_g->sock >= 0);
  408. THREAD_WRITE_ON (zfpm_g->master, zfpm_g->t_write, zfpm_write_cb, 0,
  409. zfpm_g->sock);
  410. }
  411. /*
  412. * zfpm_read_off
  413. */
  414. static inline void
  415. zfpm_read_off (void)
  416. {
  417. THREAD_READ_OFF (zfpm_g->t_read);
  418. }
  419. /*
  420. * zfpm_write_off
  421. */
  422. static inline void
  423. zfpm_write_off (void)
  424. {
  425. THREAD_WRITE_OFF (zfpm_g->t_write);
  426. }
  427. /*
  428. * zfpm_conn_up_thread_cb
  429. *
  430. * Callback for actions to be taken when the connection to the FPM
  431. * comes up.
  432. */
  433. static int
  434. zfpm_conn_up_thread_cb (struct thread *thread)
  435. {
  436. struct route_node *rnode;
  437. zfpm_rnodes_iter_t *iter;
  438. rib_dest_t *dest;
  439. assert (zfpm_g->t_conn_up);
  440. zfpm_g->t_conn_up = NULL;
  441. iter = &zfpm_g->t_conn_up_state.iter;
  442. if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
  443. {
  444. zfpm_debug ("Connection not up anymore, conn_up thread aborting");
  445. zfpm_g->stats.t_conn_up_aborts++;
  446. goto done;
  447. }
  448. while ((rnode = zfpm_rnodes_iter_next (iter)))
  449. {
  450. dest = rib_dest_from_rnode (rnode);
  451. if (dest)
  452. {
  453. zfpm_g->stats.t_conn_up_dests_processed++;
  454. zfpm_trigger_update (rnode, NULL);
  455. }
  456. /*
  457. * Yield if need be.
  458. */
  459. if (!zfpm_thread_should_yield (thread))
  460. continue;
  461. zfpm_g->stats.t_conn_up_yields++;
  462. zfpm_rnodes_iter_pause (iter);
  463. zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
  464. zfpm_conn_up_thread_cb,
  465. 0, 0);
  466. return 0;
  467. }
  468. zfpm_g->stats.t_conn_up_finishes++;
  469. done:
  470. zfpm_rnodes_iter_cleanup (iter);
  471. return 0;
  472. }
  473. /*
  474. * zfpm_connection_up
  475. *
  476. * Called when the connection to the FPM comes up.
  477. */
  478. static void
  479. zfpm_connection_up (const char *detail)
  480. {
  481. assert (zfpm_g->sock >= 0);
  482. zfpm_read_on ();
  483. zfpm_write_on ();
  484. zfpm_set_state (ZFPM_STATE_ESTABLISHED, detail);
  485. /*
  486. * Start thread to push existing routes to the FPM.
  487. */
  488. assert (!zfpm_g->t_conn_up);
  489. zfpm_rnodes_iter_init (&zfpm_g->t_conn_up_state.iter);
  490. zfpm_debug ("Starting conn_up thread");
  491. zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
  492. zfpm_conn_up_thread_cb, 0, 0);
  493. zfpm_g->stats.t_conn_up_starts++;
  494. }
  495. /*
  496. * zfpm_connect_check
  497. *
  498. * Check if an asynchronous connect() to the FPM is complete.
  499. */
  500. static void
  501. zfpm_connect_check ()
  502. {
  503. int status;
  504. socklen_t slen;
  505. int ret;
  506. zfpm_read_off ();
  507. zfpm_write_off ();
  508. slen = sizeof (status);
  509. ret = getsockopt (zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *) &status,
  510. &slen);
  511. if (ret >= 0 && status == 0)
  512. {
  513. zfpm_connection_up ("async connect complete");
  514. return;
  515. }
  516. /*
  517. * getsockopt() failed or indicated an error on the socket.
  518. */
  519. close (zfpm_g->sock);
  520. zfpm_g->sock = -1;
  521. zfpm_start_connect_timer ("getsockopt() after async connect failed");
  522. return;
  523. }
  524. /*
  525. * zfpm_conn_down_thread_cb
  526. *
  527. * Callback that is invoked to clean up state after the TCP connection
  528. * to the FPM goes down.
  529. */
  530. static int
  531. zfpm_conn_down_thread_cb (struct thread *thread)
  532. {
  533. struct route_node *rnode;
  534. zfpm_rnodes_iter_t *iter;
  535. rib_dest_t *dest;
  536. assert (zfpm_g->state == ZFPM_STATE_IDLE);
  537. assert (zfpm_g->t_conn_down);
  538. zfpm_g->t_conn_down = NULL;
  539. iter = &zfpm_g->t_conn_down_state.iter;
  540. while ((rnode = zfpm_rnodes_iter_next (iter)))
  541. {
  542. dest = rib_dest_from_rnode (rnode);
  543. if (dest)
  544. {
  545. if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM))
  546. {
  547. TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
  548. }
  549. UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
  550. UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
  551. zfpm_g->stats.t_conn_down_dests_processed++;
  552. /*
  553. * Check if the dest should be deleted.
  554. */
  555. rib_gc_dest(rnode);
  556. }
  557. /*
  558. * Yield if need be.
  559. */
  560. if (!zfpm_thread_should_yield (thread))
  561. continue;
  562. zfpm_g->stats.t_conn_down_yields++;
  563. zfpm_rnodes_iter_pause (iter);
  564. zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
  565. zfpm_conn_down_thread_cb,
  566. 0, 0);
  567. return 0;
  568. }
  569. zfpm_g->stats.t_conn_down_finishes++;
  570. zfpm_rnodes_iter_cleanup (iter);
  571. /*
  572. * Start the process of connecting to the FPM again.
  573. */
  574. zfpm_start_connect_timer ("cleanup complete");
  575. return 0;
  576. }
  577. /*
  578. * zfpm_connection_down
  579. *
  580. * Called when the connection to the FPM has gone down.
  581. */
  582. static void
  583. zfpm_connection_down (const char *detail)
  584. {
  585. if (!detail)
  586. detail = "unknown";
  587. assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  588. zlog_info ("connection to the FPM has gone down: %s", detail);
  589. zfpm_read_off ();
  590. zfpm_write_off ();
  591. stream_reset (zfpm_g->ibuf);
  592. stream_reset (zfpm_g->obuf);
  593. if (zfpm_g->sock >= 0) {
  594. close (zfpm_g->sock);
  595. zfpm_g->sock = -1;
  596. }
  597. /*
  598. * Start thread to clean up state after the connection goes down.
  599. */
  600. assert (!zfpm_g->t_conn_down);
  601. zfpm_debug ("Starting conn_down thread");
  602. zfpm_rnodes_iter_init (&zfpm_g->t_conn_down_state.iter);
  603. zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
  604. zfpm_conn_down_thread_cb, 0, 0);
  605. zfpm_g->stats.t_conn_down_starts++;
  606. zfpm_set_state (ZFPM_STATE_IDLE, detail);
  607. }
  608. /*
  609. * zfpm_read_cb
  610. */
  611. static int
  612. zfpm_read_cb (struct thread *thread)
  613. {
  614. size_t already;
  615. struct stream *ibuf;
  616. uint16_t msg_len;
  617. fpm_msg_hdr_t *hdr;
  618. zfpm_g->stats.read_cb_calls++;
  619. assert (zfpm_g->t_read);
  620. zfpm_g->t_read = NULL;
  621. /*
  622. * Check if async connect is now done.
  623. */
  624. if (zfpm_g->state == ZFPM_STATE_CONNECTING)
  625. {
  626. zfpm_connect_check();
  627. return 0;
  628. }
  629. assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  630. assert (zfpm_g->sock >= 0);
  631. ibuf = zfpm_g->ibuf;
  632. already = stream_get_endp (ibuf);
  633. if (already < FPM_MSG_HDR_LEN)
  634. {
  635. ssize_t nbyte;
  636. nbyte = stream_read_try (ibuf, zfpm_g->sock, FPM_MSG_HDR_LEN - already);
  637. if (nbyte == 0 || nbyte == -1)
  638. {
  639. zfpm_connection_down ("closed socket in read");
  640. return 0;
  641. }
  642. if (nbyte != (ssize_t) (FPM_MSG_HDR_LEN - already))
  643. goto done;
  644. already = FPM_MSG_HDR_LEN;
  645. }
  646. stream_set_getp (ibuf, 0);
  647. hdr = (fpm_msg_hdr_t *) stream_pnt (ibuf);
  648. if (!fpm_msg_hdr_ok (hdr))
  649. {
  650. zfpm_connection_down ("invalid message header");
  651. return 0;
  652. }
  653. msg_len = fpm_msg_len (hdr);
  654. /*
  655. * Read out the rest of the packet.
  656. */
  657. if (already < msg_len)
  658. {
  659. ssize_t nbyte;
  660. nbyte = stream_read_try (ibuf, zfpm_g->sock, msg_len - already);
  661. if (nbyte == 0 || nbyte == -1)
  662. {
  663. zfpm_connection_down ("failed to read message");
  664. return 0;
  665. }
  666. if (nbyte != (ssize_t) (msg_len - already))
  667. goto done;
  668. }
  669. zfpm_debug ("Read out a full fpm message");
  670. /*
  671. * Just throw it away for now.
  672. */
  673. stream_reset (ibuf);
  674. done:
  675. zfpm_read_on ();
  676. return 0;
  677. }
  678. /*
  679. * zfpm_writes_pending
  680. *
  681. * Returns TRUE if we may have something to write to the FPM.
  682. */
  683. static int
  684. zfpm_writes_pending (void)
  685. {
  686. /*
  687. * Check if there is any data in the outbound buffer that has not
  688. * been written to the socket yet.
  689. */
  690. if (stream_get_endp (zfpm_g->obuf) - stream_get_getp (zfpm_g->obuf))
  691. return 1;
  692. /*
  693. * Check if there are any prefixes on the outbound queue.
  694. */
  695. if (!TAILQ_EMPTY (&zfpm_g->dest_q))
  696. return 1;
  697. return 0;
  698. }
  699. /*
  700. * zfpm_encode_route
  701. *
  702. * Encode a message to the FPM with information about the given route.
  703. *
  704. * Returns the number of bytes written to the buffer. 0 or a negative
  705. * value indicates an error.
  706. */
  707. static inline int
  708. zfpm_encode_route (rib_dest_t *dest, struct rib *rib, char *in_buf,
  709. size_t in_buf_len)
  710. {
  711. #ifndef HAVE_NETLINK
  712. return 0;
  713. #else
  714. int cmd;
  715. cmd = rib ? RTM_NEWROUTE : RTM_DELROUTE;
  716. return zfpm_netlink_encode_route (cmd, dest, rib, in_buf, in_buf_len);
  717. #endif /* HAVE_NETLINK */
  718. }
  719. /*
  720. * zfpm_route_for_update
  721. *
  722. * Returns the rib that is to be sent to the FPM for a given dest.
  723. */
  724. static struct rib *
  725. zfpm_route_for_update (rib_dest_t *dest)
  726. {
  727. struct rib *rib;
  728. RIB_DEST_FOREACH_ROUTE (dest, rib)
  729. {
  730. if (!CHECK_FLAG (rib->flags, ZEBRA_FLAG_SELECTED))
  731. continue;
  732. return rib;
  733. }
  734. /*
  735. * We have no route for this destination.
  736. */
  737. return NULL;
  738. }
  739. /*
  740. * zfpm_build_updates
  741. *
  742. * Process the outgoing queue and write messages to the outbound
  743. * buffer.
  744. */
  745. static void
  746. zfpm_build_updates (void)
  747. {
  748. struct stream *s;
  749. rib_dest_t *dest;
  750. unsigned char *buf, *data, *buf_end;
  751. size_t msg_len;
  752. size_t data_len;
  753. fpm_msg_hdr_t *hdr;
  754. struct rib *rib;
  755. int is_add, write_msg;
  756. s = zfpm_g->obuf;
  757. assert (stream_empty (s));
  758. do {
  759. /*
  760. * Make sure there is enough space to write another message.
  761. */
  762. if (STREAM_WRITEABLE (s) < FPM_MAX_MSG_LEN)
  763. break;
  764. buf = STREAM_DATA (s) + stream_get_endp (s);
  765. buf_end = buf + STREAM_WRITEABLE (s);
  766. dest = TAILQ_FIRST (&zfpm_g->dest_q);
  767. if (!dest)
  768. break;
  769. assert (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM));
  770. hdr = (fpm_msg_hdr_t *) buf;
  771. hdr->version = FPM_PROTO_VERSION;
  772. hdr->msg_type = FPM_MSG_TYPE_NETLINK;
  773. data = fpm_msg_data (hdr);
  774. rib = zfpm_route_for_update (dest);
  775. is_add = rib ? 1 : 0;
  776. write_msg = 1;
  777. /*
  778. * If this is a route deletion, and we have not sent the route to
  779. * the FPM previously, skip it.
  780. */
  781. if (!is_add && !CHECK_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM))
  782. {
  783. write_msg = 0;
  784. zfpm_g->stats.nop_deletes_skipped++;
  785. }
  786. if (write_msg) {
  787. data_len = zfpm_encode_route (dest, rib, (char *) data, buf_end - data);
  788. assert (data_len);
  789. if (data_len)
  790. {
  791. msg_len = fpm_data_len_to_msg_len (data_len);
  792. hdr->msg_len = htons (msg_len);
  793. stream_forward_endp (s, msg_len);
  794. if (is_add)
  795. zfpm_g->stats.route_adds++;
  796. else
  797. zfpm_g->stats.route_dels++;
  798. }
  799. }
  800. /*
  801. * Remove the dest from the queue, and reset the flag.
  802. */
  803. UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
  804. TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
  805. if (is_add)
  806. {
  807. SET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
  808. }
  809. else
  810. {
  811. UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
  812. }
  813. /*
  814. * Delete the destination if necessary.
  815. */
  816. if (rib_gc_dest (dest->rnode))
  817. zfpm_g->stats.dests_del_after_update++;
  818. } while (1);
  819. }
  820. /*
  821. * zfpm_write_cb
  822. */
  823. static int
  824. zfpm_write_cb (struct thread *thread)
  825. {
  826. struct stream *s;
  827. int num_writes;
  828. zfpm_g->stats.write_cb_calls++;
  829. assert (zfpm_g->t_write);
  830. zfpm_g->t_write = NULL;
  831. /*
  832. * Check if async connect is now done.
  833. */
  834. if (zfpm_g->state == ZFPM_STATE_CONNECTING)
  835. {
  836. zfpm_connect_check ();
  837. return 0;
  838. }
  839. assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  840. assert (zfpm_g->sock >= 0);
  841. num_writes = 0;
  842. do
  843. {
  844. int bytes_to_write, bytes_written;
  845. s = zfpm_g->obuf;
  846. /*
  847. * If the stream is empty, try fill it up with data.
  848. */
  849. if (stream_empty (s))
  850. {
  851. zfpm_build_updates ();
  852. }
  853. bytes_to_write = stream_get_endp (s) - stream_get_getp (s);
  854. if (!bytes_to_write)
  855. break;
  856. bytes_written = write (zfpm_g->sock, STREAM_PNT (s), bytes_to_write);
  857. zfpm_g->stats.write_calls++;
  858. num_writes++;
  859. if (bytes_written < 0)
  860. {
  861. if (ERRNO_IO_RETRY (errno))
  862. break;
  863. zfpm_connection_down ("failed to write to socket");
  864. return 0;
  865. }
  866. if (bytes_written != bytes_to_write)
  867. {
  868. /*
  869. * Partial write.
  870. */
  871. stream_forward_getp (s, bytes_written);
  872. zfpm_g->stats.partial_writes++;
  873. break;
  874. }
  875. /*
  876. * We've written out the entire contents of the stream.
  877. */
  878. stream_reset (s);
  879. if (num_writes >= ZFPM_MAX_WRITES_PER_RUN)
  880. {
  881. zfpm_g->stats.max_writes_hit++;
  882. break;
  883. }
  884. if (zfpm_thread_should_yield (thread))
  885. {
  886. zfpm_g->stats.t_write_yields++;
  887. break;
  888. }
  889. } while (1);
  890. if (zfpm_writes_pending ())
  891. zfpm_write_on ();
  892. return 0;
  893. }
  894. /*
  895. * zfpm_connect_cb
  896. */
  897. static int
  898. zfpm_connect_cb (struct thread *t)
  899. {
  900. int sock, ret;
  901. struct sockaddr_in serv;
  902. assert (zfpm_g->t_connect);
  903. zfpm_g->t_connect = NULL;
  904. assert (zfpm_g->state == ZFPM_STATE_ACTIVE);
  905. sock = socket (AF_INET, SOCK_STREAM, 0);
  906. if (sock < 0)
  907. {
  908. zfpm_debug ("Failed to create socket for connect(): %s", strerror(errno));
  909. zfpm_g->stats.connect_no_sock++;
  910. return 0;
  911. }
  912. set_nonblocking(sock);
  913. /* Make server socket. */
  914. memset (&serv, 0, sizeof (serv));
  915. serv.sin_family = AF_INET;
  916. serv.sin_port = htons (zfpm_g->fpm_port);
  917. #ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
  918. serv.sin_len = sizeof (struct sockaddr_in);
  919. #endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
  920. serv.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
  921. /*
  922. * Connect to the FPM.
  923. */
  924. zfpm_g->connect_calls++;
  925. zfpm_g->stats.connect_calls++;
  926. zfpm_g->last_connect_call_time = zfpm_get_time ();
  927. ret = connect (sock, (struct sockaddr *) &serv, sizeof (serv));
  928. if (ret >= 0)
  929. {
  930. zfpm_g->sock = sock;
  931. zfpm_connection_up ("connect succeeded");
  932. return 1;
  933. }
  934. if (errno == EINPROGRESS)
  935. {
  936. zfpm_g->sock = sock;
  937. zfpm_read_on ();
  938. zfpm_write_on ();
  939. zfpm_set_state (ZFPM_STATE_CONNECTING, "async connect in progress");
  940. return 0;
  941. }
  942. zlog_info ("can't connect to FPM %d: %s", sock, safe_strerror (errno));
  943. close (sock);
  944. /*
  945. * Restart timer for retrying connection.
  946. */
  947. zfpm_start_connect_timer ("connect() failed");
  948. return 0;
  949. }
  950. /*
  951. * zfpm_set_state
  952. *
  953. * Move state machine into the given state.
  954. */
  955. static void
  956. zfpm_set_state (zfpm_state_t state, const char *reason)
  957. {
  958. zfpm_state_t cur_state = zfpm_g->state;
  959. if (!reason)
  960. reason = "Unknown";
  961. if (state == cur_state)
  962. return;
  963. zfpm_debug("beginning state transition %s -> %s. Reason: %s",
  964. zfpm_state_to_str (cur_state), zfpm_state_to_str (state),
  965. reason);
  966. switch (state) {
  967. case ZFPM_STATE_IDLE:
  968. assert (cur_state == ZFPM_STATE_ESTABLISHED);
  969. break;
  970. case ZFPM_STATE_ACTIVE:
  971. assert (cur_state == ZFPM_STATE_IDLE ||
  972. cur_state == ZFPM_STATE_CONNECTING);
  973. assert (zfpm_g->t_connect);
  974. break;
  975. case ZFPM_STATE_CONNECTING:
  976. assert (zfpm_g->sock);
  977. assert (cur_state == ZFPM_STATE_ACTIVE);
  978. assert (zfpm_g->t_read);
  979. assert (zfpm_g->t_write);
  980. break;
  981. case ZFPM_STATE_ESTABLISHED:
  982. assert (cur_state == ZFPM_STATE_ACTIVE ||
  983. cur_state == ZFPM_STATE_CONNECTING);
  984. assert (zfpm_g->sock);
  985. assert (zfpm_g->t_read);
  986. assert (zfpm_g->t_write);
  987. break;
  988. }
  989. zfpm_g->state = state;
  990. }
  991. /*
  992. * zfpm_calc_connect_delay
  993. *
  994. * Returns the number of seconds after which we should attempt to
  995. * reconnect to the FPM.
  996. */
  997. static long
  998. zfpm_calc_connect_delay (void)
  999. {
  1000. time_t elapsed;
  1001. /*
  1002. * Return 0 if this is our first attempt to connect.
  1003. */
  1004. if (zfpm_g->connect_calls == 0)
  1005. {
  1006. return 0;
  1007. }
  1008. elapsed = zfpm_get_elapsed_time (zfpm_g->last_connect_call_time);
  1009. if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
  1010. return 0;
  1011. }
  1012. return ZFPM_CONNECT_RETRY_IVL - elapsed;
  1013. }
  1014. /*
  1015. * zfpm_start_connect_timer
  1016. */
  1017. static void
  1018. zfpm_start_connect_timer (const char *reason)
  1019. {
  1020. long delay_secs;
  1021. assert (!zfpm_g->t_connect);
  1022. assert (zfpm_g->sock < 0);
  1023. assert(zfpm_g->state == ZFPM_STATE_IDLE ||
  1024. zfpm_g->state == ZFPM_STATE_ACTIVE ||
  1025. zfpm_g->state == ZFPM_STATE_CONNECTING);
  1026. delay_secs = zfpm_calc_connect_delay();
  1027. zfpm_debug ("scheduling connect in %ld seconds", delay_secs);
  1028. THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_connect, zfpm_connect_cb, 0,
  1029. delay_secs);
  1030. zfpm_set_state (ZFPM_STATE_ACTIVE, reason);
  1031. }
  1032. /*
  1033. * zfpm_is_enabled
  1034. *
  1035. * Returns TRUE if the zebra FPM module has been enabled.
  1036. */
  1037. static inline int
  1038. zfpm_is_enabled (void)
  1039. {
  1040. return zfpm_g->enabled;
  1041. }
  1042. /*
  1043. * zfpm_conn_is_up
  1044. *
  1045. * Returns TRUE if the connection to the FPM is up.
  1046. */
  1047. static inline int
  1048. zfpm_conn_is_up (void)
  1049. {
  1050. if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
  1051. return 0;
  1052. assert (zfpm_g->sock >= 0);
  1053. return 1;
  1054. }
  1055. /*
  1056. * zfpm_trigger_update
  1057. *
  1058. * The zebra code invokes this function to indicate that we should
  1059. * send an update to the FPM about the given route_node.
  1060. */
  1061. void
  1062. zfpm_trigger_update (struct route_node *rn, const char *reason)
  1063. {
  1064. rib_dest_t *dest;
  1065. char buf[PREFIX_STRLEN];
  1066. /*
  1067. * Ignore if the connection is down. We will update the FPM about
  1068. * all destinations once the connection comes up.
  1069. */
  1070. if (!zfpm_conn_is_up ())
  1071. return;
  1072. dest = rib_dest_from_rnode (rn);
  1073. /*
  1074. * Ignore the trigger if the dest is not in a table that we would
  1075. * send to the FPM.
  1076. */
  1077. if (!zfpm_is_table_for_fpm (rib_dest_table (dest)))
  1078. {
  1079. zfpm_g->stats.non_fpm_table_triggers++;
  1080. return;
  1081. }
  1082. if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM)) {
  1083. zfpm_g->stats.redundant_triggers++;
  1084. return;
  1085. }
  1086. if (reason)
  1087. {
  1088. zfpm_debug ("%s triggering update to FPM - Reason: %s",
  1089. prefix2str (&rn->p, buf, sizeof(buf)), reason);
  1090. }
  1091. SET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
  1092. TAILQ_INSERT_TAIL (&zfpm_g->dest_q, dest, fpm_q_entries);
  1093. zfpm_g->stats.updates_triggered++;
  1094. /*
  1095. * Make sure that writes are enabled.
  1096. */
  1097. if (zfpm_g->t_write)
  1098. return;
  1099. zfpm_write_on ();
  1100. }
  1101. /*
  1102. * zfpm_stats_timer_cb
  1103. */
  1104. static int
  1105. zfpm_stats_timer_cb (struct thread *t)
  1106. {
  1107. assert (zfpm_g->t_stats);
  1108. zfpm_g->t_stats = NULL;
  1109. /*
  1110. * Remember the stats collected in the last interval for display
  1111. * purposes.
  1112. */
  1113. zfpm_stats_copy (&zfpm_g->stats, &zfpm_g->last_ivl_stats);
  1114. /*
  1115. * Add the current set of stats into the cumulative statistics.
  1116. */
  1117. zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
  1118. &zfpm_g->cumulative_stats);
  1119. /*
  1120. * Start collecting stats afresh over the next interval.
  1121. */
  1122. zfpm_stats_reset (&zfpm_g->stats);
  1123. zfpm_start_stats_timer ();
  1124. return 0;
  1125. }
  1126. /*
  1127. * zfpm_stop_stats_timer
  1128. */
  1129. static void
  1130. zfpm_stop_stats_timer (void)
  1131. {
  1132. if (!zfpm_g->t_stats)
  1133. return;
  1134. zfpm_debug ("Stopping existing stats timer");
  1135. THREAD_TIMER_OFF (zfpm_g->t_stats);
  1136. }
  1137. /*
  1138. * zfpm_start_stats_timer
  1139. */
  1140. void
  1141. zfpm_start_stats_timer (void)
  1142. {
  1143. assert (!zfpm_g->t_stats);
  1144. THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_stats, zfpm_stats_timer_cb, 0,
  1145. ZFPM_STATS_IVL_SECS);
  1146. }
  1147. /*
  1148. * Helper macro for zfpm_show_stats() below.
  1149. */
  1150. #define ZFPM_SHOW_STAT(counter) \
  1151. do { \
  1152. vty_out (vty, "%-40s %10lu %16lu%s", #counter, total_stats.counter, \
  1153. zfpm_g->last_ivl_stats.counter, VTY_NEWLINE); \
  1154. } while (0)
  1155. /*
  1156. * zfpm_show_stats
  1157. */
  1158. static void
  1159. zfpm_show_stats (struct vty *vty)
  1160. {
  1161. zfpm_stats_t total_stats;
  1162. time_t elapsed;
  1163. vty_out (vty, "%s%-40s %10s Last %2d secs%s%s", VTY_NEWLINE, "Counter",
  1164. "Total", ZFPM_STATS_IVL_SECS, VTY_NEWLINE, VTY_NEWLINE);
  1165. /*
  1166. * Compute the total stats up to this instant.
  1167. */
  1168. zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
  1169. &total_stats);
  1170. ZFPM_SHOW_STAT (connect_calls);
  1171. ZFPM_SHOW_STAT (connect_no_sock);
  1172. ZFPM_SHOW_STAT (read_cb_calls);
  1173. ZFPM_SHOW_STAT (write_cb_calls);
  1174. ZFPM_SHOW_STAT (write_calls);
  1175. ZFPM_SHOW_STAT (partial_writes);
  1176. ZFPM_SHOW_STAT (max_writes_hit);
  1177. ZFPM_SHOW_STAT (t_write_yields);
  1178. ZFPM_SHOW_STAT (nop_deletes_skipped);
  1179. ZFPM_SHOW_STAT (route_adds);
  1180. ZFPM_SHOW_STAT (route_dels);
  1181. ZFPM_SHOW_STAT (updates_triggered);
  1182. ZFPM_SHOW_STAT (non_fpm_table_triggers);
  1183. ZFPM_SHOW_STAT (redundant_triggers);
  1184. ZFPM_SHOW_STAT (dests_del_after_update);
  1185. ZFPM_SHOW_STAT (t_conn_down_starts);
  1186. ZFPM_SHOW_STAT (t_conn_down_dests_processed);
  1187. ZFPM_SHOW_STAT (t_conn_down_yields);
  1188. ZFPM_SHOW_STAT (t_conn_down_finishes);
  1189. ZFPM_SHOW_STAT (t_conn_up_starts);
  1190. ZFPM_SHOW_STAT (t_conn_up_dests_processed);
  1191. ZFPM_SHOW_STAT (t_conn_up_yields);
  1192. ZFPM_SHOW_STAT (t_conn_up_aborts);
  1193. ZFPM_SHOW_STAT (t_conn_up_finishes);
  1194. if (!zfpm_g->last_stats_clear_time)
  1195. return;
  1196. elapsed = zfpm_get_elapsed_time (zfpm_g->last_stats_clear_time);
  1197. vty_out (vty, "%sStats were cleared %lu seconds ago%s", VTY_NEWLINE,
  1198. (unsigned long) elapsed, VTY_NEWLINE);
  1199. }
  1200. /*
  1201. * zfpm_clear_stats
  1202. */
  1203. static void
  1204. zfpm_clear_stats (struct vty *vty)
  1205. {
  1206. if (!zfpm_is_enabled ())
  1207. {
  1208. vty_out (vty, "The FPM module is not enabled...%s", VTY_NEWLINE);
  1209. return;
  1210. }
  1211. zfpm_stats_reset (&zfpm_g->stats);
  1212. zfpm_stats_reset (&zfpm_g->last_ivl_stats);
  1213. zfpm_stats_reset (&zfpm_g->cumulative_stats);
  1214. zfpm_stop_stats_timer ();
  1215. zfpm_start_stats_timer ();
  1216. zfpm_g->last_stats_clear_time = zfpm_get_time();
  1217. vty_out (vty, "Cleared FPM stats%s", VTY_NEWLINE);
  1218. }
  1219. /*
  1220. * show_zebra_fpm_stats
  1221. */
  1222. DEFUN (show_zebra_fpm_stats,
  1223. show_zebra_fpm_stats_cmd,
  1224. "show zebra fpm stats",
  1225. SHOW_STR
  1226. "Zebra information\n"
  1227. "Forwarding Path Manager information\n"
  1228. "Statistics\n")
  1229. {
  1230. zfpm_show_stats (vty);
  1231. return CMD_SUCCESS;
  1232. }
  1233. /*
  1234. * clear_zebra_fpm_stats
  1235. */
  1236. DEFUN (clear_zebra_fpm_stats,
  1237. clear_zebra_fpm_stats_cmd,
  1238. "clear zebra fpm stats",
  1239. CLEAR_STR
  1240. "Zebra information\n"
  1241. "Clear Forwarding Path Manager information\n"
  1242. "Statistics\n")
  1243. {
  1244. zfpm_clear_stats (vty);
  1245. return CMD_SUCCESS;
  1246. }
  1247. /**
  1248. * zfpm_init
  1249. *
  1250. * One-time initialization of the Zebra FPM module.
  1251. *
  1252. * @param[in] port port at which FPM is running.
  1253. * @param[in] enable TRUE if the zebra FPM module should be enabled
  1254. *
  1255. * Returns TRUE on success.
  1256. */
  1257. int
  1258. zfpm_init (struct thread_master *master, int enable, uint16_t port)
  1259. {
  1260. static int initialized = 0;
  1261. if (initialized) {
  1262. return 1;
  1263. }
  1264. initialized = 1;
  1265. memset (zfpm_g, 0, sizeof (*zfpm_g));
  1266. zfpm_g->master = master;
  1267. TAILQ_INIT(&zfpm_g->dest_q);
  1268. zfpm_g->sock = -1;
  1269. zfpm_g->state = ZFPM_STATE_IDLE;
  1270. /*
  1271. * Netlink must currently be available for the Zebra-FPM interface
  1272. * to be enabled.
  1273. */
  1274. #ifndef HAVE_NETLINK
  1275. enable = 0;
  1276. #endif
  1277. zfpm_g->enabled = enable;
  1278. zfpm_stats_init (&zfpm_g->stats);
  1279. zfpm_stats_init (&zfpm_g->last_ivl_stats);
  1280. zfpm_stats_init (&zfpm_g->cumulative_stats);
  1281. install_element (ENABLE_NODE, &show_zebra_fpm_stats_cmd);
  1282. install_element (ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
  1283. if (!enable) {
  1284. return 1;
  1285. }
  1286. if (!port)
  1287. port = FPM_DEFAULT_PORT;
  1288. zfpm_g->fpm_port = port;
  1289. zfpm_g->obuf = stream_new (ZFPM_OBUF_SIZE);
  1290. zfpm_g->ibuf = stream_new (ZFPM_IBUF_SIZE);
  1291. zfpm_start_stats_timer ();
  1292. zfpm_start_connect_timer ("initialized");
  1293. return 1;
  1294. }