
Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories.

base repository: pikers/piker
base: 32c3f63cfd138a9cfc6a38abe30f5a0e5886333b
head repository: pikers/piker
compare: 6b64e1348d366366bdc2366e910278f2163d0274
Showing with 187 additions and 70 deletions.
  1. +183 −62 piker/data/_ahab.py
  2. +3 −7 piker/ui/_cursor.py
  3. +1 −1 piker/ui/_display.py
245 changes: 183 additions & 62 deletions piker/data/_ahab.py
@@ -30,7 +30,7 @@
import tractor
import docker
import json
from docker.models.containers import Container
from docker.models.containers import Container as DockerContainer
from docker.errors import DockerException, APIError
from requests.exceptions import ConnectionError, ReadTimeout

@@ -133,6 +133,136 @@ def unpack_msg(err: Exception) -> str:
c.kill()


class Container:
'''
Wrapper around a ``docker.models.containers.Container`` that adds
log capture and relay through our native logging system, plus helper
method(s) for cancellation/teardown.
'''
def __init__(
self,
cntr: DockerContainer,
) -> None:

self.cntr = cntr
# log msg de-duplication
self.seen_so_far = set()

async def process_logs_until(
self,
patt: str,
bp_on_msg: bool = False,
) -> bool:
'''
Attempt to capture container log messages and relay through our
native logging system.
'''
seen_so_far = self.seen_so_far

while True:
logs = self.cntr.logs()
entries = logs.decode().split('\n')
for entry in entries:

# ignore null lines
if not entry:
continue

try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
raise

msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()

getattr(log, level, log.error)(f'{msg}')

if patt in msg:
return True

# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0.01)

return False

def try_signal(
self,
signal: str = 'SIGINT',

) -> bool:
try:
# XXX: market store doesn't seem to shutdown nicely all the
# time with this (maybe because there are still open grpc
# connections?) noticeably after client connections have been
# made or are in use/teardown. It works just fine if you
# just start and stop the container tho?..
log.cancel(f'SENDING {signal} to {self.cntr.id}')
self.cntr.kill(signal)
return True

except docker.errors.APIError as err:
# _err = err
if 'is not running' in err.explanation:
return False

async def cancel(
self,
) -> None:

cid = self.cntr.id
self.try_signal('SIGINT')

with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('PROCESSINGN LOGS')
await self.process_logs_until('initiating graceful shutdown')
# print('SHUTDOWN REPORTED BY CONTAINER')
await self.process_logs_until('exiting...',)

for _ in range(10):
with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('waiting on EXITING')
await self.process_logs_until('exiting...',)
# print('got EXITING')
break

if cs.cancelled_caught:
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')

try:
log.info(f'Waiting on container shutdown: {cid}')
self.cntr.wait(
timeout=0.1,
condition='not-running',
)
break

except (
ReadTimeout,
ConnectionError,
):
log.error(f'failed to wait on container {cid}')
raise

else:
raise RuntimeError(f'Failed to cancel container {cid}')

log.cancel(f'Container stopped: {cid}')
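
The `Container` wrapper added above is driven from trio code further down in this diff; a minimal, hypothetical usage sketch (assuming a reachable docker daemon, and reusing the wrapper plus the readiness/teardown log patterns that appear in this commit) might look roughly like:

```python
import docker
import trio

async def supervise_marketstore() -> None:
    # hypothetical driver showing the intended call sequence for the
    # `Container` wrapper above; run() kwargs are trimmed for brevity.
    client = docker.from_env()
    dcntr = client.containers.run(
        'alpacamarkets/marketstore:latest',
        detach=True,
        init=True,
    )
    cntr = Container(dcntr)  # wrap for log relay + teardown helpers

    # bounded wait for the "ready" line to show up in the json logs
    with trio.move_on_after(1):
        await cntr.process_logs_until(
            'launching tcp listener for all services...',
        )

    try:
        await trio.sleep_forever()  # serve until cancelled
    finally:
        # shield teardown so a pending cancellation can't abort the
        # signal-then-drain-logs sequence inside `.cancel()`
        with trio.CancelScope(shield=True):
            await cntr.cancel()
```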


@tractor.context
async def open_marketstored(
ctx: tractor.Context,
@@ -175,7 +305,7 @@ async def open_marketstored(
type='bind',
)

cntr: Container = client.containers.run(
dcntr: DockerContainer = client.containers.run(
'alpacamarkets/marketstore:latest',
# do we need this for cmds?
# '-i',
@@ -191,77 +321,59 @@ async def open_marketstored(
init=True,
# remove=True,
)
try:
seen_so_far = set()

async def process_logs_until(
match: str,
bp_on_msg: bool = False,
):
logs = cntr.logs(stream=True)
for entry in logs:
entry = entry.decode()

try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)

msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}')

# if "launching tcp listener for all services..." in msg:
if match in msg:
return True

# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0)
cntr = Container(dcntr)

return False
with trio.move_on_after(1):
found = await cntr.process_logs_until(
"launching tcp listener for all services...",
)

with trio.move_on_after(0.5):
found = await process_logs_until(
"launching tcp listener for all services...",
if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore` check logs deats'
)

if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore` check logs deats'
)
await ctx.started((cntr.cntr.id, os.getpid()))

# async with ctx.open_stream() as stream:

try:

# TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing
# callers to have root perms?
await trio.sleep_forever()

# await cntr.cancel()
# with trio.CancelScope(shield=True):
# # block for the expected "teardown log msg"..
# # await cntr.process_logs_until('exiting...',)

await ctx.started(cntr.id)
# # only msg should be to signal killing the
# # container and this super daemon.
# msg = await stream.receive()
# # print("GOT CANCEL MSG")

# block for the expected "teardown log msg"..
await process_logs_until('exiting...',)
# cid = msg['cancel']
# log.cancel(f'Cancelling container {cid}')

# # print("CANCELLING CONTAINER")
# await cntr.cancel()

# # print("SENDING ACK")
# await stream.send('ack')

except (
BaseException,
# trio.Cancelled,
# KeyboardInterrupt,
):
cntr.kill('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
await process_logs_until('exiting...',)
raise

finally:
try:
cntr.wait(
timeout=0.5,
condition='not-running',
)
except (
ReadTimeout,
ConnectionError,
):
cntr.kill()
with trio.CancelScope(shield=True):
await cntr.cancel()
# await stream.send('ack')

raise
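
The error path above leans on trio cancel-scope shielding so that container teardown can still await log draining while the surrounding task is itself being cancelled. A stripped-down sketch of that pattern, with a placeholder `cleanup()` coroutine standing in for `cntr.cancel()`:

```python
import trio

async def cleanup() -> None:
    # placeholder for `cntr.cancel()`: signal the container, then
    # drain its logs until the expected 'exiting...' line appears
    await trio.sleep(0.1)

async def serve_until_cancelled() -> None:
    try:
        await trio.sleep_forever()  # run till the supervisor cancels us
    except BaseException:
        # without the shield, the pending cancellation would immediately
        # abort any awaits inside `cleanup()`
        with trio.CancelScope(shield=True):
            await cleanup()
        raise
```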


async def start_ahab(
@@ -311,9 +423,18 @@ async def start_ahab(
open_marketstored,
) as (ctx, first):

assert str(first)
# run till cancelled
cid, pid = first

await trio.sleep_forever()
# async with ctx.open_stream() as stream:
# try:
# # run till cancelled
# await trio.sleep_forever()
# finally:
# with trio.CancelScope(shield=True):
# # print('SENDING CANCEL TO MARKETSTORED')
# await stream.send({'cancel': (cid, pid)})
# assert await stream.receive() == 'ack'

# since we demoted root perms in this parent
# we'll get a perms error on proc cleanup in
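
One behavioral change in `start_ahab()` worth calling out: the `first` value delivered through `ctx.started()` is now a `(container_id, pid)` pair rather than a bare id. A rough two-sided sketch of that handshake (the `portal` plumbing is an assumption based on typical `tractor` usage; only the `ctx.started()` / `open_context()` pairing mirrors the diff):

```python
import os
import tractor
import trio

@tractor.context
async def open_marketstored_sketch(
    ctx: tractor.Context,
) -> None:
    cid = 'deadbeef'  # stand-in for the real docker container id
    # hand (cid, pid) back to the parent, then idle until cancelled
    await ctx.started((cid, os.getpid()))
    await trio.sleep_forever()

async def parent_side(portal: tractor.Portal) -> None:
    # `portal` is assumed to come from an already spawned actor,
    # e.g. via `tractor.open_nursery()` as `start_ahab()` does.
    async with portal.open_context(
        open_marketstored_sketch,
    ) as (ctx, first):
        cid, pid = first  # unpack the startup payload, as in the diff
        await trio.sleep_forever()  # run till cancelled
```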
10 changes: 3 additions & 7 deletions piker/ui/_cursor.py
@@ -295,8 +295,7 @@ def add_label(


class Cursor(pg.GraphicsObject):
'''
Multi-plot cursor for use on a ``LinkedSplits`` chart (set).
'''Multi-plot cursor for use on a ``LinkedSplits`` chart (set).
'''
def __init__(
@@ -311,7 +310,7 @@ def __init__(

self.linked = linkedsplits
self.graphics: dict[str, pg.GraphicsObject] = {}
self.plots: list['PlotChartWidget'] = [] # type: ignore # noqa
self.plots: List['PlotChartWidget'] = [] # type: ignore # noqa
self.active_plot = None
self.digits: int = digits
self._datum_xy: tuple[int, float] = (0, 0)
@@ -440,10 +439,7 @@ def add_plot(
if plot.linked.xaxis_chart is plot:
xlabel = self.xaxis_label = XAxisLabel(
parent=self.plots[plot_index].getAxis('bottom'),
# parent=self.plots[plot_index].pi_overlay.get_axis(
# plot.plotItem, 'bottom'
# ),

# parent=self.plots[plot_index].pi_overlay.get_axis(plot.plotItem, 'bottom'),
opacity=_ch_label_opac,
bg_color=self.label_color,
)
2 changes: 1 addition & 1 deletion piker/ui/_display.py
@@ -599,7 +599,7 @@ def graphics_update_cycle(
yrange=(mn, mx),
)

vars['last_mx'], vars['last_mn'] = mx, mn
vars['last_mx'], vars['last_mn'] = mx, mn

# run synchronous update on all linked flows
for curve_name, flow in chart._flows.items():