Skip to content

Commit 7f86eb4

Browse files
authored
Use actual data for date predictions if available, not partition names. (#37)
* Use actual data for date predictions if available, not partition names. Fixes #30 This commit limits the functionality to tables with single-column primary keys. * More text to the README about insertion_date_query * Set to v0.3.0 unstable * Rename insertion_date_query to earliest_utc_timestamp_query * Use SQL quoting in the examples * Fix spelling error
1 parent 9116265 commit 7f86eb4

File tree

7 files changed

+409
-48
lines changed

7 files changed

+409
-48
lines changed

README.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,17 @@ Similar tools:
2828
days: 90
2929
dburl: "sql://user:password@localhost:3306/test_db"
3030
tables:
31-
cats: {}
31+
cats:
32+
earliest_utc_timestamp_query: >
33+
SELECT UNIX_TIMESTAMP(`created`) FROM `cats` WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;
3234
dogs:
3335
partition_period:
3436
days: 30
37+
earliest_utc_timestamp_query: >
38+
SELECT UNIX_TIMESTAMP(`c`.`created`) FROM `dogs` AS `d`
39+
JOIN `cats` AS `c` ON `c`.`house_id` = `d`.`house_id`
40+
WHERE `d`.`id` > '?'
41+
ORDER BY `d`.`id` ASC LIMIT 1;
3542
prometheus_stats: "/tmp/prometheus-textcollect-partition-manager.prom"
3643
EOF
3744
→ partition-manager --config /tmp/partman.conf.yml maintain --noop
@@ -80,15 +87,23 @@ partitionmanager:
8087
table1:
8188
retention:
8289
days: 60
90+
earliest_utc_timestamp_query: >
91+
SELECT UNIX_TIMESTAMP(created) FROM table1 WHERE id > ? ORDER BY id ASC LIMIT 1;
8392
table2:
8493
partition_period:
8594
days: 30
95+
earliest_utc_timestamp_query: >
96+
SELECT UNIX_TIMESTAMP(created) FROM table2 WHERE id > ? ORDER BY id ASC LIMIT 1;
8697
table3:
8798
retention:
8899
days: 14
100+
earliest_utc_timestamp_query: >
101+
SELECT UNIX_TIMESTAMP(created) FROM table3 WHERE id > ? ORDER BY id ASC LIMIT 1;
89102
table4: {}
90103
```
91104
105+
The `earliest_utc_timestamp_query` entries are optional SQL queries that are run during partition map analysis to determine the exact timestamp of the earliest entry in each partition. If you configure such a query for a table, it must return a single row and column, specifically the epoch timestamp in UTC of the earliest entry in the partition. The query is expected to contain a single `?` entry, which will be replaced with the partition value of that partition.
106+
92107
For tables which are either partitioned but not yet using this tool's schema, or which have no empty partitions, the `migrate` command can be useful for proposing alterations to run manually. Note that `migrate` proposes commands that are likely to require partial copies of each table, so likely they will require a maintenance period.
93108

94109
```sh

partitionmanager/cli.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,15 @@ def from_yaml_file(self, file):
133133
tabledata["partition_period"]
134134
)
135135
)
136+
if (
137+
isinstance(tabledata, dict)
138+
and "earliest_utc_timestamp_query" in tabledata
139+
):
140+
tab.set_earliest_utc_timestamp_query(
141+
partitionmanager.types.SqlQuery(
142+
tabledata["earliest_utc_timestamp_query"]
143+
)
144+
)
136145

137146
self.tables.add(tab)
138147
if "prometheus_stats" in data:
@@ -327,6 +336,7 @@ def do_partition(conf):
327336
cur_pos.set_position([positions[col] for col in map_data["range_cols"]])
328337

329338
sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands(
339+
database=conf.dbcmd,
330340
table=table,
331341
partition_list=map_data["partitions"],
332342
current_position=cur_pos,

partitionmanager/table_append_partition.py

Lines changed: 132 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
Design and perform partition management.
33
"""
44

5-
from datetime import timedelta
5+
from datetime import datetime, timedelta, timezone
66
import logging
77
import operator
88
import re
@@ -360,34 +360,16 @@ def _calculate_start_time(last_changed_time, evaluation_time, allowed_lifespan):
360360
return partition_start_time.replace(minute=0, second=0, microsecond=0)
361361

362362

363-
def _plan_partition_changes(
364-
table,
365-
partition_list,
366-
current_position,
367-
evaluation_time,
368-
allowed_lifespan,
369-
num_empty_partitions,
363+
def _get_rate_partitions_with_implicit_timestamps(
364+
table, filled_partitions, current_position, evaluation_time, active_partition
370365
):
371-
"""Return a list of partitions to modify or create.
366+
""" Return a list of PositionPartitions for use in rate calculations.
372367
373-
This method makes recommendations in order to meet the supplied table
374-
requirements, using an estimate as to the rate of fill from the supplied
375-
partition_list, current_position, and evaluation_time.
368+
The partitions are set with implicit timestamps.
376369
"""
377-
log = logging.getLogger(f"plan_partition_changes:{table.name}")
378-
379-
filled_partitions, active_partition, empty_partitions = _split_partitions_around_position(
380-
partition_list, current_position
370+
log = logging.getLogger(
371+
f"_get_rate_partitions_with_implicit_timestamps:{table.name}"
381372
)
382-
if not empty_partitions:
383-
log.error(
384-
f"Partition {active_partition.name} requires manual ALTER "
385-
"as without an empty partition to manipulate, you'll need to "
386-
"perform an expensive copy operation. See the bootstrap mode."
387-
)
388-
raise partitionmanager.types.NoEmptyPartitionsAvailableException()
389-
if not active_partition:
390-
raise Exception("Active Partition can't be None")
391373

392374
rate_relevant_partitions = None
393375

@@ -400,10 +382,10 @@ def _plan_partition_changes(
400382
# partition's dates and positions.
401383
rate_relevant_partitions = filled_partitions + [
402384
partitionmanager.types.InstantPartition(
403-
active_partition.timestamp(), current_position
385+
"p_current_pos", active_partition.timestamp(), current_position
404386
),
405387
partitionmanager.types.InstantPartition(
406-
evaluation_time, active_partition.position
388+
"p_prev_pos", evaluation_time, active_partition.position
407389
),
408390
]
409391
else:
@@ -420,12 +402,132 @@ def _plan_partition_changes(
420402
lambda f: f.timestamp() < evaluation_time, filled_partitions
421403
)
422404
rate_relevant_partitions = list(filled_partitions) + [
423-
partitionmanager.types.InstantPartition(evaluation_time, current_position)
405+
partitionmanager.types.InstantPartition(
406+
"p_current_pos", evaluation_time, current_position
407+
)
424408
]
425409

410+
return rate_relevant_partitions
411+
412+
413+
def _get_rate_partitions_with_queried_timestamps(
    database, table, partition_list, current_position, evaluation_time, active_partition
):
    """Return a list of PositionPartitions for use in rate calculations.

    The partitions' timestamps are explicitly queried: for every partition in
    partition_list, the table's earliest_utc_timestamp_query is executed with
    that partition's (single-column) position as its argument, and the
    returned epoch value is converted to a timezone-aware UTC datetime.

    A final entry is appended for the active partition, pairing
    evaluation_time with current_position.

    Args:
        database: object whose run(sql) method executes the statement and
            returns a sized sequence of dict-like rows.
        table: table definition; must have has_date_query set and provide
            earliest_utc_timestamp_query.
        partition_list: partitions to query timestamps for.
        current_position: position recorded for the active partition's entry.
        evaluation_time: timestamp recorded for the active partition's entry.
        active_partition: partition whose name is used for the final entry.

    Returns:
        A list of partitionmanager.types.InstantPartition, one per partition
        in partition_list (in order) followed by one for the active partition.

    Raises:
        ValueError: if the table has no date query, if a partition's position
            does not have exactly one column, or if the query does not return
            exactly one row with exactly one column.
    """
    log = logging.getLogger(
        f"_get_rate_partitions_with_queried_timestamps:{table.name}"
    )

    if not table.has_date_query:
        raise ValueError("Table has no defined date query")

    instant_partitions = list()

    for partition in partition_list:
        if len(partition.position) != 1:
            raise ValueError(
                "This method is only valid for single-column partitions right now"
            )
        arg = partition.position.as_sql_input()[0]

        sql_select_cmd = table.earliest_utc_timestamp_query.get_statement_with_argument(
            arg
        )
        log.debug(
            "Executing %s to derive partition %s at position %s",
            sql_select_cmd,
            partition.name,
            partition.position,
        )

        # Wall-clock timing is only used for the debug log line below.
        start = datetime.now()
        exact_time_result = database.run(sql_select_cmd)
        end = datetime.now()

        # Validate with an explicit exception rather than `assert`: asserts
        # are stripped under `python -O`, and a user-supplied query returning
        # the wrong shape should fail with an actionable message.
        if len(exact_time_result) != 1 or len(exact_time_result[0]) != 1:
            raise ValueError(
                f"earliest_utc_timestamp_query for partition {partition.name} "
                "must return exactly one row with exactly one column"
            )

        # The single column's name is irrelevant; only its value is needed.
        (epoch_value,) = exact_time_result[0].values()
        exact_time = datetime.fromtimestamp(epoch_value, tz=timezone.utc)

        log.debug(
            "Exact time of %s returned for %s at position %s, query took %s",
            exact_time,
            partition.name,
            partition.position,
            (end - start),
        )

        instant_partitions.append(
            partitionmanager.types.InstantPartition(
                partition.name, exact_time, partition.position
            )
        )

    # The active partition is represented by "now" at the current position.
    instant_partitions.append(
        partitionmanager.types.InstantPartition(
            active_partition.name, evaluation_time, current_position
        )
    )

    return instant_partitions
477+
478+
479+
def _plan_partition_changes(
480+
database,
481+
table,
482+
partition_list,
483+
current_position,
484+
evaluation_time,
485+
allowed_lifespan,
486+
num_empty_partitions,
487+
):
488+
"""Return a list of partitions to modify or create.
489+
490+
This method makes recommendations in order to meet the supplied table
491+
requirements, using an estimate as to the rate of fill from the supplied
492+
partition_list, current_position, and evaluation_time.
493+
"""
494+
log = logging.getLogger(f"plan_partition_changes:{table.name}")
495+
496+
filled_partitions, active_partition, empty_partitions = _split_partitions_around_position(
497+
partition_list, current_position
498+
)
499+
if not empty_partitions:
500+
log.error(
501+
f"Partition {active_partition.name} requires manual ALTER "
502+
"as without an empty partition to manipulate, you'll need to "
503+
"perform an expensive copy operation. See the bootstrap mode."
504+
)
505+
raise partitionmanager.types.NoEmptyPartitionsAvailableException()
506+
if not active_partition:
507+
raise Exception("Active Partition can't be None")
508+
509+
if table.has_date_query:
510+
rate_relevant_partitions = _get_rate_partitions_with_queried_timestamps(
511+
database,
512+
table,
513+
filled_partitions,
514+
current_position,
515+
evaluation_time,
516+
active_partition,
517+
)
518+
else:
519+
rate_relevant_partitions = _get_rate_partitions_with_implicit_timestamps(
520+
table,
521+
filled_partitions,
522+
current_position,
523+
evaluation_time,
524+
active_partition,
525+
)
526+
426527
rates = _get_weighted_position_increase_per_day_for_partitions(
427528
rate_relevant_partitions
428529
)
530+
429531
log.debug(
430532
f"Rates of change calculated as {rates} per day from "
431533
f"{len(rate_relevant_partitions)} partitions"
@@ -620,6 +722,7 @@ def generate_sql_reorganize_partition_commands(table, changes):
620722

621723
def get_pending_sql_reorganize_partition_commands(
622724
*,
725+
database,
623726
table,
624727
partition_list,
625728
current_position,
@@ -656,6 +759,7 @@ def get_pending_sql_reorganize_partition_commands(
656759
)
657760

658761
partition_changes = _plan_partition_changes(
762+
database,
659763
table,
660764
partition_list,
661765
current_position,

0 commit comments

Comments
 (0)