Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for global log (e.g. $all stream in EventStore) #24

Open
eulerfx opened this issue Apr 11, 2016 · 46 comments
Open

Support for global log (e.g. $all stream in EventStore) #24

eulerfx opened this issue Apr 11, 2016 · 46 comments

Comments

@eulerfx
Copy link

eulerfx commented Apr 11, 2016

This would support the approach nowadays referred to as bottled water.

One way would be to do an additional insert, into a row space ordered by a global timestamp. Then at read time, a merge sort across partitions.

@yevhen
Copy link
Owner

yevhen commented Apr 11, 2016

Hi Lev! Cool to see you here.

Nice idea. That could be implemented but will require few additional things:

  1. Stream directory. At the moment SS doesn't maintain directory of all streams. There samples for how to implement that, which is really trivial. Anyway, that feature is on roadmap and will be baked into core soon.
  2. Maintaining global timestamp. I'm not sure about how best to proceed with that. Currently, SS is just simple serveless in-process library (a component). So if there multiple hosts each will have it's own view of the global timestamp. So having a global timestamp will require some synchronization protocol to be implemented. Perhaps, you can advice a simple solution.

If we disregard 2). and assume single-host deployment, this can be done very easily by anyone, since out-of-the-box SS supports including additional table operations along with event, so you can do additional insert, and implementing a stream directory is very simple as well.

I'll give that some more thinking.

@yevhen
Copy link
Owner

yevhen commented Apr 11, 2016

BTW, one of the drivers behind birth of SS was to have an ultra-scalable cheap PaaS event store by avoiding global ordering of events.

Jay Kreps talks about log partitions in Building a Scalable Log section here. What he describes are actually DDD aggregates each mapped to a distinct stream. In that sense global order of events doesn't make sense, really.

Why do you need global ordering? Are you're dealing with an aggregate with a long lifecycle? Busy aggregate? Perhaps you're looking for a sharded stream?

I can think of distributing events of a single stream into multiple partitions (shards) to get ultimate write/read speeds. That could be done by combining stateful processing (actor per stream, Akka.Net/Orleans) and consistent hashing algorithms. I'm thinking about implementing something like that just for fun, as an example of using two of my OSS projects (Streamstone/Orleankka).

Unfortunately, such stream won't guarantee a perfect order of events, could have gaps due to write failures and retries. This might be ok for something like scalable logging. Thus it would be probably better partitioned by natural key such as date.

@jkonecki
Copy link
Contributor

I think the link to Jay's talk is broken.

@yevhen
Copy link
Owner

yevhen commented Apr 12, 2016

@jkonecki thanks. fixed!

@yevhen
Copy link
Owner

yevhen commented Apr 12, 2016

AFAIK, Greg is able to provide _all feature and be perfectly ACID only due to EventStore being a single partition.

@eulerfx
Copy link
Author

eulerfx commented Apr 12, 2016

Why do you need global ordering?

This global stream only needs to maintain order with respect to events within individual streams, but not necessarily across streams. One approach is to do a transactional dual write in AT - one row corresponding to the event in an individual stream and another entry in a global log. This way, you split a partition into two halves - one containing all global log . This global log would be ordered by a timestamp plus some value to ensure uniqueness (such as the id of individual stream). If operations on individual streams are forced to specify explicit versions (OCC), this ensures consistency and ordering within individual streams.

Re Kafka and event sourcing, I'm not quite sure how to actually do it given that Kafka wasn't designed to support arbitrarily large numbers of topics. The log compaction feature can help, but only for integration - you still have no way to read events corresponding to an individual entity. Kafka is great as a bus, but I'm not sure it fits as a source of truth store.

Stream directory.

Would this be implemented using AT entity group transactions? Also, I'm not sure this and the global timestamp would suffice. I suppose you could scan the stream directory and pick events greater than a certain global timestamp, but this seems prohibitive.

Maintaining global timestamp

As alluded to above, I think it should be possible to do this using a simple timestamp plus a unique connection id or event the stream id + event number of the event being written. Since OCC ensures consistency within an individual stream and we don't require global timestamp to fully respect ordering across streams.

@yevhen
Copy link
Owner

yevhen commented Apr 12, 2016

This global log would be ordered by a timestamp plus some value to ensure uniqueness (such as the id of individual stream).

Now I'm confused. What did you mean by global timestamp previously? My understanding was that it's some monotonically increasing counter like in GY EventStore or some form of the Lamport Timestamp. But seems it's not.

If operations on individual streams are forced to specify explicit versions (OCC), this ensures consistency and ordering within individual streams.

Yes, SS guarantees that for individual streams. It's fully ACID.

Stream directory. Would this be implemented using AT entity group transactions?

Nope, since AT transaction is supported only within boundaries of a single partition. Basically, stream directory is just a registry of stream addresses (partition ids), which could be read in efficient way (instead of inefficient full table scan). It could be a single partition or a sharded directory with multiple partitions.

The stream directory is required so that you can enumerate all available partitions, something you will need to have to be able to implement an aforementioned merge sort across all partitions.

I think it should be possible to do this using a simple timestamp plus a unique connection id or event the stream id + event number of the event being written

Ok, as I understand by simple timestamp you really meant a current datetime? Proposed addition of stream id + event number will fix the precision problem and make that event id globally unique, right? Seems legit, but ...

At that point I want to ask you about the use-cases you have at hands for this _all stream? If you want to use it as an integration point between bounded contexts a.k.a feed of all changes you can quickly get into a trouble due to a clock skew on different nodes (stream writers). If for something else - I'm curious what for.

@jkonecki
Copy link
Contributor

@eulerfx It seems to me as well that you're not looking for a global timestamp but just for a timestamp which is already supported. Global ordering between streams could be achieved by implementing vector clock - as @yevhen mentioned synchronisation in distributed system is very difficult.

@eulerfx
Copy link
Author

eulerfx commented Apr 13, 2016

What did you mean by global timestamp previously?

I was referring to a possible implementation. It could also be a monotonic integer, but this would require a central authority to generate it. Using a wall-clock timestamp is one way to go. (I can already hear a co-worker calling out true time...) One challenge here is with commitment ordering.

Global ordering between streams could be achieved by implementing vector clock

Not sure how a vector clock would apply. I'm proposing that you don't need global ordering across different streams, but you still need a global log of all events. This will impose some order on events. This order needs to respect the ordering within individual streams, but not across streams. Suppose you've streams of the form Sn (S1, S2, S3, ...) and then events within them (S1E0, S1E1, S2E0, ...). Each event would also have a global timestamp Gn (G0, G1, G2, ...). Let G(SnEm) be the global timestamp of event m in stream n. We must have SnEi < SnEj => G(SnEi) < G(SnEj) but otherwise unconstrained. One challenge with this model of global timestamps is commitment ordering - the generation of global timestamps can be monotonic, but their commitment may fall slightly out of order. This places the burden on the consumer of the global log to allow some degree of skew, however it should be possible to bound it.

as @yevhen mentioned synchronisation in distributed system is very difficult.

Yes indeed :) we're trying to avoid it as much as possible.

Nope, since AT transaction is supported only within boundaries of a single partition.

How will the directory be written to? If not in the same partition, won't you need a non-transactional dual-write? It seems the directory write needs to be atomic, but not linearized, so something like RAMP could be used, but would require some coordination.

The stream directory is required so that you can enumerate all available partitions

Yes, although in the simplest case, the set of partitions can be fixed and hardcoded at configuration time.

If you want to use it as an integration point between bounded contexts a.k.a feed of all changes you can quickly get into a trouble due to a clock skew on different nodes (stream writers).

Yes, this is the use-case - this is the "bottled water" approach I referred to above. Yes, the skew could be an issue, but as described above, it can be bounded. (Its possible to commit G2 before G1, but this disorder should only exist for a limited amount of time, after which it can be ignored).

@jkonecki
Copy link
Contributor

We must have SnEi < SnEj => G(SnEi) < G(SnEj)

I think you have the implication the wrong way round.
Also the above is not a global timestamp as it's local to the stream. Streamstone does support ordering within a single stream.

It is unclear what are you asking for: on one hand you're mentioning a global timestamp but on the other you're providing examples that do not involve a global timestamp.

Global timestamp would be described as:
G(SnEi) < G(SmEj) => SnEi < SmEj

@eulerfx
Copy link
Author

eulerfx commented Apr 13, 2016

I think you have the implication the wrong way round.

For the purpose of this discussion, I'm defining G as the global timestamp (or alternatively version), with global meaning that it is defined for all events, in all streams (within some namespace, such as all shopping cart events). The reason I wrote the implication that way I did is, we already have total ordering within individual streams and we're specifying the meaning of G based on that.

Also the above is not a global timestamp as it's local to the stream.

Not sure what you're referring to. In SnEi the i is the i-th event within stream n and yes, this is local to that stream. On the other hand, I'm defining G(SnEi) as the global timestamp of the i-th event in stream n making it the G(SnEi)-th event within the global log.

It is unclear what are you asking for: on one hand you're mentioning a global timestamp but on the other you're providing examples that do not involve a global timestamp.

What I'm asking for is an API that allows consuming all events across all streams. This API should be akin to the EventStore $all stream (but some of the guarantees can be relaxed) or the transaction log of an RDBMS. In particular, it should be possible to request a stream of events that have been committed after a specific global timestamp. This is where having a stream directory alone may be prohibitive. Global timestamp as defined here corresponds to Position in EventStore (but again, with some guarantees relaxed). How global version is actually implemented is a separate discussion, although I did sketch one guarantee it has to provide.

@Scooletz
Copy link
Contributor

EventStore internally uses the physical log position for ordering all the events. When using a sharded, non open source version it correlates logs in a given time window, AFAIK, still using the log positions of singular shards. A good summary of different strategies for providing data to queries can be found at ReaderStrategy which defines how $all is handled. You can see that depending on the criteria, different strategies can be used like:

  • reading through the transaction log itself
  • using system stream created by build in projection

I think that your requirement of requesting a stream of event after a specific global timestamp can't be handled without an external indexing structure. One easily imagine a WorkerRole that was about to commit a batch of events but suffered from long GC/network problem. The data, marked with old timestamps would be added after sometime breaking the queries that should have read this events.

As far as I see it now, the only approach would be to have an indexing role that would scan every stream head and atomically, in the same partition write an indexing entry and bump up the indexed stream version. Detection of new streams would require full scans though.
It's not a perfect solution but provided linearizablity over all the events. What's your opinion?

@Scooletz
Copy link
Contributor

One more proposal of providing such an indexer. When one creates a new stream, before storing the head and the first event, the name of the stream is stored in the indexer partition. Then, the normal Streamstone storage operations are applied. This ensures a property, that indexer does not have to scan other tables to find streams, but rather queries stream after stream directly.
I'm aware of Azure Storage Account limitations and that one account can take no more than 20kIOPS. This would mean that indexer can't update more than 20.000 per second which isn't the best performance ever.
Still, this approach clearly suffers from the need of having a separate entity, an indexer, that updates the index scanning the original storage.

@eulerfx
Copy link
Author

eulerfx commented Apr 15, 2016

I think that your requirement of requesting a stream of event after a specific global timestamp can't be handled without an external indexing structure.

It can be using AT, since it provides transactional writes of up to 100 rows in a partition. This library already makes use of this approach.

I'm aware of Azure Storage Account limitations and that one account can take no more than 20kIOPS

These limitations are being lifted in the next version. However, it is always possible to distribute across storage accounts much like you distribute across partitions.

When using a sharded, non open source version it correlates logs in a given time window, AFAIK, still using the log positions of singular shards

EventStore doesn't directly support sharding. The OSS and commercial versions ere merged a few years ago. We can use AT to mimic the functionality of the EventStore index as described above.

@yevhen
Copy link
Owner

yevhen commented Apr 15, 2016

When one creates a new stream, before storing the head and the first event, the name of the stream is stored in the indexer partition.

That's what I call stream directory.

How will the directory be written to? If not in the same partition, won't you need a non-transactional dual-write? It seems the directory write needs to be atomic, but not linearized, so something like RAMP could be used, but would require some coordination.

I don't intend to write/update any of event sequence information to the stream directory but merely store an address (table, partitionId) of a stream, so I can later do an efficient for (var stream in directory) from a single partition. Stream directory maintenance could be done via explicit provisioning as @Scooletz rightly pointed.

These limitations are being lifted in the next version.

Interesting. Source?

However, it is always possible to distribute across storage accounts much like you distribute across partitions.

Indeed, this is simple. SS includes reusable sharding function just for that.

One challenge with this model of global timestamps is commitment ordering - the generation of global timestamps can be monotonic, but their commitment may fall slightly out of order. This places the burden on the consumer of the global log to allow some degree of skew, however it should be possible to bound it.

Ye, I understand. As a consumer I might read _all stream at 11:20PM and got latest position 1120_S101_001. Then, host with skewed timestamp (-1 minute) can write an event with global timestamp 1119_S202_005. If I then ask _all stream to give me (compute) new events since 1120_S101_001 I will miss it.

Yes, the skew could be an issue, but as described above, it can be bounded. (Its possible to commit G2 before G1, but this disorder should only exist for a limited amount of time, after which it can be ignored)

We do something similar with 3rd party aggregators, reading from past cursor position and then dealing with duplicates. Which is a good workaround for indexing lags. Depending on how big is you window and how many events are generated per this timeframe, that could have severe performance impact and processing lags. But it's doable.

What I'm asking for is an API that allows consuming all events across all streams. This API should be akin to the EventStore $all stream (but some of the guarantees can be relaxed) or the transaction log of an RDBMS. In particular, it should be possible to request a stream of events that have been committed after a specific global timestamp.

I can think of 2 possible implementations, each with its own set of trade-offs:

  1. Computed (virtual) stream. Just-in-time enumeration of all streams. Could be very prohibitive in a system with millions of streams to check each and every stream every time you poll _all stream.

    Possible optimization is to store the global timestamp of the first/last event of each stream in some easily queryable medium, could be an Index Table (on in the stream directory itself) and do a range query. That could greatly reduce a number of partitions which need to be read (for a merge sort).

    The downside is a possible inconsistency between index table and actual stream due to non-transactional write. But that could be easily tolerated. Another downside, is WATS partition IOPS limits. That could be mitigated by sharding of index table.

  2. Pre-computed (durable) stream. This is similar (including downsides) to the Index Table approach above but will store an actual event. It's basically a global log. It could be heavily partitioned to allow parallel reads.

How global version is actually implemented is a separate discussion, although I did sketch one guarantee it has to provide.

Right, that should be tackled separately. Wall clock might be sufficient if you can control the drift window and deal with it at read time.

Anyway, this is a massive feature and I don't think I would be able to make it alone. But I do accept pull requests! 😄

@Rurouni
Copy link

Rurouni commented May 9, 2016

If the goal is to have for achieving some kind of eventually consistent read-only projection then I have seen such setup working:
before writing entity group transaction with events into master table store you send promise {that stream x is updating } to eventhub, then you have another set of compute (worker role for example) reading that eventhub with delay y seconds (where y is your max allowed transaction time) and replicating streams from master to secondary table storage(for example secondary was on version 3 but master is now on 5, so there were 2 new events), now before writing to secondary new events you can send them to either another eventhub or service bus or whatever. On failures you don't progress eventhub checkpoint and do not write to secondary so you may end with duplicate events, but you should not lose any. In that way you can have stream with all events with guarantee and per stream ordering preserved(depending on final queue type), plus you have continuous backup :)

@Neil-p-Hughes
Copy link

I have made an attempt at building a global log and global dictionary. the code is no where near ready for me to submit a pull request but the concepts are working in the basic console app. any pointers would be appreciated

@aprooks
Copy link

aprooks commented May 25, 2016

@Neil-p-Hughes could you share a link please?

@Neil-p-Hughes
Copy link

@aprooks
Sure the code is just in my fork of stream stone.
https://github.com/Neil-p-Hughes/Streamstone

@Neil-p-Hughes
Copy link

Just to put some discription to what was built

There are two stream collection classes dictionary and log.

Dictionary just tracks the current version of all streams that are registered to it. A stream can be tracked by more then one dictionary at a time. This allows you to create groups of streams that you can pull snapshots of.

Log tracks the same way dictionary does but splits the data into table partitions based on a time stamp mathematically floored to a time span or window.

Reading from a collection does not return events. It returns the id of the streams and the versions of the streams. You then read from the streams as normal.

@yevhen
Copy link
Owner

yevhen commented May 27, 2016

Hi @Neil-p-Hughes!

I'm now on vacation. I promise I'll take a look once I'm back (in few weeks) 😄

@Scooletz
Copy link
Contributor

Scooletz commented Jun 3, 2016

@Neil-p-Hughes I took a look at your implementation and I've got one concern. I've seen how you augmented the Write in this line. As far as I understand, if storage operations start failing after committing stream batches, writes to a dictionary/log might not happen at all. This would result in a stream with appended events which are not tracked by neither a dictionary nor a log. Still, user reading the stream could read the last written version. Until the stream isn't written again, the dictionary and the log will not contain the events. This consistency isn't eventual then, as one can't guarantee the boundary when the log/dictionary is updated.

@alexjustus
Copy link

alexjustus commented Jun 29, 2016

We use a globally incrementing ID rather than a timestamp. This gives us sequential order without worrying about timestamps and clock drift.

The steps are:

  1. Read the current global ID and increment it by 1 for each new event we are saving.
  2. Store the updated global ID.
  3. Since we've now got a pre-allocated block of IDs, insert the Guid of each new event into a "Dispatch" table along with its global ID. This table is similar to the "global log" mentioned above.
  4. Store the events as usual with Streamstone.

To generate the global ID we have an single entity in AT (unique Partition/RowKey) that has just one value: The current global ID. We update it using InsertOrReplace combined with an "If-Match" Etag. The ETag prevents us from overwriting someone else's value if they get there first on another thread (in that case AT throws an exception which we can catch and then retry).

What we end up with is a table of sequentially ordered event Guids which can be scanned by other processes (e.g. for read model projections).

Streamstone's event store is still the canonical store and represents the true state of the system.

Given AT doesn't have transactions you might spot some holes in the above steps if something goes wrong during the process (crash, version conflict, Azure host issue).

  • Between steps 1 and 2: The whole process ends but we're fine since we haven't actually written anything yet.
  • Between steps 2 and 3: In this case we've now got a global ID that is "ahead" of the events. That is to say, the number of entites in the Dispatch table is now less than the global ID. This is actually fine though since the IDs don't need to be consecutive; as long as they are always increasing then global order is maintained.
  • Between steps 3 and 4: This leaves us with orphaned entites in the Dispatch table (entities which will never have an associated event in the event store). Our projection process simply ignores these by doing a lookup on the events table first to confirm that each event exists before it is processed.

The last one is the toughest to deal with because there is a race condition where our projection process can read an ID from the Dispatch table and do a lookup on the event store before the events have been saved at step 4. The projection process then ignores these events and skips over them, thinking they are orphaned.

I'm working on this issue currently. One way to solve it could be to store the Dispatch events within the event store table itself. They could live in the same partition as the real events which then gives us transactional consistency.

@alexjustus
Copy link

OK I've successfully cut over to my new method mentioned above. The Dispatch events are now stored in the events table itself.

I'm using EventIncludes (based on the Scenario 6 example) so that all events are sent to AT in a single batch.

Here's how it looks:
streamstone global identity

This process seems to be working well in my tests so far (although it's still very much a prototype). I feel more comfortable knowing that all of the rows are committed transactionally.

@endeavour
Copy link

How close is this to being ready? I'd love to use streamstone for a project but this functionality is a dealbreaker.

@yevhen
Copy link
Owner

yevhen commented Jul 22, 2016

Not even started. There is plenty of choices with various tradeoffs how this could be implemented discussed above. Choose the one that suits your app.

@yevhen
Copy link
Owner

yevhen commented Sep 8, 2016

Curious, what do you think, should this global log guarantee exactly-once semantics or at-least-once will be ok? So whether it's ok for global log to report same event (with same uuid) more than once, ie give it different global number/timestamp?

Should consumers be idempotent?

@Scooletz
Copy link
Contributor

Scooletz commented Sep 9, 2016

If we can do it, it should be both, so exactly-once semantics for logs and idempotent consumers. Let me explain.

If this global log would ensure only at-least-once, then to remember that a particular event was dispatched you'd need to remember either it's id, or if that is not present, it's partition/stream and a version number. This is a lot of data to store. Of course one could optimize and persist only the recently applied version value per stream, but this again, might be a lot. If we have exactly-once and a position like in EventStore, marking an event as processed becomes trivial and can be done by storing the log position for all the streams. This, again like in EventStore, is stored per projection. If as a projection, you considered also a process manager, this could be leveraged to use the position value to generate the idempotent event id (just by writing '0' in Guid on the Guid version position) and simplify dispatching events to other streams.

Consumers should be idempotent in anyway. If there was a global position, this would become trivial. Just store a position with bunch of changes.

@yreynhout
Copy link

yreynhout commented Sep 9, 2016

I'm thinking about writing my own global log using a combination of StreamDirectory and Windows Azure Blob Storage (Append Blob). Single writer responsible for doing this (which also implies single point of failure so it'll need some kind of supervision/watchdog). Asynchronous, in or out of process, haven't decided yet. The single writer brings some ease in that I can easily create a monotonic counter (position), work at my own pace. I'll have to monitor (polling) all streams one can write to, which is where the stream directory comes in. For efficiency, I'll probably write all known streams to an append blob that only has 2 "meta" events: StreamCreated, StreamDeleted. That way I can read the entire blob (and subsequent blobs) upon startup, fold into a list of streams to monitor, and kick off the monitoring task. How I handle new streams that come into existence will be yet another task on top of the StreamDirectory. This monitoring task (actor, thread, loop, however you wanna do this) then should round robin the streams to read as of the last version projected into the global log (how to recover from this is not entirely clear yet in my head, should a failure or a restart occur) and shove any new events (+ meta data) to the single writer. The end result is that I have a single blob (and potentially linked blobs - think linked list if this grows tremendously) to read from front to back to perform projections off of. Knowing as of which point to read is another big hole in my reasoning so far.

I don't like the fact there are so many moving parts and the whole inefficiency of it, but I'm not constrained by the fact this needs to be general purpose or efficient for that matter.

@yevhen
Copy link
Owner

yevhen commented Sep 9, 2016

@Scooletz nah, I meant domain-level idempotency, and not infrastructure, which is what sequence id based approach.

@yreynhout it's not that hard to build highly performant event store with global log on top of SS if clients could tolerate duplicate entries.

@Scooletz
Copy link
Contributor

Scooletz commented Sep 9, 2016

@yreynhout You should consider using PageBlob as well. AppendOnlyBlob has a limit of appends per file, which is equal to the number of segments which is 50k. With PageBlob you don't have this limitiation and still, you can apply your changes conditionally. Unfortunately, this brings a padding problem (you need to pad to the page boundary).

Block blobs let you upload large blobs efficiently. Block blobs are comprised of blocks, each of which is identified by a block ID. You create or modify a block blob by writing a set of blocks and committing them by their block IDs. Each block can be a different size, up to a maximum of 4 MB, and a block blob can include up to 50,000 blocks. The maximum size of a block blob is therefore slightly more than 195 GB (4 MB X 50,000 blocks).

from https://msdn.microsoft.com/en-us/library/azure/ee691964.aspx

@Scooletz
Copy link
Contributor

Scooletz commented Sep 9, 2016

@yevhen Domain-level? Why this can't be done as a tool on the infra and provided for the domain to use?

@yevhen
Copy link
Owner

yevhen commented Sep 9, 2016

@Scooletz because it's impossible to do without sacrificing throughput. Either you have exactly-once and are limited to a throughput of a single partition (C), or you have best possible throughput but with at-least-once (AP). I can't think of any solution that will give both.

In the @eulerfx approach with weak global timestamp and merge-sort across partitions you will still have duplicate entries, since there is no perfect sequencing.

@Scooletz
Copy link
Contributor

Scooletz commented Sep 9, 2016

@yevhen Let me shortly describe one of the possible approaches. If the description isn't clear, ping me.

What you could do is to do the following. Provide a few virtual partitions, lets say 16 ("00", "01", "02" ..., "15"). When events writing to a stream, you'd include Insert-Or-Merge that would put a Guid under the partition key. The property name for the written Guid value would be a hash of the stream name %128 to do not exceed the number of properties. This would give 16 * 128 buckets for Guids.
Having this, a chaser could just read this 16 entries in a loop and scan underlying Streams only if a new Guid is written in a partion marking the stream bucket.

@yevhen
Copy link
Owner

yevhen commented Sep 10, 2016

Ye, I get that approach. But it still have this problem. The crash of the chaser, which as I understand is the thing that will be building a global log can lead to a duplicate entry.

@Scooletz
Copy link
Contributor

Not at all. If the chaser used a single partition to write its entries (potentially, a separate table), it could in a batch

  • INSERT indexing entries like
{PK: "index", RK:"00000000000001", Stream:"nameofstream", Version:"5}
  • REPLACE stream version information
{PK: "index", RK:"nameofstream", Version:"5}

ensuring that it writes log entries and streams' versions together and marking events as dispatched. Yes, in this scenario log throughput is limited to one partition only.

@yevhen
Copy link
Owner

yevhen commented Sep 10, 2016

Yes, in this scenario log throughput is limited to one partition only.

That's what I said before :)

@kellypleahy
Copy link

I'll describe what we did in our own EventStore that's now been in production use for several years now at my previous employer.

  1. when inserting new rows into a stream, we first insert a record into the "last write" partition (I can't remember what it is called, but there's a single partition with all "last write" records). The row key is the stream name, and the record contains the highest numbered record we are "about to" write to the stream.
  2. we have code running in our roles that periodically reads all "last write" records in the partition, and appends the missing records to the $all stream. We track which records have been written to $all already in the $all stream itself using deduplication support similar to what you already have and using the 'streamId+sequence' as the dedup key.
  3. we only delete the "last write" records if the last record we replicated from the stream matches the last write record we read (and the last write record's eTag hasn't changed since we read it).

I'm doing this from memory since I don't have access to the code, but I think this is correct.

@Scooletz
Copy link
Contributor

@kellypleahy What about the case, where the second write, the real append to the stream fails? Then you're left with a "last write" that points to an non existing event. How do you deal with this scenario? Does it make the chaser to loop infinitely?

@kellypleahy
Copy link

kellypleahy commented Oct 14, 2016

@Scooletz We consider this a non-issue for the following reasons (in our usage, YMMV).

  1. most of the time, when the write fails, the command that was being executed will retry, get a new 'start sequence', and save, thereby replacing the "bad" record with a "good" one.
  2. if (1) doesn't happen, the only real disadvantage is that while that "bogus" record is there, the updater will read an extra stream (the one with the bogus record) only to find there aren't any new records in that stream. This isn't very expensive so we've never optimized for it.

re (2), you could easily put a timeout in the logic to say that if these records are more than X 'old', the delete (atomically, of course).

The main trick (and a place where at one point there was a bug in our system) is that you need to be very careful not to "lose" these marker records prematurely. Another thing we had to do was maintain tracking records in the $all stream that told us where each other stream was that was contained by $all (I can't remember exactly how we did this, but it was definitely in a way that could be batched with the event writes, so it must have been a record in the same partition with a special rowkey). Since we also used the dedup records, we could ensure that even if this record was not properly updated, we could detect the situation and fix.

The general idea of all the approaches we had (including this and batches, etc.) is that each new writer could detect problems in the consistency of the store and fix them proactively. It's the same philosophy I'm going to be using in my AWS implementation at my latest client.

@Scooletz
Copy link
Contributor

I see your reasoning now @kellypleahy Thanks for the explanation. Still, I prefer to stick to the strong consistency using batches and the approach describe by me above. This has limitations of throughput though.

@Scooletz
Copy link
Contributor

@yevhen Would you consider a PR that would require introducing a global hook on the Stream.Write operation? Something similar to the event of OperationContext? The API for the stream is static, so that would be probably a prerequisite for doing it. I'm asking as I have something on my mind and would be able to flush it (either in this issue, or in a PR description).

@yevhen
Copy link
Owner

yevhen commented Dec 19, 2018

@Scooletz what problem you want to solve with that hook?

I would rather not touch Stream api leaving it to individual stream operations. Instead, what I'm considering is to create a level of indirection, like EventStore (non-static) object, where we can replicate stream operations but with added functionality on top. This may include stream directory management, sharding, global log (with some constraints of course), etc.

@Scooletz
Copy link
Contributor

I want to be able to enable fromAll queries, this is why I posted it in here 😅

I'd do it in the following way:

  1. before every batch: gather aggregate id + version + event ids that will be appended
  2. send a message with ASQ with visibility time set to reasonably long time (initially I thought about 10s) containing this data
  3. batch.Execute() occurs here

In a separate Azure Function, that is hooked the queue with aggregate messages:

  1. retrieve message, potentially in batches with QueueBatch
  2. try to read events specified in the message.
    1. if there are no events, means that commit failed, set artificial events on these positions to disable a "late write" from committer
    2. if there are events, but have different id, it means that this commit failed and another was put into this place. Do nothing
    3. if there are events, with these ids, append them to the log fromAll

So far I've found no counterexample for this algorithm. Thoughts @yevhen ?
I'm interested in this EventStore approach. Any PRs?

@yevhen
Copy link
Owner

yevhen commented Dec 20, 2018

This may work)

The only fragile part I can see here is AQS queue. In case of timeout or error the message will be returned back to the end of the queue, so you can't process any other events anymore until the order is restored. This could be fixed by using some kind of distributed commit log, like Kafka. So SS could provide optimistic concurrency on stream level and Kafka could give a global log.

Also, the hack against "late commit" might be somehow understood by stream writers, they need to able to differentiate between "this event was already written before, that's why I got dup event id exception and should not retry" and artificial events.

@Scooletz
Copy link
Contributor

Oh my. Retries. Yeah, this changes everything as the receiver needs to idempotent. Still, as the input, it would work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests