Databus Relays

Databus Relays概述DatabusRelays主要负责以下两个工作:从databus源数据库中读取变化行,并序列化为事件流保存至内存中;接受客户端的请求,并将数据变化事件流返回给客户端。技术架构 EventProducer:用来读取数据库的变化事件,转化为AVRO类型并存储至内存中;CircularBuffer:Relay有一个或多个环形的缓冲池用来保存按递增

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

概述

Databus Relays主要负责以下两个工作:

  • 从databus源数据库中读取变化行,并序列化为事件流保存至内存中;
  • 接受客户端的请求,并将数据变化事件流返回给客户端。

技术架构

Databus Relays

 

  • Event Producer:用来读取数据库的变化事件,转化为AVRO类型并存储至内存中;
  • Circular Buffer:Relay有一个或多个环形的缓冲池用来保存按递增的系统变化号(SCN) 为顺序的变化事件;
  • SCN Writer/Reader:用来读取和写入SCN号至硬盘;
  • RESTFUL interface:它暴露一个restful接口,用来推送数据变化事件至客户端。

数据抓取

目前支持Oracle和Mysql两种数据源的抓取。

Oracle数据抓取

抓取Oracle数据是通过给源表添加一个触发器,在新增和修改的时候记录SCN号作为查询的依据,通过relay定期的查询获取变化的数据。删除和查询不受影响。

Oracle数据库配置

首次部署databus,主要有以下几个步骤(非首次部署,只需要从步骤3开始):

  1. 创建databus表空间、用户、赋权限

    createUser.sh
    -- 创建databus表空间,databus用户,并给databus付权限(参见createUser.sql),注意需要指定datafile,如果表空间名字修改了,则需要修改tablespace文件
    create 
    tablespace TBS_DATABUS datafile 
    '${DBDIR}/tbs_databus_01.dbf' 
    size 
    50M reuse autoextend 
    on 
    next 
    50M maxsize unlimited extent management 
    local 
    uniform 
    size 
    2M;
    create 
    tablespace TBS_DATABUS_IDX datafile 
    '${DBDIR}/tbs_databus_idx_01.dbf' 
    size 
    50M reuse autoextend 
    on 
    next 
    50M maxsize unlimited extent management 
    local 
    uniform 
    size 
    2M;
    create 
    user 
    DATABUS identified 
    by 
    DATABUS 
    default 
    tablespace TBS_DATABUS 
    temporary 
    tablespace temp1;
    grant 
    create 
    session, 
    create 
    table

    create 
    view

    create 
    sequence

    create 
    procedure

    create 
    trigger

    create 
    type, 
    create 
    job  
    to 
    DATABUS;
    grant 
    query rewrite 
    to 
    DATABUS;
    grant 
    execute 
    on 
    dbms_alert 
    to 
    DATABUS;
    grant 
    execute 
    on 
    sys.dbms_lock 
    to 
    DATABUS;
    grant 
    select 
    on 
    sys.v_$
    database 
    to 
    DATABUS;
    grant 
    execute 
    on 
    sys.dbms_aq 
    to 
    DATABUS;
    grant 
    execute 
    on 
    sys.dbms_aqadm 
    to 
    DATABUS;
    grant 
    execute 
    on 
    sys.dbms_aqin 
    to 
    DATABUS;
    grant 
    execute 
    on 
    sys.dbms_aq_bqview 
    to 
    DATABUS;
    alter 
    user 
    DATABUS quota unlimited 
    on 
    TBS_DATABUS;
    alter 
    user 
    DATABUS quota unlimited 
    on 
    TBS_DATABUS_IDX;

  2.    创建databus核心包、表、存储过程等必须的东西,使用databus用户,执行oracle/bin/createSchema.sh databus/databus@posp_boss ${Dir}/BOOK

    createSchema.sh
    #
    #
    # Copyright 
    2013 
    LinkedIn Corp. All rights reserved
    #
    # Licensed under the Apache License, Version 
    2.0 
    (the 
    "License"
    );
    # you may not use 
    this 
    file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #   http:
    //www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing,
    # software distributed under the License is distributed on an

    "AS IS" 
    BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    # KIND, either express or implied.  See the License 
    for 
    the
    # specific language governing permissions and limitations
    # under the License.
    #
    #
    usage() {
      
    echo 
    "Usage: $0 <username/password@SID> <SRC_VIEW_DIR>"
    }
    # Database Connection Info
    DB=$
    1
    # Log Directory
    SRC_VIEW_DIR=$
    2
    # Check that DB is set
    if 

    "x$DB" 

    "x" 
    ]
    then
      
    usage
      
    exit 
    1
    fi
    user=`echo $DB | perl -lane 
    '{my $a = $_; $a =~ s/([^\/]*)\/.*/$1/; print $a;}'
    `;
    password=`echo $DB | perl -lane 
    '{my $a = $_; $a =~ s/[^\/]*\/([^@]*)\@.*/$1/; print $a;}'
    `;
    sid=`echo $DB | perl -lane 
    '{my $a = $_; $a =~ s/[^\/]*\/[^@]*\@(.*)/$1/; print $a;}'
    `;
    if 

    "x$user" 

    "x" 
    ] || [ 
    "x$password" 

    "x" 
    ] || [ 
    "x$sid" 

    "x" 
    ]
    then
      
    echo 
    "First argument not in format <username/password@SID>"
      
    usage
      
    exit 
    1
    fi
    if 
    [ ! -d 
    "$SRC_VIEW_DIR" 
    ]
    then
      
    echo 
    "Src view directory not found or not a directory"
      
    usage
      
    exit 
    1
    fi
    if 
    [ ! -f 
    "$SRC_VIEW_DIR/tablespace" 
    ]
    then
      
    echo 
    "Tablespace file ($SRC_VIEW_DIR/tablespace) not provided"
      
    usage
      
    exit 
    1
    fi
    tbs=`cat $SRC_VIEW_DIR/tablespace`;
    tbs_uc=`echo $tbs | tr 
    '[A-Z]' 
    '[a-z]'
    `
    tbs_lc=`echo $tbs | tr 
    '[a-z]' 
    '[A-Z]'
    `
     
    echo 
    "User : $user"
    echo 
    "Password : $password"
    echo 
    "SID : $sid"
    echo 
    "DB : $DB"
    echo 
    "TBS : $tbs_uc"
    #
    # create sequenes,tables
    #
    ../fwk/schema/ddl_sqs.sh 
    "$DB"
    ../fwk/schema/ddl_tab.sh 
    "$DB" 
    "$tbs_uc"
    # add column txn
    #
    for 
    t in ${SRC_VIEW_DIR}/*\.tab;
    #
    do
    #  echo 
    "Processing Table definition File : $t"
    #  table=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.tab/$1/; print $a; }'
    `
    #  echo 
    "Creating Table $table. File is $t"
    #  echo 
    "sqlplus ${DB} "
    @$t
    " ${sid} ${user} ${password} ${tbs_lc}"
    #  sqlplus ${DB} << __EOF__
    #   
    @$t 
    ${sid} ${user} ${password} ${tbs_lc}
    #__EOF__
    #done
    #
    # create contains/indexes
    #
    ../fwk/schema/ddl_con.sh 
    "$DB" 
    "$tbs_uc"
    ../fwk/schema/ddl_ind.sh 
    "$DB" 
    "$tbs_uc"
    #
    # crete procedure/packages/functions
    #
    for 
    t in ${SRC_VIEW_DIR}/*\.tab;
    do
      
    table=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.tab/$1/; print $a; }'
    `
      
    echo 
    "Setting up Alerts for  $table. File is $t"
      
    ../fwk/schema/ddl_prc.sh 
    "$DB" 
    "$table"
    done
    #
    # create databus views
    #
    ../fwk/schema/ddl_vw.sh 
    "$DB"
    #
    # create business table's view
    #
    #
    for 
    t in ${SRC_VIEW_DIR}/*\.view;
    #
    do
    #  echo 
    "Processing View definition File : $t"
    #  view=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.view/$1/; print $a; }'
    `
    #  echo 
    "Creating view sy\$$view"
    #  pattern=
    "sy\\$"
    ;
    #  pattern+=
    "${view}"
    ;
    #  wc=`grep -ic 
    "${pattern}" 
    $t`
    #  
    if 
    [ $wc -lt 
    1 
    ];
    #  then
    #     echo 
    "View names should start with sy\$. Offending view file ($t)"
    ;
    #     exit 
    1
    #  fi
    #  sqlplus ${DB}  << __EOF__
    #    
    @$t 
    ${sid} ${user} ${password} ${tbs_lc};
    #__EOF__
    #done
    #
    # create sync_alert
    #
    ## execute rest
    for 
    t in ${SRC_VIEW_DIR}/*\.tab;
    do
      
    table=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.tab/$1/; print $a; }'
    `
      
    echo 
    "Creating trigger for source $table"
      
    ../fwk/schema/ddl_prc1.sh  
    "$DB" 
    "$table"
    done
     
    #
    # create business table's trigger
    #
    #
    for 
    t in ${SRC_VIEW_DIR}/*\.tab;
    #
    do
    #  table=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.tab/$1/; print $a; }'
    `
    #  echo 
    "Creating trigger for source $table"
    #  ../fwk/schema/ddl_trg.sh 
    "$DB" 
    "$table"
    #done
    echo 
    "INFO: compiling types"
    sqlplus $DB << __EOF__
    exec compile_allobjects;
    __EOF__
     
    echo 
    "INFO: Sync_core seeding "
    sqlplus $DB << __EOF__
      
    insert into sync_core_settings values (
    'N'
    );
      
    commit;
      
    show errors;
    __EOF__
     
    #
    # sources setting
    #
    #echo 
    "INFO: sy\$SOURCES setup"
    #i=
    0
    ;
    #
    for 
    t in ${SRC_VIEW_DIR}/*\.tab;
    #
    do
    #  echo 
    "Processing Table definition File : $t"
    #  table=`echo $t | perl -lane 
    '{ $a = $_; $a =~ s/.*\/(.*)\.tab/$1/; print $a; }'
    `
    #  echo 
    "Creating Table $table"
    #  sqlplus $DB << __EOF__
    #    insert into sy\$sources values(
    '$table'
    ,$i);
    #__EOF__
    #  ((i++))
    #done

  3. 修改表结构,增加一列 TXN NUMBER(posp_boss)

    person.sql
    -- 创建表,添加TXN列
    create 
    table 
    posp_boss.person
    (
            
    id number 
    primary 
    key
    ,
            
    first_name 
    varchar
    (120) 
    not 
    null
    ,
            
    last_name 
    varchar
    (120) 
    not 
    null
    ,
            
    birth_date 
    date
    ,
            
    deleted 
    varchar
    (5) 
    default 
    'false' 
    not 
    null
    ,
            
    txn number
    );
     

  4. 将源表权限赋给databus(posp_boss)

    grant.sql
    grant 
    insert
    ,
    update
    ,
    select 
    on 
    posp_boss.PERSON 
    to 
    databus;

  5. 给posp_boss赋databus.sync_core包的执行权限(databus)

    grant.sql
    grant 
    execute 
    on 
    databus.sync_core 
    to 
    posp_boss;

  6. 创建索引(posp_boss)

    index.sql
    -- 创建索引(posp_boss)
    create 
    index 
    posp_boss.PERSON_txn 
    on 
    POSP_BOSS.PERSON(txn) tablespace index_data;

  7. 创建表视图,注意一定要把TXN列包括进去,并且要把ID映射为KEY

    book_vw.sql
    -- 主键ID 映射为 KEY
    CREATE 
    OR 
    REPLACE 
    FORCE 
    VIEW 
    sy$person
    AS
    SELECT
      
    txn,
      
    id 
    key
    ,
      
    first_name,
      
    last_name,
      
    birth_date,
      
    deleted
    FROM
      
    posp_boss.person;

  8. 新增sy$sources表配置,注意value的值必须小于等于125

    insert.sql
    -- 注意sourceName区分大小写,对应到上面触发器里面的sync_core.getTxn('POSP_BOSS.PERSON')的POSP_BOSS.PERSON
    insert 
    into 
    databus.sy$sources 
    values
    (
    'POSP_BOSS.PERSON'
    ,1);

  9.  创建触发器

    trigger.sql
    -- 注意和sy$sources插入的值一致
    CREATE TRIGGER PERSON_TRG
      
    before insert or update on POSP_BOSS.PERSON
      
    referencing old as old 
    new 
    as 
    new
      
    for 
    each row
    begin
      
    if 
    (updating and :
    new
    .txn < 
    0
    ) then
        
    :
    new
    .txn := -:
    new
    .txn;
      
    else
        
    :
    new
    .txn := databus.sync_core.getTxn(
    'POSP_BOSS.PERSON'
    );
      
    end 
    if
    ;
    end;

    至此,针对于Oracle的数据抓取数据端的配置就全部配置完毕了。

     

Mysql数据抓取

Mysql的数据抓取比较简单

  • 创建一个slave的帐号,因为binlog日志分析是基于主从复制的模式来实现的
  • 开启Mysql的binlog日志,设置日志名称,这个名称是后面需要用到的,默认mysql-bin,注意,binlog日志默认是不开启的,开启后需要重启mysql服务
  • 设置binlog日志格式为ROW,默认是STATEMENT。binlog_format  = ROW ,只有ROW模式才会记录受影响的行数,Databus默认只获取影响行数的事件

    my.cnf
    server-id               = 
    1
    log_bin                 = mysql-bin
    expire_logs_days        = 
    10
    max_binlog_size         = 100M
    binlog_format           = ROW

  • 配置数据源,注意sources的id必须与sy$sources中的value一致

    sources.json
    {
        
    "name" 

    "boss"
    ,
        
    "id"  

    1
    ,
        
    "uri" 

    "mysql://repl%2F123456@localhost:3306/1/mysql-bin"
    ,
        
    "slowSourceQueryThreshold" 

    2000
    ,
        
    "sources" 
    :
        
    [
            
    {

    "id" 

    1
    ,
             
    "name" 

    "com.linkedin.events.example.person.Person"
    ,
             
    "uri"

    "lijiang.person"
    ,
             
    "partitionFunction" 

    "constant:1"
            
    }
        
    ]
    }

     

    uri的格式为:mysql://用户%2F密码@host:port/serverID/binlog文件名称
    另外需要注意sources里对应数据源的uri,必需带上数据库名称,格式为 db.table 

  • 对于Mysql的数据抓取,很多数据类型在Avro序列化时会被转换为string

部署normal_replay

  1. 配置relay sources,sources的id必须与sy$sources的value一致。注意oracle和mysql的配置是不一样的。

    source.json
    # oracle
    {
        
    "name" 

    "person"
    ,
        
    "id"  

    1
    ,
        
    "uri" 

    "jdbc:oracle:thin:lijiang/lijiang@192.168.16.239:51521:afc1"
    ,
        
    "slowSourceQueryThreshold" 

    2000
    ,
        
    "sources" 
    :
        
    [
            
    {

    "id" 

    1
    ,
             
    "name" 

    "com.linkedin.events.example.person.Person"
    ,
             
    "uri"

    "lijiang.person"
    ,
             
    "partitionFunction" 

    "constant:1"
            
    }
        
    ]
    }
     
    # mysql
    {
        
    "name" 

    "boss"
    ,
        
    "id"  

    1
    ,
        
    "uri" 

    "mysql://repl%2F123456@localhost:3306/1/mysql-bin"
    ,
        
    "slowSourceQueryThreshold" 

    2000
    ,
        
    "sources" 
    :
        
    [
            
    {

    "id" 

    1
    ,
             
    "name" 

    "com.linkedin.events.example.person.Person"
    ,
             
    "uri"

    "lijiang.person"
    ,
             
    "partitionFunction" 

    "constant:1"
            
    }
        
    ]
    }

  2. 添加 avro 配置文件至schemas_registry文件夹中,关于avro的详细结束参见Apache Avro

    book.avsc
    {
      
    "name" 

    "Person_V1"
    ,
      
    "doc" 

    "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST"
    ,
      
    "type" 

    "record"
    ,
      
    "meta" 

    "dbFieldName=sy$person;pk=key;"
    ,
      
    "namespace" 

    "com.linkedin.events.example.person"
    ,
      
    "fields" 
    : [ {
        
    "name" 

    "txn"
    ,
        
    "type" 
    : [ 
    "long"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=TXN;dbFieldPosition=0;"
      
    }, {
        
    "name" 

    "key"
    ,
        
    "type" 
    : [ 
    "long"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=KEY;dbFieldPosition=1;"
      
    }, {
        
    "name" 

    "firstName"
    ,
        
    "type" 
    : [ 
    "string"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=FIRST_NAME;dbFieldPosition=2;"
      
    }, {
        
    "name" 

    "lastName"
    ,
        
    "type" 
    : [ 
    "string"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=LAST_NAME;dbFieldPosition=3;"
      
    }, {
        
    "name" 

    "birthDate"
    ,
        
    "type" 
    : [ 
    "long"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=BIRTH_DATE;dbFieldPosition=4;"
      
    }, {
        
    "name" 

    "deleted"
    ,
        
    "type" 
    : [ 
    "string"

    "null" 
    ],
        
    "meta" 

    "dbFieldName=DELETED;dbFieldPosition=5;"
      
    } ]
    }

  3. 启动relay

    starup.sh
    ./bin/startup.sh relay

至此,Relay和数据库都已经配置和部署完成!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/181396.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Lighttpd 配置fastcgi

    Lighttpd 配置fastcgihttp://my.oschina.net/davehe/blog/108107        在配置之前,先了解下lighttpd和fastcgi.            Lighttpd相信大家都使用过,它是一个具有非常低的内存开销,cpu占用率低,效能好,以及丰富的模块等特点。lighttpd是众多OpenSource轻量级的webserver中较为优秀的一个

  • SQL 获取当前系统时间

    SQL 获取当前系统时间SQL获取当前系统时间

    2022年10月19日
  • 以太坊挖矿回报率_eth挖矿难度曲线

    以太坊挖矿回报率_eth挖矿难度曲线随着时间的推移,全网算力越来越大,挖矿难度越来越高,以太坊挖矿一天收益多少?那么怎么才能提高挖矿收益呢?挖矿的最基本的物理条件就是得有一台运行稳定稳定的矿机,这个不在今天的讨论范围之内。有了一台硬件条件过硬的矿机之后,只要做好一下两点,你就会比别人轻松高出3-5%的收益:第一点:挖矿软件的选择。如果你不想为软件打工的话,一定要选择原版,一定要选择原版、一定要选择原版,重要的事情说三遍。网上的界面版诸如长沙矿工、圣骑士、数字矿工、矿工队长、超级矿工…其实都是在claymore的原版基础上加壳而成,再

    2022年10月15日
  • 深入浅出Python——Python基础语法全解

    深入浅出Python——Python基础语法全解前言:Python是一个高层次的结合了解释性、编译性、互动性和面向对象的脚本语言。文章目录一、Python简介1.了解Python2.Python介绍3.Python特点4.Python发展历史5.Python版本二、Python解释器1.解释器的作用2.解释器的安装三、PyCharm安装与使用1.PyCharm的作用2.PyCharm安装与使用四、注释1.注释的作用2.注释的分类及语法五、变量1.变量的作用2.定义变量2.1标识符2.2命名习惯2.3使用变量2.4认识

  • 面试中如何回答JVM垃圾回收机制[通俗易懂]

    面试中如何回答JVM垃圾回收机制[通俗易懂]JVM中的垃圾回收了解吗首先是如何标记存活对象,主要有两个算法,分别是引用计数法和可达性分析算法。引用计数法:给一个对象添加一个引用计数器,当一个地方引用它时,计算器+1,不引用的时候-1,当引用计数器为0时说明该对象可回收。但是一旦出现互相引用的情况,就会出现无法回收的现象。所以JVM采用的是可达性分析算法。可达性分析算法:首先会标记所有GCroot能够直接关联的对象。GCro…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号