Parallel Processing



Figure 1 : Pi "cluster"

We have 80 desks in the lab, each with a Raspberry Pi base node (Link) containing 3 Raspberry Pis, forming a cluster of 240 processing nodes. Each node (Raspberry Pi) has 4 processing cores, so in total we have 960 processing elements, soooo a Pi supercomputer :). In the past, when attempting parallel programming, I've used the MPI library (Link). This library provides the functions needed to distribute, control and execute code across your hosts e.g. your classic Beowulf cluster (Link). MPI is a definite contender as a software architecture, however, as this cluster is a teaching platform I decided to go back to basics i.e. build the required software to distribute, control and execute code from first principles.

Table of Contents

Temperature Display.
Finding Prime Numbers.
Game of Life.

Temperature Display


Figure 2 : example GUI of Pi temperature plot

Not really a parallel processing application, but to start off I decided to write a simple program to monitor the temperatures of the Pis in the cluster i.e. a head node that receives IP + temperature values from each worker node and displays this data on a screen. This data is colour coded, as shown in figure 2, running from blue at 40 degrees through green (50), yellow (60) and orange to red at 80 degrees. The worker node code is shown below, or you can download it using this link (Link):

###############
# WORKER NODE #
###############

import socket
import json
import time
import os
import psutil
import sys

# Head nodes IP address

SERVER_IP = "192.168.1.100"  
PORT = 5050

# vcgencmd to get CPU temp

def get_temp():
  try:
    temp_str = os.popen("vcgencmd measure_temp").readline()
    temp = float(temp_str.replace("temp=", "").replace("'C\n", ""))
  except:
    temp = 0.0  # default
  return temp

# check to see if code already running, if so exit

def is_another_instance_running():

  # PID of this program & full path + filename

  current_pid = os.getpid()                        
  current_script = os.path.abspath(__file__)  

  # get PID and "full path + filename" of running processes and check if 
  # "full path + filename" already exists

  for proc in psutil.process_iter(['pid', 'cmdline']):
    try:
      if proc.info['pid'] == current_pid:
        continue
      cmdline = proc.info['cmdline']
      if cmdline and current_script in cmdline:
        return True
    except (psutil.NoSuchProcess, psutil.AccessDenied):
      continue
  return False

# main loop, get IP address, TX

def send_temp():
  
  # Get an IP address
 
  addrs = psutil.net_if_addrs()
  ip = "???"
  if "eth0" in addrs:
    for addr in addrs["eth0"]:
      if addr.family == socket.AF_INET:
        ip = addr.address

  # create dictionary, convert into byte stream, calc length, TX 

  while True:
    temp = get_temp()
    payload = {
      "ip": ip,
      "temp": temp
    }

    # convert data into bytes. 
    # dictionary converted into JSON string, then into bytes
    # length of data converted into fixed length, four byte value

    data = json.dumps(payload).encode('utf-8')
    length = len(data).to_bytes(4, 'big')

    # try to TX

    try:
      with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((SERVER_IP, PORT))
        s.sendall(length + data)
        print(f"Sent: {payload}")
    except Exception as e:
      print(f"Failed to send data: {e}")

    # send IP + TEMP every 5 seconds
    time.sleep(5)  

if __name__ == "__main__":
  if is_another_instance_running():
    print("Another instance of this script is already running. Exiting.")
    sys.exit(0)
  send_temp()
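
The get_temp function above relies on vcgencmd printing a line of the form temp=47.2'C. As a minimal sketch, the same parsing can be done with a regular expression, which copes with missing or unexpected output without needing a bare except. The name parse_temp and the sample strings are illustrative assumptions, not part of the original worker code.

```python
import re

# Matches the numeric part of output such as "temp=47.2'C"
TEMP_PATTERN = re.compile(r"temp=([0-9]+(?:\.[0-9]+)?)'C")

def parse_temp(line, default=0.0):
    """Return the temperature in degrees C, or `default` if the line does not parse."""
    match = TEMP_PATTERN.search(line)
    return float(match.group(1)) if match else default

print(parse_temp("temp=47.2'C\n"))       # a well-formed reading
print(parse_temp("command not found"))   # falls back to the default
```

Returning a default of 0.0 mirrors the behaviour of get_temp, so a node without vcgencmd still reports in, just with an obviously wrong temperature.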

This code needs to be copied to each worker node and executed. We could do this manually e.g. ftp, but that's not very practical, so I wrote a Bash script to do this. To specify which hosts should receive and run this code, each worker node's IP address is listed in the file pi_list.txt. Its format is shown below, or you can download it using this link (Link):

192.168.4.1
192.168.4.2
192.168.4.3
192.168.5.1
192.168.5.2
192.168.5.3
...
192.168.66.1
192.168.66.2
192.168.66.3
192.168.67.1
192.168.67.2
192.168.67.3

To generate this list for a few hosts I simply cut and pasted the required values e.g. for testing I only distributed the worker code to 10 Pis. For larger clusters you need to automate this, so I wrote the script below, which you can download using this link (Link):

#!/bin/bash

start_subnet=4
end_subnet=67

echo -n > pi_list.txt
for subnet in $(seq $start_subnet $end_subnet)
do
  for host in {1..3}
  do
    echo "192.168.$subnet.$host" >> pi_list.txt
  done
done

To distribute the worker code across the cluster the script below uses scp and ssh. This is a simple implementation so it has its failings :). The script reads the file: pi_list.txt, accesses each node's IP address, transfers the file: worker-1.py, to this address using scp and executes it using ssh. You can download it using this link (Link).

 
#!/bin/bash

WORKER_SCRIPT="worker-1.py"
REMOTE_PATH="/home/pi/$WORKER_SCRIPT"
PI_LIST="pi_list.txt"
SSH_USER="pi"
PASSWORD="12345"
MAX_RETRIES=3

count=0
echo -n > failed.log
echo $count > count.log

cat "$PI_LIST" | while read ip 
do
  if test -z "$ip" 
  then
    continue
  fi

  echo "Deploying to $ip..."

  attempt=1
  success=0

  while test $attempt -le $MAX_RETRIES 
  do
    echo "Attempt $attempt to copy script to $ip..."

    expect <<EOF
      set timeout 10
      spawn scp $WORKER_SCRIPT $SSH_USER@$ip:$REMOTE_PATH
      expect {
        "continue connecting (yes/no)?" {
          send "yes\r" 
          exp_continue
        }
        "password:" {
          send "$PASSWORD\r"
          expect {
            "100%" { exit 0 }
            eof { exit 1 }
          }
        }
        timeout { exit 1 }
        eof { exit 1 }
      }
EOF

    if test $? -eq 0 
    then
      echo "Transfer successful to $ip."
      success=1
      break
    else
      echo "Transfer failed to $ip. Retrying..."
      attempt=$((attempt + 1))
    fi
  done

  if test $success -eq 1
  then
    echo "Starting worker script on $ip..."
    expect <<EOF
      spawn ssh $SSH_USER@$ip "nohup python3 $REMOTE_PATH > /home/pi/pi_worker.log 2>&1 &"
      expect "password:"
      send "$PASSWORD\r"
      expect eof
EOF
    count=$((count + 1))
    echo $count > count.log

  else
    echo "Failed to deploy to $ip after $MAX_RETRIES attempts."
    echo "Failed to deploy to $ip after $MAX_RETRIES attempts." >> failed.log
  fi
done

count=`cat ./count.log`
echo "Deployment complete: $count workers"

You can definitely make the case that placing passwords in a script is bad practice, but for the purposes of teaching I'm going to let that one slide. In case there are any issues with the transfer, the script tries scp up to 3 times. Note, when testing in the lab I had problems with the lab's switch dropping connections. I was told later it was something to do with the VLAN allocation. This meant that connections to each Pi would drop out from time to time. This was not too much of an issue, as you can rerun this script if there is a disconnect; the worker code prevents multiple copies running on the same node at the same time. When using the scp and ssh commands you need to send a password. I could have gone password-less using SSH keys, but decided against that as this is an ad hoc cluster, soooo within the Bash script you have two Expect scripts that describe the expected interactions between client and server. Note, these assume no fault conditions: if things do go wrong, the two ends can easily get out of sync and the transfer fails. However, this is not a big issue as you can keep running the distribution script again and again until all worker nodes are up and running.
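
The "try up to three times" logic in the Bash script can be expressed as a small reusable helper. This is only a minimal sketch in Python: the names retry, action and delay are hypothetical, and the stand-in action below simply simulates a transfer that fails twice before succeeding.

```python
import time

def retry(action, max_retries=3, delay=0.0):
    """Call action() until it returns True or max_retries attempts are used.

    Returns the number of the successful attempt (1-based), or 0 if all failed.
    """
    for attempt in range(1, max_retries + 1):
        if action():
            return attempt
        if attempt < max_retries:
            time.sleep(delay)   # optional pause before the next attempt
    return 0

# Stand-in for a file transfer that fails twice, then succeeds.
outcomes = iter([False, False, True])
print(retry(lambda: next(outcomes)))
```

In a real deployment the action would wrap the actual copy command and report whether it exited cleanly; how that command is run (for example via subprocess) is left as an assumption here.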

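Extra note: when I tried this on a newer laptop the script had issues with host key checking. I'm not sure why; one possible cause is that newer OpenSSH releases word the prompt as (yes/no/[fingerprint])?, which the "continue connecting (yes/no)?" pattern does not match. I did add an extra test to the Expect script but, as this is a closed network, you can also get around this using: ssh -o StrictHostKeyChecking=no user@new-server-ip. Seems to work at the moment :)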

To make things look nice the head node uses Tkinter to produce the GUI shown in figure 2. In basic operation the head node waits for incoming connections from the worker nodes. For each connection it creates a handle_client thread that receives the data, updates a shared dictionary containing IP + temperature values, then closes the connection. This code is shown below, or you can download it using this link (Link):

#############
# HEAD NODE #
#############

import socket
import json
import threading
import tkinter as tk

# Server port and shared data
PORT = 5050
temps = {}
lock = threading.Lock()

# Temperature range
MIN = 40   # Coldest expected temperature
MAX = 80   # Hottest expected temperature

# Grid configuration (240 Pi)
ROWS = 12
COLS = 20
SQUARE_SIZE = 70
PADDING = 10

# GUI setup
root = tk.Tk()
root.title("Live Pi Cluster Temperatures")
canvas_width = COLS * (SQUARE_SIZE + PADDING)
canvas_height = ROWS * (SQUARE_SIZE + PADDING)
canvas = tk.Canvas(root, width=canvas_width, height=canvas_height, bg="white")
canvas.pack()

def get_color(temp):
  # Clamp and normalize temperature

  temp = max(MIN, min(MAX, temp))
  t = (temp - MIN) / (MAX - MIN)

  # Interpolate RGB from blue → green → yellow → orange → red

  if t < 0.25:
    r = 0
    g = int(255 * (t / 0.25))
    b = 255 - g
  elif t < 0.5:
    r = int(255 * ((t - 0.25) / 0.25))
    g = 255
    b = 0
  elif t < 0.75:
    r = 255
    g = int(255 * (1 - ((t - 0.5) / 0.25)))
    b = 0
  else:
    r = 255
    g = 0
    b = 0

  return f'#{r:02x}{g:02x}{b:02x}'

def draw_chart():
  canvas.delete("all")
  with lock:
    items = list(temps.items())

  if not items:
    canvas.create_text(canvas_width // 2, canvas_height // 2, text="Waiting for data...", font=("Arial", 16))
    return

  for idx, (ip, temp) in enumerate(items):
    row = idx // COLS
    col = idx % COLS
    if row >= ROWS:
      break  # Don't draw beyond grid

    x0 = col * (SQUARE_SIZE + PADDING) + PADDING
    y0 = row * (SQUARE_SIZE + PADDING) + PADDING
    x1 = x0 + SQUARE_SIZE
    y1 = y0 + SQUARE_SIZE

    color = get_color(temp)
    canvas.create_rectangle(x0, y0, x1, y1, fill=color, outline="black")
    canvas.create_text((x0 + x1) // 2, y0 + 15, text=ip, font=("Arial", 8))
    canvas.create_text((x0 + x1) // 2, y1 - 15, text=f"{temp:.1f}°C", font=("Arial", 10))

def update_loop():
  draw_chart()
  root.after(2000, update_loop)   # update screen every 2000ms (2sec)

# Thread to handle each connection

def handle_client(conn, addr):
  try:
    length = int.from_bytes(conn.recv(4), 'big')
    data = conn.recv(length)

    payload = json.loads(data.decode('utf-8'))
    ip = payload["ip"]
    temp = payload["temp"]
    with lock:
      temps[ip] = temp
    print(f"{ip} reports {temp:.2f}°C")
  except:
    pass
  finally:
    conn.close()

# Socket server

def server():
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.bind(('', PORT))
  s.listen(240)
  print(f"Listening on port {PORT}...")
  while True:
    conn, addr = s.accept()
    threading.Thread(target=handle_client, args=(conn, addr), daemon=True).start()

# Start server and GUI loop

threading.Thread(target=server, daemon=True).start()
update_loop()
root.mainloop()
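
The worker and head node exchange messages framed with a four-byte, big-endian length prefix followed by a JSON payload. TCP is a byte stream, so a single recv call may return fewer bytes than requested; handle_client above reads each part in one call, which normally works for messages this small. The following is a minimal sketch of the same framing with a read loop added. The helper names pack_message, recv_exact and recv_message are hypothetical, and the demonstration uses a local socket pair rather than the cluster network.

```python
import json
import socket

def pack_message(payload):
    """Encode a dict as JSON and prefix it with a 4-byte big-endian length."""
    data = json.dumps(payload).encode("utf-8")
    return len(data).to_bytes(4, "big") + data

def recv_exact(conn, count):
    """Read exactly `count` bytes, looping because recv() may return fewer."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            raise ConnectionError("socket closed before message was complete")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)

def recv_message(conn):
    """Read one length-prefixed JSON message and return it as a dict."""
    length = int.from_bytes(recv_exact(conn, 4), "big")
    return json.loads(recv_exact(conn, length).decode("utf-8"))

# Local demonstration using a connected pair of sockets instead of the network.
left, right = socket.socketpair()
left.sendall(pack_message({"ip": "192.168.4.1", "temp": 47.2}))
print(recv_message(right))
left.close()
right.close()
```

The length prefix is what lets the receiver know where one message ends, which matters if a connection is ever reused for more than one message.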

In operation each Pi's IP address and temperature are displayed in a coloured square, as shown in figure 3. These screenshots were taken approximately 10 seconds apart; if you look carefully you can see some colour / temperature changes :). Alternatively, you can see this software in action in this short video: (Link).




Figure 3 : Pi temperature plot

In addition to drawing nice pictures, this application demonstrates your classic TCP phases each time a worker node transfers its data:


Figure 4 : SYN - SYN ACK - ACK


Figure 5 : PSH - ACK


Figure 6 : DATA


Figure 7 : FIN - FIN ACK - ACK

Finding Prime Numbers

The previous example developed the framework needed to distribute and run code across the cluster, but it didn't do any processing. The original plan was to implement the Game of Life (Link), but distributing this across a variable number of nodes was a little more complex than I thought, soooo I decided to take smaller steps and implement a system that looks for prime numbers within a specified range i.e. break a large number range down into equal sized blocks and distribute these across a network of nodes, such that each node searches for primes within its assigned block in parallel, returning identified prime numbers to the head node.

The first problem to solve is how to split this number range i.e. MIN_VAL to MAX_VAL, across the cluster. I decided to implement this in the distribution script i.e. the script calculates the number of nodes from the pi_list.txt file, divides the number range by this and then passes start and end values to each worker node via command line parameters, as shown below, or you can download it using this link (Link):

#!/bin/bash

WORKER_SCRIPT="worker-2.py"
REMOTE_PATH="/home/pi/$WORKER_SCRIPT"
PI_LIST="pi_list.txt"
SSH_USER="pi"
PASSWORD="12345"
MAX_RETRIES=3
MAX_VAL=1000000000
MIN_VAL=1

NUM_WORKERS=`cat $PI_LIST | grep -v '^$' | wc -l`
RANGE=$(( (MAX_VAL - MIN_VAL +1) / NUM_WORKERS ))
START=$MIN_VAL

echo $NUM_WORKERS $RANGE $START

count=0
echo -n > failed.log
echo $count > count.log

cat "$PI_LIST" | while read ip 
do
  if test -z "$ip" 
  then
    continue
  fi

  echo "Deploying to $ip..."

  attempt=1
  success=0

  while test $attempt -le $MAX_RETRIES 
  do
    echo "Attempt $attempt to copy script to $ip..."

    expect <<EOF
      set timeout 10
      spawn scp $WORKER_SCRIPT $SSH_USER@$ip:$REMOTE_PATH
      expect {
        "continue connecting (yes/no)?" {
          send "yes\r"
          exp_continue
        }
        "password:" {
          send "$PASSWORD\r"
          expect {
            "100%" { exit 0 }
            eof { exit 1 }
          }
        }
        timeout { exit 1 }
        eof { exit 1 }
      }
EOF

    if test $? -eq 0 
    then
      echo "Transfer successful to $ip."
      success=1
      break
    else
      echo "Transfer failed to $ip. Retrying..."
      attempt=$((attempt + 1))
    fi
  done

  if test $success -eq 1
  then
    END=$((START + RANGE))

    if test $END -gt $MAX_VAL
    then
      END=$MAX_VAL
    fi

    echo Host: $ip start:$START end:$END
    expect <<EOF
      spawn ssh $SSH_USER@$ip "nohup python3 $REMOTE_PATH $START $END > /home/pi/pi_worker.log 2>&1 &"
      expect "password:"
      send "$PASSWORD\r"
      expect eof
EOF
    count=$((count + 1))
    echo $count > count.log
    START=$((END+1))

  else
    echo "Failed to deploy to $ip after $MAX_RETRIES attempts."
    echo "Failed to deploy to $ip after $MAX_RETRIES attempts." >> failed.log
  fi
done

count=`cat ./count.log`
echo "Deployment complete: $count workers"
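
The script above hands each worker the block START to START + RANGE and clamps the last block at MAX_VAL. As an alternative sketch, not the method used in the script, the same split can be computed up front so that block sizes differ by at most one value and the whole range is always covered. The function name split_range is hypothetical.

```python
def split_range(min_val, max_val, num_workers):
    """Split [min_val, max_val] into contiguous, non-overlapping blocks.

    Block sizes differ by at most one; the first `extra` blocks get one more value.
    """
    total = max_val - min_val + 1
    base, extra = divmod(total, num_workers)
    blocks = []
    start = min_val
    for index in range(num_workers):
        size = base + (1 if index < extra else 0)
        if size == 0:
            break  # more workers than values: remaining workers get nothing
        end = start + size - 1
        blocks.append((start, end))
        start = end + 1
    return blocks

for start, end in split_range(1, 100, 3):
    print(start, end)
```

Computing every block before deploying also means a block can be reassigned to another node if a deployment fails, rather than the range shifting along the list of hosts.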

The worker node code then looks for primes within the start and end range. The final worker node code is shown below, or you can download it using this link (Link):

import socket
import time
import os
import psutil
import sys

# Server IP address

SERVER_IP = "192.168.71.1" 
PORT = 4040

def is_another_instance_running():
    current_pid = os.getpid()
    current_script = os.path.abspath(__file__)

    for proc in psutil.process_iter(['pid', 'cmdline']):
        try:
            if proc.info['pid'] == current_pid:
                continue
            cmdline = proc.info['cmdline']
            if cmdline and current_script in cmdline:
                return True
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return False

def how_many_instances_are_running():
    current_pid = os.getpid()
    current_script = os.path.abspath(__file__)

    count = 0
    for proc in psutil.process_iter(['pid', 'cmdline']):
        try:
            if proc.info['pid'] == current_pid:
                continue
            cmdline = proc.info['cmdline']
            if cmdline and current_script in cmdline:
                count = count + 1 
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return count 

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(n**0.5) + 1, 2):
        if n % i == 0:
            return False
    return True

def send(value):
    resend = True
    data = str(value).encode()
    length = len(data).to_bytes(4, 'big')
    while resend:
        try:
            payload = length + data
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((SERVER_IP, PORT))
                s.sendall(payload)
                print(f"Sent: {payload}")
                resend = False
        except Exception as e:
            print(f"Failed to send data: {e}")
            time.sleep(5) 

if __name__ == "__main__":
    #if is_another_instance_running():
    #    print("Another instance of this script is already running. Exiting.")
    #    sys.exit(0)

    # Allow at most three instances in total (this one plus two others)
    if how_many_instances_are_running() >= 3:
        print("Too many instances of this script are already running. Exiting.")
        sys.exit(0)

    if len(sys.argv) != 3:
        print("Usage: python script.py <min> <max>")
        sys.exit(1)

    try:
        min_val = int(sys.argv[1])
        max_val = int(sys.argv[2])
    except ValueError:
        print("Both min and max must be integers.")
        sys.exit(1)

    if min_val > max_val:
        print("Min must be less than or equal to Max.")
        sys.exit(1)

    print(f"Prime numbers between {min_val} and {max_val}:")
    for value in range( min_val, max_val+1 ):
        if is_prime(value):
            send(value)

Another change here is that this worker code allows multiple instances of itself to run on the same host. The thought here is that there are four processor cores on each Raspberry Pi. My assumption is that it's probably best to reserve one core for the OS, sooo you could have three instances running, one on each of the "spare" cores. This is just guess work; the optimal number of instances to maximise processing performance would need to be tested e.g. does the OS need multiple processor cores to run, does each core have the capacity to run multiple instances i.e. what are the multi-tasking bottlenecks, is it IO, shared resources etc? To load these multiple instances onto each Pi I simply added the same IP three times to the pi_list.txt file. Not the most efficient solution, as the worker code is transferred multiple times, but the code is small and it works :). Welll, it did lock up once: normally you see a constant flow of values from each node, but once this seemed to stall. I'm not sure if this was due to the doubling of the RTO (TCP retransmission timeout) in the event that the head node stopped working. Need to check this, but in general it seems fine.

The head node code receives the identified prime values from each worker node. As the number ranges do not overlap it doesn't need to check that these values are unique. The received values are displayed on the screen along with the largest prime and the number of primes identified. In addition to this, the IP address of each node sending data is displayed. The user can start and stop this code by pressing the appropriate button; when the server is stopped the received values are written to a file. The head node code is shown below, or you can download it using this link (Link):

import socket
import threading
import tkinter as tk
from tkinter import scrolledtext

# Server IP address

HOST = '192.168.1.100'
PORT = 4040

class ServerApp:
    def __init__(self, master):
        self.master = master
        self.master.title("Mike's Prime Number Finder")
        self.master.geometry("600x500")

        self.running = False
        self.server_socket = None
        self.max_prime = None
        self.unique_ips = set()
        self.unique_primes = set()

        # Text area for logs
        self.text_area = scrolledtext.ScrolledText(master, wrap=tk.WORD, state='disabled', height=10)
        self.text_area.pack(padx=10, pady=10, fill=tk.X)

        # Max prime display
        self.prime_label = tk.Label(master, text="Max prime: None", font=("Arial", 12, "bold"))
        self.prime_label.pack(pady=5)

        # Unique prime count display
        self.prime_count_label = tk.Label(master, text="Number of primes found: 0", font=("Arial", 12, "bold"))
        self.prime_count_label.pack(pady=5)

        # Unique IP list label
        self.ip_label = tk.Label(master, text="Worker IPs:", font=("Arial", 12, "bold"))
        self.ip_label.pack(pady=(10, 0))

        # Frame for IP listbox and scrollbar
        self.ip_frame = tk.Frame(master)
        self.ip_frame.pack(padx=10, pady=5, fill=tk.BOTH, expand=False)

        self.ip_scrollbar = tk.Scrollbar(self.ip_frame, orient=tk.VERTICAL)
        self.ip_listbox = tk.Listbox(self.ip_frame, height=6, yscrollcommand=self.ip_scrollbar.set)
        self.ip_scrollbar.config(command=self.ip_listbox.yview)

        self.ip_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        self.ip_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        # Buttons
        self.button_frame = tk.Frame(master)
        self.button_frame.pack(pady=10)

        self.start_button = tk.Button(self.button_frame, text="Start Server", command=self.start_server_thread)
        self.start_button.grid(row=0, column=0, padx=10)

        self.stop_button = tk.Button(self.button_frame, text="Stop Server", command=self.stop_server, state='disabled')
        self.stop_button.grid(row=0, column=1, padx=10)

        self.quit_button = tk.Button(self.button_frame, text="Quit", command=self.quit_app)
        self.quit_button.grid(row=0, column=2, padx=10)

        # Handle window close
        self.master.protocol("WM_DELETE_WINDOW", self.quit_app)

    def log_message(self, message):
        self.text_area.config(state='normal')
        self.text_area.insert(tk.END, message + '\n')
        self.text_area.config(state='disabled')
        self.text_area.see(tk.END)

    def update_max_prime(self, value):
        try:
            num = int(value)
            self.unique_primes.add(num)
            self.prime_count_label.config(text=f"Number of primes found: {len(self.unique_primes)}")
            if self.max_prime is None or num > self.max_prime:
                self.max_prime = num
                self.prime_label.config(text=f"Max Prime: {self.max_prime}")
        except ValueError:
            pass

    def update_ip_list(self, ip):
        if ip not in self.unique_ips:
            self.unique_ips.add(ip)
            self.ip_listbox.insert(tk.END, ip)

    def receive_data(self, conn, addr):
        self.update_ip_list(addr[0])
        while self.running:
            try:
                length_bytes = conn.recv(4)
                if not length_bytes:
                    break
                length = int.from_bytes(length_bytes, 'big')

                data = b''
                while len(data) < length:
                    packet = conn.recv(length - len(data))
                    if not packet:
                        break
                    data += packet

                message = data.decode()
                self.log_message(f"Received from {addr[0]}: {message}")
                self.update_max_prime(message)
            except (ConnectionResetError, OSError):
                break

    def start_server(self):
        self.running = True
        self.start_button.config(state='disabled')
        self.stop_button.config(state='normal')

        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((HOST, PORT))
        self.server_socket.listen()
        self.log_message(f"Server listening on {HOST}:{PORT}")

        while self.running:
            try:
                self.server_socket.settimeout(1.0)
                conn, addr = self.server_socket.accept()
                #self.log_message(f"Connected by {addr}")
                threading.Thread(target=self.receive_data, args=(conn, addr), daemon=True).start()
            except socket.timeout:
                continue
            except OSError:
                break

    def start_server_thread(self):
        threading.Thread(target=self.start_server, daemon=True).start()

    def save_primes_to_file(self):
        try:
            with open("unique_primes.txt", "w") as f:
                for prime in sorted(self.unique_primes):
                    f.write(f"{prime}\n")
            self.log_message("Found primes saved to unique_primes.txt")
        except Exception as e:
            self.log_message(f"Error saving primes: {e}")

    def stop_server(self):
        self.running = False
        self.start_button.config(state='normal')
        self.stop_button.config(state='disabled')
        if self.server_socket:
            try:
                self.server_socket.close()
            except OSError:
                pass
            self.server_socket = None
        self.log_message("Server stopped.")
        self.save_primes_to_file()

    def quit_app(self):
        self.stop_server()
        self.master.quit()

if __name__ == "__main__":
    root = tk.Tk()
    app = ServerApp(root)
    root.mainloop()

A screen shot of this code running is shown below in figure 8.


Figure 8 : prime head node GUI

I decided to turn this into an open day demo. I had previously bought some 20mm Vtype, or 2020, aluminium extrusion, sooo I made a quick frame to create Mike's Prime Number Finder :), as shown in figure 9. This beast / mini-supercomputer has 12 Raspberry Pis: 11 are worker nodes, 1 is the head node. The worker nodes each run three instances of the prime number finder worker code and the temperature code. This data i.e. primes and temperature, is transmitted to the head node and displayed on the top monitor. Connecting these processors together we have an old HP ProCurve 2524 (J4813A) switch (Link). This gives me a lot more ports than I need and it's only 100Mbps, but its key advantage is its weight, giving the frame a low centre of gravity :). Sooo we have:

Number of processors = 11
Number of processes running 11 x 3 = 33

Figure 9 : Mike's Prime Number Finder (MPNF)

As always there is testing and testing :). I forgot that this code will be running for a long time and that a computer does not have infinite memory :). As you may have spotted, the display code keeps adding values to the set of unique_primes and logging messages to the text box; eventually we run out of memory and the OS cannot create a thread to handle the incoming connections, soooo a new and improved version of the display code was needed. This now writes log messages to disk, and when the text box reaches 10,000 lines it is trimmed back to the most recent 20. The same is true for the set of unique_primes: this is appended to a file every five minutes and then cleared. The final head node code is shown below, or you can download it using this link (Link):

import socket
import threading
import time
import tkinter as tk
from tkinter import scrolledtext

HOST = '192.168.71.1'
PORT = 4040

MAX_LINES_TRIGGER = 10000
KEEP_LINES = 20
SAVE_PERIOD = 60 * 5

class ServerApp:
    def __init__(self, master):
        self.master = master
        self.master.title("Primes Socket Server")
        self.master.geometry("600x600")

        self.running = False
        self.server_socket = None
        self.max_prime = None
        self.unique_ips = set()
        self.unique_primes = set()

        # Text area for logs
        self.text_area = scrolledtext.ScrolledText(master, wrap=tk.WORD, state='disabled', height=10)
        self.text_area.pack(padx=10, pady=10, fill=tk.X)

        # Max prime display
        self.prime_label = tk.Label(master, text="Max Prime: None", font=("Arial", 12, "bold"))
        self.prime_label.pack(pady=5)

        # Unique prime count display
        self.prime_count_label = tk.Label(master, text="Unique Primes: 0", font=("Arial", 12, "bold"))
        self.prime_count_label.pack(pady=5)

        # Unique IP list label
        self.ip_label = tk.Label(master, text="Unique IPs:", font=("Arial", 12, "bold"))
        self.ip_label.pack(pady=(10, 0))

        # Frame for IP listbox and scrollbar
        self.ip_frame = tk.Frame(master)
        self.ip_frame.pack(padx=10, pady=5, fill=tk.BOTH, expand=False)

        self.ip_scrollbar = tk.Scrollbar(self.ip_frame, orient=tk.VERTICAL)
        self.ip_listbox = tk.Listbox(self.ip_frame, height=12, yscrollcommand=self.ip_scrollbar.set)
        self.ip_scrollbar.config(command=self.ip_listbox.yview)

        self.ip_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        self.ip_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        # Buttons
        self.button_frame = tk.Frame(master)
        self.button_frame.pack(pady=10)

        self.start_button = tk.Button(self.button_frame, text="Start Server", command=self.start_server_thread)
        self.start_button.grid(row=0, column=0, padx=10)

        self.stop_button = tk.Button(self.button_frame, text="Stop Server", command=self.stop_server, state='disabled')
        self.stop_button.grid(row=0, column=1, padx=10)

        self.quit_button = tk.Button(self.button_frame, text="Quit", command=self.quit_app)
        self.quit_button.grid(row=0, column=2, padx=10)

        # File access
        self.file_lock = threading.Lock()

        # Handle window close
        self.master.protocol("WM_DELETE_WINDOW", self.quit_app)

    def log_to_file(self, message):
        with open("server_log.txt", "a") as log_file:
            log_file.write(message + '\n')

    def log_message(self, message):
        self.text_area.config(state='normal')
        self.text_area.insert(tk.END, message + '\n')

        lines = self.text_area.get("1.0", tk.END).splitlines()
        if len(lines) > MAX_LINES_TRIGGER:
            self.text_area.delete("1.0", tk.END)
            self.text_area.insert(tk.END, "\n".join(lines[-KEEP_LINES:]) + '\n')

        self.text_area.config(state='disabled')
        self.text_area.see(tk.END)
        self.log_to_file(message)  

    def update_max_prime(self, value):
        try:
            num = int(value)
            self.unique_primes.add(num)
            self.prime_count_label.config(text=f"Unique Primes: {len(self.unique_primes)}")
            if self.max_prime is None or num > self.max_prime:
                self.max_prime = num
                self.prime_label.config(text=f"Max Prime: {self.max_prime}")
        except ValueError:
            pass

    def update_ip_list(self, ip):
        if ip not in self.unique_ips:
            self.unique_ips.add(ip)
            self.ip_listbox.insert(tk.END, ip)

    def receive_data(self, conn, addr):
        self.update_ip_list(addr[0])
        try:
            while self.running:
                length_bytes = conn.recv(4)
                if not length_bytes:
                    break
                length = int.from_bytes(length_bytes, 'big')

                data = b''
                while len(data) < length:
                    packet = conn.recv(length - len(data))
                    if not packet:
                        break
                    data += packet

                message = data.decode()
                self.log_message(f"Received from {addr[0]}: {message}")
                self.update_max_prime(message)
        except (ConnectionResetError, OSError):
            pass
        finally:
            # Close once, when the worker disconnects or the server stops
            conn.close()

    def start_server_thread(self):
        threading.Thread(target=self.start_server, daemon=True).start()
        threading.Thread(target=self.periodic_save_primes, daemon=True).start()

    def start_server(self):
        self.running = True
        self.start_button.config(state='disabled')
        self.stop_button.config(state='normal')

        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((HOST, PORT))
        self.server_socket.listen(50)
        self.log_message(f"Server listening on {HOST}:{PORT}")

        while self.running:
            try:
                self.server_socket.settimeout(1.0)
                conn, addr = self.server_socket.accept()
                print(f"Connection from: {addr[0]}, port: {addr[1]}")
                threading.Thread(target=self.receive_data, args=(conn, addr), daemon=True).start()
            except socket.timeout:
                continue
            except OSError:
                break

    def save_primes_to_file(self):
        try:
            with self.file_lock:
                with open("unique_primes.txt", "a") as f:
                    for prime in sorted(self.unique_primes):
                        f.write(f"{prime}\n")
            self.log_message("Unique primes saved to unique_primes.txt")
        except Exception as e:
            self.log_message(f"Error saving primes: {e}")
        finally:
            self.unique_primes.clear()

    def periodic_save_primes(self):
        while self.running:
            time.sleep(SAVE_PERIOD)  
            try:
                with self.file_lock:
                    with open("unique_primes.txt", "a") as f:
                        for prime in sorted(self.unique_primes):
                            f.write(f"{prime}\n")
                self.unique_primes.clear()  # Clear after saving
            except Exception as e:
                self.log_to_file(f"Error saving primes: {e}")

    def stop_server(self):
        self.running = False
        self.start_button.config(state='normal')
        self.stop_button.config(state='disabled')
        if self.server_socket:
            try:
                self.server_socket.close()
            except OSError:
                pass
            self.server_socket = None
        self.log_message("Server stopped.")
        self.save_primes_to_file()

    def quit_app(self):
        self.stop_server()
        self.master.quit()

if __name__ == "__main__":
    root = tk.Tk()
    app = ServerApp(root)
    root.mainloop()
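
The flush-to-disk idea used in the listing above can be isolated into a small class. This is a minimal sketch under stated assumptions: the class name PrimeBuffer, the max_items threshold and the temporary file path are all hypothetical, and it flushes on size rather than on a five minute timer. It also keeps a running total, whereas in the listing above the count shown in the GUI is taken from the length of the set and so restarts after each save.

```python
import os
import tempfile

class PrimeBuffer:
    """Hold primes in memory and append them to a file once a limit is reached."""

    def __init__(self, path, max_items=1000):
        self.path = path
        self.max_items = max_items
        self.items = set()
        self.total = 0  # running count that survives flushes

    def add(self, prime):
        if prime not in self.items:
            self.items.add(prime)
            self.total += 1
        if len(self.items) >= self.max_items:
            self.flush()

    def flush(self):
        """Append the buffered primes to the file in sorted order, then clear."""
        if not self.items:
            return
        with open(self.path, "a") as f:
            for prime in sorted(self.items):
                f.write(f"{prime}\n")
        self.items.clear()

# Demonstration with a tiny limit so that a flush happens part way through.
path = os.path.join(tempfile.mkdtemp(), "primes.txt")
buf = PrimeBuffer(path, max_items=3)
for p in (2, 3, 5, 7, 11):
    buf.add(p)
buf.flush()
print(buf.total, len(buf.items))
```

This sketch is not thread safe; if several receive threads call add at the same time, the calls would need to be wrapped in a lock, in the same way the listing above uses file_lock around file writes.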

Game of Life

The classic distributed Game of Life (Link). This was going to be my first parallel processing demo, however, the code for the cases and making it auto scale to the number of hosts in the cluster was a little toooo much complexity for my brain, hence the prime number demo above. However,

WORK IN PROGRESS

Creative Commons Licence

This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License.

Contact email: mike@simplecpudesign.com
