#!/usr/bin/env python3
"""
Class to manage Jellyfin server connection and REST API calls
"""

import json
import os
import shlex
import ssl
import subprocess
import sys
import threading
import warnings
from configparser import ConfigParser

import jellyfin_apiclient_python as jellyfin

APP_NAME = "joc"
CLIENT_VERSION = "0.01"
DEVICE_NAME = "joc"
USER_AGENT = "joc/0.01"
DEVICE_ID = "joc"

# Accepted values of the "verify_tls" configuration option.
NO_VERIFY = 0
VERIFY = 1

SORT_ALNUM = 0
SORT_DATE = 1

# Item types passed to the server when searching.
MEDIA_TYPES = ["Audio", "AudioBook", "Book", "Episode", "Movie",
               "LiveTvProgram", "MusicVideo", "Photo", "Program",
               "TvProgram", "Video"]

CONNECTION_STATE = jellyfin.connection_manager.CONNECTION_STATE


def die(msg):
    """Print an error message to stderr and exit with status 1."""
    print(msg, file=sys.stderr)
    sys.exit(1)


class JellyfinConnection:
    """Connection to a Jellyfin server built from a ``[connection]``
    configuration section.

    Constructing an instance reads the configuration and connects
    immediately; any configuration or connection problem terminates the
    process through ``die``.
    """

    def __init__(self, parser: ConfigParser):
        self.parser = parser
        self.client = None
        self.http_client = None
        self.api = None
        self.verify_tls = None
        self.server = None
        self.username = None
        self.password = None
        self.passcmd = None

        self.read_config()
        self.connect()

    def read_config(self):
        """
        Read connection information from the configuration file.

        Requires a "connection" section with "server", "username" and
        either "password" or "passcmd". The optional "verify_tls" option
        must be 0 or 1; anything else falls back to VERIFY.
        """
        section = "connection"
        if not self.parser.has_section(section):
            die("error: unable to find connection info")
        if not self.parser.has_option(section, "server"):
            die("error: no server given")
        if not self.parser.has_option(section, "username"):
            die("error: no username given")
        if not self.parser.has_option(section, "password") and \
           not self.parser.has_option(section, "passcmd"):
            die("error: no password or passcmd given")

        self.server = self.parser.get(section, "server")
        self.username = self.parser.get(section, "username")
        self.password = self.parser.get(section, "password", fallback=None)
        self.passcmd = self.parser.get(section, "passcmd", fallback=None)

        raw_verify = self.parser.get(section, "verify_tls", fallback=VERIFY)
        try:
            verify_tls = int(raw_verify)
        except ValueError:
            verify_tls = VERIFY
        # Unknown numeric values are treated as "verify" (the safe default).
        if verify_tls not in (NO_VERIFY, VERIFY):
            verify_tls = VERIFY
        self.verify_tls = verify_tls

    # Taken mostly from jellyfin-mpv-shim
    def client_factory(self):
        """
        Build and return JellyfinClient instance
        """
        client = jellyfin.client.JellyfinClient()
        client.config.data["app.default"] = True
        client.config.app(APP_NAME, CLIENT_VERSION, DEVICE_NAME, DEVICE_ID)
        client.config.data["http.user_agent"] = USER_AGENT
        client.config.data["auth.ssl"] = self.verify_tls
        if self.verify_tls == NO_VERIFY:
            # NOTE: this silences *all* warnings process-wide, not only the
            # unverified-HTTPS ones.
            warnings.filterwarnings("ignore")
        return client

    def login(self, server: str, username: str, password: str):
        """
        Login to Jellyfin server with JellyfinClient instance.

        Exits the process if the server is unavailable or the login
        attempt returns no result. Returns the authenticated client.
        """
        client = self.client_factory()
        result = client.auth.connect_to_address(server)
        if result["State"] == CONNECTION_STATE["Unavailable"]:
            die("error: unable to connect to server")
        result = client.auth.login(server, username, password)
        # Assumes a failed login yields a falsy value (False / empty dict);
        # TODO confirm against the jellyfin-apiclient-python version in use.
        if not result:
            die("error: unable to log in to server")
        return client

    def connect(self):
        """
        Initializes JellyfinClient, HTTP connection, and API instance.

        When no password is configured, the configured passcmd is run
        through the shell and its stripped stdout is used as the password.
        """
        # Prep Connection
        if self.password is None:
            # passcmd comes from the user's own configuration file and may
            # contain pipes etc., hence shell=True.
            completed = subprocess.run(self.passcmd, shell=True,
                                       capture_output=True, text=True)
            if completed.returncode != 0:
                die("error: passcmd failed")
            self.password = completed.stdout.strip()

        self.client = self.login(self.server, self.username, self.password)
        self.http_client = jellyfin.http.HTTP(self.client)
        self.api = jellyfin.api.API(self.http_client)

    def get_libraries(self):
        """
        Get user root media folders, sorted by name
        """
        folders = self.api.get_media_folders()["Items"]
        return sorted(folders, key=lambda folder: folder["Name"])

    def get_children(self, parent_id: str):
        """
        Get children of a given parent_id.
        Sorts by date created (newest first) if the first item is a
        music video, or by name otherwise.
        Returns the sorted list
        """
        children = self.api.get_items_by_letter(parent_id=parent_id,
                                                recurse=False)
        children = children["Items"]

        # Sort items, by date if music video, by name otherwise
        if children and children[0]["Type"] == "MusicVideo":
            # Items without a creation date sort as oldest instead of
            # raising KeyError/TypeError.
            return sorted(children,
                          key=lambda item: item.get("DateCreated") or "",
                          reverse=True)
        return sorted(children, key=lambda item: item["Name"])

    def get_previous(self, item_id: str):
        """Gets items that were shown on the previous screen, i.e. the
        parent item's siblings, i.e. the children of the grandparent item.

        Falls back to the list of libraries when the item has no
        grandparent.
        """
        GRANDPARENT_INDEX = 1
        ancestors = self.api.get_ancestors(item_id)
        # Fewer than two ancestors means there is no grandparent to list;
        # the top-level libraries are the only sensible previous screen.
        if len(ancestors) <= GRANDPARENT_INDEX:
            return self.get_libraries()
        grandparent = ancestors[GRANDPARENT_INDEX]
        return self.get_children(grandparent["Id"])

    def get_parent(self, item_id: str):
        """
        Get parent (first ancestor) of a given item.

        Raises IndexError if the server reports no ancestors.
        """
        PARENT_INDEX = 0
        ancestors = self.api.get_ancestors(item_id)
        return ancestors[PARENT_INDEX]

    def search(self, term: str):
        """
        Search for a given term and return an alphabetically sorted list
        """
        items = self.api.search_media_items(term, MEDIA_TYPES, None)["Items"]
        return sorted(items, key=lambda item: item["Name"])

    def get_info(self, item_id):
        """
        Get stored information about an item and return a
        pretty printed JSON string representation of the item
        """
        item = self.api.get_item(item_id)
        return json.dumps(item, indent=2)

    def mark_watched(self, item_id: str, watched=True):
        """ Mark an item as watched or unwatched"""
        self.api.item_played(item_id, watched)

    def mark_favorite(self, item_id: str, favorite=True):
        """ Mark an item as a favorite or unfavorite the item"""
        self.api.favorite(item_id, favorite)

    def get_url(self, item_id: str):
        """
        Get download URL of the selected media item
        """
        return self.api.download_url(item_id)


def main():
    """Minimal command-line entry point: connect and list library names.

    Usage: <script> CONFIG_FILE
    """
    if len(sys.argv) != 2:
        die(f"usage: {sys.argv[0]} CONFIG_FILE")
    # Interpolation is disabled so that '%' in passwords is taken literally.
    parser = ConfigParser(interpolation=None)
    parser.read(sys.argv[1])
    connection = JellyfinConnection(parser)
    for library in connection.get_libraries():
        print(library["Name"])


if __name__ == '__main__':
    main()