utils: add method for retrieving unix socket peercred

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-11-08 16:59:12 -05:00
parent f089794adc
commit 53129bef7e
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
2 changed files with 36 additions and 27 deletions

View File

@ -12,10 +12,8 @@ import logging
import json import json
import getpass import getpass
import asyncio import asyncio
import socket
import struct
import pathlib import pathlib
from utils import ServerError from utils import ServerError, get_unix_peer_credentials
# Annotation imports # Annotation imports
from typing import ( from typing import (
@ -37,7 +35,6 @@ if TYPE_CHECKING:
from components.klippy_apis import KlippyAPI from components.klippy_apis import KlippyAPI
from components.file_manager.file_manager import FileManager from components.file_manager.file_manager import FileManager
from components.machine import Machine from components.machine import Machine
from asyncio.trsock import TransportSocket
FlexCallback = Callable[..., Optional[Coroutine]] FlexCallback = Callable[..., Optional[Coroutine]]
INIT_TIME = .25 INIT_TIME = .25
@ -258,30 +255,9 @@ class KlippyConnection:
return await self._init_klippy_connection() return await self._init_klippy_connection()
def _get_peer_credentials(self, writer: asyncio.StreamWriter) -> bool: def _get_peer_credentials(self, writer: asyncio.StreamWriter) -> bool:
sock: TransportSocket self._peer_cred = get_unix_peer_credentials(writer, "Klippy")
sock = writer.get_extra_info("socket", None) if not self._peer_cred:
if sock is None:
logging.debug(
"Unable to get Unix Socket, cant fetch peer credentials"
)
return False return False
data: bytes = b""
try:
size = struct.calcsize("3I")
data = sock.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, size)
pid, uid, gid = struct.unpack("3I", data)
except asyncio.CancelledError:
raise
except Exception:
logging.exception(
f"Failed to get Klippy Peer Credentials, raw: 0x{data.hex()}"
)
return False
self._peer_cred = {
"process_id": pid,
"user_id": uid,
"group_id": gid
}
logging.debug( logging.debug(
f"Klippy Connection: Received Peer Credentials: {self._peer_cred}" f"Klippy Connection: Received Peer Credentials: {self._peer_cred}"
) )

View File

@ -18,6 +18,8 @@ import hashlib
import json import json
import shlex import shlex
import re import re
import struct
import socket
from queue import SimpleQueue as Queue from queue import SimpleQueue as Queue
# Annotation imports # Annotation imports
@ -33,6 +35,7 @@ from typing import (
if TYPE_CHECKING: if TYPE_CHECKING:
from types import ModuleType from types import ModuleType
from asyncio.trsock import TransportSocket
MOONRAKER_PATH = str(pathlib.Path(__file__).parent.parent.resolve()) MOONRAKER_PATH = str(pathlib.Path(__file__).parent.parent.resolve())
SYS_MOD_PATHS = glob.glob("/usr/lib/python3*/dist-packages") SYS_MOD_PATHS = glob.glob("/usr/lib/python3*/dist-packages")
@ -230,3 +233,33 @@ def load_system_module(name: str) -> ModuleType:
else: else:
raise ServerError(f"Unable to import module {name}") raise ServerError(f"Unable to import module {name}")
return module return module
def get_unix_peer_credentials(
writer: asyncio.StreamWriter, name: str
) -> Dict[str, int]:
sock: TransportSocket
sock = writer.get_extra_info("socket", None)
if sock is None:
logging.debug(
f"Unable to get underlying Unix Socket for {name}, "
"cant fetch peer credentials"
)
return {}
data: bytes = b""
try:
size = struct.calcsize("3I")
data = sock.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, size)
pid, uid, gid = struct.unpack("3I", data)
except asyncio.CancelledError:
raise
except Exception:
logging.exception(
f"Failed to get Unix Socket Peer Credentials for {name}"
f", raw: 0x{data.hex()}"
)
return {}
return {
"process_id": pid,
"user_id": uid,
"group_id": gid
}