Skip to content

Commit

Permalink
timeout adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
mcg1969 committed Feb 2, 2025
1 parent 71d4a87 commit 3b1804b
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 29 deletions.
85 changes: 66 additions & 19 deletions anaconda_anon_usage/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
"""

import argparse
import os
import sys
import time
from threading import Thread
from urllib.parse import urljoin

from conda.base.context import context
from conda.base.context import Context, context, locate_prefix_by_name
from conda.gateways.connection.session import get_session
from conda.models.channel import Channel

Expand All @@ -28,9 +30,9 @@
# How long to attempt the connection. When a connection to our
# repository is blocked or slow, a long timeout would lead to
# a slow activation and a poor user experience. This is a total
# timeout value, split between attempts and backoffs.
TIMEOUT = 0.4
RETRIES = 3
# timeout value, inclusive of all retries.
TIMEOUT = 0.75 # seconds
RETRIES = 2


def _print(msg, *args, standalone=False, error=False):
Expand All @@ -47,20 +49,23 @@ def _print(msg, *args, standalone=False, error=False):
print(msg % args, file=ofile)


def _ping(session, url, wait, timeout):
def _ping(session, url, timeout):
try:
# A short timeout is necessary here so that the activation
# is not unduly delayed by a blocked internet connection
start_time = time.perf_counter()
response = session.head(url, proxies=session.proxies, timeout=timeout)
_print("Status code (expect 404): %s", response.status_code)
delta = time.perf_counter() - start_time
_print("Sent after %.3fs; code (expect 404): %d", delta, response.status_code)
except Exception as exc:
if type(exc).__name__ != "ConnectionError":
_print("Unexpected heartbeat error: %s", exc, error=True)
elif "timeout=" in str(exc):
_print("Timeout exceeded; heartbeat likely not sent.")
delta = time.perf_counter() - start_time
_print("NOT sent after %.3fs.", delta)


def attempt_heartbeat(channel=None, path=None, wait=False, dry_run=False):
def attempt_heartbeat(prefix=None, dry_run=False, channel=None, path=None):
line = "------------------------"
_print(line, standalone=True)
_print("anaconda-anon-usage heartbeat", standalone=True)
Expand Down Expand Up @@ -90,20 +95,28 @@ def attempt_heartbeat(channel=None, path=None, wait=False, dry_run=False):
url = urljoin(url, path or HEARTBEAT_PATH)

_print("Heartbeat url: %s", url)
if prefix:
Context.checked_prefix = prefix
_print("Prefix: %s", prefix)
_print("User agent: %s", context.user_agent)
if dry_run:
_print("Dry run selected, not sending heartbeat.")
else:
# No backoff is applied between the first and second attempts
n_blocks = RETRIES + 2 ** max(RETRIES - 2, 0) - 1
timeout = TIMEOUT / n_blocks
timeout = TIMEOUT / (RETRIES + 1)
context.remote_max_retries = RETRIES
context.remote_backoff_factor = timeout
# No backoff between attempts
context.remote_backoff_factor = 0
_print(
"Attempts: %d, timeout (attempt/total): %.3f/%.3f",
RETRIES + 1,
timeout,
TIMEOUT,
)
session = get_session(url)
# Run in the background so we can proceed with the rest of the
# activation tasks while the request fires. The process will wait
# to terminate until the thread is complete.
t = Thread(target=_ping, args=(session, url, wait, timeout), daemon=False)
t = Thread(target=_ping, args=(session, url, timeout), daemon=False)
t.start()
_print(line, standalone=True)

Expand All @@ -112,15 +125,49 @@ def main():
global VERBOSE
global STANDALONE
p = argparse.ArgumentParser()
p.add_argument("-c", "--channel", default=None)
p.add_argument("-p", "--path", default=None)
p.add_argument("-d", "--dry-run", action="store_true")
p.add_argument("-q", "--quiet", action="store_true")
p.add_argument("-w", "--wait", action="store_true")
g = p.add_mutually_exclusive_group()
g.add_argument(
"-n",
"--name",
default=None,
help="Environment name; defaults to the current environment.",
)
g.add_argument(
"-p",
"--prefix",
default=None,
help="Environment prefix; defaults to the current environment.",
)
p.add_argument(
"-d",
"--dry-run",
action="store_true",
help="Do not send the heartbeat; just show the steps.",
)
p.add_argument("-q", "--quiet", action="store_true", help="Suppress console logs.")
p.add_argument(
"--channel",
default=None,
help="(advanced) The full URL to a custom repository channel. By default, an "
"Anaconda-hosted channel listed in the user's channel configuration is used.",
)
p.add_argument(
"--path",
default=None,
help="(advanced) A custom path to append to the channel URL.",
)
args = p.parse_args()
STANDALONE = True
VERBOSE = not args.quiet
attempt_heartbeat(args.channel, args.path, args.wait, args.dry_run)
if args.prefix:
prefix = os.path.abspath(args.prefix)
elif args.name:
prefix = locate_prefix_by_name(args.name)
else:
prefix = None
attempt_heartbeat(
prefix, dry_run=args.dry_run, channel=args.channel, path=args.path
)


if __name__ == "__main__":
Expand Down
28 changes: 19 additions & 9 deletions tests/integration/proxy_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@
import tempfile
import time
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from os.path import isfile, join
from socketserver import ThreadingMixIn
from threading import Lock, Thread

from cryptography import x509
Expand All @@ -58,7 +57,7 @@
# regex to find newlines in binary data
BINARY_NEWLINE = re.compile(rb"\r?\n")
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
CONNECTION_FORMAT = "[%s/%.3f] %s" # cid, elapsed time, message
CONNECTION_FORMAT = "[%s/%.3f/%.3f] %s" # cid, split, elapsed, message

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -182,7 +181,7 @@ def read_or_create_cert(host=None):
#


class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
class MyHTTPServer(ThreadingHTTPServer):
"""HTTPS proxy server with thread-per-connection handling"""

daemon_threads = True
Expand All @@ -204,6 +203,7 @@ class ProxyHandler(BaseHTTPRequestHandler):

def setup(self):
self.start_time = time.perf_counter()
self.last_time = self.start_time
with self.server.lock:
self.server.counter += 1
self.cid = "%04d" % self.server.counter
Expand All @@ -216,9 +216,14 @@ def log_message(self, format, *args):
def _log(self, *args, **kwargs):
"""Log message with elapsed time since first message for this connection ID"""
level = kwargs.pop("level", "info")
delta = time.perf_counter() - self.start_time
fmt = CONNECTION_FORMAT % (self.cid, delta, args[0])
n_time = (
self.start_time if kwargs.pop("noevent", False) else time.perf_counter()
)
d1 = n_time - self.last_time
d2 = n_time - self.start_time
fmt = CONNECTION_FORMAT % (self.cid, d1, d2, args[0])
getattr(logger, level)(fmt, *args[1:], **kwargs)
self.last_time = n_time

def _multiline_log(
self, blob, firstline=None, direction=None, include_binary=False
Expand Down Expand Up @@ -271,8 +276,13 @@ def do_CONNECT(self):
cert_file, key_file = read_or_create_cert(host)

if self.server.delay:
self._log("Adding %gs connection delay", self.server.delay)
time.sleep(self.server.delay)
self._log("Enforcing %gs delay", self.server.delay)
current = self.last_time
finish = self.start_time + self.server.delay
while finish - current > 0.001:
time.sleep(finish - current)
current = time.perf_counter()
self._log("End of connection delay")

# Establish tunnel
self.send_response(200, "Connection Established")
Expand Down Expand Up @@ -464,7 +474,7 @@ def cleanup():
cert_path, key_path = read_or_create_cert()

# Start and configure server
server = ThreadingHTTPServer(("0.0.0.0", args.port), ProxyHandler)
server = MyHTTPServer(("0.0.0.0", args.port), ProxyHandler)
server.delay = max(0, args.delay)

# Enable interception if any response-related args are provided
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_heartbeats.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def verify_user_agent(user_agent, expected, envname=None, marker=None):
# fmt: off
cmd = ["python", pscript, "--return-code", "404"]
if hval == "delay":
cmd.extend(["--delay", "1.0"])
cmd.extend(["--delay", "2.0"])
cmd.extend(["--", "python", "-m", "conda", "shell." + stype, "activate", envname])
# fmt: on
proc = subprocess.run(
Expand Down

0 comments on commit 3b1804b

Please sign in to comment.