#!/usr/bin/env python
"""Compute fan speeds from temperature curves and hand them to uni-sync.

The script reads fan curves from ``PROFILE_FILEPATH``, samples temperatures
with the ``sensors`` command, writes the resulting per-channel speeds into
the uni-sync configuration at ``UNI_SYNC_FILEPATH`` and then runs the
``uni-sync`` command.
"""
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Tuple
from math import floor
from subprocess import run, PIPE
from re import escape, search, sub
from json import load, dump

UNI_SYNC_FILEPATH = "/etc/uni-sync/uni-sync.json"
PROFILE_FILEPATH = "/etc/lian-li-fancontrol/curves.json"


def get_temps() -> List[str]:
    """Run ``sensors`` and return its stripped standard output as lines."""
    output = run(["sensors"], capture_output=True, text=True).stdout
    return output.strip().split("\n")


@dataclass
class TempCurve:
    """A piecewise-linear mapping from a sensor temperature to a fan speed.

    ``points`` is kept sorted by temperature and always spans 0..100: when
    the supplied points do not start at 0 or end at 100, an anchor point
    carrying the nearest supplied speed is added at that end.
    """

    sensor: str
    channel: int
    points: List[Tuple[float, int]]

    def __init__(self, sensor: str, channel: int,
                 points: List[Tuple[float, int]]):
        """Validate the arguments and build the sorted, padded point list.

        Args:
            sensor: Label that a ``sensors`` output line must start with.
            channel: Zero-based channel index in the uni-sync configuration.
            points: At least three ``(temperature, speed)`` pairs, both
                values within 0..100.

        Raises:
            ValueError: If fewer than three points are given, a temperature
                or speed lies outside 0..100, or ``channel`` is negative.
        """
        points = list(points)
        # Explicit raises instead of ``assert`` so the checks survive
        # ``python -O``.
        if len(points) < 3:
            raise ValueError("a curve needs at least 3 points")
        if not all(0 <= point[0] <= 100 for point in points):
            raise ValueError("curve temperatures must be within 0..100")
        if not all(0 <= point[1] <= 100 for point in points):
            raise ValueError("curve speeds must be within 0..100")
        if channel < 0:
            raise ValueError("channel must not be negative")

        self.sensor = sensor
        self.channel = channel
        self.points = sorted(points, key=lambda point: point[0])
        if self.points[0][0] > 0:
            # Pad onto the *sorted* list; using the raw argument here would
            # discard the ordering established above.
            self.points = [(0, self.points[0][1])] + self.points
        if self.points[-1][0] < 100:
            self.points.append((100, self.points[-1][1]))

    @staticmethod
    def _parse_temperature(sensor: str, lines: Iterable[str]) -> float:
        """Extract the reading for ``sensor`` from ``sensors`` output lines.

        A line qualifies when it starts with ``sensor`` and contains
        ``<sensor>: <signed number>`` followed by ``°C`` (the degree sign is
        optional). The first qualifying line wins.

        Raises:
            ValueError: If no line yields a reading for ``sensor``.
        """
        # The sensor label is escaped so characters such as "+" or "(" in
        # it are matched literally; the sign is captured so negative
        # readings are not confused with a later "+" on the same line.
        pattern = rf"{escape(sensor)}:\s*([+-]?\d+(?:\.\d+)?)\s*°?C"
        for line in lines:
            if not line.startswith(sensor):
                continue
            found = search(pattern, line)
            if found:
                return float(found.group(1))
        raise ValueError(f"no temperature reading found for sensor {sensor!r}")

    @property
    def current_temp(self) -> float:
        """Current temperature of ``self.sensor`` as reported by ``sensors``.

        Raises:
            ValueError: If the sensor is missing from the output.
        """
        return self._parse_temperature(self.sensor, get_temps())

    @property
    def current_speed(self) -> int:
        """Fan speed for the current temperature of ``self.sensor``."""
        return self.speed(self.current_temp)

    def speed(self, temp: float) -> int:
        """Return the fan speed for ``temp`` by linear interpolation.

        Temperatures at or below the first point yield the first point's
        speed and temperatures at or above the last point yield the last
        point's speed. A temperature that equals a point's temperature
        yields that point's speed unchanged; in between, the interpolated
        value is rounded down.
        """
        first_temp, first_speed = self.points[0]
        last_temp, last_speed = self.points[-1]
        # Clamp instead of extrapolating: readings outside the curve must
        # not produce speeds derived from an unrelated segment.
        if temp <= first_temp:
            return first_speed
        if temp >= last_temp:
            return last_speed

        segments = zip(self.points, self.points[1:])
        for (low_temp, low_speed), (high_temp, high_speed) in segments:
            if temp == high_temp:
                return high_speed
            if temp < high_temp:
                fraction = (temp - low_temp) / (high_temp - low_temp)
                return floor(low_speed + fraction * (high_speed - low_speed))
        # Only reachable for values that compare false everywhere (NaN).
        return last_speed


def read_curves(path: str) -> List[TempCurve]:
    """Load the fan curves stored as JSON at ``path``.

    The file must hold a list of objects with the keys ``sensor``,
    ``channel`` and ``curve``; ``curve`` is a list of objects with the keys
    ``temp`` and ``speed``.
    """
    with open(path, "r") as fp:
        profiles = load(fp)
    return [
        TempCurve(
            profile["sensor"],
            profile["channel"],
            [(point["temp"], point["speed"]) for point in profile["curve"]],
        )
        for profile in profiles
    ]


def compile_unisync(curves: List[TempCurve],
                    config_path: str = UNI_SYNC_FILEPATH) -> Dict[str, Any]:
    """Return the uni-sync configuration updated with the curves' speeds.

    The configuration is read from ``config_path`` and the ``channels`` list
    of its first entry under ``configs`` is replaced. The new list covers
    channel 0 up to the highest channel that has a curve: channels with a
    curve get ``{"mode": "Manual", "speed": <current speed>}``, channels
    without one keep their entry from the existing configuration. If several
    curves name the same channel, the last one wins. Nothing is written to
    disk.

    Raises:
        KeyError: If a channel below the highest one has neither a curve
            nor an entry in the existing configuration.
    """
    speeds = {curve.channel: curve.current_speed for curve in curves}

    with open(config_path, "r") as fp:
        current_config = load(fp)

    device = current_config["configs"][0]
    existing = device.get("channels") or []

    channels = []
    # Size by the highest channel rather than by len(curves) so gaps or
    # duplicate channel numbers do not cause a lookup failure.
    for index in range(max(speeds) + 1 if speeds else 0):
        if index in speeds:
            channels.append({"mode": "Manual", "speed": speeds[index]})
        elif index < len(existing):
            channels.append(existing[index])
        else:
            raise KeyError(
                f"channel {index} has no curve and no existing uni-sync entry"
            )

    device["channels"] = channels
    return current_config


def run_unisync():
    """Run the ``uni-sync`` command and return its ``CompletedProcess``."""
    return run(["uni-sync"])


def main() -> None:
    """Report each curve, rewrite the uni-sync config and run uni-sync."""
    curves = read_curves(PROFILE_FILEPATH)
    for curve in curves:
        # Sample once so the printed speed and temperature belong together.
        temp = curve.current_temp
        print(f"Channel [{curve.channel}]: Sensor={curve.sensor} => Speed={curve.speed(temp)} based on {temp}")
    new_config = compile_unisync(curves)
    with open(UNI_SYNC_FILEPATH, "w") as fp:
        dump(new_config, fp)
    run_unisync()


if __name__ == '__main__':
    main()