
Flink-Kafka end-to-end exactly-once
The producer uses transactions.
// FlinkKafkaProducer does not read the delivery semantic from a global configuration;
// it hard-codes AT_LEAST_ONCE as its default, so the semantic has to be passed explicitly
// when the producer is constructed (see new FlinkKafkaProducer<>() below).

// Producer transaction timeout; it must be increased when using EXACTLY_ONCE,
// because a transaction stays open until the next checkpoint commits it.
producerProp.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 5);
// Set the transactional ID; the class name is used here as a unique ID.
producerProp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, getClassName());
// Enable idempotence.
producerProp.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProp.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

Optional<FlinkKafkaPartitioner<String>> customPartitioner = Optional.of(new FlinkFixedPartitioner<>());
FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>(topic,
        new SimpleStringSchema(),
        producerProp,
        customPartitioner.orElse(null),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
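One related caveat, stated here as a general Kafka fact rather than something from the original: brokers cap producer transaction timeouts via transaction.max.timeout.ms (15 minutes by default), and a producer requesting more fails at initialization. The 5-minute value above stays under the default cap; if checkpoints can take longer, both sides need raising, e.g.:

// Producer side; must stay at or below the broker's transaction.max.timeout.ms
// (raise transaction.max.timeout.ms in the broker's server.properties first).
producerProp.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) TimeUnit.MINUTES.toMillis(15));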
The consumer tracks offsets and is set to read committed.
// Restrict the consumer to data from committed transactions only
consumerProp.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
With checkpoints in between, Flink integrated with Kafka can achieve complete end-to-end exactly-once. Note the fatal weakness of Flink-Kafka exactly-once: data is only committed to Kafka when a checkpoint runs, so stream processing effectively turns into micro-batch processing (the micro-batch size equals the checkpoint interval).
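Since commit latency tracks the checkpoint interval, that interval is effectively the knob trading checkpoint overhead against end-to-end latency. A minimal sketch, assuming an existing StreamExecutionEnvironment env (the 10 s / 5 s values are illustrative only, not from the original):

// A shorter checkpoint interval shrinks the "micro-batches" described above,
// at the price of more frequent checkpoint overhead.
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10), CheckpointingMode.EXACTLY_ONCE);
// Keep some pause between checkpoints so commits do not starve actual processing.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(5));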
Code sample
pom.xml
Special note: for a local run, importing flink-connector-kafka_2.12 alone is enough. On a Flink cluster, the following three artifacts are all required for Kafka to work, because the flink-connector-kafka POM excludes the latter two:
flink-connector-kafka_2.12-1.12.0, flink-connector-kafka-base_2.12-1.12.0, kafka-clients-2.2.0

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.12.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
Local test dependency


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
KafkaConsumerApp key code

env.setParallelism(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(TimeUnit.MINUTES.toMillis(1));

Properties consumerProp = new Properties();
// Only read data from committed Kafka transactions.
consumerProp.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
FlinkKafkaConsumer<String> kafkaComsumer = KafkaUtil.getKafkaComsumer("sleepy.test.lq_test2", consumerProp);
DataStreamSource<String> source = env.addSource(kafkaComsumer);
// The map exists only for its debug-logging side effect; its output is not used.
source.map(item -> {
    log.debug("KafkaConsumerApp: " + item);
    return item;
});
source.addSink(KafkaUtil.getKafkaProducer("sleepy.test.lq_test3"));
execute();

KafkaProducerApp key code
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(TimeUnit.MINUTES.toMillis(1));

FlinkKafkaConsumer<String> kafkaComsumer = KafkaUtil.getKafkaComsumer("sleepy.test.lq_test");
DataStreamSource<String> source = env.addSource(kafkaComsumer);

Properties producerProp = new Properties();
// Unique transactional ID; the class name is used here.
producerProp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, getClassName());
producerProp.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProp.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer("sleepy.test.lq_test2", producerProp);
source.addSink(kafkaProducer);
execute();
Test procedure
Start KafkaConsumerApp, then start KafkaProducerApp and send data to topic sleepy.test.lq_test. When KafkaProducerApp receives the data, it forwards it transactionally to topic sleepy.test.lq_test2 (the uncommitted data is sent first; the commit only happens at checkpoint time). KafkaConsumerApp receives the uncommitted data from sleepy.test.lq_test2, but because we configured READ_COMMITTED it neither processes the data nor forwards it downstream. Only once the upstream KafkaProducerApp completes a checkpoint and commits the data does KafkaConsumerApp process it, print the debug log, and finally send it on to topic sleepy.test.lq_test3. If KafkaConsumerApp is instead configured with READ_UNCOMMITTED, it processes messages immediately, including uncommitted ones; in that case exactly-once cannot be guaranteed across the two Flink jobs end to end.
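To observe the difference between the two isolation levels outside of Flink, a plain kafka-clients consumer can be pointed at the intermediate topic. A minimal sketch; the broker address and group ID are placeholders, not from the original:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "isolation-check");         // placeholder group
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Flip this to "read_uncommitted" to see records arrive before the upstream checkpoint commits them.
props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("sleepy.test.lq_test2"));
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
            System.out.println(record.offset() + ": " + record.value());
        }
    }
}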
Source notes: FlinkKafkaProducer, line 110
public enum Semantic {

    /**
     * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction
     * that will be committed to Kafka on a checkpoint.
     *
     * <p>In this mode {@link FlinkKafkaProducer} sets up a pool of {@link FlinkKafkaInternalProducer}.
     * Between each checkpoint a Kafka transaction is created, which is committed on
     * {@link FlinkKafkaProducer#notifyCheckpointComplete(long)}. If checkpoint complete notifications are
     * running late, {@link FlinkKafkaProducer} can run out of {@link FlinkKafkaInternalProducer}s in the
     * pool. In that case any subsequent {@link FlinkKafkaProducer#snapshotState(FunctionSnapshotContext)}
     * requests will fail and {@link FlinkKafkaProducer} will keep using the
     * {@link FlinkKafkaInternalProducer} from the previous checkpoint.
     * To decrease the chance of failing checkpoints there are four options:
     *
     * <ul>
     *   <li>decrease number of max concurrent checkpoints</li>
     *   <li>make checkpoints more reliable (so that they complete faster)</li>
     *   <li>increase the delay between checkpoints</li>
     *   <li>increase the size of {@link FlinkKafkaInternalProducer}s pool</li>
     * </ul>
     */
    EXACTLY_ONCE,
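The four mitigation options in the javadoc map onto concrete configuration calls. A minimal sketch, assuming an existing StreamExecutionEnvironment env plus the topic and producerProp from the earlier samples (the concrete values are illustrative only):

// 1. Decrease the number of max concurrent checkpoints.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 2./3. Give checkpoints time to complete, and add delay between them.
env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(30));
// 4. Increase the producer pool size via the constructor's last argument
//    (10 is illustrative; FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5).
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic,
        new SimpleStringSchema(),
        producerProp,
        null,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        10);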
Summary and recommendations
For a real-time data warehouse, it is usually best to use AT_LEAST_ONCE plus downstream idempotence to achieve effective end-to-end exactly-once, since that preserves low latency as much as possible. Transactional exactly-once across the whole pipeline is worth using only when all of the following hold at the same time, and the trade-offs should be spelled out up front (a dedup sketch follows the list):
• There is no idempotence guarantee.
• The downstream cannot find a unique key to deduplicate on.
• The business requires exactly-once delivery of the data.
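For the recommended AT_LEAST_ONCE-plus-idempotence route, downstream deduplication can be as small as keying by a unique ID and emitting each key once. A minimal sketch using Flink keyed state; keying by the whole record is an assumption made here for brevity (in practice you would key by a business ID):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Emits each key at most once; replays caused by AT_LEAST_ONCE are dropped.
public class DedupFunction extends RichFlatMapFunction<String, String> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        if (seen.value() == null) { // first occurrence of this key
            seen.update(true);
            out.collect(value);
        }
    }
}

// Usage: source.keyBy(v -> v).flatMap(new DedupFunction())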
Other notes
By default, FlinkKafkaProducer routes messages to downstream partitions by task ID % partition count. If only one upstream partition carries data, everything lands in a single partition of the downstream topic. A custom partitioner can be used to spread data randomly across the downstream partitions:
public static FlinkKafkaPartitioner<String> getRandomPartitioner() {
    return new FlinkKafkaPartitioner<String>() {
        @Override
        public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // Pick a random downstream partition instead of the fixed task-id mapping.
            return partitions[RandomUtils.nextInt(0, partitions.length)];
        }
    };
}

public static FlinkKafkaProducer<String> getKafkaProducer(String topic, Properties prop, FlinkKafkaPartitioner<String> flinkKafkaPartitioner) {
    // Overlay any caller-supplied settings on top of the defaults.
    if (prop != null) {
        producerProp.putAll(prop);
    }
    FlinkKafkaProducer<String> flinkKafkaProducer;
    if (flinkKafkaPartitioner == null) {
        flinkKafkaProducer = new FlinkKafkaProducer<>(topic,
                new SimpleStringSchema(),
                producerProp);
    } else {
        flinkKafkaProducer = new FlinkKafkaProducer<>(topic,
                new SimpleStringSchema(),
                producerProp, Optional.of(flinkKafkaPartitioner));
    }
    return flinkKafkaProducer;
}
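Wiring the random partitioner into a sink then looks like the line below (topic and properties reused from the samples above):

source.addSink(KafkaUtil.getKafkaProducer("sleepy.test.lq_test2", producerProp, KafkaUtil.getRandomPartitioner()));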
• When a 2.4.1 kafka-clients sends data to an older Kafka broker, using KafkaSerializationSchema can result in the data not being written at all.
• Flink 1.11.1 has a bug: because FlinkKafkaProducer does not override the open method, it only ever sends to partition 0 of the downstream topic; this is fixed in Flink 1.11.3.
• Note that SimpleStringSchema cannot handle null. Its source:
@Override
public String deserialize(byte[] message) {
    return new String(message, charset);
}

@Override
public byte[] serialize(String element) {
    return element.getBytes(charset);
}
Workaround (after modification)
public static class StringSchema extends SimpleStringSchema {
    @Override
    public String deserialize(byte[] message) {
        // Map a null payload to an empty string instead of throwing a NullPointerException.
        return message != null ? new String(message, StandardCharsets.UTF_8) : "";
    }
}
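The same null guard can be applied on the write path, since serialize would equally throw on a null element. A sketch under the same assumptions, extending the workaround above:

public static class StringSchema extends SimpleStringSchema {
    @Override
    public String deserialize(byte[] message) {
        return message != null ? new String(message, StandardCharsets.UTF_8) : "";
    }

    @Override
    public byte[] serialize(String element) {
        // Serialize null as an empty byte array rather than failing the sink.
        return element != null ? element.getBytes(StandardCharsets.UTF_8) : new byte[0];
    }
}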