AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / coding / Perguntas / 79175780
Accepted
mascai
mascai
Asked: 2024-11-11 04:48:32 +0800 CST2024-11-11 04:48:32 +0800 CST 2024-11-11 04:48:32 +0800 CST

"Token inválido" no keycloak

  • 772

Estou tentando usar keycloak no meu aplicativo FastAPI Meu código

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from keycloak import KeycloakOpenID
import requests
import logging
import os

from .config import settings


# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Keycloak configuration
KEYCLOAK_SERVER_URL = settings.KEYCLOAK_SERVER_URL
KEYCLOAK_REALM = settings.KEYCLOAK_REALM
KEYCLOAK_CLIENT_ID = settings.KEYCLOAK_CLIENT_ID
KEYCLOAK_CLIENT_SECRET = settings.KEYCLOAK_CLIENT_SECRET
ALGORITHM = "RS256"
TOKEN_URL = f"{KEYCLOAK_SERVER_URL}/realms/fastapi-realm/protocol/openid-connect/token"


# Initialize KeycloakOpenID
keycloak_openid = KeycloakOpenID(
    server_url=f"{KEYCLOAK_SERVER_URL}",
    client_id=KEYCLOAK_CLIENT_ID,
    realm_name=KEYCLOAK_REALM,
    client_secret_key=KEYCLOAK_CLIENT_SECRET,
    verify=False
)
config_well_known = keycloak_openid.well_known()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def verify_token(token: str = Depends(oauth2_scheme)):
    try:
        decoded_token = keycloak_openid.decode_token(
            token,validate=True,
        )
        username = decoded_token['preferred_username']
        logger.info(f"Decoded token: {decoded_token}")
        # Verify the issuer claim
        issuer = decoded_token["iss"]

        expected_issuer = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}"
        I# Token example -- token: {'exp': 1731303036, 'iat': 1731267036, 'jti': 'f1b71d25-4de6-4c03-b5f5-d9726b39d51f', 'iss': 'https://feast-keycloak.pimc-st.innodev.local/realms/feast-realm', 'aud': 'account', 'sub': 'ac48f45e-f26b-4380-bde8-e752febb6d18', 'typ': 'Bearer', 'azp': 'feast-client-id', 'session_state': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'acr': '1', 'allowed-origins': ['https://feast-frontend.pimc-st.innodev.local', '/*', 'http://localhost:5173'], 'realm_access': {'roles': ['default-roles-feast-realm', 'offline_access', 'uma_authorization']}, 'resource_access': {'account': {'roles': ['manage-account', 'manage-account-links', 'view-profile']}}, 'scope': 'profile email', 'sid': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'email_verified': False, 'name': 'A B', 'preferred_username': 'my_username', 'given_name': 'A', 'family_name': 'B', 'email': '[email protected]'}
        logger.info(f"XXX_ issuer={issuer}")
        logger.info(f"XXX_ expected_issuer={expected_issuer}")
        if issuer != expected_issuer:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid issuer")
        
        logger.info(f"username: {username}")
        return decoded_token
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

No meu main.py tenho o seguinte código

from fastapi import Depends, FastAPI, HTTPException, status, Security


from .keycloak import verify_token, oauth2_scheme, KEYCLOAK_CLIENT_ID, KEYCLOAK_CLIENT_SECRET, TOKEN_URL

app = FastAPI()

@app.post("/project", response_model=schemas.Project, tags=["project",])
def create_project(project: schemas.CreateProject, db: Session = Depends(get_db), payload: dict = Security(verify_token)):
    ...


@app.post("/login")
def get_token(body: schemas.Login):
    data = {
        "grant_type": "password", # TODO: clarify grant_type client_credentials (requires only client id and secret or password - requires password and login)
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
        "password": body.password,
        "username": body.login
    }
    response = requests.post(TOKEN_URL, data=data,  verify=False)
    return JSONResponse(status_code=response.status_code, content=response.json())

Estou obtendo o token com a ajuda do método /login. Então estou aplicando o token assim: insira a descrição da imagem aqui

Para solicitação /projecttenho um erro:

"Token inválido"

Como corrigir o erro?

keycloak
  • 1 1 respostas
  • 41 Views

1 respostas

  • Voted
  1. Best Answer
    Bench Vue
    2024-11-11T08:08:02+08:002024-11-11T08:08:02+08:00

    Estas 3 etapas podem verificar o token

    Step 1: Obtenha o certificado de chave pública do Keycloak para verificar a assinatura do token JWT.

    Step 2: Converta o certificado em um formato de chave pública utilizável.

    Step 3: Decodifique e verifique a assinatura do token JWT usando a chave pública, ignorando as verificações de público e emissor.

        # Step 1: Get public key for signature verification
        certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
        certs_response = requests.get(certs_url)
        certs_response.raise_for_status()
        public_key_data = certs_response.json()
        certificate_pem = f"-----BEGIN CERTIFICATE-----\n{public_key_data['keys'][0]['x5c'][0]}\n-----END CERTIFICATE-----"
    
        # Step 2: Convert certificate to public key
        cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
        public_key = cert.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
    
        # Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
        jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            options={
                'verify_aud': False,
                'verify_iss': False
            }
        )
    

    Iniciando o Key Cloak pelo Docker Compose

    version: '3.8'
    
    services:
      postgres:
        image: postgres:15.6
        container_name: postgres_db
        restart: always
        ports:
          - "5432:5432"
        volumes:
          - postgres_data:/var/lib/postgresql/data
        environment:
          POSTGRES_DB: keycloak
          POSTGRES_USER: keycloak
          POSTGRES_PASSWORD: password
    
      keycloak_web:
        image: quay.io/keycloak/keycloak:26.0.5
        container_name: keycloak_web
        environment:
          KC_DB: postgres
          KC_DB_URL: jdbc:postgresql://postgres:5432/keycloak
          KC_DB_USERNAME: keycloak
          KC_DB_PASSWORD: password
    
          KC_HOSTNAME: localhost
          KC_HOSTNAME_STRICT: false
          KC_HOSTNAME_STRICT_HTTPS: false
    
          KC_LOG_LEVEL: info
          KC_METRICS_ENABLED: true
          KC_HEALTH_ENABLED: true
          KEYCLOAK_ADMIN: admin
          KEYCLOAK_ADMIN_PASSWORD: admin
        command: start-dev
        depends_on:
          - postgres
        ports:
          - 8080:8080
    
    volumes:
      postgres_data:
    

    Criar reino e usuário

    realm

    fastapi-realm
    

    user

    username: user1
    password: 1234
    

    insira a descrição da imagem aqui

    Instalar dependência no Python 3.12 com conda

    pip install fastapi uvicorn requests python-keycloak python-jose cryptography
    

    insira a descrição da imagem aqui

    Código do servidor FastAPI

    api-server.py

    from fastapi import FastAPI, HTTPException, Depends
    from pydantic import BaseModel
    from starlette.responses import JSONResponse
    from keycloak import KeycloakOpenID
    from jose import jwt, JWTError
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    
    import requests
    import json
    
    app = FastAPI()
    
    # Configuration variables
    KEYCLOAK_URL = 'http://localhost:8080'
    REALM = 'fastapi-realm'
    TOKEN_URL = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/token"
    KEYCLOAK_CLIENT_ID = 'admin-cli'
    
    # Keycloak service class
    class KeycloakService:
        def __init__(self):
            self.keycloak_url = KEYCLOAK_URL
            self.realm = REALM
    
        def decode_and_print_token(self, token):
            try:
                # Retrieve claims without verifying the signature or audience
                decoded_token = jwt.get_unverified_claims(token)
                print("Decoded JWT token:")
                print(json.dumps(decoded_token, indent=4))
                return decoded_token
            except JWTError as e:
                print(f"Error decoding token: {e}")
                return None
            
        def validate_token(self, token: str, client_id: str) -> bool:
            decoded_token = self.decode_and_print_token(token)
            if not decoded_token:
                print("Failed to decode token.")
                return False
    
            try:
                # Extract issuer, azp, and aud directly from decoded token
                issuer = decoded_token.get('iss')
                authorized_party = decoded_token.get('azp')
                audience = decoded_token.get('aud')
    
                print("Issuer (iss) in token:", issuer)
                print("Authorized party (azp) in token:", authorized_party)
                print("Audience (aud) in token:", audience)
    
                expected_issuer = f"{self.keycloak_url}/realms/{self.realm}"
                if issuer != expected_issuer:
                    print(f"Invalid issuer. Expected: {expected_issuer}, Got: {issuer}")
                    return False
    
                # Allow `aud` to be `None` for user tokens, or match "account" or the client_id for client tokens
                if audience is None:
                    print("Audience is None, assuming this is a user token without an audience.")
                elif audience not in ["account", client_id]:
                    print(f"Invalid audience. Expected: 'account' or '{client_id}', Got: {audience}")
                    return False
    
                # Step 1: Get public key for signature verification
                certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
                certs_response = requests.get(certs_url)
                certs_response.raise_for_status()
                public_key_data = certs_response.json()
                certificate_pem = f"-----BEGIN CERTIFICATE-----\n{public_key_data['keys'][0]['x5c'][0]}\n-----END CERTIFICATE-----"
    
                # Step 2: Convert certificate to public key
                cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
                public_key = cert.public_key().public_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PublicFormat.SubjectPublicKeyInfo
                )
    
                # Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
                jwt.decode(
                    token,
                    public_key,
                    algorithms=['RS256'],
                    options={
                        'verify_aud': False,
                        'verify_iss': False
                    }
                )
                print("Token signature is valid.")
                return True
            except JWTError as e:
                print(f"Error validating token: {e}")
                return False
            except requests.RequestException as e:
                print(f"Error fetching public key: {e}")
                return False
    
    keycloak_service = KeycloakService()
    
    # Schemas
    class Login(BaseModel):
        login: str
        password: str
    
    class VerifyToken(BaseModel):
        token: str
        client_id: str
    
    # Login endpoint
    @app.post("/login")
    def get_token(body: Login):
        data = {
            "grant_type": "password",
            "client_id": KEYCLOAK_CLIENT_ID,
            "username": body.login,
            "password": body.password
        }
        response = requests.post(TOKEN_URL, data=data, verify=False)
        if response.status_code == 200:
            return JSONResponse(status_code=200, content=response.json())
        else:
            raise HTTPException(status_code=response.status_code, detail=response.json())
    
    # Verify endpoint
    @app.post("/verify")
    def verify_token(body: VerifyToken):
        is_valid = keycloak_service.validate_token(body.token, body.client_id)
        return {"valid": is_valid}
    
    

    Iniciando o servidor FastAPI

    uvicorn api-server:app --reload
    

    insira a descrição da imagem aqui

    API document URL

    http://localhost:8000/docs#/
    

    insira a descrição da imagem aqui

    Obter Token pelo Carteiro

    URL

    POST http://localhost:8000/login
    

    Input Body

    {
      "login": "user1",
      "password": "1234"
    }
    

    Scriptpara atribuir variável user_token

    var jsonData = JSON.parse(responseBody);
    
    if (jsonData.access_token) {
        postman.setEnvironmentVariable("user_token", jsonData.access_token);
    } else {
        console.error("access_token not found in the response");
    }
    

    insira a descrição da imagem aqui

    insira a descrição da imagem aqui

    insira a descrição da imagem aqui

    Verificar Token pelo Postman

    URL

    POST http://localhost:8000/verify
    

    Input Body

    {
      "token": "{{user_token}}",
      "client_id": "admin-cli"
    }
    

    insira a descrição da imagem aqui

    Então o token do usuário foi verificado com valor OK (verdadeiro).

    Resultado válido do token

    {
        "valid": true
    }
    
    • 1

relate perguntas

  • client-credentials é unsupported_grant_type

  • StackOverflow com Keycloak como provedor de login retorna endereço de e-mail vazio

  • Erro de troca de token Keycloak - O cliente não está dentro do público do token

  • Como listar usuários de contas de serviço no keycloak

  • Criar usuário via API no KeyCloak sem admin-cli?

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    Vue 3: Erro na criação "Identificador esperado, mas encontrado 'import'" [duplicado]

    • 1 respostas
  • Marko Smith

    Por que esse código Java simples e pequeno roda 30x mais rápido em todas as JVMs Graal, mas não em nenhuma JVM Oracle?

    • 1 respostas
  • Marko Smith

    Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores?

    • 1 respostas
  • Marko Smith

    Como faço para corrigir um erro MODULE_NOT_FOUND para um módulo que não importei manualmente?

    • 6 respostas
  • Marko Smith

    `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso?

    • 3 respostas
  • Marko Smith

    Quando devo usar um std::inplace_vector em vez de um std::vector?

    • 3 respostas
  • Marko Smith

    Um programa vazio que não faz nada em C++ precisa de um heap de 204 KB, mas não em C

    • 1 respostas
  • Marko Smith

    PowerBI atualmente quebrado com BigQuery: problema de driver Simba com atualização do Windows

    • 2 respostas
  • Marko Smith

    AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos

    • 1 respostas
  • Marko Smith

    Estou tentando fazer o jogo pacman usando apenas o módulo Turtle Random e Math

    • 1 respostas
  • Martin Hope
    Aleksandr Dubinsky Por que a correspondência de padrões com o switch no InetAddress falha com 'não cobre todos os valores de entrada possíveis'? 2024-12-23 06:56:21 +0800 CST
  • Martin Hope
    Phillip Borge Por que esse código Java simples e pequeno roda 30x mais rápido em todas as JVMs Graal, mas não em nenhuma JVM Oracle? 2024-12-12 20:46:46 +0800 CST
  • Martin Hope
    Oodini Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores? 2024-12-12 06:27:11 +0800 CST
  • Martin Hope
    sleeptightAnsiC `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso? 2024-11-09 07:18:53 +0800 CST
  • Martin Hope
    The Mad Gamer Quando devo usar um std::inplace_vector em vez de um std::vector? 2024-10-29 23:01:00 +0800 CST
  • Martin Hope
    Chad Feller O ponto e vírgula agora é opcional em condicionais bash com [[ .. ]] na versão 5.2? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench Por que um traço duplo (--) faz com que esta cláusula MariaDB seja avaliada como verdadeira? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng Por que `dict(id=1, **{'id': 2})` às vezes gera `KeyError: 'id'` em vez de um TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos 2024-03-20 03:12:31 +0800 CST
  • Martin Hope
    MarkB Por que o GCC gera código que executa condicionalmente uma implementação SIMD? 2024-02-17 06:17:14 +0800 CST

Hot tag

python javascript c++ c# java typescript sql reactjs html

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve