Spaces:
Sleeping
Sleeping
from typing import List, Optional | |
from litellm.caching import DualCache | |
from litellm.proxy._types import ( | |
KeyManagementRoutes, | |
LiteLLM_TeamTableCachedObj, | |
LiteLLM_VerificationToken, | |
LiteLLMRoutes, | |
LitellmUserRoles, | |
Member, | |
ProxyErrorTypes, | |
ProxyException, | |
UserAPIKeyAuth, | |
) | |
from litellm.proxy.auth.auth_checks import get_team_object | |
from litellm.proxy.auth.route_checks import RouteChecks | |
from litellm.proxy.utils import PrismaClient | |
DEFAULT_TEAM_MEMBER_PERMISSIONS = [ | |
KeyManagementRoutes.KEY_INFO, | |
KeyManagementRoutes.KEY_HEALTH, | |
] | |
class TeamMemberPermissionChecks: | |
def get_permissions_for_team_member( | |
team_member_object: Member, | |
team_table: LiteLLM_TeamTableCachedObj, | |
) -> List[KeyManagementRoutes]: | |
""" | |
Returns the permissions for a team member | |
""" | |
if team_table.team_member_permissions and isinstance( | |
team_table.team_member_permissions, list | |
): | |
return [ | |
KeyManagementRoutes(permission) | |
for permission in team_table.team_member_permissions | |
] | |
return DEFAULT_TEAM_MEMBER_PERMISSIONS | |
def _get_list_of_route_enum_as_str( | |
route_enum: List[KeyManagementRoutes], | |
) -> List[str]: | |
""" | |
Returns a list of the route enum as a list of strings | |
""" | |
return [route.value for route in route_enum] | |
async def can_team_member_execute_key_management_endpoint( | |
user_api_key_dict: UserAPIKeyAuth, | |
route: KeyManagementRoutes, | |
prisma_client: PrismaClient, | |
user_api_key_cache: DualCache, | |
existing_key_row: LiteLLM_VerificationToken, | |
): | |
""" | |
Main handler for checking if a team member can update a key | |
""" | |
from litellm.proxy.management_endpoints.key_management_endpoints import ( | |
_get_user_in_team, | |
) | |
# 1. Don't execute these checks if the user role is proxy admin | |
if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value: | |
return | |
# 2. Check if the operation is being done on a team key | |
if existing_key_row.team_id is None: | |
return | |
# 3. Get Team Object from DB | |
team_table = await get_team_object( | |
team_id=existing_key_row.team_id, | |
prisma_client=prisma_client, | |
user_api_key_cache=user_api_key_cache, | |
parent_otel_span=user_api_key_dict.parent_otel_span, | |
check_db_only=True, | |
) | |
# 4. Extract `Member` object from `team_table` | |
key_assigned_user_in_team = _get_user_in_team( | |
team_table=team_table, user_id=user_api_key_dict.user_id | |
) | |
# 5. Check if the team member has permissions for the endpoint | |
TeamMemberPermissionChecks.does_team_member_have_permissions_for_endpoint( | |
team_member_object=key_assigned_user_in_team, | |
team_table=team_table, | |
route=route, | |
) | |
def does_team_member_have_permissions_for_endpoint( | |
team_member_object: Optional[Member], | |
team_table: LiteLLM_TeamTableCachedObj, | |
route: str, | |
) -> Optional[bool]: | |
""" | |
Raises an exception if the team member does not have permissions for calling the endpoint for a team | |
""" | |
# permission checks only run for non-admin users | |
# Non-Admin user trying to access information about a team's key | |
if team_member_object is None: | |
return False | |
if team_member_object.role == "admin": | |
return True | |
_team_member_permissions = ( | |
TeamMemberPermissionChecks.get_permissions_for_team_member( | |
team_member_object=team_member_object, | |
team_table=team_table, | |
) | |
) | |
team_member_permissions = ( | |
TeamMemberPermissionChecks._get_list_of_route_enum_as_str( | |
_team_member_permissions | |
) | |
) | |
if not RouteChecks.check_route_access( | |
route=route, allowed_routes=team_member_permissions | |
): | |
raise ProxyException( | |
message=f"Team member does not have permissions for endpoint: {route}. You only have access to the following endpoints: {team_member_permissions} for team {team_table.team_id}", | |
type=ProxyErrorTypes.team_member_permission_error, | |
param=route, | |
code=401, | |
) | |
return True | |
async def user_belongs_to_keys_team( | |
user_api_key_dict: UserAPIKeyAuth, | |
existing_key_row: LiteLLM_VerificationToken, | |
) -> bool: | |
""" | |
Returns True if the user belongs to the team that the key is assigned to | |
""" | |
from litellm.proxy.management_endpoints.key_management_endpoints import ( | |
_get_user_in_team, | |
) | |
from litellm.proxy.proxy_server import prisma_client, user_api_key_cache | |
if existing_key_row.team_id is None: | |
return False | |
team_table = await get_team_object( | |
team_id=existing_key_row.team_id, | |
prisma_client=prisma_client, | |
user_api_key_cache=user_api_key_cache, | |
parent_otel_span=user_api_key_dict.parent_otel_span, | |
check_db_only=True, | |
) | |
# 4. Extract `Member` object from `team_table` | |
team_member_object = _get_user_in_team( | |
team_table=team_table, user_id=user_api_key_dict.user_id | |
) | |
return team_member_object is not None | |
def get_all_available_team_member_permissions() -> List[str]: | |
""" | |
Returns all available team member permissions | |
""" | |
all_available_permissions = [] | |
for route in LiteLLMRoutes.key_management_routes.value: | |
all_available_permissions.append(route.value) | |
return all_available_permissions | |
def default_team_member_permissions() -> List[str]: | |
return [route.value for route in DEFAULT_TEAM_MEMBER_PERMISSIONS] | |