본문 바로가기
Programming/열혈 TCP, IP 소켓 프로그래밍(저자 윤성우)

Ch 18. 멀티쓰레드 기반의 서버구현

by minjunkim.dev 2020. 8. 10.

   모든 내용은 [윤성우 저, "열혈강의 TCP/IP 소켓 프로그래밍", 오렌지미디어] 를 기반으로 제 나름대로 이해하여 정리한 것입니다. 다소 부정확한 내용이 있을수도 있으니 이를 유념하고 봐주세요!


# 멀티프로세스 기반의 서버구현의 단점
1) 프로세스 생성이라는 운영체제 차원에서 부담스러운 작업과정을 거침
2) 두 프로세스 사이에서의 데이터 교환을 위해서는 별도의 IPC 기법을 적용해야함(e.g. 파이프)
3) 초당 적게는 수십번에서 많게는 수천번까지 일어나는 "컨텍스트 스위칭"에 따른 부담은
프로세스 생성 방식의 가장 큰 부담 - 가장 치명적 단점
* CPU 내의 코어보다 많은 수의 프로세스가 실행될 수 있는 이유는 프로세스들이 CPU의 할당시간을 나눠서 사용하기 때문인데, 이를 위해서는 "컨텍스트 스위칭"이 필요 => 이 때문에 오버헤드 발생

 

# 쓰레드의 등장
- 멀티프로세스의 특징을 유지하면서 단점을 극복하기 위해 등장
- 멀티프로세스의 단점을 최소화하기 위해 설계된 일종의 "경량화된" 프로세스
- 장점
1) 쓰레드의 생성 및 컨텍스트 스위칭은 프로세스보다 빠르다
2) 쓰레드 사이에서의 데이터 교환에는 특별한 기법이 필요치 않다.

 

# 쓰레드와 프로세스의 차이점
- 둘 이상의 실행흐름을 갖기 위해 프로세스가 유지하고 있는 메모리 영역을

통째로 복사한다는 것이 너무 부담스러움
- 프로세스 메모리 구조(각 프로세스마다 별도로 유지함)
code(text) section : 프로그램 코드
data section : 전역/static 변수
heap section : 동적 메모리할당 / 런타임에 크기 결정 / 높은번지에서 낮은번지로 확장
stack section : 함수 호출 시 생성되는 지역 변수와 매개 변수가 저장되는 영역 / 컴파일에 크기 결정 / 낮은번지에서 높은번지로 확장
- 쓰레드는 데이터/힙영역을 공유하고 스택영역만 분리시킴으로써 다음과 같은 장점을 얻음
1) 컨텍스트 스위칭시 데이터 영역과 힙은 올리고 내릴 필요가 없다.
2) 데이터 영역과 힙을 이용해서 데이터를 교환할 수 있다.
- 프로세스 : "운영체제 관점"에서 별도의 실행흐름을 구성하는 단위
- 쓰레드 : "프로세스 관점"에서 별도의 실행흐름을 구성하는 단위


# POSIX : UNIX 계열 운영체제간에 이식성을 높이기 위한 표준 API 규격을 뜻함

# 쓰레드의 생성과 실행흐름의 구성
- 쓰레드는 별도의 실행흐름을 가지므로, 쓰레드만의 main함수를 별도로 정의해야 함
- 그리고, 이 함수를 시작으로 별도의 실행흐름을 형성해줄 것을 운영체제에 요청해야함
- 운영체제에 이를 요청하는 함수

#include <pthread.h>

int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr,
void *(*start_routine)(void*),void *restrict arg);
// -> 성공시 0, 실패시 0 이외의 값 반환

- thread : 생성할 쓰레드의 ID 저장을 위한 변수의 주소값.

참고로 쓰레드는 프로세스와 마찬가지로 쓰레드 구분을 위한 ID가 부여됨
- attr : 쓰레드에 부여할 특성정보의 전달, NULL 전달시 기본특성의 쓰레드 생성
- start_routine : 쓰레드의 main함수 역할을 하는, 별도 실행흐름의 시작이 되는 함수의 주소값(함수 포인터)
- arg : 세번째 인자를 통해 등록된 함수가 호출될때, 전달할 인자의 정보를 담고 있는 변수의 주소값

- restrict 키워드와 함수 포인터를 잘 알아야 제대로 사용할 수 있음.
- 프로세스가 종료되면, 프로세스 내의 쓰레드는 종료됨.

# 쓰레드 기반 프로그래밍에서 sleep 함수 호출을 통해 쓰레드의 실행을 관리한다는 것은,
프로그램의 흐름을 예측한다는 뜻이므로 사실상 불가능
=> 밑의 함수를 사용하여 문제를 쉽고 효율적으로 해결

#include <pthread.h>

int pthread_join(pthread_t thread, void **status);
// ->성공시 0, 실패시 0 이외의 값 반환

- thread : 이 매개변수에 전달되는 ID의 쓰레드가 종료될때까지 함수는 반환하지 않음
- status : 쓰레드의 main 함수가 반환하는 값이 저장될 포인터 변수의 주소값

 

# thread2.c

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>

void* thread_main(void *arg);

int main(int argc, char* argv[])
{
    pthread_t t_id;
    int thread_param=5; // 쓰레드의 메인함수에 전달할 매개변수
    void* thr_ret;
    
    if(pthread_create(&t_id,NULL,thread_main,(void*)&thread_param)!=0) // 쓰레드의 생성 및 시작
    {
        puts("pthread_create() error");
        return -1;
    }

    if(pthread_join(t_id,&thr_ret)!=0) // 쓰레드가 끝날때까지 대기
    {
        puts("pthread_join() error");
        return -1;
    }

    printf("Thread return meesage: %s \n",(char*)thr_ret);
    free(thr_ret);

    return 0;
}

void* thread_main(void *arg)
{
    int i;
    int cnt=*((int*)arg);
    char* msg=(char*)malloc(sizeof(char)*50);
    strcpy(msg,"Hello, I'am thread- \n");

    for(i=0;i<cnt;++i) // 5번 반복
    {
        sleep(1); puts("running thread");
    }

    return (void*)msg; // 반환형에 맞춰 void형으로 변환하여 반환
}

# 실행 결과


# 임계영역 내에서 호출이 가능한 함수
- 동시에 둘 이상의 쓰레드를 생성시 그 방법에 차이는 없으나,
임계영역이 존재하는 함수를 둘 이상의 쓰레드가 동시에 실행하면 문제를 일으킬수 있음.
1) 쓰레드에 안전한 함수 : 함수를 동시 호출해도 문제가 발생하지 않음
2) 쓰레드에 불안전한 함수 : 함수를 동시 호출시 문제가 발생할 수 있음
=> 다만 쓰레드에 안전한 함수라도 임계영역은 존재할 수 있음.

문제가 발생하지 않도록 적절한 조치가 이루어져 있는 것 뿐임.
- 기본적으로 제공되는 대부분의 표준함수들은 쓰레드에 안전하다.
- 그리고, 쓰레드에 불안전한 함수의 같은 기능을 갖는 쓰레드에 안전한 함수가 정의되어 있어
우리가 굳이 구분을 할 필요가 없음.
e.g)

struct hostent * gethostbyname(const char* name);
=>
struct hostent * gethostbyname_r(const char *name,
struct hostent *result, char *buffer, int buflen, int *h_errnop);

- 리눅스에서는 일반적으로 쓰레드에 안전한 형태로 재구현된 함수의 이름에 _r이 붙음(윈도우는 다름)
- 헤더파일 이전에 매크로 _REENTRANT를 정의하거나(#define _REENTRANT),
컴파일시 -D_REENTRANT의 옵션을 추가하는 방식으로 매크로를 정의하면
재구현된 함수를 자동으로 사용할수 있어 매우 간편!
- 쓰레드들은 데이터 영역을 공유하므로 전역변수에 공유 및 접근이 가능하다.
=> 이런 경우 임계영역과 관련한 오류의 발생소지가 높다!


# 하나의 변수에 둘 이상의 쓰레드가 동시에 접근하는 것이 문제!
- 꼭 전역변수 뿐만 아니라, 어떤 메모리 공간이라도 동시에 접근하면 문제가 발생가능
- 쓰레도 역시 CPU의 할당시간을 나눠서 실행하지만(코어 수보다 쓰레드가 많은 경우에)

메모리에 대한 "동시접근"은 이루어질 수 있다.


예를 들어,
num=100; 변수를 1씩 증가시키는 thread1, 2가 있다고 가정하자.
먼저 thread1이 num=100; 을 참조하여 가져온다. 이 값을 CPU에 전달해서 값101 을 얻고,
thread1은 이 값을 다시 num에 저장해야 num은 비로소 num=101; 이 된다. thread2도 이와 같이 진행이 된다.
그런데, 1. thread1이 num의 값을 가져와서 2. CPU에 전달하고, 3. CPU가 계산한 값을 가져와서, 4. 이 값을 다시 num에 저장하기 전에
thread2가 num을 참조하여 값을 가져가면 우리가 원하는대로 작동하지 않는다.
=> 따라서 한 쓰레드가 한 변수에 접근해서 연산을 완료할때까지,

다른 쓰레드가 해당 변수에 접근하지 못하도록 막아야 하는데,
이것이 바로 "동기화"이다.
- 일반적으로 임계영역은 쓰레드에 의해서 실행되는 함수 내에 존재한다.

 

# 쓰레드의 접근순서 때문에 발생하는 문제점의 해결책 - 쓰레드 동기화
- 쓰레드 동기화가 필요한 상황
1) 동일한 메모리 영역으로의 동시접근이 발생하는 상황
2) 동일한 메모리 영역에 접근하는 쓰레드의 실행순서를 지정해야 하는 상황

: 쓰레드의 "실행순서 컨트롤"에 관련된 내용
=> 뮤텍스/세마포어 이용하여 해결


# 뮤텍스(Mutual Exclusion) : 쓰레드의 동시접근을 허용하지 않는다는 의미
- 임계영역에 대한 자물쇠 역할을 함

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_destory(pthread_mutex_t *mutex);
// -> 성공시 0, 실패시 0이외의 값 반환

- mutex : 뮤텍스 생성시에는 뮤텍스의 참조값 저장을 위한 변수의 주소값,
뮤텍스 소멸시에는 소멸하고자 하는 뮤텍스의 참조값을 저장하고 있는 변수의 주소값
- attr : 생성하는 뮤텍스의 특성정보를 담고있는 변수의 주소값, 별도의 특성을 지정하지 않으면 NULL)
- 자물쇠 생성을 위해서는 pthread_mutex_t mutex형 변수가 선언되어야 한다.
- 이 변수의 주소값은 운영체제가 생성한 뮤텍스(자물쇠 시스템)의 참조에 사용되며, 뮤텍스 소멸시에도 사용됨.
두번째 인자에 NULL을 전달해서 뮤텍스를 생성하고자 할때는
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 도 가능하나 비추천
(매크로를 사용한 초기화는 오류발생에 대한 확인이 어려움)

#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
/*
임계영역 들어가기 전에 호출. 다른 쓰레드가 임계영역 실행중이면,
그 쓰레드가 unlock 함수 호출할 때까지(임계영역에서 나올때까지) 블로킹
*/
int pthread_mutex_unlock(pthread_mutex_t *mutex); // 임계영역 나오고 나서 호출
// -> 성공시 0, 실패시 0이외의 값 반환

e.g.

pthread_mutex_t mutex;
....
pthread_mutex_lock(&mutex);
// 임계영역 시작
....
// 임계영역 끝
pthread_mutex_unlock(&mutex);

=> 단 데드락(교착상태) 발생에 주의할 것!

 

- 뮤텍스의 lock, unlock 함수의 호출에는 생각보다 오랜 시간이 걸린다.
=> 따라서, 임계영역의 범위는 상황에 맞게 설정하자!


# 세마포어 : 뮤텍스와 굉장히 유사
- 여기서는, 바이너리 세마포어(0과 1 사용)를 대상으로 쓰레드의 "실행순서 컨트롤" 중심의 동기화를 설명

- 세마포어 생성과 소멸

#include <semaphore.h>

int sem_init(sem_t &sem, int pshared, unsigned int value);
int sem_destory(sem_t *sem);
// -> 성공시 0, 실패시 0 이외의 값 반환

- sem : 세마포어 생성시에는 세마포어의 참조값 저장을 위한 변수의 주소값,
세마포어 소멸시에는 소멸하고자 하는 세마포어의 참조값을 저장하는 변수의 주소값
- pshared : 0 이외의 값 전달시, 둘 이상의 프로세스에 의해 접근 가능한 세마포어 생성,
0 전달시 하나의 프로세스 내에서만 접근가능한 세마포어 생성
- value : 생성되는 세마포어의 초기값 지정

#include <semaphore.h>

int sem_wait(sem_t *sem); // 뮤텍스의 lock 역할
int sem_post(sem_t *sem); // 뮤텍스의 unlock 역할
// ->성공시 0, 실패시 0 이외의 값 반환

- sem : 세마포어의 참조값을 저장하고 있는 변수의 주소값.
sem_post에 전달되면 세마포어의 값은 하나 증가, sem_wait에 전달되면 세마포어의 값은 하나 감소

- sem_init 함수 호출시 운영체제에 의해 세마포어 오브젝트 라는 것이 만들어짐
=> 이곳에는 "세마포어 값"이라는 정수가 하나 기록됨
- "세마포어 값"이 위 두 함수 호출에 따라 변함. 단, 0보다 작아질수는 없음
- 세마포어 값이 0인 경우에 wait 호출시 블로킹에 빠지며,

다른 쓰레드가 sem_post가 호출해야만 블로킹 상태에서 빠져나가게 됨

# (세마포어 초기값이 1임을 가정) - 바이너리 세마포어

sem_wait(&sem); // 세마포어값 0으로
//임계영역의 시작
...
//임계영역의 끝
sem_post(&sem); // 세마포어값 1로

# 쓰레드를 소멸하는 두가지 방법
- 리눅스의 쓰레드는 쓰레드의 main 함수를 반환했다고 해서 자동으로 소멸하지 않음
- 쓰레드의 소멸을 직접적으로 명시해야만 함, 안그러면 쓰레드에 의해 할당된 메모리 공간이 계속해서 남아있게됨
1) pthread_join 함수의 호출 => 쓰레드의 종료대기 및 소멸까지 유도가 되나,

쓰레드가 종료될때까지 해당 프로세스가 블로킹 상태에 놓이게 됨.
2) pthread_detach 함수의 호출을 통해서 쓰레드의 소멸을 주로 유도함.

#include <pthread.h>

int pthread_detach(pthread_t thread);
// -> 성공시 0, 실패시 0 이외의 값 반환

- thread : 종료와 동시에 소멸시킬 쓰레드의 ID 정보 전달.


- 이 함수을 호출했다고 해서, 종료되지 않은 쓰레드가 종료되거나 블로킹 상태에 놓이지 않음.
- 이 함수를 통해, 종료된 쓰레드에게 할당된 메모리의 소멸을 유도할 수 있음.
- 이 함수 호출 이후에는 해당 쓰레드를 대상으로 pthread_join() 함수를 호출할 수 없으니 주의하자.


# 멀티쓰레드 기반의 다중접속 서버의 구현

 

chat_server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100
#define MAX_CLNT 256

void* handle_clnt(void* arg);
void send_msg(char* msg, int len);
void error_handling(char* message);

int clnt_cnt=0; // 서버에 접속한 클라이언트 수
int clnt_socks[MAX_CLNT]; // 클라이언트와의 송수신을 위해 생성한 소켓의 파일 디스크립터를 저장한 배열
pthread_mutex_t mutx; // 뮤텍스를 통한 쓰레드 동기화를 위한 변수

int main(int argc, char * argv[])
{
    int serv_sock, clnt_sock;
    struct sockaddr_in serv_adr, clnt_adr;
    socklen_t clnt_adr_sz;
    pthread_t t_id;

    if(argc!=2) // 실행파일 경로/PORT번호 를 입력으로 받아야 함
    {
        printf("Usage : %s <port> \n", argv[0]);
        exit(EXIT_FAILURE);
    }

    pthread_mutex_init(&mutx,NULL); // 뮤텍스 생성

    serv_sock=socket(PF_INET,SOCK_STREAM,0); // TCP 소켓 생성

    /* 서버 주소정보 초기화 */
    memset(&serv_adr,0,sizeof(serv_adr));
    serv_adr.sin_family=AF_INET;
    serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
    serv_adr.sin_port=htons(atoi(argv[1]));

    /* 서버 주소정보를 기반으로 주소할당 */
    bind(serv_sock,(struct sockaddr*)&serv_adr,sizeof(serv_adr));

    /* 서버소켓(리스닝소켓)이 됨 
    연결요청 대기큐를 생성하고, 클라이언트의 연결요청을 기다림
    */
    listen(serv_sock,5);

    while(true)
    {
        clnt_adr_sz=sizeof(clnt_adr);

        /* 클라이언트의 연결요청을 수락하고,
        클라이언트와의 송수신을 위한 새로운 소켓 생성 */
        clnt_sock=accept(serv_sock,(struct sockaddr*)&clnt_adr,&clnt_adr_sz);
        printf("Connected client IP: %s \n",inet_ntoa(clnt_adr.sin_addr));
        // 클라이언트의 IP정보를 문자열로 변환하여 출력

        pthread_mutex_lock(&mutx); // 뮤텍스 lock
        clnt_socks[clnt_cnt++]=clnt_sock; // 클라이언트 수와 파일 디스크립터를 등록
        pthread_mutex_unlock(&mutx); // 뮤텍스 unlock

        pthread_create(&t_id,NULL,handle_clnt,(void*)&clnt_sock); // 쓰레드 생성 및 실행
        pthread_detach(t_id); // 쓰레드가 종료되면 소멸시킴
    }

    return 0;
}

void* handle_clnt(void* arg)
{
    int clnt_sock=*((int*)arg); // 클라이언트와의 연결을 위해 생성된 소켓의 파일 디스크립터
    int str_len=0,i;
    char msg[BUF_SIZE];

    while((str_len=read(clnt_sock,msg,sizeof(msg)))!=0) // 클라이언트로부터 EOF를 수신 할때까지 읽어서
        send_msg(msg,str_len); // send_msg 함수 호출

    pthread_mutex_lock(&mutx); // 뮤텍스 lock
    for(i=0;i<clnt_cnt;++i) // remove disconnected client
    {
        if(clnt_sock==clnt_socks[i]) // 현재 해당하는 파일 디스크립터를 찾으면
        {
            while(i<clnt_cnt-1) // 클라이언트가 연결요청을 했으므로 해당정보를 덮어씌워 삭제
            {
                clnt_socks[i]=clnt_socks[i+1];
                ++i;
            }
            break;
        }
    }
    --clnt_cnt; // 클라이언트 수 감소
    pthread_mutex_unlock(&mutx); // 뮤텍스 unlock
    close(clnt_sock); // 클라이언트와의 송수신을 위한 생성했던 소켓종료
    return NULL;
}

void send_msg(char* msg, int len) // send to all
{
    int i;
    pthread_mutex_lock(&mutx); // 뮤텍스 lock
    for(i=0;i<clnt_cnt;++i) // 현재 연결된 모든 클라이언트에게 메세지를 전송
        write(clnt_socks[i],msg,len);
    pthread_mutex_unlock(&mutx); // 뮤텍스 unlock
}

void error_handling(char* message)
{
    fputs(message,stderr);
    fputc('\n',stderr);
    exit(EXIT_FAILURE); // -1
}

 

char_client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100
#define NAME_SIZE 20

void* send_msg(void* arg);
void* recv_msg(void* arg); 
void error_handling(char* message);

char name[NAME_SIZE]="[DEFAULT]";
char msg[BUF_SIZE];

int main(int argc, char * argv[])
{
    int sock;
    struct sockaddr_in serv_addr;
    pthread_t snd_thread, rcv_thread;
    void* thred_return;

    if(argc!=4) // 실행파일 경로/IP/PORT번호/채팅닉네임 을 입력으로 받아야 함
    {
        printf("Usage : %s <IP> <port> <name> \n",argv[0]);
        exit(EXIT_FAILURE);
    }

    sprintf(name,"[%s]",argv[3]);
    sock=socket(PF_INET,SOCK_STREAM,0); // TCP 소켓 생성

    /* 서버 주소정보 초기화 */
    memset(&serv_addr,0,sizeof(serv_addr));
    serv_addr.sin_family=AF_INET;
    serv_addr.sin_addr.s_addr=inet_addr(argv[1]);
    serv_addr.sin_port=htons(atoi(argv[2]));

    /* 서버 주소정보를 기반으로 연결요청
    이때 비로소 클라이언트 소켓이 됨 */
    connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
    
    pthread_create(&snd_thread,NULL,send_msg,(void*)&sock); // 쓰레드 생성 및 실행
    pthread_create(&rcv_thread,NULL,recv_msg,(void*)&sock); // 쓰레드 생성 및 실행

    pthread_join(snd_thread,&thred_return); // 쓰레드 종료까지 대기 
    pthread_join(rcv_thread,&thred_return); // 쓰레드 종료까지 대기
    close(sock); // 클라이언트 소켓 연결종료

    return 0;
}

void* send_msg(void* arg) // send thread main
{
    int sock=*((int*)arg); // 클라이언트의 파일 디스크립터
    char name_msg[NAME_SIZE+BUF_SIZE];
    while(true)
    {
        fgets(msg,BUF_SIZE,stdin); 
        if(!strcmp(msg,"q\n")||!strcmp(msg,"Q\n"))
        {
            close(sock); // 클라이언트 소켓 연결종료 후
            exit(EXIT_SUCCESS); // 프로그램 종료
        }
        sprintf(name_msg,"%s %s",name,msg); // client 이름과 msg를 합침
        write(sock, name_msg, strlen(name_msg)); // 널문자 제외하고 서버로 문자열을 보냄
    }
    return NULL;
}

void* recv_msg(void* arg) // read thread main
{
    int sock=*((int*)arg); // 클라이언트의 파일 디스크립터
    char name_msg[NAME_SIZE+BUF_SIZE];
    int str_len;
    while(true)
    {
        str_len=read(sock,name_msg,NAME_SIZE+BUF_SIZE-1);
        if(str_len==-1) // read 실패시
            return (void*)-1;
        name_msg[str_len]='\0';
        fputs(name_msg,stdout);
    }
    return NULL;
}

void error_handling(char* message)
{
    fputs(message,stderr);
    fputc('\n',stderr);
    exit(EXIT_FAILURE); // -1
}

 


[출처] : 윤성우 저, "열혈강의 TCP/IP 소켓 프로그래밍", 오렌지미디어