# Source code for onbasca.onbasca.torcontrol

# SPDX-FileCopyrightText: 2022 The Tor Project, Inc.
#
# SPDX-License-Identifier: BSD-3-Clause

import datetime
import functools
import logging
import os
import sys
import time

import stem
from asgiref.sync import async_to_sync
from stem import process
from stem.control import Controller, EventType, Listener
from stem.descriptor.remote import DescriptorDownloader

from onbasca.onbasca import config, constants
from onbasca.onbasca.http_session import TimedSession
from onbasca.onbasca.models.consensus import Consensus
from onbasca.onbasca.models.measurement import Measurement
from onbasca.onbasca.models.relaydesc import RelayDesc
from onbasca.onbasca.models.routerstatus import RouterStatus
from onbasca.onbasca.models.webserver import WebServer
from onbasca.onbasca.util import bytes_range_from_head

# Logger for this module.
logger = logging.getLogger(name=__name__)
# Module-level stem descriptor downloader instance.
downloader = DescriptorDownloader()


class TorControl:
    """Control a tor process through a stem ``Controller``.

    It launches or connects to tor, keeps the current ``Consensus`` in
    ``self.consensus`` and measures relays by downloading from a web server
    through circuits that include a helper relay.
    """

    def __init__(self, tor_config=None, controller=None):
        self.controller = controller
        self.tor_config = tor_config
        # Latest ``Consensus`` model instance.
        self.consensus = None
        # List with the ``stem.DirPort`` endpoint of the local tor, when it
        # has a DirPort configured.
        self.endpoints = None
        # True when tor runs with ``TestingTorNetwork 1``.
        self.testing_network = False

    def set_config(self, tor_config_dict):
        """Set torrc options at runtime.

        Any failure is logged and the process exits with status 1.
        """
        try:
            self.controller.set_options(tor_config_dict)
        except Exception as e:
            logger.exception(e)
            sys.exit(1)

    def set_config_can_fail(self, tor_config_dict):
        """Set torrc options one by one, only logging the ones that fail.

        Setting them individually means a rejected option does not prevent
        the remaining ones from being applied.
        """
        logger.debug("Setting tor options that can fail.")
        for key, value in tor_config_dict.items():
            try:
                self.controller.set_conf(key, value)
            except Exception as e:
                logger.debug("Failed to set option %s: %s", key, e)

    def obtain_tor_controller(self, port=None, socket=None, pw=None):
        """Connect to tor's control socket or port and authenticate.

        The control socket takes precedence over the port; when a socket is
        given no connection to the port is opened, so no unused controller
        is left behind. When neither is given, the current
        ``self.controller`` is authenticated.

        :param port: control port number.
        :param socket: path to the control socket file.
        :param pw: control password, if any.
        :returns: the authenticated ``Controller``.
        """
        logger.info("Starting or connecting to Tor.")
        if socket:
            logger.debug("Connecting to control socket %s.", socket)
            self.controller = Controller.from_socket_file(socket)
        elif port:
            logger.debug("Connecting to control port %s.", port)
            self.controller = Controller.from_port(port=port)
        if pw:
            logger.debug("Authenticating with password.")
            self.controller.authenticate(pw)
        else:
            self.controller.authenticate()
        return self.controller

    def launch_or_connect_tor(
        self,
        port=None,
        socket=None,
        pw=None,
        tor_config=config.TOR_CONFIG,
    ):
        """Connect to a running tor, or launch one, and register listeners.

        If ``port`` or ``socket`` is given, connect to that tor and apply
        ``config.TOR_CONFIG_BASE``. If there is still no live controller,
        launch tor with ``tor_config`` and connect to the ``ControlPort`` /
        ``ControlSocket`` defined there.

        Afterwards runtime options are applied, ``self.testing_network`` and
        ``self.endpoints`` are set and event listeners are added:
        ``NEWCONSENSUS`` always, ``NEWDESC`` only when not bridge scanning.

        :returns: the connected ``Controller``.
        """
        logger.info("Launching or connecting to tor.")
        logger.debug("port %s", port)
        if port or socket:
            logger.debug("Obtaining tor from %s", port or socket)
            self.controller = self.obtain_tor_controller(port, socket, pw)
            self.set_config(config.TOR_CONFIG_BASE)
        if not self.controller or not self.controller.is_alive():
            logger.info("Launching tor with config %s", tor_config)
            os.makedirs(
                tor_config["DataDirectory"], mode=0o700, exist_ok=True
            )
            process.launch_tor_with_config(tor_config, take_ownership=True)
            logger.info("Launched tor.")
            port = tor_config.get("ControlPort", None)
            port = int(port) if port else None
            socket = tor_config.get("ControlSocket", None)
            logger.debug("port %s, socket %s", port, socket)
            self.controller = self.obtain_tor_controller(port, socket, pw)
            logger.info("Obtained tor controller.")
        self.set_config_can_fail(config.TOR_CONFIG_CAN_FAIL)
        self.controller.set_options(config.TOR_CONFIG_RUNTIME)
        if self.controller.get_conf("TestingTorNetwork") == "1":
            self.testing_network = True
        dirport = self.controller.get_conf("DirPort", None)
        if dirport:
            self.endpoints = [stem.DirPort("127.0.0.1", dirport)]
        self.controller.add_event_listener(
            self.handle_new_consensus_event, EventType.NEWCONSENSUS
        )
        # Set this object event listener only for onbasca, not onbrisca
        if config.BRIDGESCAN:
            return self.controller
        self.controller.add_event_listener(
            self.handle_new_descriptor_event, EventType.NEWDESC
        )
        return self.controller

    def get_socks_address(self):
        """Return tor's first SOCKS listener as an ``(address, port)``."""
        return self.controller.get_listeners(Listener.SOCKS)[0]

    def obtain_consensus(self, path=None):
        """Return the router status entries tor currently knows about.

        ``get_network_statuses`` is a generator that only raises
        ``DescriptorUnavailable`` when iterated, so it is materialized
        inside the ``try``; otherwise the retry would never trigger.
        Retries every 2 seconds until entries are available (in a testing
        network tor might take some seconds to initialize).

        :param path: unused.
        :returns: a list of router status entries, or ``[]`` when there is
            no controller.
        """
        logger.info("Obtaining consensus.")
        if not self.controller:
            logger.error("Can not obtain router statuses without controller.")
            # TODO: exit here
            return []
        while True:
            try:
                router_statuses = list(
                    self.controller.get_network_statuses()
                )
            except stem.DescriptorUnavailable:
                router_statuses = []
            if router_statuses:
                return router_statuses
            logger.debug("Router statuses not available yet, retrying.")
            time.sleep(2)

    def handle_new_consensus_event(self, event):
        """Build and store a new ``Consensus`` from a ``NEWCONSENSUS`` event.

        ``event.desc`` holds the router status entries of the consensus.
        ``valid_after`` is the current UTC time truncated to the minute and
        the consensus params are stored with ``set_params``.
        """
        logger.info(
            "New consensus event. Number of relays %s", len(event.desc)
        )
        self.consensus = Consensus.objects.from_router_statuses(
            event.desc,
            # Not truncated to the hour (``minute=0``): not for chutney.
            valid_after=datetime.datetime.utcnow().replace(
                second=0, microsecond=0
            ),
            testing_network=self.testing_network,
        )
        consensus_params_dict = self.obtain_consensus_params()
        self.consensus.set_params(consensus_params_dict)

    def obtain_consensus_params(self):
        """Return the current consensus ``params`` line as a dictionary.

        The params can not be obtained from ``get_network_statuses`` via the
        control port, only from the cached consensus document returned by
        ``GETINFO dir/status-vote/current/consensus``.

        In a testing network congestion control is assumed and fixed params
        are returned.

        :returns: dict mapping each param name to its integer value; empty
            when the consensus has no ``params`` line.
        """
        logger.debug("Obtaining consensus params")
        # For now, assume congestion control in testing network.
        # Later on, configure tests for congestion control or not.
        if self.testing_network:
            logger.debug("Testing network, assuming congestion control.")
            return {"cc_alg": 2, "bwscanner_cc": 1}
        consensus = self.controller.get_info(
            "dir/status-vote/current/consensus"
        )
        params = []
        # A consensus has at most one ``params`` line, so stop at it instead
        # of splitting every line of the (large) document.
        for line in consensus.split("\n"):
            keyword, _, rest = line.partition(" ")
            if keyword == "params":
                params = rest.split()
                break
        consensus_params_dict = {}
        for param in params:
            name, _, value = param.partition("=")
            consensus_params_dict[name] = int(value)
        logger.debug("Consensus params: %s", consensus_params_dict.keys())
        return consensus_params_dict

    def handle_new_descriptor_event(self, event):
        """Create/update a ``RelayDesc`` for each relay in a ``NEWDESC`` event.

        A relay whose server descriptor can not be obtained is logged and
        skipped, so it does not prevent processing the remaining relays.
        """
        logger.info("New desc event.")
        for fingerprint, _nickname in event.relays:
            try:
                server_descriptor = self.controller.get_server_descriptor(
                    fingerprint
                )
            except stem.ControllerError as e:
                logger.debug(
                    "Failed to obtain descriptor for %s: %s", fingerprint, e
                )
                continue
            RelayDesc.objects.from_relay_desc(server_descriptor)

    def obtain_latest_relays(self, latest_consensus):
        """Make ``latest_consensus`` current and classify its relays.

        When bridge scanning outside a testing network, exits, non-exits and
        bridges are set through ``aset_exits_non_exits_bridges``; otherwise
        exits and non-exits are set with
        ``Consensus.objects.set_exits_non_exits``.

        :returns: the current ``Consensus``.
        """
        self.consensus = latest_consensus
        # For now, only run integration tests for relays' scanning,
        # not bridges' scanning
        if config.BRIDGESCAN and not self.testing_network:
            async_to_sync(self.consensus.aset_exits_non_exits_bridges)()
            return self.consensus
        Consensus.objects.set_exits_non_exits(
            self.consensus, self.testing_network
        )
        return self.consensus

    def obtain_relays(self):
        """Build a ``Consensus`` from tor's router statuses and descriptors.

        Stores the consensus params, creates/updates a ``RelayDesc`` for
        every router status that has a server descriptor and returns the new
        ``Consensus`` (also kept in ``self.consensus``).
        """
        now = datetime.datetime.utcnow()
        logger.info("Obtaining relays.")
        router_statuses_list = list(self.obtain_consensus())
        logger.info(
            "Number of relays in consensus %s", len(router_statuses_list)
        )
        server_descriptors_dict = {
            desc.fingerprint: desc
            for desc in self.controller.get_server_descriptors()
        }
        self.consensus = Consensus.objects.from_router_statuses(
            router_statuses_list,
            # Not truncated to the hour (``minute=0``): not for chutney.
            valid_after=now.replace(second=0, microsecond=0),
            testing_network=self.testing_network,
        )
        consensus_params_dict = self.obtain_consensus_params()
        self.consensus.set_params(consensus_params_dict)
        for router_status in router_statuses_list:
            server_descriptor = server_descriptors_dict.get(
                router_status.fingerprint
            )
            if server_descriptor:
                RelayDesc.objects.from_relay_desc(server_descriptor)
        logger.info("Updated/created relays.")
        return self.consensus

    def attach_stream_to_circuit(self, circuit_id, stream_event):
        """Attach a new ``USER`` stream to ``circuit_id``.

        Used, bound with ``functools.partial``, as a ``STREAM`` event
        listener; any other stream event is ignored. Attach errors are only
        logged.
        """
        if stream_event.status != "NEW" or stream_event.purpose != "USER":
            return
        logger.debug(
            "Attaching stream %s to circuit %s.",
            stream_event.id,
            circuit_id,
        )
        try:
            self.controller.attach_stream(stream_event.id, circuit_id)
        # So far we never saw this error.
        # Catch all
        except Exception as e:
            logger.debug(
                "Error attaching stream %s to circ %s: %s",
                stream_event.id,
                circuit_id,
                e,
            )

    def _close_circuit(self, circuit_id):
        """Close ``circuit_id``, logging instead of raising on failure.

        tor might have closed the circuit already; raising here would mask
        the caller's result.
        """
        try:
            self.controller.close_circuit(circuit_id)
        except stem.ControllerError as e:
            logger.debug("Error closing circuit %s: %s", circuit_id, e)

    def fetch_http_head(self, circuit_id, session, url):
        """Send an HTTP HEAD for ``url`` with its stream on ``circuit_id``.

        A ``STREAM`` listener attaches the request's stream to the circuit
        and is removed afterwards.

        :returns: the HEAD response, or the exception raised by the session.
        """
        logger.debug("Obtaining HTTP HEAD.")
        stream_listener = functools.partial(
            self.attach_stream_to_circuit, circuit_id
        )
        logger.debug("Attaching stream.")
        self.controller.add_event_listener(stream_listener, EventType.STREAM)
        try:
            return session.head(url)
        except Exception as e:
            # Max retries exceeded with url
            logger.debug(e)
            return e
        finally:
            self.controller.remove_event_listener(stream_listener)

    def measure_bw(
        self,
        circuit_id,
        session,
        url,
        bytes_range,
        size=constants.INITIAL_DL_BYTES,
    ):
        """Download ``bytes_range`` of ``url`` and compute the bandwidth.

        The circuit is closed afterwards, whether the download succeeded or
        not.

        :returns: ``size`` divided by the download time in seconds, or the
            exception raised by the session.
        """
        logger.debug(
            "Measuring bw with circuit %s, web server %s, bytes range %s "
            "and size %s.",
            circuit_id,
            url,
            bytes_range,
            size,
        )
        start_time = time.monotonic()
        try:
            response = session.get(url, range=bytes_range)
            # Stop the clock before closing the circuit so the control port
            # round trip is not counted as download time.
            end_time = time.monotonic()
        # Catch any exception raised by the session, whatever its type.
        except Exception as e:
            logger.debug(e)
            return e
        finally:
            self._close_circuit(circuit_id)
        time_delta = end_time - start_time
        logger.debug(
            "Time delta getting %s bytes: %s seconds.", size, time_delta
        )
        logger.debug(
            "GET response elapsed seconds: %s.",
            response.elapsed.total_seconds(),
        )
        measured_bandwidth = size / time_delta
        logger.debug(
            "Measured bandwidth: %s Bytes/seconds",
            measured_bandwidth,
        )
        return measured_bandwidth

    def measure_relay(self, relay, session_kwargs, socks_address):
        """Measure ``relay``'s bandwidth and return its ``Measurement``.

        https://gitlab.torproject.org/tpo/network-health/sbws/-/issues/40048:
        It's being stored in which position a relay is being measured and
        whether it is an exit.

        https://gitlab.torproject.org/tpo/network-health/sbws/-/issues/40027:
        Building circuits just use stem's API, not any special class.

        The relay is measured as exit when it is not the first hop of its
        helper path. On failure (no path, circuit creation, HEAD or GET
        error) ``measurement.error`` is set and the measurement is returned
        without bandwidth. The circuit is closed both when the HEAD fails and
        after the download.
        """
        logger.debug("Measuring relay %s", relay)
        measurement, _ = Measurement.objects.update_or_create(
            consensus=self.consensus,
            relay=relay,
            defaults={"queued_at": datetime.datetime.utcnow()},
        )
        measurement.save()
        relay.set_measurement_latest(measurement)
        relay.routerstatus_latest().set_measurement_latest(measurement)
        now = datetime.datetime.utcnow()
        measurement.attempted_at = now
        measurement.webserver = WebServer.objects.select_random()
        session = TimedSession(
            socks_address,
            verify=measurement.webserver.verify,
            **session_kwargs,
        )
        measurement.save()
        path = relay.routerstatus_latest().helper_path()
        if not path:
            measurement.error = "no path"
            measurement.finished_at = datetime.datetime.utcnow()
            measurement.save()
            return measurement
        if path[0] == relay.fingerprint:
            as_exit = False
            helper = RouterStatus.objects.get(
                fingerprint=path[1], consensus=self.consensus
            )
        else:
            as_exit = True
            helper = RouterStatus.objects.get(
                fingerprint=path[0], consensus=self.consensus
            )
        measurement.helper = helper.relay
        measurement.as_exit = as_exit
        measurement.save()
        logger.debug("Creating circuit.")
        try:
            circuit_id = self.controller.new_circuit(path, await_build=True)
        except Exception as e:
            # no such router, channel close, destroyed
            logger.debug(e)
            measurement.error = e
            measurement.finished_at = datetime.datetime.utcnow()
            logger.warning("Error creating circuit with path %s", path)
            measurement.save()
            return measurement
        logger.debug("HTTP HEAD with %s", measurement.webserver.url)
        head = self.fetch_http_head(
            circuit_id, session, measurement.webserver.url
        )
        if isinstance(head, Exception):
            # ``measure_bw`` (which closes the circuit) won't run, so close
            # the circuit here to not leak it.
            self._close_circuit(circuit_id)
            measurement.error = head
            measurement.finished_at = datetime.datetime.utcnow()
            logger.warning("Error fetching HEAD with path %s", path)
            measurement.save()
            return measurement
        size = (
            relay.routerstatus_latest().bandwidth * constants.DL_SECS
            or constants.INITIAL_DL_BYTES
        )
        logger.debug("Download size %s.", size)
        bytes_range = bytes_range_from_head(head, size=size)
        measured_bandwidth = self.measure_bw(
            circuit_id, session, measurement.webserver.url, bytes_range, size
        )
        if isinstance(measured_bandwidth, Exception):
            measurement.error = measured_bandwidth
            measurement.finished_at = datetime.datetime.utcnow()
            logger.warning("Error fetching bytes with path %s", path)
            measurement.save()
            return measurement
        measurement.bandwidth = measured_bandwidth
        measurement.finished_at = datetime.datetime.utcnow()
        measurement.save()
        logger.info("Created measurement %s", measurement)
        return measurement