airbyte_cdk.sources.specs.transfer_modes

 1#
 2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 3#
 4
 5from typing import Literal
 6
 7from pydantic.v1 import AnyUrl, BaseModel, Field
 8
 9from airbyte_cdk import OneOfOptionConfig
10
11
class DeliverPermissions(BaseModel):
    # Spec model for the "use_permissions_transfer" delivery option.

    class Config(OneOfOptionConfig):
        # OneOfOptionConfig reads these three attributes: title and
        # description are user-facing text, discriminator names the tag field.
        title = "Replicate Permissions ACL"
        description = "Sends one identity stream and one or more permissions (ACL) streams to the destination. This data can be used in downstream systems to recreate permission restrictions mirroring the original source."
        discriminator = "delivery_type"

    # Tag field: the default and the only allowed literal are the same value,
    # and const=True pins it.
    delivery_type: Literal["use_permissions_transfer"] = Field(
        "use_permissions_transfer", const=True
    )

    # Boolean toggle, on by default.
    include_identities_stream: bool = Field(
        title="Include Identity Stream",
        description="This data can be used in downstream systems to recreate permission restrictions mirroring the original source",
        default=True,
    )
class DeliverPermissions(pydantic.v1.main.BaseModel):
class DeliverPermissions(BaseModel):
    # Spec model for the "use_permissions_transfer" delivery option.

    class Config(OneOfOptionConfig):
        # OneOfOptionConfig reads these three attributes: title and
        # description are user-facing text, discriminator names the tag field.
        title = "Replicate Permissions ACL"
        description = "Sends one identity stream and one or more permissions (ACL) streams to the destination. This data can be used in downstream systems to recreate permission restrictions mirroring the original source."
        discriminator = "delivery_type"

    # Tag field: the default and the only allowed literal are the same value,
    # and const=True pins it.
    delivery_type: Literal["use_permissions_transfer"] = Field(
        "use_permissions_transfer", const=True
    )

    # Boolean toggle, on by default.
    include_identities_stream: bool = Field(
        title="Include Identity Stream",
        description="This data can be used in downstream systems to recreate permission restrictions mirroring the original source",
        default=True,
    )
delivery_type: Literal['use_permissions_transfer']
include_identities_stream: bool
class DeliverPermissions.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
14    class Config(OneOfOptionConfig):
15        title = "Replicate Permissions ACL"
16        description = "Sends one identity stream and one for more permissions (ACL) streams to the destination. This data can be used in downstream systems to recreate permission restrictions mirroring the original source."
17        discriminator = "delivery_type"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in a model's nested Config class, then set title and description (both are shown in the UI) and discriminator (this ensures the discriminator field is marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
# Rendered values of the Config attributes for this option.
title = 'Replicate Permissions ACL'
description = 'Sends one identity stream and one or more permissions (ACL) streams to the destination. This data can be used in downstream systems to recreate permission restrictions mirroring the original source.'
discriminator = 'delivery_type'