Source code for onbrisca.http_client
# SPDX-FileCopyrightText: 2022 The Tor Project, Inc.
#
# SPDX-License-Identifier: BSD-3-Clause
import asyncio
import logging
import ssl
from asyncio.exceptions import TimeoutError
import aiohttp
from aiohttp_socks import ProxyConnector
from python_socks._errors import ProxyError
from onbasca.onbasca import config, constants
logger = logging.getLogger(__name__)
[docs]
class HttpClient:
def __init__(
self,
socks_address,
verify="False",
timeout=config.DL_TIMEOUT_SECS,
**session_kwargs,
):
self.proxies = {
"http": "socks5h://{}:{}".format(*socks_address),
"https": "socks5h://{}:{}".format(*socks_address),
} # FIXME: it looks like it is not possible to use `socks5h`
self.connector = ProxyConnector.from_url(
"socks5://{}:{}".format(*socks_address), rdns=True
)
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.set_headers(**session_kwargs)
self.set_verify(verify)
self.create_trace_config()
self.session = aiohttp.ClientSession(
connector=self.connector,
timeout=self.timeout,
headers=self.headers,
trace_configs=[self.trace_config],
)
[docs]
def create_trace_config(self):
self.trace_config = aiohttp.TraceConfig()
self.trace_config.on_request_start.append(self.on_request_start)
self.trace_config.on_request_end.append(self.on_request_end)
return self.trace_config
[docs]
async def on_request_start(self, session, context, params):
context.start_time = asyncio.get_event_loop().time()
[docs]
async def on_request_end(self, session, context, params):
context.end_time = asyncio.get_event_loop().time()
elapsed_time = context.end_time - context.start_time
params.response.elapsed_time = elapsed_time
[docs]
def set_verify(self, verify):
# Because WebServer.verify is an string and requests accepts both
# string or bool.
if verify.lower() == "true":
self.verify = True
elif verify.lower() == "false":
self.verify = False
else:
sslcontext = ssl.create_default_context()
sslcontext.load_cert_chain(verify)
self.verify = sslcontext
[docs]
async def aclose(self):
logger.info("Closing session...")
await self.session.close()
logger.info("Session closed.")
[docs]
async def ahead(self, url):
logger.debug("HTTP HEAD %s. Verify: %s.", url, self.verify)
try:
async with self.session.head(
url, allow_redirects=False, ssl=self.verify
) as response:
return response
except TimeoutError:
# No desc for TimeoutError
msg = "TimeoutError requesting HEAD to {}.".format(url)
logger.debug(msg)
return msg
except aiohttp.ClientError as e:
logger.debug("Error requesting HEAD to %s: %s", url, e)
return e
except ProxyError as e:
logger.debug("Error requesting HEAD to %s: %s", url, e)
return e
except Exception as e:
logger.warning("%s", e)
return e
[docs]
async def aget(self, url, **session_kwargs):
logger.debug("HTTP GET %s. Verify: %s.", url, self.verify)
headers = constants.HTTP_GET_HEADERS
bytes_range = session_kwargs.pop("range", None)
if bytes_range:
headers["Range"] = bytes_range
try:
async with self.session.get(
url, headers=headers, allow_redirects=False, ssl=self.verify
) as response:
return response
except TimeoutError:
# No desc for TimeoutError
msg = "TimeoutError requesting GET to {}.".format(url)
logger.debug(msg)
return msg
except aiohttp.ClientError as e:
logger.debug("Error requesting GET to %s: %s", url, e)
return e
except ProxyError as e:
logger.debug("Error requesting GET to %s: %s", url, e)
return e
except Exception as e:
logger.warning("%s", e)
return e