Skip to content

Commit

Permalink
remove the use of multiprocess or threading when creating process in …
Browse files Browse the repository at this point in the history
…launcher
  • Loading branch information
duguyue100 committed Jan 5, 2021
1 parent 83e16af commit 63df4b4
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 26 deletions.
28 changes: 7 additions & 21 deletions pyaer/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import signal
from collections import OrderedDict
from datetime import datetime
# from threading import Thread, Event
from multiprocessing import Process, Event
import numpy as np
import zmq
import h5py
Expand Down Expand Up @@ -114,7 +112,6 @@ def __init__(self, url="tcp://127.0.0.1",
self.hub_pub_port = hub_pub_port
self.hub_sub_port = hub_pub_port
self.hub_pub_url = url+":{}".format(hub_pub_port)
print(self.hub_pub_url)
self.hub_sub_url = url+":{}".format(hub_sub_port)

# logger
Expand Down Expand Up @@ -714,8 +711,8 @@ def close(self):
self.aer_file.close()


class AERProcess(Process):
def __init__(self, cmd, daemon=True):
class AERProcess(object):
def __init__(self, cmd):
"""AER Process.
# Arguments
Expand All @@ -724,32 +721,21 @@ def __init__(self, cmd, daemon=True):
"""
super().__init__()

self.event = Event()
self.daemon = daemon

self.cmd = cmd
self.program_name = cmd[0]

def create_process(self):
pid = subprocess.Popen(self.cmd)
time.sleep(3)
time.sleep(0.5)
assert pid.poll() is None, 'Process {} launch failed'.format(
self.program_name)

return pid

def run(self):
pid = self.create_process()

while not self.event.is_set():
time.sleep(1)
assert pid.poll() is None, "Process {} was killed".format(
self.program_name)

pid.send_signal(signal.SIGINT)
pid.terminate()
pid.communicate()
self.pid = self.create_process()

def stop(self):
self.event.set()
self.join()
self.pid.send_signal(signal.SIGINT)
self.pid.terminate()
self.pid.communicate()
3 changes: 3 additions & 0 deletions scripts/aer_comm/aer_hub
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ parser.add_argument("--aer_hub_name", type=str,
args = parser.parse_args()

# print all options
print("="*50)
print(json.dumps(args.__dict__, indent=4, sort_keys=True))
print("="*50)


aer_hub = AERHub(url=args.url,
Expand All @@ -50,6 +52,7 @@ aer_hub.logger.info("aer_hub: launch a central message relay hub")
aer_hub.logger.info("aer_lstopic: display all published topics")
aer_hub.logger.info("aer_publisher: add a custom publisher")
aer_hub.logger.info("aer_subscriber: add a custom subscriber")
aer_hub.logger.info("aer_saver: add an AER Saver")
aer_hub.logger.info("="*50)

# run the hub
Expand Down
10 changes: 5 additions & 5 deletions scripts/aer_comm/aer_launch
Original file line number Diff line number Diff line change
Expand Up @@ -182,26 +182,26 @@ for pg_i, (pg_type, pg_desc) in enumerate(launch_desc.items()):
if pg_type == HUB:
parsed_hub_cmd = parse_hub(pg_desc)
process_collector.append(
AERProcess(parsed_hub_cmd, daemon=True))
AERProcess(parsed_hub_cmd))
elif PUB in pg_type:
parsed_pub_cmd = parse_publisher(pg_desc)
process_collector.append(
AERProcess(parsed_pub_cmd, daemon=True))
AERProcess(parsed_pub_cmd))
elif SUB in pg_type:
parsed_sub_cmd = parse_subscriber(pg_desc)
process_collector.append(
AERProcess(parsed_sub_cmd, daemon=True))
AERProcess(parsed_sub_cmd))
elif SAVER in pg_type:
parsed_saver_cmd = parse_saver(pg_desc)
process_collector.append(
AERProcess(parsed_saver_cmd, daemon=True))
AERProcess(parsed_saver_cmd))
else:
launch_logger.error("Unsupported Type {}".format(pg_type))


# launching
for process in process_collector:
process.start()
process.run()

while True:
try:
Expand Down
2 changes: 2 additions & 0 deletions scripts/aer_comm/aer_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ args, custom_args = parser.parse_known_args()
custom_args_dict = parse_custom_args(custom_args)

# print all options
print("="*50)
print(json.dumps(
{**args.__dict__, **custom_args_dict},
indent=4, sort_keys=True))
print("="*50)

# open the device
if args.device == "None":
Expand Down
4 changes: 4 additions & 0 deletions scripts/aer_comm/aer_saver
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ parser.add_argument("--libver", type=str,
args = parser.parse_args()

# print all options
print("="*50)
print(json.dumps(args.__dict__, indent=4, sort_keys=True))
print("="*50)


if args.hdf5:
Expand All @@ -114,5 +116,7 @@ saver_sub = AERSaverSubscriber(
# set saver
saver_sub.set_saver(saver)

saver_sub.logger.info("AER Saver initialized.")

# Start sending data
saver_sub.run()
2 changes: 2 additions & 0 deletions scripts/aer_comm/aer_subscriber
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ args, custom_args = parser.parse_known_args()
custom_args_dict = parse_custom_args(custom_args)

# print all options
print("="*50)
print(json.dumps(
{**args.__dict__, **custom_args_dict},
indent=4, sort_keys=True))
print("="*50)

# define subscriber
if args.use_default_sub:
Expand Down

0 comments on commit 63df4b4

Please sign in to comment.