科技报道|- 共享内存,多进程编程( 三 )


#include#include#include#include#include#include#include#include#include#include#include#include#include#include#include#include#defineUSER_LIMIT5#defineBUFFER_SIZE1024#defineFD_LIMIT65535#defineMAX_EVENT_NUMBER1024#definePROCESS_LIMIT65536/*处理一个客户连接必要的数据*/structclient_data{sockaddr_inaddress;/*客户端socket地址*/intconnfd;/*socket文件描述符*/pid_tpid;/*处理这个连接的子进程PID*/intpipefd[2];/*和父进程通信用的管道*/};staticconstchar*shm_name="/my_shm";intsig_pipefd[2];intepollfd;intlistenfd;intshmfd;char*share_mem=0;/*客户连接数组 。 进程使用客户连接的编号来索引这个数据 , 及可取得相关的客户连接数据*/client_data*users=0;/*子进程与客户连接的映射关系 , 用进程PID来索引这个数组 。 即可取得该进程所处理的客户链接的编号*/int*sub_porcess=0;/*当前客户数量*/intuser_count=0;boolstop_child=false;intsetnonblocking(intfd){intold_option=fcntl(fd,F_GETFL);intnew_option=old_option|O_NONBLOCK;fcntl(fd,F_SETFL,new_option);returnold_option;}voidaddfd(intepollfd,intfd){epoll_eventevent;event.data.fd=fd;event.events=EPOLLIN|EPOLLET;epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);setnonblocking(fd);}voidsig_handler(intsig){intsave_errno=errno;intmsg=sig;send(sig_pipefd[1],(char*)&msg,1,0);errno=save_errno;}voidaddsig(intsig,void(*handler)(int),boolrestart=true){structsigactionsa;memset(&sa,'0',sizeof(sa));sa.sa_handler=handler;if(restart){sa.sa_flags|=SA_RESTART;}sigfillset(&sa.sa_mask);assert(sigaction(sig,&sa,NULL)!=-1);}voiddel_resource(){close(sig_pipefd[0]);close(sig_pipefd[1]);close(listenfd);close(epollfd);shm_unlink(shm_name);delete[]users;delete[]sub_porcess;}/*停止一个子进程*/voidchild_term_handler(intsig){stop_child=true;}/*子进程运行的函数 , 参数idx指出该子进程处理客户链接的编号users是保存连接数据的数组参数share_mem指出共享内存的其实地址*/intrun_child(intidx,client_data*users,char*share_mem){epoll_eventevents[MAX_EVENT_NUMBER];/*子进程使用I/O复用技术来同时监听两个文件描述副:客户连接socket、与父进程通信的管道文件描述符*/intchild_epollfd=epoll_create(5);assert(child_epollfd!=-1);intconnfd=users[idx].connfd;addfd(child_epollfd,connfd);intpipefd=users[idx].pipefd[1];addfd(child_epollfd,pipefd);intret=0;/*子进程需要设置自己的信号处理函数*/addsig(SIGTERM,child_term_handler,false);while(!stop_child){intnumber=epoll_wait(child_epollfd,events,MAX_EVENT_NUMBER,-1);if(number<0&&errno!=EINTR){printf("epollfailuren");break;}for(inti=0;i<number;++i){intsockfd=events[i].data.fd;/*本子进程负责的客户连接有数据到达*/if(sockfd==connfd&&events[i].events&EPOLLIN){memset(share_mem+idx*BUFFER_SIZE,'0',BUFFER_SIZE);/*将客户数据读取到对应的读缓存中 。 读缓存是共享内存的一段 , 它开始去idx*BUffER_SIZE处 , 长度为BUFFER_SIZE每个客户连接的读缓存是共享的*/ret=recv(connfd,share_mem+idx*BUFFER_SIZE,BUFFER_SIZE-1,0);if(ret<0){if(errno!=EAGAIN){stop_child=true;}}elseif(ret==0){stop_child=true;}else{/*成功读取客户端数据后就通知主进程(通过管道)来处理*/send(pipefd,(char*)&idx,sizeof(idx),0);}}/*主进程通知本进程(通过管道)将第client个客户的数据发送到本进程负责的客户端*/elseif(sockfd==pipefd&&events[i].events&EPOLLIN){intclient=0;/*接受主进程发送过来的数据 , 即有客户端数据到达的连接编号*/ret=recv(sockfd,(char*)&client,sizeof(client),0);if(ret<0){if(errno!=EAGAIN){stop_child=true;}}elseif(ret=0){stop_child=true;}else{send(connfd,share_mem+client*BUFFER_SIZE,BUFFER_SIZE,0);}}else{continue;}}}close(connfd);close(pipefd);close(child_epollfd);return0;}intmain(intargc,charconst*argv[]){if(argc<=2){printf("usage:%sip_addressport_numbern",basename(argv[0]));return1;}constchar*ip=argv[1];intport=atoi(argv[2]);intret=0;structsockaddr_inaddress;bzero(&address,sizeof(address));address.sin_family=AF_INET;inet_pton(AF_INET,ip,&address.sin_addr);address.sin_port=htons(port);listenfd=socket(PF_INET,SOCK_STREAM,0);assert(listenfd>=0);ret=bind(listenfd,(structsockaddr*)&address,sizeof(address));assert(ret!=-1);ret=listen(listenfd,5);assert(ret!=-1);user_count=0;users=newclient_data[USER_LIMIT+1];sub_porcess=newint[PROCESS_LIMIT];for(inti=0;i<PROCESS_LIMIT;++i){sub_porcess[i]=-1;}epoll_eventevents[MAX_EVENT_NUMBER];epollfd=epoll_create(5);assert(epollfd!=-1);addfd(epollfd,listenfd);ret=socketpair(PF_UNIX,SOCK_STREAM,0,sig_pipefd);assert(ret!=-1);setnonblocking(sig_pipefd[1]);addfd(epollfd,sig_pipefd[0]);addsig(SIGCHLD,sig_handler);addsig(SIGTERM,sig_handler);addsig(SIGINT,sig_handler);addsig(SIGPIPE,SIG_IGN);boolstop_server=false;boolterminiate=false;/*创建共享内存 , 作为所有客户socket链接的读缓存*/shmfd=shm_open(shm_name,O_CREAT|O_RDWR,0666);assert(shmfd!=-1);ret=ftruncate(shmfd,USER_LIMIT*BUFFER_SIZE);assert(ret!=-1);share_mem=(char*)mmap(NULL,USER_LIMIT*BUFFER_SIZE,PROT_READ|PROT_WRITE,MAP_SHARED,shmfd,0);assert(share_mem!=MAP_FAILED);close(shmfd);while(!stop_server){intnumber=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);if(number<0&&errno!=EINTR){printf("epollfailuren");break;}for(inti=0;i<number;++i){intsockfd=events[i].data.fd;/*新的客户连接*/if(sockfd==listenfd){structsockaddr_inclient_address;socklen_tclient_addrlength=sizeof(client_address);intconnfd=accept(listenfd,(structsockaddr*)&client_address,&client_addrlength);if(connfd<0){printf("errnois%dn",errno);continue;}if(user_count>=USER_LIMIT){constchar*info="toomanyusersn";printf("%s",info);send(connfd,info,strlen(info),0);close(connfd);continue;}/*保存第user_count个客户连接的相关数据*/users[user_count].address=client_address;users[user_count].connfd=connfd;/*在主进程和子进程间建立管道 , 以传递必要的数据*/ret=socketpair(PF_UNIX,SOCK_STREAM,0,users[user_count].pipefd);assert(ret!=-1);pid_tpid=fork();if(pid<0){close(connfd);continue;}elseif(pid==0){close(epollfd);close(listenfd);close(users[user_count].pipefd[0]);close(sig_pipefd[0]);close(sig_pipefd[1]);run_child(user_count,users,share_mem);munmap((void*)share_mem,USER_LIMIT*BUFFER_SIZE);exit(0);}else{close(connfd);close(users[user_count].pipefd[1]);addfd(epollfd,users[user_count].pipefd[0]);users[user_count].pid=pid;/*记录新的客户连接在数组users中的索引值 , 建立进程pid和该索引值之间的映射关系*/sub_porcess[pid]=user_count;user_count++;}}/*处理信号事件*/elseif(sockfd==sig_pipefd[0]&&events[i].events&EPOLLIN){intsig;charsignals[1024];ret=recv(sig_pipefd[0],signals,sizeof(signals),0);if(ret==-1){continue;}elseif(ret==0){continue;}else{for(inti=0;i<ret;++i){switch(signals[i]){caseSIGCHLD:{/*子进程退出 , 表示有某个客户端关闭了连接*/pid_tpid;intstat;while((pid=waitpid(-1,&stat,WNOHANG))>0){intdel_user=sub_porcess[pid];sub_porcess[pid]=-1;if(del_user<0||del_user>USER_LIMIT){continue;}epoll_ctl(epollfd,EPOLL_CTL_DEL,users[del_user].pipefd[0],0);close(users[del_user].pipefd[0]);users[del_user]=users[--user_count];sub_porcess[users[del_user].pid]=del_user;}if(terminiate&&user_count==0){stop_server=true;}break;}caseSIGTERM:caseSIGINT:{/*结束服务程序*/printf("killallthechildnown");if(user_count==0){stop_server=true;break;}for(inti=0;i<user_count;++i){intpid=users[i].pid;kill(pid,SIGTERM);}terminiate=true;break;}default:break;}}}}/*某个紫禁城向父进程写入了数据*/elseif(events[i].events&EPOLLIN){intchild=0;ret=recv(sockfd,(char*)&child,sizeof(child),0);printf("readdatafromchildprocessaccrosspipen");if(ret==-1){continue;}elseif(ret==0){continue;}else{/*向除负责处理第child和客户链接的子进程之外的其他子进程发送消息 , 通知有客户要写*/for(intj=0;j<user_count;++j){if(users[j].pipefd[0]!=sockfd){printf("senddatatochildaccrosspipen");send(users[j].pipefd[0],(char*)&child,sizeof(child),0);}}}}}}del_resource();return0;}