Flink


Flink is a distributed dataflow engine for stateful processing of streaming (unbounded) data. It can also process bounded (batch) data.

Concepts

Topic --> partition by id --> window over 5-minute intervals --> Database
  |                     |                                           |
  |                     |                                           |
Sources               Operator                                     Sink

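Sources: where data enters the job, e.g. Kafka, CDC (change data capture) streams, file systems

Sinks: where results are written, e.g. databases, data warehouses, message queues, file systems

Operators: the building blocks of a job; each operator takes one or more input streams and produces one or more output streams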

Map: Transform each element individually
Filter: Remove elements that don't match a condition
Reduce: Combine elements within a key
Window: Group elements by time or count
Join: Combine elements from two streams
FlatMap: Transform each element into zero or more elements
Aggregate: Compute aggregates over windows or keys
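To make the operator semantics concrete, here is a minimal sketch that chains FlatMap, Filter, Map and a keyed Reduce into a word count. It uses plain `java.util.stream` on a finite list, not Flink, and the class name `OperatorSketch` is hypothetical; in a Flink job the equivalent calls would be `flatMap`/`filter`/`map`/`keyBy`/`reduce` on a `DataStream`, applied continuously.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical illustration with java.util.stream, not Flink's DataStream API.
class OperatorSketch {
    // Word count: FlatMap -> Filter -> Map -> keyBy + Reduce.
    public static Map<String, Long> wordCount(List<String> lines) {
        return lines.stream()
            // FlatMap: one line becomes zero or more words
            .flatMap(line -> Arrays.stream(line.split("\\s+")))
            // Filter: drop empty tokens (e.g. produced by leading whitespace)
            .filter(word -> !word.isEmpty())
            // Map: transform each element individually
            .map(String::toLowerCase)
            // keyBy + Reduce: group by word, then sum a count of 1 per occurrence
            .collect(Collectors.groupingBy(
                word -> word,
                TreeMap::new,
                Collectors.reducing(0L, word -> 1L, Long::sum)));
    }

    public static void main(String[] args) {
        System.out.println(wordCount(List.of("To be", "or not to BE")));
        // prints {be=2, not=1, or=1, to=2}
    }
}
```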

Streams: a (potentially unbounded) sequence of events

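Watermark: a marker that flows with the stream and asserts that no more events with a timestamp at or below it are expected. Watermarks let event-time operations handle out-of-order events caused by, e.g., network delays or different processing speeds across partitions.

  1. Bounded out-of-orderness: the watermark trails the largest timestamp seen so far by a fixed bound, so events that are at most that late are still handled

  2. No watermarks: don't wait for late events; process events as they arrive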
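A minimal sketch of the bounded-out-of-orderness strategy in plain Java, assuming millisecond timestamps. `BoundedOutOfOrdernessSketch` and its methods are hypothetical names, not Flink's `WatermarkGenerator` interface; the watermark formula (largest timestamp seen, minus the bound, minus 1 ms) mirrors the one used by Flink's built-in bounded-out-of-orderness generator.

```java
// Hypothetical sketch of a bounded-out-of-orderness watermark (not Flink's API).
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    // Largest event timestamp observed so far; MIN_VALUE means "nothing seen yet".
    private long maxTimestampSeen = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called for every event: only the maximum matters, so late events never move it back.
    public void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    // Called periodically: "no more events at or below this timestamp are expected".
    public long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrdernessMs - 1;
    }

    // An event at or below the current watermark is late.
    public boolean isLate(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(10_000); // 10 s bound
        wm.onEvent(100_000);
        wm.onEvent(95_000);                          // out of order; watermark unaffected
        System.out.println(wm.currentWatermark());   // 89999
        System.out.println(wm.isLate(92_000));       // false: within the bound
        System.out.println(wm.isLate(85_000));       // true: more than 10 s behind the max
    }
}
```

The 10-second bound matches the `forBoundedOutOfOrderness(Duration.ofSeconds(10))` strategy in the example code later in these notes.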

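State: operators use state to remember information across events (e.g. counts, last-seen values); checkpoints periodically snapshot this state so it can be restored after a failure. Keyed state comes in several types:

ValueState: a single value per key
ListState: a list of values per key
MapState: a map of user keys to values, per key
AggregatingState: adds values into a running aggregate via an AggregateFunction (the result type may differ from the input type)
ReducingState: combines added values with a ReduceFunction (the result has the input type)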
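A minimal sketch of per-key state plus checkpoint and restore, in plain Java. `KeyedCounterSketch` is a hypothetical class: a `HashMap` stands in for a `ValueState<Long>` scoped by key, and a copied map stands in for a checkpoint.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-key counter state plus checkpoint/restore (not Flink's API).
class KeyedCounterSketch {
    // Plays the role of a ValueState<Long>: one value per key.
    private Map<String, Long> countPerKey = new HashMap<>();

    // Processes one event: read this key's state, update it, write it back.
    public long process(String key) {
        long updated = countPerKey.getOrDefault(key, 0L) + 1;
        countPerKey.put(key, updated);
        return updated;
    }

    // Checkpoint: an immutable copy of all keyed state at this moment.
    public Map<String, Long> snapshot() {
        return Map.copyOf(countPerKey);
    }

    // Recovery: discard the live state and continue from the checkpointed copy.
    public void restore(Map<String, Long> checkpoint) {
        countPerKey = new HashMap<>(checkpoint);
    }

    public static void main(String[] args) {
        KeyedCounterSketch counter = new KeyedCounterSketch();
        counter.process("alice");
        counter.process("alice");
        counter.process("bob");
        Map<String, Long> checkpoint = counter.snapshot();
        counter.process("alice");                      // alice = 3, then a failure happens
        counter.restore(checkpoint);                   // back to alice = 2, bob = 1
        System.out.println(counter.process("alice"));  // 3: the replayed event is counted once
    }
}
```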

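In practice: let a reduce do the simple incremental work, such as a counter, and let a ProcessWindowFunction do everything else, such as logging and any other per-window logic. Flink accepts both together on a window: .reduce(reduceFunction, processWindowFunction).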
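The reduce-plus-window-function split can be sketched in plain Java. All names here are hypothetical: `reduce` plays the ReduceFunction role (one running value per window, no buffered events) and `processWindow` plays the ProcessWindowFunction role (it sees only the pre-reduced value plus the window bounds). The sketch assumes tumbling windows with zero offset and simply fires every window at the end, whereas Flink fires a window when the watermark passes its end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of an incremental reduce combined with a per-window function.
class ReduceThenWindowFunctionSketch {
    // Incremental step (ReduceFunction role): merge a new value into the running aggregate.
    static long reduce(long acc, long value) {
        return acc + value;
    }

    // Per-window step (ProcessWindowFunction role): gets the single pre-reduced value
    // plus window metadata; this is where logging or extra per-window logic would go.
    static String processWindow(long windowStart, long windowEnd, long reducedCount) {
        return "[" + windowStart + ", " + windowEnd + ") count=" + reducedCount;
    }

    // Counts events per tumbling window of size sizeMs.
    public static List<String> countPerWindow(long[] timestamps, long sizeMs) {
        // Only one partial value per window is kept, never the raw events.
        Map<Long, Long> partialPerWindow = new TreeMap<>();
        for (long ts : timestamps) {
            long start = ts - Math.floorMod(ts, sizeMs);
            partialPerWindow.merge(start, 1L, ReduceThenWindowFunctionSketch::reduce);
        }
        // Fire each window once with its reduced value.
        List<String> results = new ArrayList<>();
        for (Map.Entry<Long, Long> e : partialPerWindow.entrySet()) {
            long start = e.getKey();
            results.add(processWindow(start, start + sizeMs, e.getValue()));
        }
        return results;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 2_000, 6_000, 4_000, 7_000};
        System.out.println(countPerWindow(ts, 5_000));
        // prints [[0, 5000) count=3, [5000, 10000) count=2]
    }
}
```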

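Window: groups the elements of a stream into finite buckets by time or count

Tumbling window: fixed size, non-overlapping; e.g. 5-minute windows start at 10:00, 10:05, 10:10
Sliding window: fixed size, overlapping; a new window opens every slide interval, e.g. 5-minute windows with a 1-minute slide start at 10:00, 10:01, 10:02, ...
Session window: dynamic size, driven by activity; a session closes once no event arrives for the configured session gap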
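The three window types differ only in how a timestamp is assigned to windows. A minimal plain-Java sketch, with hypothetical names and zero window offset: the tumbling and sliding arithmetic follows the same start-time calculation as Flink's event-time assigners, while the session part is a simplified gap-based grouping over sorted timestamps rather than Flink's window-merging implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of tumbling, sliding and session window assignment (zero offset).
class WindowAssignerSketch {
    // Tumbling: each timestamp belongs to exactly one window [start, start + size).
    public static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Sliding: every window [start, start + size) with start a multiple of slide that contains ts.
    public static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Session: events closer together than gap share a session, which spans
    // [first event, last event + gap). Expects timestamps sorted ascending.
    public static List<long[]> sessions(long[] sortedTs, long gap) {
        List<long[]> result = new ArrayList<>();
        if (sortedTs.length == 0) {
            return result;
        }
        long start = sortedTs[0];
        long last = sortedTs[0];
        for (int i = 1; i < sortedTs.length; i++) {
            if (sortedTs[i] - last >= gap) {   // inactivity gap reached: close the session
                result.add(new long[] {start, last + gap});
                start = sortedTs[i];
            }
            last = sortedTs[i];
        }
        result.add(new long[] {start, last + gap});
        return result;
    }

    public static void main(String[] args) {
        // Timestamps in minutes since midnight: 603 = 10:03.
        System.out.println(tumblingStart(603, 5));      // 600 (10:00)
        System.out.println(slidingStarts(603, 5, 1));   // [603, 602, 601, 600, 599]
        for (long[] s : sessions(new long[] {600, 603, 605, 630, 632}, 10)) {
            System.out.println(s[0] + " - " + s[1]);    // 600 - 615, then 630 - 642
        }
    }
}
```

With the 30-minute windows sliding every 5 minutes used in the example code later in these notes, each event lands in 30 / 5 = 6 windows.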

Cluster Architecture

1) Job Manager: coordinates the job: schedules tasks, triggers checkpoints, and handles failures (the supervisor)

2) Task Manager: executes the actual data processing; each Task Manager provides a fixed number of task slots

Job Managers are leader-based: a single active Job Manager coordinates the work in the cluster. High availability is achieved by running standby Job Managers and using a leader-election service (usually ZooKeeper, or Kubernetes HA when running on Kubernetes) to elect a new leader if the active one fails.

When you submit a job to Flink:

  1. The Job Manager receives the application and constructs the execution graph
  2. It allocates tasks to available slots in Task Managers
  3. Task Managers start executing their assigned tasks
  4. The Job Manager monitors execution and handles failures
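Steps 1 and 2 can be illustrated with a deliberately simplified model in plain Java: each operator is expanded into `parallelism` subtasks, then subtasks fill Task Manager slots in order, one subtask per slot. `SlotAssignmentSketch`, the operator names and the slot counts are all hypothetical, and Flink's real scheduler is more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: expand operators into subtasks, then place one
// subtask per slot, filling Task Managers in order (no slot sharing).
class SlotAssignmentSketch {
    // "Execution graph": each operator becomes `parallelism` subtasks named op#index.
    public static List<String> expand(Map<String, Integer> parallelismPerOperator) {
        List<String> subtasks = new ArrayList<>();
        for (Map.Entry<String, Integer> op : parallelismPerOperator.entrySet()) {
            for (int i = 0; i < op.getValue(); i++) {
                subtasks.add(op.getKey() + "#" + i);
            }
        }
        return subtasks;
    }

    // Fills each Task Manager's slots before moving on to the next one.
    public static Map<String, List<String>> assign(Map<String, Integer> slotsPerTaskManager,
                                                   List<String> subtasks) {
        Map<String, List<String>> placement = new LinkedHashMap<>();
        int next = 0;
        for (Map.Entry<String, Integer> tm : slotsPerTaskManager.entrySet()) {
            List<String> assigned = new ArrayList<>();
            for (int slot = 0; slot < tm.getValue() && next < subtasks.size(); slot++) {
                assigned.add(subtasks.get(next++));
            }
            placement.put(tm.getKey(), assigned);
        }
        if (next < subtasks.size()) {
            throw new IllegalStateException((subtasks.size() - next) + " subtasks have no free slot");
        }
        return placement;
    }

    public static void main(String[] args) {
        Map<String, Integer> operators = new LinkedHashMap<>();
        operators.put("source", 2);
        operators.put("window", 2);
        operators.put("sink", 1);
        Map<String, Integer> taskManagers = new LinkedHashMap<>();
        taskManagers.put("tm-1", 3);
        taskManagers.put("tm-2", 2);
        System.out.println(assign(taskManagers, expand(operators)));
        // {tm-1=[source#0, source#1, window#0], tm-2=[window#1, sink#0]}
    }
}
```

Real Flink differs in one important way: by default, subtasks of different operators in the same job may share a slot, so a job needs as many slots as its highest operator parallelism (2 in this example, not 5).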

State Backends

Flink's state API lets each operator store state, either per parallel operator instance (operator state) or per key (keyed state). The state itself lives in a state backend, the component that manages how state is stored and retrieved.

Flink offers different state backends for different use cases:

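  1. Memory State Backend: keeps working state on the JVM heap and stores checkpoints in Job Manager memory (small state, testing)
  2. FS State Backend: keeps working state on the JVM heap and writes checkpoints to a file system such as HDFS or S3
  3. RocksDB State Backend: keeps working state in RocksDB on local disk (supports state larger than memory) and writes checkpoints to a file system

Since Flink 1.13 the same choices are expressed as HashMapStateBackend or EmbeddedRocksDBStateBackend, with checkpoint storage (Job Manager memory or a file system) configured separately.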

Exactly-Once Processing

When a Task Manager goes down:

  1. Failure Detection: The Task Manager stops sending heartbeats, so the Job Manager marks it as failed.
  2. Job Pause: The affected tasks are cancelled (for a typical keyed streaming job, this is all tasks).
  3. State Recovery: Flink retrieves the most recent completed checkpoint from checkpoint storage (Job Manager memory or a durable file system, depending on the configured state backend).
  4. Task Redistribution: The Job Manager reschedules the tasks that were running on the failed Task Manager onto free slots, either on the remaining healthy Task Managers or on a replacement Task Manager.
  5. State Restoration: Each task restores its state from the checkpoint, so every operator gets back exactly the state it had at checkpoint time.
  6. Source Rewind: Source operators rewind to their checkpointed positions. For example, a Kafka consumer goes back to the offsets it had at checkpoint time.
  7. Resume Processing: The job resumes from the checkpoint. Because the restored state and the source positions come from the same consistent snapshot, records after the checkpoint are replayed and affect state exactly once. Exactly-once output to external systems additionally requires transactional or idempotent sinks.
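The recovery steps above can be simulated in a few lines of plain Java. This is a hypothetical sketch (`CheckpointReplaySketch` is not a Flink class): a log of numbers stands in for a Kafka partition, a running sum stands in for operator state, and each checkpoint stores the source offset and the state taken at the same moment. Setting `restoreState` to false rewinds the source but keeps the live state, which shows the duplicate counting that consistent snapshots avoid.

```java
// Hypothetical simulation of checkpoint, failure, state restore and source rewind.
class CheckpointReplaySketch {
    // A checkpoint stores the source position and the operator state taken together.
    static final class Checkpoint {
        final int sourceOffset;
        final long runningSum;

        Checkpoint(int sourceOffset, long runningSum) {
            this.sourceOffset = sourceOffset;
            this.runningSum = runningSum;
        }
    }

    // Sums a replayable log, checkpointing every `interval` records and crashing once
    // right after the record at index `crashAt` has been processed.
    public static long run(long[] log, int interval, int crashAt, boolean restoreState) {
        int offset = 0;                        // source position: next record to read
        long sum = 0;                          // operator state
        Checkpoint last = new Checkpoint(0, 0);
        boolean crashed = false;
        while (offset < log.length) {
            sum += log[offset];
            offset++;
            if (offset % interval == 0) {
                last = new Checkpoint(offset, sum);   // snapshot offset and state together
            }
            if (!crashed && offset - 1 == crashAt) {
                crashed = true;
                offset = last.sourceOffset;           // source rewind (step 6)
                if (restoreState) {
                    sum = last.runningSum;            // state restoration (step 5)
                }
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] log = {1, 2, 3, 4, 5, 6, 7};          // correct total is 28
        System.out.println(run(log, 3, 4, true));    // 28: replayed records counted once
        System.out.println(run(log, 3, 4, false));   // 37: records 4 and 5 counted twice
    }
}
```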

Example Code: Fraud Detection

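// Flink 1.x DataStream API sketch. Transaction, EnrichedTransaction, Alert, accountInfoStream, toIndexRequest(...)
// and the enrichment/detector/selector/deduplicator/serializer classes are application code, not shown.
import java.time.Duration;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.http.HttpHost;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60 s (assumed interval)
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
kafkaProps.setProperty("group.id", "fraud-detection");          // assumed consumer group

// Read Avro transactions (Transaction assumed to be an Avro-generated SpecificRecord)
// and assign event-time timestamps, tolerating up to 10 s of out-of-orderness
DataStream<Transaction> transactions = env
    .addSource(new FlinkKafkaConsumer<>("transactions",
                AvroDeserializationSchema.forSpecific(Transaction.class), kafkaProps))
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
    );

// Enrich transactions with account information. accountInfoStream needs watermarks too:
// a two-input operator's watermark is the minimum of both inputs.
DataStream<EnrichedTransaction> enrichedTransactions =
    transactions.keyBy(t -> t.getAccountId())
                .connect(accountInfoStream.keyBy(a -> a.getAccountId()))
                .process(new AccountEnrichmentFunction());

// Calculate velocity metrics (multiple transactions in short time). Both detectors are assumed
// to emit a common Alert type, because union() below requires identical element types.
DataStream<Alert> velocityAlerts = enrichedTransactions
    .keyBy(t -> t.getAccountId())
    .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(5)))
    .process(new VelocityDetector(3, 1000.0)); // Alert on 3+ transactions over $1000 in 30 min

// Pattern detection with CEP: a transaction under $10 immediately followed by one over
// $1000 on the same card within 5 minutes. where() needs a condition object, not a lambda.
Pattern<EnrichedTransaction, ?> fraudPattern = Pattern.<EnrichedTransaction>begin("small-tx")
    .where(new SimpleCondition<EnrichedTransaction>() {
        @Override public boolean filter(EnrichedTransaction tx) { return tx.getAmount() < 10.0; }
    })
    .next("large-tx")
    .where(new SimpleCondition<EnrichedTransaction>() {
        @Override public boolean filter(EnrichedTransaction tx) { return tx.getAmount() > 1000.0; }
    })
    .within(Time.minutes(5));

DataStream<Alert> patternAlerts = CEP.pattern(
    enrichedTransactions.keyBy(t -> t.getCardId()), fraudPattern)
    .select(new PatternAlertSelector());

// Union all alerts and deduplicate
DataStream<Alert> allAlerts = velocityAlerts.union(patternAlerts)
    .keyBy(Alert::getAlertId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new AlertDeduplicator());

// Output to Kafka and Elasticsearch
allAlerts.addSink(new FlinkKafkaProducer<>("alerts", new AlertSerializer(), kafkaProps));
allAlerts.sinkTo(new Elasticsearch7SinkBuilder<Alert>()
    .setHosts(new HttpHost("localhost", 9200, "http"))           // assumed Elasticsearch address
    .setEmitter((alert, context, indexer) -> indexer.add(toIndexRequest(alert)))
    .build());

env.execute("fraud-detection");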
