flink-doris-connector (Flink 1.13.1)


 Doris official docs: https://doris.incubator.apache.org/docs/ecosystem/flink-doris-connector.html#version-compatibility

https://github.com/apache/incubator-doris-flink-connector/tree/master/flink-doris-connector

  Dependency (Maven; the flink-doris-connector-1.13_2.12 artifact targets Flink 1.13 with Scala 2.12, see the version-compatibility table in the docs above):

        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.13_2.12</artifactId>
            <version>1.0.3</version>
        </dependency>

Source: DataStream API (tables using the UNIQUE data model are not supported; reading one fails with: Caused by: java.lang.IllegalArgumentException: Row arity: 5, but serializer arity: 4, likely because the Unique model's hidden delete-sign column gives each fetched row one more field than the declared schema)

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Doris FE address and the table to read (database.table)
        Properties properties = new Properties();
        properties.put("fenodes", "192.168.18.51:8030");
        properties.put("username", "root");
        properties.put("password", "root");
        properties.put("table.identifier", "test.top");

        DataStreamSource<List<?>> listDataStreamSource = env.addSource(new DorisSourceFunction(
                        new DorisStreamOptions(properties),
                        new SimpleListDeserializationSchema()
                )
        );
        listDataStreamSource.print();
        env.execute();
    }
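
With SimpleListDeserializationSchema, each Doris row is emitted (and printed here) as a java.util.List holding the column values in table order.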

SQL:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Map a Doris table into Flink SQL via the 'doris' connector
        tableEnv.executeSql("CREATE TABLE flink_doris (\n" +
                "    siteid INT,\n" +
                "    citycode SMALLINT,\n" +
                "    username STRING,\n" +
                "    pv BIGINT\n" +
                "    ) \n" +
                "    WITH (\n" +
                "      'connector' = 'doris',\n" +
                "      'fenodes' = 'hadoop1:8030',\n" +
                "      'table.identifier' = 'test_db.table1',\n" +
                "      'username' = 'test',\n" +
                "      'password' = 'test'\n" +
                ")\n");

        tableEnv.executeSql("select * from flink_doris").print();
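
Writing with SQL goes through the same connector: once flink_doris is registered, a plain INSERT works. A minimal sketch; the literal row below is made up and only has to match the declared schema (note the explicit casts, since Flink SQL is strict about matching the sink's column types):

        tableEnv.executeSql("INSERT INTO flink_doris VALUES (99, CAST(9 AS SMALLINT), 'flink-sql', CAST(9 AS BIGINT))");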

Sink:

DataStream API: JSON string sink (rows are sent to Doris via Stream Load in JSON format)

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Stream Load properties: send records as JSON and strip the outer array brackets
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");
        env.fromElements("{\"siteid\": \"66\", \"citycode\": \"6\", \"username\": \"pengyuyan\",\"pv\": \"6\"}")
                .addSink(
                        DorisSink.sink(
                                DorisReadOptions.builder().build(),
                                DorisExecutionOptions.builder()
                                        .setBatchSize(3)
                                        .setBatchIntervalMs(0L)
                                        .setMaxRetries(3)
                                        .setStreamLoadProp(pro).build(),
                                DorisOptions.builder()
                                        .setFenodes("hadoop1:8030")
                                        .setTableIdentifier("test_db.table1")
                                        .setUsername("test")
                                        .setPassword("test").build()
                        ));
//        // Minimal overload: only DorisOptions, with default read/execution options
//            .addSink(
//                DorisSink.sink(
//                        DorisOptions.builder()
//                                .setFenodes("hadoop1:8030")
//                                .setTableIdentifier("test_db.table1")
//                                .setUsername("test")
//                                .setPassword("test").build()
//                ));
        env.execute();
    }
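
As I read these options, setBatchSize(3) flushes the buffered rows to Doris once three records accumulate, setBatchIntervalMs(0L) disables the time-based flush, and setMaxRetries(3) retries a failed Stream Load up to three times. The commented-out variant shows the minimal overload that relies on the default read and execution options.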
DataStream API: RowData sink
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.VarCharType;
// Doris connector classes (package layout assumed from flink-doris-connector 1.0.3; verify against your version)
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;

public class DataStreamRowDataSinkDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<RowData> source = env.fromElements("")
                .map(new MapFunction<String, RowData>() {
                    @Override
                    public RowData map(String value) throws Exception {
                        // Build one row matching the (siteid, citycode, username, pv) schema
                        GenericRowData genericRowData = new GenericRowData(4);
                        genericRowData.setField(0, 88);
                        genericRowData.setField(1, (short) 8);
                        genericRowData.setField(2, StringData.fromString("flink-stream"));
                        genericRowData.setField(3, 8L);
                        return genericRowData;
                    }
                });

        // Field names and logical types must line up with the Doris table schema
        LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(32), new BigIntType()};
        String[] fields = {"siteid", "citycode", "username", "pv"};
        source.addSink(
                DorisSink.sink(
                        fields,
                        types,
                        DorisReadOptions.builder().build(),
                        DorisExecutionOptions.builder()
                                .setBatchSize(3)
                                .setBatchIntervalMs(0L)
                                .setMaxRetries(3)
                                .build(),
                        DorisOptions.builder()
                                .setFenodes("hadoop1:8030")
                                .setTableIdentifier("test_db.table1")
                                .setUsername("test")
                                .setPassword("test").build()
                ));
        env.execute();
    }
}