0%

Linux非阻塞模式下的EAGIN异常

之前在写一个并发聊天室时候在多个客户端连续发送数据的时候遇到EAGIN错误,如图。

image-20210218231840685

Linux下,管道、FIFO以及一些设备(通常是终端和网络)一般有以下几种性质。

1、一次read操作返回的数据可能会少于所要求的数据,及时此时没有到达文件尾部也可能会出现这种情况。对于这种情况,我们不认为是错误,而是视为一种异常,应该继续对此设备进行读取。

2、一次write或send操作的返回值也可能会少于指定的输出的字节数。通常这可能是由于某一种因素引起的,比如在处理网络数据的时,内核输出的缓冲区满,此时写不进数据。我们也通常不认为这是一种错误,应当继续将余下的数据进行写入。(通常遇到这种情况的场景是非阻塞描述符,或者捕捉到某一种信号的时候,才会发生中途返回。)

服务端server.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#include "utility.h"

int main(int argc, char *argv[]){

/* 服务器IP+PORT */
struct sockaddr_in serverAddr;
serverAddr.sin_family = PF_INET;
serverAddr.sin_port = htons(SERVER_PORT);
serverAddr.sin_addr.s_addr = inet_addr(SERVER_IP);

int listener = socket(PF_INET, SOCK_STREAM, 0);
if(listener < 0){
perror("socket failed");
exit(1);
}
int opt=1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
printf("listen socket success, listenfd = %d\n", listener);
if(bind(listener, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0){
perror("bind failed");
exit(1);
}

int ret = listen(listener, 5);
if (ret < 0){
perror("listen failed");
exit(1);
}

printf("start listen: %s\n", SERVER_IP);

/* 在内核中创建事件列表 */
int epfd = epoll_create(EPOLL_SIZE);
if(epfd < 0){
perror("epoll_create failed");
exit(1);
}

printf("epoll create, epollfd = %d\n", epfd);
static struct epoll_event events[EPOLL_SIZE];

/* 往内核事件列表添加事件 */
addfd(epfd, listener, true);

/* 主循环 */
while(1){

//epoll_events_count 表示就绪事件的数目
int epoll_events_count = epoll_wait(epfd, events, EPOLL_SIZE, -1);
if(epoll_events_count < 0){
perror("epoll_wait failed");
break;
}
printf("epoll_events_count:%d\n", epoll_events_count);

/* 处理epoll_events_count个就绪事件 */
for (int i=0; i<epoll_events_count; i++){
int sockfd = events[i].data.fd;
if (sockfd == listener){
struct sockaddr_in clientAddr;
socklen_t clientAddrLen = sizeof(clientAddr);
int connfd = accept(listener, (struct sockaddr *)&clientAddr, &clientAddrLen);
printf("client connection from %s:%d clientfd=%d\n", inet_ntoa(clientAddr.sin_addr), ntohs(clientAddr.sin_port), connfd);

/* 将新的客户端添加到事件列表中 */
addfd(epfd, connfd, true);

/* 服务端用list保存用户连接 */
clients_list.push_back(connfd);
printf("Add new clientfd = %d to epoll.\n", connfd);
printf("Now there are %d clients int the chat room\n", (int)clients_list.size());

/* 服务端发送欢迎信息 */
printf("welcome message\n");
char message[BUF_SIZE];
bzero(message, BUF_SIZE);
snprintf(message, sizeof(message), SERVER_WELCOME, connfd);
int ret = send(connfd, message, BUF_SIZE, 0);
if (ret < 0){
printf("line:%d errno:%d\n", __LINE__, errno);
perror("send failed");
exit(1);
}
}
else
{
int ret = sendBroadcastMessage(sockfd);
if (ret < 0){
printf("line:%d errno:%d\n", __LINE__, errno);
perror("send failed");
exit(1);
}
}
}
}

/* 关闭socket */
close(listener);

/* 关闭内核 不在监控这些注册事件是否发生 */
close(epfd);

return 0;
}

客户端client.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//客户端client.c
#include "utility.h"

int main(int argc, char *argv[]){


if(argc < 2 ){
fprintf(stdout, "Please inout ipAddr, eg:%s 127.0.0.1\n", argv[0]);
exit(1);
}
/* 用户连接服务器 */
struct sockaddr_in serverAddr;
serverAddr.sin_family = PF_INET;
serverAddr.sin_port = htons(SERVER_PORT);
serverAddr.sin_addr.s_addr = inet_addr(argv[1]);
int sockfd = socket(PF_INET, SOCK_STREAM, 0);
if (sockfd < 0){
perror("socket failed");
exit(1);
}

/* 连接服务端 */
if(connect(sockfd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0){
perror("connect faild");
exit(1);
}

/* 创建管道,其中fd[0]用于父进程读,fd[1]用于子进程写 */
int pipe_fd[2];
if(pipe(pipe_fd) < 0){
perror("pipe failed");
exit(1);
}

/* 创建epoll */
int epfd = epoll_create(EPOLL_SIZE);
if(epfd < 0){
perror("epoll_create failed");
exit(1);
}

static epoll_event events[2];

/* 将sockfd和管道读端描述符都添加到内核事件列表中 */
addfd(epfd, sockfd, true);
addfd(epfd, pipe_fd[0], true);

/* 表示客户端是否正常工作标志 */
bool isClientWork = true;

/* 聊天信息缓冲区 */
char message[BUF_SIZE];

//FORK
int pid = fork();
if( pid < 0 ){
perror("fork failed");
exit(1);
}
else if( pid == 0 ){
/* 子进程 */
/* 子进程负责写入管道,因此先关闭读端 */
close(pipe_fd[0]);
printf("Please input 'exit' to exit the chat room!\n");

while(isClientWork){
bzero(message, BUF_SIZE);
fgets(message, BUF_SIZE, stdin);

/* 客户端输出exit,并退出 */
if( strncasecmp(message, EXIT, strlen(EXIT)) == 0 ){
isClientWork = 0;
}else{
/* 子进程将信息写入管道 */
if(write(pipe_fd[1], message, strlen(message)-1) < 0)
{
perror("write failed");
exit(1);
}
}
}
}else{
/* pid>0 父进程 */
/* 父进程负责读取管道数据,因此先关闭写端 */
close(pipe_fd[1]);
while(isClientWork){
int epoll_events_count = epoll_wait(epfd, events, 2, -1);
for(int i=0; i<epoll_events_count; i++){
bzero(message, BUF_SIZE);

/* 接收服务端发来的消息 */
if(events[i].data.fd == sockfd){

/* 接收服务端的消息 */
int ret = recv(sockfd, message, BUF_SIZE, 0);

/* ret == 0 服务端关闭 */
if(ret == 0){
printf("Server closed connection:%d\n", sockfd);
close(sockfd);
isClientWork = 0;
}else{
printf("%s\n", message);
}
}else{

/* 子进程写入事件发生,父进程处理并发送服务器 */
/* 父进程从管道中读取数据 */
int ret = read(events[i].data.fd, message, BUF_SIZE);

//ret=0
if( ret == 0 ){
isClientWork = 0;
}else{

/* 信息发送服务器 */
send(sockfd, message, BUF_SIZE, 0);
}
}
}
}
}

if (pid){
close(pipe_fd[0]);
close(sockfd);
}else{
close(pipe_fd[1]);
}

return 0;
}

头文件utility.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#ifndef UTILITY_H_
#define UTILITY_H_

#include <iostream>
#include <list>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <error.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

using namespace std;

/* clients_list save all the client's socket */
list<int> clients_list;

/******* macro defintion ***********/
//server ip
#define SERVER_IP "0.0.0.0"

//server port
#define SERVER_PORT 8888

//epoll size
#define EPOLL_SIZE 5000

//BUF_SIZE
#define BUF_SIZE 0xffff

#define SERVER_WELCOME "welcome you join the chat room! Your chat ID is: Client #%d"

#define SERVER_MESSAGE "ClientID %d say >> %s"

//exit
#define EXIT "EXIT"

#define CAUTION "There is only one in the char room!"

/*
*@param sockfd:socket descriptor
*@return 0
*/
int setnonblockint(int sockfd){
fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK);
return 0;
}

/*
*@param epollfd:epoll handle
*@param fd:sockfd descriptor
*@param enable_et: enable_et = true, epoll use ET;otherwise LT
*
*/
void addfd(int epollfd, int fd, bool enable_et){
struct epoll_event ev;
ev.data.fd = fd;
ev.events = EPOLLIN;
if(enable_et)
ev.events = EPOLLIN|EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
setnonblockint(fd);
printf("fd added to epoll!\n\n");
}

/*
*@param clientfd: socket descriptor
*@return :len
*/
int sendBroadcastMessage(int clientfd){
//buf[BUF_SIZE] receive new chat message
//message[BUF_SIZE] save format message
char buf[BUF_SIZE];
char message[BUF_SIZE];
bzero(buf, BUF_SIZE);
bzero(message, BUF_SIZE);

printf("read from client(clientID = %d)\n", clientfd);
int len = recv(clientfd, buf, BUF_SIZE, 0);

/* len == 0 means the client closed connection */
if(len == 0){
close(clientfd);
/* server remove the client */
clients_list.remove(clientfd);
printf("ClientID=%d close.\n now there are %d client in the chat room!\n", clientfd, (int)clients_list.size());
}
else
{
/* this means is only one in the room */
if(clients_list.size() == 1){
send(clientfd, CAUTION, strlen(CAUTION), 0);
return len;
}
snprintf(message, sizeof(message), SERVER_MESSAGE, clientfd, buf);

list<int>::iterator it;
for(it=clients_list.begin(); it!=clients_list.end(); it++){
if (*it != clientfd){
if(send(*it, message, BUF_SIZE, 0) < 0){
printf("line:%d errno:%d\n", __LINE__, errno);
perror("send failed");
exit(1);
}
}
}
}
return len;
}
#endif

Makefile文件

1
2
3
4
5
6
7
8
9
10
all: server client

server:
g++ server.cpp -o server
client:
g++ client.cpp -o client

.PHONLY:clean
clean:
rm -rf server client

未完待续~

小主,路过打个赏再走呗~