본문 바로가기
Python

[FastAPI] 비밀번호 암호화, JWT 사용하기

by bryan.oh 2023. 8. 15.
반응형

비밀번호 암호화

라이브러리 설치

pip install "passlib[bcrypt]"

 

우선 비밀번호 hash / verify 하는 예제 코드 입니다.

from passlib.context import CryptContext

# Shared passlib context used by the helpers below; bcrypt is the only
# hashing scheme configured here.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def get_password_hash(password):
    """Return the hash of ``password`` produced by the shared ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed


def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password``.

    Delegates to ``pwd_context.verify`` and returns its result unchanged.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match


# Hash the same password twice and print both results.
for _ in range(2):
    print(get_password_hash('hello-bryan'))

# Hash once more, then verify a wrong and a correct password against it.
enc_pwd = get_password_hash('hello-bryan')
print(enc_pwd)
for candidate in ('hello', 'hello-bryan'):
    print(verify_password(candidate, enc_pwd))
$2b$12$yddJpuySdEy1BHQNAEA1sOGmMv2r17D1YEkYB7qtW8WzownbGSUC6
$2b$12$/XrB7PO4D4HXXpDv9OrnBOttHyT397d7EAz6jEdDQwQ724FrZIp92
$2b$12$uqNoG1ylcFez52s9VkJUpeSuojQN3Cjod7xXAF/7j.E1sRv01rDhO
False
True

 

결과를 보면 알 수 있듯이, 같은 문자열을 hash 해도 매번 다른 결과가 나옵니다. (bcrypt 가 hash 할 때마다 새로운 salt 를 사용하기 때문입니다.)

그래도 verify 를 하면 원래 비밀번호와 일치하는지 알 수 있습니다.

 

 

JWT : Json Web Tokens

pip install "python-jose[cryptography]"
# Key used to sign the JWTs.
# NOTE(review): example value only - generate your own and keep it out of source control.
SECRET_KEY = "3649b1400afba1383e0c8a46b5b896eddbd0af580d52033adb984988517d8c86"
# Signing algorithm handed to the JWT library.
ALGORITHM = "HS256"
# Lifetime, in minutes, of an issued access token.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

SECRET_KEY 는 터미널에서 다음 명령어를 실행하면 랜덤으로 얻을 수 있습니다.

$ openssl rand -hex 32
# 3649b1400afba1383e0c8a46b5b896eddbd0af580d52033adb984988517d8c86

 

JWT token 을 생성하는 예제입니다.

from datetime import datetime, timedelta
from jose import JWTError, jwt

# Key passed to jwt.encode to sign the token.
# NOTE(review): example value only - generate your own and keep it out of source control.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Signing algorithm passed to jwt.encode.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the access token created in the example below.
ACCESS_TOKEN_EXPIRE_MINUTES = 30


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` (expiry) claim.

    Args:
        data: Claims to put in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Lifetime of the token. When omitted (or falsy), the
            token expires 15 minutes after creation.

    Returns:
        The result of ``jwt.encode`` for the claims, signed with
        ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Function-scope import keeps this snippet independent of the import list above.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12 and yields a naive
    # value; use an explicit, timezone-aware UTC timestamp instead.
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


# Issue a token for a sample user, valid for ACCESS_TOKEN_EXPIRE_MINUTES, and print it.
username = 'hello-bryan'
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
    data={"sub": username},
    expires_delta=access_token_expires,
)
print(access_token)
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJoZWxsby1icnlhbiIsImV4cCI6MTY5MjAyMzAxOH0.8NN9wEVXrtHQ5pXtVGbiFzqclM-dLvVGlP512-1mCYk

결과는 이렇게 나옵니다.

 

 

FastAPI 에서 테스트

FastAPI 공식문서에 나와있는 예제입니다.

from datetime import datetime, timedelta
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# Key used to sign tokens (jwt.encode) and to verify them (jwt.decode).
# NOTE(review): example value only - generate your own and keep it out of source control.
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Signing algorithm; also the only algorithm accepted when decoding.
ALGORITHM = "HS256"
# Lifetime, in minutes, of tokens issued by the /token route.
ACCESS_TOKEN_EXPIRE_MINUTES = 30


# In-memory stand-in for a user table, keyed by username.
# get_user unpacks each record into a UserInDB model.
# NOTE(review): hashed_password is presumably the bcrypt hash of "secret",
# the password used in the login example - confirm.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


# Response body of POST /token (declared as that route's response_model).
class Token(BaseModel):
    access_token: str  # encoded JWT produced by create_access_token
    token_type: str  # the login route always sends "bearer"


# Holds the username read from a decoded token's "sub" claim.
class TokenData(BaseModel):
    username: str | None = None


# Public view of a user; declared as the response_model of GET /users/me/.
class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None  # a truthy value makes get_current_active_user reject the user


# User record as loaded from the user database, including the password hash.
class UserInDB(User):
    hashed_password: str  # checked against the submitted password in authenticate_user


# Shared passlib context; bcrypt is the only hashing scheme configured here.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Dependency that supplies the bearer token string to get_current_user;
# tokenUrl refers to the POST /token login route defined below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Application object on which the routes below are registered.
app = FastAPI()


def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password``.

    Delegates to ``pwd_context.verify`` and returns its result unchanged.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match


def get_password_hash(password):
    """Return the hash of ``password`` produced by the shared ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed


def get_user(db, username: str):
    """Look up ``username`` in ``db`` and return it as a ``UserInDB``.

    Returns ``None`` when ``db`` has no entry for ``username``.
    """
    if username not in db:
        return None
    record = db[username]
    return UserInDB(**record)


def authenticate_user(fake_db, username: str, password: str):
    """Return the stored user when the credentials are valid, otherwise ``False``.

    The result is ``False`` both when ``username`` is not found in ``fake_db``
    and when ``password`` does not verify against the stored hash.
    """
    account = get_user(fake_db, username)
    if account and verify_password(password, account.hashed_password):
        return account
    return False


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` (expiry) claim.

    Args:
        data: Claims to put in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Lifetime of the token. When omitted (or falsy), the
            token expires 15 minutes after creation.

    Returns:
        The result of ``jwt.encode`` for the claims, signed with
        ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Function-scope import keeps this block independent of the file-level import list.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12 and yields a naive
    # value; use an explicit, timezone-aware UTC timestamp instead.
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the bearer ``token`` to the user it was issued for.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user missing from ``fake_users_db``.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized
    token_data = TokenData(username=subject)

    account = get_user(fake_users_db, username=token_data.username)
    if account is None:
        raise unauthorized
    return account


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
):
    """Return the authenticated user unless the account is disabled.

    Raises:
        HTTPException: 400 ("Inactive user") when ``current_user.disabled``
            is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")


# Login route: checks the submitted form credentials against fake_users_db and,
# on success, returns a bearer token whose "sub" claim is the username.
# Responds with 401 when the username or password is wrong.
@app.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if user:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        token = create_access_token(
            data={"sub": user.username},
            expires_delta=lifetime,
        )
        return {"access_token": token, "token_type": "bearer"}

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )


# Returns the authenticated, non-disabled user resolved by
# get_current_active_user, serialized through the User response model.
@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return current_user


# Returns a fixed one-item list whose "owner" is the authenticated user's username.
@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    sample_item = {"item_id": "Foo", "owner": current_user.username}
    return [sample_item]

이 코드를 실행합니다.

파일이름을 main.py 라고 했다면

uvicorn main:app --reload  --port=8000

 

로그인을 할 땐, 

/token 으로 

POST, form-data 로 username, password 를 보내면 

access_token 을 리턴해줍니다.

FrontEnd 쪽에서 이렇게 받은 access_token 을 저장해 뒀다가 

API 를 호출할 때, 아래와 같이 Authorization 헤더에 "Bearer <access_token>" 형식으로 담아 보내면 인증이 됩니다.

 

위에서 /token 과 /users/me 호출하는 것은 javascript 로 각각 아래와 같습니다.

/token (최초 로그인)

// Log in: POST the username and password as form data to /token.
var formdata = new FormData();
formdata.append("username", "johndoe");
formdata.append("password", "secret");

var requestOptions = {
  method: 'POST',
  body: formdata,
  redirect: 'follow'
};

// The URL needs an explicit scheme: without "http://", "localhost:" is parsed
// as the URL scheme and fetch() rejects the request.
fetch("http://localhost:8000/token", requestOptions)
  .then(response => response.text())
  .then(result => console.log(result))
  .catch(error => console.log('error', error));

 

/users/me (token 으로 인증)

// Call a protected route: send the access token from /token as a Bearer credential.
var myHeaders = new Headers();
myHeaders.append("Authorization", "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNjkyMDMzNDAxfQ.Gw_hhKYd1PzZ1fwfoYNtRXjUpN9YGaS000EnzgGwDeY");

var requestOptions = {
  method: 'GET',
  headers: myHeaders,
  redirect: 'follow'
};

// The URL needs an explicit scheme: without "http://", "localhost:" is parsed
// as the URL scheme and fetch() rejects the request.
// The trailing slash matches the server route "/users/me/" so no redirect is needed.
fetch("http://localhost:8000/users/me/", requestOptions)
  .then(response => response.text())
  .then(result => console.log(result))
  .catch(error => console.log('error', error));

 

 

 

 

 

728x90
반응형

댓글