Pipelines in C

Back in 2016, a question came up in one of the newsgroups about how to code a pipeline in C. The requester wanted C code equivalent to this shell pipeline:

echo "Hello World" | tr "a-z" "A-Z" | sort | rev

I had never actually tried to write such a pipeline in C, so I gave it a try. I came up with a couple of variations, but never posted them back to the newsgroup.

Recently, another question reminded me of that code. So, rather than let it moulder in my collection of coding bits, I am posting it here for reference:


/*
** pipeline - demonstration of coding for Unix pipeline processing
**
** main program will
** - launch a pipeline that converts text lines to uppercase, sorts them,
**   and reverses the characters of each line (tr | sort | rev)
** - write mixed-case lines to the input end of the pipeline, and
** - read the case-converted, sorted, reversed text lines from the output
**   end of the pipeline
*/

#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <unistd.h>

/* Indices into a 2-element fd array filled in by pipe() */
enum
{
  pREADFROM = 0,	/* the end a process reads from */
  pWRITETO  = 1		/* the end a process writes to  */
};

/* initiate a list of child processes as a pipeline */
int pipeline(int iopipe[], char **alist);

/* initiate a child process to run given command */
int launch(int pDataIn[], char *args[]);

/* determine highest fd that open() can return */
int max_fd(void);

/* handle child termination signals */
void obit(int signum);

/* hex & ascii dump specified buffer */
void dumpData(char *data, size_t length, char *title);

/*
** main() - drive the demonstration pipeline
**
** Builds the tr | sort | rev pipeline and writes a fixed multi-line text
** buffer into its input end. It then closes that end so the first stage
** sees EOF, and reads whatever the last stage produces until EOF (or
** until inbuf is full, keeping one byte for the terminating NUL).
** Finally it hex-dumps what was written and what was read back.
**
** Note: all input is written before any output is read. That is fine for
** the tiny buffer used here; large inputs would need the write and read
** sides interleaved to avoid every stage blocking on a full pipe.
*/
int main(int argc, char *argv[])
{
  char outbuf[256] = "Hello world,\nthis is a test\nof a multiline buffer.\nGoodbye\n",
       inbuf[256];

  int iopipe[2];

  size_t out_len = strlen(outbuf),
	 n_writ  = 0,
	 n_read  = 0;

  char	*cmdlist[]	= {"/usr/bin/tr","tr","a-z","A-Z",(char *)NULL,
			   "/usr/bin/sort","sort",(char *)NULL,
			   "/usr/bin/rev","rev",(char *)NULL,
			   (char *)NULL};

  (void) argc;	/* unused */

  printf("== %s begins ==\n",argv[0]);
  /* Flush before forking so children never inherit unwritten stdout data */
  fflush(stdout);

  if (pipeline(iopipe,cmdlist) != 0)
  {
    fputs("Unable to build pipeline\n",stderr);
    return EXIT_FAILURE;
  }

  /*
  ** Now we can write our source data to the pipeline.
  ** write() may transfer fewer bytes than asked, or fail with EINTR when
  ** the SIGCHLD handler runs, so keep going until everything is sent.
  */
  printf("Original:\n%s\n", outbuf);
  while (n_writ < out_len)
  {
    ssize_t put = write(iopipe[pWRITETO], outbuf + n_writ, out_len - n_writ);

    if (put <= 0)
    {
      if (put < 0 && errno == EINTR)
	continue;			/* interrupted by a signal: retry */
      if (put < 0)
	perror("write to pipeline");
      break;
    }
    n_writ += (size_t) put;
  }
  if (n_writ != out_len)
    fprintf(stderr,"%lu (of %lu) bytes written to pipeline\n",
	    (unsigned long) n_writ, (unsigned long) out_len);

  close(iopipe[pWRITETO]);		/* lets the first stage see EOF */
  printf("%lu bytes written to pipeline. Now reading\n",(unsigned long) n_writ);

  /*
  ** And read the transformed data from the pipeline, never asking for
  ** more than the space left in inbuf (minus one byte for the NUL).
  */
  while (n_read < sizeof inbuf - 1)
  {
    ssize_t got = read(iopipe[pREADFROM], inbuf + n_read,
		       sizeof inbuf - 1 - n_read);

    if (got < 0)
    {
      if (errno == EINTR)
	continue;			/* interrupted by a signal: retry */
      perror("read from pipeline");
      break;
    }
    if (got == 0)
      break;				/* EOF: last stage closed its output */
    n_read += (size_t) got;
  }
  inbuf[n_read] = '\0';		/* inbuf is printed with %s below */
  fprintf(stderr,"%lu (of %lu) bytes read from pipeline\n",
	  (unsigned long) n_read, (unsigned long) n_writ);

  close(iopipe[pREADFROM]);
  printf("Processed:\n%s\n\n", inbuf);

  dumpData(outbuf,n_writ, "Written to pipeline");
  dumpData(inbuf,n_read, "Read from pipeline");

  printf("== %s ends ==\n",argv[0]);
  return EXIT_SUCCESS;
}

/*
** pipeline() - start a chain of child processes connected by pipes
** Accepts:	iopipe	2-element array that receives the caller's ends of
**			the pipeline: iopipe[pWRITETO] feeds the first
**			command's stdin and iopipe[pREADFROM] reads the last
**			command's stdout. Both are set to -1 on entry; on
**			failure they are left at -1 and every fd this call
**			opened has been closed.
**		alist	a sequence of launch() argument lists, each
**			terminated by (char *)NULL, followed by one more
**			(char *)NULL that ends the sequence.
** Returns:	0 - pipeline started
**		1 - cant allocate or duplicate the initial pipe pair
**		2 - a command could not be launched
**
** Installs obit() as the SIGCHLD handler before forking any child.
** With an empty alist, iopipe[] is just a loop-back pipe.
** If a later command fails to launch, the commands already started
** are left to see EOF on their stdin once our fds are closed.
*/
int pipeline(int iopipe[2], char **alist)
{
  int	pData[2];

  iopipe[pREADFROM] = iopipe[pWRITETO] = -1;

  if (pipe(pData) != 0)
  {
    fputs("Cant allocate initial pipe pair\n",stderr);
    return 1;
  }

  /*
  ** Keep our own copy of the first pipe's write end: launch() closes
  ** both ends of pData[] once the first child has its stdin connected.
  */
  iopipe[pWRITETO] = dup(pData[pWRITETO]);
  if (iopipe[pWRITETO] < 0)
  {
    perror("dup of initial pipe");
    close(pData[pWRITETO]); close(pData[pREADFROM]);
    return 1;
  }

  signal(SIGCHLD,obit);	/* set up to collect child obituaries */

  while (*alist)
  {
    char **child = alist;

    while (*alist)
      ++alist;		/* find this command's (char *)NULL terminator */
    ++alist;		/* ...and step past it to the next command */

    if (launch(pData,child) != 0)
    {
      /* launch() leaves pData[] untouched on failure, so it is ours to close */
      close(pData[pWRITETO]); close(pData[pREADFROM]);
      close(iopipe[pWRITETO]);
      iopipe[pWRITETO] = -1;
      return 2;
    }
  }

  /*
  ** Hand the final pipe's read end to the caller, and drop our copy of its
  ** write end so the caller sees EOF once the last command exits.
  */
  iopipe[pREADFROM] = pData[pREADFROM];
  close(pData[pWRITETO]);

  return 0;
}

/*
** launch() launches a specified program with specified arguments
** Accepts:	pDataIn:	a pointer to a 2-element integer array
**				containing the FDs of the pipe that feeds
**				the INPUT side of the intended subprocess.
**				On successful completion, launch() closes
**				both of these FDs and replaces them with
**				the FDs of a new pipe carrying the OUTPUT
**				of the subprocess, to be used as the INPUT
**				side of the next (consumer) (sub)process.
**		args		a pointer to a (char *)NULL-terminated array:
**				args[0] is the program passed to execvp()
**				as its file argument, and args[1] onward is
**				the argv[] vector handed to that program
**				(its own argv[0] first).
** Returns:	0 - no errors: pDataIn[] updated with new pipes
**		1 - argument error; pDataIn not affected
**		2 - cant obtain output pipe; pDataIn not affected
**		3 - cant fork():  pDataIn not affected
**
** Notes:
**	1)	This version of launch() builds pipelines front-to-back.
**		The first call to launch() will build the head of the pipeline,
**		the process that handles the mainline's output to the pipeline.
**		Subsequent calls will continue the chain, feeding the prior
**		process stdout into the new process stdin. launch() hands back
**		the new process's stdout pipe pair; the mainline reads from the
**		read end of the final pair.
**	2)	A child process inherits the parent process' open files,
**		including (in our case) the (saved) write end of the 1st
**		child's stdin pipe. In order to permit the read end of
**		that (or any) pipe to read EOF, we must close ALL write ends,
**		including the one that the child process inherits.
**		To be safe, we close ALL unused file handles before launching
**		the requested binary.
**	3)	If the execvp() call succeeds, it never returns to this code.
**		If execvp() fails, it does return. If THIS logic still has
**		control after the execvp() call, the call failed: the child
**		reports the error and terminates with _exit(). It must never
**		return to the caller, or the child would go on running the
**		parent's code (launching more processes, writing to the
**		pipeline, ...), and _exit() also avoids flushing stdio buffers
**		inherited from the parent a second time.
*/
int launch(int pDataIn[], char *args[])
{
  int pDataOut[2];
  pid_t child;

  /* fd 0 is a valid descriptor, so only negative fds are rejected */
  if ((pDataIn == NULL)
  ||  (pDataIn[pREADFROM] < 0)
  ||  (pDataIn[pWRITETO] < 0)
  ||  (args == NULL)
  ||  (*args == NULL))
  {
    fputs("Argument error on launch()\n",stderr);
    return 1;
  }

  if (pipe(pDataOut) != 0)
  {
    fprintf(stderr,"Cant allocate output pipe pair for %s\n",args[0]);
    return 2;
  }

  if ((child = fork()) == 0)
  {
    /*
    ** This block of code executes in the child process.
    **
    ** We must	connect our plumbing,
    **		clean up the leftovers, and
    **		execute the requisite child program
    **
    ** Plumbing: The parent gave the child two pairs of pipes
    **		 The child will use one part of the first pair
    **		 to get its input from, and one part of the
    **		 second pair to send its output to.
    **		 The child must get rid of any pipe "write"
    **		 ends that it doesn't use, along with any other
    **		 file handles that it doesn't need, so it closes
    **		 ALL file handles except stdin, stdout and stderr.
    **
    ** Execution: execvp() only returns IF it fails to execute
    **		 the requested binary. So, we treat ANY return
    **		 of execvp() as an error, report it, and terminate
    **		 the child with _exit() (never return to the caller).
    */
    int fd;

    /* connect pipes to child process stdin and stdout */
    if ((dup2(pDataIn[pREADFROM],STDIN_FILENO) < 0)
    ||  (dup2(pDataOut[pWRITETO],STDOUT_FILENO) < 0))
    {
      perror("dup2");
      _exit(EXIT_FAILURE);
    }

    /* close ALL files other than stdin, stdout and stderr */
    for (fd = max_fd(); fd > STDERR_FILENO; --fd)
      close(fd);

    execvp(args[0],args+1);
    perror(args[0]);
    _exit(EXIT_FAILURE);
  }
  else if (child == -1)
  {
    close(pDataOut[pREADFROM]);
    close(pDataOut[pWRITETO]);
    fprintf(stderr,"Cant fork child for %s\n",args[0]);
    return 3;
  }

  printf("Launched %s\n",args[0]);

  /* The child now owns the input pipe; the parent no longer needs it */
  close(pDataIn[pREADFROM]);
  close(pDataIn[pWRITETO]);

  pDataIn[pREADFROM] = pDataOut[pREADFROM];
  pDataIn[pWRITETO]  = pDataOut[pWRITETO];

  return 0;
}

/*
** max_fd() determines the largest fd that open() can return
** Accepts:	nothing
** Returns:	an integer representing the largest fd: normally one less
**		than the RLIMIT_NOFILE soft limit. If that limit cannot be
**		read, is RLIM_INFINITY, or does not fit in an int, falls
**		back to sysconf(_SC_OPEN_MAX) - 1, and failing that to a
**		fixed best-effort bound (FALLBACK_FD_LIMIT - 1).
**
** Callers use the result as the upper bound of a close() loop, so a
** garbage or negative value would silently leave fds open.
*/
int max_fd(void)
{
  enum { FALLBACK_FD_LIMIT = 65536 };	/* best-effort guess, not a system value */
  struct rlimit maxfds;
  long sysmax;

  if ((getrlimit(RLIMIT_NOFILE,&maxfds) == 0)
  &&  (maxfds.rlim_cur != RLIM_INFINITY)
  &&  (maxfds.rlim_cur <= (rlim_t) INT_MAX))
    return (int) maxfds.rlim_cur - 1;

  sysmax = sysconf(_SC_OPEN_MAX);	/* -1 when indeterminate */
  if ((sysmax > 0) && (sysmax <= INT_MAX))
    return (int) sysmax - 1;

  return FALLBACK_FD_LIMIT - 1;
}

/*
** obit()	handles SIGCHLD signals from terminating child processes
** Accepts:	an integer indicating the intercepted signal
** Returns:	nothing
**
** Reaps every child that has already terminated, without blocking.
** Several children exiting close together may produce only one SIGCHLD,
** so a single wait() per signal can leave zombies behind.
** SIGCHLD is deliberately NOT set to SIG_IGN here: POSIX then makes
** wait() block until *all* children have terminated. That could stall
** the parent inside this handler while the children wait for it.
** errno is saved and restored because the handler may interrupt code
** that is about to examine errno. The handler re-installs itself for
** systems where signal() resets the disposition on delivery.
*/
void obit(int signum)
{
  int saved_errno = errno;

  while (waitpid((pid_t) -1, NULL, WNOHANG) > 0)
    continue;	/* reap all children that have already exited */

  signal(signum,obit);
  errno = saved_errno;
}

/*
** dumpData() prints a hex & ascii dump of a buffer to stdout
** Accepts:	data	the bytes to dump (read as unsigned char)
**		length	number of bytes of data to dump
**		title	optional heading; skipped when NULL or empty
** Returns:	nothing
**
** Each row shows the offset, up to 16 bytes in hex (grouped by 4) and
** the same bytes as characters. Bytes outside printable ASCII
** (0x20..0x7e) are shown as '.'.
*/
void dumpData(char *data, size_t length, char *title)
{
  /* unsigned view: plain char may be signed, which would sign-extend in %x */
  const unsigned char *bytes = (const unsigned char *) data;
  size_t disp;

  if (title && *title) printf("\n=== %s ===\n",title);

  puts("       _0_1_2_3 _4_5_6_7 _8_9_A_B _C_D_E_F   _0123456789ABCDEF_");
  for (disp = 0; disp < length; disp += 16)
  {
    size_t count, index;

    count = length - disp;
    if (count > 16) count = 16;

    printf("+%4.4x: ",(unsigned int)disp);
    for (index = 0; index < 16; ++index)
    {
      const char *gap = (index % 4 == 3) ? " " : "";	/* space after each group of 4 */

      if (index < count)
	printf("%2.2x%s",(unsigned int) bytes[disp + index],gap);
      else
	printf("  %s",gap);
    }

    printf("  |");
    for (index = 0; index < 16; ++index)
    {
      if (index < count)
      {
	unsigned char c = bytes[disp + index];

	putchar((c >= 0x20 && c < 0x7f) ? c : '.');
      }
      else
	putchar(' ');
    }
    puts("|");
  }
}
System Management: