目录
- 介绍
- 1. 非阻塞IO
- 数据中继:有限状态机实现
- 数据中继引擎:封装成库
- 2. IO多路转接
- select
- poll
- epoll
- 3. 其他读写函数
- 4. 存储映射IO:mmap
- 5. 文件锁
- 6. 管道实例:手写管道
介绍
1. 非阻塞IO
数据中继:
有限状态机编程:
- 简单流程:自然流程是结构化的
- 复杂流程:自然流程是非结构化的
例子:有限状态机实现数据中继 mycopy升级版
本质上就是两次 mycopy
数据中继:有限状态机实现
示例代码:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define BUFSIZE 1024
#define TTY1 "/dev/tty11"
#define TTY2 "/dev/tty12"

/* States of one relay direction. */
enum
{
    STATE_R = 1,    /* read from the source descriptor */
    STATE_W,        /* write buffered data to the destination descriptor */
    STATE_Ex,       /* a read()/write() failed; report it */
    STATE_T         /* terminated */
};

/* One direction of the relay: copies data from sfd to dfd. */
struct fsm_st
{
    int state;          /* one of STATE_* */
    int sfd;            /* source descriptor */
    int dfd;            /* destination descriptor */
    char buf[BUFSIZE];  /* data read but not yet fully written */
    int len;            /* bytes in buf still waiting to be written */
    char *errstr;       /* name of the failed call, handed to perror() */
    int pos;            /* offset in buf of the next byte to write */
};

/*
 * Advance the state machine by exactly one step.
 * Never blocks as long as both descriptors are in non-blocking mode.
 */
static void fsm_driver(struct fsm_st *fsm)
{
    int ret;

    switch (fsm->state) {
    case STATE_R:
        fsm->len = read(fsm->sfd, fsm->buf, BUFSIZE);
        if (fsm->len == 0) {
            fsm->state = STATE_T;           /* end of input */
        } else if (fsm->len < 0) {
            if (errno == EAGAIN) {
                fsm->state = STATE_R;       /* nothing to read yet, retry */
            } else {
                fsm->errstr = "read()";
                fsm->state = STATE_Ex;
            }
        } else {
            fsm->pos = 0;
            fsm->state = STATE_W;
        }
        break;

    case STATE_W:
        /* Write only the bytes that were read, not the whole buffer. */
        ret = write(fsm->dfd, fsm->buf + fsm->pos, fsm->len);
        if (ret < 0) {
            if (errno == EAGAIN) {
                fsm->state = STATE_W;       /* destination busy, retry */
            } else {
                fsm->errstr = "write()";
                fsm->state = STATE_Ex;
            }
        } else {
            /* Keep writing until all len bytes have gone out. */
            fsm->pos += ret;
            fsm->len -= ret;
            if (fsm->len == 0)
                fsm->state = STATE_R;
            else
                fsm->state = STATE_W;
        }
        break;

    case STATE_Ex:
        perror(fsm->errstr);
        fsm->state = STATE_T;
        break;

    case STATE_T:
        /* do something */
        break;

    default:
        /* unknown state: programming error */
        abort();
        break;
    }
}

/*
 * Relay data in both directions between fd1 and fd2 until both
 * directions have terminated. Both descriptors are switched to
 * non-blocking mode for the duration and restored afterwards.
 * This version busy-waits.
 */
static void relayer(int fd1, int fd2)
{
    int fd1_save, fd2_save;
    struct fsm_st fsm12, fsm21;

    fd1_save = fcntl(fd1, F_GETFL);
    fcntl(fd1, F_SETFL, fd1_save | O_NONBLOCK);
    fd2_save = fcntl(fd2, F_GETFL);
    fcntl(fd2, F_SETFL, fd2_save | O_NONBLOCK);

    fsm12.state = STATE_R;
    fsm12.sfd = fd1;
    fsm12.dfd = fd2;

    fsm21.state = STATE_R;  /* was "fsm21.state - STATE_R;", a no-op */
    fsm21.sfd = fd2;
    fsm21.dfd = fd1;

    while (fsm12.state != STATE_T || fsm21.state != STATE_T) {
        fsm_driver(&fsm12);
        fsm_driver(&fsm21);
    }

    fcntl(fd1, F_SETFL, fd1_save);
    fcntl(fd2, F_SETFL, fd2_save);
}

int main()
{
    int fd1, fd2;

    fd1 = open(TTY1, O_RDWR);
    if (fd1 < 0) {
        perror("open()");
        exit(1);
    }
    write(fd1, "TTY1\n", 5);

    fd2 = open(TTY2, O_RDWR | O_NONBLOCK);
    if (fd2 < 0) {
        perror("open()");
        exit(1);
    }
    write(fd2, "TTY2\n", 5);

    relayer(fd1, fd2);

    close(fd1);
    close(fd2);

    exit(0);
}
运行结果:
真机下:
ctrl + alt + f11/f12
:切换到TTY11/TTY12
sudo chvt n
:切换到TTYn
tty
:显示当前终端
数据中继引擎:封装成库
类似于anytimer的库
main.c
relayer.c
修改自relay.c
relayer.h
makefile
##include<stdio.h>
##include<stdlib.h>
##include<unistd.h>
##include<errno.h>
##include<sys/types.h>
##include<sys/stat.h>
##include<fcntl.h>
##include<string.h>
##include"relayer.h"##define BUFSIZE 1024##define TTY1 "/dev/tty11"
##define TTY2 "/dev/tty12"
##define TTY3 "/dev/tty9"
##define TTY4 "/dev/tty10"int main()
{int fd1, fd2, fd3, fd4;int job1,job2;fd1 = open(TTY1,O_RDWR);if(fd1<0){perror("open()");exit(1);}write(fd1,"TTY1\n",5);fd2 = open(TTY2,O_RDWR|O_NONBLOCK);if(fd2<0){perror("open()");exit(1);}write(fd2,"TTY2\n",5);job1 = rel_addjob(fd1,fd2);if(job1 < 0){fprintf(stderr,"rel_addjob():%s\n",strerror(-job1));exit(1);}fd3 = open(TTY3,O_RDWR);if(fd3 < 0){perror("open()");exit(1);}write(fd3,"TTY3\n",5);fd4 = open(TTY4,O_RDWR);if(fd4 < 0){perror("open()");exit(1);}write(fd4,"TTY4\n",5);job2 = rel_addjob(fd3,fd4);if(job2 < 0){fprintf(stderr,"rel_addjob():%s\n",strerror(-job2));exit(1);}while(1)pause();close(fd1);close(fd2);close(fd3);close(fd4);exit(0);
}
##include<stdio.h>
##include<stdlib.h>
##include<unistd.h>
##include<errno.h>
##include<sys/types.h>
##include<sys/stat.h>
##include<fcntl.h>
##include<pthread.h>
##include<string.h>
##include"relayer.h"##define BUFSIZE 1024
##define TTY1 "/dev/tty11"
##define TTY2 "/dev/tty12"enum
{STATE_R=1,STATE_W,STATE_Ex,STATE_T
};struct rel_fsm_st
{int state;int sfd;int dfd;char buf[BUFSIZE];int len;char *errstr;int pos;int64_t count;
};struct rel_job_st
{int fd1,fd2;int state;struct rel_fsm_st fsm12, fsm21;int fd1_save, fd2_save;
// struct timerval start,end;
};static struct rel_job_st* rel_job[REL_JOBMAX];
static pthread_mutex_t mut_rel_job = PTHREAD_MUTEX_INITIALIZER;
static pthread_once_t init_once = PTHREAD_ONCE_INIT;static void fsm_driver(struct rel_fsm_st *fsm)
{int ret;switch(fsm->state){case STATE_R:fsm->len = read(fsm->sfd, fsm->buf, BUFSIZE);if(fsm->len == 0)fsm->state = STATE_T;else if(fsm->len < 0){if(errno == EAGAIN)fsm->state = STATE_R;else{fsm->errstr = "read()";fsm->state = STATE_Ex;}}else{fsm->pos = 0;fsm->state = STATE_W;}break;case STATE_W:ret = write(fsm->dfd, fsm->buf+fsm->pos, BUFSIZE);if(ret < 0){if(errno == EAGAIN)fsm->state = STATE_W;else{fsm->errstr = "write()";fsm->state = STATE_Ex;}}else//坚持写够len个字节{fsm->pos += ret;fsm->len -= ret;if(fsm->len == 0)fsm->state = STATE_R;elsefsm->state = STATE_W;}break;case STATE_Ex:perror(fsm->errstr);fsm->state = STATE_T;break;case STATE_T:/*do something*/break;default:/*do something*/abort();break;}
}int get_free_pos_unlocked(void)
{int i;for(i=0; i < REL_JOBMAX; i++)if(rel_job[i]!=NULL)return i;return -1;
}//module_unload()
static void *thr_relayer(void* p);
//创建一个线程推状态机
static void module_load(void)
{pthread_t tid_relayer;int err;err = pthread_create(&tid_relayer, NULL, thr_relayer, NULL);if(err){fprintf(stderr, "pthread_create():%s\n",strerror(err));exit(1);}}static void *thr_relayer(void* p)
{pthread_mutex_lock(&mut_rel_job);int i;//忙等while(1){for(i = 0; i < REL_JOBMAX; i++){if(rel_job[i] != NULL){if(rel_job[i]->state == STATE_RUNNING){fsm_driver(&rel_job[i]->fsm12);fsm_driver(&rel_job[i]->fsm21);if(rel_job[i]->fsm12.state == STATE_T &&rel_job[i]->fsm21.state == STATE_T)rel_job[i]->state == STATE_OVER;}}}pthread_mutex_unlock(&mut_rel_job);}
}int rel_addjob(int fd1, int fd2)
{struct rel_job_st *me;//模块单次初始化pthread_once(&init_once, module_load);me = malloc(sizeof(struct rel_job_st));if(me == NULL)return -ENOMEM;me->fd1 = fd1;me->fd2 = fd2;me->state = STATE_RUNNING;me->fd1_save = fcntl(me->fd1,F_GETFL, me->fd1_save);fcntl(me->fd1,F_SETFL,me->fd1_save|O_NONBLOCK);me->fd2_save = fcntl(me->fd2,F_GETFL, me->fd2_save);fcntl(me->fd2,F_SETFL,me->fd2_save|O_NONBLOCK);me->fsm12.sfd = me->fd1;me->fsm12.dfd = me->fd2;me->fsm12.state = STATE_R;me->fsm21.sfd = me->fd2;me->fsm21.dfd = me->fd1;me->fsm21.state = STATE_R;//临界资源int pos;pthread_mutex_lock(&mut_rel_job);pos = get_free_pos_unlocked();if(pos < 0){pthread_mutex_unlock(&mut_rel_job);fcntl(me->fd1,F_SETFL,me->fd1_save);fcntl(me->fd2,F_SETFL,me->fd2_save);free(me);return -ENOSPC;}rel_job[pos] = me;pthread_mutex_unlock(&mut_rel_job);return pos;
}##if 0
int rel_canceljob(int id);
int rel_waitjob(int id, struct rel_state_st*);
int rel_statjob(int id, struct rel_state_st*);
##endif
#ifndef RELAYER_H__
#define RELAYER_H__

#include <stdint.h>     /* int64_t */

/* Capacity of the job table. */
#define REL_JOBMAX 10000

/* Life-cycle states of a relay job. */
enum
{
    STATE_RUNNING = 1,
    STATE_CANCLED,
    STATE_OVER
};

/* Snapshot of a job, filled in by rel_waitjob() / rel_statjob(). */
struct rel_state_st
{
    int state;
    int fd1, fd2;
    int64_t count12, count21;
    /* planned: struct timeval start, end; */
};

/**
 * Register a relay job between fd1 and fd2.
 * @return >= 0     success, the id of the new job
 *         -EINVAL  failure, invalid argument
 *         -ENOSPC  failure, job table is full
 *         -ENOMEM  failure, memory allocation failed
 */
int rel_addjob(int fd1, int fd2);

/**
 * Cancel a job.
 * @return 0        success, the job was cancelled
 *         -EINVAL  failure, invalid argument
 *         -EBUSY   failure, the job had already been cancelled
 */
int rel_canceljob(int id);

/**
 * Wait for a job to terminate.
 * @return 0        success, the job has terminated and its state is returned
 *         -EINVAL  failure, invalid argument
 */
int rel_waitjob(int id, struct rel_state_st*);

/**
 * Query the current state of a job.
 * @return 0        success, the job state is returned
 *         -EINVAL  failure, invalid argument
 */
int rel_statjob(int id, struct rel_state_st*);

#endif
# Build the relayer demo: the library (relayer.o) plus its driver (main.o).
CFLAGS+=-pthread
LDFLAGS+=-pthread

all:relayer

relayer:relayer.o main.o
	gcc $^ -o $@ $(CFLAGS) $(LDFLAGS)

clean:
	rm -rf *.o relayer
2. IO多路转接
函数 | 组织方式 | 特点 |
---|---|---|
select |
事件为单位,组织文件描述符 | 老, 有设计缺陷 |
poll |
文件描述符为单位,组织事件 | 移植性好,不错 |
epoll |
同poll | 有优化,linux方言 |
select
/* According to POSIX.1-2001, POSIX.1-2008 */
#include <sys/select.h>
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
//对fd_set的操作,类似于信号集
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
- 参数
- nfds:用到的文件描述符的最大值+1
- readfds / writefds / exceptfds:文件描述符集【传入传出参数】
- timeout:超时设置,不设置就死等
  - NULL:阻塞
  - 0:不阻塞,轮询
  - 非0值:超时时间
- 返回值
- 成功:返回就绪的文件描述符个数;超时返回0
- 失败:返回-1并设置errno;被信号打断时会报假错 EINTR
例子:将有限状态机代码改为非忙等
示例代码:
/*
 * Relay states. Everything below STATE_AUTO waits on a descriptor and has
 * to be monitored; everything above it is pushed unconditionally.
 */
enum
{
    STATE_R = 1,
    STATE_W,
    STATE_AUTO,     /* dividing line, never used as a real state */
    STATE_Ex,
    STATE_T
};

/* Return the larger of a and b. */
static int max(int a, int b)
{
    if (a > b)
        return a;
    return b;
}

/*
 * select()-based relay between fd1 and fd2: instead of busy-waiting, sleep
 * until one of the state machines can make progress.
 * Uses struct fsm_st and fsm_driver() from the earlier listing.
 */
static void relayer(int fd1, int fd2)
{
    int fd1_save, fd2_save;
    struct fsm_st fsm12, fsm21;
    fd_set rset, wset;

    fd1_save = fcntl(fd1, F_GETFL);
    fcntl(fd1, F_SETFL, fd1_save | O_NONBLOCK);
    fd2_save = fcntl(fd2, F_GETFL);
    fcntl(fd2, F_SETFL, fd2_save | O_NONBLOCK);

    fsm12.state = STATE_R;
    fsm12.sfd = fd1;
    fsm12.dfd = fd2;

    fsm21.state = STATE_R;  /* was "fsm21.state - STATE_R;", a no-op */
    fsm21.sfd = fd2;
    fsm21.dfd = fd1;

    while (fsm12.state != STATE_T || fsm21.state != STATE_T) {
        /* 1. Set up what to monitor. */
        FD_ZERO(&rset);
        FD_ZERO(&wset);

        if (fsm12.state == STATE_R)
            FD_SET(fsm12.sfd, &rset);
        if (fsm12.state == STATE_W)
            FD_SET(fsm12.dfd, &wset);
        if (fsm21.state == STATE_R)
            FD_SET(fsm21.sfd, &rset);
        if (fsm21.state == STATE_W)
            FD_SET(fsm21.dfd, &wset);

        /* 2. Monitor. The Ex and T states need no monitoring. */
        if (fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO) {
            if (select(max(fd1, fd2) + 1, &rset, &wset, NULL, NULL) < 0) {
                /*
                 * select() cannot simply be retried in a while loop:
                 * the sets are in/out parameters, so after a failure
                 * they must be rebuilt from scratch.
                 */
                if (errno == EINTR)
                    continue;
                perror("select()");
                exit(1);
            }
        }

        /* 3. Act on the result. Ex and T states are pushed unconditionally. */
        if (FD_ISSET(fd1, &rset) || FD_ISSET(fd2, &wset) ||
            fsm12.state > STATE_AUTO)
            fsm_driver(&fsm12);
        if (FD_ISSET(fd2, &rset) || FD_ISSET(fd1, &wset) ||
            fsm21.state > STATE_AUTO)
            fsm_driver(&fsm21);
    }

    fcntl(fd1, F_SETFL, fd1_save);
    fcntl(fd2, F_SETFL, fd2_save);
}
问题:
- 参数没有 const 修饰:现场和结果放在一个地方
- 监视结果太单一,除了读写就是异常
poll
#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
    int fd;         /* file descriptor */
    short events;   /* requested events, bitmap */
    short revents;  /* returned events, bitmap */
};
- 参数:
- fds:一个 pollfd 数组
- nfds:数组元素个数
- timeout:超时设置,-1阻塞,0非阻塞,>0毫秒为单位
- 返回值:同
select
static void relayer(int fd1, int fd2)
{int fd1_save, fd2_save;struct fsm_st fsm12,fsm21;struct pollfd pfd[2];fd1_save = fcntl(fd1,F_GETFL);fcntl(fd1, F_SETFL, fd1_save|O_NONBLOCK);fd2_save = fcntl(fd2,F_GETFL);fcntl(fd2, F_SETFL, fd2_save|O_NONBLOCK);fsm12.state = STATE_R;fsm12.sfd = fd1;fsm12.dfd = fd2;fsm21.state - STATE_R;fsm21.sfd = fd2;fsm21.dfd = fd1;pfd[0].fd = fd1;pfd[1].fd = fd2;while(fsm12.state != STATE_T || fsm21.state != STATE_T){//布置监视任务pfd[0].events = 0;if(fsm12.state == STATE_R)pfd[0].events |= POLLIN;if(fsm21.state == STATE_R)pfd[0].events |= POLLOUT;pfd[0].events = 0;if(fsm12.state == STATE_W)pfd[1].events |= POLLOUT;if(fsm21.state == STATE_W)pfd[1].events |= POLLIN;//监视if(fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO){ //不用饶大圈,下面可以写whilewhile(poll(pfd, 2, -1)){if(errno == EINTR)continue;perror("poll()");exit(1);}}//查看监视结果if(pfd[0].revents & POLLIN || pfd[1].revents & POLLOUT || fsm12.state > STATE_AUTO) //Ex和T态无条件推动fsm_driver(&fsm12);if(pfd[1].revents & POLLOUT || pfd[0].revents & POLLIN || fsm21.state > STATE_AUTO)fsm_driver(&fsm21);}fcntl(fd1, F_SETFL, fd1_save);fcntl(fd2, F_SETFL, fd2_save);
}
epoll
static void relayer(int fd1, int fd2)
{int fd1_save, fd2_save;struct fsm_st fsm12,fsm21;fd1_save = fcntl(fd1,F_GETFL);fcntl(fd1, F_SETFL, fd1_save|O_NONBLOCK);fd2_save = fcntl(fd2,F_GETFL);fcntl(fd2, F_SETFL, fd2_save|O_NONBLOCK);fsm12.state = STATE_R;fsm12.sfd = fd1;fsm12.dfd = fd2;fsm21.state - STATE_R;fsm21.sfd = fd2;fsm21.dfd = fd1;/*创建epoll实例*/int epfd;struct epoll_event ev;epfd = epoll_create(10);if(epfd < 0){perror("epoll_create()");exit(1);}ev.events = 0;ev.data.fd = fd1;epoll_ctl(epfd,EPOLL_CTL_ADD,fd1,&ev);ev.events = 0;ev.data.fd = fd2;epoll_ctl(epfd,EPOLL_CTL_ADD,fd2,&ev);while(fsm12.state != STATE_T || fsm21.state != STATE_T){//1.布置监视任务ev.data.fd = fd1;ev.events = 0;if(fsm12.state == STATE_R)ev.events |= EPOLLIN;if(fsm21.state == STATE_R)ev.events |= EPOLLOUT;epoll_ctl(epfd,EPOLL_CTL_MOD,fd1,&ev);ev.data.fd = fd2;ev.events = 0;if(fsm12.state == STATE_W)ev.events |= EPOLLOUT;if(fsm21.state == STATE_W)ev.events |= EPOLLIN;epoll_ctl(epfd,EPOLL_CTL_MOD,fd2,&ev);//2.监视if(fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO){ while( epoll_wait(epfd, &ev, 1, -1) < 0){if(errno == EINTR)continue;perror("epoll()");exit(1);}}//3.查看监视结果if(ev.data.fd==fd1 && ev.events & EPOLLIN||ev.data.fd==fd2 && ev.events & EPOLLOUT ||fsm12.state > STATE_AUTO) fsm_driver(&fsm12);if(ev.data.fd==fd2 && ev.events & EPOLLIN ||ev.data.fd==fd1 && ev.events & EPOLLOUT || fsm21.state > STATE_AUTO)fsm_driver(&fsm21);}fcntl(fd1, F_SETFL, fd1_save);fcntl(fd2, F_SETFL, fd2_save);/*销毁epoll实例*/close(epfd);
}
3. 其他读写函数
readv()
writev()
4. 存储映射IO:mmap
#include <sys/mman.h>
//注意 mmap 和 munmap 要成对使用
void *mmap(void *addr, size_t length, int prot, int flags,int fd, off_t offset);
int munmap(void *addr, size_t length);
- 作用: 把 fd(打开的文件)中从偏移 offset 开始、长为 length 的内容,映射到进程空间的地址 addr 处,属性是 prot,标识为 flags
- 参数:
- addr:填 NULL 表示让系统找一个合适的空间
- prot:映射区的访问权限,如 PROT_READ、PROT_WRITE
- flags:MAP_SHARED 和 MAP_PRIVATE 必选其一, 其他为可选项 [MAP_ANONYMOUS:不依赖于文件,用作malloc]
- 返回值: 成功返回可用空间,失败返回宏
MAP_FAILED
例子:用mmap统计文件中 ‘a’ 的个数
示例代码:
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>

/* Count the occurrences of 'a' in the file named by argv[1] using mmap(). */
int main(int argc, char **argv)
{
    int fd;
    struct stat statres;
    char *str;
    off_t i;
    int cnt = 0;

    if (argc < 2) {
        fprintf(stderr, "usage:...\n");
        exit(1);
    }

    fd = open(argv[1], O_RDONLY);
    if (fd < 0) {
        perror("open()");
        exit(1);
    }

    if (fstat(fd, &statres) < 0) {
        perror("fstat()");
        exit(1);
    }

    /* mmap() rejects a zero-length mapping, so handle an empty file here. */
    if (statres.st_size == 0) {
        close(fd);
        printf("the number of 'a' is %d\n", cnt);
        exit(0);
    }

    str = mmap(NULL, statres.st_size, PROT_READ, MAP_SHARED, fd, 0);
    if (str == MAP_FAILED) {
        perror("mmap()");
        exit(1);
    }
    /* The mapping stays valid after the descriptor is closed. */
    close(fd);

    for (i = 0; i < statres.st_size; i++) {
        if (str[i] == 'a')
            cnt++;
    }
    printf("the number of 'a' is %d\n", cnt);

    munmap(str, statres.st_size);

    exit(0);
}
运行结果:
例子:mmap实现父子进程间通信
fork后,子进程继承了父进程mmap得到的地址
示例代码:
##include<stdio.h>
##include<stdlib.h>
##include<sys/mman.h>
##include<sys/types.h>
##include<sys/wait.h>
##include<unistd.h>
##include<string.h>##define MEMSIZE 1024int main()
{char *ptr;size_t pid;ptr = mmap(NULL,MEMSIZE,PROT_READ|PROT_WRITE,MAP_SHARED|MAP_ANONYMOUS,-1,0);if(ptr == NULL){perror("mmap()");exit(1);}pid = fork();if(pid < 0){perror("fopen()");munmap(ptr,MEMSIZE);exit(1);}if(pid == 0) //child write{strcpy(ptr,"Hello");munmap(ptr,MEMSIZE);exit(0);}else //parent read{wait(NULL);puts(ptr);munmap(ptr,MEMSIZE);exit(0);}
}
运行结果:
5. 文件锁
#include <unistd.h>
int lockf(int fd, int cmd, off_t len);
- 参数
- cmd:F_LOCK、F_TLOCK、F_ULOCK、F_TEST
- len:锁的长度,0 表示无论文件今后怎么变化,能锁多长锁多长
- 返回值:成功返回0,失败返回-1并设置errno
文件意外解锁:
文件的上锁是在inode层面,所以如果有多个结构体指向同一个inode,那么在一个结构体里上的锁可能会在另一个结构体里解锁
例子:重构互斥量代码,20个 进程 往文件里写内容
示例代码:
##include<stdio.h>
##include<stdlib.h>
##include<string.h>
##include<wait.h>
##include<unistd.h>##define PROCNUM 20
##define FILENAME "/tmp/out"
##define LINESIZE 1024static void func_add(void)
{FILE *fp;int fd;char linebuf[LINESIZE];fp = fopen(FILENAME, "r+");if(fp == NULL){perror("fopen()");exit(1);}fd = fileno(fp);/*if error*/lockf(fd,F_LOCK,0);fgets(linebuf, LINESIZE, fp);fseek(fp, 0, SEEK_SET);//sleep(1);fprintf(fp, "%d\n",atoi(linebuf)+1);fflush(fp); //文件全缓冲lockf(fd,F_ULOCK,0);fclose(fp);
}int main()
{int i,err;size_t pid;for(i = 0; i < PROCNUM; i++){pid = fork();if(pid < 0){perror("fork()");exit(1);}if(pid == 0){func_add();exit(0);}}for(i = 0; i < PROCNUM; i++)wait(NULL);exit(0);
}
运行结果:
代码分析:
fprintf(fp, "%d\n",atoi(linebuf)+1);
fflush(fp); //文件全缓冲
lockf(fd,F_ULOCK,0);
fclose(fp);
为防止意外解锁,fclose 要放在 lockf 解锁之后;又因为 lockf 可能会出错,导致 fclose 没有执行、缓冲区未刷新,所以 fprintf 之后要立刻用 fflush 刷新缓冲区
6. 管道实例:手写管道
例子:筛质数任务池方法,原来的通信是用一个 int
,上下游速率不同步,线程大部分时间花在等待上,现在将 int
拓展为 int array
互斥量忙等→条件变量通知→管道[队列]加快同步
mypipe.h
#ifndef MYPIPE_H__
#define MYPIPE_H__

#include <stddef.h>     /* size_t */

/* Capacity of the pipe buffer in bytes. */
#define PIPESIZE 1024

/* Role bits for mypipe_register() / mypipe_unregister(). */
#define MYPIPE_READ  0x00000001UL   /* lowest bit set */
#define MYPIPE_WRITE 0x00000002UL   /* second-lowest bit set */

/* Opaque handle; the real layout is private to mypipe.c. */
typedef void mypipe_st;

/* Create a pipe. Returns NULL when allocation fails. */
mypipe_st *mypipe_init(void);

/* Announce / withdraw the caller as reader and/or writer (bitmap of MYPIPE_*). */
int mypipe_register(mypipe_st *, int opmap);
int mypipe_unregister(mypipe_st *, int opmap);

/* Modelled on the standard read() / write(). */
int mypipe_read(mypipe_st *, void *buf, size_t count);
int mypipe_write(mypipe_st *, const void* buf, size_t size);

/* Release a pipe created by mypipe_init(). */
int mypipe_destroy(mypipe_st *);

#endif
mypipe.c
##include<stdio.h>
##include<stdlib.h>
##include<pthread.h>##include"pthread.h"struct mypipe_st
{int head, tail;char data[PIPESIZE];int datasize;int count_rd;int count_wr;pthread_mutex_t mut; //独占使用pthread_cond_t cond; //阻塞使用:如果读取时没数据,等待直到有数据
};mypipe_st *mypipe_init(void)
{struct mypipe_st me;me = malloc(sizeof(*me));if(me == NULL)return NULL;me->head = 0;me->tail = 0;me->datasize = 0;me->count_rd = 0;me->count_wr = 0;pthread_mutex_init(&me->mut, NULL);pthread_cond_init(&me->cond, NULL);return me;
}int mypipe_register(mypipe_st *ptr, int opmap)
{/*if error*/struct mypipe_st *me = ptr;pthread_mutex_lock(&me->mut);if(opmap & MYPIPE_READ)me->count_rd++;if(opmap & MYPIPE_WRITE)me->count_wr++;//直到有读者且有写者来才会解除阻塞pthread_cond_broadcast(&me->cond);while(me->rd<=0 || me->wr<=0)pthread_cond_wait(&me->cond, &me->mut);pthread_mutex_unlock(&me->mut);return 0;
}int mypipe_unregister(mypipe_st *ptr, int opmap)
{struct mypipe_st *me = ptr;pthread_mutex_lock(&me->mut);if(opmap & MYPIPE_READ)me->count_rd--;if(opmap & MYPIPE_WRITE)me->count_wr--;//写者减小到<0,那么read无需阻塞等待,唤醒pthread_cond_broadcast(&me->cond);pthread_mutex_unlock(&me->mut);
}static int mypipe_readbyte_unlocked(struct mypipe_st *me, char *datap)
{if(me->datasize <= 0)return -1;*datap = me->data[me->head];me->head++;me->head = next(me->head);me->datasize--;return 0;
}int mypipe_read(mypipe_st *ptr, void *buf, size_t count)
{int i;struct mypipe_st me = ptr;pthread_mutex_lock(&me->mut);//问题分析:如果没有写者,读者不应该继续阻塞//所以要记录管道中的读者和写者//继而想到使用管道的时候要��册,.h中添加接口while(me->datasize <= 0 && me->count_wr > 0)pthread_cond_wait(&me->cond, &me->mut);if(me->datasize <= 0 && me->count_wr <= 0){pthread_mutex_unlock(&me->mut);return 0;}for(i = 0; i < count; i++)if(mypipe_readbyte_unlocked(me, buf+i) != 0)break;pthread_cond_broadcast(&me->cond); //唤醒阻塞写者pthread_mutex_unlock(&me->mut);return i;}int mypipe_write(mypipe_st *, const void* buf, size_t size)
{}int mypipe_destroy(mypipe_st ptr*)
{struct mypipe_st me = ptr;pthread_mutex_destroy(&me->mut);pthread_cond_destroy(&me->cond);free(ptr);
}
优化:二次封装,体现一切皆文件的思想