Creación de una API REST con autenticación JWT, encriptación BCrypt, RethinkDB y cache con Valkey
Este manual detalla cómo construir un sistema de gestión para una clínica utilizando FastAPI, un framework moderno y rápido para construir APIs con Python.
Nota: Este tutorial asume que tienes conocimientos básicos de Python y conceptos de APIs REST.
python -m venv venv
source venv/bin/activate # Linux/Mac
venv\Scripts\activate # Windows
pip install fastapi uvicorn python-jose[cryptography] passlib bcrypt rethinkdb redis python-multipart
clinica_api/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── database/
│ │ ├── __init__.py
│ │ ├── rethinkdb.py
│ │ └── valkey.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── paciente.py
│ │ ├── medico.py
│ │ └── cita.py
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── paciente.py
│ │ ├── medico.py
│ │ └── cita.py
│ ├── crud/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── paciente.py
│ │ ├── medico.py
│ │ └── cita.py
│ ├── auth/
│ │ ├── __init__.py
│ │ ├── jwt.py
│ │ └── dependencies.py
│ ├── utils/
│ │ ├── __init__.py
│ │ └── security.py
│ └── config.py
├── .env
├── requirements.txt
└── README.md
from pydantic import BaseSettings
class Settings(BaseSettings):
APP_NAME: str = "Clinica API"
SECRET_KEY: str
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# RethinkDB
RETHINKDB_HOST: str
RETHINKDB_PORT: int = 28015
RETHINKDB_DB: str = "clinica"
# Valkey (Redis)
VALKEY_HOST: str = "localhost"
VALKEY_PORT: int = 6379
VALKEY_DB: int = 0
class Config:
env_file = ".env"
settings = Settings()
# Autenticación
SECRET_KEY=tu_super_secreto_aqui
# RethinkDB
RETHINKDB_HOST=localhost
RETHINKDB_PORT=28015
RETHINKDB_DB=clinica
# Valkey
VALKEY_HOST=localhost
VALKEY_PORT=6379
from fastapi import FastAPI
from app.config import settings
app = FastAPI(
title=settings.APP_NAME,
description="API para gestión de clínica médica",
version="0.1.0",
)
@app.get("/")
def read_root():
return {"message": "Bienvenido a la API de la Clínica"}
uvicorn app.main:app --reload
Crea el archivo app/database/rethinkdb.py
:
import rethinkdb as r
from rethinkdb.errors import RqlRuntimeError, RqlDriverError
from app.config import settings
class RethinkDBManager:
def __init__(self):
self.conn = None
self.host = settings.RETHINKDB_HOST
self.port = settings.RETHINKDB_PORT
self.db_name = settings.RETHINKDB_DB
async def connect(self):
try:
self.conn = await r.connect(
host=self.host,
port=self.port,
db=self.db_name
)
return self.conn
except RqlDriverError as e:
print(f"Error de conexión a RethinkDB: {e}")
raise
async def close(self):
if self.conn is not None and self.conn.is_open():
await self.conn.close()
async def ensure_db_and_tables(self):
try:
# Verificar si la base de datos existe, si no, crearla
db_list = await r.db_list().run(self.conn)
if self.db_name not in db_list:
await r.db_create(self.db_name).run(self.conn)
# Usar la base de datos
self.conn.use(self.db_name)
# Tablas necesarias
tables = ['users', 'pacientes', 'medicos', 'citas']
existing_tables = await r.table_list().run(self.conn)
for table in tables:
if table not in existing_tables:
await r.table_create(table).run(self.conn)
# Crear índices para búsquedas eficientes
if table == 'users':
await r.table(table).index_create('email').run(self.conn)
elif table == 'pacientes':
await r.table(table).index_create('dni').run(self.conn)
elif table == 'medicos':
await r.table(table).index_create('especialidad').run(self.conn)
elif table == 'citas':
await r.table(table).index_create('paciente_id').run(self.conn)
await r.table(table).index_create('medico_id').run(self.conn)
await r.table(table).index_create('fecha').run(self.conn)
print("Base de datos y tablas verificadas/creadas correctamente")
except RqlRuntimeError as e:
print(f"Error al configurar la base de datos: {e}")
raise
rethinkdb_manager = RethinkDBManager()
Modifica main.py
:
from fastapi import FastAPI
from app.config import settings
from app.database.rethinkdb import rethinkdb_manager
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# Al iniciar la aplicación
await rethinkdb_manager.connect()
await rethinkdb_manager.ensure_db_and_tables()
yield
# Al cerrar la aplicación
await rethinkdb_manager.close()
app = FastAPI(
title=settings.APP_NAME,
description="API para gestión de clínica médica",
version="0.1.0",
lifespan=lifespan
)
@app.get("/")
def read_root():
return {"message": "Bienvenido a la API de la Clínica"}
Crea app/models/user.py
:
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime
class UserBase(BaseModel):
email: EmailStr
nombre: str
apellido: str
es_medico: bool = False
es_admin: bool = False
class UserCreate(UserBase):
password: str
class UserInDB(UserBase):
id: str
hashed_password: str
fecha_creacion: datetime
fecha_actualizacion: datetime
class Config:
orm_mode = True
class UserLogin(BaseModel):
email: EmailStr
password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
email: Optional[str] = None
Crea app/utils/security.py
:
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from app.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def decode_token(token: str):
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
return None
Crea app/auth/dependencies.py
:
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from app.utils.security import decode_token
from app.config import settings
from app.models.user import TokenData
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="No se pudieron validar las credenciales",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_token(token)
if payload is None:
raise credentials_exception
email: str = payload.get("sub")
if email is None:
raise credentials_exception
token_data = TokenData(email=email)
except JWTError:
raise credentials_exception
# Aquí deberías buscar el usuario en la base de datos por email
# user = await get_user_by_email(email=token_data.email)
# if user is None:
# raise credentials_exception
# return user
# Por ahora retornamos el email como ejemplo
return token_data.email
async def get_current_active_user(current_user: str = Depends(get_current_user)):
# Aquí podrías verificar si el usuario está activo
# if not current_user.is_active:
# raise HTTPException(status_code=400, detail="Usuario inactivo")
return current_user
async def get_current_admin_user(current_user: str = Depends(get_current_user)):
# Aquí deberías verificar si el usuario es admin
# if not current_user.es_admin:
# raise HTTPException(
# status_code=status.HTTP_403_FORBIDDEN,
# detail="No tienes los permisos necesarios"
# )
# return current_user
# Por ahora retornamos el email como ejemplo
return current_user
Crea app/crud/user.py
:
from app.database.rethinkdb import rethinkdb_manager
from app.models.user import UserInDB, UserCreate
from app.utils.security import get_password_hash
from datetime import datetime
async def get_user_by_email(email: str) -> UserInDB:
cursor = await r.table("users").filter({"email": email}).run(rethinkdb_manager.conn)
users = await cursor.to_list(1)
if users:
return UserInDB(**users[0])
return None
async def create_user(user: UserCreate) -> UserInDB:
hashed_password = get_password_hash(user.password)
db_user = {
"email": user.email,
"nombre": user.nombre,
"apellido": user.apellido,
"es_medico": user.es_medico,
"es_admin": user.es_admin,
"hashed_password": hashed_password,
"fecha_creacion": datetime.now(),
"fecha_actualizacion": datetime.now()
}
result = await r.table("users").insert(db_user).run(rethinkdb_manager.conn)
if result["inserted"] == 1:
db_user["id"] = result["generated_keys"][0]
return UserInDB(**db_user)
raise Exception("Error al crear el usuario")
async def authenticate_user(email: str, password: str):
user = await get_user_by_email(email)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
Crea app/auth/jwt.py
:
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from datetime import timedelta
from app.models.user import Token, UserLogin
from app.crud.user import authenticate_user
from app.utils.security import create_access_token
from app.config import settings
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/login", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email o contraseña incorrectos",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register_user(user_data: UserLogin):
# Aquí deberías implementar la lógica de registro
# Por ahora solo es un ejemplo
return {"message": "Usuario registrado exitosamente"}
Actualiza main.py
:
from fastapi import FastAPI
from app.config import settings
from app.database.rethinkdb import rethinkdb_manager
from contextlib import asynccontextmanager
from app.auth.jwt import router as auth_router
@asynccontextmanager
async def lifespan(app: FastAPI):
await rethinkdb_manager.connect()
await rethinkdb_manager.ensure_db_and_tables()
yield
await rethinkdb_manager.close()
app = FastAPI(
title=settings.APP_NAME,
description="API para gestión de clínica médica",
version="0.1.0",
lifespan=lifespan
)
app.include_router(auth_router)
@app.get("/")
def read_root():
return {"message": "Bienvenido a la API de la Clínica"}
Crea app/models/paciente.py
:
from pydantic import BaseModel
from datetime import date
from typing import Optional
class PacienteBase(BaseModel):
dni: str
nombre: str
apellido: str
fecha_nacimiento: date
genero: str
direccion: Optional[str] = None
telefono: Optional[str] = None
email: Optional[str] = None
historial_medico: Optional[str] = None
class PacienteCreate(PacienteBase):
pass
class PacienteInDB(PacienteBase):
id: str
fecha_creacion: str
fecha_actualizacion: str
class Config:
orm_mode = True
Crea app/models/medico.py
:
from pydantic import BaseModel
from typing import Optional
class MedicoBase(BaseModel):
nombre: str
apellido: str
especialidad: str
telefono: Optional[str] = None
email: Optional[str] = None
horario_atencion: Optional[str] = None
class MedicoCreate(MedicoBase):
pass
class MedicoInDB(MedicoBase):
id: str
fecha_creacion: str
fecha_actualizacion: str
class Config:
orm_mode = True
Crea app/models/cita.py
:
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
class CitaBase(BaseModel):
paciente_id: str
medico_id: str
fecha: datetime
motivo: str
estado: str = "programada" # programada, completada, cancelada
notas: Optional[str] = None
class CitaCreate(CitaBase):
pass
class CitaInDB(CitaBase):
id: str
fecha_creacion: str
fecha_actualizacion: str
class Config:
orm_mode = True
Crea app/crud/paciente.py
:
from app.database.rethinkdb import rethinkdb_manager as r
from app.models.paciente import PacienteInDB, PacienteCreate
from datetime import datetime
from typing import List
async def create_paciente(paciente: PacienteCreate) -> PacienteInDB:
db_paciente = paciente.dict()
db_paciente.update({
"fecha_creacion": datetime.now(),
"fecha_actualizacion": datetime.now()
})
result = await r.table("pacientes").insert(db_paciente).run(r.conn)
if result["inserted"] == 1:
db_paciente["id"] = result["generated_keys"][0]
return PacienteInDB(**db_paciente)
raise Exception("Error al crear el paciente")
async def get_paciente(paciente_id: str) -> PacienteInDB:
paciente = await r.table("pacientes").get(paciente_id).run(r.conn)
if paciente:
return PacienteInDB(**paciente)
return None
async def get_paciente_by_dni(dni: str) -> PacienteInDB:
cursor = await r.table("pacientes").get_all(dni, index="dni").run(r.conn)
pacientes = await cursor.to_list(1)
if pacientes:
return PacienteInDB(**pacientes[0])
return None
async def get_pacientes(skip: int = 0, limit: int = 100) -> List[PacienteInDB]:
cursor = await r.table("pacientes").skip(skip).limit(limit).run(r.conn)
pacientes = await cursor.to_list()
return [PacienteInDB(**p) for p in pacientes]
async def update_paciente(paciente_id: str, paciente_data: dict) -> PacienteInDB:
paciente_data["fecha_actualizacion"] = datetime.now()
result = await r.table("pacientes").get(paciente_id).update(paciente_data).run(r.conn)
if result["replaced"] == 1:
return await get_paciente(paciente_id)
raise Exception("Error al actualizar el paciente")
async def delete_paciente(paciente_id: str) -> bool:
result = await r.table("pacientes").get(paciente_id).delete().run(r.conn)
return result["deleted"] == 1
Crea app/routers/paciente.py
:
from fastapi import APIRouter, Depends, HTTPException, status
from typing import List
from app.models.paciente import PacienteInDB, PacienteCreate
from app.crud.paciente import (
create_paciente,
get_paciente,
get_pacientes,
update_paciente,
delete_paciente,
get_paciente_by_dni
)
from app.auth.dependencies import get_current_active_user
router = APIRouter(
prefix="/pacientes",
tags=["pacientes"],
dependencies=[Depends(get_current_active_user)]
)
@router.post("/", response_model=PacienteInDB, status_code=status.HTTP_201_CREATED)
async def crear_paciente(paciente: PacienteCreate):
# Verificar si el paciente ya existe por DNI
db_paciente = await get_paciente_by_dni(paciente.dni)
if db_paciente:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Ya existe un paciente con este DNI"
)
return await create_paciente(paciente)
@router.get("/", response_model=List[PacienteInDB])
async def listar_pacientes(skip: int = 0, limit: int = 100):
return await get_pacientes(skip, limit)
@router.get("/{paciente_id}", response_model=PacienteInDB)
async def obtener_paciente(paciente_id: str):
db_paciente = await get_paciente(paciente_id)
if db_paciente is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Paciente no encontrado"
)
return db_paciente
@router.put("/{paciente_id}", response_model=PacienteInDB)
async def actualizar_paciente(paciente_id: str, paciente_data: dict):
db_paciente = await get_paciente(paciente_id)
if db_paciente is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Paciente no encontrado"
)
return await update_paciente(paciente_id, paciente_data)
@router.delete("/{paciente_id}", status_code=status.HTTP_204_NO_CONTENT)
async def eliminar_paciente(paciente_id: str):
db_paciente = await get_paciente(paciente_id)
if db_paciente is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Paciente no encontrado"
)
await delete_paciente(paciente_id)
return None
Crea app/database/valkey.py
:
import redis.asyncio as redis
from app.config import settings
from fastapi import Request, Response
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
class ValkeyManager:
def __init__(self):
self.redis = None
async def connect(self):
self.redis = redis.from_url(
f"redis://{settings.VALKEY_HOST}:{settings.VALKEY_PORT}/{settings.VALKEY_DB}"
)
FastAPICache.init(RedisBackend(self.redis), prefix="fastapi-cache")
return self.redis
async def close(self):
if self.redis:
await self.redis.close()
valkey_manager = ValkeyManager()
Actualiza main.py
:
from fastapi import FastAPI
from app.config import settings
from app.database.rethinkdb import rethinkdb_manager
from app.database.valkey import valkey_manager
from contextlib import asynccontextmanager
from app.auth.jwt import router as auth_router
from app.routers import paciente, medico, cita
@asynccontextmanager
async def lifespan(app: FastAPI):
await rethinkdb_manager.connect()
await rethinkdb_manager.ensure_db_and_tables()
await valkey_manager.connect()
yield
await rethinkdb_manager.close()
await valkey_manager.close()
app = FastAPI(
title=settings.APP_NAME,
description="API para gestión de clínica médica",
version="0.1.0",
lifespan=lifespan
)
app.include_router(auth_router)
app.include_router(paciente.router)
app.include_router(medico.router)
app.include_router(cita.router)
@app.get("/")
def read_root():
return {"message": "Bienvenido a la API de la Clínica"}
Modifica app/routers/paciente.py
:
from fastapi_cache.decorator import cache
# ... (código anterior)
@router.get("/", response_model=List[PacienteInDB])
@cache(expire=60) # Cache por 60 segundos
async def listar_pacientes(skip: int = 0, limit: int = 100):
return await get_pacientes(skip, limit)
@router.get("/{paciente_id}", response_model=PacienteInDB)
@cache(expire=30)
async def obtener_paciente(paciente_id: str):
db_paciente = await get_paciente(paciente_id)
if db_paciente is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Paciente no encontrado"
)
return db_paciente
FastAPI genera automáticamente documentación interactiva:
En este manual hemos construido una API REST completa para la gestión de una clínica médica utilizando:
¡Felicidades! Ahora tienes una base sólida para seguir desarrollando y expandiendo tu API según las necesidades específicas de tu clínica.