501 lines
19 KiB
Python
501 lines
19 KiB
Python
# Copyright 2016-2018, Pulumi Corporation.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import asyncio
|
|
import os
|
|
import traceback
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Any,
|
|
Awaitable,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
Set,
|
|
Tuple,
|
|
Union,
|
|
)
|
|
|
|
import grpc
|
|
from google.protobuf import struct_pb2
|
|
|
|
from semver import VersionInfo
|
|
|
|
from .. import _types, log
|
|
from ..invoke import InvokeOptions, InvokeOutputOptions
|
|
from ..runtime.proto import resource_pb2
|
|
from . import rpc
|
|
from ._depends_on import _resolve_depends_on_urns, _resolve_depends_on
|
|
from .settings import (
|
|
_get_rpc_manager,
|
|
get_monitor,
|
|
grpc_error_to_exception,
|
|
handle_grpc_error,
|
|
)
|
|
from .sync_await import _sync_await
|
|
|
|
if TYPE_CHECKING:
|
|
from .. import Inputs, Output, Resource
|
|
|
|
|
|
def _requires_legacy_none_return_for_empty_struct(
|
|
ret_obj: struct_pb2.Struct, tok: str, version: str
|
|
) -> bool:
|
|
"""
|
|
To avoid breaking older versions of the Kubernetes Python SDK, we maintain the legacy behavior of
|
|
returning None instead of an empty dict for empty results if the invoke is for a version of the
|
|
Pulumi Kubernetes SDK that can't handle empty dicts.
|
|
See https://github.com/pulumi/pulumi/issues/14508.
|
|
"""
|
|
|
|
# If re_obj is truthy or the token is not one of the known Kubernetes tokens, we don't need the legacy
|
|
# behavior.
|
|
if ret_obj or tok not in {
|
|
"kubernetes:yaml:decode",
|
|
"kubernetes:helm:template",
|
|
"kubernetes:kustomize:directory",
|
|
}:
|
|
return False
|
|
|
|
# If we have a version and it's less than or equal to pulumi-kubernetes 4.5.4, we need the legacy behavior;
|
|
# otherwise, we don't because later versions can handle the new behavior correctly.
|
|
if version:
|
|
k8s_ver = VersionInfo(4, 5, 4)
|
|
try:
|
|
ver = VersionInfo.parse(version)
|
|
except Exception as ex:
|
|
log.debug(f"Failed to parse version {version} as semver: {ex}")
|
|
ver = k8s_ver
|
|
return ver <= k8s_ver
|
|
|
|
# We don't have a version, default to the legacy behavior.
|
|
return True
|
|
|
|
|
|
class InvokeResult:
|
|
"""
|
|
InvokeResult is a helper type that wraps a prompt value in an Awaitable.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
value: Any,
|
|
is_secret: bool = False,
|
|
is_known: bool = True,
|
|
dependencies: Optional[List["Resource"]] = None,
|
|
):
|
|
self.value = value
|
|
self.is_secret = is_secret
|
|
self.is_known = is_known
|
|
self.dependencies = dependencies or []
|
|
|
|
# pylint: disable=using-constant-test
|
|
def __await__(self):
|
|
# We need __await__ to be an iterator, but we only want it to return one value. As such, we use
|
|
# `if False: yield` to construct this.
|
|
if False:
|
|
yield self.value
|
|
return self.value
|
|
|
|
__iter__ = __await__
|
|
|
|
|
|
def invoke(
|
|
tok: str,
|
|
props: "Inputs",
|
|
opts: Optional[InvokeOptions] = None,
|
|
typ: Optional[type] = None,
|
|
package_ref: Optional[Awaitable[Optional[str]]] = None,
|
|
) -> InvokeResult:
|
|
"""
|
|
invoke dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
|
|
can be a bag of computed values (Ts or Awaitable[T]s), and the result is a Awaitable[Any] that
|
|
resolves when the invoke finishes.
|
|
"""
|
|
# Run the RPC callback asynchronously and then immediately await it.
|
|
awaitableInvokeResult = _invoke(tok, props, opts, typ, package_ref=package_ref)
|
|
return _sync_await(awaitableInvokeResult)
|
|
|
|
|
|
def invoke_output(
|
|
tok: str,
|
|
props: "Inputs",
|
|
opts: Optional[Union[InvokeOptions, InvokeOutputOptions]] = None,
|
|
typ: Optional[type] = None,
|
|
package_ref: Optional[Awaitable[Optional[str]]] = None,
|
|
) -> "Output[Any]":
|
|
"""
|
|
invoke_output dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
|
|
can be a bag of computed values (Ts or Awaitable[T]s), and the result is an Output[T] that
|
|
resolves when the invoke finishes.
|
|
"""
|
|
|
|
# Setup the futures for the output.
|
|
resolve_value: "asyncio.Future" = asyncio.Future()
|
|
resolve_is_known: "asyncio.Future[bool]" = asyncio.Future()
|
|
resolve_is_secret: "asyncio.Future[bool]" = asyncio.Future()
|
|
resolve_deps: "asyncio.Future[Set[Resource]]" = asyncio.Future()
|
|
|
|
from .. import Output # pylint: disable=import-outside-toplevel
|
|
|
|
out = Output(resolve_deps, resolve_value, resolve_is_known, resolve_is_secret)
|
|
|
|
async def do_invoke_output() -> None:
|
|
try:
|
|
invoke_result = await _invoke(
|
|
tok, props, opts, typ, package_ref=package_ref
|
|
)
|
|
|
|
resolve_value.set_result(
|
|
invoke_result.value if invoke_result.is_known else None
|
|
)
|
|
resolve_is_known.set_result(invoke_result.is_known)
|
|
resolve_is_secret.set_result(
|
|
invoke_result.is_secret and invoke_result.is_known
|
|
)
|
|
resolve_deps.set_result(set(invoke_result.dependencies))
|
|
except Exception as exn:
|
|
resolve_value.set_exception(exn)
|
|
resolve_is_known.set_exception(exn)
|
|
resolve_is_secret.set_exception(exn)
|
|
resolve_deps.set_exception(exn)
|
|
|
|
asyncio.ensure_future(_get_rpc_manager().do_rpc("invoke", do_invoke_output)())
|
|
return out
|
|
|
|
|
|
async def invoke_async(
|
|
tok: str,
|
|
props: "Inputs",
|
|
opts: Optional[InvokeOptions] = None,
|
|
typ: Optional[type] = None,
|
|
package_ref: Optional[Awaitable[Optional[str]]] = None,
|
|
) -> Any:
|
|
"""
|
|
invoke_async dynamically asynchronously invokes the function, tok, which is offered by a provider plugin.
|
|
the inputs can be a bag of computed values (Ts or Awaitable[T]s), and the result is a Awaitable[Any] that
|
|
resolves when the invoke finishes.
|
|
"""
|
|
invokeResult = await _invoke(tok, props, opts, typ, package_ref=package_ref)
|
|
return invokeResult.value
|
|
|
|
|
|
def _invoke(
|
|
tok: str,
|
|
props: "Inputs",
|
|
opts: Optional[Union[InvokeOptions, InvokeOutputOptions]],
|
|
typ: Optional[type],
|
|
package_ref: Optional[Awaitable[Optional[str]]],
|
|
) -> Awaitable[InvokeResult]:
|
|
log.debug(f"Invoking function: tok={tok}")
|
|
if opts is None:
|
|
opts = InvokeOptions()
|
|
|
|
if typ and not _types.is_output_type(typ):
|
|
raise TypeError("Expected typ to be decorated with @output_type")
|
|
|
|
async def do_invoke() -> Tuple[InvokeResult, Optional[Exception]]:
|
|
# If a parent was provided, but no provider was provided, use the parent's provider if one was specified.
|
|
if opts is not None and opts.parent is not None and opts.provider is None:
|
|
opts.provider = opts.parent.get_provider(tok)
|
|
|
|
# Construct a provider reference from the given provider, if one was provided to us.
|
|
provider_ref = None
|
|
if opts is not None and opts.provider is not None:
|
|
provider_urn = await opts.provider.urn.future()
|
|
provider_id = (await opts.provider.id.future()) or rpc.UNKNOWN
|
|
provider_ref = f"{provider_urn}::{provider_id}"
|
|
log.debug(f"Invoke using provider {provider_ref}")
|
|
|
|
# If we have a package reference, we need to wait for it to resolve.
|
|
package_ref_str = None
|
|
if package_ref is not None:
|
|
package_ref_str = await package_ref
|
|
# If we have a package reference we can clear some of the invoke
|
|
# options.
|
|
if package_ref_str is not None and opts is not None:
|
|
opts.plugin_download_url = None
|
|
opts.version = None
|
|
log.debug(f"Invoke using package reference {package_ref_str}")
|
|
|
|
monitor = get_monitor()
|
|
# keep track of the dependencies of the inputs
|
|
property_dependencies: Dict[str, List["Resource"]] = {}
|
|
inputs = await rpc.serialize_properties(props, property_dependencies)
|
|
if rpc.struct_contains_unknowns(inputs):
|
|
return (InvokeResult(None, is_secret=False, is_known=False), None)
|
|
|
|
# keep track of secretness of inputs
|
|
# if any of the inputs are secret OR the invoke response contains secrets
|
|
# then we mark the invoke result as secret
|
|
plain_inputs, inputs_contain_secrets = rpc._unwrap_rpc_secret_struct_properties(
|
|
inputs
|
|
)
|
|
|
|
# keep track of the dependencies from depends_on
|
|
depends_on_dependencies: Set["Resource"] = set()
|
|
if isinstance(opts, InvokeOutputOptions):
|
|
depends_on_dependencies = await _resolve_depends_on(opts._depends_on_list())
|
|
|
|
version = opts.version or "" if opts is not None else ""
|
|
plugin_download_url = opts.plugin_download_url or "" if opts is not None else ""
|
|
accept_resources = not (
|
|
os.getenv("PULUMI_DISABLE_RESOURCE_REFERENCES", "").upper() in {"TRUE", "1"}
|
|
)
|
|
log.debug(f"Invoking function prepared: tok={tok}")
|
|
req = resource_pb2.ResourceInvokeRequest(
|
|
tok=tok,
|
|
args=plain_inputs,
|
|
provider=provider_ref or "",
|
|
version=version,
|
|
acceptResources=accept_resources,
|
|
pluginDownloadURL=plugin_download_url,
|
|
packageRef=package_ref_str or "",
|
|
)
|
|
|
|
def do_invoke():
|
|
try:
|
|
return monitor.Invoke(req), None
|
|
except grpc.RpcError as exn:
|
|
return None, grpc_error_to_exception(exn)
|
|
|
|
resp, error = await asyncio.get_event_loop().run_in_executor(None, do_invoke)
|
|
log.debug(f"Invoking function completed: tok={tok}, error={error}")
|
|
|
|
# If the invoke failed, raise an error.
|
|
if error is not None:
|
|
return (
|
|
InvokeResult(None, is_secret=False),
|
|
Exception(f"invoke of {tok} failed: {error}"),
|
|
)
|
|
|
|
if resp.failures:
|
|
return (
|
|
InvokeResult(None, is_secret=False),
|
|
Exception(
|
|
f"invoke of {tok} failed: {resp.failures[0].reason} ({resp.failures[0].property})"
|
|
),
|
|
)
|
|
|
|
# Otherwise, return the output properties.
|
|
ret_obj = getattr(resp, "return")
|
|
|
|
# To avoid breaking older versions of the Kubernetes Python SDK, return None instead
|
|
# of an empty dict for empty results if this invoke is for a version of the Pulumi
|
|
# Kubernetes SDK that can't handle empty dicts.
|
|
if _requires_legacy_none_return_for_empty_struct(ret_obj, tok, version):
|
|
log.debug(f"Returning None for empty result for invoke of {tok}")
|
|
return InvokeResult(value=None, is_secret=False), None
|
|
|
|
deserialized, is_secret = rpc.deserialize_properties_unwrap_secrets(ret_obj)
|
|
# If typ is not None, call translate_output_properties to instantiate any output types.
|
|
result = deserialized
|
|
if typ:
|
|
result = rpc.translate_output_properties(
|
|
deserialized, lambda prop: prop, typ
|
|
)
|
|
|
|
invoke_output_secret = is_secret or inputs_contain_secrets
|
|
dependencies: Set["Resource"] = depends_on_dependencies
|
|
for _, property_deps in property_dependencies.items():
|
|
for dep in property_deps:
|
|
dependencies.add(dep)
|
|
return (
|
|
InvokeResult(
|
|
value=result,
|
|
is_secret=invoke_output_secret,
|
|
dependencies=list(dependencies),
|
|
),
|
|
None,
|
|
)
|
|
|
|
async def do_rpc():
|
|
# Await any dependencies before invoking our RPC.
|
|
await _resolve_depends_on_urns(
|
|
opts._depends_on_list() if isinstance(opts, InvokeOutputOptions) else []
|
|
)
|
|
|
|
resp, exn = await _get_rpc_manager().do_rpc("invoke", do_invoke)()
|
|
# If there was an RPC level exception, we will raise it. Note that this will also crash the
|
|
# process because it will have been considered "unhandled". For semantic level errors, such
|
|
# as errors from the data source itself, we return that as part of the returned tuple instead.
|
|
if exn is not None:
|
|
raise exn
|
|
invokeResult, error = resp
|
|
if error is not None:
|
|
raise error
|
|
return invokeResult
|
|
|
|
async def wait_for_fut():
|
|
return await asyncio.ensure_future(do_rpc())
|
|
|
|
return wait_for_fut()
|
|
|
|
|
|
def call(
|
|
tok: str,
|
|
props: "Inputs",
|
|
res: Optional["Resource"] = None,
|
|
typ: Optional[type] = None,
|
|
package_ref: Optional[Awaitable[Optional[str]]] = None,
|
|
) -> "Output[Any]":
|
|
"""
|
|
call dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
|
|
can be a bag of computed values (Ts or Awaitable[T]s).
|
|
"""
|
|
log.debug(f"Calling function: tok={tok}")
|
|
|
|
if typ and not _types.is_output_type(typ):
|
|
raise TypeError("Expected typ to be decorated with @output_type")
|
|
|
|
# Setup the futures for the output.
|
|
resolve_value: "asyncio.Future" = asyncio.Future()
|
|
resolve_is_known: "asyncio.Future[bool]" = asyncio.Future()
|
|
resolve_is_secret: "asyncio.Future[bool]" = asyncio.Future()
|
|
resolve_deps: "asyncio.Future[Set[Resource]]" = asyncio.Future()
|
|
|
|
from .. import Output # pylint: disable=import-outside-toplevel
|
|
|
|
out = Output(resolve_deps, resolve_value, resolve_is_known, resolve_is_secret)
|
|
|
|
async def do_call() -> None:
|
|
try:
|
|
# Construct a provider reference from the given provider, if one is available on the resource.
|
|
provider_ref, version, plugin_download_url = None, "", ""
|
|
if res is not None:
|
|
if res._provider is not None:
|
|
provider_urn = await res._provider.urn.future()
|
|
provider_id = (await res._provider.id.future()) or rpc.UNKNOWN
|
|
provider_ref = f"{provider_urn}::{provider_id}"
|
|
log.debug(f"Call using provider {provider_ref}")
|
|
version = res._version or ""
|
|
plugin_download_url = res._plugin_download_url or ""
|
|
|
|
monitor = get_monitor()
|
|
|
|
# Serialize out all props to their final values. In doing so, we'll also collect all the Resources pointed to
|
|
# by any Dependency objects we encounter, adding them to 'implicit_dependencies'.
|
|
property_dependencies_resources: Dict[str, List["Resource"]] = {}
|
|
# We keep output values when serializing inputs for call.
|
|
inputs = await rpc.serialize_properties(
|
|
props, property_dependencies_resources, keep_output_values=True
|
|
)
|
|
|
|
property_dependencies = {}
|
|
for key, property_deps in property_dependencies_resources.items():
|
|
urns = set()
|
|
for dep in property_deps:
|
|
urn = await dep.urn.future()
|
|
if urn is not None:
|
|
urns.add(urn)
|
|
property_dependencies[key] = (
|
|
resource_pb2.ResourceCallRequest.ArgumentDependencies(
|
|
urns=list(urns)
|
|
)
|
|
)
|
|
|
|
# If we have a package reference, we need to wait for it to resolve.
|
|
package_ref_str = None
|
|
if package_ref is not None:
|
|
package_ref_str = await package_ref
|
|
# If we have a package reference we can clear some of the invoke
|
|
# options.
|
|
if package_ref_str is not None:
|
|
plugin_download_url = ""
|
|
version = ""
|
|
log.debug(f"Call using package reference {package_ref_str}")
|
|
|
|
req = resource_pb2.ResourceCallRequest(
|
|
tok=tok,
|
|
args=inputs,
|
|
argDependencies=property_dependencies,
|
|
provider="" if provider_ref is None else provider_ref,
|
|
version=version,
|
|
pluginDownloadURL=plugin_download_url,
|
|
packageRef=package_ref_str or "",
|
|
)
|
|
|
|
def do_rpc_call():
|
|
try:
|
|
return monitor.Call(req)
|
|
except grpc.RpcError as exn:
|
|
handle_grpc_error(exn)
|
|
return None
|
|
|
|
resp = await asyncio.get_event_loop().run_in_executor(None, do_rpc_call)
|
|
if resp is None:
|
|
return
|
|
|
|
if resp.failures:
|
|
raise Exception(
|
|
f"call of {tok} failed: {resp.failures[0].reason} ({resp.failures[0].property})"
|
|
)
|
|
|
|
log.debug(f"Call successful: tok={tok}")
|
|
|
|
value = None
|
|
is_known = True
|
|
is_secret = False
|
|
deps: Set["Resource"] = set()
|
|
ret_obj = getattr(resp, "return")
|
|
if ret_obj:
|
|
deserialized = rpc.deserialize_properties(ret_obj)
|
|
is_known = not rpc.contains_unknowns(deserialized)
|
|
|
|
# Keep track of whether we need to mark the resulting output a secret,
|
|
# and unwrap each individual value.
|
|
for k, v in deserialized.items():
|
|
if rpc.is_rpc_secret(v):
|
|
is_secret = True
|
|
deserialized[k] = rpc.unwrap_rpc_secret(v)
|
|
|
|
# Combine the individual dependencies into a single set of dependency resources.
|
|
rpc_deps = resp.returnDependencies
|
|
deps_urns: Set[str] = (
|
|
{urn for v in rpc_deps.values() for urn in v.urns}
|
|
if rpc_deps
|
|
else set()
|
|
)
|
|
from ..resource import ( # pylint: disable=import-outside-toplevel
|
|
DependencyResource,
|
|
)
|
|
|
|
deps = set(map(DependencyResource, deps_urns))
|
|
|
|
if is_known:
|
|
# If typ is not None, call translate_output_properties to instantiate any output types.
|
|
value = (
|
|
rpc.translate_output_properties(deserialized, lambda p: p, typ)
|
|
if typ
|
|
else deserialized
|
|
)
|
|
|
|
resolve_value.set_result(value)
|
|
resolve_is_known.set_result(is_known)
|
|
resolve_is_secret.set_result(is_secret)
|
|
resolve_deps.set_result(deps)
|
|
|
|
except Exception as exn:
|
|
log.debug(
|
|
f"exception when preparing or executing rpc: {traceback.format_exc()}"
|
|
)
|
|
resolve_value.set_exception(exn)
|
|
resolve_is_known.set_exception(exn)
|
|
resolve_is_secret.set_exception(exn)
|
|
resolve_deps.set_result(set())
|
|
raise
|
|
|
|
asyncio.ensure_future(_get_rpc_manager().do_rpc("call", do_call)())
|
|
|
|
return out
|