Multi-document ACID transactions in MongoDB are very similar to what you probably already know from traditional relational databases.
MongoDB’s transactions are a conversational set of related operations that must either commit atomically or roll back completely: execution is all-or-nothing.
Transactions are used to make sure operations are atomic even across multiple collections or databases. Thus, with snapshot isolation reads, another user can only see all the operations or none of them.
Let’s now add a shopping cart to our example.
For this example, 2 collections are required because we are dealing with 2 different business entities: the stock management and the shopping cart each client can create during shopping. The lifecycle of each document in these collections is different.
A document in the product collection represents an item I’m selling. It contains the current price of the product and the current stock. I created a POJO to represent it: Product.java.
A shopping cart is created when a client adds their first item to the cart and is removed when the client proceeds to checkout or leaves the website. I created a POJO to represent it: Cart.java.
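The two POJOs are not reproduced in this post, so here is a minimal sketch of what they could look like. It is an assumption reconstructed from how the demo code calls them (new Product(id, stock, price), new Cart(id, items), new Cart.Item(productId, quantity, price)) and from the console output, not the actual Product.java and Cart.java. The toString format of Cart and the PojoSketchDemo class are purely illustrative.

```java
import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Product.java: one document of the "product" collection.
class Product {
    private String id;        // mapped to _id, e.g. "beer"
    private Integer stock;    // units left to sell
    private BigDecimal price; // current unit price

    public Product() {}       // no-arg constructor, which POJO mappers typically expect

    public Product(String id, Integer stock, BigDecimal price) {
        this.id = id;
        this.stock = stock;
        this.price = price;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Integer getStock() { return stock; }
    public void setStock(Integer stock) { this.stock = stock; }
    public BigDecimal getPrice() { return price; }
    public void setPrice(BigDecimal price) { this.price = price; }

    @Override
    public String toString() {
        return "Product{id='" + id + "', stock=" + stock + ", price=" + price + '}';
    }
}

// Hypothetical stand-in for Cart.java: one document of the "cart" collection.
class Cart {
    private String id;        // mapped to _id, the client name in this demo
    private List<Item> items; // one entry per product in the cart

    public Cart() {}

    public Cart(String id, List<Item> items) {
        this.id = id;
        this.items = items;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public List<Item> getItems() { return items; }
    public void setItems(List<Item> items) { this.items = items; }

    @Override
    public String toString() {
        return "Cart{id='" + id + "', items=" + items + '}';
    }

    // Embedded sub-document: the fields match "productId" and "items.$.quantity" in the demo.
    public static class Item {
        private String productId;
        private Integer quantity;
        private BigDecimal price;

        public Item() {}

        public Item(String productId, Integer quantity, BigDecimal price) {
            this.productId = productId;
            this.quantity = quantity;
            this.price = price;
        }

        public String getProductId() { return productId; }
        public void setProductId(String productId) { this.productId = productId; }
        public Integer getQuantity() { return quantity; }
        public void setQuantity(Integer quantity) { this.quantity = quantity; }
        public BigDecimal getPrice() { return price; }
        public void setPrice(BigDecimal price) { this.price = price; }

        @Override
        public String toString() {
            return "Item{productId='" + productId + "', quantity=" + quantity + ", price=" + price + '}';
        }
    }
}

// Illustrative usage only.
class PojoSketchDemo {
    public static void main(String[] args) {
        Product beer = new Product("beer", 5, BigDecimal.valueOf(3));
        Cart cart = new Cart("Alice",
                Collections.singletonList(new Cart.Item("beer", 2, BigDecimal.valueOf(3))));
        System.out.println(beer);
        System.out.println(cart);
    }
}
```

Keeping the cart items embedded in the cart document means a single cart update touches one document, while the stock lives in a separate collection with its own lifecycle.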
The challenge here resides in the fact that I cannot sell more than I possess: if I have 5 beers to sell, I cannot have more than 5 beers distributed across the different client carts.
To ensure that, I have to make sure that the operation creating or updating the client cart is atomic with the stock update. That’s where the multi-document transaction comes into play. The transaction must fail if someone tries to buy something I do not have in stock. I will add a constraint on the product stock:
db.createCollection("product", {
  validator: {
    $jsonSchema: {
      bsonType: "object",
      required: [ "_id", "price", "stock" ],
      properties: {
        _id: {
          bsonType: "string",
          description: "must be a string and is required"
        },
        price: {
          bsonType: "decimal",
          minimum: 0,
          description: "must be a positive decimal and is required"
        },
        stock: {
          bsonType: "int",
          minimum: 0,
          description: "must be a positive integer and is required"
        }
      }
    }
  }
})
Note that this is already included in the Java code.
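To make the all-or-nothing requirement concrete before bringing in the database, here is a toy in-memory sketch in plain Java. It is not how MongoDB implements transactions and it does not use the driver: the InMemoryShop class and its methods are hypothetical names invented for this illustration. It only shows the rule we want: the cart update and the stock update are applied together, or neither is applied when the stock would drop below 0.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of the invariant; every name here is hypothetical.
class InMemoryShop {
    private final Map<String, Integer> stock = new HashMap<>();
    private final Map<String, Map<String, Integer>> carts = new HashMap<>();

    void setStock(String productId, int quantity) {
        stock.put(productId, quantity);
    }

    int stockOf(String productId) {
        return stock.getOrDefault(productId, 0);
    }

    int inCart(String client, String productId) {
        return carts.getOrDefault(client, Collections.emptyMap()).getOrDefault(productId, 0);
    }

    // Adds items to a cart and decrements the stock as one unit of work.
    // Returns true when both changes were applied, false when neither was.
    synchronized boolean addToCart(String client, String productId, int quantity) {
        // Stage both writes without touching the shared state yet.
        int newStock = stockOf(productId) - quantity;
        int newCartQuantity = inCart(client, productId) + quantity;

        // Same rule as the validator above: stock must never drop below 0.
        if (newStock < 0) {
            return false; // "rollback": nothing was applied
        }

        // "Commit": apply both writes together.
        stock.put(productId, newStock);
        carts.computeIfAbsent(client, c -> new HashMap<>()).put(productId, newCartQuantity);
        return true;
    }

    public static void main(String[] args) {
        InMemoryShop shop = new InMemoryShop();
        shop.setStock("beer", 5);
        System.out.println(shop.addToCart("Alice", "beer", 2)); // true, stock is now 3
        System.out.println(shop.addToCart("Alice", "beer", 2)); // true, stock is now 1
        System.out.println(shop.addToCart("Alice", "beer", 2)); // false, stock stays at 1
        System.out.println("stock=" + shop.stockOf("beer") + ", cart=" + shop.inCart("Alice", "beer"));
    }
}
```

A single synchronized method is enough in one JVM. Across two collections in a real database there is no such shared lock, which is the gap the multi-document transaction fills.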
To monitor our example, we are going to use MongoDB Change Streams that were introduced in MongoDB 3.6.
In each of the threads of this process called ChangeStreams.java, I am going to monitor one of the 2 collections and print each operation with its associated cluster time.
// package and imports
public class ChangeStreams {

    private static final Bson filterUpdate = Filters.eq("operationType", "update");
    private static final Bson filterInsertUpdate = Filters.in("operationType", "insert", "update");
    private static final String jsonSchema = "{ $jsonSchema: { bsonType: \"object\", required: [ \"_id\", \"price\", \"stock\" ], properties: { _id: { bsonType: \"string\", description: \"must be a string and is required\" }, price: { bsonType: \"decimal\", minimum: 0, description: \"must be a positive decimal and is required\" }, stock: { bsonType: \"int\", minimum: 0, description: \"must be a positive integer and is required\" } } } } ";

    public static void main(String[] args) {
        MongoDatabase db = initMongoDB(args[0]);
        MongoCollection<Cart> cartCollection = db.getCollection("cart", Cart.class);
        MongoCollection<Product> productCollection = db.getCollection("product", Product.class);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> watchChangeStream(productCollection, filterUpdate));
        executor.submit(() -> watchChangeStream(cartCollection, filterInsertUpdate));
        ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
        scheduled.scheduleWithFixedDelay(System.out::println, 0, 1, TimeUnit.SECONDS);
    }

    private static void watchChangeStream(MongoCollection<?> collection, Bson filter) {
        System.out.println("Watching " + collection.getNamespace());
        List<Bson> pipeline = Collections.singletonList(Aggregates.match(filter));
        collection.watch(pipeline)
                  .fullDocument(FullDocument.UPDATE_LOOKUP)
                  .forEach((Consumer<ChangeStreamDocument<?>>) doc -> System.out.println(doc.getClusterTime() + " => " + doc.getFullDocument()));
    }

    private static MongoDatabase initMongoDB(String mongodbURI) {
        getLogger("org.mongodb.driver").setLevel(Level.SEVERE);
        CodecRegistry providers = fromProviders(PojoCodecProvider.builder().register("com.mongodb.models").build());
        CodecRegistry codecRegistry = fromRegistries(MongoClient.getDefaultCodecRegistry(), providers);
        MongoClientOptions.Builder options = new MongoClientOptions.Builder().codecRegistry(codecRegistry);
        MongoClientURI uri = new MongoClientURI(mongodbURI, options);
        MongoClient client = new MongoClient(uri);
        MongoDatabase db = client.getDatabase("test");
        db.drop();
        db.createCollection("cart");
        db.createCollection("product", productJsonSchemaValidator());
        return db;
    }

    private static CreateCollectionOptions productJsonSchemaValidator() {
        return new CreateCollectionOptions().validationOptions(
                new ValidationOptions().validationAction(ValidationAction.ERROR)
                                       .validator(BsonDocument.parse(jsonSchema)));
    }
}
In this example we have 5 beers to sell. Alice wants to buy 2 beers, but we are not going to use the new MongoDB 4.0 multi-document transactions for this. We will observe two operations in the change streams, one creating the cart and one updating the stock, at 2 different cluster times.
Then Alice adds 2 more beers to her cart, and we are going to use a transaction this time. The result in the change stream will be 2 operations happening at the same cluster time.
Finally, she will try to order 2 extra beers, but the jsonSchema validator will fail the product update and result in a rollback. We will not see anything in the change stream. Here is the Transactions.java source code:
// package and imports
public class Transactions {

    private static MongoClient client;
    private static MongoCollection<Cart> cartCollection;
    private static MongoCollection<Product> productCollection;

    private final BigDecimal BEER_PRICE = BigDecimal.valueOf(3);
    private final String BEER_ID = "beer";

    private final Bson stockUpdate = inc("stock", -2);
    private final Bson filterId = eq("_id", BEER_ID);
    private final Bson filterAlice = eq("_id", "Alice");
    private final Bson matchBeer = elemMatch("items", eq("productId", "beer"));
    private final Bson incrementBeers = inc("items.$.quantity", 2);

    public static void main(String[] args) {
        initMongoDB(args[0]);
        new Transactions().demo();
    }

    private static void initMongoDB(String mongodbURI) {
        getLogger("org.mongodb.driver").setLevel(Level.SEVERE);
        CodecRegistry codecRegistry = fromRegistries(MongoClient.getDefaultCodecRegistry(), fromProviders(PojoCodecProvider.builder().register("com.mongodb.models").build()));
        MongoClientOptions.Builder options = new MongoClientOptions.Builder().codecRegistry(codecRegistry);
        MongoClientURI uri = new MongoClientURI(mongodbURI, options);
        client = new MongoClient(uri);
        MongoDatabase db = client.getDatabase("test");
        cartCollection = db.getCollection("cart", Cart.class);
        productCollection = db.getCollection("product", Product.class);
    }

    private void demo() {
        clearCollections();
        insertProductBeer();
        printDatabaseState();
        System.out.println("######### NO TRANSACTION #########");
        System.out.println("Alice wants 2 beers.");
        System.out.println("We have to create a cart in the 'cart' collection and update the stock in the 'product' collection.");
        System.out.println("The 2 actions are correlated but can not be executed on the same cluster time.");
        System.out.println("Any error blocking one operation could result in stock error or beer sale we don't own.");
        System.out.println("---------------------------------------------------------------------------");
        aliceWantsTwoBeers();
        sleep();
        removingBeersFromStock();
        System.out.println("####################################\n");
        printDatabaseState();
        sleep();
        System.out.println("\n######### WITH TRANSACTION #########");
        System.out.println("Alice wants 2 extra beers.");
        System.out.println("Now we can update the 2 collections simultaneously.");
        System.out.println("The 2 operations only happen when the transaction is committed.");
        System.out.println("---------------------------------------------------------------------------");
        aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback();
        sleep();
        System.out.println("\n######### WITH TRANSACTION #########");
        System.out.println("Alice wants 2 extra beers.");
        System.out.println("This time we do not have enough beers in stock so the transaction will rollback.");
        System.out.println("---------------------------------------------------------------------------");
        aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback();
        client.close();
    }

    private void aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback() {
        ClientSession session = client.startSession();
        try {
            session.startTransaction(TransactionOptions.builder().writeConcern(WriteConcern.MAJORITY).build());
            aliceWantsTwoExtraBeers(session);
            sleep();
            removingBeerFromStock(session);
            session.commitTransaction();
        } catch (MongoCommandException e) {
            session.abortTransaction();
            System.out.println("####### ROLLBACK TRANSACTION #######");
        } finally {
            session.close();
            System.out.println("####################################\n");
            printDatabaseState();
        }
    }

    private void removingBeersFromStock() {
        System.out.println("Trying to update beer stock : -2 beers.");
        try {
            productCollection.updateOne(filterId, stockUpdate);
        } catch (MongoCommandException e) {
            System.out.println("##### MongoCommandException #####");
            System.out.println("##### STOCK CANNOT BE NEGATIVE #####");
            throw e;
        }
    }

    private void removingBeerFromStock(ClientSession session) {
        System.out.println("Trying to update beer stock : -2 beers.");
        try {
            productCollection.updateOne(session, filterId, stockUpdate);
        } catch (MongoCommandException e) {
            System.out.println("##### MongoCommandException #####");
            System.out.println("##### STOCK CANNOT BE NEGATIVE #####");
            throw e;
        }
    }

    private void aliceWantsTwoBeers() {
        System.out.println("Alice adds 2 beers in her cart.");
        cartCollection.insertOne(new Cart("Alice", Collections.singletonList(new Cart.Item(BEER_ID, 2, BEER_PRICE))));
    }

    private void aliceWantsTwoExtraBeers(ClientSession session) {
        System.out.println("Updating Alice cart : adding 2 beers.");
        cartCollection.updateOne(session, and(filterAlice, matchBeer), incrementBeers);
    }

    private void insertProductBeer() {
        productCollection.insertOne(new Product(BEER_ID, 5, BEER_PRICE));
    }

    private void clearCollections() {
        productCollection.deleteMany(new BsonDocument());
        cartCollection.deleteMany(new BsonDocument());
    }

    private void printDatabaseState() {
        System.out.println("Database state:");
        printProducts(productCollection.find().into(new ArrayList<>()));
        printCarts(cartCollection.find().into(new ArrayList<>()));
        System.out.println();
    }

    private void printProducts(List<Product> products) {
        products.forEach(System.out::println);
    }

    private void printCarts(List<Cart> carts) {
        if (carts.isEmpty())
            System.out.println("No carts...");
        else
            carts.forEach(System.out::println);
    }

    private void sleep() {
        System.out.println("Sleeping 3 seconds...");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            System.err.println("Oups...");
            e.printStackTrace();
        }
    }
}
As you can see here, we only get four operations because the last two operations were never committed to the database, and therefore the change stream has nothing to show for them.
You can also note that the first two cluster times are different because we did not use a transaction for the first two operations, whereas the last two operations share the same cluster time because we used the new MongoDB 4.0 multi-document transaction system, and thus they are atomic.
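The cluster time pattern described above can be mimicked with a toy logical clock in plain Java. This is a deliberately simplified model, not MongoDB's actual oplog or change stream implementation, and the ToyChangeLog class with its write, commit and abort methods is a hypothetical name invented for this sketch. Standalone writes each get their own timestamp, the writes of a committed transaction are all published at the commit timestamp, and an aborted transaction publishes nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the event ordering seen in the change streams; all names are hypothetical.
class ToyChangeLog {
    private long clock = 0; // stand-in for the cluster time
    private final List<String> events = new ArrayList<>();

    // A write outside a transaction becomes visible at its own timestamp.
    void write(String operation) {
        clock++;
        events.add(clock + " => " + operation);
    }

    // Writes buffered in a transaction become visible together, at the commit timestamp.
    void commit(List<String> bufferedOperations) {
        clock++;
        for (String operation : bufferedOperations) {
            events.add(clock + " => " + operation);
        }
    }

    // An aborted transaction discards its buffered writes: no event is published.
    void abort(List<String> bufferedOperations) {
        // intentionally empty
    }

    List<String> events() {
        return events;
    }

    public static void main(String[] args) {
        ToyChangeLog log = new ToyChangeLog();
        // No transaction: two events at two different timestamps.
        log.write("insert cart Alice (2 beers)");
        log.write("update product beer (stock 3)");
        // Committed transaction: two events sharing one timestamp.
        log.commit(Arrays.asList("update cart Alice (4 beers)", "update product beer (stock 1)"));
        // Rolled-back transaction: nothing to show.
        log.abort(Arrays.asList("update cart Alice (6 beers)", "update product beer (stock -1)"));
        log.events().forEach(System.out::println);
    }
}
```

Running the sketch prints four events: the first two carry timestamps 1 and 2, and the last two both carry timestamp 3.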
Here is the console output of the Transactions Java process, which sums up everything I said earlier.
$ ./transactions.sh
Database state:
Product{id='beer', stock=5, price=3}
No carts...
######### NO TRANSACTION #########
Alice wants 2 beers.
We have to create a cart in the 'cart' collection and update the stock in the 'product' collection.
The 2 actions are correlated but can not be executed on the same cluster time.
Any error blocking one operation could result in stock error or a sale of beer that we can’t fulfill as we have no stock.