Source code for onbrisca.http_client

# SPDX-FileCopyrightText: 2022 The Tor Project, Inc.
#
# SPDX-License-Identifier: BSD-3-Clause
import asyncio
import logging
import ssl
from asyncio.exceptions import TimeoutError

import aiohttp
from aiohttp_socks import ProxyConnector
from python_socks._errors import ProxyError

from onbasca.onbasca import config, constants

logger = logging.getLogger(__name__)


[docs] class HttpClient: def __init__( self, socks_address, verify="False", timeout=config.DL_TIMEOUT_SECS, **session_kwargs, ): self.proxies = { "http": "socks5h://{}:{}".format(*socks_address), "https": "socks5h://{}:{}".format(*socks_address), } # FIXME: it looks like it is not possible to use `socks5h` self.connector = ProxyConnector.from_url( "socks5://{}:{}".format(*socks_address), rdns=True ) self.timeout = aiohttp.ClientTimeout(total=timeout) self.set_headers(**session_kwargs) self.set_verify(verify) self.create_trace_config() self.session = aiohttp.ClientSession( connector=self.connector, timeout=self.timeout, headers=self.headers, trace_configs=[self.trace_config], )
[docs] def create_trace_config(self): self.trace_config = aiohttp.TraceConfig() self.trace_config.on_request_start.append(self.on_request_start) self.trace_config.on_request_end.append(self.on_request_end) return self.trace_config
[docs] async def on_request_start(self, session, context, params): context.start_time = asyncio.get_event_loop().time()
[docs] async def on_request_end(self, session, context, params): context.end_time = asyncio.get_event_loop().time() elapsed_time = context.end_time - context.start_time params.response.elapsed_time = elapsed_time
[docs] def set_headers(self, **session_kwargs): self.headers = {} self.headers[ "Tor-Bandwidth-Scanner-Nickname" ] = constants.HTTP_HEADERS["Tor-Bandwidth-Scanner-Nickname"].format( session_kwargs.get("nickname", "") ) self.headers["Tor-Bandwidth-Scanner-UUID"] = constants.HTTP_HEADERS[ "Tor-Bandwidth-Scanner-UUID" ].format(session_kwargs.get("uuid", "")) self.headers["User-Agent"] = constants.HTTP_HEADERS[ "User-Agent" ].format(session_kwargs.get("tor_version", ""))
[docs] def set_verify(self, verify): # Because WebServer.verify is an string and requests accepts both # string or bool. if verify.lower() == "true": self.verify = True elif verify.lower() == "false": self.verify = False else: sslcontext = ssl.create_default_context() sslcontext.load_cert_chain(verify) self.verify = sslcontext
[docs] async def aclose(self): logger.info("Closing session...") await self.session.close() logger.info("Session closed.")
[docs] async def ahead(self, url): logger.debug("HTTP HEAD %s. Verify: %s.", url, self.verify) try: async with self.session.head( url, allow_redirects=False, ssl=self.verify ) as response: return response except TimeoutError: # No desc for TimeoutError msg = "TimeoutError requesting HEAD to {}.".format(url) logger.debug(msg) return msg except aiohttp.ClientError as e: logger.debug("Error requesting HEAD to %s: %s", url, e) return e except ProxyError as e: logger.debug("Error requesting HEAD to %s: %s", url, e) return e except Exception as e: logger.warning("%s", e) return e
[docs] async def aget(self, url, **session_kwargs): logger.debug("HTTP GET %s. Verify: %s.", url, self.verify) headers = constants.HTTP_GET_HEADERS bytes_range = session_kwargs.pop("range", None) if bytes_range: headers["Range"] = bytes_range try: async with self.session.get( url, headers=headers, allow_redirects=False, ssl=self.verify ) as response: return response except TimeoutError: # No desc for TimeoutError msg = "TimeoutError requesting GET to {}.".format(url) logger.debug(msg) return msg except aiohttp.ClientError as e: logger.debug("Error requesting GET to %s: %s", url, e) return e except ProxyError as e: logger.debug("Error requesting GET to %s: %s", url, e) return e except Exception as e: logger.warning("%s", e) return e