Databus Relays

Databus Relays概述DatabusRelays主要负责以下两个工作:从databus源数据库中读取变化行,并序列化为事件流保存至内存中;接受客户端的请求,并将数据变化事件流返回给客户端。技术架构 EventProducer:用来读取数据库的变化事件,转化为AVRO类型并存储至内存中;CircularBuffer:Relay有一个或多个环形的缓冲池用来保存按递增

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

概述

Databus Relays主要负责以下两个工作:

  • 从databus源数据库中读取变化行,并序列化为事件流保存至内存中;
  • 接受客户端的请求,并将数据变化事件流返回给客户端。

技术架构

Databus Relays

 

  • Event Producer:用来读取数据库的变化事件,转化为AVRO类型并存储至内存中;
  • Circular Buffer:Relay有一个或多个环形的缓冲池用来保存按递增的系统变化号(SCN) 为顺序的变化事件;
  • SCN Writer/Reader:用来读取和写入SCN号至硬盘;
  • RESTFUL interface:它暴露一个restful接口,用来推送数据变化事件至客户端。

数据抓取

目前支持Oracle和Mysql两种数据源的抓取。

Oracle数据抓取

抓取Oracle数据是通过给源表添加一个触发器,在新增和修改的时候记录SCN号作为查询的依据,通过relay定期的查询获取变化的数据。删除和查询不受影响。

Oracle数据库配置

首次部署databus,主要有以下几个步骤(非首次部署,只需要从步骤3开始):

  1. 创建databus表空间、用户、赋权限

    createUser.sh
    -- 创建databus表空间,databus用户,并给databus付权限(参见createUser.sql),注意需要指定datafile,如果表空间名字修改了,则需要修改tablespace文件
    create 
    tablespace TBS_DATABUS datafile 
    '${DBDIR}/tbs_databus_01.dbf' 
    size 
    50M reuse autoextend 
    on 
    next 
    50M maxsize unlimited extent management 
    local 
    uniform 
    size 
    2M;
    create 
    tablespace TBS_DATABUS_IDX datafile 
    '${DBDIR}/tbs_databus_idx_01.dbf' 
    size 
    50M reuse autoextend 
    on 
    next 
    50M maxsize unlimited extent management 
    local 
    uniform 
    size 
    2M;
    create 
    user 
    DATABUS identified 
    by 
    DATABUS 
    default 
    tablespace TBS_DATABUS 
    temporary 
    tablespace temp1;
    grant 
    create 
    session, 
    create 
    table

    create 
    view

    create 
    sequence

    create 
    procedure

    create 
    trigger

    create 
    type, 
    create 
    job  
    to 
    DATABUS;
    grant 
    query rewrite 
    to 
    DATABUS;
    grant 
    execute 
    on 
    dbms_alert 
    to 
    DATABUS;
    grant 
    execute 
    on 
    sys.dbms_lock 
    to 
    DATABUS;
    grant 
    select 
    on 
    sys.v_$
    database 
    to 
    DATABUS;
    grant 
    execute 
    on 
    sys.dbms_aq 
    to 
    DATABUS;
    grant 
    execute 
    on 
    sys.dbms_aqadm 
    to 
    DATABUS;
    grant 
    execute 
    on 
    sys.dbms_aqin 
    to 
    DATABUS;
    grant 
    execute 
    on 
    sys.dbms_aq_bqview 
    to 
    DATABUS;
    alter 
    user 
    DATABUS quota unlimited 
    on 
    TBS_DATABUS;
    alter 
    user 
    DATABUS quota unlimited 
    on 
    TBS_DATABUS_IDX;

  2.    创建databus核心包、表、存储过程等必须的东西,使用databus用户,执行oracle/bin/createSchema.sh databus/databus@posp_boss ${Dir}/BOOK

    createSchema.sh
    #
    #
    # Copyright 
    2013 
    LinkedIn Corp. All rights reserved
    #
    # Licensed under the Apache License, Version 
    2.0 
    (the 
    "License"
    );
    # you may not use 
    this 
    file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #   http:
    //www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing,
    # software distributed under the License is distributed on an

    "AS IS" 
    BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    # KIND, either express or implied.  See the License 
    for 
    the
    # specific language governing permissions and limitations
    # under the License.
    #
    #
    usage() {
      
    echo 
    "Usage: $0 <username/password@SID> <SRC_VIEW_DIR>"
    }
    # Database Connection Info
    DB=$
    1
    # Log Directory
    SRC_VIEW_DIR=$
    2
    # Check that DB is set
    if 

    "x$DB" 

    "x" 
    ]
    then
      
    usage
      
    exit 
    1
    fi
    user=`echo $DB | perl -lane 
    '{my $a = $_; $a =~ s/([^\/]*)\/.*/$1/; print $a;}'
    `;
    password=`echo $DB | perl -lane 
    '{my $a = $_; $a =~ s/[^\/]*\/([^@]*)\@.*/$1/; print $a;}'
    `;
    sid=`echo $DB | perl -lane 
    '{my $a = $_; $a =~ s/[^\/]*\/[^@]*\@(.*)/$1/; print $a;}'
    `;
    if 

    "x$user" 

    "x" 
    ] || [ 
    "x$password" 

    "x" 
    ] || [ 
    "x$sid" 

    "x" 
    ]
    then
      
    echo 
    "First argument not in format <username/password@SID>"
      
    usage
      
    exit 
    1
    fi
    if 
    [ ! -d 
    "$SRC_VIEW_DIR" 
    ]
    then
      
    echo 
    "Src view directory not found or not a directory"
      
    usage
      
    exit 
    1
    fi
    if 
    [ ! -f 
    "$SRC_VIEW_DIR/tablespace" 
    ]
    then
      
    echo 
    "Tablespace file ($SRC_VIEW_DIR/tablespace) not provided"
      
    usage
      
    exit 
    1
    fi
    tbs=`cat $SRC_VIEW_DIR/tablespace`;
    tbs_uc=`echo $tbs | tr 
    '[A-Z]' 
    '[a-z]'
    `
    tbs_lc=`echo $tbs | tr 
    '[a-z]' 
    '[A-Z]'
    `
     
    echo 
    "User : $user"
    echo 
    "Password : $password"
    echo 
    "SID : $sid"
    echo 
    "DB : $DB"
    echo 
    "TBS : $tbs_uc"
    #
    # create sequenes,tables
    #
    ../fwk/schema/ddl_sqs.sh 
    "$DB"
    ../fwk/schema/ddl_tab.sh 
    "$DB" 
    "$tbs_uc"
    # add column txn
    #
    for 
    t in ${SRC_VIEW_DIR}/*\.tab;
    #
    do
    #  echo 
    "Processing Table definition File : $t"
    #  table=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.tab/$1/; print $a; }'
    `
    #  echo 
    "Creating Table $table. File is $t"
    #  echo 
    "sqlplus ${DB} "
    @$t
    " ${sid} ${user} ${password} ${tbs_lc}"
    #  sqlplus ${DB} << __EOF__
    #   
    @$t 
    ${sid} ${user} ${password} ${tbs_lc}
    #__EOF__
    #done
    #
    # create contains/indexes
    #
    ../fwk/schema/ddl_con.sh 
    "$DB" 
    "$tbs_uc"
    ../fwk/schema/ddl_ind.sh 
    "$DB" 
    "$tbs_uc"
    #
    # crete procedure/packages/functions
    #
    for 
    t in ${SRC_VIEW_DIR}/*\.tab;
    do
      
    table=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.tab/$1/; print $a; }'
    `
      
    echo 
    "Setting up Alerts for  $table. File is $t"
      
    ../fwk/schema/ddl_prc.sh 
    "$DB" 
    "$table"
    done
    #
    # create databus views
    #
    ../fwk/schema/ddl_vw.sh 
    "$DB"
    #
    # create business table's view
    #
    #
    for 
    t in ${SRC_VIEW_DIR}/*\.view;
    #
    do
    #  echo 
    "Processing View definition File : $t"
    #  view=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.view/$1/; print $a; }'
    `
    #  echo 
    "Creating view sy\$$view"
    #  pattern=
    "sy\\$"
    ;
    #  pattern+=
    "${view}"
    ;
    #  wc=`grep -ic 
    "${pattern}" 
    $t`
    #  
    if 
    [ $wc -lt 
    1 
    ];
    #  then
    #     echo 
    "View names should start with sy\$. Offending view file ($t)"
    ;
    #     exit 
    1
    #  fi
    #  sqlplus ${DB}  << __EOF__
    #    
    @$t 
    ${sid} ${user} ${password} ${tbs_lc};
    #__EOF__
    #done
    #
    # create sync_alert
    #
    ## execute rest
    for 
    t in ${SRC_VIEW_DIR}/*\.tab;
    do
      
    table=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.tab/$1/; print $a; }'
    `
      
    echo 
    "Creating trigger for source $table"
      
    ../fwk/schema/ddl_prc1.sh  
    "$DB" 
    "$table"
    done
     
    #
    # create business table's trigger
    #
    #
    for 
    t in ${SRC_VIEW_DIR}/*\.tab;
    #
    do
    #  table=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.tab/$1/; print $a; }'
    `
    #  echo 
    "Creating trigger for source $table"
    #  ../fwk/schema/ddl_trg.sh 
    "$DB" 
    "$table"
    #done
    echo 
    "INFO: compiling types"
    sqlplus $DB << __EOF__
    exec compile_allobjects;
    __EOF__
     
    echo 
    "INFO: Sync_core seeding "
    sqlplus $DB << __EOF__
      
    insert into sync_core_settings values (
    'N'
    );
      
    commit;
      
    show errors;
    __EOF__
     
    #
    # sources setting
    #
    #echo 
    "INFO: sy\$SOURCES setup"
    #i=
    0
    ;
    #
    for 
    t in ${SRC_VIEW_DIR}/*\.tab;
    #
    do
    #  echo 
    "Processing Table definition File : $t"
    #  table=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.tab/$1/; print $a; }'
    `
    #  echo 
    "Creating Table $table"
    #  sqlplus $DB << __EOF__
    #    insert into sy\$sources values(
    '$table'
    ,$i);
    #__EOF__
    #  ((i++))
    #done

  3. 修改表结构,增加一列 TXN NUMBER(posp_boss)

    person.sql
    -- 创建表,添加TXN列
    create 
    table 
    posp_boss.person
    (
            
    id number 
    primary 
    key
    ,
            
    first_name 
    varchar
    (120) 
    not 
    null
    ,
            
    last_name 
    varchar
    (120) 
    not 
    null
    ,
            
    birth_date 
    date
    ,
            
    deleted 
    varchar
    (5) 
    default 
    'false' 
    not 
    null
    ,
            
    txn number
    );
     

  4. 将源表权限赋给databus(posp_boss)

    grant.sql
    grant 
    insert
    ,
    update
    ,
    select 
    on 
    posp_boss.PERSON 
    to 
    databus;

  5. 给posp_boss赋databus.sync_core包的执行权限(databus)

    grant.sql
    grant 
    execute 
    on 
    databus.sync_core 
    to 
    posp_boss;

  6. 创建索引(posp_boss)

    index.sql
    -- 创建索引(posp_boss)
    create 
    index 
    posp_boss.PERSON_txn 
    on 
    POSP_BOSS.PERSON(txn) tablespace index_data;

  7. 创建表视图,注意一定要把TXN列包括进去,并且要把ID映射为KEY

    book_vw.sql
    -- 主键ID 映射为 KEY
    CREATE 
    OR 
    REPLACE 
    FORCE 
    VIEW 
    sy$person
    AS
    SELECT
      
    txn,
      
    id 
    key
    ,
      
    first_name,
      
    last_name,
      
    birth_date,
      
    deleted
    FROM
      
    posp_boss.person;

  8. 新增sy$sources表配置,注意value的值必须小于等于125

    insert.sql
    -- 注意sourceName区分大小写,对应到上面触发器里面的sync_core.getTxn('POSP_BOSS.PERSON')的POSP_BOSS.PERSON
    insert 
    into 
    databus.sy$sources 
    values
    (
    'POSP_BOSS.PERSON'
    ,1);

  9.  创建触发器

    trigger.sql
    -- 注意和sy$sources插入的值一致
    CREATE TRIGGER PERSON_TRG
      
    before insert or update on POSP_BOSS.PERSON
      
    referencing old as old 
    new 
    as 
    new
      
    for 
    each row
    begin
      
    if 
    (updating and :
    new
    .txn < 
    0
    ) then
        
    :
    new
    .txn := -:
    new
    .txn;
      
    else
        
    :
    new
    .txn := databus.sync_core.getTxn(
    'POSP_BOSS.PERSON'
    );
      
    end 
    if
    ;
    end;

    至此,针对于Oracle的数据抓取数据端的配置就全部配置完毕了。

     

Mysql数据抓取

Mysql的数据抓取比较简单

  • 创建一个slave的帐号,因为binlog日志分析是基于主从复制的模式来实现的
  • 开启Mysql的binlog日志,设置日志名称,这个名称是后面需要用到的,默认mysql-bin,注意,binlog日志默认是不开启的,开启后需要重启mysql服务
  • 设置binlog日志格式为ROW,默认是STATEMENT。binlog_format  = ROW ,只有ROW模式才会记录受影响的行数,Databus默认只获取影响行数的事件

    my.cnf
    server-id               = 
    1
    log_bin                 = mysql-bin
    expire_logs_days        = 
    10
    max_binlog_size         = 100M
    binlog_format           = ROW

  • 配置数据源,注意sources的id必须与sy$sources中的value一致

    sources.json
    {
        
    "name" 

    "boss"
    ,
        
    "id"  

    1
    ,
        
    "uri" 

    "mysql://repl%2F123456@localhost:3306/1/mysql-bin"
    ,
        
    "slowSourceQueryThreshold" 

    2000
    ,
        
    "sources" 
    :
        
    [
            
    {

    "id" 

    1
    ,
             
    "name" 

    "com.linkedin.events.example.person.Person"
    ,
             
    "uri"

    "lijiang.person"
    ,
             
    "partitionFunction" 

    "constant:1"
            
    }
        
    ]
    }

     

    uri的格式为:mysql://用户%2F密码@host:port/serverID/binlog文件名称
    另外需要注意sources里对应数据源的uri,必需带上数据库名称,格式为 db.table 

  • 对于Mysql的数据抓取,很多数据类型在Avro序列化时会被转换为string

部署normal_replay

  1. 配置relay sources,sources的id必须与sy$sources的value一致。注意oracle和mysql的配置是不一样的。

    source.json
    # oracle
    {
        
    "name" 

    "person"
    ,
        
    "id"  

    1
    ,
        
    "uri" 

    "jdbc:oracle:thin:lijiang/lijiang@192.168.16.239:51521:afc1"
    ,
        
    "slowSourceQueryThreshold" 

    2000
    ,
        
    "sources" 
    :
        
    [
            
    {

    "id" 

    1
    ,
             
    "name" 

    "com.linkedin.events.example.person.Person"
    ,
             
    "uri"

    "lijiang.person"
    ,
             
    "partitionFunction" 

    "constant:1"
            
    }
        
    ]
    }
     
    # mysql
    {
        
    "name" 

    "boss"
    ,
        
    "id"  

    1
    ,
        
    "uri" 

    "mysql://repl%2F123456@localhost:3306/1/mysql-bin"
    ,
        
    "slowSourceQueryThreshold" 

    2000
    ,
        
    "sources" 
    :
        
    [
            
    {

    "id" 

    1
    ,
             
    "name" 

    "com.linkedin.events.example.person.Person"
    ,
             
    "uri"

    "lijiang.person"
    ,
             
    "partitionFunction" 

    "constant:1"
            
    }
        
    ]
    }

  2. 添加 avro 配置文件至schemas_registry文件夹中,关于avro的详细结束参见Apache Avro

    book.avsc
    {
      
    "name" 

    "Person_V1"
    ,
      
    "doc" 

    "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST"
    ,
      
    "type" 

    "record"
    ,
      
    "meta" 

    "dbFieldName=sy$person;pk=key;"
    ,
      
    "namespace" 

    "com.linkedin.events.example.person"
    ,
      
    "fields" 
    : [ {
        
    "name" 

    "txn"
    ,
        
    "type" 
    : [ 
    "long"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=TXN;dbFieldPosition=0;"
      
    }, {
        
    "name" 

    "key"
    ,
        
    "type" 
    : [ 
    "long"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=KEY;dbFieldPosition=1;"
      
    }, {
        
    "name" 

    "firstName"
    ,
        
    "type" 
    : [ 
    "string"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=FIRST_NAME;dbFieldPosition=2;"
      
    }, {
        
    "name" 

    "lastName"
    ,
        
    "type" 
    : [ 
    "string"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=LAST_NAME;dbFieldPosition=3;"
      
    }, {
        
    "name" 

    "birthDate"
    ,
        
    "type" 
    : [ 
    "long"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=BIRTH_DATE;dbFieldPosition=4;"
      
    }, {
        
    "name" 

    "deleted"
    ,
        
    "type" 
    : [ 
    "string"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=DELETED;dbFieldPosition=5;"
      
    } ]
    }

  3. 启动relay

    starup.sh
    ./bin/startup.sh relay

至此,Relay和数据库都已经配置和部署完成!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/181396.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • css半透明层

    css半透明层首次登录弹出提示层,主要有两个层:半透明层,遮住下面的内容;提示层(主要内容),下面为这两个层的css样式。针对IE透明使用的是filter:alpha(opacity=35),针对FF透明的相关代码是opacity:0.35,这样至少在IE和FF下是兼容的,通过测试。.mask{ border:0px; background:#000; width:100%; …

  • CSS画猪

    效果查看:http://hovertree.com/texiao/css3/6/CSS3画猪头:http://hovertree.com/texiao/css3/6/1/代码如下:转自:htt

    2021年12月22日
  • Java中的native修饰符

    Java中的native修饰符今天偶然看代码,发现别人有这样写的方法,并且jar里面有几个dll文件,比较奇怪,于是把代码打开,发现如下写法。public native String GSMModemSMSReadAll(String s, int i);public native String GSMModemGetErrorMsg(String s);public native boolean GSMModemI…

  • java运行时异常的特点是什么_java运行时异常与一般异常

    java运行时异常的特点是什么_java运行时异常与一般异常1,java.lang.NullPointerException这个异常的解释是”程序遇上了空指针”,简单地说就是调用了未经初始化的对象或者是不存在的对象,这个错误经常出现在创建图片,调用数组这些操作中,比如图片未经初始化,或者图片创建时的路径错误等等。2,java.lang.ClassNotFoundException异常的解释是”指定的类不存在”,这里主要考虑一下类的名称和路径是否正确即可…

  • 带case操作的update语句_多个case when嵌套

    带case操作的update语句_多个case when嵌套1、场景:由于多次循环执行数据库操作是非常耗费性能的。因此,我们需要尽可能一条UPDATE语句更新多条数据。2、方式:casewhen拼凑UPDATE表名SET(目标字段)BRANCH_NO=CASEWHEN(筛选条件)BANK_BRANCH_ID=’-10212’THEN ‘TU32958123’WHENBANK_BRANCH_ID=’-10213’THEN ‘TU32958112’ELSE’测试’END,COMMENTS=CASEWH

  • string转map_jsjson转string

    string转map_jsjson转string例如:varr=”{‘msg’:’你好’}”;varmap=eval(“(“+r+”)”);//r为String类型的数据varxx=map.msg;//此时xx的值为你好

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号