How Clojure Atom and swap! internals work
What is wrong with this code?
class Example {
private int x = 0;
public int increment() {
this.x = this.x + 1;
return x;
}
}
It’s okay until you try to use it in a concurrent environment, from different threads. In that case we have a classic race condition: increment is not atomic, so two threads can read the same value at the same time, and only one increment will be written.
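To make the lost update visible, here is a minimal sketch that runs the same non-atomic increment from several threads. The names UnsafeCounterDemo, UnsafeCounter and run are made up for this illustration, and the exact result differs from run to run, so no particular number is promised.

```java
// Sketch: several threads call a non-atomic increment; some updates may be lost.
class UnsafeCounterDemo {
    // Same shape as Example above: a plain read-modify-write with no synchronization.
    static class UnsafeCounter {
        private int x = 0;

        int increment() {
            this.x = this.x + 1;
            return x;
        }

        int get() {
            return x;
        }
    }

    // Starts `threads` workers, each calling increment() `perThread` times,
    // waits for all of them and returns the final counter value.
    static int run(int threads, int perThread) {
        UnsafeCounter counter = new UnsafeCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) counter.increment();
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        int expected = 2 * 1_000_000;
        int actual = run(2, 1_000_000);
        // The actual value is typically below the expected one and varies per run.
        System.out.println("expected " + expected + ", got " + actual);
    }
}
```

With a single thread the result always matches the number of calls; with more threads it can only be equal or lower, never higher.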
There are ways to improve it. For example, we can put the synchronized keyword on the method declaration, which makes every call acquire a lock on the object, so only one thread at a time can execute the method.
public synchronized int increment() {
this.x = this.x + 1;
return x;
}
Also, we can use explicit locks, like ReentrantLock:
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock();
}
}
}
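Applied to the counter from the beginning, the lock pattern could look like the sketch below. LockedCounterDemo, LockedCounter and run are hypothetical names used only here; the point is that the read-modify-write sits between lock() and unlock(), so no increment is lost.

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch: the counter from the first example, guarded by an explicit lock.
class LockedCounterDemo {
    static class LockedCounter {
        private final ReentrantLock lock = new ReentrantLock();
        private int x = 0;

        int increment() {
            lock.lock(); // wait until no other thread holds the lock
            try {
                x = x + 1;
                return x;
            } finally {
                lock.unlock(); // always release, even if the body throws
            }
        }

        int get() {
            lock.lock();
            try {
                return x;
            } finally {
                lock.unlock();
            }
        }
    }

    // Starts `threads` workers, each incrementing `perThread` times,
    // waits for them and returns the final value.
    static int run(int threads, int perThread) {
        LockedCounter counter = new LockedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) counter.increment();
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        // Every increment is applied, so this prints 40000.
        System.out.println(run(4, 10_000));
    }
}
```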
Alternatively, if we want a thread-safe way to work with an integer, we can completely replace our own implementation with AtomicInteger:
import java.util.concurrent.atomic.AtomicInteger;
AtomicInteger counter = new AtomicInteger(0);
counter.incrementAndGet();
There are other classes available in the java.util.concurrent.atomic package. The backbone of all of them is the Compare-And-Set (CAS) operation: Java relies on the processor instructions of the underlying platform, for example CMPXCHG on x86/x64, to atomically swap the variable value.
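To see what CAS looks like from Java code, here is a small sketch that rebuilds an increment on top of AtomicInteger.compareAndSet. The helper casIncrement is made up for this illustration; in real code you would simply call incrementAndGet.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a hand-written increment built on compareAndSet, for illustration only.
class CasLoopDemo {
    static int casIncrement(AtomicInteger counter) {
        while (true) {
            int current = counter.get();   // read the current value
            int next = current + 1;        // compute the new value
            // Write `next` only if the counter still holds `current`;
            // otherwise another thread got there first and we try again.
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        System.out.println(casIncrement(counter)); // prints 1
        // A CAS with a stale expected value fails and changes nothing.
        System.out.println(counter.compareAndSet(0, 7)); // prints false
    }
}
```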
So how is this applicable to our topic, Clojure Atoms?
Let’s check the semantics of atoms. We can define an atom containing a value of any type, from primitives to data structures like vectors, sets and maps. To update the value we use the swap! function, which is guaranteed to be thread-safe.
(def x (atom 0))
(swap! x inc)
=> 1
(deref x)
=> 1
@x
=> 1
Note that to get the value from an atom there is the deref function; @ is the short way of writing deref.
As I’ve said, we can put data structures inside an atom as well:
(def x (atom [1 2 3]))
(swap! x (fn [v] (mapv inc v)))
=> [2 3 4]
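Let’s look inside the swap! implementation. The Clojure function simply delegates to the swap method of the underlying Java class, clojure.lang.Atom, which is shown right after it: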
(defn swap!
"Atomically swaps the value of atom to be:
(apply f current-value-of-atom args). Note that f may be called
multiple times, and thus should be free of side effects. Returns
the value that was swapped in."
{:added "1.0"
:static true}
([^clojure.lang.IAtom atom f] (.swap atom f))
([^clojure.lang.IAtom atom f x] (.swap atom f x))
([^clojure.lang.IAtom atom f x y] (.swap atom f x y))
([^clojure.lang.IAtom atom f x y & args] (.swap atom f x y args)))
import java.util.concurrent.atomic.AtomicReference;
public final class Atom extends ARef implements IAtom2 {
final AtomicReference state;
public Object deref() {
return this.state.get();
}
public Object swap(IFn f) {
Object v;
Object newv;
do {
v = this.deref();
newv = f.invoke(v);
this.validate(newv);
} while(!this.state.compareAndSet(v, newv));
this.notifyWatches(v, newv);
return newv;
}
// ... the rest of the class is omitted
}
We can see the usage of AtomicReference; that’s basically how our object is stored in the atom. Deref is simple, we just call get on the AtomicReference:
public Object deref() {
return this.state.get();
}
In the swap we see a common pattern: Compare And Set Loop. The idea is simple:
1. Get the current value.
2. Calculate the next value (the invoke call of the user-provided function with the current value as the input).
3. Try to apply the change with compareAndSet.
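But why do we need the loop? compareAndSet returns true only if the atom still holds the value we read at the start of the iteration. If another thread changed the value in between, it returns false and nothing is written. That’s why we re-read the current value, recompute, and retry until our update is applied. This is the AtomicReference method it relies on: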
/**
* Atomically sets the value to {@code newValue}
* if the current value {@code == expectedValue},
* with memory effects as specified by {@link VarHandle#compareAndSet}.
*
* @param expectedValue the expected value
* @param newValue the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(V expectedValue, V newValue) {
return VALUE.compareAndSet(this, expectedValue, newValue);
}
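The loop can be tried outside of Clojure too. Below is a minimal sketch of an atom-like container in plain Java; MiniAtom and MiniAtomDemo are hypothetical names, and validators and watches are deliberately left out, so this is a simplified model rather than the real clojure.lang.Atom.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Sketch: a stripped-down atom with only deref and a single-argument swap.
class MiniAtomDemo {
    static final class MiniAtom<T> {
        private final AtomicReference<T> state;

        MiniAtom(T initial) {
            this.state = new AtomicReference<>(initial);
        }

        T deref() {
            return state.get();
        }

        T swap(UnaryOperator<T> f) {
            T v;
            T newv;
            do {
                v = deref();       // 1. get the current value
                newv = f.apply(v); // 2. calculate the next value
                // 3. apply it only if nobody replaced v in the meantime
            } while (!state.compareAndSet(v, newv));
            return newv;
        }
    }

    // Starts `threads` workers, each swapping in (v + 1) `perThread` times,
    // and returns the final value held by the atom.
    static long run(int threads, int perThread) {
        MiniAtom<Long> atom = new MiniAtom<>(0L);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) atom.swap(v -> v + 1);
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return atom.deref();
    }

    public static void main(String[] args) {
        // No update is lost, even without locks: prints 40000.
        System.out.println(run(4, 10_000));
    }
}
```

Note that compareAndSet compares references, not contents. That is enough here because each iteration passes back the exact object it read from the reference.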
Because of this retry logic, it’s clear that you shouldn’t put side effects into the function passed to swap!: as you can see, there is no guarantee that it will be executed only once. The only guarantee is that the value will be updated in a thread-safe way.
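The repeated execution can be reproduced deterministically. In the sketch below the update function itself overwrites the state on its first call, imitating another thread that wins the race. RetryDemo, swap and run are illustration-only names, and the loop is written against a bare AtomicReference rather than a real Clojure atom.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Sketch: forcing one failed compareAndSet to show that f runs more than once.
class RetryDemo {
    // The same compare-and-set loop as in Atom.swap.
    static <T> T swap(AtomicReference<T> state, UnaryOperator<T> f) {
        T v;
        T newv;
        do {
            v = state.get();
            newv = f.apply(v);
        } while (!state.compareAndSet(v, newv));
        return newv;
    }

    // Returns {value swapped in, number of times f was executed}.
    static int[] run() {
        AtomicReference<Integer> state = new AtomicReference<>(0);
        AtomicInteger calls = new AtomicInteger();
        Integer result = swap(state, v -> {
            // Side effect inside f: count the call and, on the first call only,
            // overwrite the state as if another thread had updated it.
            if (calls.incrementAndGet() == 1) {
                state.set(100);
            }
            return v + 1;
        });
        return new int[] {result, calls.get()};
    }

    public static void main(String[] args) {
        int[] r = run();
        // First attempt computes 0 + 1 but the CAS fails; the retry computes 100 + 1.
        System.out.println("value=" + r[0] + ", f ran " + r[1] + " times"); // value=101, f ran 2 times
    }
}
```

One successful update cost two executions of the function, so any side effect inside it (a log line, a network call, a counter like the one above) would have happened twice.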
Also, note that we have this line:
this.notifyWatches(v, newv);
It’s the watch feature of atoms: not commonly used or known, but pretty cool.
(def a (atom {}))
(add-watch a :watcher
(fn [key atom old-state new-state]
(println "Atom value changed")))
Now the watcher fn will be triggered each time the value inside the atom is changed. It’s possible to add multiple watchers as well.
Hope that was useful!