PATH: //proc/thread-self/root/usr/lib/python3.9/site-packages/redis
FILE_BARU
CREATE
FOLDER_BARU
MKDIR
UPLOAD_FILE
GO
[ .. KEMBALI ]
📄 __init__.py
↓
X
📁 __pycache__/
X
📁 asyncio/
X
📄 backoff.py
↓
X
📄 client.py
↓
X
📄 cluster.py
↓
X
📁 commands/
X
📄 compat.py
↓
X
📄 connection.py
↓
X
📄 crc.py
↓
X
📄 exceptions.py
↓
X
📄 lock.py
↓
X
📄 ocsp.py
↓
X
📄 retry.py
↓
X
📄 sentinel.py
↓
X
📄 typing.py
↓
X
📄 utils.py
↓
X
SAVING...
BERHASIL DIUBAH!
EDITING: typing.py
# from __future__ import annotations from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any, Awaitable, Iterable, TypeVar, Union from redis.compat import Protocol if TYPE_CHECKING: from redis.asyncio.connection import ConnectionPool as AsyncConnectionPool from redis.asyncio.connection import Encoder as AsyncEncoder from redis.connection import ConnectionPool, Encoder Number = Union[int, float] EncodedT = Union[bytes, memoryview] DecodedT = Union[str, int, float] EncodableT = Union[EncodedT, DecodedT] AbsExpiryT = Union[int, datetime] ExpiryT = Union[float, timedelta] ZScoreBoundT = Union[float, str] # str allows for the [ or ( prefix BitfieldOffsetT = Union[int, str] # str allows for #x syntax _StringLikeT = Union[bytes, str, memoryview] KeyT = _StringLikeT # Main redis key space PatternT = _StringLikeT # Patterns matched against keys, fields etc FieldT = EncodableT # Fields within hash tables, streams and geo commands KeysT = Union[KeyT, Iterable[KeyT]] ChannelT = _StringLikeT GroupT = _StringLikeT # Consumer group ConsumerT = _StringLikeT # Consumer name StreamIdT = Union[int, _StringLikeT] ScriptTextT = _StringLikeT TimeoutSecT = Union[int, float, _StringLikeT] # Mapping is not covariant in the key type, which prevents # Mapping[_StringLikeT, X] from accepting arguments of type Dict[str, X]. Using # a TypeVar instead of a Union allows mappings with any of the permitted types # to be passed. Care is needed if there is more than one such mapping in a # type signature because they will all be required to be the same key type. AnyKeyT = TypeVar("AnyKeyT", bytes, str, memoryview) AnyFieldT = TypeVar("AnyFieldT", bytes, str, memoryview) AnyChannelT = TypeVar("AnyChannelT", bytes, str, memoryview) class CommandsProtocol(Protocol): connection_pool: Union["AsyncConnectionPool", "ConnectionPool"] def execute_command(self, *args, **options): ... class ClusterCommandsProtocol(CommandsProtocol): encoder: Union["AsyncEncoder", "Encoder"] def execute_command(self, *args, **options) -> Union[Any, Awaitable]: ...
SIMPAN PERUBAHAN