File size: 2,833 Bytes
b357a0e
2d79447
 
 
 
 
d78cf03
2d79447
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from diffusers.modular_pipelines import ModularPipelineBlocks, PipelineState, InputParam, OutputParam
from diffusers.utils import load_image
from PIL import Image
import cv2
import numpy as np

class GetImageStep(ModularPipelineBlocks):

    PROCESSOR_IDS = set([
        "canny", "lineart_anime",
    ])

    def __init__(self):
        super().__init__()
        from controlnet_aux.processor import Processor
        self.processor = Processor
    
    @staticmethod
    def make_canny(image):
        image = np.array(image)
        image = cv2.Canny(image, 100, 200)
        image = image[:, :, None]
        image = np.concatenate([image, image, image], axis=2)
        return Image.fromarray(image)
    
    def make_lineart_anime(self, image):
        return self.processor("lineart_anime")(image)
    
    
    def check_inputs(self, data) -> None:
        """
        Validates that `processor_id` is one of the supported processors.
        Raises:
            ValueError: if `processor_id` is not in PROCESSOR_IDS.
        """

        if data.image_url is None and data.image is None:
            raise ValueError("Either `image_url` or `image` must be provided.")

        if data.image_url is not None and data.image is not None:
            raise ValueError("Only one of `image_url` or `image` must be provided.")
        
        if data.processor_id is not None and data.processor_id not in self.PROCESSOR_IDS:
            raise ValueError(
                f"Processor id '{data.processor_id}' not found. "
                f"Please use one of the following: {self.PROCESSOR_IDS}"
            )
    
    @property
    def inputs(self):
        return [
            InputParam("image", type_hint=Image.Image),
            InputParam("image_url", type_hint=str, description="The url of the image to load"),
            InputParam("size", description="The size of the image"),
            InputParam("processor_id", type_hint=str, description="The id of the processor to use for controlnet")
        ]

    @property
    def intermediate_outputs(self):
        return [
          OutputParam("image", type_hint=Image.Image),
        ]

    def __call__(self, pipeline, state: PipelineState):

        data = self.get_block_state(state)
        self.check_inputs(data)

        if data.image is None:
            data.image = load_image(data.image_url).convert("RGB")
        
        if data.size is not None:
            data.image = data.image.resize(data.size)

        if data.processor_id is not None:
            if data.processor_id == "canny":
                data.image = self.make_canny(data.image)
            elif data.processor_id == "lineart_anime":
                data.image = self.make_lineart_anime(data.image)
        
        self.set_block_state(state, data)
        
        return pipeline, state