如何接受客戶端請求并調用處理函數

上篇概括了redis的啟動流程,這篇重點介紹redis如何接受客戶端請求并調用處理函數來執行命令。 在上一篇里,說到了在initServer()這個函數里邊,會調用anetTcpServer和anetUnixServer 這兩個函數創建對tcp端口和unix域套接字的監聽,那么這里首先重點分析下

上篇概括了redis的啟動流程,這篇重點介紹redis如何接受客戶端請求并調用處理函數來執行命令。

在上一篇里,美國服務器,說到了在initServer()這個函數里邊,會調用anetTcpServer和anetUnixServer 這兩個函數創建對tcp端口和unix域套接字的監聽,那么這里首先重點分析下這兩個函數的具體實現。

int anetTcpServer(char *err, int port, char *bindaddr) { int s; struct sockaddr_in sa; if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR) return ANET_ERR; memset(&sa,0,sizeof(sa)); sa.sin_family = AF_INET; sa.sin_port = htons(port); sa.sin_addr.s_addr = htonl(INADDR_ANY); if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) { anetSetError(err, ); close(s); return ANET_ERR; } if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR) return ANET_ERR; return s; }

從代碼中我們可以看出,首先調用anetCreateSocket()創建一個套接字并復值給s, 然后對sa 這個sockaddr_in類型的結構體進行初始化, 設置要監聽的端口,地址,和地址族, 再調用anetListen() 函數綁定地址并監聽端口,這些工作完成后 anetCreateSocket函數返回,網站空間,并將創建的套接字復制給server.ipfd。注意在anetUnixServer()這個函數中完成的工 作類似于anetCreateSocket,只不過是綁定的unix socket。

接下來, 在initServer函數里, 調用了這個函數:aeCreateFileEvent, 這里重點分析第一個,第二個類似。

if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) oom(); if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) oom();

首先,從eventLoop的event這個aeFileEvent數組里,取出當前fd對應的acFileEvent,主要是為了在下邊給它設置對應事件的處理函數;即根據傳入的mask來判斷是哪一類事件。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= AE_SETSIZE) return AE_ERR; aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == –1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }

?然后調用 acApiEvent這個事件注冊函數:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee; /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.u64 = ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == –1) return1; return 0; }

可以看到, 這個函數中,主要是調用epoll_ctl(state->epfd,op,fd,&ee)將當前fd設置到及其所關心的事件注冊到epoll_create 返回的epoll的句柄里。由于我們這里注冊的是AE_READABLE事件,所以當這個fd(即redis監聽端口的套接字)有數據可讀時(這里我理解的是客戶端連接到達),網站空間,就會觸發相應的事件處理函數,這里的事件處理函數便是acceptTcpHandler。


下面我們看看acceptTcpHandler這個函數:

這個函數里邊首先調用 anetTcpAccept獲取客戶端與redis連接的socket fd(實際調用 ::accept(s,sa,len)函數返回的fd), 然后調用 acceptCommonHandler()函數,這個函數中調用 createClient()創建 redisClient *c實例, 如果當前redis服務器的連接總數沒有超過最大值,則將全局變量server中記錄連接數的stat_numconnections加1;如果超過了, 則想客戶端輸出錯誤信息,并釋放redisClient實例,函數返回。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd; char cip[128]; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); cfd = anetTcpAccept(server.neterr, fd, cip, &cport); if (cfd == AE_ERR) { redisLog(REDIS_WARNING,, server.neterr); return; } redisLog(REDIS_VERBOSE,, cip, cport); acceptCommonHandler(cfd); }

下面分析createClient這個函數:

redisClient *createClient(int fd) { redisClient *c = zmalloc(sizeof(redisClient)); c->bufpos = 0; anetNonBlock(NULL,fd); anetTcpNoDelay(NULL,fd); if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } selectDb(c,0); c->fd = fd; c->querybuf = sdsempty(); c->reqtype = 0; c->argc = 0; c->argv = NULL; c->cmd = c->lastcmd = NULL; c->multibulklen = 0; c->bulklen = –1; c->sentlen = 0; c->flags = 0; c->lastinteraction = time(NULL); c->authenticated = 0; c->replstate = REDIS_REPL_NONE; c->slave_listening_port = 0; c->reply = listCreate(); c->reply_bytes = 0; listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); c->bpop.keys = NULL; c->bpop.count = 0; c->bpop.timeout = 0; c->bpop.target = NULL; c->io_keys = listCreate(); c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_patterns = listCreate(); listSetFreeMethod(c->pubsub_patterns,decrRefCount); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); listAddNodeTail(server.clients,c); initClientMultiState(c); return c; }

? 版權聲明
THE END
喜歡就支持一下吧
點贊12 分享