Data Import Handler – How to import data from SQL databases (part 2)

In the first part of this article we indexed the information contained in the database. In this part we will extend that functionality by adding incremental imports.

There were a little over 1 million documents, and the import took less than half an hour. In principle we could close the subject of data import here, but imagine that we would like this data to be indexed on an ongoing basis, as it changes in the source. It will not, of course, be true RTS (real time search) – there will always be an interval between a change in the data and the moment it becomes searchable – but let's assume that an update every hour is sufficient. The first thing we must do in order to implement incremental indexing is prepare the database.

Database preparation

Incremental indexing needs one piece of information from the database: which documents have changed since the last indexation. If we are lucky, such data is already available; if we are unlucky, we have to modify the existing database structure. Depending on that structure we have several options. In our practice the two we used most often are:

  • adding an additional column with the exact date of the last modification, updated automatically (e.g. by a trigger, or by a DEFAULT/ON UPDATE clause in MySQL) or – a worse solution – manually, by the application
  • creating a change queue – writing (e.g. with a trigger) the identifiers of the modified documents to a separate table

In both solutions we need to make sure that changes to every entity that makes up the document are captured; a sketch of both approaches is shown below.
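
To illustrate both approaches, here is a minimal PostgreSQL sketch for a hypothetical article table – the table, column, function and trigger names are ours and are not part of the mediawiki schema:

-- illustrative names only, not part of the mediawiki schema

-- Variant 1: a "last modified" column maintained by a trigger
ALTER TABLE article ADD COLUMN last_modified timestamp with time zone NOT NULL DEFAULT now();

CREATE OR REPLACE FUNCTION article_touch() RETURNS trigger AS $$
BEGIN
  NEW.last_modified := now();  -- stamp every update with the current time
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER article_touch_trg
  BEFORE UPDATE ON article
  FOR EACH ROW EXECUTE PROCEDURE article_touch();

-- Variant 2: a change queue filled by a trigger
CREATE TABLE article_changes (
  article_id integer NOT NULL,                                 -- id of the modified document
  changed_at timestamp with time zone NOT NULL DEFAULT now()   -- when it was modified
);

CREATE OR REPLACE FUNCTION article_log_change() RETURNS trigger AS $$
BEGIN
  INSERT INTO article_changes (article_id) VALUES (NEW.article_id);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER article_log_change_trg
  AFTER INSERT OR UPDATE ON article
  FOR EACH ROW EXECUTE PROCEDURE article_log_change();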

Returning to our example from the first part of the article (the Polish Wikipedia imported into a PostgreSQL database, using the mediawiki application tables), the structure looks like this:

Table "page":

 Column            | Type                     | Modifiers
-------------------+--------------------------+--------------------------------------------------------
 page_id           | integer                  | not null default nextval('page_page_id_seq'::regclass)
 page_title        | text                     | not null
 page_restrictions | text                     |
 page_counter      | bigint                   | not null default 0
 page_is_redirect  | smallint                 | not null default 0
 page_is_new       | smallint                 | not null default 0
 page_random       | numeric(15,14)           | not null default random()
 page_touched      | timestamp with time zone |
 page_latest       | integer                  | not null
 page_len          | integer                  | not null
 titlevector       | tsvector                 |

Table "revision":

 Column         | Type                     | Modifiers
----------------+--------------------------+-----------------------------------------------------------
 rev_id         | integer                  | not null default nextval('revision_rev_id_seq'::regclass)
 rev_page       | integer                  |
 rev_text_id    | integer                  |
 rev_comment    | text                     |
 rev_user       | integer                  | not null
 rev_user_text  | text                     | not null
 rev_timestamp  | timestamp with time zone | not null
 rev_minor_edit | smallint                 | not null default 0
 rev_deleted    | smallint                 | not null default 0
 rev_len        | integer                  |
 rev_parent_id  | integer                  |

Table "pagecontent":

 Column     | Type     | Modifiers
------------+----------+--------------------------------------------------------
 old_id     | integer  | not null default nextval('text_old_id_seq'::regclass)
 old_text   | text     |
 old_flags  | text     |
 textvector | tsvector |

The first table contains the column "page_touched", the second one the column "rev_timestamp" – which appears to be exactly what we need: the date of the last modification. The third table has no such field, but it stores the text of a specific version of a page, and these texts do not change over time – when a user modifies a page, a new version is simply added.
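
In other words, finding the records changed after a given point in time boils down to simple queries like these (the timestamp literal is only an example):

select page_id from page where page_touched > '2011-03-01 12:00:00';
select rev_page from revision where rev_timestamp > '2011-03-01 12:00:00';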

Let us recall the definition of the source from the first part of the article:

<dataConfig>
  <dataSource driver="org.postgresql.Driver"
    url="jdbc:postgresql://localhost:5432/wikipedia"
    user="wikipedia"
    password="secret" />
  <document>
    <entity name="page" query="SELECT page_id, page_title from page">
      <field column="page_id" name="id" />
      <field column="page_title" name="name" />
      <entity name="revision" query="select rev_id from revision where rev_page=$'{page.page_id}">
        <entity name="pagecontent" query="select old_text from pagecontent where old_id=$'{revision.rev_id}">
          <field column="old_text" name="text" />
        </entity>
      </entity>
    </entity>
  </document>
</dataConfig>

Well – there are easier things to read :) Here is the same definition extended with incremental imports:

<dataConfig>
  <dataSource driver="org.postgresql.Driver"
    url="jdbc:postgresql://localhost:5432/wikipedia"
    user="wikipedia"
    password="secret" />
  <document>
    <entity name="page"
      query="SELECT page_id, page_title from page"
      deltaQuery="select page_id from page where page_touched > '${dataimporter.last_index_time}'"
      deltaImportQuery="SELECT page_id, page_title from page where page_id=${dataimporter.delta.page_id}">
      <field column="page_id" name="id" />
      <field column="page_title" name="name" />
      <entity name="revision"
        query="select rev_id from revision where rev_page=${page.page_id}"
        deltaQuery="select rev_id from revision where rev_timestamp > '${dataimporter.last_index_time}'"
        parentDeltaQuery="select page_id from page where page_id=${revision.rev_page}">
        <entity name="pagecontent"
          query="select old_text from pagecontent where old_id=${revision.rev_id}"
          deltaQuery="select old_id from pagecontent where old_id &lt; 0">
          <field column="old_text" name="text" />
        </entity>
      </entity>
    </entity>
  </document>
</dataConfig>

Comparing the two files, the only difference is a handful of additional attributes:

  • deltaQuery – a query responsible for returning the IDs of the records that have changed since the last crawl (full or incremental). The time of the last crawl is provided by DIH in the variable dataimporter.last_index_time; Solr uses this query to find the records that have changed.
  • deltaImportQuery – a query that fetches the data of a single record identified by the ID available in the DIH variable dataimporter.delta.<column name> (in our case dataimporter.delta.page_id).
  • parentDeltaQuery – a query that returns the identifiers of the parent entity records for a changed child record. With these queries Solr is able to retrieve all the data that makes up a document, regardless of the entity it comes from. This is necessary because the indexing engine cannot modify an already indexed document in place – the whole document has to be re-indexed, even if only part of its data has changed.

Note also that the pagecontent entity was given a deltaQuery that can never return any rows (old_id < 0): as we saw earlier, its records never change on their own, they only need to be read again when a revision pointing at them changes.
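
With this configuration in place, an incremental import is started with the handler's delta-import command (the /dataimport path below assumes that this is the name under which the handler is registered in solrconfig.xml):

http://localhost:8983/solr/dataimport?command=delta-import

Such a request can then be scheduled, for example from cron, to run every hour – exactly the update interval we assumed at the beginning.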

In our example documents are never deleted, so we did not have to deal with detecting deleted records, nor with removing documents from the index. If this functionality is needed, we can use the approach described above (the change queue) together with one more attribute in the DIH configuration:

  • deletedPkQuery – a query that provides the identifiers of deleted records (see the sketch below).
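
For example, if a trigger wrote the identifiers of deleted pages into a hypothetical deleted_pages queue table (with the deletion time in a deleted_at column – both names are ours, not part of the schema), the page entity could be extended roughly like this:

<!-- deleted_pages and deleted_at are hypothetical, filled by a delete trigger -->
<entity name="page"
  query="SELECT page_id, page_title from page"
  deltaQuery="select page_id from page where page_touched > '${dataimporter.last_index_time}'"
  deltaImportQuery="SELECT page_id, page_title from page where page_id=${dataimporter.delta.page_id}"
  deletedPkQuery="select page_id from deleted_pages where deleted_at > '${dataimporter.last_index_time}'">
  ...
</entity>

During a delta import Solr then removes from the index the documents whose identifiers are returned by deletedPkQuery.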

In the next part of the article we will take another look at the cooperation with the database and try to approach our database integration in a slightly different way.
