peillute/
clock.rs

1//! Logical clock implementation for distributed synchronization
2//!
3//! This module provides both Lamport and Vector clock implementations for
4//! maintaining causal ordering of events in the distributed system.
5
6#[cfg(feature = "server")]
7/// Implements logical clocks for distributed synchronization
8///
9/// The Clock struct maintains both a Lamport clock for total ordering
10/// and a vector clock for causal ordering of events across the distributed system.
11#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
12pub struct Clock {
13    /// Lamport clock value for total ordering of events
14    lamport_clock: i64,
15    /// Vector clock mapping site IDs to their clock values
16    ///
17    /// Site_id -> clock value
18    vector_clock: std::collections::HashMap<String, i64>,
19}
20
21#[cfg(feature = "server")]
22impl Clock {
23    /// Creates a new Clock instance with initialized clocks
24    pub fn new() -> Self {
25        Clock {
26            lamport_clock: 0,
27            vector_clock: std::collections::HashMap::new(),
28        }
29    }
30
31    pub fn from_parts(
32        lamport_clock: i64,
33        vector_clock: std::collections::HashMap<String, i64>,
34    ) -> Self {
35        Clock {
36            lamport_clock,
37            vector_clock,
38        }
39    }
40
41    ///Creates a new Clock instance with initialized clocks, used for testing
42    #[cfg(test)]
43    pub fn new_with_values(lamport: i64, vector: std::collections::HashMap<String, i64>) -> Self {
44        Clock {
45            lamport_clock: lamport,
46            vector_clock: vector,
47        }
48    }
49
50    /// Increments the Lamport clock and returns the new value
51    fn increment_lamport(&mut self) {
52        self.lamport_clock += 1;
53    }
54
55    /// Increments the vector clock for a specific site and returns the new value
56    fn increment_vector(&mut self, site_id: &str) {
57        let clock = self.vector_clock.entry(site_id.to_string()).or_insert(0);
58        *clock += 1;
59    }
60
61    /// Returns a reference to the Lamport clock value
62    pub fn get_lamport(&self) -> &i64 {
63        &self.lamport_clock
64    }
65
66    /// Returns a reference to the vector clock
67    pub fn get_vector_clock_map(&self) -> &std::collections::HashMap<String, i64> {
68        &self.vector_clock
69    }
70
71    /// Returns the vector clock as a list of values
72    pub fn get_vector_clock_values(&self) -> Vec<i64> {
73        let mut vc: Vec<i64> = vec![0; self.vector_clock.len()];
74        for (i, clock_value) in self.vector_clock.values().enumerate() {
75            vc[i] = *clock_value;
76        }
77        vc
78    }
79
80    /// Updates the vector clock with received values, taking the maximum of local and received values
81    fn update_vector(&mut self, received_vc: &std::collections::HashMap<String, i64>) {
82        for (site_id, clock_value) in received_vc {
83            let current_value = self.vector_clock.entry(site_id.clone()).or_insert(0);
84            *current_value = (*current_value).max(*clock_value) + 1;
85        }
86    }
87
88    /// Updates the lamport clock with received value, taking the maximum of local and received values
89    fn update_lamport(&mut self, received_lc: &i64) {
90        self.lamport_clock = (self.lamport_clock).max(*received_lc) + 1;
91    }
92
93    /// Update the current clock value with an optional clock
94    ///
95    /// Local lamport clock is incremented
96    ///
97    /// Element of local vector clock is incremented
98    ///
99    /// Then we call update methods to take the maximum of the received clocks if any
100    pub fn update_clock(&mut self, local_site_id: &str, received_clock: Option<&Self>) {
101        if let Some(rc) = received_clock {
102            self.update_vector(rc.get_vector_clock_map());
103            self.update_lamport(rc.get_lamport());
104        } else {
105            // If the received clock is None, we increment the local lamport clock and the local vector clock
106            self.increment_lamport();
107            self.increment_vector(local_site_id);
108        }
109    }
110}
111
112#[cfg(test)]
113#[cfg(feature = "server")]
114mod tests {
115    use super::*;
116
117    #[test]
118    fn test_new_clock_initialization() {
119        let clock = Clock::new();
120        assert_eq!(*clock.get_lamport(), 0);
121        assert!(clock.get_vector_clock_map().is_empty());
122    }
123
124    #[test]
125    fn test_increment_lamport() {
126        let mut clock = Clock::new();
127        clock.increment_lamport();
128        assert_eq!(*clock.get_lamport(), 1);
129        clock.increment_lamport();
130        assert_eq!(*clock.get_lamport(), 2);
131    }
132
133    #[test]
134    fn test_increment_vector() {
135        let mut clock = Clock::new();
136        clock.increment_vector("A");
137        clock.increment_vector("A");
138        clock.increment_vector("B");
139
140        let vc = clock.get_vector_clock_map();
141        assert_eq!(vc.get("A"), Some(&2));
142        assert_eq!(vc.get("B"), Some(&1));
143    }
144
145    #[test]
146    fn test_get_vector_clock_values() {
147        let mut clock = Clock::new();
148        clock.increment_vector("A");
149        clock.increment_vector("B");
150        clock.increment_vector("B");
151
152        let mut values = clock.get_vector_clock_values();
153        values.sort(); // Order not guaranteed by HashMap
154        assert_eq!(values, vec![1, 2]);
155    }
156
157    #[test]
158    fn test_update_vector_clock() {
159        let mut local = Clock::new();
160        local.increment_vector("A"); // A:1
161
162        let mut incoming = Clock::new();
163        incoming.increment_vector("A"); // A:1
164        incoming.increment_vector("A"); // A:2
165        incoming.increment_vector("B"); // B:1
166
167        local.update_vector(&incoming.get_vector_clock_map());
168
169        let local_vc = local.get_vector_clock_map();
170        assert_eq!(local_vc.get("A"), Some(&3));
171        assert_eq!(local_vc.get("B"), Some(&2));
172    }
173
174    #[test]
175    fn test_update_clock_with_none() {
176        let mut clock = Clock::new();
177        clock.update_clock("A", None);
178
179        assert_eq!(*clock.get_lamport(), 1);
180        assert_eq!(clock.get_vector_clock_map().get("A"), Some(&1));
181    }
182
183    #[test]
184    fn test_update_clock_with_received_clock() {
185        let mut local = Clock::new();
186        local.increment_vector("A"); // A:1
187        local.increment_vector("A"); // A:2
188        local.increment_lamport(); // lamport: 1
189
190        let mut received = Clock::new();
191        received.increment_vector("A"); // A:1
192        received.increment_vector("B"); // B:1
193        received.increment_lamport(); // lamport: 1
194        received.increment_lamport(); // lamport: 2
195
196        // Here local is A:2 before
197        // Incrementing local to A:3
198        // Then updated with received (A:2)
199        // So should be A:3 after
200        // Lamport was 1 before, so should be 2 after
201        local.update_clock("A", Some(&received));
202
203        // Lamport clock should be max(received, local) + 1
204        assert_eq!(*local.get_lamport(), 3);
205        let vc = local.get_vector_clock_map();
206        assert_eq!(vc.get("A"), Some(&3)); // Incremented locally + merged max
207        assert_eq!(vc.get("B"), Some(&2));
208    }
209}