peillute/
network.rs

1//! Network communication and peer management
2//!
3//! This module handles all network-related functionality, including peer discovery,
4//! message sending/receiving, and connection management in the distributed system.
5
6#[cfg(feature = "server")]
7/// Represents a connection to a peer node
8pub struct PeerConnection {
9    /// Channel sender for sending messages to the peer
10    pub sender: tokio::sync::mpsc::Sender<Vec<u8>>,
11}
12
13#[cfg(feature = "server")]
14/// Manages network connections and peer communication
15pub struct NetworkManager {
16    /// Number of currently active peer connections
17    pub nb_active_connections: u16,
18    /// Pool of active peer connections
19    pub connection_pool: std::collections::HashMap<std::net::SocketAddr, PeerConnection>,
20}
21
22#[cfg(feature = "server")]
23impl NetworkManager {
24    /// Creates a new NetworkManager instance
25    pub fn new() -> Self {
26        Self {
27            nb_active_connections: 0,
28            connection_pool: std::collections::HashMap::new(),
29        }
30    }
31
32    /// Adds a new peer connection to the connection pool
33    fn add_connection(
34        &mut self,
35        site_addr: std::net::SocketAddr,
36        sender: tokio::sync::mpsc::Sender<Vec<u8>>,
37    ) {
38        self.connection_pool
39            .insert(site_addr, PeerConnection { sender });
40        self.nb_active_connections += 1;
41    }
42
43    /// Establishes a new connection to a peer
44    pub async fn create_connection(
45        &mut self,
46        site_addr: std::net::SocketAddr,
47    ) -> Result<(), Box<dyn std::error::Error>> {
48        use tokio::net::TcpStream;
49        use tokio::sync::mpsc;
50
51        let stream = TcpStream::connect(site_addr).await?;
52        let (tx, rx) = mpsc::channel(256);
53        spawn_writer_task(stream, rx).await;
54        self.add_connection(site_addr, tx);
55        Ok(())
56    }
57
58    /// Remove and destroy a connection
59    pub fn remove_connection(&mut self, site_addr: &std::net::SocketAddr) {
60        self.connection_pool.remove(site_addr);
61    }
62
63    /// Returns the message sender for a specific peer address
64    pub fn get_sender(
65        &self,
66        addr: &std::net::SocketAddr,
67    ) -> Option<tokio::sync::mpsc::Sender<Vec<u8>>> {
68        self.connection_pool.get(addr).map(|p| p.sender.clone())
69    }
70}
71
72#[cfg(feature = "server")]
73lazy_static::lazy_static! {
74    pub static ref NETWORK_MANAGER: std::sync::Arc<tokio::sync::Mutex<NetworkManager>> =
75        std::sync::Arc::new(tokio::sync::Mutex::new(NetworkManager::new()));
76}
77
78#[cfg(feature = "server")]
79/// Spawns a task to handle writing messages to a peer connection
80pub async fn spawn_writer_task(
81    stream: tokio::net::TcpStream,
82    mut rx: tokio::sync::mpsc::Receiver<Vec<u8>>,
83) {
84    use tokio::io::AsyncWriteExt;
85
86    tokio::spawn(async move {
87        let mut stream = stream;
88        while let Some(data) = rx.recv().await {
89            if stream.write_all(&data).await.is_err() {
90                log::error!("Failed to send message");
91                break;
92            }
93        }
94        log::debug!("Writer task closed.");
95    });
96}
97
98#[cfg(feature = "server")]
99/// Announces this node's presence to potential peers in the network.
100/// If the user gave peers in args, we will only connect to those peers.
101/// If not, we will scan the port range and try connecting to all sockets.
102pub async fn announce(ip: &str, start_port: u16, end_port: u16, selected_port: u16) {
103    use crate::message::{MessageInfo, NetworkMessageCode};
104    use crate::state::LOCAL_APP_STATE;
105
106    let (local_addr, site_id, clocks, cli_peers) = {
107        let state = LOCAL_APP_STATE.lock().await;
108        (
109            state.get_site_addr(),
110            state.get_site_id(),
111            state.get_clock(),
112            state.get_cli_peers_addrs(),
113        )
114    };
115
116    // Collect all peers to contact
117    let peer_to_ping: Vec<std::net::SocketAddr> = if !cli_peers.is_empty() {
118        log::debug!("Manually connecting to peers based on args");
119        cli_peers
120    } else {
121        log::debug!("Looking for all ports to find potential peers");
122        (start_port..=end_port)
123            .filter(|&port| port != selected_port)
124            .map(|port| format!("{}:{}", ip, port).parse().unwrap())
125            .collect()
126    };
127
128    //If there are no peers, we don't need to do anything
129    if peer_to_ping.is_empty() {
130        return;
131    } else {
132        let mut state = LOCAL_APP_STATE.lock().await;
133        state.init_sync(true); // we need to sync with other sites
134    }
135
136    use std::sync::Arc;
137    use std::sync::atomic::{AtomicUsize, Ordering};
138
139    let success_count = Arc::new(AtomicUsize::new(0));
140
141    // Send discovery messages after the decision logic
142    let mut handles = Vec::new();
143    for addr in peer_to_ping {
144        let site_id = site_id.clone();
145        let clocks = clocks.clone();
146        let local_addr = local_addr.clone();
147        let success_count = Arc::clone(&success_count);
148
149        let handle = tokio::spawn(async move {
150            let result = send_message(
151                addr,
152                MessageInfo::None,
153                None,
154                NetworkMessageCode::Discovery,
155                local_addr,
156                &site_id,
157                &site_id,
158                local_addr,
159                clocks,
160            )
161            .await;
162
163            if result.is_ok() {
164                success_count.fetch_add(1, Ordering::SeqCst);
165            }
166        });
167        handles.push(handle);
168    }
169
170    // Await all task
171    for handle in handles {
172        let _ = handle.await;
173    }
174
175    // Update the number of attended neighbours
176    {
177        let mut state = LOCAL_APP_STATE.lock().await;
178        state.init_nb_first_attended_neighbours(success_count.load(Ordering::SeqCst) as i64);
179    }
180}
181
182#[cfg(feature = "server")]
183/// Starts listening for messages from a new peer
184pub async fn start_listening(stream: tokio::net::TcpStream, addr: std::net::SocketAddr) {
185    log::debug!("Accepted connection from: {}", addr);
186
187    tokio::spawn(async move {
188        if let Err(e) = handle_network_message(stream, addr).await {
189            log::error!("Error handling connection from {}: {}", addr, e);
190        }
191    });
192}
193
194#[cfg(feature = "server")]
195/// Handles incoming messages from a peer
196/// Implement our wave diffusion protocol
197pub async fn handle_network_message(
198    mut stream: tokio::net::TcpStream,
199    socket_of_the_sender: std::net::SocketAddr,
200) -> Result<(), Box<dyn std::error::Error>> {
201    use crate::message::{Message, MessageInfo, NetworkMessageCode};
202    use crate::state::LOCAL_APP_STATE;
203    use rmp_serde::decode;
204    use tokio::io::AsyncReadExt;
205
206    let mut buf = vec![0; 1024];
207    loop {
208        let n = stream.read(&mut buf).await?;
209
210        if n == 0 {
211            log::warn!("Connection closed by: {}", socket_of_the_sender);
212            // Here we should remove the site from the network in the app state
213            {
214                log::debug!("Removing {} from the peers", socket_of_the_sender);
215                let mut state = LOCAL_APP_STATE.lock().await;
216                state
217                    .remove_peer_from_socket_closed(socket_of_the_sender)
218                    .await;
219            }
220            return Ok(());
221        }
222
223        log::debug!("Received {} bytes from {}", n, socket_of_the_sender);
224
225        let message: Message = match decode::from_slice(&buf[..n]) {
226            Ok(msg) => msg,
227            Err(e) => {
228                log::error!("Error decoding message: {}", e);
229                continue;
230            }
231        };
232
233        log::debug!(
234            "Message received from site {} : {:?}",
235            message.sender_addr,
236            message.clone()
237        );
238
239        {
240            let mut state = LOCAL_APP_STATE.lock().await;
241            state.add_site_id(
242                message.message_initiator_id.clone(),
243                message.message_initiator_addr.clone(),
244            );
245        }
246
247        match message.code {
248            NetworkMessageCode::AcquireMutex => {
249                // We store the request
250                {
251                    let mut st = LOCAL_APP_STATE.lock().await;
252                    st.global_mutex_fifo.insert(
253                        message.message_initiator_id.clone(),
254                        crate::state::MutexStamp {
255                            tag: crate::state::MutexTag::Request,
256                            date: message.clock.get_lamport().clone(),
257                        },
258                    );
259                }
260                // wave diffusion
261                let mut diffuse = false;
262                let (local_site_id, local_site_addr) = {
263                    let mut state = LOCAL_APP_STATE.lock().await;
264                    let parent_id = state
265                        .parent_addr_for_transaction_wave
266                        .get(&message.message_initiator_id)
267                        .unwrap_or(&"0.0.0.0:0".parse().unwrap())
268                        .to_string();
269                    if parent_id == "0.0.0.0:0" {
270                        state.set_parent_addr(
271                            message.message_initiator_id.clone(),
272                            message.sender_addr,
273                        );
274
275                        let nb_neighbours = state.get_nb_connected_neighbours();
276                        let current_value = state
277                            .attended_neighbours_nb_for_transaction_wave
278                            .get(&message.message_initiator_id)
279                            .copied()
280                            .unwrap_or(nb_neighbours);
281
282                        state
283                            .attended_neighbours_nb_for_transaction_wave
284                            .insert(message.message_initiator_id.clone(), current_value - 1);
285
286                        log::debug!("Nombre de voisin : {}", current_value - 1);
287
288                        diffuse = state
289                            .attended_neighbours_nb_for_transaction_wave
290                            .get(&message.message_initiator_id)
291                            .copied()
292                            .unwrap_or(0)
293                            > 0;
294                    }
295                    (state.get_site_id(), state.get_site_addr())
296                };
297
298                if diffuse {
299                    let mut snd_msg = message.clone();
300                    snd_msg.sender_id = local_site_id.to_string();
301                    snd_msg.sender_addr = local_site_addr;
302                    diffuse_message(&snd_msg).await?;
303                } else {
304                    let (parent_addr, local_addr, site_id) = {
305                        let state = LOCAL_APP_STATE.lock().await;
306                        (
307                            state.get_parent_addr_for_wave(message.message_initiator_id.clone()),
308                            &state.get_site_addr(),
309                            &state.get_site_id().to_string(),
310                        )
311                    };
312                    // Acquit message to parent
313                    log::debug!(
314                        "Réception d'un message de d'acquisition de mutex, on est sur une feuille, on acquite, envoie à {}",
315                        message.sender_addr.to_string().as_str()
316                    );
317                    send_message(
318                        message.sender_addr,
319                        MessageInfo::AckMutex(crate::message::AckMutexPayload {
320                            clock: message.clock.get_lamport().clone(),
321                        }),
322                        None,
323                        NetworkMessageCode::AckGlobalMutex,
324                        *local_addr,
325                        site_id,
326                        &message.message_initiator_id,
327                        message.message_initiator_addr,
328                        message.clock.clone(),
329                    )
330                    .await?;
331
332                    if message.sender_addr == parent_addr {
333                        // réinitialisation s'il s'agit de la remontée après réception des rouges de tous les fils
334                        let mut state = LOCAL_APP_STATE.lock().await;
335                        let peer_count = state.get_nb_connected_neighbours();
336                        state
337                            .attended_neighbours_nb_for_transaction_wave
338                            .insert(message.message_initiator_id.clone(), peer_count as i64);
339                        state.parent_addr_for_transaction_wave.insert(
340                            message.message_initiator_id.clone(),
341                            "0.0.0.0:0".parse().unwrap(),
342                        );
343                    }
344                }
345            }
346
347            NetworkMessageCode::AckGlobalMutex => {
348                // Message rouge
349                let mut state = LOCAL_APP_STATE.lock().await;
350
351                let nb_neighbours = state.get_nb_connected_neighbours();
352                let current_value = state
353                    .attended_neighbours_nb_for_transaction_wave
354                    .get(&message.message_initiator_id)
355                    .copied()
356                    .unwrap_or(nb_neighbours);
357                state
358                    .attended_neighbours_nb_for_transaction_wave
359                    .insert(message.message_initiator_id.clone(), current_value - 1);
360
361                if state
362                    .attended_neighbours_nb_for_transaction_wave
363                    .get(&message.message_initiator_id.clone())
364                    .copied()
365                    .unwrap_or(-1)
366                    == 0
367                {
368                    if state
369                        .parent_addr_for_transaction_wave
370                        .get(&message.message_initiator_id.clone())
371                        .copied()
372                        .unwrap_or("99.99.99.99:0".parse().unwrap())
373                        == state.get_site_addr()
374                    {
375                        // on est chez le parent
376                        // diffusion terminée
377                        // Réinitialisation
378
379                        println!("\x1b[1;31mDiffusion terminée et réussie !\x1b[0m");
380                        state.try_enter_sc();
381                    } else {
382                        log::debug!(
383                            "On est de le noeud {}. On a reçu un rouge de tous nos fils: on acquite au parent {}",
384                            state.get_site_addr(),
385                            state
386                                .get_parent_addr_for_wave(message.message_initiator_id.clone())
387                                .to_string()
388                                .as_str()
389                        );
390                        send_message(
391                            state.get_parent_addr_for_wave(message.message_initiator_id.clone()),
392                            MessageInfo::AckMutex(crate::message::AckMutexPayload {
393                                clock: message.clock.get_lamport().clone(),
394                            }),
395                            None,
396                            NetworkMessageCode::AckGlobalMutex,
397                            state.get_site_addr(),
398                            &state.get_site_id().to_string(),
399                            &message.message_initiator_id,
400                            message.message_initiator_addr,
401                            state.get_clock().clone(),
402                        )
403                        .await?;
404                    }
405
406                    let peer_count = state.get_nb_connected_neighbours();
407                    state
408                        .attended_neighbours_nb_for_transaction_wave
409                        .insert(message.message_initiator_id.clone(), peer_count as i64);
410                    state.parent_addr_for_transaction_wave.insert(
411                        message.message_initiator_id.clone(),
412                        "0.0.0.0:0".parse().unwrap(),
413                    );
414                }
415            }
416
417            NetworkMessageCode::AckReleaseGlobalMutex => {
418                // Message rouge
419                let mut state = LOCAL_APP_STATE.lock().await;
420
421                let nb_neighbours = state.get_nb_connected_neighbours();
422                let current_value = state
423                    .attended_neighbours_nb_for_transaction_wave
424                    .get(&message.message_initiator_id)
425                    .copied()
426                    .unwrap_or(nb_neighbours);
427                state
428                    .attended_neighbours_nb_for_transaction_wave
429                    .insert(message.message_initiator_id.clone(), current_value - 1);
430
431                if state
432                    .attended_neighbours_nb_for_transaction_wave
433                    .get(&message.message_initiator_id.clone())
434                    .copied()
435                    .unwrap_or(-1)
436                    == 0
437                {
438                    if state
439                        .parent_addr_for_transaction_wave
440                        .get(&message.message_initiator_id.clone())
441                        .copied()
442                        .unwrap_or("99.99.99.99:0".parse().unwrap())
443                        == state.get_site_addr()
444                    {
445                        // on est chez le parent
446                        // diffusion terminée
447                        // Réinitialisation
448
449                        println!("\x1b[1;31mDiffusion terminée et réussie !\x1b[0m");
450                        // On vient de release la section critique, on peut essayer d'y entrer à nouveau
451                        state.try_enter_sc();
452                    } else {
453                        log::debug!(
454                            "On est de le noeud {}. On a reçu un rouge de tous nos fils: on acquite au parent {}",
455                            state.get_site_addr(),
456                            state
457                                .get_parent_addr_for_wave(message.message_initiator_id.clone())
458                                .to_string()
459                                .as_str()
460                        );
461                        send_message(
462                            state.get_parent_addr_for_wave(message.message_initiator_id.clone()),
463                            MessageInfo::None,
464                            None,
465                            NetworkMessageCode::AckReleaseGlobalMutex,
466                            state.get_site_addr(),
467                            &state.get_site_id().to_string(),
468                            &message.message_initiator_id,
469                            message.message_initiator_addr,
470                            state.get_clock().clone(),
471                        )
472                        .await?;
473                    }
474
475                    let peer_count = state.get_nb_connected_neighbours();
476                    state
477                        .attended_neighbours_nb_for_transaction_wave
478                        .insert(message.message_initiator_id.clone(), peer_count as i64);
479                    state.parent_addr_for_transaction_wave.insert(
480                        message.message_initiator_id.clone(),
481                        "0.0.0.0:0".parse().unwrap(),
482                    );
483                }
484            }
485
486            NetworkMessageCode::ReleaseGlobalMutex => {
487                // A node is releasing the critical section
488                {
489                    let mut st = LOCAL_APP_STATE.lock().await;
490                    st.global_mutex_fifo.remove(&message.message_initiator_id);
491                    st.try_enter_sc();
492                }
493                // wave diffusion
494                let mut diffuse = false;
495                let (local_site_id, local_site_addr) = {
496                    let mut state = LOCAL_APP_STATE.lock().await;
497                    let parent_id = state
498                        .parent_addr_for_transaction_wave
499                        .get(&message.message_initiator_id)
500                        .unwrap_or(&"0.0.0.0:0".parse().unwrap())
501                        .to_string();
502                    if parent_id == "0.0.0.0:0" {
503                        state.set_parent_addr(
504                            message.message_initiator_id.clone(),
505                            message.sender_addr,
506                        );
507
508                        let nb_neighbours = state.get_nb_connected_neighbours();
509                        let current_value = state
510                            .attended_neighbours_nb_for_transaction_wave
511                            .get(&message.message_initiator_id)
512                            .copied()
513                            .unwrap_or(nb_neighbours);
514
515                        state
516                            .attended_neighbours_nb_for_transaction_wave
517                            .insert(message.message_initiator_id.clone(), current_value - 1);
518
519                        log::debug!("Nombre de voisin : {}", current_value - 1);
520
521                        diffuse = state
522                            .attended_neighbours_nb_for_transaction_wave
523                            .get(&message.message_initiator_id)
524                            .copied()
525                            .unwrap_or(0)
526                            > 0;
527                    }
528                    (state.get_site_id(), state.get_site_addr())
529                };
530
531                if diffuse {
532                    let mut snd_msg = message.clone();
533                    snd_msg.sender_id = local_site_id.to_string();
534                    snd_msg.sender_addr = local_site_addr;
535                    diffuse_message(&snd_msg).await?;
536                } else {
537                    let (parent_addr, local_addr, site_id) = {
538                        let state = LOCAL_APP_STATE.lock().await;
539                        (
540                            state.get_parent_addr_for_wave(message.message_initiator_id.clone()),
541                            &state.get_site_addr(),
542                            &state.get_site_id().to_string(),
543                        )
544                    };
545                    // Acquit message to parent
546                    log::debug!(
547                        "Réception d'un message de relachement de mutex global, on est sur une feuille, on acquite, envoie à {}",
548                        message.sender_addr.to_string().as_str()
549                    );
550                    send_message(
551                        message.sender_addr,
552                        MessageInfo::None,
553                        None,
554                        NetworkMessageCode::AckReleaseGlobalMutex,
555                        *local_addr,
556                        site_id,
557                        &message.message_initiator_id,
558                        message.message_initiator_addr,
559                        message.clock.clone(),
560                    )
561                    .await?;
562
563                    if message.sender_addr == parent_addr {
564                        // réinitialisation s'il s'agit de la remontée après réception des rouges de tous les fils
565                        let mut state = LOCAL_APP_STATE.lock().await;
566                        let peer_count = state.get_nb_connected_neighbours();
567                        state
568                            .attended_neighbours_nb_for_transaction_wave
569                            .insert(message.message_initiator_id.clone(), peer_count as i64);
570                        state.parent_addr_for_transaction_wave.insert(
571                            message.message_initiator_id.clone(),
572                            "0.0.0.0:0".parse().unwrap(),
573                        );
574                    }
575                }
576            }
577
578            NetworkMessageCode::Discovery => {
579                let mut state = LOCAL_APP_STATE.lock().await;
580
581                // Try to add this new site as a new peer
582                state.add_incomming_peer(
583                    message.message_initiator_addr,
584                    socket_of_the_sender,
585                    message.clock.clone(),
586                );
587
588                // Return ack message if this we are connected to the site
589                if state
590                    .get_connected_nei_addr()
591                    .iter()
592                    .find(|addr| addr == &&message.sender_addr)
593                    .is_some()
594                {
595                    if state
596                        .get_connected_nei_addr()
597                        .iter()
598                        .find(|addr| addr == &&message.sender_addr)
599                        .is_none()
600                    {
601                        state.add_connected_neighbour(message.sender_addr);
602                    }
603                    send_message(
604                        message.sender_addr,
605                        MessageInfo::Acknowledge(crate::message::AcknowledgePayload {
606                            global_fifo: state.get_global_mutex_fifo().clone(),
607                        }),
608                        None,
609                        NetworkMessageCode::Acknowledgment,
610                        state.get_site_addr(),
611                        state.get_site_id().as_str(),
612                        &message.message_initiator_id.clone(),
613                        message.message_initiator_addr,
614                        state.get_clock(),
615                    )
616                    .await?;
617                }
618            }
619
620            NetworkMessageCode::Acknowledgment => {
621                let ready_to_sync = {
622                    let mut state = LOCAL_APP_STATE.lock().await;
623                    // If the site received an acknoledgement from a site,
624                    // It can be a site that is not in the network anymore
625                    state.add_incomming_peer(
626                        message.sender_addr,
627                        socket_of_the_sender,
628                        message.clock.clone(),
629                    );
630                    if message.message_initiator_addr == state.get_site_addr() {
631                        for (site_id, nb_a_i) in state.get_nb_nei_for_wave().iter() {
632                            state
633                                .attended_neighbours_nb_for_transaction_wave
634                                .insert(site_id.clone(), *nb_a_i + 1);
635                        }
636                    }
637                    // If we are in sync mode, we can start the sync process
638                    // And we have received all the responses from the first attended neighbours counter
639                    // We can start the sync process by starting a snapshot with sync mode
640                    state.get_sync()
641                        && state.get_nb_first_attended_neighbours()
642                            == state.get_nb_connected_neighbours()
643                };
644
645                // Récupérer le global_fifo envoyé dans l'acknowledgment
646                let global_fifo = match &message.info {
647                    MessageInfo::Acknowledge(payload) => Some(payload.global_fifo.clone()),
648                    _ => None,
649                };
650
651                if let Some(global_fifo) = global_fifo {
652                    let mut state = LOCAL_APP_STATE.lock().await;
653                    state.set_global_mutex_fifo(global_fifo);
654                }
655
656                if ready_to_sync {
657                    log::info!("All neighbours have responded, starting synchronization");
658                    crate::control::enqueue_critical(
659                        crate::control::CriticalCommands::SyncSnapshot,
660                    )
661                    .await?;
662                }
663            }
664
665            NetworkMessageCode::Transaction => {
666                // messages bleus
667                if message.command.is_some() {
668                    if let Err(e) = crate::control::process_network_command(
669                        message.info.clone(),
670                        message.clock.clone(),
671                        message.message_initiator_id.as_str(),
672                    )
673                    .await
674                    {
675                        log::error!("Error handling command:\n{}", e);
676                    }
677                    // wave diffusion
678                    let mut diffuse = false;
679                    let (local_site_id, local_site_addr) = {
680                        let mut state = LOCAL_APP_STATE.lock().await;
681                        let parent_id = state
682                            .parent_addr_for_transaction_wave
683                            .get(&message.message_initiator_id.clone())
684                            .unwrap_or(&"0.0.0.0:0".parse().unwrap())
685                            .to_string();
686                        if parent_id == "0.0.0.0:0" {
687                            state.set_parent_addr(
688                                message.message_initiator_id.clone(),
689                                message.sender_addr,
690                            );
691
692                            let nb_neighbours = state.get_nb_connected_neighbours();
693                            let current_value = state
694                                .attended_neighbours_nb_for_transaction_wave
695                                .get(&message.message_initiator_id)
696                                .copied()
697                                .unwrap_or(nb_neighbours);
698
699                            state
700                                .attended_neighbours_nb_for_transaction_wave
701                                .insert(message.message_initiator_id.clone(), current_value - 1);
702
703                            log::debug!("Nombre de voisin : {}", current_value - 1);
704
705                            diffuse = state
706                                .attended_neighbours_nb_for_transaction_wave
707                                .get(&message.message_initiator_id.clone())
708                                .copied()
709                                .unwrap_or(0)
710                                > 0;
711                        }
712                        (state.get_site_id(), state.get_site_addr())
713                    };
714
715                    if diffuse {
716                        let mut snd_msg = message.clone();
717                        snd_msg.sender_id = local_site_id.to_string();
718                        snd_msg.sender_addr = local_site_addr;
719                        diffuse_message(&snd_msg).await?;
720                    } else {
721                        let (parent_addr, local_addr, site_id) = {
722                            let state = LOCAL_APP_STATE.lock().await;
723                            (
724                                state
725                                    .get_parent_addr_for_wave(message.message_initiator_id.clone()),
726                                state.get_site_addr(),
727                                state.get_site_id().to_string(),
728                            )
729                        };
730                        // Acquit message to parent
731                        log::debug!(
732                            "Réception d'un message de transaction, on est sur une feuille, on acquite, envoie à {}",
733                            message.sender_addr.to_string().as_str()
734                        );
735                        send_message(
736                            message.sender_addr,
737                            MessageInfo::None,
738                            None,
739                            NetworkMessageCode::TransactionAcknowledgement,
740                            local_addr,
741                            site_id.as_str(),
742                            &message.message_initiator_id.clone(),
743                            message.message_initiator_addr,
744                            message.clock.clone(),
745                        )
746                        .await?;
747
748                        if message.sender_addr == parent_addr {
749                            // réinitialisation s'il s'agit de la remontée après réception des rouges de tous les fils
750                            let mut state = LOCAL_APP_STATE.lock().await;
751                            let peer_count = state.get_nb_connected_neighbours();
752                            state
753                                .attended_neighbours_nb_for_transaction_wave
754                                .insert(message.message_initiator_id.clone(), peer_count as i64);
755                            state
756                                .parent_addr_for_transaction_wave
757                                .insert(message.message_initiator_id, "0.0.0.0:0".parse().unwrap());
758                        }
759                    }
760                } else {
761                    log::error!("Command is None for Transaction message");
762                }
763            }
764            NetworkMessageCode::TransactionAcknowledgement => {
765                let mut should_reset = false;
766
767                // Message rouge
768                let mut state = LOCAL_APP_STATE.lock().await;
769
770                let nb_neighbours = state.get_nb_connected_neighbours();
771                let current_value = state
772                    .attended_neighbours_nb_for_transaction_wave
773                    .get(&message.message_initiator_id)
774                    .copied()
775                    .unwrap_or(nb_neighbours);
776                state
777                    .attended_neighbours_nb_for_transaction_wave
778                    .insert(message.message_initiator_id.clone(), current_value - 1);
779
780                if state
781                    .attended_neighbours_nb_for_transaction_wave
782                    .get(&message.message_initiator_id)
783                    .copied()
784                    .unwrap_or(-1)
785                    == 0
786                {
787                    if state
788                        .parent_addr_for_transaction_wave
789                        .get(&message.message_initiator_id)
790                        .copied()
791                        .unwrap_or("99.99.99.99:0".parse().unwrap())
792                        == state.get_site_addr()
793                    {
794                        // on est chez le parent
795                        // diffusion terminée
796                        // Réinitialisation
797
798                        println!("\x1b[1;31mDiffusion terminée et réussie !\x1b[0m");
799                        should_reset = true;
800                    } else {
801                        log::debug!(
802                            "On est dans le noeud {}. On a reçu un rouge de tous nos fils: on acquite au parent {}",
803                            state.get_site_addr().to_string().as_str(),
804                            state
805                                .get_parent_addr_for_wave(message.message_initiator_id.clone())
806                                .to_string()
807                                .as_str()
808                        );
809                        send_message(
810                            state.get_parent_addr_for_wave(message.message_initiator_id.clone()),
811                            MessageInfo::None,
812                            None,
813                            NetworkMessageCode::TransactionAcknowledgement,
814                            state.get_site_addr(),
815                            &state.get_site_id().to_string(),
816                            &message.message_initiator_id,
817                            message.message_initiator_addr,
818                            state.get_clock(),
819                        )
820                        .await?;
821                    }
822
823                    let peer_count = state.get_nb_connected_neighbours();
824                    state
825                        .attended_neighbours_nb_for_transaction_wave
826                        .insert(message.message_initiator_id.clone(), peer_count as i64);
827                    state
828                        .parent_addr_for_transaction_wave
829                        .insert(message.message_initiator_id, "0.0.0.0:0".parse().unwrap());
830
831                    if should_reset && state.pending_commands.len() == 0 {
832                        // fin de la section critique on peut notifier les pairs
833                        state.release_mutex().await?;
834                    };
835                }
836            }
837
838            NetworkMessageCode::Error => {
839                log::debug!("Error message received: {:?}", message);
840            }
841            NetworkMessageCode::Disconnect => {
842                {
843                    let mut state = LOCAL_APP_STATE.lock().await;
844                    state.remove_peer(message.message_initiator_addr).await;
845                }
846                println!(
847                    "\x1b[1;31mSITE {} DISCONNECTED !\x1b[0m",
848                    message.message_initiator_id
849                );
850            }
851            NetworkMessageCode::SnapshotRequest => {
852                // messages bleus
853                // wave diffusion
854                let mut diffuse = false;
855                let (local_site_id, local_site_addr) = {
856                    let mut state = LOCAL_APP_STATE.lock().await;
857                    let parent_id = state
858                        .parent_addr_for_transaction_wave
859                        .get(&message.message_initiator_id.clone())
860                        .unwrap_or(&"0.0.0.0:0".parse().unwrap())
861                        .to_string();
862                    if parent_id == "0.0.0.0:0" {
863                        state.set_parent_addr(
864                            message.message_initiator_id.clone(),
865                            message.sender_addr,
866                        );
867
868                        let nb_neighbours = state.get_nb_connected_neighbours();
869                        let current_value = state
870                            .attended_neighbours_nb_for_transaction_wave
871                            .get(&message.message_initiator_id)
872                            .copied()
873                            .unwrap_or(nb_neighbours);
874
875                        state
876                            .attended_neighbours_nb_for_transaction_wave
877                            .insert(message.message_initiator_id.clone(), current_value - 1);
878
879                        log::debug!("Nombre de voisin : {}", current_value - 1);
880
881                        diffuse = state
882                            .attended_neighbours_nb_for_transaction_wave
883                            .get(&message.message_initiator_id.clone())
884                            .copied()
885                            .unwrap_or(0)
886                            > 0;
887                    }
888                    (state.get_site_id(), state.get_site_addr())
889                };
890
891                if diffuse {
892                    let mut snd_msg = message.clone();
893                    snd_msg.sender_id = local_site_id.to_string();
894                    snd_msg.sender_addr = local_site_addr;
895                    // Here we should diffuse the message because we are not on a leaf
896                    // We should also start a snapshot in network mode
897                    // When this snapshot is done, we should send the global snapshot to the parent
898                    log::debug!(
899                        "We are not on a leaf, we start our own global snapshot construction and diffuse the request to other nodes"
900                    );
901                    crate::snapshot::start_snapshot(crate::snapshot::SnapshotMode::NetworkMode)
902                        .await?;
903                    // When can then diffuse the request to other nodes
904                    diffuse_message(&snd_msg).await?;
905                } else {
906                    let parent_addr = {
907                        let state = LOCAL_APP_STATE.lock().await;
908                        state.get_parent_addr_for_wave(message.message_initiator_id.clone())
909                    };
910                    // Acquit message to parent
911                    log::debug!(
912                        "Réception d'une demande de snapshot, on est sur une feuille, on crée un snapshot local, on envoie à {}",
913                        message.sender_addr.to_string().as_str()
914                    );
915                    // Here we are on a leaf, we can crate a local snapshot and send it to the parent
916                    let txs = crate::db::get_local_transaction_log()?;
917                    let summaries: Vec<_> = txs.iter().map(|t| t.into()).collect();
918
919                    let (site_id, clock, local_addr) = {
920                        let st = LOCAL_APP_STATE.lock().await;
921                        (st.get_site_id(), st.get_clock(), st.get_site_addr())
922                    };
923
924                    send_message(
925                        message.sender_addr,
926                        MessageInfo::SnapshotResponse(crate::message::SnapshotResponse {
927                            site_id: site_id.clone(),
928                            clock: clock.clone(),
929                            tx_log: summaries,
930                        }),
931                        None,
932                        NetworkMessageCode::SnapshotResponse,
933                        local_addr,
934                        &site_id,
935                        &message.message_initiator_id,
936                        message.message_initiator_addr,
937                        clock.clone(),
938                    )
939                    .await?;
940
941                    if message.sender_addr == parent_addr {
942                        // réinitialisation s'il s'agit de la remontée après réception des rouges de tous les fils
943                        let mut state = LOCAL_APP_STATE.lock().await;
944                        let peer_count = state.get_nb_connected_neighbours();
945                        state
946                            .attended_neighbours_nb_for_transaction_wave
947                            .insert(message.message_initiator_id.clone(), peer_count as i64);
948                        state
949                            .parent_addr_for_transaction_wave
950                            .insert(message.message_initiator_id, "0.0.0.0:0".parse().unwrap());
951                    }
952                }
953            }
954            NetworkMessageCode::SnapshotResponse => {
955                // Message rouge
956                let mut should_reset = false;
957                let mut state = LOCAL_APP_STATE.lock().await;
958
959                let nb_neighbours = state.get_nb_connected_neighbours();
960                let current_value = state
961                    .attended_neighbours_nb_for_transaction_wave
962                    .get(&message.message_initiator_id)
963                    .copied()
964                    .unwrap_or(nb_neighbours);
965                state
966                    .attended_neighbours_nb_for_transaction_wave
967                    .insert(message.message_initiator_id.clone(), current_value - 1);
968
969                if state
970                    .attended_neighbours_nb_for_transaction_wave
971                    .get(&message.message_initiator_id)
972                    .copied()
973                    .unwrap_or(-1)
974                    == 0
975                {
976                    if state
977                        .parent_addr_for_transaction_wave
978                        .get(&message.message_initiator_id)
979                        .copied()
980                        .unwrap_or("99.99.99.99:0".parse().unwrap())
981                        == state.get_site_addr()
982                    {
983                        // on est chez le parent
984                        // diffusion terminée
985                        // Réinitialisation
986                        log::debug!(
987                            "L'initiateur à reçu toutes ses snapshots, il devrait créer sa snapshot globale"
988                        );
989                        if let MessageInfo::SnapshotResponse(resp) = message.info {
990                            let mut mgr = crate::snapshot::LOCAL_SNAPSHOT_MANAGER.lock().await;
991                            if mgr.mode == crate::snapshot::SnapshotMode::FileMode {
992                                log::debug!("La snapshot devrait être sauvegardée");
993                                if let Some(gs) = mgr.push(resp) {
994                                    log::info!(
995                                        "Global snapshot ready to save, hold per site : {:#?}",
996                                        gs.missing
997                                    );
998                                    mgr.path = crate::snapshot::persist(&gs, state.get_site_id())
999                                        .await
1000                                        .unwrap()
1001                                        .parse()
1002                                        .ok();
1003                                }
1004                            } else if mgr.mode == crate::snapshot::SnapshotMode::SyncMode {
1005                                log::debug!(
1006                                    "La snapshot devrait être utilisée pour la synchronisation"
1007                                );
1008                                if let Some(gs) = mgr.push(resp) {
1009                                    log::info!(
1010                                        "Global snapshot ready to be synced, hold per site : {:#?}",
1011                                        gs.missing
1012                                    );
1013                                    crate::db::update_db_with_snapshot(
1014                                        &gs,
1015                                        state.get_clock().get_vector_clock_map(),
1016                                    );
1017                                }
1018                            }
1019                        }
1020
1021                        println!("\x1b[1;31mDiffusion terminée et réussie !\x1b[0m");
1022                        should_reset = true;
1023                    } else {
1024                        log::debug!(
1025                            "On est dans le noeud {}. On a reçu un rouge de tous nos fils: on acquite au parent {}",
1026                            state.get_site_addr().to_string().as_str(),
1027                            state
1028                                .get_parent_addr_for_wave(message.message_initiator_id.clone())
1029                                .to_string()
1030                                .as_str()
1031                        );
1032                        log::debug!(
1033                            "On devrait pouvoir construire une snapshot globale avec tous nos voisins et l'envoyer à l'adresse de notre parent {}",
1034                            state
1035                                .get_parent_addr_for_wave(message.message_initiator_id.clone())
1036                                .to_string()
1037                                .as_str()
1038                        );
1039                        if let MessageInfo::SnapshotResponse(resp) = message.info {
1040                            let mut mgr = crate::snapshot::LOCAL_SNAPSHOT_MANAGER.lock().await;
1041                            if mgr.mode == crate::snapshot::SnapshotMode::NetworkMode {
1042                                log::debug!("La snapshot devrait être envoyés au père");
1043                                if let Some(gs) = mgr.push(resp) {
1044                                    log::info!(
1045                                        "Global snapshot ready to be send to parent, hold per site : {:#?}",
1046                                        gs.missing
1047                                    );
1048                                    send_message(
1049                                        state.get_parent_addr_for_wave(
1050                                            message.message_initiator_id.clone(),
1051                                        ),
1052                                        MessageInfo::SnapshotResponse(
1053                                            crate::message::SnapshotResponse {
1054                                                site_id: state.get_site_id().to_string(),
1055                                                clock: state.get_clock(),
1056                                                tx_log: gs.all_transactions.into_iter().collect(),
1057                                            },
1058                                        ),
1059                                        None,
1060                                        NetworkMessageCode::SnapshotResponse,
1061                                        state.get_site_addr(),
1062                                        &state.get_site_id().to_string(),
1063                                        &message.message_initiator_id,
1064                                        message.message_initiator_addr,
1065                                        state.get_clock(),
1066                                    )
1067                                    .await?;
1068                                } else {
1069                                    log::error!("Le site aurait du récupérer toutes ses snapshots");
1070                                }
1071                            }
1072                        } else {
1073                            log::error!("Message de type SnapshotResponse attendu, mais pas reçu");
1074                        }
1075                    }
1076
1077                    let peer_count = state.get_nb_connected_neighbours();
1078                    state
1079                        .attended_neighbours_nb_for_transaction_wave
1080                        .insert(message.message_initiator_id.clone(), peer_count as i64);
1081                    state
1082                        .parent_addr_for_transaction_wave
1083                        .insert(message.message_initiator_id, "0.0.0.0:0".parse().unwrap());
1084                    if should_reset && state.pending_commands.len() == 0 {
1085                        // fin de la section critique on peut notifier les pairs
1086                        state.release_mutex().await?;
1087                    };
1088                } else {
1089                    log::debug!(
1090                        "On a reçu un message rouge d'un des fils mais la vague n'est pas encore terminée"
1091                    );
1092                    // We should add the Snapshot to our manager
1093                    if let MessageInfo::SnapshotResponse(resp) = message.info {
1094                        let mut mgr = crate::snapshot::LOCAL_SNAPSHOT_MANAGER.lock().await;
1095                        log::debug!("La snapshot devrait être ajoutés à l'état du manager");
1096                        if let Some(_) = mgr.push(resp) {
1097                            log::error!(
1098                                "On ne devrait pas encore pouvoir construire une snapshot globale vu que la vague n'est pas terminée"
1099                            );
1100                        }
1101                    } else {
1102                        log::error!("Message de type SnapshotResponse attendu, mais pas reçu");
1103                    }
1104                }
1105            }
1106        }
1107
1108        let mut state = LOCAL_APP_STATE.lock().await;
1109        state.update_clock(Some(&message.clock.clone())).await;
1110    }
1111}
1112
1113#[cfg(feature = "server")]
1114/// Send a message to a specific peer
1115pub async fn send_message(
1116    recipient_address: std::net::SocketAddr,
1117    info: crate::message::MessageInfo,
1118    command: Option<crate::control::Command>,
1119    code: crate::message::NetworkMessageCode,
1120    local_addr: std::net::SocketAddr,
1121    local_site: &str,
1122    initiator_id: &str,
1123    initiator_addr: std::net::SocketAddr,
1124    sender_clock: crate::clock::Clock,
1125) -> Result<(), Box<dyn std::error::Error>> {
1126    use crate::message::Message;
1127    use rmp_serde::encode;
1128
1129    if code == crate::message::NetworkMessageCode::Transaction && command.is_none() {
1130        log::error!("Command is None for Transaction message");
1131        return Err("Command is None for Transaction message".into());
1132    }
1133
1134    let msg = Message {
1135        sender_id: local_site.to_string(),
1136        sender_addr: local_addr,
1137        message_initiator_id: initiator_id.to_string(),
1138        clock: sender_clock,
1139        command,
1140        info,
1141        code,
1142        message_initiator_addr: initiator_addr,
1143    };
1144
1145    if recipient_address.ip().is_unspecified() || recipient_address.port() == 0 {
1146        log::warn!("Skipping invalid peer address {}", recipient_address);
1147        return Ok(());
1148    }
1149
1150    let buf = encode::to_vec(&msg)?;
1151
1152    let mut manager = NETWORK_MANAGER.lock().await;
1153
1154    let sender = match manager.get_sender(&recipient_address) {
1155        Some(s) => s,
1156        None => {
1157            if let Err(e) = manager.create_connection(recipient_address).await {
1158                return Err(format!(
1159                    "error with connection to {}: {}",
1160                    recipient_address.to_string(),
1161                    e
1162                )
1163                .into());
1164            }
1165            match manager.get_sender(&recipient_address) {
1166                Some(s) => s,
1167                None => {
1168                    let err_msg =
1169                        format!("Sender not found after connecting to {}", recipient_address);
1170                    log::error!("{}", err_msg);
1171                    return Err(err_msg.into());
1172                }
1173            }
1174        }
1175    };
1176
1177    match sender.send(buf).await {
1178        Ok(s) => s,
1179        Err(e) => {
1180            let err_msg = format!(
1181                "Impossible to send msg to {} due to error : {}",
1182                recipient_address, e
1183            );
1184            log::error!("{}", err_msg);
1185            return Err(err_msg.into());
1186        }
1187    };
1188    log::debug!("Sent message {:?} to {}", &msg, recipient_address);
1189    Ok(())
1190}
1191
1192#[cfg(feature = "server")]
1193/// Implement our wave diffusion protocol
1194///
1195/// Lock the app state and diffuse a message
1196pub async fn diffuse_message(
1197    message: &crate::message::Message,
1198) -> Result<(), Box<dyn std::error::Error>> {
1199    use crate::state::LOCAL_APP_STATE;
1200
1201    log::debug!(
1202        "Début de la diffusion d'un message de type {:?}",
1203        message.code
1204    );
1205
1206    let (local_addr, site_id, connected_nei_addr, parent_address) = {
1207        let state = LOCAL_APP_STATE.lock().await;
1208        (
1209            state.get_site_addr(),
1210            state.get_site_id(),
1211            state.get_connected_nei_addr(),
1212            state.get_parent_addr_for_wave(message.message_initiator_id.clone()),
1213        )
1214    };
1215    diffuse_message_without_lock(
1216        message,
1217        local_addr,
1218        &site_id,
1219        connected_nei_addr,
1220        parent_address,
1221    )
1222    .await?;
1223    Ok(())
1224}
1225
1226#[cfg(feature = "server")]
1227/// Implement our wave diffusion protocol
1228///
1229/// Diffuse a message without locking the app state
1230pub async fn diffuse_message_without_lock(
1231    message: &crate::message::Message,
1232    local_addr: std::net::SocketAddr,
1233    site_id: &str,
1234    connected_nei_addr: Vec<std::net::SocketAddr>,
1235    parent_address: std::net::SocketAddr,
1236) -> Result<(), Box<dyn std::error::Error>> {
1237    for connected_nei in connected_nei_addr {
1238        let peer_addr_str = connected_nei.to_string();
1239        if connected_nei != parent_address {
1240            log::debug!("Sending message to: {}", peer_addr_str);
1241
1242            if let Err(e) = send_message(
1243                connected_nei,
1244                message.info.clone(),
1245                message.command.clone(),
1246                message.code.clone(),
1247                local_addr,
1248                &site_id,
1249                &message.message_initiator_id,
1250                message.message_initiator_addr,
1251                message.clock.clone(),
1252            )
1253            .await
1254            {
1255                log::error!("❌ Impossible d’envoyer à {} : {}", peer_addr_str, e);
1256            }
1257        }
1258    }
1259    Ok(())
1260}
1261
1262#[cfg(test)]
1263#[cfg(feature = "server")]
1264mod tests {
1265    use super::*;
1266    use tokio::net::TcpListener;
1267
1268    #[tokio::test]
1269    async fn test_send_message() -> Result<(), Box<dyn std::error::Error>> {
1270        use crate::clock::Clock;
1271        use crate::message::{MessageInfo, NetworkMessageCode};
1272
1273        let address: std::net::SocketAddr = "127.0.0.1:8081".parse().unwrap();
1274        let local_addr: std::net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
1275        let local_site = "A";
1276        let clock = Clock::new();
1277
1278        let _listener = TcpListener::bind(address).await?;
1279
1280        let code = NetworkMessageCode::Discovery;
1281
1282        let send_result = send_message(
1283            address,
1284            MessageInfo::None,
1285            None,
1286            code,
1287            local_addr,
1288            local_site,
1289            local_site,
1290            local_addr,
1291            clock,
1292        )
1293        .await;
1294        assert!(send_result.is_ok());
1295        Ok(())
1296    }
1297}