From ff3a4d8b50857cbc4ba5e6bdac937cb700005ab9 Mon Sep 17 00:00:00 2001 From: Ryan Pavlik Date: Thu, 28 May 2020 13:09:16 -0500 Subject: [PATCH] ipc: Fix all Flake8 warnings --- src/xrt/ipc/proto.py | 228 +++++++++++++++++++++++++------------------ 1 file changed, 134 insertions(+), 94 deletions(-) diff --git a/src/xrt/ipc/proto.py b/src/xrt/ipc/proto.py index c67f07316..2f6d6d08f 100755 --- a/src/xrt/ipc/proto.py +++ b/src/xrt/ipc/proto.py @@ -1,46 +1,52 @@ #!/usr/bin/env python3 # Copyright 2020, Collabora, Ltd. # SPDX-License-Identifier: BSL-1.0 +"""Generate code from a JSON file describing the IPC protocol.""" import json import argparse class Arg: - @classmethod - def parseArray(cls, a): - ret = [] - for elm in a: - ret.append(cls(elm)) - return ret + """An IPC call argument.""" - def getFuncArgumentIn(self): - if self.isAggregate: + @classmethod + def parse_array(cls, a): + """Turn an array of data into an array of Arg objects.""" + return [cls(elm) for elm in a] + + def get_func_argument_in(self): + """Get the type and name of this argument as an input parameter.""" + if self.is_aggregate: return self.typename + " *" + self.name else: return self.typename + " " + self.name - def getFuncArgumentOut(self): + def get_func_argument_out(self): + """Get the type and name of this argument as an output parameter.""" return self.typename + " *out_" + self.name - def getStructField(self): + def get_struct_field(self): + """Get the type and name of this argument as a struct field.""" return self.typename + " " + self.name def dump(self): + """Dump human-readable output to standard out.""" print("\t\t" + self.typename + ": " + self.name) def __init__(self, data): + """Construct an argument.""" self.name = data['name'] self.typename = data['type'] - self.isAggregate = False + self.is_aggregate = False if self.typename.find("struct ") == 0: - self.isAggregate = True + self.is_aggregate = True if self.typename.find("union ") == 0: - self.isAggregate = True + self.is_aggregate = True -def _write_common(f, start, args, indent): - "Write something like a declaration or call." +def write_with_wrapped_args(f, start, args, indent): + """Write something like a declaration or call.""" f.write("\n" + indent) f.write(start) # For parameter indenting @@ -50,47 +56,53 @@ def _write_common(f, start, args, indent): def write_decl(f, return_type, function_name, args, indent=""): + """Write a function declaration/definition with wrapped arguments.""" f.write("\n" + indent) f.write(return_type) - _write_common(f, - "{}(".format(function_name), - args, - indent) + write_with_wrapped_args(f, + "{}(".format(function_name), + args, + indent) def write_invocation(f, return_val, function_name, args, indent=""): - _write_common(f, - "{} = {}(".format(return_val, function_name), - args, - indent) + """Write a function call with saved return value and wrapped arguments.""" + write_with_wrapped_args(f, + "{} = {}(".format(return_val, function_name), + args, + indent) class Call: + """A single IPC call.""" def dump(self): + """Dump human-readable output to standard out.""" print("Call " + self.name) - if self.inArgs: + if self.in_args: print("\tIn:") - for arg in self.inArgs: + for arg in self.in_args: arg.dump() - if self.outArgs: + if self.out_args: print("\tOut:") - for arg in self.outArgs: + for arg in self.out_args: arg.dump() - def writeCallDecl(self, f): + def write_call_decl(self, f): + """Write declaration of ipc_call_CALLNAME.""" args = ["struct ipc_connection *ipc_c"] - args.extend(arg.getFuncArgumentIn() for arg in self.inArgs) - args.extend(arg.getFuncArgumentOut() for arg in self.outArgs) - if self.outFds: + args.extend(arg.get_func_argument_in() for arg in self.in_args) + args.extend(arg.get_func_argument_out() for arg in self.out_args) + if self.out_fds: args.extend(("int *fds", "size_t num_fds")) write_decl(f, 'ipc_result_t', 'ipc_call_' + self.name, args) - def writeHandleDecl(self, f): + def write_handle_decl(self, f): + """Write declaration of ipc_handle_CALLNAME.""" args = ["volatile struct ipc_client_state *cs"] - args.extend(arg.getFuncArgumentIn() for arg in self.inArgs) - args.extend(arg.getFuncArgumentOut() for arg in self.outArgs) - if self.outFds: + args.extend(arg.get_func_argument_in() for arg in self.in_args) + args.extend(arg.get_func_argument_out() for arg in self.out_args) + if self.out_fds: args.extend(( "size_t max_num_fds", "int *out_fds", @@ -98,21 +110,21 @@ class Call: write_decl(f, 'ipc_result_t', 'ipc_handle_' + self.name, args) def __init__(self, name, data): - + """Construct a call from call name and call data dictionary.""" self.id = None self.name = name - self.inArgs = [] - self.outArgs = [] - self.outFds = False - for key in data: + self.in_args = [] + self.out_args = [] + self.out_fds = False + for key, val in data.items(): if key == 'id': - self.id = data[key] + self.id = val elif key == 'in': - self.inArgs = Arg.parseArray(data[key]) + self.in_args = Arg.parse_array(val) elif key == 'out': - self.outArgs = Arg.parseArray(data[key]) + self.out_args = Arg.parse_array(val) elif key == 'out_fds': - self.outFds = data[key] + self.out_fds = val else: raise RuntimeError("Unrecognized key") if not self.id: @@ -120,26 +132,27 @@ class Call: class Proto: + """An IPC protocol containing one or more calls.""" + @classmethod def parse(cls, data): + """Parse a dictionary defining a protocol into Call objects.""" return cls(data) @classmethod - def loadAndParse(cls, file): + def load_and_parse(cls, file): + """Load a JSON file and parse it into Call objects.""" with open(file) as infile: return cls.parse(json.loads(infile.read())) def dump(self): + """Dump human-readable output to standard out.""" for call in self.calls: call.dump() - def addCall(self, name, data): - self.calls.append(Call(name, data)) - def __init__(self, data): - self.calls = [] - for name, call in data.items(): - self.addCall(name, call) + """Construct a protocol from a dictionary of calls.""" + self.calls = [Call(name, call) for name, call in data.items()] header = '''// Copyright 2020, Collabora, Ltd. @@ -153,7 +166,11 @@ header = '''// Copyright 2020, Collabora, Ltd. ''' -def doH(file, p): +def generate_h(file, p): + """Generate protocol header. + + Defines command enum, utility functions, and command and reply structures. + """ f = open(file, "w") f.write(header.format(brief='Generated IPC protocol header', suffix='')) f.write(''' @@ -204,27 +221,28 @@ ipc_cmd_to_str(ipc_command_t id) for call in p.calls: # Should we emit a msg struct. - if call.inArgs: + if call.in_args: f.write("\nstruct ipc_" + call.name + "_msg\n") f.write("{\n") f.write("\tenum ipc_command cmd;\n") - for arg in call.inArgs: - f.write("\t" + arg.getStructField() + ";\n") + for arg in call.in_args: + f.write("\t" + arg.get_struct_field() + ";\n") f.write("};\n") # Should we emit a reply struct. - if call.outArgs: + if call.out_args: f.write("\nstruct ipc_" + call.name + "_reply\n") f.write("{\n") f.write("\tipc_result_t result;\n") - for arg in call.outArgs: - f.write("\t" + arg.getStructField() + ";\n") + for arg in call.out_args: + f.write("\t" + arg.get_struct_field() + ";\n") f.write("};\n") f.write("\n// clang-format on\n") f.close() -def doClientC(file, p): +def generate_client_c(file, p): + """Generate IPC client proxy source.""" f = open(file, "w") f.write(header.format(brief='Generated IPC client code', suffix='_client')) f.write(''' @@ -236,31 +254,31 @@ def doClientC(file, p): # Loop over all of the calls. for call in p.calls: - call.writeCallDecl(f) + call.write_call_decl(f) f.write("\n{\n") # Message struct - if call.inArgs: + if call.in_args: f.write("\tstruct ipc_" + call.name + "_msg _msg = {\n") else: f.write("\tstruct ipc_command_msg _msg = {\n") f.write("\t .cmd = " + str(call.id) + ",\n") - for arg in call.inArgs: - if arg.isAggregate: + for arg in call.in_args: + if arg.is_aggregate: f.write("\t ." + arg.name + " = *" + arg.name + ",\n") else: f.write("\t ." + arg.name + " = " + arg.name + ",\n") f.write("\t};\n") # Reply struct - if call.outArgs: + if call.out_args: f.write("\tstruct ipc_" + call.name + "_reply _reply;\n") else: f.write("\tstruct ipc_result_reply _reply = {0};\n") func = 'ipc_client_send_and_get_reply' args = ['ipc_c', '&_msg', 'sizeof(_msg)', '&_reply', 'sizeof(_reply)'] - if call.outFds: + if call.out_fds: func += '_fds' args.extend(('fds', 'num_fds')) write_invocation(f, 'ipc_result_t ret', func, args, indent="\t") @@ -270,14 +288,18 @@ def doClientC(file, p): \t\treturn ret; \t} \n''') - for arg in call.outArgs: + for arg in call.out_args: f.write("\t*out_" + arg.name + " = _reply." + arg.name + ";\n") f.write("\n\treturn _reply.result;\n}\n") f.write("\n// clang-format off\n") f.close() -def doClientH(file, p): +def generate_client_h(file, p): + """Generate IPC client header. + + Contains prototypes for generated IPC proxy call functions. + """ f = open(file, "w") f.write(header.format(brief='Generated IPC client code', suffix='_client')) f.write(''' @@ -292,13 +314,14 @@ def doClientH(file, p): ''') for call in p.calls: - call.writeCallDecl(f) + call.write_call_decl(f) f.write(";\n") f.write("\n// clang-format on\n") f.close() -def doServerC(file, p): +def generate_server_c(file, p): + """Generate IPC server stub/dispatch source.""" f = open(file, "w") f.write(header.format(brief='Generated IPC server code', suffix='_server')) f.write(''' @@ -322,38 +345,47 @@ ipc_dispatch(volatile struct ipc_client_state *cs, ipc_command_t *ipc_command) for call in p.calls: f.write("\tcase " + call.id + ": {\n") - if call.inArgs: - f.write("\t\tstruct ipc_" + call.name + "_msg *msg =\n") - f.write("\t\t (struct ipc_" + call.name + "_msg *)ipc_command;\n") - if call.outArgs: - f.write("\t\tstruct ipc_" + call.name + "_reply reply = {0};\n") + if call.in_args: + f.write( + "\t\tstruct ipc_{}_msg *msg =\n".format(call.name)) + f.write( + "\t\t (struct ipc_{}_msg *)ipc_command;\n".format( + call.name)) + if call.out_args: + f.write("\t\tstruct ipc_%s_reply reply = {0};\n" % call.name) else: f.write("\t\tstruct ipc_result_reply reply = {0};\n") - if call.outFds: + if call.out_fds: f.write("\t\tint fds[MAX_FDS] = {0};\n") f.write("\t\tsize_t num_fds = {0};\n") f.write("\n") # Write call to ipc_handle_CALLNAME args = ["cs"] - for arg in call.inArgs: + for arg in call.in_args: args.append(("&msg->" + arg.name) - if arg.isAggregate + if arg.is_aggregate else ("msg->" + arg.name)) - args.extend("&reply." + arg.name for arg in call.outArgs) - if call.outFds: + args.extend("&reply." + arg.name for arg in call.out_args) + if call.out_fds: args.extend(("MAX_FDS", "fds", "&num_fds",)) - write_invocation(f, 'reply.result', 'ipc_handle_' + call.name, args, indent="\t\t") + write_invocation(f, 'reply.result', 'ipc_handle_' + + call.name, args, indent="\t\t") f.write(";\n") - if call.outFds: + if call.out_fds: f.write( - "\t\treturn ipc_reply_fds(cs->ipc_socket_fd, &reply, sizeof(reply), fds, num_fds);\n") + "\t\t" + "return ipc_reply_fds(cs->ipc_socket_fd, " + "&reply, sizeof(reply), " + "fds, num_fds);\n") else: f.write( - "\t\treturn ipc_reply(cs->ipc_socket_fd, &reply, sizeof(reply));\n") + "\t\t" + "return ipc_reply(cs->ipc_socket_fd, " + "&reply, sizeof(reply));\n") f.write("\t}\n") f.write('''\tdefault: \t\tprintf("UNHANDLED IPC MESSAGE! %d\\n", *ipc_command); @@ -366,7 +398,12 @@ ipc_dispatch(volatile struct ipc_client_state *cs, ipc_command_t *ipc_command) f.close() -def doServerH(file, p): +def generate_server_header(file, p): + """Generate IPC server header. + + Declares handler prototypes to implement, + as well as the prototype for the generated dispatch function. + """ f = open(file, "w") f.write(header.format(brief='Generated IPC server code', suffix='_server')) f.write(''' @@ -383,32 +420,35 @@ ipc_dispatch(volatile struct ipc_client_state *cs, ipc_command_t *ipc_command); ''') for call in p.calls: - call.writeHandleDecl(f) + call.write_handle_decl(f) f.write(";\n") f.write("\n// clang-format on\n") f.close() def main(): + """Handle command line and generate a file.""" parser = argparse.ArgumentParser(description='Protocol generator.') - parser.add_argument('proto', help='Protocol file to use') - parser.add_argument('output', type=str, nargs='+', - help='Output file, uses the ending to figure out what file it should generate') + parser.add_argument( + 'proto', help='Protocol file to use') + parser.add_argument( + 'output', type=str, nargs='+', + help='Output file, uses the name to choose output type') args = parser.parse_args() - p = Proto.loadAndParse(args.proto) + p = Proto.load_and_parse(args.proto) for output in args.output: if output.endswith("ipc_protocol_generated.h"): - doH(output, p) + generate_h(output, p) if output.endswith("ipc_client_generated.c"): - doClientC(output, p) + generate_client_c(output, p) if output.endswith("ipc_client_generated.h"): - doClientH(output, p) + generate_client_h(output, p) if output.endswith("ipc_server_generated.c"): - doServerC(output, p) + generate_server_c(output, p) if output.endswith("ipc_server_generated.h"): - doServerH(output, p) + generate_server_header(output, p) if __name__ == "__main__":