marketstore OHLCV storage and client integration #247
Conversation
@guilledk can you rebase this amigo?
force-pushed from aae9f34 to 1c22db1
Merged in the super stuff from #284 and will begin 1s OHLC ingest from ib shortly 🏄🏼
force-pushed from b2fe7f2 to 802a3be
force-pushed from 5bea515 to 08b4048
Also, start tinkering with `tractor.trionics.ipython_embed()`. In an effort to get back to a usable REPL around the mkts client, this adds usage of the new `tractor` integration api as well as logic for skipping backfilling if existing tsdb arrays are found.
Add some basic `numpy` epoch slice logic to generate append and prepend arrays to write to the db. Mooar cool things:
- add a `Storage.delete_ts()` method to wipe a column series from the db easily
- don't attempt to read in any OHLC series by default on client load
- add some `pyqtgraph` profiling and drop manual latency measures
- if no db series for the fqsn exists write the entire shm array
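A minimal sketch of the kind of epoch-slice logic described above, assuming a structured array with an `Epoch` seconds field (the field name and function are illustrative, not the actual piker code):

```python
import numpy as np

def split_for_tsdb(
    shm_array: np.ndarray,
    first_tsdb_epoch: float,
    last_tsdb_epoch: float,
) -> tuple[np.ndarray, np.ndarray]:
    # rows strictly older than anything stored -> candidate prepend array,
    # rows strictly newer than the last stored bar -> candidate append array
    times = shm_array['Epoch']
    prepend = shm_array[times < first_tsdb_epoch]
    append = shm_array[times > last_tsdb_epoch]
    return prepend, append
```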
If `marketstore` is detected, try to only load the most recent missing data from the data provider (broker) and the rest from the tsdb, then push it all to shm for display in the UI. If the provider/broker doesn't have the history client endpoint, just use the old one for now so we can start to incrementally add support. Don't start the ohlc step incrementer task until the backend signals that the feed is live.
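In rough pseudo-form, that split-load flow might look like the following (all of the `storage`, `broker` and `shm` method names here are hypothetical stand-ins, not piker's real interfaces):

```python
async def load_history(fqsn: str, storage, broker, shm) -> None:
    # hypothetical tsdb read api; may return an empty array
    tsdb_history = await storage.read_ohlcv(fqsn)
    if len(tsdb_history):
        # only pull the gap since the last stored bar from the broker
        last_epoch = tsdb_history['Epoch'][-1]
        recent = await broker.bars(fqsn, start_dt=last_epoch)
        shm.push(tsdb_history)
        shm.push(recent)
    else:
        # no series in the tsdb yet: do a full provider backfill
        shm.push(await broker.bars(fqsn))
```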
It turns out (i guess not so shockingly?) that `marketstore` doesn't always teardown "gracefully" under SIGINT (it seems to hang if there are open client connections which are also in the midst of teardown?), so this instead first tries the SIGINT and then fails over to a SIGKILL (destroy loop), which seems to be much more reliable to ensure shutdown without any downside in terms of a "hard kill". Originally i was thinking the issue was root perms related (which get relegated solely to the `marketstored` daemon actor after spawn) but actually it was indeed the signalling / application layer causing the hold-up/latency on teardown. There's a bunch of lingering (now commented) code which tried to solve this non-problem, as well as a bunch of logging/prints to help decipher the root of the issue - this will all get cleaned out shortly.
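For reference, a signal-failover destroy loop of this kind can be sketched with `docker-py` (the function name, poll counts and timeouts are illustrative, not the exact supervisor code):

```python
import time
import docker

def hard_stop(cntr: 'docker.models.containers.Container') -> None:
    # politely ask with SIGINT first, then escalate to SIGKILL
    for sig in ('SIGINT', 'SIGKILL'):
        try:
            cntr.kill(sig)
        except docker.errors.APIError:
            return  # container already gone

        # poll briefly for the container to actually exit
        for _ in range(10):
            cntr.reload()
            if cntr.status != 'running':
                return
            time.sleep(0.1)
```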
Oh right and I have to go through the todos and update! Mostly i just want to get a first set of eyes on the code.
```
@@ -133,6 +133,136 @@ def unpack_msg(err: Exception) -> str:
        c.kill()


class Container:
```
What do you think of creating a `docker-utils` python package with a lot of common abstractions to deal with dockerisms? It would be different than `pytest-dockerctl` cause it wouldn't be only about pytest; I have a bunch of utils I'd like in a pip-installable package somewhere:
`wait_for_attr`, `get_container`, `docker_open_process`, `docker_wait_process`, `docker_move_into`
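As a rough illustration, one plausible shape for a helper like `get_container` built on `docker-py` (the semantics here are guessed from the name, nothing more):

```python
import docker

def get_container(
    client: docker.DockerClient,
    name: str,
):
    '''
    Return the first (possibly stopped) container whose name
    matches ``name``, or ``None`` if nothing matches.
    '''
    for cntr in client.containers.list(all=True):
        if cntr.name == name:
            return cntr
    return None
```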
i think this is an amazing idea!
for example here, all the supervisor stuff i had to hack would be great to get into a (somewhat) formalized api layer.
Yeah I'm also not settled on a lot of this api / naming so don't take it too too seriously and feel free to suggest changes.
I'm also fine with landing this code and making it possible for someone else to go through and do a cleanup / reorg if desired.
I mostly just want to get in the ability to store large data sets for users and prime things for proper real-time ingest from tick feeds.
"""Spawn the piker broker-daemon. | ||
""" | ||
@click.option( | ||
'--tsdb', |
So this is what a user passes to indicate `pikerd` will spawn the `marketstore` container in a subactor.
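A hedged guess at how the full option wiring might look; the `is_flag` setting and help text are assumptions, not the actual diff:

```python
import click

@click.command()
@click.option(
    '--tsdb',
    is_flag=True,
    help='Also spawn a `marketstore` tsdb container supervisor',
)
def pikerd(tsdb: bool) -> None:
    if tsdb:
        # kick off the docker / marketstore supervisor sub-actor
        ...
```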
```
# taken from ``click`` since apparently they have some
# super weirdness with sigint and sudo..no clue
def get_app_dir(app_name, roaming=True, force_posix=False):
```
Ahh right, yeah this is probably confusing af.. It seems that SIGINT cancellation was somehow super oddly being swallowed/ignored by `click`, and so pulling out this one routine we needed (which probably isn't a horrible idea to maintain on our own anyway?) was the fast hack.
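For context, typical usage of that vendored routine (the call matches `click.get_app_dir()`'s signature; the exact path returned is platform dependent):

```python
# e.g. '~/.config/piker' on linux,
# '~/Library/Application Support/piker' on macOS,
# '%APPDATA%\piker' on windows
config_dir = get_app_dir('piker')
```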
```
    # callers to have root perms?
    await trio.sleep_forever()

# await cntr.cancel()
```
Left over code from when I thought the issues with "graceful cancellation" were to do with root perms stuff. Turns out it wasn't that and it was the actual `marketstore` app layer not shutting down. I will pull out this cruft shortly.
Done in #308.
```
    Currently the actor calling this task should normally be started
    with root permissions (until we decide to use something that doesn't
    require this, like docker's rootless mode or some wrapper project) but
    te root perms are de-escalated after the docker supervisor sub-actor
```
s/te/the
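On the de-escalation point itself, the standard posix pattern looks roughly like this (an illustrative sketch only, not piker's actual mechanism):

```python
import os
import pwd

def drop_root_perms(username: str) -> None:
    # look up the target (non-root) user's ids
    pw = pwd.getpwnam(username)
    # must drop the group id before the user id,
    # while the process still has root privileges
    os.setgid(pw.pw_gid)
    os.setuid(pw.pw_uid)
```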
```
@@ -254,61 +253,6 @@ def iterfqsns(self) -> list[str]:
        return keys


def from_df(
```
ahh yeah right! we can drop this (and maybe `pandas` altogether for now) since i rewrote the `ib` ohlc frame parser to just cast directly to `numpy`.
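The direct-cast approach could look something like this (the dtype fields and tuple layout are assumptions for illustration, not the actual parser):

```python
import numpy as np

# a marketstore-style OHLCV record layout (assumed field names)
ohlc_dtype = np.dtype([
    ('Epoch', 'i8'),
    ('Open', 'f8'),
    ('High', 'f8'),
    ('Low', 'f8'),
    ('Close', 'f8'),
    ('Volume', 'f8'),
])

def bars_to_np(bars: list[tuple]) -> np.ndarray:
    # cast (epoch, o, h, l, c, v) rows straight to a structured
    # array, no intermediate DataFrame required
    return np.array(bars, dtype=ohlc_dtype)
```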
```
    tractor.run(main)

# @cli.command()
```
ahh yeah we can probably drop this. the way i'd like to deal with "tsdb management" is the new `storesh` repl (with embedded `ipython`) which will have a small interactive API for manual db tinkering.
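The embedding itself can be as small as the following sketch (`storage_client` is a stand-in name for whatever gets exposed to the shell):

```python
import IPython

def storesh(storage_client) -> None:
    # local names (here `storage_client`) are visible inside
    # the embedded shell's namespace for manual db tinkering
    IPython.embed(header='piker storage shell')
```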
```
@@ -188,6 +191,22 @@ async def _setup_persistent_brokerd(
    await trio.sleep_forever()


async def start_backfill(
```
Ok so there is a bunch of new code for this in #302 that hasn't yet been factored out. I wasn't sure if that should all come as part of a "backend ohlc history support" PR (aka more or less what #305 is) or be included here? Technically this PR will land with minimal backend support (i think just `ib`?) so it's more about how much code y'all want to review 😂
Yeah this kinda stuff was the main reason for dropping this PR and just aggregating it all into #308.
Replaced by #308 🏄🏼
Update the outdated `ingest` backend and replace the in-repo async wrapped marketstore client with the new: https://github.com/pikers/anyio-marketstore

Includes support for a `tractor` based supervisor actor and thus resolves #143 which was merged in via the sub-PR #284.

`ingest` TODOs:
- `destroy` cli piker cmd (now implemented as `Storage.delete_ts()`)
- `marketstore` from `brokerd` feeds:
  - `techtonicdb` schema and if it can be used with the aggregator plugin (got a feeling we'll need to write at least a `1Sec` bucket in order for this to work; looking at the code, `aggregate()` calls into `model.FromTrades()` and `FromTrades()` only accepts a min step of `1Sec`)
- ohlcv ingest and load from multiple backends: this was added as part of the new history work in "Storage layer: initial `marketstore` tsdb support with async OHLCV history loading" (#308)
  - `ib` seems like the natural place to start since `1s` is already offered in their history
  - `kraken` has tick history which has been a todo for a while

`_ahab` TODO:
- `ctx.started()` call?
- `pikerd` with the tsdb stuff spawned? `--tsdb` or `--data` or something?
- `mkts.yaml` config?
- `pikerd --tsdb` should raise `DockerNotStarted` and an appropriate perms error on no `sudo` (for now; a daemon-check sketch follows below)
- `sudo` since we relinquish root perms in order to create shms..
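For the `DockerNotStarted` item above, a minimal sketch of the kind of daemon check that could raise it, assuming `docker-py` (the exception class and message are illustrative, not the actual piker code):

```python
import docker

class DockerNotStarted(Exception):
    'Raised when the docker daemon is unreachable.'

def open_docker() -> docker.DockerClient:
    try:
        # resolves connection details from the environment
        client = docker.from_env()
        # a cheap round-trip to verify the daemon is actually up
        client.ping()
    except docker.errors.DockerException as err:
        raise DockerNotStarted(
            'Is the docker daemon running (and do you have perms)?'
        ) from err
    return client
```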