peillute/
state.rs

1//! Application state management for Peillute
2//!
3//! This module handles the global application state, including site information,
4//! peer management, and logical clock synchronization.
5
/// Kind of entry a site can hold in the global mutual-exclusion queue.
#[cfg(feature = "server")]
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum MutexTag {
    /// The site has asked for the critical section (see `AppState::acquire_mutex`).
    Request,
    /// The site has given the critical section back; such entries are purged
    /// by `AppState::try_enter_sc` once the local site enters.
    Release,
    /// Acknowledgement of a request.
    // Not constructed in this module, hence the lint override.
    #[allow(dead_code)]
    Ack,
}
14
/// One entry of the global mutex queue: what was announced and when.
#[cfg(feature = "server")]
#[derive(Clone, Copy, Debug, serde::Serialize, serde::Deserialize)]
pub struct MutexStamp {
    /// Kind of announcement carried by this entry.
    pub tag: MutexTag,
    /// Date of the announcement; `AppState::acquire_mutex` fills it with the
    /// local Lamport clock value.
    pub date: i64,
}
21
#[cfg(feature = "server")]
/// Global state of a Peillute node: identity, neighbours, wave bookkeeping,
/// logical clocks and the distributed mutex queue.
pub struct AppState {
    // --- Site info ---
    /// Unique identifier of this site.
    site_id: String,
    /// Address of this site.
    site_addr: std::net::SocketAddr,
    /// Peer addresses passed as arguments when the application was launched.
    cli_peer_addrs: Vec<std::net::SocketAddr>,
    /// Addresses of the directly connected (degree-1) neighbours.
    connected_neighbours_addrs: Vec<std::net::SocketAddr>,
    /// Socket address of a connection (key) mapped to the address of the
    /// degree-1 neighbour behind it (value).
    neighbours_socket: std::collections::HashMap<std::net::SocketAddr, std::net::SocketAddr>,
    /// Whether this site still has to synchronize.
    sync_needed: bool,
    /// Number of neighbours expected at launch, for the discovery phase.
    nb_first_attended_neighbours: i64,

    /// Neighbour address (key) mapped to the id of the site at that address (value).
    pub site_ids_to_adr: std::collections::HashMap<std::net::SocketAddr, String>,

    // --- Message diffusion info for transactions ---
    /// For each wave, keyed by the id of its initiator: address of this
    /// site's parent (a degree-1 neighbour) in that wave.
    pub parent_addr_for_transaction_wave: std::collections::HashMap<String, std::net::SocketAddr>,
    /// For each wave, keyed by the id of its initiator: number of responses
    /// still expected from the direct neighbours (connected neighbours minus
    /// the parent).
    pub attended_neighbours_nb_for_transaction_wave: std::collections::HashMap<String, i64>,

    // --- Logical clocks ---
    /// Logical clocks used for distributed synchronization.
    clocks: crate::clock::Clock,

    // --- Global mutex ---
    /// Latest mutex stamp known for each site, keyed by site id.
    pub global_mutex_fifo: std::collections::HashMap<String, MutexStamp>,
    /// `true` while this site has requested the critical section and is waiting for it.
    pub waiting_sc: bool,
    /// `true` while this site is inside the critical section.
    pub in_sc: bool,
    /// Notified by `acquire_mutex` and `try_enter_sc` when the critical
    /// section flags change.
    pub notify_sc: std::sync::Arc<tokio::sync::Notify>,
    /// Queue of critical commands; nothing in this module pushes to or pops
    /// from it.
    pub pending_commands: std::collections::VecDeque<crate::control::CriticalCommands>,
}
60
61#[cfg(feature = "server")]
62impl AppState {
63    /// Creates a new AppState instance with the given configuration
64    pub fn new(
65        site_id: String,
66        peer_addrs: Vec<std::net::SocketAddr>,
67        local_addr: std::net::SocketAddr,
68    ) -> Self {
69        let clocks = crate::clock::Clock::new();
70        let parent_addr = std::collections::HashMap::new();
71        let nb_of_attended_neighbors = std::collections::HashMap::new();
72        let in_use_neighbors = Vec::new();
73        let sockets_for_connected_peers = std::collections::HashMap::new();
74        let gm = std::collections::HashMap::new();
75        let waiting_sc = false;
76        let in_sc = false;
77
78        Self {
79            site_id,
80            cli_peer_addrs: peer_addrs,
81            neighbours_socket: sockets_for_connected_peers,
82            site_addr: local_addr,
83            parent_addr_for_transaction_wave: parent_addr,
84            attended_neighbours_nb_for_transaction_wave: nb_of_attended_neighbors,
85            connected_neighbours_addrs: in_use_neighbors,
86            clocks,
87            sync_needed: false,
88            nb_first_attended_neighbours: 0,
89            global_mutex_fifo: gm,
90            waiting_sc,
91            in_sc,
92            notify_sc: std::sync::Arc::new(tokio::sync::Notify::new()),
93            pending_commands: std::collections::VecDeque::new(),
94            site_ids_to_adr: std::collections::HashMap::new(),
95        }
96    }
97
98    pub fn get_global_mutex_fifo(&self) -> &std::collections::HashMap<String, MutexStamp> {
99        &self.global_mutex_fifo
100    }
101
102    pub fn set_global_mutex_fifo(
103        &mut self,
104        global_mutex_fifo: std::collections::HashMap<String, MutexStamp>,
105    ) {
106        if self.global_mutex_fifo.len() >= global_mutex_fifo.len() {
107            return; // Do not overwrite if the new FIFO is smaller or equal
108        }
109        self.global_mutex_fifo = global_mutex_fifo;
110    }
111
112    pub fn add_site_id(&mut self, site_id: String, addr: std::net::SocketAddr) {
113        if !self.site_ids_to_adr.contains_key(&addr) {
114            self.site_ids_to_adr.insert(addr, site_id);
115        }
116    }
117
118    /// Sets the site ID at initialization
119    pub fn init_site_id(&mut self, site_id: String) {
120        self.site_id = site_id;
121    }
122
123    /// Sets the site address at initialization
124    pub fn init_site_addr(&mut self, site_addr: std::net::SocketAddr) {
125        self.site_addr = site_addr;
126    }
127
128    /// Sets the list of CLI peer addresses at initialization
129    pub fn init_cli_peer_addrs(&mut self, cli_peer_addrs: Vec<std::net::SocketAddr>) {
130        self.cli_peer_addrs = cli_peer_addrs;
131    }
132
133    /// Set the clock at initialization
134    pub fn init_clock(&mut self, clock: crate::clock::Clock) {
135        self.clocks = clock;
136    }
137
138    /// Set the sync boolean at initialization
139    pub fn init_sync(&mut self, sync_needed: bool) {
140        if sync_needed {
141            log::info!("Local site need to be in synchronized");
142        }
143        self.sync_needed = sync_needed;
144    }
145
146    /// Get the sync boolean
147    pub fn get_sync(&self) -> bool {
148        self.sync_needed
149    }
150
151    /// Set the number of attended neighbours at initialization
152    pub fn init_nb_first_attended_neighbours(&mut self, nb: i64) {
153        log::debug!("We will wait for {} attended neighbours", nb);
154        self.nb_first_attended_neighbours = nb;
155    }
156
157    /// Get the number of attended neighbours
158    pub fn get_nb_first_attended_neighbours(&self) -> i64 {
159        self.nb_first_attended_neighbours
160    }
161
162    /// Initialize the parent of the current site as self for the wave protocol
163    pub fn init_parent_addr_for_transaction_wave(&mut self) {
164        self.parent_addr_for_transaction_wave
165            .insert(self.site_id.clone(), self.site_addr.clone());
166    }
167
168    /// Adds a new peer to the network and updates the logical clock
169    ///
170    /// This function should be safe to call multiple times
171    ///
172    /// If a new site appear on the netword, every peers will launch a wave diffusion to announce the presence of this new site
173    pub fn add_incomming_peer(
174        &mut self,
175        new_addr: std::net::SocketAddr,
176        new_socket: std::net::SocketAddr,
177        received_clock: crate::clock::Clock,
178    ) {
179        if !self.connected_neighbours_addrs.contains(&new_addr) {
180            self.connected_neighbours_addrs.push(new_addr);
181            self.clocks
182                .update_clock(self.site_id.clone().as_str(), Some(&received_clock));
183            self.neighbours_socket.insert(new_socket, new_addr);
184        }
185    }
186
187    /// Removes a peer from the network
188    ///
189    /// This function should be safe to call multiple times
190    ///
191    /// If a site disappear from the network, every neighbours will detected the closing of the tcp connection and will launch a wave diffusion to announce the disappearance of this site
192    ///
193    /// If a site is closed properly, it will send a disconnect message to all its neighbours
194    pub async fn remove_peer(&mut self, addr_to_remove: std::net::SocketAddr) {
195        {
196            let mut net_manager = crate::network::NETWORK_MANAGER.lock().await;
197            net_manager.remove_connection(&addr_to_remove);
198        }
199
200        if let Some(pos) = self
201            .connected_neighbours_addrs
202            .iter()
203            .position(|x| *x == addr_to_remove)
204        {
205            self.connected_neighbours_addrs.remove(pos);
206            let site_id = self.site_ids_to_adr.get(&addr_to_remove);
207            if let Some(site_id) = site_id {
208                self.global_mutex_fifo.remove(site_id);
209                self.attended_neighbours_nb_for_transaction_wave
210                    .remove(site_id);
211                self.parent_addr_for_transaction_wave.remove(site_id);
212                self.site_ids_to_adr.remove(&addr_to_remove);
213            }
214
215            // We can keep the clock value for the site we want to remove
216            // if the site re-appears, it will be updated with the new clock value
217        }
218    }
219
220    /// Removes a peer from the network with only an address
221    ///
222    /// This function should be safe to call multiple times
223    ///
224    /// If a site disappear from the network, every neighbours will detected the closing of the tcp connection and will launch a wave diffusion to announce the disappearance of this site
225    ///
226    /// If a site is closed properly, it will send a disconnect message to all its neighbours
227    pub async fn remove_peer_from_socket_closed(&mut self, socket_to_remove: std::net::SocketAddr) {
228        // Find the site adress based on the socket
229        let Some(addr_to_remove) = self.neighbours_socket.get(&socket_to_remove) else {
230            log::debug!("Site not found in the neighbours socket");
231            return;
232        };
233
234        if let Some(pos) = self
235            .connected_neighbours_addrs
236            .iter()
237            .position(|x| *x == *addr_to_remove)
238        {
239            self.connected_neighbours_addrs.remove(pos);
240            let site_id = self.site_ids_to_adr.get(&addr_to_remove);
241            if let Some(site_id) = site_id {
242                self.global_mutex_fifo.remove(site_id);
243                self.attended_neighbours_nb_for_transaction_wave
244                    .remove(site_id);
245                self.parent_addr_for_transaction_wave.remove(site_id);
246                self.site_ids_to_adr.remove(&addr_to_remove);
247            }
248
249            // We can keep the clock value for the site we want to remove
250            // if the site re-appears, it will be updated with the new clock value
251        }
252    }
253
254    /// Returns the local address as a string
255    pub fn get_site_addr(&self) -> std::net::SocketAddr {
256        self.site_addr.clone()
257    }
258
259    pub async fn acquire_mutex(&mut self) -> Result<(), Box<dyn std::error::Error>> {
260        use crate::message::{Message, MessageInfo, NetworkMessageCode};
261        use crate::network::diffuse_message_without_lock;
262
263        self.update_clock(None).await;
264
265        self.global_mutex_fifo.insert(
266            self.site_id.clone(),
267            MutexStamp {
268                tag: MutexTag::Request,
269                date: self.clocks.get_lamport().clone(),
270            },
271        );
272
273        let msg = Message {
274            sender_id: self.site_id.clone(),
275            sender_addr: self.site_addr,
276            message_initiator_id: self.site_id.clone(),
277            message_initiator_addr: self.site_addr,
278            clock: self.clocks.clone(),
279            command: None,
280            info: MessageInfo::AcquireMutex(crate::message::AcquireMutexPayload),
281            code: NetworkMessageCode::AcquireMutex,
282        };
283
284        let should_diffuse = {
285            // initialisation des paramètres avant la diffusion d'un message
286            self.set_parent_addr(self.site_id.to_string(), self.site_addr);
287            self.set_nb_nei_for_wave(self.site_id.to_string(), self.get_nb_connected_neighbours());
288            self.get_nb_connected_neighbours() > 0
289        };
290
291        if should_diffuse {
292            self.notify_sc.notify_waiters();
293            self.in_sc = false;
294            self.waiting_sc = true;
295            log::info!("Début de la diffusion d'une acquisition de mutex");
296            diffuse_message_without_lock(
297                &msg,
298                self.get_site_addr(),
299                self.get_site_id().as_str(),
300                self.get_connected_nei_addr(),
301                self.get_parent_addr_for_wave(msg.message_initiator_id.clone()),
302            )
303            .await?;
304        } else {
305            log::info!("Il n'y a pas de voisins, on prends la section critique");
306            self.in_sc = true;
307            self.waiting_sc = false;
308            self.notify_sc.notify_waiters();
309        }
310
311        Ok(())
312    }
313
314    pub async fn release_mutex(&mut self) -> Result<(), Box<dyn std::error::Error>> {
315        use crate::message::{Message, MessageInfo, NetworkMessageCode};
316        use crate::network::diffuse_message_without_lock;
317
318        self.update_clock(None).await;
319
320        let msg = Message {
321            sender_id: self.site_id.clone(),
322            sender_addr: self.site_addr,
323            message_initiator_id: self.site_id.clone(),
324            message_initiator_addr: self.site_addr,
325            clock: self.clocks.clone(),
326            command: None,
327            info: MessageInfo::ReleaseMutex(crate::message::ReleaseMutexPayload),
328            code: NetworkMessageCode::ReleaseGlobalMutex,
329        };
330
331        self.global_mutex_fifo.remove(&self.site_id);
332        self.in_sc = false;
333        self.waiting_sc = false;
334
335        let should_diffuse = {
336            // initialisation des paramètres avant la diffusion d'un message
337            self.set_parent_addr(self.site_id.to_string(), self.site_addr);
338            self.set_nb_nei_for_wave(self.site_id.to_string(), self.get_nb_connected_neighbours());
339            self.get_nb_connected_neighbours() > 0
340        };
341
342        if should_diffuse {
343            log::info!("Début de la diffusion d'un relachement de mutex");
344            diffuse_message_without_lock(
345                &msg,
346                self.get_site_addr(),
347                self.get_site_id().as_str(),
348                self.get_connected_nei_addr(),
349                self.get_parent_addr_for_wave(msg.message_initiator_id.clone()),
350            )
351            .await?;
352        }
353        Ok(())
354    }
355
356    pub fn try_enter_sc(&mut self) {
357        // MUST BE CALLED ONLY AFTER A SUCCESSFUL WAVE AFTER ACQUIRE MUTEX
358        // This function checks if the site can enter the critical section
359        // It checks if the site is waiting for the critical section and if it can enter
360        // based on the FIFO order of requests in the global mutex FIFO.
361
362        // Pour respecter l'algo du poly il faut que la vague soit complete
363        // c'est à dire que tout le monde ait répondu ACK pour appeller cette fonction
364        // sinon on va entrer en section critique à un moment sans qu'un des peers ait noté notre demande
365        let my_stamp = match self.global_mutex_fifo.get(&self.site_id) {
366            Some(s) => *s,
367            None => return, // No local request found
368        };
369        let me = (my_stamp.date, self.site_id.clone());
370
371        // ici on compara les stamps des autres demandes, est-ce qu'on est le suivant dans la FIFO ?
372        // si oui on peut entrer en section critique
373        let ok = self.global_mutex_fifo.iter().all(|(id, stamp)| {
374            if id == &self.site_id {
375                true
376            } else {
377                match stamp.tag {
378                    MutexTag::Request => me <= (stamp.date, id.clone()),
379                    _ => true,
380                }
381            }
382        });
383
384        if ok {
385            self.waiting_sc = false;
386            self.in_sc = true;
387            // All other sites are notified that we are in critical section
388            self.notify_sc.notify_waiters(); // notifies worker to execute pending commands
389            // We remove obsolete Releases
390            self.global_mutex_fifo
391                .retain(|_, s| s.tag != MutexTag::Release);
392        } else {
393            println!("\x1b[1;31mSECTION CRITIQUE REFUSEE !\x1b[0m");
394            // print fifo order
395            println!("FIFO order:");
396            for (id, stamp) in &self.global_mutex_fifo {
397                println!("{}: {:?} - {:?}", id, stamp.tag, stamp.date);
398            }
399            // print my stamp
400            println!("{}: {:?} - {:?}", self.site_id, my_stamp.tag, my_stamp.date);
401        }
402    }
403
404    /// Returns the local address as a string
405    pub fn get_site_addr_as_string(&self) -> String {
406        self.site_addr.to_string()
407    }
408
409    /// Returns a reference to the local address as &str
410    pub fn get_site_id(&self) -> String {
411        self.site_id.clone()
412    }
413
414    /// Returns a list of all peer addresses
415    pub fn get_cli_peers_addrs(&self) -> Vec<std::net::SocketAddr> {
416        self.cli_peer_addrs.clone()
417    }
418
419    /// Returns a list of all peer addresses as strings
420    pub fn get_cli_peers_addrs_as_string(&self) -> Vec<String> {
421        self.cli_peer_addrs.iter().map(|x| x.to_string()).collect()
422    }
423
424    /// Returns a list of conncted neibhours
425    pub fn get_connected_nei_addr(&self) -> Vec<std::net::SocketAddr> {
426        self.connected_neighbours_addrs.clone()
427    }
428
429    /// Returns a list of conncted neibhours as strings
430    pub fn get_connected_nei_addr_string(&self) -> Vec<String> {
431        self.connected_neighbours_addrs
432            .iter()
433            .map(|x| x.to_string())
434            .collect()
435    }
436
437    /// Add a connected neighbour to the list of connected neighbours
438    pub fn add_connected_neighbour(&mut self, addr: std::net::SocketAddr) {
439        self.connected_neighbours_addrs.push(addr);
440    }
441
442    /// Returns the clock of the site
443    pub fn get_clock(&self) -> crate::clock::Clock {
444        self.clocks.clone()
445    }
446
447    /// Set the number of attended neighbors for the wave from initiator_id
448    pub fn set_nb_nei_for_wave(&mut self, initiator_id: String, n: i64) {
449        self.attended_neighbours_nb_for_transaction_wave
450            .insert(initiator_id, n);
451    }
452
453    /// Get the list of attended neighbors for the wave from initiator_id
454    pub fn get_parent_for_wave_map(
455        &self,
456    ) -> std::collections::HashMap<String, std::net::SocketAddr> {
457        self.parent_addr_for_transaction_wave.clone()
458    }
459
460    /// Get the list of attended neighbors for the wave from initiator_id
461    pub fn get_nb_nei_for_wave(&self) -> std::collections::HashMap<String, i64> {
462        self.attended_neighbours_nb_for_transaction_wave.clone()
463    }
464
465    /// Get the parent (neighbour deg(1)) address for the wave from initiator_id
466    pub fn get_parent_addr_for_wave(&self, initiator_id: String) -> std::net::SocketAddr {
467        self.parent_addr_for_transaction_wave
468            .get(&initiator_id)
469            .copied()
470            .unwrap_or("0.0.0.0:0".parse().unwrap())
471    }
472
473    /// Set the parent (neighbour deg(1)) address for the wave from initiator_id
474    pub fn set_parent_addr(&mut self, initiator_id: String, peer_adr: std::net::SocketAddr) {
475        self.parent_addr_for_transaction_wave
476            .insert(initiator_id, peer_adr);
477    }
478
479    /// Returns the number of deg(1) neighbors connected
480    pub fn get_nb_connected_neighbours(&self) -> i64 {
481        self.connected_neighbours_addrs.len() as i64
482    }
483
484    /// Update the clock of the site
485    pub async fn update_clock(&mut self, received_vc: Option<&crate::clock::Clock>) {
486        // this wrapper is needed to ensure that the clock is saved
487        // each time it is updated
488        // please DO NOT call the `update_clock` method directly from the clock
489        self.clocks.update_clock(&self.site_id, received_vc);
490        self.save_local_state().await;
491    }
492
493    pub async fn save_local_state(&self) {
494        // this is likely to be called whenever the clocks are updated
495        let _ = crate::db::update_local_state(&self.site_id, self.clocks.clone());
496    }
497
498    /// For tokyo test, set manually the number of connected neighbours
499    /// DO NOT USE IN PRODUCTION
500    #[cfg(test)]
501    pub fn set_nb_connected_neighbours(&mut self, nb: i64) {
502        self.connected_neighbours_addrs.clear();
503        for _ in 0..nb {
504            self.connected_neighbours_addrs
505                .push("127.0.0.1:8081".parse().unwrap());
506        }
507    }
508}
509
// Process-wide singleton
#[cfg(feature = "server")]
lazy_static::lazy_static! {
    /// Shared state of the local node, behind an async mutex.
    ///
    /// It starts with an empty site id, no peers and the unspecified address
    /// `0.0.0.0:0`; the `init_*` methods of `AppState` can overwrite those
    /// values afterwards.
    pub static ref LOCAL_APP_STATE: std::sync::Arc<tokio::sync::Mutex<AppState>> = {
        let placeholder_addr = std::net::SocketAddr::from(([0, 0, 0, 0], 0));
        let initial_state = AppState::new(
            String::new(), // empty site id at start
            Vec::new(),
            placeholder_addr,
        );
        std::sync::Arc::new(tokio::sync::Mutex::new(initial_state))
    };
}
520
#[cfg(test)]
#[cfg(feature = "server")]
mod tests {
    use super::*;

    /// A freshly built state keeps the id and the peers it was given and
    /// starts with an empty vector clock.
    #[test]
    fn test_new_state() {
        let expected_id = String::from("A");
        let expected_peers: Vec<std::net::SocketAddr> = ["127.0.0.1:8081", "127.0.0.1:8082"]
            .iter()
            .map(|raw| raw.parse().unwrap())
            .collect();
        let expected_peer_count: i64 = 2;
        let local_addr: std::net::SocketAddr = "127.0.0.1:8080".parse().unwrap();

        let state = AppState::new(expected_id.clone(), expected_peers.clone(), local_addr);

        assert_eq!(state.site_id, expected_id);
        assert_eq!(state.cli_peer_addrs.len() as i64, expected_peer_count);
        assert_eq!(state.cli_peer_addrs, expected_peers);
        // The vector clock has no entry until the first update.
        assert_eq!(state.clocks.get_vector_clock_map().len(), 0);
    }
}