- #include <stdlib.h>
- #include <stdio.h>
- #include <unistd.h>
- #include <time.h>
- #include <strings.h>
- #include <string.h>
- #include <sys/types.h>
- #include <fcntl.h>
- #include <signal.h>
- #include <wait.h>
- #include <errno.h>
- #include <limits.h>
- #include <ctype.h>
- #include <sys/stat.h>
- #define MYSIZE 100
- #define MYLOCK ".my_lock_file"
- //管道最大容量
- #if !defined(PIPE_BUF)
- #define PIPE_BUF 4096
- #endif
- //#undef SA_SIGINFO
- //#undef SIGRTMIN
- //#undef SIGRTMAX
- //如果支持实时信号,则使用实时信号同步父子进程
- #if defined(SA_SIGINFO) && defined(SIGRTMIN) && defined(SIGRTMAX)
- #define USE_RTSIGNAL
- #define NOTIFY_SIGNO SIGRTMAX
- #else
- #define NOTIFY_SIGNO SIGUSR1
- #endif
- #define MYMODE S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
- #define writew_lock(fd) \\
- lock_reg(fd,F_SETLKW,F_WRLCK)
- #define readw_lock(fd) \\
- lock_reg(fd,F_SETLKW,F_RDLCK)
- #define un_lock(fd) \\
- lock_reg(fd,F_SETLK,F_UNLCK)
- typedef void (*signal_f)(int signo);
- typedef void (*signalrt_f)(int signo,siginfo_t *,void *);
- #define ALIGN(x) (((size_t)(x) + (sizeof(size_t) - 1)) & ~(sizeof(size_t) - 1))
- #define BLOCK_HEAD (sizeof(block_data_t)-ALIGN(sizeof(char)))
- #define BLOCK_FLAG 0xfeac
- typedef struct block_data_t{
- u_int16_t flag;
- size_t off;
- size_t len;
- char buf;
- }block_data_t;
- static int my_lock_fd,is_print_msg,bufsize=MYSIZE;
- void create_file(int argc,char *argv[],int fd[2]);
- void init_process(int src_fd,int des_fd);
- int producer(int src_fd,int pipe_fd[2]);
- int customer(int des_fd,int pipe_fd[2]);
- int lock_reg(int fd,int mode,int flag);
- void sig_usr(int signo);
- void wait_any_childproc(int signo);
- signal_f handle_signal(int signo,void *sigfunc);
- void notify_parent(int signo);
- int main(int argc,char *argv[]){
- int fd[2];
- create_file(argc,argv,fd);
- init_process(fd[0],fd[1]);
- //等待结束的子进程,防止僵死进程
- while(waitpid(-1,0,0)>0);
- exit(0);
- }
- void create_file(int argc,char *argv[],int fd[2]){
- if((my_lock_fd=open(MYLOCK,O_WRONLY|O_CREAT|O_TRUNC,0600))==-1){
- fprintf(stderr,"create lock file %s faild:%s\\n",MYLOCK,strerror(errno));
- exit(-1);
- }
- unlink(MYLOCK);
- if(argc!=4 && argc!=3 && argc!=5){
- fprintf(stderr," usage %s <source> <destination> [bufsize] [p]\\n",argv[0]);
- exit(-1);
- }
- //如果有第3个参数,则为缓冲大小
- if(argc>3)
- bufsize=atoi(argv[3]);
- if(bufsize>(int)(PIPE_BUF))
- bufsize=(int)(PIPE_BUF);
- //如果有第4个参数,册打印同步信息
- if(argc>4 && toupper(argv[4][0])=='P')
- is_print_msg=1;
- if((fd[0]=open(argv[1],O_RDONLY))<0){
- fprintf(stderr,"open infile.txt error!\\n");
- exit(-1);
- }
- if((fd[1]=open(argv[2],O_WRONLY | O_CREAT | O_TRUNC,MYMODE))<0){
- fprintf(stderr," create file failed:%s\\n",strerror(errno));
- exit(-1);
- }
- }
- //非实时信号
- void sig_usr(int signo){
- fprintf(stderr,"one child process have running......\\n");
- }
- //实时信号
- void sig_usr_rt(int signo,siginfo_t *info,void *context){
- fprintf(stderr,"one child process %d have running......\\n",info->si_value.sival_int);
- }
- void notify_parent(int signo){
- #ifdef USE_RTSIGNAL
- union sigval val;
- val.sival_int=getpid();
- if(sigqueue(getppid(),signo,val)==-1){
- fprintf(stderr,"sigqueue() error:%s\\n",strerror(errno));
- exit(-1);
- }
- #else
- if(kill(getppid(),signo)==-1){
- fprintf(stderr,"kill() error:%s\\n",strerror(errno));
- exit(-1);
- }
- #endif
- }
- signal_f handle_signal(int signo,void *sigfunc){
- struct sigaction sign,sigo;
- bzero(&sign,sizeof(sign));
- sigemptyset(&sign.sa_mask);
- #ifdef USE_RTSIGNAL
- sigaddset(&sign.sa_mask,NOTIFY_SIGNO);
- sign.sa_flags=SA_SIGINFO;
- sign.sa_sigaction=(signalrt_f)sigfunc;
- #else
- sign.sa_handler=(signal_f)sigfunc;
- #endif
- if(signo==SIGALRM){
- #ifdef SA_INTERRUPT
- sign.sa_flags|=SA_INTERRUPT;
- #endif
- }
- else{
- #ifdef SA_RESTART
- sign.sa_flags|=SA_RESTART;
- #endif
- }
- if(sigaction(signo,&sign,&sigo)==-1)
- fprintf(stderr,"handle_signal() error:%s\\n",strerror(errno));
- return sigo.sa_handler;
- }
- void wait_any_childproc(int signo){
- sigset_t s;
- sigfillset(&s);
- sigdelset(&s,signo);
- sigdelset(&s,SIGINT);
- //不屏蔽SIGINT,SIGUSR1信号,并挂起进程,等待一个信号唤醒此进程,此操作为原子操作
- sigsuspend(&s);
- }
- int lock_reg(int fd,int mode,int flag){
- //注册一个文件锁,锁住这个文件
- int n;
- struct flock lock;
- bzero(&lock,sizeof(struct flock));
- lock.l_len=0;
- lock.l_type=flag;
- lock.l_whence=SEEK_SET;
- lock.l_start=0;
- n=fcntl(fd,mode,&lock);
- if(n==-1){
- fprintf(stderr,"lock_reg() faild:%s\\n",strerror(errno));
- exit(-1);
- }
- return n;
- }
- void init_process(int src_fd,int des_fd){
- int pipe_fd[2];
- sigset_t s,o;
- //安装信号处理句柄
- #ifdef USE_RTSIGNAL
- handle_signal(NOTIFY_SIGNO,(void*)sig_usr_rt);
- #else
- handle_signal(NOTIFY_SIGNO,(void*)sig_usr);
- #endif
- if(pipe(pipe_fd)<0){
- fprintf(stderr,"Pipe create error:%s\\n",strerror(errno));
- exit(-1);
- }
- //复制描述符,使子进程自己从标准输入输出中获取写入数据
- dup2(pipe_fd[0],STDIN_FILENO);
- dup2(pipe_fd[1],STDOUT_FILENO);
- close(pipe_fd[0]);
- close(pipe_fd[1]);
- //屏蔽所有信号
- sigfillset(&s);
- if(sigprocmask(SIG_SETMASK,&s,&o)==-1)
- fprintf(stderr,"wait_any_childproc() faild:%s\\n",strerror(errno));
- if(fork()==0){
- if(sigprocmask(SIG_SETMASK,&o,0)==-1)
- fprintf(stderr,"wait_any_childproc() faild:%s\\n",strerror(errno));
- producer(src_fd,pipe_fd);
- }
- if(fork()==0){
- if(sigprocmask(SIG_SETMASK,&o,0)==-1)
- fprintf(stderr,"wait_any_childproc() faild:%s\\n",strerror(errno));
- customer(des_fd,pipe_fd);
- }
- if(fork()==0){
- if(sigprocmask(SIG_SETMASK,&o,0)==-1)
- fprintf(stderr,"wait_any_childproc() faild:%s\\n",strerror(errno));
- customer(des_fd,pipe_fd);
- }
- //当接受到USR1信号,表示已经有一个进程运行了,可以关闭父进程的管道描述符了,
- //如果不这么做,由于CPU时间片内核在进程间切换的原因导致父进程先于所有的子
- //运行,那么提前关闭管道的读写端,将导致子进程对管道操作的失败。
- wait_any_childproc(NOTIFY_SIGNO);
- //还原原信号屏蔽字
- if(sigprocmask(SIG_SETMASK,&o,0)==-1)
- fprintf(stderr,"wait_any_childproc() faild:%s\\n",strerror(errno));
- close(STDIN_FILENO);
- close(STDOUT_FILENO);
- }
- /*
- 封装一个带有指示数据的数据块,写入管道。
- */
- int producer(int src_fd,int pipe_fd[2]){
- block_data_t *block;
- size_t n;
- //通知父进程,自己已开始运行
- notify_parent(NOTIFY_SIGNO);
- close(STDIN_FILENO);
- if((block=(block_data_t *)malloc(bufsize+BLOCK_HEAD))==NULL){
- fprintf(stderr,"malloc() error:%s\\n",strerror(errno));
- exit(-1);
- }
- bzero(block,bufsize+BLOCK_HEAD);
- block->flag=(u_int16_t)BLOCK_FLAG;
- while((n=read(src_fd,(char*)block+BLOCK_HEAD,bufsize))>0){
- //封装一个数据块
- block->len=n;
- if(write(STDOUT_FILENO,block,n+BLOCK_HEAD)!=n+BLOCK_HEAD){
- fprintf(stderr,"write pipe error:%s\\n",strerror(errno));
- exit(-1);
- }
- block->off+=n;
- }
- if(n==-1){
- fprintf(stderr,"read source file error:%s\\n",strerror(errno));
- exit(-1);
- }
- free(block);
- exit(0);
- }
- /*
- 读取数据块的头部,然后再根据头部中带有的长度标示,读取下个数据块(数据部分)。
- 由于这之间存在进程切换的可能,故使用一个锁。而管道的特性是,读取会改变管道的
- 数据,从而必须使用独占锁,即写锁。
- */
- int customer(int des_fd,int pipe_fd[2]){
- int n,seek;
- block_data_t *block;
- notify_parent(NOTIFY_SIGNO);
- close(STDOUT_FILENO);
- if((block=(block_data_t *)malloc(bufsize+BLOCK_HEAD))==NULL){
- fprintf(stderr,"malloc() error:%s\\n",strerror(errno));
- exit(-1);
- }
- while(1){
- if(is_print_msg)usleep(500*1000);
- //一定要独占模式的写锁
- writew_lock(my_lock_fd);
- //if(is_print_msg)fprintf(stderr,"{\\n\\tprocess %d get lock\\n",getpid());
- bzero(block,bufsize+BLOCK_HEAD);
- //读取头,以便确认该读取多少数据文件
- n=read(STDIN_FILENO,block,BLOCK_HEAD);
- if(n==0){
- fprintf(stderr,"\\tpipe is write over!\\n");
- //if(is_print_msg)fprintf(stderr,"\\tprocess %d unlock\\n}\\n",getpid());
- un_lock(my_lock_fd);
- break;
- }
- if(n==-1){
- fprintf(stderr,"\\tread pipe error:%s\\n",strerror(errno));
- //if(is_print_msg)fprintf(stderr,"\\tprocess %d unlock\\n}\\n",getpid());
- un_lock(my_lock_fd);
- exit(-1);
- }
- if(n!=BLOCK_HEAD){
- fprintf(stderr,"\\tread pipe %d error for head block:%d\\n",getpid(),n);
- //if(is_print_msg)fprintf(stderr,"\\tprocess %d unlock\\n}\\n",getpid());
- un_lock(my_lock_fd);
- break;
- }
- //判断块头文件是否正确
- if(block->flag!=BLOCK_FLAG){
- fprintf(stderr,"\\tread pipe %d error for head block:0x%x\\n",getpid(),block->flag);
- //if(is_print_msg)fprintf(stderr,"\\tprocess %d unlock\\n}\\n",getpid());
- un_lock(my_lock_fd);
- exit(-1);
- }
- //读取下一个数据块
- n=read(STDIN_FILENO,(char*)block+BLOCK_HEAD,block->len);
- //判断读取数据文件是否正确
- if(n!=block->len){
- fprintf(stderr,"\\tread pipe %d error for data block:%d\\n",getpid(),block->len);
- //if(is_print_msg)fprintf(stderr,"\\tprocess %d unlock\\n}\\n",getpid());
- un_lock(my_lock_fd);
- break;
- }
- if(n==-1){
- fprintf(stderr,"\\tread pipe error:%s\\n",strerror(errno));
- //if(is_print_msg)fprintf(stderr,"\\tprocess %d unlock\\n}\\n",getpid());
- un_lock(my_lock_fd);
- exit(-1);
- }
- //if(is_print_msg)fprintf(stderr,"\\tprocess %d unlock\\n}\\n",getpid());
- un_lock(my_lock_fd);
- //写数据,一个原子操作,不必使用锁
- if(pwrite(des_fd,(char*)block+BLOCK_HEAD,block->len,block->off)!=n){
- fprintf(stderr,"process %d write file error!\\n",getpid());
- exit(-1);
- }
- if(is_print_msg)fprintf(stderr,"process %d write %d bytes at %d\\n",getpid(),block->len,block->off);
- }
- free(block);
- exit(0);
- }
- //该片段来自于http://www.codesnippet.cn/detail/140520149572.html
来源: http://www.codesnippet.cn/detail/140520149572.html