Что это такое?
Метод хранения древовидных структур данных Adjacency List один из самых простых в понимании, так как узел включает прямую связь с родителем и не перенасыщен избыточными данными.
По сути за структуру девева отвечает два поля: id и pid и вычислить цепочку наследования довольно просто. Правда наследование ограниченно одним уровнем, т.е. за один шаг мы можем вычислить только родительский узел или ребенка, но не «дедушек» и «внуков».
Создадим таблицу для нашего дерева:
CREATE TABLE my_tree (
id INTEGER NOT NULL,
pid INTEGER,
field1 TEXT,
PRIMARY KEY (id)
);
Где:
- id — идентификатор узла;
- pid — идентификатор родительского узла;
Соответственно id у нас либо AUTO_INCREMENT или SERIAL.
Как использовать
Кстати да, выборка данных из Adjacency List, по моему мнению, самая сложная, ибо подводных камней неимоверное количество. Хотя, на самом деле, все зависит от степени понимания работы других алгоритмов.
Определим, как мы можем выбирать данные из этого дерева. Для начала соберем список требуемых данных:
- родительский узел;
- подчиненные узлы;
- родительская ветка;
- подчиненные ветви;
Существует два основных аглоритма выборки — с использованием рекурсивной процедуры и обычным формированием массива.
Уровень реализации решения, может быть тоже различен, эти задачи можно решить как на уровне приложения так и на уровне базы данных, все зависит только лишь от знаний разработчика и его пристрастий.
Родительский узел и подчиненные узлы
Эти две задачи самые простые и решаются буквально одним запросом, так родительский узел можно получить с помощью запроса:
SELECT * FROM my_tree WHERE id = [pid];
… а подчиненные узлы:
SELECT * FROM my_tree WHERE pid = [id];
… начало многообещающее, но простыми выборками закончили, теперь более сложное действо:
Родительская ветка
Когда нам требуется выбрать всю родительскую ветку, то мы не можем её выбрать одним стандартным запросом. Впрочем, одним запросом мы можем выбрать, но он будет динамическим, то есть формироваться относительно текущего узла.
Выбрать родителькую ветку одним запросом с помощью JOIN таблицы, по моему мнению не самый правильный способ, так как мы выберем несколько строк, склееных в одну, и потом нам придется её нарезать, а так же «многоэтажный» JOIN не самое лучшее решение. Но такое решение имеет место быть, поэтому рассмотрим его.
Если использовать такой вариант решения, то рекомендую добавить в таблицу дополнительное поле level, в котором будем указывать уровень узла. В итоге для получения родительской ветки получим запрос вида:
...
my $dbh = DBI->connect(...);
...
# Текущий узел, допустим, мы его уже получили
my $item = {
id => 10,
pid => 5,
level => 4,
field1 => 'blabla'
};
# Формируем начало запроса до FROM
my $query = 'SELECT * FROM my_tree AS l' . $item->{level} . ' ';
# Формируем в цикле JOIN родителя
for (my $i = $item->{level} - 1; $i > 0; $i--) {
$query .= 'LEFT JOIN my_tree AS l'.$i.' ON l'.$i.'.id=l'.($i + 1).'.pid '
}
# Дописываем условие
$query .= 'WHERE l' . $item->{level} .'.id=' . $item->{pid};
my $sth = $dbh->prepare($query); $sht->execute;
my @row = $sth->fetchrow_array;
$sth->finish;
my @rows = ();
# Нарезаем полученную строку по количеству элементов
while (my @arr = splice(@row, 0, scalar keys %$item)) {
push @rows, \@arr;
};
# Список родителей в массиве @rows
...
вышеуказанный код динамически формирует запрос к базе данных такого вида:
SELECT *
FROM my_tree AS l4
LEFT JOIN my_tree AS l3 ON l3.id = l4.pid
LEFT JOIN my_tree AS l2 ON l2.id = l3.pid
LEFT JOIN my_tree AS l1 ON l1.id = l2.pid
WHERE l4.id = [item.pid];
Ресурсоемко, но работает. Впрочем, это еще не предел. Можно собрать более сложный запрос, который будет возвращать не одну строку, а как и положено — несколько:
SELECT l4.*
FROM my_tree AS l4
WHERE l4.id = [item.pid]
UNION SELECT l3.*
FROM my_tree AS l4
LEFT JOIN my_tree AS l3 ON l3.id = l4.pid
WHERE l4.id = [item.pid]
UNION SELECT l2.*
FROM my_tree AS l4
LEFT JOIN my_tree AS l3 ON l3.id = l4.pid
LEFT JOIN my_tree AS l2 ON l2.id = l3.pid
WHERE l4.id = [item.pid]
UNION SELECT l1.*
FROM my_tree AS l4
LEFT JOIN my_tree AS l3 ON l3.id = l4.pid
LEFT JOIN my_tree AS l2 ON l2.id = l3.pid
LEFT JOIN my_tree AS l1 ON l1.id = l2.pid
WHERE l4.id = [item.pid]
ORDER BY level;
Список-то такой запрос вернет, но использовать такой алгоритм крайне не рекомендовано, так как количество реальных запросов к таблице будет равно сумме натуральных чисел уровня, то бишь, чуть более чем.
Разработчикам использующим PostgreSQL еще проще, так как начиная с версии 8.4 PostgreSQL поддерживает рекурсивные запросы:
WITH RECURSIVE parents AS (
SELECT *, ARRAY[t.id] AS exist, FALSE AS cycle
FROM my_tree AS t
WHERE id = [item.pid]
UNION ALL
SELECT t.*, exist || t.id, t.id = ANY(exist)
FROM parents AS p, my_tree AS t
WHERE t.id = p.pid AND NOT cycle
)
SELECT * FROM parents WHERE NOT cycle LIMIT [max_deep];
Немного поясню, для тех кто не понял: WITH RECURSIVE мы создаем рекурсивный подзапрос parents, первый запрос: первый запрос рекурсии, в нем указываем точку отсчета. Второй запрос, собственно, рекурсивный запрос. Поле exist — массив уже полученных id, который мы проверяем в поле cycle, что бы не уйти в бесконечную рекурсию. LIMIT ограничивает глубину выборки ветки, так как родитель у узла может быть только лишь один.
На этом решения использующие всего один запрос можно считать исчерпаны. Теперь можно рассмотреть следущие алгоритмы:
Рекурсивная функция по определению — это функция вызывающая саму себя, поэтому очень важно соблюдать область видимости переменных внутри нее, итак:
...
sub parent_recurse {
my %params = @_;
# Текущая глубина рекурсии
$params{deep} ||= 1;
# Хеш ранее выбранных узлов
$params{exist} ||= {};
# Возвращаем пустой список если превысили максимальную заданную глубину рекурсии
return () if $params{deep} && $params{max_deep} && $params{deep} > $params{max_deep};
# делаем запрос к базе данных на поллучение следующего узла
my $query = 'SELECT * FROM my_tree WHERE id = ' . $params{pid};
my $sth = $dbh->prepare($query); $sth->execute;
my $item = $sth->fetchrow_hashref;
$sth->finish;
# Объявляем внутренний список узлов - родителей
my @parents = ();
# Если узел найден и у него явно есть родитель, то
if ($item && $item->{pid}) {
# Для начала проверим, а не выбирали ли мы уже такой родительский узел
return () if $params{exist}->{$item->{pid}};
# Не выбирали, тогда добавляем в хеш
$params{exist}->{$item->{id}} = 1;
# И вызываем снова рекурсивную функцию
@parents = &parent_recurse(
pid => $item->{pid},
max_deep => $params{max_deep},
deep => $params{deep} + 1,
exist => $params{exist}
);
}
# Добавлем выбранный узел к массиву, если есть
push @parents, $item if $item;
# Возвращаем список выбранных узлов
return @parents;
}
...
my @parents = &parent_recurse(pid => $item->{pid}, max_deep => 2);
...
Практически, ничего сложного, правда мы делаем несколько запросов, в зависимости от заданной глубины или длинны родительской ветки.
Можно обойтись и без рекурсивной функции посредством простого цикла динамического массива:
...
# Объявляем пустой список родителей
my @parents = ();
# Объявляем хеш уже полученных узлов, и добавляем в него id текущего узла
my %exist = ($item->{id} => 1);
# Объявляем динамический массив и вносим в него текущий узел
my @while = ($item);
# Максимальная глубина выборки
my $max_deep = 3;
# Текущая глубина выборки
my $deep = 1;
# В цикле отрезаем первый эмемент динамического массива до тех пор пока это возможно
while (my $item = shift @while && $deep <= $max_deep) {
# Прерываем цикл, если узла явно нет родителя или такого родителя мы уже получали
last if !$item->{pid} || $exist{$item->{pid}};
# Делаем запрос к базе данных
my $query = 'SELECT * FROM my_tree WHERE id = ' . $item->{pid};
my $sth = $dbh->prepare($query); $sth->execute;
my $parent = $sth->fetchrow_hashref;
$sth->finish;
# Прерываем цикл, если узел не получили
last unless $parent;
# Добавляем узел в массив родителей
push @parents, $parent;
# Добавляем id узла в хеш полученных узлов
$exist{$parent->{id}} = 1;
# Добавляем узел в динамический массив
push @while, $parent;
# Инкрементим текущую глубину
$deep++;
}
...
Это решение проще в понимании и менее ресурсоемко, так как нет лишних вызовов, но, на самом деле, в основном все зависит от пристрастий разработчика, так как оба решения по сути — равнозначны. Кстати использовать динамический массив в этой задаче вовсе не обязательно, так как родитель у узла всегда один, можно обойтись обычной проверкой текущего pid, что, впрочем, рассмотрим далее.
Так же можно еще решить эту задачу используя хранимые процедуры базы данных:
Для PostgreSQL:
CREATE OR REPLACE FUNCTION "public"."get_parents" (
pid_in INTEGER,
deep INTEGER DEFAULT NULL
) RETURNS SETOF "public"."my_tree" AS
$body$
DECLARE
exist_ids INTEGER[] := ARRAY[0]; -- Для пустого массива плохо работает ALL
pid_now INTEGER := pid_in; -- Текущий pid
deep_now INTEGER := deep; -- Текущаа глубина
item my_tree;
BEGIN
WHILE
pid_now IS NOT NULL AND
pid_now > 0 AND
pid_now <> ALL (exist_ids) AND
(deep_now IS NULL OR
deep_now > 0)
LOOP
SELECT * INTO item
FROM my_tree
WHERE id = pid_now;
IF item.id IS NULL THEN
EXIT;
END IF;
RETURN NEXT item;
pid_now := item.pid;
exist_ids := exist_ids || item.id;
IF deep_now IS NOT NULL THEN
deep_now := deep_now - 1;
END IF;
END LOOP;
RETURN;
END;
$body$
LANGUAGE 'plpgsql';
Я выбрал второй алгоритм циклической выборки, так как он более простой в понимании, тем более, из-за того, что родитель всего один, то можно вообще не использовать динамический массив, а проверять один параметр pin_now.
Вызов на уровне приложения более чем прост:
...
my @parents = ();
my $deep = 2;
my $query = 'SELECT * FROM get_parents(' . $item->{pid} . ($deep ? ', '.$deep : '') . ')';
my $sth = $dbh->prepare($query); $sth->execute;
while (my $row = $sth->fetchrow_hashref) {
push @parents, $row;
}
$sth->finish;
...
Для MySQL:
CREATE DEFINER = CURRENT_USER PROCEDURE `get_parents`(
IN pid_in INTEGER,
IN max_deep INTEGER
)
BEGIN
DECLARE id_now INTEGER;
DECLARE pid_now INTEGER DEFAULT pid_in;
DECLARE field1_now TEXT;
DECLARE exist TEXT DEFAULT '|';
WHILE pid_now > 0 AND
exist NOT LIKE CONCAT('%|', pid_now, '|%') AND
(max_deep IS NULL OR max_deep > 0)
DO
SELECT * INTO id_now, pid_now, field1_now
FROM my_tree
WHERE id = pid_now;
SELECT id_now, pid_now, field1_now;
IF (max_deep IS NOT NULL)
THEN SET max_deep = max_deep - 1;
END IF;
SET exist = CONCAT(exist, id_now, '|');
END WHILE;
END;
Конечно, приходится жестко указывать получаемые поля из запросов, но увы, либо так, либо лишние запросы на получение pid.
Есть одно «но» — процедуры в MySQL возвращают несколько result set для каждого запроса внутри процедуры, что немного усложняет получение результата на уровне приложения, но не на много:
Так, при использовании Perl DBI код получения будет выглядеть так:
...
my @parents = ();
my $query = 'CALL get_parents(8, NULL)';
my $sth = $dbh->prepare($query); $sth->execute;
do {
while (my $row = $sth->fetchrow_hashref) {
push @parents, $row;
}
} while (my $more = $sth->more_results);
$sth->finish;
...
При использовании PHP:
...
$query = "CALL get_parents(7, 10);";
$res_list = array();
if (mysqli_multi_query($link, $query)) {
do {
if ($result = mysqli_store_result($link)) {
while ($row = $result->fetch_assoc()) {
$parents = array_push($parents, $row);
}
mysqli_free_result($result);
}
} while (mysqli_next_result($link));
}
...
Правда для этого нам потребуется MySQLi. Хотя можно обойтись и без нескольких result sets, но это рассмотрим далее.
Итак мы рассмотрели основные алгоритмы выборки родительской ветки, продолжаем:
Подчиненные ветви
Основное отличие выборки подчиненной ветки от выборки родительских узлов заключается в направлении выборки и в том, что подчиненных узлов может быть долее одного на каждом из уровней выборки. Это накладывает довольно существенные ограничения и изменения на алгоритмы.
Так, мы не можем знать максимальную глубину выборки по дополнительному полю level, поэтому варианты запросов с JOIN таблицы отпадает. Впрочем, можно вычислять максимальную глубину дерева и относительно нее формировать запросы с JOIN но это решение будет настолько ресурсоемким для базы данных, что сводит на нет любую оптимизацию.
Хотя, для разработчиков, использующих PostreSQL 8.4, есть возможность использовать рекурсивные запросы, потребуется сделать определенный «финт ушами»:
WITH RECURSIVE children AS (
SELECT *,
pid || '|' || [order_field] AS ord,
ARRAY[id] AS exist,
FALSE AS cycle
FROM my_tree
WHERE pid = [item.id]
UNION ALL
SELECT t.*,
ord || '.' || t.pid || '|' || t.[order_field],
exist || t.id,
t.id = ANY(exist)
FROM children AS c,
my_tree AS t
WHERE t.pid = c.id AND
NOT cycle AND
array_length(exist, 1) < [max_deep]
)
SELECT * FROM children WHERE NOT cycle ORDER BY ord LIMIT [limit];
Где:
- [order_field] — поле сортировки в пределах подчинения;
- [item.id] — ID узла от которого производится выборка;
- [max_deep] — максимальная глубина рекурсии;
- [limit] — количество выбираемых строк;
Основной особенностью этого запроса является дополнительное поле path, которое практически является Materialized path, за исключением того, что мы добавляем в него дополнительное поле сортировки текущего узла.
Причем защита от зацикливания (поля exist и cycle) локализована в пределах отдельных ветвей, поэтому это хоть и предохранит от бесконечного зацикливания, но позволит повторно выбрать строки, так что будте внимательны.
В общем, пока вышеуказанные пользователи PostgreSQL 8.4 тихо радуются, остальные пишут свои процедуры.
Рекурсивная процедура выборки подчиненных ветвей:
...
sub child_recurse {
my %params = @_;
# Текущая глубина рекурсии
$params{deep} ||= 1;
# Хеш ранее выбранных узлов
$params{exist} ||= {};
# Возвращаем пустой список если превысили максимальную заданную глубину рекурсии
return () if $params{deep} &&
$params{max_deep} &&
$params{deep} > $params{max_deep};
# Объявляем внутренний список узлов - потомков
my @children = ();
# делаем запрос к базе данных на получение списка подчиненных узлов
my $query = 'SELECT *
FROM my_tree
WHERE pid = ' . $params{pid} .
($params{order} ? ' ORDER BY ' . $params{order} : '');
my $sth = $dbh->prepare($query); $sth->execute;
while (my $item = $sth->fetchrow_hashref) {
# Для начала проверим, а не выбирали ли мы уже такой узел
next () if $params{exist}->{$item->{id}};
# Не выбирали, тогда добавляем в хеш
$params{exist}->{$item->{id}} = 1;
# Добавлем выбранный узел к массиву
push @children, $item;
# Вызываем снова рекурсивную функцию для получения подчиненных узлов текущего узла
my @children_deep = &child_recurse(
pid => $item->{id},
max_deep => $params{max_deep},
order => $params{order},
deep => $params{deep} + 1,
exist => $params{exist},
);
# Добавляем список подчиненных узлов текущего узла
push @children, @children_deep;
}
$sth->finish;
# Возвращаем список выбранных узлов
return @children;
}
my @children = &child_recurse(pid => $item->{id}, max_deep => 5, order => 'id');
...
Кажется, что все хорошо, но стоит посмотреть, что происходит во время выполнения этого кода. Так, для каждого из узлов ветви делается запрос на получение его потомков, в итоге, к базе данных делается ровно столько запросов, сколько мы получим в итоге.
Данную проблему можно решить двумя способами:
- Ввести денормализацию таблицы создав дополнительное поле счетчика потомков. В этом случае можно будет проверять наличие потомков и делать или нет запрос на их получение. Это решение рассмотрено более детально позже;
- Производить выборки не построчно, а списочно для каждого уровня вложенности. Это решение подробно рассмотрим здесь;
Так как мы выбираем список потомков узла, то из него можно сформировать массив pid для следующего уровня вложенности. Но, следует иметь ввиду, что список потомков должен возвращаться в строго определенном порядке, относительно порядка выборки родительских узлов. Это делается довольно просто:
SELECT children.*
FROM my_tree AS children
JOIN my_tree AS parents
ON parents.id = children.pid
WHERE children.pid = ANY([parents_array])
ORDER BY parents.[order_field], children.[order_field];
Где:
- [parents_array] — массив id родительских узлов;
- [order_field] — поле сортировки;
Отсюда реализуем алгоритм выборки. В качестве базового алгоритма возьмем алгоритм выборки посредством цикла динамического массива:
...
# Объявляем параметры выборки потомков
my $max_deep = 5; # Максимальная глубина
my $order = 'field1'; # Поле сортировки
my $limit = 20; # Максимальное количество строк
# Объявляем переменные:
my @children = (); # Результирующий массив
my @while = ($item); # Динамический массив
my %exist = (); # Хеш полученных узлов
my %indexes = (); # Хеш индексов массива для узлов
my $skew = 0; # Смещение относительно индекса
my $pids = []; # Список ID родительских узлов
my $deep_now = 1; # Текущая глубина
while (my $child = shift @while) {
# Пропускаем узел, если его уже выбирали
next if $exist{$child->{id}};
$exist{$child->{id}} = 1;
# Добавляем ID узла в список родительских узлов
push @$pids, $child->{id};
# Если у нас нет родителя в списке координат, то это корневой узел
unless (defined $indexes{$child->{pid} || 'NULL'}) {
# Просто добавляем его в список и устанавливаем для него координату
# Если только это не начальный узел $item
if ($child ne $item) {
push @children, $child;
$indexes{$child->{id}} = $#children;
}
} else {
# Иначе добавляем текущий узел в список за родителем относительно его коодинаты
splice @children, $indexes{$child->{pid}} + $skew + 1, 0 , ($child);
# Записываем координату вставленного узла
$indexes{$child->{id}} = $indexes{$child->{pid}} + $skew + 1;
# Прибавляем смещение для следующих узлов этого же уровня
$skew++;
}
# Если динамический массив пустой, то:
unless ($while[0]) {
# Сбрасываем смещение
$skew = 0;
# Увеличиваем текущую глубину
$deep_now++;
# Прерываем выполнение если достигли предела глубины
last if $max_deep && $deep_now > $max_deep;
# Формируем и выполняем запрос на выборку потомков текущей глубины
my $query = 'SELECT children.*
FROM my_tree AS children
JOIN my_tree AS parents
ON parents.id = children.pid
WHERE children.pid = ANY(?)
ORDER BY parents.' . $order . ', children.' . $order .
($limit ? ' LIMIT ' . $limit : '');
my $sth = $dbh->prepare($query); $sth->execute($pids);
while (my $child = $sth->fetchrow_hashref) {push @while, $child}
$sth->finish;
}
}
...
Логика немного сложнее, но гораздо дешевле по ресурсам.
Теперь можно и хранимые процедуры сделать для получения подчиненных ветвей:
Для PostgreSQL:
-- В PostgreSQL нет понятия хешей, поэтому сделаем хранимую процедуру для
-- псевдохеша в котором ключ и значение целочисленные
CREATE OR REPLACE FUNCTION "public"."hash_int" (
hash_array INTEGER[],
key INTEGER
) RETURNS INTEGER AS
$body$
DECLARE
element INTEGER;
value INTEGER := 0;
found_flag BOOLEAN := FALSE;
counter INTEGER := 1;
BEGIN
-- передаваемый массив включает в себя последовательность ключ, значение
-- поэтому просто находим ключ, следующий элемент массива - значение ключа
-- что бы не возникло пересечений что значение ключа может быть равно ключу
-- введен счетчик counter
FOR element IN SELECT * FROM unnest(hash_array) LOOP
IF found_flag IS TRUE THEN
value := element;
EXIT;
END IF;
IF counter = 1 AND key = element THEN
found_flag := TRUE;
counter := 2;
ELSIF counter = 1 THEN counter := 2;
ELSIF counter = 2 THEN counter := 1;
END IF;
END LOOP;
RETURN value;
END;
$body$
LANGUAGE 'plpgsql'
RETURNS NULL ON NULL INPUT;
-- Собственно сама функция получения детей
CREATE OR REPLACE FUNCTION "public"."get_children" (
pid_in INTEGER,
order_in TEXT,
max_deep INTEGER
) RETURNS SETOF "public"."my_tree" AS
$body$
DECLARE
item my_tree;
temp_list my_tree[];
query TEXT;
ordering TEXT := order_in;
exist INTEGER[] := ARRAY[pid_in];
deep_now INTEGER;
deep_hash INTEGER[];
BEGIN
-- Проверяем, устанавливаем сортировку выборки
IF ordering IS NULL THEN ordering := 'id'; END IF;
-- Если выбираем дерево от корня, то NULL делаем как ноль, что бы не было
-- проблем со сравнением
IF exist[1] IS NULL THEN exist[1] := 0; END IF;
-- Увы переменные не имплементятся на уровне ORDER BY, поэтому прийдется
-- делать динамический запрос
query := 'SELECT *
FROM my_tree
WHERE ' ||
CASE WHEN pid_in IS NULL
THEN 'pid IS NULL '
ELSE 'pid = ' || pid_in
END
|| ' ORDER BY ' || ordering;
-- Делаем первый запрос на получение потомков первого колена
FOR item IN EXECUTE query LOOP
IF item.id <> ALL(exist) THEN
-- Заполняем динамический массив
temp_list := temp_list || item;
-- Заполняем массив выбранных ID
exist := exist || item.id;
-- Заполняем псевдохеш глубины выборки
deep_hash := deep_hash || ARRAY[item.id, 1];
END IF;
END LOOP;
-- Делаем цикл на динамический массив
WHILE temp_list[1].id IS NOT NULL LOOP
-- Отрезаем от динамического массива первый элемент и возвращаем его приложению
item := temp_list[1];
RETURN NEXT item;
temp_list := temp_list[2:array_length(temp_list, 1)];
-- Проверяем не достигли лы мы максимальной глубины выборки
IF max_deep IS NULL OR
max_deep > hash_int(deep_hash, item.id)
THEN
-- И делаем следующий динамический запрос. Только внимательно,
-- что сортировка запроса обратная, так, как мы подставляем элементы вперед массива
query := 'SELECT *
FROM my_tree
WHERE pid = ' || item.id || '
ORDER BY ' || ordering || ' DESC';
FOR item IN EXECUTE query LOOP
IF item.id <> ALL(exist) THEN
temp_list := array_prepend(item, temp_list);
exist := exist || item.id;
deep_now := hash_int(deep_hash, item.pid) + 1;
deep_hash := deep_hash || ARRAY[item.id, deep_now];
END IF;
END LOOP;
END IF;
END LOOP;
RETURN;
END;
$body$
LANGUAGE 'plpgsql';
Делать рекурсивную хранимую процедуру я бы не рекомендовал, так как для этого придется передавать достаточно много дополнительных переменных и массивов при каждом её вызове.
Для MySQL хранимой процедуры я делать не буду, так как, как не лепил все одно кучка говна получается. БД MySQL не рассчитана та такие сложности.