projetAnsible/myenv/lib/python3.12/site-packages/pulumi/runtime/sync_await.py
2024-12-09 06:16:28 +01:00

78 lines
3.1 KiB
Python

# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from typing import Awaitable, TypeVar
T = TypeVar("T")
def _sync_await(awaitable: Awaitable[T]) -> T:
"""
_sync_await waits for the given future to complete by effectively yielding the current task and pumping the event
loop.
"""
# Fetch the current event loop and ensure a future.
loop = _ensure_event_loop()
fut = asyncio.ensure_future(awaitable)
# If the loop is not running, we can just use run_until_complete. Without this, we would need to duplicate a fair
# amount of bookkeeping logic around loop startup and shutdown.
if not loop.is_running():
return loop.run_until_complete(fut)
# If we are executing inside a task, pretend we've returned from its current callback--effectively yielding to
# the event loop--by calling _leave_task.
task = asyncio.current_task(loop)
if task is not None:
asyncio.tasks._leave_task(loop, task)
# Pump the event loop until the future is complete. This is the kernel of BaseEventLoop.run_forever, and may not
# work with alternative event loop implementations.
#
# In order to make this reentrant with respect to _run_once, we keep track of the number of event handles on the
# ready list and ensure that there are exactly that many handles on the list once we are finished.
#
# See https://github.com/python/cpython/blob/3.6/Lib/asyncio/base_events.py#L1428-L1452 for the details of the
# _run_once kernel with which we need to cooperate.
ntodo = len(loop._ready) # type: ignore
while not fut.done() and not fut.cancelled():
loop._run_once() # type: ignore
if loop._stopping: # type: ignore
break
# If we drained the ready list past what a calling _run_once would have expected, fix things up by pushing
# cancelled handles onto the list.
while len(loop._ready) < ntodo: # type: ignore
handle = asyncio.Handle(lambda: None, [], loop)
handle._cancelled = True
loop._ready.append(handle) # type: ignore
# If we were executing inside a task, restore its context and continue on.
if task is not None:
asyncio.tasks._enter_task(loop, task)
# Return the result of the future.
return fut.result()
def _ensure_event_loop():
"""Ensures an asyncio event loop exists for the current thread."""
loop = None
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop