Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new twitter event scriber impl #58

Merged
merged 6 commits into from
Dec 6, 2016
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
<module>presto-hive-cdh5</module>
<module>presto-teradata-functions</module>
<module>presto-example-http</module>
<module>twitter-eventlistener-plugin</module>
<module>presto-local-file</module>
<module>presto-tpch</module>
<module>presto-raptor</module>
Expand Down
5 changes: 5 additions & 0 deletions presto-server/src/main/provisio/presto.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,9 @@
</artifact>
</artifactSet>

<artifactSet to="plugin/twitter-eventlistener-plugin">
<artifact id="${project.groupId}:twitter-eventlistener-plugin:zip:${project.version}">
<unpack />
</artifact>
</artifactSet>
</runtime>
105 changes: 105 additions & 0 deletions twitter-eventlistener-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-root</artifactId>
<version>0.149-tw-0.27</version>
</parent>

<artifactId>twitter-eventlistener-plugin</artifactId>
<description>Twitter Event Listener - scribes QueryCompletedEvent</description>
<packaging>presto-plugin</packaging>

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>0.149-tw-0.27</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<!-- twitter deps -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>presto-thrift-java</artifactId>
<version>0.0.1</version>
<exclusions>
<exclusion>
<groupId>com.twitter</groupId>
<artifactId>util-core_2.11</artifactId>
</exclusion>
<exclusion>
<groupId>com.twitter</groupId>
<artifactId>util-core-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.twitter</groupId>
<artifactId>util-function_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>com.twitter</groupId>
<artifactId>util-function-java</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parser-combinators_2.11</artifactId>
</exclusion>
<exclusion>
<groupId>com.twitter</groupId>
<artifactId>scrooge-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>util-logging_2.10</artifactId>
<version>6.34.0</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.6</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accidental commit?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:(, removed.

* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.presto.plugin.eventlistener;

import com.twitter.logging.BareFormatter$;
import com.twitter.logging.Level;
import com.twitter.logging.QueueingHandler;
import com.twitter.logging.ScribeHandler;

import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

import java.util.Base64;
import java.util.logging.LogRecord;

public class TwitterScriber
{
private static final String DASH = "-";
private static final int MAX_QUEUE_SIZE = 1000;

private QueueingHandler queueingHandler;

// TSerializer is not thread safe
private final ThreadLocal<TSerializer> serializer = new ThreadLocal<TSerializer>()
{
@Override protected TSerializer initialValue()
{
return new TSerializer();
}
};

public TwitterScriber(String scribeCategory)
{
ScribeHandler scribeHandler = new ScribeHandler(
ScribeHandler.DefaultHostname(),
ScribeHandler.DefaultPort(),
scribeCategory,
ScribeHandler.DefaultBufferTime(),
ScribeHandler.DefaultConnectBackoff(),
ScribeHandler.DefaultMaxMessagesPerTransaction(),
ScribeHandler.DefaultMaxMessagesToBuffer(),
BareFormatter$.MODULE$,
scala.Option.apply((Level) null));
queueingHandler = new QueueingHandler(scribeHandler, MAX_QUEUE_SIZE);
}

public void scribe(TBase thriftMessage) throws TException
{
scribe(serializeThriftToString(thriftMessage));
}

/**
* Serialize a thrift object to bytes, compress, then encode as a base64 string.
* Throws TException
*/
private String serializeThriftToString(TBase thriftMessage) throws TException
{
return Base64.getEncoder().encodeToString(serializer.get().serialize(thriftMessage));
}

private void scribe(String message)
{
LogRecord logRecord = new LogRecord(Level.ALL, message);
queueingHandler.publish(logRecord);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.presto.plugin.eventlistener;

import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
import com.facebook.presto.spi.eventlistener.QueryContext;
import com.facebook.presto.spi.eventlistener.QueryFailureInfo;
import com.facebook.presto.spi.eventlistener.QueryMetadata;
import com.facebook.presto.spi.eventlistener.QueryStatistics;

import com.twitter.presto.thriftjava.QueryCompletionEvent;
import com.twitter.presto.thriftjava.QueryState;

import io.airlift.log.Logger;
import org.apache.thrift.TException;

/**
* Class that scribes query completion events
*/
public class QueryCompletedEventScriber
{
private static final String DASH = "-";
private static final Logger log = Logger.get(QueryCompletedEventScriber.class);

private TwitterScriber scriber = new TwitterScriber("presto_query_completion");

public void handle(QueryCompletedEvent event)
{
try {
scriber.scribe(toThriftQueryCompletionEvent(event));
}
catch (TException e) {
log.warn(e,
String.format("Could not serialize thrift object of Query(id=%s, user=%s, env=%s, schema=%s.%s)",
event.getMetadata().getQueryId(),
event.getContext().getUser(),
event.getContext().getEnvironment(),
event.getContext().getCatalog().orElse(DASH),
event.getContext().getSchema().orElse(DASH)));
}
}

private static QueryCompletionEvent toThriftQueryCompletionEvent(QueryCompletedEvent event)
{
QueryMetadata eventMetadata = event.getMetadata();
QueryContext eventContext = event.getContext();
QueryStatistics eventStat = event.getStatistics();

QueryCompletionEvent thriftEvent =
new com.twitter.presto.thriftjava.QueryCompletionEvent();

thriftEvent.query_id = eventMetadata.getQueryId();
thriftEvent.transaction_id = eventMetadata.getTransactionId().orElse(DASH);
thriftEvent.user = eventContext.getUser();
thriftEvent.principal = eventContext.getPrincipal().orElse(DASH);
thriftEvent.source = eventContext.getSource().orElse(DASH);
thriftEvent.server_version = eventContext.getServerVersion();
thriftEvent.environment = eventContext.getEnvironment();
thriftEvent.catalog = eventContext.getCatalog().orElse(DASH);
thriftEvent.schema = eventContext.getSchema().orElse(DASH);
thriftEvent.remote_client_address = eventContext.getRemoteClientAddress().orElse(DASH);
thriftEvent.user_agent = eventContext.getUserAgent().orElse(DASH);
thriftEvent.query_state = QueryState.valueOf(eventMetadata.getQueryState());
thriftEvent.uri = eventMetadata.getUri().toString();
thriftEvent.query = eventMetadata.getQuery();
thriftEvent.create_time_ms = event.getCreateTime().toEpochMilli();
thriftEvent.execution_start_time_ms = event.getExecutionStartTime().toEpochMilli();
thriftEvent.end_time_ms = event.getEndTime().toEpochMilli();
thriftEvent.queued_time_ms = eventStat.getQueuedTime().toMillis();
thriftEvent.query_wall_time_ms = eventStat.getWallTime().toMillis();
if (eventStat.getAnalysisTime().isPresent()) {
thriftEvent.analysis_time_ms = eventStat.getAnalysisTime().get().toMillis();
}
if (eventStat.getDistributedPlanningTime().isPresent()) {
thriftEvent.distributed_planning_time_ms = eventStat.getDistributedPlanningTime().get().toMillis();
}
thriftEvent.total_bytes = eventStat.getTotalBytes();
thriftEvent.total_rows = eventStat.getTotalRows();
thriftEvent.splits = eventStat.getCompletedSplits();
if (event.getFailureInfo().isPresent()) {
QueryFailureInfo eventFailureInfo = event.getFailureInfo().get();
thriftEvent.error_code_id = eventFailureInfo.getErrorCode().getCode();
thriftEvent.error_code_name = eventFailureInfo.getErrorCode().getName();
thriftEvent.failure_type = eventFailureInfo.getFailureType().orElse(DASH);
thriftEvent.failure_message = eventFailureInfo.getFailureMessage().orElse(DASH);
thriftEvent.failure_task = eventFailureInfo.getFailureTask().orElse(DASH);
thriftEvent.failure_host = eventFailureInfo.getFailureHost().orElse(DASH);
thriftEvent.failures_json = eventFailureInfo.getFailuresJson();
}

return thriftEvent;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.presto.plugin.eventlistener;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
import com.facebook.presto.spi.eventlistener.QueryCreatedEvent;
import com.facebook.presto.spi.eventlistener.SplitCompletedEvent;

import io.airlift.log.Logger;

public class TwitterEventListener implements EventListener
{
private static final Logger log = Logger.get(TwitterEventListener.class);
private final QueryCompletedEventScriber scriber = new QueryCompletedEventScriber();

@Override
public void queryCreated(QueryCreatedEvent queryCreatedEvent)
{
}

@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
scriber.handle(queryCompletedEvent);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we scribe before logging so if there is an exception in the toString logic it won't effect the scribe?

}

@Override
public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.presto.plugin.eventlistener;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;

import java.util.Map;

public class TwitterEventListenerFactory implements EventListenerFactory
{
@Override
public String getName()
{
return "twitter-event-listener";
}

@Override
public EventListener create(Map<String, String> config)
{
return new TwitterEventListener();
}
}
Loading