Skip to content

Commit

Permalink
timeout adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
mcg1969 committed Feb 2, 2025
1 parent 71d4a87 commit 17e2b8e
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 49 deletions.
136 changes: 97 additions & 39 deletions anaconda_anon_usage/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
"""

import argparse
import os
import sys
import time
from threading import Thread
from urllib.parse import urljoin

from conda.base.context import context
from conda.base.context import Context, context, locate_prefix_by_name
from conda.gateways.connection.session import get_session
from conda.models.channel import Channel

Expand All @@ -25,47 +27,46 @@
COM_REPO = "https://repo.anaconda.com/pkgs/"
REPOS = (CLD_REPO, COM_REPO, ORG_REPO)
HEARTBEAT_PATH = "noarch/activate-0.0.0-0.conda"

# How long to attempt the connection. When a connection to our
# repository is blocked or slow, a long timeout would lead to
# a slow activation and a poor user experience. This is a total
# timeout value, split between attempts and backoffs.
TIMEOUT = 0.4
RETRIES = 3
# timeout value, inclusive of all retries.
TIMEOUT = 0.75 # seconds
ATTEMPTS = 3


def _print(msg, *args, standalone=False, error=False):
def _print(msg, *args, error=False):
global VERBOSE
global STANDALONE
if not (VERBOSE or utils.DEBUG or error):
return
if standalone and not STANDALONE:
return
# It is very important that these messages are printed to stderr
# when called from within the activate script. Otherwise they
# will insert themselves into the activation command set
ofile = sys.stdout if STANDALONE and not (error or utils.DEBUG) else sys.stderr
print(msg % args, file=ofile)


def _ping(session, url, wait, timeout):
def _ping(session, url, timeout):
try:
# A short timeout is necessary here so that the activation
# is not unduly delayed by a blocked internet connection
start_time = time.perf_counter()
response = session.head(url, proxies=session.proxies, timeout=timeout)
_print("Status code (expect 404): %s", response.status_code)
delta = time.perf_counter() - start_time
_print(
"Success after %.3fs; code (expect 404): %d", delta, response.status_code
)
except Exception as exc:
if type(exc).__name__ != "ConnectionError":
_print("Unexpected heartbeat error: %s", exc, error=True)
elif "timeout=" in str(exc):
_print("Timeout exceeded; heartbeat likely not sent.")
delta = time.perf_counter() - start_time
_print("NO heartbeat sent after %.3fs.", delta)


def attempt_heartbeat(channel=None, path=None, wait=False, dry_run=False):
line = "------------------------"
_print(line, standalone=True)
_print("anaconda-anon-usage heartbeat", standalone=True)
_print(line, standalone=True)

def attempt_heartbeat(prefix=None, dry_run=False, channel=None, path=None):
if not hasattr(context, "_aau_initialized"):
from . import patch

Expand All @@ -84,43 +85,100 @@ def attempt_heartbeat(channel=None, path=None, wait=False, dry_run=False):
break
else:
_print("No valid heartbeat channel")
_print(line, standalone=True)
return
url = urljoin(base, channel or "main") + "/"
url = urljoin(url, path or HEARTBEAT_PATH)

_print("Heartbeat url: %s", url)
if prefix:
Context.checked_prefix = prefix
_print("Prefix: %s", prefix)
_print("User agent: %s", context.user_agent)

if dry_run:
_print("Dry run selected, not sending heartbeat.")
else:
# No backoff is applied between the first and second attempts
n_blocks = RETRIES + 2 ** max(RETRIES - 2, 0) - 1
timeout = TIMEOUT / n_blocks
context.remote_max_retries = RETRIES
context.remote_backoff_factor = timeout
session = get_session(url)
# Run in the background so we can proceed with the rest of the
# activation tasks while the request fires. The process will wait
# to terminate until the thread is complete.
t = Thread(target=_ping, args=(session, url, wait, timeout), daemon=False)
t.start()
_print(line, standalone=True)
return

# Build and configure the session object
timeout = TIMEOUT / ATTEMPTS
context.remote_max_retries = ATTEMPTS - 1
# No backoff between attempts
context.remote_backoff_factor = 0
session = get_session(url)

# Run in the background so we can proceed with the rest of the
# activation tasks while the request fires. The process will wait
# to terminate until the thread is complete.
t = Thread(target=_ping, args=(session, url, timeout), daemon=False)
t.start()
if STANDALONE:
t.join()


def main():
global VERBOSE
global STANDALONE
p = argparse.ArgumentParser()
p.add_argument("-c", "--channel", default=None)
p.add_argument("-p", "--path", default=None)
p.add_argument("-d", "--dry-run", action="store_true")
p.add_argument("-q", "--quiet", action="store_true")
p.add_argument("-w", "--wait", action="store_true")
args = p.parse_args()
STANDALONE = True
VERBOSE = not args.quiet
attempt_heartbeat(args.channel, args.path, args.wait, args.dry_run)
VERBOSE = "--quiet" not in sys.argv and "-q" not in sys.argv

line = "-----------------------------"
_print(line)
_print("anaconda-anon-usage heartbeat")
_print(line)

def environment_path(s):
if not os.path.isdir(s):
raise ValueError("Prefix not found")
return s

def environment_name(s):
return locate_prefix_by_name(s)

p = argparse.ArgumentParser()
g = p.add_mutually_exclusive_group()
g.add_argument(
"-n",
"--name",
type=environment_name,
default=None,
help="Environment name; defaults to the current environment.",
)
g.add_argument(
"-p",
"--prefix",
type=environment_path,
default=None,
help="Environment prefix; defaults to the current environment.",
)
p.add_argument(
"-d",
"--dry-run",
action="store_true",
help="Do not send the heartbeat; just show the steps.",
)
p.add_argument("-q", "--quiet", action="store_true", help="Suppress console logs.")
p.add_argument(
"--channel",
default=None,
help="(advanced) The full URL to a custom repository channel. By default, an "
"Anaconda-hosted channel listed in the user's channel configuration is used.",
)
p.add_argument(
"--path",
default=None,
help="(advanced) A custom path to append to the channel URL.",
)

try:
args = p.parse_args()
attempt_heartbeat(
prefix=args.prefix or args.name,
dry_run=args.dry_run,
channel=args.channel,
path=args.path,
)
finally:
_print(line)


if __name__ == "__main__":
Expand Down
28 changes: 19 additions & 9 deletions tests/integration/proxy_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@
import tempfile
import time
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from os.path import isfile, join
from socketserver import ThreadingMixIn
from threading import Lock, Thread

from cryptography import x509
Expand All @@ -58,7 +57,7 @@
# regex to find newlines in binary data
BINARY_NEWLINE = re.compile(rb"\r?\n")
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
CONNECTION_FORMAT = "[%s/%.3f] %s" # cid, elapsed time, message
CONNECTION_FORMAT = "[%s/%.3f/%.3f] %s" # cid, split, elapsed, message

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -182,7 +181,7 @@ def read_or_create_cert(host=None):
#


class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
class MyHTTPServer(ThreadingHTTPServer):
"""HTTPS proxy server with thread-per-connection handling"""

daemon_threads = True
Expand All @@ -204,6 +203,7 @@ class ProxyHandler(BaseHTTPRequestHandler):

def setup(self):
self.start_time = time.perf_counter()
self.last_time = self.start_time
with self.server.lock:
self.server.counter += 1
self.cid = "%04d" % self.server.counter
Expand All @@ -216,9 +216,14 @@ def log_message(self, format, *args):
def _log(self, *args, **kwargs):
"""Log message with elapsed time since first message for this connection ID"""
level = kwargs.pop("level", "info")
delta = time.perf_counter() - self.start_time
fmt = CONNECTION_FORMAT % (self.cid, delta, args[0])
n_time = (
self.start_time if kwargs.pop("noevent", False) else time.perf_counter()
)
d1 = n_time - self.last_time
d2 = n_time - self.start_time
fmt = CONNECTION_FORMAT % (self.cid, d1, d2, args[0])
getattr(logger, level)(fmt, *args[1:], **kwargs)
self.last_time = n_time

def _multiline_log(
self, blob, firstline=None, direction=None, include_binary=False
Expand Down Expand Up @@ -271,8 +276,13 @@ def do_CONNECT(self):
cert_file, key_file = read_or_create_cert(host)

if self.server.delay:
self._log("Adding %gs connection delay", self.server.delay)
time.sleep(self.server.delay)
self._log("Enforcing %gs delay", self.server.delay)
current = self.last_time
finish = self.start_time + self.server.delay
while finish - current > 0.001:
time.sleep(finish - current)
current = time.perf_counter()
self._log("End of connection delay")

# Establish tunnel
self.send_response(200, "Connection Established")
Expand Down Expand Up @@ -464,7 +474,7 @@ def cleanup():
cert_path, key_path = read_or_create_cert()

# Start and configure server
server = ThreadingHTTPServer(("0.0.0.0", args.port), ProxyHandler)
server = MyHTTPServer(("0.0.0.0", args.port), ProxyHandler)
server.delay = max(0, args.delay)

# Enable interception if any response-related args are provided
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_heartbeats.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def verify_user_agent(user_agent, expected, envname=None, marker=None):
# fmt: off
cmd = ["python", pscript, "--return-code", "404"]
if hval == "delay":
cmd.extend(["--delay", "1.0"])
cmd.extend(["--delay", "2.0"])
cmd.extend(["--", "python", "-m", "conda", "shell." + stype, "activate", envname])
# fmt: on
proc = subprocess.run(
Expand Down

0 comments on commit 17e2b8e

Please sign in to comment.