#!/usr/bin/python3 import sys import os import o11 import base64 import json user=o11.parse_params(sys.argv, 'user') password=o11.parse_params(sys.argv, 'password') id=o11.parse_params(sys.argv, 'id') action=o11.parse_params(sys.argv, 'action') doh=o11.parse_params(sys.argv, 'doh') bind=o11.parse_params(sys.argv, 'bind') proxy=o11.parse_params(sys.argv, 'proxy') drm=o11.parse_params(sys.argv, 'drm') challenge=o11.parse_params(sys.argv, 'challenge') pssh=o11.parse_params(sys.argv, 'pssh') heartbeaturl=o11.parse_params(sys.argv, 'heartbeaturl') heartbeatparams=o11.parse_params(sys.argv, 'heartbeatparams') o11Session = o11.session(bind=bind, proxy=proxy) req = o11Session.get_session() if doh != "": o11.dns(doh) if challenge == "cert": challenge = "CAQ=" authFile = '/example.tokens' headers = {} def pair(): print("pairing...") response = req.get() try: code = response.json()['code'] print("Please go to ... and enter code " + code) except: print(response.text) sys.exit() while True: response = req.get() try: response.json()['token'] json.dump(response.json(), open(os.path.abspath(os.path.dirname(__file__)) + authFile, 'w')) print("pairing done") break except: pass time.sleep(1) def login(): print("logging in...", file=sys.stderr) # Get token response = req.get() try: response.json()['token'] json.dump(response.json(), open(os.path.abspath(os.path.dirname(__file__)) + authFile, 'w')) except: print(response.text) sys.exit() print("logged in", file=sys.stderr) def do_action(): if action == "pair": pair() sys.exit() try: auth = json.load(open(os.path.abspath(os.path.dirname(__file__)) + authFile)) except: return "error" if action == "channels": output = {} output['Channels'] = [] response = req.get() try: for chan in response.json(): channel = {} channel['Name'] = chan['Name'] channel['Mode'] = "live" channel['SessionManifest'] = True channel['ManifestScript'] = 'id=' + str(chan['Id']) channel['CdmType'] = "widevine" channel['UseCdm'] = True channel['Cdm'] = 'id=' + str(chan['Id']) channel['Video'] = 'best' channel['OnDemand'] = True channel['SpeedUp'] = True output['Channels'].append(channel) print(json.dumps(output, indent=2)) except: print(response.text, file=sys.stderr) return "error" elif action == "events": output = {} output['Events'] = [] response = req.get() try: for ev in response.json(): event = {} event['Name'] = ev['Name'] event['Mode'] = "live" event['SessionManifest'] = True event['ManifestScript'] = 'example id=' + str(ev['Id']) event['CdmType'] = "widevine" event['UseCdm'] = True event['Cdm'] = 'example id=' + str(ev['Id']) event['Video'] = 'best' date_object = datetime.datetime.strptime(ev['startDate'], "%Y-%m-%dT%H:%M:%S.000Z") date_utc = date_object.replace(tzinfo=pytz.UTC) event['Start'] = int(date_utc.timestamp()) date_object = datetime.datetime.strptime(ev['endDate'], "%Y-%m-%dT%H:%M:%S.000Z") date_utc = date_object.replace(tzinfo=pytz.UTC) event['End'] = int(date_utc.timestamp()) output['Events'].append(event) print(json.dumps(output, indent=2)) except: print(response.text, file=sys.stderr) return "error" elif action == "heartbeat": sys.exit() elif action == "manifest": response = req.get("", headers=headers) try: output = { "ManifestUrl" : response.json()['mpdUrl'], "Headers": { "Manifest": { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36' }, "Media": { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36' } }, "Heartbeat": { "Url" : '', "Params": '', "PeriodMs" : 5*60*1000 } } print(json.dumps(output)) except: print(response.text, file=sys.stderr) return "error" elif action == "cdm": response = req.post(response.json()['licUrl'], headers=headers, data=base64.b64decode(challenge)) response_b64 = str(base64.b64encode(response.content), 'ascii') if response_b64.startswith('CAIS'): print(response_b64) else: print(response.text, file=sys.stderr) return "error" else: print("invalid action: " + action) if do_action() == "error": login() do_action()