airbyte_cdk.sources.file_based.file_based_stream_permissions_reader
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Optional

from airbyte_cdk.sources.file_based import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.remote_file import RemoteFile


class AbstractFileBasedStreamPermissionsReader(ABC):
    """
    This class is responsible for reading file permissions and Identities from a source.

    Concrete subclasses must implement the `config` setter, `get_file_acl_permissions`,
    `load_identity_groups`, and the `file_permissions_schema` / `identities_schema` properties.
    """

    def __init__(self) -> None:
        # Annotated explicitly: without it a type checker infers the attribute as None-only,
        # which contradicts the Optional[AbstractFileBasedSpec] return type of `config`.
        self._config: Optional[AbstractFileBasedSpec] = None

    @property
    def config(self) -> Optional[AbstractFileBasedSpec]:
        """Return `self._config`, which `__init__` initialises to None."""
        return self._config

    @config.setter
    @abstractmethod
    def config(self, value: AbstractFileBasedSpec) -> None:
        """
        FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.

        Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader
        will require keys that (for example) allow it to authenticate with the 3rd party.

        Therefore, concrete implementations of AbstractFileBasedStreamPermissionsReader's config setter should assert that `value` is of the correct
        config type for that type of StreamReader.
        """
        ...

    @abstractmethod
    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        """
        This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it.

        :param file: the remote file whose permissions are requested
        :param logger: logger made available to the implementation
        :return: a dictionary describing the file's permissions

        e.g.
        def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
            result = api_conn.get_file_permissions_info(file.id)
            return MyPermissionsModel(
                id=result["id"],
                access_control_list=result["access_control_list"],
                is_public=result["is_public"],
            ).dict()
        """
        ...

    @abstractmethod
    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        """
        This function should return the Identities in a determined "space" or "domain" where the file metadata (ACLs) are fetched and ACL items (Identities) exist.

        :param logger: logger made available to the implementation
        :return: an iterable of dictionaries, one per identity

        e.g.
        def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
            users_api = api_conn.users()
            groups_api = api_conn.groups()
            members_api = api_conn.members()
            for user in users_api.list():
                yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict()
            for group in groups_api.list():
                group_obj = my_identity_model(id=group.id, name=group.name, email_address=group.email, type="group")
                for member in members_api.list(group=group):
                    group_obj.member_email_addresses = group_obj.member_email_addresses or []
                    group_obj.member_email_addresses.append(member.email)
                yield group_obj.dict()
        """
        ...

    @property
    @abstractmethod
    def file_permissions_schema(self) -> Dict[str, Any]:
        """
        This function should return the permissions schema for file permissions stream.

        e.g.
        def file_permissions_schema(self) -> Dict[str, Any]:
            # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/file_permissions.json
            return {
                "type": "object",
                "properties": {
                    "id": { "type": "string" },
                    "file_path": { "type": "string" },
                    "access_control_list": {
                        "type": "array",
                        "items": { "type": "string" }
                    },
                    "publicly_accessible": { "type": "boolean" }
                }
            }
        """
        ...

    @property
    @abstractmethod
    def identities_schema(self) -> Dict[str, Any]:
        """
        This function should return the identities schema for file identity stream.

        e.g.
        def identities_schema(self) -> Dict[str, Any]:
            # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
            return {
                "type": "object",
                "properties": {
                    "id": { "type": "string" },
                    "remote_id": { "type": "string" },
                    "name": { "type": ["null", "string"] },
                    "email_address": { "type": ["null", "string"] },
                    "member_email_addresses": { "type": ["null", "array"] },
                    "type": { "type": "string" },
                }
            }
        """
        ...
class AbstractFileBasedStreamPermissionsReader(ABC):
    """
    This class is responsible for reading file permissions and Identities from a source.

    Concrete subclasses must implement the `config` setter, `get_file_acl_permissions`,
    `load_identity_groups`, and the `file_permissions_schema` / `identities_schema` properties.
    """

    def __init__(self) -> None:
        # Annotated explicitly: without it a type checker infers the attribute as None-only,
        # which contradicts the Optional[AbstractFileBasedSpec] return type of `config`.
        self._config: Optional[AbstractFileBasedSpec] = None

    @property
    def config(self) -> Optional[AbstractFileBasedSpec]:
        """Return `self._config`, which `__init__` initialises to None."""
        return self._config

    @config.setter
    @abstractmethod
    def config(self, value: AbstractFileBasedSpec) -> None:
        """
        FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.

        Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader
        will require keys that (for example) allow it to authenticate with the 3rd party.

        Therefore, concrete implementations of AbstractFileBasedStreamPermissionsReader's config setter should assert that `value` is of the correct
        config type for that type of StreamReader.
        """
        ...

    @abstractmethod
    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        """
        This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it.

        :param file: the remote file whose permissions are requested
        :param logger: logger made available to the implementation
        :return: a dictionary describing the file's permissions

        e.g.
        def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
            result = api_conn.get_file_permissions_info(file.id)
            return MyPermissionsModel(
                id=result["id"],
                access_control_list=result["access_control_list"],
                is_public=result["is_public"],
            ).dict()
        """
        ...

    @abstractmethod
    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        """
        This function should return the Identities in a determined "space" or "domain" where the file metadata (ACLs) are fetched and ACL items (Identities) exist.

        :param logger: logger made available to the implementation
        :return: an iterable of dictionaries, one per identity

        e.g.
        def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
            users_api = api_conn.users()
            groups_api = api_conn.groups()
            members_api = api_conn.members()
            for user in users_api.list():
                yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict()
            for group in groups_api.list():
                group_obj = my_identity_model(id=group.id, name=group.name, email_address=group.email, type="group")
                for member in members_api.list(group=group):
                    group_obj.member_email_addresses = group_obj.member_email_addresses or []
                    group_obj.member_email_addresses.append(member.email)
                yield group_obj.dict()
        """
        ...

    @property
    @abstractmethod
    def file_permissions_schema(self) -> Dict[str, Any]:
        """
        This function should return the permissions schema for file permissions stream.

        e.g.
        def file_permissions_schema(self) -> Dict[str, Any]:
            # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/file_permissions.json
            return {
                "type": "object",
                "properties": {
                    "id": { "type": "string" },
                    "file_path": { "type": "string" },
                    "access_control_list": {
                        "type": "array",
                        "items": { "type": "string" }
                    },
                    "publicly_accessible": { "type": "boolean" }
                }
            }
        """
        ...

    @property
    @abstractmethod
    def identities_schema(self) -> Dict[str, Any]:
        """
        This function should return the identities schema for file identity stream.

        e.g.
        def identities_schema(self) -> Dict[str, Any]:
            # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
            return {
                "type": "object",
                "properties": {
                    "id": { "type": "string" },
                    "remote_id": { "type": "string" },
                    "name": { "type": ["null", "string"] },
                    "email_address": { "type": ["null", "string"] },
                    "member_email_addresses": { "type": ["null", "array"] },
                    "type": { "type": "string" },
                }
            }
        """
        ...
This class is responsible for reading file permissions and Identities from a source.
FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.
Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader will require keys that (for example) allow it to authenticate with the 3rd party.
Therefore, concrete implementations of AbstractFileBasedStreamPermissionsReader's config setter should assert that `value` is of the correct config type for that type of StreamReader.
@abstractmethod
def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
    """
    This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it.

    :param file: the remote file whose permissions are requested
    :param logger: logger made available to the implementation
    :return: a dictionary describing the file's permissions

    e.g.
    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
        result = api_conn.get_file_permissions_info(file.id)
        return MyPermissionsModel(
            id=result["id"],
            access_control_list=result["access_control_list"],
            is_public=result["is_public"],
        ).dict()
    """
    ...
This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it
e.g. def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger): api_conn = some_api.conn(credentials=SOME_CREDENTIALS) result = api_conn.get_file_permissions_info(file.id) return MyPermissionsModel( id=result["id"], access_control_list = result["access_control_list"], is_public = result["is_public"], ).dict()
@abstractmethod
def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
    """
    This function should return the Identities in a determined "space" or "domain" where the file metadata (ACLs) are fetched and ACL items (Identities) exist.

    :param logger: logger made available to the implementation
    :return: an iterable of dictionaries, one per identity

    e.g.
    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
        users_api = api_conn.users()
        groups_api = api_conn.groups()
        members_api = api_conn.members()
        for user in users_api.list():
            yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict()
        for group in groups_api.list():
            group_obj = my_identity_model(id=group.id, name=group.name, email_address=group.email, type="group")
            for member in members_api.list(group=group):
                group_obj.member_email_addresses = group_obj.member_email_addresses or []
                group_obj.member_email_addresses.append(member.email)
            yield group_obj.dict()
    """
    ...
This function should return the Identities in a determined "space" or "domain" where the file metadata (ACLs) are fetched and ACL items (Identities) exist.
e.g. def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]: api_conn = some_api.conn(credentials=SOME_CREDENTIALS) users_api = api_conn.users() groups_api = api_conn.groups() members_api = api_conn.members() for user in users_api.list(): yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict() for group in groups_api.list(): group_obj = my_identity_model(id=group.id, name=group.name, email_address=group.email, type="group") for member in members_api.list(group=group): group_obj.member_email_addresses = group_obj.member_email_addresses or [] group_obj.member_email_addresses.append(member.email) yield group_obj.dict()
@property
@abstractmethod
def file_permissions_schema(self) -> Dict[str, Any]:
    """
    This function should return the permissions schema for file permissions stream.

    :return: a dictionary holding the schema for the file permissions stream

    e.g.
    def file_permissions_schema(self) -> Dict[str, Any]:
        # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/file_permissions.json
        return {
            "type": "object",
            "properties": {
                "id": { "type": "string" },
                "file_path": { "type": "string" },
                "access_control_list": {
                    "type": "array",
                    "items": { "type": "string" }
                },
                "publicly_accessible": { "type": "boolean" }
            }
        }
    """
    ...
This function should return the permissions schema for file permissions stream.
e.g. def file_permissions_schema(self) -> Dict[str, Any]: # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/file_permissions.json return { "type": "object", "properties": { "id": { "type": "string" }, "file_path": { "type": "string" }, "access_control_list": { "type": "array", "items": { "type": "string" } }, "publicly_accessible": { "type": "boolean" } } }
@property
@abstractmethod
def identities_schema(self) -> Dict[str, Any]:
    """
    This function should return the identities schema for file identity stream.

    :return: a dictionary holding the schema for the identities stream

    e.g.
    def identities_schema(self) -> Dict[str, Any]:
        # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
        return {
            "type": "object",
            "properties": {
                "id": { "type": "string" },
                "remote_id": { "type": "string" },
                "name": { "type": ["null", "string"] },
                "email_address": { "type": ["null", "string"] },
                "member_email_addresses": { "type": ["null", "array"] },
                "type": { "type": "string" },
            }
        }
    """
    ...
This function should return the identities schema for file identity stream.
e.g. def identities_schema(self) -> Dict[str, Any]: # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json return { "type": "object", "properties": { "id": { "type": "string" }, "remote_id": { "type": "string" }, "name": { "type": ["null", "string"] }, "email_address": { "type": ["null", "string"] }, "member_email_addresses": { "type": ["null", "array"] }, "type": { "type": "string" }, } }