Profile picture

[Python] FastAPI와 SQLite를 활용한 ORM 기반 하드웨어 인증 서버 구현

JaehyoJJAng2024년 08월 13일

개요

이번 포스팅에서는 클라이언트가 제공하는 하드웨어 해시 값(예: MAC 주소 해시)을 기반으로 인증을 수행하고,

인증 정보를 관리자가 백엔드 서버에 추가하는 과정을 다룰 예정이다.


해당 백엔드 서버는 다음과 같은 기능을 제공할 예정이다.

  • 하드웨어 인증 : 클라이언트의 하드웨어 해시 값을 백엔드 서버로 전송하여 인증을 시도.
  • 인증 정보 추가 : 관리자는 하드웨어 해시 값을 서버에 등록할 수 있다.
  • 보안 조치 : 인증 정보 추가는 관리자에게만 허용되도록 API 토큰을 통한 접근 제어를 적용한다.

코드 설명

1. 데이터베이스 설정 (database.py)

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base

# 데이터베이스 설정
DATABASE_URL = "sqlite:///./auth.db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 데이터베이스 초기화
Base.metadata.create_all(bind=engine)

SQLite를 사용하여 데이터베이스를 설정한다.


2. 모델 설정 (models.py)

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String
from pydantic import BaseModel
    
Base = declarative_base()

# 인증 정보를 저장할 모델 정의
class AuthInfo(Base):
    __tablename__ = "auth_info"
    hardware_hash = Column(String, primary_key=True, index=True)

class AuthRequest(BaseModel):
    hardware_hash: str
  • AuthInfo: 인증 정보를 저장할 테이블을 정의
    • 테이블 이름은 auth_info을 가지며, hardware_hash 컬럼이 기본 키로 사용됨.
  • AuthRequest: 클라이언트가 요청할 때 사용할 데이터 모델을 정의함.
    • AuthRequest는 클라이언트의 하드웨어 해시 값을 담음.

3. 관리자 토큰 설정 및 세션 관리 (main.py)

# 관리자 토큰 설정 (환경 변수나 안전한 저장소에 보관하는 것을 권장)
ADMIN_TOKEN :str = "qwer"

# 데이터베이스 세션 가져오기
def get_db():
    db : Session = SessionLocal()
    try:
        yield db
    finally:
        db.close()

관리자 접근을 제어하기 위한 API 토큰을 설정하고, 데이터베이스 세션을 관리하는 함수를 정의한다.

해당 토큰의 경우 관리자만 알고 있어야 하므로, os.getenv() 와 같은 함수를 사용하여 서버에 등록된 환경 변수를 불러오는 형식으로 변경해주는 것을 권장한다.


3-1. 관리자 인증 검사 함수

# 관리자 인증 검사 함수
def verify_admin_token(x_admin_token: str = Header(...)) -> None:
    if x_admin_token != ADMIN_TOKEN:
        raise HTTPException(status_code=403, detail="Invalid admin token")

요청 헤더에서 X-Admin-Token 값을 읽어 관리자 토큰과 일치하는지 확인한다.

일치하지 않으면 403 Forbidden 상태 코드와 함께 오류를 반환한다.


3-2. 인증 및 인증 정보 추가 엔드포인트

인증 엔드포인트

# 인증 API 앤드포인트
@app.post("/auth")
def authenticate(auth_request: AuthRequest, db=Depends(get_db)):
    auth_info = db.query(AuthInfo).filter(AuthInfo.hardware_hash == auth_request.hardware_hash).first()
    if auth_info:
        return {"message": "Authentication successful"}
    else:
        return HTTPException(status_code=401, detail="Authentication failed")

/auth 엔드포인트는 클라이언트의 하드웨어 해시 값이 데이터베이스에 존재하는지 확인하여 인증을 수행함.

존재하면 인증 성공 메시지를, 그렇지 않으면 401 Unauthorized 오류를 반환한다.


인증 정보 추가 엔드포인트

# 인증 정보 추가하는 앤드포인트
@app.post("/add_auth")
def add_auth_info(auth_request: AuthRequest, db=Depends(get_db), token: str = Depends(verify_admin_token)):
    existing_auth = db.query(AuthInfo).filter(AuthInfo.hardware_hash == auth_request.hardware_hash).first()
    if existing_auth:
        raise HTTPException(status_code=400, detail="Auth info already exists")

    new_auth_info = AuthInfo(hardware_hash=auth_request.hardware_hash)
    db.add(new_auth_info)
    db.commit()
    return {"message": "Auth info added successfully"}

/add_auth 엔드포인트는 관리자만 접근할 수 있도록 설정되며, 하드웨어 해시 값을 데이터베이스에 추가함.

이미 존재하는 경우에는 400 Bad Request 오류를 반환한다.


4. 클라이언트 인증 정보 요청 (client.py)

4-1. 하드웨어 ID 생성

수집한 하드웨어 정보를 결합하고 해시 함수를 통해 고유한 하드웨어 ID를 생성하는 함수를 작성해보자.

import hashlib
import uuid
import platform

def get_hardware_id():
    # MAC 주소 수집
    mac_address = ':'.join(['{:02x}'.format((uuid.getnode() >> ele) & 0xff)
                            for ele in range(0,8*6,8)][::-1])
    
    # CPU 정보 수집
    cpu_info = platform.processor()
    
    # 디스크 시리얼 번호 수집 (운영체제에 따라 다를 수 있음)
    disk_serial = "your_disk_serial_number"  # 실제로는 해당 OS에 맞는 방법으로 구현
    
    # 하드웨어 정보 결합
    hardware_info = mac_address + cpu_info + disk_serial
    
    # 해시 처리하여 하드웨어 ID 생성
    hashed_hardware_id = hashlib.sha256(hardware_info.encode()).hexdigest()
    return hashed_hardware_id

4-2. 인증 요청

인증을 요청하는 함수를 작성하자.

import requests

def authenticate_to_server() -> None:
    hardware_id :str = get_hardware_id()
    url = "https://your_server_domain_or_ip/auth"
    data = {"hardware_id": hardware_id}
    try:
        response = requests.post(url, json=data, timeout=5, verify=True)
        if response.status_code == 200:
            print("인증 성공:", response.json())
            # 프로그램 실행 로직 추가
        else:
            print("인증 실패:", response.json())
            # 프로그램 종료 또는 제한
    except requests.exceptions.RequestException as e:
        print("서버에 연결할 수 없습니다:", e)

# 인증 요청
authenticate_to_server()

클라이언트에서 get_hardware_id() 함수의 리턴 값을 /auth 경로로 전송하여 인증을 수행한다.


4-3. 인증 생성

토큰이 올바른 경우에만 인증 정보가 추가됨.

ADMIN_TOKEN :str = 'qwer'
hardware_id :str = get_hardware_id()
url = "https://your_server_domain_or_ip/add_auth"
resp = rq.post(
    url,
    json={"hardware_id": hardware_id},
    headers={"X-Admin-Token": ADMIN_TOKEN}
)

Loading script...