X7ROOT File Manager
Current Path:
/opt/cloudlinux/venv/lib/python3.11/site-packages/sentry_sdk/integrations
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
sentry_sdk
/
integrations
/
??
..
??
__init__.py
(6.67 KB)
??
__pycache__
??
_wsgi_common.py
(4.43 KB)
??
aiohttp.py
(11.28 KB)
??
argv.py
(963 B)
??
arq.py
(6.58 KB)
??
asgi.py
(11.54 KB)
??
asyncio.py
(2.98 KB)
??
atexit.py
(1.8 KB)
??
aws_lambda.py
(15.44 KB)
??
beam.py
(5.56 KB)
??
boto3.py
(4.44 KB)
??
bottle.py
(6.32 KB)
??
celery.py
(18.65 KB)
??
chalice.py
(4.66 KB)
??
cloud_resource_context.py
(6.6 KB)
??
dedupe.py
(1.16 KB)
??
django
??
excepthook.py
(2.21 KB)
??
executing.py
(1.99 KB)
??
falcon.py
(7.8 KB)
??
fastapi.py
(4.39 KB)
??
flask.py
(7.72 KB)
??
gcp.py
(8.02 KB)
??
gnu_backtrace.py
(2.86 KB)
??
grpc
??
httpx.py
(4.89 KB)
??
huey.py
(4.59 KB)
??
logging.py
(8.97 KB)
??
loguru.py
(2.98 KB)
??
modules.py
(2.06 KB)
??
opentelemetry
??
pure_eval.py
(4.45 KB)
??
pymongo.py
(5.87 KB)
??
pyramid.py
(7.27 KB)
??
quart.py
(7.2 KB)
??
redis
??
rq.py
(5.28 KB)
??
sanic.py
(11.06 KB)
??
serverless.py
(1.93 KB)
??
socket.py
(2.88 KB)
??
spark
??
sqlalchemy.py
(4.14 KB)
??
starlette.py
(22.67 KB)
??
starlite.py
(9.85 KB)
??
stdlib.py
(8.06 KB)
??
threading.py
(2.87 KB)
??
tornado.py
(7.17 KB)
??
trytond.py
(1.7 KB)
??
wsgi.py
(9.36 KB)
Editing: socket.py
from __future__ import absolute_import import socket from sentry_sdk import Hub from sentry_sdk._types import MYPY from sentry_sdk.consts import OP from sentry_sdk.integrations import Integration if MYPY: from socket import AddressFamily, SocketKind from typing import Tuple, Optional, Union, List __all__ = ["SocketIntegration"] class SocketIntegration(Integration): identifier = "socket" @staticmethod def setup_once(): # type: () -> None """ patches two of the most used functions of socket: create_connection and getaddrinfo(dns resolver) """ _patch_create_connection() _patch_getaddrinfo() def _get_span_description(host, port): # type: (Union[bytes, str, None], Union[str, int, None]) -> str try: host = host.decode() # type: ignore except (UnicodeDecodeError, AttributeError): pass description = "%s:%s" % (host, port) # type: ignore return description def _patch_create_connection(): # type: () -> None real_create_connection = socket.create_connection def create_connection( address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, # type: ignore source_address=None, ): # type: (Tuple[Optional[str], int], Optional[float], Optional[Tuple[Union[bytearray, bytes, str], int]])-> socket.socket hub = Hub.current if hub.get_integration(SocketIntegration) is None: return real_create_connection( address=address, timeout=timeout, source_address=source_address ) with hub.start_span( op=OP.SOCKET_CONNECTION, description=_get_span_description(address[0], address[1]), ) as span: span.set_data("address", address) span.set_data("timeout", timeout) span.set_data("source_address", source_address) return real_create_connection( address=address, timeout=timeout, source_address=source_address ) socket.create_connection = create_connection # type: ignore def _patch_getaddrinfo(): # type: () -> None real_getaddrinfo = socket.getaddrinfo def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): # type: (Union[bytes, str, None], Union[str, int, None], int, int, int, int) -> List[Tuple[AddressFamily, SocketKind, int, str, Union[Tuple[str, int], Tuple[str, int, int, int]]]] hub = Hub.current if hub.get_integration(SocketIntegration) is None: return real_getaddrinfo(host, port, family, type, proto, flags) with hub.start_span( op=OP.SOCKET_DNS, description=_get_span_description(host, port) ) as span: span.set_data("host", host) span.set_data("port", port) return real_getaddrinfo(host, port, family, type, proto, flags) socket.getaddrinfo = getaddrinfo # type: ignore
Upload File
Create Folder