Linux I/O多路復用
Linux中一切皆文件,不論是我們存儲在磁盤上的字符文件,可執行文件還是我們的接入電腦的I/O設備等都被VFS抽象成了文件,比如標準輸入設備默認是鍵盤,我們在操作標準輸入設備的時候,其實操作的是默認打開的一個文件描述符是0的文件,而一切軟件操作硬件都需要通過OS,而OS操作一切硬件都需要相應的驅動程序,這個驅動程序里配置了這個硬件的相應配置和使用方法。Linux的I/O分為阻塞I/O,非阻塞I/O,I/O多路復用,信號驅動I/O四種。對于I/O設備的驅動,一般都會提供關于阻塞和非阻塞兩種配置。我們最常見的I/O設備之一--鍵盤(標準輸入設備)的驅動程序默認是阻塞的。
多路復用就是為了使進程能夠從多個阻塞I/O中獲得自己想要的數據并繼續執行接下來的任務。其主要的思路就是同時監視多個文件描述符,如果有文件描述符的設定狀態的被觸發,就繼續執行進程,如果沒有任何一個文件描述符的設定狀態被觸發,進程進入sleep
多路復用的一個主要用途就是實現"I/O多路復用并發服務器",和多線程并發或者多進程并發相比,這種服務器的系統開銷更低,更適合做web服務器。
阻塞I/O
阻塞I/O,就是當進程試圖訪問這個I/O設備而這個設備并沒有準備好的時候,設備的驅動程序會通過內核讓這個試圖訪問的進程進入sleep狀態。阻塞I/O的一個好處就是可以大大的節約CPU時間,因為一旦一個進程試圖訪問一個沒有準備好的阻塞I/O,就會進入sleep狀態,而進入sleep狀態的進程是不在內核的進程調度鏈表中,直到目標I/O準備好了將其喚醒并加入調度鏈表,這樣就可以節約CPU時間。當然阻塞I/O也有其固有的缺點,如果進程試圖訪問一個阻塞I/O,但是否訪問成功并不對接下來的任務有決定性影響,那么直接使其進入sleep狀態顯然會延誤其任務的完成。
典型的默認阻塞IO有標準輸入設備,socket設備,管道設備等,當我們使用gets(),scanf(),read()等操作請求這些IO時而IO并沒有數據流入,就會造成進程的sleep。
假設一個進程希望通過三個管道中任意一個中讀取數據并顯示,偽代碼如下
read(pipe_0,buf,sizeof(buf)); //sleepprint buf;read(pipe_1,buf,sizeof(buf));print buf;read(pipe_2,buf,sizeof(buf));print buf;
由于管道是阻塞I/O,所以如果pipe_0沒有數據流入,進程就是在第一個read()處進入sleep狀態而即使pipe_1和pipe_2有數據流入也不會被讀取。
如果我們使用下述代碼重新設置管道的阻塞屬性,顯然,如果三個管道都沒有數據流入,那么進程就無法獲得請求的數據而繼續執行,倘若這些數據很重要(所以我們才要用阻塞I/O),那結果就會十分的糟糕,改為輪詢卻又大量的占據CPU時間。
int fl = fcntl(pipe_fd, F_GETFL);
fcntl(pipe_fd, F_SETFL, fl | O_NONBLOCK);
如何讓進程同時監視三個管道,其中一個有數據就繼續執行而不會sleep,如果全部沒有數據流入再sleep,就是多路復用技術需要解決的問題。
非阻塞I/O
非阻塞I/O就是當一個進程試圖訪問一個I/O設備的時候,無論是否從中獲取了請求的數據都會返回并繼續執行接下來的任務。,但非常適合請求是否成功對接下來的任務影響不大的I/O請求。但如果訪問一個非阻塞I/O,但這個請求如果失敗對進程接下來的任務有致命影響,最粗暴的就是使用while(1){read()}輪詢。顯然,這種方式會占用大量的CPU時間。
select機制
select是一種非常"古老"的同步I/O接口,但是提供了一種很好的I/O多路復用的思路
模型
fd_set //創建fd_set對象,將來從中增減需要監視的fdFD_ZERO() //清空fd_set對象FD_SET() //將一個fd加入fd_set對象中 select() //監視fd_set對象中的文件描述符pselect() //先設定信號屏蔽,再監視FD_ISSET() //測試fd是否屬于fd_set對象FD_CLR() //從fd_set對象中刪除fd
Note:
select的第一個參數nfds是指集合中的最大的文件描述符+1,因為select會無差別遍歷整個文件描述符表直到找到目標,而文件描述符是從0開始的,所以一共是集合中的最大的文件描述符+1次。
上一條導致了這種機制的低效,如果需要監視的文件描述符是0和100那么每一次都會遍歷101次
select()每次返回都會修改fd_set,如果要循環select(),需要先對初始的fd_set進行備
例子_I/O多路復用并發服務器
關于server本身的編程模型,參見tcp/ip協議服務器模型和udp/ip協議服務器模型這里僅是使用select實現偽并行的部分模型
#define BUFSIZE 100#define MAXNFD 1024 int main(){ /***********服務器的listenfd已經準本好了**************/ fd_set readfds; fd_set writefds; FD_ZERO(&readfds); FD_ZERO(&writefds); FD_SET(listenfd, &readfds); fd_set temprfds = readfds; fd_set tempwfds = writefds; int maxfd = listenfd; int nready; char buf[MAXNFD][BUFSIZE] = {0}; while(1){ temprfds = readfds; tempwfds = writefds; nready = select(maxfd+1, &temprfds, &tempwfds, NULL, NULL) if(FD_ISSET(listenfd, &temprfds)){ //如果監聽到的是listenfd就進行accept int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len); //將新accept的scokfd加入監聽集合,并保持maxfd為最大fd FD_SET(sockfd, &readfds); maxfd = maxfd>sockfd?maxfd:sockfd; //如果意見檢查了nready個fd,就沒有必要再等了,直接下一個循環 if(--nready==0) continue; } int fd = 0; //遍歷文件描述符表,處理接收到的消息 for(;fd<=maxfd; fd++){ if(fd == listenfd) continue; if(FD_ISSET(fd, &temprfds)){ int ret = read(fd, buf[fd], sizeof buf[0]); if(0 == ret){ //客戶端鏈接已經斷開 close(fd); FD_CLR(fd, &readfds); if(maxfd==fd) --maxfd; continue; } //將fd加入監聽可寫的集合 FD_SET(fd, &writefds); } //找到了接收消息的socket的fd,接下來將其加入到監視寫的fd_set中 //將在下一次while()循環開始監視 if(FD_ISSET(fd, &tempwfds)){ int ret = write(fd, buf[fd], sizeof buf[0]); printf("ret %d: %d/n", fd, ret); FD_CLR(fd, &writefds); } } } close(listenfd);}
poll機制
poll是System V提出的一種基于select的改良機制,其針對select的諸多明顯的缺陷進行了重新設計,包括只遍歷被觸發個數個文件描述符,不需要備份fd_set等等
模型
struct pollfd fds //創建一個pollfd類型的數組fds[0].fd //向fds[0]中放入需要監視的fdfds[0].events //向fds[0]中放入需要監視的fd的觸發事件 POLLIN //I/O有輸入 POLLPRI //有緊急數據需要讀取 POLLOUT //I/O可寫 POLLRDHUP //流式套接字連接斷開或套接字處于半關閉狀態 POLLERR //錯誤條件(僅針對輸出) POLLHUP //掛起(僅針對輸出) POLLNVAL //無效的請求:fd沒有被打開(僅針對輸出)
例子_I/O多路復用并發服務器
/* ... */int main(){ /* ... */ struct pollfd myfds[MAXNFD] = {0}; myfds[0].fd = listenfd; myfds[0].events = POLLIN; int maxnum = 1; int nready; //準備二維數組buf,每個fd使用buf的一行,數據干擾 char buf[MAXNFD][BUFSIZE] = {0}; while(1){ //poll直接返回event被觸發的fd的個數 nready = poll(myfds, maxnum, -1) int i = 0; for(;i<maxnum; i++){ //poll通過將相應的二進制位置一來表示已經設置 //如果下面的條件成立,表示revent[i]里的POLLIN位已經是1了 if(myfds[i].revents & POLLIN){ if(myfds[i].fd == listenfd){ int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len); //將新accept的scokfd加入監聽集合 myfds[maxnum].fd = sockfd; myfds[maxnum].events = POLLIN; maxnum++; //如果意見檢查了nready個fd,就直接下一個循環 if(--nready==0) continue; } else{ int ret = read(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]); if(0 == ret){ //如果連接斷開了 close(myfds[i].fd); //初始化將文件描述符表所有的文件描述符標記為-1 //close的文件描述符也標記為-1 //打開新的描述符時從表中搜索第一個-1 //open()就是這樣實現始終使用最小的fd //這里為了演示并沒有使用這種機制 myfds[i].fd = -1; continue; } myfds[i].events = POLLOUT; } } else if(myfds[i].revents & POLLOUT){ int ret = write(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]); myfds[i].events = POLLIN; } } } close(listenfd);}
epoll
epoll在poll基礎上實現的更為健壯的接口,也是現在主流的web服務器使用的多路復用技術,epoll一大特色就是支持EPOLLET(邊沿觸發)和EPOLLLT (水平觸發),前者表示如果讀取之后緩沖區還有數據,那么只要讀取結束,剩余的數據也會丟棄,而后者表示里面的數據不會丟棄,下次讀的時候還在,默認是EPOLLLT
模型
epoll_create() //創建epoll對象struct epoll_event //準備事件結構體和事件結構體數組 event.events event.data.fd ...epoll_ctl() //配置epoll對象epoll_wait() //監控epoll對象中的fd及其相應的event
例子_I/O多路復用并發服務器
/* ... */int main(){ /* ... */ /* 創建epoll對象 */ int epoll_fd = epoll_create(1024); //準備一個事件結構體 struct epoll_event event = {0}; event.events = EPOLLIN; event.data.fd = listenfd; //data是一個共用體,除了fd還可以返回其他數據 //ctl是監控listenfd是否有event被觸發 //如果發生了就把event通過wait帶出。 //所以,如果event里不標明fd,我們將來獲取就不知道哪個fd epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenfd, &event); struct epoll_event revents[MAXNFD] = {0}; int nready; char buf[MAXNFD][BUFSIZE] = {0}; while(1){ //wait返回等待的event發生的數目 //并把相應的event放到event類型的數組中 nready = epoll_wait(epoll_fd, revents, MAXNFD, -1) int i = 0; for(;i<nready; i++){ //wait通過在events中設置相應的位來表示相應事件的發生 //如果輸入可用,那么下面的這個結果應該為真 if(revents[i].events & EPOLLIN){ //如果是listenfd有數據輸入 if(revents[i].data.fd == listenfd){ int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len); struct epoll_event event = {0}; event.events = EPOLLIN; event.data.fd = sockfd; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event); } else{ int ret = read(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]); if(0 == ret){ close(revents[i].data.fd); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, revents[i].data.fd, &revents[i]); } revents[i].events = EPOLLOUT; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]); } } else if(revents[i].events & EPOLLOUT){ int ret = write(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]); revents[i].events = EPOLLIN; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]); } } } close(listenfd);}
感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
新聞熱點
疑難解答