airbyte_cdk.sources.declarative.validators

 1#
 2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.validators.dpath_validator import DpathValidator
 6from airbyte_cdk.sources.declarative.validators.predicate_validator import PredicateValidator
 7from airbyte_cdk.sources.declarative.validators.validate_adheres_to_schema import (
 8    ValidateAdheresToSchema,
 9)
10from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
11from airbyte_cdk.sources.declarative.validators.validator import Validator
12
13__all__ = [
14    "Validator",
15    "DpathValidator",
16    "ValidationStrategy",
17    "ValidateAdheresToSchema",
18    "PredicateValidator",
19]
class Validator(abc.ABC):
10class Validator(ABC):
11    @abstractmethod
12    def validate(self, input_data: Any) -> None:
13        """
14        Validates the input data.
15
16        :param input_data: The data to validate
17        :raises ValueError: If validation fails
18        """
19        pass

Helper class that provides a standard way to create an ABC using inheritance.

@abstractmethod
def validate(self, input_data: Any) -> None:
11    @abstractmethod
12    def validate(self, input_data: Any) -> None:
13        """
14        Validates the input data.
15
16        :param input_data: The data to validate
17        :raises ValueError: If validation fails
18        """
19        pass

Validates the input data.

Parameters
  • input_data: The data to validate
Raises
  • ValueError: If validation fails
@dataclass
class DpathValidator(airbyte_cdk.sources.declarative.validators.Validator):
16@dataclass
17class DpathValidator(Validator):
18    """
19    Validator that extracts a value at a specific path in the input data
20    and applies a validation strategy to it.
21    """
22
23    field_path: List[str]
24    strategy: ValidationStrategy
25
26    def __post_init__(self) -> None:
27        self._field_path = [
28            InterpolatedString.create(path, parameters={}) for path in self.field_path
29        ]
30        for path_index in range(len(self.field_path)):
31            if isinstance(self.field_path[path_index], str):
32                self._field_path[path_index] = InterpolatedString.create(
33                    self.field_path[path_index], parameters={}
34                )
35
36    def validate(self, input_data: dict[str, Any]) -> None:
37        """
38        Extracts the value at the specified path and applies the validation strategy.
39
40        :param input_data: Dictionary containing the data to validate
41        :raises ValueError: If the path doesn't exist or validation fails
42        """
43        path = [path.eval({}) for path in self._field_path]
44
45        if len(path) == 0:
46            raise ValueError("Field path is empty")
47
48        if "*" in path:
49            try:
50                values = dpath.values(input_data, path)
51                for value in values:
52                    self.strategy.validate(value)
53            except KeyError as e:
54                raise ValueError(f"Error validating path '{self.field_path}': {e}")
55        else:
56            try:
57                value = dpath.get(input_data, path)
58                self.strategy.validate(value)
59            except KeyError as e:
60                raise ValueError(f"Error validating path '{self.field_path}': {e}")

Validator that extracts a value at a specific path in the input data and applies a validation strategy to it.

DpathValidator( field_path: List[str], strategy: ValidationStrategy)
field_path: List[str]
strategy: ValidationStrategy
def validate(self, input_data: dict[str, typing.Any]) -> None:
36    def validate(self, input_data: dict[str, Any]) -> None:
37        """
38        Extracts the value at the specified path and applies the validation strategy.
39
40        :param input_data: Dictionary containing the data to validate
41        :raises ValueError: If the path doesn't exist or validation fails
42        """
43        path = [path.eval({}) for path in self._field_path]
44
45        if len(path) == 0:
46            raise ValueError("Field path is empty")
47
48        if "*" in path:
49            try:
50                values = dpath.values(input_data, path)
51                for value in values:
52                    self.strategy.validate(value)
53            except KeyError as e:
54                raise ValueError(f"Error validating path '{self.field_path}': {e}")
55        else:
56            try:
57                value = dpath.get(input_data, path)
58                self.strategy.validate(value)
59            except KeyError as e:
60                raise ValueError(f"Error validating path '{self.field_path}': {e}")

Extracts the value at the specified path and applies the validation strategy.

Parameters
  • input_data: Dictionary containing the data to validate
Raises
  • ValueError: If the path doesn't exist or validation fails
class ValidationStrategy(abc.ABC):
10class ValidationStrategy(ABC):
11    """
12    Base class for validation strategies.
13    """
14
15    @abstractmethod
16    def validate(self, value: Any) -> None:
17        """
18        Validates a value according to a specific strategy.
19
20        :param value: The value to validate
21        :raises ValueError: If validation fails
22        """
23        pass

Base class for validation strategies.

@abstractmethod
def validate(self, value: Any) -> None:
15    @abstractmethod
16    def validate(self, value: Any) -> None:
17        """
18        Validates a value according to a specific strategy.
19
20        :param value: The value to validate
21        :raises ValueError: If validation fails
22        """
23        pass

Validates a value according to a specific strategy.

Parameters
  • value: The value to validate
Raises
  • ValueError: If validation fails
@dataclass
class ValidateAdheresToSchema(airbyte_cdk.sources.declarative.validators.ValidationStrategy):
15@dataclass
16class ValidateAdheresToSchema(ValidationStrategy):
17    """
18    Validates that a value adheres to a specified JSON schema.
19    """
20
21    schema: Mapping[str, Any]
22
23    def validate(self, value: Any) -> None:
24        """
25        Validates the value against the JSON schema.
26
27        :param value: The value to validate
28        :raises ValueError: If the value does not adhere to the schema
29        """
30
31        if isinstance(value, str):
32            try:
33                value = json.loads(value)
34            except json.JSONDecodeError as e:
35                raise ValueError(f"Invalid JSON string: {value}") from e
36
37        try:
38            jsonschema.validate(instance=value, schema=self.schema)
39        except jsonschema.ValidationError as e:
40            raise ValueError(f"JSON schema validation error: {e.message}")

Validates that a value adheres to a specified JSON schema.

ValidateAdheresToSchema(schema: Mapping[str, Any])
schema: Mapping[str, Any]
def validate(self, value: Any) -> None:
23    def validate(self, value: Any) -> None:
24        """
25        Validates the value against the JSON schema.
26
27        :param value: The value to validate
28        :raises ValueError: If the value does not adhere to the schema
29        """
30
31        if isinstance(value, str):
32            try:
33                value = json.loads(value)
34            except json.JSONDecodeError as e:
35                raise ValueError(f"Invalid JSON string: {value}") from e
36
37        try:
38            jsonschema.validate(instance=value, schema=self.schema)
39        except jsonschema.ValidationError as e:
40            raise ValueError(f"JSON schema validation error: {e.message}")

Validates the value against the JSON schema.

Parameters
  • value: The value to validate
Raises
  • ValueError: If the value does not adhere to the schema
@dataclass
class PredicateValidator:
12@dataclass
13class PredicateValidator:
14    """
15    Validator that applies a validation strategy to a value.
16    """
17
18    value: Any
19    strategy: ValidationStrategy
20
21    def validate(self) -> None:
22        """
23        Applies the validation strategy to the value.
24
25        :raises ValueError: If validation fails
26        """
27        self.strategy.validate(self.value)

Validator that applies a validation strategy to a value.

PredicateValidator( value: Any, strategy: ValidationStrategy)
value: Any
strategy: ValidationStrategy
def validate(self) -> None:
21    def validate(self) -> None:
22        """
23        Applies the validation strategy to the value.
24
25        :raises ValueError: If validation fails
26        """
27        self.strategy.validate(self.value)

Applies the validation strategy to the value.

Raises
  • ValueError: If validation fails