本文介绍如何在HBase上的Phoenix里实现用户自定义函数(UDF)。HBase的版本是0.98,Phoenix也需要选择与之对应的版本。
参考文章:
http://phoenix.apache.org/udf.html
http://phoenix-hbase.blogspot.com/2013/04/how-to-add-your-own-built-in-function.html(翻墙才能打开,而且这篇文章很旧,2013年的)
官网说Phoenix从4.4.0版本开始才支持用户自定义函数。话说以前的3.0版本我们都是把自定义函数写到系统函数里的,这需要重新编译Phoenix的源码,对后期版本的升级很不友好。因此4.4.0版本可以说有它的积极意义O(∩_∩)O哈哈~
直接上代码吧,UrlParseFunction.java类

package phoenix.function;
import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.function.ScalarFunction;
import org.apache.phoenix.parse.FunctionParseNode.Argument;
import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PVarchar;
import phoenix.util.DiscoverUrlUtil;
import phoenix.util.DomainInfo;
/**
 * Phoenix scalar function {@code URLPARSE(url VARCHAR, type INTEGER)}.
 *
 * <p>The second argument selects which piece of information is derived from the URL:
 * <ul>
 *   <li>{@code 1} - the value of {@code DomainInfo.getDomainGroup(url)}, or the literal
 *       {@code "other"} when that helper returns {@code null};</li>
 *   <li>{@code 2} - the value of {@code DiscoverUrlUtil.getSClassificationName(url)};</li>
 *   <li>{@code 3} - the value of {@code DiscoverUrlUtil.getUrlName(url)}.</li>
 * </ul>
 *
 * <p>The result is always encoded as a VARCHAR. A SQL NULL in either argument produces a
 * SQL NULL result. Any other type value is rejected with an {@link IllegalStateException}.
 */
@BuiltInFunction(name = UrlParseFunction.NAME, args = {
        @Argument(allowedTypes = {PVarchar.class}),
        @Argument(allowedTypes = {PInteger.class})})
public class UrlParseFunction extends ScalarFunction {
    /** Name under which the function is exposed in SQL. */
    public static final String NAME = "URLPARSE";

    /** Parse type 1: resolve the URL through {@code DomainInfo.getDomainGroup}. */
    private static final int TYPE_DOMAIN_GROUP = 1;
    /** Parse type 2: resolve the URL through {@code DiscoverUrlUtil.getSClassificationName}. */
    private static final int TYPE_CLASSIFICATION_NAME = 2;
    /** Parse type 3: resolve the URL through {@code DiscoverUrlUtil.getUrlName}. */
    private static final int TYPE_URL_NAME = 3;

    /** Fallback returned for parse type 1 when no domain group is found. */
    private static final String DEFAULT_DOMAIN_GROUP = "other";
    /** Zero-length value; the code below uses an empty pointer to represent SQL NULL. */
    private static final byte[] NULL_VALUE = new byte[0];

    /**
     * No-argument constructor.
     * NOTE(review): presumably needed so Phoenix can instantiate the function reflectively
     * before restoring its state - confirm against the Phoenix version in use.
     */
    public UrlParseFunction() {
    }

    /**
     * Creates the function over its argument expressions.
     *
     * @param children the argument expressions: index 0 is the URL, index 1 is the parse type
     * @throws SQLException if the superclass rejects the children
     */
    public UrlParseFunction(List<Expression> children) throws SQLException {
        super(children);
    }

    /**
     * Evaluates the function for one row and leaves the VARCHAR-encoded result in {@code ptr}.
     *
     * @param tuple the current row
     * @param ptr   in/out pointer; on a {@code true} return it holds the result bytes
     *              (zero length meaning SQL NULL)
     * @return {@code false} if either argument could not be evaluated for this row,
     *         {@code true} otherwise
     * @throws IllegalStateException if the parse type is not 1, 2 or 3
     */
    @Override
    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        Expression strExpression = getStrExpression();
        if (!strExpression.evaluate(tuple, ptr)) {
            return false;
        }
        String sourceStr = (String) PVarchar.INSTANCE.toObject(ptr, strExpression.getSortOrder());
        if (sourceStr == null) {
            // NULL url: ptr is left as the child produced it and is reported as the result.
            return true;
        }

        Expression typeExpression = getTypeExpression();
        if (!typeExpression.evaluate(tuple, ptr)) {
            return false;
        }
        // Keep the value boxed: unboxing a NULL type argument directly would throw a
        // NullPointerException instead of yielding a NULL result.
        Integer parseType = (Integer) PInteger.INSTANCE.toObject(ptr, typeExpression.getSortOrder());
        if (parseType == null) {
            ptr.set(NULL_VALUE);
            return true;
        }

        final String result;
        switch (parseType.intValue()) {
            case TYPE_DOMAIN_GROUP:
                String domainGroup = DomainInfo.getDomainGroup(sourceStr);
                result = domainGroup != null ? domainGroup : DEFAULT_DOMAIN_GROUP;
                break;
            case TYPE_CLASSIFICATION_NAME:
                result = DiscoverUrlUtil.getSClassificationName(sourceStr);
                break;
            case TYPE_URL_NAME:
                result = DiscoverUrlUtil.getUrlName(sourceStr);
                break;
            default:
                throw new IllegalStateException("parse type should be in (1,2,3).");
        }
        ptr.set(PVarchar.INSTANCE.toBytes(result));
        return true;
    }

    /**
     * Returns the SQL type of the result.
     *
     * <p>The result bytes are always produced by {@code PVarchar.INSTANCE.toBytes}, so the
     * type is VARCHAR regardless of the concrete type of the first argument.
     */
    @Override
    public PDataType getDataType() {
        return PVarchar.INSTANCE;
    }

    /**
     * Reports whether the result may be SQL NULL: it is whenever either argument may be.
     * NOTE(review): the DiscoverUrlUtil helpers may also return null for types 2 and 3 -
     * if they can, this should simply return {@code true}; confirm against the helpers.
     */
    @Override
    public boolean isNullable() {
        return getStrExpression().isNullable() || getTypeExpression().isNullable();
    }

    /** Returns the SQL name of this function, {@link #NAME}. */
    @Override
    public String getName() {
        return NAME;
    }

    /** Returns the first argument expression (the URL string). */
    private Expression getStrExpression() {
        return children.get(0);
    }

    /** Returns the second argument expression (the parse type). */
    private Expression getTypeExpression() {
        return children.get(1);
    }
}

其中,DomainInfo.java和DiscoverUrlUtil.java是两个工具类,获取url对应的中文名称。
下面这两个方法是必须重写的:

/**
 * Returns the SQL data type of the function result; this excerpt takes it from the
 * string (first) argument expression.
 */
@Override
    public PDataType getDataType() {
        return getStrExpression().getDataType();
    }
/**
 * Evaluates the function for one row, writing the result into {@code ptr}.
 * This is an abbreviated excerpt: the "......" line stands for the elided body.
 *
 * @param tuple the current row
 * @param ptr   in/out pointer that receives the evaluated bytes
 * @return {@code false} if the string argument could not be evaluated, {@code true} otherwise
 */
@Override
    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
    	Expression strExpression = getStrExpression();
        // Evaluate the first argument into ptr; bail out if it is not available for this row.
        if (!strExpression.evaluate(tuple, ptr)) {
            return false;
        }
        ......
        return true;
    }

pom.xml代码:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the UDF jar; the coordinates below produce phoenix-1.0-SNAPSHOT.jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.youku</groupId>
    <artifactId>phoenix</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
		<!-- NOTE(review): the UDF class shown in this article does not import sqlline; confirm this dependency is needed. -->
		<dependency>
		  <groupId>sqlline</groupId>
		  <artifactId>sqlline</artifactId>
		  <version>1.1.9</version>
		</dependency>
        <!-- Phoenix core, in the build that targets HBase 0.98; provides ScalarFunction and the PDataType classes. -->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>4.4.0-HBase-0.98</version>
            <!-- These two transitive dependencies of phoenix-core are excluded from the build. -->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-server</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>


以上代码需要打包成jar包,上传到HBase集群的HDFS上,并在进入Phoenix的shell以后,输入下边的代码,将函数注册到Phoenix里。

CREATE FUNCTION URLPARSE(varchar,integer) returns varchar as 'phoenix.function.UrlParseFunction' using jar 'hdfs://0.0.0.0:8010/hbase/udf/phoenix-1.0-SNAPSHOT.jar';

完整的代码结构:

hadoop jar包移除和上传命令:

hadoop fs -rm /hbase/udf/phoenix-1.0-SNAPSHOT.jar
hadoop fs -put /root/test/phoenix-1.0-SNAPSHOT.jar /hbase/udf/

PS:hbase-site.xml也需要做修改,需要将修改后的文件上传到Phoenix的客户端里,只需要修改客户端即可。

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-->
<!-- Client-side hbase-site.xml; the first three properties are the ones related to Phoenix UDFs. -->
<configuration>
<!-- Switches on support for user-defined functions in Phoenix. -->
<property>
  <name>phoenix.functions.allowUserDefinedFunctions</name>
  <value>true</value>
</property>
<!-- FileSystem implementation class used for hdfs:// URIs. -->
<property>
  <name>fs.hdfs.impl</name>
  <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
</property>
<!--
  NOTE(review): with hbase.rootdir as set below, this resolves to hdfs://0.0.0.0:8010/hbase/lib,
  while the article uploads the jar to /hbase/udf and names it explicitly in CREATE FUNCTION.
  Confirm that the Phoenix version in use accepts a jar outside this directory.
-->
<property>
  <name>hbase.dynamic.jars.dir</name>
  <value>${hbase.rootdir}/lib</value>
  <description>
    The directory from which the custom udf jars can be loaded
    dynamically by the phoenix client/region server without the need to restart. However,
    an already loaded udf class would not be un-loaded. See
    HBASE-1936 for more details.
  </description>
</property>
<!-- The remaining properties are general cluster settings (addresses appear as 0.0.0.0 / localhost). -->
<property>
           <name>hbase.master</name>
           <value>0.0.0.0:6000</value>
</property>
<property>
           <name>hbase.master.maxclockskew</name>
           <value>180000</value>
</property>
<property>
    <name>hbase.rootdir</name>
    <value>hdfs://0.0.0.0:8010/hbase</value>
</property>
<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
</property>
<property>
    <name>hbase.tmp.dir</name>
    <value>/opt/hbase/tmp</value>
</property>
<property>
    <name>hbase.zookeeper.quorum</name>
    <value>localhost</value>
</property>
 <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/opt/storage/zookeeper</value>
    <description>Property from ZooKeeper's config zoo.cfg.
    The directory where the snapshot is stored.
    </description>
  </property>
  <property>
           <name>dfs.replication</name>
           <value>1</value>
   </property>
</configuration>

最后的效果:

0: jdbc:phoenix:localhost:2181> select urlparse(cat4,3) from yk.video_channel_source where videoid = 100059807;
+------------------------------------------+
|            URLPARSE(CAT4, 3)             |
+------------------------------------------+
| www.baidu.com                            |
+------------------------------------------+
1 row selected (0.081 seconds)
0: jdbc:phoenix:localhost:2181> select urlparse(cat4,2) from yk.video_channel_source where videoid = 100059807;
+------------------------------------------+
|            URLPARSE(CAT4, 2)             |
+------------------------------------------+
| 其他                                       |
+------------------------------------------+
1 row selected (0.104 seconds)
0: jdbc:phoenix:localhost:2181> select urlparse(cat4,1) from yk.video_channel_source where videoid = 100059807;
+------------------------------------------+
|            URLPARSE(CAT4, 1)             |
+------------------------------------------+
| *.baidu.com                              |
+------------------------------------------+
1 row selected (0.086 seconds)