diff --git a/sources/slack/__init__.py b/sources/slack/__init__.py index e59401535..688eb18c2 100644 --- a/sources/slack/__init__.py +++ b/sources/slack/__init__.py @@ -26,6 +26,7 @@ def slack_source( selected_channels: Optional[List[str]] = dlt.config.value, table_per_channel: bool = True, replies: bool = False, + include_private_channels: bool = False, ) -> Iterable[DltResource]: """ The source for the Slack pipeline. Available resources are conversations, conversations_history @@ -40,6 +41,8 @@ def slack_source( table_per_channel: Boolean flag, True by default. If True - for each channel separate table with messages is created. Otherwise, all messages are put in one table. replies: Boolean flag indicating if you want a replies table to be present as well. False by default. + include_private_channels: Boolean flag indicating if you want to include private channels and group DMs. + Defaults to False. Requires appropriate OAuth scopes (groups:read, mpim:read). Returns: Iterable[DltResource]: A list of DltResource objects representing the data resources. @@ -57,7 +60,9 @@ def slack_source( ) def get_channels( - slack_api: SlackAPI, selected_channels: Optional[List[str]] + slack_api: SlackAPI, + selected_channels: Optional[List[str]], + include_private_channels: bool = False, ) -> Tuple[List[TDataItem], List[TDataItem]]: """ Returns channel fetched from slack and list of selected channels. @@ -65,15 +70,24 @@ def get_channels( Args: slack_api: Slack API instance. selected_channels: List of selected channels names or None. + include_private_channels: Whether to include private channels and group DMs. Returns: Tuple[List[TDataItem], List[TDataItem]]: fetched channels and selected fetched channels. """ channels: List[TDataItem] = [] + + # Define conversation types based on the include_private_channels parameter + if include_private_channels: + conversation_types = "public_channel,private_channel,mpim" + else: + conversation_types = "public_channel" + for page_data in slack_api.get_pages( resource="conversations.list", response_path="$.channels[*]", datetime_fields=DEFAULT_DATETIME_FIELDS, + params={"types": conversation_types}, ): channels.extend(page_data) @@ -87,7 +101,9 @@ def get_channels( fetch_channels = channels return channels, fetch_channels - channels, fetched_selected_channels = get_channels(api, selected_channels) + channels, fetched_selected_channels = get_channels( + api, selected_channels, include_private_channels + ) @dlt.resource(name="channels", primary_key="id", write_disposition="replace") def channels_resource() -> Iterable[TDataItem]: diff --git a/sources/slack_pipeline.py b/sources/slack_pipeline.py index c31970988..9ed774a90 100644 --- a/sources/slack_pipeline.py +++ b/sources/slack_pipeline.py @@ -3,7 +3,7 @@ from typing import List import dlt -from pendulum import datetime +from pendulum import datetime, now from slack import slack_source @@ -69,10 +69,33 @@ def get_users() -> None: print(load_info) +def get_messages_and_replies_of_a_private_channel(private_channel_name: str) -> None: + """Execute a pipeline that will load the messages and replies of a private channel.""" + pipeline = dlt.pipeline( + pipeline_name="slack", destination="duckdb", dataset_name="slack_data" + ) + + # Note: if you use the table_per_channel=True, the message-resource will be named after the + # channel, so if you want the replies to a channel, e.g. "3-technical-help", you have to name + # it like this: + # resources = ["3-technical-help", "3-technical-help_replies"] + source = slack_source( + start_date=now().subtract(weeks=1), + end_date=now(), + selected_channels=[private_channel_name], + include_private_channels=True, + replies=True, + ).with_resources(private_channel_name, f"{private_channel_name}_replies") + + load_info = pipeline.run( + source, + ) + print(load_info) + + if __name__ == "__main__": # Add your desired resources to the list... - # resources = ["access_logs", "conversations", "conversations_history"] - + # resources = ["access_logs", "messages", "channels", "replies"] # load_all_resources() # load all resources with replies @@ -81,4 +104,7 @@ def get_users() -> None: # select_resource(selected_channels=["dlt-github-ci"]) # select_resource(selected_channels=["1-announcements", "dlt-github-ci"]) + # private_channel_name = "test-private-channel" + # get_messages_and_replies_of_a_private_channel(private_channel_name=private_channel_name) + get_users() diff --git a/tests/slack/test_slack_source.py b/tests/slack/test_slack_source.py index 19fc4443b..c0d19d571 100644 --- a/tests/slack/test_slack_source.py +++ b/tests/slack/test_slack_source.py @@ -1,13 +1,16 @@ import dlt import pytest -from pendulum import datetime +from dlt.common import pendulum +from dlt.pipeline.exceptions import PipelineStepFailed from sources.slack import slack_source from tests.utils import ALL_DESTINATIONS, assert_load_info, load_table_counts +# NOTE: Since the number of users in our community slack got super big, most tests will exclude it + @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) -def test_tabel_per_channel(destination_name: str) -> None: +def test_table_per_channel(destination_name: str) -> None: pipeline = dlt.pipeline( pipeline_name="slack", destination=destination_name, @@ -17,10 +20,10 @@ def test_tabel_per_channel(destination_name: str) -> None: # Set page size to ensure we use pagination source = slack_source( - start_date=datetime(2024, 1, 31), - end_date=datetime(2024, 2, 1), + start_date=pendulum.now().subtract(weeks=1), + end_date=pendulum.now(), selected_channels=["dlt-github-ci", "3-technical-help"], - ) + ).with_resources("dlt-github-ci", "3-technical-help", "channels") load_info = pipeline.run(source) assert_load_info(load_info) @@ -33,8 +36,9 @@ def test_tabel_per_channel(destination_name: str) -> None: assert set(table_counts.keys()) >= set(expected_tables) assert table_counts["channels"] >= 15 - assert table_counts[ci_table] == 6 - assert table_counts[help_table] == 5 + # Note: Message counts may vary with dynamic dates, so we check for > 0 + assert table_counts[ci_table] > 0 + assert table_counts[help_table] > 0 @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) @@ -49,12 +53,17 @@ def test_all_resources(destination_name: str) -> None: # Set page size to ensure we use pagination source = slack_source( page_size=40, - start_date=datetime(2024, 1, 31), - end_date=datetime(2024, 2, 1), + start_date=pendulum.now().subtract(weeks=1), + end_date=pendulum.now(), selected_channels=["dlt-github-ci", "1-announcements"], table_per_channel=False, ) - load_info = pipeline.run(source) + almost_all_resources = [ + source + for source in source.resources.keys() + if source != "users" and source != "access_logs" + ] + load_info = pipeline.run(source.with_resources(*almost_all_resources)) assert_load_info(load_info) table_names = [t["name"] for t in pipeline.default_schema.data_tables()] @@ -65,7 +74,26 @@ def test_all_resources(destination_name: str) -> None: assert set(table_counts.keys()) >= set(expected_tables) assert "replies" not in table_names assert table_counts["channels"] >= 15 - assert table_counts["messages"] == 34 + # Note: Message counts may vary with dynamic dates, so we check for > 0 + assert table_counts["messages"] > 0 + + +# @pytest.mark.skip(reason="Access logs require paid plan") +@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) +def test_access_logs_resource(destination_name: str) -> None: + pipeline = dlt.pipeline( + pipeline_name="slack", + destination=destination_name, + dataset_name="slack_data", + dev_mode=True, + ) + source = slack_source( + start_date=pendulum.now().subtract(weeks=1), + end_date=pendulum.now(), + ).with_resources("access_logs") + with pytest.raises(PipelineStepFailed) as exc_info: + pipeline.run(source) + assert "just available on paid accounts" in str(exc_info.value) @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) @@ -79,19 +107,20 @@ def test_replies(destination_name: str) -> None: # Set page size to ensure we use pagination source = slack_source( - start_date=datetime(2023, 12, 19), - end_date=datetime(2024, 1, 10), - selected_channels=["1-announcements"], + start_date=pendulum.now().subtract(weeks=1), + end_date=pendulum.now(), + selected_channels=["3-technical-help"], replies=True, table_per_channel=False, - ) + ).with_resources("messages", "replies") load_info = pipeline.run(source) assert_load_info(load_info) table_names = [t["name"] for t in pipeline.default_schema.data_tables()] table_counts = load_table_counts(pipeline, *table_names) assert "replies" in table_names - assert table_counts["replies"] >= 5 + # Note: Reply counts may vary with dynamic dates, so we check for > 0 + assert table_counts["replies"] > 0 @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) @@ -107,14 +136,17 @@ def test_with_merge_disposition(destination_name: str, table_per_channel: bool) dev_mode=True, ) - # Set page size to ensure we use pagination + def get_resource_names(table_per_channel: bool, channel_name: str) -> str: + return channel_name if table_per_channel else "messages" + + channel_name = "1-announcements" + resource_names = get_resource_names(table_per_channel, channel_name) source = slack_source( - start_date=datetime(2023, 12, 19), - end_date=datetime(2024, 1, 10), - selected_channels=["1-announcements"], - replies=True, + start_date=pendulum.now().subtract(weeks=4), + end_date=pendulum.now().subtract(weeks=1), + selected_channels=[channel_name], table_per_channel=table_per_channel, - ) + ).with_resources(resource_names) pipeline.run(source) table_names = [t["name"] for t in pipeline.default_schema.data_tables()] current_table_counts = load_table_counts(pipeline, *table_names) @@ -126,7 +158,6 @@ def test_with_merge_disposition(destination_name: str, table_per_channel: bool) assert all( table_counts[table_name] == current_table_counts[table_name] for table_name in table_names - if table_name != "users" ) @@ -140,8 +171,13 @@ def test_users(destination_name: str) -> None: ) # Selected just one channel to avoid loading all channels - source = slack_source( - selected_channels=["1-announcements"], + source = ( + slack_source( + page_size=200, + selected_channels=["1-announcements"], + ) + .with_resources("users") + .add_limit(3) ) load_info = pipeline.run(source) assert_load_info(load_info) @@ -154,3 +190,27 @@ def test_users(destination_name: str) -> None: print(table_counts.keys()) assert set(table_counts.keys()) >= set(expected_tables) assert table_counts["users"] >= 300 # The number of users can increase over time + + +@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) +def test_private_channels(destination_name: str) -> None: + pipeline = dlt.pipeline( + pipeline_name="slack", + destination=destination_name, + dataset_name="slack_data", + dev_mode=True, + ) + PRIVATE_CHANNEL_NAME = "test-private-channel" + source = slack_source( + start_date=pendulum.now().subtract(weeks=1), + end_date=pendulum.now(), + selected_channels=[PRIVATE_CHANNEL_NAME], + include_private_channels=True, + ).with_resources(PRIVATE_CHANNEL_NAME) + load_info = pipeline.run(source) + assert_load_info(load_info) + table_names = [t["name"] for t in pipeline.default_schema.data_tables()] + + expected_message_table_name = f"{PRIVATE_CHANNEL_NAME}_message".replace("-", "_") + + assert expected_message_table_name in table_names