PATH: //opt/cloudlinux/venv/lib/python3.11/site-packages/sentry_sdk/integrations
FILE_BARU
CREATE
FOLDER_BARU
MKDIR
UPLOAD_FILE
GO
[ .. KEMBALI ]
📄 __init__.py
↓
X
📁 __pycache__/
X
📄 _wsgi_common.py
↓
X
📄 aiohttp.py
↓
X
📄 argv.py
↓
X
📄 arq.py
↓
X
📄 asgi.py
↓
X
📄 asyncio.py
↓
X
📄 atexit.py
↓
X
📄 aws_lambda.py
↓
X
📄 beam.py
↓
X
📄 boto3.py
↓
X
📄 bottle.py
↓
X
📄 celery.py
↓
X
📄 chalice.py
↓
X
📄 cloud_resource_context.py
↓
X
📄 dedupe.py
↓
X
📁 django/
X
📄 excepthook.py
↓
X
📄 executing.py
↓
X
📄 falcon.py
↓
X
📄 fastapi.py
↓
X
📄 flask.py
↓
X
📄 gcp.py
↓
X
📄 gnu_backtrace.py
↓
X
📁 grpc/
X
📄 httpx.py
↓
X
📄 huey.py
↓
X
📄 logging.py
↓
X
📄 loguru.py
↓
X
📄 modules.py
↓
X
📁 opentelemetry/
X
📄 pure_eval.py
↓
X
📄 pymongo.py
↓
X
📄 pyramid.py
↓
X
📄 quart.py
↓
X
📁 redis/
X
📄 rq.py
↓
X
📄 sanic.py
↓
X
📄 serverless.py
↓
X
📄 socket.py
↓
X
📁 spark/
X
📄 sqlalchemy.py
↓
X
📄 starlette.py
↓
X
📄 starlite.py
↓
X
📄 stdlib.py
↓
X
📄 threading.py
↓
X
📄 tornado.py
↓
X
📄 trytond.py
↓
X
📄 wsgi.py
↓
X
SAVING...
BERHASIL DIUBAH!
EDITING: socket.py
from __future__ import absolute_import import socket from sentry_sdk import Hub from sentry_sdk._types import MYPY from sentry_sdk.consts import OP from sentry_sdk.integrations import Integration if MYPY: from socket import AddressFamily, SocketKind from typing import Tuple, Optional, Union, List __all__ = ["SocketIntegration"] class SocketIntegration(Integration): identifier = "socket" @staticmethod def setup_once(): # type: () -> None """ patches two of the most used functions of socket: create_connection and getaddrinfo(dns resolver) """ _patch_create_connection() _patch_getaddrinfo() def _get_span_description(host, port): # type: (Union[bytes, str, None], Union[str, int, None]) -> str try: host = host.decode() # type: ignore except (UnicodeDecodeError, AttributeError): pass description = "%s:%s" % (host, port) # type: ignore return description def _patch_create_connection(): # type: () -> None real_create_connection = socket.create_connection def create_connection( address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, # type: ignore source_address=None, ): # type: (Tuple[Optional[str], int], Optional[float], Optional[Tuple[Union[bytearray, bytes, str], int]])-> socket.socket hub = Hub.current if hub.get_integration(SocketIntegration) is None: return real_create_connection( address=address, timeout=timeout, source_address=source_address ) with hub.start_span( op=OP.SOCKET_CONNECTION, description=_get_span_description(address[0], address[1]), ) as span: span.set_data("address", address) span.set_data("timeout", timeout) span.set_data("source_address", source_address) return real_create_connection( address=address, timeout=timeout, source_address=source_address ) socket.create_connection = create_connection # type: ignore def _patch_getaddrinfo(): # type: () -> None real_getaddrinfo = socket.getaddrinfo def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): # type: (Union[bytes, str, None], Union[str, int, None], int, int, int, int) -> List[Tuple[AddressFamily, SocketKind, int, str, Union[Tuple[str, int], Tuple[str, int, int, int]]]] hub = Hub.current if hub.get_integration(SocketIntegration) is None: return real_getaddrinfo(host, port, family, type, proto, flags) with hub.start_span( op=OP.SOCKET_DNS, description=_get_span_description(host, port) ) as span: span.set_data("host", host) span.set_data("port", port) return real_getaddrinfo(host, port, family, type, proto, flags) socket.getaddrinfo = getaddrinfo # type: ignore
SIMPAN PERUBAHAN