/// Kind of entry a site can hold in the shared mutex table
/// (`AppState::global_mutex_fifo`).
#[cfg(feature = "server")]
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
pub enum MutexTag {
    /// Tag recorded by `AppState::acquire_mutex` when a site asks for the critical section.
    Request,
    /// Entries carrying this tag are purged by `AppState::try_enter_sc` once the
    /// critical section is entered.
    Release,
    // This variant is not constructed in the visible part of this file, hence the
    // lint exemption.
    // NOTE(review): presumably an acknowledgement tag — confirm it is still needed.
    #[allow(dead_code)]
    Ack,
}
14
/// One entry of the shared mutex table: what a site announced and when.
#[cfg(feature = "server")]
#[derive(serde::Serialize, serde::Deserialize, Copy, Clone, Debug)]
pub struct MutexStamp {
    /// What the site announced (request, release, ...).
    pub tag: MutexTag,
    /// Logical date of the announcement; `AppState::acquire_mutex` fills it from the
    /// local clock's Lamport value.
    pub date: i64,
}
21
/// Mutable state of the local site: identity, known neighbours, per-wave bookkeeping,
/// logical clock and distributed-mutex status.
#[cfg(feature = "server")]
pub struct AppState {
    /// Identifier of the local site.
    site_id: String,
    /// Address of the local site.
    site_addr: std::net::SocketAddr,
    /// Peer addresses handed to the constructor or to `init_cli_peer_addrs`.
    // NOTE(review): the name suggests these come from the command line — confirm.
    cli_peer_addrs: Vec<std::net::SocketAddr>,
    /// Addresses of the neighbours currently considered connected.
    connected_neighbours_addrs: Vec<std::net::SocketAddr>,
    /// Maps a socket address (key) to the neighbour address it belongs to (value).
    neighbours_socket: std::collections::HashMap<std::net::SocketAddr, std::net::SocketAddr>,
    /// Flag stored by `init_sync` and read back by `get_sync`.
    sync_needed: bool,
    /// Counter stored by `init_nb_first_attended_neighbours`.
    nb_first_attended_neighbours: i64,

    /// Maps a neighbour address (key) to its site id (value) — note that the field
    /// name reads the other way round.
    pub site_ids_to_adr: std::collections::HashMap<std::net::SocketAddr, String>,

    /// For each wave, keyed by the initiator's site id: the address recorded as parent.
    pub parent_addr_for_transaction_wave: std::collections::HashMap<String, std::net::SocketAddr>,
    /// For each wave, keyed by the initiator's site id: the neighbour count recorded
    /// for that wave.
    pub attended_neighbours_nb_for_transaction_wave: std::collections::HashMap<String, i64>,

    /// Logical clock of the local site.
    clocks: crate::clock::Clock,

    /// Distributed-mutex table: site id -> latest stamp known for that site.
    pub global_mutex_fifo: std::collections::HashMap<String, MutexStamp>,
    /// `true` while the local site has requested the critical section and is waiting for it.
    pub waiting_sc: bool,
    /// `true` while the local site holds the critical section.
    pub in_sc: bool,
    /// Notifier whose waiters are woken by `acquire_mutex` and `try_enter_sc`.
    pub notify_sc: std::sync::Arc<tokio::sync::Notify>,
    /// Queue of critical commands; created empty by `new`.
    // NOTE(review): not touched by any method visible in this file — presumably
    // drained elsewhere; confirm.
    pub pending_commands: std::collections::VecDeque<crate::control::CriticalCommands>,
}
60
61#[cfg(feature = "server")]
62impl AppState {
63 pub fn new(
65 site_id: String,
66 peer_addrs: Vec<std::net::SocketAddr>,
67 local_addr: std::net::SocketAddr,
68 ) -> Self {
69 let clocks = crate::clock::Clock::new();
70 let parent_addr = std::collections::HashMap::new();
71 let nb_of_attended_neighbors = std::collections::HashMap::new();
72 let in_use_neighbors = Vec::new();
73 let sockets_for_connected_peers = std::collections::HashMap::new();
74 let gm = std::collections::HashMap::new();
75 let waiting_sc = false;
76 let in_sc = false;
77
78 Self {
79 site_id,
80 cli_peer_addrs: peer_addrs,
81 neighbours_socket: sockets_for_connected_peers,
82 site_addr: local_addr,
83 parent_addr_for_transaction_wave: parent_addr,
84 attended_neighbours_nb_for_transaction_wave: nb_of_attended_neighbors,
85 connected_neighbours_addrs: in_use_neighbors,
86 clocks,
87 sync_needed: false,
88 nb_first_attended_neighbours: 0,
89 global_mutex_fifo: gm,
90 waiting_sc,
91 in_sc,
92 notify_sc: std::sync::Arc::new(tokio::sync::Notify::new()),
93 pending_commands: std::collections::VecDeque::new(),
94 site_ids_to_adr: std::collections::HashMap::new(),
95 }
96 }
97
    /// Borrows the distributed-mutex table (site id -> stamp).
    pub fn get_global_mutex_fifo(&self) -> &std::collections::HashMap<String, MutexStamp> {
        &self.global_mutex_fifo
    }
101
102 pub fn set_global_mutex_fifo(
103 &mut self,
104 global_mutex_fifo: std::collections::HashMap<String, MutexStamp>,
105 ) {
106 if self.global_mutex_fifo.len() >= global_mutex_fifo.len() {
107 return; }
109 self.global_mutex_fifo = global_mutex_fifo;
110 }
111
112 pub fn add_site_id(&mut self, site_id: String, addr: std::net::SocketAddr) {
113 if !self.site_ids_to_adr.contains_key(&addr) {
114 self.site_ids_to_adr.insert(addr, site_id);
115 }
116 }
117
    /// Overwrites the identifier of the local site.
    pub fn init_site_id(&mut self, site_id: String) {
        self.site_id = site_id;
    }
122
    /// Overwrites the address of the local site.
    pub fn init_site_addr(&mut self, site_addr: std::net::SocketAddr) {
        self.site_addr = site_addr;
    }
127
    /// Overwrites the stored list of peer addresses.
    pub fn init_cli_peer_addrs(&mut self, cli_peer_addrs: Vec<std::net::SocketAddr>) {
        self.cli_peer_addrs = cli_peer_addrs;
    }
132
    /// Replaces the local clock with `clock`.
    pub fn init_clock(&mut self, clock: crate::clock::Clock) {
        self.clocks = clock;
    }
137
138 pub fn init_sync(&mut self, sync_needed: bool) {
140 if sync_needed {
141 log::info!("Local site need to be in synchronized");
142 }
143 self.sync_needed = sync_needed;
144 }
145
    /// Returns the flag last stored by `init_sync`.
    pub fn get_sync(&self) -> bool {
        self.sync_needed
    }
150
151 pub fn init_nb_first_attended_neighbours(&mut self, nb: i64) {
153 log::debug!("We will wait for {} attended neighbours", nb);
154 self.nb_first_attended_neighbours = nb;
155 }
156
    /// Returns the count last stored by `init_nb_first_attended_neighbours`.
    pub fn get_nb_first_attended_neighbours(&self) -> i64 {
        self.nb_first_attended_neighbours
    }
161
162 pub fn init_parent_addr_for_transaction_wave(&mut self) {
164 self.parent_addr_for_transaction_wave
165 .insert(self.site_id.clone(), self.site_addr.clone());
166 }
167
168 pub fn add_incomming_peer(
174 &mut self,
175 new_addr: std::net::SocketAddr,
176 new_socket: std::net::SocketAddr,
177 received_clock: crate::clock::Clock,
178 ) {
179 if !self.connected_neighbours_addrs.contains(&new_addr) {
180 self.connected_neighbours_addrs.push(new_addr);
181 self.clocks
182 .update_clock(self.site_id.clone().as_str(), Some(&received_clock));
183 self.neighbours_socket.insert(new_socket, new_addr);
184 }
185 }
186
187 pub async fn remove_peer(&mut self, addr_to_remove: std::net::SocketAddr) {
195 {
196 let mut net_manager = crate::network::NETWORK_MANAGER.lock().await;
197 net_manager.remove_connection(&addr_to_remove);
198 }
199
200 if let Some(pos) = self
201 .connected_neighbours_addrs
202 .iter()
203 .position(|x| *x == addr_to_remove)
204 {
205 self.connected_neighbours_addrs.remove(pos);
206 let site_id = self.site_ids_to_adr.get(&addr_to_remove);
207 if let Some(site_id) = site_id {
208 self.global_mutex_fifo.remove(site_id);
209 self.attended_neighbours_nb_for_transaction_wave
210 .remove(site_id);
211 self.parent_addr_for_transaction_wave.remove(site_id);
212 self.site_ids_to_adr.remove(&addr_to_remove);
213 }
214
215 }
218 }
219
220 pub async fn remove_peer_from_socket_closed(&mut self, socket_to_remove: std::net::SocketAddr) {
228 let Some(addr_to_remove) = self.neighbours_socket.get(&socket_to_remove) else {
230 log::debug!("Site not found in the neighbours socket");
231 return;
232 };
233
234 if let Some(pos) = self
235 .connected_neighbours_addrs
236 .iter()
237 .position(|x| *x == *addr_to_remove)
238 {
239 self.connected_neighbours_addrs.remove(pos);
240 let site_id = self.site_ids_to_adr.get(&addr_to_remove);
241 if let Some(site_id) = site_id {
242 self.global_mutex_fifo.remove(site_id);
243 self.attended_neighbours_nb_for_transaction_wave
244 .remove(site_id);
245 self.parent_addr_for_transaction_wave.remove(site_id);
246 self.site_ids_to_adr.remove(&addr_to_remove);
247 }
248
249 }
252 }
253
254 pub fn get_site_addr(&self) -> std::net::SocketAddr {
256 self.site_addr.clone()
257 }
258
259 pub async fn acquire_mutex(&mut self) -> Result<(), Box<dyn std::error::Error>> {
260 use crate::message::{Message, MessageInfo, NetworkMessageCode};
261 use crate::network::diffuse_message_without_lock;
262
263 self.update_clock(None).await;
264
265 self.global_mutex_fifo.insert(
266 self.site_id.clone(),
267 MutexStamp {
268 tag: MutexTag::Request,
269 date: self.clocks.get_lamport().clone(),
270 },
271 );
272
273 let msg = Message {
274 sender_id: self.site_id.clone(),
275 sender_addr: self.site_addr,
276 message_initiator_id: self.site_id.clone(),
277 message_initiator_addr: self.site_addr,
278 clock: self.clocks.clone(),
279 command: None,
280 info: MessageInfo::AcquireMutex(crate::message::AcquireMutexPayload),
281 code: NetworkMessageCode::AcquireMutex,
282 };
283
284 let should_diffuse = {
285 self.set_parent_addr(self.site_id.to_string(), self.site_addr);
287 self.set_nb_nei_for_wave(self.site_id.to_string(), self.get_nb_connected_neighbours());
288 self.get_nb_connected_neighbours() > 0
289 };
290
291 if should_diffuse {
292 self.notify_sc.notify_waiters();
293 self.in_sc = false;
294 self.waiting_sc = true;
295 log::info!("Début de la diffusion d'une acquisition de mutex");
296 diffuse_message_without_lock(
297 &msg,
298 self.get_site_addr(),
299 self.get_site_id().as_str(),
300 self.get_connected_nei_addr(),
301 self.get_parent_addr_for_wave(msg.message_initiator_id.clone()),
302 )
303 .await?;
304 } else {
305 log::info!("Il n'y a pas de voisins, on prends la section critique");
306 self.in_sc = true;
307 self.waiting_sc = false;
308 self.notify_sc.notify_waiters();
309 }
310
311 Ok(())
312 }
313
314 pub async fn release_mutex(&mut self) -> Result<(), Box<dyn std::error::Error>> {
315 use crate::message::{Message, MessageInfo, NetworkMessageCode};
316 use crate::network::diffuse_message_without_lock;
317
318 self.update_clock(None).await;
319
320 let msg = Message {
321 sender_id: self.site_id.clone(),
322 sender_addr: self.site_addr,
323 message_initiator_id: self.site_id.clone(),
324 message_initiator_addr: self.site_addr,
325 clock: self.clocks.clone(),
326 command: None,
327 info: MessageInfo::ReleaseMutex(crate::message::ReleaseMutexPayload),
328 code: NetworkMessageCode::ReleaseGlobalMutex,
329 };
330
331 self.global_mutex_fifo.remove(&self.site_id);
332 self.in_sc = false;
333 self.waiting_sc = false;
334
335 let should_diffuse = {
336 self.set_parent_addr(self.site_id.to_string(), self.site_addr);
338 self.set_nb_nei_for_wave(self.site_id.to_string(), self.get_nb_connected_neighbours());
339 self.get_nb_connected_neighbours() > 0
340 };
341
342 if should_diffuse {
343 log::info!("Début de la diffusion d'un relachement de mutex");
344 diffuse_message_without_lock(
345 &msg,
346 self.get_site_addr(),
347 self.get_site_id().as_str(),
348 self.get_connected_nei_addr(),
349 self.get_parent_addr_for_wave(msg.message_initiator_id.clone()),
350 )
351 .await?;
352 }
353 Ok(())
354 }
355
356 pub fn try_enter_sc(&mut self) {
357 let my_stamp = match self.global_mutex_fifo.get(&self.site_id) {
366 Some(s) => *s,
367 None => return, };
369 let me = (my_stamp.date, self.site_id.clone());
370
371 let ok = self.global_mutex_fifo.iter().all(|(id, stamp)| {
374 if id == &self.site_id {
375 true
376 } else {
377 match stamp.tag {
378 MutexTag::Request => me <= (stamp.date, id.clone()),
379 _ => true,
380 }
381 }
382 });
383
384 if ok {
385 self.waiting_sc = false;
386 self.in_sc = true;
387 self.notify_sc.notify_waiters(); self.global_mutex_fifo
391 .retain(|_, s| s.tag != MutexTag::Release);
392 } else {
393 println!("\x1b[1;31mSECTION CRITIQUE REFUSEE !\x1b[0m");
394 println!("FIFO order:");
396 for (id, stamp) in &self.global_mutex_fifo {
397 println!("{}: {:?} - {:?}", id, stamp.tag, stamp.date);
398 }
399 println!("{}: {:?} - {:?}", self.site_id, my_stamp.tag, my_stamp.date);
401 }
402 }
403
    /// Returns the address of the local site formatted as text.
    pub fn get_site_addr_as_string(&self) -> String {
        self.site_addr.to_string()
    }
408
    /// Returns an owned copy of the local site identifier.
    pub fn get_site_id(&self) -> String {
        self.site_id.clone()
    }
413
    /// Returns an owned copy of the stored peer addresses.
    pub fn get_cli_peers_addrs(&self) -> Vec<std::net::SocketAddr> {
        self.cli_peer_addrs.clone()
    }
418
419 pub fn get_cli_peers_addrs_as_string(&self) -> Vec<String> {
421 self.cli_peer_addrs.iter().map(|x| x.to_string()).collect()
422 }
423
    /// Returns an owned copy of the addresses of the connected neighbours.
    pub fn get_connected_nei_addr(&self) -> Vec<std::net::SocketAddr> {
        self.connected_neighbours_addrs.clone()
    }
428
429 pub fn get_connected_nei_addr_string(&self) -> Vec<String> {
431 self.connected_neighbours_addrs
432 .iter()
433 .map(|x| x.to_string())
434 .collect()
435 }
436
    /// Appends `addr` to the list of connected neighbours.
    ///
    /// No duplicate check is performed: calling this twice with the same address
    /// lists it twice.
    pub fn add_connected_neighbour(&mut self, addr: std::net::SocketAddr) {
        self.connected_neighbours_addrs.push(addr);
    }
441
    /// Returns an owned copy of the local clock.
    pub fn get_clock(&self) -> crate::clock::Clock {
        self.clocks.clone()
    }
446
    /// Stores `n` as the neighbour count of the wave started by `initiator_id`,
    /// replacing any previous value.
    pub fn set_nb_nei_for_wave(&mut self, initiator_id: String, n: i64) {
        self.attended_neighbours_nb_for_transaction_wave
            .insert(initiator_id, n);
    }
452
    /// Returns an owned copy of the whole `initiator id -> parent address` map.
    pub fn get_parent_for_wave_map(
        &self,
    ) -> std::collections::HashMap<String, std::net::SocketAddr> {
        self.parent_addr_for_transaction_wave.clone()
    }
459
    /// Returns an owned copy of the whole `initiator id -> neighbour count` map.
    pub fn get_nb_nei_for_wave(&self) -> std::collections::HashMap<String, i64> {
        self.attended_neighbours_nb_for_transaction_wave.clone()
    }
464
465 pub fn get_parent_addr_for_wave(&self, initiator_id: String) -> std::net::SocketAddr {
467 self.parent_addr_for_transaction_wave
468 .get(&initiator_id)
469 .copied()
470 .unwrap_or("0.0.0.0:0".parse().unwrap())
471 }
472
    /// Records `peer_adr` as the parent for the wave started by `initiator_id`,
    /// replacing any previous value.
    pub fn set_parent_addr(&mut self, initiator_id: String, peer_adr: std::net::SocketAddr) {
        self.parent_addr_for_transaction_wave
            .insert(initiator_id, peer_adr);
    }
478
    /// Returns how many entries the list of connected neighbours currently holds
    /// (duplicates, if any, are counted).
    pub fn get_nb_connected_neighbours(&self) -> i64 {
        self.connected_neighbours_addrs.len() as i64
    }
483
484 pub async fn update_clock(&mut self, received_vc: Option<&crate::clock::Clock>) {
486 self.clocks.update_clock(&self.site_id, received_vc);
490 self.save_local_state().await;
491 }
492
    /// Hands the local site id and a copy of the current clock to
    /// `crate::db::update_local_state`; whatever that call returns is discarded.
    pub async fn save_local_state(&self) {
        // NOTE(review): nothing is awaited here. If `update_local_state` is itself
        // async, the returned future is dropped without ever running; if it returns a
        // `Result`, failures are silently ignored. Confirm which one applies.
        let _ = crate::db::update_local_state(&self.site_id, self.clocks.clone());
    }
497
498 #[cfg(test)]
501 pub fn set_nb_connected_neighbours(&mut self, nb: i64) {
502 self.connected_neighbours_addrs.clear();
503 for _ in 0..nb {
504 self.connected_neighbours_addrs
505 .push("127.0.0.1:8081".parse().unwrap());
506 }
507 }
508}
509
#[cfg(feature = "server")]
lazy_static::lazy_static! {
    /// Process-wide state of the local site, shared behind an async mutex.
    ///
    /// It starts as a placeholder (empty site id, no peers, address `0.0.0.0:0`);
    /// the `init_*` methods of `AppState` overwrite those values.
    pub static ref LOCAL_APP_STATE: std::sync::Arc<tokio::sync::Mutex<AppState>> = {
        let unset_id = "".to_string();
        let unset_addr: std::net::SocketAddr = "0.0.0.0:0".parse().unwrap();
        let placeholder = AppState::new(unset_id, Vec::new(), unset_addr);
        std::sync::Arc::new(tokio::sync::Mutex::new(placeholder))
    };
}
520
#[cfg(test)]
#[cfg(feature = "server")]
mod tests {
    use super::*;

    /// A freshly built state keeps the id and peers it was given and starts with an
    /// empty vector clock.
    #[test]
    fn test_new_state() {
        let expected_id = String::from("A");
        let expected_peer_count: i64 = 2;
        let expected_peers: Vec<std::net::SocketAddr> = ["127.0.0.1:8081", "127.0.0.1:8082"]
            .iter()
            .map(|raw| raw.parse().unwrap())
            .collect();
        let local_addr: std::net::SocketAddr = format!("127.0.0.1:{}", 8080).parse().unwrap();

        let state = AppState::new(expected_id.clone(), expected_peers.clone(), local_addr);

        assert_eq!(state.site_id, expected_id);
        assert_eq!(state.cli_peer_addrs.len() as i64, expected_peer_count);
        assert_eq!(state.cli_peer_addrs, expected_peers);
        assert_eq!(state.clocks.get_vector_clock_map().len(), 0);
    }
}