skyline-apiserver/skyline_apiserver/policy/base.py
yangshaoxue 1e4f72ca2b Add configurable policy files
Configurable policy files are added for services.

Change-Id: I53549fe80798bdd2e6ed3f92f548ed16393a269f
2023-10-31 13:58:22 +08:00

150 lines
5.2 KiB
Python

# Copyright 2021 99cloud
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from collections.abc import MutableMapping
from pathlib import Path
from typing import Any, Dict, Iterator, List, Union
import attr
from immutables import Map
from keystoneauth1.access.access import AccessInfoV3
from oslo_policy import _cache_handler, _checks, policy
from skyline_apiserver.config import CONF
from skyline_apiserver.log import LOG
from .manager.base import APIRule, Rule
class UserContext(MutableMapping):
    """Mutable mapping of credential values derived from an access object.

    The mapping is populated once, in ``__init__``, from attributes of the
    given access object, and is then handed to the policy checks as the
    credentials dictionary. Besides the values copied from the access
    object, it carries three derived keys:

    * ``system_scope`` -- ``"all"`` when ``access.system`` is truthy and has
      a truthy ``"all"`` entry, otherwise ``""``.
    * ``is_admin`` -- ``True`` when any role listed in
      ``CONF.openstack.system_admin_roles`` is among the user's role names.
    * ``is_reader_admin`` -- same test against
      ``CONF.openstack.system_reader_roles``.
    """

    # Attributes copied verbatim from the access object under the same key.
    # An attribute the object does not have is stored as ``None``.
    _PLAIN_ATTRS = (
        "auth_token",
        "user_id",
        "project_id",
        "domain_id",
        "user_domain_id",
        "project_domain_id",
        "username",
        "project_name",
        "domain_name",
        "user_domain_name",
        "project_domain_name",
    )

    def __init__(
        self,
        access: AccessInfoV3,
    ):
        """Build the credential mapping from ``access``.

        :param access: access object to read credentials from; it is also
            kept on the instance as ``self.access``.
        """
        self._data: Dict[str, Any] = {}
        self.access = access

        for name in self._PLAIN_ATTRS:
            self._data[name] = getattr(access, name, None)

        # Look ``system`` up with a default, like every other attribute, so
        # an access object without it yields an unscoped context instead of
        # raising AttributeError.
        system = getattr(access, "system", None)
        self._data["system_scope"] = "all" if system and system.get("all", False) else ""

        self._data["role_ids"] = getattr(access, "role_ids", [])
        # Role *names* are exposed under the "roles" key.
        roles = getattr(access, "role_names", [])
        self._data["roles"] = roles

        self._data["is_admin"] = any(
            role in roles for role in CONF.openstack.system_admin_roles
        )
        self._data["is_reader_admin"] = any(
            role in roles for role in CONF.openstack.system_reader_roles
        )

    def __getitem__(self, k: Any) -> Any:
        """Return the value stored under ``k``; raise ``KeyError`` if absent."""
        return self._data[k]

    def __setitem__(self, k: Any, v: Any) -> None:
        """Store ``v`` under ``k``, replacing any previous value."""
        self._data[k] = v

    def __delitem__(self, k: Any) -> None:
        """Remove ``k``; raise ``KeyError`` if absent."""
        del self._data[k]

    def __iter__(self) -> Iterator[Any]:
        """Iterate over the stored keys in insertion order."""
        return iter(self._data)

    def __len__(self) -> int:
        """Return the number of stored keys."""
        return len(self._data)

    def __str__(self) -> str:
        """Return the ``str()`` of the underlying dictionary."""
        return self._data.__str__()

    def __repr__(self) -> str:
        """Return the ``repr()`` of the underlying dictionary."""
        return self._data.__repr__()
@attr.s(kw_only=True, repr=True, frozen=False, slots=True, auto_attribs=True)
class Enforcer:
    """Policy enforcer for a single service.

    Rules come from two places: rules registered in code through
    ``register_rules`` and, when present, rules loaded from a per-service
    policy file. ``authorize`` prefers a file rule over a registered rule
    of the same name.
    """

    # Service name; also used as the prefix of the policy file name.
    service: str = attr.ib(repr=True, init=True)
    # Registered rules, name -> check object. Replaced wholesale by
    # ``register_rules``.
    rules: Map = attr.ib(factory=Map, repr=True, init=False)
    # Rules loaded from the policy file. ``factory=dict`` gives every
    # instance its own dictionary; a literal ``default={}`` would be one
    # object shared by all enforcers.
    file_rules: Dict[str, Any] = attr.ib(factory=dict, repr=True, init=True)
    # Cache dictionary handed to ``_cache_handler.read_cached_file``; kept
    # per instance for the same reason as ``file_rules``.
    _file_cache: Dict[str, Any] = attr.ib(factory=dict, repr=True, init=True)

    def load_rules(self) -> None:
        """Refresh ``file_rules`` from this service's policy file.

        The file is ``<CONF.default.policy_file_path>/<service>_<suffix>``
        with ``<suffix>`` taken from ``CONF.default.policy_file_suffix``.
        When the file does not exist, ``file_rules`` is reset to an empty
        dictionary. Otherwise the file is read through the cache handler and
        re-parsed only when the handler reports a reload or no file rules
        are loaded yet.
        """
        path = Path(CONF.default.policy_file_path).joinpath(
            str(self.service + "_" + CONF.default.policy_file_suffix)
        )
        if not path.exists():
            self.file_rules = {}
            return

        reloaded, data = _cache_handler.read_cached_file(
            self._file_cache, path, force_reload=False
        )
        if reloaded or not self.file_rules:
            self.file_rules = policy.Rules.load(data)

    def register_rules(self, rules: List[Union[Rule, APIRule]]) -> None:
        """Replace the registered rules with ``rules``.

        :param rules: rule definitions; each one's ``basic_check`` is stored
            under its ``name``.
        :raises ValueError: if two entries share the same name. In that case
            the previously registered rules are left untouched.
        """
        rule_map: Dict[str, Any] = {}
        for rule in rules:
            if rule.name in rule_map:
                raise ValueError(f"Duplicate policy rule {rule.name}.")
            rule_map[rule.name] = rule.basic_check
        self.rules = Map(rule_map)

    def authorize(self, rule: str, target: Dict[str, Any], context: UserContext) -> bool:
        """Evaluate the policy ``rule`` for ``context`` against ``target``.

        :param rule: name of the policy rule to evaluate.
        :param target: target dictionary passed to the check.
        :param context: credentials passed to the check.
        :returns: the outcome of the check; ``False`` when the rule is not
            known or when evaluating it raises.
        """
        # Loading file rules is best effort: on failure, fall back to
        # whatever rules are already loaded, but record why it failed.
        try:
            self.load_rules()
        except Exception as exc:
            LOG.debug(f"Failed to load {self.service} rules: {exc!r}")

        # A rule from the policy file overrides a registered rule.
        do_check = self.file_rules.get(rule) or self.rules.get(rule)
        if do_check is None:
            LOG.debug(f"Policy {rule} not registered.")
            return False

        try:
            return _checks._check(
                rule=do_check,
                target=target,
                creds=context,
                enforcer=self,
                current_rule=rule,
            )
        except Exception as exc:
            # Deny on any evaluation error, but leave a trace of the cause
            # instead of failing silently.
            LOG.debug(f"Policy {rule} check failed: {exc!r}")
            return False