projetAnsible/myenv/lib/python3.12/site-packages/pulumi/runtime/invoke.py
2024-12-09 06:16:28 +01:00

501 lines
19 KiB
Python

# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import os
import traceback
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Dict,
List,
Optional,
Set,
Tuple,
Union,
)
import grpc
from google.protobuf import struct_pb2
from semver import VersionInfo
from .. import _types, log
from ..invoke import InvokeOptions, InvokeOutputOptions
from ..runtime.proto import resource_pb2
from . import rpc
from ._depends_on import _resolve_depends_on_urns, _resolve_depends_on
from .settings import (
_get_rpc_manager,
get_monitor,
grpc_error_to_exception,
handle_grpc_error,
)
from .sync_await import _sync_await
if TYPE_CHECKING:
from .. import Inputs, Output, Resource
def _requires_legacy_none_return_for_empty_struct(
ret_obj: struct_pb2.Struct, tok: str, version: str
) -> bool:
"""
To avoid breaking older versions of the Kubernetes Python SDK, we maintain the legacy behavior of
returning None instead of an empty dict for empty results if the invoke is for a version of the
Pulumi Kubernetes SDK that can't handle empty dicts.
See https://github.com/pulumi/pulumi/issues/14508.
"""
# If re_obj is truthy or the token is not one of the known Kubernetes tokens, we don't need the legacy
# behavior.
if ret_obj or tok not in {
"kubernetes:yaml:decode",
"kubernetes:helm:template",
"kubernetes:kustomize:directory",
}:
return False
# If we have a version and it's less than or equal to pulumi-kubernetes 4.5.4, we need the legacy behavior;
# otherwise, we don't because later versions can handle the new behavior correctly.
if version:
k8s_ver = VersionInfo(4, 5, 4)
try:
ver = VersionInfo.parse(version)
except Exception as ex:
log.debug(f"Failed to parse version {version} as semver: {ex}")
ver = k8s_ver
return ver <= k8s_ver
# We don't have a version, default to the legacy behavior.
return True
class InvokeResult:
"""
InvokeResult is a helper type that wraps a prompt value in an Awaitable.
"""
def __init__(
self,
value: Any,
is_secret: bool = False,
is_known: bool = True,
dependencies: Optional[List["Resource"]] = None,
):
self.value = value
self.is_secret = is_secret
self.is_known = is_known
self.dependencies = dependencies or []
# pylint: disable=using-constant-test
def __await__(self):
# We need __await__ to be an iterator, but we only want it to return one value. As such, we use
# `if False: yield` to construct this.
if False:
yield self.value
return self.value
__iter__ = __await__
def invoke(
tok: str,
props: "Inputs",
opts: Optional[InvokeOptions] = None,
typ: Optional[type] = None,
package_ref: Optional[Awaitable[Optional[str]]] = None,
) -> InvokeResult:
"""
invoke dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
can be a bag of computed values (Ts or Awaitable[T]s), and the result is a Awaitable[Any] that
resolves when the invoke finishes.
"""
# Run the RPC callback asynchronously and then immediately await it.
awaitableInvokeResult = _invoke(tok, props, opts, typ, package_ref=package_ref)
return _sync_await(awaitableInvokeResult)
def invoke_output(
tok: str,
props: "Inputs",
opts: Optional[Union[InvokeOptions, InvokeOutputOptions]] = None,
typ: Optional[type] = None,
package_ref: Optional[Awaitable[Optional[str]]] = None,
) -> "Output[Any]":
"""
invoke_output dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
can be a bag of computed values (Ts or Awaitable[T]s), and the result is an Output[T] that
resolves when the invoke finishes.
"""
# Setup the futures for the output.
resolve_value: "asyncio.Future" = asyncio.Future()
resolve_is_known: "asyncio.Future[bool]" = asyncio.Future()
resolve_is_secret: "asyncio.Future[bool]" = asyncio.Future()
resolve_deps: "asyncio.Future[Set[Resource]]" = asyncio.Future()
from .. import Output # pylint: disable=import-outside-toplevel
out = Output(resolve_deps, resolve_value, resolve_is_known, resolve_is_secret)
async def do_invoke_output() -> None:
try:
invoke_result = await _invoke(
tok, props, opts, typ, package_ref=package_ref
)
resolve_value.set_result(
invoke_result.value if invoke_result.is_known else None
)
resolve_is_known.set_result(invoke_result.is_known)
resolve_is_secret.set_result(
invoke_result.is_secret and invoke_result.is_known
)
resolve_deps.set_result(set(invoke_result.dependencies))
except Exception as exn:
resolve_value.set_exception(exn)
resolve_is_known.set_exception(exn)
resolve_is_secret.set_exception(exn)
resolve_deps.set_exception(exn)
asyncio.ensure_future(_get_rpc_manager().do_rpc("invoke", do_invoke_output)())
return out
async def invoke_async(
tok: str,
props: "Inputs",
opts: Optional[InvokeOptions] = None,
typ: Optional[type] = None,
package_ref: Optional[Awaitable[Optional[str]]] = None,
) -> Any:
"""
invoke_async dynamically asynchronously invokes the function, tok, which is offered by a provider plugin.
the inputs can be a bag of computed values (Ts or Awaitable[T]s), and the result is a Awaitable[Any] that
resolves when the invoke finishes.
"""
invokeResult = await _invoke(tok, props, opts, typ, package_ref=package_ref)
return invokeResult.value
def _invoke(
tok: str,
props: "Inputs",
opts: Optional[Union[InvokeOptions, InvokeOutputOptions]],
typ: Optional[type],
package_ref: Optional[Awaitable[Optional[str]]],
) -> Awaitable[InvokeResult]:
log.debug(f"Invoking function: tok={tok}")
if opts is None:
opts = InvokeOptions()
if typ and not _types.is_output_type(typ):
raise TypeError("Expected typ to be decorated with @output_type")
async def do_invoke() -> Tuple[InvokeResult, Optional[Exception]]:
# If a parent was provided, but no provider was provided, use the parent's provider if one was specified.
if opts is not None and opts.parent is not None and opts.provider is None:
opts.provider = opts.parent.get_provider(tok)
# Construct a provider reference from the given provider, if one was provided to us.
provider_ref = None
if opts is not None and opts.provider is not None:
provider_urn = await opts.provider.urn.future()
provider_id = (await opts.provider.id.future()) or rpc.UNKNOWN
provider_ref = f"{provider_urn}::{provider_id}"
log.debug(f"Invoke using provider {provider_ref}")
# If we have a package reference, we need to wait for it to resolve.
package_ref_str = None
if package_ref is not None:
package_ref_str = await package_ref
# If we have a package reference we can clear some of the invoke
# options.
if package_ref_str is not None and opts is not None:
opts.plugin_download_url = None
opts.version = None
log.debug(f"Invoke using package reference {package_ref_str}")
monitor = get_monitor()
# keep track of the dependencies of the inputs
property_dependencies: Dict[str, List["Resource"]] = {}
inputs = await rpc.serialize_properties(props, property_dependencies)
if rpc.struct_contains_unknowns(inputs):
return (InvokeResult(None, is_secret=False, is_known=False), None)
# keep track of secretness of inputs
# if any of the inputs are secret OR the invoke response contains secrets
# then we mark the invoke result as secret
plain_inputs, inputs_contain_secrets = rpc._unwrap_rpc_secret_struct_properties(
inputs
)
# keep track of the dependencies from depends_on
depends_on_dependencies: Set["Resource"] = set()
if isinstance(opts, InvokeOutputOptions):
depends_on_dependencies = await _resolve_depends_on(opts._depends_on_list())
version = opts.version or "" if opts is not None else ""
plugin_download_url = opts.plugin_download_url or "" if opts is not None else ""
accept_resources = not (
os.getenv("PULUMI_DISABLE_RESOURCE_REFERENCES", "").upper() in {"TRUE", "1"}
)
log.debug(f"Invoking function prepared: tok={tok}")
req = resource_pb2.ResourceInvokeRequest(
tok=tok,
args=plain_inputs,
provider=provider_ref or "",
version=version,
acceptResources=accept_resources,
pluginDownloadURL=plugin_download_url,
packageRef=package_ref_str or "",
)
def do_invoke():
try:
return monitor.Invoke(req), None
except grpc.RpcError as exn:
return None, grpc_error_to_exception(exn)
resp, error = await asyncio.get_event_loop().run_in_executor(None, do_invoke)
log.debug(f"Invoking function completed: tok={tok}, error={error}")
# If the invoke failed, raise an error.
if error is not None:
return (
InvokeResult(None, is_secret=False),
Exception(f"invoke of {tok} failed: {error}"),
)
if resp.failures:
return (
InvokeResult(None, is_secret=False),
Exception(
f"invoke of {tok} failed: {resp.failures[0].reason} ({resp.failures[0].property})"
),
)
# Otherwise, return the output properties.
ret_obj = getattr(resp, "return")
# To avoid breaking older versions of the Kubernetes Python SDK, return None instead
# of an empty dict for empty results if this invoke is for a version of the Pulumi
# Kubernetes SDK that can't handle empty dicts.
if _requires_legacy_none_return_for_empty_struct(ret_obj, tok, version):
log.debug(f"Returning None for empty result for invoke of {tok}")
return InvokeResult(value=None, is_secret=False), None
deserialized, is_secret = rpc.deserialize_properties_unwrap_secrets(ret_obj)
# If typ is not None, call translate_output_properties to instantiate any output types.
result = deserialized
if typ:
result = rpc.translate_output_properties(
deserialized, lambda prop: prop, typ
)
invoke_output_secret = is_secret or inputs_contain_secrets
dependencies: Set["Resource"] = depends_on_dependencies
for _, property_deps in property_dependencies.items():
for dep in property_deps:
dependencies.add(dep)
return (
InvokeResult(
value=result,
is_secret=invoke_output_secret,
dependencies=list(dependencies),
),
None,
)
async def do_rpc():
# Await any dependencies before invoking our RPC.
await _resolve_depends_on_urns(
opts._depends_on_list() if isinstance(opts, InvokeOutputOptions) else []
)
resp, exn = await _get_rpc_manager().do_rpc("invoke", do_invoke)()
# If there was an RPC level exception, we will raise it. Note that this will also crash the
# process because it will have been considered "unhandled". For semantic level errors, such
# as errors from the data source itself, we return that as part of the returned tuple instead.
if exn is not None:
raise exn
invokeResult, error = resp
if error is not None:
raise error
return invokeResult
async def wait_for_fut():
return await asyncio.ensure_future(do_rpc())
return wait_for_fut()
def call(
tok: str,
props: "Inputs",
res: Optional["Resource"] = None,
typ: Optional[type] = None,
package_ref: Optional[Awaitable[Optional[str]]] = None,
) -> "Output[Any]":
"""
call dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
can be a bag of computed values (Ts or Awaitable[T]s).
"""
log.debug(f"Calling function: tok={tok}")
if typ and not _types.is_output_type(typ):
raise TypeError("Expected typ to be decorated with @output_type")
# Setup the futures for the output.
resolve_value: "asyncio.Future" = asyncio.Future()
resolve_is_known: "asyncio.Future[bool]" = asyncio.Future()
resolve_is_secret: "asyncio.Future[bool]" = asyncio.Future()
resolve_deps: "asyncio.Future[Set[Resource]]" = asyncio.Future()
from .. import Output # pylint: disable=import-outside-toplevel
out = Output(resolve_deps, resolve_value, resolve_is_known, resolve_is_secret)
async def do_call() -> None:
try:
# Construct a provider reference from the given provider, if one is available on the resource.
provider_ref, version, plugin_download_url = None, "", ""
if res is not None:
if res._provider is not None:
provider_urn = await res._provider.urn.future()
provider_id = (await res._provider.id.future()) or rpc.UNKNOWN
provider_ref = f"{provider_urn}::{provider_id}"
log.debug(f"Call using provider {provider_ref}")
version = res._version or ""
plugin_download_url = res._plugin_download_url or ""
monitor = get_monitor()
# Serialize out all props to their final values. In doing so, we'll also collect all the Resources pointed to
# by any Dependency objects we encounter, adding them to 'implicit_dependencies'.
property_dependencies_resources: Dict[str, List["Resource"]] = {}
# We keep output values when serializing inputs for call.
inputs = await rpc.serialize_properties(
props, property_dependencies_resources, keep_output_values=True
)
property_dependencies = {}
for key, property_deps in property_dependencies_resources.items():
urns = set()
for dep in property_deps:
urn = await dep.urn.future()
if urn is not None:
urns.add(urn)
property_dependencies[key] = (
resource_pb2.ResourceCallRequest.ArgumentDependencies(
urns=list(urns)
)
)
# If we have a package reference, we need to wait for it to resolve.
package_ref_str = None
if package_ref is not None:
package_ref_str = await package_ref
# If we have a package reference we can clear some of the invoke
# options.
if package_ref_str is not None:
plugin_download_url = ""
version = ""
log.debug(f"Call using package reference {package_ref_str}")
req = resource_pb2.ResourceCallRequest(
tok=tok,
args=inputs,
argDependencies=property_dependencies,
provider="" if provider_ref is None else provider_ref,
version=version,
pluginDownloadURL=plugin_download_url,
packageRef=package_ref_str or "",
)
def do_rpc_call():
try:
return monitor.Call(req)
except grpc.RpcError as exn:
handle_grpc_error(exn)
return None
resp = await asyncio.get_event_loop().run_in_executor(None, do_rpc_call)
if resp is None:
return
if resp.failures:
raise Exception(
f"call of {tok} failed: {resp.failures[0].reason} ({resp.failures[0].property})"
)
log.debug(f"Call successful: tok={tok}")
value = None
is_known = True
is_secret = False
deps: Set["Resource"] = set()
ret_obj = getattr(resp, "return")
if ret_obj:
deserialized = rpc.deserialize_properties(ret_obj)
is_known = not rpc.contains_unknowns(deserialized)
# Keep track of whether we need to mark the resulting output a secret,
# and unwrap each individual value.
for k, v in deserialized.items():
if rpc.is_rpc_secret(v):
is_secret = True
deserialized[k] = rpc.unwrap_rpc_secret(v)
# Combine the individual dependencies into a single set of dependency resources.
rpc_deps = resp.returnDependencies
deps_urns: Set[str] = (
{urn for v in rpc_deps.values() for urn in v.urns}
if rpc_deps
else set()
)
from ..resource import ( # pylint: disable=import-outside-toplevel
DependencyResource,
)
deps = set(map(DependencyResource, deps_urns))
if is_known:
# If typ is not None, call translate_output_properties to instantiate any output types.
value = (
rpc.translate_output_properties(deserialized, lambda p: p, typ)
if typ
else deserialized
)
resolve_value.set_result(value)
resolve_is_known.set_result(is_known)
resolve_is_secret.set_result(is_secret)
resolve_deps.set_result(deps)
except Exception as exn:
log.debug(
f"exception when preparing or executing rpc: {traceback.format_exc()}"
)
resolve_value.set_exception(exn)
resolve_is_known.set_exception(exn)
resolve_is_secret.set_exception(exn)
resolve_deps.set_result(set())
raise
asyncio.ensure_future(_get_rpc_manager().do_rpc("call", do_call)())
return out