Profile picture

[Python] 스레드(Thread)와 멀티 스레딩(Multi Threading)

JaehyoJJAng2024년 07월 03일

스레드 ?

스레드(Thread)는 컴퓨터 프로그램에서 작업을 수행하는 가장 작은 실행 단위라고 일컫는다.

일반적으로 하나의 프로세스에는 여러 스레드를 가질 수 있으며, 각 스레드는 독립적으로 실행되기는 하지만 메모리 공간을 공유한다.

이를 통해 프로세스 내에서 여러 작업을 동시에 처리할 수 있다.


스레드의 특징

  • 경량 프로세스 : 스레드는 일반적으로 경량 프로세스 라고 불리운다. 프로세스가 독립된 메모리 공간을 사용하는 반면, 스레드는 같은 프로세스 내에서 메모리와 자원을 공유한다.
    • 따라서 스레드 간의 통신이 빠르고 메모리 사용량이 적다.
  • 동시성 : 여러 스레드가 동시에 실행되어 **동시성(concurrency)**을 제공할 수 있다.
    • 특히, I/O 작업과 같은 대기 시간이 발생하는 경우, 다른 스레드가 실행되어 효율성을 높일 수 있다.

스레드와 프로세스의 차이점

  • 프로세스: 독립적인 실행 단위로, 고유한 메모리 공간을 사용하고, 다른 프로세스와 메모리를 공유하지 않는다.
    • 프로세스 간의 통신은 비교적 느리고, 별도의 통신 방식(예: 파이프, 메시지 큐, 소켓 등)이 필요하다.
  • 스레드: 프로세스 내에서 실행되는 단위로, 같은 메모리 공간을 공유한다.
    • 스레드 간의 통신이 빠르지만, 공유 자원을 사용할 때 동기화 문제를 해결해야 한다.

코드로 보는 간단한 스레드 예시

import threading

def worker():
    print("스레드가 실행 중입니다.")

# 스레드 생성
thread = threading.Thread(target=worker)

# 스레드 시작
thread.start()

# 스레드 종료 대기
thread.join()

print("스레드가 종료되었습니다.")

위 코드는 하나의 스레드를 생성하여 worker 함수를 실행하고, 스레드가 끝날 때까지 기다린 후 메인 프로그램이 종료된다.


멀티 스레딩

파이썬에서의 멀티 스레딩이란 여러 스레드를 동시에 실행하여 병렬 처리하는 방법이다.

스레드는 프로세스 내에서 독립적으로 실행되는 흐름 단위로, 멀티스레딩을 사용하면 CPU의 여러 코어를 활용하거나 I/O 작업을 동시에 처리할 수도 있다.

하지만, 스레드 간 자원이 공유되기 때문에, 효율적이면서도 충돌 가능성의 위험이 존재한다.


멀티스레딩이 유용한 이유

  • I/O 바운드 작업 : 파일 읽기/쓰기, 네트워크 통신, 데이터베이스 작업 등 I/O 작업이 많은 경우에는 멀티스레딩이 효율적이다.
    • 각 스레드가 I/O 작업을 대기하는 동안 다른 스레드가 실행될 수 있기 때문에 병렬 처리가 가능해진다.
  • I/O 작업의 병렬 처리: GIL이 I/O 작업에는 적용되지 않기 때문에, 이러한 작업을 동시에 수행할 때는 멀티스레딩의 장점이 발휘된다.

멀티스레딩 실습을 진행하기 이전에, 싱글 스레딩과, 코루틴을 사용한 동시성 프로그래밍을 먼저 실습해본 후, 멀테스레딩과의 차이점을 비교해보자.


싱글 스레딩 예시

import requests as rq
import time
import os
import threading

def fetch(session: rq.Session, url: str) -> str:
    print(f"Process {os.getpid()} | {threading.get_ident()} url: {url}")
    with session.get(url=url) as resp:
        return resp.text

def main() -> None:
    urls :list[str] = ["https://www.naver.com" for _ in range(40)]
    with rq.Session() as session:
        result :list[str] = [fetch(session=session, url=url) for url in urls]
        print(result)        

if __name__ == '__main__':
    start = time.time()
    main()
    end = time.time()
    print(f" 작업 종료: end - start")

위 코드는 www.naver.com에 GET 요청을 보내 응답을 받는 코드이다.

os.getpid() 메소드를 통해 현재 프로세스 ID 값을 얻고, threading.get_ident() 메소드를 통해 현재 스레드의 ID 값을 확인할 수 있다.


실행 결과를 확인해보자.

Process 12926 | 127800567118720 url: https://www.naver.com
Process 12926 | 127800567118720 url: https://www.naver.com
Process 12926 | 127800567118720 url: https://www.naver.com
Process 12926 | 127800567118720 url: https://www.naver.com
Process 12926 | 127800567118720 url: https://www.naver.com
...
...
...
 작업 종료: 2.5873804092407227

12926 이라는 동일한 프로세스 내에서 ID가 127800567118720 ID 값을 가진 스레드 홀로 작업 중인 것을 볼 수 있다.

이 코드의 경우 하나의 작업을 모두 마칠 때까지 다음 작업으로 넘어가지지 않는다.

해당 코드를 실행하는데 약 2.5초가 소요되었다.


코루틴 동시성 프로그래밍 예시 (싱글 스레드)

import requests as rq
import time
import os
import threading
import asyncio
import aiohttp

async def fetch(session: rq.Session, url: str) -> str:
    print(f"Process {os.getpid()} | {threading.get_ident()} url: {url}")    
    async with session.get(url=url) as resp:
        return resp.text

async def main() -> None:
    urls :list[str] = ["https://www.naver.com" for _ in range(40)]
    async with rq.Session() as session:
        result :list[str] = await asyncio.gather(*[fetch(session=session, url=url) for url in urls])

if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(f" 작업 종료: {end - start}")

다음은 코루팅을 사용해 동시성 프로그래밍을 적용한 코드이다. 코루틴은 하나의 스레드에서 비동기 작업을 실행할 수 있도록 해준다.


실행 결과를 확인해보자.

Process 13409 | 132587443825536 url: https://www.naver.com
Process 13409 | 132587443825536 url: https://www.naver.com
Process 13409 | 132587443825536 url: https://www.naver.com
Process 13409 | 132587443825536 url: https://www.naver.com
Process 13409 | 132587443825536 url: https://www.naver.com
 작업 종료: 0.35297632217407227

코루틴을 사용했을 경우 동일한 프로세스에서 하나의 스레드를 사용하지만 I/O 바운드 작업의 대기 시간 동안 다른 작업을 할 수 있기 때문에 싱글 스레딩 예시 코드보다 현저히 빨라진 작업 속도를 볼 수 있다.


멀티 스레딩 예시 #1

import requests as rq
import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor


# 멀티스레딩으로 처리할 함수
def fetch(params: tuple[rq.Session, str]) -> tuple:
    session, url = params
    print(f"Process {os.getpid()} | {threading.get_ident()} url: {url}")
    with session.get(url=url) as resp:
        return url, resp.status_code

def main() -> None:
    urls :list[str] = ["https://www.naver.com" for _ in range(40)]
    
    # 스레드 5개로 제한
    max_threads :int = 10
    
    with rq.Session() as session:
        # url과 세션을 튜플로 묶어줌
        params :list[tuple[rq.Session, str]] = [(session, url) for url in urls]
        
        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            # fetch 함수와 URL 파라미터들을 병렬로 처리
            results = list(executor.map(fetch, params))
        
        # 결과 출력
        for result in results:
            print(f"URL: {result[0]} ({result[-1]})")

if __name__ == '__main__':
    start = time.time()
    main()
    end = time.time()
    print(f" 작업 종료: {end - start}")

실행 결과를 확인해보자.

Process 13682 | 126125315458752 url: https://www.naver.com
Process 13682 | 126125304972992 url: https://www.naver.com
Process 13682 | 126125294487232 url: https://www.naver.com
Process 13682 | 126125284001472 url: https://www.naver.com
Process 13682 | 126125135103680 url: https://www.naver.com
Process 13682 | 126125124617920 url: https://www.naver.com
Process 13682 | 126125432899264 url: https://www.naver.com
...
...
...
 작업 종료: 0.6496436595916748

동일한 프로세스 내에서 각기 다른 ID 값을 가지는 스레드를 확인할 수 있다.

ThreadPoolExecutor() 메소드 내의 max_workers 라는 파라미터를 통해 스레드 개수를 조절할 수 있는데, 위 코드에서는 10개의 스레드를 사용했을 때 0.64초가 소요되었다.


1개의 스레드만 사용하는 경우에는 멀티스레딩을 적용하지 않은 코드보다도 더 느리다.

왜 그런거냐면, 스레드를 지정해주는 비용 때문에 그렇다.

멀티 스레딩을 사용할 때는 스레드를 만들고 우선 순위를 지정해주는 등의 비용이 들기 때문에 메모리 점유율이 올라갈 수 밖에 없다.

따라서, 특별한 상황이 아니라면 코루틴(Coroutine)을 사용하는 것을 권장한다.


멀티 스레딩 예시 #2

아래 코드는 멀티 스레딩 예시 #1에서 코드만 조금 다르게 수정한 것이다.

import requests as rq
import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,  as_completed


# 멀티스레딩으로 처리할 함수
def fetch(url: str) -> tuple:
    print(f"Process {os.getpid()} | {threading.get_ident()} url: {url}")
    with rq.Session() as session:
        with session.get(url=url) as resp:
            return url, resp.status_code

def main() -> None:
    urls :list[str] = ["https://www.naver.com" for _ in range(40)]
    
    # 스레드 5개로 제한
    max_threads :int = 10
    
    results :list = []
    with rq.Session() as session:
        # url과 세션을 튜플로 묶어줌
        params :list[tuple[rq.Session, str]] = [(session, url) for url in urls]
        
        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            # 각 URL에 대해 fetch 함수를 submit
            futures = {executor.submit(fetch, url) for url in urls}

            # 완료된 순서대로 결과 처리
            for i, future in enumerate(as_completed(futures)):
                try:
                    url, status_code = future.result()
                    if isinstance(status_code, str): # 에러 메시지인 경우
                        print(f"Error fetching {url}: {status_code}")
                    else:
                        print(f"Success fetching {url}: {status_code}")
                        results.append((url, status_code))
                except Exception as e:
                    print(f"Unexpected error: {e}")

if __name__ == '__main__':
    start = time.time()
    main()
    end = time.time()
    print(f" 작업 종료: {end - start}")
  • executor.submit(fetch, url): 각 URL에 대해 fetch 함수를 실행하고 futures에 저장함.
  • as_completed(futures): 완료된 작업을 순서대로 처리하며, 각 future.result()에서 결과를 받아서 출력함.

Loading script...