# Parsing Logs

### Overview

Transform unstructured log messages into structured, queryable data. By extracting fields from log messages, you can unlock powerful filtering, searching, and analysis capabilities in groundcover.

### Why Parse Logs?

Raw log messages often contain valuable information buried in unstructured text. Parsing allows you to:

* **Extract meaningful fields** from log messages for better searchability
* **Structure your data** to enable powerful filtering and querying
* **Enrich logs** with additional context and metadata
* **Standardize formats** across different services and applications

### Best Practices

1. **Use conditions effectively** - Only parse logs from relevant workloads to minimize processing overhead
2. **Test in the Parsing Playground** - Always test your parsing rules before deploying
3. **Cache intermediate results** - Use the `cache` variable for temporary storage during multi-step transformations
4. **Be specific with patterns** - More specific GROK patterns perform better and are less likely to cause false matches
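A minimal sketch combining these practices (the workload name `checkout-service` and the pattern below are hypothetical, not from your environment):

```yaml
ottlRules:
  - ruleName: "scoped_parse_example"
    conditions:
      # Practice 1: restrict the rule to a single workload
      - 'workload == "checkout-service"'
    statements:
      # Practice 3: stage extracted fields in cache before promoting them
      - 'set(cache, ExtractGrokPatterns(body, "^%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL:level}"))'
      - 'merge_maps(attributes, cache, "insert")'
```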

### Common Parsing Use Cases

#### Parsing JSON Logs

Extract fields from JSON-formatted log messages.

**Log**

```json
{
  "body": "2025-03-23 14:55:12,456 ERROR {\"event\":\"user_login\",\"user_id\":12345,\"status\":\"failed\",\"ip\":\"192.168.1.10\"}"
}
```

**Rule**

```yaml
ottlRules:
  - ruleName: "parse_json_logs"
    conditions:
      - 'workload == "auth-service"'
    statements:
      - 'set(cache, ExtractGrokPatterns(body, "^%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL}%{SPACE}(?<json_body>\\{.*\\})"))'
      - 'set(cache["parsed_json"], ParseJSON(cache["json_body"]))'
      - 'merge_maps(attributes, cache["parsed_json"], "insert")'
```

**Result**

```json
{
  "event": "user_login",
  "user_id": 12345,
  "status": "failed",
  "ip": "192.168.1.10"
}
```

#### Merging Multiline Logs

Merge multiline log entries (stack traces, system warnings, etc.) into a single record.

**Log**

```json
{
  "body": "2026-01-15 10:30:00 ERROR java.lang.NullPointerException: user is null"
}
{
  "body": "  at com.example.UserService.getUser(UserService.java:42)"
}
{
  "body": "  at com.example.ApiHandler.handle(ApiHandler.java:128)"
}
```

**Rule**

```yaml
ottlRules:
  - ruleName: "java-stacktrace"
    conditions:
      - 'workload == "my-java-app"'
    statements:
      - 'multiline_merge("^\\d{4}-\\d{2}-\\d{2}")'
```

**Result**

{% code overflow="wrap" %}

```json
{
  "body": "2026-01-15 10:30:00 ERROR java.lang.NullPointerException: user is null\n  at com.example.UserService.getUser(UserService.java:42)\n  at com.example.ApiHandler.handle(ApiHandler.java:128)"
}
```

{% endcode %}

#### Extracting Structured Data with GROK Patterns

Use GROK patterns to extract specific fields from formatted log messages.

**Log**

```json
{
  "body": "2025-03-23 10:30:45 INFO User login attempt from 192.168.1.100"
}
```

**Rule**

```yaml
ottlRules:
  - ruleName: "extract_login_info"
    conditions:
      - 'workload == "auth-service"'
    statements:
      - 'set(cache, ExtractGrokPatterns(body, "^%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL:level}%{SPACE}User login attempt from %{IP:source_ip}"))'
      - 'keep_matching_keys(cache, "^[a-z_]+$")'
      - 'merge_maps(attributes, cache, "insert")'
```

**Result**

```json
{
  "timestamp": "2025-03-23 10:30:45",
  "level": "INFO",
  "source_ip": "192.168.1.100"
}
```

#### Parsing Key-Value Pairs

Extract multiple key-value pairs from bracketed log sections.

**Log**

```json
{
  "body": "2025-03-23 15:20:12,512 - EventProcessor - DEBUG - Completed event processing [analyzer_name=disk-space-check] [node_id=7f5e9aa8412d4c0003a7b2c5] [service_id=813dd10298f77700029d54e3] [sensor_id=3] [tracking_code=19fd5b6e72c7e94088a9ff3d] [log_id=b'67acfe0c92d43000'] [instance_id=microservice-7894563210]"
}
```

**Rule**

```yaml
ottlRules:
  - ruleName: "parse_event_processor"
    conditions:
      - 'workload == "event-processor"'
    statements:
      - 'set(cache, ExtractGrokPatterns(body, "^%{TIMESTAMP_ISO8601:timestamp}%{SPACE}-%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{LOGLEVEL:level}%{DATA}(?<kv>\\[%{GREEDYDATA})"))'
      - 'replace_pattern(cache["kv"], "[\\[\\]]", "")'
      - 'merge_maps(attributes, ParseKeyValue(cache["kv"]), "insert")'
      - 'set(attributes["timestamp"], cache["timestamp"])'
```

**Result**

```json
{
  "instance_id": "microservice-7894563210",
  "analyzer_name": "disk-space-check",
  "node_id": "7f5e9aa8412d4c0003a7b2c5",
  "service_id": "813dd10298f77700029d54e3",
  "sensor_id": "3",
  "tracking_code": "19fd5b6e72c7e94088a9ff3d",
  "log_id": "b67acfe0c92d43000",
  "timestamp": "2025-03-23 15:20:12,512"
}
```

#### Updating The Log Level

Some attributes in the groundcover log structure are considered "root level" and should not be updated under the `attributes` field.

One example is `level`, which can override the automatically detected log level.

To update the log level, use a rule with the following syntax:

```yaml
ottlRules:
  - ruleName: "set_level"
    conditions:
      - 'workload == "event-processor"'
    statements:
      - 'set(level, "warning")'
```

### Common GROK Patterns

Here are some commonly used GROK patterns:

* `%{TIMESTAMP_ISO8601}` - ISO8601 timestamps (2025-03-23T10:30:45)
* `%{LOGLEVEL}` - Log levels (INFO, ERROR, DEBUG, etc.)
* `%{IP}` - IP addresses
* `%{NUMBER}` - Numeric values
* `%{WORD}` - Single words
* `%{NOTSPACE}` - Non-whitespace characters
* `%{GREEDYDATA}` - Match everything (greedy)
* `%{DATA}` - Match everything (non-greedy)
* `%{SPACE}` - Whitespace characters
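
The difference between `%{DATA}` and `%{GREEDYDATA}` matters whenever the delimiter appears more than once in a line. A sketch (the rule name and workload are hypothetical): given a body like `auth - login failed - retry`, `%{DATA:component}` stops at the first ` - ` while `%{GREEDYDATA:message}` consumes the rest of the line:

```yaml
ottlRules:
  - ruleName: "greedy_vs_nongreedy"
    conditions:
      - 'workload == "example-app"'
    statements:
      # component captures "auth"; message captures "login failed - retry"
      - 'set(cache, ExtractGrokPatterns(body, "^%{DATA:component} - %{GREEDYDATA:message}"))'
      - 'merge_maps(attributes, cache, "insert")'
```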

### Key Functions

#### ExtractGrokPatterns

Extracts structured data using GROK patterns.

**Basic Usage:**

```yaml
- 'set(cache, ExtractGrokPatterns(body, "pattern"))'
```

**Example - Extract timestamp and error code:**

```yaml
ottlRules:
  - ruleName: "extract_error_details"
    conditions:
      - 'level == "error"'
    statements:
      - 'set(cache, ExtractGrokPatterns(body, "^%{TIMESTAMP_ISO8601:timestamp}.*Error %{NUMBER:error_code}:"))'
      - 'merge_maps(attributes, cache, "insert")'
```

#### ParseJSON

Parses JSON strings into structured attributes.

**Basic Usage:**

```yaml
- 'set(cache["parsed"], ParseJSON(cache["json_string"]))'
```

**Example - Parse embedded JSON:**

```yaml
ottlRules:
  - ruleName: "parse_json_payload"
    conditions:
      - 'format == "json"'
    statements:
      - 'set(attributes["parsed_data"], ParseJSON(body))'
      - 'set(attributes["user_id"], attributes["parsed_data"]["user"]["id"])'
```

#### ParseKeyValue

Parses key=value formatted strings.

**Basic Usage:**

```yaml
- 'merge_maps(attributes, ParseKeyValue(cache["kv"]), "insert")'
```

**Example - Parse query parameters:**

```yaml
ottlRules:
  - ruleName: "parse_query_params"
    conditions:
      - 'IsMatch(body, "params=")'
    statements:
      - 'set(cache, ExtractGrokPatterns(body, "params=(?<params>.+)$"))'
      - 'merge_maps(attributes, ParseKeyValue(cache["params"], "&", "="), "insert")'
```

#### merge\_maps

Merges extracted data into the attributes map.

**Basic Usage:**

```yaml
- 'merge_maps(attributes, cache, "insert")'
```

**Modes:**

* `insert` - Only add keys that don't exist
* `update` - Only update keys that exist
* `upsert` - Add new keys or update existing ones
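
To illustrate the difference between modes, a small sketch (the workload and attribute names are hypothetical): if `attributes["user"]` already exists, `"insert"` leaves it untouched, whereas `"upsert"` would overwrite it with the cached value:

```yaml
ottlRules:
  - ruleName: "insert_vs_upsert"
    conditions:
      - 'workload == "example-app"'
    statements:
      - 'set(cache["user"], "from-cache")'
      # "insert" preserves any existing attributes["user"];
      # change the mode to "upsert" to overwrite it instead
      - 'merge_maps(attributes, cache, "insert")'
```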

**Example:**

```yaml
ottlRules:
  - ruleName: "merge_extracted_fields"
    conditions:
      - 'workload == "api-gateway"'
    statements:
      - 'set(cache, ExtractGrokPatterns(body, "user=(?<user>\\w+) action=(?<action>\\w+)"))'
      - 'merge_maps(attributes, cache, "upsert")'
```

#### keep\_matching\_keys

Filters a map to keep only keys matching a regex pattern.

**Basic Usage:**

```yaml
- 'keep_matching_keys(cache, "^[a-z_]+$")'
```

**Example - Keep only lowercase field names:**

```yaml
ottlRules:
  - ruleName: "filter_extracted_fields"
    conditions:
      - 'workload == "data-processor"'
    statements:
      - 'set(cache, ExtractGrokPatterns(body, "(?<TIMESTAMP>\\d+) (?<user_id>\\d+) (?<action>\\w+)"))'
      - 'keep_matching_keys(cache, "^[a-z_]+$")'  # Keeps user_id and action, drops TIMESTAMP
      - 'merge_maps(attributes, cache, "insert")'
```

#### set

Assigns a value to a field or attribute.

**Basic Usage:**

```yaml
- 'set(attributes["field"], "value")'
- 'set(cache["temp"], body)'
```

**Example - Set computed fields:**

```yaml
ottlRules:
  - ruleName: "add_computed_fields"
    conditions:
      - 'workload == "payment-service"'
    statements:
      - 'set(attributes["environment"], "production")'
      - 'set(attributes["service_type"], "payment")'
      - 'set(attributes["processed_at"], UnixNano())'
```

#### replace\_pattern

Replaces parts of a string matching a regex pattern.

**Basic Usage:**

```yaml
- 'replace_pattern(target_field, "pattern", "replacement")'
```

**Example - Clean up log messages:**

```yaml
ottlRules:
  - ruleName: "clean_log_format"
    conditions:
      - 'container_name == "nginx"'
    statements:
      - 'replace_pattern(body, "\\[|\\]", "")'  # Remove brackets
      - 'replace_pattern(body, "\\s+", " ")'    # Normalize whitespace
```

#### delete\_key

Removes a specific key from a map.

**Basic Usage:**

```yaml
- 'delete_key(attributes, "field_name")'
```

**Example - Remove temporary fields:**

```yaml
ottlRules:
  - ruleName: "cleanup_fields"
    conditions:
      - 'workload == "processor"'
    statements:
      - 'set(cache, ExtractGrokPatterns(body, "(?<temp_field>\\w+) (?<user_id>\\d+)"))'
      - 'merge_maps(attributes, cache, "insert")'
      - 'delete_key(attributes, "temp_field")'
```

#### Concat

Concatenates multiple strings or values together.

**Basic Usage:**

```yaml
- 'set(attributes["combined"], Concat([value1, value2, value3], " - "))'
```

**Example - Create composite fields:**

```yaml
ottlRules:
  - ruleName: "create_composite_id"
    conditions:
      - 'attributes["user_id"] != nil'
    statements:
      - 'set(attributes["request_id"], Concat([attributes["user_id"], attributes["timestamp"]], "_"))'
      - 'set(attributes["full_path"], Concat([attributes["namespace"], "/", attributes["workload"]], ""))'
```

#### ToLowerCase / ToUpperCase

Converts strings to lowercase or uppercase.

**Basic Usage:**

```yaml
- 'set(attributes["normalized_country"], ToLowerCase(attributes["country"]))'
- 'set(attributes["region"], ToUpperCase(attributes["region"]))'
```

**Example - Normalize field values:**

```yaml
ottlRules:
  - ruleName: "normalize_values"
    conditions:
      - 'attributes["handle"] != nil'
    statements:
      - 'set(attributes["handle"], ToLowerCase(attributes["handle"]))'
      - 'set(attributes["host"], ToUpperCase(attributes["host"]))'
```

#### Substring

Extracts a portion of a string.

**Basic Usage:**

```yaml
- 'set(attributes["short_id"], Substring(attributes["request_id"], 0, 8))'
```

**Example - Extract prefixes and suffixes:**

```yaml
ottlRules:
  - ruleName: "extract_id_prefix"
    conditions:
      - 'attributes["transaction_id"] != nil'
    statements:
      - 'set(attributes["tx_prefix"], Substring(attributes["transaction_id"], 0, 6))'
      - 'set(attributes["tx_type"], Substring(attributes["transaction_id"], 0, 3))'
```

#### Split

Splits a string into an array based on a delimiter.

**Basic Usage:**

```yaml
- 'set(cache["parts"], Split(body, ","))'
```

**Example - Parse comma-separated values:**

```yaml
ottlRules:
  - ruleName: "parse_csv_log"
    conditions:
      - 'format == "csv"'
    statements:
      - 'set(cache["fields"], Split(body, ","))'
      - 'set(attributes["user_id"], cache["fields"][0])'
      - 'set(attributes["action"], cache["fields"][1])'
      - 'set(attributes["timestamp"], cache["fields"][2])'
```

#### Len

Returns the length of a string or array.

**Basic Usage:**

```yaml
- 'set(attributes["message_length"], Len(body))'
```

**Example - Conditional processing based on length:**

```yaml
ottlRules:
  - ruleName: "truncate_long_messages"
    conditions:
      - 'Len(body) > 1000'
    statements:
      - 'set(attributes["original_length"], Len(body))'
      - 'set(body, Substring(body, 0, 1000))'
      - 'set(attributes["truncated"], true)'
```

#### Int / Double / String

Type conversion functions.

**Basic Usage:**

```yaml
- 'set(attributes["status_code_int"], Int(attributes["status_code"]))'
- 'set(attributes["response_time_ms"], Double(attributes["response_time"]))'
- 'set(attributes["user_id_str"], String(attributes["user_id"]))'
```

**Example - Convert and compute:**

```yaml
ottlRules:
  - ruleName: "convert_and_calculate"
    conditions:
      - 'attributes["duration"] != nil'
    statements:
      - 'set(attributes["duration_ms"], Int(attributes["duration"]))'
      - 'set(attributes["duration_sec"], Double(attributes["duration"]) / 1000.0)'
      - 'set(attributes["is_slow"], attributes["duration_ms"] > 5000)'
```

#### IsMatch

Checks if a field matches a regex pattern.

**Basic Usage:**

```yaml
- 'set(attributes["is_error"], IsMatch(body, "ERROR|FATAL"))'
```

**Example - Conditional field extraction:**

```yaml
ottlRules:
  - ruleName: "extract_http_status"
    conditions:
      - 'IsMatch(body, "HTTP/\\d\\.\\d\\s+\\d{3}")'
    statements:
      - 'set(cache, ExtractGrokPatterns(body, "HTTP/%{NUMBER:http_version}\\s+%{NUMBER:status_code}"))'
      - 'merge_maps(attributes, cache, "insert")'
      - 'set(attributes["is_error"], Int(attributes["status_code"]) >= 400)'
```

#### multiline\_merge

Reassembles multiline log entries into a single log record.

**Basic Usage:**

```yaml
- 'multiline_merge("^\\d{4}-\\d{2}-\\d{2}")'
```

**Arguments:**

* `first_line_pattern` (string, required) - Regex matching the first line of a new block. Lines not matching are continuations.
* `max_lines` (int, optional, default: `128`) - Maximum lines per block. The buffer is flushed when this limit is reached.
* `max_time` (string, optional, default: `"800ms"`) - Maximum time to wait for continuations. The buffer is flushed when this limit is reached.

**Example - Merge then parse with grok:**

```yaml
ottlRules:
  - ruleName: "merge-and-extract"
    conditions:
      - 'workload == "api-server"'
    statements:
      - 'multiline_merge("^\\d{4}-\\d{2}-\\d{2}", 64, "500ms")'
      - 'set(cache, ExtractGrokPatterns(body, "^%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:message}"))'
      - 'merge_maps(attributes, cache, "insert")'
```

**Important notes:**

* Place `multiline_merge` as the first statement in the rule. Subsequent statements see the merged content when a flush occurs.
* The rule's conditions must capture both first lines and continuations; don't filter on a pattern that only matches first lines.
* Buffers are isolated per instance. Logs from different pods, containers, or files never mix.
