PATH: //proc/thread-self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/aiohttp
FILE_BARU
CREATE
FOLDER_BARU
MKDIR
UPLOAD_FILE
GO
[ .. KEMBALI ]
📁 .hash/
X
📄 __init__.py
↓
X
📁 __pycache__/
X
📄 _cparser.pxd
↓
X
📄 _find_header.pxd
↓
X
📄 _headers.pxi
↓
X
📄 _helpers.cpython-311-x86_64-linux-gnu.so
↓
X
📄 _helpers.pyi
↓
X
📄 _helpers.pyx
↓
X
📄 _http_parser.cpython-311-x86_64-linux-gnu.so
↓
X
📄 _http_parser.pyx
↓
X
📄 _http_writer.cpython-311-x86_64-linux-gnu.so
↓
X
📄 _http_writer.pyx
↓
X
📄 _websocket.cpython-311-x86_64-linux-gnu.so
↓
X
📄 _websocket.pyx
↓
X
📄 abc.py
↓
X
📄 base_protocol.py
↓
X
📄 client.py
↓
X
📄 client_exceptions.py
↓
X
📄 client_proto.py
↓
X
📄 client_reqrep.py
↓
X
📄 client_ws.py
↓
X
📄 compression_utils.py
↓
X
📄 connector.py
↓
X
📄 cookiejar.py
↓
X
📄 formdata.py
↓
X
📄 hdrs.py
↓
X
📄 helpers.py
↓
X
📄 http.py
↓
X
📄 http_exceptions.py
↓
X
📄 http_parser.py
↓
X
📄 http_websocket.py
↓
X
📄 http_writer.py
↓
X
📄 locks.py
↓
X
📄 log.py
↓
X
📄 multipart.py
↓
X
📄 payload.py
↓
X
📄 payload_streamer.py
↓
X
📄 py.typed
↓
X
📄 pytest_plugin.py
↓
X
📄 resolver.py
↓
X
📄 streams.py
↓
X
📄 tcp_helpers.py
↓
X
📄 test_utils.py
↓
X
📄 tracing.py
↓
X
📄 typedefs.py
↓
X
📄 web.py
↓
X
📄 web_app.py
↓
X
📄 web_exceptions.py
↓
X
📄 web_fileresponse.py
↓
X
📄 web_log.py
↓
X
📄 web_middlewares.py
↓
X
📄 web_protocol.py
↓
X
📄 web_request.py
↓
X
📄 web_response.py
↓
X
📄 web_routedef.py
↓
X
📄 web_runner.py
↓
X
📄 web_server.py
↓
X
📄 web_urldispatcher.py
↓
X
📄 web_ws.py
↓
X
📄 worker.py
↓
X
SAVING...
BERHASIL DIUBAH!
EDITING: locks.py
import asyncio
import collections
from typing import Any, Deque, Optional


class EventResultOrError:
    """Event asyncio lock helper class.

    Wraps the Event asyncio lock allowing either to awake the
    locked Tasks without any error or raising an exception.

    thanks to @vorpalsmith for the simple design.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Create an unset event with no stored exception and no waiters.

        Args:
            loop: Event loop used by ``wait()`` to create the waiter tasks.
        """
        self._loop = loop
        # Exception re-raised to woken waiters; ``None`` means "wake normally".
        self._exc: Optional[BaseException] = None
        self._event = asyncio.Event()
        # Tasks currently blocked inside ``wait()``; ``cancel()`` walks this.
        self._waiters: Deque[asyncio.Future[Any]] = collections.deque()

    def set(self, exc: Optional[BaseException] = None) -> None:
        """Set the underlying event, optionally storing an exception.

        Args:
            exc: If not ``None``, ``wait()`` raises it once the waiter wakes
                up instead of returning.
        """
        # Store the outcome before setting the event so that a woken waiter
        # finds it in ``self._exc``.
        self._exc = exc
        self._event.set()

    async def wait(self) -> Any:
        """Wait until the event is set and report the stored outcome.

        The wait on the underlying event runs in its own task, which is
        tracked in ``self._waiters`` for the duration of the wait so that
        ``cancel()`` can cancel it.

        Returns:
            The result of awaiting the underlying ``asyncio.Event.wait()``.

        Raises:
            BaseException: The exception stored by ``set(exc)``, if any.
        """
        waiter = self._loop.create_task(self._event.wait())
        self._waiters.append(waiter)
        try:
            val = await waiter
        finally:
            # Always untrack the task, including when the await is cancelled.
            self._waiters.remove(waiter)

        if self._exc is not None:
            raise self._exc

        return val

    def cancel(self) -> None:
        """Cancel all waiters"""
        for waiter in self._waiters:
            waiter.cancel()
SIMPAN PERUBAHAN