aboutsummaryrefslogtreecommitdiff
path: root/main.py
diff options
context:
space:
mode:
authorAryadev Chavali <aryadev@aryadevchavali.com>2025-11-03 01:27:34 +0000
committerAryadev Chavali <aryadev@aryadevchavali.com>2025-11-03 01:29:48 +0000
commit0ebaa99bea6975fcb010ea0a3f7932b0afb56ea2 (patch)
tree4e07d4a3ef47b33cc58332f024b31bd02671723d /main.py
downloaduni-sync-temp-timer-0ebaa99bea6975fcb010ea0a3f7932b0afb56ea2.tar.gz
uni-sync-temp-timer-0ebaa99bea6975fcb010ea0a3f7932b0afb56ea2.tar.bz2
uni-sync-temp-timer-0ebaa99bea6975fcb010ea0a3f7932b0afb56ea2.zip
v1.0.0: Basics
Diffstat (limited to 'main.py')
-rwxr-xr-xmain.py109
1 files changed, 109 insertions, 0 deletions
diff --git a/main.py b/main.py
new file mode 100755
index 0000000..d3b06b2
--- /dev/null
+++ b/main.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+
+from dataclasses import dataclass
+from typing import List, Dict
+from math import floor
+from subprocess import run, PIPE
+from re import sub
+from json import load, dump
+
+UNI_SYNC_FILEPATH = "/etc/uni-sync/uni-sync.json"
+PROFILE_FILEPATH = "/etc/lian-li-fancontrol/curves.json"
+
+def get_temps():
+ return run(["sensors"], capture_output=True, text=True) \
+ .stdout \
+ .strip() \
+ .split("\n")\
+
+@dataclass
+class TempCurve:
+ sensor: str
+ channel: int
+ points: [(float, int)]
+
+ def __init__(self, sensor: str, channel: int, points: [(float, int)]):
+ assert(len(points) >= 3)
+ assert(all(map(lambda p : p[0] >= 0 and p[0] <= 100, points)))
+ assert(all(map(lambda p : p[1] >= 0 and p[1] <= 100, points)))
+ assert(channel >= 0)
+ self.sensor = sensor
+ self.channel = channel
+ self.points = list(sorted(points, key = lambda x: x[0]))
+ if self.points[0][0] > 0:
+ self.points = [(0, self.points[0][1])] + points
+ if self.points[-1][0] < 100:
+ self.points.append((100, self.points[-1][1]))
+
+ @property
+ def current_temp(self) -> float:
+ temp = ""
+ for line in get_temps():
+ if line.startswith(self.sensor):
+ temp = line
+ break
+ temp = sub(f"{self.sensor}:.*?\\+", "", temp)
+ temp = sub("°C.*", "", temp)
+ return float(temp)
+
+ @property
+ def current_speed(self) -> int:
+ return self.speed(self.current_temp)
+
+ def speed(self, temp: float) -> int:
+ # get upper and lower bounds
+ lower = 0
+ for (index, (temp_point, speed)) in enumerate(self.points):
+ if temp_point == temp:
+ return speed
+ elif temp_point > temp:
+ lower = index - 1
+ break
+
+ if lower == len(self.points) - 1:
+ return self.points[lower][1]
+
+ (low_temp, low_speed) = self.points[lower]
+ (high_temp, high_speed) = self.points[lower + 1]
+
+ return floor(low_speed + (((temp - low_temp) / (high_temp - low_temp)) * (high_speed - low_speed)))
+
+def read_curves(path: str) -> [TempCurve]:
+ curves = []
+ with open(path, "r") as fp:
+ json = load(fp)
+ for obj in json:
+ curves.append(TempCurve(obj["sensor"], obj["channel"],
+ [(i["temp"], i["speed"])
+ for i in obj["curve"]]))
+
+ return curves
+
+def compile_unisync(curves: [TempCurve]):
+ # Setup the version of curves we want
+ indexed_curves = dict()
+ for curve in curves:
+ indexed_curves[curve.channel] = curve.current_speed
+
+ compiled_curves = []
+ for i in range(len(curves)):
+ compiled_curves.append({"mode": "Manual", "speed": indexed_curves[i]})
+
+ # parse the current unisync
+ current_config = None
+ with open(UNI_SYNC_FILEPATH, "r") as fp:
+ current_config = load(fp)
+ current_config["configs"][0]["channels"] = compiled_curves
+ return current_config
+
+def run_unisync():
+ return run(["uni-sync"])
+
+if __name__ == '__main__':
+ curves = read_curves(PROFILE_FILEPATH)
+ for curve in curves:
+ print(f"Channel [{curve.channel}]: Sensor={curve.sensor} => Speed={curve.current_speed} based on {curve.current_temp}")
+ new_config = compile_unisync(curves)
+ with open(UNI_SYNC_FILEPATH, "w") as fp:
+ dump(new_config, fp)
+ run_unisync()