Threads being implemented in the background?? #122

Open
dluckey opened this issue Oct 12, 2022 · 0 comments
Labels
question Further information is requested


dluckey commented Oct 12, 2022

Hi Everyone,

I have a simple object to encapsulate some details and a coroutine that gets added to the event loop for testing as follows:

MQTTStream

import logging
from amqtt.client import MQTTClient, ClientException
from amqtt.mqtt.constants import QOS_0
from amqtt.session import IncomingApplicationMessage as IncomingMessage

class MQTTStream(object):
    address_prefix = "mqtt://"

    config = {
        "keep_alive": 60,
        "reconnect_max_interval": 5,
        "reconnect_retries": 5,
        "ping_delay": 2,
    }

    def __init__(self, conn_details):
        self.logger = logging.getLogger(__name__)
        self.dev_uid = conn_details.get("dev-uid", "")
        self.name = conn_details.get("name", "")
        self.type = conn_details.get("type", "")

        self.username = conn_details.get("username", "")
        self.password = conn_details.get("password", "")
        self.port = conn_details.get("port", 1883)

        self.host_address = conn_details.get("host_address", "")

        self.topic = conn_details.get("topic", "")

        # Build the broker URI as mqtt://[user[:password]@]host:port
        if self.username and self.password:
            self.client_address = MQTTStream.address_prefix + self.username + ":" + self.password + "@" + self.host_address + ":" + str(self.port)
        elif self.username:
            self.client_address = MQTTStream.address_prefix + self.username + "@" + self.host_address + ":" + str(self.port)
        else:
            self.client_address = MQTTStream.address_prefix + self.host_address + ":" + str(self.port)

        self.client = None

    async def mqtt_function(self):
        self.client = MQTTClient(client_id=self.name.upper(), config=MQTTStream.config)
        try:
            await self.client.connect(self.client_address)
            self.logger.info("Connected")
        except Exception as e:
            # Without a connection the subscribe call below cannot succeed, so stop here.
            self.logger.error("Error: " + str(e))
            return

        await self.client.subscribe([(self.topic, QOS_0)])
        self.logger.info("Subscribed")

        while True:
            self.logger.info("Listening %s" % self.name)
            message: IncomingMessage = await self.client.deliver_message()
            self.logger.info("%s, %s" % (self.name, message))

However, when I create five instances with dummy details using the following:

Runner

import asyncio
from classes import MQTTStream

ttn_conn = {...}    #  Connection details read by MQTTStream.__init__ above
cat_conn = {...}
cat0_conn = {...}
cat1_conn = {...}
cat2_conn = {...}

ttn = MQTTStream(ttn_conn)
cat = MQTTStream(cat_conn)
cat0 = MQTTStream(cat0_conn)
cat1 = MQTTStream(cat1_conn)
cat2 = MQTTStream(cat2_conn)

loop = asyncio.new_event_loop()
tasks = {}

tasks[ttn.dev_uid] = loop.create_task(ttn.mqtt_function())
tasks[cat.dev_uid] = loop.create_task(cat.mqtt_function())
tasks[cat0.dev_uid] = loop.create_task(cat0.mqtt_function())
tasks[cat1.dev_uid] = loop.create_task(cat1.mqtt_function())
tasks[cat2.dev_uid] = loop.create_task(cat2.mqtt_function())
loop.run_forever()

htop reports the main Python process plus 5 user threads.
[Image: Capture]

Everything else seems to work correctly, and I receive messages as expected for each client/connection. I am just perplexed as to why the user threads are there.
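
One way to narrow this down, offered as an assumption rather than a confirmed diagnosis for amqtt: asyncio itself runs blocking hostname lookups (`loop.getaddrinfo`) in the event loop's default thread pool executor, so several clients connecting at about the same time can each leave a worker thread behind. The sketch below does not use amqtt at all. The helper name `resolve_concurrently` and the host/port values are made up for illustration. It only shows that concurrent lookups create extra threads that `threading.enumerate()` can list.

```python
import asyncio
import threading


async def resolve_concurrently(host: str, port: int, count: int) -> list:
    """Resolve `host` `count` times at once, then list the non-main threads."""
    loop = asyncio.get_running_loop()
    # loop.getaddrinfo() hands the blocking socket.getaddrinfo() call
    # to the loop's default executor, which is a thread pool.
    await asyncio.gather(*(loop.getaddrinfo(host, port) for _ in range(count)))
    main = threading.main_thread()
    # Collect the names here, while the executor's worker threads still exist.
    return [t.name for t in threading.enumerate() if t is not main]


if __name__ == "__main__":
    before = threading.active_count()
    names = asyncio.run(resolve_concurrently("localhost", 1883, 5))
    print("threads before:", before)
    print("worker threads after resolving:", names)
```

Printing `threading.enumerate()` from inside the real runner, after the clients have connected, would show whether the threads reported by htop carry executor-style names. If they do, they are idle pool workers rather than threads running the MQTT clients.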

I redacted the above code slightly, but not in any way that should alter its functionality.

Does anyone know whether this is an issue, or am I (hopefully) missing something?

I am using:
Python 3.8.14
Ubuntu 18.04
PyCharm

not-f-elsner added the "question" (Further information is requested) label on Feb 9, 2023