亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 數據庫 > Redis > 正文

關于Redis網絡模型的源碼詳析

2020-10-28 21:27:26
字體:
來源:轉載
供稿:網友

前言

Redis的網絡模型是基于I/O多路復用程序來實現的。源碼中包含四種多路復用函數庫epoll、select、evport、kqueue。在程序編譯時會根據系統自動選擇這四種庫其中之一。下面以epoll為例,來分析Redis的I/O模塊的源碼。

epoll系統調用方法

Redis網絡事件處理模塊的代碼都是圍繞epoll那三個系統方法來寫的。先把這三個方法弄清楚,后面就不難了。

epfd = epoll_create(1024);

創建epoll實例

參數:表示該 epoll 實例最多可監聽的 socket fd(文件描述符)數量。

返回: epoll 專用的文件描述符。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

管理epoll中的事件,對事件進行注冊、修改和刪除。

參數:
epfd:epoll實例的文件描述符;
op:取值三種:EPOLL_CTL_ADD 注冊、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 刪除;
fd:socket的文件描述符;
epoll_event *event:事件

event代表一個事件,類似于Java NIO中的channel“通道”。epoll_event 的結構如下:

typedef union epoll_data {void *ptr;int fd; /* socket文件描述符 */__uint32_t u32;__uint64_t u64;} epoll_data_t;struct epoll_event {__uint32_t events; /* Epoll events 就是各種待監聽操作的操作碼求與的結果,例如EPOLLIN(fd可讀)、EPOLLOUT(fd可寫) */epoll_data_t data; /* User data variable */};

int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);

等待事件是否就緒,類似于Java NIO中 select 方法。如果事件就緒,將就緒的event存入events數組中。

參數
epfd:epoll實例的文件描述符;
events:已就緒的事件數組;
intmaxevents:每次能處理的事件數;
timeout:阻塞時間,等待產生就緒事件的超時值。

源碼分析

事件

Redis事件系統中將事件分為兩種類型:

  • 文件事件;網絡套接字對應的事件;
  • 時間事件:Redis中一些定時操作事件,例如 serverCron 函數。

下面從事件的注冊、觸發兩個流程對源碼進行分析

綁定事件

建立 eventLoop

在 initServer方法(由 redis.c 的 main 函數調用) 中,在建立 RedisDb 對象的同時,會初始化一個“eventLoop”對象,我稱之為事件處理器對象。結構體的關鍵成員變量如下所示:

struct aeEventLoop{aeFileEvent *events;//已注冊的文件事件數組aeFiredEvent *fired;//已就緒的文件事件數組aeTimeEvent *timeEventHead;//時間事件數組...}

初始化 eventLoop 在 ae.c 的“aeCreateEventLoop”方法中執行。該方法中除了初始化 eventLoop 還調用如下方法初始化了一個 epoll 實例。

/* * ae_epoll.c * 創建一個新的 epoll 實例,并將它賦值給 eventLoop */static int aeApiCreate(aeEventLoop *eventLoop) {  aeApiState *state = zmalloc(sizeof(aeApiState));  if (!state) return -1;  // 初始化事件槽空間  state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);  if (!state->events) {    zfree(state);    return -1;  }  // 創建 epoll 實例  state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */  if (state->epfd == -1) {    zfree(state->events);    zfree(state);    return -1;  }  // 賦值給 eventLoop  eventLoop->apidata = state;  return 0;}

也正是在此處調用了系統方法“epoll_create”。這里的state是一個aeApiState結構,如下所示:

/* * 事件狀態 */typedef struct aeApiState {  // epoll 實例描述符  int epfd;  // 事件槽  struct epoll_event *events;} aeApiState;

這個 state 由 eventLoop->apidata 來記錄。

綁定ip端口與句柄

通過 listenToPort 方法開啟TCP端口,每個IP端口會對應一個文件描述符 ipfd(因為服務器可能會有多個ip地址)

// 打開 TCP 監聽端口,用于等待客戶端的命令請求if (server.port != 0 &&  listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)  exit(1);

注意:*eventLoop 和 ipfd 分別被 server.el 和 server.ipfd[] 引用。server 是結構體 RedisServer 的實例,是Redis的全局變量。

注冊事件

如下所示代碼,為每一個文件描述符綁定一個事件函數

// initServer方法:for (j = 0; j < server.ipfd_count; j++) {  if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,    acceptTcpHandler,NULL) == AE_ERR)    {      redisPanic(        "Unrecoverable error creating server.ipfd file event.");    }}// ae.c 中的 aeCreateFileEvent 方法/* * 根據 mask 參數的值,監聽 fd 文件的狀態, * 當 fd 可用時,執行 proc 函數 */int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,    aeFileProc *proc, void *clientData){  if (fd >= eventLoop->setsize) {    errno = ERANGE;    return AE_ERR;  }  if (fd >= eventLoop->setsize) return AE_ERR;  // 取出文件事件結構  aeFileEvent *fe = &eventLoop->events[fd];  // 監聽指定 fd 的指定事件  if (aeApiAddEvent(eventLoop, fd, mask) == -1)    return AE_ERR;  // 設置文件事件類型,以及事件的處理器  fe->mask |= mask;  if (mask & AE_READABLE) fe->rfileProc = proc;  if (mask & AE_WRITABLE) fe->wfileProc = proc;  // 私有數據  fe->clientData = clientData;  // 如果有需要,更新事件處理器的最大 fd  if (fd > eventLoop->maxfd)    eventLoop->maxfd = fd;  return AE_OK;}

aeCreateFileEvent 函數中有一個方法調用:aeApiAddEvent,代碼如下

/* * ae_epoll.c * 關聯給定事件到 fd */static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {  aeApiState *state = eventLoop->apidata;  struct epoll_event ee;  /* If the fd was already monitored for some event, we need a MOD   * operation. Otherwise we need an ADD operation.    *   * 如果 fd 沒有關聯任何事件,那么這是一個 ADD 操作。   *   * 如果已經關聯了某個/某些事件,那么這是一個 MOD 操作。   */  int op = eventLoop->events[fd].mask == AE_NONE ?      EPOLL_CTL_ADD : EPOLL_CTL_MOD;  // 注冊事件到 epoll  ee.events = 0;  mask |= eventLoop->events[fd].mask; /* Merge old events */  if (mask & AE_READABLE) ee.events |= EPOLLIN;  if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;  ee.data.u64 = 0; /* avoid valgrind warning */  ee.data.fd = fd;  if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;  return 0;}

這里實際上就是調用系統方法“epoll_ctl”,將事件(文件描述符)注冊進 epoll 中。首先要封裝一個 epoll_event 結構,即 ee ,通過“epoll_ctl”將其注冊進 epoll 中。

除此之外,aeCreateFileEvent 還完成了下面兩個重要操作:

  • 將事件函數“acceptTcpHandler”存入了eventLoop中,即由eventLoop->events[fd]->rfileProc 來引用(也可能是wfileProc,分別代表讀事件和寫事件);
  • 將當操作碼添加進 eventLoop->events[fd]->mask 中(mask 類似于JavaNIO中的ops操作碼,代表事件類型)。

事件監聽與執行

redis.c 的main函數會調用 ae.c 中的 main 方法,如下所示:

/* * 事件處理器的主循環 */void aeMain(aeEventLoop *eventLoop) {  eventLoop->stop = 0;  while (!eventLoop->stop) {    // 如果有需要在事件處理前執行的函數,那么運行它    if (eventLoop->beforesleep != NULL)      eventLoop->beforesleep(eventLoop);    // 開始處理事件    aeProcessEvents(eventLoop, AE_ALL_EVENTS);  }}

上述代碼會調用 aeProcessEvents 方法用于處理事件,方法如下所示

/* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * * 處理所有已到達的時間事件,以及所有已就緒的文件事件。 * 函數的返回值為已處理事件的數量 */ int aeProcessEvents(aeEventLoop *eventLoop, int flags){  int processed = 0, numevents;  /* Nothing to do? return ASAP */  if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;  if (eventLoop->maxfd != -1 ||    ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {    int j;    aeTimeEvent *shortest = NULL;    struct timeval tv, *tvp;    // 獲取最近的時間事件    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))      shortest = aeSearchNearestTimer(eventLoop);    if (shortest) {      // 如果時間事件存在的話      // 那么根據最近可執行時間事件和現在時間的時間差來決定文件事件的阻塞時間      long now_sec, now_ms;      /* Calculate the time missing for the nearest       * timer to fire. */      // 計算距今最近的時間事件還要多久才能達到      // 并將該時間距保存在 tv 結構中      aeGetTime(&now_sec, &now_ms);      tvp = &tv;      tvp->tv_sec = shortest->when_sec - now_sec;      if (shortest->when_ms < now_ms) {        tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;        tvp->tv_sec --;      } else {        tvp->tv_usec = (shortest->when_ms - now_ms)*1000;      }      // 時間差小于 0 ,說明事件已經可以執行了,將秒和毫秒設為 0 (不阻塞)      if (tvp->tv_sec < 0) tvp->tv_sec = 0;      if (tvp->tv_usec < 0) tvp->tv_usec = 0;    } else {            // 執行到這一步,說明沒有時間事件      // 那么根據 AE_DONT_WAIT 是否設置來決定是否阻塞,以及阻塞的時間長度      /* If we have to check for events but need to return       * ASAP because of AE_DONT_WAIT we need to set the timeout       * to zero */      if (flags & AE_DONT_WAIT) {        // 設置文件事件不阻塞        tv.tv_sec = tv.tv_usec = 0;        tvp = &tv;      } else {        /* Otherwise we can block */        // 文件事件可以阻塞直到有事件到達為止        tvp = NULL; /* wait forever */      }    }    // 處理文件事件,阻塞時間由 tvp 決定    numevents = aeApiPoll(eventLoop, tvp);    for (j = 0; j < numevents; j++) {      // 從已就緒數組中獲取事件      aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];      int mask = eventLoop->fired[j].mask;      int fd = eventLoop->fired[j].fd;      int rfired = 0;      /* note the fe->mask & mask & ... code: maybe an already processed       * event removed an element that fired and we still didn't       * processed, so we check if the event is still valid. */      // 讀事件      if (fe->mask & mask & AE_READABLE) {        // rfired 確保讀/寫事件只能執行其中一個        rfired = 1;        fe->rfileProc(eventLoop,fd,fe->clientData,mask);      }      // 寫事件      if (fe->mask & mask & AE_WRITABLE) {        if (!rfired || fe->wfileProc != fe->rfileProc)          fe->wfileProc(eventLoop,fd,fe->clientData,mask);      }      processed++;    }  }  /* Check time events */  // 執行時間事件  if (flags & AE_TIME_EVENTS)    processed += processTimeEvents(eventLoop);  return processed; }

該函數中代碼大致分為三個主要步驟

  • 根據時間事件與當前時間的關系,決定阻塞時間 tvp;
  • 調用aeApiPoll方法,將就緒事件都寫入eventLoop->fired[]中,返回就緒事件數目;
  • 遍歷eventLoop->fired[],遍歷每一個就緒事件,執行之前綁定好的方法rfileProc 或者wfileProc。

ae_epoll.c 中的 aeApiPoll 方法如下所示:

/* * 獲取可執行事件 */static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {  aeApiState *state = eventLoop->apidata;  int retval, numevents = 0;  // 等待時間  retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,      tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);  // 有至少一個事件就緒?  if (retval > 0) {    int j;    // 為已就緒事件設置相應的模式    // 并加入到 eventLoop 的 fired 數組中    numevents = retval;    for (j = 0; j < numevents; j++) {      int mask = 0;      struct epoll_event *e = state->events+j;      if (e->events & EPOLLIN) mask |= AE_READABLE;      if (e->events & EPOLLOUT) mask |= AE_WRITABLE;      if (e->events & EPOLLERR) mask |= AE_WRITABLE;      if (e->events & EPOLLHUP) mask |= AE_WRITABLE;      eventLoop->fired[j].fd = e->data.fd;      eventLoop->fired[j].mask = mask;    }  }    // 返回已就緒事件個數  return numevents;}

執行epoll_wait后,就緒的事件會被寫入 eventLoop->apidata->events 事件槽。后面的循環就是將事件槽中的事件寫入到 eventLoop->fired[] 中。具體描述:每一個事件都是一個 epoll_event 結構,用e來指代,則e.data.fd代表文件描述符,e->events表示其操作碼,將操作碼轉化為mask,最后將fd 和 mask 都寫入eventLoop->fired[j]中。

之后,在外層的 aeProcessEvents 方法中會執行函數指針 rfileProc 或者 wfileProc 指向的方法,例如前文提到已注冊的“acceptTcpHandler”。

總結

Redis的網絡模塊其實是一個簡易的Reactor模式。本文順著“服務端注冊事件――>接受客戶端連接――>監聽事件是否就緒――>執行事件”這樣的路線,來分析Redis源碼,描述了Redis接受客戶端connect的過程。實際上NIO的思想都基本類似。

到此這篇關于Redis網絡模型的源碼詳析的文章就介紹到這了,更多相關Redis網絡模型源碼內容請搜索武林網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持武林網!

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
成人黄色生活片| 亚洲欧美中文字幕| 欧美整片在线观看| 欧美电影免费观看| 国产欧美一区二区| 亚洲欧美成人一区二区在线电影| 精品五月天久久| 性欧美在线看片a免费观看| 国产精品第一第二| 国产精品一区二区3区| 日韩成人在线视频网站| 日韩亚洲一区二区| 国色天香2019中文字幕在线观看| 国产精品一区二区久久久久| 国产亚洲一区二区精品| 欧美性猛交xxxx富婆| 久久精品国产亚洲精品| 日韩精品亚洲视频| 日韩大胆人体377p| 国产精品99久久久久久久久久久久| 美乳少妇欧美精品| 国产精品国产亚洲伊人久久| 国产精品三级在线| 欧美精品久久久久久久免费观看| 色妞久久福利网| 北条麻妃99精品青青久久| 91成人国产在线观看| 麻豆国产va免费精品高清在线| 国产精品流白浆视频| 国产99久久精品一区二区| 欧美黄色免费网站| 久久精品国产久精国产一老狼| 91极品女神在线| 亚洲精品有码在线| 精品久久久久久久久久| 久久中文字幕一区| 91精品视频免费观看| 国产在线视频一区| 亚洲国产精品美女| 日韩精品丝袜在线| 欧美一级电影在线| 国产精品扒开腿做爽爽爽男男| 国内精品视频一区| 亚洲激情视频网| 亚洲精品日韩av| 日韩av网站导航| 亚洲国产日韩精品在线| 国产精品吹潮在线观看| 国产成人精品a视频一区www| 国产视频综合在线| 欧美色视频日本高清在线观看| 欧美日韩激情视频| 欧美日韩免费看| 欧美性感美女h网站在线观看免费| 亚洲精品成人免费| 欧美一级片免费在线| 51午夜精品视频| 亚洲女人天堂网| 欧美在线一区二区视频| 亚洲欧美一区二区三区四区| 日韩经典中文字幕| 午夜精品国产精品大乳美女| 国语自产精品视频在线看抢先版图片| 日韩av大片在线| 久久久久久久久久久av| 久久99亚洲精品| 中文字幕日韩免费视频| 夜夜嗨av色一区二区不卡| 成人在线观看视频网站| 免费91麻豆精品国产自产在线观看| 亚洲女人天堂av| 国产主播欧美精品| 欧美亚洲国产日韩2020| 亚洲成avwww人| 国产精品日本精品| 成人午夜激情网| 国产精品毛片a∨一区二区三区|国| 亚洲性av在线| 国产精品国产自产拍高清av水多| 国产亚洲精品一区二区| 亚洲电影免费观看高清| 91久久中文字幕| 2021久久精品国产99国产精品| 成人黄色片在线| 亚洲久久久久久久久久| 国产精品久久久久久久午夜| 久久男人资源视频| 日日摸夜夜添一区| 欧美日韩另类视频| 国产精品日韩欧美| 欧美中文字幕视频| 国产精品免费电影| 欧美黄色小视频| 亚洲人成免费电影| 日韩亚洲成人av在线| 成人福利网站在线观看| 欧美精品第一页在线播放| 久久久中文字幕| 亚洲国产小视频在线观看| 一区二区亚洲欧洲国产日韩| 亚洲欧美在线磁力| 国产精品福利在线| 色爱精品视频一区| 国产97在线亚洲| 欧美午夜视频一区二区| 久久中文精品视频| 亚洲第一网中文字幕| 亚洲久久久久久久久久久| 午夜精品久久久久久久男人的天堂| 欧美精品videosex牲欧美| 日韩在线观看你懂的| 国模精品视频一区二区| 岛国视频午夜一区免费在线观看| 久久久久久尹人网香蕉| 在线播放日韩专区| 久热在线中文字幕色999舞| 国产亚洲激情视频在线| 日韩电影第一页| 最新的欧美黄色| 欧美日韩一区二区三区在线免费观看| 91精品国产一区| 成人妇女免费播放久久久| 日本成人在线视频网址| 久久久久久91香蕉国产| 亚洲色图国产精品| 中文字幕在线国产精品| 午夜精品一区二区三区在线视| 久久久亚洲福利精品午夜| 色综久久综合桃花网| 欧美电影电视剧在线观看| 日韩av成人在线| 国产精品人成电影| 97精品国产97久久久久久免费| 91社区国产高清| 亚洲精品福利在线观看| 欧美中文字幕在线观看| 亚洲a中文字幕| 欧美性生交xxxxx久久久| 国产美女久久久| 8x拔播拔播x8国产精品| 亚洲网址你懂得| 91亚洲精品久久久| 91在线直播亚洲| 欧美午夜宅男影院在线观看| 欧美大片欧美激情性色a∨久久| 68精品久久久久久欧美| 欧美日韩中文在线| 欧美人与物videos| 91精品国产91久久久久久久久| 日韩精品视频在线观看网址| 国产精品美女www爽爽爽视频| www.亚洲天堂| 久久久天堂国产精品女人| 91国语精品自产拍在线观看性色| 国产精品第一第二| 日韩中文视频免费在线观看| 日韩精品黄色网| 久久精品成人动漫| 日韩美女免费线视频| 日本欧美一二三区| 成人免费观看网址| 欧美一级淫片丝袜脚交| 97色在线视频观看| 精品国偷自产在线视频99|