Saturday, December 15, 2012

HBase Profiling

By Lars Hofhansl

Modern CPU cores can execute hundreds of instructions in the time it takes to reload the L1 cache. "RAM is the new disk" as a coworker at Salesforce likes to say. The L1-cache is the new RAM I might add.

As we add more and more CPU cores, we can easily be memory IO bound unless we are careful.

Many common problems I have seen over the years were related to:
  1. concurrency problems
    Aside from safety and liveness considerations, a typical problem is too much synchronization limiting potential parallel execution.
  2. unneeded or unintended memory barriers
    Memory barriers are required in Java by the following language constructs (a small sketch follows after this list):
    • synchronized - sets read and write barriers as needed (details depend on JVM, version, and settings)
    • volatile - sets a read barrier before a read of a volatile, and a write barrier after a write
    • final - sets a write barrier after the assignment
    • AtomicInteger, AtomicLong, etc. - use volatiles and hardware CAS instructions
  3. unnecessary, unintended, or repeated memory copy or access
    Memory copying is often seen in Java, for example because of the lack of in-array pointers, or simply due to general unawareness and the expectation that "the garbage collector will clean up the mess." Well, it does, but not without a price.
(Entire collections of books are dedicated to each of these topics, so I won't embarrass myself by going into more detail.)
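To make item 2 above a bit more concrete, here is a minimal, hypothetical Java sketch (not HBase code) contrasting a shared counter that pays for a memory barrier on every update with a plain, thread-confined field that does not:

import java.util.concurrent.atomic.AtomicLong;

public class CounterSketch {
    // Every incrementAndGet() performs a CAS, which acts as a full memory
    // barrier - cheap in isolation, expensive in a very hot code path.
    private final AtomicLong sharedCounter = new AtomicLong();

    // A plain field involves no barrier; updates are only safe if the field
    // is confined to one thread or published explicitly at a coarser granularity.
    private long threadLocalCounter;

    void onEvent() {
        sharedCounter.incrementAndGet(); // barrier on every call
        threadLocalCounter++;            // no barrier, thread-confined use only
    }
}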

Like any software project of reasonable size, HBase has problems of all the above categories.

Profiling in Java has become extremely convenient. Just start jVisualVM, which ships with the Sun/Oracle JDK, pick the process to profile (in my case a local HBase RegionServer), and start profiling.

Over the past few weeks I did some on and off profiling in HBase, which led to the following issues:

HBASE-6603 - RegionMetricsStorage.incrNumericMetric is called too often

Ironically here it was the collection of a performance metric that caused a measurable slowdown of up to 15%(!) for very wide rows (> 10k columns).
The metric was maintained as an AtomicLong, which introduced a memory barrier in one of the hottest code paths in HBase.
The good folks at Facebook found the same issue at roughly the same time. (It turns out that they were also... uhm... the folks who introduced the problem.)

HBASE-6621 - Reduce calls to Bytes.toInt

A KeyValue (the data structure that represents "columns" in HBase) is currently backed by a single byte[]. The sizes of the various parts are encoded in this byte[] and have to be read and decoded, each time costing an extra memory access. In many cases that can be avoided, leading to a slight performance improvement.
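The general idea is simple; the following hypothetical sketch (illustrative only, not the actual patch) shows the difference between re-decoding a length from the backing byte[] on every use and decoding it once into a local variable:

import org.apache.hadoop.hbase.util.Bytes;

class DecodeOnceSketch {
    static void before(byte[] buf, int offset) {
        // Each Bytes.toInt() call re-reads the same four bytes from the array.
        process(Bytes.toInt(buf, offset), Bytes.toInt(buf, offset) + Bytes.SIZEOF_INT);
    }

    static void after(byte[] buf, int offset) {
        int keyLength = Bytes.toInt(buf, offset); // decode exactly once
        process(keyLength, keyLength + Bytes.SIZEOF_INT);
    }

    private static void process(int a, int b) { /* stand-in for real work */ }
}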

HBASE-6711 - Avoid local results copy in StoreScanner

All references pertaining to a single row (i.e. KeyValues with the same row key) were copied at the StoreScanner layer. Removing this copy led to another slight performance increase with wide rows.

HBASE-7180 - RegionScannerImpl.next() is inefficient

This introduces a mechanism for coprocessors to access RegionScanners at a lower level, thus allowing skipping of a lot of unnecessary setup for each next() call. In tight loops a coprocessor can make use of this new API to save another 10-15%.

HBASE-7279 - Avoid copying the rowkey in RegionScanner, StoreScanner, and ScanQueryMatcher

The row key of a KeyValue was copied in the various scan related classes. To reduce that cost the row key was previously cached in the KeyValue class - leading to extra memory required for each KeyValue.
This change avoids all copying and hence also obviates the need for caching the row key.
A KeyValue now is hardly more than an array pointer (a byte[], an offset, and a length), and no data is copied any longer all the way from the block loaded from disk or cache to the RPC layer (unless the KeyValues are optionally encoded on disk, in which case they still need to be decoded in memory - we're working on improving that too).

Previously the size of a KeyValue on the scan path was at least 116 bytes plus the length of the row key (which can be arbitrarily long). Now it is ~60 bytes flat, including its own reference.
(remember that during the course of a large scan we might create millions or even billions of KeyValue objects)

This is a nice improvement both in terms of scan performance (15-20% for small row keys of a few bytes, much more for large ones) and in terms of produced garbage.
Since all copying is avoided, scanning now scales almost linearly with the number of cores.

HBASE-6852 - SchemaMetrics.updateOnCacheHit costs too much while full scanning a table with all of its fields

Other folks have been busy too. Here Cheng Hao found another problem with a scan related metric that caused a noticeable slowdown (even though I did not believe him at first).
This removed another set of unnecessary memory barriers.

HBASE-7336 - HFileBlock.readAtOffset does not work well with multiple threads

This is a slightly different issue, caused by bad synchronization of the FSReader associated with a StoreFile. There is only a single reader per store file. So if the file's blocks are not cached - possibly because the scan indicated that it wants no caching, because it expects to touch too many blocks - the scanner threads end up competing for read access to the store file. That led to outright terrible performance, such as scanners timing out even with just two scanners accessing the same file in a tight loop.
This patch is a stop-gap measure: attempt to acquire the lock on the reader; if that fails, switch to HDFS positional reads, which can read at an offset without affecting the state of the stream, and hence require no locking.
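The following is an illustrative sketch of that stop-gap (class and method names are made up, this is not the actual HFileBlock code): try the shared stream lock, and fall back to a positional read when it is contended.

import java.util.concurrent.locks.ReentrantLock;

class ReadAtOffsetSketch {
    private final ReentrantLock streamLock = new ReentrantLock();

    byte[] readAtOffset(long offset, int size) {
        if (streamLock.tryLock()) {
            try {
                // seek + read on the single shared stream
                return readFromSharedStream(offset, size);
            } finally {
                streamLock.unlock();
            }
        }
        // lock is contended: use an HDFS positional read (pread) instead,
        // which does not move the stream's file pointer and needs no lock
        return positionalRead(offset, size);
    }

    private byte[] readFromSharedStream(long offset, int size) { return new byte[size]; }
    private byte[] positionalRead(long offset, int size) { return new byte[size]; }
}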

Summary

Together these various changes can lead to a ~40-50% scan performance improvement when using a single core, and even more when using multiple cores on the same machine (as is the case with HBase).

An entirely unscientific benchmark

20m rows, with two column families of just a few dozen bytes each.
I performed two tests:
1. A scan that returns rows to the client
2. A scan that touches all rows via a filter but does not return anything to the client.
(This is useful to gauge the actual server side performance).

Further I tested with (1) no caching, all reads from disk (2) all data in the OS cache and (3) all data in HBase's block cache.

I compared 0.94.0 against the current 0.94 branch (what I will soon release as 0.94.4).

Results:
  • Scanning with scanner caching set to 10000:
    • 0.94.0
      no data in cache: 54s
      data in OS cache: 51s
      data in block cache: 35s

    • 0.94.4-snapshot
      no data in cache: 50s (IO bound between disk and network)
      data in OS cache: 43s
      data in block cache: 32s
      (limiting factor was shipping the results to the client)
  • all data filtered at the server (with a SingleColumnValueFilter that does not match anything, so each row is still scanned)
    • 0.94.0
      no data in cache: 31s

      data in OS cache: 25s
      data in block cache: 11s
    • 0.94.4-snapshot
      no data in cache: 22s
      data in OS cache: 17s
      data in block cache: 6.3s
So as you can see, scan performance has significantly improved since 0.94.0.
I have not yet quantified the same with multiple concurrent scanners.

Salesforce just hired some performance engineers from a well-known chip manufacturer, and I plan to get some of their time to analyze HBase in even more detail, to track down memory stalls, etc.

Saturday, November 10, 2012

HBase MVCC and built-in Atomic Operations

By Lars Hofhansl

(This is a follow-up to my ACID in HBase post from March this year)
HBase has a few special atomic operations:
  • checkAndPut, checkAndDelete - these simply check a value of a column as a precondition and then apply the Put or Delete if the check succeeded.
  • Increment, Append - these allow an atomic add to a column value interpreted as an integer, or append to the end of a column, resp.
checkAndPut and checkAndDelete are idempotent in the sense that they can safely be applied multiple times (but note that their outcome might differ when other changes are applied between the retries).
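For reference, this is what a checkAndPut call looks like with the 0.94-era client API (table and column names are made up for the example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// assumes a table 't1' with a column family 'cf'
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");

Put put = new Put(Bytes.toBytes("row1"));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("balance"), Bytes.toBytes("100"));

// apply the Put only if cf:balance currently equals "90";
// returns true if the check passed and the Put was applied
boolean applied = table.checkAndPut(Bytes.toBytes("row1"),
    Bytes.toBytes("cf"), Bytes.toBytes("balance"), Bytes.toBytes("90"), put);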

Increment and Append are not idempotent. They are the only non-repeatable operations in HBase. Increment and Append are also the only operations for which the snapshot isolation provided by HBase's MVCC model is not sufficient... More on that later.

It turns out that checkAndPut and checkAndDelete are not as atomic as expected (the issue was raised by Gregory, and despite me not believing it at first, he is right - see HBASE-7051).

A look at the code makes this quite obvious:
Some of the Put optimizations (HBASE-4528) allow releasing the rowlock before the changes are sync'ed to the WAL. This also requires the lock to be released before the MVCC changes are committed so that changes are not visible to other transactions before they are guaranteed to be durable.
Another operation (such as checkAndXXX) that acquires the rowlock to make atomic changes may in fact not see the current picture of things despite holding the rowlock, as there could still be outstanding MVCC changes that only become visible after the row lock was released and re-acquired. So it might operate on stale data. Holding the rowlock is no longer good enough after HBASE-4528.

Increment and Append have the same issue.

The fix for this part is relatively simple: We need a "MVCC barrier" of sorts. Instead of completing a single MVCC transaction at the end of the update phase (which will wait for all prior transactions to finish), we just wait a little earlier instead for prior transactions to finish before we start the check or get phase of the atomic operation. This only reduces concurrency slightly, since before the end of the operation we have to await all prior transactions anyway. HBASE-7051 does exactly that for the checkAndXXX operations.

In addition - as mentioned above - Increment and Append have another issue, they need to be serializable transactions. Snapshot isolation is not good enough.
For example: If you start with 0 and issue an increment of 1 and another increment of 2, the outcome must always be 3. If both could start from the same value (a snapshot), the outcome could be 1 or 2 depending on which one finishes first.
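In client code (0.94-era API; 'conf' is an HBase client Configuration, table and column names are just examples) the scenario looks like this:

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

// table 't1' with family 'cf' is assumed to exist
HTable table = new HTable(conf, "t1");

// two increments against the same counter, possibly from different clients
long afterFirst  = table.incrementColumnValue(Bytes.toBytes("row1"),
    Bytes.toBytes("cf"), Bytes.toBytes("counter"), 1L);
long afterSecond = table.incrementColumnValue(Bytes.toBytes("row1"),
    Bytes.toBytes("cf"), Bytes.toBytes("counter"), 2L);
// starting from 0 the final value must be 3, regardless of how the two
// increments interleave - which is exactly why a snapshot alone is not enough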

Increment and Append currently skirt the issue with an ugly "hack": When they write their changes into the memstore they set the memstoreTS of all new KeyValues to 0! The effect is that they are made visible to other transactions immediately, violating HBase's MVCC. Again see ACID in HBase for an explanation of the memstoreTS.
This guarantees the correct outcome of concurrent Increment and Append operations, but the visibility to concurrent scanners is not what you expect. An Incremented or Appended value, even for partial rows, can become visible to any scanner at any time, even though the scanner should see an earlier snapshot of the data.
Increment and Append are also designed for very high throughput so they actually manipulate HBase's memstore to remove older versions of the columns just modified. Thus you lose the version history of the changes in exchange for avoiding a memstore exploding with versions from the many Increments or Appends. This is called "upsert" in HBase. Upsert is nice in that it prevents the memstore from being filled with a lot of old values if nobody cares for them. The downside is that it is a special operation on the memstore, and hard to get right w.r.t. MVCC. It also does not work with mslab (see this Cloudera blog post for an explanation of mslab).

If you don't care about visibility this is a simple problem, since you can just look through the memstore and remove old values. If you care about MVCC, though, you have to prove first that it is safe to remove a KV.

I tried to fix this almost exactly a year ago (HBASE-4583), but after some discussions with my fellow committers we collectively gave up on that.

A few days ago I reopened HBASE-4583 and started with a radical patch that gets rid of all upsert-type logic (which set the memstoreTS to 0) and just awaits prior transactions before commencing the Increment/Append. Then I rely on my changes from HBASE-4241 to only flush the versions of columns needed when it is time to flush the memstore to disk. Turns out this is still quite a bit slower (10-15%), since it needs to flush the memstore frequently even though that leads to mostly empty files. Still, that was a nice try, as it gets rid of a lot of special code and turns Increment and Append into normal HBase citizens.

A 2nd less radical version makes upsert MVCC aware.

That is actually not as easy as it looks. In order to remove a version of a column (a KeyValue) from the memstore you have to prove that it is not and will not be seen by any concurrent or future scanner. That means we have to find the earliest readpoint of any scanner and ensure that there is at least one version of the KV older than that smallest readpoint; then we can safely remove any older versions of that KV from the memstore - because any scanner is guaranteed to see a newer version of the KV.
The "less radical" patch does exactly that.

In the end the patch I committed with HBASE-4583 does both:
If the column family that holds the column to be incremented or appended to has VERSIONS set to 1, we perform the MVCC-aware upsert added by the patch. If VERSIONS is > 1, we use the usual logic to add a KeyValue to the memstore. So now this behaves as expected in all cases: if multiple versions are requested they are retained, and time range queries will work even with Increment and Append; and it (mostly) keeps the performance characteristics when VERSIONS is set to 1.

Saturday, October 20, 2012

Coprocessor access to HBase internals

By Lars Hofhansl

Most folks familiar with HBase have heard of coprocessors.
Coprocessors come in two flavors: Observers and Endpoints.

An Observer is similar to a database trigger, an Endpoint can be likened to a stored procedure.
This analogy only goes so far, though.

While triggers and stored procedures are (typically) sandboxed and expressed in a high-level language (typically SQL with procedural extensions), coprocessors are written in Java and are designed to extend HBase directly (in the sense of avoiding subclassing the HRegionServer class in order to extend it). Code in a coprocessor will happily shut down a region server by calling System.exit(...)!

On the other hand coprocessors are strangely limited. Before HBASE-6522 they had no access to a RegionServer's locks and leases, and hence it was impossible to implement check-and-set type operations as a coprocessor (because the row modified would need to be locked), or to time out expensive server side data structures (via leases).
HBASE-6522 makes some trivial changes to remedy that.

It was also hard to maintain any kind of shared state in coprocessors.
Keep in mind that region coprocessors are loaded per region and there might be hundreds of regions for a given region server.

Static members won't work reliably, because coprocessor classes are loaded by special classloaders.

HBASE-6505 fixes that too. Now the RegionCoprocessorEnvironment provides a getSharedData() method, which returns a ConcurrentMap, which is held by the coprocessor environment as a weak reference (in a special map with strongly referenced keys and weakly referenced values), and held strongly by the environment that manages each coprocessor.
That way if the coprocessor is blacklisted (due to throwing an unexpected exception) the coprocessor's environment is removed, and any shared data is immediately available for garbage collection, thus avoiding ugly and error prone reference counting (maybe this warrants a separate post).

This shared data is per coprocessor class and per regionserver. As long as there is at least one region observer or endpoint active this shared data is not garbage collected and can be accessed to share state between the remaining coprocessors of the same class.
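As a small illustration, here is a hypothetical RegionObserver that counts Puts across all regions of a RegionServer via getSharedData() (the postPut signature is written from memory of the 0.94-era API, so treat it as approximate):

import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

public class PutCountingObserver extends BaseRegionObserver {
  private static final String KEY = "putCount";

  @Override
  public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
      Put put, WALEdit edit, boolean writeToWAL) throws IOException {
    // one map per coprocessor class per RegionServer, shared by all regions
    ConcurrentMap<String, Object> shared = e.getEnvironment().getSharedData();
    shared.putIfAbsent(KEY, new AtomicLong());        // first region to get here wins
    ((AtomicLong) shared.get(KEY)).incrementAndGet(); // visible to all other regions
  }
}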

These changes allow coprocessors to be used for a variety of use cases.
State can be shared across them, allowing coordination between many regions, for example for coordinated queries.
Row locks can be created and released - allowing for check-and-set type operations.
And leases can be used to safely expire expensive data structures or to time out locks among other uses.

Update:
I should also mention that RegionObservers already have access to a region's MVCC.

Sunday, October 14, 2012

Secondary Indexes (Part II)

By Lars Hofhansl

This post last week on secondary indexes caused a lot of discussion - which I find interesting, because this is (IMHO) one of the less interesting posts here.

It appears that I did not provide enough context to frame the discussion.

So... what do people mean when they say "secondary index"? Turns out, many things:
  • Reverse Lookup
  • Range Scans
  • Sorting
  • Index Covered Queries
  • etc
The idea outlined in last week's post is good for global Reverse Lookup, partially useful for Range Scans (as long as the range is reasonably small), not very useful for Sorting (all KVs would have to be brought back to the client), and entirely useless for Index Covered Queries (the index might not contain the whole truth).

I also was not clear about the consistency guarantees provided.
The "ragged edge" of current time is always a thorny issue in distributed systems. Without a centralized clock (or other generator of monotonically increasing ids) the notion of "at the same time" is at best a fuzzy one (see also Google's Spanner paper and it's notion of current time being an interval bounded by clock precision, rather than a single point).

The indexing idea from last week is consistent when the index is scanned as of a time in the past (as long as that time is far enough in the past to cover clock skew between servers - since all RegionServers should be running NTP, a few dozen milliseconds should be OK). Note you need my HBASE-4536 (0.94+) for this to be correct with deletes.

Queries for current results lead to interesting anomalies pointed out by Daniel (rephrased here):
  1. a client could read along the index
  2. it just passed index entry X
  3. another client changes the indexed column's value to Y and hence adds index entry Y
  4. the first client now checks back with the main row (whose latest version of the indexed column was changed, so X != Y and the timestamps won't match)
  5. the first client would conclude that the index entry resulted from a partial update, and hence ignore it (to be consistent it should return either the new or the old value)
That is mostly in line with HBase's other operations, with the difference that it won't return a value that did exist when the logical scan operation started.

If that is not good enough it can be addressed in two ways:
  1. Instead of retrieving the latest versions of the KVs of the main row, retrieve all versions. If there is a version of the indexed column that matches the timestamp of the index entry, then use the versions of all other columns less than the next version of the indexed column (those are all the column values that existed before the index was updated to the new value). If there is no matching version of the indexed column then ignore the entry; if the matching version is the latest version, then use the latest versions of all columns.
  2. Have one extra roundtrip: retrieve all versions of the indexed column(s).
    If no version of the indexed column matches the timestamp, ignore the entry. If it is the latest version, retrieve the rest of the main row at the same timestamp used to retrieve the indexed column (to guard against another client having added newer values in between). Or, if the matching value is not the latest one, retrieve the rest of the row as of the time of the next value (as in #1).
Whether #1 or #2 is more efficient depends on how many versions of all columns have to be retrieved vs. the extra roundtrip for just the indexed column(s).

That way this should be correct, and makes use of the atomicity guarantees provided by HBase.

The last part that was not mentioned last week is how we should encode the values. All column values to be indexed have to be encoded in a way that lends itself to lexicographic sorting of bytes. This is easy for strings (ignoring locale specific rules), but harder for integers and even floating point numbers.
The excellent Lily library has code to perform this kind of encoding.
(Incidentally it is also a complete index solution using write ahead logging - into HBase itself - to provide idempotent index transactions).

--
 
Lastly, it is also the simplest solution one can possibly get away with that requires no changes to HBase and no write ahead logging.

This all turned out to be a bit more complex than anticipated, so it should definitely be encapsulated in a client side library.

Sunday, October 7, 2012

Musings on Secondary Indexes

By Lars Hofhansl

There has been a lot of discussion about 2ndary indexes in HBase recently.

Since HBase is a distributed system there are many naive ways to do this, but it is not trivial to get it correct. HBase is a consistent key value store, so you would expect the same guarantees from your indexes.

The choices mostly boil down to two solutions:
  1. "Local" (Region level) indexes. These are good for low latency (everything is handled at locally at a server). The trade off is that it is impossible to ask globally where the row matching an index is located. Instead all regions have to be consulted.
  2. Global indexes. These can be used for global index queries, but involve multi server updates for index maintenance.
Here I will focus on the latter: Global Indexes.

It turns out a correct solution can be implemented with relative simplicity without any changes to HBase.
First off there are three important observations when talking about index updates:
  1. An index update is like a transaction
  2. An index update transaction is idempotent
  3. With a bit of discipline partial index transactions can be detected at runtime. I.e. the atomicity can be guaranteed at read-time.
Naively we could just write data to the "main" table and to a special index table:
The index table would be keyed by <columnValue>|<rowkey>. The <rowkey> is necessary to keep the index entries unique.

This simple approach breaks down in the face of failures. Say a client writes to these two tables, but fails before one of the changes could be applied. Now we may have the main table update but missed the index update or vice versa.
Notice that it is OK to have a stale index entry (a false index match), but not to have a main table entry that is not reflected in the index.

So here is a solution for consistent secondary indexes in HBase:
  • Update: If using TTL, set up the main table to expire slightly before the index table(s).
  • For a Put, first retrieve a local timestamp, then apply all changes to the index table(s) using that timestamp, and finally apply the changes to the main table using the same timestamp (see the sketch after this list).
    The timestamp can safely be created locally (with the same caveat that generally applies to HBase: Updates to the same KV in the same ms lead to more or less undefined results).
  • Upon delete, read back the row, then delete the main table entry, then remove all index entries (using the column values and timestamps read back from the row and using the same delete type - family, column, or version - that we used for the main table). Since we used the same timestamps this will work correctly.
    Update:
    As Daniel Gómez Ferro points out, this in fact can only work for version deletes. I.e. only a specific version of a row can be deleted. In order to delete all versions of a column (a column delete marker) of a row or all versions of all columns (a family delete marker), all of these versions would need to be enumerated in the client and be deleted one by one.
Note that if there is a failure we might end up with an extra entry in the index, but we'll never have an entry in the main table and not in the index table.
  • Now upon reading along an index, we find the rows that the index entries point to. When we retrieve the rows we have to verify that a row we read has the expected timestamp for the index column. If not we simply ignore that row/index entry. It must be from a partial index update - either from a concurrent update that is still in progress, or from a partially failed update.
    This is how we assure atomicity at read-time; or in other words we can tolerate a slightly wrong index without affecting correctness.
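Here is the Put path from the list above as a hypothetical client-side sketch (row, family, qualifier, and value are byte[]s; mainTable and indexTable are HTables for the two tables - all assumptions, not a finished library):

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

long ts = System.currentTimeMillis();       // a local timestamp is good enough

// 1. write the index entry first, keyed by <columnValue>|<rowkey>, at timestamp ts
byte[] indexRow = Bytes.add(value, Bytes.toBytes("|"), row);
Put indexPut = new Put(indexRow);
indexPut.add(family, qualifier, ts, row);   // payload points back to the main row
indexTable.put(indexPut);

// 2. then write the main table, using the very same timestamp
Put mainPut = new Put(row);
mainPut.add(family, qualifier, ts, value);
mainTable.put(mainPut);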
Lastly we need a M/R job that can build an index from an existing table. Because of the idempotent nature of index transactions - stemming from the fact that everything in HBase is timestamped - we can enable the index on a table (i.e. clients will immediately start to write to both tables) and then run the M/R job. As soon as the M/R job is finished the index is up to date and can be used (the M/R job would update some index metadata at the end to mark the index as available).

That's it. No server side code needed. No locking. And this will scale nicely. Just a little bit of discipline. Discipline that could easily be encapsulated in a client library.

Since partial failures will be rare, and we can filter those at read time, we need not clean the index up.

Because of the timestamps we can also still perform correct time range queries.

Sunday, September 16, 2012

KeyValue explicit timestamps vs memstoreTS

In this post I tried to summarize HBase's ACID implementation. This topic was also the main subject of my talk at HBaseCon.

I talked to a few folks afterwards, and it seems there is some general confusion about the memstoreTS (and MVCC) and how it relates to the explicit timestamp stored with each KeyValue.


The key difference is that the timestamp is part of the data model. Each KeyValue has a rowkey, a column identifier, a value... and a timestamp.
An application can retrieve and set the timestamp, decide as of which timestamp(s) it wants to see a certain value, etc. It should be considered as just another part of a column (a KeyValue pair to be specific) that happens to have special meaning.

The memstoreTS on the other hand is a construct purely internal to HBase. Even though it is called a timestamp (TS), it has nothing to do with time, but is just a monotonically increasing "transaction number". It is used to control visibility (MVCC) of a "transaction" in HBase (i.e. a Put or Delete operation), so that changes are not partially visible.

Remember that the explicit timestamp is part of the application data that gets changed by a transaction.

Say you have two concurrent threads that update the same columns on the same row (a row is the unit at which HBase provides transactions):
Thread1, Put: c1 = a, c2 = b
Thread2, Put: c1 = x, c2 = y

That's cool. One of the threads will be first, HBase makes sure (via the memstoreTS) that Puts are not partially visible, and also makes sure that all explicit timestamps are set to the same value for the same Put operation.
So any client (looking at the latest timestamp) will either see (a,b) or (x,y).

Things get more confusing when a client sets the explicit timestamp of (say) a Put. Again consider this example where two concurrent threads both issue a Put to the same columns of the same row, but set different timestamps:

Thread1, Put: c1(t1) = a, c2(t2) = b
Thread2, Put: c1(t2) = x, c2(t1) = y
(t2 > t1)

So what will a client see?

The first observation is still that each Put is seen in its entirety, or not at all.

If a Get just retrieves the latest versions and does so after both Thread1 and Thread2 finished it will see:
c1(t2) = x, c2(t2) = b

This is where many people get confused.

It looks like we're seeing mixed data from multiple Puts. Well, in fact we do, but we asked for only the latest versions, which include values from different transactions; so that is OK and correct.

To understand this better let's perform a Get of all versions (i.e. call setMaxVersions() on your Get). Now we'll potentially see this:
  • nothing (if the Get happens to be processed before either of the Puts).
  • c1(t1) = a, c2(t2) = b (Thread1 was processed first, and Thread2 is not yet finished)
  • c1(t2) = x, c2(t1) = y (Thread2 was processed first, and Thread1 is not yet finished)
  • c1(t1) = a, c1(t2) = x, c2(t1) = y, c2(t2) = b (both Thread1 and Thread2 finished)
We see that each Put indeed is either completely visible or not at all, and that HBase correctly avoids visibility of a partial Put operation.
In addition we can confirm that the timestamp is part of the data model that HBase exposes to clients, and the transaction visibility has nothing to do with the application controlled/visible timestamps.

Lastly in this scenario we also see that it is impossible for a client to piece together the state of the row at precise transaction boundaries.
Assuming both Thread1 and Thread2 have finished, a client can request the state as of t1 (in which case it gets c1(t1) = a, c2(t1) = y) or as of t2 (in which case it'll see c1(t2) = x, c2(t2) = b).
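In code, requesting the state as of t1 would look roughly like this (0.94-era Get API; 'table', 't1', and the row key are assumed from the example above):

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

Get get = new Get(Bytes.toBytes("theRow"));
get.setTimeRange(0, t1 + 1); // the upper bound is exclusive, so use t1 + 1 to include t1
get.setMaxVersions(1);       // latest version within that range, per column
Result result = table.get(get);
// yields c1(t1) = a, c2(t1) = y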

This behavior is the same as you'd expect from a relational database - the current state is the result of many past transactions; with the extra mental leap that the timestamp is actually part of the data model in HBase.

Wednesday, September 12, 2012

HBase client timeouts

The HBase client is a somewhat jumbled mess of layers, with unintended nested retries, nested connection pools, and so on. Mixed in are connections to the Zookeeper ensemble.

It is important to realize that the client directly handles all communication with the RegionServers, there is no proxy at the server side. Consequently the client needs to do the service discovery and caching as well as the connection and thread management necessary. And hence some of the complexity is understandable: The client is part of the cluster.

See also this blog post. Before HBASE-5682 a client would potentially never recover when it could not reach the cluster. And before HBASE-4805 and HBASE-6326, a client could not - with good conscience - be used in a long running ApplicationServer.

An important aspect of any client library is what I like to call "time to exception". If things go wrong the client should (at least as an option) fail fast and let the calling application - which has the necessary semantic context - decide how to handle this situation.

Unfortunately the HBase and Zookeeper clients were not designed with this in mind.

Among the various time outs are:
  • ZK session timeout (zookeeper.session.timeout)
  • RPC timeout (hbase.rpc.timeout)
  • RecoverableZookeeper retry count and retry wait (zookeeper.recovery.retry, zookeeper.recovery.retry.intervalmill)
  • Client retry count and wait (hbase.client.retries.number, hbase.client.pause)
In some error paths these retry loops are nested, so that in the default setting, if both ZK and HBase are down, a client will throw an Exception after a whopping 20 minutes! The application has no chance to react to outages in any meaningful way.

HBASE-6326 fixes one issue, where .META. and -ROOT- lookups would be nested, each time causing a ZK timeout N^2 times (N being the client retry count, 10 by default), which itself would be retried by RecoverableZookeeper (3 by default).

The defaults for some of these settings are optimized for the various server side components. If the network "blips" for five seconds the RegionServers should not abort themselves. So a session timeout of 180s makes sense there.

For clients running inside a stateless ApplicationServer the design goals are different. Short timeouts of five seconds seem reasonable. A failure is quickly detected and the application can react (potentially by controlled retrying).

With the fixes in the various jiras mentioned above, it is now possible (in HBase 0.94+) to set the various retry counts and timeouts to low values and get reasonably short timespans after which the client would report a connection error to the calling application thread.
And this is in fact what should be done when the HBaseClient (HTable, etc) is used inside an ApplicationServer for HBase requests that are synchronous in the calling thread (for example a web server serving data from HBase).
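As an illustration (the values are made up - tune them for your deployment), a client could tighten the settings listed above like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;

Configuration conf = HBaseConfiguration.create();
conf.setInt("zookeeper.session.timeout", 5000);            // 5s ZK session
conf.setInt("zookeeper.recovery.retry", 1);                // fewer ZK retries
conf.setInt("zookeeper.recovery.retry.intervalmill", 200);
conf.setInt("hbase.rpc.timeout", 5000);                    // 5s per RPC
conf.setInt("hbase.client.retries.number", 3);             // fail fast
conf.setInt("hbase.client.pause", 500);                    // ms between retries

HTable table = new HTable(conf, "myTable"); // reports failures to the caller quickly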

Wednesday, August 1, 2012

Pluggable compaction and scanning policies

As I described here, HBase maintains multiple versions of all key-values (KVs) stored and has two essential knobs to control the collection of old versions: TimeToLive (TTL) and MaximumNumberOfVersions (VERSIONS).

These two attributes are "statically" defined for each column family. To change them the table needs to be disabled, the column families changed, and then the table needs to be enabled again. A very heavyweight operation.

An area where this is problematic is M/R based, timestamp consistent, incremental backups (a scheme that I outlined here). Another scenario might be an MVCC-based transaction engine on top of HBase that uses the HBase timestamps to maintain versions.

The problem in both cases is that it is hard to know a priori how long HBase needs to keep older versions around.

In HBASE-6427 I suggest some extensions to the coprocessor framework that would allow a RegionObserver to finely control what KVs are targeted for compaction, by allowing it to create the scanner that is used to scan the incoming KVs.

RegionObserver has three new hooks:
  • preFlushScannerOpen - called before a scanner iterating over the MemStore being flushed is created
  • preCompactScannerOpen - called before a scanner iterating over all StoreFiles to be compacted is created
  • preStoreScannerOpen - called before a user-initiated scan is started
These hooks can return a custom scanner to define the set of KVs that should be copied over to the compacted/flushed files.
(The various internal scanner interfaces in HBase are in need of some consolidation work - pre{Flush|Compact}ScannerOpen return an InternalScanner, whereas preStoreScannerOpen returns a KeyValueScanner - but that is a different story).
 
HBASE-6427 also makes some HBase internal data structures accessible to coprocessors, so that a simple RegionObserver implementation could return a slightly modified StoreScanner (the public API site is not yet updated, so a link to the Javadoc is not available) from any of these hooks.

Looking at the test classes introduced by the change might be interesting, as they have a RegionObserver implement the default logic and then verify that the behavior of various operations is indeed unchanged.

For example it would now be possible for a RegionObserver implementation to listen to a ZooKeeper node, which could indicate what data can safely be eliminated (i.e. not be made visible to any of the scanners returned from these hooks). An incremental update process can use this to indicate the last time a successful backup was executed.

I intend to provide an implementation of a sample ZK based coprocessor to listen for scan policy changes (HBASE-6496).

Note: Coprocessors are developer tools and not for the casual user. The intent of these changes was to make this form of control possible, not necessarily easy. The very nature of the flush/compaction process makes interfering with it a difficult proposition.


Wednesday, May 30, 2012

HBase, HDFS and durable sync

HBase and HDFS go hand in hand to provide HBase's durability and consistency guarantees.

One way of looking at this setup is that HDFS handles the distribution and storage of your data whereas HBase handles the distribution of CPU cycles and provides a consistent view of that data.

As described in many other places, HBase
  1. Appends all changes to a WAL
  2. Batches/sorts changes in memory
  3. Flushes memory to immutable, sorted data files in HDFS
  4. Combines smaller data files to fewer, larger ones during compactions.
The part that is typically less clearly understood and documented is the exact durability guarantees provided by HDFS (and hence HBase).

HDFS sync has a colorful history, with the support needed by HBase only available in an unreleased "append"-branch of HDFS for a long time. (Note that the append and sync features are independent, and against common belief HBase only relies on the sync feature.) See also this Cloudera blog post.

In order to understand what HDFS provides let's take a look at how a DFSClient (client) interacts with a Datanode (DN).

In a nutshell a DN just waits for commands. One of these commands is WRITE_BLOCK. When the DN receives a WRITE_BLOCK command it instantiates a BlockReceiver thread.

The BlockReceiver then simply waits for packets on an InputStream and flushes the data to OS buffers. An open block is maintained at the DN as an open file. When a block is filled, the block and hence its associated files are closed and the BlockReceiver ends. For all practical purposes the DN forgets that the block existed.

Replication to replica DNs is done via pipelining. The first DN forwards each packet to the next DN in the chain before the data is flushed locally, and waits for the downstream DN to respond. The default length of the replication chain is 3.

The other side of the equation is the DFSClient. The DFSClient batches changes until a packet is filled.
Since HADOOP-6313 a Syncable supports hflush and hsync.
  • hflush flushes all outstanding data (i.e. the current unfinished packet) from the client into the OS buffers on all DN replicas.
  • hsync flushes the data to the DNs like hflush and should also force the data to disk via fsync (or equivalent). But currently for HDFS hsync is implemented as hflush!
The gist is that both closing of a block and issuing hsync/hflush on the client currently only guarantees that data was flushed from the client to the replica DNs and to OS buffers on each DN; not that the data actually reached a physical disk.
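From the client's perspective the calls in question look like this (assuming a Hadoop version where the Syncable hflush/hsync methods from HADOOP-6313 are available; the path and data are just examples):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FSDataOutputStream out = fs.create(new Path("/example/wal"));

byte[] edit = new byte[] { 1, 2, 3 };
out.write(edit);
out.hflush(); // pushes the outstanding data to OS buffers on all replica DNs
out.hsync();  // should also force it to disk, but as described above it currently
              // behaves like hflush unless the HDFS-744 work is in place
out.close();  // closes the block; fsync'ed on the DNs only with the SYNC_BLOCK flag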

For HBase and similar applications with durability guarantees this can be insufficient. If three or more DN machines crash at the same time (assume three replicas), for example caused by a multi rack or data center power outage, data might be lost.
Further, since HBase constantly compacts older, smaller HFiles into newer, larger ones, this potential data loss is not limited to new data.
(But note that, like most database setups, HBase should be deployed with redundant power supply anyway, so this is not necessarily an issue).

Due to the inner workings of the DN it is difficult to implement 100% Posix fsync semantics. Imagine a client which writes many blocks worth of data and then issues an hsync. In order to sync data correctly to disk, either the client or all involved DNs would need to keep track of all blocks (full or partial) written so far that have not yet been sync'ed.

This would be a significant change to how either the client or the DN work and would lead to more complicated code. It would also require keeping the block files open in order to retain the file descriptors, so that an fsync could potentially be issued in the future. Since the client might in fact never issue a sync request, the number of open files to retain is unbounded.

The other option (similar to Posix' O_SYNC) is to have the DNs call fsync upon receipt of every single packet, leading to many unnecessary fsyncs.

In HDFS-744 I propose a hybrid solution. A data stream can be created with a SYNC_BLOCK flag. This flag causes the DFSClient to set a "sync" flag on the last packet of a block, i.e. the block file is fsync'ed upon close.

This flag is also set when the client issues hsync. If the client has outstanding data, the current packet is tagged with the "sync" flag and sent immediately; otherwise an empty packet with this flag is sent.

When a DN receives such a packet, it will immediately flush the currently open file (representing the current block - full on close or partial on hsync - being written) to disk.

In summary: With this compromise a client can guarantee - byte-by-byte if needed - which portion of an open file is guaranteed to be on a durable medium, while avoiding either syncing every packet to disk or keeping track of past unsync'ed blocks.
For HBase this would conveniently deal with compactions as blocks are sync'ed upon close and also with WAL edits as it correctly allows sync'ing the current block.

The downside is that upon close each block needs to be sync'ed to disk, even though the client might never issue a sync request for this stream; this leads to potentially unneeded fsyncs.

HBASE-5954 proposes matching changes to HBase to make use of this new HDFS feature. This issue introduces a WAL sync config option and an HFile sync option.
The former causes HBase to issue an hsync when a batch of WAL entries is written. The latter makes sure HFiles (generated from memstore flushes or compactions) are guaranteed to be on a durable medium when the stream is closed.

There are also a few simple performance tests listed in that issue.

Future optimization is possible:
  • Only one of the replica DNs could issue the sync
  • Only one DN in each Rack could issue the sync
  • The sync could be done in parallel and the response from the DN need not wait for it to finish (in this case the client has no guarantee that the sync actually finished when hsync returns, only that the DN promised to do it)
  • For HBase, both options could be made configurable per column family.

HBaseCon

HBaseCon on May 22nd was a blast.

I hosted the development track and presented on HBase Internals (which is essentially a condensed version of this blog). And that all despite my being mortally afraid of public speaking.

The slides of the Internals talk are now available.
Slideshare does not deal well with animation, unfortunately.

I'll add a link to the video once it becomes available.

Tuesday, May 29, 2012

HBase 0.94

In all the hectic leading up to HBaseCon, I forgot to blog about HBase 0.94.

I released HBase 0.94.0 on May 15, 2012. This was the 4th release candidate. 

0.94 is mostly a performance release with a lot of contributions from Facebook.
As mentioned in the release announcement these are some notable improvements:
  • HBASE-5010 - Filter HFiles based on TTL
  • HBASE-4465 - Lazy-seek optimization for StoreFile scanners
  • HBASE-4469 - Avoid top row seek by looking up ROWCOL bloomfilter
  • HBASE-4532 - Avoid top row seek by dedicated bloom filter for delete family bloom filter
  • HBASE-5074 - support checksums in HBase block cache
among others. Many thanks to Mikhail Bautin and others at Facebook for all these improvements.

Of course there are some new features as well.
  • Jonathan Hsieh's HBASE-5128 - [uber hbck] Online automated repair of table integrity and region consistency problems
  • Jon Gray's HBASE-4460 - Support running an embedded ThriftServer within a RegionServer
And then of course my pets (I blogged about these before):
  • HBASE-3584 - Allow atomic put/delete in one call
  • HBASE-4102 - atomicAppend: A put that appends to the latest version of a cell
  • HBASE-4536 - Allow CF to retain deleted rows
  • HBASE-5229 - Provide basic building blocks for "multi-row" local transactions.
0.94 is binary compatible with 0.92. Server and client can be freely mixed between versions; it is even possible to mix servers of different versions within the same cluster (for rolling upgrades).

I expect 0.94.x to be used for a while until the "singularity" (0.96) stabilizes into HBase 1.0.

Update Wednesday, May 30th, 2012:
HBase can be downloaded from here: http://www.apache.org/dyn/closer.cgi/hbase/
The list of features and bug fixes is here.

Wednesday, April 18, 2012

(timestamp-) Consistent backups in HBase

The topic consistent backups in HBase comes up every now and then.
In this article I will outline a scheme that does provide timestamp-consistent backups.

Consistent backups are possible in HBase. With "consistent" I mean "consistent as of a specific timestamp" (this is limited by the HBase timestamp granularity.)

Over the past few months I have contributed various changes to HBase that now hopefully come full circle to a more coherent story for "system of record" type data retention.

The basic setup is simple. You'll need a handful of recent patches.
(HBase 0.94 will have all the relevant patches once it is released - which should be any time now)

HBase's built-in Export tool can then be used to generate consistent snapshots.

Normally the data collected by an Export job is "smeared" over the time interval it takes to execute the job; an Export-Scan sees a row (cells of a row to be precise) as of the time when it happens to get to it, and rows can change while the Export is running.

That is problematic, because it is not possible to recreate a consistent view of the database from export generated this way.

The trick then is to keep all data in HBase long enough for the backup job to finish, and to only collect information before the start time of the export job.

Say the backup takes no more than T to complete (yes, it is hard to know ahead of time how long an Export is going to run). In that case the table's column families can be setup as follows:
  • set TTL to T + some headroom (so maybe 2T to be safe)
  • set VERSIONS to a very large number, (max int = 2147483647 for example, i.e. nothing is evicted from HBase due to VERSIONS)
  • set MIN_VERSIONS to how many versions you want to keep around, otherwise all versions could be removed if their TTL expired
  • set KEEP_DELETED_CELLS to true (this makes sure that deleted cell and delete markers are kept until they expire by the TTL)
In the shell (setting TTL to two hours):
create '<table>', {NAME => '<columnfamily>', VERSIONS => 2147483647, TTL => 7200, MIN_VERSIONS => 1, KEEP_DELETED_CELLS => true}

Export can now be used as follows:
  • set versions to 2147483647 (i.e. all versions)
  • set startTime to -2147483648 (min int, i.e. everything is guaranteed to be included)
  • set endTime to the start time of the Export (i.e. the current time when the export starts. This is the crucial part)
  • enable support for deleted cells
hbase org.apache.hadoop.hbase.mapreduce.Export
-D hbase.mapreduce.include.deleted.rows=true
<tablename> <outputdir>
2147483647 -2147483648 <now in ms>

As long as the Export finishes within 2T, a consistent snapshot as of the time the Export was started is created. Otherwise some data might be missing, as it could have been compacted away before the Export had a chance to see it.

Since the backups also copied deleted rows and delete markers, a backup restored to an HBase instance can be queried using a time range (see Scan) to retrieve the state of the data at any arbitrary time.
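For example, a time range scan against the restored table could look like this ('table' is an HTable for the restored table and T is the point in time of interest; illustrative only):

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

Scan scan = new Scan();
scan.setTimeRange(0, T + 1); // the upper bound is exclusive, so use T + 1 to include T
scan.setMaxVersions(1);      // the latest surviving version at or before T, per column
ResultScanner scanner = table.getScanner(scan);
for (Result r : scanner) {
  // each Result reflects the state of its row as of T
}
scanner.close();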

Export is currently limited to a single table, but given enough storage in your live cluster this can be extended to multiple-table Exports, simply by setting the endTime of all Export jobs to the start time of the first job.


This same trick can also be used for incremental backups. In that case the TTL has to be large enough to cover the interval between incremental backups.
If, for example, the incremental backups frequency is daily, the TTL above can be set to 2 days (TTL=>172800). Then use Export again:

hbase org.apache.hadoop.hbase.mapreduce.Export
-D hbase.mapreduce.include.deleted.rows=true
<tablename> <outputdir>
2147483647 <time of last backup in ms> <now in ms>

The longer TTL guarantees that there will be no gaps that are not covered by the incremental backups.

An example:
  1. A Put (p1) happens at T1
  2. Full backup starts at T2, time interval [0, T2)
  3. Another Put (p2) at T3
  4. full backup jobs finishes
  5. A Delete happens at T4 
  6. Incremental backup starts at T5, time interval [T2, T5)
  7. Yet another Put (p3)
  8. Incremental backup finishes
Note that in this scenario it does not matter when the backup jobs finish.

The full backup contains only p1. The incremental backup contains p2 and the Delete. p3 is not included in any backup, yet.
The state at T2 (p1) and T5 (p1, p2, delete) can be directly restored. Using time range Scans or Gets the state as of T4 and T3 can also be retrieved, once both backups have been restored into the same HBase instance (you need HBASE-4536 for this to work correctly with Deletes).

Finally, if keeping enough data to cover the time between two incremental backups in the live HBase cluster is problematic for your organization, it is also possible to archive HBase's Write Ahead Logs (WAL) and then replay with the built-in WALPlayer (HBASE-5604), but that is for another post.

Wednesday, March 21, 2012

ACID in HBase

By Lars Hofhansl

As we know, ACID stands for Atomicity, Consistency, Isolation, and Durability.

HBase supports ACID in limited ways, namely Puts to the same row provide all ACID guarantees. (HBASE-3584 adds multi op transactions and HBASE-5229 adds multi row transactions, but the principle remains the same)

So how does ACID work in HBase?

HBase employs a kind of MVCC. And HBase has no mixed read/write transactions.

The nomenclature in HBase is a bit strange for historical reasons. In a nutshell each RegionServer maintains what I will call "strictly monotonically increasing transaction numbers".

When a write transaction (a set of puts or deletes) starts it retrieves the next highest transaction number. In HBase this is called a WriteNumber.
When a read transaction (a Scan or Get) starts it retrieves the transaction number of the last committed transaction. HBase calls this the ReadPoint.

Each created KeyValue is tagged with its transaction's WriteNumber (this tag, for historical reasons, is called the memstore timestamp in HBase. Note that this is separate from the application-visible timestamp.)

The highlevel flow of a write transaction in HBase looks like this:
  1. lock the row(s), to guard against concurrent writes to the same row(s)
  2. retrieve the current writenumber
  3. apply changes to the WAL (Write Ahead Log)
  4. apply the changes to the Memstore (using the acquired writenumber to tag the KeyValues)
  5. commit the transaction, i.e. attempt to roll the Readpoint forward to the acquired Writenumber.
  6. unlock the row(s)
The highlevel flow of a read transaction looks like this:
  1. open the scanner
  2. get the current readpoint
  3. filter all scanned KeyValues with memstore timestamp > the readpoint
  4. close the scanner (this is initiated by the client)
In reality it is a bit more complicated, but this is enough to illustrate the point. Note that a reader acquires no locks at all, but we still get all of ACID.
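Step 3 of the read flow boils down to a very simple rule; here is a greatly simplified illustration (not actual HBase code):

// a scanner captures the readpoint once when it is opened and then ignores
// every KeyValue whose memstore timestamp (write number) is newer than that
class MvccReadSketch {
  static final class KV {
    final long memstoreTS; // the writing transaction's write number
    KV(long memstoreTS) { this.memstoreTS = memstoreTS; }
  }

  static boolean visibleTo(KV kv, long readPoint) {
    return kv.memstoreTS <= readPoint; // committed at or before the scanner's snapshot
  }
}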

It is important to realize that this only works if transactions are committed strictly serially; otherwise an earlier uncommitted transaction could become visible when one that started later commits first. In HBase transactions are typically short, so this is not a problem.

HBase does exactly that: All transactions are committed serially.

Committing a transaction in HBase means setting the current ReadPoint to the transaction's WriteNumber, and hence making its changes visible to all new Scans.
HBase keeps a list of all unfinished transactions. A transaction's commit is delayed until all prior transactions committed. Note that HBase can still make all changes immediately and concurrently, only the commits are serial.

Since HBase does not guarantee any consistency between regions (and each region is hosted at exactly one RegionServer) all MVCC data structures only need to be kept in memory on every region server.

The next interesting question is what happens during compactions.

In HBase compactions are used to join multiple small store files (created by flushes of the MemStore to disk) into larger ones and also to remove "garbage" in the process.
Garbage here are KeyValues that either expired due to a column family's TTL or VERSION settings or were marked for deletion. See here and here for more details.

Now imagine a compaction happening while a scanner is still scanning through the KeyValues. It would now be possible to see a partial row (see here for how HBase defines a "row") - a row comprised of versions of KeyValues that do not reflect the outcome of any serializable transaction schedule.

The solution in HBase is to keep track of the earliest readpoint used by any open scanner and never collect any KeyValues with a memstore timestamp larger than that readpoint. That logic was - among other enhancements - added with HBASE-2856, which allowed HBase to support ACID guarantees even with concurrent flushes.
HBASE-5569 finally enables the same logic for the delete markers (and hence deleted KeyValues).

Lastly, note that a KeyValue's memstore timestamp can be cleared (set to 0) when it is older than the oldest scanner. I.e. it is known to be visible to every scanner, since all earlier scanners are finished.

Update Thursday, March 22: 
A couple of extra points:
  • The readpoint is rolled forward even if the transaction failed, in order to not stall later transactions that are waiting to be committed (since this is all in the same process, that just means the roll-forward happens in a Java finally block).
  • When updates are written to the WAL, a single record is created for all the changes. There is no separate commit record.
  • When a RegionServer crashes, all in-flight transactions are eventually replayed on another RegionServer if the WAL record was written completely, or discarded otherwise.

Friday, February 10, 2012

(limited) cross row transactions in HBase

Atomic operations in HBase are currently limited to single row operations. This is achieved by co-locating all cells with the same row-key (see introduction-to-hbase) in the same region.

The key observation is that all that is required for atomic operations are co-located cells.

HBASE-5304 allows defining more flexible RegionSplitPolicies (note Javadoc is not yet updated, so this link leads to nowhere right now).
HBASE-5368 provides an example (KeyPrefixRegionSplitPolicy) that co-locates cells with the same row-key-prefix in the same region.
By choosing the prefix smartly, useful ranges of rows can be co-located together. Obviously one needs to be careful, as regions cannot grow without bounds.

HBASE-5229 finally allows cross row atomic operations over multiple rows as long as they are co-located in the same region.
This is a developer-only feature and only accessible through coprocessor endpoints. However, HBASE-5229 also includes an endpoint that can be used directly.

An example can be setup as follows:
1. add this to hbase-site.xml:
  <property>
    <name>hbase.coprocessor.user.region.classes</name>
    <value>org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint</value>
  </property>

This loads the necessary coprocessor endpoint into all regions of all user tables.

2. Create a table that uses KeyPrefixRegionSplitPolicy:
HTableDescriptor myHtd = new HTableDescriptor("myTable");
myHtd.setValue(HTableDescriptor.SPLIT_POLICY,
               KeyPrefixRegionSplitPolicy.class.getName());

// set the prefix to 3 in this example
myHtd.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY, 
               String.valueOf(3));
HColumnDescriptor hcd = new HColumnDescriptor(...);
myHtd.addFamily(hcd);
HBaseAdmin admin = ...;
admin.createTable(myHtd);

Regions of "myTable" are now split according to KeyPrefixRegionSplitPolicy with a prefix of 3.


3. Execute an atomic multirow transaction:
List<Mutation> ms = new ArrayList<Mutation>();
Put p = new Put(Bytes.toBytes("xxxabc"));
...
ms.add(p);
Put p1 = new Put(Bytes.toBytes("xxx123"));
...
ms.add(p1);
Delete d = new Delete(Bytes.toBytes("xxxzzz"));
...
ms.add(d);
HTable t = ...;
// get a proxy for MultiRowMutationEndpoint
// Note that the passed row is used to locate the 
// region. Any of the other row keys could have
// been used as well, as long as they identify 
// the same region.
MultiRowMutationProtocol mr = t.coprocessorProxy(
   MultiRowMutationProtocol.class,
   Bytes.toBytes("xxxabc"));
// perform the atomic operation
mr.mutateRows(ms);
 

Update, Saturday, February 18, 2012:
In fact using just "xxx" in the call to get the coprocessorProxy would be correct as well, since it identifies he same region. "xxx" could be considered the "partition" key.

That's it... The two puts and the delete are executed atomically, even though these are different rows.
KeyPrefixRegionSplitPolicy ensures that the three rows are co-located, because they share the same 3-byte prefix.
If any of the involved row-keys is located in a different region, the operation will fail; the client will not try again.
Again, care must be taken to pick a useful prefix length for co-location here, or regions will grow out of bounds, or at least be very skewed in size. 

This feature is useful in certain multi-tenant setups, or for time-based data sets (for example the split boundaries could be set to whole hours, or days).

Sunday, January 29, 2012

Filters in HBase (or intra row scanning part II)

Filters in HBase are a somewhat obscure and under-documented feature. (Even we committers are often not aware of their usefulness - see HBASE-5229 and HBASE-4256... Or maybe it's just me...).

Intra-row scanning can be done using ColumnRangeFilter. Other filters such as ColumnPrefixFilter or MultipleColumnPrefixFilter might also be handy for this. All three filters have in common that they can provide scanners (see scanning in hbase) with what I will call "seek hints". These hints allow a scanner to seek to the next column, the next row, or an arbitrary next cell determined by the filter. This is far more efficient than having a dumb filter that is passed each cell and determines whether the cell is included in the result or not.
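As a small usage sketch (the table, row key, and prefixes below are made up), a scan over a wide row with a MultipleColumnPrefixFilter could look like this:

// return only columns whose qualifier starts with "abc" or "xyz";
// the filter supplies seek hints, so other columns are skipped cheaply
byte[][] prefixes = { Bytes.toBytes("abc"), Bytes.toBytes("xyz") };
HTable t = ...;
Scan s = new Scan(Bytes.toBytes("someRow"));
s.setFilter(new MultipleColumnPrefixFilter(prefixes));
s.setBatch(1000); // avoid materializing the entire wide row at once
ResultScanner rs = t.getScanner(s);
for (Result r = rs.next(); r != null; r = rs.next()) {
  for (KeyValue kv : r.raw()) {
    // only cells whose qualifier starts with one of the prefixes arrive here
  }
}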

Many other filters also provide these "seek hints". The exceptions here are filters that filter on column values, as there is no inherent ordering between column values; these filters need to look at the value of each column.
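For contrast, a value-based filter such as ValueFilter cannot provide seek hints; a sketch (the comparison value is made up) would be:

// matches only cells whose value equals "42"; every cell's value must be
// examined, so no seeking/skipping is possible
Scan scan = new Scan();
scan.setFilter(new ValueFilter(CompareFilter.CompareOp.EQUAL,
                               new BinaryComparator(Bytes.toBytes("42"))));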

For example check out this code in MultipleColumnPrefixFilter (ASF 2.0 license):
    TreeSet<byte []> lesserOrEqualPrefixes =
      (TreeSet<byte []>) sortedPrefixes.headSet(qualifier, true);
    if (lesserOrEqualPrefixes.size() != 0) {
      byte [] largestPrefixSmallerThanQualifier = lesserOrEqualPrefixes.last();
      if (Bytes.startsWith(qualifier, largestPrefixSmallerThanQualifier)) {
        return ReturnCode.INCLUDE;
      }
      if (lesserOrEqualPrefixes.size() == sortedPrefixes.size()) {
        return ReturnCode.NEXT_ROW;
      } else {
        hint = sortedPrefixes.higher(largestPrefixSmallerThanQualifier);
        return ReturnCode.SEEK_NEXT_USING_HINT;
      }
    } else {
      hint = sortedPrefixes.first();
      return ReturnCode.SEEK_NEXT_USING_HINT;
    }

(the hint is used later to skip ahead to that column prefix)

See how this code snippet allows the filter to
  1. seek to the next row if all prefixes are known to be less than or equal to the current qualifier (and the largest did not match the passed column qualifier). Note that such a seek can potentially skip millions of columns in a single operation.
  2. seek to the next larger prefix if there are more prefixes but the current one does not match the qualifier.
  3. seek to the first prefix (the smallest) if none of the prefixes are less than or equal to the current qualifier.
If you didn't feel like looking at the code, the takeaway is that these filters can be used safely and efficiently in very wide rows. If the filter could only indicate INCLUDE or SKIP, it would be forced to visit every version of every column of every row, which would be inefficient for wide rows with hundreds of thousands or millions of columns.
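If you are curious how a filter hands such a hint back to the scanner, here is a rough sketch (class name and logic are made up, serialization kept minimal) against the 0.92/0.94-era Filter API, which exposes filterKeyValue() and getNextKeyHint():

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative filter: skip directly to a target qualifier within each row
// (and include everything from there on) instead of SKIPping cell by cell.
public class SkipToQualifierFilter extends FilterBase {
  private byte[] targetQualifier;

  public SkipToQualifierFilter() {} // needed for deserialization

  public SkipToQualifierFilter(byte[] targetQualifier) {
    this.targetQualifier = targetQualifier;
  }

  @Override
  public ReturnCode filterKeyValue(KeyValue kv) {
    if (Bytes.compareTo(kv.getQualifier(), targetQualifier) < 0) {
      // ask the scanner to seek ahead using the hint below
      return ReturnCode.SEEK_NEXT_USING_HINT;
    }
    return ReturnCode.INCLUDE;
  }

  @Override
  public KeyValue getNextKeyHint(KeyValue kv) {
    // seek to the first possible cell of the target column in this row
    return KeyValue.createFirstOnRow(kv.getRow(), kv.getFamily(), targetQualifier);
  }

  public void write(DataOutput out) throws IOException {
    Bytes.writeByteArray(out, targetQualifier);
  }

  public void readFields(DataInput in) throws IOException {
    targetQualifier = Bytes.readByteArray(in);
  }
}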

I'm in the process of adding more information about these Filters to the HBase Book Reference Guide.

Friday, January 27, 2012

Scanning in HBase

In HBASE-5268 I proposed a "prefix delete marker" (a delete marker that would mark a set of columns identified by a column prefix as deleted).

As it turns out, I didn't quite think through how scanning/seeking in HBase works, especially when delete markers are involved. So I thought I'd write about that here.
At the end of the article I hope you will understand why column prefix delete markers that are sorted in with the KeyValues they affect cannot work in HBase.

This blog entry describes how deletion works in HBase. One interesting piece of information not mentioned there is that version and column delete markers are ordered in line together with the KeyValues that they affect, while family delete markers are always sorted to the beginning of their row.

Generally each column family is represented by a Store, which manages one or more StoreFiles. Scanning is a form of merge-sort performed by a RegionScanner, which merges results of one or more StoreScanners (one per family), which in turn merge results of one or more StoreFileScanners (one for each file for this family):


RegionScanner
  +-- StoreScanner (one per column family)
  |      +-- StoreFileScanner --> StoreFile
  |      +-- StoreFileScanner --> StoreFile
  +-- StoreScanner
         +-- StoreFileScanner --> StoreFile
         +-- StoreFileScanner --> StoreFile



Say you performed the following actions (T is time):
  1. put:            row1, family, col1, value1, T
  2. delete family:  row1, family, T+1
  3. put:            row1, family, col1, value2, T+2
  4. delete columns: row1, family, col1, T+3
  5. put:            row1, family, col1, value3, T+4

What we will find in the StoreFile for "family" is this:
family-delete row1, T+1
row1,col1,value3, T+4
column-delete row1,col1, T+3
row1,col1,value2, T+2
row1,col1,value1, T

KeyValues are ordered in reverse chronological order (within their row and column). The family delete marker, however, is always first on the row. That makes sense, because a family delete marker potentially affects many columns in this row, so in order to allow scanners to scan forward-only, the family delete marker needs to be seen by a scanner first.

That also means that
  1. even if we are only looking for a specific column, we always seek to the beginning of the row to check whether there is a family delete marker with a timestamp greater than or equal to the versions of the column that we are interested in. After that the scanner seeks to the column.
    And
  2. even if we are looking for a past version of a column we have to seek to the "beginning" of the column (i.e. the potential delete marker right before it), before we can scan forward to the version we're interested in.
My initial patch for HBASE-5268 would sort the prefix delete markers just like column delete markers.

By now it should be obvious why this does not work.

The beginning of a row is a known point, and so is the "beginning" of a column. The beginning of a prefix of a column is not. So to find out whether a column is marked for deletion, we would have to start at the beginning of the row and then scan forward to find all prefix delete markers. That is clearly not efficient.

My 2nd attempt placed all prefix delete markers at the beginning of the row. That technically works. But notice that a column delete marker only has to be retained by the scanner for a short period of time (until we have scanned past all versions that it affects). Prefix delete markers would have to be kept in memory until we have scanned past all columns that start with the prefix. In addition, the number of prefix delete markers for a row is not bounded in principle.
Family delete markers do not have this problem because (1) the number of column families is limited for other reasons and (2) store files are per family, so all we have to remember for a family in a StoreScanner is a timestamp.

Monday, January 23, 2012

HBase intra row scanning

By Lars Hofhansl

Updated (again) Wednesday, January 25th, 2012.

As I painfully worked through HBASE-5229 I realized that HBase already has all the building blocks needed for complex (local) transactions.

What's important here is that (see my introduction to HBase):
  1. HBase ensures atomicity for operations for the same row key
  2. HBase keys have internal structure: (row-key, column family, column, ...)
The missing piece was ColumnRangeFilter. With this filter it is possible to retrieve all columns whose identifier starts with "abc", or all columns whose identifier sorts at or after "test". For example:

// all columns whose identifier starts with "abc"
Filter f = new ColumnRangeFilter(Bytes.toBytes("abc"), true,
                                 Bytes.toBytes("abd"), false);

// all columns whose identifier sorts at or after "test"
Filter f = new ColumnRangeFilter(Bytes.toBytes("test"), true,
                                 null, true);


So this allows searching (scanning) inside a row by column identifier, just as HBase allows searching by row key.

A client application can exploit this to achieve transactions by grouping all entities that can participate in the same transaction into a single row (and single column family).
Prefixes of the column identifiers can then be used to define rows inside that group. Basically, the search criteria for keys are moved one level down, to the column identifier.

Say we wanted to implement a store with transactional tables that contain rows and columns. One way of doing this with HBase is as follows:
  • the HBase row-key/column-family maps to a "table"
  • a prefix of the HBase column identifier maps to a "row"
  • the rest of the HBase column identifier identifies the "column"
This is in fact similar to what Google's Megastore (pdf) does.

This leads to potentially wide HBase rows with many columns. The missing piece is allowing a Scan to efficiently retrieve a slice of a wide row.

This is where ColumnRangeFilter comes into play. This filter seeks efficiently into the row by seeking ahead to the first HBase block that contains the first KeyValue (or cell) for that column.

Let's model a table "pets" this way. And let's say a pet has a name and a species. The HBase key for entries would look like this:
(table, CF1, rowA|column1) -> value for column1 in rowA
The code would look something like this:
(apologies for the initial incorrect code that I had posted here)

HTable t = ...;
Scan s = ...;
s.setStartRow(Bytes.toBytes("pets"));
s.setStopRow(Bytes.toBytes("pets"));
// get all columns for my pet "fluffy".
Filter f = new ColumnRangeFilter(Bytes.toBytes("fluffy"), true,
                                 Bytes.toBytes("fluffz"), false);
s.setFilter(f);
s.setBatch(20); // avoid getting all columns for the HBase row
ResultScanner rs = t.getScanner(s);
for (Result r = rs.next(); r != null; r = rs.next()) {
  // r will now have all HBase columns that start with "fluffy",
  // which would represent a single row
  for (KeyValue kv : r.raw()) {
    // each kv represents - the latest version of - a column
  }
}
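For completeness, here is a sketch of what the corresponding writes could look like (the "fluffy|name" qualifier layout and the values are made up for illustration):

// the HBase row acts as the "table", CF1 as in the key layout above
Put put = new Put(Bytes.toBytes("pets"));
put.add(Bytes.toBytes("CF1"), Bytes.toBytes("fluffy|name"),
        Bytes.toBytes("Fluffy"));
put.add(Bytes.toBytes("CF1"), Bytes.toBytes("fluffy|species"),
        Bytes.toBytes("cat"));
t.put(put);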

The downside of this is that HBase achieves atomicity by co-locating all cells with the same row-key, so the entire group has to be hosted by a single region server.

Tuesday, January 17, 2012

HBase (Intra) Row Transactions

As said in a previous post "HBase ensures that all new versions created by single Put operation for a particular rowkey are either all seen by other clients or seen by none."

Indeed HBase can execute atomic Put operations and atomic Delete operations (as well as a few specialized operations like Increment and Append).

What HBase cannot currently do is to execute a grouping of different operations atomically. For example you cannot execute a Put and Delete operation atomically.

HBASE-3584 and HBASE-5203 change that. It is now possible to group multiple Puts and Deletes for the same row key together as a single atomic operation. The combined operation is atomic even when the executing regionserver fails half way through the operation.

The client facing API looks like this:

HTable t = ...;
byte[] row = ...;
RowMutation arm = new RowMutation(row);
Put p = new Put(row);
p.add(...);
Delete d = new Delete(row);
d.delete{Column|Columns|Family}(...);
arm.add(p);
arm.add(d);
t.mutateRow(arm);

RowMutation implements the Row interface and can hence itself be part of a multi row batch operation:

HTable t = ...; 
byte[] row1, row2;
RowMutation arm1 = new RowMutation(row1);
RowMutation arm2 = new RowMutation(row2);
...
List<Row> rows = ...;
rows.add(arm1);
rows.add(arm2);
t.batch(rows);

But note that this multi row batch is not atomic between different rows.