|
from diffusers.modular_pipelines import ModularPipelineBlocks, PipelineState, InputParam, OutputParam |
|
from diffusers.utils import load_image |
|
from PIL import Image |
|
import cv2 |
|
import numpy as np |
|
|
|
class GetImageStep(ModularPipelineBlocks): |
|
|
|
PROCESSOR_IDS = set([ |
|
"canny", "lineart_anime", |
|
]) |
|
|
|
def __init__(self): |
|
super().__init__() |
|
from controlnet_aux.processor import Processor |
|
self.processor = Processor |
|
|
|
@staticmethod |
|
def make_canny(image): |
|
image = np.array(image) |
|
image = cv2.Canny(image, 100, 200) |
|
image = image[:, :, None] |
|
image = np.concatenate([image, image, image], axis=2) |
|
return Image.fromarray(image) |
|
|
|
def make_lineart_anime(self, image): |
|
return self.processor("lineart_anime")(image) |
|
|
|
|
|
def check_inputs(self, data) -> None: |
|
""" |
|
Validates that `processor_id` is one of the supported processors. |
|
Raises: |
|
ValueError: if `processor_id` is not in PROCESSOR_IDS. |
|
""" |
|
|
|
if data.image_url is None and data.image is None: |
|
raise ValueError("Either `image_url` or `image` must be provided.") |
|
|
|
if data.image_url is not None and data.image is not None: |
|
raise ValueError("Only one of `image_url` or `image` must be provided.") |
|
|
|
if data.processor_id is not None and data.processor_id not in self.PROCESSOR_IDS: |
|
raise ValueError( |
|
f"Processor id '{data.processor_id}' not found. " |
|
f"Please use one of the following: {self.PROCESSOR_IDS}" |
|
) |
|
|
|
@property |
|
def inputs(self): |
|
return [ |
|
InputParam("image", type_hint=Image.Image), |
|
InputParam("image_url", type_hint=str, description="The url of the image to load"), |
|
InputParam("size", description="The size of the image"), |
|
InputParam("processor_id", type_hint=str, description="The id of the processor to use for controlnet") |
|
] |
|
|
|
@property |
|
def intermediate_outputs(self): |
|
return [ |
|
OutputParam("image", type_hint=Image.Image), |
|
] |
|
|
|
def __call__(self, pipeline, state: PipelineState): |
|
|
|
data = self.get_block_state(state) |
|
self.check_inputs(data) |
|
|
|
if data.image is None: |
|
data.image = load_image(data.image_url).convert("RGB") |
|
|
|
if data.size is not None: |
|
data.image = data.image.resize(data.size) |
|
|
|
if data.processor_id is not None: |
|
if data.processor_id == "canny": |
|
data.image = self.make_canny(data.image) |
|
elif data.processor_id == "lineart_anime": |
|
data.image = self.make_lineart_anime(data.image) |
|
|
|
self.set_block_state(state, data) |
|
|
|
return pipeline, state |