StatusNeo

FastAPI : Deep Dive Into The Python Framework

This post continues from the first part and shows how to build a CRUD application using FastAPI, SQLAlchemy, and MySQL.
If you haven't read FastAPI Introduction Part-1 yet, catch up on it first.

Objectives

  1. Database connection
  2. SQLAlchemy models
  3. CRUD operations 

Basic requirements

  1. Python 3.8.10
  2. Pip 20.0.2
  3. MySQL 8.0.28

I prefer to use a separate virtual environment for each application, so that each one installs its own external dependencies in isolation.

Dependencies:

  1. pip install fastapi
  2. pip install "uvicorn[standard]"
  3. pip install sqlalchemy
  4. pip install mysqlclient
  5. pip install fastapi_utils

Project Structure

— app
├── __init__.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── exceptions.py
├── user_api.py
└── main.py

Database Queries

  1. Create a MySQL database
    create database fastapi;
    use fastapi;

  2. Create a user table
    create table user (user_id int AUTO_INCREMENT PRIMARY KEY, first_name varchar(30) NOT NULL, last_name varchar(30) NOT NULL, city varchar(30) NOT NULL, email varchar(30) NOT NULL, is_deleted tinyint NOT NULL DEFAULT 0);

Database connection

Configure SQLAlchemy to establish a connection with the MySQL database.

# database.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# update user & password accordingly, may need to change the MySQL host & port
user = "user"
password = "password"
host = "127.0.0.1"
port = 3306
database = "fastapi"

DATABASE_URL = f"mysql+mysqldb://{user}:{password}@{host}:{port}/{database}"

db_engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=db_engine)

Base = declarative_base()


def get_db():
    """
    Method to generate database session
    :return: Session
    """
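    # create the session before the try block, so that a failure to create it
    # does not reach db.close() with db still set to None
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()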
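
The comment in database.py asks you to update the user and password. If the password contains characters such as @ or /, it has to be percent-encoded before it is placed in the URL. A minimal sketch using only the standard library follows; the helper name build_database_url and the sample password are made up for illustration and are not part of the original project.

```python
from urllib.parse import quote_plus


def build_database_url(user: str, password: str, host: str, port: int, database: str) -> str:
    """Build a mysql+mysqldb URL, percent-encoding the credentials."""
    return (
        f"mysql+mysqldb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# hypothetical credentials; "@" and "/" would otherwise break URL parsing
url = build_database_url("user", "p@ss/word", "127.0.0.1", 3306, "fastapi")
print(url)  # mysql+mysqldb://user:p%40ss%2Fword@127.0.0.1:3306/fastapi
```

The resulting string can be passed to create_engine in place of the f-string used for DATABASE_URL.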

What is SQLAlchemy?

SQLAlchemy is the Python SQL toolkit and Object Relational Mapper that gives application developers the full power and flexibility of SQL.

It provides a full suite of well-known enterprise-level persistence patterns, designed for efficient and high-performing database access, adapted into a simple and Pythonic domain language.

Read more about SQLAlchemy on their official site.

Data models using SQLAlchemy

The application contains only one model, the user. With SQLAlchemy, we can define a model that maps to the user table.

# models.py

from sqlalchemy.schema import Column
from sqlalchemy.types import String, Integer

from database import Base


class UserInfo(Base):
    __tablename__ = "user"

    user_id = Column(Integer, primary_key=True, index=True)
    first_name = Column(String)
    last_name = Column(String)
    email = Column(String)
    city = Column(String)


class GetUserInfo(UserInfo):
    is_deleted = Column(Integer)

Data Validation and Conversion

Define the Pydantic models that validate and convert the incoming and outgoing data of the request handlers (API endpoints).

# schemas.py

from typing import List, Optional
from pydantic import BaseModel


# POST and PUT APIs
class CreateAndUpdateUser(BaseModel):
    first_name: str
    last_name: str
    city: str
    email: str


# GET API
class User(CreateAndUpdateUser):
    user_id: int

    class Config:
        orm_mode = True


# List users API
class PaginatedUserInfo(BaseModel):
    limit: int
    offset: int
    data: List[User]


# DELETE API
class DeleteUser(BaseModel):
    msg: str

CRUD Operations

Define the methods to perform actual CRUD operations on the database. These methods will be used inside the REST API endpoints.

# crud.py

from typing import List
from sqlalchemy.orm import Session
from exceptions import UserInfoAlreadyExistError, UserInfoNotFoundError
from models import UserInfo, GetUserInfo
from schemas import CreateAndUpdateUser, DeleteUser


# Method to get list of user info
def get_all_users(session: Session, limit: int, offset: int) -> List[UserInfo]:
    return session.query(GetUserInfo).filter(GetUserInfo.is_deleted == 0).offset(offset).limit(limit).all()


# Method to get info of a particular user
def get_user_info_by_id(session: Session, user_id: int) -> UserInfo:
    user_info = session.query(GetUserInfo).filter_by(user_id=user_id, is_deleted=0).first()

    if user_info is None:
        raise UserInfoNotFoundError

    return user_info


# Method to add a new user info to the database
def create_user(session: Session, user_info: CreateAndUpdateUser) -> UserInfo:
    user_details = session.query(GetUserInfo).filter(GetUserInfo.email == user_info.email).first()
    if user_details is not None:
        raise UserInfoAlreadyExistError

    new_user_info = UserInfo(**user_info.dict())
    session.add(new_user_info)
    session.commit()
    session.refresh(new_user_info)
    return new_user_info


# Method to update details of the user
def update_user_info(
    session: Session, user_id: int, info_update: CreateAndUpdateUser) -> UserInfo:
    # get_user_info_by_id raises UserInfoNotFoundError when the user does not
    # exist, so no separate None check is needed here
    user_info = get_user_info_by_id(session, user_id)

    user_info.first_name = info_update.first_name
    user_info.last_name = info_update.last_name
    user_info.email = info_update.email
    user_info.city = info_update.city

    session.commit()
    session.refresh(user_info)

    return user_info


# Method to delete a user info from the table
def delete_user_info(session: Session, user_id: int) -> DeleteUser:
    # get_user_info_by_id raises UserInfoNotFoundError when the user does not
    # exist, so no separate None check is needed here
    user_info = get_user_info_by_id(session, user_id)

    # for soft delete
    user_info.is_deleted = 1

    # for hard delete
    # session.delete(user_info)
    
    session.commit()

    return {"msg": "User deleted successfully"}
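
The soft-delete and pagination behaviour of these methods can be tried without a MySQL server. The sketch below swaps in the standard-library sqlite3 module and plain SQL instead of SQLAlchemy and MySQL; the trimmed-down table, the sample emails, and the helper names list_active and soft_delete are assumptions made for illustration only.

```python
import sqlite3

# in-memory SQLite database standing in for the MySQL "fastapi" database
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user ("
    "user_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "email TEXT NOT NULL, "
    "is_deleted INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO user (email) VALUES (?)",
    [("a@example.com",), ("b@example.com",), ("c@example.com",)],
)


def list_active(limit: int, offset: int) -> list:
    """Return emails of non-deleted users, paginated like get_all_users."""
    rows = conn.execute(
        "SELECT email FROM user WHERE is_deleted = 0 "
        "ORDER BY user_id LIMIT ? OFFSET ?",
        (limit, offset),
    )
    return [email for (email,) in rows]


def soft_delete(user_id: int) -> None:
    """Flag the row as deleted instead of removing it."""
    conn.execute("UPDATE user SET is_deleted = 1 WHERE user_id = ?", (user_id,))


soft_delete(2)
active = list_active(limit=10, offset=0)
total_rows = conn.execute("SELECT COUNT(*) FROM user").fetchone()[0]

print(active)      # ['a@example.com', 'c@example.com']
print(total_rows)  # 3, the soft-deleted row is still stored
```

One consequence of this design: create_user in crud.py looks up the email without filtering on is_deleted, so the email of a soft-deleted user stays taken.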

Define custom exceptions so that the API can send proper HTTP responses and status codes.

# exceptions.py

class UserInfoException(Exception):
    ...


class UserInfoNotFoundError(UserInfoException):
    def __init__(self):
        self.status_code = 404
        self.detail = "User Info Not Found"


class UserInfoAlreadyExistError(UserInfoException):
    def __init__(self):
        self.status_code = 409
        self.detail = "User Info Already Exists"
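
The API layer in this post converts these exceptions with HTTPException(**cie.__dict__). That works because the instance __dict__ holds exactly the attributes assigned in __init__. Here is a small sketch of the mechanism without FastAPI; to_http_error is a hypothetical stand-in for HTTPException, not a real FastAPI function.

```python
class UserInfoException(Exception):
    """Base class, mirroring exceptions.py."""


class UserInfoNotFoundError(UserInfoException):
    def __init__(self):
        self.status_code = 404
        self.detail = "User Info Not Found"


def to_http_error(status_code: int, detail: str) -> dict:
    """Hypothetical stand-in for HTTPException(status_code=..., detail=...)."""
    return {"status_code": status_code, "detail": detail}


try:
    raise UserInfoNotFoundError
except UserInfoException as exc:
    # the instance __dict__ contains only what __init__ assigned
    kwargs = dict(exc.__dict__)
    response = to_http_error(**kwargs)

print(kwargs)  # {'status_code': 404, 'detail': 'User Info Not Found'}
```

Because the attribute names match the keyword arguments, any extra attribute added to an exception class would also be forwarded, so keep these classes limited to status_code and detail.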

API Implementation

Write the REST API endpoints for the user and define the routes as follows:

# user_api.py

from fastapi import APIRouter, Depends, HTTPException
from fastapi_utils.cbv import cbv
from sqlalchemy.orm import Session
from crud import (
    get_all_users,
    create_user,
    get_user_info_by_id,
    update_user_info,
    delete_user_info,
)
from database import get_db
from exceptions import UserInfoException
from schemas import User, CreateAndUpdateUser, PaginatedUserInfo

router = APIRouter()


@cbv(router)
class Users:
    session: Session = Depends(get_db)

    # API to get the list of user info
    @router.get("/users", response_model=PaginatedUserInfo)
    def list_users(self, limit: int = 10, offset: int = 0):

        users_list = get_all_users(self.session, limit, offset)
        return {"limit": limit, "offset": offset, "data": users_list}

    # API endpoint to add a user info to the database
    @router.post("/users")
    def add_user(self, user_info: CreateAndUpdateUser):

        try:
            user_info = create_user(self.session, user_info)
            return user_info
        except UserInfoException as cie:
            raise HTTPException(**cie.__dict__)


# API endpoint to get info of a particular user
@router.get("/users/{user_id}", response_model=User)
def get_user_info(user_id: int, session: Session = Depends(get_db)):

    try:
        user_info = get_user_info_by_id(session, user_id)
        return user_info
    except UserInfoException as cie:
        raise HTTPException(**cie.__dict__)


# API to update an existing user info
@router.put("/users/{user_id}", response_model=User)
def update_user(
    user_id: int, new_info: CreateAndUpdateUser, session: Session = Depends(get_db)
):

    try:
        user_info = update_user_info(session, user_id, new_info)
        return user_info
    except UserInfoException as cie:
        raise HTTPException(**cie.__dict__)


# API to delete a user info from the table
@router.delete("/users/{user_id}")
def delete_user(user_id: int, session: Session = Depends(get_db)):

    try:
        return delete_user_info(session, user_id)
    except UserInfoException as cie:
        raise HTTPException(**cie.__dict__)
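
Every endpoint above receives its session through Depends(get_db), a generator that yields the session and closes it afterwards. The sketch below imitates that open, use, close lifecycle with the standard library only. It illustrates the general generator pattern, not FastAPI's internals; FakeSession and handle_request are hypothetical names introduced for the example.

```python
from contextlib import contextmanager

events = []


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session; it only records calls."""

    def close(self) -> None:
        events.append("close")


def get_db():
    db = FakeSession()
    try:
        events.append("open")
        yield db
    finally:
        # runs whether the caller finished normally or raised
        db.close()


def handle_request(fail: bool) -> None:
    """Use the generator as a context manager around a pretend handler."""
    with contextmanager(get_db)() as session:
        events.append("handler")
        if fail:
            raise RuntimeError("handler failed")


handle_request(fail=False)
try:
    handle_request(fail=True)
except RuntimeError:
    pass

print(events)  # ['open', 'handler', 'close', 'open', 'handler', 'close']
```

The session is closed in both runs, which is the point of putting db.close() in the finally block.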

Include the user API routes in your FastAPI app:

# main.py

from fastapi import FastAPI
import user_api

app = FastAPI()

app.include_router(user_api.router)


@app.get('/')
def root_method():
    return {"msg": "Welcome to Harshit's Blog"}

We have finished writing the code and can start the server to try out our endpoints. Open a terminal window inside the project directory and run the following command.

uvicorn --reload main:app

[Screenshot: terminal output of the uvicorn command]

Open localhost:8000 in your web browser to check that the app is running. FastAPI generates API documentation automatically: ReDoc is available at localhost:8000/redoc, and you can test the APIs with Swagger UI at localhost:8000/docs or with other API tools such as Postman.
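
The endpoints can also be called from a short Python script. The sketch below uses only the standard library and assumes the server is listening on uvicorn's default address, localhost:8000. The helper names and the sample user are made up for illustration. The request is only built here; sending it requires the app to be running.

```python
import json
from urllib.request import Request, urlopen

BASE_URL = "http://localhost:8000"  # assumption: default uvicorn host and port


def build_create_user_request(payload: dict) -> Request:
    """Build (but do not send) a POST /users request with a JSON body."""
    return Request(
        f"{BASE_URL}/users",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: Request) -> dict:
    """Send the request and decode the JSON response; needs the server running."""
    with urlopen(request) as response:
        return json.load(response)


# hypothetical sample user matching the CreateAndUpdateUser fields
request = build_create_user_request(
    {"first_name": "Ada", "last_name": "Lovelace", "city": "London", "email": "ada@example.com"}
)
print(request.get_method(), request.full_url)  # POST http://localhost:8000/users
```

With the server running, send(request) performs the call and returns the decoded JSON body.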

[Screenshot: Swagger Docs]

Conclusion

  • In this part, we connected our FastAPI app to a MySQL database using SQLAlchemy and implemented simple CRUD operations on it.

References

  1. FastAPI
  2. FastAPI Utils
  3. SQLAlchemy
  4. MySQL

Find the GitHub repository here!
