Commit df2dd283 authored by Junio C Hamano's avatar Junio C Hamano

Merge branch 'jt/subprocess-handshake' into maint

Code cleanup.

* jt/subprocess-handshake:
  sub-process: refactor handshake to common function
  Documentation: migrate sub-process docs to header
  convert: add "status=delayed" to filter process protocol
  convert: refactor capabilities negotiation
  convert: move multiple file filter error handling to separate function
  convert: put the flags field before the flag itself for consistent style
  t0021: write "OUT <size>" only on success
  t0021: make debug log file name configurable
  t0021: keep filter log files on comparison
parents de557036 fa64a2fd
......@@ -425,8 +425,8 @@ packet: git< capability=clean
packet: git< capability=smudge
packet: git< 0000
------------------------
Supported filter capabilities in version 2 are "clean" and
"smudge".
Supported filter capabilities in version 2 are "clean", "smudge",
and "delay".
Afterwards Git sends a list of "key=value" pairs terminated with
a flush packet. The list will contain at least the filter command
......@@ -512,12 +512,73 @@ the protocol then Git will stop the filter process and restart it
with the next file that needs to be processed. Depending on the
`filter.<driver>.required` flag Git will interpret that as error.
After the filter has processed a blob it is expected to wait for
the next "key=value" list containing a command. Git will close
After the filter has processed a command it is expected to wait for
a "key=value" list containing the next command. Git will close
the command pipe on exit. The filter is expected to detect EOF
and exit gracefully on its own. Git will wait until the filter
process has stopped.
Delay
^^^^^
If the filter supports the "delay" capability, then Git can send the
flag "can-delay" after the filter command and pathname. This flag
denotes that the filter can delay filtering the current blob (e.g. to
compensate network latencies) by responding with no content but with
the status "delayed" and a flush packet.
------------------------
packet: git> command=smudge
packet: git> pathname=path/testfile.dat
packet: git> can-delay=1
packet: git> 0000
packet: git> CONTENT
packet: git> 0000
packet: git< status=delayed
packet: git< 0000
------------------------
If the filter supports the "delay" capability then it must support the
"list_available_blobs" command. If Git sends this command, then the
filter is expected to return a list of pathnames representing blobs
that have been delayed earlier and are now available.
The list must be terminated with a flush packet followed
by a "success" status that is also terminated with a flush packet. If
no blobs for the delayed paths are available, yet, then the filter is
expected to block the response until at least one blob becomes
available. The filter can tell Git that it has no more delayed blobs
by sending an empty list. As soon as the filter responds with an empty
list, Git stops asking. All blobs that Git has not received at this
point are considered missing and will result in an error.
------------------------
packet: git> command=list_available_blobs
packet: git> 0000
packet: git< pathname=path/testfile.dat
packet: git< pathname=path/otherfile.dat
packet: git< 0000
packet: git< status=success
packet: git< 0000
------------------------
After Git received the pathnames, it will request the corresponding
blobs again. These requests contain a pathname and an empty content
section. The filter is expected to respond with the smudged content
in the usual way as explained above.
------------------------
packet: git> command=smudge
packet: git> pathname=path/testfile.dat
packet: git> 0000
packet: git> 0000 # empty content!
packet: git< status=success
packet: git< 0000
packet: git< SMUDGED_CONTENT
packet: git< 0000
packet: git< 0000 # empty list, keep "status=success" unchanged!
------------------------
Example
^^^^^^^
A long running filter demo implementation can be found in
`contrib/long-running-filter/example.pl` located in the Git
core repository. If you develop your own long running filter
......
sub-process API
===============
The sub-process API makes it possible to run background sub-processes
for the entire lifetime of a Git invocation. If Git needs to communicate
with an external process multiple times, then this can reduces the process
invocation overhead. Git and the sub-process communicate through stdin and
stdout.
The sub-processes are kept in a hashmap by command name and looked up
via the subprocess_find_entry function. If an existing instance can not
be found then a new process should be created and started. When the
parent git command terminates, all sub-processes are also terminated.
This API is based on the run-command API.
Data structures
---------------
* `struct subprocess_entry`
The sub-process structure. Members should not be accessed directly.
Types
-----
'int(*subprocess_start_fn)(struct subprocess_entry *entry)'::
User-supplied function to initialize the sub-process. This is
typically used to negotiate the interface version and capabilities.
Functions
---------
`cmd2process_cmp`::
Function to test two subprocess hashmap entries for equality.
`subprocess_start`::
Start a subprocess and add it to the subprocess hashmap.
`subprocess_stop`::
Kill a subprocess and remove it from the subprocess hashmap.
`subprocess_find_entry`::
Find a subprocess in the subprocess hashmap.
`subprocess_get_child_process`::
Get the underlying `struct child_process` from a subprocess.
`subprocess_read_status`::
Helper function to read packets looking for the last "status=<foo>"
key/value pair.
......@@ -358,6 +358,8 @@ static int checkout_paths(const struct checkout_opts *opts,
state.force = 1;
state.refresh_cache = 1;
state.istate = &the_index;
enable_delayed_checkout(&state);
for (pos = 0; pos < active_nr; pos++) {
struct cache_entry *ce = active_cache[pos];
if (ce->ce_flags & CE_MATCHED) {
......@@ -372,6 +374,7 @@ static int checkout_paths(const struct checkout_opts *opts,
pos = skip_same_name(ce, pos) - 1;
}
}
errs |= finish_delayed_checkout(&state);
if (write_locked_index(&the_index, lock_file, COMMIT_LOCK))
die(_("unable to write new index file"));
......
......@@ -1500,6 +1500,7 @@ struct checkout {
struct index_state *istate;
const char *base_dir;
int base_dir_len;
struct delayed_checkout *delayed_checkout;
unsigned force:1,
quiet:1,
not_new:1,
......@@ -1509,6 +1510,8 @@ struct checkout {
#define TEMPORARY_FILENAME_LENGTH 25
extern int checkout_entry(struct cache_entry *ce, const struct checkout *state, char *topath);
extern void enable_delayed_checkout(struct checkout *state);
extern int finish_delayed_checkout(struct checkout *state);
struct cache_def {
struct strbuf path;
......
This diff is collapsed.
......@@ -4,6 +4,8 @@
#ifndef CONVERT_H
#define CONVERT_H
#include "string-list.h"
struct index_state;
enum safe_crlf {
......@@ -34,6 +36,26 @@ enum eol {
#endif
};
enum ce_delay_state {
CE_NO_DELAY = 0,
CE_CAN_DELAY = 1,
CE_RETRY = 2
};
struct delayed_checkout {
/*
* State of the currently processed cache entry. If the state is
* CE_CAN_DELAY, then the filter can delay the current cache entry.
* If the state is CE_RETRY, then this signals the filter that the
* cache entry was requested before.
*/
enum ce_delay_state state;
/* List of filter drivers that signaled delayed blobs. */
struct string_list filters;
/* List of delayed blobs identified by their path. */
struct string_list paths;
};
extern enum eol core_eol;
extern const char *get_cached_convert_stats_ascii(const struct index_state *istate,
const char *path);
......@@ -46,6 +68,10 @@ extern int convert_to_git(const struct index_state *istate,
struct strbuf *dst, enum safe_crlf checksafe);
extern int convert_to_working_tree(const char *path, const char *src,
size_t len, struct strbuf *dst);
extern int async_convert_to_working_tree(const char *path, const char *src,
size_t len, struct strbuf *dst,
void *dco);
extern int async_query_available_blobs(const char *cmd, struct string_list *available_paths);
extern int renormalize_buffer(const struct index_state *istate,
const char *path, const char *src, size_t len,
struct strbuf *dst);
......
......@@ -137,6 +137,105 @@ static int streaming_write_entry(const struct cache_entry *ce, char *path,
return result;
}
void enable_delayed_checkout(struct checkout *state)
{
if (!state->delayed_checkout) {
state->delayed_checkout = xmalloc(sizeof(*state->delayed_checkout));
state->delayed_checkout->state = CE_CAN_DELAY;
string_list_init(&state->delayed_checkout->filters, 0);
string_list_init(&state->delayed_checkout->paths, 0);
}
}
static int remove_available_paths(struct string_list_item *item, void *cb_data)
{
struct string_list *available_paths = cb_data;
struct string_list_item *available;
available = string_list_lookup(available_paths, item->string);
if (available)
available->util = (void *)item->string;
return !available;
}
int finish_delayed_checkout(struct checkout *state)
{
int errs = 0;
struct string_list_item *filter, *path;
struct delayed_checkout *dco = state->delayed_checkout;
if (!state->delayed_checkout)
return errs;
dco->state = CE_RETRY;
while (dco->filters.nr > 0) {
for_each_string_list_item(filter, &dco->filters) {
struct string_list available_paths = STRING_LIST_INIT_NODUP;
if (!async_query_available_blobs(filter->string, &available_paths)) {
/* Filter reported an error */
errs = 1;
filter->string = "";
continue;
}
if (available_paths.nr <= 0) {
/*
* Filter responded with no entries. That means
* the filter is done and we can remove the
* filter from the list (see
* "string_list_remove_empty_items" call below).
*/
filter->string = "";
continue;
}
/*
* In dco->paths we store a list of all delayed paths.
* The filter just send us a list of available paths.
* Remove them from the list.
*/
filter_string_list(&dco->paths, 0,
&remove_available_paths, &available_paths);
for_each_string_list_item(path, &available_paths) {
struct cache_entry* ce;
if (!path->util) {
error("external filter '%s' signaled that '%s' "
"is now available although it has not been "
"delayed earlier",
filter->string, path->string);
errs |= 1;
/*
* Do not ask the filter for available blobs,
* again, as the filter is likely buggy.
*/
filter->string = "";
continue;
}
ce = index_file_exists(state->istate, path->string,
strlen(path->string), 0);
errs |= (ce ? checkout_entry(ce, state, NULL) : 1);
}
}
string_list_remove_empty_items(&dco->filters, 0);
}
string_list_clear(&dco->filters, 0);
/* At this point we should not have any delayed paths anymore. */
errs |= dco->paths.nr;
for_each_string_list_item(path, &dco->paths) {
error("'%s' was not filtered properly", path->string);
}
string_list_clear(&dco->paths, 0);
free(dco);
state->delayed_checkout = NULL;
return errs;
}
static int write_entry(struct cache_entry *ce,
char *path, const struct checkout *state, int to_tempfile)
{
......@@ -179,11 +278,34 @@ static int write_entry(struct cache_entry *ce,
/*
* Convert from git internal format to working tree format
*/
if (ce_mode_s_ifmt == S_IFREG &&
convert_to_working_tree(ce->name, new, size, &buf)) {
free(new);
new = strbuf_detach(&buf, &newsize);
size = newsize;
if (ce_mode_s_ifmt == S_IFREG) {
struct delayed_checkout *dco = state->delayed_checkout;
if (dco && dco->state != CE_NO_DELAY) {
/* Do not send the blob in case of a retry. */
if (dco->state == CE_RETRY) {
new = NULL;
size = 0;
}
ret = async_convert_to_working_tree(
ce->name, new, size, &buf, dco);
if (ret && string_list_has_string(&dco->paths, ce->name)) {
free(new);
goto finish;
}
} else
ret = convert_to_working_tree(
ce->name, new, size, &buf);
if (ret) {
free(new);
new = strbuf_detach(&buf, &newsize);
size = newsize;
}
/*
* No "else" here as errors from convert are OK at this
* point. If the error would have been fatal (e.g.
* filter is required), then we would have died already.
*/
}
fd = open_output_fd(path, ce, to_tempfile);
......
......@@ -171,25 +171,6 @@ int packet_write_fmt_gently(int fd, const char *fmt, ...)
return status;
}
int packet_writel(int fd, const char *line, ...)
{
va_list args;
int err;
va_start(args, line);
for (;;) {
if (!line)
break;
if (strlen(line) > LARGE_PACKET_DATA_MAX)
return -1;
err = packet_write_fmt_gently(fd, "%s\n", line);
if (err)
return err;
line = va_arg(args, const char*);
}
va_end(args);
return packet_flush_gently(fd);
}
static int packet_write_gently(const int fd_out, const char *buf, size_t size)
{
static char packet_write_buffer[LARGE_PACKET_MAX];
......
......@@ -25,8 +25,6 @@ void packet_buf_flush(struct strbuf *buf);
void packet_buf_write(struct strbuf *buf, const char *fmt, ...) __attribute__((format (printf, 2, 3)));
int packet_flush_gently(int fd);
int packet_write_fmt_gently(int fd, const char *fmt, ...) __attribute__((format (printf, 2, 3)));
LAST_ARG_MUST_BE_NULL
int packet_writel(int fd, const char *line, ...);
int write_packetized_from_fd(int fd_in, int fd_out);
int write_packetized_from_buf(const char *src_in, size_t len, int fd_out);
......
......@@ -105,3 +105,107 @@ int subprocess_start(struct hashmap *hashmap, struct subprocess_entry *entry, co
hashmap_add(hashmap, entry);
return 0;
}
static int handshake_version(struct child_process *process,
const char *welcome_prefix, int *versions,
int *chosen_version)
{
int version_scratch;
int i;
char *line;
const char *p;
if (!chosen_version)
chosen_version = &version_scratch;
if (packet_write_fmt_gently(process->in, "%s-client\n",
welcome_prefix))
return error("Could not write client identification");
for (i = 0; versions[i]; i++) {
if (packet_write_fmt_gently(process->in, "version=%d\n",
versions[i]))
return error("Could not write requested version");
}
if (packet_flush_gently(process->in))
return error("Could not write flush packet");
if (!(line = packet_read_line(process->out, NULL)) ||
!skip_prefix(line, welcome_prefix, &p) ||
strcmp(p, "-server"))
return error("Unexpected line '%s', expected %s-server",
line ? line : "<flush packet>", welcome_prefix);
if (!(line = packet_read_line(process->out, NULL)) ||
!skip_prefix(line, "version=", &p) ||
strtol_i(p, 10, chosen_version))
return error("Unexpected line '%s', expected version",
line ? line : "<flush packet>");
if ((line = packet_read_line(process->out, NULL)))
return error("Unexpected line '%s', expected flush", line);
/* Check to make sure that the version received is supported */
for (i = 0; versions[i]; i++) {
if (versions[i] == *chosen_version)
break;
}
if (!versions[i])
return error("Version %d not supported", *chosen_version);
return 0;
}
static int handshake_capabilities(struct child_process *process,
struct subprocess_capability *capabilities,
unsigned int *supported_capabilities)
{
int i;
char *line;
for (i = 0; capabilities[i].name; i++) {
if (packet_write_fmt_gently(process->in, "capability=%s\n",
capabilities[i].name))
return error("Could not write requested capability");
}
if (packet_flush_gently(process->in))
return error("Could not write flush packet");
while ((line = packet_read_line(process->out, NULL))) {
const char *p;
if (!skip_prefix(line, "capability=", &p))
continue;
for (i = 0;
capabilities[i].name && strcmp(p, capabilities[i].name);
i++)
;
if (capabilities[i].name) {
if (supported_capabilities)
*supported_capabilities |= capabilities[i].flag;
} else {
warning("external filter requested unsupported filter capability '%s'",
p);
}
}
return 0;
}
int subprocess_handshake(struct subprocess_entry *entry,
const char *welcome_prefix,
int *versions,
int *chosen_version,
struct subprocess_capability *capabilities,
unsigned int *supported_capabilities)
{
int retval;
struct child_process *process = &entry->process;
sigchain_push(SIGPIPE, SIG_IGN);
retval = handshake_version(process, welcome_prefix, versions,
chosen_version) ||
handshake_capabilities(process, capabilities,
supported_capabilities);
sigchain_pop(SIGPIPE);
return retval;
}
......@@ -6,41 +6,88 @@
#include "run-command.h"
/*
* Generic implementation of background process infrastructure.
* See: Documentation/technical/api-sub-process.txt
* The sub-process API makes it possible to run background sub-processes
* for the entire lifetime of a Git invocation. If Git needs to communicate
* with an external process multiple times, then this can reduces the process
* invocation overhead. Git and the sub-process communicate through stdin and
* stdout.
*
* The sub-processes are kept in a hashmap by command name and looked up
* via the subprocess_find_entry function. If an existing instance can not
* be found then a new process should be created and started. When the
* parent git command terminates, all sub-processes are also terminated.
*
* This API is based on the run-command API.
*/
/* data structures */
/* Members should not be accessed directly. */
struct subprocess_entry {
struct hashmap_entry ent; /* must be the first member! */
const char *cmd;
struct child_process process;
};
struct subprocess_capability {
const char *name;
/*
* subprocess_handshake will "|=" this value to supported_capabilities
* if the server reports that it supports this capability.
*/
unsigned int flag;
};
/* subprocess functions */
/* Function to test two subprocess hashmap entries for equality. */
extern int cmd2process_cmp(const void *unused_cmp_data,
const struct subprocess_entry *e1,
const struct subprocess_entry *e2,
const void *unused_keydata);
/*
* User-supplied function to initialize the sub-process. This is
* typically used to negotiate the interface version and capabilities.
*/
typedef int(*subprocess_start_fn)(struct subprocess_entry *entry);
/* Start a subprocess and add it to the subprocess hashmap. */
int subprocess_start(struct hashmap *hashmap, struct subprocess_entry *entry, const char *cmd,
subprocess_start_fn startfn);
/* Kill a subprocess and remove it from the subprocess hashmap. */
void subprocess_stop(struct hashmap *hashmap, struct subprocess_entry *entry);
/* Find a subprocess in the subprocess hashmap. */
struct subprocess_entry *subprocess_find_entry(struct hashmap *hashmap, const char *cmd);
/* subprocess helper functions */
/* Get the underlying `struct child_process` from a subprocess. */
static inline struct child_process *subprocess_get_child_process(
struct subprocess_entry *entry)
{
return &entry->process;
}
/*
* Perform the version and capability negotiation as described in the "Long
* Running Filter Process" section of the gitattributes documentation using the
* given requested versions and capabilities. The "versions" and "capabilities"
* parameters are arrays terminated by a 0 or blank struct.
*
* This function is typically called when a subprocess is started (as part of
* the "startfn" passed to subprocess_start).
*/
int subprocess_handshake(struct subprocess_entry *entry,
const char *welcome_prefix,
int *versions,
int *chosen_version,
struct subprocess_capability *capabilities,
unsigned int *supported_capabilities);
/*
* Helper function that will read packets looking for "status=<foo>"
* key/value pairs and return the value from the last "status" packet
......
This diff is collapsed.
......@@ -2,8 +2,9 @@
# Example implementation for the Git filter protocol version 2
# See Documentation/gitattributes.txt, section "Filter Protocol"
#
# The script takes the list of supported protocol capabilities as
# arguments ("clean", "smudge", etc).
# The first argument defines a debug log file that the script write to.
# All remaining arguments define a list of supported protocol
# capabilities ("clean", "smudge", etc).
#
# This implementation supports special test cases:
# (1) If data with the pathname "clean-write-fail.r" is processed with
......@@ -17,6 +18,16 @@
# operation then the filter signals that it cannot or does not want
# to process the file and any file after that is processed with the
# same command.
# (5) If data with a pathname that is a key in the DELAY hash is
# requested (e.g. "test-delay10.a") then the filter responds with
# a "delay" status and sets the "requested" field in the DELAY hash.
# The filter will signal the availability of this object after
# "count" (field in DELAY hash) "list_available_blobs" commands.
# (6) If data with the pathname "missing-delay.a" is processed that the
# filter will drop the path from the "list_available_blobs" response.
# (7) If data with the pathname "invalid-delay.a" is processed that the
# filter will add the path "unfiltered" which was not delayed before
# to the "list_available_blobs" response.
#
use strict;
......@@ -24,9 +35,19 @@
use IO::File;
my $MAX_PACKET_CONTENT_SIZE = 65516;
my $log_file = shift @ARGV;
my @capabilities = @ARGV;
open my $debug, ">>", "rot13-filter.log" or die "cannot open log file: $!";
open my $debug, ">>", $log_file or die "cannot open log file: $!";
my %DELAY = (
'test-delay10.a' => { "requested" => 0, "count" => 1 },
'test-delay11.a' => { "requested" => 0, "count" => 1 },
'test-delay20.a' => { "requested" => 0, "count" => 2 },
'test-delay10.b' => { "requested" => 0, "count" => 1 },
'missing-delay.a' => { "requested" => 0, "count" => 1 },
'invalid-delay.a' => { "requested" => 0, "count" => 1 },
);
sub rot13 {
my $str = shift;
......@@ -64,7 +85,7 @@ sub packet_bin_read {
sub packet_txt_read {
my ( $res, $buf ) = packet_bin_read();
unless ( $buf =~ s/\n$// ) {
unless ( $buf eq '' or $buf =~ s/\n$// ) {
die "A non-binary line MUST be terminated by an LF.";
}
return ( $res, $buf );
......@@ -99,6 +120,7 @@ sub packet_flush {
( packet_txt_read() eq ( 0, "capability=clean" ) ) || die "bad capability";
( packet_txt_read() eq ( 0, "capability=smudge" ) ) || die "bad capability";
( packet_txt_read() eq ( 0, "capability=delay" ) ) || die "bad capability";
( packet_bin_read() eq ( 1, "" ) ) || die "bad capability end";
foreach (@capabilities) {
......@@ -109,88 +131,142 @@ sub packet_flush {
$debug->flush();
while (1) {
my ($command) = packet_txt_read() =~ /^command=(.+)$/;
my ( $command ) = packet_txt_read() =~ /^command=(.+)$/;
print $debug "IN: $command";
$debug->flush();
my ($pathname) = packet_txt_read() =~ /^pathname=(.+)$/;
print $debug " $pathname";
$debug->flush();
if ( $pathname eq "" ) {
die "bad pathname '$pathname'";
}
if ( $command eq "list_available_blobs" ) {
# Flush
packet_bin_read();
# Flush
packet_bin_read();
my $input = "";
{
binmode(STDIN);
my $buffer;
my $done = 0;
while ( !$done ) {
( $done, $buffer ) = packet_bin_read();
$input .= $buffer;
foreach my $pathname ( sort keys %DELAY ) {
if ( $DELAY{$pathname}{"requested"} >= 1 ) {
$DELAY{$pathname}{"count"} = $DELAY{$pathname}{"count"} - 1;
if ( $pathname eq "invalid-delay.a" ) {
# Send Git a pathname that was not delayed earlier
packet_txt_write("pathname=unfiltered");
}
if ( $pathname eq "missing-delay.a" ) {
# Do not signal Git that this file is available
} elsif ( $DELAY{$pathname}{"count"} == 0 ) {
print $debug " $pathname";
packet_txt_write("pathname=$pathname");
}
}
}
print $debug " " . length($input) . " [OK] -- ";
$debug->flush();
}
my $output;
if ( $pathname eq "error.r" or $pathname eq "abort.r" ) {
$output = "";
}
elsif ( $command eq "clean" and grep( /^clean$/, @capabilities ) ) {
$output = rot13($input);
}
elsif ( $command eq "smudge" and grep( /^smudge$/, @capabilities ) ) {
$output = rot13($input);
}
else {
die "bad command '$command'";
}
print $debug "OUT: " . length($output) . " ";