projetAnsible/myenv/lib/python3.12/site-packages/pulumi/automation/_server.py
2024-12-09 06:16:28 +01:00

138 lines
5.3 KiB
Python

# Copyright 2016-2021, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import sys
import traceback
import contextvars
from contextlib import suppress
import grpc
from .. import log
from ..errors import RunError
from ..runtime import reset_options, run_in_stack, set_all_config
from ..runtime.proto import LanguageRuntimeServicer, language_pb2, plugin_pb2
from ._workspace import PulumiFn
class LanguageServer(LanguageRuntimeServicer):
program: PulumiFn
def __init__(self, program: PulumiFn) -> None:
self.program = program
def GetRequiredPlugins(self, request, context):
return language_pb2.GetRequiredPluginsResponse()
def _exception_handler(self, loop, context):
# Exception are normally handler deeper in the stack. If this class of
# exception bubble up to here, something is wrong and we should stop
# the event loop
if "exception" in context and isinstance(context["exception"], grpc.RpcError):
loop.stop()
else:
loop.default_exception_handler(context)
def Run(self, request, context):
_suppress_unobserved_task_logging()
def run():
# Configure the runtime so that the user program hooks up to Pulumi as appropriate.
engine_address = request.args[0] if request.args else ""
organization = (
request.organization if request.organization else "organization"
)
reset_options(
project=request.project,
monitor_address=request.monitor_address,
engine_address=engine_address,
stack=request.stack,
parallel=request.parallel,
preview=request.dryRun,
organization=organization,
)
if request.config:
secret_keys = (
request.configSecretKeys if request.configSecretKeys else None
)
set_all_config(request.config, secret_keys)
# The strategy here is derived from sdk/python/cmd/pulumi-language-python-exec
result = language_pb2.RunResponse()
loop = asyncio.new_event_loop()
loop.set_exception_handler(self._exception_handler)
try:
loop.run_until_complete(run_in_stack(self.program))
except RunError as exn:
msg = str(exn)
log.error(msg)
result.error = str(msg)
return result
except grpc.RpcError as exn:
# If the monitor is unavailable, it is in the process of shutting down or has already
# shut down. Don't emit an error if this is the case.
# pylint: disable=no-member
if exn.code() == grpc.StatusCode.UNAVAILABLE:
log.debug("Resource monitor has terminated, shutting down.")
else:
msg = f"RPC error: {exn.details()}"
log.error(msg)
result.error = msg
return result
except Exception as exn:
msg = str(
f"python inline source runtime error: {exn}\n{traceback.format_exc()}"
)
log.error(msg)
result.error = msg
return result
finally:
# If there's an exception during `run_in_stack`, it may result in pending asyncio tasks remaining unresolved
# at the time the loop is closed, which results in a `Task was destroyed but it is pending!` error being
# logged to stdout. To avoid this, we collect all the unresolved tasks in the loop and cancel them before
# closing the loop.
pending = (
# lint safety: we use the python version here to track deprecations
asyncio.all_tasks(loop)
) # pylint: disable=no-member
log.debug(f"Cancelling {len(pending)} tasks.")
for task in pending:
task.cancel()
with suppress(asyncio.CancelledError):
loop.run_until_complete(task)
loop.close()
sys.stdout.flush()
sys.stderr.flush()
return result
ctx = contextvars.copy_context()
return ctx.run(run)
def GetPluginInfo(self, request, context):
return plugin_pb2.PluginInfo()
def _suppress_unobserved_task_logging():
"""Suppresses logs about faulted unobserved tasks. This is similar to
Python Pulumi user programs. See rationale in
`sdk/python/cmd/pulumi-language-python-exec`.
"""
logging.getLogger("asyncio").setLevel(logging.CRITICAL)