stream.c 21 KB


  1. /*
  2. * Packet interface
  3. * Copyright (C) 1999 Kunihiro Ishiguro
  4. *
  5. * This file is part of GNU Zebra.
  6. *
  7. * GNU Zebra is free software; you can redistribute it and/or modify it
  8. * under the terms of the GNU General Public License as published by the
  9. * Free Software Foundation; either version 2, or (at your option) any
  10. * later version.
  11. *
  12. * GNU Zebra is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. * General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with GNU Zebra; see the file COPYING. If not, write to the Free
  19. * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
  20. * 02111-1307, USA.
  21. */
  22. #include <zebra.h>
  23. #include <stddef.h>
  24. #include "stream.h"
  25. #include "memory.h"
  26. #include "network.h"
  27. #include "prefix.h"
  28. #include "log.h"
  29. /* Tests whether a position is valid */
  30. #define GETP_VALID(S,G) \
  31. ((G) <= (S)->endp)
  32. #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
  33. #define ENDP_VALID(S,E) \
  34. ((E) <= (S)->size)
  35. /* asserting sanity checks. Following must be true before
  36. * stream functions are called:
  37. *
  38. * Following must always be true of stream elements
  39. * before and after calls to stream functions:
  40. *
  41. * getp <= endp <= size
  42. *
  43. * Note that after a stream function is called following may be true:
  44. * if (getp == endp) then stream is no longer readable
  45. * if (endp == size) then stream is no longer writeable
  46. *
  47. * It is valid to put to anywhere within the size of the stream, but only
  48. * using stream_put..._at() functions.
  49. */
  50. #define STREAM_WARN_OFFSETS(S) \
  51. zlog_warn ("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
  52. (void *)(S), \
  53. (unsigned long) (S)->size, \
  54. (unsigned long) (S)->getp, \
  55. (unsigned long) (S)->endp)\
  56. #define STREAM_VERIFY_SANE(S) \
  57. do { \
  58. if ( !(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp)) ) \
  59. STREAM_WARN_OFFSETS(S); \
  60. assert ( GETP_VALID(S, (S)->getp) ); \
  61. assert ( ENDP_VALID(S, (S)->endp) ); \
  62. } while (0)
  63. #define STREAM_BOUND_WARN(S, WHAT) \
  64. do { \
  65. zlog_warn ("%s: Attempt to %s out of bounds", __func__, (WHAT)); \
  66. STREAM_WARN_OFFSETS(S); \
  67. assert (0); \
  68. } while (0)
  69. /* XXX: Deprecated macro: do not use */
  70. #define CHECK_SIZE(S, Z) \
  71. do { \
  72. if (((S)->endp + (Z)) > (S)->size) \
  73. { \
  74. zlog_warn ("CHECK_SIZE: truncating requested size %lu\n", \
  75. (unsigned long) (Z)); \
  76. STREAM_WARN_OFFSETS(S); \
  77. (Z) = (S)->size - (S)->endp; \
  78. } \
  79. } while (0);
  80. /* Make stream buffer. */
  81. struct stream *
  82. stream_new (size_t size)
  83. {
  84. struct stream *s;
  85. assert (size > 0);
  86. if (size == 0)
  87. {
  88. zlog_warn ("stream_new(): called with 0 size!");
  89. return NULL;
  90. }
  91. s = XCALLOC (MTYPE_STREAM, sizeof (struct stream));
  92. if (s == NULL)
  93. return s;
  94. if ( (s->data = XMALLOC (MTYPE_STREAM_DATA, size)) == NULL)
  95. {
  96. XFREE (MTYPE_STREAM, s);
  97. return NULL;
  98. }
  99. s->size = size;
  100. return s;
  101. }
  102. /* Free it now. */
  103. void
  104. stream_free (struct stream *s)
  105. {
  106. if (!s)
  107. return;
  108. XFREE (MTYPE_STREAM_DATA, s->data);
  109. XFREE (MTYPE_STREAM, s);
  110. }
  111. struct stream *
  112. stream_copy (struct stream *new, struct stream *src)
  113. {
  114. STREAM_VERIFY_SANE (src);
  115. assert (new != NULL);
  116. assert (STREAM_SIZE(new) >= src->endp);
  117. new->endp = src->endp;
  118. new->getp = src->getp;
  119. memcpy (new->data, src->data, src->endp);
  120. return new;
  121. }
  122. struct stream *
  123. stream_dup (struct stream *s)
  124. {
  125. struct stream *new;
  126. STREAM_VERIFY_SANE (s);
  127. if ( (new = stream_new (s->endp)) == NULL)
  128. return NULL;
  129. return (stream_copy (new, s));
  130. }
  131. struct stream *
  132. stream_dupcat (struct stream *s1, struct stream *s2, size_t offset)
  133. {
  134. struct stream *new;
  135. STREAM_VERIFY_SANE (s1);
  136. STREAM_VERIFY_SANE (s2);
  137. if ( (new = stream_new (s1->endp + s2->endp)) == NULL)
  138. return NULL;
  139. memcpy (new->data, s1->data, offset);
  140. memcpy (new->data + offset, s2->data, s2->endp);
  141. memcpy (new->data + offset + s2->endp, s1->data + offset,
  142. (s1->endp - offset));
  143. new->endp = s1->endp + s2->endp;
  144. return new;
  145. }
  146. size_t
  147. stream_resize (struct stream *s, size_t newsize)
  148. {
  149. u_char *newdata;
  150. STREAM_VERIFY_SANE (s);
  151. newdata = XREALLOC (MTYPE_STREAM_DATA, s->data, newsize);
  152. if (newdata == NULL)
  153. return s->size;
  154. s->data = newdata;
  155. s->size = newsize;
  156. if (s->endp > s->size)
  157. s->endp = s->size;
  158. if (s->getp > s->endp)
  159. s->getp = s->endp;
  160. STREAM_VERIFY_SANE (s);
  161. return s->size;
  162. }
  163. size_t
  164. stream_get_getp (struct stream *s)
  165. {
  166. STREAM_VERIFY_SANE(s);
  167. return s->getp;
  168. }
  169. size_t
  170. stream_get_endp (struct stream *s)
  171. {
  172. STREAM_VERIFY_SANE(s);
  173. return s->endp;
  174. }
  175. size_t
  176. stream_get_size (struct stream *s)
  177. {
  178. STREAM_VERIFY_SANE(s);
  179. return s->size;
  180. }
  181. /* Stream structre' stream pointer related functions. */
  182. void
  183. stream_set_getp (struct stream *s, size_t pos)
  184. {
  185. STREAM_VERIFY_SANE(s);
  186. if (!GETP_VALID (s, pos))
  187. {
  188. STREAM_BOUND_WARN (s, "set getp");
  189. pos = s->endp;
  190. }
  191. s->getp = pos;
  192. }
  193. void
  194. stream_set_endp (struct stream *s, size_t pos)
  195. {
  196. STREAM_VERIFY_SANE(s);
  197. if (!ENDP_VALID(s, pos))
  198. {
  199. STREAM_BOUND_WARN (s, "set endp");
  200. return;
  201. }
  202. /*
  203. * Make sure the current read pointer is not beyond the new endp.
  204. */
  205. if (s->getp > pos)
  206. {
  207. STREAM_BOUND_WARN(s, "set endp");
  208. return;
  209. }
  210. s->endp = pos;
  211. STREAM_VERIFY_SANE(s);
  212. }
  213. /* Forward pointer. */
  214. void
  215. stream_forward_getp (struct stream *s, size_t size)
  216. {
  217. STREAM_VERIFY_SANE(s);
  218. if (!GETP_VALID (s, s->getp + size))
  219. {
  220. STREAM_BOUND_WARN (s, "seek getp");
  221. return;
  222. }
  223. s->getp += size;
  224. }
  225. void
  226. stream_forward_endp (struct stream *s, size_t size)
  227. {
  228. STREAM_VERIFY_SANE(s);
  229. if (!ENDP_VALID (s, s->endp + size))
  230. {
  231. STREAM_BOUND_WARN (s, "seek endp");
  232. return;
  233. }
  234. s->endp += size;
  235. }
  236. /* Copy from stream to destination. */
  237. void
  238. stream_get (void *dst, struct stream *s, size_t size)
  239. {
  240. STREAM_VERIFY_SANE(s);
  241. if (STREAM_READABLE(s) < size)
  242. {
  243. STREAM_BOUND_WARN (s, "get");
  244. return;
  245. }
  246. memcpy (dst, s->data + s->getp, size);
  247. s->getp += size;
  248. }
  249. /* Get next character from the stream. */
  250. u_char
  251. stream_getc (struct stream *s)
  252. {
  253. u_char c;
  254. STREAM_VERIFY_SANE (s);
  255. if (STREAM_READABLE(s) < sizeof (u_char))
  256. {
  257. STREAM_BOUND_WARN (s, "get char");
  258. return 0;
  259. }
  260. c = s->data[s->getp++];
  261. return c;
  262. }
  263. /* Get next character from the stream. */
  264. u_char
  265. stream_getc_from (struct stream *s, size_t from)
  266. {
  267. u_char c;
  268. STREAM_VERIFY_SANE(s);
  269. if (!GETP_VALID (s, from + sizeof (u_char)))
  270. {
  271. STREAM_BOUND_WARN (s, "get char");
  272. return 0;
  273. }
  274. c = s->data[from];
  275. return c;
  276. }
  277. /* Get next word from the stream. */
  278. u_int16_t
  279. stream_getw (struct stream *s)
  280. {
  281. u_int16_t w;
  282. STREAM_VERIFY_SANE (s);
  283. if (STREAM_READABLE (s) < sizeof (u_int16_t))
  284. {
  285. STREAM_BOUND_WARN (s, "get ");
  286. return 0;
  287. }
  288. w = s->data[s->getp++] << 8;
  289. w |= s->data[s->getp++];
  290. return w;
  291. }
  292. /* Get next word from the stream. */
  293. u_int16_t
  294. stream_getw_from (struct stream *s, size_t from)
  295. {
  296. u_int16_t w;
  297. STREAM_VERIFY_SANE(s);
  298. if (!GETP_VALID (s, from + sizeof (u_int16_t)))
  299. {
  300. STREAM_BOUND_WARN (s, "get ");
  301. return 0;
  302. }
  303. w = s->data[from++] << 8;
  304. w |= s->data[from];
  305. return w;
  306. }
  307. /* Get next long word from the stream. */
  308. u_int32_t
  309. stream_getl_from (struct stream *s, size_t from)
  310. {
  311. u_int32_t l;
  312. STREAM_VERIFY_SANE(s);
  313. if (!GETP_VALID (s, from + sizeof (u_int32_t)))
  314. {
  315. STREAM_BOUND_WARN (s, "get long");
  316. return 0;
  317. }
  318. l = s->data[from++] << 24;
  319. l |= s->data[from++] << 16;
  320. l |= s->data[from++] << 8;
  321. l |= s->data[from];
  322. return l;
  323. }
  324. u_int32_t
  325. stream_getl (struct stream *s)
  326. {
  327. u_int32_t l;
  328. STREAM_VERIFY_SANE(s);
  329. if (STREAM_READABLE (s) < sizeof (u_int32_t))
  330. {
  331. STREAM_BOUND_WARN (s, "get long");
  332. return 0;
  333. }
  334. l = s->data[s->getp++] << 24;
  335. l |= s->data[s->getp++] << 16;
  336. l |= s->data[s->getp++] << 8;
  337. l |= s->data[s->getp++];
  338. return l;
  339. }
  340. /* Get next quad word from the stream. */
  341. uint64_t
  342. stream_getq_from (struct stream *s, size_t from)
  343. {
  344. uint64_t q;
  345. STREAM_VERIFY_SANE(s);
  346. if (!GETP_VALID (s, from + sizeof (uint64_t)))
  347. {
  348. STREAM_BOUND_WARN (s, "get quad");
  349. return 0;
  350. }
  351. q = ((uint64_t) s->data[from++]) << 56;
  352. q |= ((uint64_t) s->data[from++]) << 48;
  353. q |= ((uint64_t) s->data[from++]) << 40;
  354. q |= ((uint64_t) s->data[from++]) << 32;
  355. q |= ((uint64_t) s->data[from++]) << 24;
  356. q |= ((uint64_t) s->data[from++]) << 16;
  357. q |= ((uint64_t) s->data[from++]) << 8;
  358. q |= ((uint64_t) s->data[from++]);
  359. return q;
  360. }
  361. uint64_t
  362. stream_getq (struct stream *s)
  363. {
  364. uint64_t q;
  365. STREAM_VERIFY_SANE(s);
  366. if (STREAM_READABLE (s) < sizeof (uint64_t))
  367. {
  368. STREAM_BOUND_WARN (s, "get quad");
  369. return 0;
  370. }
  371. q = ((uint64_t) s->data[s->getp++]) << 56;
  372. q |= ((uint64_t) s->data[s->getp++]) << 48;
  373. q |= ((uint64_t) s->data[s->getp++]) << 40;
  374. q |= ((uint64_t) s->data[s->getp++]) << 32;
  375. q |= ((uint64_t) s->data[s->getp++]) << 24;
  376. q |= ((uint64_t) s->data[s->getp++]) << 16;
  377. q |= ((uint64_t) s->data[s->getp++]) << 8;
  378. q |= ((uint64_t) s->data[s->getp++]);
  379. return q;
  380. }
  381. /* Get next long word from the stream. */
  382. u_int32_t
  383. stream_get_ipv4 (struct stream *s)
  384. {
  385. u_int32_t l;
  386. STREAM_VERIFY_SANE(s);
  387. if (STREAM_READABLE (s) < sizeof(u_int32_t))
  388. {
  389. STREAM_BOUND_WARN (s, "get ipv4");
  390. return 0;
  391. }
  392. memcpy (&l, s->data + s->getp, sizeof(u_int32_t));
  393. s->getp += sizeof(u_int32_t);
  394. return l;
  395. }
  396. float
  397. stream_getf (struct stream *s)
  398. {
  399. #if !defined(__STDC_IEC_559__) && __GCC_IEC_559 < 1
  400. #warning "Unknown floating-point format, __func__ may be wrong"
  401. #endif
  402. /* we assume 'float' is in the single precision IEC 60559 binary
  403. format, in host byte order */
  404. union {
  405. float r;
  406. uint32_t d;
  407. } u;
  408. u.d = stream_getl (s);
  409. return u.r;
  410. }
  411. double
  412. stream_getd (struct stream *s)
  413. {
  414. #if !defined(__STDC_IEC_559__) && __GCC_IEC_559 < 1
  415. #warning "Unknown floating-point format, __func__ may be wrong"
  416. #endif
  417. union {
  418. double r;
  419. uint64_t d;
  420. } u;
  421. u.d = stream_getq (s);
  422. return u.r;
  423. }
  424. /* Copy to source to stream.
  425. *
  426. * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
  427. * around. This should be fixed once the stream updates are working.
  428. *
  429. * stream_write() is saner
  430. */
  431. void
  432. stream_put (struct stream *s, const void *src, size_t size)
  433. {
  434. /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
  435. CHECK_SIZE(s, size);
  436. STREAM_VERIFY_SANE(s);
  437. if (STREAM_WRITEABLE (s) < size)
  438. {
  439. STREAM_BOUND_WARN (s, "put");
  440. return;
  441. }
  442. if (src)
  443. memcpy (s->data + s->endp, src, size);
  444. else
  445. memset (s->data + s->endp, 0, size);
  446. s->endp += size;
  447. }
  448. /* Put character to the stream. */
  449. int
  450. stream_putc (struct stream *s, u_char c)
  451. {
  452. STREAM_VERIFY_SANE(s);
  453. if (STREAM_WRITEABLE (s) < sizeof(u_char))
  454. {
  455. STREAM_BOUND_WARN (s, "put");
  456. return 0;
  457. }
  458. s->data[s->endp++] = c;
  459. return sizeof (u_char);
  460. }
  461. /* Put word to the stream. */
  462. int
  463. stream_putw (struct stream *s, u_int16_t w)
  464. {
  465. STREAM_VERIFY_SANE (s);
  466. if (STREAM_WRITEABLE (s) < sizeof (u_int16_t))
  467. {
  468. STREAM_BOUND_WARN (s, "put");
  469. return 0;
  470. }
  471. s->data[s->endp++] = (u_char)(w >> 8);
  472. s->data[s->endp++] = (u_char) w;
  473. return 2;
  474. }
  475. /* Put long word to the stream. */
  476. int
  477. stream_putl (struct stream *s, u_int32_t l)
  478. {
  479. STREAM_VERIFY_SANE (s);
  480. if (STREAM_WRITEABLE (s) < sizeof (u_int32_t))
  481. {
  482. STREAM_BOUND_WARN (s, "put");
  483. return 0;
  484. }
  485. s->data[s->endp++] = (u_char)(l >> 24);
  486. s->data[s->endp++] = (u_char)(l >> 16);
  487. s->data[s->endp++] = (u_char)(l >> 8);
  488. s->data[s->endp++] = (u_char)l;
  489. return 4;
  490. }
  491. /* Put quad word to the stream. */
  492. int
  493. stream_putq (struct stream *s, uint64_t q)
  494. {
  495. STREAM_VERIFY_SANE (s);
  496. if (STREAM_WRITEABLE (s) < sizeof (uint64_t))
  497. {
  498. STREAM_BOUND_WARN (s, "put quad");
  499. return 0;
  500. }
  501. s->data[s->endp++] = (u_char)(q >> 56);
  502. s->data[s->endp++] = (u_char)(q >> 48);
  503. s->data[s->endp++] = (u_char)(q >> 40);
  504. s->data[s->endp++] = (u_char)(q >> 32);
  505. s->data[s->endp++] = (u_char)(q >> 24);
  506. s->data[s->endp++] = (u_char)(q >> 16);
  507. s->data[s->endp++] = (u_char)(q >> 8);
  508. s->data[s->endp++] = (u_char)q;
  509. return 8;
  510. }
  511. int
  512. stream_putf (struct stream *s, float f)
  513. {
  514. #if !defined(__STDC_IEC_559__) && __GCC_IEC_559 < 1
  515. #warning "Unknown floating-point format, __func__ may be wrong"
  516. #endif
  517. /* we can safely assume 'float' is in the single precision
  518. IEC 60559 binary format in host order */
  519. union {
  520. float i;
  521. uint32_t o;
  522. } u;
  523. u.i = f;
  524. return stream_putl (s, u.o);
  525. }
  526. int
  527. stream_putd (struct stream *s, double d)
  528. {
  529. #if !defined(__STDC_IEC_559__) && __GCC_IEC_559 < 1
  530. #warning "Unknown floating-point format, __func__ may be wrong"
  531. #endif
  532. union {
  533. double i;
  534. uint64_t o;
  535. } u;
  536. u.i = d;
  537. return stream_putq (s, u.o);
  538. }
  539. int
  540. stream_putc_at (struct stream *s, size_t putp, u_char c)
  541. {
  542. STREAM_VERIFY_SANE(s);
  543. if (!PUT_AT_VALID (s, putp + sizeof (u_char)))
  544. {
  545. STREAM_BOUND_WARN (s, "put");
  546. return 0;
  547. }
  548. s->data[putp] = c;
  549. return 1;
  550. }
  551. int
  552. stream_putw_at (struct stream *s, size_t putp, u_int16_t w)
  553. {
  554. STREAM_VERIFY_SANE(s);
  555. if (!PUT_AT_VALID (s, putp + sizeof (u_int16_t)))
  556. {
  557. STREAM_BOUND_WARN (s, "put");
  558. return 0;
  559. }
  560. s->data[putp] = (u_char)(w >> 8);
  561. s->data[putp + 1] = (u_char) w;
  562. return 2;
  563. }
  564. int
  565. stream_putl_at (struct stream *s, size_t putp, u_int32_t l)
  566. {
  567. STREAM_VERIFY_SANE(s);
  568. if (!PUT_AT_VALID (s, putp + sizeof (u_int32_t)))
  569. {
  570. STREAM_BOUND_WARN (s, "put");
  571. return 0;
  572. }
  573. s->data[putp] = (u_char)(l >> 24);
  574. s->data[putp + 1] = (u_char)(l >> 16);
  575. s->data[putp + 2] = (u_char)(l >> 8);
  576. s->data[putp + 3] = (u_char)l;
  577. return 4;
  578. }
  579. int
  580. stream_putq_at (struct stream *s, size_t putp, uint64_t q)
  581. {
  582. STREAM_VERIFY_SANE(s);
  583. if (!PUT_AT_VALID (s, putp + sizeof (uint64_t)))
  584. {
  585. STREAM_BOUND_WARN (s, "put");
  586. return 0;
  587. }
  588. s->data[putp] = (u_char)(q >> 56);
  589. s->data[putp + 1] = (u_char)(q >> 48);
  590. s->data[putp + 2] = (u_char)(q >> 40);
  591. s->data[putp + 3] = (u_char)(q >> 32);
  592. s->data[putp + 4] = (u_char)(q >> 24);
  593. s->data[putp + 5] = (u_char)(q >> 16);
  594. s->data[putp + 6] = (u_char)(q >> 8);
  595. s->data[putp + 7] = (u_char)q;
  596. return 8;
  597. }
  598. /* Put long word to the stream. */
  599. int
  600. stream_put_ipv4 (struct stream *s, u_int32_t l)
  601. {
  602. STREAM_VERIFY_SANE(s);
  603. if (STREAM_WRITEABLE (s) < sizeof (u_int32_t))
  604. {
  605. STREAM_BOUND_WARN (s, "put");
  606. return 0;
  607. }
  608. memcpy (s->data + s->endp, &l, sizeof (u_int32_t));
  609. s->endp += sizeof (u_int32_t);
  610. return sizeof (u_int32_t);
  611. }
  612. /* Put long word to the stream. */
  613. int
  614. stream_put_in_addr (struct stream *s, struct in_addr *addr)
  615. {
  616. STREAM_VERIFY_SANE(s);
  617. if (STREAM_WRITEABLE (s) < sizeof (u_int32_t))
  618. {
  619. STREAM_BOUND_WARN (s, "put");
  620. return 0;
  621. }
  622. memcpy (s->data + s->endp, addr, sizeof (u_int32_t));
  623. s->endp += sizeof (u_int32_t);
  624. return sizeof (u_int32_t);
  625. }
  626. /* Put prefix by nlri type format. */
  627. int
  628. stream_put_prefix (struct stream *s, struct prefix *p)
  629. {
  630. size_t psize;
  631. STREAM_VERIFY_SANE(s);
  632. psize = PSIZE (p->prefixlen);
  633. if (STREAM_WRITEABLE (s) < (psize + sizeof (u_char)))
  634. {
  635. STREAM_BOUND_WARN (s, "put");
  636. return 0;
  637. }
  638. s->data[s->endp++] = p->prefixlen;
  639. memcpy (s->data + s->endp, &p->u.prefix, psize);
  640. s->endp += psize;
  641. return psize;
  642. }
  643. /* Read size from fd. */
  644. int
  645. stream_read (struct stream *s, int fd, size_t size)
  646. {
  647. int nbytes;
  648. STREAM_VERIFY_SANE(s);
  649. if (STREAM_WRITEABLE (s) < size)
  650. {
  651. STREAM_BOUND_WARN (s, "put");
  652. return 0;
  653. }
  654. nbytes = readn (fd, s->data + s->endp, size);
  655. if (nbytes > 0)
  656. s->endp += nbytes;
  657. return nbytes;
  658. }
  659. ssize_t
  660. stream_read_try(struct stream *s, int fd, size_t size)
  661. {
  662. ssize_t nbytes;
  663. STREAM_VERIFY_SANE(s);
  664. if (STREAM_WRITEABLE(s) < size)
  665. {
  666. STREAM_BOUND_WARN (s, "put");
  667. /* Fatal (not transient) error, since retrying will not help
  668. (stream is too small to contain the desired data). */
  669. return -1;
  670. }
  671. if ((nbytes = read(fd, s->data + s->endp, size)) >= 0)
  672. {
  673. s->endp += nbytes;
  674. return nbytes;
  675. }
  676. /* Error: was it transient (return -2) or fatal (return -1)? */
  677. if (ERRNO_IO_RETRY(errno))
  678. return -2;
  679. zlog_warn("%s: read failed on fd %d: %s", __func__, fd, safe_strerror(errno));
  680. return -1;
  681. }
  682. /* Read up to size bytes into the stream from the fd, using recvmsgfrom
  683. * whose arguments match the remaining arguments to this function
  684. */
  685. ssize_t
  686. stream_recvfrom (struct stream *s, int fd, size_t size, int flags,
  687. struct sockaddr *from, socklen_t *fromlen)
  688. {
  689. ssize_t nbytes;
  690. STREAM_VERIFY_SANE(s);
  691. if (STREAM_WRITEABLE(s) < size)
  692. {
  693. STREAM_BOUND_WARN (s, "put");
  694. /* Fatal (not transient) error, since retrying will not help
  695. (stream is too small to contain the desired data). */
  696. return -1;
  697. }
  698. if ((nbytes = recvfrom (fd, s->data + s->endp, size,
  699. flags, from, fromlen)) >= 0)
  700. {
  701. s->endp += nbytes;
  702. return nbytes;
  703. }
  704. /* Error: was it transient (return -2) or fatal (return -1)? */
  705. if (ERRNO_IO_RETRY(errno))
  706. return -2;
  707. zlog_warn("%s: read failed on fd %d: %s", __func__, fd, safe_strerror(errno));
  708. return -1;
  709. }
  710. /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
  711. * from endp.
  712. * First iovec will be used to receive the data.
  713. * Stream need not be empty.
  714. */
  715. ssize_t
  716. stream_recvmsg (struct stream *s, int fd, struct msghdr *msgh, int flags,
  717. size_t size)
  718. {
  719. int nbytes;
  720. struct iovec *iov;
  721. STREAM_VERIFY_SANE(s);
  722. assert (msgh->msg_iovlen > 0);
  723. if (STREAM_WRITEABLE (s) < size)
  724. {
  725. STREAM_BOUND_WARN (s, "put");
  726. /* This is a logic error in the calling code: the stream is too small
  727. to hold the desired data! */
  728. return -1;
  729. }
  730. iov = &(msgh->msg_iov[0]);
  731. iov->iov_base = (s->data + s->endp);
  732. iov->iov_len = size;
  733. nbytes = recvmsg (fd, msgh, flags);
  734. if (nbytes > 0)
  735. s->endp += nbytes;
  736. return nbytes;
  737. }
  738. /* Write data to buffer. */
  739. size_t
  740. stream_write (struct stream *s, const void *ptr, size_t size)
  741. {
  742. CHECK_SIZE(s, size);
  743. STREAM_VERIFY_SANE(s);
  744. if (STREAM_WRITEABLE (s) < size)
  745. {
  746. STREAM_BOUND_WARN (s, "put");
  747. return 0;
  748. }
  749. memcpy (s->data + s->endp, ptr, size);
  750. s->endp += size;
  751. return size;
  752. }
  753. /* Return current read pointer.
  754. * DEPRECATED!
  755. * Use stream_get_pnt_to if you must, but decoding streams properly
  756. * is preferred
  757. */
  758. u_char *
  759. stream_pnt (struct stream *s)
  760. {
  761. STREAM_VERIFY_SANE(s);
  762. return s->data + s->getp;
  763. }
  764. /* Check does this stream empty? */
  765. int
  766. stream_empty (struct stream *s)
  767. {
  768. STREAM_VERIFY_SANE(s);
  769. return (s->endp == 0);
  770. }
  771. /* Reset stream. */
  772. void
  773. stream_reset (struct stream *s)
  774. {
  775. STREAM_VERIFY_SANE (s);
  776. s->getp = s->endp = 0;
  777. }
  778. /* Discard read data (prior to the getp), and move the unread data
  779. * to the beginning of the stream.
  780. *
  781. * See also stream_fifo_* functions, for another approach to managing
  782. * streams.
  783. */
  784. void
  785. stream_discard (struct stream *s)
  786. {
  787. STREAM_VERIFY_SANE (s);
  788. if (s->getp == 0)
  789. return;
  790. if (s->getp == s->endp)
  791. {
  792. stream_reset (s);
  793. return;
  794. }
  795. s->data = memmove (s->data, s->data + s->getp, s->endp - s->getp);
  796. s->endp -= s->getp;
  797. s->getp = 0;
  798. }
  799. /* Write stream contens to the file discriptor. */
  800. int
  801. stream_flush (struct stream *s, int fd)
  802. {
  803. int nbytes;
  804. STREAM_VERIFY_SANE(s);
  805. nbytes = write (fd, s->data + s->getp, s->endp - s->getp);
  806. return nbytes;
  807. }
  808. /* Stream first in first out queue. */
  809. struct stream_fifo *
  810. stream_fifo_new (void)
  811. {
  812. struct stream_fifo *new;
  813. new = XCALLOC (MTYPE_STREAM_FIFO, sizeof (struct stream_fifo));
  814. return new;
  815. }
  816. /* Add new stream to fifo. */
  817. void
  818. stream_fifo_push (struct stream_fifo *fifo, struct stream *s)
  819. {
  820. if (fifo->tail)
  821. fifo->tail->next = s;
  822. else
  823. fifo->head = s;
  824. fifo->tail = s;
  825. fifo->count++;
  826. }
  827. /* Delete first stream from fifo. */
  828. struct stream *
  829. stream_fifo_pop (struct stream_fifo *fifo)
  830. {
  831. struct stream *s;
  832. s = fifo->head;
  833. if (s)
  834. {
  835. fifo->head = s->next;
  836. if (fifo->head == NULL)
  837. fifo->tail = NULL;
  838. fifo->count--;
  839. }
  840. return s;
  841. }
  842. /* Return first fifo entry. */
  843. struct stream *
  844. stream_fifo_head (struct stream_fifo *fifo)
  845. {
  846. return fifo->head;
  847. }
  848. void
  849. stream_fifo_clean (struct stream_fifo *fifo)
  850. {
  851. struct stream *s;
  852. struct stream *next;
  853. for (s = fifo->head; s; s = next)
  854. {
  855. next = s->next;
  856. stream_free (s);
  857. }
  858. fifo->head = fifo->tail = NULL;
  859. fifo->count = 0;
  860. }
  861. void
  862. stream_fifo_free (struct stream_fifo *fifo)
  863. {
  864. stream_fifo_clean (fifo);
  865. XFREE (MTYPE_STREAM_FIFO, fifo);
  866. }