ArkFlow is a high-performance stream processing engine written in Rust that integrates AI capabilities for real-time data processing and intelligent analysis. It supports multiple input/output sources and processors, and it can load and run machine learning models for streaming inference, anomaly detection, and complex event processing.
ArkFlow is listed in the CNCF Cloud Native Landscape.
- High Performance: Built on Rust and the Tokio async runtime, offering excellent performance and low latency
- Multiple Data Sources: Support for Kafka, MQTT, HTTP, files, and other input/output sources
- Powerful Processing Capabilities: Built-in SQL queries, Python scripts, JSON processing, Protobuf encoding/decoding, batch processing, and other processors
- Extensible: Modular design, easy to extend with new input, buffer, output, and processor components
# Clone the repository
git clone https://github.com/arkflow-rs/arkflow.git
cd arkflow
# Build the project
cargo build --release
# Run tests
cargo test

- Create a configuration file config.yaml:
logging:
  level: info
streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1s
      batch_size: 10
    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT * FROM flow WHERE value >= 10"
    output:
      type: "stdout"
    error_output:
      type: "stdout"

- Run ArkFlow:
./target/release/arkflow --config config.yaml

ArkFlow uses YAML configuration files, which support the following main configuration items:
logging:
  level: info  # Log level: debug, info, warn, error
streams: # Stream definition list
  - input:      # Input configuration
    # ...
    pipeline:   # Processing pipeline configuration
    # ...
    output:     # Output configuration
    # ...
    error_output: # Error output configuration
    # ...
    buffer:     # Buffer configuration
    # ...

ArkFlow supports multiple input sources:
- Kafka: Read data from Kafka topics
- MQTT: Subscribe to messages from MQTT topics
- HTTP: Receive data via HTTP
- File: Read data from files (CSV, JSON, Parquet, Avro, Arrow) using SQL
- Generator: Generate test data
- Database: Query data from databases (MySQL, PostgreSQL, SQLite, DuckDB)
- NATS: Subscribe to messages from NATS topics
- Redis: Subscribe to messages from Redis channels or lists
- WebSocket: Subscribe to messages from WebSocket connections
- Modbus: Read data from Modbus devices
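
Of the inputs listed above, the generator is the easiest to reason about because it needs no external service. The sketch below approximates what the quick-start `generate` input appears to do with its `context` and `batch_size` settings. It assumes each tick emits `batch_size` copies of the JSON template; the function name `generate_batches` is hypothetical, and the `interval` delay is left out so the example finishes instantly.

```python
import json
from itertools import islice


def generate_batches(context: str, batch_size: int):
    """Yield batches of `batch_size` parsed copies of the JSON `context` template.

    Assumption: one batch corresponds to one `interval` tick; no sleeping is done here.
    """
    record = json.loads(context)  # parse once so a malformed template fails early
    while True:
        yield [dict(record) for _ in range(batch_size)]


context = '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'

# Take three ticks' worth of data from the endless generator.
batches = list(islice(generate_batches(context, batch_size=10), 3))
print(len(batches), len(batches[0]), batches[0][0]["sensor"])
```

A real run would pause for `interval` between batches; that is the only piece this sketch deliberately omits.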
Example:
input:
  type: kafka
  brokers:
    - localhost:9092
  topics:
    - test-topic
  consumer_group: test-group
  client_id: arkflow
  start_from_latest: true

ArkFlow provides multiple data processors:
- JSON: JSON data processing and transformation
- SQL: Process data using SQL queries
- Protobuf: Protobuf encoding/decoding
- Batch Processing: Process messages in batches
- VRL: Process data using VRL (Vector Remap Language)
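
To make the SQL processor concrete, the following sketch runs the kind of queries used in this README against a small batch exposed as a table named `flow`. It is only an approximation: SQLite from the Python standard library stands in for ArkFlow's SQL engine, which this README does not name, and the helper `run_query` and the sample rows are hypothetical.

```python
import json
import sqlite3


def run_query(rows, query):
    """Load one batch into an in-memory table called `flow` and run `query` on it."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE flow (timestamp INTEGER, value INTEGER, sensor TEXT)")
    conn.executemany("INSERT INTO flow VALUES (:timestamp, :value, :sensor)", rows)
    result = conn.execute(query).fetchall()
    conn.close()
    return result


# A hypothetical batch of decoded JSON messages.
batch = [json.loads(line) for line in (
    '{"timestamp": 1625000000000, "value": 10, "sensor": "temp_1"}',
    '{"timestamp": 1625000000001, "value": 3, "sensor": "temp_1"}',
    '{"timestamp": 1625000000002, "value": 42, "sensor": "temp_2"}',
)]

# Filter: only rows with value >= 10 survive.
filtered = run_query(batch, "SELECT * FROM flow WHERE value >= 10")

# Aggregate: count the surviving rows per sensor.
counts = run_query(
    batch,
    "SELECT sensor, count(*) FROM flow WHERE value >= 10 GROUP BY sensor ORDER BY sensor",
)
print(filtered)
print(counts)
```

The point of the sketch is the table name: each batch is queried as `flow`, so a `WHERE` clause filters messages and a `GROUP BY` collapses a batch into summary rows.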
Example:
pipeline:
  thread_num: 4
  processors:
    - type: json_to_arrow
    - type: sql
      query: "SELECT * FROM flow WHERE value >= 10"

ArkFlow supports multiple output targets:
- Kafka: Write data to Kafka topics
- MQTT: Publish messages to MQTT topics
- HTTP: Send data via HTTP
- Standard Output: Output data to the console
- Drop: Discard data
- NATS: Publish messages to NATS topics
Example:
output:
  type: kafka
  brokers:
    - localhost:9092
  topic:
    type: value
    value: test-topic
  client_id: arkflow-producer

ArkFlow supports multiple error output targets:
- Kafka: Write error data to Kafka topics
- MQTT: Publish error messages to MQTT topics
- HTTP: Send error data via HTTP
- Standard Output: Output error data to the console
- Drop: Discard error data
- NATS: Publish error messages to NATS topics
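
This README does not spell out how messages reach the error output. One plausible reading, stated here as an assumption, is that a message which fails in the pipeline is diverted to `error_output` while successful results continue to `output`. The sketch below illustrates that routing idea only; `process` and `route` are hypothetical helpers, not ArkFlow APIs.

```python
import json


def process(raw: str) -> dict:
    """Hypothetical processor: parse JSON and require a numeric `value` field."""
    record = json.loads(raw)
    if not isinstance(record.get("value"), (int, float)):
        raise ValueError("missing numeric 'value'")
    return record


def route(messages):
    """Split raw messages into (output, error_output) lists."""
    output, error_output = [], []
    for raw in messages:
        try:
            output.append(process(raw))
        except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
            # Keep the original payload next to the reason it failed.
            error_output.append({"payload": raw, "error": str(exc)})
    return output, error_output


ok, failed = route(['{"value": 10}', "not json", '{"sensor": "temp_1"}'])
print(ok)
print([f["payload"] for f in failed])
```

Keeping the failed payload alongside the error message makes it possible to replay or inspect bad messages later, which is the usual reason for sending them to a separate target instead of dropping them.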
Example:
error_output:
  type: kafka
  brokers:
    - localhost:9092
  topic:
    type: value
    value: error-topic
  client_id: error-arkflow-producer

ArkFlow provides buffer capabilities to handle backpressure and temporary storage of messages:
- Memory Buffer: In-memory buffer for high-throughput scenarios and window aggregation.
- Session Window: Groups messages into sessions separated by gaps in activity. A session closes after a configurable period of inactivity.
- Sliding Window: A time-based window for processing message batches, with configurable window size, slide interval, and slide size.
- Tumbling Window: Fixed-size, non-overlapping windows for processing message batches, with a configurable interval.
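
The tumbling and session strategies described above differ only in how they decide where one window ends and the next begins. The sketch below shows generic textbook versions of both over `(timestamp, value)` pairs. It is not ArkFlow's implementation; the function names and the integer timestamps are assumptions chosen for illustration.

```python
from collections import defaultdict


def tumbling_windows(events, size):
    """Group (timestamp, value) pairs into fixed, non-overlapping windows.

    Each window is keyed by its start time, a multiple of `size`.
    """
    windows = defaultdict(list)
    for ts, value in events:
        windows[ts // size * size].append(value)
    return dict(sorted(windows.items()))


def session_windows(events, gap):
    """Start a new session whenever the pause between events exceeds `gap`."""
    sessions = []
    previous = None
    for ts, value in sorted(events):
        if previous is None or ts - previous > gap:
            sessions.append([])  # inactivity exceeded the gap: open a new session
        sessions[-1].append(value)
        previous = ts
    return sessions


events = [(0, "a"), (1, "b"), (4, "c"), (5, "d"), (12, "e")]

print(tumbling_windows(events, size=5))
# {0: ['a', 'b', 'c'], 5: ['d'], 10: ['e']}

print(session_windows(events, gap=3))
# [['a', 'b', 'c', 'd'], ['e']]
```

Note how the same events split differently: the tumbling window cuts at fixed boundaries regardless of activity, while the session window keeps `d` with the earlier events because no pause longer than the gap separates them.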
Example:
buffer:
  type: memory
  capacity: 10000  # Maximum number of messages to buffer
  timeout: 10s  # Maximum time to buffer messages

streams:
  - input:
      type: kafka
      brokers:
        - localhost:9092
      topics:
        - test-topic
      consumer_group: test-group
    pipeline:
      thread_num: 4
      processors:
        - type: json_to_arrow
        - type: sql
          query: "SELECT * FROM flow WHERE value > 100"
    output:
      type: kafka
      brokers:
        - localhost:9092
      topic:
        type: value
        value: test-topic

streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1ms
      batch_size: 10000
    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"
    output:
      type: "stdout"

- Conalog (Country: South Korea)
ArkFlow is licensed under the Apache License 2.0.
Discord: https://discord.gg/CwKhzb8pux
If you like this project, or are using it to learn or to build your own solution, please give it a star ⭐. Thanks!