81 changes: 12 additions & 69 deletions agents.yaml
@@ -156,7 +156,7 @@ spec:
configMapKeyRef:
name: cluster-config
key: cluster_location
image: gcr.io/stackdriver-agents/stackdriver-logging-agent:1.6.26
image: gcr.io/stackdriver-kubernetes-1337/stackdriver-logging-agent:jkohen-gke-plugin
imagePullPolicy: IfNotPresent
livenessProbe:
exec:
@@ -181,7 +181,7 @@ spec:
resources:
limits:
cpu: "1"
memory: 500Mi
memory: 200Mi
requests:
cpu: 100m
memory: 200Mi
@@ -337,42 +337,12 @@ data:
</filter>

<filter reform.**>
# We have to use record_modifier because only this plugin supports complex
# logic to modify the record the way we need.
@type record_modifier
enable_ruby true
<record>
# Extract "kubernetes"->"labels" and set them as
# "logging.googleapis.com/labels". Prefix these labels with
# "k8s-pod-labels" to distinguish with other labels and avoid
# label name collision with other types of labels.
_dummy_ ${if record.is_a?(Hash) && record.has_key?('kubernetes') && record['kubernetes'].has_key?('labels') && record['kubernetes']['labels'].is_a?(Hash); then; record["logging.googleapis.com/labels"] = record['kubernetes']['labels'].map{ |k, v| ["k8s-pod-label/#{k}", v]}.to_h; end; nil}
</record>
# Delete this dummy field and the rest of "kubernetes" and "docker".
remove_keys _dummy_,kubernetes,docker
@type gke
mode "1"
</filter>
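
The record_modifier logic deleted above appears to be consolidated into the new custom `gke` filter plugin, selected by `mode`. The plugin source is not part of this PR, so the following Ruby sketch only illustrates what the mode "1" label handling might look like inside a Fluentd filter plugin; the class structure and internals are assumptions based on the deleted config, not the plugin's actual code.

require 'fluent/plugin/filter'

module Fluent::Plugin
  # Hypothetical sketch: what `@type gke` with `mode "1"` may do internally.
  class GkeFilterSketch < Filter
    Fluent::Plugin.register_filter('gke_sketch', self)

    # `mode` would select which transformation set runs; only the
    # mode-"1" label logic is sketched here.
    config_param :mode, :string

    def filter(tag, time, record)
      k8s = record['kubernetes']
      labels = k8s['labels'] if k8s.is_a?(Hash)
      if labels.is_a?(Hash)
        # Prefix pod labels to avoid collisions with other label types.
        record['logging.googleapis.com/labels'] =
          labels.map { |k, v| ["k8s-pod-label/#{k}", v] }.to_h
      end
      # Drop the now-redundant metadata, as `remove_keys` did before.
      record.delete('kubernetes')
      record.delete('docker')
      record
    end
  end
end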

<match reform.**>
@type record_reformer
enable_ruby true
<record>
# Extract local_resource_id from tag for 'k8s_container' monitored
# resource. The format is:
# 'k8s_container.<namespace_name>.<pod_name>.<container_name>'.
"logging.googleapis.com/local_resource_id" ${"k8s_container.#{tag_suffix[4].rpartition('.')[0].split('_')[1]}.#{tag_suffix[4].rpartition('.')[0].split('_')[0]}.#{tag_suffix[4].rpartition('.')[0].split('_')[2].rpartition('-')[0]}"}
# Rename the field 'log' to a more generic field 'message'. This way the
# fluent-plugin-google-cloud knows to flatten the field as textPayload
# instead of jsonPayload after extracting 'time', 'severity' and
# 'stream' from the record.
message ${record['log']}
# If 'severity' is not set, assume stderr is ERROR and stdout is INFO.
severity ${record['severity'] || if record['stream'] == 'stderr' then 'ERROR' else 'INFO' end}
</record>
tag ${if record['stream'] == 'stderr' then 'raw.stderr' else 'raw.stdout' end}
remove_keys stream,log
</match>
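
For reference, the one-liner removed above derives the local_resource_id from the container log tag. A standalone Ruby restatement of that parsing and of the severity defaulting rule, with a hypothetical example tag:

# Mirrors the deleted record_reformer rules; the tag below is made up.
def local_resource_id_from(tag_suffix)
  base = tag_suffix.rpartition('.')[0]             # strip trailing ".log"
  pod, namespace, container_and_id = base.split('_')
  container = container_and_id.rpartition('-')[0]  # strip the docker ID
  "k8s_container.#{namespace}.#{pod}.#{container}"
end

local_resource_id_from('nginx-abc12_default_nginx-0123456789ab.log')
# => "k8s_container.default.nginx-abc12.nginx"

# If 'severity' is unset, stderr becomes ERROR and stdout INFO.
def severity_for(record)
  record['severity'] || (record['stream'] == 'stderr' ? 'ERROR' : 'INFO')
end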

# Detect exceptions in the log output and forward them as one log entry.
# Detect exceptions in the log output and insert them back into the
# pipeline as one log entry.
# TODO: Enable once we build with rewrite_tag_filter. See the git branch
# jkohen-gke-plugin-detect-exceptions.
<match {raw.stderr,raw.stdout}>
@type detect_exceptions

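While detect_exceptions stays disabled here, it is worth recalling what it does: it buffers consecutive lines that belong to one stack trace and re-emits them as a single entry. A toy Ruby sketch of that grouping idea; the real plugin uses language-specific state machines, so the single regex below is a stand-in:

# Toy illustration only: merge trace-continuation lines into one entry.
def group_exceptions(lines)
  entries = []
  current = nil
  lines.each do |line|
    if current && line =~ /^\s+(at |File ")/  # looks like a continuation
      current << "\n" << line
    else
      entries << current if current
      current = line.dup
    end
  end
  entries << current if current
  entries
end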
@@ -663,16 +633,9 @@ data:

# This section is exclusive for k8s_container logs. Those come with
# 'stderr'/'stdout' tags.
# TODO(instrumentation): Reconsider this workaround later.
# Trim entries that exceed slightly less than 100KB so that they are not
# dropped entirely: Stackdriver only supports entries up to 100KB in size.
<filter {stderr,stdout}>
@type record_transformer
enable_ruby true
<record>
message ${record['message'].length > 100000 ? "[Trimmed]#{record['message'][0..100000]}..." : record['message']}
</record>
<filter reform.**>
@type gke
mode "2"
</filter>
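
The trimming rule removed here, which mode "2" of the gke plugin presumably preserves, is simple enough to restate exactly from the deleted Ruby:

# Stackdriver rejects entries over 100KB, so oversized messages were
# truncated and marked rather than dropped.
MAX_BYTES = 100_000

def trim(message)
  return message if message.length <= MAX_BYTES
  "[Trimmed]#{message[0..MAX_BYTES]}..."
end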

# Do not collect fluentd's own logs to avoid infinite loops.
@@ -686,27 +649,11 @@
@type add_insert_ids
</filter>

# This filter parses the 'source' field created for glog lines into a single
# top-level field, for proper processing by the output plugin.
# For example, if a record includes:
# {"source":"handlers.go:131"},
# then the following entry will be added to the record:
# {"logging.googleapis.com/sourceLocation":
# {"file":"handlers.go", "line":"131"}
# }
<filter **>
@type record_transformer
enable_ruby true
<record>
"logging.googleapis.com/sourceLocation" ${if record.is_a?(Hash) && record.has_key?('source'); source_parts = record['source'].split(':', 2); {'file' => source_parts[0], 'line' => source_parts[1]} if source_parts.length == 2; else; nil; end}
</record>
</filter>
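
As a plain-Ruby restatement of the filter above:

# "handlers.go:131" => {"file" => "handlers.go", "line" => "131"};
# records without a well-formed 'source' field are left untouched (nil).
def source_location(record)
  return nil unless record.is_a?(Hash) && record.key?('source')
  file, line = record['source'].split(':', 2)
  { 'file' => file, 'line' => line } if line
end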

# This section is exclusive for k8s_container logs. These logs come with
# 'stderr'/'stdout' tags.
# We use a separate output stanza for 'k8s_node' logs with a smaller buffer
# because node logs are less important than users' container logs.
<match {stderr,stdout}>
<match reform.**>
@type google_cloud

# Try to detect JSON formatted log entries.
@@ -742,13 +689,9 @@ data:
adjust_invalid_timestamps false
</match>
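
The JSON detection enabled in this output block asks fluent-plugin-google-cloud to try parsing each message as JSON, so structured lines land as jsonPayload instead of textPayload. Roughly, as an illustration of the behavior rather than the plugin's actual code:

require 'json'

# Simplified: the plugin's real parsing is more involved.
def payload_for(message)
  parsed = JSON.parse(message)
  parsed.is_a?(Hash) ? { json_payload: parsed } : { text_payload: message }
rescue JSON::ParserError
  { text_payload: message }
end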

# Attach local_resource_id for 'k8s_node' monitored resource.
<filter **>
@type record_transformer
enable_ruby true
<record>
"logging.googleapis.com/local_resource_id" ${"k8s_node.#{ENV['NODE_NAME']}"}
</record>
@type gke
mode "3"
</filter>
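
Mode "3" presumably takes over the node-resource tagging that the removed record_transformer made explicit; its effect, restated in plain Ruby:

# Tag each record with the node-scoped monitored-resource ID.
# NODE_NAME is expected in the agent's environment, as the deleted
# config assumed.
def attach_node_resource_id(record)
  record['logging.googleapis.com/local_resource_id'] =
    "k8s_node.#{ENV['NODE_NAME']}"
  record
end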

# This section is exclusive for 'k8s_node' logs. These logs come with tags
79 changes: 11 additions & 68 deletions logging-agent.yaml
@@ -36,7 +36,7 @@ spec:
configMapKeyRef:
name: cluster-config
key: cluster_location
image: gcr.io/stackdriver-agents/stackdriver-logging-agent:1.6.26
image: gcr.io/stackdriver-kubernetes-1337/stackdriver-logging-agent:jkohen-gke-plugin
imagePullPolicy: IfNotPresent
livenessProbe:
exec:
@@ -217,42 +217,12 @@ data:
</filter>

<filter reform.**>
# We have to use record_modifier because only this plugin supports complex
# logic to modify the record the way we need.
@type record_modifier
enable_ruby true
<record>
# Extract "kubernetes"->"labels" and set them as
# "logging.googleapis.com/labels". Prefix these labels with
# "k8s-pod-label/" to distinguish them from other labels and avoid
# name collisions with other types of labels.
_dummy_ ${if record.is_a?(Hash) && record.has_key?('kubernetes') && record['kubernetes'].has_key?('labels') && record['kubernetes']['labels'].is_a?(Hash); then; record["logging.googleapis.com/labels"] = record['kubernetes']['labels'].map{ |k, v| ["k8s-pod-label/#{k}", v]}.to_h; end; nil}
</record>
# Delete this dummy field and the rest of "kubernetes" and "docker".
remove_keys _dummy_,kubernetes,docker
@type gke
mode "1"
</filter>

<match reform.**>
@type record_reformer
enable_ruby true
<record>
# Extract local_resource_id from tag for 'k8s_container' monitored
# resource. The format is:
# 'k8s_container.<namespace_name>.<pod_name>.<container_name>'.
"logging.googleapis.com/local_resource_id" ${"k8s_container.#{tag_suffix[4].rpartition('.')[0].split('_')[1]}.#{tag_suffix[4].rpartition('.')[0].split('_')[0]}.#{tag_suffix[4].rpartition('.')[0].split('_')[2].rpartition('-')[0]}"}
# Rename the field 'log' to a more generic field 'message'. This way the
# fluent-plugin-google-cloud knows to flatten the field as textPayload
# instead of jsonPayload after extracting 'time', 'severity' and
# 'stream' from the record.
message ${record['log']}
# If 'severity' is not set, assume stderr is ERROR and stdout is INFO.
severity ${record['severity'] || if record['stream'] == 'stderr' then 'ERROR' else 'INFO' end}
</record>
tag ${if record['stream'] == 'stderr' then 'raw.stderr' else 'raw.stdout' end}
remove_keys stream,log
</match>

# Detect exceptions in the log output and forward them as one log entry.
# Detect exceptions in the log output and insert them back into the
# pipeline as one log entry.
# TODO: Enable once we build with rewrite_tag_filter. See the git branch
# jkohen-gke-plugin-detect-exceptions.
<match {raw.stderr,raw.stdout}>
@type detect_exceptions

@@ -543,16 +513,9 @@ data:

# This section is exclusive for k8s_container logs. Those come with
# 'stderr'/'stdout' tags.
# TODO(instrumentation): Reconsider this workaround later.
# Trim entries that exceed slightly less than 100KB so that they are not
# dropped entirely: Stackdriver only supports entries up to 100KB in size.
<filter {stderr,stdout}>
@type record_transformer
enable_ruby true
<record>
message ${record['message'].length > 100000 ? "[Trimmed]#{record['message'][0..100000]}..." : record['message']}
</record>
<filter reform.**>
@type gke
mode "2"
</filter>

# Do not collect fluentd's own logs to avoid infinite loops.
@@ -566,27 +529,11 @@
@type add_insert_ids
</filter>

# This filter parses the 'source' field created for glog lines into a single
# top-level field, for proper processing by the output plugin.
# For example, if a record includes:
# {"source":"handlers.go:131"},
# then the following entry will be added to the record:
# {"logging.googleapis.com/sourceLocation":
# {"file":"handlers.go", "line":"131"}
# }
<filter **>
@type record_transformer
enable_ruby true
<record>
"logging.googleapis.com/sourceLocation" ${if record.is_a?(Hash) && record.has_key?('source'); source_parts = record['source'].split(':', 2); {'file' => source_parts[0], 'line' => source_parts[1]} if source_parts.length == 2; else; nil; end}
</record>
</filter>

# This section is exclusive for k8s_container logs. These logs come with
# 'stderr'/'stdout' tags.
# We use a separate output stanza for 'k8s_node' logs with a smaller buffer
# because node logs are less important than users' container logs.
<match {stderr,stdout}>
<match reform.**>
@type google_cloud

# Try to detect JSON formatted log entries.
@@ -622,13 +569,9 @@ data:
adjust_invalid_timestamps false
</match>

# Attach local_resource_id for 'k8s_node' monitored resource.
<filter **>
@type record_transformer
enable_ruby true
<record>
"logging.googleapis.com/local_resource_id" ${"k8s_node.#{ENV['NODE_NAME']}"}
</record>
@type gke
mode "3"
</filter>

# This section is exclusive for 'k8s_node' logs. These logs come with tags