Skip to content

Commit

Permalink
Merge pull request #42 from pedohorse/houdini-distributed-sim-refactor
Browse files Browse the repository at this point in the history
Houdini distributed sim refactor
  • Loading branch information
pedohorse authored Feb 13, 2024
2 parents d01e0d3 + 6208c4b commit 7923020
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 106 deletions.
Binary file not shown.
10 changes: 10 additions & 0 deletions src/lifeblood/stock_nodes/houdini_distributed_sim/data/killer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import sys
import lifeblood_connection


def main(invocation_iid: int, addressee: str, timeout: float = 90):
lifeblood_connection.message_to_invocation_send(invocation_iid, addressee, b'stop', timeout)


if __name__ == '__main__':
main(int(sys.argv[1]), sys.argv[2], float(sys.argv[3]))
121 changes: 121 additions & 0 deletions src/lifeblood/stock_nodes/houdini_distributed_sim/data/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import sys
import os
from pathlib import Path
import re
import asyncio
import lifeblood_connection
import json
import shutil
import tempfile

class ControlSignalProcessor:
def __init__(self, my_addressee: str):
self.__my_addressee = my_addressee
self.__stop_event = asyncio.Event()
self.__main_task = None

def stop(self):
self.__stop_event.set()

async def wait_till_stops(self):
if self.__main_task is None:
raise RuntimeError('not started')
await self.__main_task

def start(self):
self.__main_task = asyncio.create_task(self.__control_signal_processor())

async def __control_signal_processor(self):
control_waiter = asyncio.get_event_loop().run_in_executor(None, lifeblood_connection._message_to_invocation_receive_blocking, self.__my_addressee)
stop_waiter = asyncio.create_task(self.__stop_event.wait())

while True:
done, pend = await asyncio.wait([control_waiter, stop_waiter], return_when=asyncio.FIRST_COMPLETED)
if control_waiter in done:
src, message = await control_waiter
print(f'got control message: "{message.decode("latin1")}" from iid:{src}')
if message == b'stop':
break
control_waiter = asyncio.get_event_loop().run_in_executor(None, lifeblood_connection._message_to_invocation_receive_blocking, self.__my_addressee)
if stop_waiter in done:
break

if not control_waiter.done():
control_waiter.cancel()
if not stop_waiter.done():
stop_waiter.cancel()


async def main(port: int, webport: int, my_addressee: str, attr_file_path: str):
# need to find houdini's python.
# we cannot rely on hython being actual hython and not a wrapper
# first we check obvious place, then actually call hython
simtracker_path = None
hfs_hou = Path(shutil.which('hython')).parent.parent / 'houdini'
if hfs_hou.exists():
for elem in hfs_hou.iterdir():
if re.match(r'^python\d\.\d+libs$', elem.name) and (maybepath := elem / 'simtracker.py').exists():
simtracker_path = str(maybepath)
break

if simtracker_path is None:
fd, tpath = tempfile.mkstemp('.txt')
tproc = await asyncio.create_subprocess_exec('hython', '-c', f'import simtracker;f=open({repr(tpath)},"w");f.write(simtracker.__file__);f.close()')
tproc_exit_code = await tproc.wait()
with open(tpath, 'r') as f:
simtracker_path = f.read()
os.close(fd)
os.unlink(tpath)
if tproc_exit_code != 0:
print('FAILED to find simtracker')
return tproc_exit_code

signal_processor = ControlSignalProcessor(my_addressee)
signal_processor.start()
print('signal handler started')

ip = lifeblood_connection.get_host_ip()
# find free ports
port = lifeblood_connection.get_free_tcp_port(ip, port)
webport = lifeblood_connection.get_free_tcp_port(ip, webport)
if webport == port:
webport = lifeblood_connection.get_free_tcp_port(ip, webport + 1)
print(f'found free ports: {port}, {webport}')

# at this point we have free ports, but by the time we start our servers - ports might get taken
# TODO: maybe add option to add '-v' flag for debugging
proc = await asyncio.create_subprocess_exec('python', simtracker_path, str(port), str(webport))
print('simtracker started')

with open(attr_file_path, 'r') as f:
attrs = json.load(f)
attrs['simtracker_host'] = ip
attrs['simtracker_port'] = port
attrs['tracker_control_iid'] = lifeblood_connection.get_my_invocation_id()
attrs['tracker_control_addressee'] = my_addressee
lifeblood_connection.create_task('spawned task', attrs)

proc_end_waiter = asyncio.create_task(proc.wait())
signal_processor_waiter = asyncio.create_task(signal_processor.wait_till_stops())

done, pend = await asyncio.wait([proc_end_waiter, signal_processor_waiter], return_when=asyncio.FIRST_COMPLETED)
print(f'done, exiting!')
for task in pend:
task.cancel()

proc_error = proc.returncode # if simtracker process had error - return it, otherwise we terminate it and don't care about it's return code
if proc_error is None:
print(f'terminating process {proc.pid}')
proc.terminate()
await proc.wait()
print('wait done')
print('closed simtracker')
signal_processor.stop()
await signal_processor.wait_till_stops()
print('closed server')

return proc_error or 0 # read above - None counts as good as we terminate it ourselves


_port, _webport = [int(x) for x in sys.argv[1:3]]
sys.exit(asyncio.get_event_loop().run_until_complete(main(_port, _webport, sys.argv[3], sys.argv[4])))
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import re
import inspect

import json
import uuid
from lifeblood.basenode import BaseNode
from lifeblood.nodethings import ProcessingResult, InvocationJob
from lifeblood.invocationjob import InvocationRequirements
from lifeblood.enums import NodeParameterType, WorkerType
from lifeblood.text import filter_by_pattern


from typing import Iterable

Expand All @@ -14,71 +13,6 @@ def node_class():
return HoudiniDistributedTracker


async def do(port, webport, killport):
import asyncio
import os
import signal
import socket
import lifeblood_connection
import json

close_event = asyncio.Event()

async def conn_cb(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
await reader.readexactly(1)
print('got termination request')
writer.close()
close_event.set()

ip = lifeblood_connection.get_host_ip()
# find free ports
port = lifeblood_connection.get_free_tcp_port(ip, port)
webport = lifeblood_connection.get_free_tcp_port(ip, webport)
if webport == port:
webport = lifeblood_connection.get_free_tcp_port(ip, webport + 1)
killport = lifeblood_connection.get_free_tcp_port(ip, killport)
if killport == port:
killport = lifeblood_connection.get_free_tcp_port(ip, killport + 1)
if killport == webport: # NOTE: port < webport, always, so we need 2 independent checks. worst case both conditions may be true
killport = lifeblood_connection.get_free_tcp_port(ip, killport + 1)

# at this point we have free ports, but by the time we start our servers - ports might get taken

server = await asyncio.start_server(conn_cb, port=killport, family=socket.AF_INET)
proc = await asyncio.create_subprocess_exec('hython', '-m', 'simtracker', '-v', str(port), str(webport))

async def server_close_waiter(server: asyncio.AbstractServer):
await close_event.wait()
server.close()
await server.wait_closed()

async def proc_end_waiter(proc: asyncio.subprocess.Process):
return await proc.wait()

await server.start_serving()

attrs = json.loads(os.environ['LBATTRS_JSON'])
attrs['simtracker_host'] = ip
attrs['simtracker_port'] = port
attrs['tracker kill port'] = killport
lifeblood_connection.create_task('spawned task', attrs)
await asyncio.wait([proc_end_waiter(proc), server_close_waiter(server)], return_when=asyncio.FIRST_COMPLETED)
print(f'done, exiting!')

proc_error = proc.returncode # if simtracker process had error - return it, otherwise we terminate it and don't care about it's return code
if proc_error is None:
proc.terminate()
await proc.wait()
print('wait done')
print('closed simtracker')
if server.is_serving():
server.close()
await server.wait_closed()
print('closed server')

return proc_error or 0 # read above - None counts as good as we terminate it ourselves


class HoudiniDistributedTracker(BaseNode):
@classmethod
def label(cls) -> str:
Expand Down Expand Up @@ -110,18 +44,14 @@ def __init__(self, name: str):
ui.add_output('spawned')
ui.add_parameter('port', 'sim port', NodeParameterType.INT, 19375)
ui.add_parameter('wport', 'web port', NodeParameterType.INT, 19376)
ui.add_parameter('kport', 'kill port', NodeParameterType.INT, 19377)
ui.add_parameter('use helper', 'run on scheduler helper', NodeParameterType.BOOL, True)

def process_task(self, context) -> ProcessingResult:
code = 'import sys\n' \
'import asyncio\n\n'
code += inspect.getsource(do)
code += '\n' \
'port, webport, killport = [int(x) for x in sys.argv[1:4]]\n' \
'sys.exit(asyncio.get_event_loop().run_until_complete(do(port, webport, killport)))\n'
invoc = InvocationJob(['python', ':/work_to_do.py', context.param_value('port'), context.param_value('wport'), context.param_value('kport')])
code = (self.my_plugin().package_data() / 'server.py').read_text()
addressee_name = str(uuid.uuid4())
invoc = InvocationJob(['python', ':/work_to_do.py', context.param_value('port'), context.param_value('wport'), addressee_name, ':/task_base_attrs.json'])
invoc.set_extra_file('work_to_do.py', code)
invoc.set_extra_file('task_base_attrs.json', json.dumps(dict(context.task_attributes())))

if context.param_value('use helper'):
invoc.set_requirements(InvocationRequirements(worker_type=WorkerType.SCHEDULER_HELPER))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import socket
import sys

from lifeblood.basenode import BaseNode
from lifeblood.nodethings import ProcessingResult, InvocationJob
from lifeblood.invocationjob import InvocationRequirements
from lifeblood.enums import NodeParameterType, WorkerType
from lifeblood.taskspawn import TaskSpawn

from typing import Iterable

Expand All @@ -31,18 +30,20 @@ def __init__(self, name: str):
ui = self.get_ui()
with ui.initializing_interface_lock():
ui.color_scheme().set_main_color(0.5, 0.25, 0.125)
ui.add_parameter('host', 'sim tracker host', NodeParameterType.STRING, "`task['SIMTRACKER_HOST']`")
ui.add_parameter('kport', 'kill port', NodeParameterType.INT, 19377) # TODO: make default to be expression returning attrib kill port
#ui.add_parameter('use helper', 'run on scheduler helper', NodeParameterType.BOOL, True)
ui.add_parameter('target_iid', 'simtracker invocation id', NodeParameterType.INT, 0).set_expression("task['tracker_control_iid']")
ui.add_parameter('target_addressee', 'simtracker identifier', NodeParameterType.STRING, "`task['tracker_control_addressee']`")
ui.add_parameter('message_timeout', 'timeout', NodeParameterType.FLOAT, 90)

def process_task(self, context) -> ProcessingResult:
addr = (context.param_value('host'), context.param_value('kport'))
s = socket.socket()
s.connect(addr)
try:
s.sendall(b'\0')
finally:
s.close()

res = ProcessingResult()
invoc = InvocationJob([
sys.executable, # we can use sys.executable only because we run killer only on SCHEDULER_HELPERs
':/work_to_do.py',
str(context.param_value('target_iid')),
str(context.param_value('target_addressee')),
str(context.param_value('message_timeout')),
])
code = (self.my_plugin().package_data() / 'killer.py').read_text()
invoc.set_extra_file('work_to_do.py', code)
invoc.set_requirements(InvocationRequirements(worker_type=WorkerType.SCHEDULER_HELPER))
res = ProcessingResult(invoc)
return res
Loading

0 comments on commit 7923020

Please sign in to comment.