Skip to content
20 changes: 18 additions & 2 deletions sources/slack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def slack_source(
selected_channels: Optional[List[str]] = dlt.config.value,
table_per_channel: bool = True,
replies: bool = False,
include_private_channels: bool = False,
) -> Iterable[DltResource]:
"""
The source for the Slack pipeline. Available resources are conversations, conversations_history
Expand All @@ -40,6 +41,8 @@ def slack_source(
table_per_channel: Boolean flag, True by default. If True - for each channel separate table with messages is created.
Otherwise, all messages are put in one table.
replies: Boolean flag indicating if you want a replies table to be present as well. False by default.
include_private_channels: Boolean flag indicating if you want to include private channels and group DMs.
Defaults to False. Requires appropriate OAuth scopes (groups:read, mpim:read).

Returns:
Iterable[DltResource]: A list of DltResource objects representing the data resources.
Expand All @@ -57,23 +60,34 @@ def slack_source(
)

def get_channels(
slack_api: SlackAPI, selected_channels: Optional[List[str]]
slack_api: SlackAPI,
selected_channels: Optional[List[str]],
include_private_channels: bool = False,
) -> Tuple[List[TDataItem], List[TDataItem]]:
"""
Returns channel fetched from slack and list of selected channels.

Args:
slack_api: Slack API instance.
selected_channels: List of selected channels names or None.
include_private_channels: Whether to include private channels and group DMs.

Returns:
Tuple[List[TDataItem], List[TDataItem]]: fetched channels and selected fetched channels.
"""
channels: List[TDataItem] = []

# Define conversation types based on the include_private_channels parameter
if include_private_channels:
conversation_types = "public_channel,private_channel,mpim"
else:
conversation_types = "public_channel"

for page_data in slack_api.get_pages(
resource="conversations.list",
response_path="$.channels[*]",
datetime_fields=DEFAULT_DATETIME_FIELDS,
params={"types": conversation_types},
):
channels.extend(page_data)

Expand All @@ -87,7 +101,9 @@ def get_channels(
fetch_channels = channels
return channels, fetch_channels

channels, fetched_selected_channels = get_channels(api, selected_channels)
channels, fetched_selected_channels = get_channels(
api, selected_channels, include_private_channels
)

@dlt.resource(name="channels", primary_key="id", write_disposition="replace")
def channels_resource() -> Iterable[TDataItem]:
Expand Down
32 changes: 29 additions & 3 deletions sources/slack_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import List

import dlt
from pendulum import datetime
from pendulum import datetime, now
from slack import slack_source


Expand Down Expand Up @@ -69,10 +69,33 @@ def get_users() -> None:
print(load_info)


def get_messages_and_replies_of_a_private_channel(private_channel_name: str) -> None:
    """Execute a pipeline that loads the last week of messages and replies
    from a single private channel into DuckDB.

    Args:
        private_channel_name: Name of the private channel to load. The Slack
            token must have access to it (requires the groups:read scope and
            channel membership) — TODO confirm against your app's scopes.

    Returns:
        None. Prints the load info of the completed pipeline run.
    """
    pipeline = dlt.pipeline(
        pipeline_name="slack", destination="duckdb", dataset_name="slack_data"
    )

    # Read the clock once so start_date/end_date describe one consistent
    # window; two separate now() calls would yield slightly different instants.
    end_date = now()
    start_date = end_date.subtract(weeks=1)

    # Note: if you use the table_per_channel=True, the message-resource will be named after the
    # channel, so if you want the replies to a channel, e.g. "3-technical-help", you have to name
    # it like this:
    # resources = ["3-technical-help", "3-technical-help_replies"]
    source = slack_source(
        start_date=start_date,
        end_date=end_date,
        selected_channels=[private_channel_name],
        include_private_channels=True,
        replies=True,
    ).with_resources(private_channel_name, f"{private_channel_name}_replies")

    load_info = pipeline.run(source)
    print(load_info)


if __name__ == "__main__":
# Add your desired resources to the list...
# resources = ["access_logs", "conversations", "conversations_history"]

# resources = ["access_logs", "messages", "channels", "replies"]
# load_all_resources()

# load all resources with replies
Expand All @@ -81,4 +104,7 @@ def get_users() -> None:
# select_resource(selected_channels=["dlt-github-ci"])
# select_resource(selected_channels=["1-announcements", "dlt-github-ci"])

# private_channel_name = "test-private-channel"
# get_messages_and_replies_of_a_private_channel(private_channel_name=private_channel_name)

get_users()
110 changes: 85 additions & 25 deletions tests/slack/test_slack_source.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import dlt
import pytest
from pendulum import datetime
from dlt.common import pendulum
from dlt.pipeline.exceptions import PipelineStepFailed

from sources.slack import slack_source
from tests.utils import ALL_DESTINATIONS, assert_load_info, load_table_counts

# NOTE: Since the number of users in our community slack got super big, most tests will exclude it


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_tabel_per_channel(destination_name: str) -> None:
def test_table_per_channel(destination_name: str) -> None:
pipeline = dlt.pipeline(
pipeline_name="slack",
destination=destination_name,
Expand All @@ -17,10 +20,10 @@ def test_tabel_per_channel(destination_name: str) -> None:

# Set page size to ensure we use pagination
source = slack_source(
start_date=datetime(2024, 1, 31),
end_date=datetime(2024, 2, 1),
start_date=pendulum.now().subtract(weeks=1),
end_date=pendulum.now(),
selected_channels=["dlt-github-ci", "3-technical-help"],
)
).with_resources("dlt-github-ci", "3-technical-help", "channels")
load_info = pipeline.run(source)
assert_load_info(load_info)

Expand All @@ -33,8 +36,9 @@ def test_tabel_per_channel(destination_name: str) -> None:

assert set(table_counts.keys()) >= set(expected_tables)
assert table_counts["channels"] >= 15
assert table_counts[ci_table] == 6
assert table_counts[help_table] == 5
# Note: Message counts may vary with dynamic dates, so we check for > 0
assert table_counts[ci_table] > 0
assert table_counts[help_table] > 0


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
Expand All @@ -49,12 +53,17 @@ def test_all_resources(destination_name: str) -> None:
# Set page size to ensure we use pagination
source = slack_source(
page_size=40,
start_date=datetime(2024, 1, 31),
end_date=datetime(2024, 2, 1),
start_date=pendulum.now().subtract(weeks=1),
end_date=pendulum.now(),
selected_channels=["dlt-github-ci", "1-announcements"],
table_per_channel=False,
)
load_info = pipeline.run(source)
almost_all_resources = [
source
for source in source.resources.keys()
if source != "users" and source != "access_logs"
]
load_info = pipeline.run(source.with_resources(*almost_all_resources))
assert_load_info(load_info)

table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
Expand All @@ -65,7 +74,26 @@ def test_all_resources(destination_name: str) -> None:
assert set(table_counts.keys()) >= set(expected_tables)
assert "replies" not in table_names
assert table_counts["channels"] >= 15
assert table_counts["messages"] == 34
# Note: Message counts may vary with dynamic dates, so we check for > 0
assert table_counts["messages"] > 0


# @pytest.mark.skip(reason="Access logs require paid plan")
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_access_logs_resource(destination_name: str) -> None:
    """Loading access_logs must fail on a free workspace: the Slack API
    rejects the call, which surfaces as a PipelineStepFailed."""
    pipeline = dlt.pipeline(
        pipeline_name="slack",
        destination=destination_name,
        dataset_name="slack_data",
        dev_mode=True,
    )

    logs_source = slack_source(
        start_date=pendulum.now().subtract(weeks=1),
        end_date=pendulum.now(),
    ).with_resources("access_logs")

    # The run should raise; the error message names the paid-plan restriction.
    with pytest.raises(PipelineStepFailed) as exc_info:
        pipeline.run(logs_source)
    assert "just available on paid accounts" in str(exc_info.value)


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
Expand All @@ -79,19 +107,20 @@ def test_replies(destination_name: str) -> None:

# Set page size to ensure we use pagination
source = slack_source(
start_date=datetime(2023, 12, 19),
end_date=datetime(2024, 1, 10),
selected_channels=["1-announcements"],
start_date=pendulum.now().subtract(weeks=1),
end_date=pendulum.now(),
selected_channels=["3-technical-help"],
replies=True,
table_per_channel=False,
)
).with_resources("messages", "replies")
load_info = pipeline.run(source)
assert_load_info(load_info)

table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
table_counts = load_table_counts(pipeline, *table_names)
assert "replies" in table_names
assert table_counts["replies"] >= 5
# Note: Reply counts may vary with dynamic dates, so we check for > 0
assert table_counts["replies"] > 0


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
Expand All @@ -107,14 +136,17 @@ def test_with_merge_disposition(destination_name: str, table_per_channel: bool)
dev_mode=True,
)

# Set page size to ensure we use pagination
def get_resource_names(table_per_channel: bool, channel_name: str) -> str:
return channel_name if table_per_channel else "messages"

channel_name = "1-announcements"
resource_names = get_resource_names(table_per_channel, channel_name)
source = slack_source(
start_date=datetime(2023, 12, 19),
end_date=datetime(2024, 1, 10),
selected_channels=["1-announcements"],
replies=True,
start_date=pendulum.now().subtract(weeks=4),
end_date=pendulum.now().subtract(weeks=1),
selected_channels=[channel_name],
table_per_channel=table_per_channel,
)
).with_resources(resource_names)
pipeline.run(source)
table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
current_table_counts = load_table_counts(pipeline, *table_names)
Expand All @@ -126,7 +158,6 @@ def test_with_merge_disposition(destination_name: str, table_per_channel: bool)
assert all(
table_counts[table_name] == current_table_counts[table_name]
for table_name in table_names
if table_name != "users"
)


Expand All @@ -140,8 +171,13 @@ def test_users(destination_name: str) -> None:
)

# Selected just one channel to avoid loading all channels
source = slack_source(
selected_channels=["1-announcements"],
source = (
slack_source(
page_size=200,
selected_channels=["1-announcements"],
)
.with_resources("users")
.add_limit(3)
)
load_info = pipeline.run(source)
assert_load_info(load_info)
Expand All @@ -154,3 +190,27 @@ def test_users(destination_name: str) -> None:
print(table_counts.keys())
assert set(table_counts.keys()) >= set(expected_tables)
assert table_counts["users"] >= 300 # The number of users can increase over time


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_private_channels(destination_name: str) -> None:
    """A private channel selected with include_private_channels=True should
    produce a message table named after the channel."""
    pipeline = dlt.pipeline(
        pipeline_name="slack",
        destination=destination_name,
        dataset_name="slack_data",
        dev_mode=True,
    )

    channel = "test-private-channel"
    window_end = pendulum.now()
    private_source = slack_source(
        start_date=window_end.subtract(weeks=1),
        end_date=window_end,
        selected_channels=[channel],
        include_private_channels=True,
    ).with_resources(channel)

    assert_load_info(pipeline.run(private_source))

    loaded_tables = [t["name"] for t in pipeline.default_schema.data_tables()]
    # Table names are normalized: "-" becomes "_", with a "_message" suffix.
    assert f"{channel}_message".replace("-", "_") in loaded_tables
Loading