Next: , Previous: Attribute, Up: libmailbox


3.1.9 Stream

     #include <mailutils/stream.h>

These generic flags are interpreted as appropriate to the specific streams.

MU_STREAM_READ
The stream is open read only.
MU_STREAM_WRITE
The stream is open write only.
MU_STREAM_RDWR
The stream is open read and write.
MU_STREAM_APPEND
The stream is open in append mode for writing.
MU_STREAM_CREAT
The stream open will create the underlying resource (such as a file) if it doesn't exist already.
MU_STREAM_NONBLOCK
The stream is set to non-blocking mode.
MU_STREAM_NO_CHECK
Stream is destroyed without checking for the owner.
MU_STREAM_SEEKABLE

MU_STREAM_NO_CLOSE
The stream doesn't close its underlying resource when it is closed or destroyed.
MU_STREAM_ALLOW_LINKS
— Function: int mu_file_stream_create (mu_stream_t *stream, const char *filename, int flags)
— Function: int mu_tcp_stream_create (mu_stream_t *stream, const char *host, int port, int flags)
— Function: int mu_mapfile_stream_create (mu_stream_t *stream, const char *filename, int flags)
— Function: int mu_memory_stream_create (mu_stream_t *stream, const char *filename, int flags)
— Function: int mu_encoder_stream_create (mu_stream_t *stream, mu_stream_t iostream, const char *encoding)
— Function: int mu_decoder_stream_create (mu_stream_t *stream, mu_stream_t iostream, const char *encoding)
— Function: int mu_stdio_stream_create (mu_stream_t *stream, FILE *stdio, int flags)

If MU_STREAM_NO_CLOSE is specified, fclose() will not be called on stdio when the stream is closed.

— Function: int mu_prog_stream_create (mu_stream_t *stream, const char *progname, int flags)
— Function: int mu_filter_prog_stream_create (mu_stream_t *stream, const char *progname, mu_stream_t input)
— Function: void mu_stream_destroy (mu_stream_t *stream, void *owner)
— Function: int mu_stream_open (mu_stream_t stream)
— Function: int mu_stream_close (mu_stream_t stream)
— Function: int mu_stream_is_seekable (mu_stream_t stream)
— Function: int mu_stream_get_fd (mu_stream_t stream, int *fd)
— Function: int mu_stream_get_fd2 (mu_stream_t stream, int *fd1, int *fd2)
— Function: int mu_stream_read (mu_stream_t stream, char *buffer, size_t buflen, mu_off_t offset, size_t *writen)
— Function: int mu_stream_readline (mu_stream_t stream, char *buffer, size_t buflen, mu_off_t offset, size_t *writen)
— Function: int mu_stream_size (mu_stream_t stream, mu_off_t *size)
— Function: int mu_stream_truncate (mu_stream_t stream, mu_off_t size)
— Function: int mu_stream_write (mu_stream_t stream, const char *buffer, size_t buflen, mu_off_t offset, size_t *writen)
— Function: int mu_stream_setbufsiz (mu_stream_t stream, size_t size)
— Function: int mu_stream_flush (mu_stream_t stream)
— Function: int mu_stream_create (mu_stream_t *stream, int flags, void *owner)

Used to implement a new kind of stream.

— Function: void* mu_stream_get_owner (mu_stream_t stream)
— Function: void mu_stream_set_owner (mu_stream_t stream, void *owner)
— Function: int mu_stream_get_flags (mu_stream_t stream, int *flags)
— Function: int mu_stream_set_flags (mu_stream_t stream, int flags)
— Function: int mu_stream_get_property (mu_stream_t stream, property_t *)
— Function: int mu_stream_set_property (mu_stream_t stream, property_t, void *)
— Function: int mu_stream_get_state (mu_stream_t stream, int *state)
MU_STREAM_STATE_OPEN
Last action was mu_stream_open.
MU_STREAM_STATE_READ
Last action was mu_stream_read or mu_stream_readline.
MU_STREAM_STATE_WRITE
Last action was mu_stream_write.
MU_STREAM_STATE_CLOSE
Last action was mu_stream_close.

— Function: int mu_stream_set_destroy (mu_stream_t stream, void (*_destroy) (mu_stream_t), void *owner)
— Function: int mu_stream_set_open (mu_stream_t stream, int (*_open) (mu_stream_t), void *owner)
— Function: int mu_stream_set_close (mu_stream_t stream, int (*_close) (mu_stream_t), void *owner)
— Function: int mu_stream_set_fd (mu_stream_t stream, int (*_get_fd) (mu_stream_t, int *, int *), void *owner)
— Function: int mu_stream_set_read (mu_stream_t stream, int (*_read) (mu_stream_t, char *, size_t, mu_off_t, size_t *), void *owner)
— Function: int mu_stream_set_readline (mu_stream_t stream, int (*_readline) (mu_stream_t, char *, size_t, mu_off_t, size_t *), void *owner)
— Function: int mu_stream_set_size (mu_stream_t stream, int (*_size) (mu_stream_t, mu_off_t *), void *owner)
— Function: int mu_stream_set_truncate (mu_stream_t stream, int (*_truncate) (mu_stream_t, mu_off_t), void *owner)
— Function: int mu_stream_set_write (mu_stream_t stream, int (*_write) (mu_stream_t, const char *, size_t, mu_off_t, size_t *), void *owner)
— Function: int mu_stream_set_flush (mu_stream_t stream, int (*_flush) (mu_stream_t), void *owner)
— Function: int mu_stream_set_strerror (mu_stream_t stream, int (*_fp) (mu_stream_t, char **), void *owner)
— Function: int mu_stream_sequential_readline (mu_stream_t stream, char *buf, size_t size, size_t *nbytes)
— Function: int mu_stream_sequential_write (mu_stream_t stream, char *buf, size_t size)
— Function: int mu_stream_seek (mu_stream_t stream, mu_off_t off, int whence)
— Function: int mu_stream_strerror (mu_stream_t stream, char **p)

An example using mu_tcp_stream_create() to make a simple web client:

     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     /* This is an example program to illustrate the use of stream functions.
        It connects to a remote HTTP server and prints the contents of its
        index page */
     
     #include <stdio.h>
     #include <stdlib.h>
     #include <string.h>
     #include <errno.h>
     #include <unistd.h>
     #include <sys/time.h>
     
     #include <mailutils/mailutils.h>
     
     /* Holds the HTTP request text; main fills it with snprintf and sends it
        with mu_stream_write.  */
     char wbuf[1024];
     /* Receives the data that main reads from the stream with
        mu_stream_read.  */
     char rbuf[1024];
     
     /* Value stored in tv_sec (seconds) for each mu_stream_wait call made by
        http_stream_wait.  */
     size_t io_timeout = 3;
     /* Upper bound for the attempt counters tested in http_stream_wait and
        main.  */
     size_t io_attempts = 3;
     
     /* Wait until STREAM is reported ready for at least one of the
        conditions in FLAGS.
     
        Each wait uses a timeout of io_timeout seconds.  The wait is repeated
        while *ATTEMPT is below io_attempts; *ATTEMPT is incremented after
        every wait that did not report a requested condition, so the counter
        is shared with the caller.
     
        Returns 0 when a requested condition is ready, ETIMEDOUT when the
        attempts are used up, or the code returned by mu_stream_wait for any
        result other than 0, EAGAIN or EINPROGRESS.  */
     int
     http_stream_wait (mu_stream_t stream, int flags, size_t *attempt)
     {
       while (*attempt < io_attempts)
         {
           /* OFLAGS is passed by address and read back after the call, so it
              must be reset to the requested FLAGS before every wait;
              otherwise a retry would wait on the previous call's result.  */
           int oflags = flags;
           struct timeval tv;
           int rc;
     
           tv.tv_sec = io_timeout;
           tv.tv_usec = 0;
           rc = mu_stream_wait (stream, &oflags, &tv);
           switch (rc)
             {
             case 0:
               if (flags & oflags)
                 return 0;
               /* None of the requested conditions is ready: count it as a
                  failed attempt.  */
               /* FALLTHROUGH */
             case EAGAIN:
             case EINPROGRESS:
               ++*attempt;
               continue;
     
             default:
               return rc;
             }
         }
       return ETIMEDOUT;
     }
     
     /* Send an HTTP/1.0 GET request over a non-blocking TCP stream and copy
        everything read back from the stream to standard output.
     
        Usage: PROG [hostname [url]]
        The host defaults to "www.gnu.org" and the requested path to "/".
        The connection is made to port 80.
     
        Exits with EXIT_SUCCESS when the stream was read to its end and
        closed, and with EXIT_FAILURE after reporting any error.  */
     int
     main (int argc, char **argv)
     {
       int ret;
       mu_stream_t stream;
       size_t nb, size;
       size_t off = 0;		/* Number of request bytes already sent.  */
       size_t attempt;
       const char *host = "www.gnu.org";
     
       if (argc > 3)
         {
           fprintf (stderr, "usage: %s [hostname [url]]\n", argv[0]);
           exit (EXIT_FAILURE);
         }
     
       if (argc > 1)
         host = argv[1];
     
       snprintf (wbuf, sizeof wbuf, "GET %s HTTP/1.0\r\n\r\n",
                 argc == 3 ? argv[2] : "/");
     
       ret = mu_tcp_stream_create (&stream, host, 80, MU_STREAM_NONBLOCK);
       if (ret != 0)
         {
           mu_error ("mu_tcp_stream_create: %s", mu_strerror (ret));
           exit (EXIT_FAILURE);
         }
     
       /* Open the stream.  While the open reports EAGAIN or EINPROGRESS,
          wait for the stream to become writable and try again.  */
       for (attempt = 0; (ret = mu_stream_open (stream)); )
         {
           if ((ret == EAGAIN || ret == EINPROGRESS) && attempt < io_attempts)
             {
               ret = http_stream_wait (stream, MU_STREAM_READY_WR, &attempt);
               if (ret == 0)
                 continue;
             }
           mu_error ("mu_stream_open: %s", mu_strerror (ret));
           exit (EXIT_FAILURE);
         }
     
       /* Send the request.  A call may accept only part of the data, so keep
          writing the SIZE bytes that remain, starting at offset OFF.  */
       for (attempt = 0, size = strlen (wbuf); size > 0; )
         {
           ret = mu_stream_write (stream, wbuf + off, size, 0, &nb);
           if (ret == 0)
             {
               if (nb == 0)
                 {
                   mu_error ("mu_stream_write: wrote 0 bytes");
                   exit (EXIT_FAILURE);
                 }
               off += nb;
               size -= nb;
             }
           else if (ret == EAGAIN)
             {
               if (attempt >= io_attempts)
                 {
                   mu_error ("mu_stream_write timed out");
                   exit (EXIT_FAILURE);
                 }
               ret = http_stream_wait (stream, MU_STREAM_READY_WR, &attempt);
               if (ret)
                 {
                   mu_error ("http_stream_wait failed: %s", mu_strerror (ret));
                   exit (EXIT_FAILURE);
                 }
             }
           else
             {
               mu_error ("mu_stream_write: %s", mu_strerror (ret));
               exit (EXIT_FAILURE);
             }
         }
     
       /* Read the reply until a successful read returns no data.  */
       attempt = 0;
       for (;;)
         {
           ret = mu_stream_read (stream, rbuf, sizeof (rbuf), 0, &nb);
           if (ret == 0)
             {
               if (nb == 0)
                 break;
               if (write (STDOUT_FILENO, rbuf, nb) < 0)
                 {
                   mu_error ("write: %s", mu_strerror (errno));
                   exit (EXIT_FAILURE);
                 }
             }
           else if (ret == EAGAIN)
             {
               if (attempt >= io_attempts)
                 {
                   mu_error ("mu_stream_read: %s", mu_strerror (ret));
                   exit (EXIT_FAILURE);
                 }
               ret = http_stream_wait (stream, MU_STREAM_READY_RD, &attempt);
               if (ret)
                 {
                   mu_error ("http_stream_wait failed: %s", mu_strerror (ret));
                   exit (EXIT_FAILURE);
                 }
             }
           else
             {
               /* Any other error is fatal; without this branch the loop
                  would call mu_stream_read again forever.  */
               mu_error ("mu_stream_read: %s", mu_strerror (ret));
               exit (EXIT_FAILURE);
             }
         }
     
       ret = mu_stream_close (stream);
       if (ret != 0)
         {
           mu_error ("mu_stream_close: %s", mu_strerror (ret));
           exit (EXIT_FAILURE);
         }
     
       mu_stream_destroy (&stream, NULL);
       exit (EXIT_SUCCESS);
     }