AskOverflow.Dev

mascai
Asked: 2024-11-11 04:48:32 +0800 CST

"Invalid token" in Keycloak

I am trying to use Keycloak in my FastAPI application. My code:

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from keycloak import KeycloakOpenID
import requests
import logging
import os

from .config import settings


# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Keycloak configuration
KEYCLOAK_SERVER_URL = settings.KEYCLOAK_SERVER_URL
KEYCLOAK_REALM = settings.KEYCLOAK_REALM
KEYCLOAK_CLIENT_ID = settings.KEYCLOAK_CLIENT_ID
KEYCLOAK_CLIENT_SECRET = settings.KEYCLOAK_CLIENT_SECRET
ALGORITHM = "RS256"
TOKEN_URL = f"{KEYCLOAK_SERVER_URL}/realms/fastapi-realm/protocol/openid-connect/token"


# Initialize KeycloakOpenID
keycloak_openid = KeycloakOpenID(
    server_url=f"{KEYCLOAK_SERVER_URL}",
    client_id=KEYCLOAK_CLIENT_ID,
    realm_name=KEYCLOAK_REALM,
    client_secret_key=KEYCLOAK_CLIENT_SECRET,
    verify=False
)
config_well_known = keycloak_openid.well_known()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def verify_token(token: str = Depends(oauth2_scheme)):
    try:
        decoded_token = keycloak_openid.decode_token(
            token,validate=True,
        )
        username = decoded_token['preferred_username']
        logger.info(f"Decoded token: {decoded_token}")
        # Verify the issuer claim
        issuer = decoded_token["iss"]

        expected_issuer = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}"
        # Token example -- token: {'exp': 1731303036, 'iat': 1731267036, 'jti': 'f1b71d25-4de6-4c03-b5f5-d9726b39d51f', 'iss': 'https://feast-keycloak.pimc-st.innodev.local/realms/feast-realm', 'aud': 'account', 'sub': 'ac48f45e-f26b-4380-bde8-e752febb6d18', 'typ': 'Bearer', 'azp': 'feast-client-id', 'session_state': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'acr': '1', 'allowed-origins': ['https://feast-frontend.pimc-st.innodev.local', '/*', 'http://localhost:5173'], 'realm_access': {'roles': ['default-roles-feast-realm', 'offline_access', 'uma_authorization']}, 'resource_access': {'account': {'roles': ['manage-account', 'manage-account-links', 'view-profile']}}, 'scope': 'profile email', 'sid': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'email_verified': False, 'name': 'A B', 'preferred_username': 'my_username', 'given_name': 'A', 'family_name': 'B', 'email': '[email protected]'}
        logger.info(f"XXX_ issuer={issuer}")
        logger.info(f"XXX_ expected_issuer={expected_issuer}")
        if issuer != expected_issuer:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid issuer")
        
        logger.info(f"username: {username}")
        return decoded_token
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

In my main.py I have the following code:

import requests
from fastapi import Depends, FastAPI, HTTPException, status, Security
from fastapi.responses import JSONResponse

from .keycloak import verify_token, oauth2_scheme, KEYCLOAK_CLIENT_ID, KEYCLOAK_CLIENT_SECRET, TOKEN_URL

app = FastAPI()

@app.post("/project", response_model=schemas.Project, tags=["project",])
def create_project(project: schemas.CreateProject, db: Session = Depends(get_db), payload: dict = Security(verify_token)):
    ...


@app.post("/login")
def get_token(body: schemas.Login):
    data = {
        "grant_type": "password", # TODO: clarify grant_type client_credentials (requires only client id and secret or password - requires password and login)
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
        "password": body.password,
        "username": body.login
    }
    response = requests.post(TOKEN_URL, data=data,  verify=False)
    return JSONResponse(status_code=response.status_code, content=response.json())

I obtain a token from the /login endpoint and then apply it like this: [screenshot]

For requests to /project I get an error:

"Invalid token"

How can I fix this error?

Tags: keycloak

1 Answer

    Best Answer
    Bench Vue
    2024-11-11T08:08:02+08:00

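    These 3 steps validate the token:

    Step 1: Fetch the public key certificate from Keycloak to verify the JWT signature.

    Step 2: Convert the certificate into a usable public key format.

    Step 3: Decode the JWT and verify its signature with the public key, skipping the audience and issuer checks.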

        # Step 1: Get public key for signature verification
        certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
        certs_response = requests.get(certs_url)
        certs_response.raise_for_status()
        public_key_data = certs_response.json()
        certificate_pem = f"-----BEGIN CERTIFICATE-----\n{public_key_data['keys'][0]['x5c'][0]}\n-----END CERTIFICATE-----"
    
        # Step 2: Convert certificate to public key
        cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
        public_key = cert.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
    
        # Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
        jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            options={
                'verify_aud': False,
                'verify_iss': False
            }
        )
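
The excerpt above takes the first entry of the certs response (`keys[0]`). A realm can publish more than one key (for example a signing key next to an encryption key), so the first entry is not guaranteed to be the one that signed the token. Below is a minimal sketch of matching on the key ID instead, assuming the token header carries a `kid` and the JWKS entries carry `kid` and `use` fields. `read_jwt_header` and `select_signing_key` are hypothetical helper names, not part of python-jose or python-keycloak, and only the standard library is used.

```python
import base64
import json


def read_jwt_header(token: str) -> dict:
    """Return the (unverified) header of a compact JWT."""
    header_b64 = token.split(".")[0]
    # base64url segments in a JWT omit padding; restore it before decoding
    padded = header_b64 + "=" * (-len(header_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def select_signing_key(jwks: dict, kid: str) -> dict:
    """Pick the JWKS entry whose `kid` matches and that is meant for signatures."""
    for key in jwks.get("keys", []):
        # Entries without a `use` field are treated as signing keys (assumption)
        if key.get("kid") == kid and key.get("use", "sig") == "sig":
            return key
    raise KeyError(f"no signing key with kid={kid!r} in JWKS")
```

With these helpers, `select_signing_key(public_key_data, read_jwt_header(token)["kid"])["x5c"][0]` could stand in for `public_key_data['keys'][0]['x5c'][0]` in Step 1.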
    

    Start Keycloak with Docker Compose

    version: '3.8'
    
    services:
      postgres:
        image: postgres:15.6
        container_name: postgres_db
        restart: always
        ports:
          - "5432:5432"
        volumes:
          - postgres_data:/var/lib/postgresql/data
        environment:
          POSTGRES_DB: keycloak
          POSTGRES_USER: keycloak
          POSTGRES_PASSWORD: password
    
      keycloak_web:
        image: quay.io/keycloak/keycloak:26.0.5
        container_name: keycloak_web
        environment:
          KC_DB: postgres
          KC_DB_URL: jdbc:postgresql://postgres:5432/keycloak
          KC_DB_USERNAME: keycloak
          KC_DB_PASSWORD: password
    
          KC_HOSTNAME: localhost
          KC_HOSTNAME_STRICT: "false"
          KC_HOSTNAME_STRICT_HTTPS: "false"
    
          KC_LOG_LEVEL: info
          KC_METRICS_ENABLED: "true"
          KC_HEALTH_ENABLED: "true"
          KEYCLOAK_ADMIN: admin
          KEYCLOAK_ADMIN_PASSWORD: admin
        command: start-dev
        depends_on:
          - postgres
        ports:
          - 8080:8080
    
    volumes:
      postgres_data:
    

    Create the realm and user

    realm

    fastapi-realm
    

    user

    username: user1
    password: 1234
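
The realm name created here is part of every URL the server code relies on: the expected issuer, the token endpoint, and the certs endpoint. The question's code reads the realm from settings but hard-codes `fastapi-realm` in `TOKEN_URL`, which makes it easy for the two to drift apart. A small sketch that derives all three from one base URL and one realm name; `realm_endpoints` is a hypothetical helper, and the paths are the ones already used in this thread.

```python
def realm_endpoints(server_url: str, realm: str) -> dict:
    """Derive the issuer and OpenID Connect endpoints for a Keycloak realm."""
    # Strip a trailing slash so the issuer string compares equal to the `iss` claim
    base = f"{server_url.rstrip('/')}/realms/{realm}"
    return {
        "issuer": base,
        "token": f"{base}/protocol/openid-connect/token",
        "certs": f"{base}/protocol/openid-connect/certs",
    }
```

For example, `realm_endpoints("http://localhost:8080/", "fastapi-realm")["token"]` gives the same value as `TOKEN_URL` in the server code below.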
    

    Install the dependencies (Python 3.12 in a conda environment)

    pip install fastapi uvicorn requests python-keycloak python-jose cryptography
    

    FastAPI server code

    api-server.py

    from fastapi import FastAPI, HTTPException, Depends
    from pydantic import BaseModel
    from starlette.responses import JSONResponse
    from keycloak import KeycloakOpenID
    from jose import jwt, JWTError
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    
    import requests
    import json
    
    app = FastAPI()
    
    # Configuration variables
    KEYCLOAK_URL = 'http://localhost:8080'
    REALM = 'fastapi-realm'
    TOKEN_URL = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/token"
    KEYCLOAK_CLIENT_ID = 'admin-cli'
    
    # Keycloak service class
    class KeycloakService:
        def __init__(self):
            self.keycloak_url = KEYCLOAK_URL
            self.realm = REALM
    
        def decode_and_print_token(self, token):
            try:
                # Retrieve claims without verifying the signature or audience
                decoded_token = jwt.get_unverified_claims(token)
                print("Decoded JWT token:")
                print(json.dumps(decoded_token, indent=4))
                return decoded_token
            except JWTError as e:
                print(f"Error decoding token: {e}")
                return None
            
        def validate_token(self, token: str, client_id: str) -> bool:
            decoded_token = self.decode_and_print_token(token)
            if not decoded_token:
                print("Failed to decode token.")
                return False
    
            try:
                # Extract issuer, azp, and aud directly from decoded token
                issuer = decoded_token.get('iss')
                authorized_party = decoded_token.get('azp')
                audience = decoded_token.get('aud')
    
                print("Issuer (iss) in token:", issuer)
                print("Authorized party (azp) in token:", authorized_party)
                print("Audience (aud) in token:", audience)
    
                expected_issuer = f"{self.keycloak_url}/realms/{self.realm}"
                if issuer != expected_issuer:
                    print(f"Invalid issuer. Expected: {expected_issuer}, Got: {issuer}")
                    return False
    
                # `aud` may be absent, a single string, or a list of strings
                audiences = [audience] if isinstance(audience, str) else (audience or [])
                if not audiences:
                    print("Audience is None, assuming this is a user token without an audience.")
                elif not any(aud in ("account", client_id) for aud in audiences):
                    print(f"Invalid audience. Expected: 'account' or '{client_id}', Got: {audience}")
                    return False
    
                # Step 1: Get public key for signature verification
                certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
                certs_response = requests.get(certs_url)
                certs_response.raise_for_status()
                public_key_data = certs_response.json()
                certificate_pem = f"-----BEGIN CERTIFICATE-----\n{public_key_data['keys'][0]['x5c'][0]}\n-----END CERTIFICATE-----"
    
                # Step 2: Convert certificate to public key
                cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
                public_key = cert.public_key().public_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PublicFormat.SubjectPublicKeyInfo
                )
    
                # Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
                jwt.decode(
                    token,
                    public_key,
                    algorithms=['RS256'],
                    options={
                        'verify_aud': False,
                        'verify_iss': False
                    }
                )
                print("Token signature is valid.")
                return True
            except JWTError as e:
                print(f"Error validating token: {e}")
                return False
            except requests.RequestException as e:
                print(f"Error fetching public key: {e}")
                return False
    
    keycloak_service = KeycloakService()
    
    # Schemas
    class Login(BaseModel):
        login: str
        password: str
    
    class VerifyToken(BaseModel):
        token: str
        client_id: str
    
    # Login endpoint
    @app.post("/login")
    def get_token(body: Login):
        data = {
            "grant_type": "password",
            "client_id": KEYCLOAK_CLIENT_ID,
            "username": body.login,
            "password": body.password
        }
        response = requests.post(TOKEN_URL, data=data, verify=False)
        if response.status_code == 200:
            return JSONResponse(status_code=200, content=response.json())
        else:
            raise HTTPException(status_code=response.status_code, detail=response.json())
    
    # Verify endpoint
    @app.post("/verify")
    def verify_token(body: VerifyToken):
        is_valid = keycloak_service.validate_token(body.token, body.client_id)
        return {"valid": is_valid}
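
The `/verify` endpoint above returns only a boolean, and the `verify_token` function in the question turns every exception into the same "Invalid token" response, which hides the actual cause. The sketch below reports why a set of already-decoded claims would be rejected. It assumes the claims were decoded beforehand (for instance with `jwt.get_unverified_claims`) and covers only the issuer and expiry; `explain_claim_problems` is a hypothetical name, and it does not replace signature verification.

```python
import time
from typing import List, Optional


def explain_claim_problems(
    claims: dict, expected_issuer: str, now: Optional[float] = None
) -> List[str]:
    """Return human-readable reasons for rejecting the claims (empty list if none)."""
    now = time.time() if now is None else now
    problems = []

    issuer = claims.get("iss")
    if issuer != expected_issuer:
        problems.append(f"issuer mismatch: expected {expected_issuer!r}, got {issuer!r}")

    exp = claims.get("exp")
    if exp is None:
        problems.append("missing exp claim")
    elif exp <= now:
        problems.append(f"token expired at {exp}")

    return problems
```

Logging this list (or the caught exception itself) before raising the 401 should make it easier to see whether the failure is an issuer mismatch, an expired token, or something else.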
    
    

    Start the FastAPI server

    uvicorn api-server:app --reload
    

    API documentation URL

    http://localhost:8000/docs#/
    

    Get a token with Postman

    URL

    POST http://localhost:8000/login
    

    Input Body

    {
      "login": "user1",
      "password": "1234"
    }
    

    Script that assigns the user_token variable

    var jsonData = JSON.parse(responseBody);
    
    if (jsonData.access_token) {
        postman.setEnvironmentVariable("user_token", jsonData.access_token);
    } else {
        console.error("access_token not found in the response");
    }
    

    Verify the token with Postman

    URL

    POST http://localhost:8000/verify
    

    Input Body

    {
      "token": "{{user_token}}",
      "client_id": "admin-cli"
    }
    

    The user token passes verification, so the result is true.

    Valid-token result

    {
        "valid": true
    }
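
The same two calls can be scripted instead of clicked through in Postman. This is a sketch using only the standard library, assuming the FastAPI server from this answer is listening on `http://localhost:8000`; `build_json_post`, `send`, and `login_and_verify` are hypothetical helper names.

```python
import json
import urllib.request

API_URL = "http://localhost:8000"  # FastAPI server address used in the Postman steps


def build_json_post(url: str, payload: dict) -> urllib.request.Request:
    """Build a POST request with a JSON body (nothing is sent yet)."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request) -> dict:
    """Send the request and parse the JSON response body."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def login_and_verify(login: str, password: str, client_id: str) -> bool:
    """Call /login, then pass the returned access_token to /verify."""
    tokens = send(build_json_post(f"{API_URL}/login", {"login": login, "password": password}))
    result = send(
        build_json_post(
            f"{API_URL}/verify",
            {"token": tokens["access_token"], "client_id": client_id},
        )
    )
    return result["valid"]
```

With Keycloak and the API server both running, `login_and_verify("user1", "1234", "admin-cli")` would be expected to mirror the Postman result above.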
    