Skip to content

Commit 4ab65e3

Browse files
fix: add reducer for consumer
1 parent 3e0293c commit 4ab65e3

File tree

7 files changed

+271
-13
lines changed

7 files changed

+271
-13
lines changed

src/services/api.d.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ interface Window {
6262
getTopic: (params: {
6363
path?: string;
6464
}) => Promise<import('../types/api/topic').DescribeTopicResult>;
65+
getConsumer: (params: {
66+
path?: string;
67+
consumer?: string;
68+
}) => Promise<import('../types/api/consumer').DescribeConsumerResult>;
6569
[method: string]: Function;
6670
};
6771
}

src/services/api.js

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,28 @@ export class YdbEmbeddedAPI extends AxiosWrapper {
141141
path,
142142
});
143143
}
144-
getTopic({path}) {
145-
return this.get(this.getPath('/viewer/json/describe_topic'), {
146-
enums: true,
147-
include_stats: true,
148-
path,
149-
});
144+
getTopic({path}, {concurrentId} = {}) {
145+
return this.get(
146+
this.getPath('/viewer/json/describe_topic'),
147+
{
148+
enums: true,
149+
include_stats: true,
150+
path,
151+
},
152+
{concurrentId: concurrentId || 'getTopic'},
153+
);
154+
}
155+
getConsumer({path, consumer}, {concurrentId} = {}) {
156+
return this.get(
157+
this.getPath('/viewer/json/describe_consumer'),
158+
{
159+
enums: true,
160+
include_stats: true,
161+
path,
162+
consumer,
163+
},
164+
{concurrentId: concurrentId || 'getConsumer'},
165+
);
150166
}
151167
getPoolInfo(poolName) {
152168
return this.get(this.getPath('/viewer/json/storage'), {

src/store/reducers/consumer.ts

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/* eslint-disable camelcase */
2+
import type {Reducer} from 'redux';
3+
import {createSelector, Selector} from 'reselect';
4+
5+
import type {
6+
IConsumerAction,
7+
IConsumerRootStateSlice,
8+
IConsumerState,
9+
IPreparedPartitionData,
10+
} from '../../types/store/consumer';
11+
12+
import '../../services/api';
13+
14+
import {convertBytesObjectToSpeed} from '../../utils/bytesParsers';
15+
import {parseLag, parseTimestampToIdleTime} from '../../utils/timeParsers';
16+
import {isNumeric} from '../../utils/utils';
17+
18+
import {createRequestActionTypes, createApiRequest} from '../utils';
19+
20+
export const FETCH_CONSUMER = createRequestActionTypes('consumer', 'FETCH_CONSUMER');
21+
22+
const SET_DATA_WAS_NOT_LOADED = 'consumer/SET_DATA_WAS_NOT_LOADED';
23+
24+
const initialState = {
25+
loading: false,
26+
wasLoaded: false,
27+
data: {},
28+
};
29+
30+
const consumer: Reducer<IConsumerState, IConsumerAction> = (state = initialState, action) => {
31+
switch (action.type) {
32+
case FETCH_CONSUMER.REQUEST: {
33+
return {
34+
...state,
35+
loading: true,
36+
};
37+
}
38+
case FETCH_CONSUMER.SUCCESS: {
39+
// On older version it can return HTML page of Internal Viewer with an error
40+
if (typeof action.data !== 'object') {
41+
return {...state, loading: false, error: {}};
42+
}
43+
44+
return {
45+
...state,
46+
data: action.data,
47+
loading: false,
48+
wasLoaded: true,
49+
error: undefined,
50+
};
51+
}
52+
case FETCH_CONSUMER.FAILURE: {
53+
if (action.error?.isCancelled) {
54+
return state;
55+
}
56+
57+
return {
58+
...state,
59+
error: action.error,
60+
loading: false,
61+
};
62+
}
63+
case SET_DATA_WAS_NOT_LOADED: {
64+
return {
65+
...state,
66+
wasLoaded: false,
67+
};
68+
}
69+
default:
70+
return state;
71+
}
72+
};
73+
74+
export const setDataWasNotLoaded = () => {
75+
return {
76+
type: SET_DATA_WAS_NOT_LOADED,
77+
} as const;
78+
};
79+
80+
export function getConsumer(path?: string, consumerName?: string) {
81+
return createApiRequest({
82+
request: window.api.getConsumer({path, consumer: consumerName}),
83+
actions: FETCH_CONSUMER,
84+
});
85+
}
86+
87+
export const selectPartitions = (state: IConsumerRootStateSlice) => state.consumer.data?.partitions;
88+
89+
export const selectPreparedPartitionsData: Selector<
90+
IConsumerRootStateSlice,
91+
IPreparedPartitionData[] | undefined
92+
> = createSelector([selectPartitions], (partitions) => {
93+
return partitions?.map((partition) => {
94+
// describe_consumer endpoint doesn't return zero values, so some values will be initialized with 0
95+
const {partition_id = '0', partition_stats, partition_consumer_stats} = partition;
96+
97+
const {
98+
partition_offsets,
99+
store_size_bytes = '0',
100+
last_write_time: partition_last_write_time,
101+
max_write_time_lag: partition_write_lag,
102+
bytes_written,
103+
partition_node_id = 0,
104+
} = partition_stats || {};
105+
106+
const {start: start_offset = '0', end: end_offset = '0'} = partition_offsets || {};
107+
108+
const {
109+
last_read_offset = '0',
110+
committed_offset = '0',
111+
read_session_id,
112+
last_read_time: consumer_last_read_time,
113+
max_read_time_lag: consumer_read_lag,
114+
max_write_time_lag: consumer_write_lag,
115+
bytes_read,
116+
reader_name,
117+
connection_node_id = 0,
118+
} = partition_consumer_stats || {};
119+
120+
const uncommitedMessages =
121+
isNumeric(end_offset) && isNumeric(committed_offset)
122+
? Number(end_offset) - Number(committed_offset)
123+
: 0;
124+
125+
const unreadMessages =
126+
isNumeric(end_offset) && isNumeric(last_read_offset)
127+
? Number(end_offset) - Number(last_read_offset)
128+
: 0;
129+
130+
return {
131+
partitionId: partition_id,
132+
storeSize: store_size_bytes,
133+
134+
writeSpeed: convertBytesObjectToSpeed(bytes_written),
135+
readSpeed: convertBytesObjectToSpeed(bytes_read),
136+
137+
partitionWriteLag: parseLag(partition_write_lag),
138+
partitionWriteIdleTime: parseTimestampToIdleTime(partition_last_write_time),
139+
140+
consumerWriteLag: parseLag(consumer_write_lag),
141+
consumerReadLag: parseLag(consumer_read_lag),
142+
consumerReadIdleTime: parseTimestampToIdleTime(consumer_last_read_time),
143+
144+
uncommitedMessages,
145+
unreadMessages,
146+
147+
startOffset: start_offset,
148+
endOffset: end_offset,
149+
commitedOffset: committed_offset,
150+
151+
readSessionId: read_session_id,
152+
readerName: reader_name,
153+
154+
partitionNodeId: partition_node_id,
155+
connectionNodeId: connection_node_id,
156+
};
157+
});
158+
});
159+
160+
export default consumer;

src/store/reducers/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import pool from './pool';
1818
import tenants from './tenants';
1919
import tablet from './tablet';
2020
import topic from './topic';
21+
import consumer from './consumer';
2122
import executeQuery from './executeQuery';
2223
import explainQuery from './explainQuery';
2324
import tabletsFilters from './tabletsFilters';
@@ -57,6 +58,7 @@ export const rootReducer = {
5758
tenants,
5859
tablet,
5960
topic,
61+
consumer,
6062
executeQuery,
6163
explainQuery,
6264
tabletsFilters,

src/store/reducers/topic.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ import {createRequestActionTypes, createApiRequest} from '../utils';
77

88
export const FETCH_TOPIC = createRequestActionTypes('topic', 'FETCH_TOPIC');
99

10+
const SET_DATA_WAS_NOT_LOADED = 'topic/SET_DATA_WAS_NOT_LOADED';
11+
1012
const initialState = {
1113
loading: true,
1214
wasLoaded: false,
13-
data: {},
15+
data: undefined,
1416
};
1517

1618
const topic: Reducer<ITopicState, ITopicAction> = (state = initialState, action) => {
@@ -22,6 +24,11 @@ const topic: Reducer<ITopicState, ITopicAction> = (state = initialState, action)
2224
};
2325
}
2426
case FETCH_TOPIC.SUCCESS: {
27+
// On older version it can return HTML page of Internal Viewer with an error
28+
if (typeof action.data !== 'object') {
29+
return {...state, loading: false, error: {}};
30+
}
31+
2532
return {
2633
...state,
2734
data: action.data,
@@ -31,17 +38,33 @@ const topic: Reducer<ITopicState, ITopicAction> = (state = initialState, action)
3138
};
3239
}
3340
case FETCH_TOPIC.FAILURE: {
41+
if (action.error?.isCancelled) {
42+
return state;
43+
}
44+
3445
return {
3546
...state,
3647
error: action.error,
3748
loading: false,
3849
};
3950
}
51+
case SET_DATA_WAS_NOT_LOADED: {
52+
return {
53+
...state,
54+
wasLoaded: false,
55+
};
56+
}
4057
default:
4158
return state;
4259
}
4360
};
4461

62+
export const setDataWasNotLoaded = () => {
63+
return {
64+
type: SET_DATA_WAS_NOT_LOADED,
65+
} as const;
66+
};
67+
4568
export function getTopic(path?: string) {
4669
return createApiRequest({
4770
request: window.api.getTopic({path}),

src/types/store/consumer.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import type {IProcessSpeedStats} from '../../utils/bytesParsers';
2+
import type {ApiRequestAction} from '../../store/utils';
3+
4+
import {FETCH_CONSUMER, setDataWasNotLoaded} from '../../store/reducers/consumer';
5+
6+
import type {DescribeConsumerResult} from '../api/consumer';
7+
import type {IResponseError} from '../api/error';
8+
9+
// All fields should be present though they could be undefined
10+
export interface IPreparedPartitionData {
11+
partitionId: string;
12+
storeSize: string;
13+
14+
writeSpeed: IProcessSpeedStats;
15+
readSpeed: IProcessSpeedStats;
16+
17+
partitionWriteLag: number;
18+
partitionWriteIdleTime: number;
19+
20+
consumerWriteLag: number;
21+
consumerReadLag: number;
22+
consumerReadIdleTime: number;
23+
24+
uncommitedMessages: number;
25+
unreadMessages: number;
26+
27+
startOffset: string;
28+
endOffset: string;
29+
commitedOffset: string;
30+
31+
readSessionId: string | undefined;
32+
readerName: string | undefined;
33+
34+
partitionNodeId: number;
35+
connectionNodeId: number;
36+
}
37+
38+
export interface IConsumerState {
39+
loading: boolean;
40+
wasLoaded: boolean;
41+
data?: DescribeConsumerResult;
42+
error?: IResponseError;
43+
}
44+
45+
type IConsumerApiRequestAction = ApiRequestAction<
46+
typeof FETCH_CONSUMER,
47+
DescribeConsumerResult,
48+
IResponseError
49+
>;
50+
51+
export type IConsumerAction = IConsumerApiRequestAction | ReturnType<typeof setDataWasNotLoaded>;
52+
53+
export interface IConsumerRootStateSlice {
54+
consumer: IConsumerState;
55+
}

src/types/store/topic.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import {FETCH_TOPIC} from '../../store/reducers/topic';
1+
import {FETCH_TOPIC, setDataWasNotLoaded} from '../../store/reducers/topic';
22
import type {ApiRequestAction} from '../../store/utils';
33
import type {IResponseError} from '../api/error';
44
import type {DescribeTopicResult} from '../api/topic';
@@ -10,11 +10,9 @@ export interface ITopicState {
1010
error?: IResponseError;
1111
}
1212

13-
export type ITopicAction = ApiRequestAction<
14-
typeof FETCH_TOPIC,
15-
DescribeTopicResult,
16-
IResponseError
17-
>;
13+
export type ITopicAction =
14+
| ApiRequestAction<typeof FETCH_TOPIC, DescribeTopicResult, IResponseError>
15+
| ReturnType<typeof setDataWasNotLoaded>;
1816

1917
export interface ITopicRootStateSlice {
2018
topic: ITopicState;

0 commit comments

Comments
 (0)