Checking safety in Exactly-once - Tomek Masternak, Szymon Pobiega

Oct 2, 2020 03:14 · 3237 words · 16 minute read

Hi, everyone. My name is Tomek Masternak, I’m an engineer at Particular Software, where I build distributed systems and a platform for building distributed systems. In this presentation, I will be talking about checking safety in Exactly-Once, which is a library that I built together with my friend Szymon Pobiega. Let me start by describing the problem and the challenge distributed system builders face when they use messaging infrastructure. What is currently the de facto standard, both when it comes to the infrastructure available on-prem and in the cloud, is the At-Least-Once message delivery guarantee. That changed quite some time ago; in the past we used to have two-phase commit and distributed transactions available.

01:03 - That’s pretty much no longer the case, and currently At-Least-Once message delivery is the reality that everyone has to work with. Now, At-Least-Once message delivery can be challenging from the system builder’s perspective, especially when writing logic for handling messages that arrive, or are being pushed, from the messaging infrastructure to the receivers. Let us go through a hypothetical situation just to see what kind of scenarios I’m talking about. Let’s assume that there are some messages in flight, one, two and three, and that is the order in which they are stored in the messaging infrastructure, or in the queue. Because of the way At-Least-Once message delivery works, it is perfectly possible for a message to be delivered multiple times to the receiver.

02:06 - This is usually a simple consequence of the fact that messaging infrastructure will not consider a message to be delivered until it gets an acknowledgment from the receiver. Whenever the acknowledgment does not arrive, or does not arrive within a given period of time, the message will reappear in the queue and will be processed or delivered once again. From the processing perspective, what is possible is that some messages get duplicated; in our situation, it’s message two. What’s also possible is reordering, and this usually happens when the messaging infrastructure has some delivery or processing timeout. Whenever a message is picked up by the receiver, the timeout starts, and if it reaches the threshold, that message will reappear in the queue one more time. In the meantime, other messages might be successfully processed, so the effective processing order at the receiver will be different than the original ordering.
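The redelivery mechanism described here can be sketched with a toy simulation (illustrative Python, not the actual messaging infrastructure): a lost acknowledgment makes a message visible again, which produces both a duplicate and a changed effective processing order.

```python
# Toy at-least-once queue: a missed acknowledgment makes a message
# reappear, so the receiver sees both a duplicate and reordering.

class AtLeastOnceQueue:
    def __init__(self, messages):
        self.pending = list(messages)  # messages waiting to be delivered

    def deliver(self):
        return self.pending.pop(0)

    def ack_timeout(self, message):
        # No ack arrived in time: the message becomes visible again.
        self.pending.append(message)

queue = AtLeastOnceQueue(["m1", "m2", "m3"])
processed = []

m = queue.deliver()          # m1 delivered and processed
processed.append(m)

m = queue.deliver()          # m2 delivered and processed...
processed.append(m)
queue.ack_timeout(m)         # ...but the ack is lost, so m2 reappears

m = queue.deliver()          # m3 processed before m2's redelivery
processed.append(m)

m = queue.deliver()          # m2 redelivered: a duplicate, out of order
processed.append(m)

print(processed)             # ['m1', 'm2', 'm3', 'm2']
```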

03:14 - Finally, both of those situations can happen independently and at the same time: in a given situation it is perfectly possible for messages to be reordered, and it’s perfectly possible for messages to be duplicated, and that can be a challenge from the system builder’s perspective. A very common and standard solution to that situation is idempotency, which basically means that whenever a duplicate of a message is received, the end result of processing that message should be the same as for the first processing of that message. Very often this is understood as, or boils down to, the message processing logic being deterministic. This is often the case in remote-procedure-call types of systems, where there is only a single message or call in flight, and that call will be repeated over and over again until a successful acknowledgment comes in from the remote location. Now, that might not be enough if there is more than one message in flight, and this is very often the case when we are building and modeling business processes.

04:42 - This is a very simple example which shows quite well what the problem might be. What we are looking at here is a simple sequence diagram showing the behavior of a system which models or implements a shooting-range game. There is a ShootingRange handler which is responsible for holding a piece of state, which is the current position of the target. The players can attempt to fire at a given position, and the shooting range is responsible for sending back a response saying whether the attempt was successful or failed. So what we can see here is a situation in which we have a shooting range, and the shooting range is set to target position 42, and then we have a player that fires at position 42. The response from that call is a hit.

05:46 - After that, the target is moved to position one, and finally an interesting situation happens, because we get a duplicate: a FireAt message which was already processed once is being reprocessed one more time. The logic is deterministic, but the response that we get is a miss. What happened in that scenario is that we have a single logical message being delivered twice, and each processing of this message results in a contradictory result, a contradictory side effect, which is usually far from what we are expecting from the systems that we’re building. The obvious problem that we have here is that, even though the logic is deterministic, the state changed significantly in between the first processing and the duplicate, which resulted in that behavior. What it shows is that deterministic logic is not enough; we also need to think about the state.
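The scenario above can be reproduced with a minimal sketch (the class and method names are illustrative, not the talk’s actual code): the handler is fully deterministic, yet the duplicate produces a contradictory answer because the state moved in between.

```python
# Deterministic handler + mutable state: a duplicate of the same logical
# message yields a contradictory result once the state has changed.

class ShootingRange:
    def __init__(self, target_position):
        self.target_position = target_position  # the handler's only state

    def handle_fire_at(self, position):
        # Deterministic: result depends only on current state and input.
        return "hit" if position == self.target_position else "miss"

shooting_range = ShootingRange(target_position=42)

first = shooting_range.handle_fire_at(42)   # original message -> "hit"
shooting_range.target_position = 1          # target is moved
second = shooting_range.handle_fire_at(42)  # duplicate of the SAME message -> "miss"

print(first, second)  # hit miss — contradictory side effects for one logical shot
```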

06:55 - Either we need to capture the side effects and basically redo them whenever a duplicate comes in, or we need to be able to get a handle on the historical state as it was when the message first arrived. What we are assuming in the library is that the distributed system that we are talking about is built of a bunch of handlers, and each handler has a separate, dedicated piece of state. That handler is the sole owner of that state, and the only way to communicate between the handlers is via sending messages. What each handler does is pick up a message from the input queue and execute the business logic, which results in a state update and a message put in the output queue. The main idea or goal of the library was, as stated here, to make sure that the observable side effects correspond to some serial execution of input messages, with an atomic commit guarantee between the business storage and the output queue.
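The handler model assumed here might be sketched as follows (all names are illustrative assumptions, not the library’s API). Note that this naive step is exactly what the library has to harden: a crash between the state update and the send would break the atomicity between the two.

```python
# Naive model of one handler step: receive -> run business logic ->
# update owned state -> publish output messages. No dedup, no atomicity.

def handler_step(input_queue, state, business_logic, output_queue):
    message = input_queue.pop(0)                 # pick up an input message
    new_state, outputs = business_logic(state, message)
    output_queue.extend(outputs)                 # publish the side effects
    return new_state                             # state owned solely by this handler

# Example business logic: count messages and echo an acknowledgment.
def logic(state, message):
    return state + 1, [f"ack-{message}"]

inbox, outbox = ["a", "b"], []
state = 0
state = handler_step(inbox, state, logic, outbox)
state = handler_step(inbox, state, logic, outbox)
print(state, outbox)  # 2 ['ack-a', 'ack-b']
```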

08:08 - Basically, what we wanted to make sure is that whenever a duplicate comes in, the side effects that we produce on the state, and in terms of sending the output messages, are the same as if there was a single processing, and the ordering corresponds to some possible serial execution. The implementation of the library is based on the Azure stack. It runs in the Azure Functions host, which basically means that the whole messaging infrastructure available for Azure Functions is available there as well. The state is stored in Azure Cosmos DB. Specifically, what we rely on in Azure Cosmos DB is that it provides optimistic concurrency control, which is a pretty important, or rather the key, part of the approach that we are taking. We store our infrastructural, library-driven data in a separate logical partition, which also comes with a consequence, which is that there are no atomic transactions between the business state and the infrastructural state.

09:28 - We assume that the users are using Azure Cosmos DB with the session consistency model. Finally, high-performance scenarios are out of scope. It’s not something that we focused on. Good-enough performance, whatever that means, was our goal. Now, the general idea of the algorithm is as follows. Whenever a message comes in, we check whether we already processed it. If not, we load the state.

09:56 - We run the business logic, and we do not send out the output messages, but we capture them and store them in that form. Now, we attach a single correlation ID to the business piece of data, and then we commit it to the business logic partition. This is where we do the optimistic concurrency update: we do that only if the entity did not change since we loaded it from the store. If that is successful, this is basically the point of no return.

10:36 - In the atomic commit protocol, once that is done, both parts, that is, the business state which is already committed and the output messages, will eventually be committed. Now, an important thing to notice here is that during some failure situations, what can happen is that the output messages might be sent out multiple times. However, because duplicates can already happen with At-Least-Once message delivery, we can say that we are hiding behind that failure mode. Basically, because duplicates can already be there, we can produce duplicates, because whoever is processing them is expected to cope with that situation anyway. That was the general idea about the problem and the solution that we sought and wanted to implement.
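The steps just described can be condensed into a sketch (simplifying assumptions: a dict plays the role of the Cosmos DB document, an integer version stands in for the ETag, and the captured outputs live alongside the state; none of these names are the library’s actual API).

```python
# Sketch of the algorithm: dedup check, capture side effects, optimistic-
# concurrency commit (point of no return), then dispatch the outputs.

class ConcurrencyConflict(Exception):
    pass

store = {"state": 0, "version": 0, "transactions": {}}  # per-handler document
output_queue = []

def process(message_id, message, business_logic):
    # 1. Deduplication: was this logical message already processed?
    if message_id in store["transactions"]:
        # Re-send the captured outputs; duplicates downstream are allowed.
        output_queue.extend(store["transactions"][message_id])
        return

    # 2. Load state, run logic, CAPTURE (do not send) the output messages.
    state, version = store["state"], store["version"]
    new_state, outputs = business_logic(state, message)

    # 3. Optimistic-concurrency commit: the point of no return.
    if store["version"] != version:
        raise ConcurrencyConflict("state changed since it was loaded")
    store["state"] = new_state
    store["version"] = version + 1
    store["transactions"][message_id] = outputs

    # 4. Only now push the captured messages out. A crash between 3 and 4
    # is safe: redelivery of the input repeats step 1, which re-sends the
    # same captured outputs.
    output_queue.extend(outputs)

def logic(state, message):
    return state + message, [f"processed-{message}"]

process("msg-1", 5, logic)
process("msg-1", 5, logic)   # duplicate: no state change, outputs re-sent as-is
print(store["state"], output_queue)  # 5 ['processed-5', 'processed-5']
```

Note how the duplicate produces a duplicated *output* but never a second state change, which is exactly the "hiding behind the existing failure mode" argument from the talk.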

11:35 - Now I wanted to talk about the TLA+ model and how we used that model to validate safety. The goal of the model was to make sure that our algorithm works not only on the happy path but also in the failure scenarios. As we all know, it’s very easy to show that an algorithm sometimes works, but it’s actually very tricky to figure out all the corner-case scenarios. We wanted to have some validation, some model checking done, to make sure that we actually convinced ourselves that the algorithm does what we wanted it to do. Specifically, what we wanted to check were the safety properties.

12:35 - Those safety properties were described as atomic commit. Basically, we wanted to make sure that when we commit to the business logic store, we eventually push out the changes to the output queue and consume the message from the input queue. We also wanted to make sure that the side effects are consistent, in the sense that, even for duplicates, there is only one change to the business state, and that the output messages and the business state change were made over the same version of the business state, which I think will be a bit clearer when we get to the formulas. Finally, we wanted to make sure that everything works as expected even without atomic transactions between different logical partitions, because, as I stated, we store the business data and the infrastructural data in separate partitions. Obviously, most of the things that were in the implementation did not make it to the model. Specifically, we did not model the cleanup logic, which is responsible for clearing the infrastructural data that is stored by the library.

14:11 - We did not model the mechanisms that are used to lower the chance of concurrency collisions, optimistic concurrency check failures basically. We also did not model the exponential backoffs and processing retries, which are there to make the library a bit more performant in the failure scenarios. Let’s look at the specification. This is the main part of the process that models a single handler. Most of the labels are folded, and that shows the main structure of the specification. What we have here is an infinite while loop, and in that loop the process is trying to receive an input message from the input queue.

15:00 - Then it checks, by looking at the transaction field on the state, whether it needs to redo some of the transactional steps, to basically push out the changes that are still there and not yet pushed to the output queue. After doing that, we check whether we already processed the message. If we did not process the message, we model the business logic execution, capturing and storing the side effects, the optimistic concurrency commit to the business store, and then pushing out the transaction until it’s finished. The main reason why we used PlusCal was, first, that its syntax was more familiar to us and, secondly, that it actually corresponds pretty well to the implementation that was done in C#. We were able to look at the code side by side, and the specification corresponded pretty well with what we could see on the screen looking at the real implementation.

16:15 - When it comes to state modeling, there wasn’t that much happening. Two important notes: the input queue was modeled as a set, and that set held records with two fields, one being the logical message ID and the other the duplicate message ID. The way we modeled the duplicates was basically by starting off with duplicates already put in the queue. When it comes to the business state modeling, we were storing a sequence which was a history of snapshots of the state as it was throughout the execution of the algorithm, and that was pretty useful when writing the invariants. We also modeled the version for the optimistic concurrency check.
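The two modeling tricks mentioned here can be sketched in Python as a stand-in for the TLA+ state (the record shapes are illustrative assumptions, not the actual spec): duplicates are pre-seeded into the input set rather than generated dynamically, and the business state is a history of snapshots.

```python
# Input "queue" modeled as a set of records; a duplicate is simply a second
# record with the same logical ID but a different duplicate ID.
input_queue = {
    ("msg-1", "dup-1"),   # (logical message ID, duplicate message ID)
    ("msg-1", "dup-2"),   # pre-seeded duplicate of the same logical message
    ("msg-2", "dup-3"),
}

# Business state kept as a history of snapshots: each entry records the
# state version and the logical message that produced it, which makes
# invariants easy to express later.
state_history = [{"version": 0, "produced_by": None}]

def record_snapshot(version, logical_id):
    state_history.append({"version": version, "produced_by": logical_id})

record_snapshot(1, "msg-1")

# Logical message "msg-1" appears twice in the input set:
duplicates = sum(1 for logical_id, _ in input_queue if logical_id == "msg-1")
print(duplicates)  # 2
```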

17:09 - All the other bits were used to model the infrastructural storage and the output queue, which was also a set. Another interesting thing is that we did not use the default termination formula. What we said was that the processes were running in infinite loops, and the proper termination of the algorithm was the situation in which all the processes were blocked on receiving a message and the input queue was empty. Basically, termination was modeling the situation in which the input queue is drained and none of the handlers is doing anything specific, but waiting for anything to appear in the input queue.

17:54 - Finally, the safety invariants. This is what we ended up with when it comes to safety. It basically models or expresses the safety property which makes sure that, for any logical message, there is at most one state change happening to the business store and at most one message being produced and sent to the output queue. Those are the first two parts, one at the top and one in the middle. Finally, the one which is called ConsistentStateAndOutput says that if a message is fully processed, the version of the business state over which the state change and the output message were produced should be the same.

18:54 - Basically, we make sure that those two pieces of side effects are consistent, in the sense that they were produced by business logic operating on the same version of the business state. Another interesting bit is how we modeled failures. What we ended up with was basically a single instruction which was a jump back to the main loop, which modeled the situation in which either the process restarts or some exception is thrown, and we basically drop the top of the stack and go back to the beginning of that infinite loop. The way that we used it was that whenever a failure was possible, or we wanted to model a failure, we would use an either-or expression. For instance, in the commit-state macro we can see that when we do the optimistic concurrency check, we either commit the write to the store or we fail.
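The three safety properties can be sketched as predicates over a recorded trace (a hypothetical Python trace format, not the actual TLA+ formulas): each entry records the logical message ID and the state version it was based on.

```python
# Python stand-ins for the three safety invariants described in the talk.

def at_most_one_state_change(state_changes):
    # For any logical message, at most one change to the business state.
    ids = [c["logical_id"] for c in state_changes]
    return all(ids.count(i) <= 1 for i in ids)

def at_most_one_output_message(output_messages):
    # For any logical message, at most one distinct output (identical
    # re-sends of the same captured message are allowed).
    contents = {}
    for m in output_messages:
        contents.setdefault(m["logical_id"], set()).add(m["content"])
    return all(len(c) <= 1 for c in contents.values())

def consistent_state_and_output(state_changes, output_messages):
    # State change and output message for a logical message must be based
    # on the same version of the business state.
    versions = {c["logical_id"]: c["based_on_version"] for c in state_changes}
    return all(m["based_on_version"] == versions.get(m["logical_id"])
               for m in output_messages)

# The shooting-range bug violates the last two: one logical shot produced
# two contradictory outputs, based on different state versions.
changes = [{"logical_id": "fire-1", "based_on_version": 0}]
outputs = [{"logical_id": "fire-1", "content": "hit", "based_on_version": 0},
           {"logical_id": "fire-1", "content": "miss", "based_on_version": 1}]

print(at_most_one_state_change(changes))              # True
print(at_most_one_output_message(outputs))            # False
print(consistent_state_and_output(changes, outputs))  # False
```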

20:05 - That was the general approach to modeling those kinds of failures. Okay, so now a few words about the results. There were expected and unexpected results. When it comes to the expected results, those were obviously bugs found in various parts of the algorithm, specifically in the post-failure commits and also in the logic that assigns the transaction ID. As I said, that was something that we wanted to get from the model checking, so that’s why I called it expected. I don’t want to downplay the value of that, because that was exactly what we wanted. However, there were also unexpected bits. I would say that the main unexpected bit was the fact that when we started off writing the model, we realized that we didn’t really understand what it is that we were actually doing. So it actually forced us to distill the algorithm to its essentials, to make explicit the assumptions that we were making, which are pretty important, and also what the most important steps in that algorithm are. As a follow-up of that, we realized that there are actually some extensions that we could also use.

21:31 - For instance, I already said that one of the main assumptions we are making is that we don’t need any kind of a prepare phase for the output queue, because we can always commit to that output queue. We can always send a message there. Duplicates can happen there because that’s already a failure mode which we need to cope with. But what we realized is that there might be some other resources that are always able to commit. So, for instance, if we have some write-once store, for instance a blob store with [inaudible] blobs that can be written only once, that could be an extension to that algorithm as well. That could be a supported side effect that we could apply the same way as we are sending out the messages.

22:30 - Finally, what we realized is that we did not know the cost of [inaudible] well enough, nor all the memory model implications. So we had to go back to the drawing board and understand what it means that we are running with the session consistency model, and also that there are no atomic transactions between the logical partitions. Finally, another unexpected thing was that in order to create a meaningful model, we had to understand the concurrency characteristics of the algorithm. What are the parts that can happen independently and concurrently? But even more importantly, we had to learn what the failure modes are, or what failure situations may happen in different parts of the system as well.

23:26 - So it actually forced us to understand the technology that we’re using better than we knew it beforehand. So we did the validation, we did the model checking. An interesting note here is that whenever there was a bug in a specific part of the algorithm, we were able to find it on very small models. And by very small models, I mean a single message with a single duplicate and a single process, at most that. In that sense, the running time of model checking was never a problem for us, because whenever there were bugs, we would get feedback immediately.

24:19 - Now, that being said, after we settled on an algorithm and checked it with the smaller models, we also went with a bigger one. The biggest that we did was two processes with six messages in the input queue; those are the numbers, in terms of the size of the state space, that we ended up with, and that was running for roughly two and a half hours on a laptop, my personal laptop, which I would consider a high-end laptop by current standards. That’s all that I prepared for you in this presentation. If anyone wants to know more about the work that we did around TLA+ and the library, we are posting about that at exactly-once.github.io.

25:11 - I’m also available on Twitter at @Masternak. If anyone wants to chat, please reach out to me. I’d really like to share ideas. Thank you very much.