Create block.py
Browse files
block.py
ADDED
|
@@ -0,0 +1,77 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
from diffusers.modular_pipelines import PipelineBlock, PipelineState, InputParam, OutputParam
|
| 2 |
+
class GetImageStep(PipelineBlock):
|
| 3 |
+
|
| 4 |
+
PROCESSOR_IDS = set([
|
| 5 |
+
"canny", "lineart_anime",
|
| 6 |
+
])
|
| 7 |
+
|
| 8 |
+
def __init__(self):
|
| 9 |
+
from controlnet_aux.processor import Processor
|
| 10 |
+
self.processor = Processor
|
| 11 |
+
|
| 12 |
+
@staticmethod
|
| 13 |
+
def make_canny(image):
|
| 14 |
+
image = np.array(image)
|
| 15 |
+
image = cv2.Canny(image, 100, 200)
|
| 16 |
+
image = image[:, :, None]
|
| 17 |
+
image = np.concatenate([image, image, image], axis=2)
|
| 18 |
+
return Image.fromarray(image)
|
| 19 |
+
|
| 20 |
+
def make_lineart_anime(self, image):
|
| 21 |
+
return self.processor("lineart_anime")(image)
|
| 22 |
+
|
| 23 |
+
|
| 24 |
+
def check_inputs(self, data) -> None:
|
| 25 |
+
"""
|
| 26 |
+
Validates that `processor_id` is one of the supported processors.
|
| 27 |
+
Raises:
|
| 28 |
+
ValueError: if `processor_id` is not in PROCESSOR_IDS.
|
| 29 |
+
"""
|
| 30 |
+
|
| 31 |
+
if data.image_url is None and data.image is None:
|
| 32 |
+
raise ValueError("Either `image_url` or `image` must be provided.")
|
| 33 |
+
|
| 34 |
+
if data.image_url is not None and data.image is not None:
|
| 35 |
+
raise ValueError("Only one of `image_url` or `image` must be provided.")
|
| 36 |
+
|
| 37 |
+
if data.processor_id is not None and data.processor_id not in self.PROCESSOR_IDS:
|
| 38 |
+
raise ValueError(
|
| 39 |
+
f"Processor id '{data.processor_id}' not found. "
|
| 40 |
+
f"Please use one of the following: {self.PROCESSOR_IDS}"
|
| 41 |
+
)
|
| 42 |
+
|
| 43 |
+
@property
|
| 44 |
+
def inputs(self):
|
| 45 |
+
return [
|
| 46 |
+
InputParam("image", type_hint=Image.Image),
|
| 47 |
+
InputParam("image_url", type_hint=str, description="The url of the image to load"),
|
| 48 |
+
InputParam("size", description="The size of the image"),
|
| 49 |
+
InputParam("processor_id", type_hint=str, description="The id of the processor to use for controlnet")
|
| 50 |
+
]
|
| 51 |
+
|
| 52 |
+
@property
|
| 53 |
+
def intermediate_outputs(self):
|
| 54 |
+
return [
|
| 55 |
+
OutputParam("image", type_hint=Image.Image),
|
| 56 |
+
]
|
| 57 |
+
|
| 58 |
+
def __call__(self, pipeline, state: PipelineState):
|
| 59 |
+
|
| 60 |
+
data = self.get_block_state(state)
|
| 61 |
+
self.check_inputs(data)
|
| 62 |
+
|
| 63 |
+
if data.image is None:
|
| 64 |
+
data.image = load_image(data.image_url).convert("RGB")
|
| 65 |
+
|
| 66 |
+
if data.size is not None:
|
| 67 |
+
data.image = data.image.resize(data.size)
|
| 68 |
+
|
| 69 |
+
if data.processor_id is not None:
|
| 70 |
+
if data.processor_id == "canny":
|
| 71 |
+
data.image = self.make_canny(data.image)
|
| 72 |
+
elif data.processor_id == "lineart_anime":
|
| 73 |
+
data.image = self.make_lineart_anime(data.image)
|
| 74 |
+
|
| 75 |
+
self.set_block_state(state, data)
|
| 76 |
+
|
| 77 |
+
return pipeline, state
|