# SPDX-FileCopyrightText: 2022 The Tor Project, Inc.
#
# SPDX-License-Identifier: BSD-3-Clause
import datetime
import functools
import logging
import os
import sys
import time
import stem
from asgiref.sync import async_to_sync
from stem import process
from stem.control import Controller, EventType, Listener
from stem.descriptor.remote import DescriptorDownloader
from onbasca.onbasca import config, constants
from onbasca.onbasca.http_session import TimedSession
from onbasca.onbasca.models.consensus import Consensus
from onbasca.onbasca.models.measurement import Measurement
from onbasca.onbasca.models.relaydesc import RelayDesc
from onbasca.onbasca.models.routerstatus import RouterStatus
from onbasca.onbasca.models.webserver import WebServer
from onbasca.onbasca.util import bytes_range_from_head
# Logger named after this module.
logger = logging.getLogger(name=__name__)
# Shared stem descriptor downloader.
# NOTE(review): not referenced by TorControl; presumably used elsewhere.
downloader = DescriptorDownloader()
class TorControl:
    """Manage tor through a stem ``Controller``.

    Launches or connects to tor, keeps track of the latest consensus and
    builds circuits to measure relays' bandwidth.
    """

    def __init__(self, tor_config=None, controller=None):
        # stem Controller; ``None`` until obtained or launched.
        self.controller = controller
        # Stored only: ``launch_or_connect_tor`` reads its own ``tor_config``
        # argument instead.
        self.tor_config = tor_config
        # Latest ``Consensus`` model instance.
        self.consensus = None
        # ``[stem.DirPort]`` when tor reports a DirPort, otherwise ``None``.
        self.endpoints = None
        # ``True`` when tor runs with ``TestingTorNetwork 1``.
        self.testing_network = False

    def set_config(self, tor_config_dict):
        """Set torrc options at runtime, exiting the process on failure.

        :param tor_config_dict: mapping of torrc option names to values,
            passed as a whole to ``Controller.set_options``.
        """
        try:
            self.controller.set_options(tor_config_dict)
        except Exception as e:
            # Any failure here is treated as fatal.
            logger.exception(e)
            sys.exit(1)

    def set_config_can_fail(self, tor_config_dict):
        """Set torrc options one by one, ignoring the ones tor rejects.

        :param tor_config_dict: mapping of torrc option names to values.
        """
        logger.debug("Setting tor options that can fail.")
        for key, value in tor_config_dict.items():
            try:
                self.controller.set_conf(key, value)
            except Exception as e:
                # Best effort: log which option failed and keep going.
                logger.debug("Failed to set option %s: %s", key, e)

    def obtain_tor_controller(self, port=None, socket=None, pw=None):
        """Connect and authenticate to tor's control socket or port.

        When both ``socket`` and ``port`` are given, the control socket is
        used and no connection to the port is opened. When neither is given,
        the already stored ``self.controller`` is authenticated.

        :param port: control port number.
        :param socket: path to the control socket file.
        :param pw: control password; tor's default authentication is used
            when it's not given.
        :returns: the authenticated controller (also stored in
            ``self.controller``).
        """
        logger.info("Starting or connecting to Tor.")
        if socket:
            logger.debug("socket")
            self.controller = Controller.from_socket_file(socket)
        elif port:
            logger.debug("port")
            self.controller = Controller.from_port(port=port)
        if pw:
            logger.debug("pw")
            self.controller.authenticate(pw)
        else:
            self.controller.authenticate()
        return self.controller

    def launch_or_connect_tor(
        self,
        port=None,
        socket=None,
        pw=None,
        tor_config=config.TOR_CONFIG,
    ):
        """Connect to a running tor, or launch one, and configure it.

        If ``port`` or ``socket`` is given, connect to it and apply
        ``config.TOR_CONFIG_BASE``. If there is still no live controller,
        launch tor with ``tor_config`` and connect to the ``ControlPort`` /
        ``ControlSocket`` it defines.

        Afterwards, runtime options are set, testing-network mode and
        DirPort endpoints are detected, and event listeners are registered
        for new consensuses and, unless bridges are being scanned, new
        descriptors.

        :returns: the stem controller.
        """
        logger.info("Launching or connecting to tor.")
        logger.debug("port %s", port)
        if port or socket:
            logger.debug("Obtaining tor from %s", port or socket)
            self.controller = self.obtain_tor_controller(port, socket, pw)
            self.set_config(config.TOR_CONFIG_BASE)
        if not self.controller or not self.controller.is_alive():
            logger.info("Launching tor with config %s", tor_config)
            os.makedirs(tor_config["DataDirectory"], mode=0o700, exist_ok=True)
            process.launch_tor_with_config(tor_config, take_ownership=True)
            logger.info("Launched tor.")
            port = tor_config.get("ControlPort", None)
            port = int(port) if port else None
            socket = tor_config.get("ControlSocket", None)
            logger.debug("port %s, socket %s", port, socket)
            self.controller = self.obtain_tor_controller(port, socket, pw)
        logger.info("Obtained tor controller.")
        self.set_config_can_fail(config.TOR_CONFIG_CAN_FAIL)
        self.controller.set_options(config.TOR_CONFIG_RUNTIME)
        if self.controller.get_conf("TestingTorNetwork") == "1":
            self.testing_network = True
        # NOTE(review): DirPort detection is done regardless of testing
        # network mode; confirm that is intended.
        dirport = self.controller.get_conf("DirPort", None)
        if dirport:
            self.endpoints = [stem.DirPort("127.0.0.1", dirport)]
        self.controller.add_event_listener(
            self.handle_new_consensus_event,
            EventType.NEWCONSENSUS,
        )
        # Set this object event listener only for onbasca, not onbrisca
        if config.BRIDGESCAN:
            return self.controller
        self.controller.add_event_listener(
            self.handle_new_descriptor_event,
            EventType.NEWDESC,
        )
        return self.controller

    def get_socks_address(self):
        """Return the first SOCKS listener tor reports (stem's
        ``(address, port)`` tuple)."""
        return self.controller.get_listeners(Listener.SOCKS)[0]

    def obtain_consensus(self, path=None):
        """Return the router statuses tor currently knows about.

        Blocks, retrying every 2 seconds, until tor provides a non-empty
        list of router statuses.

        :param path: unused; kept for backward compatibility.
        :returns: list of router status entries, or ``[]`` when there is no
            controller.
        """
        logger.info("Obtaining consensus.")
        if not self.controller:
            logger.error("Can not obtain router statuses without controller.")
            # TODO: exit here
            return []
        while True:
            try:
                # stem yields the statuses lazily; materialize them here so
                # ``DescriptorUnavailable`` is raised inside this ``try``.
                router_statuses = list(self.controller.get_network_statuses())
            # In testing network it might take some seconds to initialize
            except stem.DescriptorUnavailable:
                router_statuses = []
            if router_statuses:
                return router_statuses
            # Also wait when tor returned no statuses, to avoid busy looping.
            time.sleep(2)

    @staticmethod
    def _valid_after(now):
        """Truncate ``now`` to the minute, for a consensus ``valid_after``.

        Minutes are deliberately not zeroed so that chutney (test) networks
        work.
        """
        return now.replace(second=0, microsecond=0)

    def handle_new_consensus_event(self, event):
        """NEWCONSENSUS listener: store the new consensus and its params.

        The event exposes these attributes:
        ``dict_keys(['arrived_at', '_parsed_content', '_raw_content', '_str',
        '_hash', 'type', 'positional_args', 'keyword_args',
        'consensus_content', 'desc'])``; ``desc`` holds the router statuses.
        """
        logger.info(
            "New consensus event. Number of relays %s", len(event.desc)
        )
        self.consensus = Consensus.objects.from_router_statuses(
            event.desc,
            valid_after=self._valid_after(datetime.datetime.utcnow()),
            testing_network=self.testing_network,
        )
        self.consensus.set_params(self.obtain_consensus_params())

    def obtain_consensus_params(self):
        """Return the current consensus ``params`` as a dict.

        It is not possible to obtain them from ``get_network_statuses`` via
        control port, so the whole consensus document is requested with
        ``GETINFO dir/status-vote/current/consensus`` and its ``params``
        line is parsed. If several ``params`` lines appear, the last wins.

        In a testing network, congestion control is assumed and fixed values
        are returned without querying tor.

        :returns: mapping of parameter name to integer value. Entries that
            are not ``name=<int>`` are skipped with a warning.
        """
        logger.debug("Obtaining consensus params")
        # For now, assume congestion control in testing network.
        # Later on, configure tests for congestion control or not.
        if self.testing_network:
            logger.debug("Testing network, assuming congestion control.")
            return {"cc_alg": 2, "bwscanner_cc": 1}
        consensus = self.controller.get_info(
            "dir/status-vote/current/consensus"
        )
        params_fields = []
        for line in consensus.split("\n"):
            fields = line.split()
            if fields and fields[0] == "params":
                params_fields = fields[1:]
        consensus_params_dict = {}
        for param in params_fields:
            key, _, value = param.partition("=")
            try:
                consensus_params_dict[key] = int(value)
            except ValueError:
                logger.warning("Ignoring malformed consensus param %s", param)
        logger.debug("Consensus params: %s", consensus_params_dict.keys())
        return consensus_params_dict

    def handle_new_descriptor_event(self, event):
        """NEWDESC listener: store the server descriptors of the relays in
        ``event.relays``.

        A relay whose descriptor can't be obtained is logged and skipped, so
        the rest of the event's relays are still stored.
        """
        logger.info("New desc event.")
        for fingerprint, _nickname in event.relays:
            try:
                server_descriptor = self.controller.get_server_descriptor(
                    fingerprint
                )
            except (stem.ControllerError, ValueError) as e:
                logger.debug(
                    "Failed to obtain server descriptor for %s: %s",
                    fingerprint,
                    e,
                )
                continue
            RelayDesc.objects.from_relay_desc(server_descriptor)

    def obtain_latest_relays(self, latest_consensus):
        """Use ``latest_consensus`` as current consensus and set its
        exits/non-exits (and bridges when scanning bridges).

        :returns: the consensus.
        """
        self.consensus = latest_consensus
        # For now, only run integration tests for relays' scanning,
        # not bridges' scanning
        if config.BRIDGESCAN and not self.testing_network:
            async_to_sync(self.consensus.aset_exits_non_exits_bridges)()
            return self.consensus
        Consensus.objects.set_exits_non_exits(
            self.consensus, self.testing_network
        )
        return self.consensus

    def obtain_relays(self):
        """Create a Consensus from tor's current router statuses and
        create/update a RelayDesc for each relay with a server descriptor.

        :returns: the new consensus (also stored in ``self.consensus``).
        """
        now = datetime.datetime.utcnow()
        logger.info("Obtaining relays.")
        router_statuses_list = list(self.obtain_consensus())
        logger.info(
            "Number of relays in consensus %s", len(router_statuses_list)
        )
        server_descriptors_dict = {
            descriptor.fingerprint: descriptor
            for descriptor in self.controller.get_server_descriptors()
        }
        self.consensus = Consensus.objects.from_router_statuses(
            router_statuses_list,
            valid_after=self._valid_after(now),
            testing_network=self.testing_network,
        )
        self.consensus.set_params(self.obtain_consensus_params())
        for router_status in router_statuses_list:
            server_descriptor = server_descriptors_dict.get(
                router_status.fingerprint
            )
            if server_descriptor:
                RelayDesc.objects.from_relay_desc(server_descriptor)
        logger.info("Updated/created relays.")
        return self.consensus

    def attach_stream_to_circuit(self, circuit_id, stream_event):
        """STREAM listener: attach new ``USER`` streams to ``circuit_id``.

        Every NEW stream with purpose USER seen while this listener is
        registered is attached. Attachment errors are logged and ignored.
        """
        if stream_event.status == "NEW" and stream_event.purpose == "USER":
            logger.debug(
                "Attaching stream %s to circuit %s.",
                stream_event.id,
                circuit_id,
            )
            try:
                self.controller.attach_stream(stream_event.id, circuit_id)
            # So far we never saw this error.
            # Catch all
            except Exception as e:
                logger.debug(
                    "Error attaching stream %s to circ %s: %s",
                    stream_event.id,
                    circuit_id,
                    e,
                )

    def fetch_http_head(self, circuit_id, session, url):
        """Send an HTTP HEAD to ``url`` with its stream on ``circuit_id``.

        :returns: the HEAD response, or the exception raised by
            ``session.head`` (returned, not raised).
        """
        logger.debug("Obtaining HTTP HEAD.")
        # Keep a reference to the exact listener object so it can be
        # removed again.
        stream_listener = functools.partial(
            self.attach_stream_to_circuit, circuit_id
        )
        logger.debug("Attaching stream.")
        self.controller.add_event_listener(stream_listener, EventType.STREAM)
        try:
            return session.head(url)
        except Exception as e:
            # Max retries exceeded with url
            logger.debug(e)
            return e
        finally:
            self.controller.remove_event_listener(stream_listener)

    def _close_circuit(self, circuit_id):
        """Close ``circuit_id``, logging instead of raising when tor refuses
        (e.g. because the circuit no longer exists)."""
        try:
            self.controller.close_circuit(circuit_id)
        except stem.ControllerError as e:
            logger.debug("Error closing circuit %s: %s", circuit_id, e)

    def measure_bw(
        self,
        circuit_id,
        session,
        url,
        bytes_range,
        size=constants.INITIAL_DL_BYTES,
    ):
        """Download ``bytes_range`` of ``url`` and return the bandwidth.

        The circuit is closed afterwards, whether the download succeeded or
        not.

        :returns: ``size`` divided by the download time in seconds, or the
            exception raised by ``session.get`` (returned, not raised).
        """
        logger.debug(
            "Measuring bw with circuit %s, web server %s, bytes range %s "
            "and size %s.",
            circuit_id,
            url,
            bytes_range,
            size,
        )
        start_time = time.monotonic()
        try:
            response = session.get(url, range=bytes_range)
            # Stop the clock before closing the circuit so the control
            # round trip is not counted as download time.
            end_time = time.monotonic()
        # Catch any request exception.
        # `RequestException` doesn't include Timeout
        except Exception as e:
            logger.debug(e)
            return e
        finally:
            self._close_circuit(circuit_id)
        time_delta = end_time - start_time
        logger.debug(
            "Time delta getting %s bytes: %s seconds.", size, time_delta
        )
        logger.debug(
            "GET response elapsed seconds: %s.",
            response.elapsed.total_seconds(),
        )
        measured_bandwidth = size / time_delta
        logger.debug(
            "Measuring bandwidth: %s Bytes/seconds"[:0]
            + "Measured bandwidth: %s Bytes/seconds",
            measured_bandwidth,
        )
        return measured_bandwidth

    @staticmethod
    def _finish_measurement(measurement, error=None):
        """Set ``finished_at`` (and ``error`` when given), save and return
        ``measurement``."""
        if error is not None:
            measurement.error = error
        measurement.finished_at = datetime.datetime.utcnow()
        measurement.save()
        return measurement

    def measure_relay(self, relay, session_kwargs, socks_address):
        """Measure ``relay`` and store the result as a ``Measurement``.

        https://gitlab.torproject.org/tpo/network-health/sbws/-/issues/40048:
        It's being stored in which position a relay is being measured and
        whether it is an exit.
        https://gitlab.torproject.org/tpo/network-health/sbws/-/issues/40027:
        Building circuits just use stem's API, not any special class.

        :returns: the saved measurement. On failure (no path, circuit, HEAD
            or GET error) its ``error`` is set and ``bandwidth`` is not.
        """
        logger.debug("Measuring relay %s", relay)
        measurement, _ = Measurement.objects.update_or_create(
            consensus=self.consensus,
            relay=relay,
            defaults={"queued_at": datetime.datetime.utcnow()},
        )
        measurement.save()
        relay.set_measurement_latest(measurement)
        relay.routerstatus_latest().set_measurement_latest(measurement)
        measurement.attempted_at = datetime.datetime.utcnow()
        measurement.webserver = WebServer.objects.select_random()
        session = TimedSession(
            socks_address,
            verify=measurement.webserver.verify,
            **session_kwargs,
        )
        measurement.save()
        path = relay.routerstatus_latest().helper_path()
        if not path:
            return self._finish_measurement(measurement, "no path")
        # The relay is measured as exit when it is not the first hop; the
        # other hop is the helper.
        as_exit = path[0] != relay.fingerprint
        helper_fingerprint = path[0] if as_exit else path[1]
        helper = RouterStatus.objects.get(
            fingerprint=helper_fingerprint, consensus=self.consensus
        )
        measurement.helper = helper.relay
        measurement.as_exit = as_exit
        measurement.save()
        logger.debug("Creating circuit.")
        try:
            circuit_id = self.controller.new_circuit(path, await_build=True)
        except Exception as e:
            # no such router, channel close, destroyed
            logger.debug(e)
            logger.warning("Error creating circuit with path %s", path)
            return self._finish_measurement(measurement, e)
        logger.debug("HTTP HEAD with %s", measurement.webserver.url)
        head = self.fetch_http_head(
            circuit_id, session, measurement.webserver.url
        )
        if isinstance(head, Exception):
            # measure_bw won't run, so close the circuit here to not leak it.
            self._close_circuit(circuit_id)
            # webserver.add_stream_error(measurement)
            logger.warning("Error fetching HEAD with path %s", path)
            return self._finish_measurement(measurement, head)
        size = (
            relay.routerstatus_latest().bandwidth * constants.DL_SECS
            or constants.INITIAL_DL_BYTES
        )
        logger.debug("Download size %s.", size)
        bytes_range = bytes_range_from_head(head, size=size)
        measured_bandwidth = self.measure_bw(
            circuit_id, session, measurement.webserver.url, bytes_range, size
        )
        if isinstance(measured_bandwidth, Exception):
            logger.warning("Error fetching bytes with path %s", path)
            return self._finish_measurement(measurement, measured_bandwidth)
        measurement.bandwidth = measured_bandwidth
        self._finish_measurement(measurement)
        logger.info("Created measurement %s", measurement)
        return measurement