Pipelines in C
Submitted by Lew on Thu, 01/07/2021 - 20:08
Back in 2016, a question came up in one of the newsgroups about how you would code a pipeline in C. The requester wanted C code equivalent to this shell pipeline:
- echo "Hello World" | tr "a-z" "A-Z" | sort | rev
Since I had never actually tried to write such a pipeline in C, I gave it a try. I came up with a couple of variations, which I never posted back to the newsgroup.
Recently, another question reminded me of that code. So, rather than let it moulder in my collection of coding bits, I am posting it here for reference:
-
/*
** pipeline - demonstration of coding for Unix pipeline processing
**
** main program will
**  - launch a pipeline (tr a-z A-Z | sort | rev) that converts text lines
**    to uppercase, sorts them, and reverses the characters of each line,
**  - write mixed-case lines to the input end of the pipeline, and
**  - read the case-converted, sorted, reversed text lines from the output
**    end of the pipeline
*/
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>

#define pREADFROM 0
#define pWRITETO  1

int pipeline(int [], char **);          /* initiate a list of child processes as a pipeline */
int launch(int [], char *[]);           /* initiate a child process to run given command */
int max_fd(void);                       /* determine highest fd that open() can return */
void obit(int);                         /* handle child termination signals */
void dumpData(char *, size_t, char *);  /* hex & ascii dump specified buffer */

int main(int argc, char *argv[])
{
    char outbuf[256] = "Hello world,\nthis is a test\nof a multiline buffer.\nGoodbye\n";
    char inbuf[256];
    /* argv[0] may legitimately be NULL (argc == 0); never pass NULL to %s */
    const char *progname = (argc > 0 && argv[0] != NULL) ? argv[0] : "pipeline";
    size_t outlen = strlen(outbuf);
    size_t n_writ = 0;
    size_t n_read = 0;
    int iopipe[2];
    char *cmdlist[] = {"/usr/bin/tr","tr","a-z","A-Z",(char *)NULL,
                       "/usr/bin/sort","sort",(char *)NULL,
                       "/usr/bin/rev","rev",(char *)NULL,
                       (char *)NULL};

    printf("== %s begins ==\n", progname);

    if (pipeline(iopipe, cmdlist) != 0) {
        fprintf(stderr, "%s: unable to set up pipeline\n", progname);
        return EXIT_FAILURE;
    }

    /*
    ** Now we can write our source data to the pipeline.
    ** write() may transfer fewer bytes than requested, or fail with EINTR
    ** when SIGCHLD arrives, so keep going until everything is written.
    */
    printf("Original:\n%s\n", outbuf);
    while (n_writ < outlen) {
        ssize_t put = write(iopipe[pWRITETO], outbuf + n_writ, outlen - n_writ);

        if (put < 0) {
            if (errno == EINTR)
                continue;
            perror("write to pipeline");
            break;
        }
        n_writ += (size_t)put;
    }
    if (n_writ != outlen)
        fprintf(stderr, "%d (of %d) bytes written to pipeline\n",
                (int)n_writ, (int)outlen);
    /* closing our write end is what lets the head of the pipeline see EOF */
    close(iopipe[pWRITETO]);
    printf("%d bytes written to pipeline. Now reading\n", (int)n_writ);

    /*
    ** And read the transformed data from the pipeline, until EOF or until
    ** inbuf is full (one byte is kept back for the string terminator).
    */
    while (n_read < sizeof inbuf - 1) {
        ssize_t got = read(iopipe[pREADFROM], inbuf + n_read,
                           sizeof inbuf - 1 - n_read);

        if (got < 0) {
            if (errno == EINTR)
                continue;
            perror("read from pipeline");
            break;
        }
        if (got == 0)
            break;      /* EOF: the last stage has closed its stdout */
        n_read += (size_t)got;
    }
    inbuf[n_read] = '\0';   /* read() does not terminate; printf("%s") needs it */
    fprintf(stderr, "%d (of %d) bytes read from pipeline\n",
            (int)n_read, (int)n_writ);
    close(iopipe[pREADFROM]);

    printf("Processed:\n%s\n\n", inbuf);
    dumpData(outbuf, n_writ, "Written to pipeline");
    dumpData(inbuf, n_read, "Read from pipeline");
    printf("== %s ends ==\n", progname);
    return EXIT_SUCCESS;
}

/*
** pipeline() starts every command in alist as one stage of a pipeline
** Accepts: iopipe  a 2-element array that receives the caller's ends of
**                  the pipeline: iopipe[pWRITETO] feeds the first stage's
**                  stdin, iopipe[pREADFROM] reads the last stage's stdout.
**                  Both are set to -1 when an error is returned.
**          alist   a sequence of launch() argument lists, each one
**                  terminated by (char *)NULL; one more (char *)NULL
**                  ends alist
** Returns: 0 - pipeline running
**          1 - cant allocate or duplicate the initial pipe
**          2 - launch() failed for one of the commands
** Notes:
**   Installs obit() as the SIGCHLD handler before starting any child.
**   On failure, stages that were already started are not killed; they see
**   EOF / a closed pipe once our ends are closed.
*/
int pipeline(int iopipe[2], char **alist)
{
    int pData[2];

    iopipe[pREADFROM] = iopipe[pWRITETO] = -1;

    if (pipe(pData) != 0) {
        fputs("Cant allocate initial pipe pair\n", stderr);
        return 1;
    }

    /* launch() closes pData in this process, so keep our own copy of the
    ** write end that feeds the first stage */
    iopipe[pWRITETO] = dup(pData[pWRITETO]);
    if (iopipe[pWRITETO] < 0) {
        perror("dup initial pipe");
        close(pData[pREADFROM]);
        close(pData[pWRITETO]);
        return 1;
    }

    signal(SIGCHLD, obit);      /* set up to collect child obituaries */

    while (*alist) {
        char **child = alist;

        while (*alist)          /* find this command's NULL terminator */
            ++alist;
        ++alist;                /* skip the (char *)NULL argument list terminator */

        if (launch(pData, child) != 0) {
            /* every launch() failure leaves pData untouched and still open */
            close(pData[pREADFROM]);
            close(pData[pWRITETO]);
            close(iopipe[pWRITETO]);
            iopipe[pWRITETO] = -1;
            return 2;
        }
    }

    /* pData now holds the last stage's stdout pipe: keep its read end and
    ** drop our write end so that we see EOF when the last stage exits */
    iopipe[pREADFROM] = pData[pREADFROM];
    close(pData[pWRITETO]);
    return 0;
}

/*
** launch() launches a specified program with specified arguments
** Accepts: pDataIn: a pointer to a 2-element integer array
**                   containing the FDs of the pipe that is to
**                   become the INPUT (stdin) of the new subprocess.
**                   On successful completion, launch() will
**                   update this array with the FDs of the pipe
**                   connected to the OUTPUT (stdout) of the
**                   subprocess, to be used as the INPUT of the
**                   next (consumer) process.
**          args     a (char *)NULL-terminated array of pointers:
**                   args[0] is the program file handed to execvp(),
**                   and args[1] onward is the argument vector given
**                   to that program (its argv[0], argv[1], ...)
** Returns: 0 - no errors: pDataIn[] updated with new pipes
**          1 - argument error; pDataIn not affected
**          2 - cant obtain output pipe; pDataIn not affected
**          3 - cant fork(): pDataIn not affected
**          The child process never returns from launch(): it either
**          becomes the requested program or terminates via _exit().
**
** Notes:
** 1) This version of launch() builds pipelines front-to-back.
**    The first call to launch() will build the head of the pipeline,
**    the process that handles the mainline's output to the pipeline.
**    Subsequent calls will continue the chain, feeding the prior
**    process stdout into the new process stdin. launch() makes
**    public the stdout pipe pair, with the mainline reading the
**    read end of the final pair.
** 2) A child process inherits the parent process' open files,
**    including (in our case) the (saved) write end of the 1st
**    child's stdin pipe. In order to permit the read end of
**    that (or any) pipe to read EOF, we must close ALL write ends,
**    including the one that the child process inherits.
**    To be safe, we close ALL unused file handles before launching
**    the requested binary.
** 3) If the execvp() call succeeds, it never returns to this code.
**    If execvp() fails, it does return; the child then reports the
**    error and terminates with _exit(). It must NOT return into the
**    caller, which would otherwise carry on doing the parent's work
**    (e.g. launching further pipeline stages) inside the child, and
**    _exit() avoids flushing stdio buffers copied from the parent.
*/
int launch(int pDataIn[], char *args[])
{
    int pDataOut[2];
    pid_t child;

    /* fd 0 is a valid descriptor, so only negative values are rejected */
    if ((pDataIn == NULL) || (pDataIn[pREADFROM] < 0) || (pDataIn[pWRITETO] < 0)
        || (args == NULL) || (*args == NULL)) {
        fputs("Argument error on launch()\n", stderr);
        return 1;
    }

    if (pipe(pDataOut) != 0) {
        fprintf(stderr, "Cant allocate output pipe pair for %s\n", args[0]);
        return 2;
    }

    child = fork();
    if (child == 0) {
        /*
        ** This block of code executes in the child process.
        **
        ** Plumbing:  connect pDataIn's read end to stdin and pDataOut's
        **            write end to stdout, then close ALL other file
        **            handles except stderr, so that no stray pipe
        **            "write" end keeps a reader from seeing EOF.
        **
        ** Execution: execvp() only returns IF it fails to execute the
        **            requested binary, so ANY return is an error: report
        **            it and terminate this process.
        */
        int fd;

        if ((dup2(pDataIn[pREADFROM], STDIN_FILENO) < 0)
            || (dup2(pDataOut[pWRITETO], STDOUT_FILENO) < 0)) {
            perror(args[0]);
            _exit(EXIT_FAILURE);
        }

        /* close ALL files other than stdin, stdout and stderr */
        for (fd = max_fd(); fd > STDERR_FILENO; --fd)
            close(fd);

        execvp(args[0], args + 1);
        perror(args[0]);
        _exit(EXIT_FAILURE);
    }

    if (child == -1) {
        close(pDataOut[pREADFROM]);
        close(pDataOut[pWRITETO]);
        fprintf(stderr, "Cant fork child for %s\n", args[0]);
        return 3;
    }

    printf("Launched %s\n", args[0]);

    /* the child now owns the input pipe; hand its output pipe to the caller */
    close(pDataIn[pREADFROM]);
    close(pDataIn[pWRITETO]);
    pDataIn[pREADFROM] = pDataOut[pREADFROM];
    pDataIn[pWRITETO] = pDataOut[pWRITETO];
    return 0;
}

/*
** max_fd() determines the largest fd that open() can return
** Accepts: nothing
** Returns: an integer representing the largest fd (RLIMIT_NOFILE soft
**          limit - 1), clamped to INT_MAX. When the limit cannot be read
**          or is RLIM_INFINITY, a fixed fallback is returned instead so
**          that callers never loop over an absurd (or negative) range.
*/
int max_fd(void)
{
    enum { MAX_FD_FALLBACK = 1023 };    /* conventional default soft limit - 1 */
    struct rlimit maxfds;

    if ((getrlimit(RLIMIT_NOFILE, &maxfds) != 0)
        || (maxfds.rlim_cur == RLIM_INFINITY))
        return MAX_FD_FALLBACK;
    if (maxfds.rlim_cur == 0)
        return -1;                      /* no descriptor can be open at all */
    if (maxfds.rlim_cur - 1 > (rlim_t)INT_MAX)
        return INT_MAX;
    return (int)(maxfds.rlim_cur - 1);
}

/*
** obit() handles SIGCHLD signals from terminating child processes
** Accepts: an integer indicating the intercepted signal
** Returns: nothing
** Notes:
**   - Several child terminations may be merged into a single SIGCHLD, so
**     every already-terminated child is reaped, without blocking.
**   - SIGCHLD is no longer switched to SIG_IGN here: ignoring SIGCHLD has
**     special semantics (children may be auto-reaped and wait() may block
**     until all children have terminated).
**   - The handler is re-installed only after reaping, for systems where
**     signal() resets the disposition on delivery.
**   - errno is saved and restored so the interrupted code is unaffected.
*/
void obit(int signum)
{
    int saved_errno = errno;

    while (waitpid(-1, NULL, WNOHANG) > 0)
        continue;       /* reap the next terminated child */
    signal(signum, obit);
    errno = saved_errno;
}

/*
** dumpData() prints a hex & ascii dump of a buffer to stdout
** Accepts: data    the bytes to dump
**          length  number of bytes of data to dump
**          title   optional heading (skipped when NULL or empty)
** Returns: nothing
** Each line shows the offset, 16 bytes in hex (in groups of 4), and the
** same bytes as ASCII, with '.' standing in for non-printable bytes.
*/
void dumpData(char *data, size_t length, char *title)
{
    size_t disp;

    if (title && *title)
        printf("\n=== %s ===\n", title);
    puts("       _0_1_2_3 _4_5_6_7 _8_9_A_B _C_D_E_F  _0123456789ABCDEF_");
    for (disp = 0; disp < length; disp += 16) {
        size_t count = length - disp;
        size_t index;

        if (count > 16)
            count = 16;
        printf("+%4.4x: ", (unsigned int)disp);
        for (index = 0; index < 16; ++index) {
            const char *gap = (index % 4 == 3) ? " " : "";

            if (index < count) {
                /* via unsigned char: a negative (signed) char would print as ffffffxx */
                printf("%2.2x%s", (unsigned int)(unsigned char)data[disp + index], gap);
            } else {
                printf("  %s", gap);    /* two blanks stand in for a missing byte */
            }
        }
        printf(" |");
        for (index = 0; index < 16; ++index) {
            if (index < count) {
                unsigned char c = (unsigned char)data[disp + index];

                /* printable ASCII is 32..126; 127 is DEL */
                putchar((c > 31 && c < 127) ? c : '.');
            } else {
                putchar(' ');
            }
        }
        puts("|");
    }
}
About:
System Management: