今天这篇文章基本上属于之前上学学习 c 语言的回顾了,要实现一个简单的聊天功能,其实还是需要话费一些代价的,这里面还是涉及到比较多的知识的。比如:
- 套接字相关的使用
- 多线程相关
- select 模型
- 中心化的聊天架构,以及衍生出的去中心化的架构
本文为了简化期间,仅仅实现一个中心化的聊天,因为这个我们比较熟悉,而且在学校里学习 c 语言时,多多少少老师让做实习也是写过的。
ok,一如继往的习惯,我们来看看这个程序交互方式是如何的。
要实现 A 和 B 聊天,首先,我们需要有个服务端,服务端套接字绑定在一个端口上,然后等待客户端A 和 B 来连接,服务端将 A 和 B 的客户端套接字引用存在自己的内存中,A 发送给的消息先经过服务器接收,服务器发现这个是 A 发送给 B 的,然后就转发给到 B,同理,也是一致。
这就是上述的过程,那,我们如何实现这个程序呢,现来看服务端的代码实现:multi_client_server.c
代码语言:javascript复制#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define MAX_CLIENTS 10
#define BUFFER_SIZE 2048
#define NAME_LEN 32
static unsigned int cli_count = 0;
static int uid = 10;
// Client structure
typedef struct {
struct sockaddr_in address;
int sockfd;
int uid;
char name[NAME_LEN];
} client_t;
client_t *clients[MAX_CLIENTS];
pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
void str_trim_lf(char* arr, int length) {
for (int i = 0; i < length; i ) { // trim \n
if (arr[i] == '\n') {
arr[i] = '\0';
break;
}
}
}
int find_client_sockfd(char *recipient_name, Client *clients, int max_clients) {
for (int i = 0; i < max_clients; i ) {
if (clients[i].sockfd != 0 && strcmp(clients[i].name, recipient_name) == 0) {
return clients[i].sockfd;
}
}
return -1;
}
void print_client_addr(struct sockaddr_in addr) {
printf("%d.%d.%d.%d",
addr.sin_addr.s_addr & 0xff,
(addr.sin_addr.s_addr & 0xff00) >> 8,
(addr.sin_addr.s_addr & 0xff0000) >> 16,
(addr.sin_addr.s_addr & 0xff000000) >> 24);
}
// Add clients to array
void queue_add(client_t *cl) {
pthread_mutex_lock(&clients_mutex);
for (int i = 0; i < MAX_CLIENTS; i) {
if (!clients[i]) {
clients[i] = cl;
break;
}
}
pthread_mutex_unlock(&clients_mutex);
}
// Remove clients to array
void queue_remove(int uid) {
pthread_mutex_lock(&clients_mutex);
for (int i = 0; i < MAX_CLIENTS; i) {
if (clients[i]) {
if (clients[i]->uid == uid) {
clients[i] = NULL;
break;
}
}
}
pthread_mutex_unlock(&clients_mutex);
}
// Send message to all clients except the sender
void send_message(char *s, int uid) {
pthread_mutex_lock(&clients_mutex);
for (int i = 0; i < MAX_CLIENTS; i) {
if (clients[i]) {
if (clients[i]->uid != uid) {
if (write(clients[i]->sockfd, s, strlen(s)) < 0) {
perror("ERROR: write to descriptor failed");
break;
}
}
}
}
pthread_mutex_unlock(&clients_mutex);
}
// Handle all communication with the client
void *handle_client(void *arg) {
char buff_out[BUFFER_SIZE];
char name[NAME_LEN];
int leave_flag = 0;
cli_count ;
client_t *cli = (client_t *)arg;
// Name
if (recv(cli->sockfd, name, NAME_LEN, 0) <= 0 || strlen(name) < 2 || strlen(name) >= NAME_LEN-1) {
printf("Enter the name correctly\n");
leave_flag = 1;
} else {
strcpy(cli->name, name);
sprintf(buff_out, "%s has joined\n", cli->name);
printf("%s", buff_out);
send_message(buff_out, cli->uid);
}
bzero(buff_out, BUFFER_SIZE);
while (1) {
if (leave_flag) {
break;
}
int receive = recv(cli->sockfd, buff_out, BUFFER_SIZE, 0);
if (receive > 0) {
if (strlen(buff_out) > 0) {
send_message(buff_out, cli->uid);
str_trim_lf(buff_out, strlen(buff_out));
printf("%s -> %s\n", buff_out, cli->name);
}
} else if (receive == 0 || strcmp(buff_out, "exit") == 0) {
sprintf(buff_out, "%s has left\n", cli->name);
printf("%s", buff_out);
send_message(buff_out, cli->uid);
leave_flag = 1;
} else {
printf("ERROR: -1\n");
leave_flag = 1;
}
bzero(buff_out, BUFFER_SIZE);
}
close(cli->sockfd);
queue_remove(cli->uid);
free(cli);
cli_count--;
pthread_detach(pthread_self());
return NULL;
}
int main(int argc, char **argv) {
if (argc != 2) {
printf("Usage: %s <port>\n", argv[0]);
return EXIT_FAILURE;
}
int port = atoi(argv[1]);
int listenfd = 0, connfd = 0;
struct sockaddr_in serv_addr;
struct sockaddr_in cli_addr;
pthread_t tid;
// Socket settings
listenfd = socket(AF_INET, SOCK_STREAM, 0);
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(port);
// Signals
signal(SIGPIPE, SIG_IGN);
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) < 0) {
perror("ERROR: setsockopt failed");
return EXIT_FAILURE;
}
// Binding
if (bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) {
perror("ERROR: Bind failed");
return EXIT_FAILURE;
}
// Listening
if (listen(listenfd, 10) < 0) {
perror("ERROR: Listen failed");
return EXIT_FAILURE;
}
printf("<[ SERVER STARTED ]>\n");
// Accept clients
while (1) {
socklen_t clilen = sizeof(cli_addr);
connfd = accept(listenfd, (struct sockaddr*)&cli_addr, &clilen);
// Check if max clients is reached
if ((cli_count 1) == MAX_CLIENTS) {
printf("Max clients reached. Rejected: ");
print_client_addr(cli_addr);
printf(":%d\n", cli_addr.sin_port);
close(connfd);
continue;
}
// Client settings
client_t *cli = (client_t *)malloc(sizeof(client_t));
cli->address = cli_addr;
cli->sockfd = connfd;
cli->uid = uid ;
// Add client to the queue and fork thread
queue_add(cli);
pthread_create(&tid, NULL, &handle_client, (void*)cli);
// Reduce CPU usage
sleep(1);
}
return EXIT_SUCCESS;
}
这段代码理解难度不大,首先,服务端绑定在一个端口上,然后等待客户端的连接,一旦有客户端连接上来,就将此客户端加入到队列里面,随后立马起一个线程去专门为他做消息收发用。
接着看客户端的实现:client.c
代码语言:javascript复制#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <pthread.h>
#define LENGTH 2048
// Global variables
volatile sig_atomic_t flag = 0;
int sockfd = 0;
char name[32];
void str_overwrite_stdout() {
printf("\r%s", "> ");
fflush(stdout);
}
void str_trim_lf (char* arr, int length) {
int i;
for (i = 0; i < length; i ) { // trim \n
if (arr[i] == '\n') {
arr[i] = '\0';
break;
}
}
}
void catch_ctrl_c_and_exit(int sig) {
flag = 1;
}
void send_msg_handler() {
char message[LENGTH] = {};
char buffer[LENGTH 32] = {};
while(1) {
str_overwrite_stdout();
fgets(message, LENGTH, stdin);
str_trim_lf(message, LENGTH);
if (strcmp(message, "exit") == 0) {
break;
} else {
sprintf(buffer, "%s: %s\n", name, message);
send(sockfd, buffer, strlen(buffer), 0);
}
bzero(message, LENGTH);
bzero(buffer, LENGTH 32);
}
catch_ctrl_c_and_exit(2);
}
void recv_msg_handler() {
char message[LENGTH] = {};
while (1) {
int receive = recv(sockfd, message, LENGTH, 0);
if (receive > 0) {
printf("%s", message);
str_overwrite_stdout();
} else if (receive == 0) {
break;
} else {
// -1
}
memset(message, 0, sizeof(message));
}
}
int main(int argc, char **argv){
if(argc != 3){
printf("Usage: %s <ip> <port>\n", argv[0]);
return EXIT_FAILURE;
}
char *ip = argv[1];
int port = atoi(argv[2]);
signal(SIGINT, catch_ctrl_c_and_exit);
printf("Please enter your name: ");
fgets(name, 32, stdin);
str_trim_lf(name, strlen(name));
if (strlen(name) > 32 || strlen(name) < 2){
printf("Name must be less than 30 and more than 2 characters.\n");
return EXIT_FAILURE;
}
struct sockaddr_in server_addr;
// Socket settings
sockfd = socket(AF_INET, SOCK_STREAM, 0);
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(ip);
server_addr.sin_port = htons(port);
// Connect to Server
int err = connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (err == -1) {
printf("ERROR: connect\n");
return EXIT_FAILURE;
}
// Send name
send(sockfd, name, 32, 0);
printf("=== WELCOME TO THE CHATROOM ===\n");
pthread_t send_msg_thread;
if(pthread_create(&send_msg_thread, NULL, (void *) send_msg_handler, NULL) != 0){
printf("ERROR: pthread\n");
return EXIT_FAILURE;
}
pthread_t recv_msg_thread;
if(pthread_create(&recv_msg_thread, NULL, (void *) recv_msg_handler, NULL) != 0){
printf("ERROR: pthread\n");
return EXIT_FAILURE;
}
while (1){
if(flag){
printf("\nBye\n");
break;
}
}
close(sockfd);
return EXIT_SUCCESS;
}
客户端的代码也比较容易理解,流程很简单,客户端就是加入到服务端,然后创建一个线程来做消息的收发工作,指导接收到退出的信号,才终止整个大的 loop。
说了这么多,让我们来运行的试试吧!
这儿是服务端程序启动,不过我后面给出了几个客户端连接过来的打印。
随后,看客户端方面:用户 A 的客户端
用户 B 的客户端
用户 C 的客户端
此时,我们发现, userA发送的消息,userB,userC 都收到了,别着急,这是正常的,因为,我们
代码语言:javascript复制// Send message to all clients except the sender
void send_message(char *s, int uid) {
pthread_mutex_lock(&clients_mutex);
for (int i = 0; i < MAX_CLIENTS; i) {
if (clients[i]) {
if (clients[i]->uid != uid) {
if (write(clients[i]->sockfd, s, strlen(s)) < 0) {
perror("ERROR: write to descriptor failed");
break;
}
}
}
}
pthread_mutex_unlock(&clients_mutex);
}
这段逻辑就是服务端会将所有的消息广播到非消息发送方,所以,我们需要在这里继续优化一下:
要实现这个特性,我们需要在服务器端进行修改,以便它能够解析 /msg <recipient_name> <message>
格式的消息,并将消息发送给指定的接收者。
int find_client_sockfd(const char *recipient_name, Client *clients, int max_clients) {
for (int i = 0; i < max_clients; i ) {
if (clients[i].sockfd != 0 && strcmp(clients[i].name, recipient_name) == 0) {
return clients[i].sockfd;
}
}
return -1;
}
接下来,我们需要修改服务器的消息处理部分,以便它能够识别私密消息并将其发送给正确的接收者,篇幅有限,给出主要实现,增加的部分,发现是私密消息,就走这段逻辑,否则就按照直接的方式广播。
代码语言:javascript复制 // 检查是否为私密消息
if (strncmp(msg, "/msg ", 5) == 0) {
char recipient_name[32];
char *message;
// 解析消息
message = msg 5; // 跳过 "/msg "
sscanf(message, "%s", recipient_name);
message = strstr(message, " ") 1; // 跳过接收者名字
// 查找接收者的套接字
int recipient_sockfd = find_client_sockfd(recipient_name, clients, max_clients);
if (recipient_sockfd != -1) {
// 发送私密消息给接收者
sprintf(buffer, "[Private] %s", message);
send(recipient_sockfd, buffer, strlen(buffer), 0);
} else {
// 接收者不存在,发送错误消息给发送者
sprintf(buffer, "User %s does not exist.\n", recipient_name);
send(client_sockfd, buffer, strlen(buffer), 0);
}
}
ok,整个实现就完成了,这样,我们就实现了一个中心化的 聊天服务。
总结
虽然我们实现了一个中心化的聊天服务,但是这个离线上可运营还是有很大的距离的,这个例子非常基础,没有错误处理,也没有加密通信,在生产环境中,你需要考虑更多的错误处理、安全性、性能优化(比如使用线程池或者事件驱动的IO模型)以及其他的高级特性。再者,这个是一个完全在内存中的跑的模型,断电之后,聊天消息,好友关系全部都没有了,而且我们基础版本的聊天室里面,后加入的小伙伴不能接收之前大家都聊了些啥,这多少还差那么点意思,所以,交给你来继续完善,你会有思路吗?
而且,作为拔高,不防思考一下,如何基于此,实现一个去中心化的聊天服务呢?提示下,最简单的是,端既可以是一个服务端也可以扮演一个客户端。则样 A 和 B 就无需一个固定的服务端了。