Support for global log (e.g. $all stream in EventStore) #24
Hi Lev! Cool to see you here. Nice idea. That could be implemented, but it will require a few additional things:
If we disregard 2) and assume a single-host deployment, this can be done very easily by anyone, since out-of-the-box SS supports including additional table operations along with the event, so you can do an additional insert, and implementing a stream directory is very simple as well. I'll give that some more thought.
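For readers who haven't used that feature, here is a minimal sketch of what such an additional insert could look like using Streamstone's Include mechanism. The extra row shape is illustrative, and exact signatures vary across Streamstone versions; note the included row must share the stream's partition, since AT transactions don't span partitions (as discussed below):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage.Table;
using Streamstone;

// Write an event and, in the same entity group transaction, insert an extra
// row into the same partition (here: a hypothetical per-stream marker row).
async Task WriteWithInclude(CloudTable table, string streamId)
{
    var partition = new Partition(table, streamId);
    var stream = await Stream.ProvisionAsync(partition);

    var extraRow = new DynamicTableEntity(streamId, "DIR"); // illustrative row shape

    var @event = new EventData(
        EventId.From(Guid.NewGuid()),
        EventProperties.From(new Dictionary<string, EntityProperty>
        {
            { "Type", new EntityProperty("SomethingHappened") }
        }),
        EventIncludes.From(Include.Insert(extraRow)));

    await Stream.WriteAsync(stream, @event);
}
```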
BTW, one of the drivers behind the birth of SS was to have an ultra-scalable, cheap PaaS event store by avoiding global ordering of events. Jay Kreps talks about log partitions in the Building a Scalable Log section here. What he describes are actually DDD aggregates, each mapped to a distinct stream. In that sense a global order of events doesn't make sense, really. Why do you need global ordering? Are you dealing with an aggregate with a long lifecycle? A busy aggregate? Perhaps you're looking for a sharded stream?

I can think of distributing events of a single stream into multiple partitions (shards) to get ultimate write/read speeds. That could be done by combining stateful processing (actor per stream, Akka.Net/Orleans) and consistent hashing algorithms. I'm thinking about implementing something like that just for fun, as an example of using two of my OSS projects (Streamstone/Orleankka). Unfortunately, such a stream won't guarantee a perfect order of events and could have gaps due to write failures and retries. This might be OK for something like scalable logging. Thus it would probably be better partitioned by a natural key such as date.
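For anyone wanting to experiment with that sharded-stream idea, the core of it is just a stable hash routing each event to one of a fixed set of physical partitions. A minimal sketch (MD5 used purely as a stable hash; the naming scheme is illustrative):

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

static class StreamSharding
{
    // Route one logical stream's event to one of N physical shards.
    static int ShardOf(string streamId, long eventNumber, int shardCount)
    {
        using var md5 = MD5.Create();
        var key = $"{streamId}:{eventNumber}";
        byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(key));
        return (int)(BitConverter.ToUInt32(hash, 0) % (uint)shardCount);
    }

    // Physical partition id, e.g. "orders-42/07".
    static string ShardPartition(string streamId, int shard) => $"{streamId}/{shard:D2}";
}
```

As the comment says, the price is that readers must merge shards back together and tolerate gaps and imperfect ordering.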
I think the link to Jay's talk is broken.
@jkonecki thanks. Fixed!
AFAIK, Greg is able to provide
This global stream only needs to maintain order with respect to events within individual streams, but not necessarily across streams. One approach is to do a transactional dual write in AT: one row corresponding to the event in an individual stream and another entry in a global log. This way, you split a partition into two halves, one containing the individual stream's events and one containing the global log entries. This global log would be ordered by a timestamp plus some value to ensure uniqueness (such as the id of the individual stream). If operations on individual streams are forced to specify explicit versions (OCC), this ensures consistency and ordering within individual streams.

Re Kafka and event sourcing, I'm not quite sure how to actually do it, given that Kafka wasn't designed to support arbitrarily large numbers of topics. The log compaction feature can help, but only for integration - you still have no way to read events corresponding to an individual entity. Kafka is great as a bus, but I'm not sure it fits as a source-of-truth store.
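A hedged sketch of that dual write as a single AT entity group transaction (classic Azure Storage SDK; row key shapes are illustrative). Both rows share the stream's partition key, which is what makes the write atomic:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage.Table;

async Task DualWrite(CloudTable table, string streamId, int version)
{
    var batch = new TableBatchOperation();

    // Half 1: the event at its stream-local position (Insert fails if it exists => OCC).
    batch.Insert(new DynamicTableEntity(streamId, version.ToString("d10")));

    // Half 2: the global-log entry, ordered by timestamp, made unique by stream id + version.
    var globalKey = $"LOG|{DateTimeOffset.UtcNow:O}|{streamId}|{version:d10}";
    batch.Insert(new DynamicTableEntity(streamId, globalKey));

    await table.ExecuteBatchAsync(batch); // atomic: all rows share one partition key
}
```

Since the log rows stay inside each stream's partition, reading the global log then requires the read-time merge across partitions mentioned later in this thread.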
Would this be implemented using AT entity group transactions? Also, I'm not sure this and the global timestamp would suffice. I suppose you could scan the stream directory and pick events greater than a certain global timestamp, but this seems prohibitive.
As alluded to above, I think it should be possible to do this using a simple timestamp plus a unique connection id, or even the stream id + event number of the event being written. OCC ensures consistency within an individual stream, and we don't require the global timestamp to fully respect ordering across streams.
Now I'm confused. What did you mean by global timestamp previously? My understanding was that it's some monotonically increasing counter, like in Greg Young's EventStore, or some form of Lamport timestamp. But it seems it's not.
Yes, SS guarantees that for individual streams. It's fully ACID.
Nope, since an AT transaction is supported only within the boundaries of a single partition. Basically, the stream directory is just a registry of stream addresses (partition ids) which can be read in an efficient way (instead of an inefficient full table scan). It could be a single partition or a sharded directory with multiple partitions. The stream directory is required so that you can enumerate all available partitions, something you will need in order to implement the aforementioned
OK, as I understand it, by simple timestamp you really meant the current datetime? The proposed addition of stream id + event number will fix the precision problem and make that event id globally unique, right? Seems legit, but ... at that point I want to ask you about the use cases you have at hand for this
I was referring to a possible implementation. It could also be a monotonic integer, but this would require a central authority to generate it. Using a wall-clock timestamp is one way to go. (I can already hear a co-worker calling out TrueTime...) One challenge here is with commitment ordering.
Not sure how a vector clock would apply. I'm proposing that you don't need global ordering across different streams, but you still need a global log of all events. This will impose some order on events. This order needs to respect the ordering within individual streams, but not across streams. Suppose you have streams of the form
Yes indeed :) we're trying to avoid it as much as possible.
How will the directory be written to? If not in the same partition, won't you need a non-transactional dual write? It seems the directory write needs to be atomic, but not linearizable, so something like RAMP could be used, but it would require some coordination.
Yes, although in the simplest case, the set of partitions can be fixed and hardcoded at configuration time.
Yes, this is the use case - this is the "bottled water" approach I referred to above. Yes, the skew could be an issue, but as described above, it can be bounded. (It's possible to commit G2 before G1, but this disorder should only exist for a limited amount of time, after which it can be ignored.)
I think you have the implication the wrong way round. It is unclear what you are asking for: on one hand you're mentioning a global timestamp, but on the other you're providing examples that do not involve a global timestamp. A global timestamp would be described as:
For the purpose of this discussion, I'm defining
Not sure what you're referring to. In
What I'm asking for is an API that allows consuming all events across all streams. This API should be akin to the EventStore $all stream (but some of the guarantees can be relaxed) or the transaction log of an RDBMS. In particular, it should be possible to request a stream of events that have been committed after a specific global timestamp. This is where having a stream directory alone may be prohibitive. Global timestamp as defined here corresponds to
I think that your requirement of requesting a stream of events after a specific global timestamp can't be handled without an external indexing structure. One can easily imagine a WorkerRole that was about to commit a batch of events but suffered from a long GC pause or network problem. The data, marked with old timestamps, would be added after some time, breaking the queries that should already have read these events. As far as I see it now, the only approach would be to have an indexing role that would scan every stream head and atomically, in the same partition, write an indexing entry and bump up the indexed stream version. Detection of new streams would require full scans, though.
One more proposal for providing such an indexer: when one creates a new stream, before storing the head and the first event, the name of the stream is stored in the indexer partition. Then the normal Streamstone storage operations are applied. This ensures that the indexer does not have to scan other tables to find streams; it can query stream after stream directly.
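A sketch of that registration order, assuming plain Azure Table operations plus Streamstone for the stream itself (partition and row names are illustrative). The two writes are not atomic across partitions, so a crash after step 1 leaves a registered-but-empty stream, which the indexer can simply skip:

```csharp
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage.Table;
using Streamstone;

async Task CreateIndexedStream(CloudTable table, string streamId)
{
    // 1. Register the stream name in the well-known indexer partition first.
    var registration = new DynamicTableEntity("INDEXER", streamId);
    await table.ExecuteAsync(TableOperation.InsertOrReplace(registration));

    // 2. Then apply the normal Streamstone storage operations.
    var partition = new Partition(table, streamId);
    await Stream.ProvisionAsync(partition);
}
```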
It can be done using AT, since it provides transactional writes of up to 100 rows in a partition. This library already makes use of this approach.
These limitations are being lifted in the next version. However, it is always possible to distribute across storage accounts much like you distribute across partitions.
EventStore doesn't directly support sharding. The OSS and commercial versions were merged a few years ago. We can use AT to mimic the functionality of the EventStore index as described above.
That's what I call a stream directory.
I don't intend to write/update any event sequence information to the stream directory, but merely store an address (table, partitionId) of a stream, so I can later do an efficient
Interesting. Source?
Indeed, this is simple. SS includes a reusable sharding function just for that.
Yeah, I understand. As a consumer I might read
We do something similar with 3rd-party aggregators, reading from a past cursor position and then dealing with duplicates, which is a good workaround for indexing lag. Depending on how big your window is and how many events are generated in that timeframe, it could have a severe performance impact and processing lag. But it's doable.
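In code, that workaround is just a rewound cursor plus a dedup set bounded to the overlap window. A hedged sketch; `ReadGlobalLogAsync` and `ProjectAsync` are hypothetical stand-ins for the reader and consumer:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Re-read the global log from an earlier cursor to cover indexing lag, and
// suppress entries already processed within the overlap window.
async Task CatchUp(DateTimeOffset lastProcessed, ISet<Guid> recentlySeen)
{
    var overlap = TimeSpan.FromMinutes(5); // assumed upper bound on indexing lag

    foreach (var entry in await ReadGlobalLogAsync(lastProcessed - overlap)) // hypothetical reader
    {
        if (!recentlySeen.Add(entry.EventId)) continue; // duplicate from the overlap window
        await ProjectAsync(entry);                      // hypothetical consumer
    }
}
```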
I can think of 2 possible implementations, each with its own set of trade-offs:
Right, that should be tackled separately. A wall clock might be sufficient if you can control the drift window and deal with it at read time. Anyway, this is a massive feature and I don't think I would be able to make it alone. But I do accept pull requests! 😄
If the goal is to achieve some kind of eventually consistent read-only projection, then I have seen such a setup working:
I have made an attempt at building a global log and global dictionary. The code is nowhere near ready for me to submit a pull request, but the concepts are working in a basic console app. Any pointers would be appreciated.
@Neil-p-Hughes could you share a link please? |
@aprooks |
Just to put some description to what was built: there are two stream collection classes, dictionary and log. A dictionary just tracks the current version of all streams that are registered to it. A stream can be tracked by more than one dictionary at a time. This allows you to create groups of streams that you can pull snapshots of. A log tracks streams the same way a dictionary does, but splits the data into table partitions based on a timestamp mathematically floored to a time span or window. Reading from a collection does not return events; it returns the ids of the streams and the versions of the streams. You then read from the streams as normal.
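The time-window partitioning described there boils down to flooring a timestamp to a fixed window and rendering the result as the partition key. A minimal sketch (key format is illustrative):

```csharp
using System;

// Floor a timestamp to a window (e.g. 5 minutes) and render it as a partition key,
// so all log entries of one window land in one table partition.
static string LogPartitionKey(DateTimeOffset timestamp, TimeSpan window)
{
    long floored = timestamp.UtcTicks - (timestamp.UtcTicks % window.Ticks);
    return new DateTimeOffset(floored, TimeSpan.Zero).ToString("yyyyMMdd'T'HHmmss");
}

// Example: LogPartitionKey(2016-05-04 10:17:42Z, 5 minutes) => "20160504T101500"
```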
Hi @Neil-p-Hughes! I'm now on vacation. I promise I'll take a look once I'm back (in a few weeks) 😄
@Neil-p-Hughes I took a look at your implementation and I've got one concern. I've seen how you augmented the
We use a globally incrementing ID rather than a timestamp. This gives us sequential order without worrying about timestamps and clock drift. The steps are:
To generate the global ID we have a single entity in AT (unique Partition/RowKey) that has just one value: the current global ID. We update it using

What we end up with is a table of sequentially ordered event Guids which can be scanned by other processes (e.g. for read model projections). Streamstone's event store is still the canonical store and represents the true state of the system. Given AT doesn't have transactions, you might spot some holes in the above steps if something goes wrong during the process (crash, version conflict, Azure host issue).
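The exact update mechanism is elided above, so here is a hedged sketch of how such a single-entity counter is typically bumped with ETag-based optimistic concurrency (classic Azure Storage SDK; entity keys and property names are illustrative):

```csharp
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

async Task<long> NextGlobalIdAsync(CloudTable table)
{
    while (true)
    {
        // Read the single counter entity (its ETag comes back with it).
        var retrieve = TableOperation.Retrieve<DynamicTableEntity>("counter", "global");
        var entity = (DynamicTableEntity)(await table.ExecuteAsync(retrieve)).Result;

        long next = entity.Properties["Current"].Int64Value.Value + 1;
        entity.Properties["Current"] = new EntityProperty(next);

        try
        {
            await table.ExecuteAsync(TableOperation.Replace(entity)); // fails on ETag mismatch
            return next;
        }
        catch (StorageException e) when (e.RequestInformation.HttpStatusCode == 412)
        {
            // Another writer won the race; loop and retry with the fresh value.
        }
    }
}
```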
The last one is the toughest to deal with, because there is a race condition where our projection process can read an ID from the Dispatch table and do a lookup on the event store before the events have been saved at step 4. The projection process then ignores these events and skips over them, thinking they are orphaned. I'm working on this issue currently. One way to solve it could be to store the Dispatch events within the event store table itself. They could live in the same partition as the real events, which then gives us transactional consistency.
OK, I've successfully cut over to my new method mentioned above. The Dispatch events are now stored in the events table itself. I'm using

This process seems to be working well in my tests so far (although it's still very much a prototype). I feel more comfortable knowing that all of the rows are committed transactionally.
How close is this to being ready? I'd love to use streamstone for a project but this functionality is a dealbreaker. |
Not even started. There are plenty of choices, with various tradeoffs, for how this could be implemented, discussed above. Choose the one that suits your app.
Curious, what do you think, should this global log guarantee

Should consumers be idempotent?
If we can do it, it should be both, so

If this global log would ensure only

Consumers should be idempotent anyway. If there were a global position, this would become trivial: just store a position with a bunch of changes.
I'm thinking about writing my own global log using a combination of the StreamDirectory and Windows Azure Blob Storage (Append Blob). A single writer is responsible for doing this (which also implies a single point of failure, so it'll need some kind of supervision/watchdog). Asynchronous, in or out of process, I haven't decided yet. The single writer brings some ease in that I can easily create a monotonic counter (position) and work at my own pace. I'll have to monitor (by polling) all streams one can write to, which is where the stream directory comes in. For efficiency, I'll probably write all known streams to an append blob that only has 2 "meta" events:

I don't like the fact there are so many moving parts, and the whole inefficiency of it, but I'm not constrained by the fact this needs to be general purpose, or efficient for that matter.
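For the append-blob part, the single-writer design keeps the code small. A hedged sketch (classic Azure Storage blob SDK; the entry format is illustrative):

```csharp
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage.Blob;

// The single writer appends one global-log entry; generating the monotonic
// position in-process is safe precisely because there is only one writer.
async Task AppendToGlobalLog(CloudAppendBlob log, long position, string streamId, int version)
{
    string entry = $"{position}\t{streamId}\t{version}\n";
    using var payload = new MemoryStream(Encoding.UTF8.GetBytes(entry));
    await log.AppendBlockAsync(payload);
}
```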
@Scooletz nah, I meant domain-level idempotency, not infrastructure-level, which is what the sequence-id-based approach gives you. @yreynhout it's not that hard to build a highly performant event store with a global log on top of SS if clients can tolerate duplicate entries.
@yreynhout You should consider using
from https://msdn.microsoft.com/en-us/library/azure/ee691964.aspx
@yevhen Domain-level? Why can't this be done as a tool on the infra level and provided for the domain to use?
@Scooletz because it's impossible to do without sacrificing throughput. Either you have

In the @eulerfx approach, with a weak global timestamp and merge-sort across partitions, you will still have duplicate entries, since there is no perfect sequencing.
@yevhen Let me briefly describe one of the possible approaches. If the description isn't clear, ping me. What you could do is the following: provide a few virtual partitions, let's say 16 ("00", "01", "02" ..., "15"). When writing events to a stream, you'd include
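The description above is truncated, so this is only my reading of it, but the routing step (which virtual partition a stream's entries go to) could look like the following. FNV-1a is used because, unlike `string.GetHashCode`, it is stable across processes:

```csharp
// Deterministically map a stream to one of 16 virtual partitions: "00" .. "15".
static string VirtualPartition(string streamId, uint partitions = 16)
{
    uint hash = 2166136261; // FNV-1a offset basis
    foreach (char c in streamId)
        hash = (hash ^ c) * 16777619; // FNV-1a prime
    return (hash % partitions).ToString("D2");
}
```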
Yeah, I get that approach. But it still has this problem: a crash of the chaser, which as I understand is the thing that will be building the global log, can lead to a duplicate entry.
Not at all. If the chaser used a single partition to write its entries (potentially, a separate table), it could in a batch
ensuring that it writes log entries and streams' versions together and marking events as dispatched. Yes, in this scenario log throughput is limited to one partition only.
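A hedged sketch of that chaser batch (classic Azure Storage SDK; partition and row key shapes are illustrative). Everything goes into the chaser's single log partition, so the batch is atomic, subject to AT's 100-operations-per-batch limit:

```csharp
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage.Table;

async Task WriteLogBatch(CloudTable table, string streamId, int lastVersion,
                         (long GlobalPosition, int Version)[] undispatched)
{
    var batch = new TableBatchOperation();

    foreach (var e in undispatched) // must stay under AT's 100-operation batch limit
    {
        var entry = new DynamicTableEntity("global-log", e.GlobalPosition.ToString("d19"));
        entry.Properties["Stream"] = EntityProperty.GeneratePropertyForString(streamId);
        entry.Properties["Version"] = EntityProperty.GeneratePropertyForInt(e.Version);
        batch.Insert(entry); // duplicate global position => whole batch fails, no dup entries
    }

    // The per-stream "dispatched up to" marker, written together with the entries.
    var marker = new DynamicTableEntity("global-log", $"HEAD|{streamId}");
    marker.Properties["DispatchedVersion"] = EntityProperty.GeneratePropertyForInt(lastVersion);
    batch.InsertOrReplace(marker);

    await table.ExecuteBatchAsync(batch); // atomic: single partition
}
```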
That's what I said before :)
I'll describe what we did in our own EventStore, which has been in production use for several years now at my previous employer.
I'm doing this from memory since I don't have access to the code, but I think this is correct.
@kellypleahy What about the case where the second write, the real append to the stream, fails? Then you're left with a "last write" that points to a non-existing event. How do you deal with this scenario? Does it make the chaser loop infinitely?
@Scooletz We consider this a non-issue for the following reasons (in our usage, YMMV).
Re (2), you could easily put a timeout in the logic to say that if these records are more than X 'old', then delete them (atomically, of course). The main trick (and a place where at one point there was a bug in our system) is that you need to be very careful not to "lose" these marker records prematurely.

Another thing we had to do was maintain tracking records in the $all stream that told us where each other stream was that was contained by $all (I can't remember exactly how we did this, but it was definitely done in a way that could be batched with the event writes, so it must have been a record in the same partition with a special rowkey). Since we also used the dedup records, we could ensure that even if this record was not properly updated, we could detect the situation and fix it.

The general idea of all the approaches we had (including this one, batches, etc.) is that each new writer could detect problems in the consistency of the store and fix them proactively. It's the same philosophy I'm going to be using in my AWS implementation at my latest client.
I see your reasoning now, @kellypleahy. Thanks for the explanation. Still, I prefer to stick to strong consistency using batches and the approach described by me above. This has throughput limitations, though.
@yevhen Would you consider a PR that would require introducing a global hook on the |
@Scooletz what problem do you want to solve with that hook? I would rather not touch the Stream API, leaving it to individual stream operations. Instead, what I'm considering is creating a level of indirection, like
I want to be able to enable

I'd do it in the following way:
In a separate Azure Function that is hooked to the queue with aggregate messages:
So far I've found no counterexample for this algorithm. Thoughts, @yevhen?
This may work :) The only fragile part I can see here is the AQS queue. In case of a timeout or error, the message will be returned to the end of the queue, so you can't process any other events until the order is restored. This could be fixed by using some kind of distributed commit log, like Kafka. So SS could provide optimistic concurrency on the stream level, and Kafka could give a global log. Also, the hack against "late commit" might need to be somehow understood by stream writers: they need to be able to differentiate between "this event was already written before, that's why I got a dup event id exception and should not retry" and artificial events.
Oh my. Retries. Yeah, this changes everything, as the receiver needs to be idempotent. Still, as the input, it would work.
This would support the approach nowadays referred to as bottled water.
One way would be to do an additional insert into a row space ordered by a global timestamp, then at read time do a merge sort across partitions.
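A hedged sketch of that read-time merge: each partition already yields entries ordered by (timestamp, stream id), and a k-way merge produces one globally ordered sequence. In-memory and illustrative only; a real reader would page through AT partitions:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

record LogEntry(DateTimeOffset Timestamp, string StreamId, int Version);

// K-way merge of per-partition sequences, each pre-ordered by (Timestamp, StreamId).
static IEnumerable<LogEntry> MergeByTimestamp(IEnumerable<IEnumerable<LogEntry>> partitions)
{
    var cursors = partitions
        .Select(p => p.GetEnumerator())
        .Where(e => e.MoveNext()) // drop empty partitions up front
        .ToList();

    while (cursors.Count > 0)
    {
        var min = cursors
            .OrderBy(c => c.Current.Timestamp)
            .ThenBy(c => c.Current.StreamId)
            .First(); // smallest head among all partitions

        yield return min.Current;
        if (!min.MoveNext()) cursors.Remove(min);
    }
}
```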