chore: Merge skyline-policy-manager into skyline-apiserver

1. merge skyline-policy-manager into skyline-apiserver
2. move tests into skyline-apiserver
3. we will remove skyline-policy-manager after we merge
skyline-nginx into skyline-apiserver

Change-Id: Id57598f3726c891e399af81eb0383746f78386b5
This commit is contained in:
zhu.boxiang 2022-05-16 17:12:54 +08:00
parent 6a1bdd4fb4
commit ba249b0741
33 changed files with 16122 additions and 177 deletions

View File

@ -26,6 +26,7 @@ venv:
install: venv
poetry run pip install -U pip setuptools'<58.0.0'
poetry install -vvv
tools/post_install.sh
.PHONY: package
@ -42,7 +43,7 @@ fmt:
.PHONY: lint
lint:
# poetry run mypy --config-file=../../mypy.ini $(PY_FILES)
# poetry run mypy --strict --config-file=../../mypy.ini $(PY_FILES)
poetry run isort --check-only --diff $(PY_FILES)
poetry run black --check --diff --color --config ../../pyproject.toml $(PY_FILES)
poetry run flake8 --config ../../.flake8 $(PY_FILES)

File diff suppressed because it is too large Load Diff

View File

@ -24,6 +24,7 @@ pymysql = "0.9.3"
aiosqlite = "0.17.0"
dnspython = "2.1.0"
loguru = "0.5.3"
click = "7.1.2"
python-keystoneclient = "3.21.0"
python-cinderclient = "5.0.2"
python-glanceclient = "2.17.1"
@ -33,7 +34,7 @@ python-novaclient = "15.1.1"
python-octaviaclient = "1.10.1"
osc-placement = "1.7.0"
keystoneauth1 = "3.17.4"
skyline-policy-manager = "*"
"oslo.policy" = "3.8.2"
[tool.poetry.dev-dependencies]
isort = "5.9.3"
@ -47,14 +48,15 @@ pytest-asyncio = "0.15.1"
pytest-cov = "2.12.1"
pytest-html = "3.1.1"
mimesis = "4.1.3"
click = "7.1.2"
asgi-lifespan = "1.0.1"
types-PyYAML = "5.4.10"
skyline-policy-manager = {path = "../skyline-policy-manager", develop = true}
"oslo.log" = "4.8.0"
neutron-lib = "2.15.0"
[tool.poetry.scripts]
swagger-generator = 'skyline_apiserver.cmd.generate_swagger:main'
config-sample-generator = 'skyline_apiserver.cmd.generate_sample_config:main'
skyline-policy-manager = "skyline_apiserver.cmd.policy_manager:main"
[tool.pytest.ini_options]
minversion = "6.0"

View File

@ -18,7 +18,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from skyline_apiserver import schemas
from skyline_apiserver.api import deps
from skyline_apiserver.client.utils import generate_session, get_access
from skyline_apiserver.policies import ENFORCER, UserContext
from skyline_apiserver.policy import ENFORCER, UserContext
from skyline_apiserver.schemas import Policies, PoliciesRules, common
router = APIRouter()

View File

@ -0,0 +1,291 @@
# Copyright 2021 99cloud
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import json
from importlib import metadata
from logging import StreamHandler
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Union
import click
from oslo_policy.policy import DocumentedRuleDefault, RuleDefault # type: ignore
from skyline_apiserver.log import LOG, setup as log_setup
from skyline_apiserver.policy.manager import get_service_rules
from skyline_apiserver.policy.manager.base import APIRule, Rule
from skyline_apiserver.types import constants
DEBUG = False
OSRules = Iterable[Union[DocumentedRuleDefault, RuleDefault]]
def load_list_rules_funcs(
namespace: str,
service_eps: Dict[str, List[str]],
) -> Dict[str, Callable[[], OSRules]]:
eps = set(metadata.entry_points()[namespace])
supported_eps = set()
for ep_names in service_eps.values():
supported_eps.update(ep_names)
return {ep.name: ep.load() for ep in eps if ep.name in supported_eps}
def load_list_rules_func(namespace: str, service_ep: str) -> Union[None, Callable[[], OSRules]]:
eps = set(metadata.entry_points()[namespace])
for ep in eps:
if ep.name == service_ep:
return ep.load()
return None
def comparison_rules(
service: str,
rule: Union[Rule, APIRule],
os_rule: Union[Rule, APIRule],
) -> None:
if isinstance(rule, APIRule) and isinstance(os_rule, APIRule):
if rule.scope_types != os_rule.scope_types:
LOG.error(
f'\nService "{service}" rule "{rule.name}" scope_types is {rule.scope_types},\n'
f"which is different from os_rule {os_rule.scope_types}.\n",
)
if rule.operations != os_rule.operations:
LOG.error(
f'\nService "{service}" rule "{rule.name}" operations is {rule.operations},\n'
f"which is different from os_rule {os_rule.operations}.\n",
)
elif (isinstance(rule, Rule) and isinstance(os_rule, APIRule)) or (
isinstance(rule, APIRule) and isinstance(os_rule, Rule)
):
LOG.warning(
f'\nService "{service}" rule "{rule.name}" is {rule.__class__},\n'
f"which is different from os_rule {os_rule.__class__}.\n",
)
elif isinstance(rule, Rule) and isinstance(os_rule, Rule):
pass
else:
LOG.error(f'\nService "{service}" rule "{rule.name}" is unknown class type.\n')
@click.group(name="skyline-policy-manager", help="Policy manager command line.")
@click.option("--debug", is_flag=True, default=False, help="Output more info.")
def policy_manager(debug: bool) -> None:
global DEBUG
DEBUG = debug
log_setup(StreamHandler(), debug=DEBUG, colorize=True, level="INFO")
@click.command(help="Generate sample policy yaml file.")
@click.option("--dir", help='Directory of policy file.(default: "./tmp")', default="./tmp")
def generate_sample(dir: str) -> None:
list_rules_funcs = load_list_rules_funcs(constants.POLICY_NS, constants.SUPPORTED_SERVICE_EPS)
rule_map = {}
for service, eps in constants.SUPPORTED_SERVICE_EPS.items():
rules = []
api_rules = []
for ep in eps:
ep_rules = list_rules_funcs.get(ep, lambda: [])()
for rule in ep_rules:
if isinstance(rule, DocumentedRuleDefault):
api_rules.append(APIRule.from_oslo(rule))
elif isinstance(rule, RuleDefault):
rules.append(Rule.from_oslo(rule))
rule_map[service] = {"rules": rules, "api_rules": api_rules}
for service, item in rule_map.items():
dir_path = Path(dir).joinpath(service)
dir_path.mkdir(mode=0o755, parents=True, exist_ok=True)
file_path = dir_path.joinpath("policy.yaml.sample")
with open(file_path, "w") as f:
f.write(f"{'#' * 20}\n# {service}\n{'#' * 20}\n\n")
for rule in item.get("rules", []):
f.writelines(rule.format_into_yaml())
for rule in item.get("api_rules", []):
f.writelines(rule.format_into_yaml())
LOG.info("Generate sample policy successful")
@click.command(help="Generate policy yaml file.")
@click.option("--dir", help='Directory of policy file.(default: "./tmp")', default="./tmp")
@click.option("--desc", help="Description of the generated policy file.", default="")
def generate_conf(dir: str, desc: str) -> None:
for service, rules in get_service_rules().items():
dir_path = Path(dir).joinpath(service)
dir_path.mkdir(mode=0o755, parents=True, exist_ok=True)
file_path = dir_path.joinpath("policy.yaml")
with open(file_path, "w") as f:
f.write(f"{'#' * 20}\n# {service}\n{'#' * 20}\n")
f.write(f"# {desc}\n\n")
for rule in rules:
rule_yaml = rule.format_into_yaml()
if service in constants.PREFIX_MAPPINGS:
rule_yaml = rule_yaml.replace(constants.PREFIX_MAPPINGS[service], "")
f.writelines(rule_yaml)
LOG.info("Generate policy successful")
@click.command(help="Generate service rule code.")
@click.argument("entry_point")
def generate_rule(entry_point: str) -> None:
ep_rules_func = load_list_rules_func(constants.POLICY_NS, entry_point)
if ep_rules_func is None:
raise Exception(
f"Not found entry point '{entry_point}' in oslo.policy.policies namespace.",
)
ep_rules = [item for item in ep_rules_func()]
rules = []
api_rules = []
for rule in ep_rules:
if isinstance(rule, DocumentedRuleDefault):
api_rules.append(APIRule.from_oslo(rule))
elif isinstance(rule, RuleDefault):
rules.append(Rule.from_oslo(rule))
header_str = """
from . import base
list_rules = ("""
print(header_str)
rule_format_str = (
" base.Rule(\n"
" name={name},\n"
" check_str=({check_str}),\n"
" description={description},\n"
" ),"
)
rule_mappings = {}
for r in rules:
rule_mappings[f"rule:{r.name}"] = r.check_str
print(
rule_format_str.format(
name=json.dumps(r.name),
check_str=json.dumps(r.check_str),
description=json.dumps(r.description),
),
)
apirule_format_str = (
" base.APIRule(\n"
" name={name},\n"
" check_str=({check_str}),\n"
" description={description},\n"
" scope_types={scope_types},\n"
" operations={operations},\n"
" ),"
)
for r in api_rules:
name = constants.PREFIX_MAPPINGS.get(entry_point, "") + r.name
check_str = r.check_str
tries = 0
while "rule:" in check_str:
tries += 1
for k, v in rule_mappings.items():
if k + " " in check_str or check_str.endswith(k):
check_str = check_str.replace(k, f"({v})")
elif "(" + k + ")" in check_str:
check_str = check_str.replace(k, v)
if tries > 10:
raise Exception(f"Can't replace rule name in {r.name}")
# Fix for Trove, replace 'project_id:%(tenant)s' with 'project_id:%(project_id)s'
if entry_point == "trove":
check_str = check_str.replace("project_id:%(tenant)s", "project_id:%(project_id)s")
print(
apirule_format_str.format(
name=json.dumps(name),
check_str=json.dumps(check_str),
description=json.dumps(r.description),
scope_types=json.dumps(r.scope_types),
operations=json.dumps(r.operations),
),
)
footer_str = """)
__all__ = ("list_rules",)
"""
print(footer_str)
LOG.info("Generate service rule code successful")
@click.command(help="Validate all policy rules.")
@click.option("--diff", help="Output policy rule diff info.", is_flag=True, default=False)
def validate(diff: bool) -> None:
list_rules_funcs = load_list_rules_funcs(constants.POLICY_NS, constants.SUPPORTED_SERVICE_EPS)
os_rule_map = {}
for service, eps in constants.SUPPORTED_SERVICE_EPS.items():
service_rules = {}
for ep in eps:
ep_rules = list_rules_funcs.get(ep, lambda: [])()
for rule in ep_rules:
if rule.name in service_rules:
LOG.error(
f'Service "{service}" entry point "{ep}" has duplicate rules '
f'"{rule.name}", please check source code of {service} service.',
)
if isinstance(rule, DocumentedRuleDefault):
service_rules[rule.name] = APIRule.from_oslo(rule)
elif isinstance(rule, RuleDefault):
service_rules[rule.name] = Rule.from_oslo(rule)
if not service_rules:
LOG.warning(
f'Service "{service}" does not load any rules, please check whether the '
f"service package is installed (pip list).",
)
os_rule_map[service] = service_rules
for service, rules in get_service_rules().items():
for r in rules:
os_rule = os_rule_map.get(service, {}).get(r.name)
if os_rule is None:
LOG.warning(
f'Rule "{r.name}" is not found in service "{service}", if it\'s deprecated, '
f"please remove.",
)
else:
if diff:
LOG.info(
f'\nService "{service}" rule "{r.name}" compare results:\n'
f'{"OpenStack":10}: {os_rule.check_str}\n{"Custom":10}: {r.check_str}\n',
)
comparison_rules(service, r, os_rule)
unmanaged_rules = set(os_rule_map.get(service, {}).keys()) - set(
[r.name for r in rules],
)
for r in unmanaged_rules:
LOG.error(f"Rule {r} is unmanaged, please add it in '{service}' service")
LOG.info("Validate policy completed")
def main() -> None:
policy_manager.add_command(generate_sample)
policy_manager.add_command(generate_conf)
policy_manager.add_command(generate_rule)
policy_manager.add_command(validate)
policy_manager()

View File

@ -21,7 +21,7 @@ from skyline_apiserver.api.v1 import api_router
from skyline_apiserver.config import CONF, configure
from skyline_apiserver.db import setup as db_setup
from skyline_apiserver.log import LOG, setup as log_setup
from skyline_apiserver.policies import setup as policies_setup
from skyline_apiserver.policy import setup as policies_setup
from starlette.middleware.cors import CORSMiddleware
PROJECT_NAME = "Skyline API"

View File

@ -14,10 +14,9 @@
from __future__ import annotations
from skyline_policy_manager.policies import get_service_rules
from skyline_policy_manager.policies.base import APIRule
from .base import Enforcer, UserContext
from .manager import get_service_rules
from .manager.base import APIRule
ENFORCER = Enforcer()

View File

@ -22,7 +22,8 @@ from immutables import Map
from keystoneauth1.access.access import AccessInfoV3
from oslo_policy._checks import _check
from skyline_apiserver.config import CONF
from skyline_policy_manager.policies.base import APIRule
from .manager.base import APIRule
class UserContext(MutableMapping):

View File

@ -0,0 +1,38 @@
# Copyright 2021 99cloud
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from importlib import import_module
from os import path
from pkgutil import iter_modules
from typing import Dict, List, Union
from .base import APIRule, Rule
LIST_RULES_FUNC_NAME = "list_rules"
def get_service_rules() -> Dict[str, List[Union[Rule, APIRule]]]:
service_rules = {}
current_path = path.dirname(path.abspath(__file__))
for m in iter_modules(path=[current_path]):
if m.name in ["base"] or m.ispkg:
continue
module = import_module(f"{__package__}.{m.name}")
service_rules[m.name] = getattr(module, LIST_RULES_FUNC_NAME, [])
return service_rules
__all__ = ("get_service_rules",)

View File

@ -0,0 +1,120 @@
# Copyright 2021 99cloud
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import List
from oslo_policy import _parser # type: ignore
from oslo_policy.policy import DocumentedRuleDefault, RuleDefault # type: ignore
from skyline_apiserver.schemas.policy_manager import Operation, OperationsSchema, ScopeTypesSchema
class Rule:
def __init__(
self,
name: str,
check_str: str,
description: str,
basic_check_str: str = "",
) -> None:
self.name = name
self.check_str = check_str
self.check = _parser.parse_rule(self.check_str)
self.description = description or "No description"
self.basic_check_str = basic_check_str or self.check_str
self.basic_check = _parser.parse_rule(self.basic_check_str)
def __str__(self) -> str:
return f'"{self.name}": "{self.check_str}"'
def __repr__(self) -> str:
return f"{self.__class__.__qualname__}(name='{self.name}', check_str='{self.check_str}')"
def __eq__(self, other: object) -> bool:
if isinstance(other, Rule) and isinstance(self, Rule):
return (self.name, self.check_str) == (other.name, other.check_str)
return False
def format_into_yaml(self) -> str:
desc = f"# {self.description}\n"
text = f"{desc}{str(self)}\n\n"
return text
@classmethod
def from_oslo(cls, rule: RuleDefault):
description = rule.description or ""
description = description.replace("\n", "\n#")
return cls(name=rule.name, check_str=rule.check_str, description=description)
class APIRule(Rule):
def __init__(
self,
name: str,
check_str: str,
description: str,
scope_types: List[str],
operations: List[Operation],
basic_check_str: str = "",
) -> None:
super().__init__(name, check_str, description, basic_check_str)
ScopeTypesSchema.parse_obj(scope_types)
self.scope_types = scope_types
OperationsSchema.parse_obj(operations)
self.operations = operations
def format_into_yaml(self) -> str:
op_list = [
f'# {operation.get("method"):8}{operation.get("path")}\n'
for operation in self.operations
]
op = "".join(op_list)
scope = f"# Intended scope(s): {self.scope_types}\n"
desc = f"# {self.description}\n"
text = f"{desc}{op}{scope}{str(self)}\n\n"
return text
@classmethod
def from_oslo(cls, rule: DocumentedRuleDefault):
description = rule.description or ""
description = description.replace("\n", "\n#")
if isinstance(rule.scope_types, list):
scope_types = [item for item in rule.scope_types]
else:
scope_types = ["project"]
operations = []
for operation in rule.operations:
method = operation.get("method")
if isinstance(method, list):
for i in method:
operations.append(Operation(method=i.upper(), path=operation.get("path", "")))
elif isinstance(method, str):
operations.append(
Operation(method=method.upper(), path=operation.get("path", "")),
)
else:
operations.append(Operation(method="GET", path=operation.get("path", "")))
return cls(
name=rule.name,
check_str=rule.check_str,
description=description,
scope_types=scope_types,
operations=operations,
)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,372 @@
# flake8: noqa
from . import base
list_rules = (
base.Rule(
name="default",
check_str=(""),
description="Defines the default rule used for policies that historically had an empty policy in the supplied policy.json file.",
),
base.Rule(
name="context_is_admin",
check_str=("role:admin"),
description="Defines the rule for the is_admin:True check.",
),
base.Rule(
name="manage_image_cache",
check_str=("role:admin"),
description="Manage image cache",
),
base.Rule(
name="metadef_default",
check_str=(""),
description="No description",
),
base.Rule(
name="metadef_admin",
check_str=("role:admin"),
description="No description",
),
base.Rule(
name="get_metadef_namespace",
check_str=("rule:metadef_default"),
description="No description",
),
base.Rule(
name="get_metadef_namespaces",
check_str=("rule:metadef_default"),
description="No description",
),
base.Rule(
name="modify_metadef_namespace",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="add_metadef_namespace",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="delete_metadef_namespace",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="get_metadef_object",
check_str=("rule:metadef_default"),
description="No description",
),
base.Rule(
name="get_metadef_objects",
check_str=("rule:metadef_default"),
description="No description",
),
base.Rule(
name="modify_metadef_object",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="add_metadef_object",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="delete_metadef_object",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="list_metadef_resource_types",
check_str=("rule:metadef_default"),
description="No description",
),
base.Rule(
name="get_metadef_resource_type",
check_str=("rule:metadef_default"),
description="No description",
),
base.Rule(
name="add_metadef_resource_type_association",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="remove_metadef_resource_type_association",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="get_metadef_property",
check_str=("rule:metadef_default"),
description="No description",
),
base.Rule(
name="get_metadef_properties",
check_str=("rule:metadef_default"),
description="No description",
),
base.Rule(
name="modify_metadef_property",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="add_metadef_property",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="remove_metadef_property",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="get_metadef_tag",
check_str=("rule:metadef_default"),
description="No description",
),
base.Rule(
name="get_metadef_tags",
check_str=("rule:metadef_default"),
description="No description",
),
base.Rule(
name="modify_metadef_tag",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="add_metadef_tag",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="add_metadef_tags",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="delete_metadef_tag",
check_str=("rule:metadef_admin"),
description="No description",
),
base.Rule(
name="delete_metadef_tags",
check_str=("rule:metadef_admin"),
description="No description",
),
base.APIRule(
name="add_image",
check_str=("role:admin or (role:member and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:admin or role:member"),
description="Create new image",
scope_types=["system", "project"],
operations=[{"method": "POST", "path": "/v2/images"}],
),
base.APIRule(
name="delete_image",
check_str=("role:admin or (role:member and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:admin or role:member"),
description="Deletes the image",
scope_types=["system", "project"],
operations=[{"method": "DELETE", "path": "/v2/images/{image_id}"}],
),
base.APIRule(
name="get_image",
check_str=(
'role:admin or (role:reader and (project_id:%(project_id)s or project_id:%(member_id)s or "community":%(visibility)s or "public":%(visibility)s))'
),
basic_check_str=("role:admin or role:reader or role:admin or role:member or role:reader"),
description="Get specified image",
scope_types=["system", "project"],
operations=[{"method": "GET", "path": "/v2/images/{image_id}"}],
),
base.APIRule(
name="get_images",
check_str=("role:admin or (role:reader and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:reader or role:admin or role:member or role:reader"),
description="Get all available images",
scope_types=["system", "project"],
operations=[{"method": "GET", "path": "/v2/images"}],
),
base.APIRule(
name="modify_image",
check_str=("role:admin or (role:member and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:admin or role:member"),
description="Updates given image",
scope_types=["system", "project"],
operations=[{"method": "PATCH", "path": "/v2/images/{image_id}"}],
),
base.APIRule(
name="publicize_image",
check_str=("role:admin"),
basic_check_str=("role:admin"),
description="Publicize given image",
scope_types=["system", "project"],
operations=[{"method": "PATCH", "path": "/v2/images/{image_id}"}],
),
base.APIRule(
name="communitize_image",
check_str=("role:admin or (role:member and project_id:%(project_id)s)"),
basic_check_str=("!"),
description="Communitize given image",
scope_types=["system", "project"],
operations=[{"method": "PATCH", "path": "/v2/images/{image_id}"}],
),
base.APIRule(
name="download_image",
check_str=(
'role:admin or (role:member and (project_id:%(project_id)s or project_id:%(member_id)s or "community":%(visibility)s or "public":%(visibility)s))'
),
basic_check_str=("role:admin or role:admin or role:member"),
description="Downloads given image",
scope_types=["system", "project"],
operations=[{"method": "GET", "path": "/v2/images/{image_id}/file"}],
),
base.APIRule(
name="upload_image",
check_str=("role:admin or (role:member and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:admin or role:member"),
description="Uploads data to specified image",
scope_types=["system", "project"],
operations=[{"method": "PUT", "path": "/v2/images/{image_id}/file"}],
),
base.APIRule(
name="delete_image_location",
check_str=("role:admin"),
basic_check_str=("role:admin"),
description="Deletes the location of given image",
scope_types=["system", "project"],
operations=[{"method": "PATCH", "path": "/v2/images/{image_id}"}],
),
base.APIRule(
name="get_image_location",
check_str=("role:admin or (role:reader and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:reader or role:admin or role:member or role:reader"),
description="Reads the location of the image",
scope_types=["system", "project"],
operations=[{"method": "GET", "path": "/v2/images/{image_id}"}],
),
base.APIRule(
name="set_image_location",
check_str=("role:admin or (role:member and project_id:%(project_id)s)"),
basic_check_str=("role:admin"),
description="Sets location URI to given image",
scope_types=["system", "project"],
operations=[{"method": "PATCH", "path": "/v2/images/{image_id}"}],
),
base.APIRule(
name="add_member",
check_str=("role:admin or (role:member and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:admin or role:member"),
description="Create image member",
scope_types=["system", "project"],
operations=[{"method": "POST", "path": "/v2/images/{image_id}/members"}],
),
base.APIRule(
name="delete_member",
check_str=("role:admin or (role:member and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:admin or role:member"),
description="Delete image member",
scope_types=["system", "project"],
operations=[{"method": "DELETE", "path": "/v2/images/{image_id}/members/{member_id}"}],
),
base.APIRule(
name="get_member",
check_str=("role:admin or (role:reader and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:reader or role:admin or role:member or role:reader"),
description="Show image member details",
scope_types=["system", "project"],
operations=[{"method": "GET", "path": "/v2/images/{image_id}/members/{member_id}"}],
),
base.APIRule(
name="get_members",
check_str=("role:admin or (role:reader and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:reader or role:admin or role:member or role:reader"),
description="List image members",
scope_types=["system", "project"],
operations=[{"method": "GET", "path": "/v2/images/{image_id}/members"}],
),
base.APIRule(
name="modify_member",
check_str=("role:admin or (role:member and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:admin or role:member"),
description="Update image member",
scope_types=["system", "project"],
operations=[{"method": "PUT", "path": "/v2/images/{image_id}/members/{member_id}"}],
),
base.APIRule(
name="deactivate",
check_str=("role:admin or (role:member and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:admin or role:member"),
description="Deactivate image",
scope_types=["system", "project"],
operations=[{"method": "POST", "path": "/v2/images/{image_id}/actions/deactivate"}],
),
base.APIRule(
name="reactivate",
check_str=("role:admin or (role:member and project_id:%(project_id)s)"),
basic_check_str=("role:admin or role:admin or role:member"),
description="Reactivate image",
scope_types=["system", "project"],
operations=[{"method": "POST", "path": "/v2/images/{image_id}/actions/reactivate"}],
),
base.APIRule(
name="copy_image",
check_str=("role:admin"),
basic_check_str=("@"),
description="Copy existing image to other stores",
scope_types=["system", "project"],
operations=[{"method": "POST", "path": "/v2/images/{image_id}/import"}],
),
base.APIRule(
name="get_task",
check_str=("rule:default"),
basic_check_str=("!"),
description="Get an image task.\n#\n#This granular policy controls access to tasks, both from the tasks API as well\n#as internal locations in Glance that use tasks (like import). Practically this\n#cannot be more restrictive than the policy that controls import or things will\n#break, and changing it from the default is almost certainly not what you want.\n#Access to the external tasks API should be restricted as desired by the\n#tasks_api_access policy. This may change in the future.\n#",
scope_types=["system", "project"],
operations=[{"method": "GET", "path": "/v2/tasks/{task_id}"}],
),
base.APIRule(
name="get_tasks",
check_str=("rule:default"),
basic_check_str=("!"),
description="List tasks for all images.\n#\n#This granular policy controls access to tasks, both from the tasks API as well\n#as internal locations in Glance that use tasks (like import). Practically this\n#cannot be more restrictive than the policy that controls import or things will\n#break, and changing it from the default is almost certainly not what you want.\n#Access to the external tasks API should be restricted as desired by the\n#tasks_api_access policy. This may change in the future.\n#",
scope_types=["system", "project"],
operations=[{"method": "GET", "path": "/v2/tasks"}],
),
base.APIRule(
name="add_task",
check_str=("rule:default"),
basic_check_str=("!"),
description="List tasks for all images.\n#\n#This granular policy controls access to tasks, both from the tasks API as well\n#as internal locations in Glance that use tasks (like import). Practically this\n#cannot be more restrictive than the policy that controls import or things will\n#break, and changing it from the default is almost certainly not what you want.\n#Access to the external tasks API should be restricted as desired by the\n#tasks_api_access policy. This may change in the future.\n#",
scope_types=["system", "project"],
operations=[{"method": "POST", "path": "/v2/tasks"}],
),
base.APIRule(
name="modify_task",
check_str=("rule:default"),
basic_check_str=("!"),
description="This policy is not used.",
scope_types=["system", "project"],
operations=[{"method": "DELETE", "path": "/v2/tasks/{task_id}"}],
),
base.APIRule(
name="tasks_api_access",
check_str=("role:admin"),
basic_check_str=("!"),
description="\n#This is a generic blanket policy for protecting all task APIs. It is not\n#granular and will not allow you to separate writable and readable task\n#operations into different roles.\n#",
scope_types=["system", "project"],
operations=[
{"method": "GET", "path": "/v2/tasks/{task_id}"},
{"method": "GET", "path": "/v2/tasks"},
{"method": "POST", "path": "/v2/tasks"},
{"method": "DELETE", "path": "/v2/tasks/{task_id}"},
],
),
)
__all__ = ("list_rules",)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,860 @@
# flake8: noqa
from . import base
list_rules = (
base.Rule(
name="system-admin",
check_str=("role:admin and system_scope:all"),
description="No description",
),
base.Rule(
name="system-reader",
check_str=("role:reader and system_scope:all"),
description="No description",
),
base.Rule(
name="project-member",
check_str=("role:member and project_id:%(project_id)s"),
description="No description",
),
base.Rule(
name="project-reader",
check_str=("role:reader and project_id:%(project_id)s"),
description="No description",
),
base.Rule(
name="context_is_admin",
check_str=("role:load-balancer_admin or rule:system-admin"),
description="No description",
),
base.Rule(
name="load-balancer:owner",
check_str=("project_id:%(project_id)s"),
description="No description",
),
base.Rule(
name="load-balancer:observer_and_owner",
check_str=("role:load-balancer_observer and rule:project-reader"),
description="No description",
),
base.Rule(
name="load-balancer:global_observer",
check_str=("role:load-balancer_global_observer or rule:system-reader"),
description="No description",
),
base.Rule(
name="load-balancer:member_and_owner",
check_str=("role:load-balancer_member and rule:project-member"),
description="No description",
),
base.Rule(
name="load-balancer:admin",
check_str=("is_admin:True or role:load-balancer_admin or rule:system-admin"),
description="No description",
),
base.Rule(
name="load-balancer:read",
check_str=(
"rule:load-balancer:observer_and_owner or rule:load-balancer:global_observer or rule:load-balancer:member_and_owner or rule:load-balancer:admin"
),
description="No description",
),
base.Rule(
name="load-balancer:read-global",
check_str=("rule:load-balancer:global_observer or rule:load-balancer:admin"),
description="No description",
),
base.Rule(
name="load-balancer:write",
check_str=("rule:load-balancer:member_and_owner or rule:load-balancer:admin"),
description="No description",
),
base.Rule(
name="load-balancer:read-quota",
check_str=(
"rule:load-balancer:observer_and_owner or rule:load-balancer:global_observer or rule:load-balancer:member_and_owner or role:load-balancer_quota_admin or rule:load-balancer:admin"
),
description="No description",
),
base.Rule(
name="load-balancer:read-quota-global",
check_str=(
"rule:load-balancer:global_observer or role:load-balancer_quota_admin or rule:load-balancer:admin"
),
description="No description",
),
base.Rule(
name="load-balancer:write-quota",
check_str=("role:load-balancer_quota_admin or rule:load-balancer:admin"),
description="No description",
),
base.APIRule(
name="os_load-balancer_api:flavor:get_all",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="List Flavors",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2.0/lbaas/flavors"}],
),
base.APIRule(
name="os_load-balancer_api:flavor:post",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Create a Flavor",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v2.0/lbaas/flavors"}],
),
base.APIRule(
name="os_load-balancer_api:flavor:put",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Update a Flavor",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v2.0/lbaas/flavors/{flavor_id}"}],
),
base.APIRule(
name="os_load-balancer_api:flavor:get_one",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Flavor details",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2.0/lbaas/flavors/{flavor_id}"}],
),
base.APIRule(
name="os_load-balancer_api:flavor:delete",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Remove a Flavor",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v2.0/lbaas/flavors/{flavor_id}"}],
),
base.APIRule(
name="os_load-balancer_api:flavor-profile:get_all",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin or role:reader"),
description="List Flavor Profiles",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2.0/lbaas/flavorprofiles"}],
),
base.APIRule(
name="os_load-balancer_api:flavor-profile:post",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Create a Flavor Profile",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v2.0/lbaas/flavorprofiles"}],
),
base.APIRule(
name="os_load-balancer_api:flavor-profile:put",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Update a Flavor Profile",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v2.0/lbaas/flavorprofiles/{flavor_profile_id}"}],
),
base.APIRule(
name="os_load-balancer_api:flavor-profile:get_one",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin or role:reader"),
description="Show Flavor Profile details",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2.0/lbaas/flavorprofiles/{flavor_profile_id}"}],
),
base.APIRule(
name="os_load-balancer_api:flavor-profile:delete",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Remove a Flavor Profile",
scope_types=["project"],
operations=[
{"method": "DELETE", "path": "/v2.0/lbaas/flavorprofiles/{flavor_profile_id}"},
],
),
base.APIRule(
name="os_load-balancer_api:availability-zone:get_all",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="List Availability Zones",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2.0/lbaas/availabilityzones"}],
),
base.APIRule(
name="os_load-balancer_api:availability-zone:post",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Create an Availability Zone",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v2.0/lbaas/availabilityzones"}],
),
base.APIRule(
name="os_load-balancer_api:availability-zone:put",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Update an Availability Zone",
scope_types=["project"],
operations=[
{"method": "PUT", "path": "/v2.0/lbaas/availabilityzones/{availability_zone_id}"},
],
),
base.APIRule(
name="os_load-balancer_api:availability-zone:get_one",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Availability Zone details",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v2.0/lbaas/availabilityzones/{availability_zone_id}"},
],
),
base.APIRule(
name="os_load-balancer_api:availability-zone:delete",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Remove an Availability Zone",
scope_types=["project"],
operations=[
{"method": "DELETE", "path": "/v2.0/lbaas/availabilityzones/{availability_zone_id}"},
],
),
base.APIRule(
name="os_load-balancer_api:availability-zone-profile:get_all",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin or role:reader"),
description="List Availability Zones",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2.0/lbaas/availabilityzoneprofiles"}],
),
base.APIRule(
name="os_load-balancer_api:availability-zone-profile:post",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Create an Availability Zone",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v2.0/lbaas/availabilityzoneprofiles"}],
),
base.APIRule(
name="os_load-balancer_api:availability-zone-profile:put",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Update an Availability Zone",
scope_types=["project"],
operations=[
{
"method": "PUT",
"path": "/v2.0/lbaas/availabilityzoneprofiles/{availability_zone_profile_id}",
},
],
),
base.APIRule(
name="os_load-balancer_api:availability-zone-profile:get_one",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin or role:reader"),
description="Show Availability Zone details",
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v2.0/lbaas/availabilityzoneprofiles/{availability_zone_profile_id}",
},
],
),
base.APIRule(
name="os_load-balancer_api:availability-zone-profile:delete",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Remove an Availability Zone",
scope_types=["project"],
operations=[
{
"method": "DELETE",
"path": "/v2.0/lbaas/availabilityzoneprofiles/{availability_zone_profile_id}",
},
],
),
base.APIRule(
name="os_load-balancer_api:healthmonitor:get_all",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="List Health Monitors of a Pool",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/healthmonitors"}],
),
base.APIRule(
name="os_load-balancer_api:healthmonitor:get_all-global",
check_str=("rule:load-balancer:read-global"),
basic_check_str=("role:admin or role:reader"),
description="List Health Monitors including resources owned by others",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/healthmonitors"}],
),
base.APIRule(
name="os_load-balancer_api:healthmonitor:post",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Create a Health Monitor",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v2/lbaas/healthmonitors"}],
),
base.APIRule(
name="os_load-balancer_api:healthmonitor:get_one",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Health Monitor details",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/healthmonitors/{healthmonitor_id}"}],
),
base.APIRule(
name="os_load-balancer_api:healthmonitor:put",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Update a Health Monitor",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v2/lbaas/healthmonitors/{healthmonitor_id}"}],
),
base.APIRule(
name="os_load-balancer_api:healthmonitor:delete",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Remove a Health Monitor",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v2/lbaas/healthmonitors/{healthmonitor_id}"}],
),
base.APIRule(
name="os_load-balancer_api:l7policy:get_all",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="List L7 Policys",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/l7policies"}],
),
base.APIRule(
name="os_load-balancer_api:l7policy:get_all-global",
check_str=("rule:load-balancer:read-global"),
basic_check_str=("role:admin or role:reader"),
description="List L7 Policys including resources owned by others",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/l7policies"}],
),
base.APIRule(
name="os_load-balancer_api:l7policy:post",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Create a L7 Policy",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v2/lbaas/l7policies"}],
),
base.APIRule(
name="os_load-balancer_api:l7policy:get_one",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show L7 Policy details",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/l7policies/{l7policy_id}"}],
),
base.APIRule(
name="os_load-balancer_api:l7policy:put",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Update a L7 Policy",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v2/lbaas/l7policies/{l7policy_id}"}],
),
base.APIRule(
name="os_load-balancer_api:l7policy:delete",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Remove a L7 Policy",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v2/lbaas/l7policies/{l7policy_id}"}],
),
base.APIRule(
name="os_load-balancer_api:l7rule:get_all",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="List L7 Rules",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/l7policies/{l7policy_id}/rules"}],
),
base.APIRule(
name="os_load-balancer_api:l7rule:post",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Create a L7 Rule",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v2/lbaas/l7policies/{l7policy_id}/rules"}],
),
base.APIRule(
name="os_load-balancer_api:l7rule:get_one",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show L7 Rule details",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v2/lbaas/l7policies/{l7policy_id}/rules/{l7rule_id}"},
],
),
base.APIRule(
name="os_load-balancer_api:l7rule:put",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Update a L7 Rule",
scope_types=["project"],
operations=[
{"method": "PUT", "path": "/v2/lbaas/l7policies/{l7policy_id}/rules/{l7rule_id}"},
],
),
base.APIRule(
name="os_load-balancer_api:l7rule:delete",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Remove a L7 Rule",
scope_types=["project"],
operations=[
{"method": "DELETE", "path": "/v2/lbaas/l7policies/{l7policy_id}/rules/{l7rule_id}"},
],
),
base.APIRule(
name="os_load-balancer_api:listener:get_all",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="List Listeners",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/listeners"}],
),
base.APIRule(
name="os_load-balancer_api:listener:get_all-global",
check_str=("rule:load-balancer:read-global"),
basic_check_str=("role:admin or role:reader"),
description="List Listeners including resources owned by others",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/listeners"}],
),
base.APIRule(
name="os_load-balancer_api:listener:post",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Create a Listener",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v2/lbaas/listeners"}],
),
base.APIRule(
name="os_load-balancer_api:listener:get_one",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Listener details",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/listeners/{listener_id}"}],
),
base.APIRule(
name="os_load-balancer_api:listener:put",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Update a Listener",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v2/lbaas/listeners/{listener_id}"}],
),
base.APIRule(
name="os_load-balancer_api:listener:delete",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Remove a Listener",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v2/lbaas/listeners/{listener_id}"}],
),
base.APIRule(
name="os_load-balancer_api:listener:get_stats",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Listener statistics",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/listeners/{listener_id}/stats"}],
),
base.APIRule(
name="os_load-balancer_api:loadbalancer:get_all",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="List Load Balancers",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/loadbalancers"}],
),
base.APIRule(
name="os_load-balancer_api:loadbalancer:get_all-global",
check_str=("rule:load-balancer:read-global"),
basic_check_str=("role:admin or role:reader"),
description="List Load Balancers including resources owned by others",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/loadbalancers"}],
),
base.APIRule(
name="os_load-balancer_api:loadbalancer:post",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Create a Load Balancer",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v2/lbaas/loadbalancers"}],
),
base.APIRule(
name="os_load-balancer_api:loadbalancer:get_one",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Load Balancer details",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/loadbalancers/{loadbalancer_id}"}],
),
base.APIRule(
name="os_load-balancer_api:loadbalancer:put",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Update a Load Balancer",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v2/lbaas/loadbalancers/{loadbalancer_id}"}],
),
base.APIRule(
name="os_load-balancer_api:loadbalancer:delete",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Remove a Load Balancer",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v2/lbaas/loadbalancers/{loadbalancer_id}"}],
),
base.APIRule(
name="os_load-balancer_api:loadbalancer:get_stats",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Load Balancer statistics",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/loadbalancers/{loadbalancer_id}/stats"}],
),
base.APIRule(
name="os_load-balancer_api:loadbalancer:get_status",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Load Balancer status",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v2/lbaas/loadbalancers/{loadbalancer_id}/status"},
],
),
base.APIRule(
name="os_load-balancer_api:loadbalancer:put_failover",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Failover a Load Balancer",
scope_types=["project"],
operations=[
{"method": "PUT", "path": "/v2/lbaas/loadbalancers/{loadbalancer_id}/failover"},
],
),
base.APIRule(
name="os_load-balancer_api:member:get_all",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="List Members of a Pool",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/pools/{pool_id}/members"}],
),
base.APIRule(
name="os_load-balancer_api:member:post",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Create a Member",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v2/lbaas/pools/{pool_id}/members"}],
),
base.APIRule(
name="os_load-balancer_api:member:get_one",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Member details",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/pools/{pool_id}/members/{member_id}"}],
),
base.APIRule(
name="os_load-balancer_api:member:put",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Update a Member",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v2/lbaas/pools/{pool_id}/members/{member_id}"}],
),
base.APIRule(
name="os_load-balancer_api:member:delete",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Remove a Member",
scope_types=["project"],
operations=[
{"method": "DELETE", "path": "/v2/lbaas/pools/{pool_id}/members/{member_id}"},
],
),
base.APIRule(
name="os_load-balancer_api:pool:get_all",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="List Pools",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/pools"}],
),
base.APIRule(
name="os_load-balancer_api:pool:get_all-global",
check_str=("rule:load-balancer:read-global"),
basic_check_str=("role:admin or role:reader"),
description="List Pools including resources owned by others",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/pools"}],
),
base.APIRule(
name="os_load-balancer_api:pool:post",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Create a Pool",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v2/lbaas/pools"}],
),
base.APIRule(
name="os_load-balancer_api:pool:get_one",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Pool details",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/pools/{pool_id}"}],
),
base.APIRule(
name="os_load-balancer_api:pool:put",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Update a Pool",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v2/lbaas/pools/{pool_id}"}],
),
base.APIRule(
name="os_load-balancer_api:pool:delete",
check_str=("rule:load-balancer:write"),
basic_check_str=(
"role:admin or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s"
),
description="Remove a Pool",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v2/lbaas/pools/{pool_id}"}],
),
base.APIRule(
name="os_load-balancer_api:provider:get_all",
check_str=("rule:load-balancer:read"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="List enabled providers",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/providers"}],
),
base.APIRule(
name="os_load-balancer_api:quota:get_all",
check_str=("rule:load-balancer:read-quota"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="List Quotas",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/quotas"}],
),
base.APIRule(
name="os_load-balancer_api:quota:get_all-global",
check_str=("rule:load-balancer:read-quota-global"),
basic_check_str=("role:admin or role:reader"),
description="List Quotas including resources owned by others",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/quotas"}],
),
base.APIRule(
name="os_load-balancer_api:quota:get_one",
check_str=("rule:load-balancer:read-quota"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Quota details",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/quotas/{project_id}"}],
),
base.APIRule(
name="os_load-balancer_api:quota:put",
check_str=("rule:load-balancer:write-quota"),
basic_check_str=("role:admin"),
description="Update a Quota",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v2/lbaas/quotas/{project_id}"}],
),
base.APIRule(
name="os_load-balancer_api:quota:delete",
check_str=("rule:load-balancer:write-quota"),
basic_check_str=("role:admin"),
description="Reset a Quota",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v2/lbaas/quotas/{project_id}"}],
),
base.APIRule(
name="os_load-balancer_api:quota:get_defaults",
check_str=("rule:load-balancer:read-quota"),
basic_check_str=(
"role:admin or role:reader or role:admin and project_id:%(project_id)s or role:member and project_id:%(project_id)s or role:reader and project_id:%(project_id)s"
),
description="Show Default Quota for a Project",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/lbaas/quotas/{project_id}/default"}],
),
base.APIRule(
name="os_load-balancer_api:amphora:get_all",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin or role:reader"),
description="List Amphorae",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/octavia/amphorae"}],
),
base.APIRule(
name="os_load-balancer_api:amphora:get_one",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin or role:reader"),
description="Show Amphora details",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/octavia/amphorae/{amphora_id}"}],
),
base.APIRule(
name="os_load-balancer_api:amphora:delete",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Delete an Amphora",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v2/octavia/amphorae/{amphora_id}"}],
),
base.APIRule(
name="os_load-balancer_api:amphora:put_config",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Update Amphora Agent Configuration",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v2/octavia/amphorae/{amphora_id}/config"}],
),
base.APIRule(
name="os_load-balancer_api:amphora:put_failover",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin"),
description="Failover Amphora",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v2/octavia/amphorae/{amphora_id}/failover"}],
),
base.APIRule(
name="os_load-balancer_api:amphora:get_stats",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin or role:reader"),
description="Show Amphora statistics",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v2/octavia/amphorae/{amphora_id}/stats"}],
),
base.APIRule(
name="os_load-balancer_api:provider-flavor:get_all",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin or role:reader"),
description="List the provider flavor capabilities.",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v2/lbaas/providers/{provider}/flavor_capabilities"},
],
),
base.APIRule(
name="os_load-balancer_api:provider-availability-zone:get_all",
check_str=("rule:load-balancer:admin"),
basic_check_str=("role:admin or role:reader"),
description="List the provider availability zone capabilities.",
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v2/lbaas/providers/{provider}/availability_zone_capabilities",
},
],
),
)
__all__ = ("list_rules",)

View File

@ -0,0 +1,40 @@
# flake8: noqa
from . import base
list_rules = (
base.Rule(
name="context_is_admin",
check_str=("role:admin"),
description="No description",
),
base.APIRule(
name="segregation",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="Return the user and project the requestshould be limited to",
scope_types=["system"],
operations=[
{"method": "GET", "path": "/v2/events"},
{"method": "GET", "path": "/v2/events/{message_id}"},
],
),
base.APIRule(
name="telemetry:events:index",
check_str=(""),
basic_check_str=("@"),
description="Return all events matching the query filters.",
scope_types=["system", "project"],
operations=[{"method": "GET", "path": "/v2/events"}],
),
base.APIRule(
name="telemetry:events:show",
check_str=(""),
basic_check_str=("@"),
description="Return a single event with the given message id.",
scope_types=["system", "project"],
operations=[{"method": "GET", "path": "/v2/events/{message_id}"}],
),
)
__all__ = ("list_rules",)

View File

@ -0,0 +1,290 @@
# flake8: noqa
from . import base
list_rules = (
base.Rule(
name="admin_api",
check_str=("role:admin"),
description="Default rule for most placement APIs.",
),
base.APIRule(
name="placement:resource_providers:list",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="List resource providers.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/resource_providers"}],
),
base.APIRule(
name="placement:resource_providers:create",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Create resource provider.",
scope_types=["system"],
operations=[{"method": "POST", "path": "/resource_providers"}],
),
base.APIRule(
name="placement:resource_providers:show",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="Show resource provider.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/resource_providers/{uuid}"}],
),
base.APIRule(
name="placement:resource_providers:update",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Update resource provider.",
scope_types=["system"],
operations=[{"method": "PUT", "path": "/resource_providers/{uuid}"}],
),
base.APIRule(
name="placement:resource_providers:delete",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Delete resource provider.",
scope_types=["system"],
operations=[{"method": "DELETE", "path": "/resource_providers/{uuid}"}],
),
base.APIRule(
name="placement:resource_classes:list",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="List resource classes.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/resource_classes"}],
),
base.APIRule(
name="placement:resource_classes:create",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Create resource class.",
scope_types=["system"],
operations=[{"method": "POST", "path": "/resource_classes"}],
),
base.APIRule(
name="placement:resource_classes:show",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="Show resource class.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/resource_classes/{name}"}],
),
base.APIRule(
name="placement:resource_classes:update",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Update resource class.",
scope_types=["system"],
operations=[{"method": "PUT", "path": "/resource_classes/{name}"}],
),
base.APIRule(
name="placement:resource_classes:delete",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Delete resource class.",
scope_types=["system"],
operations=[{"method": "DELETE", "path": "/resource_classes/{name}"}],
),
base.APIRule(
name="placement:resource_providers:inventories:list",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="List resource provider inventories.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/resource_providers/{uuid}/inventories"}],
),
base.APIRule(
name="placement:resource_providers:inventories:create",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Create one resource provider inventory.",
scope_types=["system"],
operations=[{"method": "POST", "path": "/resource_providers/{uuid}/inventories"}],
),
base.APIRule(
name="placement:resource_providers:inventories:show",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="Show resource provider inventory.",
scope_types=["system"],
operations=[
{"method": "GET", "path": "/resource_providers/{uuid}/inventories/{resource_class}"},
],
),
base.APIRule(
name="placement:resource_providers:inventories:update",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Update resource provider inventory.",
scope_types=["system"],
operations=[
{"method": "PUT", "path": "/resource_providers/{uuid}/inventories"},
{"method": "PUT", "path": "/resource_providers/{uuid}/inventories/{resource_class}"},
],
),
base.APIRule(
name="placement:resource_providers:inventories:delete",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Delete resource provider inventory.",
scope_types=["system"],
operations=[
{"method": "DELETE", "path": "/resource_providers/{uuid}/inventories"},
{
"method": "DELETE",
"path": "/resource_providers/{uuid}/inventories/{resource_class}",
},
],
),
base.APIRule(
name="placement:resource_providers:aggregates:list",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="List resource provider aggregates.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/resource_providers/{uuid}/aggregates"}],
),
base.APIRule(
name="placement:resource_providers:aggregates:update",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Update resource provider aggregates.",
scope_types=["system"],
operations=[{"method": "PUT", "path": "/resource_providers/{uuid}/aggregates"}],
),
base.APIRule(
name="placement:resource_providers:usages",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="List resource provider usages.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/resource_providers/{uuid}/usages"}],
),
base.APIRule(
name="placement:usages",
check_str=(
"(role:reader and system_scope:all) or (role:reader and project_id:%(project_id)s)"
),
basic_check_str=("role:admin or role:reader"),
description="List total resource usages for a given project.",
scope_types=["system", "project"],
operations=[{"method": "GET", "path": "/usages"}],
),
base.APIRule(
name="placement:traits:list",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="List traits.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/traits"}],
),
base.APIRule(
name="placement:traits:show",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="Show trait.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/traits/{name}"}],
),
base.APIRule(
name="placement:traits:update",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Update trait.",
scope_types=["system"],
operations=[{"method": "PUT", "path": "/traits/{name}"}],
),
base.APIRule(
name="placement:traits:delete",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Delete trait.",
scope_types=["system"],
operations=[{"method": "DELETE", "path": "/traits/{name}"}],
),
base.APIRule(
name="placement:resource_providers:traits:list",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="List resource provider traits.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/resource_providers/{uuid}/traits"}],
),
base.APIRule(
name="placement:resource_providers:traits:update",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Update resource provider traits.",
scope_types=["system"],
operations=[{"method": "PUT", "path": "/resource_providers/{uuid}/traits"}],
),
base.APIRule(
name="placement:resource_providers:traits:delete",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Delete resource provider traits.",
scope_types=["system"],
operations=[{"method": "DELETE", "path": "/resource_providers/{uuid}/traits"}],
),
base.APIRule(
name="placement:allocations:manage",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Manage allocations.",
scope_types=["system"],
operations=[{"method": "POST", "path": "/allocations"}],
),
base.APIRule(
name="placement:allocations:list",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="List allocations.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/allocations/{consumer_uuid}"}],
),
base.APIRule(
name="placement:allocations:update",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Update allocations.",
scope_types=["system"],
operations=[{"method": "PUT", "path": "/allocations/{consumer_uuid}"}],
),
base.APIRule(
name="placement:allocations:delete",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Delete allocations.",
scope_types=["system"],
operations=[{"method": "DELETE", "path": "/allocations/{consumer_uuid}"}],
),
base.APIRule(
name="placement:resource_providers:allocations:list",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="List resource provider allocations.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/resource_providers/{uuid}/allocations"}],
),
base.APIRule(
name="placement:allocation_candidates:list",
check_str=("role:reader and system_scope:all"),
basic_check_str=("role:admin or role:reader"),
description="List allocation candidates.",
scope_types=["system"],
operations=[{"method": "GET", "path": "/allocation_candidates"}],
),
base.APIRule(
name="placement:reshaper:reshape",
check_str=("role:admin and system_scope:all"),
basic_check_str=("role:admin"),
description="Reshape Inventory and Allocations.",
scope_types=["system"],
operations=[{"method": "POST", "path": "/reshaper"}],
),
)
__all__ = ("list_rules",)

View File

@ -0,0 +1,756 @@
from . import base
list_rules = (
base.Rule(
name="admin",
check_str=("role:admin or is_admin:True"),
description="Must be an administrator.",
),
base.Rule(
name="admin_or_owner",
check_str=("rule:admin or project_id:%(tenant)s"),
description="Must be an administrator or owner of the object.",
),
base.Rule(
name="default",
check_str=("rule:admin_or_owner"),
description="Must be an administrator or owner of the object.",
),
base.APIRule(
name="trove:instance:create",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Create a database instance.",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v1.0/{account_id}/instances"}],
),
base.APIRule(
name="trove:instance:delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Delete a database instance.",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v1.0/{account_id}/instances/{instance_id}"}],
),
base.APIRule(
name="trove:instance:force_delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Forcibly delete a database instance.",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v1.0/{account_id}/instances/{instance_id}"}],
),
base.APIRule(
name="trove:instance:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List database instances.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/instances"}],
),
base.APIRule(
name="trove:instance:detail",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List database instances with details.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/instances/detail"}],
),
base.APIRule(
name="trove:instance:show",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get details of a specific database instance.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/instances/{instance_id}"}],
),
base.APIRule(
name="trove:instance:update",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Update a database instance to attach/detach configuration",
scope_types=["project"],
operations=[
{"method": "PUT", "path": "/v1.0/{account_id}/instances/{instance_id}"},
{"method": "POST", "path": "/v1.0/{account_id}/instances"},
],
),
base.APIRule(
name="trove:instance:edit",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Updates the instance to set or unset one or more attributes.",
scope_types=["project"],
operations=[{"method": "PATCH", "path": "/v1.0/{account_id}/instances/{instance_id}"}],
),
base.APIRule(
name="trove:instance:restart",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Restart a database instance.",
scope_types=["project"],
operations=[
{
"method": "POST",
"path": "/v1.0/{account_id}/instances/{instance_id}/action (restart)",
},
],
),
base.APIRule(
name="trove:instance:resize_volume",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Resize a database instance volume.",
scope_types=["project"],
operations=[
{
"method": "POST",
"path": "/v1.0/{account_id}/instances/{instance_id}/action (resize)",
},
],
),
base.APIRule(
name="trove:instance:resize_flavor",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Resize a database instance flavor.",
scope_types=["project"],
operations=[
{
"method": "POST",
"path": "/v1.0/{account_id}/instances/{instance_id}/action (resize)",
},
],
),
base.APIRule(
name="trove:instance:reset_status",
check_str=("(role:admin or is_admin:True)"),
description="Reset the status of a database instance to ERROR.",
scope_types=["project"],
operations=[
{
"method": "POST",
"path": "/v1.0/{account_id}/instances/{instance_id}/action (reset_status)",
},
],
),
base.APIRule(
name="trove:instance:promote_to_replica_source",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Promote instance to replica source.",
scope_types=["project"],
operations=[
{
"method": "POST",
"path": "/v1.0/{account_id}/instances/{instance_id}/action (promote_to_replica_source)", # noqa
},
],
),
base.APIRule(
name="trove:instance:eject_replica_source",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Eject the replica source from its replica set.",
scope_types=["project"],
operations=[
{
"method": "POST",
"path": "/v1.0/{account_id}/instances/{instance_id}/action (eject_replica_source)",
},
],
),
base.APIRule(
name="trove:instance:configuration",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get the default configuration template applied to the instance.",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v1.0/{account_id}/instances/{instance_id}/configuration"},
],
),
base.APIRule(
name="trove:instance:guest_log_list",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get all informations about all logs of a database instance.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/instances/{instance_id}/log"}],
),
base.APIRule(
name="trove:instance:backups",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get all backups of a database instance.",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v1.0/{account_id}/instances/{instance_id}/backups"},
],
),
base.APIRule(
name="trove:instance:module_list",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get informations about modules on a database instance.",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v1.0/{account_id}/instances/{instance_id}/modules"},
],
),
base.APIRule(
name="trove:instance:module_apply",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Apply modules to a database instance.",
scope_types=["project"],
operations=[
{"method": "POST", "path": "/v1.0/{account_id}/instances/{instance_id}/modules"},
{"method": "POST", "path": "/v1.0/{account_id}/instances"},
],
),
base.APIRule(
name="trove:instance:module_remove",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Remove a module from a database instance.",
scope_types=["project"],
operations=[
{
"method": "DELETE",
"path": "/v1.0/{account_id}/instances/{instance_id}/modules/{module_id}",
},
],
),
base.APIRule(
name="trove:instance:extension:root:create",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Enable the root user of a database instance.",
scope_types=["project"],
operations=[
{"method": "POST", "path": "/v1.0/{account_id}/instances/{instance_id}/root"},
],
),
base.APIRule(
name="trove:instance:extension:root:delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Disable the root user of a database instance.",
scope_types=["project"],
operations=[
{"method": "DELETE", "path": "/v1.0/{account_id}/instances/{instance_id}/root"},
],
),
base.APIRule(
name="trove:instance:extension:root:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Show whether the root user of a database instance has been ever enabled.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/instances/{instance_id}/root"}],
),
base.APIRule(
name="trove:cluster:extension:root:create",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Enable the root user of the instances in a cluster.",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v1.0/{account_id}/clusters/{cluster}/root"}],
),
base.APIRule(
name="trove:cluster:extension:root:delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Enable the root user of the instances in a cluster.",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v1.0/{account_id}/clusters/{cluster}/root"}],
),
base.APIRule(
name="trove:cluster:extension:root:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Disable the root of the instances in a cluster.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/clusters/{cluster}/root"}],
),
base.APIRule(
name="trove:instance:extension:user:create",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Create users for a database instance.",
scope_types=["project"],
operations=[
{"method": "POST", "path": "/v1.0/{account_id}/instances/{instance_id}/users"},
{"method": "POST", "path": "/v1.0/{account_id}/instances"},
],
),
base.APIRule(
name="trove:instance:extension:user:delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Delete a user from a database instance.",
scope_types=["project"],
operations=[
{
"method": "DELETE",
"path": "/v1.0/{account_id}/instances/{instance_id}/users/{user}",
},
],
),
base.APIRule(
name="trove:instance:extension:user:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get all users of a database instance.",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v1.0/{account_id}/instances/{instance_id}/users"},
],
),
base.APIRule(
name="trove:instance:extension:user:show",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get the information of a single user of a database instance.",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v1.0/{account_id}/instances/{instance_id}/users/{user}"},
],
),
base.APIRule(
name="trove:instance:extension:user:update",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Update attributes for a user of a database instance.",
scope_types=["project"],
operations=[
{"method": "PUT", "path": "/v1.0/{account_id}/instances/{instance_id}/users/{user}"},
],
),
base.APIRule(
name="trove:instance:extension:user:update_all",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Update the password for one or more users a database instance.",
scope_types=["project"],
operations=[
{"method": "PUT", "path": "/v1.0/{account_id}/instances/{instance_id}/users"},
],
),
base.APIRule(
name="trove:instance:extension:user_access:update",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Grant access for a user to one or more databases.",
scope_types=["project"],
operations=[
{
"method": "PUT",
"path": "/v1.0/{account_id}/instances/{instance_id}/users/{user}/databases",
},
],
),
base.APIRule(
name="trove:instance:extension:user_access:delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Revoke access for a user to a databases.",
scope_types=["project"],
operations=[
{
"method": "DELETE",
"path": "/v1.0/{account_id}/instances/{instance_id}/users/{user}/databases/{database}", # noqa
},
],
),
base.APIRule(
name="trove:instance:extension:user_access:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get permissions of a user",
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v1.0/{account_id}/instances/{instance_id}/users/{user}/databases",
},
],
),
base.APIRule(
name="trove:instance:extension:database:create",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Create a set of Schemas",
scope_types=["project"],
operations=[
{"method": "POST", "path": "/v1.0/{account_id}/instances/{instance_id}/databases"},
{"method": "POST", "path": "/v1.0/{account_id}/instances"},
],
),
base.APIRule(
name="trove:instance:extension:database:delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Delete a schema from a database.",
scope_types=["project"],
operations=[
{
"method": "DELETE",
"path": "/v1.0/{account_id}/instances/{instance_id}/databases/{database}",
},
],
),
base.APIRule(
name="trove:instance:extension:database:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List all schemas from a database.",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v1.0/{account_id}/instances/{instance_id}/databases"},
],
),
base.APIRule(
name="trove:instance:extension:database:show",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get informations of a schema(Currently Not Implemented).",
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v1.0/{account_id}/instances/{instance_id}/databases/{database}",
},
],
),
base.APIRule(
name="trove:cluster:create",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Create a cluster.",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v1.0/{account_id}/clusters"}],
),
base.APIRule(
name="trove:cluster:delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Delete a cluster.",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v1.0/{account_id}/clusters/{cluster}"}],
),
base.APIRule(
name="trove:cluster:force_delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Forcibly delete a cluster.",
scope_types=["project"],
operations=[
{"method": "POST", "path": "/v1.0/{account_id}/clusters/{cluster} (reset-status)"},
],
),
base.APIRule(
name="trove:cluster:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List all clusters",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/clusters"}],
),
base.APIRule(
name="trove:cluster:show",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get informations of a cluster.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/clusters/{cluster}"}],
),
base.APIRule(
name="trove:cluster:show_instance",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get informations of a instance in a cluster.",
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v1.0/{account_id}/clusters/{cluster}/instances/{instance}",
},
],
),
base.APIRule(
name="trove:cluster:action",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Commit an action against a cluster",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v1.0/{account_id}/clusters/{cluster}"}],
),
base.APIRule(
name="trove:cluster:reset-status",
check_str=("(role:admin or is_admin:True)"),
description="Reset the status of a cluster to NONE.",
scope_types=["project"],
operations=[
{"method": "POST", "path": "/v1.0/{account_id}/clusters/{cluster} (reset-status)"},
],
),
base.APIRule(
name="trove:backup:create",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Create a backup of a database instance.",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v1.0/{account_id}/backups"}],
),
base.APIRule(
name="trove:backup:delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Delete a backup of a database instance.",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v1.0/{account_id}/backups/{backup}"}],
),
base.APIRule(
name="trove:backup:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List all backups.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/backups"}],
),
base.APIRule(
name="trove:backup:index:all_projects",
check_str=("role:admin"),
description="List backups for all the projects.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/backups"}],
),
base.APIRule(
name="trove:backup:show",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get informations of a backup.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/backups/{backup}"}],
),
base.APIRule(
name="trove:backup_strategy:create",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Create a backup strategy.",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v1.0/{account_id}/backup_strategies"}],
),
base.APIRule(
name="trove:backup_strategy:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List all backup strategies.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/backup_strategies"}],
),
base.APIRule(
name="trove:backup_strategy:delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Delete backup strategies.",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v1.0/{account_id}/backup_strategies"}],
),
base.APIRule(
name="trove:configuration:create",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Create a configuration group.",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v1.0/{account_id}/configurations"}],
),
base.APIRule(
name="trove:configuration:delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Delete a configuration group.",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v1.0/{account_id}/configurations/{config}"}],
),
base.APIRule(
name="trove:configuration:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List all configuration groups.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/configurations"}],
),
base.APIRule(
name="trove:configuration:show",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get informations of a configuration group.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/configurations/{config}"}],
),
base.APIRule(
name="trove:configuration:instances",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List all instances which a configuration group has be assigned to.",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v1.0/{account_id}/configurations/{config}/instances"},
],
),
base.APIRule(
name="trove:configuration:update",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Update a configuration group(the configuration group will be replaced completely).", # noqa
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v1.0/{account_id}/configurations/{config}"}],
),
base.APIRule(
name="trove:configuration:edit",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Patch a configuration group.",
scope_types=["project"],
operations=[{"method": "PATCH", "path": "/v1.0/{account_id}/configurations/{config}"}],
),
base.APIRule(
name="trove:configuration-parameter:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List all parameters bind to a datastore version.",
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v1.0/{account_id}/datastores/{datastore}/versions/{version}/parameters",
},
],
),
base.APIRule(
name="trove:configuration-parameter:show",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get a paramter of a datastore version.",
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v1.0/{account_id}/datastores/{datastore}/versions/{version}/parameters/{param}", # noqa
},
],
),
base.APIRule(
name="trove:configuration-parameter:index_by_version",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List all paramters bind to a datastore version by the id of the version(datastore is not provided).", # noqa
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v1.0/{account_id}/datastores/versions/{version}/paramters",
},
],
),
base.APIRule(
name="trove:configuration-parameter:show_by_version",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get a paramter of a datastore version by it names and the id of the version(datastore is not provided).", # noqa
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v1.0/{account_id}/datastores/versions/{version}/paramters/{param}",
},
],
),
base.APIRule(
name="trove:datastore:index",
check_str=(""),
description="List all datastores.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/datastores"}],
),
base.APIRule(
name="trove:datastore:show",
check_str=(""),
description="Get informations of a datastore.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/datastores/{datastore}"}],
),
base.APIRule(
name="trove:datastore:delete",
check_str=("(role:admin or is_admin:True)"),
description="Delete a datastore.",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v1.0/{account_id}/datastores/{datastore}"}],
),
base.APIRule(
name="trove:datastore:version_show",
check_str=(""),
description="Get a version of a datastore by the version id.",
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v1.0/{account_id}/datastores/{datastore}/versions/{version}",
},
],
),
base.APIRule(
name="trove:datastore:version_show_by_uuid",
check_str=(""),
description="Get a version of a datastore by the version id(without providing the datastore id).", # noqa
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v1.0/{account_id}/datastores/versions/{version}"},
],
),
base.APIRule(
name="trove:datastore:version_index",
check_str=(""),
description="Get all versions of a datastore.",
scope_types=["project"],
operations=[
{"method": "GET", "path": "/v1.0/{account_id}/datastores/{datastore}/versions"},
],
),
base.APIRule(
name="trove:datastore:list_associated_flavors",
check_str=(""),
description="List all flavors associated with a datastore version.",
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v1.0/{account_id}/datastores/{datastore}/versions/{version}/flavors",
},
],
),
base.APIRule(
name="trove:datastore:list_associated_volume_types",
check_str=(""),
description="List all volume-types associated with a datastore version.",
scope_types=["project"],
operations=[
{
"method": "GET",
"path": "/v1.0/{account_id}/datastores/{datastore}/versions/{version}/volume-types", # noqa
},
],
),
base.APIRule(
name="trove:flavor:index",
check_str=(""),
description="List all flavors.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/flavors"}],
),
base.APIRule(
name="trove:flavor:show",
check_str=(""),
description="Get information of a flavor.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/flavors/{flavor}"}],
),
base.APIRule(
name="trove:limits:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List all absolute and rate limit informations.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/limits"}],
),
base.APIRule(
name="trove:module:create",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Create a module.",
scope_types=["project"],
operations=[{"method": "POST", "path": "/v1.0/{account_id}/modules"}],
),
base.APIRule(
name="trove:module:delete",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Delete a module.",
scope_types=["project"],
operations=[{"method": "DELETE", "path": "/v1.0/{account_id}/modules/{module}"}],
),
base.APIRule(
name="trove:module:index",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List all modules.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/modules"}],
),
base.APIRule(
name="trove:module:show",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Get informations of a module.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/modules/{module}"}],
),
base.APIRule(
name="trove:module:instances",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="List all instances to which a module is applied.",
scope_types=["project"],
operations=[{"method": "GET", "path": "/v1.0/{account_id}/modules/{module}/instances"}],
),
base.APIRule(
name="trove:module:update",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Update a module.",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v1.0/{account_id}/modules/{module}"}],
),
base.APIRule(
name="trove:module:reapply",
check_str=("((role:admin or is_admin:True) or project_id:%(project_id)s)"),
description="Reapply a module to all instances.",
scope_types=["project"],
operations=[{"method": "PUT", "path": "/v1.0/{account_id}/modules/{module}/instances"}],
),
)
__all__ = ("list_rules",)

View File

@ -0,0 +1,56 @@
# Copyright 2021 99cloud
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from enum import Enum
from typing import List, TypedDict
from pydantic import BaseModel
class ScopeType(str, Enum):
system = "system"
domain = "domain"
project = "project"
class ScopeTypesSchema(BaseModel):
__root__: List[ScopeType]
class Method(str, Enum):
GET = "GET"
POST = "POST"
PUT = "PUT"
PATCH = "PATCH"
DELETE = "DELETE"
HEAD = "HEAD"
class Operation(TypedDict):
method: str
path: str
class OperationSchema(BaseModel):
method: Method
path: str
class OperationsSchema(BaseModel):
__root__: List[OperationSchema]
__all__ = ("ScopeType", "ScopeTypesSchema", "Method", "Operation", "OperationsSchema")

View File

@ -20,8 +20,8 @@ from httpx import AsyncClient
from skyline_apiserver import __version__, main
from skyline_apiserver.config import CONF
from skyline_apiserver.db import api as db_api, setup as db_setup
from skyline_apiserver.tests.utils import utils
from skyline_apiserver.types import constants
from utils import utils
@pytest.mark.skipif(os.getenv("TEST_API") != "true", reason="No backend OpenStack for api-test.")

View File

@ -14,7 +14,8 @@
from __future__ import annotations
from dataclasses import dataclass, field
import sys
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List
from mimesis import Generic
@ -22,6 +23,23 @@ from pydantic import StrictBool, StrictInt, StrictStr
FAKER = Generic()
FAKE_NS = "skyline_apiserver.tests.mock_ns"
FAKE_SERVICE_EPS = {
"cinder": ["cinder"],
"glance": ["glance"],
"heat": ["heat"],
"keystone": ["keystone"],
"neutron": ["neutron"],
"nova": ["nova"],
}
current_module = sys.modules[__name__]
for ep_names in FAKE_SERVICE_EPS.values():
for ep_name in ep_names:
setattr(current_module, f"{ep_name}_list_rules", lambda: [])
@dataclass
class FakeOptData:
@ -34,3 +52,50 @@ class FakeOptData:
)
default: Any = None
deprecated: bool = False
@dataclass
class FakeOperation:
method: str = field(
default_factory=lambda: FAKER.choice(["GET", "POST", "PUT", "PATCH", "DELETE"]),
)
path: str = field(
default_factory=lambda: FAKER.choice(["/resources", "/resources/{resource_id}"]),
)
@dataclass
class FakeDocumentedRuleData:
name: str = field(default_factory=lambda: ":".join(FAKER.text.words()))
description: str = field(default_factory=lambda: FAKER.text.text())
check_str: str = field(
default_factory=lambda: f'role:{FAKER.choice(["admin", "member", "reader"])}',
)
scope_types: List[str] = field(
default_factory=lambda: FAKER.choice(
["system", "domain", "project"],
length=FAKER.numbers.integer_number(1, 3),
unique=True,
),
)
operations: List[Dict[str, str]] = field(
default_factory=lambda: [
asdict(FakeOperation()) for _ in range(FAKER.numbers.integer_number(1, 5))
],
)
@dataclass
class FakeRuleData:
name: str = field(default_factory=lambda: ":".join(FAKER.text.words()))
description: str = field(default_factory=lambda: FAKER.text.text())
check_str: str = field(
default_factory=lambda: f'role:{FAKER.choice(["admin", "member", "reader"])}',
)
scope_types: List[str] = field(
default_factory=lambda: FAKER.choice(
["system", "domain", "project"],
length=FAKER.numbers.integer_number(1, 3),
unique=True,
),
)

View File

@ -0,0 +1,165 @@
# Copyright 2021 99cloud
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from dataclasses import asdict
from importlib import metadata
from importlib.metadata import EntryPoint
from pathlib import Path
from typing import Dict, List, Tuple, Union
import pytest
from click.testing import CliRunner
from oslo_policy.policy import DocumentedRuleDefault, RuleDefault
from skyline_apiserver.cmd.policy_manager import (
generate_conf,
generate_rule,
generate_sample,
policy_manager,
validate,
)
from skyline_apiserver.policy.manager import get_service_rules
from skyline_apiserver.tests import fake
from skyline_apiserver.tests.fake import (
FAKE_NS,
FAKE_SERVICE_EPS,
FAKER,
FakeDocumentedRuleData,
FakeRuleData,
)
from skyline_apiserver.tests.models import ArgumentData, TestData
from skyline_apiserver.types import constants
class TestPolicyManager:
@pytest.fixture(autouse=True)
def setup_entry_points(self, monkeypatch) -> None:
eps = []
for ep_names in FAKE_SERVICE_EPS.values():
for ep_name in ep_names:
fake_rules: List[Union[DocumentedRuleDefault, RuleDefault]]
fake_rules = [
DocumentedRuleDefault(**asdict(FakeDocumentedRuleData()))
for _ in range(FAKER.numbers.integer_number(1, 10))
]
fake_rules.extend(
[
RuleDefault(**asdict(FakeRuleData()))
for _ in range(FAKER.numbers.integer_number(1, 3))
],
)
monkeypatch.setattr(fake, f"{ep_name}_list_rules", lambda: fake_rules)
eps.append(
EntryPoint(
name=ep_name,
value=f"skyline_apiserver.tests.fake:{ep_name}_list_rules",
group=FAKE_NS,
),
)
def entry_points() -> Dict[str, Tuple[EntryPoint, ...]]:
return {FAKE_NS: tuple(eps)}
monkeypatch.setattr(metadata, "entry_points", entry_points)
monkeypatch.setattr(constants, "POLICY_NS", FAKE_NS)
monkeypatch.setattr(constants, "SUPPORTED_SERVICE_EPS", FAKE_SERVICE_EPS)
@pytest.fixture
def runner(self) -> CliRunner:
runner = CliRunner()
return runner
@pytest.mark.ddt(
TestData(
arguments=("dir_path",),
argument_data_set=[
ArgumentData(
id="str_dir_path",
values=(FAKER.text.word(),),
),
],
),
)
def test_generate_sample(self, runner: CliRunner, tmp_path: Path, dir_path: str) -> None:
sample_dir = tmp_path.joinpath(dir_path)
sample_dir.mkdir(parents=True, exist_ok=True)
policy_manager.add_command(generate_sample)
result = runner.invoke(
policy_manager,
["generate-sample", "--dir", sample_dir.as_posix()],
)
assert result.exit_code == 0
for service in FAKE_SERVICE_EPS:
assert sample_dir.joinpath(service).exists()
assert sample_dir.joinpath(service).joinpath("policy.yaml.sample").exists()
@pytest.mark.ddt(
TestData(
arguments=("dir_path",),
argument_data_set=[
ArgumentData(
id="str_dir_path",
values=(FAKER.text.word(),),
),
],
),
TestData(
arguments=("description",),
argument_data_set=[
ArgumentData(
id="str_description",
values=(FAKER.text.text(),),
),
],
),
)
def test_generate_conf(
self,
runner: CliRunner,
tmp_path: Path,
dir_path: str,
description: str,
) -> None:
conf_dir = tmp_path.joinpath(dir_path)
conf_dir.mkdir(parents=True, exist_ok=True)
policy_manager.add_command(generate_conf)
result = runner.invoke(
policy_manager,
["generate-conf", "--dir", conf_dir.as_posix(), "--desc", description],
)
service_rules = get_service_rules()
assert result.exit_code == 0
for service in service_rules:
assert conf_dir.joinpath(service).exists()
assert conf_dir.joinpath(service).joinpath("policy.yaml").exists()
assert description in conf_dir.joinpath(service).joinpath("policy.yaml").read_text()
def test_generate_rule(self, runner: CliRunner) -> None:
policy_manager.add_command(generate_rule)
for ep_names in FAKE_SERVICE_EPS.values():
for ep_name in ep_names:
result = runner.invoke(policy_manager, ["generate-rule", ep_name])
assert result.exit_code == 0
def test_validate(self, runner: CliRunner) -> None:
policy_manager.add_command(validate)
result = runner.invoke(
policy_manager,
[
"validate",
"--diff",
],
)
assert result.exit_code == 0

View File

@ -45,3 +45,23 @@ SETTINGS_HIDDEN_SET = set()
SETTINGS_RESTART_SET = set()
DEFAULT_TIMEOUT = 30
POLICY_NS = "oslo.policy.policies"
SUPPORTED_SERVICE_EPS = {
# openstack_service: [<entry_point_name>, <entry_point_name>,]
"cinder": ["cinder"],
"glance": ["glance"],
"heat": ["heat"],
"ironic": ["ironic.api", "ironic_inspector.api"],
"keystone": ["keystone"],
"neutron": ["neutron", "neutron-vpnaas"],
"manila": ["manila"],
"nova": ["nova"],
"octavia": ["octavia"],
"panko": ["panko"],
"placement": ["placement"],
"trove": ["trove"],
}
PREFIX_MAPPINGS = {"trove": "trove:", "manila": "manila:"}

View File

@ -0,0 +1,32 @@
#!/usr/bin/env bash
# Install openstack service package
poetry run pip install --no-deps \
keystone \
openstack-placement \
nova \
cinder \
glance \
trove \
neutron neutron-vpnaas \
openstack-heat \
ironic-lib ironic ironic-inspector \
octavia-lib octavia \
panko \
manila
# Patch cinder
patch_path="$(poetry run python3 -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')/cinder/__init__.py"
sed -i 's/\(.*eventlet.*\)/# \1/g' $patch_path
# Patch neutron
patch_path="$(poetry run python3 -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')/neutron/conf/policies/floatingip_pools.py"
sed -i 's/admin/system/g' $patch_path
# Patch ironic
patch_path="$(poetry run python3 -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')/ironic/common/policy.py"
sed -i 's/\(.*lockutils.*\)/# \1/g' $patch_path
# Patch ironic_inspector
patch_path="$(poetry run python3 -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')/ironic_inspector/policy.py"
sed -i 's/\(.*lockutils.*\)/# \1/g' $patch_path

View File

@ -922,7 +922,7 @@ test = ["bandit (>=1.6.0,<1.7.0)", "coverage (>=4.5.1)", "fixtures (>=3.0.0)", "
[[package]]
name = "oslo.policy"
version = "3.12.1"
version = "3.8.2"
description = "Oslo Policy library"
category = "main"
optional = false
@ -1101,7 +1101,7 @@ test = ["flaky", "pretend", "pytest (>=3.0.1)"]
[[package]]
name = "pyparsing"
version = "3.0.8"
version = "3.0.9"
description = "pyparsing module - Classes and methods to define and execute parsing grammars"
category = "main"
optional = false
@ -1480,6 +1480,7 @@ develop = true
aiomysql = "0.0.21"
aiosqlite = "0.17.0"
alembic = "1.7.5"
click = "7.1.2"
databases = "0.4.3"
dnspython = "2.1.0"
fastapi = {version = "0.58.1", extras = ["all"]}
@ -1487,7 +1488,9 @@ gunicorn = "20.1.0"
httpx = "0.16.1"
immutables = "0.16"
keystoneauth1 = "3.17.4"
loguru = "0.5.3"
osc-placement = "1.7.0"
"oslo.policy" = "3.8.2"
pydantic = "1.8.2"
pymysql = "0.9.3"
python-cinderclient = "5.0.2"
@ -1499,9 +1502,6 @@ python-neutronclient = "6.14.1"
python-novaclient = "15.1.1"
python-octaviaclient = "1.10.1"
PyYAML = "5.4.1"
skyline-config = "*"
skyline-log = "*"
skyline-policy-manager = "*"
sqlalchemy = "1.3.24"
uvicorn = {version = "0.12.3", extras = ["standard"]}
@ -1551,21 +1551,6 @@ loguru = "0.5.3"
type = "directory"
url = "../skyline-log"
[[package]]
name = "skyline-policy-manager"
version = "0.1.0"
description = ""
category = "main"
optional = false
python-versions = ">=3.8,<4.0"
[package.dependencies]
click = "*"
"oslo.policy" = "*"
pydantic = "*"
skyline-log = "*"
Werkzeug = "*"
[[package]]
name = "sniffio"
version = "1.2.0"
@ -1741,17 +1726,6 @@ category = "main"
optional = false
python-versions = ">=3.6.1"
[[package]]
name = "werkzeug"
version = "2.1.2"
description = "The comprehensive WSGI web application library."
category = "main"
optional = false
python-versions = ">=3.7"
[package.extras]
watchdog = ["watchdog"]
[[package]]
name = "win32-setctime"
version = "1.1.0"
@ -2358,8 +2332,8 @@ osc-placement = [
{file = "oslo.log-4.8.0.tar.gz", hash = "sha256:9eddf6f6a2035327f2a3231a108b98e21ad90c0680380d9d1bc647ac9663e056"},
]
"oslo.policy" = [
{file = "oslo.policy-3.12.1-py3-none-any.whl", hash = "sha256:505262d268081a3bd6bd15c3dd9f51b58cc4c29e714baf156c33de89bc3b34af"},
{file = "oslo.policy-3.12.1.tar.gz", hash = "sha256:dab83ed05a7b153aff0d1c3b62947bcd3699826368dfa1cf66948f889ab267f2"},
{file = "oslo.policy-3.8.2-py3-none-any.whl", hash = "sha256:4102cb537ebabf23e6e5ca6df344bcc4c6725252434f07427b9baa4f101183d4"},
{file = "oslo.policy-3.8.2.tar.gz", hash = "sha256:233030f9acbc3cb894c66943fd71406ec12825776021f5dda4afab6f1762837f"},
]
"oslo.serialization" = [
{file = "oslo.serialization-4.3.0-py3-none-any.whl", hash = "sha256:6c1c483231c3827787af9b6ca4a45f4e45fe364772a24692b02de78fe48eafb1"},
@ -2453,8 +2427,8 @@ pyopenssl = [
{file = "pyOpenSSL-22.0.0.tar.gz", hash = "sha256:660b1b1425aac4a1bea1d94168a85d99f0b3144c869dd4390d27629d0087f1bf"},
]
pyparsing = [
{file = "pyparsing-3.0.8-py3-none-any.whl", hash = "sha256:ef7b523f6356f763771559412c0d7134753f037822dad1b16945b7b846f7ad06"},
{file = "pyparsing-3.0.8.tar.gz", hash = "sha256:7bf433498c016c4314268d95df76c81b842a4cb2b276fa3312cfb1e1d85f6954"},
{file = "pyparsing-3.0.9-py3-none-any.whl", hash = "sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc"},
{file = "pyparsing-3.0.9.tar.gz", hash = "sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb"},
]
pyperclip = [
{file = "pyperclip-1.8.2.tar.gz", hash = "sha256:105254a8b04934f0bc84e9c24eb360a591aaf6535c9def5f29d92af107a9bf57"},
@ -2737,9 +2711,6 @@ skyline-console = [
{file = "skyline_console-0.1.0-py2.py3-none-any.whl", hash = "sha256:42b25a064f7e12bd12c5128608016123efd28576974cffe5c63645264aa006d7"},
]
skyline-log = []
skyline-policy-manager = [
{file = "skyline_policy_manager-0.1.0-py3-none-any.whl", hash = "sha256:5e7838d573fee23dbba4b3b9e022fd86ecdcb751b4e23ff5fda129ec252ab848"},
]
sniffio = [
{file = "sniffio-1.2.0-py3-none-any.whl", hash = "sha256:471b71698eac1c2112a40ce2752bb2f4a4814c22a54a3eed3676bc0f5ca9f663"},
{file = "sniffio-1.2.0.tar.gz", hash = "sha256:c4666eecec1d3f50960c6bdf61ab7bc350648da6c126e3cf6898d8cd4ddcd3de"},
@ -2917,10 +2888,6 @@ websockets = [
{file = "websockets-8.1-cp38-cp38-win_amd64.whl", hash = "sha256:f8a7bff6e8664afc4e6c28b983845c5bc14965030e3fb98789734d416af77c4b"},
{file = "websockets-8.1.tar.gz", hash = "sha256:5c65d2da8c6bce0fca2528f69f44b2f977e06954c8512a952222cea50dad430f"},
]
werkzeug = [
{file = "Werkzeug-2.1.2-py3-none-any.whl", hash = "sha256:72a4b735692dd3135217911cbeaa1be5fa3f62bffb8745c5215420a03dc55255"},
{file = "Werkzeug-2.1.2.tar.gz", hash = "sha256:1ce08e8093ed67d638d63879fd1ba3735817f7a80de3674d293f5984f25fb6e6"},
]
win32-setctime = [
{file = "win32_setctime-1.1.0-py3-none-any.whl", hash = "sha256:231db239e959c2fe7eb1d7dc129f11172354f98361c4fa2d6d2d7e278baa8aad"},
{file = "win32_setctime-1.1.0.tar.gz", hash = "sha256:15cf5750465118d6929ae4de4eb46e8edae9a5634350c01ba582df868e932cb2"},

51
poetry.lock generated
View File

@ -1492,17 +1492,12 @@ version = "0.1.0"
description = ""
category = "main"
optional = false
python-versions = "^3.8"
develop = true
python-versions = ">=3.8,<4.0"
[package.dependencies]
immutables = "0.16"
pydantic = "1.8.2"
PyYAML = "5.4.1"
[package.source]
type = "directory"
url = "libs/skyline-config"
immutables = "*"
pydantic = "*"
PyYAML = "*"
[[package]]
name = "skyline-console"
@ -1518,15 +1513,10 @@ version = "0.1.0"
description = ""
category = "main"
optional = false
python-versions = "^3.8"
develop = true
python-versions = ">=3.8,<4.0"
[package.dependencies]
loguru = "0.5.3"
[package.source]
type = "directory"
url = "libs/skyline-log"
loguru = "*"
[[package]]
name = "skyline-nginx"
@ -1559,19 +1549,14 @@ version = "0.1.0"
description = ""
category = "main"
optional = false
python-versions = "^3.8"
develop = true
python-versions = ">=3.8,<4.0"
[package.dependencies]
click = "7.1.2"
"oslo.policy" = "3.8.2"
pydantic = "1.8.2"
click = "*"
"oslo.policy" = "*"
pydantic = "*"
skyline-log = "*"
Werkzeug = "2.0.1"
[package.source]
type = "directory"
url = "libs/skyline-policy-manager"
Werkzeug = "*"
[[package]]
name = "sniffio"
@ -1880,7 +1865,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
[metadata]
lock-version = "1.1"
python-versions = "^3.8"
content-hash = "5d559f245168ea3f166d49367a0bf4909e4efb4c9bcbbc9ce869be9430eb686c"
content-hash = "f514ad7674184efa501beb4cc72a11a7b2a0ac8f2af21e47a4fcd3f40a9c18ae"
[metadata.files]
aiofiles = [
@ -2748,13 +2733,19 @@ six = [
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
]
skyline-apiserver = []
skyline-config = []
skyline-config = [
{file = "skyline_config-0.1.0-py3-none-any.whl", hash = "sha256:63defc191c74714e59ae2f69874f90a6df245d47bb67130744b63c733ce7547f"},
]
skyline-console = [
{file = "skyline_console-0.1.0-py2.py3-none-any.whl", hash = "sha256:42b25a064f7e12bd12c5128608016123efd28576974cffe5c63645264aa006d7"},
]
skyline-log = []
skyline-log = [
{file = "skyline_log-0.1.0-py3-none-any.whl", hash = "sha256:60e61784ce43061c62ea424d271fd6ad0c04ba2a9e2df5d1e1f490a9cceb8d3b"},
]
skyline-nginx = []
skyline-policy-manager = []
skyline-policy-manager = [
{file = "skyline_policy_manager-0.1.0-py3-none-any.whl", hash = "sha256:5e7838d573fee23dbba4b3b9e022fd86ecdcb751b4e23ff5fda129ec252ab848"},
]
sniffio = [
{file = "sniffio-1.2.0-py3-none-any.whl", hash = "sha256:471b71698eac1c2112a40ce2752bb2f4a4814c22a54a3eed3676bc0f5ca9f663"},
{file = "sniffio-1.2.0.tar.gz", hash = "sha256:c4666eecec1d3f50960c6bdf61ab7bc350648da6c126e3cf6898d8cd4ddcd3de"},

View File

@ -7,17 +7,11 @@ authors = ["OpenStack <openstack-discuss@lists.openstack.org>"]
[tool.poetry.dependencies]
python = "^3.8"
skyline-config = "*"
skyline-log = "*"
skyline-policy-manager = "*"
skyline-apiserver = "*"
skyline-nginx = "*"
[tool.poetry.dev-dependencies]
flake8 = "4.0.1"
skyline-config = {path = "libs/skyline-config", develop = true}
skyline-log = {path = "libs/skyline-log", develop = true}
skyline-policy-manager = {path = "libs/skyline-policy-manager", develop = true}
skyline-apiserver = {path = "libs/skyline-apiserver", develop = true}
skyline-nginx = {path = "libs/skyline-nginx", develop = true}
reno = {extras = ["sphinx"], version = "^3.5.0"}