Tech
Fastapi Framework로 효율 높이기
2023. 10. 18

 

쿡앱스 기술지원팀에서 서버 개발을 맡고 있는 김경종입니다. 최근 쿡앱스 기술지원팀에서는 새로운 Python Framework인 Fastapi를 도입했습니다. 그중 Fastapi Middleware 개발 배경과 개발 과정 중 변경된 사항들에 대해 아티클을 통해 공유합니다.

 


 

어쩌다 Fastapi를 도입하게 됐을까?

Fastapi는 기존 PHP 서버의 기능적인 단점을 보완하기 위해 도입됐습니다. PHP 서버에서는 대부분의 처리를 DB에 직접 조회 후 사용하는 구조였기 때문에 새로운 Framework 도입해 개선이 필요했습니다. 무엇보다 디버깅이 용이한 것은 물론 개별적으로 테스트 가능한 Swagger까지 내장하고 있어, Fastapi를 채택하게 됐습니다. Fastapi를 도입하며 크게 2가지 조건을 지켜야 했습니다. 먼저 기존 서버와 동일한 구조를 가지고 동일하게 기능해야 한다는 것. 그렇지 않다면, 추가 개발을 통하여 기능을 구현해야 한다는 것입니다.

 


 

기존 서버의 장점은 유지하기

기존 서버와 동일하게 기능해야 하는 기준은 기존 PHP 서버의 장점을 그대로 유지하기 위해서 마련됐습니다. 바로 효율성 때문입니다. PHP 서버에서는 Middleware에서 모든 Request와 Response를 처리할 수 있어 효율적이었습니다. Fastapi를 도입해도 이 효율성은 그대로 유지해야 했습니다. 우선 Starlette 프로젝트 소스부터 확인했습니다. Fastapi는 Starlette에서 시작된 프로젝트이기 때문입니다. Middleware에 대한 정보는 끝내 확인하지 못했지만, Messagepack에서 실마리를 얻어 개발을 시작하게 됐습니다.

 


 

Fastapi Middleware의 기본 구조

Fastapi Middleware에서는 Class에서 Call 발생 시에 처리할 수 있도록 선언해 줬습니다. 이후 Request는 receive_with_middleware에서 처리하고, Response는 send_with_middleware에서 처리할 수 있도록 구조화했습니다.

class ApplicationJsonMiddleware:
    def __init__(
        self,
        app: ASGIApp
    ) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] == "http":
            responder = _ApplicationJsonResponder(
                self.app
            )
            await responder(scope, receive, send)
            return
        await self.app(scope, receive, send)


class _ApplicationJsonResponder:
    def __init__(
        self,
        app: ASGIApp
    ) -> None:
        self.app = app
        self.receive: Receive = unattached_receive
        self.send: Send = unattached_send
        self.initial_message: Message = {}
        self.start_message: Message = {}
    
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        await self.app(scope, self.receive_with_middleware, self.send_with_middleware)

    async def receive_with_middleware(self) -> Message:
        message = await self.receive()

        
        while more_body:
            message = await self.receive()
            if message and message['body']:
                body = body + message['body'].decode('utf-8')
                
            more_body = message.get("more_body", False)
        
        if not body:
            return message
        
        
        return message

    async def send_with_middleware(self, message: Message) -> None:
        
        if (self.path == '/docs') or (self.path == '/openapi.json') :
            await self.send(message)
            return

        if message["type"] == "http.response.start":
            headers = Headers(raw=message["headers"])
            if headers["content-type"] != "application/json":
                self.should_send_json = False
                await self.send(message)
                return
            self.start_message = message
        elif message["type"] == "http.response.body":
            body = get(message, 'body')
            if not self.should_send_json:
                await self.send(message)
                return
                    
            await self.send(self.start_message)
            await self.send(message)
        
        return 


async def unattached_receive() -> Message:
    raise RuntimeError("receive awaitable not set")  # pragma: no cover


async def unattached_send(message: Message) -> None:
    raise RuntimeError("send awaitable not set")  # pragma: no cover

 

Middleware의 4가지 역할

Middleware에서 처리되는 부분은 다음과 같은 역할을 맡게 됩니다. 첫 번째, 암호화 처리에 대한 공통 처리 구현. 두 번째, Network Record와 같은 Network 통신 저장. 세 번째 대량의 데이터 처리. 마지막으로 Request/Response 처리 등 기타 공통 로그 처리입니다. Middleware에서 이 4가지 역할이 수행될 수 있도록, 4가지 기능을 추가로 구현할 필요가 있었습니다.

 


 

첫 번째, 패킷 복호화와 암호화 로직 세우기
Client에서 올라온 데이터는 다음과 같은 과정을 거쳐서 복호화 됩니다. 우선 Base64 Decoding한 후, 데이터를 AES128로 복호화 하여 json처리를 하여 응답을 보내주게 되는 것이 기본 복호화 구조 입니다.

obj = json.loads(body)
k = get(obj, 'k')
# 암호화 처리에 대한 부분을 여기서 진행하도록 한다.
if k:
    # 암호화를 풀어서 스트링을 얻고 얻은 값을 obj에 넣어서 진행해 주도록 한다.
    decipyer = AES.new(settings.AES_KEY.encode("utf8"), AES.MODE_ECB)
    k = base64.b64decode(k)
    msg_dec = decipyer.decrypt(k)
    msg_dec = unpad(msg_dec, 16)
    obj = json.loads(msg_dec)
else:
    if settings.ENVIRONMENT == 'prod':
        raise HTTPException(status_code=404)
    else:
        pass

 

이때 Response에서 암호화할 때 주의할 점이 있습니다. Response는 'http.response.start'와 'http.response.body' 등 2가지로 나뉘어 보내는데 Start쪽에서 Header정보를 보내게 됩니다. 문제는 Header에 Content-Size 정보가 있어서 암호화를 해서 보내면 두 개의 값이 다르기 때문에 받는 쪽에서 오류가 발생하게 됩니다.

if self.need_cipyer or (self.path == '/time'):
    cipyer = AES.new(settings.AES_KEY.encode("utf8"), AES.MODE_ECB)
    msg_enc = pad(body, 16)
    msg_enc = cipyer.encrypt(msg_enc)
    k = base64.b64encode(msg_enc)
    
    message['body'] = k
    
    headers = MutableHeaders(raw=self.start_message["headers"])
    headers.update({'content-length':str(len(k))})
    await self.send(self.start_message)
        await self.send(message)

 

이 문제를 예방하기 위해, 쿡앱스 서버에서는 Start Message를 임시저장하고 암호화를 한 뒤, 이에 맞는 Header값을 보정합니다. 이어서 Start를 송신하고 끝으로 Body를 보내도록 처리하게 수정했습니다. 이때 Mutable Headers를 이용하여 가변적 헤더 사이즈를 변경하도록 처리했습니다.


 

두 번째, Network Recoder 로직 더하기
매번 Request와 Response가 처리되면 이를 DB에 기록할 수 있도록 다음과 같은 처리를 추가했습니다.

if settings.ENVIRONMENT != 'prod':
    try:
        recoder = NetworkRecoder()
        recoder.uid = self.uid
        recoder.url = self.path
        recoder.req_origin = self.strRequest
        recoder.status = self.status
        recoder.res_data = self.strResponse
        recoder.res_final = None
        recoder.latency = self.time - int(time.time())
        recoder.created = datetime.now()
        db = get_db().__next__()
        db.connection()
        db.add(recoder)
        db.commit()
    finally:
        db.close()

 

우선 Prod로 리얼 서버에 반영되지 않도록 막았습니다. 개발과 Stage 서버에만 적용되어야 하기 때문이었습니다. 이후 코드를 보면 DB를 얻어오기 위해서 'get_db'를 통해 Connection을 가지고 오고, 마지막으로 Close 처리하도록 구성했습니다.


 

세 번째, 데이터 압축 처리를 위한 로직 추가하기
대량 데이터 처리가 가능하도록, 데이터 압축을 먼저 진행하도록 구조를 마련했습니다. 암호화와 Base64 인코딩을 거친 후, Header에 변경 작업을 마치고 나서 응답을 전달하도록 구현했습니다.

if self.enable_compress:
      msg_enc = gzip.compress(body)
      cipyer = AES.new(settings.AES_KEY.encode("utf8"), AES.MODE_ECB)
      msg_enc = pad(msg_enc, 16)
      msg_enc = cipyer.encrypt(msg_enc)
      k = base64.b64encode(msg_enc)
      message['body'] = k
      headers = MutableHeaders(raw=self.start_message["headers"])
      headers.update({'content-length':str(len(k))})

 


 

네 번째, 사용자 IP 추적 및 등록을 위한 로직 추가하기
클라이언트에서 올라온 헤더 정보를 이용하여 사용자의 IP를 확인하는 로직은 추가했습니다.

for header in headers.raw:
  if header[0] == x:
      if header[1].decode('utf-8').find(',') >= 0:
          origin_ip, forward_ip = header[1].decode('utf-8').split(', ')
          headers['origin_ip'] = origin_ip
      else:
          origin_ip = header[1].decode('utf-8')
          headers['origin_ip'] = origin_ip

 


 

마치며

Fastapi라는 새로운 Framework를 도입하는 동시에 기존 서버의 장점을 유지해야 한다는 과제는 까다로운 작업이었습니다. 지면상 한계 때문에, 다루지 못한 시행착오도 여럿 겪었습니다. 그래서 이번 Fastapi Middleware 사용기는 통해 더 다양한 시야를 갖출 수 있었던 기회였습니다.


이 문서를 작성하는 지금도 쿡앱스의 서버는 진화하고 있습니다. 이 사용기를 통해 다른 서버 개발자분들이 시간을 아끼실 수 있도록, 앞으로 개발과정도 꾸준히 공유하겠습니다.