|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Contains commands to interact with jobs on the Hugging Face Hub. |
|
|
|
Usage: |
|
# Run a job |
|
hf jobs run <image> <command> |
|
|
|
# List running or completed jobs |
|
hf jobs ps [-a] [-f key=value] [--format TEMPLATE] |
|
|
|
# Stream logs from a job |
|
hf jobs logs <job-id> |
|
|
|
# Inspect detailed information about a job |
|
hf jobs inspect <job-id> |
|
|
|
# Cancel a running job |
|
hf jobs cancel <job-id> |
|
""" |
|
|
|
import json |
|
import os |
|
import re |
|
from argparse import Namespace, _SubParsersAction |
|
from dataclasses import asdict |
|
from pathlib import Path |
|
from typing import Dict, List, Optional, Union |
|
|
|
import requests |
|
|
|
from huggingface_hub import HfApi, SpaceHardware, get_token |
|
from huggingface_hub.utils import logging |
|
from huggingface_hub.utils._dotenv import load_dotenv |
|
|
|
from . import BaseHuggingfaceCLICommand |
|
|
|
|
|
# Module-level logger obtained from the huggingface_hub logging utilities.
logger = logging.get_logger(__name__)

# Hardware flavor names advertised in the `--flavor` help text: every `SpaceHardware`
# value except "zero-a10g".
SUGGESTED_FLAVORS = [flavor for flavor in (hardware.value for hardware in SpaceHardware) if flavor != "zero-a10g"]
|
|
|
|
|
class JobsCommands(BaseHuggingfaceCLICommand):
    """Top-level `jobs` command group; wires every job-related subcommand into the CLI."""

    @staticmethod
    def register_subcommand(parser: _SubParsersAction):
        """Create the `jobs` parser under `parser` and register all of its subcommands."""
        jobs_parser = parser.add_parser("jobs", help="Run and manage Jobs on the Hub.")
        jobs_subparsers = jobs_parser.add_subparsers(help="huggingface.co jobs related commands")

        # Default action for the group itself: print the `jobs` help.
        jobs_parser.set_defaults(func=lambda args: jobs_parser.print_help())

        # Subcommands are registered in the same order as before.
        subcommand_classes = (
            InspectCommand,
            LogsCommand,
            PsCommand,
            RunCommand,
            CancelCommand,
            UvCommand,
        )
        for subcommand_cls in subcommand_classes:
            subcommand_cls.register_subcommand(jobs_subparsers)
|
|
|
|
|
class RunCommand(BaseHuggingfaceCLICommand):
    """`hf jobs run`: start a Job from a Docker image and, unless detached, stream its logs."""

    @staticmethod
    def register_subcommand(parser: _SubParsersAction) -> None:
        """Register the `run` subcommand and its arguments on `parser`."""
        run_parser = parser.add_parser("run", help="Run a Job")
        run_parser.add_argument("image", type=str, help="The Docker image to use.")
        run_parser.add_argument("-e", "--env", action="append", help="Set environment variables. E.g. --env ENV=value")
        run_parser.add_argument(
            "-s",
            "--secrets",
            action="append",
            help=(
                "Set secret environment variables. E.g. --secrets SECRET=value "
                "or `--secrets HF_TOKEN` to pass your Hugging Face token."
            ),
        )
        run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.")
        run_parser.add_argument("--secrets-file", type=str, help="Read in a file of secret environment variables.")
        run_parser.add_argument(
            "--flavor",
            type=str,
            help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.",
        )
        run_parser.add_argument(
            "--timeout",
            type=str,
            help="Max duration: int/float with s (seconds, default), m (minutes), h (hours) or d (days).",
        )
        run_parser.add_argument(
            "-d",
            "--detach",
            action="store_true",
            help="Run the Job in the background and print the Job ID.",
        )
        run_parser.add_argument(
            "--namespace",
            type=str,
            help="The namespace where the Job will be created. Defaults to the current user's namespace.",
        )
        run_parser.add_argument(
            "--token",
            type=str,
            help="A User Access Token generated from https://huggingface.co/settings/tokens",
        )
        # REMAINDER ("...") captures everything after the image as the command to run.
        run_parser.add_argument("command", nargs="...", help="The command to run.")
        run_parser.set_defaults(func=RunCommand)

    def __init__(self, args: Namespace) -> None:
        """Collect the parsed CLI arguments and resolve env vars / secrets.

        Env vars come from `--env-file` first, then each `--env` entry (later entries
        override earlier ones). Secrets follow the same scheme with `--secrets-file` and
        `--secrets`, resolved against the environment returned by `_get_extended_environ()`.
        """
        self.image: str = args.image
        self.command: List[str] = args.command

        self.env: Dict[str, Optional[str]] = {}
        if args.env_file:
            self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy()))
        for env_value in args.env or []:
            self.env.update(load_dotenv(env_value, environ=os.environ.copy()))

        self.secrets: Dict[str, Optional[str]] = {}
        extended_environ = _get_extended_environ()
        if args.secrets_file:
            self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ))
        for secret in args.secrets or []:
            self.secrets.update(load_dotenv(secret, environ=extended_environ))

        self.flavor: Optional[SpaceHardware] = args.flavor
        self.timeout: Optional[str] = args.timeout
        self.detach: bool = args.detach
        self.namespace: Optional[str] = args.namespace
        self.token: Optional[str] = args.token

    def run(self) -> None:
        """Start the Job, print its ID and URL, then stream its logs unless `--detach` was given."""
        api = HfApi(token=self.token)
        job = api.run_job(
            image=self.image,
            command=self.command,
            env=self.env,
            secrets=self.secrets,
            flavor=self.flavor,
            timeout=self.timeout,
            namespace=self.namespace,
        )

        # Always print the job ID and URL so the user can find the job later.
        print(f"Job started with ID: {job.id}")
        print(f"View at: {job.url}")

        if self.detach:
            return

        # Stream the logs from the same namespace the job was created in; without it the
        # lookup would target the default namespace and miss jobs created with `--namespace`.
        for log in api.fetch_job_logs(job_id=job.id, namespace=self.namespace):
            print(log)
|
|
|
|
|
class LogsCommand(BaseHuggingfaceCLICommand):
    """`hf jobs logs`: print the logs of a single Job."""

    @staticmethod
    def register_subcommand(parser: _SubParsersAction) -> None:
        """Register the `logs` subcommand and its arguments on `parser`."""
        logs_parser = parser.add_parser("logs", help="Fetch the logs of a Job")
        logs_parser.add_argument("job_id", type=str, help="Job ID")
        logs_parser.add_argument(
            "--namespace",
            type=str,
            help="The namespace where the job is running. Defaults to the current user's namespace.",
        )
        logs_parser.add_argument(
            "--token",
            type=str,
            help="A User Access Token generated from https://huggingface.co/settings/tokens",
        )
        logs_parser.set_defaults(func=LogsCommand)

    def __init__(self, args: Namespace) -> None:
        """Store the job ID, namespace and token taken from the parsed arguments."""
        self.job_id: str = args.job_id
        self.namespace: Optional[str] = args.namespace
        self.token: Optional[str] = args.token

    def run(self) -> None:
        """Print every log entry yielded by the API for the requested job."""
        client = HfApi(token=self.token)
        log_entries = client.fetch_job_logs(job_id=self.job_id, namespace=self.namespace)
        for entry in log_entries:
            print(entry)
|
|
|
|
|
def _tabulate(rows: List[List[Union[str, int]]], headers: List[str]) -> str: |
|
""" |
|
Inspired by: |
|
|
|
- stackoverflow.com/a/8356620/593036 |
|
- stackoverflow.com/questions/9535954/printing-lists-as-tabular-data |
|
""" |
|
col_widths = [max(len(str(x)) for x in col) for col in zip(*rows, headers)] |
|
terminal_width = max(os.get_terminal_size().columns, len(headers) * 12) |
|
while len(headers) + sum(col_widths) > terminal_width: |
|
col_to_minimize = col_widths.index(max(col_widths)) |
|
col_widths[col_to_minimize] //= 2 |
|
if len(headers) + sum(col_widths) <= terminal_width: |
|
col_widths[col_to_minimize] = terminal_width - sum(col_widths) - len(headers) + col_widths[col_to_minimize] |
|
row_format = ("{{:{}}} " * len(headers)).format(*col_widths) |
|
lines = [] |
|
lines.append(row_format.format(*headers)) |
|
lines.append(row_format.format(*["-" * w for w in col_widths])) |
|
for row in rows: |
|
row_format_args = [ |
|
str(x)[: col_width - 3] + "..." if len(str(x)) > col_width else str(x) |
|
for x, col_width in zip(row, col_widths) |
|
] |
|
lines.append(row_format.format(*row_format_args)) |
|
return "\n".join(lines) |
|
|
|
|
|
class PsCommand(BaseHuggingfaceCLICommand):
    """`hf jobs ps`: list Jobs, optionally filtered and rendered through a custom template."""

    @staticmethod
    def register_subcommand(parser: _SubParsersAction) -> None:
        """Register the `ps` subcommand and its arguments on `parser`."""
        run_parser = parser.add_parser("ps", help="List Jobs")
        run_parser.add_argument(
            "-a",
            "--all",
            action="store_true",
            help="Show all Jobs (default shows just running)",
        )
        run_parser.add_argument(
            "--namespace",
            type=str,
            help="The namespace from where it lists the jobs. Defaults to the current user's namespace.",
        )
        run_parser.add_argument(
            "--token",
            type=str,
            help="A User Access Token generated from https://huggingface.co/settings/tokens",
        )
        # Repeatable key=value filters.
        run_parser.add_argument(
            "-f",
            "--filter",
            action="append",
            default=[],
            help="Filter output based on conditions provided (format: key=value)",
        )
        # Optional output template.
        run_parser.add_argument(
            "--format",
            type=str,
            help="Format output using a custom template",
        )
        run_parser.set_defaults(func=PsCommand)

    def __init__(self, args: Namespace) -> None:
        """Store the parsed arguments and turn the `key=value` filters into a dict.

        Filter keys are lower-cased. Entries without "=" are ignored with a printed warning.
        """
        self.all: bool = args.all
        self.namespace: Optional[str] = args.namespace
        self.token: Optional[str] = args.token
        self.format: Optional[str] = args.format
        self.filters: Dict[str, str] = {}

        for f in args.filter:
            if "=" in f:
                # Split on the first "=" only so that values may themselves contain "=".
                key, value = f.split("=", 1)
                self.filters[key.lower()] = value
            else:
                print(f"Warning: Ignoring invalid filter format '{f}'. Use key=value format.")

    def run(self) -> None:
        """
        Fetch and display job information for the current user.
        Uses Docker-style filtering with -f/--filter flag and key=value pairs.

        Without `--all`, only jobs whose stage is RUNNING or UPDATING are listed.
        Errors are reported on stdout instead of being raised.
        """
        try:
            api = HfApi(token=self.token)
            jobs = api.list_jobs(namespace=self.namespace)

            table_headers = ["JOB ID", "IMAGE/SPACE", "COMMAND", "CREATED", "STATUS"]
            rows = []

            for job in jobs:
                status = job.status.stage if job.status else "UNKNOWN"

                # Default view: hide everything that is not currently active.
                if not self.all and status not in ("RUNNING", "UPDATING"):
                    continue

                job_id = job.id
                image_or_space = job.docker_image or "N/A"
                command = job.command or []
                command_str = " ".join(command) if command else "N/A"
                created_at = job.created_at or "N/A"

                # Properties that `-f key=value` filters can be matched against.
                job_properties = {
                    "id": job_id,
                    "image": image_or_space,
                    "status": status.lower(),
                    "command": command_str,
                }
                if not self._matches_filters(job_properties):
                    continue

                rows.append([job_id, image_or_space, command_str, created_at, status])

            if not rows:
                filters_msg = ""
                if self.filters:
                    filters_msg = f" matching filters: {', '.join([f'{k}={v}' for k, v in self.filters.items()])}"
                print(f"No jobs found{filters_msg}")
                return

            self._print_output(rows, table_headers)

        except requests.RequestException as e:
            print(f"Error fetching jobs data: {e}")
        except (KeyError, ValueError, TypeError) as e:
            print(f"Error processing jobs data: {e}")
        except Exception as e:
            print(f"Unexpected error - {type(e).__name__}: {e}")

    def _matches_filters(self, job_properties: Dict[str, str]) -> bool:
        """Check if job matches all specified filters.

        A filter on a key absent from `job_properties` never matches. Patterns containing
        `*` or `?` are treated as case-insensitive wildcards over the whole value; any other
        pattern is a case-insensitive substring test.
        """
        for key, pattern in self.filters.items():
            if key not in job_properties:
                return False
            value = job_properties[key]

            if "*" in pattern or "?" in pattern:
                # Translate only the two wildcards and escape every other character, so that
                # regex metacharacters in the pattern (".", "+", "(", "[", ...) are matched
                # literally instead of being interpreted or raising `re.error`.
                regex_pattern = "".join(
                    ".*" if char == "*" else "." if char == "?" else re.escape(char) for char in pattern
                )
                if not re.search(f"^{regex_pattern}$", value, re.IGNORECASE):
                    return False
            elif pattern.lower() not in value.lower():
                return False

        return True

    def _print_output(self, rows, headers):
        """Print output according to the chosen format.

        With `--format`, each row is printed through the template, replacing the
        `{{.id}}`, `{{.image}}`, `{{.command}}`, `{{.created}}` and `{{.status}}`
        placeholders. Otherwise the rows are printed as a table.
        """
        if self.format:
            template = self.format
            # Field order mirrors the column order of each row.
            fields = ["id", "image", "command", "created", "status"]
            for row in rows:
                line = template
                for i, field in enumerate(fields):
                    placeholder = f"{{{{.{field}}}}}"
                    if placeholder in line:
                        line = line.replace(placeholder, str(row[i]))
                print(line)
        else:
            print(
                _tabulate(
                    rows,
                    headers=headers,
                )
            )
|
|
|
|
|
class InspectCommand(BaseHuggingfaceCLICommand):
    """`hf jobs inspect`: dump detailed information about Jobs as JSON."""

    @staticmethod
    def register_subcommand(parser: _SubParsersAction) -> None:
        """Register the `inspect` subcommand and its arguments on `parser`."""
        inspect_parser = parser.add_parser("inspect", help="Display detailed information on one or more Jobs")
        inspect_parser.add_argument(
            "--namespace",
            type=str,
            help="The namespace where the job is running. Defaults to the current user's namespace.",
        )
        inspect_parser.add_argument(
            "--token",
            type=str,
            help="A User Access Token generated from https://huggingface.co/settings/tokens",
        )
        inspect_parser.add_argument("job_ids", nargs="...", help="The jobs to inspect")
        inspect_parser.set_defaults(func=InspectCommand)

    def __init__(self, args: Namespace) -> None:
        """Store the namespace, token and list of job IDs taken from the parsed arguments."""
        self.namespace: Optional[str] = args.namespace
        self.token: Optional[str] = args.token
        self.job_ids: List[str] = args.job_ids

    def run(self) -> None:
        """Inspect each requested job and print them all as one indented JSON list."""
        client = HfApi(token=self.token)
        serialized_jobs = []
        for job_id in self.job_ids:
            job_info = client.inspect_job(job_id=job_id, namespace=self.namespace)
            serialized_jobs.append(asdict(job_info))
        # `default=str` stringifies any value json cannot encode natively.
        print(json.dumps(serialized_jobs, indent=4, default=str))
|
|
|
|
|
class CancelCommand(BaseHuggingfaceCLICommand):
    """`hf jobs cancel`: cancel a single Job."""

    @staticmethod
    def register_subcommand(parser: _SubParsersAction) -> None:
        """Register the `cancel` subcommand and its arguments on `parser`."""
        cancel_parser = parser.add_parser("cancel", help="Cancel a Job")
        cancel_parser.add_argument("job_id", type=str, help="Job ID")
        cancel_parser.add_argument(
            "--namespace",
            type=str,
            help="The namespace where the job is running. Defaults to the current user's namespace.",
        )
        cancel_parser.add_argument(
            "--token",
            type=str,
            help="A User Access Token generated from https://huggingface.co/settings/tokens",
        )
        cancel_parser.set_defaults(func=CancelCommand)

    def __init__(self, args: Namespace) -> None:
        """Store the job ID, namespace and token taken from the parsed arguments."""
        self.job_id: str = args.job_id
        self.namespace = args.namespace
        self.token: Optional[str] = args.token

    def run(self) -> None:
        """Ask the API to cancel the job identified by `job_id` in the given namespace."""
        client = HfApi(token=self.token)
        client.cancel_job(job_id=self.job_id, namespace=self.namespace)
|
|
|
|
|
class UvCommand(BaseHuggingfaceCLICommand):
    """Run UV scripts on Hugging Face infrastructure."""

    @staticmethod
    def register_subcommand(parser):
        """Register UV run subcommand."""
        uv_parser = parser.add_parser(
            "uv",
            help="Run UV scripts (Python with inline dependencies) on HF infrastructure",
        )

        subparsers = uv_parser.add_subparsers(dest="uv_command", help="UV commands", required=True)

        # `uv run` is the only UV subcommand registered here.
        run_parser = subparsers.add_parser(
            "run",
            help="Run a UV script (local file or URL) on HF infrastructure",
        )
        run_parser.add_argument("script", help="UV script to run (local file or URL)")
        # REMAINDER ("...") captures everything after the script as its arguments.
        run_parser.add_argument("script_args", nargs="...", help="Arguments for the script", default=[])
        run_parser.add_argument("--image", type=str, help="Use a custom Docker image with `uv` installed.")
        run_parser.add_argument(
            "--repo",
            help="Repository name for the script (creates ephemeral if not specified)",
        )
        run_parser.add_argument(
            "--flavor",
            type=str,
            help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.",
        )
        run_parser.add_argument("-e", "--env", action="append", help="Environment variables")
        run_parser.add_argument(
            "-s",
            "--secrets",
            action="append",
            help=(
                "Set secret environment variables. E.g. --secrets SECRET=value "
                "or `--secrets HF_TOKEN` to pass your Hugging Face token."
            ),
        )
        run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.")
        run_parser.add_argument(
            "--secrets-file",
            type=str,
            help="Read in a file of secret environment variables.",
        )
        run_parser.add_argument("--timeout", type=str, help="Max duration (e.g., 30s, 5m, 1h)")
        run_parser.add_argument("-d", "--detach", action="store_true", help="Run in background")
        run_parser.add_argument(
            "--namespace",
            type=str,
            help="The namespace where the Job will be created. Defaults to the current user's namespace.",
        )
        run_parser.add_argument("--token", type=str, help="HF token")
        # `with` is a Python keyword, hence the explicit `with_` destination.
        run_parser.add_argument("--with", action="append", help="Run with the given packages installed", dest="with_")
        run_parser.add_argument(
            "-p", "--python", type=str, help="The Python interpreter to use for the run environment"
        )
        run_parser.set_defaults(func=UvCommand)

    def __init__(self, args: Namespace) -> None:
        """Initialize the command with parsed arguments.

        Env vars come from `--env-file` first, then each `--env` entry (later entries
        override earlier ones). Secrets follow the same scheme with `--secrets-file` and
        `--secrets`, resolved against the environment returned by `_get_extended_environ()`.
        """
        self.script = args.script
        self.script_args = args.script_args
        self.dependencies = args.with_
        self.python = args.python
        self.image = args.image

        self.env: Dict[str, Optional[str]] = {}
        if args.env_file:
            self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy()))
        for env_value in args.env or []:
            self.env.update(load_dotenv(env_value, environ=os.environ.copy()))

        self.secrets: Dict[str, Optional[str]] = {}
        extended_environ = _get_extended_environ()
        if args.secrets_file:
            self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ))
        for secret in args.secrets or []:
            self.secrets.update(load_dotenv(secret, environ=extended_environ))

        self.flavor: Optional[SpaceHardware] = args.flavor
        self.timeout: Optional[str] = args.timeout
        self.detach: bool = args.detach
        self.namespace: Optional[str] = args.namespace
        self.token: Optional[str] = args.token
        self._repo = args.repo

    def run(self) -> None:
        """Execute UV command.

        Starts the UV job, prints its ID and URL, then streams its logs unless `--detach`
        was given.
        """
        logging.set_verbosity(logging.INFO)
        api = HfApi(token=self.token)
        job = api.run_uv_job(
            script=self.script,
            script_args=self.script_args,
            dependencies=self.dependencies,
            python=self.python,
            image=self.image,
            env=self.env,
            secrets=self.secrets,
            flavor=self.flavor,
            timeout=self.timeout,
            namespace=self.namespace,
            _repo=self._repo,
        )

        # Always print the job ID and URL so the user can find the job later.
        print(f"Job started with ID: {job.id}")
        print(f"View at: {job.url}")

        if self.detach:
            return

        # Stream the logs from the same namespace the job was created in; without it the
        # lookup would target the default namespace and miss jobs created with `--namespace`.
        for log in api.fetch_job_logs(job_id=job.id, namespace=self.namespace):
            print(log)
|
|
|
|
|
def _get_extended_environ() -> Dict[str, str]:
    """Return a copy of `os.environ`, with `HF_TOKEN` set to the value of `get_token()` when it is not None."""
    environ_copy = os.environ.copy()
    token = get_token()
    if token is None:
        # No token available: hand back the plain copy of the environment.
        return environ_copy
    environ_copy["HF_TOKEN"] = token
    return environ_copy
|
|