#!/usr/bin/env python3 """ simple client for que.run """ import argparse import urllib.request as request import urllib.parse import time import subprocess import sys """ example desktop notifications in linux: terminal 1: $ que --poll bs/desktop --then "notify-send '\que' '\msg'" terminal 2: $ echo "hey there" | que bs/desktop - When terminal 2 succeeds, terminal 1 will print "hey there", then call the `notify-send` command, which displays a notification toast in linux with title `bs/desktop` and content `hey there`. On macOS you could use: osascript -e 'display notification "\msg" with title "\que"' in place of notify-send.""" # set to something ridiculously high so we don't run into timeouts while polling # or waiting for a message MAX_TIMEOUT = 100000000 def main(argv=None): cli = argparse.ArgumentParser(description=__doc__) cli.add_argument( "--host", default="http://dev.simatime.com:3000", help="where que-server is running", ) cli.add_argument( "--poll", default=False, action="store_true", help="stream data from the que" ) cli.add_argument( "--then", help="when polling, run this shell command after each response, replacing '{}' in the command with the body of the response", ) cli.add_argument( "target", help="namespace and path of the que, like 'ns/path/subpath'" ) cli.add_argument( "infile", nargs="?", type=argparse.FileType("r"), help="data to put on the que. Use '-' for stdin, otherwise should be a readable file", ) if argv is None: args = cli.parse_args() else: args = cli.parse_args(argv) if args.infile: # send input data data = args.infile.read().encode("utf-8").strip() with request.urlopen( f"{args.host}/{args.target}", data=data, timeout=MAX_TIMEOUT ) as req: print(req.status) else: # no input data, do a read instead params = urllib.parse.urlencode({"poll": args.poll}) url = f"{args.host}/{args.target}?{params}" with request.urlopen(url) as req: if args.poll: while not time.sleep(1): msg = req.readline().decode("utf-8").strip() print(msg) if args.then: subprocess.run( args.then.replace("\msg", msg).replace("\que", args.target), shell=True, ) else: print(req.read().decode("utf-8").strip()) if __name__ == "__main__": try: main() except KeyboardInterrupt: print("Interrupted") sys.exit(0)