CDRT conflict-free replicated data type 無衝突複製資料類型是一種可以在網路中的多台電腦上複製的資料結構,每一個副本可獨立各自更新,不需要在副本之間進行協調,已透過數學確認可解決可能出現的不一致問題。
對於不同節點上的共用資料,如果在某一個節點上更新了資料,會產生不一致性,如果節點之間沒有協調更新的權限而產生資料衝突,就可能需要放棄一部分更新,以達到最終的一致性,分散式計算都集中在如何防止複製資料的並行更新問題上。
另一種方式 Optimistic Replication,是讓各節點都能做更新,允許所有更新都能同時執行,然後再來考慮如何合併解決衝突問題,CRDT 是一種資料結構,可在不同的副本上進行更新,最後透過合併解決衝突問題。
CRDT 種類
基於狀態的 CRDT 比較容易實作,但需要每個 CRDT 都要將狀態傳給其他副本,傳輸資料耗費較大,基於操作的 CRDT 只需要傳送更新的動作記錄。
基於操作的 CRDT
也稱為 交換性複製資料類型(commutative replicated data types,或CmRDT)
只傳輸更新操作來傳播狀態,必須確保操作過程都有傳遞給每一個資料副本,可不依照順序,但不能重複。
基於狀態的 CRDT
被稱為收斂複製資料類型(convergent replicated data types,或CvRDT)
必須將完整的本地端狀態,重送給其他副本。
State-Based LWW-Element-Graph CRDT
GitHub - juliuskrah/crdt-java: A minimal CRDT implementation 這是一個簡易的 java library,實作了 CvRDT,用 graph 方式,連結不同的節點。
當兩個節點沒有連線時,就是 split-brain 狀態,CRDT 可在節點連接後,讓不同的副本資料合併達到一致性。
這邊測試四種演算法
Grow-Only Set:會一直不斷增加的 Set,最終不同節點的 Set 會合併再一起
Increment-Only Counter: 結果會是所有節點的加總總和
PN Counter:結果是所有節點的 加總總和 - 減法總和
Last-Writer-Wins Register:在合併時,只會保留最新的異動內容。同時發生的操作會被丟棄
<!-- Experimental CRDT-implementations for the JVM -->
<dependency>
<groupId>com.netopyr.wurmloch</groupId>
<artifactId>wurmloch-crdt</artifactId>
<version>0.1.0</version>
</dependency>
import com.netopyr.wurmloch.crdt.GCounter;
import com.netopyr.wurmloch.crdt.GSet;
import com.netopyr.wurmloch.crdt.LWWRegister;
import com.netopyr.wurmloch.crdt.PNCounter;
import com.netopyr.wurmloch.store.LocalCrdtStore;
import org.junit.Test;
import static org.junit.Assert.*;
public class CRDTTest {
@Test
public void gset() {
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
GSet<String> replica1 = crdtStore1.createGSet("fruit");
GSet<String> replica2 = crdtStore2.<String>findGSet("fruit").get();
replica1.add("apple");
replica2.add("banana");
// 確認replica1, replica2 都有 "apple" 及 “”banana
assertTrue( replica1.contains("apple") );
assertTrue( replica1.contains("banana") );
assertTrue( replica2.contains("apple") );
assertTrue( replica2.contains("banana") );
// 刻意斷線
crdtStore1.disconnect(crdtStore2);
// 異動 replica1, replica2
replica1.add("strawberry");
replica2.add("pear");
assertTrue( replica1.contains("strawberry") );
assertFalse( replica2.contains("strawberry") );
assertFalse( replica1.contains("pear") );
assertTrue( replica2.contains("pear") );
// 連線
crdtStore1.connect(crdtStore2);
assertTrue( replica1.contains("strawberry") );
assertTrue( replica2.contains("strawberry") );
assertTrue( replica1.contains("pear") );
assertTrue( replica2.contains("pear") );
}
@Test
public void gcounter() {
// 結果會是所有節點的 sum
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
GCounter replica1 = crdtStore1.createGCounter("counter");
GCounter replica2 = crdtStore2.findGCounter("counter").get();
replica1.increment();
replica2.increment(2L);
assertEquals(3L, replica1.get());
assertEquals(3L, replica2.get());
// 斷線
crdtStore1.disconnect(crdtStore2);
replica1.increment(3L);
replica2.increment(5L);
assertEquals(6L, replica1.get());
assertEquals(8L, replica2.get());
// connect
crdtStore1.connect(crdtStore2);
assertEquals(11L, replica1.get());
assertEquals(11L, replica2.get());
}
@Test
public void pncounter() {
// 結果是所有節點的 加總總和 - 減法總和
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
PNCounter replica1 = crdtStore1.createPNCounter("pncounter");
PNCounter replica2 = crdtStore2.findPNCounter("pncounter").get();
replica1.increment();
replica2.decrement(2L);
assertEquals(-1L, replica1.get());
assertEquals(-1L, replica2.get());
// disconnect
crdtStore1.disconnect(crdtStore2);
replica1.decrement(3L);
replica2.increment(5L);
assertEquals(-4L, replica1.get());
assertEquals(4L, replica2.get());
// connect
crdtStore1.connect(crdtStore2);
assertEquals(1L, replica1.get());
assertEquals(1L, replica2.get());
}
@Test
public void lwwregister() {
// 在合併時,只會保留最新的異動內容。同時發生的操作會被丟棄
LocalCrdtStore crdtStore1 = new LocalCrdtStore("crdt");
LocalCrdtStore crdtStore2 = new LocalCrdtStore("crdt");
crdtStore1.connect(crdtStore2);
LWWRegister<String> replica1 = crdtStore1.createLWWRegister("lwwregister");
LWWRegister<String> replica2 = crdtStore2.<String>findLWWRegister("lwwregister").get();
replica1.set("apple");
replica2.set("banana");
assertEquals("banana", replica1.get());
assertEquals("banana", replica2.get());
//disconnect
crdtStore1.disconnect(crdtStore2);
replica1.set("strawberry");
replica2.set("pear");
assertEquals("strawberry", replica1.get());
assertEquals("pear", replica2.get());
// connect
crdtStore1.connect(crdtStore2);
// buggy:應該要是 pear
assertEquals("strawberry", replica1.get());
assertEquals("strawberry", replica2.get());
}
}
References
Introduction to Conflict-Free Replicated Data Types | Baeldung
GitHub - juliuskrah/crdt-java: A minimal CRDT implementation