airbyte_cdk.sources.file_based.schema_validation_policies

 1from airbyte_cdk.sources.file_based.schema_validation_policies.abstract_schema_validation_policy import (
 2    AbstractSchemaValidationPolicy,
 3)
 4from airbyte_cdk.sources.file_based.schema_validation_policies.default_schema_validation_policies import (
 5    DEFAULT_SCHEMA_VALIDATION_POLICIES,
 6    EmitRecordPolicy,
 7    SkipRecordPolicy,
 8    WaitForDiscoverPolicy,
 9)
10
11__all__ = [
12    "DEFAULT_SCHEMA_VALIDATION_POLICIES",
13    "AbstractSchemaValidationPolicy",
14    "EmitRecordPolicy",
15    "SkipRecordPolicy",
16    "WaitForDiscoverPolicy",
17]
DEFAULT_SCHEMA_VALIDATION_POLICIES = {<ValidationPolicy.emit_record: 'Emit Record'>: <EmitRecordPolicy object>, <ValidationPolicy.skip_record: 'Skip Record'>: <SkipRecordPolicy object>, <ValidationPolicy.wait_for_discover: 'Wait for Discover'>: <WaitForDiscoverPolicy object>}
class AbstractSchemaValidationPolicy(abc.ABC):
10class AbstractSchemaValidationPolicy(ABC):
11    name: str
12    validate_schema_before_sync = False  # Whether to verify that records conform to the schema during the stream's availabilty check
13
14    @abstractmethod
15    def record_passes_validation_policy(
16        self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]
17    ) -> bool:
18        """
19        Return True if the record passes the user's validation policy.
20        """
21        raise NotImplementedError()

Helper class that provides a standard way to create an ABC using inheritance.

name: str
validate_schema_before_sync = False
@abstractmethod
def record_passes_validation_policy( self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]) -> bool:
14    @abstractmethod
15    def record_passes_validation_policy(
16        self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]
17    ) -> bool:
18        """
19        Return True if the record passes the user's validation policy.
20        """
21        raise NotImplementedError()

Return True if the record passes the user's validation policy.

17class EmitRecordPolicy(AbstractSchemaValidationPolicy):
18    name = "emit_record"
19
20    def record_passes_validation_policy(
21        self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]
22    ) -> bool:
23        return True

Helper class that provides a standard way to create an ABC using inheritance.

name = 'emit_record'
def record_passes_validation_policy( self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]) -> bool:
20    def record_passes_validation_policy(
21        self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]
22    ) -> bool:
23        return True

Return True if the record passes the user's validation policy.

26class SkipRecordPolicy(AbstractSchemaValidationPolicy):
27    name = "skip_record"
28
29    def record_passes_validation_policy(
30        self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]
31    ) -> bool:
32        return schema is not None and conforms_to_schema(record, schema)

Helper class that provides a standard way to create an ABC using inheritance.

name = 'skip_record'
def record_passes_validation_policy( self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]) -> bool:
29    def record_passes_validation_policy(
30        self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]
31    ) -> bool:
32        return schema is not None and conforms_to_schema(record, schema)

Return True if the record passes the user's validation policy.

35class WaitForDiscoverPolicy(AbstractSchemaValidationPolicy):
36    name = "wait_for_discover"
37    validate_schema_before_sync = True
38
39    def record_passes_validation_policy(
40        self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]
41    ) -> bool:
42        if schema is None or not conforms_to_schema(record, schema):
43            raise StopSyncPerValidationPolicy(
44                FileBasedSourceError.STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY
45            )
46        return True

Helper class that provides a standard way to create an ABC using inheritance.

name = 'wait_for_discover'
validate_schema_before_sync = True
def record_passes_validation_policy( self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]) -> bool:
39    def record_passes_validation_policy(
40        self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]
41    ) -> bool:
42        if schema is None or not conforms_to_schema(record, schema):
43            raise StopSyncPerValidationPolicy(
44                FileBasedSourceError.STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY
45            )
46        return True

Return True if the record passes the user's validation policy.