diff --git a/dcc_plugins_workspace/houdini/presets/Driver/lifeblood-lifeblood_submitter-1.0.0.idx b/dcc_plugins_workspace/houdini/presets/Driver/lifeblood-lifeblood_submitter-1.0.0.idx index ffd54ec5..20278cb8 100644 Binary files a/dcc_plugins_workspace/houdini/presets/Driver/lifeblood-lifeblood_submitter-1.0.0.idx and b/dcc_plugins_workspace/houdini/presets/Driver/lifeblood-lifeblood_submitter-1.0.0.idx differ diff --git a/src/lifeblood/stock_nodes/houdini_distributed_sim/data/killer.py b/src/lifeblood/stock_nodes/houdini_distributed_sim/data/killer.py new file mode 100644 index 00000000..f443a4da --- /dev/null +++ b/src/lifeblood/stock_nodes/houdini_distributed_sim/data/killer.py @@ -0,0 +1,10 @@ +import sys +import lifeblood_connection + + +def main(invocation_iid: int, addressee: str, timeout: float = 90): + lifeblood_connection.message_to_invocation_send(invocation_iid, addressee, b'stop', timeout) + + +if __name__ == '__main__': + main(int(sys.argv[1]), sys.argv[2], float(sys.argv[3])) diff --git a/src/lifeblood/stock_nodes/houdini_distributed_sim/data/server.py b/src/lifeblood/stock_nodes/houdini_distributed_sim/data/server.py new file mode 100644 index 00000000..50c7b389 --- /dev/null +++ b/src/lifeblood/stock_nodes/houdini_distributed_sim/data/server.py @@ -0,0 +1,121 @@ +import sys +import os +from pathlib import Path +import re +import asyncio +import lifeblood_connection +import json +import shutil +import tempfile + +class ControlSignalProcessor: + def __init__(self, my_addressee: str): + self.__my_addressee = my_addressee + self.__stop_event = asyncio.Event() + self.__main_task = None + + def stop(self): + self.__stop_event.set() + + async def wait_till_stops(self): + if self.__main_task is None: + raise RuntimeError('not started') + await self.__main_task + + def start(self): + self.__main_task = asyncio.create_task(self.__control_signal_processor()) + + async def __control_signal_processor(self): + control_waiter = asyncio.get_event_loop().run_in_executor(None, lifeblood_connection._message_to_invocation_receive_blocking, self.__my_addressee) + stop_waiter = asyncio.create_task(self.__stop_event.wait()) + + while True: + done, pend = await asyncio.wait([control_waiter, stop_waiter], return_when=asyncio.FIRST_COMPLETED) + if control_waiter in done: + src, message = await control_waiter + print(f'got control message: "{message.decode("latin1")}" from iid:{src}') + if message == b'stop': + break + control_waiter = asyncio.get_event_loop().run_in_executor(None, lifeblood_connection._message_to_invocation_receive_blocking, self.__my_addressee) + if stop_waiter in done: + break + + if not control_waiter.done(): + control_waiter.cancel() + if not stop_waiter.done(): + stop_waiter.cancel() + + +async def main(port: int, webport: int, my_addressee: str, attr_file_path: str): + # need to find houdini's python. + # we cannot rely on hython being actual hython and not a wrapper + # first we check obvious place, then actually call hython + simtracker_path = None + hfs_hou = Path(shutil.which('hython')).parent.parent / 'houdini' + if hfs_hou.exists(): + for elem in hfs_hou.iterdir(): + if re.match(r'^python\d\.\d+libs$', elem.name) and (maybepath := elem / 'simtracker.py').exists(): + simtracker_path = str(maybepath) + break + + if simtracker_path is None: + fd, tpath = tempfile.mkstemp('.txt') + tproc = await asyncio.create_subprocess_exec('hython', '-c', f'import simtracker;f=open({repr(tpath)},"w");f.write(simtracker.__file__);f.close()') + tproc_exit_code = await tproc.wait() + with open(tpath, 'r') as f: + simtracker_path = f.read() + os.close(fd) + os.unlink(tpath) + if tproc_exit_code != 0: + print('FAILED to find simtracker') + return tproc_exit_code + + signal_processor = ControlSignalProcessor(my_addressee) + signal_processor.start() + print('signal handler started') + + ip = lifeblood_connection.get_host_ip() + # find free ports + port = lifeblood_connection.get_free_tcp_port(ip, port) + webport = lifeblood_connection.get_free_tcp_port(ip, webport) + if webport == port: + webport = lifeblood_connection.get_free_tcp_port(ip, webport + 1) + print(f'found free ports: {port}, {webport}') + + # at this point we have free ports, but by the time we start our servers - ports might get taken + # TODO: maybe add option to add '-v' flag for debugging + proc = await asyncio.create_subprocess_exec('python', simtracker_path, str(port), str(webport)) + print('simtracker started') + + with open(attr_file_path, 'r') as f: + attrs = json.load(f) + attrs['simtracker_host'] = ip + attrs['simtracker_port'] = port + attrs['tracker_control_iid'] = lifeblood_connection.get_my_invocation_id() + attrs['tracker_control_addressee'] = my_addressee + lifeblood_connection.create_task('spawned task', attrs) + + proc_end_waiter = asyncio.create_task(proc.wait()) + signal_processor_waiter = asyncio.create_task(signal_processor.wait_till_stops()) + + done, pend = await asyncio.wait([proc_end_waiter, signal_processor_waiter], return_when=asyncio.FIRST_COMPLETED) + print(f'done, exiting!') + for task in pend: + task.cancel() + + proc_error = proc.returncode # if simtracker process had error - return it, otherwise we terminate it and don't care about it's return code + if proc_error is None: + print(f'terminating process {proc.pid}') + proc.terminate() + await proc.wait() + print('wait done') + print('closed simtracker') + signal_processor.stop() + await signal_processor.wait_till_stops() + print('closed server') + + return proc_error or 0 # read above - None counts as good as we terminate it ourselves + + +_port, _webport = [int(x) for x in sys.argv[1:3]] +sys.exit(asyncio.get_event_loop().run_until_complete(main(_port, _webport, sys.argv[3], sys.argv[4]))) diff --git a/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py b/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py index 06d07878..977115c9 100644 --- a/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py +++ b/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py @@ -1,11 +1,10 @@ -import re -import inspect - +import json +import uuid from lifeblood.basenode import BaseNode from lifeblood.nodethings import ProcessingResult, InvocationJob from lifeblood.invocationjob import InvocationRequirements from lifeblood.enums import NodeParameterType, WorkerType -from lifeblood.text import filter_by_pattern + from typing import Iterable @@ -14,71 +13,6 @@ def node_class(): return HoudiniDistributedTracker -async def do(port, webport, killport): - import asyncio - import os - import signal - import socket - import lifeblood_connection - import json - - close_event = asyncio.Event() - - async def conn_cb(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - await reader.readexactly(1) - print('got termination request') - writer.close() - close_event.set() - - ip = lifeblood_connection.get_host_ip() - # find free ports - port = lifeblood_connection.get_free_tcp_port(ip, port) - webport = lifeblood_connection.get_free_tcp_port(ip, webport) - if webport == port: - webport = lifeblood_connection.get_free_tcp_port(ip, webport + 1) - killport = lifeblood_connection.get_free_tcp_port(ip, killport) - if killport == port: - killport = lifeblood_connection.get_free_tcp_port(ip, killport + 1) - if killport == webport: # NOTE: port < webport, always, so we need 2 independent checks. worst case both conditions may be true - killport = lifeblood_connection.get_free_tcp_port(ip, killport + 1) - - # at this point we have free ports, but by the time we start our servers - ports might get taken - - server = await asyncio.start_server(conn_cb, port=killport, family=socket.AF_INET) - proc = await asyncio.create_subprocess_exec('hython', '-m', 'simtracker', '-v', str(port), str(webport)) - - async def server_close_waiter(server: asyncio.AbstractServer): - await close_event.wait() - server.close() - await server.wait_closed() - - async def proc_end_waiter(proc: asyncio.subprocess.Process): - return await proc.wait() - - await server.start_serving() - - attrs = json.loads(os.environ['LBATTRS_JSON']) - attrs['simtracker_host'] = ip - attrs['simtracker_port'] = port - attrs['tracker kill port'] = killport - lifeblood_connection.create_task('spawned task', attrs) - await asyncio.wait([proc_end_waiter(proc), server_close_waiter(server)], return_when=asyncio.FIRST_COMPLETED) - print(f'done, exiting!') - - proc_error = proc.returncode # if simtracker process had error - return it, otherwise we terminate it and don't care about it's return code - if proc_error is None: - proc.terminate() - await proc.wait() - print('wait done') - print('closed simtracker') - if server.is_serving(): - server.close() - await server.wait_closed() - print('closed server') - - return proc_error or 0 # read above - None counts as good as we terminate it ourselves - - class HoudiniDistributedTracker(BaseNode): @classmethod def label(cls) -> str: @@ -110,18 +44,14 @@ def __init__(self, name: str): ui.add_output('spawned') ui.add_parameter('port', 'sim port', NodeParameterType.INT, 19375) ui.add_parameter('wport', 'web port', NodeParameterType.INT, 19376) - ui.add_parameter('kport', 'kill port', NodeParameterType.INT, 19377) ui.add_parameter('use helper', 'run on scheduler helper', NodeParameterType.BOOL, True) def process_task(self, context) -> ProcessingResult: - code = 'import sys\n' \ - 'import asyncio\n\n' - code += inspect.getsource(do) - code += '\n' \ - 'port, webport, killport = [int(x) for x in sys.argv[1:4]]\n' \ - 'sys.exit(asyncio.get_event_loop().run_until_complete(do(port, webport, killport)))\n' - invoc = InvocationJob(['python', ':/work_to_do.py', context.param_value('port'), context.param_value('wport'), context.param_value('kport')]) + code = (self.my_plugin().package_data() / 'server.py').read_text() + addressee_name = str(uuid.uuid4()) + invoc = InvocationJob(['python', ':/work_to_do.py', context.param_value('port'), context.param_value('wport'), addressee_name, ':/task_base_attrs.json']) invoc.set_extra_file('work_to_do.py', code) + invoc.set_extra_file('task_base_attrs.json', json.dumps(dict(context.task_attributes()))) if context.param_value('use helper'): invoc.set_requirements(InvocationRequirements(worker_type=WorkerType.SCHEDULER_HELPER)) diff --git a/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_stopper.py b/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_stopper.py index c85de60d..9eb4ce91 100644 --- a/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_stopper.py +++ b/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_stopper.py @@ -1,10 +1,9 @@ -import socket +import sys from lifeblood.basenode import BaseNode from lifeblood.nodethings import ProcessingResult, InvocationJob from lifeblood.invocationjob import InvocationRequirements from lifeblood.enums import NodeParameterType, WorkerType -from lifeblood.taskspawn import TaskSpawn from typing import Iterable @@ -31,18 +30,20 @@ def __init__(self, name: str): ui = self.get_ui() with ui.initializing_interface_lock(): ui.color_scheme().set_main_color(0.5, 0.25, 0.125) - ui.add_parameter('host', 'sim tracker host', NodeParameterType.STRING, "`task['SIMTRACKER_HOST']`") - ui.add_parameter('kport', 'kill port', NodeParameterType.INT, 19377) # TODO: make default to be expression returning attrib kill port - #ui.add_parameter('use helper', 'run on scheduler helper', NodeParameterType.BOOL, True) + ui.add_parameter('target_iid', 'simtracker invocation id', NodeParameterType.INT, 0).set_expression("task['tracker_control_iid']") + ui.add_parameter('target_addressee', 'simtracker identifier', NodeParameterType.STRING, "`task['tracker_control_addressee']`") + ui.add_parameter('message_timeout', 'timeout', NodeParameterType.FLOAT, 90) def process_task(self, context) -> ProcessingResult: - addr = (context.param_value('host'), context.param_value('kport')) - s = socket.socket() - s.connect(addr) - try: - s.sendall(b'\0') - finally: - s.close() - - res = ProcessingResult() + invoc = InvocationJob([ + sys.executable, # we can use sys.executable only because we run killer only on SCHEDULER_HELPERs + ':/work_to_do.py', + str(context.param_value('target_iid')), + str(context.param_value('target_addressee')), + str(context.param_value('message_timeout')), + ]) + code = (self.my_plugin().package_data() / 'killer.py').read_text() + invoc.set_extra_file('work_to_do.py', code) + invoc.set_requirements(InvocationRequirements(worker_type=WorkerType.SCHEDULER_HELPER)) + res = ProcessingResult(invoc) return res diff --git a/tools/idx_editor.py b/tools/idx_editor.py index 24e9e132..fd5ce55e 100644 --- a/tools/idx_editor.py +++ b/tools/idx_editor.py @@ -1,6 +1,6 @@ import struct -from typing import List, Optional +from typing import List, Optional, Tuple class Entry: @@ -93,6 +93,91 @@ def parse_index_root(cls, data: bytes): return index +class CpioSection: + def __init__(self, headerstart: bytes, timestamp: int, name: bytes, data: bytes): + self.__headerstart = headerstart + self.__timestamp = timestamp + self.__name = name + self.__data = data + + @classmethod + def empty(cls) -> "CpioSection": + return CpioSection( + b'070707000001000000000666000000000000000001000000', + 1, + b'', + b'', + ) + + def set_name(self, name: bytes): + self.__name = name + + def set_data(self, data: bytes): + self.__data = data + + @classmethod + def from_bytes(cls, data: bytes) -> Tuple["CpioSection", bytes]: + """ + returns section and remainings of data that does not belong to the section returned + """ + header = data[:76] + headerstart = header[:48] + mtime = int(header[48:59], base=8) + name_size = int(header[59:65], base=8) + body_size = int(header[65:], base=8) + name = data[76:76+name_size-1] # -1 cuz strings are 0-terminated + sec_data = data[76+name_size:76+name_size+body_size] + rest = data[76+name_size+body_size:] + + return CpioSection(headerstart, mtime, name, sec_data), rest + + def to_bytes(self) -> bytes: + return b''.join(( + self.__headerstart, + b'%011o' % self.__timestamp, + b'%06o' % (len(self.__name) + 1), # for zero end + b'%011o' % len(self.__data), + self.__name, b'\0', + self.__data + )) + + def name(self) -> bytes: + return self.__name + + def data(self) -> bytes: + return self.__data + + +def split_curly_block(data: bytes) -> Tuple[bytes, bytes]: + view = memoryview(data) + assert view[0:1] == b'{' + brackets = 1 + in_string = False + escaping = False + for i in range(1, len(view)): + c = view[i:i+1] + if not in_string and not escaping: + if c == b'{': + brackets += 1 + elif c == b'}': + brackets -= 1 + elif c == b'"': + in_string = True + elif in_string and not escaping and c == b'"': + in_string = False + + if not escaping and c == b'\\': + escaping = True + elif escaping: + escaping = False + + if brackets == 0: + if data[i+1:i+2] == b'\n': # add newline to data if it's next symbol + i += 1 + return data[:i+1], data[i+1:] + raise ValueError('input data is malformed') + + def bytes_to_text(data: bytes) -> str: partlist = [] for c in data: @@ -103,16 +188,16 @@ def bytes_to_text(data: bytes) -> str: else: partlist.append(sc) else: - partlist.append(rf'\x{c:02}') + partlist.append(rf'\x{c:02x}') return ''.join(partlist) -def text_to_bytes(text: str) -> bytes: +def text_to_bytes(text: str) -> bytes: # TODO: wtd are these? at least add unit tests partlist = [] escaping = False hexcode = None - for c in text: + for i, c in enumerate(text): if hexcode is not None: if hexcode < 0: # first symbol hexcode = int(c, base=16) * 16 @@ -127,7 +212,7 @@ def text_to_bytes(text: str) -> bytes: elif c == 'x': hexcode = -1 else: - raise RuntimeError('should not happen!') + raise RuntimeError(f'should not happen: escaping "{c}" at {i}') else: if c == '\\': escaping = True @@ -137,11 +222,77 @@ def text_to_bytes(text: str) -> bytes: from PySide2.QtWidgets import QWidget, QHBoxLayout, QVBoxLayout, QTabWidget, QTextEdit, QPushButton, QFileDialog,\ - QInputDialog, QMessageBox + QInputDialog, QMessageBox, QLineEdit from PySide2.QtGui import QFont import string +class SectionDisplay(QWidget): + def __init__(self, parent=None): + super(SectionDisplay, self).__init__(parent) + layout = QVBoxLayout(self) + self.__section_tabs = QTabWidget() + self.__header_edit = QTextEdit() + self.__def_section_edit = QTextEdit() + self.__chan_section_edit = QTextEdit() + self.__val_section_edit = QTextEdit() + self.__def_section: CpioSection = CpioSection.empty() + self.__find_edit = QLineEdit() + + font = QFont('monospace') + font.setFixedPitch(True) + self.__header_edit.setFont(font) + self.__def_section_edit.setFont(font) + self.__chan_section_edit.setFont(font) + self.__val_section_edit.setFont(font) + self.__find_edit.setPlaceholderText('find in text') + + self.__section_tabs.addTab(self.__header_edit, 'header') + self.__section_tabs.addTab(self.__def_section_edit, 'opspareparmdef') + self.__section_tabs.addTab(self.__chan_section_edit, 'opchannels') + self.__section_tabs.addTab(self.__val_section_edit, 'opvalues') + layout.addWidget(self.__find_edit) + layout.addWidget(self.__section_tabs) + + self.__find_edit.editingFinished.connect(self.__find_in_tab_text) + + def __find_in_tab_text(self): + editor: QTextEdit = self.__section_tabs.currentWidget() + editor.find(self.__find_edit.text()) + + def set_section_data(self, data: bytes): + parts = data.split(b'\n', 2) + self.__header_edit.setText(bytes_to_text(b'\n'.join(parts[:2]))) + + sec_to_edit = { + b'opspareparmdef': self.__def_section_edit, + b'opchannels': self.__chan_section_edit, + b'opvalues': self.__val_section_edit, + } + + data = parts[2] + while data: + sec_name, sec_data = data.lstrip().split(b'\n', 1) + if sec_name == b'opspareparmdef': + self.__def_section, data = CpioSection.from_bytes(sec_data) + sec_to_edit[sec_name].setText(bytes_to_text(self.__def_section.data())) + else: + cur_data, data = split_curly_block(sec_data) + sec_to_edit[sec_name].setText(bytes_to_text(cur_data)) + + def get_section_data(self): + self.__def_section.set_data(text_to_bytes(self.__def_section_edit.toPlainText())) + return b'\n'.join(( + text_to_bytes(self.__header_edit.toPlainText()), + b'opspareparmdef', + self.__def_section.to_bytes(), + b'opchannels', + text_to_bytes(self.__chan_section_edit.toPlainText()), + b'opvalues', + text_to_bytes(self.__val_section_edit.toPlainText()), + )) + + class Editor(QWidget): def __init__(self, parent=None): super(Editor, self).__init__(parent) @@ -182,11 +333,8 @@ def clear(self): self.__tabs.clear() self.__index = Index() - def _new_tab_widget(self) -> QTextEdit: - tab = QTextEdit() - font = QFont('monospace') - font.setFixedPitch(True) - tab.setFont(font) + def _new_tab_widget(self) -> SectionDisplay: + tab = SectionDisplay() return tab def load_file(self, filepath): @@ -199,14 +347,15 @@ def load_file(self, filepath): text = bytes_to_text(entry.rawdata) assert entry.rawdata == text_to_bytes(text) - tab.setText(text) + + tab.set_section_data(entry.rawdata) self.__tabs.append(tab) self.__tabwidget.addTab(tab, entry.name.decode('ascii')) def save_file(self, filepath): for entry, tab in zip(self.__index.entries, self.__tabs): - entry.rawdata = text_to_bytes(tab.toPlainText()) + entry.rawdata = tab.get_section_data() with open(filepath, 'wb') as f: f.write(self.__index.serialize()) @@ -214,13 +363,19 @@ def _load_button_callback(self): filepath, _ = QFileDialog.getOpenFileName(self) if not filepath: return - self.load_file(filepath) + try: + self.load_file(filepath) + except Exception as e: + QMessageBox.warning(self, 'Error happened, file NOT loaded', f'exception happened: {str(e)}') def _save_button_callback(self): filepath, _ = QFileDialog.getSaveFileName(self) if not filepath: return - self.save_file(filepath) + try: + self.save_file(filepath) + except Exception as e: + QMessageBox.warning(self, 'Error happened, file NOT saved', f'exception happened: {str(e)}') def _newtab_callback(self): name, good =QInputDialog.getText(self, 'entry name', 'name of new preset')