airbyte_cdk.sources.file_based.file_based_stream_permissions_reader

  1#
  2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  3#
  4
  5import logging
  6from abc import ABC, abstractmethod
  7from typing import Any, Dict, Iterable, Optional
  8
  9from airbyte_cdk.sources.file_based import AbstractFileBasedSpec
 10from airbyte_cdk.sources.file_based.remote_file import RemoteFile
 11
 12
 13class AbstractFileBasedStreamPermissionsReader(ABC):
 14    """
 15    This class is responsible for reading file permissions and Identities from a source.
 16    """
 17
 18    def __init__(self) -> None:
 19        self._config = None
 20
 21    @property
 22    def config(self) -> Optional[AbstractFileBasedSpec]:
 23        return self._config
 24
 25    @config.setter
 26    @abstractmethod
 27    def config(self, value: AbstractFileBasedSpec) -> None:
 28        """
 29        FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.
 30
 31        Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader
 32        will require keys that (for example) allow it to authenticate with the 3rd party.
 33
 34        Therefore, concrete implementations of AbstractFileBasedStreamPermissionsReader's's config setter should assert that `value` is of the correct
 35        config type for that type of StreamReader.
 36        """
 37        ...
 38
 39    @abstractmethod
 40    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
 41        """
 42        This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it
 43
 44        e.g.
 45        def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger):
 46            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
 47            result = api_conn.get_file_permissions_info(file.id)
 48            return MyPermissionsModel(
 49                id=result["id"],
 50                access_control_list = result["access_control_list"],
 51                is_public = result["is_public"],
 52                ).dict()
 53        """
 54        ...
 55
 56    @abstractmethod
 57    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
 58        """
 59        This function should return the Identities in a determined "space" or "domain" where the file metadata (ACLs) are fetched and ACLs items (Identities) exists.
 60
 61        e.g.
 62        def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
 63            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
 64            users_api = api_conn.users()
 65            groups_api = api_conn.groups()
 66            members_api = self.google_directory_service.members()
 67            for user in users_api.list():
 68                yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict()
 69            for group in groups_api.list():
 70                group_obj = my_identity_model(id=group.id, name=groups.name, email_address=user.email, type="group").dict()
 71                for member in members_api.list(group=group):
 72                    group_obj.member_email_addresses = group_obj.member_email_addresses or []
 73                    group_obj.member_email_addresses.append(member.email)
 74                yield group_obj.dict()
 75        """
 76        ...
 77
 78    @property
 79    @abstractmethod
 80    def file_permissions_schema(self) -> Dict[str, Any]:
 81        """
 82        This function should return the permissions schema for file permissions stream.
 83
 84        e.g.
 85        def file_permissions_schema(self) -> Dict[str, Any]:
 86            # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
 87            return {
 88                  "type": "object",
 89                  "properties": {
 90                    "id": { "type": "string" },
 91                    "file_path": { "type": "string" },
 92                    "access_control_list": {
 93                      "type": "array",
 94                      "items": { "type": "string" }
 95                    },
 96                    "publicly_accessible": { "type": "boolean" }
 97                  }
 98                }
 99        """
100        ...
101
102    @property
103    @abstractmethod
104    def identities_schema(self) -> Dict[str, Any]:
105        """
106        This function should return the identities schema for file identity stream.
107
108        e.g.
109        def identities_schema(self) -> Dict[str, Any]:
110            # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
111            return {
112              "type": "object",
113              "properties": {
114                "id": { "type": "string" },
115                "remote_id": { "type": "string" },
116                "name": { "type": ["null", "string"] },
117                "email_address": { "type": ["null", "string"] },
118                "member_email_addresses": { "type": ["null", "array"] },
119                "type": { "type": "string" },
120              }
121            }
122        """
123        ...
class AbstractFileBasedStreamPermissionsReader(ABC):
    """
    This class is responsible for reading file permissions and identities from a source.

    Concrete subclasses must implement the ``config`` setter, ``get_file_acl_permissions``,
    ``load_identity_groups``, and the ``file_permissions_schema`` / ``identities_schema``
    properties; until they do, the class cannot be instantiated.
    """

    def __init__(self) -> None:
        # Annotated explicitly: an unannotated ``None`` lets type checkers infer the attribute
        # as ``None``-only and reject subclass setters that store a real spec here.
        self._config: Optional[AbstractFileBasedSpec] = None

    @property
    def config(self) -> Optional[AbstractFileBasedSpec]:
        """Return the config set on this reader, or ``None`` if none has been set yet."""
        return self._config

    # ``abstractmethod`` is applied before ``config.setter``, so the resulting property is
    # abstract through its setter and subclasses are forced to provide one.
    @config.setter
    @abstractmethod
    def config(self, value: AbstractFileBasedSpec) -> None:
        """
        FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.

        Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader
        will require keys that (for example) allow it to authenticate with the 3rd party.

        Therefore, concrete implementations of AbstractFileBasedStreamPermissionsReader's config setter should assert that `value` is of the correct
        config type for that type of reader.
        """
        ...

    @abstractmethod
    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        """
        This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it.

        e.g.
        def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
            result = api_conn.get_file_permissions_info(file.id)
            return MyPermissionsModel(
                id=result["id"],
                access_control_list=result["access_control_list"],
                publicly_accessible=result["is_public"],
            ).dict()
        """
        ...

    @abstractmethod
    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        """
        This function should return the identities in a given "space" or "domain" from which the file metadata (ACLs) is fetched
        and in which the ACL items (identities) exist.

        e.g.
        def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
            users_api = api_conn.users()
            groups_api = api_conn.groups()
            members_api = api_conn.members()
            for user in users_api.list():
                yield MyIdentityModel(id=user.id, name=user.name, email_address=user.email, type="user").dict()
            for group in groups_api.list():
                group_obj = MyIdentityModel(id=group.id, name=group.name, email_address=group.email, type="group")
                for member in members_api.list(group=group):
                    group_obj.member_email_addresses = group_obj.member_email_addresses or []
                    group_obj.member_email_addresses.append(member.email)
                yield group_obj.dict()
        """
        ...

    @property
    @abstractmethod
    def file_permissions_schema(self) -> Dict[str, Any]:
        """
        This function should return the permissions schema for the file permissions stream.

        e.g.
        def file_permissions_schema(self) -> Dict[str, Any]:
            # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/file_permissions.json
            return {
                "type": "object",
                "properties": {
                    "id": { "type": "string" },
                    "file_path": { "type": "string" },
                    "access_control_list": {
                        "type": "array",
                        "items": { "type": "string" }
                    },
                    "publicly_accessible": { "type": "boolean" }
                }
            }
        """
        ...

    @property
    @abstractmethod
    def identities_schema(self) -> Dict[str, Any]:
        """
        This function should return the identities schema for the file identity stream.

        e.g.
        def identities_schema(self) -> Dict[str, Any]:
            # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
            return {
                "type": "object",
                "properties": {
                    "id": { "type": "string" },
                    "remote_id": { "type": "string" },
                    "name": { "type": ["null", "string"] },
                    "email_address": { "type": ["null", "string"] },
                    "member_email_addresses": { "type": ["null", "array"], "items": { "type": "string" } },
                    "type": { "type": "string" }
                }
            }
        """
        ...

This class is responsible for reading file permissions and identities from a source.

@property
def config(self) -> Optional[AbstractFileBasedSpec]:
    """Return the config set on this reader, or ``None`` if none has been set yet."""
    return self._config

FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.

Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader will require keys that (for example) allow it to authenticate with the 3rd party.

Therefore, concrete implementations of AbstractFileBasedStreamPermissionsReader's config setter should assert that `value` is of the correct config type for that type of reader.

@abstractmethod
def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
    """
    This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it.

    e.g.
    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
        result = api_conn.get_file_permissions_info(file.id)
        return MyPermissionsModel(
            id=result["id"],
            access_control_list=result["access_control_list"],
            publicly_accessible=result["is_public"],
        ).dict()
    """
    ...

This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it.

e.g.

    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
        result = api_conn.get_file_permissions_info(file.id)
        return MyPermissionsModel(
            id=result["id"],
            access_control_list=result["access_control_list"],
            publicly_accessible=result["is_public"],
        ).dict()

@abstractmethod
def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
    """
    This function should return the identities in a given "space" or "domain" from which the file metadata (ACLs) is fetched
    and in which the ACL items (identities) exist.

    e.g.
    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
        users_api = api_conn.users()
        groups_api = api_conn.groups()
        members_api = api_conn.members()
        for user in users_api.list():
            yield MyIdentityModel(id=user.id, name=user.name, email_address=user.email, type="user").dict()
        for group in groups_api.list():
            group_obj = MyIdentityModel(id=group.id, name=group.name, email_address=group.email, type="group")
            for member in members_api.list(group=group):
                group_obj.member_email_addresses = group_obj.member_email_addresses or []
                group_obj.member_email_addresses.append(member.email)
            yield group_obj.dict()
    """
    ...

This function should return the identities in a given "space" or "domain" from which the file metadata (ACLs) is fetched and in which the ACL items (identities) exist.

e.g.

    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
        users_api = api_conn.users()
        groups_api = api_conn.groups()
        members_api = api_conn.members()
        for user in users_api.list():
            yield MyIdentityModel(id=user.id, name=user.name, email_address=user.email, type="user").dict()
        for group in groups_api.list():
            group_obj = MyIdentityModel(id=group.id, name=group.name, email_address=group.email, type="group")
            for member in members_api.list(group=group):
                group_obj.member_email_addresses = group_obj.member_email_addresses or []
                group_obj.member_email_addresses.append(member.email)
            yield group_obj.dict()

@property
@abstractmethod
def file_permissions_schema(self) -> Dict[str, Any]:
    """
    This function should return the permissions schema for the file permissions stream.

    e.g.
    def file_permissions_schema(self) -> Dict[str, Any]:
        # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/file_permissions.json
        return {
            "type": "object",
            "properties": {
                "id": { "type": "string" },
                "file_path": { "type": "string" },
                "access_control_list": {
                    "type": "array",
                    "items": { "type": "string" }
                },
                "publicly_accessible": { "type": "boolean" }
            }
        }
    """
    ...

This function should return the permissions schema for file permissions stream.

e.g.

    def file_permissions_schema(self) -> Dict[str, Any]:
        # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/file_permissions.json
        return {
            "type": "object",
            "properties": {
                "id": { "type": "string" },
                "file_path": { "type": "string" },
                "access_control_list": { "type": "array", "items": { "type": "string" } },
                "publicly_accessible": { "type": "boolean" }
            }
        }

@property
@abstractmethod
def identities_schema(self) -> Dict[str, Any]:
    """
    This function should return the identities schema for the file identity stream.

    e.g.
    def identities_schema(self) -> Dict[str, Any]:
        # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
        return {
            "type": "object",
            "properties": {
                "id": { "type": "string" },
                "remote_id": { "type": "string" },
                "name": { "type": ["null", "string"] },
                "email_address": { "type": ["null", "string"] },
                "member_email_addresses": { "type": ["null", "array"], "items": { "type": "string" } },
                "type": { "type": "string" }
            }
        }
    """
    ...

This function should return the identities schema for file identity stream.

e.g.

    def identities_schema(self) -> Dict[str, Any]:
        # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
        return {
            "type": "object",
            "properties": {
                "id": { "type": "string" },
                "remote_id": { "type": "string" },
                "name": { "type": ["null", "string"] },
                "email_address": { "type": ["null", "string"] },
                "member_email_addresses": { "type": ["null", "array"] },
                "type": { "type": "string" },
            }
        }