PATH: //opt/cloudlinux/venv/lib/python3.11/site-packages/aiohttp
FILE_BARU
CREATE
FOLDER_BARU
MKDIR
UPLOAD_FILE
GO
[ .. KEMBALI ]
📁 .hash/
X
📄 __init__.py
↓
X
📁 __pycache__/
X
📄 _cparser.pxd
↓
X
📄 _find_header.pxd
↓
X
📄 _headers.pxi
↓
X
📄 _helpers.cpython-311-x86_64-linux-gnu.so
↓
X
📄 _helpers.pyi
↓
X
📄 _helpers.pyx
↓
X
📄 _http_parser.cpython-311-x86_64-linux-gnu.so
↓
X
📄 _http_parser.pyx
↓
X
📄 _http_writer.cpython-311-x86_64-linux-gnu.so
↓
X
📄 _http_writer.pyx
↓
X
📄 _websocket.cpython-311-x86_64-linux-gnu.so
↓
X
📄 _websocket.pyx
↓
X
📄 abc.py
↓
X
📄 base_protocol.py
↓
X
📄 client.py
↓
X
📄 client_exceptions.py
↓
X
📄 client_proto.py
↓
X
📄 client_reqrep.py
↓
X
📄 client_ws.py
↓
X
📄 compression_utils.py
↓
X
📄 connector.py
↓
X
📄 cookiejar.py
↓
X
📄 formdata.py
↓
X
📄 hdrs.py
↓
X
📄 helpers.py
↓
X
📄 http.py
↓
X
📄 http_exceptions.py
↓
X
📄 http_parser.py
↓
X
📄 http_websocket.py
↓
X
📄 http_writer.py
↓
X
📄 locks.py
↓
X
📄 log.py
↓
X
📄 multipart.py
↓
X
📄 payload.py
↓
X
📄 payload_streamer.py
↓
X
📄 py.typed
↓
X
📄 pytest_plugin.py
↓
X
📄 resolver.py
↓
X
📄 streams.py
↓
X
📄 tcp_helpers.py
↓
X
📄 test_utils.py
↓
X
📄 tracing.py
↓
X
📄 typedefs.py
↓
X
📄 web.py
↓
X
📄 web_app.py
↓
X
📄 web_exceptions.py
↓
X
📄 web_fileresponse.py
↓
X
📄 web_log.py
↓
X
📄 web_middlewares.py
↓
X
📄 web_protocol.py
↓
X
📄 web_request.py
↓
X
📄 web_response.py
↓
X
📄 web_routedef.py
↓
X
📄 web_runner.py
↓
X
📄 web_server.py
↓
X
📄 web_urldispatcher.py
↓
X
📄 web_ws.py
↓
X
📄 worker.py
↓
X
SAVING...
BERHASIL DIUBAH!
EDITING: payload_streamer.py
""" Payload implementation for coroutines as data provider. As a simple case, you can upload data from file:: @aiohttp.streamer async def file_sender(writer, file_name=None): with open(file_name, 'rb') as f: chunk = f.read(2**16) while chunk: await writer.write(chunk) chunk = f.read(2**16) Then you can use `file_sender` like this: async with session.post('http://httpbin.org/post', data=file_sender(file_name='huge_file')) as resp: print(await resp.text()) ..note:: Coroutine must accept `writer` as first argument """ import types import warnings from typing import Any, Awaitable, Callable, Dict, Tuple from .abc import AbstractStreamWriter from .payload import Payload, payload_type __all__ = ("streamer",) class _stream_wrapper: def __init__( self, coro: Callable[..., Awaitable[None]], args: Tuple[Any, ...], kwargs: Dict[str, Any], ) -> None: self.coro = types.coroutine(coro) self.args = args self.kwargs = kwargs async def __call__(self, writer: AbstractStreamWriter) -> None: await self.coro(writer, *self.args, **self.kwargs) class streamer: def __init__(self, coro: Callable[..., Awaitable[None]]) -> None: warnings.warn( "@streamer is deprecated, use async generators instead", DeprecationWarning, stacklevel=2, ) self.coro = coro def __call__(self, *args: Any, **kwargs: Any) -> _stream_wrapper: return _stream_wrapper(self.coro, args, kwargs) @payload_type(_stream_wrapper) class StreamWrapperPayload(Payload): async def write(self, writer: AbstractStreamWriter) -> None: await self._value(writer) @payload_type(streamer) class StreamPayload(StreamWrapperPayload): def __init__(self, value: Any, *args: Any, **kwargs: Any) -> None: super().__init__(value(), *args, **kwargs) async def write(self, writer: AbstractStreamWriter) -> None: await self._value(writer)
SIMPAN PERUBAHAN