0

I am using Python 3.12.7. I have multiple servers that run themselves multi-threaded in a networked environment. I want to increase the variable self.msg_id in the handle_message function. Everything else runs fine. However, when I execute the line self.msg_id += 1 I get the error:

[ERROR] Attribute msg_id already defined. Plugin conflict?

Reading the value from the variable however works just fine. Note that this is a task at uni, therefore there are some strange comments insight the code.

The init of the Server class

class Server(Bottle):

    def __init__(self, ID, IP, server_list):
        super(Server, self).__init__()
        self.id = int(ID)
        self.ip = str(IP)
        self.server_list = server_list
        self.msg_id = 1

        self.status = {
            "crashed": False,
            "notes": "",
            "num_entries": 0,  # we use this to generate ids for the entries
        }

        self.lock = threading.RLock()  # use reentry lock for the server
        self.msg_lock = threading.RLock()

        # Handle CORS
        self.route("/<:re:.*>", method="OPTIONS", callback=self.add_cors_headers)
        self.add_hook("after_request", self.add_cors_headers)

        # Those two http calls simulate crashes, i.e., unavailability of the server
        self.post("/crash", callback=self.crash_request)
        self.post("/recover", callback=self.recover_request)
        self.get("/status", callback=self.status_request)

        # Define REST URIs for the frontend (note that we define multiple update and delete routes right now)
        self.post("/entries", callback=self.create_entry_request)
        self.get("/entries", callback=self.list_entries_request)
        self.put("/entries/<entry_id>", callback=self.update_entry_request)
        self.delete("/entries/<entry_id>", callback=self.delete_entry_request)

        # REST URIs for the later algorithms
        self.post("/message", callback=self.message_request)

        self.board = Board()

The relevant function

    def handle_message(self, message):
        # Note that you might need to use the lock
        print("Received message: ", message)

        if "type" in message:
            type = message["type"]

            if type == "add_entry":

                assert (
                    self.id == 0
                )  # ID 0 is coordinator only this server should receive add_entry messages right now
                with self.msg_lock:
                    entry_value = message["entry_value"]

                    # We can safely propagate here since we always have a single frontend client, right? So no need to lock, right? Right?!
                    for other in self.server_list:
                        # TODO: Send message to other servers concurrently?
                        self.send_message(
                            other,
                            {
                                "type": "propagate",
                                "entry_value": entry_value,
                                "origin_id": self.id,
                                "message_id": self.msg_id,
                            },
                        )
                    self.msg_id += 1

            elif type == "propagate":
                entry_value = message["entry_value"]
                msg_id = message["message_id"]
                # Let's hope this is from the coordinator
                with self.lock:
                    if msg_id in self.board.indexed_entries:
                        return {}
                    self.status["num_entries"] += 1
                    entry = Entry(msg_id, entry_value)
                    self.board.add_entry(entry)
            else:
                print("Received weird message?")

        return {}

Main code execution

# Sleep a bit to allow logging to be attached
time.sleep(2)

# the server_list contains all server ips of the distributed blackboard
server_list = os.getenv("SERVER_LIST").split(",")
own_id = int(os.getenv("SERVER_ID"))
own_ip = server_list[own_id]

server = Server(own_id, own_ip, server_list)

NUM_THREADS = 10
print("#### Starting Server {} with {} threads".format(str(own_id), NUM_THREADS))
httpserver.serve(
    server,
    host="0.0.0.0",
    port=80,
    threadpool_workers=NUM_THREADS,
    threadpool_options={"spawn_if_under": NUM_THREADS},
)

1 Answer 1

1

This seems to be an Issue of the Bottle library.

When I was testing locally, I was able to keep track of such a variable in the server status dict that you were showing.

class Server(Bottle):

def __init__(self, ID, IP, server_list):
    super(Server, self).__init__()
    self.id = int(ID)
    self.ip = str(IP)
    self.server_list = server_list


    self.status = {
        "crashed": False,
        "notes": "",
        "num_entries": 0,  # we use this to generate ids for the entries
        "msg_id": 1, # To keep track of the order of messages and filter out duplicates
    }

    self.lock = threading.RLock()  # use reentry lock for the server
    self.msg_lock = threading.RLock()

With this I was able to modify the self.status["msg_id"] as you intended.

Sign up to request clarification or add additional context in comments.

1 Comment

Thanks! I can reproduce this locally. Is there an explanation for this behaviour?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.