138 lines
5.3 KiB
Python
138 lines
5.3 KiB
Python
# Copyright 2016-2021, Pulumi Corporation.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import asyncio
|
|
import logging
|
|
import sys
|
|
import traceback
|
|
import contextvars
|
|
from contextlib import suppress
|
|
|
|
import grpc
|
|
|
|
from .. import log
|
|
from ..errors import RunError
|
|
from ..runtime import reset_options, run_in_stack, set_all_config
|
|
from ..runtime.proto import LanguageRuntimeServicer, language_pb2, plugin_pb2
|
|
from ._workspace import PulumiFn
|
|
|
|
|
|
class LanguageServer(LanguageRuntimeServicer):
|
|
program: PulumiFn
|
|
|
|
def __init__(self, program: PulumiFn) -> None:
|
|
self.program = program
|
|
|
|
def GetRequiredPlugins(self, request, context):
|
|
return language_pb2.GetRequiredPluginsResponse()
|
|
|
|
def _exception_handler(self, loop, context):
|
|
# Exception are normally handler deeper in the stack. If this class of
|
|
# exception bubble up to here, something is wrong and we should stop
|
|
# the event loop
|
|
if "exception" in context and isinstance(context["exception"], grpc.RpcError):
|
|
loop.stop()
|
|
else:
|
|
loop.default_exception_handler(context)
|
|
|
|
def Run(self, request, context):
|
|
_suppress_unobserved_task_logging()
|
|
|
|
def run():
|
|
# Configure the runtime so that the user program hooks up to Pulumi as appropriate.
|
|
engine_address = request.args[0] if request.args else ""
|
|
organization = (
|
|
request.organization if request.organization else "organization"
|
|
)
|
|
reset_options(
|
|
project=request.project,
|
|
monitor_address=request.monitor_address,
|
|
engine_address=engine_address,
|
|
stack=request.stack,
|
|
parallel=request.parallel,
|
|
preview=request.dryRun,
|
|
organization=organization,
|
|
)
|
|
|
|
if request.config:
|
|
secret_keys = (
|
|
request.configSecretKeys if request.configSecretKeys else None
|
|
)
|
|
set_all_config(request.config, secret_keys)
|
|
|
|
# The strategy here is derived from sdk/python/cmd/pulumi-language-python-exec
|
|
result = language_pb2.RunResponse()
|
|
loop = asyncio.new_event_loop()
|
|
|
|
loop.set_exception_handler(self._exception_handler)
|
|
try:
|
|
loop.run_until_complete(run_in_stack(self.program))
|
|
except RunError as exn:
|
|
msg = str(exn)
|
|
log.error(msg)
|
|
result.error = str(msg)
|
|
return result
|
|
except grpc.RpcError as exn:
|
|
# If the monitor is unavailable, it is in the process of shutting down or has already
|
|
# shut down. Don't emit an error if this is the case.
|
|
# pylint: disable=no-member
|
|
if exn.code() == grpc.StatusCode.UNAVAILABLE:
|
|
log.debug("Resource monitor has terminated, shutting down.")
|
|
else:
|
|
msg = f"RPC error: {exn.details()}"
|
|
log.error(msg)
|
|
result.error = msg
|
|
return result
|
|
except Exception as exn:
|
|
msg = str(
|
|
f"python inline source runtime error: {exn}\n{traceback.format_exc()}"
|
|
)
|
|
log.error(msg)
|
|
result.error = msg
|
|
return result
|
|
finally:
|
|
# If there's an exception during `run_in_stack`, it may result in pending asyncio tasks remaining unresolved
|
|
# at the time the loop is closed, which results in a `Task was destroyed but it is pending!` error being
|
|
# logged to stdout. To avoid this, we collect all the unresolved tasks in the loop and cancel them before
|
|
# closing the loop.
|
|
pending = (
|
|
# lint safety: we use the python version here to track deprecations
|
|
asyncio.all_tasks(loop)
|
|
) # pylint: disable=no-member
|
|
log.debug(f"Cancelling {len(pending)} tasks.")
|
|
for task in pending:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
loop.run_until_complete(task)
|
|
loop.close()
|
|
sys.stdout.flush()
|
|
sys.stderr.flush()
|
|
|
|
return result
|
|
|
|
ctx = contextvars.copy_context()
|
|
return ctx.run(run)
|
|
|
|
def GetPluginInfo(self, request, context):
|
|
return plugin_pb2.PluginInfo()
|
|
|
|
|
|
def _suppress_unobserved_task_logging():
|
|
"""Suppresses logs about faulted unobserved tasks. This is similar to
|
|
Python Pulumi user programs. See rationale in
|
|
`sdk/python/cmd/pulumi-language-python-exec`.
|
|
|
|
"""
|
|
logging.getLogger("asyncio").setLevel(logging.CRITICAL)
|