当前位置: 首页 > news >正文

linux系统编程08-高级IO

目录
  • 介绍
  • 1. 非阻塞IO
    • 数据中继:有限状态机实现
    • 数据中继引擎:封装成库
  • 2. IO多路转接
    • select
    • poll
    • epoll
  • 3. 其他读写函数
  • 4. 存储映射IO:mmap
  • 5. 文件锁
  • 6. 管道实例:手写管道

介绍

Untitled

1. 非阻塞IO

数据中继:

Untitled

有限状态机编程:

  • 简单流程:自然流程是结构化的
  • 复杂流程:自然流程是非机构化的

Untitled

例子:有限状态机实现数据中继 mycopy升级版

本质上就是两次 mycopy

数据中继:有限状态机实现

示例代码:

Untitled

##include<stdio.h>
##include<stdlib.h>
##include<unistd.h>
##include<errno.h>
##include<sys/types.h>
##include<sys/stat.h>
##include<fcntl.h>##define BUFSIZE     1024
##define TTY1 "/dev/tty11"
##define TTY2 "/dev/tty12"enum
{STATE_R=1,STATE_W,STATE_Ex,STATE_T
};struct fsm_st
{int state;int sfd;int dfd;char buf[BUFSIZE];int len;char *errstr;int pos;
};static void fsm_driver(struct fsm_st *fsm)
{int ret;switch(fsm->state){case STATE_R:fsm->len = read(fsm->sfd, fsm->buf, BUFSIZE);if(fsm->len == 0)fsm->state = STATE_T;else if(fsm->len < 0){if(errno == EAGAIN)fsm->state = STATE_R;else{fsm->errstr = "read()";fsm->state = STATE_Ex;}}else{fsm->pos = 0;fsm->state = STATE_W;}break;case STATE_W:ret = write(fsm->dfd, fsm->buf+fsm->pos, BUFSIZE);if(ret < 0){if(errno == EAGAIN)fsm->state = STATE_W;else{fsm->errstr = "write()";fsm->state = STATE_Ex;}}else//坚持写够len个字节{fsm->pos += ret;fsm->len -= ret;if(fsm->len == 0)fsm->state = STATE_R;elsefsm->state = STATE_W;}break;case STATE_Ex:perror(fsm->errstr);fsm->state = STATE_T;break;case STATE_T:/*do something*/break;default:/*do something*/abort();break;}
}static void relayer(int fd1, int fd2)
{int fd1_save, fd2_save;struct fsm_st fsm12,fsm21;fd1_save = fcntl(fd1,F_GETFL);fcntl(fd1, F_SETFL, fd1_save|O_NONBLOCK);fd2_save = fcntl(fd2,F_GETFL);fcntl(fd2, F_SETFL, fd2_save|O_NONBLOCK);fsm12.state = STATE_R;fsm12.sfd = fd1;fsm12.dfd = fd2;fsm21.state - STATE_R;fsm21.sfd = fd2;fsm21.dfd = fd1;while(fsm12.state != STATE_T || fsm21.state != STATE_T){fsm_driver(&fsm12);fsm_driver(&fsm21);}fcntl(fd1, F_SETFL, fd1_save);fcntl(fd2, F_SETFL, fd2_save);
}int main()
{int fd1, fd2;fd1 = open(TTY1,O_RDWR);if(fd1<0){perror("open()");exit(1);}write(fd1,"TTY1\n",5);fd2 = open(TTY2,O_RDWR|O_NONBLOCK);if(fd2<0){perror("open()");exit(1);}write(fd2,"TTY2\n",5);relayer(fd1,fd2);close(fd1);close(fd2);exit(0);
}

运行结果:

真机下:

ctrl + alte + f11/f12 :切换到TTY11/TTY12

sudo chvt n :切换到TTYn

tty :显示当前终端

Untitled

Untitled

数据中继引擎:封装成库

类似于anytimer的库

  • main.c relayer.c 修改自 relay.c
  • relayer.h
  • makefile

Untitled

##include<stdio.h>
##include<stdlib.h>
##include<unistd.h>
##include<errno.h>
##include<sys/types.h>
##include<sys/stat.h>
##include<fcntl.h>
##include<string.h>
##include"relayer.h"##define BUFSIZE     1024##define TTY1 "/dev/tty11"
##define TTY2 "/dev/tty12"
##define TTY3 "/dev/tty9"
##define TTY4 "/dev/tty10"int main()
{int fd1, fd2, fd3, fd4;int job1,job2;fd1 = open(TTY1,O_RDWR);if(fd1<0){perror("open()");exit(1);}write(fd1,"TTY1\n",5);fd2 = open(TTY2,O_RDWR|O_NONBLOCK);if(fd2<0){perror("open()");exit(1);}write(fd2,"TTY2\n",5);job1 = rel_addjob(fd1,fd2);if(job1 < 0){fprintf(stderr,"rel_addjob():%s\n",strerror(-job1));exit(1);}fd3 = open(TTY3,O_RDWR);if(fd3 < 0){perror("open()");exit(1);}write(fd3,"TTY3\n",5);fd4 = open(TTY4,O_RDWR);if(fd4 < 0){perror("open()");exit(1);}write(fd4,"TTY4\n",5);job2 = rel_addjob(fd3,fd4);if(job2 < 0){fprintf(stderr,"rel_addjob():%s\n",strerror(-job2));exit(1);}while(1)pause();close(fd1);close(fd2);close(fd3);close(fd4);exit(0);
}
##include<stdio.h>
##include<stdlib.h>
##include<unistd.h>
##include<errno.h>
##include<sys/types.h>
##include<sys/stat.h>
##include<fcntl.h>
##include<pthread.h>
##include<string.h>
##include"relayer.h"##define BUFSIZE     1024
##define TTY1 "/dev/tty11"
##define TTY2 "/dev/tty12"enum
{STATE_R=1,STATE_W,STATE_Ex,STATE_T
};struct rel_fsm_st
{int state;int sfd;int dfd;char buf[BUFSIZE];int len;char *errstr;int pos;int64_t count;
};struct rel_job_st
{int fd1,fd2;int state;struct rel_fsm_st fsm12, fsm21;int fd1_save, fd2_save;
//    struct timerval start,end;
};static struct rel_job_st* rel_job[REL_JOBMAX];
static pthread_mutex_t mut_rel_job = PTHREAD_MUTEX_INITIALIZER;
static pthread_once_t init_once = PTHREAD_ONCE_INIT;static void fsm_driver(struct rel_fsm_st *fsm)
{int ret;switch(fsm->state){case STATE_R:fsm->len = read(fsm->sfd, fsm->buf, BUFSIZE);if(fsm->len == 0)fsm->state = STATE_T;else if(fsm->len < 0){if(errno == EAGAIN)fsm->state = STATE_R;else{fsm->errstr = "read()";fsm->state = STATE_Ex;}}else{fsm->pos = 0;fsm->state = STATE_W;}break;case STATE_W:ret = write(fsm->dfd, fsm->buf+fsm->pos, BUFSIZE);if(ret < 0){if(errno == EAGAIN)fsm->state = STATE_W;else{fsm->errstr = "write()";fsm->state = STATE_Ex;}}else//坚持写够len个字节{fsm->pos += ret;fsm->len -= ret;if(fsm->len == 0)fsm->state = STATE_R;elsefsm->state = STATE_W;}break;case STATE_Ex:perror(fsm->errstr);fsm->state = STATE_T;break;case STATE_T:/*do something*/break;default:/*do something*/abort();break;}
}int get_free_pos_unlocked(void)
{int i;for(i=0; i < REL_JOBMAX; i++)if(rel_job[i]!=NULL)return i;return -1;
}//module_unload()
static void *thr_relayer(void* p);
//创建一个线程推状态机
static void module_load(void)
{pthread_t tid_relayer;int err;err = pthread_create(&tid_relayer, NULL, thr_relayer, NULL);if(err){fprintf(stderr, "pthread_create():%s\n",strerror(err));exit(1);}}static void *thr_relayer(void* p)
{pthread_mutex_lock(&mut_rel_job);int i;//忙等while(1){for(i = 0; i < REL_JOBMAX; i++){if(rel_job[i] != NULL){if(rel_job[i]->state == STATE_RUNNING){fsm_driver(&rel_job[i]->fsm12);fsm_driver(&rel_job[i]->fsm21);if(rel_job[i]->fsm12.state == STATE_T &&rel_job[i]->fsm21.state == STATE_T)rel_job[i]->state == STATE_OVER;}}}pthread_mutex_unlock(&mut_rel_job);}
}int rel_addjob(int fd1, int fd2)
{struct rel_job_st *me;//模块单次初始化pthread_once(&init_once, module_load);me = malloc(sizeof(struct rel_job_st));if(me == NULL)return -ENOMEM;me->fd1 = fd1;me->fd2 = fd2;me->state = STATE_RUNNING;me->fd1_save = fcntl(me->fd1,F_GETFL, me->fd1_save);fcntl(me->fd1,F_SETFL,me->fd1_save|O_NONBLOCK);me->fd2_save = fcntl(me->fd2,F_GETFL, me->fd2_save);fcntl(me->fd2,F_SETFL,me->fd2_save|O_NONBLOCK);me->fsm12.sfd = me->fd1;me->fsm12.dfd = me->fd2;me->fsm12.state = STATE_R;me->fsm21.sfd = me->fd2;me->fsm21.dfd = me->fd1;me->fsm21.state = STATE_R;//临界资源int pos;pthread_mutex_lock(&mut_rel_job);pos = get_free_pos_unlocked();if(pos < 0){pthread_mutex_unlock(&mut_rel_job);fcntl(me->fd1,F_SETFL,me->fd1_save);fcntl(me->fd2,F_SETFL,me->fd2_save);free(me);return -ENOSPC;}rel_job[pos] = me;pthread_mutex_unlock(&mut_rel_job);return pos;
}##if 0
int rel_canceljob(int id);
int rel_waitjob(int id, struct rel_state_st*);
int rel_statjob(int id, struct rel_state_st*);
##endif
##ifndef RELAYER_H__
##define RELAYER_H__##define REL_JOBMAX  10000enum
{STATE_RUNNING=1,STATE_CANCLED,STATE_OVER
};struct rel_state_st
{int state;int fd1,fd2;int64_t count12,count21;//    struct timerval start,end;
};int rel_addjob(int fd1, int fd2);
/** return >= 0          成功,返回当前任务ID*        == -EINVAL    失败,参数非法*        == -ENOSPC    失败,任务数组满*        == -ENOMEM    失败,内存分配有误*/int rel_canceljob(int id);
/** return == 0          成功,任务成功取消*        == -EINVAL    失败,参数非法*        == -EBUSY     失败,任务早已被取消*/int rel_waitjob(int id, struct rel_state_st*);
/** return == 0          成功,指定任务已终止并返回状态*        == -EINVAL    失败,参数非法*/int rel_statjob(int id, struct rel_state_st*);
/** return == 0          成功,指定任务状态已返回*        == -EINVAL    失败,参数非法*/##endif
CFLAGS+=-pthread
LDFLAGS+=-pthreadall:relayerrelayer:relayer.o main.ogcc $^ -o $@ $(CFLAGS) $(LDFLAGS)
clean:rm -rf *.o relayer

2. IO多路转接

Untitled

函数 组织方式 特点
select 事件为单位,组织文件描述符 老, 有设计缺陷
poll 文件描述符为单位,组织事件 移植性好,不错
epoll 同poll 有优化,linux方言

select

/* According to POSIX.1-2001, POSIX.1-2008 */
##include <sys/select.h>
/* According to earlier standards */
##include <sys/time.h>
##include <sys/types.h>
##include <unistd.h>int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
//对fd_set的操作,类似于信号集
void FD_CLR(int fd, fd_set *set);
int  FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
  • 参数
    • nfds 用到的文件描述的最大值+1
    • readfds writefds exceptfds 文件描述符集 【传入传出参数】
    • timeout 超时设置,不设置就死等
      • NULL:阻塞
      • 0:不阻塞,轮询
      • 非0值:超时时间
  • 返回值
    • 成功:文件描述符的个数
    • 失败:报错,超时报假错 EINTR

例子:将有限状态机代码改为非忙等

示例代码:

enum
{STATE_R=1,STATE_W,
STATE_AUTO, //相当于分界线STATE_Ex,STATE_T
};
static int max(int a, int b)
{if(a > b)return a;return b;
}
static void relayer(int fd1, int fd2)
{int fd1_save, fd2_save;struct fsm_st fsm12,fsm21;fd_set rset, wset;fd1_save = fcntl(fd1,F_GETFL);fcntl(fd1, F_SETFL, fd1_save|O_NONBLOCK);fd2_save = fcntl(fd2,F_GETFL);fcntl(fd2, F_SETFL, fd2_save|O_NONBLOCK);fsm12.state = STATE_R;fsm12.sfd = fd1;fsm12.dfd = fd2;fsm21.state - STATE_R;fsm21.sfd = fd2;fsm21.dfd = fd1;while(fsm12.state != STATE_T || fsm21.state != STATE_T){//1.布置监视任务FD_ZERO(&rset);FD_ZERO(&wset);if(fsm12.state == STATE_R)FD_SET(fsm12.sfd, &rset);if(fsm12.state == STATE_W)FD_SET(fsm12.dfd, &wset);if(fsm21.state == STATE_R)FD_SET(fsm21.sfd, &rset);if(fsm21.state == STATE_W)FD_SET(fsm21.dfd, &wset);//2.监视if(fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO){//Ex和T态无需监视if(select(max(fd1,fd2)+1, &rset, &wset, NULL, NULL)<0){//不能用while,因为出错时rest和wset都会被情况//要重新初始化if(errno == EINTR)continue;perror("select()");exit(1);}}//3.查看监视结果if(FD_ISSET(fd1,&rset) || FD_ISSET(fd2, &wset) || fsm12.state > STATE_AUTO) //Ex和T态无条件推动fsm_driver(&fsm12);if(FD_ISSET(fd2,&rset) || FD_ISSET(fd1, &wset) || fsm21.state > STATE_AUTO)fsm_driver(&fsm21);}fcntl(fd1, F_SETFL, fd1_save);fcntl(fd2, F_SETFL, fd2_save);
}

问题:

  • 参数没有 const 修饰:现场和结果放在一个地方
  • 监视结果太单一,除了读写就是异常

Untitled

poll

##include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);struct pollfd {int   fd;         /* file descriptor */short events;     /* requested events,bitmap */short revents;    /* returned events, bitmap */
};
  • 参数:
    • *fds 一个 pollfd数组
    • nfds 数组元素个数
    • timeout 超时设置,-1阻塞,0非阻塞,>0毫秒为单位
  • 返回值:同 select
static void relayer(int fd1, int fd2)
{int fd1_save, fd2_save;struct fsm_st fsm12,fsm21;struct pollfd pfd[2];fd1_save = fcntl(fd1,F_GETFL);fcntl(fd1, F_SETFL, fd1_save|O_NONBLOCK);fd2_save = fcntl(fd2,F_GETFL);fcntl(fd2, F_SETFL, fd2_save|O_NONBLOCK);fsm12.state = STATE_R;fsm12.sfd = fd1;fsm12.dfd = fd2;fsm21.state - STATE_R;fsm21.sfd = fd2;fsm21.dfd = fd1;pfd[0].fd = fd1;pfd[1].fd = fd2;while(fsm12.state != STATE_T || fsm21.state != STATE_T){//布置监视任务pfd[0].events = 0;if(fsm12.state == STATE_R)pfd[0].events |= POLLIN;if(fsm21.state == STATE_R)pfd[0].events |= POLLOUT;pfd[0].events = 0;if(fsm12.state == STATE_W)pfd[1].events |= POLLOUT;if(fsm21.state == STATE_W)pfd[1].events |= POLLIN;//监视if(fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO){   //不用饶大圈,下面可以写whilewhile(poll(pfd, 2, -1)){if(errno == EINTR)continue;perror("poll()");exit(1);}}//查看监视结果if(pfd[0].revents & POLLIN || pfd[1].revents & POLLOUT || fsm12.state > STATE_AUTO) //Ex和T态无条件推动fsm_driver(&fsm12);if(pfd[1].revents & POLLOUT || pfd[0].revents & POLLIN || fsm21.state > STATE_AUTO)fsm_driver(&fsm21);}fcntl(fd1, F_SETFL, fd1_save);fcntl(fd2, F_SETFL, fd2_save);
}

epoll

Untitled

Untitled

Untitled

Untitled

static void relayer(int fd1, int fd2)
{int fd1_save, fd2_save;struct fsm_st fsm12,fsm21;fd1_save = fcntl(fd1,F_GETFL);fcntl(fd1, F_SETFL, fd1_save|O_NONBLOCK);fd2_save = fcntl(fd2,F_GETFL);fcntl(fd2, F_SETFL, fd2_save|O_NONBLOCK);fsm12.state = STATE_R;fsm12.sfd = fd1;fsm12.dfd = fd2;fsm21.state - STATE_R;fsm21.sfd = fd2;fsm21.dfd = fd1;/*创建epoll实例*/int epfd;struct epoll_event ev;epfd = epoll_create(10);if(epfd < 0){perror("epoll_create()");exit(1);}ev.events = 0;ev.data.fd = fd1;epoll_ctl(epfd,EPOLL_CTL_ADD,fd1,&ev);ev.events = 0;ev.data.fd = fd2;epoll_ctl(epfd,EPOLL_CTL_ADD,fd2,&ev);while(fsm12.state != STATE_T || fsm21.state != STATE_T){//1.布置监视任务ev.data.fd = fd1;ev.events = 0;if(fsm12.state == STATE_R)ev.events |= EPOLLIN;if(fsm21.state == STATE_R)ev.events |= EPOLLOUT;epoll_ctl(epfd,EPOLL_CTL_MOD,fd1,&ev);ev.data.fd = fd2;ev.events = 0;if(fsm12.state == STATE_W)ev.events |= EPOLLOUT;if(fsm21.state == STATE_W)ev.events |= EPOLLIN;epoll_ctl(epfd,EPOLL_CTL_MOD,fd2,&ev);//2.监视if(fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO){   while( epoll_wait(epfd, &ev, 1, -1) < 0){if(errno == EINTR)continue;perror("epoll()");exit(1);}}//3.查看监视结果if(ev.data.fd==fd1 && ev.events & EPOLLIN||ev.data.fd==fd2 && ev.events & EPOLLOUT  ||fsm12.state > STATE_AUTO) fsm_driver(&fsm12);if(ev.data.fd==fd2 && ev.events & EPOLLIN ||ev.data.fd==fd1 && ev.events & EPOLLOUT  || fsm21.state > STATE_AUTO)fsm_driver(&fsm21);}fcntl(fd1, F_SETFL, fd1_save);fcntl(fd2, F_SETFL, fd2_save);/*销毁epoll实例*/close(epfd);
}

3. 其他读写函数

  • readv()
  • writev()

4. 存储映射IO:mmap

Untitled

##include <sys/mman.h>//注意要成对使用
void *mmap(void *addr, size_t length, int prot, int flags,int fd, off_t offset);
int munmap(void *addr, size_t length);
  • 作用: 把内存 fd (打开的文件),偏移 offset 后的,长为 length 的内容,映射到进程空间的地址 addr 处,属性是 prot ,标识为 flags
  • 参数:
    • addr :填 NULL 表示让系统找一个合适的空间

    • port

      Untitled

    • flagsMAP_SHAREDMAP_PRIVATE 必选其一, 其他为可选项 [ MAP_ANONYMOUS :不依赖于文件用作 malloc ]

  • 返回值: 成功返回可用空间,失败返回宏 MAP_FAILED

例子:用mmap统计文件中 ‘a’ 的个数

示例代码:

##include<stdio.h>
##include<stdlib.h>
##include<sys/mman.h>
##include<sys/types.h>
##include<sys/stat.h>
##include<unistd.h>
##include<fcntl.h>int main(int argc, char **argv)
{int fd;struct stat statres;char *str;int i,cnt=0;if(argc < 2){fprintf(stderr,"usage:...\n");exit(1);}fd = open(argv[1], O_RDONLY);if(fd < 0){perror("open()");exit(1);}if(fstat(fd,&statres)<0){perror("state()");exit(1);}str = mmap(NULL,statres.st_size,PROT_READ,MAP_SHARED,fd,0);if(str == MAP_FAILED){perror("mmap()");exit(1);}close(fd);for(i = 0; i < statres.st_size; i++){if(str[i] == 'a')cnt++;}printf("the number of 'a' is %d\n", cnt);munmap(str, statres.st_size);exit(0);
}

运行结果:

Untitled

例子:mmap实现父子进程间通信

fork后,子进程继承了父进程mmap得到的地址

fork后,子进程继承了父进程mmap得到的地址

示例代码:

##include<stdio.h>
##include<stdlib.h>
##include<sys/mman.h>
##include<sys/types.h>
##include<sys/wait.h>
##include<unistd.h>
##include<string.h>##define MEMSIZE 1024int main()
{char *ptr;size_t pid;ptr = mmap(NULL,MEMSIZE,PROT_READ|PROT_WRITE,MAP_SHARED|MAP_ANONYMOUS,-1,0);if(ptr == NULL){perror("mmap()");exit(1);}pid = fork();if(pid < 0){perror("fopen()");munmap(ptr,MEMSIZE);exit(1);}if(pid == 0)    //child write{strcpy(ptr,"Hello");munmap(ptr,MEMSIZE);exit(0);}else            //parent read{wait(NULL);puts(ptr);munmap(ptr,MEMSIZE);exit(0);}
}

运行结果:

Untitled

5. 文件锁

Untitled

##include <unistd.h>
int lockf(int fd, int cmd, off_t len);
  • 参数
    • cmdF_LOCK F_TLOCK F_ULOCK F_TEST
    • len : 锁的长度, 0 表示无论文件今后怎么变化能锁多长锁多长
  • 返回值:

Untitled

文件意外解锁:

Untitled

文件的上锁是在inode层面,所以如果有多个结构体指向同一个inode,那么在一个结构体里上的锁可能会在另一个结构体里解锁

例子:重构互斥量代码,20个 进程 往文件里写内容

示例代码:

##include<stdio.h>
##include<stdlib.h>
##include<string.h>
##include<wait.h>
##include<unistd.h>##define PROCNUM     20
##define FILENAME    "/tmp/out"
##define LINESIZE     1024static void func_add(void)
{FILE *fp;int fd;char linebuf[LINESIZE];fp = fopen(FILENAME, "r+");if(fp == NULL){perror("fopen()");exit(1);}fd = fileno(fp);/*if error*/lockf(fd,F_LOCK,0);fgets(linebuf, LINESIZE, fp);fseek(fp, 0, SEEK_SET);//sleep(1);fprintf(fp, "%d\n",atoi(linebuf)+1);fflush(fp); //文件全缓冲lockf(fd,F_ULOCK,0);fclose(fp);
}int main()
{int i,err;size_t pid;for(i = 0; i < PROCNUM; i++){pid = fork();if(pid < 0){perror("fork()");exit(1);}if(pid == 0){func_add();exit(0);}}for(i = 0; i < PROCNUM; i++)wait(NULL);exit(0);
}

运行结果:

Untitled

代码分析:

fprintf(fp, "%d\n",atoi(linebuf)+1);
fflush(fp); //文件全缓冲
lockf(fd,F_ULOCK,0);
fclose(fp);

防止以外解锁,所以 fclose 要放在 lockf 后面,由于 lockf 可能会出错,导致 fclose 没有执行,缓冲区未刷新,所以 fprintf 之后,要立刻刷新缓冲区 fflush

6. 管道实例:手写管道

例子:筛质数任务池方法,原来的通信是用一个 int ,上下游速率不同步,线程大部分时间花在等待上,现在将 int 拓展为 int array

互斥量忙等→条件变量通知→管道[队列]加快同步

Untitled

mypipe.h

##ifndef MYPIPE_H__
##define MYPIPE_H__##define PIPESIZE        1024
##define MYPIPE_READ     0x00000001UL //最低位为1
##define MYPIPE_WRITE    0x00000002UL //次低位为1typedef void mypipe_st;mypipe_st *mypipe_init(void);int mypipe_register(mypipe_st *, int opmap);
int mypipe_unregister(mypipe_st *, int opmap);//仿照标准read
int mypipe_read(mypipe_st *, void *buf, size_t count);
int mypipe_write(mypipe_st *, const void* buf, size_t size);int mypipe_destroy(mypipe_st *);##endif

mypipe.c

##include<stdio.h>
##include<stdlib.h>
##include<pthread.h>##include"pthread.h"struct mypipe_st
{int head, tail;char data[PIPESIZE];int datasize;int count_rd;int count_wr;pthread_mutex_t mut; //独占使用pthread_cond_t cond; //阻塞使用:如果读取时没数据,等待直到有数据
};mypipe_st *mypipe_init(void)
{struct mypipe_st me;me = malloc(sizeof(*me));if(me == NULL)return NULL;me->head = 0;me->tail = 0;me->datasize = 0;me->count_rd = 0;me->count_wr = 0;pthread_mutex_init(&me->mut, NULL);pthread_cond_init(&me->cond, NULL);return me;
}int mypipe_register(mypipe_st *ptr, int opmap)
{/*if error*/struct mypipe_st *me = ptr;pthread_mutex_lock(&me->mut);if(opmap & MYPIPE_READ)me->count_rd++;if(opmap & MYPIPE_WRITE)me->count_wr++;//直到有读者且有写者来才会解除阻塞pthread_cond_broadcast(&me->cond);while(me->rd<=0 || me->wr<=0)pthread_cond_wait(&me->cond, &me->mut);pthread_mutex_unlock(&me->mut);return 0;
}int mypipe_unregister(mypipe_st *ptr, int opmap)
{struct mypipe_st *me = ptr;pthread_mutex_lock(&me->mut);if(opmap & MYPIPE_READ)me->count_rd--;if(opmap & MYPIPE_WRITE)me->count_wr--;//写者减小到<0,那么read无需阻塞等待,唤醒pthread_cond_broadcast(&me->cond);pthread_mutex_unlock(&me->mut);
}static int mypipe_readbyte_unlocked(struct mypipe_st *me, char *datap)
{if(me->datasize <= 0)return -1;*datap = me->data[me->head];me->head++;me->head = next(me->head);me->datasize--;return 0;
}int mypipe_read(mypipe_st *ptr, void *buf, size_t count)
{int i;struct mypipe_st me = ptr;pthread_mutex_lock(&me->mut);//问题分析:如果没有写者,读者不应该继续阻塞//所以要记录管道中的读者和写者//继而想到使用管道的时候要��册,.h中添加接口while(me->datasize <= 0 && me->count_wr > 0)pthread_cond_wait(&me->cond, &me->mut);if(me->datasize <= 0 && me->count_wr <= 0){pthread_mutex_unlock(&me->mut);return 0;}for(i = 0; i < count; i++)if(mypipe_readbyte_unlocked(me, buf+i) != 0)break;pthread_cond_broadcast(&me->cond); //唤醒阻塞写者pthread_mutex_unlock(&me->mut);return i;}int mypipe_write(mypipe_st *, const void* buf, size_t size)
{}int mypipe_destroy(mypipe_st ptr*)
{struct mypipe_st me = ptr;pthread_mutex_destroy(&me->mut);pthread_cond_destroy(&me->cond);free(ptr);
}

优化:二次封装,体现一切皆文件的思想

Untitled

http://www.wxhsa.cn/company.asp?id=6128

相关文章:

  • 第03周 预习、实验与作业:面向对象入门2与类的识别
  • 第8篇、Kafka 监控与调优实战指南
  • linux系统编程02-进程基本知识
  • linux系统编程03-并发:信号
  • linux系统编程04-并发:线程
  • 新手高效制作PPT的3个步骤:告别逻辑混乱,从构思到完成!
  • Avalonia:用 ReactiveUI 的方法绑定数据、事件和命令
  • 【pyQT 专栏】程序设置 windows 任务栏缩略图(.ico)教程
  • Say 题选记(9.14 - 9.20)
  • vm的配置
  • 力扣72题 编辑距离
  • 数学基本结构框架
  • 2025.9.16总结
  • 在 Tailscale 中禁用 DNS
  • 软件工程实践一:Git 使用教程(含分支与 Gitee)
  • 【青少年低空飞行玩意】设计图以及项目概况
  • C++ 多态
  • Python实现对比两个Excel表某个范围的内容并提取出差异
  • 我用AI给自己做了一整套专属表情包!攻略
  • 20250916 之所思 - 人生如梦
  • Vue3项目开发专题精讲【左扬精讲】—— 在线教育网站系统(基于 Vue3+TypeScript+Vite 的在线教育网站系统系统设计与实现)
  • 20250915
  • Python Socket网络编程(4)
  • 今日学习 dos命令和Java基础语法
  • Photoshop 2025 v26.0软件下载免费版安装教程(含 Photoshop 软件一键安装包免费下载渠道)
  • 课前问题列表
  • switch中初始化变量
  • office2024免费永久激活安装包下载安装教程包含(下载安装配置激活)
  • vue2和vue3一时转不过来
  • 怎么查询电脑的登录记录及密码更改情况? - Li