Data Import Handler – import danych z baz SQL (cz. 2)

W pierwszej części udało nam się zindeksować informacje zawarte w bazie danych. W części drugiej spróbujemy rozszerzyć funkcjonalność importu o import przyrostowy.

Dokumentów było trochę powyżej 1mln i zajęło to niecałe pół godziny. W zasadzie tu moglibyśmy zakończyć kwestię importu, ale wyobraźmy sobie, że chcielibyśmy te dane aktualizować w indeksie na bieżąco, w miarę, jak zmieniają się one w źródle. Nie będzie to oczywiście prawdziwy RTS (real time search) – pomiędzy zmianą danych a ich udostępnieniem w systemie wyszukiwawczym minie trochę czasu, ale załóżmy, że aktualizacja co np. godzinę jest wystarczająca. Pierwszą rzeczą, którą musimy wykonać w celu implementacji indeksowania przyrostowego jest przygotowanie bazy danych.

Przygotowanie bazy danych

Indeksowanie przyrostowe wymaga możliwości uzyskania z bazy danych informacji: Jakie dokumenty zmieniły się od ostatniej indeksacji. Jeśli mamy szczęście, to dane takie są dostępne – jeśli pecha: musimy zmodyfikować istniejącą strukturę bazy danych. W zależności od bazy danych, możliwości zmiany zastanego schematu bazy i aplikacji korzystających z bazy, mamy kilka możliwości. My w swojej praktyce wykorzystywaliśmy najczęściej:

  • dodanie dodatkowej kolumny z dokładną datą ostatniej modyfikacji, uaktualnianej automatycznie (np. trigger lub default/on update w mysql) lub (gorsze rozwiązanie) „ręcznie” (przez aplikacje)
  • stworzenie kolejki zleceń – wpisywanie (np. triggerem ) identyfikatorów zmienionych dokumentów w oddzielnej tabeli

Oba rozwiązania wymagają zwrócenia uwagę na zmiany danych wszystkich encji wchodzących w skład dokumentu.

Wracając do naszego przykładu z części pierwszej (polska wikipedia, zaimportowana do bazy postgreSQL, do tabel aplikacji mediawiki), nasza struktura wygląda tak:

Tabela „page”:

ColumnTypeModifiers
page_idintegernot null default nextval(‘page_page_id_seq’::regclass)
page_titletextnot null
page_restrictionstext
page_counterbigintnot null default 0
page_is_redirectsmallintnot null default 0
page_is_newsmallintnot null default 0
page_randomnumeric(15,14)not null default random()
page_touchedtimestamp with time zone
page_latestintegernot null
page_lenintegernot null
titlevectortsvector

Tabela: „revision”:

ColumnTypeModifiers
rev_idintegernot null default nextval('revision_rev_id_seq'::regclass)
rev_pageinteger
red_text_idinteger
rev_commenttext
rev_userintegernot null
rev_user_texttextnot null
rev_timestamptimestamp with time zonenot null
rev_minor_editsmallintnot null default 0
rev_deletedsmallintnot null default 0
rev_leninteger
rev_parent_idinteger

Tabela: „pagecontent”

ColumnTypeModifiers
old_idintegernot null default nextval('text_old_id_seq'::regclass)
old_texttext
old_flagstext
textvectortsvector

Pierwsza tabela zawiera kolumnę „page_touched”, druga: „rev_timestamp” co wygląda na dokładnie to co potrzebujemy: datę modyfikacji. Trzecia tabela nie ma takiego pola, ale zawiera teksty dla konkretnej wersji strony – teksty te nie zmieniają się w czasie – gdy użytkownik modyfikuje stronę, dochodzi tylko nowa wersja.

Przypomnijmy sobie definicję źródła z pierwszej części:

<dataConfig>
  <dataSource driver="org.postgresql.Driver"
    url="jdbc:postgresql://localhost:5432/wikipedia"
    user="wikipedia"
    password="secret" />
  <document>
    <entity name="page" query="SELECT page_id, page_title from page">
      <field column="page_id" name="id" />
      <field column="page_title" name="name" />
      <entity name="revision" query="select rev_id from revision where rev_page=$'{page.page_id}">
        <entity name="pagecontent" query="select old_text from pagecontent where old_id=$'{revision.rev_id}">
          <field column="old_text" name="text" />
        </entity>
      </entity>
    </entity>
  </document>
</dataConfig>

To co musimy zrobić to dodać definicje zapytań wykorzystywanych w indeksowaniu przyrostowych. Nic prostszego:

<dataConfig>
 <dataSource driver="org.postgresql.Driver" url="jdbc:postgresql://localhost:5432/wikipedia" user="wikipedia" password="secret" />
 <document>
  <entity name="page" query="SELECT page_id, page_title from page" deltaQuery="select page_id from page where page_touched > '$'{dataimporter.last_index_time}'" deltaImportQuery="SELECT page_id, page_title from page where page_id=$'{dataimporter.delta.page_id}">
   <field column="page_id" name="id" />
   <field column="page_title" name="name" />
   <entity name="revision" query="select rev_id from revision where rev_page=$'{page.page_id}" deltaQuery="select rev_id from revision where rev_timestamp > '$'{dataimporter.last_index_time}'" parentDeltaQuery="select page_id from page where page_id=$'{revision.rev_page}">
    <entity name="pagecontent" query="select old_text from pagecontent where old_id=$'{revision.rev_id}" deltaQuery="select old_id from pagecontent where old_id < 0">
     <field column="old_text" name="text" />
    </entity>
   </entity>
  </entity>
 </document>
</dataConfig>

No dobrze – są prostsze rzeczy 🙂

Porównując oba pliki dostrzegamy tylko dodatkowe definicje dwóch atrybutów:

  • deltaQuery – zapytanie odpowiedzialne za zwrócenie identyfikatorów tych rekordów, które zmieniły się od ostatniego indeksowania (pełnego lub przyrostowego) – czas ostatniego indeksowania DIH dostarcza w zmiennej: dataimporter.last_index_time. To zapytanie jest używane przez SOLR do znajdowania tych rekordów, które się zmieniły.
  • deltaImportQuery – zapytanie zwracające dane dla rekordu o identyfikatorze podanym jako zmienna DIH: dataimport.delta.id.
  • parentDeltaQuery – zapytanie zwracające dane dla rekordu encji-rodzica. Dzięki tym zapytaniom SOLR jest w stanie pobrać wszystkie dane składające się na dokument, niezależnie od tego, z której encji pochodzą. Konieczne jest to dlatego, że silnik indeksowania nie ma możliwości modyfikacji zindeksowanych danych – musimy więc zindeksować cały dokument, niezależnie od tego, że część danych się nie zmieniła.

W naszym przykładzie nie znajdziemy usuwania  dokumentów. Dlatego też odpadł nam problem uwzględniania usuniętych rekordów i usuwania odpowiadających im dokumentów z indeksu. W przypadku, gdy taka funkcjonalność okazała się konieczna możemy posłużyć się wyżej opisanym sposobem z kolejką zleceń, wpisując do niej dokumenty do usunięcia. W konfiguracji DIH skorzystalibyśmy wtedy z atrybutu:

  • deletedPkQuery – dostarcza identyfikatorów usuniętych elementów.

W kolejnym odcinku postaramy się też uporządkować kwestie współpracy z bazą danych, spróbujemy też zrobić jeszcze raz naszą integrację z bazą w trochę inny sposób.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *