|
|
# nebula增量例行方案
|
|
|
|
|
|
提前准备好数据,建立中间表,mysql和nebula表对表更新。
|
|
|
|
|
|
## tag_firm
|
|
|
|
|
|
### 逻辑
|
|
|
@startuml
|
|
|
database nebula的tag_firm
|
|
|
file 爬虫数据
|
|
|
database 中间表nebula_tag_firm
|
|
|
queue kafka
|
|
|
file data_pump
|
|
|
database utn_ic.ic
|
|
|
database mongo
|
|
|
database 融合库
|
|
|
|
|
|
mongo --> utn_ic.ic
|
|
|
爬虫数据 --> 融合库
|
|
|
utn_ic.ic --> data_pump: 存量
|
|
|
data_pump --> 中间表nebula_tag_firm: 存量入表
|
|
|
中间表nebula_tag_firm --> nebula的tag_firm: binlog更新
|
|
|
utn_ic.ic --> kafka: 增量写kafka
|
|
|
kafka --> 中间表nebula_tag_firm: 例行入表
|
|
|
融合库 --> 中间表nebula_tag_firm: 存量入表后例行
|
|
|
@enduml
|
|
|
```
|
|
|
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的fid字段不为null时,例行同步的程序中才将对应记录入nebula
|
|
|
```
|
|
|
|
|
|
### nebula结构
|
|
|
|
|
|
```sql
|
|
|
企业实体的结合,类型为tag,用fid唯一标识vid(f-company_name_digest),属性包含营业执照信息,表名tag_firm
|
|
|
|
|
|
CREATE TAG `tag_firm` (
|
|
|
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id(工商digest)",
|
|
|
`company_name` string NULL COMMENT "企业姓名",
|
|
|
`credit_no` string NULL COMMENT "统一信用代码",
|
|
|
`company_code` string NULL COMMENT "企业注册号",
|
|
|
`org_code` string NULL COMMENT "组织机构代码",
|
|
|
`tax_code` string NULL COMMENT "纳税人识别号",
|
|
|
`capital` string NULL COMMENT "注册资本",
|
|
|
`real_capital` string NULL COMMENT "实缴资本",
|
|
|
`establish_date` datetime NULL COMMENT "成立日期",
|
|
|
`issue_date` datetime NULL COMMENT "核准日期",
|
|
|
`revoke_date` datetime NULL COMMENT "注销或吊销日期",
|
|
|
`company_status` string NULL COMMENT "登记状态",
|
|
|
`company_type` string NULL COMMENT "企业类型",
|
|
|
`company_major_type` string NULL COMMENT "归类后的企业类型",
|
|
|
`n_company_status` string NULL COMMENT "登记状态(格式化)",
|
|
|
`operation_startdate` string NULL COMMENT "营业起始时间",
|
|
|
`operation_enddate` string NULL COMMENT "营业终止时间",
|
|
|
`authority` string NULL COMMENT "登记机关",
|
|
|
`company_address` string NULL COMMENT "注册地址",
|
|
|
`company_industry` string NULL COMMENT "所属行业",
|
|
|
`province_short` string NULL COMMENT "省份简称",
|
|
|
`city` string NULL COMMENT "城市",
|
|
|
`district` string NULL COMMENT "地区",
|
|
|
`insurance_amount` int NULL COMMENT "参保人数",
|
|
|
`update_time` datetime NULL COMMENT "最后更新时间"
|
|
|
) ttl_duration = 0, ttl_col = "", comment = "企业实体tag";
|
|
|
```
|
|
|
|
|
|
### 中间表表结构
|
|
|
|
|
|
```mysql
|
|
|
CREATE TABLE `nebula_tag_firm` (
|
|
|
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
|
|
|
`fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest的拼接,对应nebula中的vid,是否入nebula标志,为null时表示不入nebula',
|
|
|
`company_name_digest` char(32) DEFAULT NULL COMMENT 'company唯一键',
|
|
|
`company_name` varchar(255) DEFAULT NULL COMMENT ' 公司名称',
|
|
|
`credit_no` varchar(50) DEFAULT NULL COMMENT '统一社会信用码',
|
|
|
`company_code` varchar(50) DEFAULT NULL COMMENT '公司注册码',
|
|
|
`org_code` varchar(50) DEFAULT NULL COMMENT '组织机构代码',
|
|
|
`tax_code` varchar(50) DEFAULT NULL COMMENT '纳税人识别号',
|
|
|
`n_company_status` varchar(16) DEFAULT NULL COMMENT '登记状态(格式化)',
|
|
|
`capital` varchar(255) DEFAULT NULL COMMENT '注册资本',
|
|
|
`real_capital` varchar(255) DEFAULT NULL COMMENT '实缴资本',
|
|
|
`establish_date` datetime DEFAULT NULL COMMENT '成立日期',
|
|
|
`issue_date` datetime DEFAULT NULL COMMENT '核准日期',
|
|
|
`revoke_date` datetime DEFAULT NULL COMMENT '注销或吊销日期',
|
|
|
`company_status` varchar(50) DEFAULT NULL COMMENT '登记状态',
|
|
|
`company_type` varchar(255) DEFAULT NULL COMMENT '企业类型',
|
|
|
`operation_startdate` varchar(50) DEFAULT NULL COMMENT '营业期限起始时间',
|
|
|
`operation_enddate` varchar(50) DEFAULT NULL COMMENT '营业期限终止时间',
|
|
|
`company_address` varchar(512) DEFAULT NULL COMMENT '公司注册地址',
|
|
|
`authority` varchar(255) DEFAULT NULL COMMENT '登记机关',
|
|
|
`company_industry` varchar(255) DEFAULT NULL COMMENT '所属行业',
|
|
|
`province_short` varchar(32) DEFAULT NULL COMMENT '省份简称',
|
|
|
`city` varchar(32) DEFAULT NULL COMMENT '地区',
|
|
|
`district` varchar(32) DEFAULT NULL COMMENT '城市',
|
|
|
`insurance_amount` int DEFAULT NULL COMMENT '参保人数',
|
|
|
`is_deleted` smallint(1) DEFAULT 0 COMMENT '0:保留,1:删除',
|
|
|
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
|
|
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
|
|
|
PRIMARY KEY (`id`),
|
|
|
UNIQUE KEY `idx_name_digest` (`company_name_digest`),
|
|
|
KEY `idx_company_name` (`company_name`),
|
|
|
KEY `idx_update_time` (`update_time`)
|
|
|
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的tag_firm的数据中间表';
|
|
|
```
|
|
|
|
|
|
### 涉及到的数据源
|
|
|
|
|
|
#### tb_company
|
|
|
|
|
|
```
|
|
|
通过binlog更新
|
|
|
```
|
|
|
|
|
|
| tb_company | nebula_tag_company | 备注 |
|
|
|
| ------------------- | ------------------- | --------------------------- |
|
|
|
| company_name_digest | company_name_digest | |
|
|
|
| company_name_digest | fid | fid='f'+company_name_digest |
|
|
|
|
|
|
#### utn_ic.ic
|
|
|
|
|
|
```
|
|
|
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.ic and deleted=0
|
|
|
```
|
|
|
|
|
|
| utn_ic.ic | nebula_tag_company | 备注 |
|
|
|
| ------------------- | ------------------- | --------------------------- |
|
|
|
| company_name_digest | company_name_digest | |
|
|
|
| company_name | company_name | |
|
|
|
| company_code | company_code | |
|
|
|
| credit_no | credit_no | |
|
|
|
| org_code | org_code | |
|
|
|
| tax_code | tax_code | |
|
|
|
| establish_date | establish_date | |
|
|
|
| company_status | company_status | |
|
|
|
| company_type | company_type | |
|
|
|
| authority | authority | |
|
|
|
| issue_date | issue_date | |
|
|
|
| operation_startdate | operation_startdate | |
|
|
|
| operation_enddate | operation_enddate | |
|
|
|
| capital | capital | |
|
|
|
| real_capital | real_capital | |
|
|
|
| company_address | company_address | |
|
|
|
| cancel_date | revoke_date | 两个字段值合一为revoke_date |
|
|
|
| revoke_date | revoke_date | 两个字段值合一为revoke_date |
|
|
|
| n_company_status | n_company_status | |
|
|
|
| province_short | province_short | |
|
|
|
| city | city | |
|
|
|
| district | district | |
|
|
|
| company_major_type | company_major_type | |
|
|
|
| industries | company_industry | 取最高一级行业划分 |
|
|
|
| insurance_amount | insurance_amount | |
|
|
|
|
|
|
## tag_person
|
|
|
|
|
|
### 逻辑
|
|
|
@startuml
|
|
|
database nebula的tag_person
|
|
|
file 爬虫数据
|
|
|
database 中间表nebula_tag_person
|
|
|
file tb_company_employee
|
|
|
file tb_company_legalperson
|
|
|
file tb_company_partner
|
|
|
file tb_person
|
|
|
database 融合库
|
|
|
|
|
|
爬虫数据 --> 融合库
|
|
|
融合库 --> tb_company_employee
|
|
|
融合库 --> tb_company_legalperson
|
|
|
融合库 --> tb_company_partner
|
|
|
融合库 --> tb_person
|
|
|
tb_company_employee --> 中间表nebula_tag_person: count(pcid) as ac_employee_num,group by ppid
|
|
|
tb_company_legalperson --> 中间表nebula_tag_person: count(pcid) as ac_legalperson_num,group by lp_ppid
|
|
|
tb_company_partner --> 中间表nebula_tag_person: count(pcid) as ac_partner_num,group by partner_ppid
|
|
|
tb_person --> 中间表nebula_tag_person: 存量入表后例行
|
|
|
中间表nebula_tag_person --> nebula的tag_person: 存量入表后例行
|
|
|
@enduml
|
|
|
```
|
|
|
从融合库中提前将数据准备至nebula中间表,所需字段通过binlog增量入中间表;将ppid作为唯一键,由于其他表中有的ppid表tb_person里一定有,所以用ac_partner_num、ac_employee_num、ac_legalperson_num三个字段都不为null当做是否更新nebula的标志。
|
|
|
```
|
|
|
|
|
|
### nebula结构
|
|
|
|
|
|
```sql
|
|
|
人员的集合,类型为tag,用pid唯一标识vid(p-ppid),表名 tag_person
|
|
|
|
|
|
CREATE TAG `tag_person` (
|
|
|
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
|
|
|
`person_name` string NULL COMMENT "人员姓名",
|
|
|
`ac_num` int64 NULL COMMENT "关联企业数",
|
|
|
`ac_legalperson_num` int64 NULL COMMENT "担任法定代表人数",
|
|
|
`ac_partner_num` int64 NULL COMMENT "对外投资数",
|
|
|
`ac_employee_num` int64 NULL COMMENT "任职数",
|
|
|
`update_time` datetime NULL COMMENT "最后更新时间"
|
|
|
) ttl_duration = 0, ttl_col = "", comment = "人员实体tag";
|
|
|
```
|
|
|
|
|
|
### 中间表表结构
|
|
|
|
|
|
```sql
|
|
|
CREATE TABLE `nebula_tag_person` (
|
|
|
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
|
|
|
`pid` char(33) NOT NULL COMMENT '=p和ppid的拼接,对应nebula中的vid',
|
|
|
`ppid` char(32) NOT NULL COMMENT '人员唯一id',
|
|
|
`person_name` varchar(50) DEFAULT NULL COMMENT '人员姓名',
|
|
|
`ac_legalperson_num` int(9) DEFAULT NULL COMMENT '担任法定代表人数',
|
|
|
`ac_num` int(9) DEFAULT NULL COMMENT '关联企业数',
|
|
|
`ac_partner_num` int(9) DEFAULT NULL COMMENT '对外投资数',
|
|
|
`ac_employee_num` int(9) DEFAULT NULL COMMENT '任职数',
|
|
|
`is_deleted` smallint(1) DEFAULT 0 COMMENT '0:保留,1:删除',
|
|
|
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
|
|
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
|
|
|
PRIMARY KEY (`id`),
|
|
|
UNIQUE KEY `idx_ppid` (`ppid`),
|
|
|
KEY `idx_name` (`person_name`),
|
|
|
KEY `idx_update_time` (`update_time`)
|
|
|
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的tag_person的数据中间表';
|
|
|
```
|
|
|
|
|
|
### 涉及到的数据源
|
|
|
|
|
|
#### tb_person
|
|
|
|
|
|
```
|
|
|
通过binlog更新
|
|
|
```
|
|
|
|
|
|
| tb_person | nebula_tag_person | 备注 |
|
|
|
| ----------- | ----------------- | ------------ |
|
|
|
| ppid | ppid、pid | pid='p'+ppid |
|
|
|
| person_name | person_name | |
|
|
|
| ac_num | ac_num | |
|
|
|
| update_time | update_time | |
|
|
|
|
|
|
#### tb_company_employee
|
|
|
|
|
|
```
|
|
|
通过binlog更新is_history=0
|
|
|
```
|
|
|
|
|
|
| tb_company_employee | nebula_tag_person | 备注 |
|
|
|
| ------------------- | ----------------- | -------------------------------------------- |
|
|
|
| ppid | ppid | |
|
|
|
| pcid | ac_employee_num | count(pcid) as ac_employee_num,group by ppid |
|
|
|
|
|
|
#### tb_company_partner
|
|
|
|
|
|
```
|
|
|
通过binlog更新is_history=0
|
|
|
```
|
|
|
|
|
|
| tb_company_partner | nebula_tag_person | 备注 |
|
|
|
| ------------------ | ----------------- | --------------------------------------------------- |
|
|
|
| partner_ppid | ppid | |
|
|
|
| pcid | ac_partner_num | count(pcid) as ac_partner_num,group by partner_ppid |
|
|
|
|
|
|
#### tb_company_legalperson
|
|
|
|
|
|
```
|
|
|
通过binlog更新is_history=0
|
|
|
```
|
|
|
|
|
|
| tb_company_legalperson | nebula_tag_person | 备注 |
|
|
|
| ---------------------- | ------------------ | -------------------------------------------------- |
|
|
|
| lp_ppid | ppid | |
|
|
|
| pcid | ac_legalperson_num | count(pcid) as ac_legalperson_num,group by lp_ppid |
|
|
|
|
|
|
## edge_serve
|
|
|
|
|
|
### 逻辑
|
|
|
@startuml
|
|
|
database nebula的edge_serve
|
|
|
file 爬虫数据
|
|
|
database 中间表nebula_edge_serve
|
|
|
queue kafka
|
|
|
file data_pump
|
|
|
database utn_ic.company_employee
|
|
|
database mongo
|
|
|
database 融合库
|
|
|
|
|
|
mongo --> utn_ic.company_employee
|
|
|
爬虫数据 --> 融合库
|
|
|
utn_ic.company_employee --> data_pump: 存量
|
|
|
data_pump --> 中间表nebula_edge_serve: 存量入表
|
|
|
中间表nebula_edge_serve --> nebula的edge_serve: binlog更新
|
|
|
utn_ic.company_employee --> kafka: 增量写kafka
|
|
|
kafka --> 中间表nebula_edge_serve: 例行入表
|
|
|
融合库 --> 中间表nebula_edge_serve: 存量入表后例行
|
|
|
@enduml
|
|
|
```
|
|
|
从mongo和融合库中提前将数据准备至nebula中间表,mongo所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest和employee_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
|
|
|
```
|
|
|
|
|
|
### nebula结构
|
|
|
|
|
|
```sql
|
|
|
企业主要人员与企业之间任职关系的集合,类型为edge,用<pid,edge_serve,(Rank),fid>唯一标识,边类型为edge_serve
|
|
|
CREATE EDGE `edge_serve` (
|
|
|
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
|
|
|
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
|
|
|
`position` string NULL COMMENT "职务",
|
|
|
`update_time` datetime NULL COMMENT "最后更新时间"
|
|
|
) ttl_duration = 0, ttl_col = "", comment = "任职edge";
|
|
|
```
|
|
|
|
|
|
### 中间表表结构
|
|
|
|
|
|
```sql
|
|
|
CREATE TABLE `nebula_edge_serve` (
|
|
|
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
|
|
|
`pid` char(33) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
|
|
|
`fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
|
|
|
`ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
|
|
|
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
|
|
|
`employee_name` varchar(255) NOT NULL COMMENT '员工名称',
|
|
|
`position` varchar(64) DEFAULT NULL COMMENT '职务',
|
|
|
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
|
|
|
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
|
|
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
|
|
|
PRIMARY KEY (`id`),
|
|
|
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `employee_name`),
|
|
|
KEY `idx_position` (`position`),
|
|
|
KEY `idx_update_time` (`update_time`)
|
|
|
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_serve的数据中间表';
|
|
|
```
|
|
|
|
|
|
### 涉及到的数据源
|
|
|
|
|
|
#### tb_company_enployee
|
|
|
|
|
|
```
|
|
|
通过binlog更新
|
|
|
```
|
|
|
|
|
|
| tb_company_enployee | nebula_edge_serve | 备注 |
|
|
|
| ------------------- | ------------------- | --------------------------- |
|
|
|
| ppid | ppid | |
|
|
|
| company_name_digest | company_name_digest | |
|
|
|
| employee_name | employee_name | |
|
|
|
| company_name_digest | fid | fid='f'+company_name_digest |
|
|
|
| ppid | pid | pid='p'+ppid |
|
|
|
|
|
|
#### utn_ic.company_employee
|
|
|
|
|
|
```
|
|
|
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee
|
|
|
```
|
|
|
|
|
|
| utn_ic.company_employee | nebula_edge_serve | 备注 |
|
|
|
| ----------------------- | ------------------- | ---- |
|
|
|
| company_name_digest | company_name_digest | |
|
|
|
| employee_name | employee_name | |
|
|
|
| position | position | |
|
|
|
| is_history | is_history | |
|
|
|
|
|
|
## edge_invest_h
|
|
|
|
|
|
### 逻辑
|
|
|
@startuml
|
|
|
database nebula的edge_invest_h
|
|
|
file 爬虫数据
|
|
|
database 中间表nebula_edge_invest_h
|
|
|
queue kafka
|
|
|
file data_pump
|
|
|
database utn_ic.company_partner_new
|
|
|
database mongo
|
|
|
database 融合库
|
|
|
|
|
|
mongo --> utn_ic.company_partner_new
|
|
|
爬虫数据 --> 融合库
|
|
|
utn_ic.company_partner_new --> data_pump: 存量
|
|
|
data_pump --> 中间表nebula_edge_invest_h: 存量入表
|
|
|
中间表nebula_edge_invest_h --> nebula的edge_invest_h: binlog更新
|
|
|
utn_ic.company_partner_new --> kafka: 增量写kafka
|
|
|
kafka --> 中间表nebula_edge_invest_h: 例行入表
|
|
|
融合库 --> 中间表nebula_edge_invest_h: 存量入表后例行
|
|
|
@enduml
|
|
|
```
|
|
|
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
|
|
|
```
|
|
|
|
|
|
### nebula结构
|
|
|
|
|
|
```sql
|
|
|
自然人作为企业股东,与企业间的参股关系的集合,类型为edge,用<ppid,edge_invest_h,(Rank),fid>唯一标识,边类型为edge_invest_h
|
|
|
CREATE EDGE `edge_invest_h` (
|
|
|
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
|
|
|
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
|
|
|
`stock_proportion` string NULL COMMENT "持股比例",
|
|
|
`stock_capital` string NULL COMMENT "应缴金额",
|
|
|
`stock_realcapital` string NULL COMMENT "实缴金额",
|
|
|
`update_time` datetime NULL COMMENT "最后更新时间"
|
|
|
) ttl_duration = 0, ttl_col = "", comment = "人员参股edge";
|
|
|
```
|
|
|
|
|
|
### 中间表表结构
|
|
|
|
|
|
```sql
|
|
|
CREATE TABLE `nebula_edge_invest_h` (
|
|
|
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
|
|
|
`pid` char(33) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
|
|
|
`fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
|
|
|
`ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
|
|
|
`partner_name` varchar(255) NOT NULL COMMENT '股东名称',
|
|
|
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
|
|
|
`stock_capital` varchar(64) DEFAULT NULL COMMENT '应缴金额',
|
|
|
`stock_proportion` double DEFAULT NULL COMMENT '持股比例',
|
|
|
`stock_realcapital` varchar(64) DEFAULT NULL COMMENT '实缴金额',
|
|
|
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
|
|
|
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
|
|
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
|
|
|
PRIMARY KEY (`id`),
|
|
|
UNIQUE KEY `idx_name_id` (`company_name_digest`, `partner_name`),
|
|
|
KEY `idx_update_time` (`update_time`)
|
|
|
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_invest_h的数据中间表';
|
|
|
```
|
|
|
|
|
|
### 涉及到的数据源
|
|
|
|
|
|
#### tb_company_partner
|
|
|
|
|
|
```
|
|
|
通过binlog更新
|
|
|
```
|
|
|
|
|
|
| tb_company_partner | nebula_edge_invest_h | 备注 |
|
|
|
| ------------------- | -------------------- | --------------------------- |
|
|
|
| company_name_digest | company_name_digest | |
|
|
|
| partner_ppid | ppid | |
|
|
|
| company_name_digest | fid | fid='f'+company_name_digest |
|
|
|
| partner_ppid | pid | pid='p'+ppid |
|
|
|
| partner_name | partner_name | |
|
|
|
|
|
|
#### utn_ic.company_partner_new
|
|
|
|
|
|
```
|
|
|
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee and deleted = 0 and is_personal = 1
|
|
|
```
|
|
|
|
|
|
| company_partner | nebula_edge_invest_h | 备注 |
|
|
|
| ------------------- | -------------------- | ---- |
|
|
|
| company_name_digest | company_name_digest | |
|
|
|
| stock_name | partner_name | |
|
|
|
| stock_capital | stock_capital | |
|
|
|
| stock_proportion | stock_proportion | |
|
|
|
| stock_realcapital | stock_realcapital | |
|
|
|
| is_history | is_history | |
|
|
|
|
|
|
## edge_invest_c
|
|
|
|
|
|
### 逻辑
|
|
|
@startuml
|
|
|
database nebula的edge_invest_c
|
|
|
file 爬虫数据
|
|
|
database 中间表nebula_edge_invest_c
|
|
|
queue kafka
|
|
|
file data_pump
|
|
|
database utn_ic.company_partner_new
|
|
|
database mongo
|
|
|
database 融合库
|
|
|
|
|
|
mongo --> utn_ic.company_partner_new
|
|
|
爬虫数据 --> 融合库
|
|
|
utn_ic.company_partner_new --> data_pump: 存量
|
|
|
data_pump --> 中间表nebula_edge_invest_c: 存量入表
|
|
|
中间表nebula_edge_invest_c --> nebula的edge_invest_c: binlog更新
|
|
|
utn_ic.company_partner_new --> kafka: 增量写kafka
|
|
|
kafka --> 中间表nebula_edge_invest_c: 例行入表
|
|
|
融合库 --> 中间表nebula_edge_invest_c: 存量入表后例行
|
|
|
@enduml
|
|
|
```
|
|
|
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的s_fid和e_fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
|
|
|
```
|
|
|
|
|
|
### nebula结构
|
|
|
|
|
|
```sql
|
|
|
企业作为企业股东,与企业间的参股关系的集合,类型为edge,用<s_fid,invest_c,(Rank),e_fid>唯一标识,表名为edge_invest_c
|
|
|
CREATE EDGE `edge_invest_c` (
|
|
|
`partner_company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
|
|
|
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
|
|
|
`stock_proportion` string NULL COMMENT "持股比例",
|
|
|
`stock_capital` string NULL COMMENT "应缴金额",
|
|
|
`stock_realcapital` string NULL COMMENT "实缴金额",
|
|
|
`update_time` datetime NULL COMMENT "最后更新时间"
|
|
|
) ttl_duration = 0, ttl_col = "", comment = "公司参股edge";
|
|
|
```
|
|
|
|
|
|
### 中间表表结构
|
|
|
|
|
|
```sql
|
|
|
CREATE TABLE `nebula_edge_invest_c` (
|
|
|
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
|
|
|
`s_fid` varchar(50) DEFAULT NULL COMMENT '=f和partner_company_name_digest拼接,对应nebula中的s_fid',
|
|
|
`e_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的e_fid',
|
|
|
`partner_company_name_digest` char(32) DEFAULT NULL COMMENT '股东企业唯一id',
|
|
|
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
|
|
|
`stock_capital` varchar(64) DEFAULT NULL COMMENT '应缴金额',
|
|
|
`stock_proportion` double DEFAULT NULL COMMENT '持股比例',
|
|
|
`stock_realcapital` varchar(64) DEFAULT NULL COMMENT '实缴金额',
|
|
|
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
|
|
|
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
|
|
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
|
|
|
PRIMARY KEY (`id`),
|
|
|
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `partner_company_name_digest`),
|
|
|
KEY `idx_update_time` (`update_time`)
|
|
|
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_invest_c的数据中间表';
|
|
|
```
|
|
|
|
|
|
### 涉及到的数据源
|
|
|
|
|
|
#### tb_company_partner
|
|
|
|
|
|
```
|
|
|
通过binlog更新
|
|
|
```
|
|
|
|
|
|
| tb_company_partner | nebula_edge_invest_h | 备注 |
|
|
|
| --------------------------- | --------------------------- | ------------------------------------- |
|
|
|
| company_name_digest | company_name_digest | |
|
|
|
| partner_company_name_digest | partner_company_name_digest | |
|
|
|
| company_name_digest | e_fid | e_fid='f'+company_name_digest |
|
|
|
| partner_company_name_digest | s_fid | s_fid='f'+partner_company_name_digest |
|
|
|
|
|
|
#### utn_ic.company_partner_new
|
|
|
|
|
|
```
|
|
|
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee and deleted = 0 and is_personal = 0
|
|
|
```
|
|
|
|
|
|
| company_partner | nebula_edge_invest_h | 备注 |
|
|
|
| ------------------- | --------------------------- | ---- |
|
|
|
| company_name_digest | company_name_digest | |
|
|
|
| stock_name_digest | partner_company_name_digest | |
|
|
|
| stock_capital | stock_capital | |
|
|
|
| stock_proportion | stock_proportion | |
|
|
|
| stock_realcapital | stock_realcapital | |
|
|
|
| is_history | is_history | |
|
|
|
|
|
|
## edge_own
|
|
|
|
|
|
### 逻辑
|
|
|
@startuml
|
|
|
database nebula的edge_own
|
|
|
file 爬虫数据
|
|
|
database 融合库
|
|
|
|
|
|
爬虫数据 --> 融合库
|
|
|
融合库 --> nebula的edge_own: binlog更新
|
|
|
@enduml
|
|
|
```
|
|
|
直接通过监控融合库tb_company_legalperson表的binlog更新
|
|
|
```
|
|
|
|
|
|
### nebula结构
|
|
|
|
|
|
```sql
|
|
|
自然人作为企业法定代表人,与企业间关系的集合,类型为edge,用<ppid, edge_own, (Rank), fid>唯一标识,表名为edge_own
|
|
|
CREATE EDGE `edge_own` (
|
|
|
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
|
|
|
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
|
|
|
`update_time` datetime NULL COMMENT "最后更新时间"
|
|
|
) ttl_duration = 0, ttl_col = "", comment = "人员法定代表人edge";
|
|
|
```
|
|
|
|
|
|
### 涉及到的数据源
|
|
|
|
|
|
#### tb_company_legalperson
|
|
|
|
|
|
```
|
|
|
通过binlog更新,lp_ppid is not null and company_name_digest is not null
|
|
|
```
|
|
|
|
|
|
| tb_company_legalperson | 备注 |
|
|
|
| ---------------------- | ---- |
|
|
|
| lp_ppid | |
|
|
|
| company_name_digest | |
|
|
|
| is_history | |
|
|
|
| update_time | |
|
|
|
|
|
|
## edge_own_c
|
|
|
|
|
|
### 逻辑
|
|
|
@startuml
|
|
|
database nebula的edge_own_c
|
|
|
file 爬虫数据
|
|
|
database 融合库
|
|
|
|
|
|
爬虫数据 --> 融合库
|
|
|
融合库 --> nebula的edge_own_c: binlog更新
|
|
|
@enduml
|
|
|
```
|
|
|
直接通过监控融合库tb_company_legalperson表的binlog更新
|
|
|
```
|
|
|
|
|
|
### nebula结构
|
|
|
|
|
|
```sql
|
|
|
CREATE EDGE `edge_own_c` (
|
|
|
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
|
|
|
`lp_company_name_digest` fixed_string(32) NOT NULL COMMENT "法人企业唯一id",
|
|
|
`update_time` datetime NULL COMMENT "最后更新时间"
|
|
|
) ttl_duration = 0, ttl_col = "", comment = "公司法定代表人edge";
|
|
|
```
|
|
|
|
|
|
### 涉及到的数据源
|
|
|
|
|
|
#### tb_company_legalperson
|
|
|
|
|
|
```
|
|
|
通过binlog更新,lp_company_name_digest is not null and company_name_digest is not null
|
|
|
```
|
|
|
|
|
|
| tb_company_legalperson | 备注 |
|
|
|
| ---------------------- | ---- |
|
|
|
| lp_company_name_digest | |
|
|
|
| company_name_digest | |
|
|
|
| is_history | |
|
|
|
| update_time | |
|
|
|
|
|
|
## edge_branch
|
|
|
|
|
|
### 逻辑
|
|
|
@startuml
|
|
|
database nebula的edge_branch
|
|
|
database 中间表nebula_edge_branch
|
|
|
queue kafka
|
|
|
file data_pump
|
|
|
database utn_ic.company_branch
|
|
|
database mongo
|
|
|
|
|
|
mongo --> utn_ic.company_branch
|
|
|
utn_ic.company_branch --> data_pump: 存量
|
|
|
data_pump --> 中间表nebula_edge_branch: 存量入表
|
|
|
中间表nebula_edge_branch --> nebula的edge_branch: binlog更新
|
|
|
utn_ic.company_branch --> kafka: 增量写kafka
|
|
|
kafka --> 中间表nebula_edge_branch: 例行入表
|
|
|
@enduml
|
|
|
```
|
|
|
从mongo提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表
|
|
|
```
|
|
|
|
|
|
### nebula结构
|
|
|
|
|
|
```sql
|
|
|
企业与分支机构企业间关系的集合,类型为edge,用<s_fid, edge_branch, (Rank), e_fid>唯一标识,表名为edge_branch
|
|
|
CREATE EDGE `edge_branch` (
|
|
|
`company_name_digest` fixed_string(32) NOT NULL COMMENT "母公司企业唯一id",
|
|
|
`branch_company_name_digest` fixed_string(32) NOT NULL COMMENT "分支企业唯一id",
|
|
|
`update_time` datetime NULL COMMENT "最后更新时间"
|
|
|
) ttl_duration = 0, ttl_col = "", comment = "公司分支机构edge";
|
|
|
```
|
|
|
|
|
|
### 中间表表结构
|
|
|
|
|
|
```sql
|
|
|
CREATE TABLE `nebula_edge_branch` (
|
|
|
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
|
|
|
`s_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的s_fid',
|
|
|
`e_fid` varchar(50) DEFAULT NULL COMMENT '=f和branch_company_name_digest拼接,对应nebula中的e_fid',
|
|
|
`branch_company_name_digest` char(32) DEFAULT NULL COMMENT '分支机构唯一id',
|
|
|
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
|
|
|
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
|
|
|
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
|
|
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
|
|
|
PRIMARY KEY (`id`),
|
|
|
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `branch_company_name_digest`),
|
|
|
KEY `idx_update_time` (`update_time`)
|
|
|
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_branch的数据中间表';
|
|
|
```
|
|
|
|
|
|
### 涉及到的数据源
|
|
|
|
|
|
#### utn_ic.company_branch
|
|
|
|
|
|
| company_branch | nebula_edge_branch | 备注 |
|
|
|
| ------------------- | -------------------------- | ------------------------------------------------------------ |
|
|
|
| branch_name_digest | branch_company_name_digest | |
|
|
|
| company_name_digest | company_name_digest | |
|
|
|
| n_company_status | is_history | (case n_company_status when '正常' then 0 else 1 end) as is_history |
|
|
|
| branch_name_digest | e_fid | e_fid='f'+branch_name_digest |
|
|
|
| company_name_digest | s_fid | s_fid='f'+company_name_digest | |
|
|
\ No newline at end of file |